From 64a44ed3a56a7cb1978db6479c8142ae11ef8323 Mon Sep 17 00:00:00 2001 From: colm Date: Tue, 20 Dec 2016 19:50:59 -0500 Subject: [PATCH] FS-9775: Improve dht route table query performance --- libs/libks/src/dht/ks_dht_bucket.c | 117 +++++++++++++++++++-------- libs/libks/test/testbuckets.c | 124 +++++++++++++++++++++++++++-- 2 files changed, 201 insertions(+), 40 deletions(-) diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index f0d2bbadd0..1e9f24e313 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -81,6 +81,8 @@ typedef struct ks_dhtrt_bucket_header_s { struct ks_dhtrt_bucket_header_s * parent; struct ks_dhtrt_bucket_header_s * left; struct ks_dhtrt_bucket_header_s * right; + struct ks_dhtrt_bucket_header_s * left1bit; + struct ks_dhtrt_bucket_header_s * right1bit; ks_dhtrt_bucket_t * bucket; ks_time_t tyme; /* last processed time */ unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */ @@ -354,7 +356,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_lock(bucket->lock); @@ -365,10 +366,10 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no /* first - seek a stale entry to eject */ if (bucket->expired_count) { ks_status_t s = ks_dhtrt_insert_id(bucket, node); + if (s == KS_STATUS_SUCCESS) { #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); @@ -389,7 +390,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no #endif #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); @@ -408,7 +408,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no #endif #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); @@ -435,7 +434,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf)); ks_log(KS_LOG_DEBUG, "Insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_lock(bucket->lock); /* lock new bucket */ @@ -460,7 +458,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); return s; @@ -485,7 +482,6 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; ks_log(KS_LOG_DEBUG, "Find node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_read_lock(bucket->lock); @@ -496,7 +492,6 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ } #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Find node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_read_unlock(bucket->lock); } @@ -520,7 +515,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf[100]; ks_log(KS_LOG_DEBUG, "Touch node: write bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); @@ -539,7 +533,6 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh } #ifdef KS_DHT_DEBUGLOCKPRINTF_ ks_log(KS_LOG_DEBUG, "Touch node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif ks_rwl_write_unlock(header->bucket->lock); } @@ -581,13 +574,11 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ static uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) { - uint8_t max = query->max; uint8_t total = 0; uint8_t cnt; - if (max == 0) return 0; /* sanity checks */ - if (max > KS_DHTRT_MAXQUERYSIZE) { /* enforce the maximum */ - max = KS_DHTRT_MAXQUERYSIZE; + if (query->max == 0) return 0; /* sanity checks */ + if (query->max > KS_DHTRT_MAXQUERYSIZE) { /* enforce the maximum */ query->max = KS_DHTRT_MAXQUERYSIZE; } @@ -597,7 +588,9 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for nodeid %s\n", max, ks_dhtrt_printableid(query->nodeid.id, buffer)); + ks_log(KS_LOG_DEBUG, "Finding %d closest nodes for nodeid %s\n", + query->max, + ks_dhtrt_printableid(query->nodeid.id, buffer)); ks_log(KS_LOG_DEBUG, " ...starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif @@ -611,8 +604,8 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt /* step 1 - look at immediate bucket */ /* --------------------------------- */ + int max = query->max; cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort0, initid ,max); - max -= cnt; total += cnt; #ifdef KS_DHT_DEBUGPRINTF_ @@ -644,8 +637,8 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt } } + max = query->count - total; cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, header, &xort1, initid ,max); - max -= cnt; total += cnt; #ifdef KS_DHT_DEBUGPRINTF_ @@ -668,56 +661,108 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt memcpy(rightid, xort1.bheader->mask, KS_DHT_NODEID_SIZE); int insanity = 0; - ks_dhtrt_bucket_header_t *lheader; - ks_dhtrt_bucket_header_t *rheader; + ks_dhtrt_bucket_header_t *lheader = 0; + ks_dhtrt_bucket_header_t *rheader = 0; + ks_dhtrt_bucket_header_t *last_rheader = 0; + ks_dhtrt_bucket_header_t *last_lheader = 0; ks_dhtrt_sortedxors_t *prev = &xort1; ks_dhtrt_sortedxors_t *tofree = 0; ks_dhtrt_sortedxors_t *xortn; ks_dhtrt_sortedxors_t *xortn1; do { + last_lheader = lheader; lheader = 0; + last_rheader = rheader; rheader = 0; xortn = 0; xortn1 = 0; if (leftid[0] != 0xff) { + ks_dhtrt_shiftleft(leftid); - lheader = ks_dhtrt_find_bucketheader(table, leftid); + + if (last_lheader && last_lheader->left1bit) { + lheader = last_lheader->left1bit = ks_dhtrt_find_relatedbucketheader(last_lheader->left1bit, leftid); + } + else { + lheader = ks_dhtrt_find_bucketheader(table, leftid); + if (last_lheader) { + last_lheader->left1bit = lheader; /* remember so we can take a shortcut next query */ + } + } if (lheader) { xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); - if (tofree == 0) tofree = xortn; + if (tofree == 0) { + tofree = xortn; + } prev->next = xortn; prev = xortn; - cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, + max = query->max - total; + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, lheader, xortn, leftid ,max); - max -= cnt; + total += cnt; #ifdef KS_DHT_DEBUGPRINTF_ ks_log(KS_LOG_DEBUG," stage3: seaching left bucket header %s yielded %d nodes, total=%d\n", ks_dhtrt_printableid(lheader->mask, buffer), cnt, total); #endif } +#ifdef KS_DHT_DEBUGPRINTF_ + else { + ks_log(KS_LOG_DEBUG," stage3: failed to find left header %s\n", + ks_dhtrt_printableid(leftid, buffer)); + } +#endif + } - if (max > 0 && rightid[KS_DHT_NODEID_SIZE-1] != 0x00) { + if (rightid[KS_DHT_NODEID_SIZE-1] != 0x00) { + ks_dhtrt_shiftright(rightid); - rheader = ks_dhtrt_find_bucketheader(table, rightid); + + if (last_rheader && last_rheader->right1bit) { + rheader = last_rheader->right1bit = ks_dhtrt_find_relatedbucketheader(last_rheader->right1bit, rightid); + } + else { + rheader = ks_dhtrt_find_bucketheader(table, rightid); + if (rheader == last_rheader) { /* did we get the same bucket header returned */ + rheader = 0; /* yes: we are done on the left hand branch */ + } + else { + if (last_rheader) { + last_rheader->left1bit = rheader; /* remember so we can take a shortcut next query */ + } + } + } if (rheader) { xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); + + if (tofree == 0) { + tofree = xortn1; + } + prev->next = xortn1; prev = xortn1; + max = query->max - total; cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, rheader, xortn1, rightid , max); - max -= cnt; + total += cnt; #ifdef KS_DHT_DEBUGPRINTF_ ks_log(KS_LOG_DEBUG," stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", ks_dhtrt_printableid(rheader->mask, buffer), cnt, total); #endif } +#ifdef KS_DHT_DEBUGPRINTF_ + else { + ks_log(KS_LOG_DEBUG," stage3: failed to find right header %s\n", + ks_dhtrt_printableid(rightid, buffer)); + } +#endif + } if (!lheader && !rheader) { @@ -729,8 +774,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt if (insanity > 159) { assert(insanity <= 159); } - - } while (max < query->max); + } while (total < query->max); ks_dhtrt_load_query(query, &xort0); @@ -831,6 +875,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) #endif e->flags = DHTPEER_EXPIRED; ++b->expired_count; + e->outstanding_pings = 0; /* extinguish all hope: do not retry again */ continue; } @@ -981,12 +1026,19 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { for (int ix=0; ixentries[ix].inuse == 1) ks_dhtrt_printableid(b->entries[ix].id, buffer); - else strcpy(buffer, ""); - ks_log(KS_LOG_DEBUG, " slot %d: %d %d %s\n", ix, - b->entries[ix].flags, + if (b->entries[ix].inuse == 1) { + ks_dhtrt_printableid(b->entries[ix].id, buffer); + ks_dht_node_t *n = b->entries[ix].gptr; + ks_log(KS_LOG_DEBUG, " slot %d: flags:%d %d type:%d family:%d %s\n", ix, + b->entries[ix].flags, b->entries[ix].outstanding_pings, + n->type, + n->family, buffer); + } + else { + ks_log(KS_LOG_DEBUG, " slot %d: \n", ix); + } } ks_log(KS_LOG_DEBUG, " --------------------------\n\n"); @@ -1312,7 +1364,6 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, char buf[100]; ks_log(KS_LOG_DEBUG, "closestbucketnodes: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); - //fflush(stdout); #endif diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index e5204faf1b..e4df3fe221 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -178,33 +178,45 @@ void test03() enum ks_afflags_t both = ifboth; ks_status_t status; + int ipv4_remote = 0; + int ipv4_local = 0; for (int i=0; i<200; ++i) { if (i%10 == 0) { ++nodeid.id[0]; + nodeid.id[1] = 0; } else { ++nodeid.id[1]; } - ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); - ks_dhtrt_touch_node(rt, nodeid); + ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + if (s0 == KS_STATUS_SUCCESS) { + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_remote; + } } + for (int i=0; i<2; ++i) { if (i%10 == 0) { ++nodeid.id[0]; + nodeid.id[1] = 0; } else { ++nodeid.id[1]; } - ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); - ks_dhtrt_touch_node(rt, nodeid); + ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); + if (s0 == KS_STATUS_SUCCESS) { + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_local; + } } for (int i=0; i<201; ++i) { if (i%10 == 0) { ++nodeid.id[0]; + nodeid.id[1] = 0; } else { ++nodeid.id[1]; @@ -214,8 +226,12 @@ void test03() } + ks_dhtrt_dump(rt, 7); + + int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both); - printf("\n** local query count expected 2, actual %d\n", qcount); fflush(stdout); + printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout); + qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, both); printf("\n*** remote query count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both); @@ -230,7 +246,7 @@ void test03() printf("\n*** AF_INET6 count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv4); - printf("\n*** remote AF_INET query count expected 20, actual %d\n", qcount); fflush(stdout); + printf("\n*** remote AF_INET query count expected 20, actual %d max %d\n", qcount, ipv4_remote); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv6); printf("\n*** remote AF_INET6 query count expected 20, actual %d\n", qcount); fflush(stdout); @@ -452,6 +468,88 @@ void test06() } +void test30() +{ + printf("*** testbuckets - test03 start\n"); fflush(stdout); + + ks_dht_node_t* peer; + ks_dht_nodeid_t nodeid; + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7000; + enum ks_afflags_t both = ifboth; + + ks_status_t status; + int ipv4_remote = 0; + int ipv4_local = 0; + + for (int i=0; i<200; ++i) { + if (i%10 == 0) { + ++nodeid.id[0]; + nodeid.id[1] = 0; + } + else { + ++nodeid.id[1]; + } + ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + if (s0 == KS_STATUS_SUCCESS) { + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_remote; + } + } + + for (int i=0; i<2; ++i) { + if (i%10 == 0) { + ++nodeid.id[0]; + nodeid.id[1] = 0; + } + else { + ++nodeid.id[1]; + } + + ks_status_t s0 = ks_dhtrt_create_node(rt, nodeid, KS_DHT_LOCAL, ipv4, port, &peer); + if (s0 == KS_STATUS_SUCCESS) { + ks_dhtrt_touch_node(rt, nodeid); + ++ipv4_local; + } + } + + for (int i=0; i<201; ++i) { + if (i%10 == 0) { + ++nodeid.id[0]; + nodeid.id[1] = 0; + } + else { + ++nodeid.id[1]; + } + ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv6, port, &peer); + ks_dhtrt_touch_node(rt, nodeid); + } + + + ks_dhtrt_dump(rt, 7); + + + int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both); + printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout); + + qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both); + printf("\n** local query count expected 2, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout); + + qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both); + printf("\n** local query count expected 20, actual %d, max %d\n", qcount, ipv4_local); fflush(stdout); + + return; +} + + + + + + + /* test resue of node memory */ void test50() { @@ -610,8 +708,11 @@ int main(int argc, char* argv[]) { ks_dht_create(&dht, NULL, NULL); - ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE); + // ks_thread_pool_create(&tpool, 0, KS_DHT_TPOOL_MAX, KS_DHT_TPOOL_STACK, KS_PRI_NORMAL, KS_DHT_TPOOL_IDLE); + + tpool = 0; + ks_status_t status; char *str = NULL; int bytes = 1024; @@ -675,6 +776,15 @@ int main(int argc, char* argv[]) { continue; } + + if (tests[tix] == 30) { + ks_dhtrt_initroute(&rt, dht, pool, tpool); + test30(); + ks_dhtrt_deinitroute(&rt); + continue; + } + + if (tests[tix] == 50) { ks_dhtrt_initroute(&rt, dht, pool, tpool); test50();