diff --git a/libs/libks/src/dht/ks_dht-int.h b/libs/libks/src/dht/ks_dht-int.h index 57a1fff882..fbb434f114 100644 --- a/libs/libks/src/dht/ks_dht-int.h +++ b/libs/libks/src/dht/ks_dht-int.h @@ -6,21 +6,98 @@ KS_BEGIN_EXTERN_C /** - * + * Determines the appropriate endpoint to reach a remote address. + * If an endpoint is provided, nothing more needs to be done. + * If no endpoint is provided, first it will check for an active endpoint it can route though. + * If no active endpoint is available and autorouting is enabled it will attempt to bind a usable endpoint. + * @param dht pointer to the dht instance + * @param raddr pointer to the remote address + * @param endpoint dereferenced in/out pointer to the endpoint, if populated then returns immediately + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_ip_route + * @see ks_hash_read_unlock + * @see ks_addr_set + * @see ks_dht_bind + */ +KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint); + +/** + * Called internally to expire various data. + * Handles purging of expired and finished transactions, rotating token secrets, etc. + * @param dht pointer to the dht instance + */ +KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht); + +/** + * Called internally to send queued messages. + * Handles throttling of message sending to ensure system buffers are not overloaded and messages are not dropped. + * @param dht pointer to the dht instance + */ +KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht); + +/** + * Converts a ks_dht_nodeid_t into it's hex string representation. + * @param id pointer to the nodeid + * @param buffer pointer to the buffer able to contain at least (KS_DHT_NODEID_SIZE * 2) + 1 characters + * @return The pointer to the front of the populated string buffer + */ +KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer); + +/** + * Compacts address information as per the DHT specifications. + * @param address pointer to the address being compacted from + * @param buffer pointer to the buffer containing compacted data + * @param buffer_length pointer to the buffer length consumed + * @param buffer_size max size of the buffer + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM */ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address, uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size); + +/** + * Expands address information as per the DHT specifications. + * @param buffer pointer to the buffer containing compacted data + * @param buffer_length pointer to the buffer length consumed + * @param buffer_size max size of the buffer + * @param address pointer to the address being expanded into + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ... + * @see ks_addr_set_raw + */ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size, ks_sockaddr_t *address); + +/** + * Compacts node information as per the DHT specifications. + * Compacts address information after the nodeid. + * @param nodeid pointer to the nodeid being compacted from + * @param address pointer to the address being compacted from + * @param buffer pointer to the buffer containing compacted data + * @param buffer_length pointer to the buffer length consumed + * @param buffer_size max size of the buffer + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ... + * @see ks_dht_utility_compact_addressinfo + */ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *address, uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size); + +/** + * Expands address information as per the DHT specifications. + * Expands compacted address information after the nodeid. + * @param buffer pointer to the buffer containing compacted data + * @param buffer_length pointer to the buffer length consumed + * @param buffer_size max size of the buffer + * @param address pointer to the address being expanded into + * @param nodeid pointer to the nodeid being expanded into + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM, ... + * @see ks_dht_utility_expand_addressinfo + */ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size, @@ -28,12 +105,107 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_sockaddr_t *address); /** - * + * Extracts a ks_dht_nodeid_t from a bencode dictionary given a string key. + * @param args pointer to the bencode dictionary + * @param key string key in the bencode dictionary to extract the value from + * @param nodeid dereferenced out pointer to the nodeid + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_ARG_INVALID */ -KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht); -KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht); +KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, const char *key, ks_dht_nodeid_t **nodeid); +/** + * Extracts a ks_dht_token_t from a bencode dictionary given a string key. + * @param args pointer to the bencode dictionary + * @param key string key in the bencode dictionary to extract the value from + * @param nodeid dereferenced out pointer to the token + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_ARG_INVALID + */ +KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const char *key, ks_dht_token_t **token); + +/** + * Generates an opaque write token based on a shifting secret value, the remote address and target nodeid of interest. + * This token ensures that future operations can be verified to the remote peer and target id requested. + * @param secret rotating secret portion of the token hash + * @param raddr pointer to the remote address used for the ip and port in the token hash + * @param target pointer to the nodeid of the target used for the token hash + * @param token pointer to the output token being generated + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL + */ +KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); + +/** + * Verify an opaque write token matches the provided remote address and target nodeid. + * Handles checking against the last two secret values for the token hash. + * @param dht pointer to the dht instance + * @param raddr pointer to the remote address used for the ip and port in the token hash + * @param target pointer to the nodeid of the target used for the token hash + * @param token pointer to the input token being compared + * @return Either KS_TRUE if verification passes, otherwise KS_FALSE + */ +KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token); + +/** + * Encodes a message for transmission as a UDP datagram and sends it. + * Uses the internally tracked local endpoint and remote address to route the UDP datagram. + * @param dht pointer to the dht instance + * @param message pointer to the message being sent + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_socket_sendto + */ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message); + +/** + * Sets up the common parts of a query message. + * Determines the local endpoint aware of autorouting, assigns the remote address, generates a transaction, and queues a callback. + * @param dht pointer to the dht instance + * @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one + * @param raddr pointer to the remote address + * @param query string value of the query type, for example "ping" + * @param callback callback to be called when response to transaction is received + * @param message dereferenced out pointer to the allocated message + * @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ... + * @see ks_dht_autoroute_check + * @see ks_dht_transaction_alloc + * @see ks_dht_transaction_init + * @see ks_dht_message_alloc + * @see ks_dht_message_init + * @see ks_dht_message_query + * @see ks_hash_insert + */ +KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, + ks_dht_endpoint_t *ep, + ks_sockaddr_t *raddr, + const char *query, + ks_dht_message_callback_t callback, + ks_dht_message_t **message, + struct bencode **args); + +/** + * Sets up the common parts of a response message. + * Determines the local endpoint aware of autorouting, assigns the remote address, and assigns the transaction. + * @param dht pointer to the dht instance + * @param ep pointer to the endpoint, may be NULL to find an endpoint or autoroute one + * @param raddr pointer to the remote address + * @param transactionid pointer to the buffer containing the transactionid, may be of variable size depending on the querying node + * @param transactionid_length length of the transactionid buffer + * @param message dereferenced out pointer to the allocated message + * @param args dereferenced out pointer to the allocated bencode args, may be NULL to ignore output + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_dht_autoroute_check + * @see ks_dht_message_alloc + * @see ks_dht_message_init + * @see ks_dht_message_response + */ +KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, + ks_dht_endpoint_t *ep, + ks_sockaddr_t *raddr, + uint8_t *transactionid, + ks_size_t transactionid_length, + ks_dht_message_t **message, + struct bencode **args); + + KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index f1372c65ca..ac341c85ab 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -2,250 +2,426 @@ #include "ks_dht-int.h" #include "sodium.h" -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool) { ks_bool_t pool_alloc = !pool; ks_dht_t *d; ks_assert(dht); - - if (pool_alloc) ks_pool_open(&pool); - *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t)); + /** + * Create a new internally managed pool if one wasn't provided, and returns KS_STATUS_NO_MEM if pool was not created. + */ + if (pool_alloc) ks_pool_open(&pool); + if (!pool) return KS_STATUS_NO_MEM; + + /** + * Allocate the dht instance from the pool, and returns KS_STATUS_NO_MEM if the dht was not created. + */ + *dht = d = ks_pool_alloc(pool, sizeof(ks_dht_t)); + if (!d) return KS_STATUS_NO_MEM; + + /** + * Keep track of the pool used for future allocations and cleanup. + * Keep track of whether the pool was created internally or not. + */ d->pool = pool; d->pool_alloc = pool_alloc; return KS_STATUS_SUCCESS; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool) +KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool) { ks_assert(dht); ks_assert(pool); + /** + * Treat preallocate function like allocate, zero the memory like pool allocations do. + */ memset(dht, 0, sizeof(ks_dht_t)); + /** + * Keep track of the pool used for future allocations, pool must + */ dht->pool = pool; dht->pool_alloc = KS_FALSE; - - return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht) { - ks_pool_t *pool; - ks_bool_t pool_alloc; + ks_pool_t *pool = NULL; + ks_bool_t pool_alloc = KS_FALSE; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(*dht); + + /** + * Call ks_dht_deinit to ensure everything has been cleaned up internally. + * The pool member variables must not be messed with in deinit, they are managed at the allocator layer. + */ + if ((ret = ks_dht_deinit(*dht)) != KS_STATUS_SUCCESS) return ret; + /** + * Temporarily store the allocator level variables because freeing the dht instance will invalidate it. + */ pool = (*dht)->pool; pool_alloc = (*dht)->pool_alloc; + + /** + * Free the dht instance from the pool, after this the dht instance memory is invalid. + */ + if ((ret = ks_pool_free((*dht)->pool, *dht)) != KS_STATUS_SUCCESS) return ret; - ks_dht_deinit(*dht); - ks_pool_free(pool, *dht); - if (pool_alloc) { - ks_pool_close(&pool); - } - + /** + * At this point dht instance is invalidated so NULL the pointer. + */ *dht = NULL; + /** + * If the pool was allocated internally, destroy it using the temporary variables stored earlier. + * If this fails, something catastrophically bad happened like memory corruption. + */ + if (pool_alloc && (ret = ks_pool_close(&pool)) != KS_STATUS_SUCCESS) return ret; + + return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht) { + ks_status_t ret = KS_STATUS_SUCCESS; + ks_assert(dht); ks_assert(dht->pool); + /** + * Default autorouting to disabled. + */ dht->autoroute = KS_FALSE; dht->autoroute_port = 0; - ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + /** + * Create the message type registry. + */ + if ((ret = ks_hash_create(&dht->registry_type, + KS_HASH_MODE_DEFAULT, + KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; + + /** + * Register the message type callbacks for query (q), response (r), and error (e) + */ ks_dht_register_type(dht, "q", ks_dht_process_query); ks_dht_register_type(dht, "r", ks_dht_process_response); ks_dht_register_type(dht, "e", ks_dht_process_error); - ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + /** + * Create the message query registry. + */ + if ((ret = ks_hash_create(&dht->registry_query, + KS_HASH_MODE_DEFAULT, + KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; + + /** + * Register the message query callbacks for ping, find_node, etc. + */ ks_dht_register_query(dht, "ping", ks_dht_process_query_ping); ks_dht_register_query(dht, "find_node", ks_dht_process_query_findnode); ks_dht_register_query(dht, "get", ks_dht_process_query_get); ks_dht_register_query(dht, "put", ks_dht_process_query_put); - ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + /** + * Create the message error registry. + */ + if ((ret = ks_hash_create(&dht->registry_error, + KS_HASH_MODE_DEFAULT, + KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; // @todo register 301 error for internal get/put CAS hash mismatch retry handler + /** + * Default these to FALSE, binding will set them TRUE when a respective address is bound. + * @todo these may not be useful anymore they are from legacy code + */ dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; + /** + * Initialize the data used to track endpoints to NULL, binding will handle latent allocations. + * The endpoints and endpoints_poll arrays are maintained in parallel to optimize polling. + */ dht->endpoints = NULL; dht->endpoints_size = 0; - ks_hash_create(&dht->endpoints_hash, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK, dht->pool); dht->endpoints_poll = NULL; + /** + * Create the endpoints hash for fast lookup, this is used to route externally provided remote addresses when the local endpoint is unknown. + * This also provides the basis for autorouting to find unbound interfaces and bind them at runtime. + * This hash uses the host ip string concatenated with a colon and the port, ie: "123.123.123.123:123" or ipv6 equivilent + */ + if ((ret = ks_hash_create(&dht->endpoints_hash, + KS_HASH_MODE_DEFAULT, + KS_HASH_FLAG_RWLOCK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; + + /** + * Default expirations to not be checked for one pulse. + */ dht->pulse_expirations = ks_time_now_sec() + KS_DHT_PULSE_EXPIRATIONS; - ks_q_create(&dht->send_q, dht->pool, 0); + /** + * Create the queue for outgoing messages, this ensures sending remains async and can be throttled when system buffers are full. + */ + if ((ret = ks_q_create(&dht->send_q, dht->pool, 0)) != KS_STATUS_SUCCESS) return ret; + + /** + * If a message is popped from the queue for sending but the system buffers are too full, this is used to temporarily store the message. + */ dht->send_q_unsent = NULL; + + /** + * The dht uses a single internal large receive buffer for receiving all frames, this may change in the future to offload processing to a threadpool. + */ dht->recv_buffer_length = 0; + /** + * Initialize the first transaction id randomly, this doesn't really matter. + */ dht->transactionid_next = 1; //rand(); - ks_hash_create(&dht->transactions_hash, KS_HASH_MODE_INT, KS_HASH_FLAG_RWLOCK, dht->pool); + /** + * Create the hash to track pending transactions on queries that are pending responses. + * It should be impossible to receive a duplicate transaction id in the hash before it expires, but if it does an error is preferred. + */ + if ((ret = ks_hash_create(&dht->transactions_hash, + KS_HASH_MODE_INT, + KS_HASH_FLAG_RWLOCK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; + + /** + * The internal route tables will be latent allocated when binding. + */ dht->rt_ipv4 = NULL; dht->rt_ipv6 = NULL; + /** + * The opaque write tokens require some entropy for generating which needs to change periodically but accept tokens using the last two secrets. + */ dht->token_secret_current = dht->token_secret_previous = rand(); dht->token_secret_expiration = ks_time_now_sec() + KS_DHT_TOKENSECRET_EXPIRATION; - ks_hash_create(&dht->storage_hash, KS_HASH_MODE_ARBITRARY, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); + /** + * Create the hash to store arbitrary data for BEP44. + */ + if ((ret = ks_hash_create(&dht->storage_hash, + KS_HASH_MODE_ARBITRARY, + KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, + dht->pool)) != KS_STATUS_SUCCESS) return ret; + /** + * The storage hash uses arbitrary key size, which requires the key size be provided, they are the same size as nodeid's. + */ ks_hash_set_keysize(dht->storage_hash, KS_DHT_NODEID_SIZE); return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht) { ks_hash_iterator_t *it; + ks_status_t ret = KS_STATUS_SUCCESS; + ks_assert(dht); + /** + * Cleanup the storage hash and it's contents if it is allocated. + */ if (dht->storage_hash) { for (it = ks_hash_first(dht->storage_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) { const void *key; ks_dht_storageitem_t *val; ks_hash_this(it, &key, NULL, (void **)&val); - ks_dht_storageitem_deinit(val); - ks_dht_storageitem_free(&val); + if ((ret = ks_dht_storageitem_deinit(val)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_storageitem_free(&val)) != KS_STATUS_SUCCESS) return ret; } ks_hash_destroy(&dht->storage_hash); } + /** + * Zero out the opaque write token variables. + */ dht->token_secret_current = 0; dht->token_secret_previous = 0; dht->token_secret_expiration = 0; + /** + * Cleanup the route tables if they are allocated. + */ if (dht->rt_ipv4) ks_dhtrt_deinitroute(&dht->rt_ipv4); if (dht->rt_ipv6) ks_dhtrt_deinitroute(&dht->rt_ipv6); + /** + * Cleanup the transactions hash if it is allocated. + */ dht->transactionid_next = 0; if (dht->transactions_hash) ks_hash_destroy(&dht->transactions_hash); + /** + * Probably don't need this, recv_buffer_length is temporary and may change + */ dht->recv_buffer_length = 0; + /** + * Cleanup the send queue and it's contents if it is allocated. + */ if (dht->send_q) { ks_dht_message_t *msg; while (ks_q_pop_timeout(dht->send_q, (void **)&msg, 1) == KS_STATUS_SUCCESS && msg) { - ks_dht_message_deinit(msg); - ks_dht_message_free(&msg); + if ((ret = ks_dht_message_deinit(msg)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_message_free(&msg)) != KS_STATUS_SUCCESS) return ret; } - ks_q_destroy(&dht->send_q); + if ((ret = ks_q_destroy(&dht->send_q)) != KS_STATUS_SUCCESS) return ret; } + + /** + * Cleanup the cached popped message if it is set. + */ if (dht->send_q_unsent) { - ks_dht_message_deinit(dht->send_q_unsent); - ks_dht_message_free(&dht->send_q_unsent); + if ((ret = ks_dht_message_deinit(dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_message_free(&dht->send_q_unsent)) != KS_STATUS_SUCCESS) return ret; } + /** + * Probably don't need this + */ dht->pulse_expirations = 0; + /** + * Cleanup any endpoints that have been allocated. + */ for (int32_t i = 0; i < dht->endpoints_size; ++i) { ks_dht_endpoint_t *ep = dht->endpoints[i]; - ks_dht_endpoint_deinit(ep); - ks_dht_endpoint_free(&ep); + if ((ret = ks_dht_endpoint_deinit(ep)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_endpoint_free(&ep)) != KS_STATUS_SUCCESS) return ret; } dht->endpoints_size = 0; + + /** + * Cleanup the array of endpoint pointers if it is allocated. + */ if (dht->endpoints) { - ks_pool_free(dht->pool, dht->endpoints); + if ((ret = ks_pool_free(dht->pool, dht->endpoints)) != KS_STATUS_SUCCESS) return ret; dht->endpoints = NULL; } + /** + * Cleanup the array of endpoint polling data if it is allocated. + */ if (dht->endpoints_poll) { - ks_pool_free(dht->pool, dht->endpoints_poll); + if ((ret = ks_pool_free(dht->pool, dht->endpoints_poll)) != KS_STATUS_SUCCESS) return ret; dht->endpoints_poll = NULL; } + + /** + * Cleanup the endpoints hash if it is allocated. + */ if (dht->endpoints_hash) ks_hash_destroy(&dht->endpoints_hash); + /** + * Probably don't need this + */ dht->bind_ipv4 = KS_FALSE; dht->bind_ipv6 = KS_FALSE; + /** + * Cleanup the type, query, and error registries if they have been allocated. + */ if (dht->registry_type) ks_hash_destroy(&dht->registry_type); if (dht->registry_query) ks_hash_destroy(&dht->registry_query); if (dht->registry_error) ks_hash_destroy(&dht->registry_error); + /** + * Probably don't need this + */ dht->autoroute = KS_FALSE; dht->autoroute_port = 0; return KS_STATUS_SUCCESS; } -/** - * - */ -KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port) +KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port) { ks_assert(dht); + /** + * If autorouting is being disabled, port is always set to zero, otherwise if the port is zero use the DHT default port + */ if (!autoroute) port = 0; else if (port <= 0) port = KS_DHT_DEFAULT_PORT; + /** + * Set the autoroute state + */ dht->autoroute = autoroute; dht->autoroute_port = port; - - return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_autoroute_check(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_endpoint_t **endpoint) { // @todo lookup standard def for IPV6 max size char ip[48]; ks_dht_endpoint_t *ep = NULL; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(raddr); ks_assert(endpoint); - *endpoint = NULL; + /** + * If the endpoint is already provided just leave it alone and return successfully. + */ + if (*endpoint) return KS_STATUS_SUCCESS; - ks_ip_route(ip, sizeof(ip), raddr->host); + /** + * Use the remote address to figure out what local address we should use to attempt contacting it. + */ + if ((ret = ks_ip_route(ip, sizeof(ip), raddr->host)) != KS_STATUS_SUCCESS) return ret; + /** + * Check if the endpoint has already been bound for the address we want to route through. + * @todo ip:port for key to allow a single ip with multiple endpoints on different ports + */ ep = ks_hash_search(dht->endpoints_hash, ip, KS_READLOCKED); - ks_hash_read_unlock(dht->endpoints_hash); + if ((ret = ks_hash_read_unlock(dht->endpoints_hash)) != KS_STATUS_SUCCESS) return ret; + /** + * If the endpoint has not been bound, and autorouting is enabled then try to bind the new address. + */ if (!ep && dht->autoroute) { ks_sockaddr_t addr; - ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family); - if (ks_dht_bind(dht, NULL, &addr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if ((ret = ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family)) != KS_STATUS_SUCCESS) return ret; + if ((ret = ks_dht_bind(dht, NULL, &addr, &ep)) != KS_STATUS_SUCCESS) return ret; } + /** + * If no endpoint can be found to route through then all hope is lost, bail out with a failure. + */ if (!ep) { ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host); return KS_STATUS_FAIL; } + /** + * Reaching here means an endpoint is available, assign it and return successfully. + */ + *endpoint = ep; + return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); @@ -255,9 +431,6 @@ KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, k return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); @@ -267,9 +440,6 @@ KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback) { ks_assert(dht); @@ -279,77 +449,128 @@ KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; } -/** - * - */ + KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint) { - ks_dht_endpoint_t *ep; - ks_socket_t sock; - int32_t epindex; + ks_dht_endpoint_t *ep = NULL; + ks_socket_t sock = KS_SOCK_INVALID; + int32_t epindex = 0; + ks_status_t ret = KS_STATUS_SUCCESS; ks_assert(dht); ks_assert(addr); ks_assert(addr->family == AF_INET || addr->family == AF_INET6); ks_assert(addr->port); + /** + * If capturing the endpoint output, make sure it is set NULL to start with. + */ if (endpoint) *endpoint = NULL; + /** + * Legacy code, this can probably go away + */ dht->bind_ipv4 |= addr->family == AF_INET; dht->bind_ipv6 |= addr->family == AF_INET6; + /** + * Attempt to open a UDP datagram socket for the given address family. + */ if ((sock = socket(addr->family, SOCK_DGRAM, IPPROTO_UDP)) == KS_SOCK_INVALID) return KS_STATUS_FAIL; + /** + * Set some common socket options for non-blocking IO and forced binding when already in use + */ + if ((ret = ks_socket_option(sock, SO_REUSEADDR, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_socket_option(sock, KS_SO_NONBLOCK, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; + + /** + * Attempt to bind the socket to the desired local address. + */ // @todo shouldn't ks_addr_bind take a const addr *? - if (ks_addr_bind(sock, (ks_sockaddr_t *)addr) != KS_STATUS_SUCCESS) { - ks_socket_close(&sock); - return KS_STATUS_FAIL; - } + if ((ret = ks_addr_bind(sock, (ks_sockaddr_t *)addr)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_endpoint_alloc(&ep, dht->pool) != KS_STATUS_SUCCESS) { - ks_socket_close(&sock); - return KS_STATUS_FAIL; - } - - if (ks_dht_endpoint_init(ep, nodeid, addr, sock) != KS_STATUS_SUCCESS) { - ks_dht_endpoint_free(&ep); - ks_socket_close(&sock); - return KS_STATUS_FAIL; - } - - ks_socket_option(ep->sock, SO_REUSEADDR, KS_TRUE); - ks_socket_option(ep->sock, KS_SO_NONBLOCK, KS_TRUE); + /** + * Allocate the endpoint to track the local socket. + */ + if ((ret = ks_dht_endpoint_alloc(&ep, dht->pool)) != KS_STATUS_SUCCESS) goto done; + /** + * Initialize the node, may provide NULL nodeid to have one generated internally. + */ + if ((ret = ks_dht_endpoint_init(ep, nodeid, addr, sock)) != KS_STATUS_SUCCESS) goto done; + /** + * Resize the endpoints array to take another endpoint pointer. + */ epindex = dht->endpoints_size++; dht->endpoints = (ks_dht_endpoint_t **)ks_pool_resize(dht->pool, (void *)dht->endpoints, sizeof(ks_dht_endpoint_t *) * dht->endpoints_size); dht->endpoints[epindex] = ep; - ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep); + /** + * Add the new endpoint into the endpoints hash for quick lookups. + * @todo ip:port for key to allow a single ip with multiple endpoints on different ports + */ + if (!ks_hash_insert(dht->endpoints_hash, ep->addr.host, ep)) { + ret = KS_STATUS_FAIL; + goto done; + } + + /** + * Resize the endpoints_poll array to keep in parallel with endpoints array, populate new entry with the right data. + */ dht->endpoints_poll = (struct pollfd *)ks_pool_resize(dht->pool, (void *)dht->endpoints_poll, sizeof(struct pollfd) * dht->endpoints_size); dht->endpoints_poll[epindex].fd = ep->sock; dht->endpoints_poll[epindex].events = POLLIN | POLLERR; + /** + * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint. + */ if (ep->addr.family == AF_INET) { - if (!dht->rt_ipv4) ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool); - ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node); + if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_create_node(dht->rt_ipv4, + ep->nodeid, + ks_dht_local_t, + ep->addr.host, + ep->addr.port, + &ep->node)) != KS_STATUS_SUCCESS) goto done; } else { - if (!dht->rt_ipv6) ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); - ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, ks_dht_local_t, ep->addr.host, ep->addr.port, &ep->node); + if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, + ep->nodeid, + ks_dht_local_t, + ep->addr.host, + ep->addr.port, + &ep->node)) != KS_STATUS_SUCCESS) goto done; } + /** + * If the endpoint output is being captured, assign it and return successfully. + */ if (endpoint) *endpoint = ep; - return KS_STATUS_SUCCESS; + ret = KS_STATUS_SUCCESS; + + done: + if (ret != KS_STATUS_SUCCESS) { + /** + * If any failures occur, we need to make sure the socket is properly closed. + * This will be done in ks_dht_endpoint_deinit only if the socket was assigned during a successful ks_dht_endpoint_init. + * Then return whatever failure condition resulted in landed here. + */ + if (sock != KS_SOCK_INVALID && ep && ep->sock == KS_SOCK_INVALID) ks_socket_close(&sock); + if (ep) { + ks_dht_endpoint_deinit(ep); + ks_dht_endpoint_free(&ep); + } + } + return ret; } -/** - * - */ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) { int32_t result; @@ -387,9 +608,6 @@ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout) if (dht->rt_ipv6) ks_dhtrt_process_table(dht->rt_ipv6); } -/** - * - */ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) { ks_hash_iterator_t *it = NULL; @@ -427,9 +645,6 @@ KS_DECLARE(void) ks_dht_pulse_expirations(ks_dht_t *dht) } } -/** - * - */ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) { ks_dht_message_t *message; @@ -456,10 +671,7 @@ KS_DECLARE(void) ks_dht_pulse_send(ks_dht_t *dht) } } -/** - * - */ -static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) +KS_DECLARE(char *) ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) { char *t = buffer; @@ -473,9 +685,6 @@ static char *ks_dht_hexid(ks_dht_nodeid_t *id, char *buffer) return buffer; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t *address, uint8_t *buffer, ks_size_t *buffer_length, @@ -495,7 +704,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t * if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) { ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); - return KS_STATUS_FAIL; + return KS_STATUS_NO_MEM; } if (address->family == AF_INET) { @@ -514,9 +723,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_addressinfo(const ks_sockaddr_t * return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size, @@ -532,7 +738,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, ks_assert(address->family == AF_INET ||address->family == AF_INET6); addr_len = address->family == AF_INET ? sizeof(uint32_t) : (sizeof(uint16_t) * 8); - if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_FAIL; + if (*buffer_length + addr_len + sizeof(uint16_t) > buffer_size) return KS_STATUS_NO_MEM; paddr = buffer + *buffer_length; *buffer_length += addr_len; @@ -540,14 +746,9 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_addressinfo(const uint8_t *buffer, *buffer_length += sizeof(uint16_t); // @todo ks_addr_set_raw second parameter should be const? - ks_addr_set_raw(address, (void *)paddr, port, address->family); - - return KS_STATUS_SUCCESS; + return ks_addr_set_raw(address, (void *)paddr, port, address->family); } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *address, uint8_t *buffer, @@ -562,7 +763,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) { ks_log(KS_LOG_DEBUG, "Insufficient space remaining for compacting\n"); - return KS_STATUS_FAIL; + return KS_STATUS_NO_MEM; } memcpy(buffer + (*buffer_length), (void *)nodeid, KS_DHT_NODEID_SIZE); @@ -571,9 +772,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_compact_nodeinfo(const ks_dht_nodeid_t *n return ks_dht_utility_compact_addressinfo(address, buffer, buffer_length, buffer_size); } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_size_t *buffer_length, ks_size_t buffer_size, @@ -586,7 +784,7 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, ks_assert(address); ks_assert(address->family == AF_INET ||address->family == AF_INET6); - if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_FAIL; + if (*buffer_length + KS_DHT_NODEID_SIZE > buffer_size) return KS_STATUS_NO_MEM; memcpy(nodeid->id, buffer, KS_DHT_NODEID_SIZE); *buffer_length += KS_DHT_NODEID_SIZE; @@ -594,9 +792,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_expand_nodeinfo(const uint8_t *buffer, return ks_dht_utility_expand_addressinfo(buffer, buffer_length, buffer_size, address); } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, const char *key, ks_dht_nodeid_t **nodeid) { struct bencode *id; @@ -612,14 +807,14 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons id = ben_dict_get_by_str(args, key); if (!id) { ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); - return KS_STATUS_FAIL; + return KS_STATUS_ARG_INVALID; } idv = ben_str_val(id); idv_len = ben_str_len(id); if (idv_len != KS_DHT_NODEID_SIZE) { ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, idv_len); - return KS_STATUS_FAIL; + return KS_STATUS_ARG_INVALID; } *nodeid = (ks_dht_nodeid_t *)idv; @@ -627,9 +822,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_nodeid(struct bencode *args, cons return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const char *key, ks_dht_token_t **token) { struct bencode *tok; @@ -645,14 +837,14 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const tok = ben_dict_get_by_str(args, key); if (!tok) { ks_log(KS_LOG_DEBUG, "Message args missing key '%s'\n", key); - return KS_STATUS_FAIL; + return KS_STATUS_ARG_INVALID; } tokv = ben_str_val(tok); tokv_len = ben_str_len(tok); if (tokv_len != KS_DHT_TOKEN_SIZE) { ks_log(KS_LOG_DEBUG, "Message args '%s' value has an unexpected size of %d\n", key, tokv_len); - return KS_STATUS_FAIL; + return KS_STATUS_ARG_INVALID; } *token = (ks_dht_token_t *)tokv; @@ -661,9 +853,6 @@ KS_DECLARE(ks_status_t) ks_dht_utility_extract_token(struct bencode *args, const } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) { SHA_CTX sha; @@ -677,35 +866,30 @@ KS_DECLARE(ks_status_t) ks_dht_token_generate(uint32_t secret, ks_sockaddr_t *ra secret = htonl(secret); port = htons(raddr->port); - SHA1_Init(&sha); - SHA1_Update(&sha, &secret, sizeof(uint32_t)); - SHA1_Update(&sha, raddr->host, strlen(raddr->host)); - SHA1_Update(&sha, &port, sizeof(uint16_t)); - SHA1_Update(&sha, target->id, KS_DHT_NODEID_SIZE); - SHA1_Final(token->token, &sha); + if (!SHA1_Init(&sha) || + !SHA1_Update(&sha, &secret, sizeof(uint32_t)) || + !SHA1_Update(&sha, raddr->host, strlen(raddr->host)) || + !SHA1_Update(&sha, &port, sizeof(uint16_t)) || + !SHA1_Update(&sha, target->id, KS_DHT_NODEID_SIZE) || + !SHA1_Final(token->token, &sha)) return KS_STATUS_FAIL; return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_bool_t) ks_dht_token_verify(ks_dht_t *dht, ks_sockaddr_t *raddr, ks_dht_nodeid_t *target, ks_dht_token_t *token) { ks_dht_token_t tok; - ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok); + if (ks_dht_token_generate(dht->token_secret_current, raddr, target, &tok) != KS_STATUS_SUCCESS) return KS_FALSE; - if (!memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE)) return KS_TRUE; + if (memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0) return KS_TRUE; - ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok); + if (ks_dht_token_generate(dht->token_secret_previous, raddr, target, &tok) != KS_STATUS_SUCCESS) return KS_FALSE; return memcmp(tok.token, token->token, KS_DHT_TOKEN_SIZE) == 0; } -/** - * - */ + KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) { // @todo calculate max IPV6 payload size? @@ -727,9 +911,7 @@ KS_DECLARE(ks_status_t) ks_dht_send(ks_dht_t *dht, ks_dht_message_t *message) return ks_socket_sendto(message->endpoint->sock, (void *)buf, &buf_len, &message->raddr); } -/** - * - */ + KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, @@ -751,24 +933,27 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, *message = NULL; - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; // @todo atomic increment or mutex transactionid = dht->transactionid_next++; - if (ks_dht_transaction_alloc(&trans, dht->pool) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_transaction_alloc(&trans, dht->pool)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_transaction_init(trans, raddr, transactionid, callback) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_transaction_init(trans, raddr, transactionid, callback)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_query(msg, transactionid, query, args) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_query(msg, transactionid, query, args)) != KS_STATUS_SUCCESS) goto done; *message = msg; - ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans); + if (!ks_hash_insert(dht->transactions_hash, (void *)&trans->transactionid, trans)) { + ret = KS_STATUS_FAIL; + goto done; + } ret = KS_STATUS_SUCCESS; @@ -787,9 +972,6 @@ KS_DECLARE(ks_status_t) ks_dht_setup_query(ks_dht_t *dht, return ret; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, @@ -808,13 +990,13 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, *message = NULL; - if (!ep && ks_dht_autoroute_check(dht, raddr, &ep) != KS_STATUS_SUCCESS) return KS_STATUS_FAIL; + if (!ep && (ret = ks_dht_autoroute_check(dht, raddr, &ep)) != KS_STATUS_SUCCESS) return ret; - if (ks_dht_message_alloc(&msg, dht->pool) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_alloc(&msg, dht->pool)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_init(msg, ep, raddr, KS_TRUE) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_init(msg, ep, raddr, KS_TRUE)) != KS_STATUS_SUCCESS) goto done; - if (ks_dht_message_response(msg, transactionid, transactionid_length, args) != KS_STATUS_SUCCESS) goto done; + if ((ret = ks_dht_message_response(msg, transactionid, transactionid_length, args)) != KS_STATUS_SUCCESS) goto done; *message = msg; @@ -829,9 +1011,7 @@ KS_DECLARE(ks_status_t) ks_dht_setup_response(ks_dht_t *dht, return ret; } -/** - * - */ + KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) { ks_dht_message_t message; @@ -867,9 +1047,6 @@ KS_DECLARE(ks_status_t) ks_dht_process(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_ return ret; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *message) { struct bencode *q; @@ -919,9 +1096,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query(ks_dht_t *dht, ks_dht_message_t *me return ret; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t *message) { struct bencode *r; @@ -966,10 +1140,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response(ks_dht_t *dht, ks_dht_message_t } - -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks_dht_search_callback_t callback) { ks_assert(dht); @@ -1007,10 +1177,6 @@ KS_DECLARE(ks_status_t) ks_dht_search(ks_dht_t *dht, ks_dht_nodeid_t *id) //, ks } - -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, @@ -1052,9 +1218,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_error(ks_dht_t *dht, return ret; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *message) { struct bencode *e; @@ -1127,9 +1290,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_error(ks_dht_t *dht, ks_dht_message_t *me } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr) { ks_dht_message_t *message = NULL; @@ -1148,9 +1308,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_ping(ks_dht_t *dht, ks_dht_endpoint_t *ep, k return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1192,9 +1349,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_ping(ks_dht_t *dht, ks_dht_message_ return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1218,9 +1372,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_ping(ks_dht_t *dht, ks_dht_messa } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) { ks_dht_message_t *message = NULL; @@ -1241,9 +1392,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_findnode(ks_dht_t *dht, ks_dht_endpoint_t *e return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1356,9 +1504,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_findnode(ks_dht_t *dht, ks_dht_mess return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1438,9 +1583,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_findnode(ks_dht_t *dht, ks_dht_m } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_dht_nodeid_t *targetid) { ks_dht_message_t *message = NULL; @@ -1462,9 +1604,6 @@ KS_DECLARE(ks_status_t) ks_dht_send_get(ks_dht_t *dht, ks_dht_endpoint_t *ep, ks return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1540,9 +1679,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_get(ks_dht_t *dht, ks_dht_message_t return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1574,9 +1710,8 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_get(ks_dht_t *dht, ks_dht_messag } -/** - * - */ +// @todo ks_dht_send_put + KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; @@ -1617,9 +1752,6 @@ KS_DECLARE(ks_status_t) ks_dht_process_query_put(ks_dht_t *dht, ks_dht_message_t return KS_STATUS_SUCCESS; } -/** - * - */ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_message_t *message) { ks_dht_nodeid_t *id; diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index bb4580a471..c77750d146 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -174,45 +174,176 @@ struct ks_dht_s { }; /** - * + * Allocator function for ks_dht_t. + * Should be used when a ks_dht_t is allocated on the heap, and may provide an external memory pool or allocate one internally. + * @param dht dereferenced out pointer to the allocated dht instance + * @param pool pointer to the memory pool used by the dht instance, may be NULL to create a new pool internally + * @param The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_NO_MEM */ KS_DECLARE(ks_status_t) ks_dht_alloc(ks_dht_t **dht, ks_pool_t *pool); -KS_DECLARE(ks_status_t) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool); + +/** + * Preallocator function for ks_dht_t. + * Should be used when a ks_dht_t is preallocated on the stack or within another structure, and must provide an external memory pool. + * @param dht pointer to the dht instance + * @param pool pointer to the memory pool used by the dht instance + */ +KS_DECLARE(void) ks_dht_prealloc(ks_dht_t *dht, ks_pool_t *pool); + +/** + * Deallocator function for ks_dht_t. + * Must be used when a ks_dht_t is allocated using ks_dht_alloc, will also destroy memory pool if it was created internally. + * @param dht dereferenced in/out pointer to the dht instance, NULL upon return + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_dht_deinit + * @see ks_pool_free + * @see ks_pool_close + */ KS_DECLARE(ks_status_t) ks_dht_free(ks_dht_t **dht); - +/** + * Constructor function for ks_dht_t. + * Must be used regardless of how ks_dht_t is allocated, will allocate and initialize internal state including registration of message handlers. + * @param dht pointer to the dht instance + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_hash_create + * @see ks_dht_register_type + * @see ks_q_create + */ KS_DECLARE(ks_status_t) ks_dht_init(ks_dht_t *dht); + +/** + * Destructor function for ks_dht_t. + * Must be used regardless of how ks_dht_t is allocated, will deallocate and deinitialize internal state. + * @param dht pointer to the dht instance + * @return The ks_status_t result: KS_STATUS_SUCCESS, ... + * @see ks_dht_storageitem_deinit + * @see ks_dht_storageitem_free + * @see ks_hash_destroy + * @see ks_dht_message_deinit + * @see ks_dht_message_free + * @see ks_q_destroy + * @see ks_dht_endpoint_deinit + * @see ks_dht_endpoint_free + * @see ks_pool_free + */ KS_DECLARE(ks_status_t) ks_dht_deinit(ks_dht_t *dht); -KS_DECLARE(ks_status_t) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port); +/** + * Enable or disable (default) autorouting support. + * When enabled, autorouting will allow sending to remote addresses on interfaces which are not yet bound. + * The address will be bound with the provided autoroute port when this occurs. + * @param dht pointer to the dht instance + * @param autoroute enable or disable autorouting + * @param port when enabling autorouting this port will be used to bind new addresses, may be 0 to use the default DHT port + */ +KS_DECLARE(void) ks_dht_autoroute(ks_dht_t *dht, ks_bool_t autoroute, ks_port_t port); +/** + * Register a callback for a specific message type. + * Will overwrite any duplicate handlers. + * @param dht pointer to the dht instance + * @param value string of the type text under the 'y' key of a message + * @param callback the callback to be called when a message matches + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL + */ +KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); + +/** + * Register a callback for a specific message query. + * Will overwrite any duplicate handlers. + * @param dht pointer to the dht instance + * @param value string of the type text under the 'q' key of a message + * @param callback the callback to be called when a message matches + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL + */ +KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); + +/** + * Register a callback for a specific message error. + * Will overwrite any duplicate handlers. + * @param dht pointer to the dht instance + * @param value string of the errorcode under the first item of the 'e' key of a message + * @param callback the callback to be called when a message matches + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL + */ +KS_DECLARE(ks_status_t) ks_dht_register_error(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); + +/** + * Bind a local address and port for receiving UDP datagrams. + * @param dht pointer to the dht instance + * @param nodeid pointer to a nodeid for this endpoint, may be NULL to generate one randomly + * @param addr pointer to the remote address information + * @param dereferenced out pointer to the allocated endpoint, may be NULL to ignore endpoint + * @return The ks_status_t result: KS_STATUS_SUCCESS, KS_STATUS_FAIL, ... + * @see ks_socket_option + * @see ks_addr_bind + * @see ks_dht_endpoint_alloc + * @see ks_dht_endpoint_init + * @see ks_hash_insert + * @see ks_dhtrt_initroute + * @see ks_dhtrt_create_node + */ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid, const ks_sockaddr_t *addr, ks_dht_endpoint_t **endpoint); + +/** + * Pulse the internals of dht. + * Handles receiving UDP datagrams, dispatching processing, handles expirations, throttled message sending, route table pulsing, etc. + * @param dht pointer to the dht instance + * @param timeout timeout value used when polling sockets for new UDP datagrams + */ KS_DECLARE(void) ks_dht_pulse(ks_dht_t *dht, int32_t timeout); -KS_DECLARE(ks_status_t) ks_dht_register_type(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); -KS_DECLARE(ks_status_t) ks_dht_register_query(ks_dht_t *dht, const char *value, ks_dht_message_callback_t callback); - /** * */ KS_DECLARE(ks_status_t) ks_dht_message_alloc(ks_dht_message_t **message, ks_pool_t *pool); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_prealloc(ks_dht_message_t *message, ks_pool_t *pool); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_free(ks_dht_message_t **message); +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_init(ks_dht_message_t *message, ks_dht_endpoint_t *ep, ks_sockaddr_t *raddr, ks_bool_t alloc_data); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_deinit(ks_dht_message_t *message); +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_parse(ks_dht_message_t *message, const uint8_t *buffer, ks_size_t buffer_length); +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_query(ks_dht_message_t *message, uint32_t transactionid, const char *query, struct bencode **args); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_response(ks_dht_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, struct bencode **args); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, uint8_t *transactionid, ks_size_t transactionid_length, @@ -221,21 +352,30 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, /** * */ +KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool); /** * */ -KS_DECLARE(ks_status_t) ks_dht_transaction_alloc(ks_dht_transaction_t **transaction, ks_pool_t *pool); KS_DECLARE(ks_status_t) ks_dht_transaction_prealloc(ks_dht_transaction_t *transaction, ks_pool_t *pool); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_transaction_free(ks_dht_transaction_t **transaction); KS_DECLARE(ks_status_t) ks_dht_transaction_init(ks_dht_transaction_t *transaction, ks_sockaddr_t *raddr, uint32_t transactionid, ks_dht_message_callback_t callback); + +/** + * + */ KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transaction); + /** * route table methods * @@ -244,10 +384,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table); KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, - ks_dht_nodeid_t nodeid, - enum ks_dht_nodetype_t type, - char* ip, unsigned short port, - ks_dht_node_t** node); + ks_dht_nodeid_t nodeid, + enum ks_dht_nodetype_t type, + char* ip, unsigned short port, + ks_dht_node_t** node); KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t* table, ks_dht_node_t* node); diff --git a/libs/libks/test/testdht2.c b/libs/libks/test/testdht2.c index eb7ec3a607..d77eee436c 100644 --- a/libs/libks/test/testdht2.c +++ b/libs/libks/test/testdht2.c @@ -59,8 +59,7 @@ int main() { err = ks_dht_init(dht1); ok(err == KS_STATUS_SUCCESS); - err = ks_dht_prealloc(&dht2, dht1->pool); - ok(err == KS_STATUS_SUCCESS); + ks_dht_prealloc(&dht2, dht1->pool); err = ks_dht_init(&dht2); ok(err == KS_STATUS_SUCCESS);