diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 4958a7622e..e4815d0a50 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -39,8 +39,10 @@ /* change for testing */ #define KS_DHT_BUCKETSIZE 20 -#define KS_DHTRT_INACTIVETIME (5*60) +#define KS_DHTRT_INACTIVETIME (15*60) #define KS_DHTRT_MAXPING 3 +#define KS_DHTRT_PROCESSTABLE_INTERVAL (5*60) + /* peer flags */ #define DHTPEER_ACTIVE 1 @@ -91,8 +93,10 @@ typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ ks_rwl_t *lock; /* lock for safe traversal of the tree */ + ks_time_t last_process_table; ks_mutex_t *deleted_node_lock; ks_dhtrt_deletednode_t *deleted_node; + uint32_t deleted_count; } ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { @@ -176,7 +180,7 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); /* debugging */ #define KS_DHT_DEBUGPRINTF_ /* # define KS_DHT_DEBUGPRINTFX_ very verbose */ - +/* # define KS_DHT_DEBUGLOCKPRINTF_ debug locking */ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) { @@ -230,7 +234,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); if (bentry != 0) { - bentry->type = ks_time_now_sec(); + bentry->tyme = ks_time_now_sec(); + bentry->type = type; (*node) = bentry->gptr; ks_rwl_read_unlock(internal->lock); return KS_STATUS_SUCCESS; @@ -255,6 +260,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) { ks_pool_free(table->pool, tnode); + ks_rwl_read_unlock(internal->lock); return KS_STATUS_FAIL; } @@ -276,8 +282,18 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh ks_dhtrt_bucket_t *bucket = header->bucket; if (bucket != 0) { /* we found a bucket*/ +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("delete node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_write_lock(bucket->lock); s = ks_dhtrt_delete_id(bucket, node->nodeid.id); +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("delete node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif + ks_rwl_write_unlock(bucket->lock); } @@ -317,6 +333,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no ks_rwl_write_unlock(internal->lock); return KS_STATUS_FAIL; /* we were not able to find a bucket*/ } +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_write_lock(bucket->lock); @@ -327,6 +348,10 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no if (bucket->expired_count) { ks_status_t s = ks_dhtrt_insert_id(bucket, node); if (s == KS_STATUS_SUCCESS) { +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); return KS_STATUS_SUCCESS; @@ -341,8 +366,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no if ( !(header->flags & BHF_LEFT) ) { /* only the left handside node can be split */ #ifdef KS_DHT_DEBUGPRINTF_ - char buffer[100]; - printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); + char bufx[100]; + printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx)); +#endif +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); @@ -356,8 +385,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */ #ifdef KS_DHT_DEBUGPRINTF_ - char buffer[100]; - printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); + char bufx[100]; + printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, bufx)); +#endif +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); #endif ks_rwl_write_unlock(bucket->lock); ks_rwl_write_unlock(internal->lock); @@ -381,6 +414,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no /* which bucket do care about */ if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) { bucket = newleft->bucket; +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->right->mask, buf)); + printf("insert node: LOCKING bucket %s\n", ks_dhtrt_printableid(newleft->mask, buf)); + fflush(stdout); +#endif + ks_rwl_write_lock(bucket->lock); /* lock new bucket */ ks_rwl_write_unlock(header->right->bucket->lock); /* unlock old bucket */ header = newleft; @@ -398,15 +437,13 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif - /* by this point we have a viable & locked bucket - so downgrade the internal lock to read. safe as we hold the bucket write lock - preventing it being sptlit under us. - */ - ks_rwl_write_unlock(internal->lock); - ks_rwl_read_lock(internal->lock); - ks_status_t s = ks_dhtrt_insert_id(bucket, node); - ks_rwl_read_unlock(internal->lock); + ks_rwl_write_unlock(internal->lock); +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", + ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_write_unlock(bucket->lock); return s; } @@ -427,13 +464,22 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ if (bucket != 0) { /* probably a logic error ?*/ +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("insert node: read LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif + ks_rwl_read_lock(bucket->lock); node = ks_dhtrt_find_nodeid(bucket, nodeid.id); if (node != NULL) { ks_rwl_read_lock(node->reflock); } - +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: read UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_read_unlock(bucket->lock); } @@ -453,10 +499,16 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh if (header != 0 && header->bucket != 0) { ks_rwl_write_lock(header->bucket->lock); +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("insert node: write bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif + ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); if (e != 0) { - e->tyme = ks_time_now(); + e->tyme = ks_time_now_sec(); e->outstanding_pings = 0; if (e->flags == DHTPEER_EXPIRED) { @@ -466,6 +518,10 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh e->flags = DHTPEER_ACTIVE; s = KS_STATUS_SUCCESS; } +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + printf("insert node: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif ks_rwl_write_unlock(header->bucket->lock); } ks_rwl_read_unlock(internal->lock); /* release read lock */ @@ -698,12 +754,16 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) ks_dhtrt_internal_t *internal = table->internal; + ks_time_t t0 = ks_time_now_sec(); + if (t0 - internal->last_process_table < KS_DHTRT_PROCESSTABLE_INTERVAL) { + return; + } + ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = internal->buckets; ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8]; int stackix=0; - ks_time_t t0 = ks_time_now(); while (header) { stack[stackix++] = header; @@ -715,13 +775,11 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) { #ifdef KS_DHT_DEBUGLOCKPRINTF_ - char buf[100]; - printf("process_table: LOCKING bucket %s\n", - ks_dhtrt_printableid(header->mask, buf)); - fflush(stdout); + char buf[100]; + printf("process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); #endif - for (int ix=0; ixentries[ix]; @@ -757,8 +815,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf1[100]; - printf("process_table: UNLOCKING bucket %s\n", - ks_dhtrt_printableid(header->mask, buf1)); + printf("process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1)); fflush(stdout); #endif @@ -767,12 +824,10 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } /* end of if trywrite_lock successful */ else { #ifdef KS_DHT_DEBUGPRINTF_ - char buf2[100]; - printf("process_table: unble to LOCK bucket %s\n", - ks_dhtrt_printableid(header->mask, buf2)); - fflush(stdout); + char buf2[100]; + printf("process_table: unble to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2)); + fflush(stdout); #endif - } } @@ -808,6 +863,7 @@ void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) temp = deleted; deleted = deleted->next; ks_pool_free(table->pool, temp); + --internal->deleted_count; if (prev != NULL) { prev->next = deleted; } @@ -936,8 +992,7 @@ ks_dhtrt_bucket_entry_t *ks_dhtrt_find_bucketentry(ks_dhtrt_bucket_header_t *hea if (bucket == 0) return NULL; for (int ix=0; ixentries[ix].inuse == 1 && (!memcmp(nodeid, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) { return &(bucket->entries[ix]); @@ -963,7 +1018,9 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, int rix = 0; for ( ; rixentries[rix].id, left->mask)) { + /* move it to the left */ memcpy(dest->entries[lix].id, source->entries[rix].id, KS_DHT_NODEID_SIZE); dest->entries[lix].gptr = source->entries[rix].gptr; @@ -1014,20 +1071,24 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) uint8_t ix = 0; for (; ixentries[ix].inuse == 0) { + if (free == KS_DHT_BUCKETSIZE) { free = ix; /* use this one */ } + } else if (free == KS_DHT_BUCKETSIZE && bucket->entries[ix].flags == DHTPEER_EXPIRED) { expiredix = ix; } + else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) { #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); #endif - bucket->entries[ix].tyme = ks_time_now(); + bucket->entries[ix].tyme = ks_time_now_sec(); bucket->entries[ix].flags &= DHTPEER_ACTIVE; return KS_STATUS_SUCCESS; /* already exists */ } @@ -1044,7 +1105,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) bucket->entries[free].gptr = node; bucket->entries[free].type = node->type; bucket->entries[free].family = node->family; - bucket->entries[free].tyme = ks_time_now(); + bucket->entries[free].tyme = ks_time_now_sec(); bucket->entries[free].flags &= DHTPEER_ACTIVE; if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */ @@ -1092,14 +1153,13 @@ static ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) { #ifdef KS_DHT_DEBUGPRINTF_ - char buffer[100]; printf("\ndeleting node for: %s\n", ks_dhtrt_printableid(id, buffer)); #endif for (int ix=0; ixentries[%d].id = %s inuse=%c\n", ix, ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), bucket->entries[ix].inuse ); @@ -1156,6 +1216,7 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, for (uint8_t ix=0; ixentries[ix].inuse == 1 && (family == ifboth || bucket->entries[ix].family == family) && (bucket->entries[ix].type & type) && @@ -1210,6 +1271,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t { ks_dhtrt_sortedxors_t *current = xort; uint8_t loaded = 0; + while (current) { #ifdef KS_DHT_DEBUGPRINTF_ char buf[100]; @@ -1217,6 +1279,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t ks_dhtrt_printableid(current->bheader->mask,buf), current->count); #endif int xorix = current->startix; + for (uint8_t ix = 0; ix< current->count && loaded < query->max && xorix != KS_DHT_BUCKETSIZE; ++ix ) { @@ -1225,6 +1288,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t xorix = current->xort[xorix].nextix; ++loaded; } + #ifdef KS_DHT_DEBUGLOCKPRINTF_ char buf1[100]; printf("load_query: UNLOCKING bucket %s\n", @@ -1249,6 +1313,7 @@ void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* ks_mutex_lock(internal->deleted_node_lock); deleted->next = internal->deleted_node; internal->deleted_node = deleted; + ++internal->deleted_count; ks_mutex_unlock(internal->deleted_node_lock); } @@ -1300,34 +1365,6 @@ void ks_dhtrt_shiftleft(uint8_t *id) { return; } -/* Determine whether id1 or id2 is closer to ref */ - -/* - @todo: remove ? simple memcpy seems to do the job ? - - static int - ks_dhtrt_xorcmp(const uint8_t *id1, const uint8_t *id2, const uint8_t *ref); - - static int ks_dhtrt_xorcmp(const uint8_t *id1, const uint8_t *id2, const uint8_t *ref) - { - int i; - for (i = 0; i < KS_DHT_NODEID_SIZE; i++) { - uint8_t xor1, xor2; - if (id1[i] == id2[i]) { - continue; - } - xor1 = id1[i] ^ ref[i]; - xor2 = id2[i] ^ ref[i]; - if (xor1 < xor2) { - return -1; / * id1 is closer * / - } - return 1; / * id2 is closer * / - } - return 0; / * id2 and id2 are identical ! * / - } -*/ - - /* create an xor value from two ids */ static void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor) { diff --git a/libs/libks/test/testbuckets.c b/libs/libks/test/testbuckets.c index 0468a2cb0a..43094bdba0 100644 --- a/libs/libks/test/testbuckets.c +++ b/libs/libks/test/testbuckets.c @@ -1,5 +1,6 @@ #pragma GCC diagnostic ignored "-Wunused-but-set-variable" #pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wunused-function" //#include "ks.h" #include "../src/dht/ks_dht.h" @@ -7,6 +8,8 @@ ks_dhtrt_routetable_t* rt; ks_pool_t* pool; +static ks_thread_t *threads[10]; + int doquery(ks_dhtrt_routetable_t* rt, uint8_t* id, enum ks_dht_nodetype_t type, enum ks_afflags_t family) { @@ -22,7 +25,8 @@ int doquery(ks_dhtrt_routetable_t* rt, uint8_t* id, enum ks_dht_nodetype_t type, void test01() { - printf("testbuckets - test01 start\n"); fflush(stdout); + printf("*** testbuckets - test01 start\n"); fflush(stdout); + ks_dhtrt_routetable_t* rt; ks_dhtrt_initroute(&rt, pool); ks_dhtrt_deinitroute(&rt); @@ -66,11 +70,13 @@ void test01() exit(104); } - printf("*** testbuckets - test01 complete\n"); fflush(stdout); + printf("*** testbuckets - test01 complete\n\n\n"); fflush(stdout); } void test02() { + printf("*** testbuckets - test02 start\n"); fflush(stdout); + ks_dht_node_t* peer; ks_dht_nodeid_t nodeid; memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); @@ -126,6 +132,8 @@ void test02() qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4); printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout); + printf("*** testbuckets - test02 finished\n"); fflush(stdout); + return; } @@ -133,6 +141,8 @@ void test02() void test03() { + printf("*** testbuckets - test03 start\n"); fflush(stdout); + ks_dht_node_t* peer; ks_dht_nodeid_t nodeid; memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); @@ -177,33 +187,36 @@ void test03() int qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, both); - printf("\n** local query count expected 3, actual %d\n", qcount); fflush(stdout); + printf("\n** local query count expected 2, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, both); - printf("\n*** remote query count expected 6, actual %d\n", qcount); fflush(stdout); + printf("\n*** remote query count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, both); - printf("\n*** both query count expected 9, actual %d\n", qcount); fflush(stdout); + printf("\n*** both query count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, ifv4); - printf("\n*** local AF_INET query count expected 1, actual %d\n", qcount); fflush(stdout); + printf("\n*** local AF_INET query count expected 2, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_LOCAL, ifv6); - printf("\n*** local AF_INET6 query count expected 2, actual %d\n", qcount); fflush(stdout); + printf("\n*** local AF_INET6 query count expected 0, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv6); - printf("\n*** AF_INET6 count expected 5, actual %d\n", qcount); fflush(stdout); + printf("\n*** AF_INET6 count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv4); - printf("\n** remote AF_INET query count expected 3, actual %d\n", qcount); fflush(stdout); + printf("\n*** remote AF_INET query count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_REMOTE, ifv6); - printf("\n*** remote AF_INET6 query count expected 3, actual %d\n", qcount); fflush(stdout); + printf("\n*** remote AF_INET6 query count expected 20, actual %d\n", qcount); fflush(stdout); qcount = doquery(rt, nodeid.id, KS_DHT_BOTH, ifv4); - printf("\n*** AF_INET count expected 4, actual %d\n", qcount); fflush(stdout); + printf("\n*** AF_INET count expected 20, actual %d\n", qcount); fflush(stdout); + printf("*** testbuckets - test03 finished\n\n\n"); fflush(stdout); return; } void test04() { + printf("*** testbuckets - test04 start\n"); fflush(stdout); + ks_dht_node_t* peer; ks_dht_nodeid_t nodeid; memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); @@ -242,16 +255,193 @@ void test04() printf("*** query on 10k nodes in %d ms\n", tx); + printf("*** testbuckets - test04 finished\n\n\n"); fflush(stdout); return; } +/* test read/write node locking */ +void test05() +{ + printf("*** testbuckets - test05 start\n"); fflush(stdout); + + ks_dht_node_t* peer, *peer1, *peer2; + ks_dht_nodeid_t nodeid; + ks_status_t s; + + memset(nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + + char ipv6[] = "1234:1234:1234:1234"; + char ipv4[] = "123.123.123.123"; + unsigned short port = 7001; + + ks_dhtrt_create_node(rt, nodeid, KS_DHT_REMOTE, ipv4, port, &peer); + + peer1 = ks_dhtrt_find_node(rt, nodeid); + printf("test05 - first find compelete\n"); fflush(stdout); + + peer2 = ks_dhtrt_find_node(rt, nodeid); + printf("test05 - second find compelete\n"); fflush(stdout); + + ks_dhtrt_delete_node(rt, peer); + printf("test05 - delete compelete\n"); fflush(stdout); + + s = ks_dhtrt_release_node(peer1); + if (s == KS_STATUS_FAIL) printf("release 1 failed\n"); fflush(stdout); -int main(int argx, char* argv[]) { + + s = ks_dhtrt_release_node(peer2); + if (s == KS_STATUS_FAIL) printf("release 1 failed\n"); + + printf("*** testbuckets - test05 finished\n\n\n"); fflush(stdout); + + return; +} + +static int gindex = 1; +static ks_mutex_t *glock; +static int gstop = 0; + +static int test06loops = 1000; +static int test06nodes = 200; /* max at 255 */ + +static void *test06ex1(ks_thread_t *thread, void *data) +{ + while(!gstop) { + ks_dhtrt_process_table(rt); + ks_sleep(100); + } + return NULL; +} + + +static void *test06ex2(ks_thread_t *thread, void *data) +{ + ks_dht_nodeid_t nodeid; + ks_dhtrt_querynodes_t query; + + + while(!gstop) { + + memset(&query, 0, sizeof(query)); + memset(query.nodeid.id, 0xef, KS_DHT_NODEID_SIZE); + query.max = 30; + query.family = ifv4; + query.type = KS_DHT_REMOTE; + + + ks_dhtrt_findclosest_nodes(rt, &query); + ks_sleep(10000); + + for(int i=0; i