diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index d548c477de..eadbea4adb 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -50,6 +50,8 @@ struct blade_connection_s { ks_rwl_t *lock; ks_q_t *sending; + + const char *session; }; void *blade_connection_state_thread(ks_thread_t *thread, void *data); @@ -147,6 +149,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) bc->shutdown = KS_FALSE; } + if (bc->session) ks_pool_free(bc->pool, &bc->session); + while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); return KS_STATUS_SUCCESS; @@ -304,6 +308,20 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJS return ks_q_trypop(bc->sending, (void **)json); } +KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc) +{ + ks_assert(bc); + + return bc->session; +} + +KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id) +{ + ks_assert(bc); + + if (bc->session) ks_pool_free(bc->pool, &bc->session); + bc->session = ks_pstrdup(bc->pool, id); +} void *blade_connection_state_thread(ks_thread_t *thread, void *data) { @@ -369,11 +387,18 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH); break; case BLADE_CONNECTION_STATE_ATTACH: - // @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge? - // determine how much of session management is handled here... do we process these session negotiation messages without - // passing it up to the application layer? or does the application layer give back a session and build the response? - blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY); - break; + { + blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session); + ks_assert(bs); // should not happen because bs should still be locked + + blade_session_connections_add(bs, bc->id); + + blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY); + blade_session_state_set(bs, BLADE_SESSION_STATE_READY); + + blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching + break; + } case BLADE_CONNECTION_STATE_DETACH: // @todo detach from session if this connection is attached blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT); diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 8ca224cf9f..429fdba344 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -895,35 +895,104 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blad blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition) { + blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + blade_transport_wss_t *bt_wss = NULL; + cJSON *json = NULL; + cJSON *params = NULL; + blade_session_t *bs = NULL; + blade_handle_t *bh = NULL; + const char *jsonrpc = NULL; + const char *method = NULL; + const char *id = NULL; + const char *sid = NULL; + ks_time_t timeout; + ks_assert(bc); + bh = blade_connection_handle_get(bc); + ks_assert(bh); + ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); - // @todo block while reading expected message with blade_transport_wss_read(bt_wss, json) + bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); - // @todo check if expected message is a request by confirming it has a method field (along with json field validation, stay compliant with jsonrpc) + // @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config + timeout = ks_time_now() + (5 * KS_USEC_PER_SEC); + while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) { + if (json) break; + ks_sleep(250); + if (ks_time_now() >= timeout) break; + } - // @todo validate method is "blade.session.attach" + if (!json) { + // @todo error logging + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo validate parameters "session-id" and "session-token" must both be present or omitted, validate both are strings and valid uuid format - // if both are omitted, params may be omitted entirely by jsonrpc spec + // @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is + jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values + if (!jsonrpc || strcmp(jsonrpc, "2.0")) { + // @todo error logging + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo if session-id is provided, lookup existing session within the blade_handle_t + id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id + if (!id) { + // @todo error logging + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo if the session exists, verify the session-token, if it matches then use this session + method = cJSON_GetObjectCstr(json, "method"); + if (!method || strcasecmp(method, "blade.session.attach")) { + // @todo error logging + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } - // @todo if the session-token does not match, or the session does not exist, or the session-id and session-token are not provided then create a new session + params = cJSON_GetObjectItem(json, "params"); + if (params) { + sid = cJSON_GetObjectCstr(params, "session-id"); + if (sid) { + // @todo validate uuid format by parsing, not currently available in uuid functions + ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid); + } + } - // @todo once session is established, associate it to the connection - - // @todo if anything fails, return HOOK_DISCONNECT, otherwise return HOOK_SUCCESS which will continue the rest of the session attaching process - // which is to grab the expected session off the connection and attach the connection to the connection list on the session, start the session thread if - // it hasn't already been started, and set the session state to CONNECT or ATTACH... discuss with tony, finalize session state machine regarding multiple - // connections attempting to attach at the same time to the session and changing the session state, may need to queue pending connections to the session - // and process them from within the session state machine thread + if (sid) { + bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done + if (bs) { + ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs)); + } + } - ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done - return BLADE_CONNECTION_STATE_HOOK_BYPASS; + if (!bs) { + blade_session_create(&bs, bh); + ks_assert(bs); + + ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs)); + + blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise + + if (blade_session_startup(bs) != KS_STATUS_SUCCESS) { + blade_session_read_unlock(bs); + blade_session_destroy(&bs); + ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT; + goto done; + } + blade_handle_sessions_add(bs); + } + + blade_connection_session_set(bc, blade_session_id_get(bs)); + + done: + // @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state + // machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard + // behaviour to simply go as far as assigning a session to the connection and let the system handle the rest + if (json) cJSON_Delete(json); + return ret; } blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition) @@ -961,6 +1030,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connecti ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + ks_sleep(1000); return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } @@ -970,6 +1040,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + ks_sleep(1000); return BLADE_CONNECTION_STATE_HOOK_SUCCESS; } diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c index 950a510838..fbfd227254 100644 --- a/libs/libblade/src/blade_session.c +++ b/libs/libblade/src/blade_session.c @@ -42,6 +42,7 @@ struct blade_session_s { blade_session_state_t state; const char *id; + ks_rwl_t *lock; list_t connections; ks_q_t *sending; @@ -69,6 +70,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle ks_uuid(&id); bs->id = ks_uuid_str(pool, &id); + ks_rwl_create(&bs->lock, pool); + ks_assert(bs->lock); + list_init(&bs->connections); ks_q_create(&bs->sending, pool, 0); ks_assert(bs->sending); @@ -95,6 +99,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP) ks_q_destroy(&bs->receiving); ks_q_destroy(&bs->sending); + ks_rwl_destroy(&bs->lock); + ks_pool_free(bs->pool, &bs->id); ks_pool_free(bs->pool, bsP); @@ -138,9 +144,24 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs) while (ks_q_trypop(bs->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); while (ks_q_trypop(bs->receiving, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); + list_iterator_start(&bs->connections); + while (list_iterator_hasnext(&bs->connections)) { + const char *id = (const char *)list_iterator_next(&bs->connections); + ks_pool_free(bs->pool, &id); + } + list_iterator_stop(&bs->connections); + list_clear(&bs->connections); + return KS_STATUS_SUCCESS; } +KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs) +{ + ks_assert(bs); + + return bs->handle; +} + KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs) { ks_assert(bs); @@ -157,6 +178,43 @@ KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id) bs->id = ks_pstrdup(bs->pool, id); } +KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bs); + + if (block) ret = ks_rwl_read_lock(bs->lock); + else ret = ks_rwl_try_read_lock(bs->lock); + return ret; +} + +KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs) +{ + ks_assert(bs); + + return ks_rwl_read_unlock(bs->lock); +} + +KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bs); + + if (block) ret = ks_rwl_write_lock(bs->lock); + else ret = ks_rwl_try_write_lock(bs->lock); + return ret; +} + +KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs) +{ + ks_assert(bs); + + return ks_rwl_write_unlock(bs->lock); +} + + KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state) { ks_assert(bs); @@ -172,6 +230,41 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs) blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP); } +KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + const char *cid = NULL; + + ks_assert(bs); + + cid = ks_pstrdup(bs->pool, id); + ks_assert(cid); + + list_append(&bs->connections, cid); + + return ret; +} + +KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + uint32_t size = 0; + + ks_assert(bs); + + size = list_size(&bs->connections); + for (uint32_t i = 0; i < size; ++i) { + const char *cid = (const char *)list_get_at(&bs->connections, i); + if (!strcasecmp(cid, id)) { + list_delete_at(&bs->connections, i); + ks_pool_free(bs->pool, &cid); + break; + } + } + + return ret; +} + ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, blade_connection_t **bcP) { blade_connection_t *bc = NULL; @@ -194,6 +287,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b // @todo error logging... this shouldn't happen return KS_STATUS_FAIL; } + // @todo make sure the connection is in the READY state before allowing it to be choosen, just in case it is detaching or not quite fully attached *bcP = bc; @@ -272,6 +366,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data) // @todo detach from session if this connection is attached blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY); break; + case BLADE_SESSION_STATE_CONNECT: + break; + case BLADE_SESSION_STATE_ATTACH: + break; + case BLADE_SESSION_STATE_DETACH: + break; case BLADE_SESSION_STATE_READY: // @todo pop from session receiving queue and pass to blade_protocol_process() break; diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 6ec4af3a29..5977c091d7 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -419,6 +419,60 @@ KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc) } +KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid) +{ + blade_session_t *bs = NULL; + + ks_assert(bs); + ks_assert(sid); + + ks_hash_read_lock(bh->sessions); + bs = ks_hash_search(bh->sessions, (void *)sid, KS_UNLOCKED); + if (bs && blade_session_read_lock(bs, KS_FALSE) != KS_STATUS_SUCCESS) bs = NULL; + ks_hash_read_unlock(bh->sessions); + + return bs; +} + +KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_handle_t *bh = NULL; + + ks_assert(bs); + + bh = blade_session_handle_get(bs); + ks_assert(bh); + + ks_hash_write_lock(bh->sessions); + ret = ks_hash_insert(bh->sessions, (void *)blade_session_id_get(bs), bs); + ks_hash_write_unlock(bh->sessions); + + return ret; +} + +KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_handle_t *bh = NULL; + + ks_assert(bs); + + bh = blade_session_handle_get(bs); + ks_assert(bh); + + blade_session_write_lock(bs, KS_TRUE); + + ks_hash_write_lock(bh->sessions); + if (ks_hash_remove(bh->sessions, (void *)blade_session_id_get(bs)) == NULL) ret = KS_STATUS_FAIL; + ks_hash_write_unlock(bh->sessions); + + blade_session_write_unlock(bs); + + return ret; +} + + KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh) { diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index 5d149bc445..2242197861 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -57,6 +57,8 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json); KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json); +KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc); +KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h index 975d10bbfb..c6d376acd9 100644 --- a/libs/libblade/src/include/blade_session.h +++ b/libs/libblade/src/include/blade_session.h @@ -38,12 +38,19 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP); -KS_DECLARE(ks_status_t) blade_sesssion_startup(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs); KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs); +KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs); KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs); KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id); +KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block); +KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block); +KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs); KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state); KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id); +KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id); KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index f6de9665ba..7b87fa84f3 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -51,10 +51,15 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks); KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name); KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target); + KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid); KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc); KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc); +KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid); +KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs); + KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length); KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 5531557dd6..2442d88a06 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -92,6 +92,7 @@ typedef enum { BLADE_SESSION_STATE_NONE, BLADE_SESSION_STATE_DESTROY, BLADE_SESSION_STATE_HANGUP, + BLADE_SESSION_STATE_CONNECT, BLADE_SESSION_STATE_ATTACH, BLADE_SESSION_STATE_DETACH, BLADE_SESSION_STATE_READY,