diff --git a/libs/libblade/src/blade_connection.c b/libs/libblade/src/blade_connection.c index 69f1a39366..e18f2c247e 100644 --- a/libs/libblade/src/blade_connection.c +++ b/libs/libblade/src/blade_connection.c @@ -51,6 +51,49 @@ struct blade_connection_s { //ks_q_t *receiving; }; +// @todo may want to make this reusable for session as it'll need to queue the same details during temporary connection loss +typedef struct blade_connection_sending_s blade_connection_sending_t; +struct blade_connection_sending_s { + ks_pool_t *pool; + blade_identity_t *target; + cJSON *json; +}; + +ks_status_t blade_connection_sending_create(blade_connection_sending_t **bcsP, ks_pool_t *pool, blade_identity_t *target, cJSON *json) +{ + blade_connection_sending_t *bcs = NULL; + + ks_assert(bcsP); + ks_assert(pool); + ks_assert(json); + + bcs = ks_pool_alloc(pool, sizeof(blade_connection_sending_t)); + bcs->pool = pool; + bcs->target = target; + bcs->json = json; + *bcsP = bcs; + + return KS_STATUS_SUCCESS; +} + +ks_status_t blade_connection_sending_destroy(blade_connection_sending_t **bcsP) +{ + blade_connection_sending_t *bcs = NULL; + + ks_assert(bcsP); + ks_assert(*bcsP); + + bcs = *bcsP; + + if (bcs->target) blade_identity_destroy(&bcs->target); + if (bcs->json) cJSON_Delete(bcs->json); + + ks_pool_free(bcs->pool, bcsP); + + return KS_STATUS_SUCCESS; +} + + void *blade_connection_state_thread(ks_thread_t *thread, void *data); @@ -74,7 +117,8 @@ KS_DECLARE(ks_status_t) blade_connection_create(blade_connection_t **bcP, bc->transport_init_data = transport_init_data; bc->transport_callbacks = transport_callbacks; ks_q_create(&bc->sending, pool, 0); - //ks_q_create(&bc->receiving, pool, 0); + ks_assert(bc->sending); + *bcP = bc; return KS_STATUS_SUCCESS; @@ -92,7 +136,6 @@ KS_DECLARE(ks_status_t) blade_connection_destroy(blade_connection_t **bcP) blade_connection_shutdown(bc); ks_q_destroy(&bc->sending); - //ks_q_destroy(&bc->receiving); ks_pool_free(bc->pool, bcP); @@ -122,6 +165,8 @@ KS_DECLARE(ks_status_t) blade_connection_startup(blade_connection_t *bc, blade_c KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) { + blade_connection_sending_t *bcs = NULL; + ks_assert(bc); if (bc->state_thread) { @@ -131,8 +176,7 @@ KS_DECLARE(ks_status_t) blade_connection_shutdown(blade_connection_t *bc) bc->shutdown = KS_FALSE; } - //while (ks_q_trypop(bc->sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message); - //while (ks_q_trypop(bc->receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message); + while (ks_q_trypop(bc->sending, (void **)&bcs) == KS_STATUS_SUCCESS && bcs) blade_connection_sending_destroy(&bcs); return KS_STATUS_SUCCESS; } @@ -221,40 +265,40 @@ KS_DECLARE(void) blade_connection_disconnect(blade_connection_t *bc) KS_DECLARE(ks_status_t) blade_connection_sending_push(blade_connection_t *bc, blade_identity_t *target, cJSON *json) { + blade_connection_sending_t *bcs = NULL; + ks_assert(bc); ks_assert(json); - // @todo need internal envelope to wrap an identity object and a json object just for the queue + blade_connection_sending_create(&bcs, bc->pool, target, json); + ks_assert(bcs); - return KS_STATUS_SUCCESS; + return ks_q_push(bc->sending, bcs); } KS_DECLARE(ks_status_t) blade_connection_sending_pop(blade_connection_t *bc, blade_identity_t **target, cJSON **json) { + ks_status_t ret = KS_STATUS_SUCCESS; + blade_connection_sending_t *bcs = NULL; + ks_assert(bc); ks_assert(json); - // @todo need internal envelope to wrap an identity object and a json object just for the queue - - return KS_STATUS_SUCCESS; + ret = ks_q_trypop(bc->sending, (void **)&bcs); + + if (bcs) { + if (target) *target = bcs->target; + *json = bcs->json; + + bcs->target = NULL; + bcs->json = NULL; + + blade_connection_sending_destroy(&bcs); + } + + return ret; } -// @todo may not need receiving queue on connection, by the time we are queueing we should have a session to receive into -//KS_DECLARE(ks_status_t) blade_connection_receiving_push(blade_connection_t *bc, cJSON *json) -//{ -// ks_assert(bc); -// ks_assert(json); - -// return ks_q_push(bc->receiving, json); -//} - -//KS_DECLARE(ks_status_t) blade_connection_receiving_pop(blade_connection_t *bc, cJSON **json) -//{ -// ks_assert(bc); -// ks_assert(json); - -// return ks_q_trypop(bc->receiving, (void **)json); -//} void *blade_connection_state_thread(ks_thread_t *thread, void *data) { @@ -262,6 +306,7 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) blade_connection_state_t state; blade_transport_state_callback_t callback = NULL; blade_connection_state_hook_t hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; + blade_identity_t *target = NULL; cJSON *json = NULL; ks_assert(thread); @@ -270,22 +315,28 @@ void *blade_connection_state_thread(ks_thread_t *thread, void *data) bc = (blade_connection_t *)data; while (!bc->shutdown) { - - // @todo pop from connection sending queue and call transport callback to write one message (passing target identity too) - // and delete the cJSON object here after returning from callback - - // @todo seems like connection will not need a receiving queue as the session will exist prior to async transmissions - state = bc->state; hook = BLADE_CONNECTION_STATE_HOOK_SUCCESS; callback = blade_connection_state_callback_lookup(bc, state); - // @todo should this just go in the ready state callback? it's generalized here, so the callback for READY doesn't really - // need to do anything - if (state == BLADE_CONNECTION_STATE_READY && bc->transport_callbacks->onreceive(bc, &json) == KS_STATUS_SUCCESS && json) { - // @todo push json to session receiving queue - + while (blade_connection_sending_pop(bc, &target, &json) == KS_STATUS_SUCCESS && json) { + if (bc->transport_callbacks->onsend(bc, target, json) != KS_STATUS_SUCCESS) { + blade_connection_disconnect(bc); + break; + } + } + + if (state == BLADE_CONNECTION_STATE_READY) { + do { + if (bc->transport_callbacks->onreceive(bc, &json) != KS_STATUS_SUCCESS) { + blade_connection_disconnect(bc); + break; + } + if (json) { + // @todo push json to session receiving queue + } + } while (json) ; } if (callback) hook = callback(bc, BLADE_CONNECTION_STATE_CONDITION_POST); diff --git a/libs/libblade/src/blade_identity.c b/libs/libblade/src/blade_identity.c index 3e814e80af..034923c7de 100644 --- a/libs/libblade/src/blade_identity.c +++ b/libs/libblade/src/blade_identity.c @@ -75,6 +75,9 @@ KS_DECLARE(ks_status_t) blade_identity_parse(blade_identity_t *bi, const char *u ks_assert(uri); if (bi->uri) ks_pool_free(bi->pool, &bi->uri); + bi->uri = ks_pstrdup(bi->pool, uri); + + // @todo parse into components return KS_STATUS_SUCCESS; } diff --git a/libs/libblade/src/blade_module_wss.c b/libs/libblade/src/blade_module_wss.c index 75f2993a51..f7c485b2ff 100644 --- a/libs/libblade/src/blade_module_wss.c +++ b/libs/libblade/src/blade_module_wss.c @@ -342,6 +342,8 @@ ks_status_t blade_module_wss_on_startup(blade_module_t *bm, config_setting_t *co bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); + // @todo register wss transport to the blade_handle_t + if (blade_module_wss_config(bm_wss, config) != KS_STATUS_SUCCESS) { ks_log(KS_LOG_DEBUG, "blade_module_wss_config failed\n"); return KS_STATUS_FAIL; @@ -381,6 +383,8 @@ ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm) bm_wss = (blade_module_wss_t *)blade_module_data_get(bm); + // @todo unregister wss transport from the blade_handle_t + if (bm_wss->listeners_thread) { bm_wss->shutdown = KS_TRUE; ks_thread_join(bm_wss->listeners_thread); @@ -396,6 +400,9 @@ ks_status_t blade_module_wss_on_shutdown(blade_module_t *bm) bm_wss->listeners_count = 0; if (bm_wss->listeners_poll) ks_pool_free(bm_wss->pool, &bm_wss->listeners_poll); + // @todo connections should be gracefully disconnected so that they detach from sessions properly + // which means this should occur before the listeners thread is terminated, which requires that + // the listener sockets be made inactive (or closed) to stop accepting while shutting down while (ks_q_trypop(bm_wss->disconnected, (void **)&bc) == KS_STATUS_SUCCESS) ; list_iterator_start(&bm_wss->connected); while (list_iterator_hasnext(&bm_wss->connected)) { @@ -581,22 +588,30 @@ blade_connection_rank_t blade_transport_wss_on_rank(blade_connection_t *bc, blad ks_status_t blade_transport_wss_write(blade_transport_wss_t *bt_wss, cJSON *json) { + ks_status_t ret = KS_STATUS_SUCCESS; char *json_str = cJSON_PrintUnformatted(json); ks_size_t json_str_len = 0; if (!json_str) { // @todo error logging - return KS_STATUS_FAIL; + ret = KS_STATUS_FAIL; + goto done; } json_str_len = strlen(json_str) + 1; // @todo determine if WSOC_TEXT null terminates when read_frame is called, or if it's safe to include like this - kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len); + if (kws_write_frame(bt_wss->kws, WSOC_TEXT, json_str, json_str_len) != json_str_len) { + // @todo error logging + ret = KS_STATUS_FAIL; + goto done; + } - free(json_str); + done: + if (json_str) free(json_str); - return KS_STATUS_SUCCESS; + return ret; } ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t *target, cJSON *json) { + ks_status_t ret = KS_STATUS_SUCCESS; blade_transport_wss_t *bt_wss = NULL; ks_assert(bc); @@ -606,7 +621,13 @@ ks_status_t blade_transport_wss_on_send(blade_connection_t *bc, blade_identity_t bt_wss = (blade_transport_wss_t *)blade_connection_transport_get(bc); - return blade_transport_wss_write(bt_wss, json); + ret = blade_transport_wss_write(bt_wss, json); + + // @todo use reference counting on blade_identity_t and cJSON objects + if (target) blade_identity_destroy(&target); + cJSON_Delete(json); + + return ret; } ks_status_t blade_transport_wss_read(blade_transport_wss_t *bt_wss, cJSON **json) @@ -743,7 +764,8 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_inbound(blade_ // @todo Establish sessid and discover existing session or create and register new session through BLADE commands // Set session state to CONNECT if its new or RECONNECT if existing // start session and its thread if its new - + + ks_sleep_ms(1000); // @todo temporary testing, remove this and return success once negotiations are done return BLADE_CONNECTION_STATE_HOOK_BYPASS; } diff --git a/libs/libblade/test/bladec.c b/libs/libblade/test/bladec.c index aadfed612e..1b7243bca8 100644 --- a/libs/libblade/test/bladec.c +++ b/libs/libblade/test/bladec.c @@ -16,7 +16,6 @@ char g_console_input[CONSOLE_INPUT_MAX]; size_t g_console_input_length = 0; size_t g_console_input_eol = 0; -void service_peer_state_callback(blade_service_t *service, blade_peer_t *peer, blade_peerstate_t state); void loop(blade_handle_t *bh); void process_console_input(blade_handle_t *bh, char *line); @@ -71,11 +70,13 @@ int main(int argc, char **argv) return EXIT_FAILURE; } - if (blade_handle_startup(bh, config_blade, service_peer_state_callback) != KS_STATUS_SUCCESS) { + if (blade_handle_startup(bh, config_blade) != KS_STATUS_SUCCESS) { ks_log(KS_LOG_ERROR, "Blade startup failed\n"); return EXIT_FAILURE; } + // @todo get to wss module callbacks, call onload to kick off registration + loop(bh); blade_handle_destroy(&bh); @@ -85,12 +86,8 @@ int main(int argc, char **argv) return 0; } -void service_peer_state_callback(blade_service_t *service, blade_peer_t *peer, blade_peerstate_t state) -{ - // @todo log output and pop peer messages if state == BLADE_PEERSTATE_RECEIVING - ks_log(KS_LOG_INFO, "service peer state callback: %d\n", (int)state); -} - + + void buffer_console_input(void) { ssize_t bytes = 0; diff --git a/libs/libblade/test/bladec.cfg b/libs/libblade/test/bladec.cfg index a14a4e7f7a..95a7f24397 100644 --- a/libs/libblade/test/bladec.cfg +++ b/libs/libblade/test/bladec.cfg @@ -1,28 +1,5 @@ blade: { - # client stuff, for peers who connect out to services - client: - { - directory: - { - # todo: hints for ways to find a directory service, at least kws client_data for now - # add DNS SRV in the future - uri = "???:127.0.0.1+2100:???"; # todo: confirm expected format, "uri:host:proto" - - websocket: - { - # SSL group is optional, disabled when absent - ssl: - { - # todo: client SSL stuffs here - }; - }; - }; - }; - - - # server stuff, for services that peers connect to - # todo: consider encapsulating in a "server" group for organizational structure datastore: { database: @@ -30,21 +7,18 @@ blade: path = ":mem:"; }; }; - service: + wss: { - websockets: + endpoints: { - endpoints: - { - ipv4 = ( { address = "0.0.0.0", port = 2100 } ); - ipv6 = ( { address = "::", port = 2100 } ); - backlog = 128; - }; - # SSL group is optional, disabled when absent - ssl: - { - # todo: service SSL stuffs here - }; + ipv4 = ( { address = "0.0.0.0", port = 2100 } ); + ipv6 = ( { address = "::", port = 2100 } ); + backlog = 128; + }; + # SSL group is optional, disabled when absent + ssl: + { + # todo: server SSL stuffs here }; }; };