FS-9775: Added initial registry for 'y' keys, and some unit testing
This commit is contained in:
parent
f9ed958ecc
commit
6eed8d3f94
|
@ -8,7 +8,12 @@ KS_BEGIN_EXTERN_C
|
|||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht,
|
||||
struct bencode **message,
|
||||
uint8_t *transactionid,
|
||||
ks_size_t *transactionid_len,
|
||||
char *messagetype);
|
||||
|
||||
KS_END_EXTERN_C
|
||||
|
||||
#endif /* KS_DHT_INT_H */
|
||||
|
|
|
@ -68,6 +68,9 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid)
|
|||
if (ks_dht2_nodeid_init(&dht->nodeid, nodeid) != KS_STATUS_SUCCESS) {
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
|
||||
// @todo ks_hash_insert the q/r/e callbacks into y registry
|
||||
|
||||
dht->bind_ipv4 = KS_FALSE;
|
||||
dht->bind_ipv6 = KS_FALSE;
|
||||
|
@ -90,16 +93,43 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
|
|||
ks_assert(dht);
|
||||
|
||||
dht->recv_buffer_length = 0;
|
||||
// @todo dht->endpoints_poll deinit
|
||||
// @todo dht->endpoints deinit
|
||||
for (int32_t i = 0; i < dht->endpoints_size; ++i) {
|
||||
ks_dht2_endpoint_t *ep = dht->endpoints[i];
|
||||
//ks_hash_remove(dht->endpoints_hash, ep->addr.host);
|
||||
ks_dht2_endpoint_deinit(ep);
|
||||
ks_dht2_endpoint_free(ep);
|
||||
}
|
||||
if (dht->endpoints) {
|
||||
ks_pool_free(dht->pool, dht->endpoints);
|
||||
dht->endpoints = NULL;
|
||||
}
|
||||
if (dht->endpoints_poll) {
|
||||
ks_pool_free(dht->pool, dht->endpoints_poll);
|
||||
dht->endpoints_poll = NULL;
|
||||
}
|
||||
ks_hash_destroy(&dht->endpoints_hash);
|
||||
dht->bind_ipv4 = KS_FALSE;
|
||||
dht->bind_ipv6 = KS_FALSE;
|
||||
|
||||
ks_hash_destroy(&dht->registry_y);
|
||||
|
||||
ks_dht2_nodeid_deinit(&dht->nodeid);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
|
||||
{
|
||||
ks_assert(dht);
|
||||
ks_assert(value);
|
||||
ks_assert(callback);
|
||||
|
||||
return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -220,15 +250,12 @@ KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
|
|||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
|
||||
{
|
||||
struct bencode *message;
|
||||
struct bencode *t;
|
||||
struct bencode *y;
|
||||
const char *tv;
|
||||
const char *yv;
|
||||
ks_size_t tv_len;
|
||||
ks_size_t yv_len;
|
||||
uint16_t transactionid;
|
||||
char messagetype;
|
||||
struct bencode *message = NULL;
|
||||
uint8_t transactionid[KS_DHT_TRANSACTIONID_MAX_SIZE];
|
||||
ks_size_t transactionid_len;
|
||||
char messagetype[KS_DHT_MESSAGETYPE_MAX_SIZE];
|
||||
ks_dht2_registry_callback_t callback;
|
||||
ks_status_t ret = KS_STATUS_FAIL;
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(raddr);
|
||||
|
@ -241,53 +268,99 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
|
|||
|
||||
// @todo blacklist check for bad actor nodes
|
||||
|
||||
message = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
|
||||
if (!message) {
|
||||
ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
|
||||
if (ks_dht2_parse(dht, &message, transactionid, &transactionid_len, messagetype) != KS_STATUS_SUCCESS) {
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, messagetype, KS_UNLOCKED))) {
|
||||
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", messagetype);
|
||||
} else {
|
||||
ret = callback(dht, raddr, transactionid, transactionid_len, message);
|
||||
}
|
||||
|
||||
ben_free(message);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_parse(ks_dht2_t *dht,
|
||||
struct bencode **message,
|
||||
uint8_t *transactionid,
|
||||
ks_size_t *transactionid_len,
|
||||
char *messagetype)
|
||||
{
|
||||
struct bencode *msg = NULL;
|
||||
struct bencode *t;
|
||||
struct bencode *y;
|
||||
const char *tv;
|
||||
const char *yv;
|
||||
ks_size_t tv_len;
|
||||
ks_size_t yv_len;
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(message);
|
||||
ks_assert(transactionid);
|
||||
ks_assert(messagetype);
|
||||
|
||||
msg = ben_decode((const void *)dht->recv_buffer, dht->recv_buffer_length);
|
||||
if (!msg) {
|
||||
ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
|
||||
goto failure;
|
||||
}
|
||||
|
||||
ks_log(KS_LOG_DEBUG, "Message decoded\n");
|
||||
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message));
|
||||
ks_log(KS_LOG_DEBUG, "%s\n", ben_print(msg));
|
||||
|
||||
t = ben_dict_get_by_str(message, "t");
|
||||
t = ben_dict_get_by_str(msg, "t");
|
||||
if (!t) {
|
||||
ks_log(KS_LOG_DEBUG, "Message missing required key 't'\n");
|
||||
return KS_STATUS_FAIL;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
tv = ben_str_val(t);
|
||||
tv_len = ben_str_len(t);
|
||||
if (tv_len != sizeof(uint16_t)) {
|
||||
ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpected size of %d\n", tv_len);
|
||||
return KS_STATUS_FAIL;
|
||||
if (tv_len > KS_DHT_TRANSACTIONID_MAX_SIZE) {
|
||||
ks_log(KS_LOG_DEBUG, "Message 't' value has an unexpectedly large size of %d\n", tv_len);
|
||||
goto failure;
|
||||
}
|
||||
|
||||
transactionid = ntohs(*((uint16_t *)tv));
|
||||
ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", transactionid);
|
||||
memcpy(transactionid, tv, tv_len);
|
||||
*transactionid_len = tv_len;
|
||||
// @todo hex output of transactionid
|
||||
//ks_log(KS_LOG_DEBUG, "Message transaction id is %d\n", *transactionid);
|
||||
|
||||
y = ben_dict_get_by_str(message, "y");
|
||||
y = ben_dict_get_by_str(msg, "y");
|
||||
if (!y) {
|
||||
ks_log(KS_LOG_DEBUG, "Message missing required key 'y'\n");
|
||||
return KS_STATUS_FAIL;
|
||||
goto failure;
|
||||
}
|
||||
|
||||
yv = ben_str_val(y);
|
||||
yv_len = ben_str_len(y);
|
||||
if (yv_len != 1) {
|
||||
ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpected size of %d\n", yv_len);
|
||||
return KS_STATUS_FAIL;
|
||||
if (yv_len >= KS_DHT_MESSAGETYPE_MAX_SIZE) {
|
||||
ks_log(KS_LOG_DEBUG, "Message 'y' value has an unexpectedly large size of %d\n", yv_len);
|
||||
goto failure;
|
||||
}
|
||||
|
||||
messagetype = (char)yv[0];
|
||||
ks_log(KS_LOG_DEBUG, "Message type is '%c'\n", messagetype);
|
||||
|
||||
// @todo dispatch callback from the 'y' registry
|
||||
memcpy(messagetype, yv, yv_len);
|
||||
messagetype[yv_len] = '\0';
|
||||
ks_log(KS_LOG_DEBUG, "Message type is '%s'\n", messagetype);
|
||||
|
||||
*message = msg;
|
||||
return KS_STATUS_SUCCESS;
|
||||
|
||||
failure:
|
||||
if (msg) {
|
||||
ben_free(msg);
|
||||
}
|
||||
*message = NULL;
|
||||
*transactionid_len = 0;
|
||||
messagetype[0] = '\0';
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
|
||||
/* For Emacs:
|
||||
* Local Variables:
|
||||
* mode:c
|
||||
|
|
|
@ -12,7 +12,8 @@ KS_BEGIN_EXTERN_C
|
|||
|
||||
#define KS_DHT_DEFAULT_PORT 5309
|
||||
#define KS_DHT_RECV_BUFFER_SIZE 0xFFFF
|
||||
|
||||
#define KS_DHT_TRANSACTIONID_MAX_SIZE 20
|
||||
#define KS_DHT_MESSAGETYPE_MAX_SIZE 20
|
||||
|
||||
typedef struct ks_dht2_s ks_dht2_t;
|
||||
struct ks_dht2_s {
|
||||
|
@ -21,6 +22,8 @@ struct ks_dht2_s {
|
|||
|
||||
ks_dht2_nodeid_t nodeid;
|
||||
|
||||
ks_hash_t *registry_y;
|
||||
|
||||
ks_bool_t bind_ipv4;
|
||||
ks_bool_t bind_ipv6;
|
||||
|
||||
|
@ -33,6 +36,12 @@ struct ks_dht2_s {
|
|||
ks_size_t recv_buffer_length;
|
||||
};
|
||||
|
||||
typedef ks_status_t (*ks_dht2_registry_callback_t)(ks_dht2_t *dht,
|
||||
ks_sockaddr_t *raddr,
|
||||
uint8_t *transactionid,
|
||||
ks_size_t transactionid_len,
|
||||
struct bencode *message);
|
||||
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_alloc(ks_dht2_t **dht, ks_pool_t *pool);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_prealloc(ks_dht2_t *dht, ks_pool_t *pool);
|
||||
|
@ -46,6 +55,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht);
|
|||
KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
|
||||
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
|
||||
|
||||
|
||||
KS_END_EXTERN_C
|
||||
|
||||
#endif /* KS_DHT_H */
|
||||
|
|
|
@ -66,6 +66,8 @@ KS_DECLARE(ks_status_t) ks_dht2_endpoint_deinit(ks_dht2_endpoint_t *endpoint)
|
|||
{
|
||||
ks_assert(endpoint);
|
||||
|
||||
ks_socket_close(&endpoint->sock);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -4,10 +4,18 @@
|
|||
#include <../dht/ks_dht_endpoint-int.h>
|
||||
#include <tap.h>
|
||||
|
||||
#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze"
|
||||
#define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe"
|
||||
|
||||
ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, uint8_t *transactionid, ks_size_t transactionid_len, struct bencode *message)
|
||||
{
|
||||
diag("dht_z_callback\n");
|
||||
ok(transactionid[0] == '4' && transactionid[1] == '2');
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
int main() {
|
||||
ks_size_t buflen = strlen(TEST_DHT1_PROCESS_BUFFER);
|
||||
ks_size_t buflen;
|
||||
ks_status_t err;
|
||||
int mask = 0;
|
||||
ks_dht2_t *dht1 = NULL;
|
||||
|
@ -21,7 +29,7 @@ int main() {
|
|||
ok(!err);
|
||||
|
||||
ks_global_set_default_logger(7);
|
||||
|
||||
|
||||
err = ks_find_local_ip(v4, sizeof(v4), &mask, AF_INET, NULL);
|
||||
ok(err == KS_STATUS_SUCCESS);
|
||||
have_v4 = !zstr_buf(v4);
|
||||
|
@ -50,6 +58,8 @@ int main() {
|
|||
|
||||
err = ks_dht2_init(&dht2, NULL);
|
||||
ok(err == KS_STATUS_SUCCESS);
|
||||
|
||||
ks_dht2_register_y(dht1, "z", dht_z_callback);
|
||||
|
||||
if (have_v4) {
|
||||
err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET);
|
||||
|
@ -81,8 +91,8 @@ int main() {
|
|||
ok(err == KS_STATUS_SUCCESS);
|
||||
}
|
||||
|
||||
// @todo populate dht1->recv_buffer and dht1->recv_buffer_length
|
||||
memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_BUFFER, buflen);
|
||||
buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER);
|
||||
memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen);
|
||||
dht1->recv_buffer_length = buflen;
|
||||
|
||||
err = ks_dht2_process(dht1, &raddr);
|
||||
|
|
Loading…
Reference in New Issue