mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-08-13 17:38:59 +00:00
FS-9775: Renamed registries, added query registry with ping callback, unit test updated
This commit is contained in:
committed by
Mike Jerris
parent
e9fdd9c946
commit
ed8e4dc610
@@ -9,7 +9,9 @@ KS_BEGIN_EXTERN_C
|
||||
KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
|
||||
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
|
||||
|
||||
KS_END_EXTERN_C
|
||||
|
||||
#endif /* KS_DHT_INT_H */
|
||||
|
@@ -70,8 +70,12 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const uint8_t *nodeid)
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
ks_hash_create(&dht->registry_y, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
|
||||
// @todo ks_hash_insert the q/r/e callbacks into y registry
|
||||
ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
|
||||
ks_dht2_register_type(dht, "q", ks_dht2_process_query);
|
||||
// @todo ks_hash_insert the r/e callbacks into type registry
|
||||
|
||||
ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
|
||||
ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
|
||||
|
||||
dht->bind_ipv4 = KS_FALSE;
|
||||
dht->bind_ipv6 = KS_FALSE;
|
||||
@@ -116,9 +120,13 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
|
||||
dht->bind_ipv4 = KS_FALSE;
|
||||
dht->bind_ipv6 = KS_FALSE;
|
||||
|
||||
if (dht->registry_y) {
|
||||
ks_hash_destroy(&dht->registry_y);
|
||||
dht->registry_y = NULL;
|
||||
if (dht->registry_type) {
|
||||
ks_hash_destroy(&dht->registry_type);
|
||||
dht->registry_type = NULL;
|
||||
}
|
||||
if (dht->registry_query) {
|
||||
ks_hash_destroy(&dht->registry_query);
|
||||
dht->registry_query = NULL;
|
||||
}
|
||||
|
||||
ks_dht2_nodeid_deinit(&dht->nodeid);
|
||||
@@ -129,13 +137,25 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
|
||||
{
|
||||
ks_assert(dht);
|
||||
ks_assert(value);
|
||||
ks_assert(callback);
|
||||
|
||||
return ks_hash_insert(dht->registry_y, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
|
||||
return ks_hash_insert(dht->registry_type, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback)
|
||||
{
|
||||
ks_assert(dht);
|
||||
ks_assert(value);
|
||||
ks_assert(callback);
|
||||
|
||||
return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -151,10 +171,7 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr)
|
||||
ks_assert(addr);
|
||||
ks_assert(addr->family == AF_INET || addr->family == AF_INET6);
|
||||
ks_assert(addr->port);
|
||||
|
||||
//if (!addr->port) {
|
||||
// addr->port = KS_DHT_DEFAULT_PORT;
|
||||
//}
|
||||
|
||||
|
||||
dht->bind_ipv4 |= addr->family == AF_INET;
|
||||
dht->bind_ipv6 |= addr->family == AF_INET6;
|
||||
@@ -281,7 +298,7 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_y, message.type, KS_UNLOCKED))) {
|
||||
if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_type, message.type, KS_UNLOCKED))) {
|
||||
ks_log(KS_LOG_DEBUG, "Message type '%s' is not registered\n", message.type);
|
||||
} else {
|
||||
ret = callback(dht, raddr, &message);
|
||||
@@ -292,6 +309,101 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
|
||||
{
|
||||
struct bencode *q;
|
||||
struct bencode *a;
|
||||
const char *qv;
|
||||
ks_size_t qv_len;
|
||||
char query[KS_DHT_MESSAGE_QUERY_MAX_SIZE];
|
||||
ks_dht2_registry_callback_t callback;
|
||||
ks_status_t ret = KS_STATUS_FAIL;
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(raddr);
|
||||
ks_assert(message);
|
||||
|
||||
q = ben_dict_get_by_str(message->data, "q");
|
||||
if (!q) {
|
||||
ks_log(KS_LOG_DEBUG, "Message missing required key 'q'\n");
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
qv = ben_str_val(q);
|
||||
qv_len = ben_str_len(q);
|
||||
if (qv_len >= KS_DHT_MESSAGE_QUERY_MAX_SIZE) {
|
||||
ks_log(KS_LOG_DEBUG, "Message 'q' value has an unexpectedly large size of %d\n", qv_len);
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
memcpy(query, qv, qv_len);
|
||||
query[qv_len] = '\0';
|
||||
ks_log(KS_LOG_DEBUG, "Message query is '%s'\n", query);
|
||||
|
||||
a = ben_dict_get_by_str(message->data, "a");
|
||||
if (!a) {
|
||||
ks_log(KS_LOG_DEBUG, "Message missing required key 'a'\n");
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
message->args = a;
|
||||
|
||||
if (!(callback = (ks_dht2_registry_callback_t)(intptr_t)ks_hash_search(dht->registry_query, query, KS_UNLOCKED))) {
|
||||
ks_log(KS_LOG_DEBUG, "Message query '%s' is not registered\n", query);
|
||||
} else {
|
||||
ret = callback(dht, raddr, message);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
|
||||
{
|
||||
struct bencode *id;
|
||||
const char *idv;
|
||||
ks_size_t idv_len;
|
||||
ks_dht2_nodeid_t nid;
|
||||
|
||||
ks_assert(dht);
|
||||
ks_assert(raddr);
|
||||
ks_assert(message);
|
||||
ks_assert(message->args);
|
||||
|
||||
id = ben_dict_get_by_str(message->args, "id");
|
||||
if (!id) {
|
||||
ks_log(KS_LOG_DEBUG, "Message args missing required key 'id'\n");
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
idv = ben_str_val(id);
|
||||
idv_len = ben_str_len(id);
|
||||
if (idv_len != KS_DHT_NODEID_LENGTH) {
|
||||
ks_log(KS_LOG_DEBUG, "Message args 'id' value has an unexpected size of %d\n", idv_len);
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
if (ks_dht2_nodeid_prealloc(&nid, dht->pool) != KS_STATUS_SUCCESS) {
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
if (ks_dht2_nodeid_init(&nid, (const uint8_t *)idv) != KS_STATUS_SUCCESS) {
|
||||
return KS_STATUS_FAIL;
|
||||
}
|
||||
|
||||
//ks_log(KS_LOG_DEBUG, "Message query ping id is '%s'\n", id->id);
|
||||
ks_log(KS_LOG_DEBUG, "Mesage query ping is valid\n");
|
||||
|
||||
ks_dht2_nodeid_deinit(&nid);
|
||||
|
||||
return KS_STATUS_SUCCESS;
|
||||
}
|
||||
|
||||
/* For Emacs:
|
||||
* Local Variables:
|
||||
* mode:c
|
||||
|
@@ -21,7 +21,8 @@ struct ks_dht2_s {
|
||||
|
||||
ks_dht2_nodeid_t nodeid;
|
||||
|
||||
ks_hash_t *registry_y;
|
||||
ks_hash_t *registry_type;
|
||||
ks_hash_t *registry_query;
|
||||
|
||||
ks_bool_t bind_ipv4;
|
||||
ks_bool_t bind_ipv6;
|
||||
@@ -51,7 +52,8 @@ KS_DECLARE(ks_status_t) ks_dht2_bind(ks_dht2_t *dht, const ks_sockaddr_t *addr);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout);
|
||||
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_y(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_type(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
|
||||
KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value, ks_dht2_registry_callback_t callback);
|
||||
|
||||
|
||||
KS_END_EXTERN_C
|
||||
|
@@ -59,6 +59,8 @@ KS_DECLARE(ks_status_t) ks_dht2_message_init(ks_dht2_message_t *message, const u
|
||||
ks_assert(message->pool);
|
||||
ks_assert(buffer);
|
||||
|
||||
message->args = NULL;
|
||||
|
||||
message->data = ben_decode((const void *)buffer, buffer_length);
|
||||
if (!message->data) {
|
||||
ks_log(KS_LOG_DEBUG, "Message cannot be decoded\n");
|
||||
@@ -117,6 +119,7 @@ KS_DECLARE(ks_status_t) ks_dht2_message_deinit(ks_dht2_message_t *message)
|
||||
{
|
||||
ks_assert(message);
|
||||
|
||||
message->args = NULL;
|
||||
message->type[0] = '\0';
|
||||
message->transactionid_length = 0;
|
||||
if (message->data) {
|
||||
|
@@ -7,6 +7,7 @@ KS_BEGIN_EXTERN_C
|
||||
|
||||
#define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
|
||||
#define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
|
||||
#define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
|
||||
|
||||
typedef struct ks_dht2_message_s ks_dht2_message_t;
|
||||
struct ks_dht2_message_s {
|
||||
@@ -15,6 +16,7 @@ struct ks_dht2_message_s {
|
||||
uint8_t transactionid[KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE];
|
||||
ks_size_t transactionid_length;
|
||||
char type[KS_DHT_MESSAGE_TYPE_MAX_SIZE];
|
||||
struct bencode *args;
|
||||
};
|
||||
|
||||
KS_DECLARE(ks_status_t) ks_dht2_message_alloc(ks_dht2_message_t **message, ks_pool_t *pool);
|
||||
|
Reference in New Issue
Block a user