From 51c1b7a71962bb87e0995d580877ebed9d40c7be Mon Sep 17 00:00:00 2001 From: colm Date: Mon, 19 Dec 2016 16:14:23 -0500 Subject: [PATCH] FS-9775: Exclude non-active nodes from dhtrt_find_node --- libs/libks/src/dht/ks_dht_bucket.c | 84 +++++++++---- libs/libks/test/testbuckets.c | 183 +++++++++++++++++++++++------ 2 files changed, 204 insertions(+), 63 deletions(-) diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 05828fe17c..5577b49dc9 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -98,7 +98,7 @@ typedef struct ks_dhtrt_internal_s { ks_time_t last_process_table; ks_mutex_t *deleted_node_lock; ks_dhtrt_deletednode_t *deleted_node; - ks_dhtrt_deletednode_t *free_nodes; + ks_dhtrt_deletednode_t *free_node_ex; uint32_t deleted_count; } ks_dhtrt_internal_t; @@ -184,10 +184,14 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); /* debugging */ #define KS_DHT_DEBUGPRINTF_ -/* # define KS_DHT_DEBUGPRINTFX_ very verbose */ -/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */ +/* very verbose */ +/* # define KS_DHT_DEBUGPRINTFX_ */ +/* debug locking */ +/* # define KS_DHT_DEBUGLOCKPRINTF_ */ -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_thread_pool_t* tpool) +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, + ks_pool_t *pool, + ks_thread_pool_t* tpool) { unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); @@ -232,6 +236,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, unsigned short port, ks_dht_node_t **node) { + ks_dht_node_t *tnode; ks_dhtrt_internal_t* internal = table->internal; ks_rwl_read_lock(internal->lock); /* grab write lock and insert */ @@ -246,13 +251,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, bentry->flags = DHTPEER_ACTIVE; } - (*node) = bentry->gptr; + tnode = bentry->gptr; + ks_rwl_read_lock( tnode->reflock); ks_rwl_read_unlock(internal->lock); + (*node) = tnode; return KS_STATUS_SUCCESS; } ks_rwl_read_unlock(internal->lock); - ks_dht_node_t *tnode = ks_dhtrt_make_node(table); + tnode = ks_dhtrt_make_node(table); tnode->table = table; for (int i = 0; i < 5; ++i) { @@ -274,7 +281,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, } ks_status_t s = ks_dhtrt_insert_node(table, tnode); - + + if (tnode && s == KS_STATUS_SUCCESS) { + ks_rwl_read_lock( tnode->reflock); + } + (*node) = tnode; return s; @@ -466,7 +477,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; - ks_log(KS_LOG_DEBUG, "Insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); //fflush(stdout); #endif @@ -477,7 +488,7 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ ks_rwl_read_lock(node->reflock); } #ifdef KS_DHT_DEBUGLOCKPRINTF_ - ks_log(KS_LOG_DEBUG, "Insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); //fflush(stdout); #endif ks_rwl_read_unlock(bucket->lock); @@ -857,10 +868,19 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) ks_dhtrt_deletednode_t *deleted = internal->deleted_node; ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL; +#ifdef KS_DHT_DEBUGPRINTF_ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: internal->deleted_count %d\n", internal->deleted_count); +#endif + + /* reclaim excess memory */ while(internal->deleted_count > KS_DHTRT_RECYCLE_NODE_THRESHOLD && deleted) { ks_dht_node_t* node = deleted->node; +#ifdef KS_DHT_DEBUGPRINTFX__ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock\n"); +#endif + if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { ks_rwl_destroy(&(node->reflock)); ks_pool_free(table->pool, &node); @@ -868,7 +888,9 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) deleted = deleted->next; ks_pool_free(table->pool, &temp); --internal->deleted_count; - +#ifdef KS_DHT_DEBUGPRINTFX_ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted: internal->deleted_count %d\n", internal->deleted_count); +#endif if (prev != NULL) { prev->next = deleted; } @@ -878,10 +900,18 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) } else { +#ifdef KS_DHT_DEBUGPRINTFX__ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted entry: try write lock failed\n"); +#endif prev = deleted; deleted = prev->next; } } + +#ifdef KS_DHT_DEBUGPRINTF_ + ks_log(KS_LOG_DEBUG, "ALLOC process_deleted exit: internal->deleted_count %d\n", internal->deleted_count); +#endif + ks_mutex_unlock(internal->deleted_node_lock); } @@ -913,7 +943,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { memset(buffer, 0, 100); if (b->entries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer); else strcpy(buffer, ""); - ks_log(KS_LOG_DEBUG, " slot %d: %s\n", ix, buffer); + ks_log(KS_LOG_DEBUG, " slot %d: %d %s\n", ix, b->entries[ix].flags, buffer); } ks_log(KS_LOG_DEBUG, " --------------------------\n\n"); @@ -947,8 +977,8 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s ", ks_dhtrt_printableid(mask, buffer)); - if (parent) ks_log(KS_LOG_DEBUG, "from parent mask: %s ", ks_dhtrt_printableid(parent->mask, buffer)); + ks_log(KS_LOG_DEBUG, "creating bucket header for mask: %s\n", ks_dhtrt_printableid(mask, buffer)); + if (parent) ks_log(KS_LOG_DEBUG, " ... from parent mask: %s\n", ks_dhtrt_printableid(parent->mask, buffer)); printf("\n"); #endif return header; @@ -1049,7 +1079,7 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, char buffer[100]; ks_log(KS_LOG_DEBUG, "\nsplitting bucket orginal: %s\n", ks_dhtrt_printableid(original->mask, buffer)); ks_log(KS_LOG_DEBUG, " into (left) mask: %s size: %d\n", ks_dhtrt_printableid(left->mask, buffer), left->bucket->count); - ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count); + ks_log(KS_LOG_DEBUG, " and (right) mask: %s size: %d\n", ks_dhtrt_printableid(right->mask, buffer), right->bucket->count); #endif return; } @@ -1138,7 +1168,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t for (int ix=0; ixentries[ix].inuse == 1) { + if ( bucket->entries[ix].inuse == 1 && bucket->entries[ix].flags == DHTPEER_ACTIVE ) { ks_log(KS_LOG_DEBUG, "bucket->entries[%d].id = %s inuse=%x\n", ix, ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), bucket->entries[ix].inuse ); @@ -1162,7 +1192,7 @@ ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) for (int ix=0; ixentries[%d].id = %s inuse=%c\n", ix, ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), bucket->entries[ix].inuse ); @@ -1315,19 +1345,22 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* { ks_dhtrt_internal_t* internal = table->internal; ks_mutex_lock(internal->deleted_node_lock); - ks_dhtrt_deletednode_t* deleted = internal->free_nodes; + ks_dhtrt_deletednode_t* deleted = internal->free_node_ex; /* grab a free stub */ if (deleted) { - internal->free_nodes = deleted->next; + internal->free_node_ex = deleted->next; } else { deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t)); } deleted->node = node; - deleted->next = internal->deleted_node; - internal->deleted_node = deleted; + deleted->next = internal->deleted_node; + internal->deleted_node = deleted; /* add to deleted queue */ ++internal->deleted_count; +#ifdef KS_DHT_DEBUGPRINTFX_ + ks_log(KS_LOG_DEBUG, "ALLOC: Queue for delete %d\n", internal->deleted_count); +#endif ks_mutex_unlock(internal->deleted_node_lock); } @@ -1340,12 +1373,15 @@ ks_dht_node_t* ks_dhtrt_make_node(ks_dhtrt_routetable_t* table) /* to to reuse a deleted node */ if (internal->deleted_count) { ks_dhtrt_deletednode_t *deleted = internal->deleted_node; - node = deleted->node; + node = deleted->node; /* take the node */ memset(node, 0, sizeof(ks_dht_node_t)); + deleted->node = 0; /* avoid accidents */ internal->deleted_node = deleted->next; - deleted->next = internal->free_nodes; - internal->free_nodes = deleted; - --internal->deleted_count; + deleted->next = internal->free_node_ex; /* save the stub for reuse */ + --internal->deleted_count; +#ifdef KS_DHT_DEBUGPRINTFX_ + ks_log(KS_LOG_DEBUG, "ALLOC: Reusing a node struct %d\n", internal->deleted_count); +#endif } ks_mutex_unlock(internal->deleted_node_lock); diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index 2793542141..43fd1b2136 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -447,27 +447,108 @@ void test06() } +/* test resue of node memory */ +void test50() +{ + printf("*** testbuckets - test50 start\n"); fflush(stdout); + + ks_dht_node_t* peer; + ks_dht_nodeid_t nodeid, nodeid2; + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + memset(nodeid2.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + enum ks_afflags_t both = ifboth; + + ks_status_t status; + + for (int i=0,i2=0; i<200; ++i, ++i2) { + if (i%20 == 0) { + nodeid.id[0] = nodeid.id[0] / 2; + if(i2%20 == 0) { + i2 = 0; + nodeid.id[1] = nodeid.id[1] / 2; + } + else { + ++nodeid.id[2]; + } + } + else { + ++nodeid.id[1]; + } + ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + ks_dhtrt_touch_node(rt, nodeid); + } + + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + for (int i=0,i2=0; i<200; ++i, ++i2) { + if (i%20 == 0) { + nodeid.id[0] = nodeid.id[0] / 2; + if(i2%20 == 0) { + i2 = 0; + nodeid.id[1] = nodeid.id[1] / 2; + } + else { + ++nodeid.id[2]; + } + } + else { + ++nodeid.id[1]; + } + ks_dht_node_t *n = ks_dhtrt_find_node(rt, nodeid); + if (n != NULL) { + ks_dhtrt_release_node(n); + ks_dhtrt_delete_node(rt, n); + } + } + + ks_dhtrt_process_table(rt); + + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + + for (int i=0,i2=0; i<200; ++i, ++i2) { + if (i%20 == 0) { + nodeid.id[0] = nodeid.id[0] / 2; + if(i2%20 == 0) { + i2 = 0; + nodeid.id[1] = nodeid.id[1] / 2; + } + else { + ++nodeid.id[2]; + } + } + else { + ++nodeid.id[1]; + } + ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + ks_dhtrt_touch_node(rt, nodeid); + } + + printf("*** testbuckets - test50 start\n"); fflush(stdout); + return; +} + + + int main(int argc, char* argv[]) { printf("testdhtbuckets - start\n"); - int tests[10]; + int tests[100]; if (argc == 0) { + tests[0] = 1; tests[1] = 1; tests[2] = 1; tests[3] = 1; tests[4] = 1; - tests[5] = 1; - tests[6] = 0; - tests[7] = 0; - tests[8] = 0; - tests[9] = 0; } else { - for(int tix=1; tix<10 && tix