FS-9952: Added the first half of the session negotations for the server side, untested as it requires the second half coming soon for client side

This commit is contained in:
Shane Bryldt 2017-02-21 21:20:44 +00:00 committed by Mike Jerris
parent cb7e95fd9a
commit 3d8fd5dcaf
8 changed files with 288 additions and 23 deletions

View File

@ -50,6 +50,8 @@ struct blade_connection_s {
ks_rwl_t *lock;
ks_q_t *sending;
const char *session;
};
void *blade_connection_state_thread(ks_thread_t *thread, void *data);
@ -147,6 +149,8 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc)
bc->shutdown = KS_FALSE;
}
if (bc->session) ks_pool_free(bc->pool, &bc->session);
while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
return KS_STATUS_SUCCESS;
@ -304,6 +308,20 @@ KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJS
return ks_q_trypop(bc->sending, (void **)json);
}
KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc)
{
ks_assert(bc);
return bc->session;
}
KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id)
{
ks_assert(bc);
if (bc->session) ks_pool_free(bc->pool, &bc->session);
bc->session = ks_pstrdup(bc->pool, id);
}
void *blade_connection_state_thread(ks_thread_t *thread, void *data)
{
@ -369,11 +387,18 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data)
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_ATTACH);
break;
case BLADE_CONNECTION_STATE_ATTACH:
// @todo receive message with nullable session id for reconnect and some sort of secure token for a reconnect challenge?
// determine how much of session management is handled here... do we process these session negotiation messages without
// passing it up to the application layer? or does the application layer give back a session and build the response?
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
break;
{
blade_session_t *bs = blade_handle_sessions_get(bc->handle, bc->session);
ks_assert(bs); // should not happen because bs should still be locked
blade_session_connections_add(bs, bc->id);
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_READY);
blade_session_state_set(bs, BLADE_SESSION_STATE_READY);
blade_session_read_unlock(bs); // unlock the session we expect to be locked during the callback to ensure we can finish attaching
break;
}
case BLADE_CONNECTION_STATE_DETACH:
// @todo detach from session if this connection is attached
blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DISCONNECT);

View File

