From 9e9adb8e4bb00b2af0902c4784d9cdd397c94cfa Mon Sep 17 00:00:00 2001 From: Shane Bryldt Date: Mon, 12 Dec 2016 01:02:43 +0000 Subject: [PATCH] FS-9775: Incorporated route table to test find_node before adding deep searching, but routetable bug is currently returning same values for all closest nodes results --- libs/libks/src/dht/ks_dht-int.h | 41 +- libs/libks/src/dht/ks_dht.c | 1167 +++++++++++++---------- libs/libks/src/dht/ks_dht.h | 5 + libs/libks/src/dht/ks_dht_endpoint.c | 14 +- libs/libks/src/dht/ks_dht_message.c | 34 +- libs/libks/src/dht/ks_dht_storageitem.c | 4 +- libs/libks/test/testdht2.c | 80 +- 7 files changed, 781 insertions(+), 564 deletions(-) diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 9c5b0ea691..57a1fff882 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -8,22 +8,30 @@ KS_BEGIN_EXTERN_C /** * */ -KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address, - uint8_t *buffer, +KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address, + uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size); +KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size, + ks_sockaddr_t *address); +KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid, + const ks_sockaddr_t *address, + uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size); +KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_size_t *buffer_length, - ks_size_t buffer_size); -KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, - ks_sockaddr_t *address, - uint8_t *buffer, - ks_size_t *buffer_length, - ks_size_t buffer_size); + ks_size_t buffer_size, + ks_dht_nodeid_t *nodeid, + ks_sockaddr_t *address); /** * */ -KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht); -KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht); -KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht); +KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht); +KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht); KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, @@ -44,14 +52,17 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message); -KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message); - KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message); + +KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message); KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message); +KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message); + /** * */ diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index b4b25a0edd..f1372c65ca 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -73,7 +73,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) dht->autoroute = KS_FALSE; dht->autoroute_port = 0; - + ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_dht_register_type(dht, "q", ks_dht_process_query); ks_dht_register_type(dht, "r", ks_dht_process_response); @@ -87,15 +87,17 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); // @todo register 301 error for internal get/put CAS hash mismatch retry handler - + dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; - + dht->endpoints = NULL; dht->endpoints_size = 0; ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool); dht->endpoints_poll = NULL; + dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; + ks_q_create(&dht->send_q, dht->pool, 0); dht->send_q_unsent = NULL; dht->recv_buffer_length = 0; @@ -111,7 +113,7 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE); - + return KS_STATUS_SUCCESS; } @@ -120,30 +122,32 @@ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) */ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) { + ks_hash_iterator_t *it; ks_assert(dht); - // @todo free storage_hash entries if (dht->storage_hash) { + for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key; + ks_dht_storageitem_t *val; + ks_hash_this(it, &key, NULL, (void **)&val); + ks_dht_storageitem_deinit(val); + ks_dht_storageitem_free(&val); + } ks_hash_destroy(&dht->storage_hash); - dht->storage_hash = NULL; } + dht->token_secret_current = 0; dht->token_secret_previous = 0; dht->token_secret_expiration = 0; - if (dht->rt_ipv4) { - ks_dhtrt_deinitroute(&dht->rt_ipv4); - dht->rt_ipv4 = NULL; - } - if (dht->rt_ipv6) { - ks_dhtrt_deinitroute(&dht->rt_ipv6); - dht->rt_ipv6 = NULL; - } + + if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4); + if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6); + dht->transactionid_next = 0; - if (dht->transactions_hash) { - ks_hash_destroy(&dht->transactions_hash); - dht->transactions_hash = NULL; - } + if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash); + dht->recv_buffer_length = 0; + if (dht->send_q) { ks_dht_message_t *msg; while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) { @@ -151,12 +155,14 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) ks_dht_message_free(&msg); } ks_q_destroy(&dht->send_q); - dht->send_q = NULL; } if (dht->send_q_unsent) { ks_dht_message_deinit(dht->send_q_unsent); ks_dht_message_free(&dht->send_q_unsent); } + + dht->pulse_expirations = 0; + for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht_endpoint_t *ep = dht->endpoints[i]; ks_dht_endpoint_deinit(ep); @@ -167,33 +173,23 @@ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) ks_pool_free(dht->pool, dht->endpoints); dht->endpoints = NULL; } + if (dht->endpoints_poll) { ks_pool_free(dht->pool, dht->endpoints_poll); dht->endpoints_poll = NULL; } - if (dht->endpoints_hash) { - ks_hash_destroy(&dht->endpoints_hash); - dht->endpoints_hash = NULL; - } + if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash); + dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; - if (dht->registry_type) { - ks_hash_destroy(&dht->registry_type); - dht->registry_type = NULL; - } - if (dht->registry_query) { - ks_hash_destroy(&dht->registry_query); - dht->registry_query = NULL; - } - if (dht->registry_error) { - ks_hash_destroy(&dht->registry_error); - dht->registry_error = NULL; - } + if (dht->registry_type) ks_hash_destroy(&dht->registry_type); + if (dht->registry_query) ks_hash_destroy(&dht->registry_query); + if (dht->registry_error) ks_hash_destroy(&dht->registry_error); dht->autoroute = KS_FALSE; dht->autoroute_port = 0; - + return KS_STATUS_SUCCESS; } @@ -204,15 +200,12 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_ { ks_assert(dht); - if (!autoroute) { - port = 0; - } else if (port <= 0) { - port = KS_DHT_DEFAULT_PORT; - } - + if (!autoroute) port = 0; + else if (port <= 0) port = KS_DHT_DEFAULT_PORT; + dht->autoroute = autoroute; dht->autoroute_port = port; - + return KS_STATUS_SUCCESS; } @@ -230,23 +223,23 @@ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *rad ks_assert(endpoint); *endpoint = NULL; - + ks_ip_route(ip, sizeof(ip), raddr->host); - // @todo readlock hash - if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) { + ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED); + ks_hash_read_unlock(dht->endpoints_hash); + + if (!ep && dht->autoroute) { ks_sockaddr_t addr; ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); - if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; } if (!ep) { ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); return KS_STATUS_FAIL; } - + return KS_STATUS_SUCCESS; } @@ -258,7 +251,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k ks_assert(dht); ks_assert(value); ks_assert(callback); - // @todo writelock registry + return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -270,7 +263,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_assert(dht); ks_assert(value); ks_assert(callback); - // @todo writelock registry + return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -282,7 +275,7 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_assert(dht); ks_assert(value); ks_assert(callback); - // @todo writelock registry + return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } @@ -294,34 +287,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ks_dht_endpoint_t *ep; ks_socket_t sock; int32_t epindex; - + ks_assert(dht); ks_assert(addr); ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->port); - if (endpoint) { - *endpoint = NULL; - } + if (endpoint) *endpoint = NULL; dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv6 |= addr->family == AF_INET6; - if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) { - return KS_STATUS_FAIL; - } + if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL; // @todo shouldn't ks_addr_bind take a const addr *? if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) { ks_socket_close(&sock); return KS_STATUS_FAIL; } - + if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) { ks_socket_close(&sock); return KS_STATUS_FAIL; } - + if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) { ks_dht_endpoint_free(&ep); ks_socket_close(&sock); @@ -331,35 +320,30 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE); ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE); - + epindex = dht->endpoints_size++; dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, (void *)dht->endpoints, sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); dht->endpoints[epindex] = ep; ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep); - + dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, (void *)dht->endpoints_poll, sizeof(struct pollfd) * dht->endpoints_size); dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].events = POLLIN | POLLERR; - // @todo initialize or add local nodeid to appropriate route table if (ep->addr.family == AF_INET) { - if (!dht->rt_ipv4) { - ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool); - } + if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool); + ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node); } else { - if (!dht->rt_ipv6) { - ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); - } - } - - if (endpoint) { - *endpoint = ep; + if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); + ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node); } + if (endpoint) *endpoint = ep; + return KS_STATUS_SUCCESS; } @@ -369,7 +353,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { int32_t result; - + ks_assert(dht); ks_assert (timeout >= 0); @@ -378,14 +362,14 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (timeout == 0) { // @todo deal with default timeout, should return quickly but not hog the CPU polling } - + result = ks_poll(dht->endpoints_poll, dht->endpoints_size, timeout); if (result > 0) { for (int32_t i = 0; i < dht->endpoints_size; ++i) { if (dht->endpoints_poll[i].revents & POLLIN) { ks_sockaddr_t raddr = KS_SA_INIT; dht->recv_buffer_length = KS_DHT_RECV_BUFFER_SIZE; - + raddr.family = dht->endpoints[i]->addr.family; if (ks_socket_recvfrom(dht->endpoints_poll[i].fd, dht->recv_buffer, &dht->recv_buffer_length, &raddr) == KS_STATUS_SUCCESS) { // @todo copy data to a ks_dht_frame then create job to call ks_dht_process from threadpool @@ -395,69 +379,180 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) } } - ks_dht_idle(dht); + ks_dht_pulse_expirations(dht); + + ks_dht_pulse_send(dht); + + if (dht->rt_ipv4) ks_dhtrt_process_table(dht->rt_ipv4); + if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); } /** * */ -KS_DECLARE(ks_status_t) ks_dht_utility_compact_address(ks_sockaddr_t *address, - uint8_t *buffer, - ks_size_t *buffer_length, - ks_size_t buffer_size) +KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) { - ks_size_t required = sizeof(uint16_t); + ks_hash_iterator_t *it = NULL; + ks_time_t now = ks_time_now_sec(); + + ks_assert(dht); + + if (dht->pulse_expirations <= now) { + dht->pulse_expirations = now + KS_DHT_PULSE_EXPIRATIONS; + } + + ks_hash_write_lock(dht->transactions_hash); + for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { + const void *key = NULL; + ks_dht_transaction_t *value = NULL; + ks_bool_t remove = KS_FALSE; + + ks_hash_this(it, &key, NULL, (void **)&value); + if (value->finished) remove = KS_TRUE; + else if (value->expiration <= now) { + ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); + remove = KS_TRUE; + } + if (remove) { + ks_hash_remove(dht->transactions_hash, (char *)key); + ks_pool_free(value->pool, value); + } + } + ks_hash_write_unlock(dht->transactions_hash); + + if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { + dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; + dht->token_secret_previous = dht->token_secret_current; + dht->token_secret_current = rand(); + } +} + +/** + * + */ +KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) +{ + ks_dht_message_t *message; + ks_bool_t bail = KS_FALSE; + ks_status_t ret = KS_STATUS_SUCCESS; + + ks_assert(dht); + + while (!bail) { + message = NULL; + if (dht->send_q_unsent) { + message = dht->send_q_unsent; + dht->send_q_unsent = NULL; + } + if (!message) bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message; + if (!bail) { + bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS; + if (ret == KS_STATUS_BREAK) dht->send_q_unsent = message; + else if (ret == KS_STATUS_SUCCESS) { + ks_dht_message_deinit(message); + ks_dht_message_free(&message); + } + } + } +} + +/** + * + */ +static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) +{ + char *t = buffer; + + ks_assert(id); + ks_assert(buffer); + + memset(buffer, 0, KS_DHT_NODEID_SIZE * 2 + 1); + + for (int i = 0; i < KS_DHT_NODEID_SIZE; ++i, t += 2) sprintf(t, "%02X", id->id[i]); + + return buffer; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address, + uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size) +{ + ks_size_t addr_len; + const void *paddr = NULL; uint16_t port = 0; - + ks_assert(address); ks_assert(buffer); ks_assert(buffer_length); ks_assert(buffer_size); ks_assert(address->family == AF_INET || address->family == AF_INET6); - if (address->family == AF_INET) { - required += sizeof(uint32_t); - } else { - required += 8 * sizeof(uint16_t); - } - - if (*buffer_length + required > buffer_size) { + addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); + + if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) { ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); return KS_STATUS_FAIL; } if (address->family == AF_INET) { - uint32_t *paddr = (uint32_t *)&address->v.v4.sin_addr; - uint32_t addr = htonl(*paddr); - port = htons(address->v.v4.sin_port); - - memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint32_t)); - *buffer_length += sizeof(uint32_t); + paddr = &address->v.v4.sin_addr; // already network byte order + port = address->v.v4.sin_port; // already network byte order } else { - uint16_t *paddr = (uint16_t *)&address->v.v6.sin6_addr; - port = htons(address->v.v6.sin6_port); - - for (int32_t i = 0; i < 8; ++i) { - uint16_t addr = htons(paddr[i]); - memcpy(buffer + (*buffer_length), (void *)&addr, sizeof(uint16_t)); - *buffer_length += sizeof(uint16_t); - } + paddr = &address->v.v6.sin6_addr; // already network byte order + port = address->v.v6.sin6_port; // already network byte order } + memcpy(buffer + (*buffer_length), paddr, sizeof(uint32_t)); + *buffer_length += addr_len; - memcpy(buffer + (*buffer_length), (void *)&port, sizeof(uint16_t)); + memcpy(buffer + (*buffer_length), (const void *)&port, sizeof(uint16_t)); *buffer_length += sizeof(uint16_t); - + return KS_STATUS_SUCCESS; } /** * */ -KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, - ks_sockaddr_t *address, - uint8_t *buffer, - ks_size_t *buffer_length, - ks_size_t buffer_size) +KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size, + ks_sockaddr_t *address) +{ + ks_size_t addr_len; + const void *paddr = NULL; + uint16_t port = 0; + + ks_assert(buffer); + ks_assert(buffer_length); + ks_assert(address); + ks_assert(address->family == AF_INET ||address->family == AF_INET6); + + addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); + if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL; + + paddr = buffer + *buffer_length; + *buffer_length += addr_len; + port = *((uint16_t *)(buffer + *buffer_length)); + *buffer_length += sizeof(uint16_t); + + // @todo ks_addr_set_raw second parameter should be const? + ks_addr_set_raw(address, (void *)paddr, port, address->family); + + return KS_STATUS_SUCCESS; +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid, + const ks_sockaddr_t *address, + uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size) { ks_assert(address); ks_assert(buffer); @@ -473,7 +568,30 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_node(ks_dht_nodeid_t *nodeid, memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE); *buffer_length += KS_DHT_NODEID_SIZE; - return ks_dht_utility_compact_address(address, buffer, buffer_length, buffer_size); + return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size); +} + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, + ks_size_t *buffer_length, + ks_size_t buffer_size, + ks_dht_nodeid_t *nodeid, + ks_sockaddr_t *address) +{ + ks_assert(buffer); + ks_assert(buffer_length); + ks_assert(nodeid); + ks_assert(address); + ks_assert(address->family == AF_INET ||address->family == AF_INET6); + + if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL; + + memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE); + *buffer_length += KS_DHT_NODEID_SIZE; + + return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address); } /** @@ -490,13 +608,13 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons ks_assert(nodeid); *nodeid = NULL; - + id = ben_dict_get_by_str(args, key); if (!id) { ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); return KS_STATUS_FAIL; } - + idv = ben_str_val(id); idv_len = ben_str_len(id); if (idv_len != KS_DHT_NODEID_SIZE) { @@ -523,7 +641,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const ks_assert(token); *token = NULL; - + tok = ben_dict_get_by_str(args, key); if (!tok) { ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); @@ -558,7 +676,7 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra secret = htonl(secret); port = htons(raddr->port); - + SHA1_Init(&sha); SHA1_Update(&sha, &secret, sizeof(uint32_t)); SHA1_Update(&sha, raddr->host, strlen(raddr->host)); @@ -578,98 +696,13 @@ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, k ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok); - if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) { - return KS_TRUE; - } + if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE; ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok); return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0; } -/** - * - */ -KS_DECLARE(void) ks_dht_idle(ks_dht_t *dht) -{ - ks_assert(dht); - - ks_dht_idle_expirations(dht); - - ks_dht_idle_send(dht); -} - -/** - * - */ -KS_DECLARE(void) ks_dht_idle_expirations(ks_dht_t *dht) -{ - ks_hash_iterator_t *it = NULL; - ks_time_t now = ks_time_now_sec(); - - ks_assert(dht); - - // @todo add delay between checking expirations, every 10 seconds? - - ks_hash_write_lock(dht->transactions_hash); - for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { - const void *key = NULL; - ks_dht_transaction_t *value = NULL; - ks_bool_t remove = KS_FALSE; - - ks_hash_this(it, &key, NULL, (void **)&value); - if (value->finished) { - remove = KS_TRUE; - } else if (value->expiration <= now) { - ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid); - remove = KS_TRUE; - } - if (remove) { - ks_hash_remove(dht->transactions_hash, (char *)key); - ks_pool_free(value->pool, value); - } - } - ks_hash_write_unlock(dht->transactions_hash); - - if (dht->token_secret_expiration && dht->token_secret_expiration <= now) { - dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; - dht->token_secret_previous = dht->token_secret_current; - dht->token_secret_current = rand(); - } -} - -/** - * - */ -KS_DECLARE(void) ks_dht_idle_send(ks_dht_t *dht) -{ - ks_dht_message_t *message; - ks_bool_t bail = KS_FALSE; - ks_status_t ret = KS_STATUS_SUCCESS; - - ks_assert(dht); - - while (!bail) { - message = NULL; - if (dht->send_q_unsent) { - message = dht->send_q_unsent; - dht->send_q_unsent = NULL; - } - if (!message) { - bail = ks_q_pop_timeout(dht->send_q, (void **)&message, 1) != KS_STATUS_SUCCESS || !message; - } - if (!bail) { - bail = (ret = ks_dht_send(dht, message)) != KS_STATUS_SUCCESS; - if (ret == KS_STATUS_BREAK) { - dht->send_q_unsent = message; - } else if (ret == KS_STATUS_SUCCESS) { - ks_dht_message_deinit(message); - ks_dht_message_free(&message); - } - } - } -} - /** * */ @@ -678,7 +711,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) // @todo calculate max IPV6 payload size? char buf[1000]; ks_size_t buf_len; - + ks_assert(dht); ks_assert(message); ks_assert(message->endpoint); @@ -694,58 +727,6 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr); } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, - ks_dht_endpoint_t *ep, - ks_sockaddr_t *raddr, - uint8_t *transactionid, - ks_size_t transactionid_length, - long long errorcode, - const char *errorstr) -{ - ks_dht_message_t *error = NULL; - struct bencode *e = NULL; - ks_status_t ret = KS_STATUS_FAIL; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(transactionid); - ks_assert(errorstr); - - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { - goto done; - } - - if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) { - goto done; - } - - ben_list_append(e, ben_int(errorcode)); - ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); - - ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); - ks_q_push(dht->send_q, (void *)error); - - ret = KS_STATUS_SUCCESS; - - done: - if (ret != KS_STATUS_SUCCESS && error) { - ks_dht_message_deinit(error); - ks_dht_message_free(&error); - } - return ret; -} - /** * */ @@ -770,32 +751,20 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, *message = NULL; - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; // @todo atomic increment or mutex transactionid = dht->transactionid_next++; - if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done; *message = msg; @@ -836,25 +805,17 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ks_assert(raddr); ks_assert(transactionid); ks_assert(message); - + *message = NULL; - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; + + if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) { - goto done; - } - *message = msg; ret = KS_STATUS_SUCCESS; @@ -868,80 +829,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, return ret; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) -{ - ks_dht_message_t *message = NULL; - struct bencode *a = NULL; - - ks_assert(dht); - ks_assert(raddr); - - if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - - ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); - ks_q_push(dht->send_q, (void *)message); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) -{ - ks_dht_message_t *message = NULL; - struct bencode *a = NULL; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(targetid); - - if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); - - ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); - ks_q_push(dht->send_q, (void *)message); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) -{ - ks_dht_message_t *message = NULL; - struct bencode *a = NULL; - - ks_assert(dht); - ks_assert(raddr); - ks_assert(targetid); - - if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available - ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); - - ks_log(KS_LOG_DEBUG, "Sending message query get\n"); - ks_q_push(dht->send_q, (void *)message); - - return KS_STATUS_SUCCESS; -} - /** * */ @@ -961,29 +848,22 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_ } // @todo blacklist check for bad actor nodes - - if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) { - goto done; - } + if (ks_dht_message_init(&message, ep, raddr, KS_FALSE) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - // @todo readlocking registry for calling from threadpool - if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { - ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); - } else { - ret = callback(dht, &message); - } + if (ks_dht_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) goto done; + + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_READLOCKED); + ks_hash_read_unlock(dht->registry_type); + + if (!callback) ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); + else ret = callback(dht, &message); done: ks_dht_message_deinit(&message); - + return ret; } @@ -1009,7 +889,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me ks_log(KS_LOG_DEBUG, "Message query missing required key 'q'\n"); return KS_STATUS_FAIL; } - + qv = ben_str_val(q); qv_len = ben_str_len(q); if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) { @@ -1030,12 +910,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me message->args = a; - // @todo readlocking registry for calling from threadpool - if (!(callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) { - ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); - } else { - ret = callback(dht, message); - } + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_READLOCKED); + ks_hash_read_unlock(dht->registry_query); + + if (!callback) ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query); + else ret = callback(dht, message); return ret; } @@ -1069,10 +948,9 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); ks_hash_read_unlock(dht->transactions_hash); - - if (!transaction) { - ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); - } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { + + if (!transaction) ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); + else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { ks_log(KS_LOG_DEBUG, "Message response rejected due to spoofing from %s %d, expected %s %d\n", message->raddr.host, @@ -1087,6 +965,93 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t return ret; } + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback) +{ + ks_assert(dht); + ks_assert(id); + + // @todo check hash for id to see if search already exists + + // @todo if search does not exist, create new search and store in hash by id + + // @todo queue callback into search, if multiple tasks are searching the same id they can all be notified of results + + // @todo if search existed already and is already running then bail out and let it run + + // @todo find closest nodes to id locally, store as closest results, and queue in search pending a find_node call for closer nodes + + // @todo pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout + + + // @todo upon receiving response to find_node, check for an existing search by the id + + // @todo keep track of the closest K(8) nodes found to the id + + // @todo if there is closer node(s) in response, update furthest search result(s) and queue find_node calls for closer nodes + + // @todo if search queue is empty, call callbacks + + // @todo otherwise pop a pending find_node call from search queue and call ks_dht_send_find_node, track last popped for timeout + + + // @todo during pulse iterate searches and check for last popped timeout where find_node received no reply + + // @todo pop a pending find_node call, or call callbacks if empty + + return KS_STATUS_SUCCESS; +} + + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, + ks_dht_endpoint_t *ep, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + long long errorcode, + const char *errorstr) +{ + ks_dht_message_t *error = NULL; + struct bencode *e = NULL; + ks_status_t ret = KS_STATUS_FAIL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(transactionid); + ks_assert(errorstr); + + if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (ks_dht_message_alloc(&error, dht->pool) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (ks_dht_message_init(error, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; + + if (ks_dht_message_error(error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) goto done; + + ben_list_append(e, ben_int(errorcode)); + ben_list_append(e, ben_blob(errorstr, strlen(errorstr))); + + ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode); + ks_q_push(dht->send_q, (void *)error); + + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS && error) { + ks_dht_message_deinit(error); + ks_dht_message_free(&error); + } + return ret; +} + /** * */ @@ -1122,7 +1087,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me } errorcode = ben_int_val(ec); et = ben_str_val(es); - + memcpy(error, et, es_len); error[es_len] = '\0'; // @todo end of ks_dht_message_parse_error @@ -1134,7 +1099,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED); ks_hash_read_unlock(dht->transactions_hash); - + if (!transaction) { ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid); } else if (!ks_addr_cmp(&message->raddr, &transaction->raddr)) { @@ -1148,10 +1113,11 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me ks_dht_message_callback_t callback; transaction->finished = KS_TRUE; - // @todo readlock on registry - if ((callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) { - ret = callback(dht, message); - } else { + callback = (ks_dht_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_READLOCKED); + ks_hash_read_unlock(dht->registry_error); + + if (callback) ret = callback(dht, message); + else { ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error); ret = KS_STATUS_SUCCESS; } @@ -1160,6 +1126,28 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me return ret; } + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) +{ + ks_dht_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(raddr); + + if (ks_dht_setup_query(dht, ep, raddr, "ping", ks_dht_process_response_ping, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + + ks_log(KS_LOG_DEBUG, "Sending message query ping\n"); + ks_q_push(dht->send_q, (void *)message); + + return KS_STATUS_SUCCESS; +} + /** * */ @@ -1168,16 +1156,21 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ ks_dht_nodeid_t *id; ks_dht_message_t *response = NULL; struct bencode *r = NULL; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - // @todo add/touch bucket entry for remote node + routetable = message->endpoint->node->table; + + // @todo touch here, or only create if not exists? + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } ks_log(KS_LOG_DEBUG, "Message query ping is valid\n"); @@ -1199,6 +1192,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) +{ + ks_dht_nodeid_t *id; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; + + ks_assert(dht); + ks_assert(message); + + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } + + ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); + + return KS_STATUS_SUCCESS; +} + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) +{ + ks_dht_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(targetid); + + if (ks_dht_setup_query(dht, ep, raddr, "find_node", ks_dht_process_response_findnode, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); + + ks_log(KS_LOG_DEBUG, "Sending message query find_node\n"); + ks_q_push(dht->send_q, (void *)message); + + return KS_STATUS_SUCCESS; +} + /** * */ @@ -1215,30 +1257,26 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess uint8_t buffer6[1000]; ks_size_t buffer4_length = 0; ks_size_t buffer6_length = 0; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; + ks_dhtrt_querynodes_t query; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; want = ben_dict_get_by_str(message->args, "want"); if (want) { size_t want_len = ben_list_len(want); for (size_t i = 0; i < want_len; ++i) { struct bencode *iv = ben_list_get(want, i); - if (!ben_cmp_with_str(iv, "n4")) { - want4 = KS_TRUE; - } - if (!ben_cmp_with_str(iv, "n6")) { - want6 = KS_TRUE; - } + if (!ben_cmp_with_str(iv, "n4")) want4 = KS_TRUE; + if (!ben_cmp_with_str(iv, "n6")) want6 = KS_TRUE; } } @@ -1247,27 +1285,55 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess want6 = message->raddr.family == AF_INET6; } - // @todo add/touch bucket entry for remote node - + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } + ks_log(KS_LOG_DEBUG, "Message query find_node is valid\n"); + query.nodeid = *target; + query.type = ks_dht_remote_t; + query.max = 8; // should be like KS_DHTRT_BUCKET_SIZE if (want4) { - // @todo get closest nodes to target from ipv4 route table - // @todo compact nodes into buffer4 + query.family = AF_INET; + ks_dhtrt_findclosest_nodes(routetable, &query); + + for (int32_t i = 0; i < query.count; ++i) { + if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid, + &query.nodes[i]->addr, + buffer4, + &buffer4_length, + sizeof(buffer4)) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + ks_log(KS_LOG_DEBUG, + "Compacted ipv4 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&query.nodes[i]->nodeid, id_buf), + query.nodes[i]->addr.host, + query.nodes[i]->addr.port); + } } if (want6) { - // @todo get closest nodes to target from ipv6 route table - // @todo compact nodes into buffer6 - } + query.family = AF_INET6; + ks_dhtrt_findclosest_nodes(routetable, &query); - // @todo remove this, testing only - if (ks_dht_utility_compact_node(id, - &message->raddr, - message->raddr.family == AF_INET ? buffer4 : buffer6, - message->raddr.family == AF_INET ? &buffer4_length : &buffer6_length, - message->raddr.family == AF_INET ? sizeof(buffer4) : sizeof(buffer6)) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; + for (int32_t i = 0; i < query.count; ++i) { + if (ks_dht_utility_compact_nodeinfo(&query.nodes[i]->nodeid, + &query.nodes[i]->addr, + buffer6, + &buffer6_length, + sizeof(buffer6)) != KS_STATUS_SUCCESS) { + return KS_STATUS_FAIL; + } + ks_log(KS_LOG_DEBUG, + "Compacted ipv6 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&query.nodes[i]->nodeid, id_buf), + query.nodes[i]->addr.host, + query.nodes[i]->addr.port); + } } if (ks_dht_setup_response(dht, @@ -1281,12 +1347,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess } ben_dict_set(r, ben_blob("id", 2), ben_blob(response->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); - if (want4) { - ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); - } - if (want6) { - ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); - } + if (want4) ben_dict_set(r, ben_blob("nodes", 5), ben_blob(buffer4, buffer4_length)); + if (want6) ben_dict_set(r, ben_blob("nodes6", 6), ben_blob(buffer6, buffer6_length)); ks_log(KS_LOG_DEBUG, "Sending message response find_node\n"); ks_q_push(dht->send_q, (void *)response); @@ -1294,6 +1356,112 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) +{ + ks_dht_nodeid_t *id; + struct bencode *n; + const uint8_t *nodes = NULL; + const uint8_t *nodes6 = NULL; + size_t nodes_size = 0; + size_t nodes6_size = 0; + size_t nodes_len = 0; + size_t nodes6_len = 0; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; + char id_buf[KS_DHT_NODEID_SIZE * 2 + 1]; + + ks_assert(dht); + ks_assert(message); + + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + n = ben_dict_get_by_str(message->args, "nodes"); + if (n) { + nodes = (const uint8_t *)ben_str_val(n); + nodes_size = ben_str_len(n); + } + n = ben_dict_get_by_str(message->args, "nodes6"); + if (n) { + nodes6 = (const uint8_t *)ben_str_val(n); + nodes6_size = ben_str_len(n); + } + + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } + + while (nodes_len < nodes_size) { + ks_dht_nodeid_t nid; + ks_sockaddr_t addr; + + addr.family = AF_INET; + if (ks_dht_utility_expand_nodeinfo(nodes, &nodes_len, nodes_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, + "Expanded ipv4 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&nid, id_buf), + addr.host, + addr.port); + + if (ks_dhtrt_touch_node(dht->rt_ipv4, nid) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(dht->rt_ipv4, nid, ks_dht_remote_t, addr.host, addr.port, &node); + } + } + + while (nodes6_len < nodes6_size) { + ks_dht_nodeid_t nid; + ks_sockaddr_t addr; + + addr.family = AF_INET6; + if (ks_dht_utility_expand_nodeinfo(nodes6, &nodes6_len, nodes6_size, &nid, &addr) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ks_log(KS_LOG_DEBUG, + "Expanded ipv6 nodeinfo for %s (%s %d)\n", + ks_dht_hexid(&nid, id_buf), + addr.host, + addr.port); + + if (ks_dhtrt_touch_node(dht->rt_ipv6, nid) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(dht->rt_ipv6, nid, ks_dht_remote_t, addr.host, addr.port, &node); + } + } + // @todo repeat above for ipv6 table + + ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); + + return KS_STATUS_SUCCESS; +} + + +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) +{ + ks_dht_message_t *message = NULL; + struct bencode *a = NULL; + + ks_assert(dht); + ks_assert(raddr); + ks_assert(targetid); + + if (ks_dht_setup_query(dht, ep, raddr, "get", ks_dht_process_response_get, &message, &a) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + ben_dict_set(a, ben_blob("id", 2), ben_blob(message->endpoint->nodeid.id, KS_DHT_NODEID_SIZE)); + // @todo check for target item locally, set seq to item seq to prevent getting back what we already have if a newer seq is not available + ben_dict_set(a, ben_blob("target", 6), ben_blob(targetid->id, KS_DHT_NODEID_SIZE)); + + ks_log(KS_LOG_DEBUG, "Sending message query get\n"); + ks_q_push(dht->send_q, (void *)message); + + return KS_STATUS_SUCCESS; +} + /** * */ @@ -1308,30 +1476,30 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t ks_dht_storageitem_t *item = NULL; ks_dht_message_t *response = NULL; struct bencode *r = NULL; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; ks_assert(dht); ks_assert(message); ks_assert(message->args); - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; - if (ks_dht_utility_extract_nodeid(message->args, "target", &target) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - seq = ben_dict_get_by_str(message->args, "seq"); - if (seq) { - sequence = ben_int_val(seq); - } + if (seq) sequence = ben_int_val(seq); - // @todo add/touch bucket entry for remote node + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } ks_log(KS_LOG_DEBUG, "Message query get is valid\n"); ks_dht_token_generate(dht->token_secret_current, &message->raddr, target, &token); - + item = ks_hash_search(dht->storage_hash, (void *)target, KS_READLOCKED); ks_hash_read_unlock(dht->storage_hash); @@ -1341,7 +1509,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t // @todo find closest ipv4 and ipv6 nodes to target // @todo compact ipv4 and ipv6 nodes into separate buffers - + if (ks_dht_setup_response(dht, message->endpoint, &message->raddr, @@ -1362,9 +1530,7 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t } ben_dict_set(r, ben_blob("seq", 3), ben_int(item->seq)); } - if (!sequence_snuffed) { - ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v)); - } + if (!sequence_snuffed) ben_dict_set(r, ben_blob("v", 1), ben_clone(item->v)); } // @todo nodes, nodes6 @@ -1374,19 +1540,62 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t return KS_STATUS_SUCCESS; } +/** + * + */ +KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) +{ + ks_dht_nodeid_t *id; + ks_dht_token_t *token; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; + + ks_assert(dht); + ks_assert(message); + + // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + // @todo add extract function for mutable ks_dht_storageitem_key_t + // @todo add extract function for mutable ks_dht_storageitem_signature_t + + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } + // @todo add/touch bucket entries for other nodes/nodes6 returned + + ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); + + return KS_STATUS_SUCCESS; +} + + /** * */ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) { + ks_dht_nodeid_t *id; ks_dht_message_t *response = NULL; struct bencode *r = NULL; + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; ks_assert(dht); ks_assert(message); ks_assert(message->args); - // @todo add/touch bucket entry for remote node + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); + } ks_log(KS_LOG_DEBUG, "Message query put is valid\n"); @@ -1408,63 +1617,27 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t return KS_STATUS_SUCCESS; } - /** * */ -KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) -{ - ks_assert(dht); - ks_assert(message); - - // @todo add/touch bucket entry for remote node - - ks_log(KS_LOG_DEBUG, "Message response ping is reached\n"); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) -{ - ks_assert(dht); - ks_assert(message); - - // @todo add/touch bucket entry for remote node and other nodes returned - - ks_log(KS_LOG_DEBUG, "Message response find_node is reached\n"); - - return KS_STATUS_SUCCESS; -} - -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) +KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; - ks_dht_token_t *token; - + ks_dhtrt_routetable_t *routetable = NULL; + ks_dht_node_t *node = NULL; + ks_assert(dht); ks_assert(message); - // @todo use ks_dht_storageitem_mutable or ks_dht_storageitem_immutable if v is provided - if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; - } - - if (ks_dht_utility_extract_token(message->args, "token", &token) != KS_STATUS_SUCCESS) { - return KS_STATUS_FAIL; + if (ks_dht_utility_extract_nodeid(message->args, "id", &id) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + + routetable = message->endpoint->node->table; + + if (ks_dhtrt_touch_node(routetable, *id) != KS_STATUS_SUCCESS) { + ks_dhtrt_create_node(routetable, *id, ks_dht_remote_t, message->raddr.host, message->raddr.port, &node); } - // @todo add extract function for mutable ks_dht_storageitem_key_t - // @todo add extract function for mutable ks_dht_storageitem_signature_t - - // @todo add/touch bucket entry for remote node and other nodes returned - - ks_log(KS_LOG_DEBUG, "Message response get is reached\n"); + ks_log(KS_LOG_DEBUG, "Message response put is reached\n"); return KS_STATUS_SUCCESS; } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index b2df07ed3a..bb4580a471 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -10,6 +10,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_DEFAULT_PORT 5309 #define KS_DHT_RECV_BUFFER_SIZE 0xFFFF +#define KS_DHT_PULSE_EXPIRATIONS 10 #define KS_DHT_NODEID_SIZE 20 @@ -19,6 +20,7 @@ KS_BEGIN_EXTERN_C #define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256 #define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30 +#define KS_DHT_SEARCH_EXPIRATION 10 #define KS_DHT_STORAGEITEM_KEY_SIZE crypto_sign_PUBLICKEYBYTES #define KS_DHT_STORAGEITEM_SALT_MAX_SIZE 64 @@ -106,6 +108,7 @@ struct ks_dht_endpoint_s { ks_dht_nodeid_t nodeid; ks_sockaddr_t addr; ks_socket_t sock; + ks_dht_node_t *node; }; struct ks_dht_transaction_s { @@ -151,6 +154,8 @@ struct ks_dht_s { ks_hash_t *endpoints_hash; struct pollfd *endpoints_poll; + ks_time_t pulse_expirations; + ks_q_t *send_q; ks_dht_message_t *send_q_unsent; uint8_t recv_buffer[KS_DHT_RECV_BUFFER_SIZE]; diff --git a/libs/libks/src/dht/ks_dht_endpoint.c b/libs/libks/src/dht/ks_dht_endpoint.c index e31a29a6a0..cf07c8ccb6 100644 --- a/libs/libks/src/dht/ks_dht_endpoint.c +++ b/libs/libks/src/dht/ks_dht_endpoint.c @@ -62,11 +62,8 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_init(ks_dht_endpoint_t *endpoint, const ks_assert(addr); ks_assert(addr->family == AF_INET || addr->family == AF_INET6); - if (!nodeid) { - randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE); - } else { - memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); - } + if (!nodeid) randombytes_buf(endpoint->nodeid.id, KS_DHT_NODEID_SIZE); + else memcpy(endpoint->nodeid.id, nodeid->id, KS_DHT_NODEID_SIZE); endpoint->addr = *addr; endpoint->sock = sock; @@ -81,10 +78,9 @@ KS_DECLARE(ks_status_t) ks_dht_endpoint_deinit(ks_dht_endpoint_t *endpoint) { ks_assert(endpoint); - if (endpoint->sock != KS_SOCK_INVALID) { - ks_socket_close(&endpoint->sock); - endpoint->sock = KS_SOCK_INVALID; - } + endpoint->node = NULL; + if (endpoint->sock != KS_SOCK_INVALID) ks_socket_close(&endpoint->sock); + endpoint->addr = (const ks_sockaddr_t){ 0 }; return KS_STATUS_SUCCESS; } diff --git a/libs/libks/src/dht/ks_dht_message.c b/libs/libks/src/dht/ks_dht_message.c index 122983ca7d..8c71a9f6e0 100644 --- a/libs/libks/src/dht/ks_dht_message.c +++ b/libs/libks/src/dht/ks_dht_message.c @@ -59,13 +59,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_en message->endpoint = ep; message->raddr = *raddr; - message->data = NULL; - message->args = NULL; - message->transactionid_length = 0; - message->type[0] = '\0'; - if (alloc_data) { - message->data = ben_dict(); - } + if (alloc_data) message->data = ben_dict(); return KS_STATUS_SUCCESS; } @@ -173,7 +167,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, ks_assert(query); tid = htonl(transactionid); - + ben_dict_set(message->data, ben_blob("t", 1), ben_blob((uint8_t *)&tid, sizeof(uint32_t))); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("q", 1)); ben_dict_set(message->data, ben_blob("q", 1), ben_blob(query, strlen(query))); @@ -182,9 +176,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, a = ben_dict(); ben_dict_set(message->data, ben_blob("a", 1), a); - if (args) { - *args = a; - } + if (args) *args = a; return KS_STATUS_SUCCESS; } @@ -198,20 +190,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, struct bencode **args) { struct bencode *r; - + ks_assert(message); ks_assert(transactionid); - + ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("r", 1)); - + // @note r joins message->data and will be freed with it r = ben_dict(); ben_dict_set(message->data, ben_blob("r", 1), r); - if (args) { - *args = r; - } + if (args) *args = r; return KS_STATUS_SUCCESS; } @@ -225,20 +215,18 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, struct bencode **args) { struct bencode *e; - + ks_assert(message); ks_assert(transactionid); - + ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length)); ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1)); - + // @note r joins message->data and will be freed with it e = ben_list(); ben_dict_set(message->data, ben_blob("e", 1), e); - if (args) { - *args = e; - } + if (args) *args = e; return KS_STATUS_SUCCESS; } diff --git a/libs/libks/src/dht/ks_dht_storageitem.c b/libs/libks/src/dht/ks_dht_storageitem.c index e785528463..d76247bcf7 100644 --- a/libs/libks/src/dht/ks_dht_storageitem.c +++ b/libs/libks/src/dht/ks_dht_storageitem.c @@ -185,9 +185,7 @@ KS_DECLARE(ks_status_t) ks_dht_storageitem_mutable(ks_dht_storageitem_t *item, SHA1_Init(&sha); SHA1_Update(&sha, item->pk.key, KS_DHT_STORAGEITEM_KEY_SIZE); - if (item->salt && item->salt_length > 0) { - SHA1_Update(&sha, item->salt, item->salt_length); - } + if (item->salt && item->salt_length > 0) SHA1_Update(&sha, item->salt, item->salt_length); SHA1_Final(item->id.id, &sha); return KS_STATUS_SUCCESS; diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index 618f034488..eb7ec3a607 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -15,18 +15,22 @@ ks_status_t dht_z_callback(ks_dht_t *dht, ks_dht_message_t *message) } int main() { - ks_size_t buflen; + //ks_size_t buflen; ks_status_t err; int mask = 0; ks_dht_t *dht1 = NULL; ks_dht_t dht2; + ks_dht_t *dht3 = NULL; ks_dht_endpoint_t *ep1; ks_dht_endpoint_t *ep2; + ks_dht_endpoint_t *ep3; ks_bool_t have_v4, have_v6; char v4[48] = {0}, v6[48] = {0}; ks_sockaddr_t addr; - ks_sockaddr_t raddr; - + ks_sockaddr_t raddr1; + //ks_sockaddr_t raddr2; + //ks_sockaddr_t raddr3; + err = ks_init(); ok(!err); @@ -61,6 +65,13 @@ int main() { err = ks_dht_init(&dht2); ok(err == KS_STATUS_SUCCESS); + err = ks_dht_alloc(&dht3, NULL); + ok(err == KS_STATUS_SUCCESS); + + err = ks_dht_init(dht3); + ok(err == KS_STATUS_SUCCESS); + + ks_dht_register_type(dht1, "z", dht_z_callback); if (have_v4) { @@ -70,13 +81,23 @@ int main() { err = ks_dht_bind(dht1, NULL, &addr, &ep1); ok(err == KS_STATUS_SUCCESS); + raddr1 = addr; + err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 1, AF_INET); ok(err == KS_STATUS_SUCCESS); err = ks_dht_bind(&dht2, NULL, &addr, &ep2); ok(err == KS_STATUS_SUCCESS); - raddr = addr; + //raddr2 = addr; + + err = ks_addr_set(&addr, v4, KS_DHT_DEFAULT_PORT + 2, AF_INET); + ok(err == KS_STATUS_SUCCESS); + + err = ks_dht_bind(dht3, NULL, &addr, &ep3); + ok(err == KS_STATUS_SUCCESS); + + //raddr3 = addr; } if (have_v6) { @@ -91,20 +112,26 @@ int main() { err = ks_dht_bind(&dht2, NULL, &addr, NULL); ok(err == KS_STATUS_SUCCESS); + + err = ks_addr_set(&addr, v6, KS_DHT_DEFAULT_PORT + 2, AF_INET6); + ok(err == KS_STATUS_SUCCESS); + + err = ks_dht_bind(dht3, NULL, &addr, NULL); + ok(err == KS_STATUS_SUCCESS); } - diag("Custom type tests\n"); + //diag("Custom type tests\n"); - buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); - memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); - dht1->recv_buffer_length = buflen; + //buflen = strlen(TEST_DHT1_REGISTER_TYPE_BUFFER); + //memcpy(dht1->recv_buffer, TEST_DHT1_REGISTER_TYPE_BUFFER, buflen); + //dht1->recv_buffer_length = buflen; - err = ks_dht_process(dht1, ep1, &raddr); - ok(err == KS_STATUS_SUCCESS); + //err = ks_dht_process(dht1, ep1, &raddr); + //ok(err == KS_STATUS_SUCCESS); - ks_dht_pulse(dht1, 100); + //ks_dht_pulse(dht1, 100); - ks_dht_pulse(&dht2, 100); + //ks_dht_pulse(&dht2, 100); //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); @@ -115,20 +142,39 @@ int main() { //ok(err == KS_STATUS_SUCCESS); - diag("Ping tests\n"); + diag("Ping test\n"); - ks_dht_send_ping(dht1, ep1, &raddr); + ks_dht_send_ping(&dht2, ep2, &raddr1); // Queue ping from dht2 to dht1 - ks_dht_pulse(dht1, 100); + ks_dht_pulse(&dht2, 100); // Send queued ping from dht2 to dht1 - ks_dht_pulse(&dht2, 100); + ks_dht_pulse(dht1, 100); // Receive and process ping query from dht2, queue and send ping response - ks_dht_pulse(dht1, 100); + ks_dht_pulse(&dht2, 100); // Receive and process ping response from dht1 + // Test blind find_node from dht3 to dht1 to find dht2 nodeid + + diag("Find_Node test\n"); + + ks_dht_send_findnode(dht3, ep3, &raddr1, &ep2->nodeid); // Queue findnode from dht3 to dht1 + + ks_dht_pulse(dht3, 100); // Send queued findnode from dht3 to dht1 + + ks_dht_pulse(dht1, 100); // Receive and process findnode query from dht3, queue and send findnode response + + ks_dht_pulse(dht3, 100); // Receive and process findnode response from dht1 + + ok(ks_dhtrt_find_node(dht3->rt_ipv4, ep2->nodeid) != NULL); diag("Cleanup\n"); /* Cleanup and shutdown */ + err = ks_dht_deinit(dht3); + ok(err == KS_STATUS_SUCCESS); + + err = ks_dht_free(&dht3); + ok(err == KS_STATUS_SUCCESS); + err = ks_dht_deinit(&dht2); ok(err == KS_STATUS_SUCCESS);