diff --git a/libs/libblade/Makefile.am b/libs/libblade/Makefile.am index f55b1266bb..ee5e5677d0 100644 --- a/libs/libblade/Makefile.am +++ b/libs/libblade/Makefile.am @@ -11,13 +11,14 @@ libunqlite_la_CFLAGS = -DUNQLITE_ENABLE_THREADS libunqlite_la_LIBADD = -lpthread lib_LTLIBRARIES = libblade.la -libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c +libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c +libblade_la_SOURCES += src/blade_message.c libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS) libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS) libblade_la_LIBADD = libunqlite.la library_includedir = $(prefix)/include library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h -library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h +library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h library_include_HEADERS += src/include/unqlite.h test/tap.h tests: libblade.la diff --git a/libs/libblade/src/blade_directory.c b/libs/libblade/src/blade_directory.c deleted file mode 100644 index ab24942dfe..0000000000 --- a/libs/libblade/src/blade_directory.c +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (c) 2007-2014, Anthony Minessale II - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * * Neither the name of the original author; nor the names of any contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include "blade.h" - - -typedef enum { - BD_NONE = 0, - BD_MYPOOL = (1 << 0), - BD_MYTPOOL = (1 << 1), -} bdpvt_flag_t; - -struct blade_directory_s { - bdpvt_flag_t flags; - ks_pool_t *pool; - ks_thread_pool_t *tpool; - - config_setting_t *config_service; - - blade_service_t *service; -}; - - - - -KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP) -{ - blade_directory_t *bd = NULL; - bdpvt_flag_t flags; - ks_pool_t *pool; - - ks_assert(bdP); - - bd = *bdP; - *bdP = NULL; - - ks_assert(bd); - - flags = bd->flags; - pool = bd->pool; - - blade_directory_shutdown(bd); - - if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool); - - ks_pool_free(bd->pool, &bd); - - if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool); - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool) -{ - bdpvt_flag_t newflags = BD_NONE; - blade_directory_t *bd = NULL; - - if (!pool) { - newflags |= BD_MYPOOL; - ks_pool_open(&pool); - ks_assert(pool); - } - // @todo: move thread pool creation to startup which allows thread pool to be configurable - if (!tpool) { - newflags |= BD_MYTPOOL; - ks_thread_pool_create(&tpool, - BLADE_DIRECTORY_TPOOL_MIN, - BLADE_DIRECTORY_TPOOL_MAX, - BLADE_DIRECTORY_TPOOL_STACK, - KS_PRI_NORMAL, - BLADE_DIRECTORY_TPOOL_IDLE); - ks_assert(tpool); - } - - bd = ks_pool_alloc(pool, sizeof(*bd)); - bd->flags = newflags; - bd->pool = pool; - bd->tpool = tpool; - *bdP = bd; - - return KS_STATUS_SUCCESS; -} - -ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config) -{ - config_setting_t *service = NULL; - - ks_assert(bd); - - if (!config) return KS_STATUS_FAIL; - if (!config_setting_is_group(config)) return KS_STATUS_FAIL; - - service = config_setting_get_member(config, "service"); - if (!service) return KS_STATUS_FAIL; - - bd->config_service = service; - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config) -{ - ks_assert(bd); - - blade_directory_shutdown(bd); - - if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - - blade_service_create(&bd->service, bd->pool, bd->tpool); - ks_assert(bd->service); - if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - - return KS_STATUS_SUCCESS; -} - -KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd) -{ - ks_assert(bd); - - if (bd->service) blade_service_destroy(&bd->service); - - return KS_STATUS_SUCCESS; -} - - -/* For Emacs: - * Local Variables: - * mode:c - * indent-tabs-mode:t - * tab-width:4 - * c-basic-offset:4 - * End: - * For VIM: - * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: - */ diff --git a/libs/libblade/src/blade_message.c b/libs/libblade/src/blade_message.c new file mode 100644 index 0000000000..0bf401302f --- /dev/null +++ b/libs/libblade/src/blade_message.c @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2007-2014, Anthony Minessale II + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of the original author; nor the names of any contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "blade.h" + +struct blade_message_s { + ks_pool_t *pool; + blade_handle_t *handle; + + void *data; + ks_size_t data_length; + ks_size_t data_size; +}; + + +KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP) +{ + blade_message_t *bm = NULL; + + ks_assert(bmP); + + bm = *bmP; + *bmP = NULL; + + ks_assert(bm); + + if (bm->data) ks_pool_free(bm->pool, &bm->data); + + ks_pool_free(bm->pool, &bm); + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle) +{ + blade_message_t *bm = NULL; + + ks_assert(bmP); + ks_assert(pool); + ks_assert(handle); + + bm = ks_pool_alloc(pool, sizeof(*bm)); + bm->pool = pool; + bm->handle = handle; + *bmP = bm; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm) +{ + ks_assert(bm); + ks_assert(*bm); + + return blade_handle_message_discard((*bm)->handle, bm); +} + +KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length) +{ + ks_assert(bm); + ks_assert(data); + ks_assert(data_length > 0); + + // @todo fail on a max message size? + + if (data_length > bm->data_size) { + // @todo talk to tony about adding flags to ks_pool_resize_ex to prevent the memcpy, don't need to copy old memory here + // otherwise switch to a new allocation instead of resizing + bm->data = ks_pool_resize(bm->pool, bm->data, data_length); + ks_assert(bm->data); + bm->data_size = data_length; + } + memcpy(bm->data, data, data_length); + bm->data_length = data_length; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length) +{ + ks_assert(bm); + + *data = bm->data; + *data_length = bm->data_length; + + return KS_STATUS_SUCCESS; +} + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet: + */ diff --git a/libs/libblade/src/blade_peer.c b/libs/libblade/src/blade_peer.c index 2e794a629b..942841ce64 100644 --- a/libs/libblade/src/blade_peer.c +++ b/libs/libblade/src/blade_peer.c @@ -45,7 +45,9 @@ struct blade_peer_s { ks_thread_pool_t *tpool; blade_service_t *service; + ks_socket_t sock; ks_bool_t shutdown; + ks_bool_t disconnecting; kws_t *kws; ks_thread_t *kws_thread; @@ -118,8 +120,10 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws) +KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock) { + kws_t *kws = NULL; + ks_assert(bp); ks_assert(kws); @@ -127,7 +131,7 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws) blade_peer_shutdown(bp); - bp->kws = kws; + bp->sock = sock; if (ks_thread_create_ex(&bp->kws_thread, blade_peer_kws_thread, @@ -142,6 +146,8 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws) KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp) { + blade_message_t *message = NULL; + ks_assert(bp); bp->shutdown = KS_TRUE; @@ -150,61 +156,125 @@ KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp) ks_thread_join(bp->kws_thread); ks_pool_free(bp->pool, &bp->kws_thread); } + + while (ks_q_trypop(bp->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message); + while (ks_q_trypop(bp->messages_receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message); if (bp->kws) kws_destroy(&bp->kws); + else if (bp->sock != KS_SOCK_INVALID) ks_socket_close(&bp->sock); + bp->sock = KS_SOCK_INVALID; bp->shutdown = KS_FALSE; return KS_STATUS_SUCCESS; } +KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp) +{ + ks_assert(bp); + + bp->disconnecting = KS_TRUE; +} + +KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp) +{ + ks_assert(bp); + return bp->disconnecting; +} + +KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message) +{ + ks_assert(peer); + ks_assert(message); + + *message = NULL; + return ks_q_trypop(peer->messages_receiving, (void **)message); +} + +KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length) +{ + blade_message_t *message = NULL; + + ks_assert(peer); + ks_assert(data); + ks_assert(data_length > 0); + + if (blade_handle_message_claim(blade_service_handle(peer->service), &message, data, data_length) != KS_STATUS_SUCCESS || !message) { + // @todo error handling + // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails + peer->disconnecting = KS_TRUE; + return KS_STATUS_FAIL; + } + ks_q_push(peer->messages_sending, message); + return KS_STATUS_SUCCESS; +} + void *blade_peer_kws_thread(ks_thread_t *thread, void *data) { - blade_peer_t *peer; + blade_peer_t *peer = NULL; kws_opcode_t opcode; - uint8_t *data; - ks_size_t data_len; - blade_message_t *message; + uint8_t *frame_data = NULL; + ks_size_t frame_data_len = 0; + blade_message_t *message = NULL; + int32_t poll_flags = 0; ks_assert(thread); ks_assert(data); peer = (blade_peer_t *)data; + // @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init + + if (kws_init(&peer->kws, peer->sock, NULL, NULL, KWS_BLOCK, peer->pool) != KS_STATUS_SUCCESS) { + peer->disconnecting = KS_TRUE; + return NULL; + } + while (!peer->shutdown) { - // @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again - // or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or - // kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout)) - data_len = kws_read_frame(peer->kws, &opcode, &data); + // @todo get exact timeout from service config? + poll_flags = ks_wait_sock(peer->sock, 100, KS_POLL_READ | KS_POLL_ERROR); - if (data_len <= 0) { - // @todo error handling, strerror(ks_errno()) - // 0 means socket closed with WS_NONE, which closes websocket with no additional reason - // -1 means socket closed with a general failure - // -2 means nonblocking wait - // other values are based on WS_XXX reasons - // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and + if (poll_flags & KS_POLL_ERROR) { + // @todo switch this (and others below) to the enum for the state callback, called during the service connected peer cleanup + peer->disconnecting = KS_TRUE; + break; + } + + if (poll_flags & KS_POLL_READ) { + frame_data_len = kws_read_frame(peer->kws, &opcode, &frame_data); - // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting, - // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have, - // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback - // at the service level - peer->disconnecting = KS_TRUE; - break; - } + if (frame_data_len <= 0) { + // @todo error handling, strerror(ks_errno()) + // 0 means socket closed with WS_NONE, which closes websocket with no additional reason + // -1 means socket closed with a general failure + // -2 means nonblocking wait + // other values are based on WS_XXX reasons + // negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and + + // @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting, + // remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have, + // and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback + // at the service level + peer->disconnecting = KS_TRUE; + break; + } - // @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data - if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) { - // @todo error handling - // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails - peer->disconnecting = KS_TRUE; - break; - } + if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) { + // @todo error handling + // just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails + peer->disconnecting = KS_TRUE; + break; + } - ks_q_push(peer->messages_receiving, message); - // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)? + ks_q_push(peer->messages_receiving, message); + // @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)? + // might not perfectly fit the traditional state callback, but it could work if it only sends the state temporarily and does NOT actually change + // the internal state to receiving + } - - if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) { + // @todo consider only sending one message at a time and use shorter polling timeout to prevent any considerable blocking if send buffers get full + while (ks_q_trypop(peer->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) { + blade_message_get(message, (void **)&frame_data, &frame_data_len); + kws_write_frame(peer->kws, WSOC_TEXT, frame_data, frame_data_len); } } diff --git a/libs/libblade/src/blade_service.c b/libs/libblade/src/blade_service.c index e1361bf9df..dc7dcadbf6 100644 --- a/libs/libblade/src/blade_service.c +++ b/libs/libblade/src/blade_service.c @@ -45,6 +45,7 @@ struct blade_service_s { bspvt_flag_t flags; ks_pool_t *pool; ks_thread_pool_t *tpool; + blade_handle_t *handle; ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX]; @@ -97,11 +98,14 @@ KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP) return KS_STATUS_SUCCESS; } -KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool) +KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle) { bspvt_flag_t newflags = BS_NONE; blade_service_t *bs = NULL; + ks_assert(bsP); + ks_assert(handle); + if (!pool) { newflags |= BS_MYPOOL; ks_pool_open(&pool); @@ -117,12 +121,19 @@ KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *p bs->flags = newflags; bs->pool = pool; bs->tpool = tpool; + bs->handle = handle; list_init(&bs->connected); *bsP = bs; return KS_STATUS_SUCCESS; } +KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs) +{ + ks_assert(bs); + return bs->handle; +} + ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config) { config_setting_t *websockets = NULL; @@ -325,7 +336,7 @@ ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr) void *blade_service_listeners_thread(ks_thread_t *thread, void *data) { - blade_service_t *service; + blade_service_t *service = NULL; ks_assert(thread); ks_assert(data); @@ -333,57 +344,50 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data) service = (blade_service_t *)data; while (!service->shutdown) { + // @todo take exact timeout from a setting in config_service_endpoints if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) { for (int32_t index = 0; index < service->listeners_length; ++index) { - ks_socket_t sock; - ks_sockaddr_t raddr; - socklen_t slen = 0; - kws_t *kws = NULL; + ks_socket_t sock = KS_SOCK_INVALID; blade_peer_t *peer = NULL; if (!(service->listeners_poll[index].revents & POLLIN)) continue; if (service->listeners_poll[index].revents & POLLERR) { - // @todo: error handling, just skip the listener for now + // @todo: error handling, just skip the listener for now, it might recover, could skip X sanity times before closing? continue; } - if (service->listeners_families[index] == AF_INET) { - slen = sizeof(raddr.v.v4); - if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) { - // @todo: error handling, just skip the socket for now - continue; - } - raddr.family = AF_INET; - } else { - slen = sizeof(raddr.v.v6); - if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) { - // @todo: error handling, just skip the socket for now - continue; - } - raddr.family = AF_INET6; - } - - ks_addr_get_host(&raddr); - ks_addr_get_port(&raddr); - - // @todo: SSL init stuffs based on data from service->config_websockets_ssl - - if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) { - // @todo: error handling, just close and skip the socket for now - ks_socket_close(&sock); + if ((sock = accept(service->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) { + // @todo: error handling, just skip the socket for now as most causes are because the remote side suddenly became unreachable continue; } - blade_peer_create(&peer, service->pool, service->tpool); + // @todo consider a recycle queue of peers per service, and only have to call startup when one is already available + // blade_service_peer_claim(service, &peer); + blade_peer_create(&peer, service->pool, service->tpool, service); ks_assert(peer); - // @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread + // @todo call state callback with connecting enum state + + blade_peer_startup(peer, sock); list_append(&service->connected, peer); - - blade_peer_startup(peer, kws); } } + + list_iterator_start(&service->connected); + while (list_iterator_hasnext(&service->connected)) { + blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&service->connected); + // @todo expose accessor for disconnecting, after changing it into the state callback enum + // ensure that every way kws_close might occur leads back to disconnecting = KS_TRUE for this to universally process disconnects + if (blade_peer_disconnecting(peer)) { + // @todo check if there is an iterator based remove function, or indexed iteration to use list_delete_at() + list_delete(&service->connected, peer); + // @todo call state callback with internal disconnecting enum state (stored in peer to hold specific reason for disconnecting) + // @todo switch to blade_peer_shutdown(&peer) and blade_peer_discard(&peer) after introducing recycling of peers + blade_peer_destroy(&peer); + } + } + list_iterator_stop(&service->connected); } return NULL; diff --git a/libs/libblade/src/blade_stack.c b/libs/libblade/src/blade_stack.c index 5f7c8a7d73..b262336bf9 100644 --- a/libs/libblade/src/blade_stack.c +++ b/libs/libblade/src/blade_stack.c @@ -44,11 +44,12 @@ struct blade_handle_s { ks_pool_t *pool; ks_thread_pool_t *tpool; + config_setting_t *config_service; config_setting_t *config_datastore; - config_setting_t *config_directory; - - //blade_peer_t *peer; - blade_directory_t *directory; + + ks_q_t *messages_discarded; + blade_service_t *service; + blade_datastore_t *datastore; }; @@ -70,8 +71,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP) pool = bh->pool; blade_handle_shutdown(bh); - - //blade_peer_destroy(&bh->peer); + + if (bh->messages_discarded) { + // @todo make sure messages are cleaned up + ks_q_destroy(&bh->messages_discarded); + } + if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool); ks_pool_free(bh->pool, &bh); @@ -104,7 +109,10 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo bh->flags = newflags; bh->pool = pool; bh->tpool = tpool; - //blade_peer_create(&bh->peer, bh->pool, bh->tpool); + + // @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages + ks_q_create(&bh->messages_discarded, bh->pool, 100); + ks_assert(bh->messages_discarded); *bhP = bh; @@ -113,22 +121,24 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config) { + config_setting_t *service = NULL; config_setting_t *datastore = NULL; - config_setting_t *directory = NULL; ks_assert(bh); if (!config) return KS_STATUS_FAIL; if (!config_setting_is_group(config)) return KS_STATUS_FAIL; + // @todo config for messages_discarded threshold (ie, message count, message memory, etc) + + service = config_setting_get_member(config, "service"); + datastore = config_setting_get_member(config, "datastore"); //if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL; - - directory = config_setting_get_member(config, "directory"); - //if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL; + + bh->config_service = service; bh->config_datastore = datastore; - bh->config_directory = directory; return KS_STATUS_SUCCESS; } @@ -138,15 +148,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ ks_assert(bh); if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (bh->config_service && !blade_handle_service_available(bh)) { + blade_service_create(&bh->service, bh->pool, bh->tpool, bh); + ks_assert(bh->service); + if (blade_service_startup(bh->service, bh->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + } if (bh->config_datastore && !blade_handle_datastore_available(bh)) { blade_datastore_create(&bh->datastore, bh->pool, bh->tpool); - blade_datastore_startup(bh->datastore, bh->config_datastore); - } - - if (bh->config_directory && !blade_handle_directory_available(bh)) { - blade_directory_create(&bh->directory, bh->pool, bh->tpool); - blade_directory_startup(bh->directory, config); + ks_assert(bh->datastore); + if (blade_datastore_startup(bh->datastore, bh->config_datastore) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; } return KS_STATUS_SUCCESS; @@ -155,14 +167,55 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh) { ks_assert(bh); - - if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory); + + if (blade_handle_service_available(bh)) blade_service_destroy(&bh->service); if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore); return KS_STATUS_SUCCESS; } +KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length) +{ + blade_message_t *msg = NULL; + + ks_assert(bh); + ks_assert(message); + ks_assert(data); + + *message = NULL; + + if (ks_q_trypop(bh->messages_discarded, (void **)&msg) != KS_STATUS_SUCCESS || !msg) blade_message_create(&msg, bh->pool, bh); + ks_assert(msg); + + if (blade_message_set(msg, data, data_length) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + *message = msg; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message) +{ + ks_assert(bh); + ks_assert(message); + ks_assert(*message); + + // @todo check thresholds for discarded messages, if the queue is full just destroy the message for now (currently 100 messages) + if (ks_q_push(bh->messages_discarded, *message) != KS_STATUS_SUCCESS) blade_message_destroy(message); + + *message = NULL; + + return KS_STATUS_SUCCESS; +} + +KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh) +{ + ks_assert(bh); + + return bh->service != NULL; +} + KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh) { ks_assert(bh); @@ -170,13 +223,6 @@ KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh) return bh->datastore != NULL; } -KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh) -{ - ks_assert(bh); - - return bh->directory != NULL; -} - KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length) { ks_assert(bh); diff --git a/libs/libblade/src/include/blade.h b/libs/libblade/src/include/blade.h index 578d41fee0..f8a3a4459f 100644 --- a/libs/libblade/src/include/blade.h +++ b/libs/libblade/src/include/blade.h @@ -42,8 +42,8 @@ #include "blade_stack.h" #include "blade_peer.h" #include "blade_service.h" +#include "blade_message.h" #include "blade_datastore.h" -#include "blade_directory.h" #include "bpcp.h" KS_BEGIN_EXTERN_C diff --git a/libs/libblade/src/include/blade_directory.h b/libs/libblade/src/include/blade_message.h similarity index 76% rename from libs/libblade/src/include/blade_directory.h rename to libs/libblade/src/include/blade_message.h index e0a66980f3..5db9c60c22 100644 --- a/libs/libblade/src/include/blade_directory.h +++ b/libs/libblade/src/include/blade_message.h @@ -31,20 +31,16 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef _BLADE_DIRECTORY_H_ -#define _BLADE_DIRECTORY_H_ +#ifndef _BLADE_MESSAGE_H_ +#define _BLADE_MESSAGE_H_ #include -#define BLADE_DIRECTORY_TPOOL_MIN 2 -#define BLADE_DIRECTORY_TPOOL_MAX 8 -#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256) -#define BLADE_DIRECTORY_TPOOL_IDLE 10 - KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool); -KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP); -KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config); -KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd); +KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle); +KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP); +KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length); +KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length); +KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_peer.h b/libs/libblade/src/include/blade_peer.h index 8a15044ac5..acffc37998 100644 --- a/libs/libblade/src/include/blade_peer.h +++ b/libs/libblade/src/include/blade_peer.h @@ -31,8 +31,8 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef _BPCP_H_ -#define _BPCP_H_ +#ifndef _BLADE_PEER_H_ +#define _BLADE_PEER_H_ #include #define BLADE_PEER_TPOOL_MIN 2 @@ -41,10 +41,14 @@ #define BLADE_PEER_TPOOL_IDLE 10 KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool); +KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service); KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP); -KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws); +KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock); KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp); +KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp); +KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp); +KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message); +KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length); KS_END_EXTERN_C #endif diff --git a/libs/libblade/src/include/blade_service.h b/libs/libblade/src/include/blade_service.h index 9ec014f277..d6f1de40b5 100644 --- a/libs/libblade/src/include/blade_service.h +++ b/libs/libblade/src/include/blade_service.h @@ -41,8 +41,9 @@ #define BLADE_SERVICE_TPOOL_IDLE 10 KS_BEGIN_EXTERN_C -KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool); +KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle); KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP); +KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs); KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config); KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs); KS_END_EXTERN_C diff --git a/libs/libblade/src/include/blade_stack.h b/libs/libblade/src/include/blade_stack.h index 1eb4229d72..507d8a12f7 100644 --- a/libs/libblade/src/include/blade_stack.h +++ b/libs/libblade/src/include/blade_stack.h @@ -46,9 +46,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config); KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh); -KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh); -KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh); +KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length); +KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message); +KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh); + +KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh); KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length); KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh, blade_datastore_fetch_callback_t callback, diff --git a/libs/libblade/src/include/blade_types.h b/libs/libblade/src/include/blade_types.h index 0cb2ec8e3e..e7484d0f38 100644 --- a/libs/libblade/src/include/blade_types.h +++ b/libs/libblade/src/include/blade_types.h @@ -40,8 +40,8 @@ KS_BEGIN_EXTERN_C typedef struct blade_handle_s blade_handle_t; typedef struct blade_peer_s blade_peer_t; typedef struct blade_service_s blade_service_t; +typedef struct blade_message_s blade_message_t; typedef struct blade_datastore_s blade_datastore_t; -typedef struct blade_directory_s blade_directory_t; typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);