FS-9952: Temporary commit for some peer review

This commit is contained in:
Shane Bryldt 2017-01-27 19:49:02 +00:00 committed by Mike Jerris
parent 2bce38afce
commit eb57b7910e
14 changed files with 978 additions and 154 deletions

View File

@ -11,13 +11,13 @@ libunqlite_la_CFLAGS = -DUNQLITE_ENABLE_THREADS
libunqlite_la_LIBADD = -lpthread
lib_LTLIBRARIES = libblade.la
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/bpcp.c src/blade_datastore.c
libblade_la_SOURCES = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c src/blade_directory.c
libblade_la_CFLAGS = $(AM_CFLAGS) $(AM_CPPFLAGS)
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm $(AM_LDFLAGS)
libblade_la_LDFLAGS = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
libblade_la_LIBADD = libunqlite.la
library_includedir = $(prefix)/include
library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/bpcp.h
library_include_HEADERS += src/include/blade_datastore.h
library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_directory.h
library_include_HEADERS += src/include/unqlite.h test/tap.h
tests: libblade.la

View File

@ -37,12 +37,19 @@
typedef enum {
BDS_NONE = 0,
BDS_MYPOOL = (1 << 0),
BDS_MYTPOOL = (1 << 1),
} bdspvt_flag_t;
struct blade_datastore_s {
bdspvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
const char *config_database_path;
//config_setting_t *config_service;
unqlite *db;
//blade_service_t *service;
};
struct blade_datastore_fetch_userdata_s
@ -71,11 +78,10 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
flags = bds->flags;
pool = bds->pool;
if (bds->db) {
unqlite_close(bds->db);
bds->db = NULL;
}
blade_datastore_shutdown(bds);
if (bds->tpool && (flags & BDS_MYTPOOL)) ks_thread_pool_destroy(&bds->tpool);
ks_pool_free(bds->pool, &bds);
if (pool && (flags & BDS_MYPOOL)) ks_pool_close(&pool);
@ -83,7 +89,7 @@ KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool)
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bdspvt_flag_t newflags = BDS_NONE;
blade_datastore_t *bds = NULL;
@ -93,16 +99,67 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool
ks_pool_open(&pool);
ks_assert(pool);
}
// @todo: move thread pool creation to startup which allows thread pool to be configurable
if (!tpool) {
newflags |= BDS_MYTPOOL;
ks_thread_pool_create(&tpool,
BLADE_DATASTORE_TPOOL_MIN,
BLADE_DATASTORE_TPOOL_MAX,
BLADE_DATASTORE_TPOOL_STACK,
KS_PRI_NORMAL,
BLADE_DATASTORE_TPOOL_IDLE);
ks_assert(tpool);
}
bds = ks_pool_alloc(pool, sizeof(*bds));
bds->flags = newflags;
bds->pool = pool;
bds->tpool = tpool;
*bdsP = bds;
if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
return KS_STATUS_SUCCESS;
}
ks_status_t blade_datastore_config(blade_datastore_t *bds, config_setting_t *config)
{
config_setting_t *tmp;
config_setting_t *database = NULL;
//config_setting_t *service = NULL;
const char *config_database_path = NULL;
ks_assert(bds);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
database = config_setting_get_member(config, "database");
if (!database) return KS_STATUS_FAIL;
tmp = config_lookup_from(database, "path");
if (!tmp) return KS_STATUS_FAIL;
if (config_setting_type(tmp) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
config_database_path = config_setting_get_string(tmp);
//service = config_setting_get_member(config, "service");
if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
bds->config_database_path = ks_pstrdup(bds->pool, config_database_path);
//bds->config_service = service;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config)
{
ks_assert(bds);
// @todo check if already started
if (blade_datastore_config(bds, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
//if (unqlite_open(&bds->db, NULL, UNQLITE_OPEN_IN_MEMORY) != UNQLITE_OK) {
if (unqlite_open(&bds->db, bds->config_database_path, UNQLITE_OPEN_CREATE) != UNQLITE_OK) {
const char *errbuf = NULL;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
ks_log(KS_LOG_ERROR, "BDS Open Error: %s\n", errbuf);
return KS_STATUS_FAIL;
}
@ -110,15 +167,31 @@ KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool
// @todo VM init if document store is used (and output consumer callback)
//blade_service_create(&bds->service, bds->pool, bds->tpool);
//ks_assert(bds->service);
//blade_service_startup(bds->service, bds->config_service);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout)
KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds)
{
ks_assert(bds);
ks_assert(timeout >= 0);
//if (bds->service) blade_service_destroy(&bds->service);
if (bds->db) {
unqlite_close(bds->db);
bds->db = NULL;
}
if (bds->config_database_path) ks_pool_free(bds->pool, &bds->config_database_path);
//bds->config_service = NULL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length)
{
ks_assert(bds);
@ -147,7 +220,7 @@ KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
ks_log(KS_LOG_ERROR, "BDS Store Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}
@ -196,10 +269,11 @@ KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,
if (rc != UNQLITE_OK) {
if (rc == UNQLITE_BUSY) ret = KS_STATUS_TIMEOUT;
else if(rc == UNQLITE_NOTFOUND) ret = KS_STATUS_NOT_FOUND;
else {
const char *errbuf;
blade_datastore_error(bds, &errbuf, NULL);
ks_log(KS_LOG_ERROR, "BDS Error: %s\n", errbuf);
ks_log(KS_LOG_ERROR, "BDS Fetch Error: %s\n", errbuf);
ret = KS_STATUS_FAIL;
}

View File

@ -0,0 +1,165 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
typedef enum {
BD_NONE = 0,
BD_MYPOOL = (1 << 0),
BD_MYTPOOL = (1 << 1),
} bdpvt_flag_t;
struct blade_directory_s {
bdpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
config_setting_t *config_service;
blade_service_t *service;
};
KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP)
{
blade_directory_t *bd = NULL;
bdpvt_flag_t flags;
ks_pool_t *pool;
ks_assert(bdP);
bd = *bdP;
*bdP = NULL;
ks_assert(bd);
flags = bd->flags;
pool = bd->pool;
blade_directory_shutdown(bd);
if (bd->tpool && (flags & BD_MYTPOOL)) ks_thread_pool_destroy(&bd->tpool);
ks_pool_free(bd->pool, &bd);
if (pool && (flags & BD_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bdpvt_flag_t newflags = BD_NONE;
blade_directory_t *bd = NULL;
if (!pool) {
newflags |= BD_MYPOOL;
ks_pool_open(&pool);
ks_assert(pool);
}
// @todo: move thread pool creation to startup which allows thread pool to be configurable
if (!tpool) {
newflags |= BD_MYTPOOL;
ks_thread_pool_create(&tpool,
BLADE_DIRECTORY_TPOOL_MIN,
BLADE_DIRECTORY_TPOOL_MAX,
BLADE_DIRECTORY_TPOOL_STACK,
KS_PRI_NORMAL,
BLADE_DIRECTORY_TPOOL_IDLE);
ks_assert(tpool);
}
bd = ks_pool_alloc(pool, sizeof(*bd));
bd->flags = newflags;
bd->pool = pool;
bd->tpool = tpool;
*bdP = bd;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_directory_config(blade_directory_t *bd, config_setting_t *config)
{
config_setting_t *service = NULL;
ks_assert(bd);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
service = config_setting_get_member(config, "service");
if (!service) return KS_STATUS_FAIL;
bd->config_service = service;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config)
{
ks_assert(bd);
blade_directory_shutdown(bd);
if (blade_directory_config(bd, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
blade_service_create(&bd->service, bd->pool, bd->tpool);
ks_assert(bd->service);
if (blade_service_startup(bd->service, bd->config_service) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd)
{
ks_assert(bd);
if (bd->service) blade_service_destroy(&bd->service);
return KS_STATUS_SUCCESS;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -33,11 +33,6 @@
#include "blade.h"
#define KS_DHT_TPOOL_MIN 2
#define KS_DHT_TPOOL_MAX 8
#define KS_DHT_TPOOL_STACK (1024 * 256)
#define KS_DHT_TPOOL_IDLE 10
typedef enum {
BP_NONE = 0,
BP_MYPOOL = (1 << 0),
@ -48,10 +43,20 @@ struct blade_peer_s {
bppvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
ks_dht_t *dht;
blade_service_t *service;
ks_bool_t shutdown;
kws_t *kws;
ks_thread_t *kws_thread;
ks_q_t *messages_sending;
ks_q_t *messages_receiving;
};
void *blade_peer_kws_thread(ks_thread_t *thread, void *data);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
{
blade_peer_t *bp = NULL;
@ -68,7 +73,11 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
flags = bp->flags;
pool = bp->pool;
if (bp->dht) ks_dht_destroy(&bp->dht);
blade_peer_shutdown(bp);
ks_q_destroy(&bp->messages_sending);
ks_q_destroy(&bp->messages_receiving);
if (bp->tpool && (flags & BP_MYTPOOL)) ks_thread_pool_destroy(&bp->tpool);
ks_pool_free(bp->pool, &bp);
@ -78,11 +87,13 @@ KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid)
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, blade_service_t *service)
{
bppvt_flag_t newflags = BP_NONE;
blade_peer_t *bp = NULL;
ks_dht_t *dht = NULL;
ks_assert(bpP);
ks_assert(service);
if (!pool) {
newflags |= BP_MYPOOL;
@ -94,50 +105,112 @@ KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, k
ks_thread_pool_create(&tpool, BLADE_PEER_TPOOL_MIN, BLADE_PEER_TPOOL_MAX, BLADE_PEER_TPOOL_STACK, KS_PRI_NORMAL, BLADE_PEER_TPOOL_IDLE);
ks_assert(tpool);
}
ks_dht_create(&dht, pool, tpool, nodeid);
ks_assert(dht);
bp = ks_pool_alloc(pool, sizeof(*bp));
bp->flags = newflags;
bp->pool = pool;
bp->tpool = tpool;
bp->dht = dht;
bp->service = service;
ks_q_create(&bp->messages_sending, pool, 0);
ks_q_create(&bp->messages_receiving, pool, 0);
*bpP = bp;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp)
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws)
{
ks_assert(bp);
ks_assert(bp->dht);
ks_assert(kws);
return &bp->dht->nodeid;
// @todo: consider using a recycle queue for blade_peer_t in blade_service_t, just need to call startup then
blade_peer_shutdown(bp);
bp->kws = kws;
if (ks_thread_create_ex(&bp->kws_thread,
blade_peer_kws_thread,
bp,
KS_THREAD_FLAG_DEFAULT,
KS_THREAD_DEFAULT_STACK,
KS_PRI_NORMAL,
bp->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port)
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp)
{
ks_assert(bp);
ks_dht_autoroute(bp->dht, autoroute, port);
bp->shutdown = KS_TRUE;
if (bp->kws_thread) {
ks_thread_join(bp->kws_thread);
ks_pool_free(bp->pool, &bp->kws_thread);
}
if (bp->kws) kws_destroy(&bp->kws);
bp->shutdown = KS_FALSE;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint)
void *blade_peer_kws_thread(ks_thread_t *thread, void *data)
{
ks_assert(bp);
ks_assert(addr);
blade_peer_t *peer;
kws_opcode_t opcode;
uint8_t *data;
ks_size_t data_len;
blade_message_t *message;
return ks_dht_bind(bp->dht, addr, endpoint);
ks_assert(thread);
ks_assert(data);
peer = (blade_peer_t *)data;
while (!peer->shutdown) {
// @todo use nonblocking kws mode so that if no data at all is available yet we can still do other things such as sending messages before trying again
// or easier alternative, just use ks_poll (or select) to check if there is a POLLIN event pending, but this requires direct access to the socket, or
// kws can be updated to add a function to poll the inner socket for events (IE, kws_poll(kws, &inbool, NULL, &errbool, timeout))
data_len = kws_read_frame(peer->kws, &opcode, &data);
if (data_len <= 0) {
// @todo error handling, strerror(ks_errno())
// 0 means socket closed with WS_NONE, which closes websocket with no additional reason
// -1 means socket closed with a general failure
// -2 means nonblocking wait
// other values are based on WS_XXX reasons
// negative values are based on reasons, except for -1 is but -2 is nonblocking wait, and
// @todo: this way of disconnecting would have the service periodically check the list of connected peers for those that are disconnecting,
// remove them from the connected peer list, and then call peer destroy which will wait for this thread to rejoin which it already will have,
// and then destroy the inner kws and finish any cleanup of the actual socket if neccessary, and can still call an ondisconnected callback
// at the service level
peer->disconnecting = KS_TRUE;
break;
}
// @todo this will check the discarded queue first and realloc if there is not enough space, otherwise allocate a message, and finally copy the data
if (blade_handle_message_claim(peer->service->handle, &message, data, data_len) != KS_STATUS_SUCCESS || !message) {
// @todo error handling
// just drop the peer for now, the only failure scenarios are asserted OOM, or if the discard queue pop fails
peer->disconnecting = KS_TRUE;
break;
}
ks_q_push(peer->messages_receiving, message);
// @todo callback up the stack to indicate a message has been received and can be popped (more efficient than constantly polling by popping)?
if (ks_q_trypop(peer->messages_sending, &message) == KS_STATUS_SUCCESS) {
}
}
return NULL;
}
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout)
{
ks_assert(bp);
ks_assert(timeout >= 0);
ks_dht_pulse(bp->dht, timeout);
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -0,0 +1,401 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "blade.h"
typedef enum {
BS_NONE = 0,
BS_MYPOOL = (1 << 0),
BS_MYTPOOL = (1 << 1)
} bspvt_flag_t;
#define BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX 16
struct blade_service_s {
bspvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
int32_t config_websockets_endpoints_ipv4_length;
int32_t config_websockets_endpoints_ipv6_length;
int32_t config_websockets_endpoints_backlog;
ks_bool_t shutdown;
struct pollfd *listeners_poll;
int32_t *listeners_families;
int32_t listeners_size;
int32_t listeners_length;
ks_thread_t *listeners_thread;
list_t connected;
};
void *blade_service_listeners_thread(ks_thread_t *thread, void *data);
ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr);
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP)
{
blade_service_t *bs = NULL;
bspvt_flag_t flags;
ks_pool_t *pool;
ks_assert(bsP);
bs = *bsP;
*bsP = NULL;
ks_assert(bs);
flags = bs->flags;
pool = bs->pool;
blade_service_shutdown(bs);
list_destroy(&bs->connected);
if (bs->tpool && (flags & BS_MYTPOOL)) ks_thread_pool_destroy(&bs->tpool);
ks_pool_free(bs->pool, &bs);
if (pool && (flags & BS_MYPOOL)) ks_pool_close(&pool);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bspvt_flag_t newflags = BS_NONE;
blade_service_t *bs = NULL;
if (!pool) {
newflags |= BS_MYPOOL;
ks_pool_open(&pool);
ks_assert(pool);
}
if (!tpool) {
newflags |= BS_MYTPOOL;
ks_thread_pool_create(&tpool, BLADE_SERVICE_TPOOL_MIN, BLADE_SERVICE_TPOOL_MAX, BLADE_SERVICE_TPOOL_STACK, KS_PRI_NORMAL, BLADE_SERVICE_TPOOL_IDLE);
ks_assert(tpool);
}
bs = ks_pool_alloc(pool, sizeof(*bs));
bs->flags = newflags;
bs->pool = pool;
bs->tpool = tpool;
list_init(&bs->connected);
*bsP = bs;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_service_config(blade_service_t *bs, config_setting_t *config)
{
config_setting_t *websockets = NULL;
config_setting_t *websockets_endpoints = NULL;
config_setting_t *websockets_endpoints_ipv4 = NULL;
config_setting_t *websockets_endpoints_ipv6 = NULL;
config_setting_t *websockets_ssl = NULL;
config_setting_t *element;
config_setting_t *tmp1;
config_setting_t *tmp2;
ks_sockaddr_t config_websockets_endpoints_ipv4[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
ks_sockaddr_t config_websockets_endpoints_ipv6[BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX];
int32_t config_websockets_endpoints_ipv4_length = 0;
int32_t config_websockets_endpoints_ipv6_length = 0;
int32_t config_websockets_endpoints_backlog = 8;
ks_assert(bs);
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
websockets = config_setting_get_member(config, "websockets");
if (!websockets) return KS_STATUS_FAIL;
websockets_endpoints = config_setting_get_member(config, "endpoints");
if (!websockets_endpoints) return KS_STATUS_FAIL;
websockets_endpoints_ipv4 = config_lookup_from(websockets_endpoints, "ipv4");
websockets_endpoints_ipv6 = config_lookup_from(websockets_endpoints, "ipv6");
if (websockets_endpoints_ipv4) {
if (config_setting_type(websockets_endpoints_ipv4) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
if ((config_websockets_endpoints_ipv4_length = config_setting_length(websockets_endpoints_ipv4)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
return KS_STATUS_FAIL;
for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index) {
element = config_setting_get_elem(websockets_endpoints_ipv4, index);
tmp1 = config_lookup_from(element, "address");
tmp2 = config_lookup_from(element, "port");
if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
if (ks_addr_set(&config_websockets_endpoints_ipv4[index],
config_setting_get_string(tmp1),
config_setting_get_int(tmp2),
AF_INET) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
}
if (websockets_endpoints_ipv6) {
if (config_setting_type(websockets_endpoints_ipv6) != CONFIG_TYPE_LIST) return KS_STATUS_FAIL;
if ((config_websockets_endpoints_ipv6_length = config_setting_length(websockets_endpoints_ipv6)) > BLADE_SERVICE_WEBSOCKETS_ENDPOINTS_MULTIHOME_MAX)
return KS_STATUS_FAIL;
for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index) {
element = config_setting_get_elem(websockets_endpoints_ipv6, index);
tmp1 = config_lookup_from(element, "address");
tmp2 = config_lookup_from(element, "port");
if (!tmp1 || !tmp2) return KS_STATUS_FAIL;
if (config_setting_type(tmp1) != CONFIG_TYPE_STRING) return KS_STATUS_FAIL;
if (config_setting_type(tmp2) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
if (ks_addr_set(&config_websockets_endpoints_ipv6[index],
config_setting_get_string(tmp1),
config_setting_get_int(tmp2),
AF_INET6) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
}
if (config_websockets_endpoints_ipv4_length + config_websockets_endpoints_ipv6_length <= 0) return KS_STATUS_FAIL;
tmp1 = config_lookup_from(websockets_endpoints, "backlog");
if (tmp1) {
if (config_setting_type(tmp1) != CONFIG_TYPE_INT) return KS_STATUS_FAIL;
config_websockets_endpoints_backlog = config_setting_get_int(tmp1);
}
websockets_ssl = config_setting_get_member(websockets, "ssl");
if (websockets_ssl) {
// @todo: SSL stuffs from websockets_ssl into config_websockets_ssl envelope
}
// Configuration is valid, now assign it to the variables that are used
// If the configuration was invalid, then this does not get changed from the current config when reloading a new config
for (int32_t index = 0; index < config_websockets_endpoints_ipv4_length; ++index)
bs->config_websockets_endpoints_ipv4[index] = config_websockets_endpoints_ipv4[index];
for (int32_t index = 0; index < config_websockets_endpoints_ipv6_length; ++index)
bs->config_websockets_endpoints_ipv6[index] = config_websockets_endpoints_ipv6[index];
bs->config_websockets_endpoints_ipv4_length = config_websockets_endpoints_ipv4_length;
bs->config_websockets_endpoints_ipv6_length = config_websockets_endpoints_ipv6_length;
bs->config_websockets_endpoints_backlog = config_websockets_endpoints_backlog;
//bs->config_websockets_ssl = config_websockets_ssl;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config)
{
ks_assert(bs);
blade_service_shutdown(bs);
// @todo: If the configuration is invalid, and this is a case of reloading a new config, then the service shutdown shouldn't occur
// but the service may use configuration that changes before we shutdown if it is read successfully, may require a config reader/writer mutex?
if (blade_service_config(bs, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv4_length; ++index) {
if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv4[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
for (int32_t index = 0; index < bs->config_websockets_endpoints_ipv6_length; ++index) {
if (blade_service_listen(bs, &bs->config_websockets_endpoints_ipv6[index]) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
}
if (ks_thread_create_ex(&bs->listeners_thread,
blade_service_listeners_thread,
bs,
KS_THREAD_FLAG_DEFAULT,
KS_THREAD_DEFAULT_STACK,
KS_PRI_NORMAL,
bs->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs)
{
ks_assert(bs);
bs->shutdown = KS_TRUE;
if (bs->listeners_thread) {
ks_thread_join(bs->listeners_thread);
ks_pool_free(bs->pool, &bs->listeners_thread);
}
for (int32_t index = 0; index < bs->listeners_length; ++index) {
ks_socket_t sock = bs->listeners_poll[index].fd;
ks_socket_shutdown(sock, SHUT_RDWR);
ks_socket_close(&sock);
}
bs->listeners_length = 0;
list_iterator_start(&bs->connected);
while (list_iterator_hasnext(&bs->connected)) {
blade_peer_t *peer = (blade_peer_t *)list_iterator_next(&bs->connected);
blade_peer_destroy(&peer);
}
list_iterator_stop(&bs->connected);
list_clear(&bs->connected);
bs->shutdown = KS_FALSE;
return KS_STATUS_SUCCESS;
}
ks_status_t blade_service_listen(blade_service_t *bs, ks_sockaddr_t *addr)
{
ks_socket_t listener = KS_SOCK_INVALID;
int32_t listener_index = -1;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(bs);
ks_assert(addr);
if ((listener = socket(addr->family, SOCK_STREAM, IPPROTO_TCP)) == KS_SOCK_INVALID) {
ret = KS_STATUS_FAIL;
goto done;
}
ks_socket_option(listener, SO_REUSEADDR, KS_TRUE);
ks_socket_option(listener, TCP_NODELAY, KS_TRUE);
// @todo make sure v6 does not automatically map to a v4 using socket option IPV6_V6ONLY?
if (ks_addr_bind(listener, addr) != KS_STATUS_SUCCESS) {
ret = KS_STATUS_FAIL;
goto done;
}
if (listen(listener, bs->config_websockets_endpoints_backlog) != 0) {
ret = KS_STATUS_FAIL;
goto done;
}
listener_index = bs->listeners_length++;
if (bs->listeners_length > bs->listeners_size) {
bs->listeners_size = bs->listeners_length;
bs->listeners_poll = (struct pollfd *)ks_pool_resize(bs->pool, bs->listeners_poll, sizeof(struct pollfd) * bs->listeners_size);
ks_assert(bs->listeners_poll);
bs->listeners_families = (int32_t *)ks_pool_resize(bs->pool, bs->listeners_families, sizeof(int32_t) * bs->listeners_size);
ks_assert(bs->listeners_families);
}
bs->listeners_poll[listener_index].fd = listener;
bs->listeners_poll[listener_index].events = POLLIN | POLLERR;
bs->listeners_families[listener_index] = addr->family;
done:
if (ret != KS_STATUS_SUCCESS) {
if (listener != KS_SOCK_INVALID) {
ks_socket_shutdown(listener, SHUT_RDWR);
ks_socket_close(&listener);
}
}
return ret;
}
void *blade_service_listeners_thread(ks_thread_t *thread, void *data)
{
blade_service_t *service;
ks_assert(thread);
ks_assert(data);
service = (blade_service_t *)data;
while (!service->shutdown) {
if (ks_poll(service->listeners_poll, service->listeners_length, 100) > 0) {
for (int32_t index = 0; index < service->listeners_length; ++index) {
ks_socket_t sock;
ks_sockaddr_t raddr;
socklen_t slen = 0;
kws_t *kws = NULL;
blade_peer_t *peer = NULL;
if (!(service->listeners_poll[index].revents & POLLIN)) continue;
if (service->listeners_poll[index].revents & POLLERR) {
// @todo: error handling, just skip the listener for now
continue;
}
if (service->listeners_families[index] == AF_INET) {
slen = sizeof(raddr.v.v4);
if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v4, &slen)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now
continue;
}
raddr.family = AF_INET;
} else {
slen = sizeof(raddr.v.v6);
if ((sock = accept(service->listeners_poll[index].fd, (struct sockaddr *)&raddr.v.v6, &slen)) == KS_SOCK_INVALID) {
// @todo: error handling, just skip the socket for now
continue;
}
raddr.family = AF_INET6;
}
ks_addr_get_host(&raddr);
ks_addr_get_port(&raddr);
// @todo: SSL init stuffs based on data from service->config_websockets_ssl
if (kws_init(&kws, sock, NULL, NULL, KWS_BLOCK, service->pool) != KS_STATUS_SUCCESS) {
// @todo: error handling, just close and skip the socket for now
ks_socket_close(&sock);
continue;
}
blade_peer_create(&peer, service->pool, service->tpool);
ks_assert(peer);
// @todo: should probably assign kws before adding to list, in a separate call from startup because it starts the internal worker thread
list_append(&service->connected, peer);
blade_peer_startup(peer, kws);
}
}
}
return NULL;
}
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -43,7 +43,12 @@ struct blade_handle_s {
bhpvt_flag_t flags;
ks_pool_t *pool;
ks_thread_pool_t *tpool;
blade_peer_t *peer;
config_setting_t *config_datastore;
config_setting_t *config_directory;
//blade_peer_t *peer;
blade_directory_t *directory;
blade_datastore_t *datastore;
};
@ -64,9 +69,9 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
flags = bh->flags;
pool = bh->pool;
if (bh->datastore) blade_datastore_destroy(&bh->datastore);
blade_peer_destroy(&bh->peer);
blade_handle_shutdown(bh);
//blade_peer_destroy(&bh->peer);
if (bh->tpool && (flags & BH_MYTPOOL)) ks_thread_pool_destroy(&bh->tpool);
ks_pool_free(bh->pool, &bh);
@ -78,14 +83,12 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid)
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool)
{
bhpvt_flag_t newflags = BH_NONE;
blade_handle_t *bh = NULL;
ks_dht_nodeid_t nid;
ks_assert(nodeid);
ks_assert(strlen(nodeid) == (KS_DHT_NODEID_SIZE * 2));
ks_assert(bhP);
if (!pool) {
newflags |= BH_MYPOOL;
@ -101,75 +104,88 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *poo
bh->flags = newflags;
bh->pool = pool;
bh->tpool = tpool;
ks_dht_dehex(nid.id, nodeid, KS_DHT_NODEID_SIZE);
blade_peer_create(&bh->peer, bh->pool, bh->tpool, &nid);
//blade_peer_create(&bh->peer, bh->pool, bh->tpool);
*bhP = bh;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer)
ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
{
ks_dht_nodeid_t *nodeid = NULL;
ks_assert(bh);
ks_assert(bh->peer);
nodeid = blade_peer_myid(bh->peer);
ks_dht_hex(nodeid->id, buffer, KS_DHT_NODEID_SIZE);
}
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port)
{
ks_assert(bh);
ks_assert(bh->peer);
blade_peer_autoroute(bh->peer, autoroute, port);
}
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint)
{
ks_sockaddr_t addr;
int family = AF_INET;
config_setting_t *datastore = NULL;
config_setting_t *directory = NULL;
ks_assert(bh);
ks_assert(ip);
ks_assert(port);
if (ip[1] != '.' && ip[2] != '.' && ip[3] != '.') family = AF_INET6;
if (!config) return KS_STATUS_FAIL;
if (!config_setting_is_group(config)) return KS_STATUS_FAIL;
datastore = config_setting_get_member(config, "datastore");
//if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
ks_addr_set(&addr, ip, port, family);
return blade_peer_bind(bh->peer, &addr, endpoint);
directory = config_setting_get_member(config, "directory");
//if (directory && !config_setting_is_group(directory)) return KS_STATUS_FAIL;
bh->config_datastore = datastore;
bh->config_directory = directory;
return KS_STATUS_SUCCESS;
}
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout)
{
ks_assert(bh);
ks_assert(timeout >= 0);
blade_peer_pulse(bh->peer, timeout);
if (bh->datastore) blade_datastore_pulse(bh->datastore, timeout);
}
KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh)
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config)
{
ks_assert(bh);
if (bh->datastore) return;
if (blade_handle_config(bh, config) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL;
if (bh->config_datastore && !blade_handle_datastore_available(bh)) {
blade_datastore_create(&bh->datastore, bh->pool, bh->tpool);
blade_datastore_startup(bh->datastore, bh->config_datastore);
}
if (bh->config_directory && !blade_handle_directory_available(bh)) {
blade_directory_create(&bh->directory, bh->pool, bh->tpool);
blade_directory_startup(bh->directory, config);
}
return KS_STATUS_SUCCESS;
}
blade_datastore_create(&bh->datastore, bh->pool);
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
{
ks_assert(bh);
if (blade_handle_directory_available(bh)) blade_directory_destroy(&bh->directory);
if (blade_handle_datastore_available(bh)) blade_datastore_destroy(&bh->datastore);
return KS_STATUS_SUCCESS;
}
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh)
{
ks_assert(bh);
return bh->datastore != NULL;
}
KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh)
{
ks_assert(bh);
return bh->directory != NULL;
}
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length)
{
ks_assert(bh);
ks_assert(bh->datastore);
ks_assert(key);
ks_assert(key_length > 0);
ks_assert(data);
ks_assert(data_length > 0);
if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
return blade_datastore_store(bh->datastore, key, key_length, data, data_length);
}
@ -181,11 +197,12 @@ KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
void *userdata)
{
ks_assert(bh);
ks_assert(bh->datastore);
ks_assert(callback);
ks_assert(key);
ks_assert(key_length > 0);
if (!blade_handle_datastore_available(bh)) return KS_STATUS_INACTIVE;
return blade_datastore_fetch(bh->datastore, callback, key, key_length, userdata);
}

View File

@ -36,11 +36,14 @@
#include <ks.h>
#include <ks_dht.h>
#include <sodium.h>
#include <libconfig.h>
#include "unqlite.h"
#include "blade_types.h"
#include "blade_stack.h"
#include "blade_peer.h"
#include "blade_service.h"
#include "blade_datastore.h"
#include "blade_directory.h"
#include "bpcp.h"
KS_BEGIN_EXTERN_C

View File

@ -35,10 +35,17 @@
#define _BLADE_DATASTORE_H_
#include <blade.h>
#define BLADE_DATASTORE_TPOOL_MIN 2
#define BLADE_DATASTORE_TPOOL_MAX 8
#define BLADE_DATASTORE_TPOOL_STACK (1024 * 256)
#define BLADE_DATASTORE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) blade_datastore_create(blade_datastore_t **bdsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_datastore_destroy(blade_datastore_t **bdsP);
KS_DECLARE(void) blade_datastore_pulse(blade_datastore_t *bds, int32_t timeout);
KS_DECLARE(ks_status_t) blade_datastore_startup(blade_datastore_t *bds, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_datastore_shutdown(blade_datastore_t *bds);
KS_DECLARE(void) blade_datastore_error(blade_datastore_t *bds, const char **buffer, int32_t *buffer_length);
KS_DECLARE(ks_status_t) blade_datastore_store(blade_datastore_t *bds, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_datastore_fetch(blade_datastore_t *bds,

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_DIRECTORY_H_
#define _BLADE_DIRECTORY_H_
#include <blade.h>
#define BLADE_DIRECTORY_TPOOL_MIN 2
#define BLADE_DIRECTORY_TPOOL_MAX 8
#define BLADE_DIRECTORY_TPOOL_STACK (1024 * 256)
#define BLADE_DIRECTORY_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_directory_create(blade_directory_t **bdP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_directory_destroy(blade_directory_t **bdP);
KS_DECLARE(ks_status_t) blade_directory_startup(blade_directory_t *bd, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_directory_shutdown(blade_directory_t *bd);
KS_END_EXTERN_C
#endif
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -41,12 +41,10 @@
#define BLADE_PEER_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool, ks_dht_nodeid_t *nodeid);
KS_DECLARE(ks_status_t) blade_peer_create(blade_peer_t **bpP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_peer_destroy(blade_peer_t **bpP);
KS_DECLARE(ks_dht_nodeid_t *) blade_peer_myid(blade_peer_t *bp);
KS_DECLARE(void) blade_peer_autoroute(blade_peer_t *bp, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_peer_bind(blade_peer_t *bp, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_peer_pulse(blade_peer_t *bp, int32_t timeout);
KS_DECLARE(ks_status_t) blade_peer_startup(blade_peer_t *bp, kws_t *kws);
KS_DECLARE(ks_status_t) blade_peer_shutdown(blade_peer_t *bp);
KS_END_EXTERN_C
#endif

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2007-2014, Anthony Minessale II
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of the original author; nor the names of any contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _BLADE_SERVICE_H_
#define _BLADE_SERVICE_H_
#include <blade.h>
#define BLADE_SERVICE_TPOOL_MIN 2
#define BLADE_SERVICE_TPOOL_MAX 8
#define BLADE_SERVICE_TPOOL_STACK (1024 * 256)
#define BLADE_SERVICE_TPOOL_IDLE 10
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_service_create(blade_service_t **bsP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_service_destroy(blade_service_t **bsP);
KS_DECLARE(ks_status_t) blade_service_startup(blade_service_t *bs, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_service_shutdown(blade_service_t *bs);
KS_END_EXTERN_C
#endif
/* For Emacs:
* Local Variables:
* mode:c
* indent-tabs-mode:t
* tab-width:4
* c-basic-offset:4
* End:
* For VIM:
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
*/

View File

@ -42,12 +42,13 @@
KS_BEGIN_EXTERN_C
KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP);
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool, const char *nodeid);
KS_DECLARE(void) blade_handle_myid(blade_handle_t *bh, char *buffer);
KS_DECLARE(void) blade_handle_autoroute(blade_handle_t *bh, ks_bool_t autoroute, ks_port_t port);
KS_DECLARE(ks_status_t) blade_handle_bind(blade_handle_t *bh, const char *ip, ks_port_t port, ks_dht_endpoint_t **endpoint);
KS_DECLARE(void) blade_handle_pulse(blade_handle_t *bh, int32_t timeout);
KS_DECLARE(void) blade_handle_datastore_start(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP, ks_pool_t *pool, ks_thread_pool_t *tpool);
KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_t *config);
KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh);
KS_DECLARE(ks_bool_t) blade_handle_datastore_available(blade_handle_t *bh);
KS_DECLARE(ks_bool_t) blade_handle_directory_available(blade_handle_t *bh);
KS_DECLARE(ks_status_t) blade_handle_datastore_store(blade_handle_t *bh, const void *key, int32_t key_length, const void *data, int64_t data_length);
KS_DECLARE(ks_status_t) blade_handle_datastore_fetch(blade_handle_t *bh,
blade_datastore_fetch_callback_t callback,

View File

@ -39,7 +39,9 @@ KS_BEGIN_EXTERN_C
typedef struct blade_handle_s blade_handle_t;
typedef struct blade_peer_s blade_peer_t;
typedef struct blade_service_s blade_service_t;
typedef struct blade_datastore_s blade_datastore_t;
typedef struct blade_directory_s blade_directory_t;
typedef ks_bool_t (*blade_datastore_fetch_callback_t)(blade_datastore_t *bds, const void *data, uint32_t data_length, void *userdata);

View File

@ -28,16 +28,12 @@ struct command_def_s {
void command_test(blade_handle_t *bh, char *args);
void command_quit(blade_handle_t *bh, char *args);
void command_myid(blade_handle_t *bh, char *args);
void command_bind(blade_handle_t *bh, char *args);
void command_store(blade_handle_t *bh, char *args);
void command_fetch(blade_handle_t *bh, char *args);
static const struct command_def_s command_defs[] = {
{ "test", command_test },
{ "quit", command_quit },
{ "myid", command_myid },
{ "bind", command_bind },
{ "store", command_store },
{ "fetch", command_fetch },
@ -48,19 +44,12 @@ static const struct command_def_s command_defs[] = {
int main(int argc, char **argv)
{
blade_handle_t *bh = NULL;
const char *nodeid;
ks_assert(argc >= 2);
nodeid = argv[1];
ks_global_set_default_logger(KS_LOG_LEVEL_DEBUG);
blade_init();
blade_handle_create(&bh, NULL, NULL, nodeid);
blade_handle_autoroute(bh, KS_TRUE, KS_DHT_DEFAULT_PORT);
blade_handle_create(&bh, NULL, NULL);
loop(bh);
@ -121,7 +110,7 @@ void loop(blade_handle_t *bh)
// @todo lines must not exceed 512 bytes, treat as error and ignore buffer until next new line?
ks_assert(0);
}
blade_handle_pulse(bh, 1);
blade_handle_pulse(bh);
}
}
@ -179,34 +168,6 @@ void command_quit(blade_handle_t *bh, char *args)
g_shutdown = KS_TRUE;
}
void command_myid(blade_handle_t *bh, char *args)
{
char buf[KS_DHT_NODEID_SIZE * 2 + 1];
ks_assert(bh);
ks_assert(args);
blade_handle_myid(bh, buf);
ks_log(KS_LOG_INFO, "%s\n", buf);
}
void command_bind(blade_handle_t *bh, char *args)
{
char *ip = NULL;
char *port = NULL;
ks_port_t p;
ks_assert(args);
parse_argument(&args, &ip, ' ');
parse_argument(&args, &port, ' ');
p = atoi(port); // @todo use strtol for error handling
blade_handle_bind(bh, ip, p, NULL);
}
void command_store(blade_handle_t *bh, char *args)
{
char *key;
@ -214,7 +175,7 @@ void command_store(blade_handle_t *bh, char *args)
ks_assert(args);
blade_handle_datastore_start(bh);
blade_handle_datastore_startup(bh, NULL);
parse_argument(&args, &key, ' ');
parse_argument(&args, &data, ' ');
@ -234,7 +195,7 @@ void command_fetch(blade_handle_t *bh, char *args)
ks_assert(args);
blade_handle_datastore_start(bh);
blade_handle_datastore_startup(bh, NULL);
parse_argument(&args, &key, ' ');