FS-9952: A few changes and implemented the initial service peer state callbacks
This commit is contained in:
parent
80179e7bd0
commit
4ec0fbc581
|
@ -34,22 +34,13 @@
|
|||
#include "blade.h"
|
||||
|
||||
|
||||
typedef enum {
|
||||
BDS_NONE = 0,
|
||||
BDS_MYPOOL = (1 << 0),
|
||||
BDS_MYTPOOL = (1 << 1),
|
||||
} bdspvt_flag_t;
|
||||
|
||||
struct blade_datastore_s {
|
||||
bdspvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
ks_thread_pool_t *tpool;
|
||||
|
||||
const char *config_database_path;
|
||||
//config_setting_t *config_service;
|
||||
|
||||
unqlite *db;
|
||||
//blade_service_t *service;
|
||||
};
|
||||
|
||||
struct blade_datastore_fetch_userdata_s
|
||||
|
@ -65,8 +56,6 @@ typedef struct blade_datastore_fetch_userdata_s blade_datastore_fetch_userdata_t
|
|||
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
|
||||
{
|
||||
blade_datastore_t *bds = NULL;
|
||||
bdspvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
|
||||
ks_assert(bdsP);
|
||||
|
||||
|
@ -75,44 +64,22 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
|
|||
|
||||
ks_assert(bds);
|
||||
|
||||
flags = bds->flags;
|
||||
pool = bds->pool;
|
||||
|
||||
blade_datastore_shutdown(bds);
|
||||
|
||||
if (bds->tpool && (flags & BDS_MYTPOOL)) ks_thread_pool_destroy(&bds->tpool);
|
||||
|
||||
ks_pool_free(bds->pool, &bds);
|
||||
|
||||
if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
|
||||
{
|
||||
bdspvt_flag_t newflags = BDS_NONE;
|
||||
blade_datastore_t *bds = NULL;
|
||||
|
||||
if (!pool) {
|
||||
newflags |= BDS_MYPOOL;
|
||||
ks_pool_open(&pool);
|
||||
ks_assert(pool);
|
||||
}
|
||||
// @todo: move thread pool creation to startup which allows thread pool to be configurable
|
||||
if (!tpool) {
|
||||
newflags |= BDS_MYTPOOL;
|
||||
ks_thread_pool_create(&tpool,
|
||||
BLADE_DATASTORE_TPOOL_MIN,
|
||||
BLADE_DATASTORE_TPOOL_MAX,
|
||||
BLADE_DATASTORE_TPOOL_STACK,
|
||||
KS_PRI_NORMAL,
|
||||
BLADE_DATASTORE_TPOOL_IDLE);
|
||||
ks_assert(tpool);
|
||||
}
|
||||
ks_assert(bdsP);
|
||||
ks_assert(pool);
|
||||
ks_assert(tpool);
|
||||
|
||||
bds = ks_pool_alloc(pool, sizeof(*bds));
|
||||
bds->flags = newflags;
|
||||
bds->pool = pool;
|
||||
bds->tpool = tpool;
|
||||
*bdsP = bds;
|
||||
|
@ -124,7 +91,6 @@ ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *con
|
|||
{
|
||||
config_setting_t *tmp;
|
||||
config_setting_t *database = NULL;
|
||||
//config_setting_t *service = NULL;
|
||||
const char *config_database_path = NULL;
|
||||
|
||||
ks_assert(bds);
|
||||
|
@ -138,11 +104,9 @@ ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *con
|
|||
if (!tmp) return KS_STATUS_FAIL;
|
||||
if (config_setting_type(tmp) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
|
||||
config_database_path = config_setting_get_string(tmp);
|
||||
//service = config_setting_get_member(config, "service");
|
||||
|
||||
if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
|
||||
bds->config_database_path = ks_pstrdup(bds->pool, config_database_path);
|
||||
//bds->config_service = service;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
@ -155,7 +119,6 @@ KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_s
|
|||
|
||||
if (blade_datastore_config(bds, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
//if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
|
||||
if (unqlite_open(&bds->db, bds->config_database_path, UNQLITE_OPEN_CREATE) != UNQLITE_OK) {
|
||||
const char *errbuf = NULL;
|
||||
blade_datastore_error(bds, &errbuf, NULL);
|
||||
|
@ -167,10 +130,6 @@ KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_s
|
|||
|
||||
// @todo VM init if document store is used (and output consumer callback)
|
||||
|
||||
//blade_service_create(&bds->service, bds->pool, bds->tpool);
|
||||
//ks_assert(bds->service);
|
||||
//blade_service_startup(bds->service, bds->config_service);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -178,15 +137,12 @@ KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds)
|
|||
{
|
||||
ks_assert(bds);
|
||||
|
||||
//if (bds->service) blade_service_destroy(&bds->service);
|
||||
|
||||
if (bds->db) {
|
||||
unqlite_close(bds->db);
|
||||
bds->db = NULL;
|
||||
}
|
||||
|
||||
if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
|
||||
//bds->config_service = NULL;
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -33,21 +33,16 @@
|
|||
|
||||
#include "blade.h"
|
||||
|
||||
typedef enum {
|
||||
BP_NONE = 0,
|
||||
BP_MYPOOL = (1 << 0),
|
||||
BP_MYTPOOL = (1 << 1)
|
||||
} bppvt_flag_t;
|
||||
|
||||
struct blade_peer_s {
|
||||
bppvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
ks_thread_pool_t *tpool;
|
||||
blade_service_t *service;
|
||||
|
||||
ks_socket_t sock;
|
||||
ks_bool_t shutdown;
|
||||
ks_bool_t disconnecting;
|
||||
blade_peerstate_t state;
|
||||
blade_peerreason_t reason;
|
||||
|
||||
kws_t *kws;
|
||||
ks_thread_t *kws_thread;
|
||||
|
||||
|
@ -62,8 +57,6 @@ void *blade_peer_kws_thread(ks_thread_t *thread, void *data);
|
|||
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
|
||||
{
|
||||
blade_peer_t *bp = NULL;
|
||||
bppvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
|
||||
ks_assert(bpP);
|
||||
|
||||
|
@ -72,47 +65,31 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
|
|||
|
||||
ks_assert(bp);
|
||||
|
||||
flags = bp->flags;
|
||||
pool = bp->pool;
|
||||
|
||||
blade_peer_shutdown(bp);
|
||||
|
||||
ks_q_destroy(&bp->messages_sending);
|
||||
ks_q_destroy(&bp->messages_receiving);
|
||||
|
||||
if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
|
||||
|
||||
ks_pool_free(bp->pool, &bp);
|
||||
|
||||
if (pool && (flags & BP_MYPOOL)) ks_pool_close(&pool);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service)
|
||||
{
|
||||
bppvt_flag_t newflags = BP_NONE;
|
||||
blade_peer_t *bp = NULL;
|
||||
|
||||
ks_assert(bpP);
|
||||
ks_assert(pool);
|
||||
ks_assert(tpool);
|
||||
ks_assert(service);
|
||||
|
||||
if (!pool) {
|
||||
newflags |= BP_MYPOOL;
|
||||
ks_pool_open(&pool);
|
||||
ks_assert(pool);
|
||||
}
|
||||
if (!tpool) {
|
||||
newflags |= BP_MYTPOOL;
|
||||
ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
|
||||
ks_assert(tpool);
|
||||
}
|
||||
|
||||
bp = ks_pool_alloc(pool, sizeof(*bp));
|
||||
bp->flags = newflags;
|
||||
bp->pool = pool;
|
||||
bp->tpool = tpool;
|
||||
bp->service = service;
|
||||
bp->state = BLADE_PEERSTATE_CONNECTING;
|
||||
bp->reason = BLADE_PEERREASON_NORMAL;
|
||||
ks_q_create(&bp->messages_sending, pool, 0);
|
||||
ks_q_create(&bp->messages_receiving, pool, 0);
|
||||
*bpP = bp;
|
||||
|
@ -132,6 +109,8 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock)
|
|||
blade_peer_shutdown(bp);
|
||||
|
||||
bp->sock = sock;
|
||||
bp->state = BLADE_PEERSTATE_CONNECTING;
|
||||
bp->reason = BLADE_PEERREASON_NORMAL;
|
||||
|
||||
if (ks_thread_create_ex(&bp->kws_thread,
|
||||
blade_peer_kws_thread,
|
||||
|
@ -139,7 +118,11 @@ KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock)
|
|||
KS_THREAD_FLAG_DEFAULT,
|
||||
KS_THREAD_DEFAULT_STACK,
|
||||
KS_PRI_NORMAL,
|
||||
bp->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
bp->pool) != KS_STATUS_SUCCESS) {
|
||||
// @todo error logging
|
||||
blade_peer_disconnect(bp, BLADE_PEERREASON_ERROR);
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
@ -168,17 +151,19 @@ KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
|
|||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp)
|
||||
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp, blade_peerreason_t reason)
|
||||
{
|
||||
ks_assert(bp);
|
||||
|
||||
bp->disconnecting = KS_TRUE;
|
||||
// @todo check if already disconnecting for another reason, avoid resetting to get initial reason for disconnect?
|
||||
bp->reason = reason;
|
||||
bp->state = BLADE_PEERSTATE_DISCONNECTING;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp)
|
||||
KS_DECLARE(blade_peerstate_t) blade_peer_state(blade_peer_t *bp)
|
||||
{
|
||||
ks_assert(bp);
|
||||
return bp->disconnecting;
|
||||
return bp->state;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message)
|
||||
|
@ -199,9 +184,8 @@ KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data,
|
|||
ks_assert(data_length > 0);
|
||||
|
||||
if (blade_handle_message_claim(blade_service_handle(peer->service), &message, data, data_length) != KS_STATUS_SUCCESS || !message) {
|
||||
// @todo error handling
|
||||
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
|
||||
peer->disconnecting = KS_TRUE;
|
||||
// @todo error logging
|
||||
blade_peer_disconnect(peer, BLADE_PEERREASON_ERROR);
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
ks_q_push(peer->messages_sending, message);
|
||||
|
@ -222,20 +206,27 @@ void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
|
|||
|
||||
peer = (blade_peer_t *)data;
|
||||
|
||||
// @todo consider using an INITIALIZING state to track when there is problems during initialization (specifically SSL negotiations)?
|
||||
// @todo should stack be notified with an internal event callback here before logic layer initialization starts (IE, before SSL negotiations)?
|
||||
peer->state = BLADE_PEERSTATE_RUNNING;
|
||||
|
||||
// @todo: SSL init stuffs based on data from peer->service->config_websockets_ssl to pass into kws_init
|
||||
|
||||
if (kws_init(&peer->kws, peer->sock, NULL, NULL, KWS_BLOCK, peer->pool) != KS_STATUS_SUCCESS) {
|
||||
peer->disconnecting = KS_TRUE;
|
||||
// @todo error logging
|
||||
blade_peer_disconnect(peer, BLADE_PEERREASON_ERROR);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blade_service_peer_state_callback(peer->service, peer, BLADE_PEERSTATE_RUNNING);
|
||||
|
||||
while (!peer->shutdown) {
|
||||
// @todo get exact timeout from service config?
|
||||
poll_flags = ks_wait_sock(peer->sock, 100, KS_POLL_READ | KS_POLL_ERROR);
|
||||
|
||||
if (poll_flags & KS_POLL_ERROR) {
|
||||
// @todo switch this (and others below) to the enum for the state callback, called during the service connected peer cleanup
|
||||
peer->disconnecting = KS_TRUE;
|
||||
// @todo error logging
|
||||
blade_peer_disconnect(peer, BLADE_PEERREASON_ERROR);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -243,37 +234,31 @@ void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
|
|||
frame_data_len = kws_read_frame(peer->kws, &opcode, &frame_data);
|
||||
|
||||
if (frame_data_len <= 0) {
|
||||
// @todo error handling, strerror(ks_errno())
|
||||
// @todo error logging, strerror(ks_errno())
|
||||
// 0 means socket closed with WS_NONE, which closes websocket with no additional reason
|
||||
// -1 means socket closed with a general failure
|
||||
// -2 means nonblocking wait
|
||||
// other values are based on WS_XXX reasons
|
||||
// negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
|
||||
|
||||
// @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
|
||||
// remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
|
||||
// and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
|
||||
// at the service level
|
||||
peer->disconnecting = KS_TRUE;
|
||||
blade_peer_disconnect(peer, BLADE_PEERREASON_ERROR);
|
||||
break;
|
||||
}
|
||||
|
||||
if (blade_handle_message_claim(blade_service_handle(peer->service), &message, frame_data, frame_data_len) != KS_STATUS_SUCCESS || !message) {
|
||||
// @todo error handling
|
||||
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
|
||||
peer->disconnecting = KS_TRUE;
|
||||
// @todo error logging
|
||||
blade_peer_disconnect(peer, BLADE_PEERREASON_ERROR);
|
||||
break;
|
||||
}
|
||||
|
||||
ks_q_push(peer->messages_receiving, message);
|
||||
// @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
|
||||
// might not perfectly fit the traditional state callback, but it could work if it only sends the state temporarily and does NOT actually change
|
||||
// the internal state to receiving
|
||||
blade_service_peer_state_callback(peer->service, peer, BLADE_PEERSTATE_RECEIVING);
|
||||
}
|
||||
|
||||
// @todo consider only sending one message at a time and use shorter polling timeout to prevent any considerable blocking if send buffers get full
|
||||
while (ks_q_trypop(peer->messages_sending, (void **)&message) == KS_STATUS_SUCCESS && message) {
|
||||
blade_message_get(message, (void **)&frame_data, &frame_data_len);
|
||||
// @todo may need to get the WSOC_TEXT from the message if using WSOC_BINARY is desired later
|
||||
kws_write_frame(peer->kws, WSOC_TEXT, frame_data, frame_data_len);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,19 +33,13 @@
|
|||
|
||||
#include "blade.h"
|
||||
|
||||
typedef enum {
|
||||
BS_NONE = 0,
|
||||
BS_MYPOOL = (1 << 0),
|
||||
BS_MYTPOOL = (1 << 1)
|
||||
} bspvt_flag_t;
|
||||
|
||||
#define BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX 16
|
||||
|
||||
struct blade_service_s {
|
||||
bspvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
ks_thread_pool_t *tpool;
|
||||
blade_handle_t *handle;
|
||||
blade_service_peer_state_callback_t peer_state_callback;
|
||||
|
||||
ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
|
||||
ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
|
||||
|
@ -56,7 +50,6 @@ struct blade_service_s {
|
|||
ks_bool_t shutdown;
|
||||
|
||||
struct pollfd *listeners_poll;
|
||||
int32_t *listeners_families;
|
||||
int32_t listeners_size;
|
||||
int32_t listeners_length;
|
||||
ks_thread_t *listeners_thread;
|
||||
|
@ -72,8 +65,6 @@ ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr);
|
|||
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
|
||||
{
|
||||
blade_service_t *bs = NULL;
|
||||
bspvt_flag_t flags;
|
||||
ks_pool_t *pool;
|
||||
|
||||
ks_assert(bsP);
|
||||
|
||||
|
@ -82,46 +73,33 @@ KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
|
|||
|
||||
ks_assert(bs);
|
||||
|
||||
flags = bs->flags;
|
||||
pool = bs->pool;
|
||||
|
||||
blade_service_shutdown(bs);
|
||||
|
||||
list_destroy(&bs->connected);
|
||||
|
||||
if (bs->tpool && (flags & BS_MYTPOOL)) ks_thread_pool_destroy(&bs->tpool);
|
||||
|
||||
ks_pool_free(bs->pool, &bs);
|
||||
|
||||
if (pool && (flags & BS_MYPOOL)) ks_pool_close(&pool);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle)
|
||||
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP,
|
||||
ks_pool_t *pool,
|
||||
ks_thread_pool_t *tpool,
|
||||
blade_handle_t *handle,
|
||||
blade_service_peer_state_callback_t peer_state_callback)
|
||||
{
|
||||
bspvt_flag_t newflags = BS_NONE;
|
||||
blade_service_t *bs = NULL;
|
||||
|
||||
ks_assert(bsP);
|
||||
ks_assert(pool);
|
||||
ks_assert(tpool);
|
||||
ks_assert(handle);
|
||||
|
||||
if (!pool) {
|
||||
newflags |= BS_MYPOOL;
|
||||
ks_pool_open(&pool);
|
||||
ks_assert(pool);
|
||||
}
|
||||
if (!tpool) {
|
||||
newflags |= BS_MYTPOOL;
|
||||
ks_thread_pool_create(&tpool, BLADE_SERVICE_TPOOL_MIN, BLADE_SERVICE_TPOOL_MAX, BLADE_SERVICE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_SERVICE_TPOOL_IDLE);
|
||||
ks_assert(tpool);
|
||||
}
|
||||
|
||||
bs = ks_pool_alloc(pool, sizeof(*bs));
|
||||
bs->flags = newflags;
|
||||
bs->pool = pool;
|
||||
bs->tpool = tpool;
|
||||
bs->handle = handle;
|
||||
bs->peer_state_callback = peer_state_callback;
|
||||
list_init(&bs->connected);
|
||||
*bsP = bs;
|
||||
|
||||
|
@ -258,6 +236,8 @@ KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
|
|||
{
|
||||
ks_assert(bs);
|
||||
|
||||
// @todo 1 more callback for blade_service_state_callback_t? providing event up the stack on service startup, shutdown, and service errors?
|
||||
|
||||
bs->shutdown = KS_TRUE;
|
||||
|
||||
if (bs->listeners_thread) {
|
||||
|
@ -275,7 +255,7 @@ KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
|
|||
list_iterator_start(&bs->connected);
|
||||
while (list_iterator_hasnext(&bs->connected)) {
|
||||
blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&bs->connected);
|
||||
blade_peer_destroy(&peer);
|
||||
blade_peer_destroy(&peer); // @todo determine if NOT receiving the DISCONNECTING event callback for these will matter, as service is being shutdown
|
||||
}
|
||||
list_iterator_stop(&bs->connected);
|
||||
list_clear(&bs->connected);
|
||||
|
@ -284,6 +264,14 @@ KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
|
|||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(void) blade_service_peer_state_callback(blade_service_t *bs, blade_peer_t *bp, blade_peerstate_t state)
|
||||
{
|
||||
ks_assert(bs);
|
||||
ks_assert(bp);
|
||||
|
||||
if (bs->peer_state_callback) bs->peer_state_callback(bs, bp, state);
|
||||
}
|
||||
|
||||
ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
|
||||
{
|
||||
ks_socket_t listener = KS_SOCK_INVALID;
|
||||
|
@ -317,12 +305,9 @@ ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
|
|||
bs->listeners_size = bs->listeners_length;
|
||||
bs->listeners_poll = (struct pollfd *)ks_pool_resize(bs->pool, bs->listeners_poll, sizeof(struct pollfd) * bs->listeners_size);
|
||||
ks_assert(bs->listeners_poll);
|
||||
bs->listeners_families = (int32_t *)ks_pool_resize(bs->pool, bs->listeners_families, sizeof(int32_t) * bs->listeners_size);
|
||||
ks_assert(bs->listeners_families);
|
||||
}
|
||||
bs->listeners_poll[listener_index].fd = listener;
|
||||
bs->listeners_poll[listener_index].events = POLLIN | POLLERR;
|
||||
bs->listeners_families[listener_index] = addr->family;
|
||||
|
||||
done:
|
||||
if (ret != KS_STATUS_SUCCESS) {
|
||||
|
@ -343,6 +328,8 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
|
|||
|
||||
service = (blade_service_t *)data;
|
||||
|
||||
// @todo 1 more callback for blade_service_state_callback_t? providing event up the stack on service startup, shutdown, and service errors?
|
||||
|
||||
while (!service->shutdown) {
|
||||
// @todo take exact timeout from a setting in config_service_endpoints
|
||||
if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
|
||||
|
@ -367,6 +354,7 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
|
|||
ks_assert(peer);
|
||||
|
||||
// @todo call state callback with connecting enum state
|
||||
blade_service_peer_state_callback(service, peer, BLADE_PEERSTATE_CONNECTING);
|
||||
|
||||
blade_peer_startup(peer, sock);
|
||||
|
||||
|
@ -379,10 +367,11 @@ void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
|
|||
blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&service->connected);
|
||||
// @todo expose accessor for disconnecting, after changing it into the state callback enum
|
||||
// ensure that every way kws_close might occur leads back to disconnecting = KS_TRUE for this to universally process disconnects
|
||||
if (blade_peer_disconnecting(peer)) {
|
||||
if (blade_peer_state(peer) == BLADE_PEERSTATE_DISCONNECTING) {
|
||||
// @todo check if there is an iterator based remove function, or indexed iteration to use list_delete_at()
|
||||
list_delete(&service->connected, peer);
|
||||
// @todo call state callback with internal disconnecting enum state (stored in peer to hold specific reason for disconnecting)
|
||||
blade_service_peer_state_callback(service, peer, BLADE_PEERSTATE_DISCONNECTING);
|
||||
|
||||
// @todo switch to blade_peer_shutdown(&peer) and blade_peer_discard(&peer) after introducing recycling of peers
|
||||
blade_peer_destroy(&peer);
|
||||
}
|
||||
|
|
|
@ -143,14 +143,14 @@ ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
|
|||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config)
|
||||
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config, blade_service_peer_state_callback_t service_peer_state_callback)
|
||||
{
|
||||
ks_assert(bh);
|
||||
|
||||
if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
|
||||
if (bh->config_service && !blade_handle_service_available(bh)) {
|
||||
blade_service_create(&bh->service, bh->pool, bh->tpool, bh);
|
||||
blade_service_create(&bh->service, bh->pool, bh->tpool, bh, service_peer_state_callback);
|
||||
ks_assert(bh->service);
|
||||
if (blade_service_startup(bh->service, bh->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
|
||||
}
|
||||
|
|
|
@ -35,11 +35,6 @@
|
|||
#define _BLADE_DATASTORE_H_
|
||||
#include <blade.h>
|
||||
|
||||
#define BLADE_DATASTORE_TPOOL_MIN 2
|
||||
#define BLADE_DATASTORE_TPOOL_MAX 8
|
||||
#define BLADE_DATASTORE_TPOOL_STACK (1024 * 256)
|
||||
#define BLADE_DATASTORE_TPOOL_IDLE 10
|
||||
|
||||
KS_BEGIN_EXTERN_C
|
||||
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
|
||||
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
|
||||
|
|
|
@ -35,18 +35,14 @@
|
|||
#define _BLADE_PEER_H_
|
||||
#include <blade.h>
|
||||
|
||||
#define BLADE_PEER_TPOOL_MIN 2
|
||||
#define BLADE_PEER_TPOOL_MAX 8
|
||||
#define BLADE_PEER_TPOOL_STACK (1024 * 256)
|
||||
#define BLADE_PEER_TPOOL_IDLE 10
|
||||
|
||||
KS_BEGIN_EXTERN_C
|
||||
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service);
|
||||
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
|
||||
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, ks_socket_t sock);
|
||||
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
|
||||
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp);
|
||||
KS_DECLARE(ks_bool_t) blade_peer_disconnecting(blade_peer_t *bp);
|
||||
KS_DECLARE(void) blade_peer_disconnect(blade_peer_t *bp, blade_peerreason_t reason);
|
||||
KS_DECLARE(blade_peerstate_t) blade_peer_state(blade_peer_t *bp);
|
||||
KS_DECLARE(blade_peerreason_t) blade_peer_reason(blade_peer_t *bp);
|
||||
KS_DECLARE(ks_status_t) blade_peer_message_pop(blade_peer_t *peer, blade_message_t **message);
|
||||
KS_DECLARE(ks_status_t) blade_peer_message_push(blade_peer_t *peer, void *data, ks_size_t data_length);
|
||||
KS_END_EXTERN_C
|
||||
|
|
|
@ -35,17 +35,17 @@
|
|||
#define _BLADE_SERVICE_H_
|
||||
#include <blade.h>
|
||||
|
||||
#define BLADE_SERVICE_TPOOL_MIN 2
|
||||
#define BLADE_SERVICE_TPOOL_MAX 8
|
||||
#define BLADE_SERVICE_TPOOL_STACK (1024 * 256)
|
||||
#define BLADE_SERVICE_TPOOL_IDLE 10
|
||||
|
||||
KS_BEGIN_EXTERN_C
|
||||
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_handle_t *handle);
|
||||
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP,
|
||||
ks_pool_t *pool,
|
||||
ks_thread_pool_t *tpool,
|
||||
blade_handle_t *handle,
|
||||
blade_service_peer_state_callback_t peer_state_callback);
|
||||
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
|
||||
KS_DECLARE(blade_handle_t *) blade_service_handle(blade_service_t *bs);
|
||||
KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
|
||||
KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
|
||||
KS_DECLARE(void) blade_service_peer_state_callback(blade_service_t *bs, blade_peer_t *bp, blade_peerstate_t state);
|
||||
KS_END_EXTERN_C
|
||||
|
||||
#endif
|
||||
|
|
|
@ -43,7 +43,7 @@
|
|||
KS_BEGIN_EXTERN_C
|
||||
KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
|
||||
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool);
|
||||
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
|
||||
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config, blade_service_peer_state_callback_t service_peer_state_callback);
|
||||
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
|
||||
|
||||
KS_DECLARE(ks_status_t) blade_handle_message_claim(blade_handle_t *bh, blade_message_t **message, void *data, ks_size_t data_length);
|
||||
|
|
|
@ -37,12 +37,26 @@
|
|||
|
||||
KS_BEGIN_EXTERN_C
|
||||
|
||||
typedef enum {
|
||||
BLADE_PEERSTATE_CONNECTING,
|
||||
BLADE_PEERSTATE_DISCONNECTING,
|
||||
BLADE_PEERSTATE_RUNNING,
|
||||
BLADE_PEERSTATE_RECEIVING,
|
||||
} blade_peerstate_t;
|
||||
|
||||
typedef enum {
|
||||
BLADE_PEERREASON_NORMAL,
|
||||
BLADE_PEERREASON_ERROR,
|
||||
// @todo populate more reasons for disconnecting as neccessary
|
||||
} blade_peerreason_t;
|
||||
|
||||
typedef struct blade_handle_s blade_handle_t;
|
||||
typedef struct blade_peer_s blade_peer_t;
|
||||
typedef struct blade_service_s blade_service_t;
|
||||
typedef struct blade_message_s blade_message_t;
|
||||
typedef struct blade_datastore_s blade_datastore_t;
|
||||
|
||||
typedef void (*blade_service_peer_state_callback_t)(blade_service_t *bs, blade_peer_t *bp, blade_peerstate_t state);
|
||||
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);
|
||||
|
||||
KS_END_EXTERN_C
|
||||
|
|
Loading…
Reference in New Issue