@ -895,35 +895,104 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_outbound(blad
blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
{
blade_connection_state_hook_t ret = BLADE_CONNECTION_STATE_HOOK_SUCCESS;
blade_transport_wss_t *bt_wss = NULL;
cJSON *json = NULL;
cJSON *params = NULL;
blade_session_t *bs = NULL;
blade_handle_t *bh = NULL;
const char *jsonrpc = NULL;
const char *method = NULL;
const char *id = NULL;
const char *sid = NULL;
ks_time_t timeout;
ks_assert(bc);
bh = blade_connection_handle_get(bc);
ks_assert(bh);
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
// @todo block while reading expected message with blade_transport_wss_read(bt_wss, json)
bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc);
// @todo check if expected message is a request by confirming it has a method field (along with json field validation, stay compliant with jsonrpc)
// @todo very temporary, really need monotonic clock and get timeout delay and sleep delay from config
timeout = ks_time_now() + (5 * KS_USEC_PER_SEC);
while (blade_transport_wss_read(bt_wss, &json) == KS_STATUS_SUCCESS) {
if (json) break;
ks_sleep(250);
if (ks_time_now() >= timeout) break;
}
// @todo validate method is "blade.session.attach"
if (!json) {
// @todo error logging
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// @todo validate parameters "session-id" and "session-token" must both be present or omitted, validate both are strings and valid uuid format
// if both are omitted, params may be omitted entirely by jsonrpc spec
// @todo validation wrapper for request and response/error to confirm jsonrpc and provide enum for output as to which it is
jsonrpc = cJSON_GetObjectCstr(json, "jsonrpc"); // @todo check for definitions of these keys and fixed values
if (!jsonrpc || strcmp(jsonrpc, "2.0")) {
// @todo error logging
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// @todo if session-id is provided, lookup existing session within the blade_handle_t
id = cJSON_GetObjectCstr(json, "id"); // @todo switch to number if we are not using a uuid for message id
if (!id) {
// @todo error logging
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// @todo if the session exists, verify the session-token, if it matches then use this session
method = cJSON_GetObjectCstr(json, "method");
if (!method || strcasecmp(method, "blade.session.attach")) {
// @todo error logging
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
// @todo if the session-token does not match, or the session does not exist, or the session-id and session-token are not provided then create a new session
params = cJSON_GetObjectItem(json, "params");
if (params) {
sid = cJSON_GetObjectCstr(params, "session-id");
if (sid) {
// @todo validate uuid format by parsing, not currently available in uuid functions
ks_log(KS_LOG_DEBUG, "Session Requested: %s\n", sid);
}
}
// @todo once session is established, associate it to the connection
// @todo if anything fails, return HOOK_DISCONNECT, otherwise return HOOK_SUCCESS which will continue the rest of the session attaching process
// which is to grab the expected session off the connection and attach the connection to the connection list on the session, start the session thread if
// it hasn't already been started, and set the session state to CONNECT or ATTACH... discuss with tony, finalize session state machine regarding multiple
// connections attempting to attach at the same time to the session and changing the session state, may need to queue pending connections to the session
// and process them from within the session state machine thread
if (sid) {
bs = blade_handle_sessions_get(bh, sid); // bs comes out read locked if not null to prevent it being cleaned up before we are done
if (bs) {
ks_log(KS_LOG_DEBUG, "Session Located: %s\n", blade_session_id_get(bs));
}
}
ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done
return BLADE_CONNECTION_STATE_HOOK_BYPASS;
if (!bs) {
blade_session_create(&bs, bh);
ks_assert(bs);
ks_log(KS_LOG_DEBUG, "Session Created: %s\n", blade_session_id_get(bs));
blade_session_read_lock(bs, KS_TRUE); // this will be done by blade_handle_sessions_get() otherwise
if (blade_session_startup(bs) != KS_STATUS_SUCCESS) {
blade_session_read_unlock(bs);
blade_session_destroy(&bs);
ret = BLADE_CONNECTION_STATE_HOOK_DISCONNECT;
goto done;
}
blade_handle_sessions_add(bs);
}
blade_connection_session_set(bc, blade_session_id_get(bs));
done:
// @note the state machine expects if we return SUCCESS, that the session assigned to the connection will be read locked to ensure that the state
// machine can finish attaching the session, if you BYPASS then you can handle everything here in the callback, but this should be fairly standard
// behaviour to simply go as far as assigning a session to the connection and let the system handle the rest
if (json) cJSON_Delete(json);
return ret;
}
blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade_connection_t *bc, blade_connection_state_condition_t condition)
@ -961,6 +1030,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_detach(blade_connecti
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
ks_sleep(1000);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}
@ -970,6 +1040,7 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio
ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition);
ks_sleep(1000);
return BLADE_CONNECTION_STATE_HOOK_SUCCESS;
}

View File

@ -42,6 +42,7 @@ struct blade_session_s {
blade_session_state_t state;
const char *id;
ks_rwl_t *lock;
list_t connections;
ks_q_t *sending;
@ -69,6 +70,9 @@ KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle
ks_uuid(&id);
bs->id = ks_uuid_str(pool, &id);
ks_rwl_create(&bs->lock, pool);
ks_assert(bs->lock);
list_init(&bs->connections);
ks_q_create(&bs->sending, pool, 0);
ks_assert(bs->sending);
@ -95,6 +99,8 @@ KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP)
ks_q_destroy(&bs->receiving);
ks_q_destroy(&bs->sending);
ks_rwl_destroy(&bs->lock);
ks_pool_free(bs->pool, &bs->id);
ks_pool_free(bs->pool, bsP);
@ -138,9 +144,24 @@ KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs)
while (ks_q_trypop(bs->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
while (ks_q_trypop(bs->receiving, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json);
list_iterator_start(&bs->connections);
while (list_iterator_hasnext(&bs->connections)) {
const char *id = (const char *)list_iterator_next(&bs->connections);
ks_pool_free(bs->pool, &id);
}
list_iterator_stop(&bs->connections);
list_clear(&bs->connections);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs)
{
ks_assert(bs);
return bs->handle;
}
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs)
{
ks_assert(bs);
@ -157,6 +178,43 @@ KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id)
bs->id = ks_pstrdup(bs->pool, id);
}
KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bs);
if (block) ret = ks_rwl_read_lock(bs->lock);
else ret = ks_rwl_try_read_lock(bs->lock);
return ret;
}
KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs)
{
ks_assert(bs);
return ks_rwl_read_unlock(bs->lock);
}
KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block)
{
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bs);
if (block) ret = ks_rwl_write_lock(bs->lock);
else ret = ks_rwl_try_write_lock(bs->lock);
return ret;
}
KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs)
{
ks_assert(bs);
return ks_rwl_write_unlock(bs->lock);
}
KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state)
{
ks_assert(bs);
@ -172,6 +230,41 @@ KS_DECLARE(void) blade_session_hangup(blade_session_t *bs)
blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP);
}
KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
const char *cid = NULL;
ks_assert(bs);
cid = ks_pstrdup(bs->pool, id);
ks_assert(cid);
list_append(&bs->connections, cid);
return ret;
}
KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id)
{
ks_status_t ret = KS_STATUS_SUCCESS;
uint32_t size = 0;
ks_assert(bs);
size = list_size(&bs->connections);
for (uint32_t i = 0; i < size; ++i) {
const char *cid = (const char *)list_get_at(&bs->connections, i);
if (!strcasecmp(cid, id)) {
list_delete_at(&bs->connections, i);
ks_pool_free(bs->pool, &cid);
break;
}
}
return ret;
}
ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, blade_connection_t **bcP)
{
blade_connection_t *bc = NULL;
@ -194,6 +287,7 @@ ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, b
// @todo error logging... this shouldn't happen
return KS_STATUS_FAIL;
}
// @todo make sure the connection is in the READY state before allowing it to be choosen, just in case it is detaching or not quite fully attached
*bcP = bc;
@ -272,6 +366,12 @@ void *blade_session_state_thread(ks_thread_t *thread, void *data)
// @todo detach from session if this connection is attached
blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY);
break;
case BLADE_SESSION_STATE_CONNECT:
break;
case BLADE_SESSION_STATE_ATTACH:
break;
case BLADE_SESSION_STATE_DETACH:
break;
case BLADE_SESSION_STATE_READY:
// @todo pop from session receiving queue and pass to blade_protocol_process()
break;

