From ed8e4dc610e73a4b8ab3233cc67f5da09c8c09b6 Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Thu, 1 Dec 2016 21:16:35 +0000 Subject: [PATCH] FS-9775: Renamed registries, added query registry with ping callback, unit test updated --- libs/libks/src/dht/ks_dht-int.h | 4 +- libs/libks/src/dht/ks_dht.c | 136 +++++++++++++++++++++++++--- libs/libks/src/dht/ks_dht.h | 6 +- libs/libks/src/dht/ks_dht_message.c | 3 + libs/libks/src/dht/ks_dht_message.h | 2 + libs/libks/test/testdht2.c | 18 +++- 6 files changed, 149 insertions(+), 20 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index d253245e9f..9f10f5599e 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -9,7 +9,9 @@ KS_BEGIN_EXTERN_C KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); - +KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); +KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); + KS_END_EXTERN_C #endif /* KS_DHT_INT_H */ diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index d11a5be131..bf80f55fc1 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -70,8 +70,12 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid) return KS_STATUS_FAIL; } - ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); - // @todo ks_hash_insert the q/r/e callbacks into y registry + ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + ks_dht2_register_type(dht, "q", ks_dht2_process_query); + // @todo ks_hash_insert the r/e callbacks into type registry + + ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping); dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; @@ -116,9 +120,13 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; - if (dht->registry_y) { - ks_hash_destroy(&dht->registry_y); - dht->registry_y = NULL; + if (dht->registry_type) { + ks_hash_destroy(&dht->registry_type); + dht->registry_type = NULL; + } + if (dht->registry_query) { + ks_hash_destroy(&dht->registry_query); + dht->registry_query = NULL; } ks_dht2_nodeid_deinit(&dht->nodeid); @@ -129,13 +137,25 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht) /** * */ -KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) +KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) { ks_assert(dht); ks_assert(value); ks_assert(callback); - return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; + return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback) +{ + ks_assert(dht); + ks_assert(value); + ks_assert(callback); + + return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } /** @@ -151,10 +171,7 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr) ks_assert(addr); ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->port); - - //if (!addr->port) { - // addr->port = KS_DHT_DEFAULT_PORT; - //} + dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv6 |= addr->family == AF_INET6; @@ -281,7 +298,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) return KS_STATUS_FAIL; } - if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, message.type, KS_UNLOCKED))) { + if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); } else { ret = callback(dht, raddr, &message); @@ -292,6 +309,101 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr) return ret; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + struct bencode *q; + struct bencode *a; + const char *qv; + ks_size_t qv_len; + char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE]; + ks_dht2_registry_callback_t callback; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + + q = ben_dict_get_by_str(message->data, "q"); + if (!q) { + ks_log(KS_LOG_DEBUG, "Message missing required key 'q'\n"); + return KS_STATUS_FAIL; + } + + qv = ben_str_val(q); + qv_len = ben_str_len(q); + if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { + ks_log(KS_LOG_DEBUG, "Message 'q' value has an unexpectedly large size of %d\n", qv_len); + return KS_STATUS_FAIL; + } + + memcpy(query, qv, qv_len); + query[qv_len] = '\0'; + ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query); + + a = ben_dict_get_by_str(message->data, "a"); + if (!a) { + ks_log(KS_LOG_DEBUG, "Message missing required key 'a'\n"); + return KS_STATUS_FAIL; + } + + message->args = a; + + if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { + ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); + } else { + ret = callback(dht, raddr, message); + } + + return ret; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) +{ + struct bencode *id; + const char *idv; + ks_size_t idv_len; + ks_dht2_nodeid_t nid; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(message); + ks_assert(message->args); + + id = ben_dict_get_by_str(message->args, "id"); + if (!id) { + ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n"); + return KS_STATUS_FAIL; + } + + idv = ben_str_val(id); + idv_len = ben_str_len(id); + if (idv_len != KS_DHT_NODEID_LENGTH) { + ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len); + return KS_STATUS_FAIL; + } + + if (ks_dht2_nodeid_prealloc(&nid, dht->pool) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + if (ks_dht2_nodeid_init(&nid, (const uint8_t *)idv) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + + //ks_log(KS_LOG_DEBUG, "Message query ping id is '%s'\n", id->id); + ks_log(KS_LOG_DEBUG, "Mesage query ping is valid\n"); + + ks_dht2_nodeid_deinit(&nid); + + return KS_STATUS_SUCCESS; +} + /* For Emacs: * Local Variables: * mode:c diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 03633e2da6..bd087b9c10 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -21,7 +21,8 @@ struct ks_dht2_s { ks_dht2_nodeid_t nodeid; - ks_hash_t *registry_y; + ks_hash_t *registry_type; + ks_hash_t *registry_query; ks_bool_t bind_ipv4; ks_bool_t bind_ipv6; @@ -51,7 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr); KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout); -KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); +KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback); KS_END_EXTERN_C diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 13ad52ee22..f7ee960809 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -59,6 +59,8 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u ks_assert(message->pool); ks_assert(buffer); + message->args = NULL; + message->data = ben_decode((const void *)buffer, buffer_length); if (!message->data) { ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n"); @@ -117,6 +119,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message) { ks_assert(message); + message->args = NULL; message->type[0] = '\0'; message->transactionid_length = 0; if (message->data) { diff --git a/libs/libks/src/dht/ks_dht_message.h b/libs/libks/src/dht/ks_dht_message.h index 0e0cd1f765..1530dc94f1 100644 --- a/libs/libks/src/dht/ks_dht_message.h +++ b/libs/libks/src/dht/ks_dht_message.h @@ -7,6 +7,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 +#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 typedef struct ks_dht2_message_s ks_dht2_message_t; struct ks_dht2_message_s { @@ -15,6 +16,7 @@ struct ks_dht2_message_s { uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE]; ks_size_t transactionid_length; char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE]; + struct bencode *args; }; KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool); diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index fa6f1ecc95..7fd5d31e0e 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -4,8 +4,8 @@ #include <../dht/ks_dht_endpoint-int.h> #include -#define TEST_DHT1_REGISTER_Y_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" -#define TEST_DHT1_PROCESS_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe" +#define TEST_DHT1_REGISTER_TYPE_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:ze" +#define TEST_DHT1_PROCESS_QUERY_PING_BUFFER "d1:ad2:id20:12345678901234567890e1:q4:ping1:t2:421:y1:qe" ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message) { @@ -59,7 +59,7 @@ int main() { err = ks_dht2_init(&dht2, NULL); ok(err == KS_STATUS_SUCCESS); - ks_dht2_register_y(dht1, "z", dht_z_callback); + ks_dht2_register_type(dht1, "z", dht_z_callback); if (have_v4) { err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT, AF_INET); @@ -91,8 +91,16 @@ int main() { ok(err == KS_STATUS_SUCCESS); } - buflen = strlen(TEST_DHT1_REGISTER_Y_BUFFER); - memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_Y_BUFFER, buflen); + buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); + memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); + dht1->recv_buffer_length = buflen; + + err = ks_dht2_process(dht1, &raddr); + ok(err == KS_STATUS_SUCCESS); + + + buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); + memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen); dht1->recv_buffer_length = buflen; err = ks_dht2_process(dht1, &raddr);