FS-9775: add threadpool to init_routetable

colm 2016-12-16 21:06:38 -05:00 committed by Mike Jerris
parent df61ab87bf
commit 2c3b074897
3 changed files with 68 additions and 35 deletions
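In summary, ks_dhtrt_initroute() gains a third ks_thread_pool_t* argument that is stored in the route table internals, bucket entries gain a touched flag so an entry is only re-marked DHTPEER_ACTIVE once it has actually been touched, and node teardown now always goes through ks_dhtrt_queue_node_fordelete() so that the new ks_dhtrt_make_node() can recycle deleted nodes instead of allocating fresh ones. A caller-side sketch of the new initialization path follows; the ks_pool_open()/ks_thread_pool_create() calls, their argument values, and the setup_route_table() wrapper are assumptions drawn from the wider libks API, not part of this diff.

/* Hedged sketch: driving the new three-argument ks_dhtrt_initroute().
 * The pool/thread-pool setup calls, constants, and argument values below
 * are assumptions based on the general libks API, not taken from this
 * commit. Assumes ks_init() has already run. */
#include "ks.h"
#include "ks_dht.h"

static ks_status_t setup_route_table(ks_dhtrt_routetable_t **rt)
{
	ks_pool_t *pool = NULL;
	ks_thread_pool_t *tpool = NULL;
	ks_status_t ret;

	if ((ret = ks_pool_open(&pool)) != KS_STATUS_SUCCESS) return ret;

	/* assumed signature: min threads, max threads, stack size, priority, idle seconds */
	if ((ret = ks_thread_pool_create(&tpool, 2, 8, KS_THREAD_DEFAULT_STACK,
	                                 KS_PRI_NORMAL, 30)) != KS_STATUS_SUCCESS) return ret;

	/* the new third argument hands the worker pool to the route table */
	return ks_dhtrt_initroute(rt, pool, tpool);
}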


@@ -533,7 +533,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
* If the route table for the family doesn't exist yet, initialize a new route table and create a local node for the endpoint.
*/
if (ep->addr.family == AF_INET) {
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if (!dht->rt_ipv4 && (ret = ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv4,
ep->nodeid,
KS_DHT_LOCAL,
@@ -541,7 +541,7 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid
ep->addr.port,
&ep->node)) != KS_STATUS_SUCCESS) goto done;
} else {
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool)) != KS_STATUS_SUCCESS) goto done;
if (!dht->rt_ipv6 && (ret = ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, dht->tpool)) != KS_STATUS_SUCCESS) goto done;
if ((ret = ks_dhtrt_create_node(dht->rt_ipv6,
ep->nodeid,
KS_DHT_LOCAL,


@@ -368,7 +368,7 @@ KS_DECLARE(ks_status_t) ks_dht_message_error(ks_dht_message_t *message,
* route table methods
*
*/
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool);
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool);
KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table);
KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table,


@@ -42,7 +42,7 @@
#define KS_DHTRT_INACTIVETIME (15*60)
#define KS_DHTRT_MAXPING 3
#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60)
#define KS_DHTRT_RECYCLE_NODE_THRESHOLD 100
/* peer flags */
#define DHTPEER_DUBIOUS 0
@@ -61,6 +61,7 @@ typedef struct ks_dhtrt_bucket_entry_s {
uint8_t inuse;
uint8_t outstanding_pings;
uint8_t flags; /* active, suspect, expired */
uint8_t touched; /* did we ever get a touch */
} ks_dhtrt_bucket_entry_t;
typedef struct ks_dhtrt_bucket_s {
@@ -92,10 +93,12 @@ typedef struct ks_dhtrt_deletednode_s {
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_thread_pool_t *tpool;
ks_rwl_t *lock; /* lock for safe traversal of the tree */
ks_time_t last_process_table;
ks_mutex_t *deleted_node_lock;
ks_dhtrt_deletednode_t *deleted_node;
ks_dhtrt_deletednode_t *free_nodes;
uint32_t deleted_count;
} ks_dhtrt_internal_t;
@@ -148,6 +151,8 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t*
static
void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
static
ks_dht_node_t *ks_dhtrt_make_node(ks_dhtrt_routetable_t *table);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
static
@@ -182,7 +187,7 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry);
/* # define KS_DHT_DEBUGPRINTFX_ very verbose */
/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool)
KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool)
{
unsigned char initmask[KS_DHT_NODEID_SIZE];
memset(initmask, 0xff, sizeof(initmask));
@@ -192,6 +197,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
ks_rwl_create(&internal->lock, pool);
internal->tpool = tpool;
ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
@@ -235,14 +241,18 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) {
bentry->tyme = ks_time_now_sec();
if (bentry->touched) {
bentry->flags = DHTPEER_ACTIVE;
}
(*node) = bentry->gptr;
ks_rwl_read_unlock(internal->lock);
return KS_STATUS_SUCCESS;
}
ks_rwl_read_unlock(internal->lock);
/* @todo - replace with reusable memory pool */
ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
ks_dht_node_t *tnode = ks_dhtrt_make_node(table);
tnode->table = table;
for (int i = 0; i < 5; ++i) {
@@ -297,23 +307,12 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
ks_rwl_write_unlock(bucket->lock);
}
}
ks_rwl_read_unlock(internal->lock); /* release write lock */
/* at this point no subsequent find/query will return the node - so we can
safely free it if we can grab the write lock
Having held the write lock on the bucket we know no other thread
is awaiting a read/write lock on the node
*/
ks_rwl_read_unlock(internal->lock); /* release write lock */
/* at this point no subsequent find/query will return the node */
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */
ks_rwl_destroy(&(node->reflock));
ks_pool_free(table->pool, &node);
}
else {
ks_dhtrt_queue_node_fordelete(table, node);
}
ks_dhtrt_queue_node_fordelete(table, node);
return s;
}
@@ -511,6 +510,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
if (e != 0) {
e->tyme = ks_time_now_sec();
e->outstanding_pings = 0;
e->touched = 1;
if (e->flags == DHTPEER_EXPIRED) {
--header->bucket->expired_count;
@@ -758,10 +758,13 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
ks_dhtrt_internal_t *internal = table->internal;
ks_time_t t0 = ks_time_now_sec();
if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) {
return;
}
ks_log(KS_LOG_DEBUG,"process_table in progress\n");
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = internal->buckets;
@@ -780,7 +783,6 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG,"process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
//fflush(stdout);
#endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
@@ -819,7 +821,6 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf1[100];
ks_log(KS_LOG_DEBUG,"process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
//fflush(stdout);
#endif
ks_rwl_write_unlock(b->lock);
@@ -829,7 +830,6 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
#ifdef KS_DHT_DEBUGPRINTF_
char buf2[100];
ks_log(KS_LOG_DEBUG,"process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
//fflush(stdout);
#endif
}
}
@@ -857,7 +857,8 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL;
while(deleted) {
/* reclaim excess memory */
while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) {
ks_dht_node_t* node = deleted->node;
if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
@@ -867,20 +868,20 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
deleted = deleted->next;
ks_pool_free(table->pool, &temp);
--internal->deleted_count;
if (prev != NULL) {
prev->next = deleted;
}
else {
internal->deleted_node = deleted;
}
}
}
else {
prev = deleted;
deleted = prev->next;
}
}
ks_mutex_unlock(internal->deleted_node_lock);
}
@@ -1313,15 +1314,47 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node)
{
ks_dhtrt_internal_t* internal = table->internal;
ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
deleted->node = node;
ks_mutex_lock(internal->deleted_node_lock);
deleted->next = internal->deleted_node;
internal->deleted_node = deleted;
++internal->deleted_count;
ks_mutex_unlock(internal->deleted_node_lock);
ks_mutex_lock(internal->deleted_node_lock);
ks_dhtrt_deletednode_t* deleted = internal->free_nodes;
if (deleted) {
internal->free_nodes = deleted->next;
}
else {
deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
}
deleted->node = node;
deleted->next = internal->deleted_node;
internal->deleted_node = deleted;
++internal->deleted_count;
ks_mutex_unlock(internal->deleted_node_lock);
}
ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table)
{
ks_dht_node_t *node = NULL;
ks_dhtrt_internal_t *internal = table->internal;
ks_mutex_lock(internal->deleted_node_lock);
/* try to reuse a deleted node */
if (internal->deleted_count) {
ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
node = deleted->node;
memset(node, 0, sizeof(ks_dht_node_t));
internal->deleted_node = deleted->next;
deleted->next = internal->free_nodes;
internal->free_nodes = deleted;
--internal->deleted_count;
}
ks_mutex_unlock(internal->deleted_node_lock);
if (!node) {
node = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
}
return node;
}
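Taken together, the new ks_dhtrt_queue_node_fordelete() and ks_dhtrt_make_node() form a small recycler: queued nodes go onto deleted_node, make_node zeroes and reuses the most recently queued node instead of allocating, spent wrapper entries are parked on free_nodes for the next queue operation, and ks_dhtrt_process_deleted() only frees memory back to the pool once deleted_count exceeds KS_DHTRT_RECYCLE_NODE_THRESHOLD. Below is a self-contained sketch of the same two-list pattern; it uses plain pthread mutexes and hypothetical names (recycler_t, recycler_queue, recycler_make) purely to illustrate the shape of the scheme, not libks's actual types or locking.

/* Illustrative sketch of the two-list recycling scheme above:
 * a "deleted" list of nodes awaiting reuse plus a "free" list of
 * spare wrapper entries. All names here are hypothetical. */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef struct node_s { char payload[64]; } node_t;

typedef struct wrapper_s {
	node_t *node;
	struct wrapper_s *next;
} wrapper_t;

typedef struct recycler_s {
	pthread_mutex_t lock;
	wrapper_t *deleted;        /* nodes queued for reuse */
	wrapper_t *free_wrappers;  /* spare wrapper entries */
	unsigned deleted_count;
} recycler_t;

static recycler_t g_recycler = { PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0 };

/* queue a node for later reuse, recycling a wrapper entry if one is spare */
static void recycler_queue(recycler_t *r, node_t *node)
{
	pthread_mutex_lock(&r->lock);
	wrapper_t *w = r->free_wrappers;
	if (w) r->free_wrappers = w->next;
	else   w = malloc(sizeof(*w));
	w->node = node;
	w->next = r->deleted;
	r->deleted = w;
	++r->deleted_count;
	pthread_mutex_unlock(&r->lock);
}

/* hand out a queued node if one exists, otherwise fall back to the allocator */
static node_t *recycler_make(recycler_t *r)
{
	node_t *node = NULL;
	pthread_mutex_lock(&r->lock);
	if (r->deleted_count) {
		wrapper_t *w = r->deleted;
		node = w->node;
		memset(node, 0, sizeof(*node));   /* hand back a clean node */
		r->deleted = w->next;
		w->next = r->free_wrappers;       /* park the wrapper for reuse */
		r->free_wrappers = w;
		--r->deleted_count;
	}
	pthread_mutex_unlock(&r->lock);
	return node ? node : calloc(1, sizeof(*node));
}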
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;