FS-9775: Added support for removing finished transactions via latent purging while expiring

Also added support to send error message responses and updated the test to confirm, errors still need to be updated to send an error responses
This commit is contained in:
Shane Bryldt 2016-12-05 20:43:52 +00:00 committed by Mike Jerris
parent f95ca83b06
commit 7ac7a7e75b
6 changed files with 290 additions and 47 deletions

View File

@ -8,12 +8,21 @@ KS_BEGIN_EXTERN_C
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht); KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht);
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr); KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr);
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message); KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);

View File

@ -75,10 +75,13 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht2_register_type(dht, "q", ks_dht2_process_query); ks_dht2_register_type(dht, "q", ks_dht2_process_query);
ks_dht2_register_type(dht, "r", ks_dht2_process_response); ks_dht2_register_type(dht, "r", ks_dht2_process_response);
// @todo ks_hash_insert the r/e callbacks into type registry ks_dht2_register_type(dht, "e", ks_dht2_process_error);
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool); ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping); ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
// @todo register 301 error for internal get/put CAS hash mismatch retry handler
dht->bind_ipv4 = KS_FALSE; dht->bind_ipv4 = KS_FALSE;
dht->bind_ipv6 = KS_FALSE; dht->bind_ipv6 = KS_FALSE;
@ -111,7 +114,6 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
dht->recv_buffer_length = 0; dht->recv_buffer_length = 0;
for (int32_t i = 0; i < dht->endpoints_size; ++i) { for (int32_t i = 0; i < dht->endpoints_size; ++i) {
ks_dht2_endpoint_t *ep = dht->endpoints[i]; ks_dht2_endpoint_t *ep = dht->endpoints[i];
//ks_hash_remove(dht->endpoints_hash, ep->addr.host);
ks_dht2_endpoint_deinit(ep); ks_dht2_endpoint_deinit(ep);
ks_dht2_endpoint_free(ep); ks_dht2_endpoint_free(ep);
} }
@ -139,6 +141,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
ks_hash_destroy(&dht->registry_query); ks_hash_destroy(&dht->registry_query);
dht->registry_query = NULL; dht->registry_query = NULL;
} }
if (dht->registry_error) {
ks_hash_destroy(&dht->registry_error);
dht->registry_error = NULL;
}
ks_dht2_nodeid_deinit(&dht->nodeid); ks_dht2_nodeid_deinit(&dht->nodeid);
@ -191,6 +197,18 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL; return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_register_error(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback)
{
ks_assert(dht);
ks_assert(value);
ks_assert(callback);
return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
}
/** /**
* *
*/ */
@ -297,9 +315,114 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
} }
} }
ks_dht2_idle(dht);
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
{
ks_assert(dht);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
{
ks_assert(dht);
if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht)
{
ks_hash_iterator_t *it = NULL;
ks_time_t now = ks_time_now_sec();
ks_assert(dht);
// @todo add delay between checking expirations, every 10 seconds?
ks_hash_write_lock(dht->transactions_hash);
for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
const void *key = NULL;
ks_dht2_transaction_t *value = NULL;
ks_bool_t remove = KS_FALSE;
ks_hash_this(it, &key, NULL, (void **)&value);
if (value->finished) {
remove = KS_TRUE;
} else if (value->expiration <= now) {
ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
remove = KS_TRUE;
}
if (remove) {
ks_hash_remove(dht->transactions_hash, (char *)key);
ks_pool_free(value->pool, value);
}
}
ks_hash_write_unlock(dht->transactions_hash);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
{
ks_dht2_message_t message;
ks_dht2_message_callback_t callback;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port);
if (raddr->family != AF_INET && raddr->family != AF_INET6) {
ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
return KS_STATUS_FAIL;
}
// @todo blacklist check for bad actor nodes
if (ks_dht2_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
goto done;
}
if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
} else {
ret = callback(dht, raddr, &message);
}
done:
ks_dht2_message_deinit(&message);
return ret;
}
/** /**
* *
*/ */
@ -350,64 +473,43 @@ KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dh
/** /**
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht) KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
ks_sockaddr_t *raddr,
uint8_t *transactionid,
ks_size_t transactionid_length,
long long errorcode,
const char *errorstr)
{ {
ks_assert(dht); ks_dht2_message_t error;
struct bencode *e;
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
{
ks_assert(dht);
return KS_STATUS_SUCCESS;
}
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
{
ks_dht2_message_t message;
ks_dht2_message_callback_t callback;
ks_status_t ret = KS_STATUS_FAIL; ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht); ks_assert(dht);
ks_assert(raddr); ks_assert(raddr);
ks_assert(transactionid);
ks_assert(errorstr);
ks_log(KS_LOG_DEBUG, "Received message from %s %d\n", raddr->host, raddr->port); if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
if (raddr->family != AF_INET && raddr->family != AF_INET6) {
ks_log(KS_LOG_DEBUG, "Message from unsupported address family\n");
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
// @todo blacklist check for bad actor nodes if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) {
if (ks_dht2_message_prealloc(&message, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;
} }
if (ks_dht2_message_init(&message, KS_FALSE) != KS_STATUS_SUCCESS) { if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL;
}
if (ks_dht2_message_parse(&message, dht->recv_buffer, dht->recv_buffer_length) != KS_STATUS_SUCCESS) {
goto done; goto done;
} }
if (!(callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) { // @note e joins response.data and will be freed with it
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type); ben_list_append(e, ben_int(errorcode));
} else { ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
ret = callback(dht, raddr, &message);
} ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
ret = ks_dht2_send(dht, raddr, &error);
done: done:
ks_dht2_message_deinit(&message); ks_dht2_message_deinit(&error);
return ret; return ret;
} }
@ -497,13 +599,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *
if (!transaction) { if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid); ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message response rejected due to spoofing from %s %d, expected %s %d\n",
raddr->host,
raddr->port,
transaction->raddr.host,
transaction->raddr.port);
} else { } else {
// @todo mark transaction for later removal
transaction->finished = KS_TRUE;
ret = transaction->callback(dht, raddr, message); ret = transaction->callback(dht, raddr, message);
} }
return ret; return ret;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
{
struct bencode *e;
struct bencode *ec;
struct bencode *es;
const char *et;
ks_size_t es_len;
long long errorcode;
char error[KS_DHT_MESSAGE_ERROR_MAX_SIZE];
ks_dht2_transaction_t *transaction;
uint32_t *tid;
uint32_t transactionid;
ks_status_t ret = KS_STATUS_FAIL;
ks_assert(dht);
ks_assert(raddr);
ks_assert(message);
// @todo start of ks_dht2_message_parse_error
e = ben_dict_get_by_str(message->data, "e");
if (!e) {
ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n");
return KS_STATUS_FAIL;
}
ec = ben_list_get(e, 0);
es = ben_list_get(e, 1);
es_len = ben_str_len(es);
if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
return KS_STATUS_FAIL;
}
errorcode = ben_int_val(ec);
et = ben_str_val(es);
memcpy(error, et, es_len);
error[es_len] = '\0';
// todo end of ks_dht2_message_parse_error
message->args = e;
tid = (uint32_t *)message->transactionid;
transactionid = ntohl(*tid);
transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
ks_hash_read_unlock(dht->transactions_hash);
if (!transaction) {
ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
} else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
ks_log(KS_LOG_DEBUG,
"Message error rejected due to spoofing from %s %d, expected %s %d\n",
raddr->host,
raddr->port,
transaction->raddr.host,
transaction->raddr.port);
} else {
// @todo mark transaction for later removal
ks_dht2_message_callback_t callback;
transaction->finished = KS_TRUE;
if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
ret = callback(dht, raddr, message);
} else {
ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
ret = KS_STATUS_SUCCESS;
}
}
return ret;
}
/** /**
* *
*/ */
@ -585,7 +770,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *r
goto done; goto done;
} }
if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) { if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
goto done; goto done;
} }
@ -634,6 +819,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
ks_assert(dht); ks_assert(dht);
ks_assert(raddr); ks_assert(raddr);
ks_assert(transactionid);
if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) { if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
return KS_STATUS_FAIL; return KS_STATUS_FAIL;

View File

@ -16,7 +16,9 @@ KS_BEGIN_EXTERN_C
#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20 #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
typedef struct ks_dht2_s ks_dht2_t; typedef struct ks_dht2_s ks_dht2_t;
typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t; typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t;
@ -54,9 +56,11 @@ struct ks_dht2_endpoint_s {
struct ks_dht2_transaction_s { struct ks_dht2_transaction_s {
ks_pool_t *pool; ks_pool_t *pool;
ks_sockaddr_t raddr;
uint32_t transactionid; uint32_t transactionid;
ks_dht2_message_callback_t callback; ks_dht2_message_callback_t callback;
// @todo expiration data ks_time_t expiration;
ks_bool_t finished;
}; };
@ -71,6 +75,7 @@ struct ks_dht2_s {
ks_hash_t *registry_type; ks_hash_t *registry_type;
ks_hash_t *registry_query; ks_hash_t *registry_query;
ks_hash_t *registry_error;
ks_bool_t bind_ipv4; ks_bool_t bind_ipv4;
ks_bool_t bind_ipv6; ks_bool_t bind_ipv6;
@ -137,6 +142,10 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
uint8_t *transactionid, uint8_t *transactionid,
ks_size_t transactionid_length, ks_size_t transactionid_length,
struct bencode **args); struct bencode **args);
KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args);
/** /**
* *
@ -150,6 +159,7 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *tras
KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction); KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction);
KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
ks_sockaddr_t *raddr,
uint32_t transactionid, uint32_t transactionid,
ks_dht2_message_callback_t callback); ks_dht2_message_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction); KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction);

View File

@ -207,6 +207,33 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
/**
*
*/
KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message,
uint8_t *transactionid,
ks_size_t transactionid_length,
struct bencode **args)
{
struct bencode *e;
ks_assert(message);
ks_assert(transactionid);
ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
// @note r joins message->data and will be freed with it
e = ben_list();
ben_dict_set(message->data, ben_blob("e", 1), e);
if (args) {
*args = e;
}
return KS_STATUS_SUCCESS;
}
/* For Emacs: /* For Emacs:
* Local Variables: * Local Variables:

View File

@ -48,15 +48,20 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transact
* *
*/ */
KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction, KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
ks_sockaddr_t *raddr,
uint32_t transactionid, uint32_t transactionid,
ks_dht2_message_callback_t callback) ks_dht2_message_callback_t callback)
{ {
ks_assert(transaction); ks_assert(transaction);
ks_assert(raddr);
ks_assert(transaction->pool); ks_assert(transaction->pool);
ks_assert(callback); ks_assert(callback);
transaction->raddr = *raddr;
transaction->transactionid = transactionid; transaction->transactionid = transactionid;
transaction->callback = callback; transaction->callback = callback;
transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY;
transaction->finished = KS_FALSE;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
@ -68,8 +73,11 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transa
{ {
ks_assert(transaction); ks_assert(transaction);
transaction->raddr = (const ks_sockaddr_t){ 0 };
transaction->transactionid = 0; transaction->transactionid = 0;
transaction->callback = NULL; transaction->callback = NULL;
transaction->expiration = 0;
transaction->finished = KS_FALSE;
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }

View File

@ -10,6 +10,7 @@ ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message
{ {
diag("dht_z_callback\n"); diag("dht_z_callback\n");
ok(message->transactionid[0] == '4' && message->transactionid[1] == '2'); ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
return KS_STATUS_SUCCESS; return KS_STATUS_SUCCESS;
} }
@ -97,6 +98,8 @@ int main() {
err = ks_dht2_process(dht1, &raddr); err = ks_dht2_process(dht1, &raddr);
ok(err == KS_STATUS_SUCCESS); ok(err == KS_STATUS_SUCCESS);
err = ks_dht2_pulse(&dht2, 1000);
ok(err == KS_STATUS_SUCCESS);
//buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER); //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
//memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen); //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);