diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index 0d53283b02..a1b4c65fc3 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -533,7 +533,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid * If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint. */ if (ep->addr.family == AF_INET) { - if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv4, ep->nodeid, KS_DHT_LOCAL, @@ -541,7 +541,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid ep->addr.port, &ep->node)) != KS_STATUS_SUCCESS) goto done; } else { - if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done; + if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done; if ((ret = ks_dhtrt_create_node(dht->rt_ipv6, ep->nodeid, KS_DHT_LOCAL, diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index 59311de801..c9f60ef181 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -368,7 +368,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message, * route table methods * */ -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool); +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool); KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table); KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 8d7ef6defc..05828fe17c 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -42,7 +42,7 @@ #define KS_DHTRT_INACTIVETIME (15*60) #define KS_DHTRT_MAXPING 3 #define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60) - +#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100 /* peer flags */ #define DHTPEER_DUBIOUS 0 @@ -61,6 +61,7 @@ typedef struct ks_dhtrt_bucket_entry_s { uint8_t inuse; uint8_t outstanding_pings; uint8_t flags; /* active, suspect, expired */ + uint8_t touched; /* did we ever get a touch */ } ks_dhtrt_bucket_entry_t; typedef struct ks_dhtrt_bucket_s { @@ -92,10 +93,12 @@ typedef struct ks_dhtrt_deletednode_s { typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ + ks_thread_pool_t *tpool; ks_rwl_t *lock; /* lock for safe traversal of the tree */ ks_time_t last_process_table; ks_mutex_t *deleted_node_lock; ks_dhtrt_deletednode_t *deleted_node; + ks_dhtrt_deletednode_t *free_nodes; uint32_t deleted_count; } ks_dhtrt_internal_t; @@ -148,6 +151,8 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* static void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table); +static +ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table); static ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node); static @@ -182,7 +187,7 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); /* # define KS_DHT_DEBUGPRINTFX_ very verbose */ /* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */ -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool) { unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); @@ -192,6 +197,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); ks_rwl_create(&internal->lock, pool); + internal->tpool = tpool; ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool); table->internal = internal; @@ -235,14 +241,18 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); if (bentry != 0) { bentry->tyme = ks_time_now_sec(); + + if (bentry->touched) { + bentry->flags = DHTPEER_ACTIVE; + } + (*node) = bentry->gptr; ks_rwl_read_unlock(internal->lock); return KS_STATUS_SUCCESS; } ks_rwl_read_unlock(internal->lock); - /* @todo - replace with reusable memory pool */ - ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t)); + ks_dht_node_t *tnode = ks_dhtrt_make_node(table); tnode->table = table; for (int i = 0; i < 5; ++i) { @@ -297,23 +307,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh ks_rwl_write_unlock(bucket->lock); } - } - ks_rwl_read_unlock(internal->lock); /* release write lock */ - /* at this point no subsequent find/query will return the node - so we can - safely free it if we can grab the write lock - Having held the write lock on the bucket we know no other thread - is awaiting a read/write lock on the node - */ + ks_rwl_read_unlock(internal->lock); /* release write lock */ + /* at this point no subsequent find/query will return the node */ - if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */ - ks_rwl_destroy(&(node->reflock)); - ks_pool_free(table->pool, &node); - } - else { - ks_dhtrt_queue_node_fordelete(table, node); - } + ks_dhtrt_queue_node_fordelete(table, node); return s; } @@ -511,6 +510,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh if (e != 0) { e->tyme = ks_time_now_sec(); e->outstanding_pings = 0; + e->touched = 1; if (e->flags == DHTPEER_EXPIRED) { --header->bucket->expired_count; @@ -758,10 +758,13 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) ks_dhtrt_internal_t *internal = table->internal; ks_time_t t0 = ks_time_now_sec(); + if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) { return; } + ks_log(KS_LOG_DEBUG,"process_table in progress\n"); + ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = internal->buckets; @@ -780,7 +783,6 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; ks_log(KS_LOG_DEBUG,"process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif for (int ix=0; ixmask, buf1)); - //fflush(stdout); #endif ks_rwl_write_unlock(b->lock); @@ -829,7 +830,6 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) #ifdef KS_DHT_DEBUGPRINTF_ char buf2[100]; ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2)); - //fflush(stdout); #endif } } @@ -857,7 +857,8 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) ks_dhtrt_deletednode_t *deleted = internal->deleted_node; ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL; - while(deleted) { + /* reclaim excess memory */ + while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) { ks_dht_node_t* node = deleted->node; if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { @@ -867,20 +868,20 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) deleted = deleted->next; ks_pool_free(table->pool, &temp); --internal->deleted_count; + if (prev != NULL) { prev->next = deleted; } else { internal->deleted_node = deleted; - } + } + } else { prev = deleted; deleted = prev->next; } - } - ks_mutex_unlock(internal->deleted_node_lock); } @@ -1313,15 +1314,47 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node) { ks_dhtrt_internal_t* internal = table->internal; - ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t)); - deleted->node = node; - ks_mutex_lock(internal->deleted_node_lock); - deleted->next = internal->deleted_node; - internal->deleted_node = deleted; - ++internal->deleted_count; - ks_mutex_unlock(internal->deleted_node_lock); + ks_mutex_lock(internal->deleted_node_lock); + ks_dhtrt_deletednode_t* deleted = internal->free_nodes; + + if (deleted) { + internal->free_nodes = deleted->next; + } + else { + deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t)); + } + + deleted->node = node; + deleted->next = internal->deleted_node; + internal->deleted_node = deleted; + ++internal->deleted_count; + ks_mutex_unlock(internal->deleted_node_lock); } +ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table) +{ + ks_dht_node_t *node = NULL; + ks_dhtrt_internal_t *internal = table->internal; + ks_mutex_lock(internal->deleted_node_lock); + + /* to to reuse a deleted node */ + if (internal->deleted_count) { + ks_dhtrt_deletednode_t *deleted = internal->deleted_node; + node = deleted->node; + memset(node, 0, sizeof(ks_dht_node_t)); + internal->deleted_node = deleted->next; + deleted->next = internal->free_nodes; + internal->free_nodes = deleted; + --internal->deleted_count; + } + ks_mutex_unlock(internal->deleted_node_lock); + + if (!node) { + node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t)); + } + + return node; +} void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) { ++entry->outstanding_pings;