FS-9952: More work on the blade service transport layer, now compiles but is untested, and still missing a few pieces to be functional

This commit is contained in:
Shane Bryldt 2017-01-30 05:02:58 +00:00 committed by Mike Jerris
parent eb57b7910e
commit 80179e7bd0
12 changed files with 373 additions and 284 deletions

View File

@ -11,13 +11,14 @@ libunqlite_la_CFLAGS = -DUNQLITE_ENABLE_THREADS
libunqlite_la_LIBADD = -lpthread
lib_LTLIBRARIES = libblade.la
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c
libblade_la_SOURCES += src/blade_message.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS)
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
libblade_la_LIBADD = libunqlite.la
library_includedir = $(prefix)/include
library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h
library_include_HEADERS += src/include/unqlite.h test/tap.h
tests: libblade.la

View File

@ -1,165 +0,0 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
typedef enum {
BD_NONE = 0,
BD_MYPOOL = (1 << 0),
BD_MYTPOOL = (1 << 1),
} bdpvt_flag_t;
struct blade_directory_s {
bdpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
config_setting_t *config_service;
blade_service_t *service;
};
KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
{
blade_directory_t *bd = NULL;
bdpvt_flag_t flags;
ks_pool_t *pool;
ks_assert(bdP);
bd = *bdP;
*bdP = NULL;
ks_assert(bd);
flags = bd->flags;
pool = bd->pool;
blade_directory_shutdown(bd);
if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
ks_pool_free(bd->pool, &bd);
if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bdpvt_flag_t newflags = BD_NONE;
blade_directory_t *bd = NULL;
if (!pool) {
newflags |= BD_MYPOOL;
ks_pool_open(&pool);
ks_assert(pool);
}
// @todo: move thread pool creation to startup which allows thread pool to be configurable
if (!tpool) {
newflags |= BD_MYTPOOL;
ks_thread_pool_create(&tpool,
BLADE_DIRECTORY_TPOOL_MIN,
BLADE_DIRECTORY_TPOOL_MAX,
BLADE_DIRECTORY_TPOOL_STACK,
KS_PRI_NORMAL,
BLADE_DIRECTORY_TPOOL_IDLE);
ks_assert(tpool);
}
bd = ks_pool_alloc(pool, sizeof(*bd));
bd->flags = newflags;
bd->pool = pool;
bd->tpool = tpool;
*bdP = bd;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
{
config_setting_t *service = NULL;
ks_assert(bd);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
service = config_setting_get_member(config, "service");
if (!service) return KS_STATUS_FAIL;
bd->config_service = service;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
{
ks_assert(bd);
blade_directory_shutdown(bd);
if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
blade_service_create(&bd->service, bd->pool, bd->tpool);
ks_assert(bd->service);
if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
{
ks_assert(bd);
if (bd->service) blade_service_destroy(&bd->service);
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -0,0 +1,129 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
struct blade_message_s {
ks_pool_t *pool;
blade_handle_t *handle;
void *data;
ks_size_t data_length;
ks_size_t data_size;
};
KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP)
{
blade_message_t *bm = NULL;
ks_assert(bmP);
bm = *bmP;
*bmP = NULL;
ks_assert(bm);
if (bm->data) ks_pool_free(bm->pool, &bm->data);
ks_pool_free(bm->pool, &bm);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle)
{
blade_message_t *bm = NULL;
ks_assert(bmP);
ks_assert(pool);
ks_assert(handle);
bm = ks_pool_alloc(pool, sizeof(*bm));
bm->pool = pool;
bm->handle = handle;
*bmP = bm;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm)
{
ks_assert(bm);
ks_assert(*bm);
return blade_handle_message_discard((*bm)->handle, bm);
}
KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length)
{
ks_assert(bm);
ks_assert(data);
ks_assert(data_length > 0);
// @todo fail on a max message size?
if (data_length > bm->data_size) {
// @todo talk to tony about adding flags to ks_pool_resize_ex to prevent the memcpy, don't need to copy old memory here
// otherwise switch to a new allocation instead of resizing
bm->data = ks_pool_resize(bm->pool, bm->data, data_length);
ks_assert(bm->data);
bm->data_size = data_length;
}
memcpy(bm->data, data, data_length);
bm->data_length = data_length;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length)
{
ks_assert(bm);
*data = bm->data;
*data_length = bm->data_length;
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -45,7 +45,9 @@ struct blade_peer_s {
ks_thread_pool_t *tpool;
blade_service_t *service;
ks_socket_t sock;
ks_bool_t shutdown;
ks_bool_t disconnecting;
kws_t *kws;
ks_thread_t *kws_thread;
@ -118,8 +120,10 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock)
{
kws_t *kws = NULL;
ks_assert(bp);
ks_assert(kws);
@ -127,7 +131,7 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
blade_peer_shutdown(bp);
bp->kws = kws;
bp->sock = sock;
if (ks_thread_create_ex(&bp->kws_thread,
blade_peer_kws_thread,
@ -142,6 +146,8 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
{
blade_message_t *message = NULL;
ks_assert(bp);
bp->shutdown = KS_TRUE;
@ -150,61 +156,125 @@ KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
ks_thread_join(bp->kws_thread);
ks_pool_free(bp->pool, &bp->kws_thread);
}
while (ks_q_trypop(bp->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
while (ks_q_trypop(bp->messages_receiving, (void **)&message) == KS_STATUS_SUCCESS && message) blade_message_discard(&message);
if (bp->kws) kws_destroy(&bp->kws);
else if (bp->sock != KS_SOCK_INVALID) ks_socket_close(&bp->sock);
bp->sock = KS_SOCK_INVALID;
bp->shutdown = KS_FALSE;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp)
{
ks_assert(bp);
bp->disconnecting = KS_TRUE;
}
KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp)
{
ks_assert(bp);
return bp->disconnecting;
}
KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message)
{
ks_assert(peer);
ks_assert(message);
*message = NULL;
return ks_q_trypop(peer->messages_receiving, (void **)message);
}
KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length)
{
blade_message_t *message = NULL;
ks_assert(peer);
ks_assert(data);
ks_assert(data_length > 0);
if (blade_handle_message_claim(blade_service_handle(peer->service), &message, data, data_length) != KS_STATUS_SUCCESS || !message) {
// @todo error handling
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
peer->disconnecting = KS_TRUE;
return KS_STATUS_FAIL;
}
ks_q_push(peer->messages_sending, message);
return KS_STATUS_SUCCESS;
}
void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
{
blade_peer_t *peer;
blade_peer_t *peer = NULL;
kws_opcode_t opcode;
uint8_t *data;
ks_size_t data_len;
blade_message_t *message;
uint8_t *frame_data = NULL;
ks_size_t frame_data_len = 0;
blade_message_t *message = NULL;
int32_t poll_flags = 0;
ks_assert(thread);
ks_assert(data);
peer = (blade_peer_t *)data;
// @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
if (kws_init(&peer->kws, peer->sock, NULL, NULL, KWS_BLOCK, peer->pool) != KS_STATUS_SUCCESS) {
peer->disconnecting = KS_TRUE;
return NULL;
}
while (!peer->shutdown) {
// @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
// or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
// kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
data_len = kws_read_frame(peer->kws, &opcode, &data);
// @todo get exact timeout from service config?
poll_flags = ks_wait_sock(peer->sock, 100, KS_POLL_READ | KS_POLL_ERROR);
if (data_len <= 0) {
// @todo error handling, strerror(ks_errno())
// 0 means socket closed with WS_NONE, which closes websocket with no additional reason
// -1 means socket closed with a general failure
// -2 means nonblocking wait
// other values are based on WS_XXX reasons
// negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
if (poll_flags & KS_POLL_ERROR) {
// @todo switch this (and others below) to the enum for the state callback, called during the service connected peer cleanup
peer->disconnecting = KS_TRUE;
break;
}
if (poll_flags & KS_POLL_READ) {
frame_data_len = kws_read_frame(peer->kws, &opcode, &frame_data);
// @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
// remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
// and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
// at the service level
peer->disconnecting = KS_TRUE;
break;
}
if (frame_data_len <= 0) {
// @todo error handling, strerror(ks_errno())
// 0 means socket closed with WS_NONE, which closes websocket with no additional reason
// -1 means socket closed with a general failure
// -2 means nonblocking wait
// other values are based on WS_XXX reasons
// negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
// @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
// remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
// and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
// at the service level
peer->disconnecting = KS_TRUE;
break;
}
// @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
// @todo error handling
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
peer->disconnecting = KS_TRUE;
break;
}
if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
// @todo error handling
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
peer->disconnecting = KS_TRUE;
break;
}
ks_q_push(peer->messages_receiving, message);
// @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
ks_q_push(peer->messages_receiving, message);
// @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
// might not perfectly fit the traditional state callback, but it could work if it only sends the state temporarily and does NOT actually change
// the internal state to receiving
}
if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
// @todo consider only sending one message at a time and use shorter polling timeout to prevent any considerable blocking if send buffers get full
while (ks_q_trypop(peer->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) {
blade_message_get(message, (void **)&frame_data, &frame_data_len);
kws_write_frame(peer->kws, WSOC_TEXT, frame_data, frame_data_len);
}
}

View File

@ -45,6 +45,7 @@ struct blade_service_s {
bspvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
blade_handle_t *handle;
ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
@ -97,11 +98,14 @@ KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle)
{
bspvt_flag_t newflags = BS_NONE;
blade_service_t *bs = NULL;
ks_assert(bsP);
ks_assert(handle);
if (!pool) {
newflags |= BS_MYPOOL;
ks_pool_open(&pool);
@ -117,12 +121,19 @@ KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *p
bs->flags = newflags;
bs->pool = pool;
bs->tpool = tpool;
bs->handle = handle;
list_init(&bs->connected);
*bsP = bs;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs)
{
ks_assert(bs);
return bs->handle;
}
ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
{
config_setting_t *websockets = NULL;
@ -325,7 +336,7 @@ ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
{
blade_service_t *service;
blade_service_t *service = NULL;
ks_assert(thread);
ks_assert(data);
@ -333,57 +344,50 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
service = (blade_service_t *)data;
while (!service->shutdown) {
// @todo take exact timeout from a setting in config_service_endpoints
if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
for (int32_t index = 0; index < service->listeners_length; ++index) {
ks_socket_t sock;
ks_sockaddr_t raddr;
socklen_t slen = 0;
kws_t *kws = NULL;
ks_socket_t sock = KS_SOCK_INVALID;
blade_peer_t *peer = NULL;
if (!(service->listeners_poll[index].revents & POLLIN)) continue;
if (service->listeners_poll[index].revents & POLLERR) {
// @todo: error handling, just skip the listener for now
// @todo: error handling, just skip the listener for now, it might recover, could skip X sanity times before closing?
continue;
}
if (service->listeners_families[index] == AF_INET) {
slen = sizeof(raddr.v.v4);
if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now
continue;
}
raddr.family = AF_INET;
} else {
slen = sizeof(raddr.v.v6);
if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now
continue;
}
raddr.family = AF_INET6;
}
ks_addr_get_host(&raddr);
ks_addr_get_port(&raddr);
// @todo: SSL init stuffs based on data from service->config_websockets_ssl
if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
// @todo: error handling, just close and skip the socket for now
ks_socket_close(&sock);
if ((sock = accept(service->listeners_poll[index].fd, NULL, NULL)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now as most causes are because the remote side suddenly became unreachable
continue;
}
blade_peer_create(&peer, service->pool, service->tpool);
// @todo consider a recycle queue of peers per service, and only have to call startup when one is already available
// blade_service_peer_claim(service, &peer);
blade_peer_create(&peer, service->pool, service->tpool, service);
ks_assert(peer);
// @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
// @todo call state callback with connecting enum state
blade_peer_startup(peer, sock);
list_append(&service->connected, peer);
blade_peer_startup(peer, kws);
}
}
list_iterator_start(&service->connected);
while (list_iterator_hasnext(&service->connected)) {
blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&service->connected);
// @todo expose accessor for disconnecting, after changing it into the state callback enum
// ensure that every way kws_close might occur leads back to disconnecting = KS_TRUE for this to universally process disconnects
if (blade_peer_disconnecting(peer)) {
// @todo check if there is an iterator based remove function, or indexed iteration to use list_delete_at()
list_delete(&service->connected, peer);
// @todo call state callback with internal disconnecting enum state (stored in peer to hold specific reason for disconnecting)
// @todo switch to blade_peer_shutdown(&peer) and blade_peer_discard(&peer) after introducing recycling of peers
blade_peer_destroy(&peer);
}
}
list_iterator_stop(&service->connected);
}
return NULL;

View File

@ -44,11 +44,12 @@ struct blade_handle_s {
ks_pool_t *pool;
ks_thread_pool_t *tpool;
config_setting_t *config_service;
config_setting_t *config_datastore;
config_setting_t *config_directory;
//blade_peer_t *peer;
blade_directory_t *directory;
ks_q_t *messages_discarded;
blade_service_t *service;
blade_datastore_t *datastore;
};
@ -70,8 +71,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
pool = bh->pool;
blade_handle_shutdown(bh);
//blade_peer_destroy(&bh->peer);
if (bh->messages_discarded) {
// @todo make sure messages are cleaned up
ks_q_destroy(&bh->messages_discarded);
}
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
@ -104,7 +109,10 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
bh->flags = newflags;
bh->pool = pool;
bh->tpool = tpool;
//blade_peer_create(&bh->peer, bh->pool, bh->tpool);
// @todo check thresholds from config, for now just ensure it doesn't grow out of control, allow 100 discarded messages
ks_q_create(&bh->messages_discarded, bh->pool, 100);
ks_assert(bh->messages_discarded);
*bhP = bh;
@ -113,22 +121,24 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
{
config_setting_t *service = NULL;
config_setting_t *datastore = NULL;
config_setting_t *directory = NULL;
ks_assert(bh);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
// @todo config for messages_discarded threshold (ie, message count, message memory, etc)
service = config_setting_get_member(config, "service");
datastore = config_setting_get_member(config, "datastore");
//if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
directory = config_setting_get_member(config, "directory");
//if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
bh->config_service = service;
bh->config_datastore = datastore;
bh->config_directory = directory;
return KS_STATUS_SUCCESS;
}
@ -138,15 +148,17 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
ks_assert(bh);
if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (bh->config_service && !blade_handle_service_available(bh)) {
blade_service_create(&bh->service, bh->pool, bh->tpool, bh);
ks_assert(bh->service);
if (blade_service_startup(bh->service, bh->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
blade_datastore_startup(bh->datastore, bh->config_datastore);
}
if (bh->config_directory && !blade_handle_directory_available(bh)) {
blade_directory_create(&bh->directory, bh->pool, bh->tpool);
blade_directory_startup(bh->directory, config);
ks_assert(bh->datastore);
if (blade_datastore_startup(bh->datastore, bh->config_datastore) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
@ -155,14 +167,55 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
{
ks_assert(bh);
if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
if (blade_handle_service_available(bh)) blade_service_destroy(&bh->service);
if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length)
{
blade_message_t *msg = NULL;
ks_assert(bh);
ks_assert(message);
ks_assert(data);
*message = NULL;
if (ks_q_trypop(bh->messages_discarded, (void **)&msg) != KS_STATUS_SUCCESS || !msg) blade_message_create(&msg, bh->pool, bh);
ks_assert(msg);
if (blade_message_set(msg, data, data_length) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
*message = msg;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message)
{
ks_assert(bh);
ks_assert(message);
ks_assert(*message);
// @todo check thresholds for discarded messages, if the queue is full just destroy the message for now (currently 100 messages)
if (ks_q_push(bh->messages_discarded, *message) != KS_STATUS_SUCCESS) blade_message_destroy(message);
*message = NULL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh)
{
ks_assert(bh);
return bh->service != NULL;
}
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{
ks_assert(bh);
@ -170,13 +223,6 @@ KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
return bh->datastore != NULL;
}
KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
{
ks_assert(bh);
return bh->directory != NULL;
}
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
{
ks_assert(bh);

View File

@ -42,8 +42,8 @@
#include "blade_stack.h"
#include "blade_peer.h"
#include "blade_service.h"
#include "blade_message.h"
#include "blade_datastore.h"
#include "blade_directory.h"
#include "bpcp.h"
KS_BEGIN_EXTERN_C

View File

@ -31,20 +31,16 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_DIRECTORY_H_
#define _BLADE_DIRECTORY_H_
#ifndef _BLADE_MESSAGE_H_
#define _BLADE_MESSAGE_H_
#include <blade.h>
#define BLADE_DIRECTORY_TPOOL_MIN 2
#define BLADE_DIRECTORY_TPOOL_MAX 8
#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
#define BLADE_DIRECTORY_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
KS_DECLARE(ks_status_t) blade_message_create(blade_message_t **bmP, ks_pool_t *pool, blade_handle_t *handle);
KS_DECLARE(ks_status_t) blade_message_destroy(blade_message_t **bmP);
KS_DECLARE(ks_status_t) blade_message_set(blade_message_t *bm, void *data, ks_size_t data_length);
KS_DECLARE(ks_status_t) blade_message_get(blade_message_t *bm, void **data, ks_size_t *data_length);
KS_DECLARE(ks_status_t) blade_message_discard(blade_message_t **bm);
KS_END_EXTERN_C
#endif

View File

@ -31,8 +31,8 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BPCP_H_
#define _BPCP_H_
#ifndef _BLADE_PEER_H_
#define _BLADE_PEER_H_
#include <blade.h>
#define BLADE_PEER_TPOOL_MIN 2
@ -41,10 +41,14 @@
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock);
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp);
KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp);
KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message);
KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length);
KS_END_EXTERN_C
#endif

View File

@ -41,8 +41,9 @@
#define BLADE_SERVICE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle);
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs);
KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
KS_END_EXTERN_C

View File

@ -46,9 +46,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
KS_DECLARE(ks_status_t) blade_handle_message_discard(blade_handle_t *bh, blade_message_t **message);
KS_DECLARE(ks_bool_t) blade_handle_service_available(blade_handle_t *bh);
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,

View File

@ -40,8 +40,8 @@ KS_BEGIN_EXTERN_C
typedef struct blade_handle_s blade_handle_t;
typedef struct blade_peer_s blade_peer_t;
typedef struct blade_service_s blade_service_t;
typedef struct blade_message_s blade_message_t;
typedef struct blade_datastore_s blade_datastore_t;
typedef struct blade_directory_s blade_directory_t;
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);