diff --git a/libs/libblade/Makefile.am b/libs/libblade/Makefile.am index c4e1872375..46e751405f 100644 --- a/libs/libblade/Makefile.am +++ b/libs/libblade/Makefile.am @@ -14,13 +14,15 @@ lib_LTLIBRARIES = libblade.la libblade_la_SOURCES = src/blade.c src/blade_stack.c src/bpcp.c src/blade_datastore.c libblade_la_SOURCES += src/blade_rpcproto.c libblade_la_SOURCES += src/blade_identity.c src/blade_module.c src/blade_connection.c src/blade_module_wss.c +libblade_la_SOURCES += src/blade_session.c src/blade_protocol.c libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS) libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS) libblade_la_LIBADD = libunqlite.la library_includedir = $(prefix)/include library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_rpcproto.h -library_include_HEADERS += src/include/blade_identity.h src/include/blade_module.h src/include/blade_connection.h +library_include_HEADERS += src/include/blade_identity.h src/include/blade_module.h src/include/blade_connection.h +library_include_HEADERS += src/include/blade_session.h src/include/blade_protocol.h library_include_HEADERS += src/include/unqlite.h test/tap.h tests: libblade.la diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index e18f2c247e..d548c477de 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -42,58 +42,16 @@ struct blade_connection_s { blade_transport_callbacks_t *transport_callbacks; ks_bool_t shutdown; - // @todo add auto generated UUID blade_connection_direction_t direction; ks_thread_t *state_thread; blade_connection_state_t state; + + const char *id; + ks_rwl_t *lock; ks_q_t *sending; - //ks_q_t *receiving; }; -// @todo may want to make this reusable for session as it'll need to queue the same details during temporary connection loss -typedef struct blade_connection_sending_s blade_connection_sending_t; -struct blade_connection_sending_s { - ks_pool_t *pool; - blade_identity_t *target; - cJSON *json; -}; - -ks_status_t blade_connection_sending_create(blade_connection_sending_t **bcsP, ks_pool_t *pool, blade_identity_t *target, cJSON *json) -{ - blade_connection_sending_t *bcs = NULL; - - ks_assert(bcsP); - ks_assert(pool); - ks_assert(json); - - bcs = ks_pool_alloc(pool, sizeof(blade_connection_sending_t)); - bcs->pool = pool; - bcs->target = target; - bcs->json = json; - *bcsP = bcs; - - return KS_STATUS_SUCCESS; -} - -ks_status_t blade_connection_sending_destroy(blade_connection_sending_t **bcsP) -{ - blade_connection_sending_t *bcs = NULL; - - ks_assert(bcsP); - ks_assert(*bcsP); - - bcs = *bcsP; - - if (bcs->target) blade_identity_destroy(&bcs->target); - if (bcs->json) cJSON_Delete(bcs->json); - - ks_pool_free(bcs->pool, bcsP); - - return KS_STATUS_SUCCESS; -} - - void *blade_connection_state_thread(ks_thread_t *thread, void *data); @@ -104,6 +62,7 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, { blade_connection_t *bc = NULL; ks_pool_t *pool = NULL; + uuid_t id; ks_assert(bcP); ks_assert(bh); @@ -116,6 +75,14 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, bc->pool = pool; bc->transport_init_data = transport_init_data; bc->transport_callbacks = transport_callbacks; + + ks_uuid(&id); + bc->id = ks_uuid_str(pool, &id); + ks_assert(bc->id); + + ks_rwl_create(&bc->lock, pool); + ks_assert(bc->lock); + ks_q_create(&bc->sending, pool, 0); ks_assert(bc->sending); @@ -127,7 +94,7 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) { blade_connection_t *bc = NULL; - + ks_assert(bcP); ks_assert(*bcP); @@ -137,6 +104,10 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) ks_q_destroy(&bc->sending); + ks_rwl_destroy(&bc->lock); + + ks_pool_free(bc->pool, &bc->id); + ks_pool_free(bc->pool, bcP); return KS_STATUS_SUCCESS; @@ -165,7 +136,7 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) { - blade_connection_sending_t *bcs = NULL; + cJSON *json = NULL; ks_assert(bc); @@ -176,11 +147,62 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) bc->shutdown = KS_FALSE; } - while (ks_q_trypop(bc->sending, (void **)&bcs) == KS_STATUS_SUCCESS && bcs) blade_connection_sending_destroy(&bcs); + while (ks_q_trypop(bc->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); return KS_STATUS_SUCCESS; } +KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc) +{ + ks_assert(bc); + + return bc->handle; +} + +KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc) +{ + ks_assert(bc); + + return bc->id; +} + +KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bool_t block) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bc); + + if (block) ret = ks_rwl_read_lock(bc->lock); + else ret = ks_rwl_try_read_lock(bc->lock); + return ret; +} + +KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc) +{ + ks_assert(bc); + + return ks_rwl_read_unlock(bc->lock); +} + +KS_DECLARE(ks_status_t) blade_connection_write_lock(blade_connection_t *bc, ks_bool_t block) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(bc); + + if (block) ret = ks_rwl_write_lock(bc->lock); + else ret = ks_rwl_try_write_lock(bc->lock); + return ret; +} + +KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc) +{ + ks_assert(bc); + + return ks_rwl_write_unlock(bc->lock); +} + + KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc) { ks_assert(bc); @@ -263,40 +285,23 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) blade_connection_state_set(bc, BLADE_CONNECTION_STATE_DETACH); } -KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json) +KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json) { - blade_connection_sending_t *bcs = NULL; - - ks_assert(bc); - ks_assert(json); - - blade_connection_sending_create(&bcs, bc->pool, target, json); - ks_assert(bcs); - - return ks_q_push(bc->sending, bcs); -} - -KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, blade_identity_t **target, cJSON **json) -{ - ks_status_t ret = KS_STATUS_SUCCESS; - blade_connection_sending_t *bcs = NULL; + cJSON *json_copy = NULL; ks_assert(bc); ks_assert(json); - ret = ks_q_trypop(bc->sending, (void **)&bcs); + json_copy = cJSON_Duplicate(json, 1); + return ks_q_push(bc->sending, json_copy); +} - if (bcs) { - if (target) *target = bcs->target; - *json = bcs->json; +KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json) +{ + ks_assert(bc); + ks_assert(json); - bcs->target = NULL; - bcs->json = NULL; - - blade_connection_sending_destroy(&bcs); - } - - return ret; + return ks_q_trypop(bc->sending, (void **)json); } @@ -306,7 +311,6 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) blade_connection_state_t state; blade_transport_state_callback_t callback = NULL; blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; - blade_identity_t *target = NULL; cJSON *json = NULL; ks_assert(thread); @@ -320,23 +324,32 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; callback = blade_connection_state_callback_lookup(bc, state); - while (blade_connection_sending_pop(bc, &target, &json) == KS_STATUS_SUCCESS && json) { - if (bc->transport_callbacks->onsend(bc, target, json) != KS_STATUS_SUCCESS) { - blade_connection_disconnect(bc); - break; + // @todo only READY state? + if (state != BLADE_CONNECTION_STATE_DETACH && state != BLADE_CONNECTION_STATE_DISCONNECT) { + while (blade_connection_sending_pop(bc, &json) == KS_STATUS_SUCCESS && json) { + ks_status_t ret = bc->transport_callbacks->onsend(bc, json); + cJSON_Delete(json); + + if (ret != KS_STATUS_SUCCESS) { + blade_connection_disconnect(bc); + break; + } } } if (state == BLADE_CONNECTION_STATE_READY) { - do { + ks_bool_t done = KS_FALSE; + while (!done) { if (bc->transport_callbacks->onreceive(bc, &json) != KS_STATUS_SUCCESS) { blade_connection_disconnect(bc); break; } - if (json) { + if (!(done = (json == NULL))) { // @todo push json to session receiving queue + cJSON_Delete(json); + json = NULL; } - } while (json) ; + } } if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 4478a788a1..8ca224cf9f 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -101,7 +101,7 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP); ks_status_t blade_transport_wss_on_connect(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blade_identity_t *target); -ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json); +ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json); ks_status_t blade_transport_wss_on_receive(blade_connection_t *bc, cJSON **json); blade_connection_state_hook_t blade_transport_wss_on_state_disconnect(blade_connection_t *bc, blade_connection_state_condition_t condition); @@ -484,11 +484,6 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a goto done; } - ks_log(KS_LOG_DEBUG, "Listeners Before\n"); - for (int index = 0; index < bm_wss->listeners_count; ++index) { - ks_log(KS_LOG_DEBUG, " Listener %d = %d\n", index, bm_wss->listeners_poll[index].fd); - } - listener_index = bm_wss->listeners_count++; bm_wss->listeners_poll = (struct pollfd *)ks_pool_resize(bm_wss->pool, bm_wss->listeners_poll, @@ -497,11 +492,6 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a bm_wss->listeners_poll[listener_index].fd = listener; bm_wss->listeners_poll[listener_index].events = POLLIN | POLLERR; - ks_log(KS_LOG_DEBUG, "Listeners After\n"); - for (int index = 0; index < bm_wss->listeners_count; ++index) { - ks_log(KS_LOG_DEBUG, " Listener %d = %d\n", index, bm_wss->listeners_poll[index].fd); - } - done: if (ret != KS_STATUS_SUCCESS) { if (listener != KS_SOCK_INVALID) { @@ -533,7 +523,6 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) if (bm_wss->listeners_poll[index].revents & POLLERR) { // @todo: error handling, just skip the listener for now, it might recover, could skip X times before closing? - ks_log(KS_LOG_DEBUG, "Listener POLLERR\n"); continue; } if (!(bm_wss->listeners_poll[index].revents & POLLIN)) continue; @@ -550,6 +539,8 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) blade_connection_create(&bc, bm_wss->handle, bt_wss_init, bm_wss->transport_callbacks); ks_assert(bc); + + blade_connection_read_lock(bc, KS_TRUE); if (blade_connection_startup(bc, BLADE_CONNECTION_DIRECTION_INBOUND) != KS_STATUS_SUCCESS) { blade_connection_destroy(&bc); @@ -557,8 +548,11 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) ks_socket_close(&sock); continue; } + blade_handle_connections_add(bc); list_append(&bm_wss->connected, bc); blade_connection_state_set(bc, BLADE_CONNECTION_STATE_NEW); + + blade_connection_read_unlock(bc); } } @@ -566,6 +560,7 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data) bt_wss_init = (blade_transport_wss_init_t *)blade_connection_transport_init_get(bc); bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); + blade_handle_connections_remove(bc); list_delete(&bm_wss->connected, bc); if (bt_wss_init) blade_transport_wss_init_destroy(&bt_wss_init); @@ -730,7 +725,7 @@ ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json return ret; } -ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json) +ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, cJSON *json) { ks_status_t ret = KS_STATUS_SUCCESS; blade_transport_wss_t *bt_wss = NULL; @@ -745,7 +740,6 @@ ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t ret = blade_transport_wss_write(bt_wss, json); // @todo use reference counting on blade_identity_t and cJSON objects - if (target) blade_identity_destroy(&target); cJSON_Delete(json); return ret; @@ -904,10 +898,30 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_ ks_assert(bc); ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); - // @todo Establish sessid and discover existing session or create and register new session through BLADE commands - // Set session state to CONNECT if its new or RECONNECT if existing - // start session and its thread if its new + // @todo block while reading expected message with blade_transport_wss_read(bt_wss, json) + + // @todo check if expected message is a request by confirming it has a method field (along with json field validation, stay compliant with jsonrpc) + + // @todo validate method is "blade.session.attach" + + // @todo validate parameters "session-id" and "session-token" must both be present or omitted, validate both are strings and valid uuid format + // if both are omitted, params may be omitted entirely by jsonrpc spec + + // @todo if session-id is provided, lookup existing session within the blade_handle_t + + // @todo if the session exists, verify the session-token, if it matches then use this session + + // @todo if the session-token does not match, or the session does not exist, or the session-id and session-token are not provided then create a new session + + // @todo once session is established, associate it to the connection + + // @todo if anything fails, return HOOK_DISCONNECT, otherwise return HOOK_SUCCESS which will continue the rest of the session attaching process + // which is to grab the expected session off the connection and attach the connection to the connection list on the session, start the session thread if + // it hasn't already been started, and set the session state to CONNECT or ATTACH... discuss with tony, finalize session state machine regarding multiple + // connections attempting to attach at the same time to the session and changing the session state, may need to queue pending connections to the session + // and process them from within the session state machine thread + ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done return BLADE_CONNECTION_STATE_HOOK_BYPASS; } @@ -918,6 +932,25 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade ks_log(KS_LOG_DEBUG, "State Callback: %d\n", (int32_t)condition); + // @todo produce jsonrpc compliant message to call method "blade.session.attach" + + // @todo add params with nested session-id and session-token if attempting to reconnect as a client, this should probably be passed in from + // the blade_handle_connect() call and then through the init parameters for the transport (do not directly use the old session, but copy the id and token) + + // @todo block while sending message with blade_transport_wss_write(bt_wss, json) + + // @todo block while receiving expected response with blade_transport_wss_read(bt_wss, json) + + // @todo check for error field, log and return HOOK_DISCONNECT if any errors occur + + // @todo check for result field, and nested session-id and session-token + + // @todo lookup the old session from the blade_handle_t, if it still exists then use this session + + // @todo if the old session does not exist, then create a new session and populate with the parameters from the results + + // @todo once session is established, associate it to the connection, see attach_inbound for notes regarding universal actions after returning SUCCESS + ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done return BLADE_CONNECTION_STATE_HOOK_BYPASS; } diff --git a/libs/libblade/src/blade_protocol.c b/libs/libblade/src/blade_protocol.c new file mode 100644 index 0000000000..4a3c071594 --- /dev/null +++ b/libs/libblade/src/blade_protocol.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + +KS_DECLARE(ks_status_t) blade_request_create(blade_request_t **breqP, ks_pool_t *pool, const char *session_id, cJSON *json /*, response_callback*/) +{ + blade_request_t *breq = NULL; + + ks_assert(breqP); + ks_assert(pool); + ks_assert(session_id); + ks_assert(json); + + breq = ks_pool_alloc(pool, sizeof(blade_request_t)); + breq->pool = pool; + breq->refs = 1; + breq->session_id = ks_pstrdup(pool, session_id); + breq->message = json; + breq->message_id = cJSON_GetObjectCstr(json, "id"); + //breq->response_callback = response_callback; + *breqP = breq; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_request_destroy(blade_request_t **breqP) +{ + blade_request_t *breq = NULL; + + ks_assert(breqP); + ks_assert(*breqP); + + breq = *breqP; + + ks_pool_free(breq->pool, (void **)&breq->session_id); + cJSON_Delete(breq->message); + + ks_pool_free(breq->pool, breqP); + + return KS_STATUS_SUCCESS; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/blade_session.c b/libs/libblade/src/blade_session.c new file mode 100644 index 0000000000..950a510838 --- /dev/null +++ b/libs/libblade/src/blade_session.c @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + +struct blade_session_s { + blade_handle_t *handle; + ks_pool_t *pool; + + ks_bool_t shutdown; + ks_thread_t *state_thread; + blade_session_state_t state; + + const char *id; + list_t connections; + + ks_q_t *sending; + ks_q_t *receiving; +}; + +void *blade_session_state_thread(ks_thread_t *thread, void *data); + + +KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh) +{ + blade_session_t *bs = NULL; + ks_pool_t *pool = NULL; + uuid_t id; + + ks_assert(bsP); + ks_assert(bh); + + pool = blade_handle_pool_get(bh); + + bs = ks_pool_alloc(pool, sizeof(blade_session_t)); + bs->handle = bh; + bs->pool = pool; + + ks_uuid(&id); + bs->id = ks_uuid_str(pool, &id); + + list_init(&bs->connections); + ks_q_create(&bs->sending, pool, 0); + ks_assert(bs->sending); + ks_q_create(&bs->receiving, pool, 0); + ks_assert(bs->receiving); + + *bsP = bs; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP) +{ + blade_session_t *bs = NULL; + + ks_assert(bsP); + ks_assert(*bsP); + + bs = *bsP; + + blade_session_shutdown(bs); + + list_destroy(&bs->connections); + ks_q_destroy(&bs->receiving); + ks_q_destroy(&bs->sending); + + ks_pool_free(bs->pool, &bs->id); + + ks_pool_free(bs->pool, bsP); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_session_startup(blade_session_t *bs) +{ + ks_assert(bs); + + blade_session_state_set(bs, BLADE_SESSION_STATE_NONE); + + if (ks_thread_create_ex(&bs->state_thread, + blade_session_state_thread, + bs, + KS_THREAD_FLAG_DEFAULT, + KS_THREAD_DEFAULT_STACK, + KS_PRI_NORMAL, + bs->pool) != KS_STATUS_SUCCESS) { + // @todo error logging + return KS_STATUS_FAIL; + } + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs) +{ + cJSON *json = NULL; + + ks_assert(bs); + + if (bs->state_thread) { + bs->shutdown = KS_TRUE; + ks_thread_join(bs->state_thread); + ks_pool_free(bs->pool, &bs->state_thread); + bs->shutdown = KS_FALSE; + } + + while (ks_q_trypop(bs->sending, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); + while (ks_q_trypop(bs->receiving, (void **)&json) == KS_STATUS_SUCCESS && json) cJSON_Delete(json); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs) +{ + ks_assert(bs); + + return bs->id; +} + +KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id) +{ + ks_assert(bs); + ks_assert(id); + + if (bs->id) ks_pool_free(bs->pool, &bs->id); + bs->id = ks_pstrdup(bs->pool, id); +} + +KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state) +{ + ks_assert(bs); + + bs->state = state; +} + +KS_DECLARE(void) blade_session_hangup(blade_session_t *bs) +{ + ks_assert(bs); + + if (bs->state != BLADE_SESSION_STATE_HANGUP && bs->state != BLADE_SESSION_STATE_DESTROY) + blade_session_state_set(bs, BLADE_SESSION_STATE_HANGUP); +} + +ks_status_t blade_session_connections_choose(blade_session_t *bs, cJSON *json, blade_connection_t **bcP) +{ + blade_connection_t *bc = NULL; + const char *cid = NULL; + + ks_assert(bs); + ks_assert(json); + ks_assert(bcP); + + // @todo may be multiple connections, for now let's just assume there will be only one + // later there will need to be a way to pick which connection to use + cid = list_get_at(&bs->connections, 0); + if (!cid) { + // @todo error logging... this shouldn't happen + return KS_STATUS_FAIL; + } + + bc = blade_handle_connections_get(bs->handle, cid); + if (!bc) { + // @todo error logging... this shouldn't happen + return KS_STATUS_FAIL; + } + + *bcP = bc; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json) +{ + ks_assert(bs); + ks_assert(json); + + // @todo check json for "method", if this is an outgoing request then build up the data for a response to lookup the message id and get back to the request + // this can reuse blade_request_t so that when the blade_response_t is passed up the blade_request_t within it is familiar from inbound requests + + if (list_empty(&bs->connections)) { + // @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received + blade_session_sending_push(bs, json); + } else { + blade_connection_t *bc = NULL; + if (blade_session_connections_choose(bs, json, &bc) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + // @todo cache the blade_request_t here if it exists to gaurentee it's cached before a response could be received + blade_connection_sending_push(bc, json); + } + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json) +{ + cJSON *json_copy = NULL; + + ks_assert(bs); + ks_assert(json); + + json_copy = cJSON_Duplicate(json, 1); + return ks_q_push(bs->sending, json_copy); +} + +KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json) +{ + ks_assert(bs); + ks_assert(json); + + return ks_q_trypop(bs->sending, (void **)json); +} + +// @todo receive queue push and pop + +void *blade_session_state_thread(ks_thread_t *thread, void *data) +{ + blade_session_t *bs = NULL; + blade_session_state_t state; + cJSON *json = NULL; + + ks_assert(thread); + ks_assert(data); + + bs = (blade_session_t *)data; + + while (!bs->shutdown) { + + state = bs->state; + + if (!list_empty(&bs->connections)) { + while (blade_session_sending_pop(bs, &json) == KS_STATUS_SUCCESS && json) { + blade_connection_t *bc = NULL; + if (blade_session_connections_choose(bs, json, &bc) == KS_STATUS_SUCCESS) blade_connection_sending_push(bc, json); + cJSON_Delete(json); + } + } + + switch (state) { + case BLADE_SESSION_STATE_DESTROY: + return NULL; + case BLADE_SESSION_STATE_HANGUP: + // @todo detach from session if this connection is attached + blade_session_state_set(bs, BLADE_SESSION_STATE_DESTROY); + break; + case BLADE_SESSION_STATE_READY: + // @todo pop from session receiving queue and pass to blade_protocol_process() + break; + default: break; + } + } + + return NULL; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index e522f2c617..6ec4af3a29 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -47,10 +47,17 @@ struct blade_handle_s { config_setting_t *config_directory; config_setting_t *config_datastore; - ks_hash_t *transports; + ks_hash_t *transports; // registered transports exposed by modules, NOT active connections - blade_identity_t *identity; + //blade_identity_t *identity; blade_datastore_t *datastore; + + // @todo insert on connection creations, remove on connection destructions, key based on a UUID for the connection + ks_hash_t *connections; // active connections keyed by connection id + // @todo insert on session creations, remove on session destructions, key based on a UUID for the session + ks_hash_t *sessions; // active sessions keyed by session id + // @todo another hash with sessions keyed by the remote identity without parameters for quick lookup by target identity on sending? + ks_hash_t *requests; // outgoing requests waiting for a response keyed by the message id }; @@ -127,6 +134,14 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo ks_hash_create(&bh->transports, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); ks_assert(bh->transports); + ks_hash_create(&bh->connections, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); + ks_assert(bh->connections); + ks_hash_create(&bh->sessions, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); + ks_assert(bh->sessions); + // @todo decide if this is uint32_t or uuid string, prefer uuid string to avoid needing another lock and variable for next id + ks_hash_create(&bh->requests, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool); + ks_assert(bh->requests); + *bhP = bh; return KS_STATUS_SUCCESS; @@ -150,6 +165,9 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) blade_handle_shutdown(bh); + ks_hash_destroy(&bh->requests); + ks_hash_destroy(&bh->sessions); + ks_hash_destroy(&bh->connections); ks_hash_destroy(&bh->transports); if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool); @@ -212,9 +230,23 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) { + ks_hash_iterator_t *it = NULL; + ks_assert(bh); - // @todo call onshutdown and onunload callbacks for modules from DSOs + for (it = ks_hash_first(bh->requests, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + void *key = NULL; + blade_request_t *value = NULL; + + ks_hash_this(it, (const void **)&key, NULL, (void **)&value); + ks_hash_remove(bh->requests, key); + + blade_request_destroy(&value); + } + + // @todo terminate all sessions, which will disconnect all attached connections + + // @todo call onshutdown and onunload callbacks for modules from DSOs, which will unregister transports and disconnect remaining unattached connections // @todo unload DSOs @@ -245,6 +277,8 @@ KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blad ks_assert(name); ks_assert(callbacks); + // @todo reduce blade_handle_t parameter, pull from blade_module_t parameter + blade_handle_transport_registration_create(&bhtr, bh->pool, bm, callbacks); ks_assert(bhtr); @@ -331,6 +365,59 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio } +KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid) +{ + blade_connection_t *bc = NULL; + + ks_assert(bh); + ks_assert(cid); + + ks_hash_read_lock(bh->connections); + bc = ks_hash_search(bh->connections, (void *)cid, KS_UNLOCKED); + if (bc && blade_connection_read_lock(bc, KS_FALSE) != KS_STATUS_SUCCESS) bc = NULL; + ks_hash_read_unlock(bh->connections); + + return bc; +} + +KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_handle_t *bh = NULL; + + ks_assert(bc); + + bh = blade_connection_handle_get(bc); + ks_assert(bh); + + ks_hash_write_lock(bh->connections); + ret = ks_hash_insert(bh->connections, (void *)blade_connection_id_get(bc), bc); + ks_hash_write_unlock(bh->connections); + + return ret; +} + +KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc) +{ + ks_status_t ret = KS_STATUS_SUCCESS; + blade_handle_t *bh = NULL; + + ks_assert(bc); + + bh = blade_connection_handle_get(bc); + ks_assert(bh); + + blade_connection_write_lock(bc, KS_TRUE); + + ks_hash_write_lock(bh->connections); + if (ks_hash_remove(bh->connections, (void *)blade_connection_id_get(bc)) == NULL) ret = KS_STATUS_FAIL; + ks_hash_write_unlock(bh->connections); + + blade_connection_write_unlock(bc); + + return ret; +} + KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh) diff --git a/libs/libblade/src/include/blade.h b/libs/libblade/src/include/blade.h index d558fa5f4c..54d744657a 100644 --- a/libs/libblade/src/include/blade.h +++ b/libs/libblade/src/include/blade.h @@ -43,6 +43,8 @@ #include "blade_identity.h" #include "blade_module.h" #include "blade_connection.h" +#include "blade_session.h" +#include "blade_protocol.h" #include "blade_datastore.h" #include "bpcp.h" diff --git a/libs/libblade/src/include/blade_connection.h b/libs/libblade/src/include/blade_connection.h index 70ee105b7d..5d149bc445 100644 --- a/libs/libblade/src/include/blade_connection.h +++ b/libs/libblade/src/include/blade_connection.h @@ -43,16 +43,20 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP); KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_connection_direction_t direction); KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc); +KS_DECLARE(blade_handle_t *) blade_connection_handle_get(blade_connection_t *bc); +KS_DECLARE(const char *) blade_connection_id_get(blade_connection_t *bc); +KS_DECLARE(ks_status_t) blade_connection_read_lock(blade_connection_t *bc, ks_bool_t block); +KS_DECLARE(ks_status_t) blade_connection_read_unlock(blade_connection_t *bc); +KS_DECLARE(ks_status_t) blade_connection_write_lock(blade_connection_t *bc, ks_bool_t block); +KS_DECLARE(ks_status_t) blade_connection_write_unlock(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_init_get(blade_connection_t *bc); KS_DECLARE(void *) blade_connection_transport_get(blade_connection_t *bc); KS_DECLARE(void) blade_connection_transport_set(blade_connection_t *bc, void *transport_data); KS_DECLARE(void) blade_connection_state_set(blade_connection_t *bc, blade_connection_state_t state); KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc); KS_DECLARE(blade_connection_rank_t) blade_connection_rank(blade_connection_t *bc, blade_identity_t *target); -KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json); -KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, blade_identity_t **target, cJSON **json); -KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json); -KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json); +KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, cJSON *json); +KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, cJSON **json); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_protocol.h b/libs/libblade/src/include/blade_protocol.h new file mode 100644 index 0000000000..fcd89cc01a --- /dev/null +++ b/libs/libblade/src/include/blade_protocol.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BLADE_PROTOCOL_H_ +#define _BLADE_PROTOCOL_H_ +#include + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) blade_request_create(blade_request_t **breqP, ks_pool_t *pool, const char *session_id, cJSON *json /*, response_callback*/); +KS_DECLARE(ks_status_t) blade_request_destroy(blade_request_t **breqP); +KS_END_EXTERN_C + +#endif + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade_session.h b/libs/libblade/src/include/blade_session.h new file mode 100644 index 0000000000..975d10bbfb --- /dev/null +++ b/libs/libblade/src/include/blade_session.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2017, Shane Bryldt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _BLADE_SESSION_H_ +#define _BLADE_SESSION_H_ +#include + +KS_BEGIN_EXTERN_C +KS_DECLARE(ks_status_t) blade_session_create(blade_session_t **bsP, blade_handle_t *bh); +KS_DECLARE(ks_status_t) blade_session_destroy(blade_session_t **bsP); +KS_DECLARE(ks_status_t) blade_sesssion_startup(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_session_shutdown(blade_session_t *bs); +KS_DECLARE(const char *) blade_session_id_get(blade_session_t *bs); +KS_DECLARE(void) blade_session_id_set(blade_session_t *bs, const char *id); +KS_DECLARE(void) blade_session_state_set(blade_session_t *bs, blade_session_state_t state); +KS_DECLARE(void) blade_session_hangup(blade_session_t *bs); +KS_DECLARE(ks_status_t) blade_session_send(blade_session_t *bs, cJSON *json); +KS_DECLARE(ks_status_t) blade_session_sending_push(blade_session_t *bs, cJSON *json); +KS_DECLARE(ks_status_t) blade_session_sending_pop(blade_session_t *bs, cJSON **json); +KS_END_EXTERN_C + +#endif + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 40c355a8df..f6de9665ba 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -51,7 +51,10 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_transport_register(blade_handle_t *bh, blade_module_t *bm, const char *name, blade_transport_callbacks_t *callbacks); KS_DECLARE(ks_status_t) blade_handle_transport_unregister(blade_handle_t *bh, const char *name); KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target); - +KS_DECLARE(blade_connection_t *) blade_handle_connections_get(blade_handle_t *bh, const char *cid); +KS_DECLARE(ks_status_t) blade_handle_connections_add(blade_connection_t *bc); +KS_DECLARE(ks_status_t) blade_handle_connections_remove(blade_connection_t *bc); + KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length); KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 249dd9b31f..5531557dd6 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -44,6 +44,9 @@ typedef struct blade_module_s blade_module_t; typedef struct blade_module_callbacks_s blade_module_callbacks_t; typedef struct blade_transport_callbacks_s blade_transport_callbacks_t; typedef struct blade_connection_s blade_connection_t; +typedef struct blade_session_s blade_session_t; +typedef struct blade_request_s blade_request_t; +typedef struct blade_response_s blade_response_t; typedef struct blade_datastore_s blade_datastore_t; @@ -84,6 +87,18 @@ typedef enum { BLADE_CONNECTION_RANK_GREAT, } blade_connection_rank_t; + +typedef enum { + BLADE_SESSION_STATE_NONE, + BLADE_SESSION_STATE_DESTROY, + BLADE_SESSION_STATE_HANGUP, + BLADE_SESSION_STATE_ATTACH, + BLADE_SESSION_STATE_DETACH, + BLADE_SESSION_STATE_READY, +} blade_session_state_t; + + + typedef ks_status_t (*blade_module_load_callback_t)(blade_module_t **bmP, blade_handle_t *bh); typedef ks_status_t (*blade_module_unload_callback_t)(blade_module_t *bm); typedef ks_status_t (*blade_module_startup_callback_t)(blade_module_t *bm, config_setting_t *config); @@ -99,7 +114,7 @@ struct blade_module_callbacks_s { typedef ks_status_t (*blade_transport_connect_callback_t)(blade_connection_t **bcP, blade_module_t *bm, blade_identity_t *target); typedef blade_connection_rank_t (*blade_transport_rank_callback_t)(blade_connection_t *bc, blade_identity_t *target); -typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, blade_identity_t *target, cJSON *json); +typedef ks_status_t (*blade_transport_send_callback_t)(blade_connection_t *bc, cJSON *json); typedef ks_status_t (*blade_transport_receive_callback_t)(blade_connection_t *bc, cJSON **json); typedef blade_connection_state_hook_t (*blade_transport_state_callback_t)(blade_connection_t *bc, blade_connection_state_condition_t condition); @@ -124,6 +139,26 @@ struct blade_transport_callbacks_s { }; +struct blade_request_s { + ks_pool_t *pool; + uint32_t refs; + const char *session_id; + + cJSON *message; + const char *message_id; // pulled from message for easier keying + // @todo ttl to wait for response before injecting an error response locally + // @todo rpc response callback +}; + +struct blade_response_s { + ks_pool_t *pool; + uint32_t refs; + const char *session_id; + blade_request_t *request; + + cJSON *message; +}; + KS_END_EXTERN_C #endif