View File

@ -419,6 +419,60 @@ KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc)
}
KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid)
{
blade_session_t *bs = NULL;
ks_assert(bs);
ks_assert(sid);
ks_hash_read_lock(bh->sessions);
bs = ks_hash_search(bh->sessions, (void *)sid, KS_UNLOCKED);
if (bs && blade_session_read_lock(bs, KS_FALSE) != KS_STATUS_SUCCESS) bs = NULL;
ks_hash_read_unlock(bh->sessions);
return bs;
}
KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_handle_t *bh = NULL;
ks_assert(bs);
bh = blade_session_handle_get(bs);
ks_assert(bh);
ks_hash_write_lock(bh->sessions);
ret = ks_hash_insert(bh->sessions, (void *)blade_session_id_get(bs), bs);
ks_hash_write_unlock(bh->sessions);
return ret;
}
KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_handle_t *bh = NULL;
ks_assert(bs);
bh = blade_session_handle_get(bs);
ks_assert(bh);
blade_session_write_lock(bs, KS_TRUE);
ks_hash_write_lock(bh->sessions);
if (ks_hash_remove(bh->sessions, (void *)blade_session_id_get(bs)) == NULL) ret = KS_STATUS_FAIL;
ks_hash_write_unlock(bh->sessions);
blade_session_write_unlock(bs);
return ret;
}
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{

View File

@ -57,6 +57,8 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc);
KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target);
KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json);
KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json);
KS_DECLARE(const char *) blade_connection_session_get(blade_connection_t *bc);
KS_DECLARE(void) blade_connection_session_set(blade_connection_t *bc, const char *id);
KS_END_EXTERN_C
#endif

View File

@ -38,12 +38,19 @@
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP);
KS_DECLARE(ks_status_t) blade_sesssion_startup(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs);
KS_DECLARE(blade_handle_t *) blade_session_handle_get(blade_session_t *bs);
KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs);
KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_read_lock(blade_session_t *bs, ks_bool_t block);
KS_DECLARE(ks_status_t) blade_session_read_unlock(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_write_lock(blade_session_t *bs, ks_bool_t block);
KS_DECLARE(ks_status_t) blade_session_write_unlock(blade_session_t *bs);
KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state);
KS_DECLARE(void) blade_session_hangup(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_session_connections_add(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_connections_remove(blade_session_t *bs, const char *id);
KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json);
KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json);

View File

@ -51,10 +51,15 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks);
KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target);
KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid);
KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc);
KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc);
KS_DECLARE(blade_session_t *) blade_handle_sessions_get(blade_handle_t *bh, const char *sid);
KS_DECLARE(ks_status_t) blade_handle_sessions_add(blade_session_t *bs);
KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs);
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,

View File

@ -92,6 +92,7 @@ typedef enum {
BLADE_SESSION_STATE_NONE,
BLADE_SESSION_STATE_DESTROY,
BLADE_SESSION_STATE_HANGUP,
BLADE_SESSION_STATE_CONNECT,
BLADE_SESSION_STATE_ATTACH,
BLADE_SESSION_STATE_DETACH,
BLADE_SESSION_STATE_READY,