diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 53f8a2e2b2..796b0ce825 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -65,6 +65,7 @@ typedef struct ks_dhtrt_bucket_s { ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE]; uint8_t count; uint8_t expired_count; + ks_rwl_t *lock; /* lock for safe traversal of the entry array */ } ks_dhtrt_bucket_t; @@ -81,11 +82,17 @@ typedef struct ks_dhtrt_bucket_header_s { unsigned char flags; } ks_dhtrt_bucket_header_t; +typedef struct ks_dhtrt_deletednode_s { + ks_dht_node_t* node; + struct ks_dhtrt_deletednode_s *next; +} ks_dhtrt_deletednode_t; typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ - ks_rwl_t *lock; /* lock for safe traversal of the tree */ + ks_rwl_t *lock; /* lock for safe traversal of the tree */ + ks_mutex_t *deleted_node_lock; + ks_dhtrt_deletednode_t *deleted_node; } ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { @@ -132,6 +139,10 @@ static void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor); static int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask); +static +void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node); +static +void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table); static ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node); @@ -177,6 +188,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); ks_rwl_create(&internal->lock, pool); + ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool); table->internal = internal; /* initialize root bucket */ @@ -210,15 +222,20 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, unsigned short port, ks_dht_node_t **node) { + 
ks_dhtrt_internal_t* internal = table->internal; + ks_rwl_read_lock(internal->lock); /* grab read lock */ + ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); assert(header != NULL); /* should always find a header */ ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id); if (bentry != 0) { - bentry->type = ks_time_now_sec(); - (*node) = bentry->gptr; - return KS_STATUS_SUCCESS; + bentry->type = ks_time_now_sec(); + (*node) = bentry->gptr; + ks_rwl_read_unlock(internal->lock); + return KS_STATUS_SUCCESS; } + ks_rwl_read_unlock(internal->lock); /* @todo - replace with reusable memory pool */ ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t)); @@ -241,10 +258,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, return KS_STATUS_FAIL; } - ks_dhtrt_internal_t* internal = table->internal; - ks_rwl_write_lock(internal->lock); /* grab write lock and insert */ ks_status_t s = ks_dhtrt_insert_node(table, tnode); - ks_rwl_write_unlock(internal->lock); /* release write lock */ (*node) = tnode; @@ -255,39 +269,56 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh { ks_status_t s = KS_STATUS_FAIL; ks_dhtrt_internal_t* internal = table->internal; - ks_rwl_write_lock(internal->lock); /* grab write lock and delete */ + ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); if (header != 0) { ks_dhtrt_bucket_t *bucket = header->bucket; if (bucket != 0) { /* we found a bucket*/ + ks_rwl_write_lock(bucket->lock); s = ks_dhtrt_delete_id(bucket, node->nodeid.id); + ks_rwl_write_unlock(bucket->lock); } } - ks_rwl_write_unlock(internal->lock); /* release write lock */ + ks_rwl_read_unlock(internal->lock); /* release read lock */ + /* at this point no subsequent find/query will return the node - so we can + safely free it if we can grab the write lock + 
Having held the write lock on the bucket we know no other thread + is awaiting a read/write lock on the node + */ - ks_rwl_destroy(&(node->reflock)); - ks_pool_free(table->pool, node); + if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */ + ks_rwl_destroy(&(node->reflock)); + ks_pool_free(table->pool, node); + } + else { + ks_dhtrt_queue_node_fordelete(table, node); + } return s; } static ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node) { + ks_dhtrt_internal_t* internal = table->internal; ks_dhtrt_bucket_t *bucket = 0; int insanity = 0; + ks_rwl_write_lock(internal->lock); ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); assert(header != NULL); /* should always find a header */ bucket = header->bucket; if (bucket == 0) { + ks_rwl_write_unlock(internal->lock); return KS_STATUS_FAIL; /* we were not able to find a bucket*/ } + + ks_rwl_write_lock(bucket->lock); while (bucket->count == KS_DHT_BUCKETSIZE) { if (insanity > 3200) assert(insanity < 3200); @@ -295,8 +326,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no /* first - seek a stale entry to eject */ if (bucket->expired_count) { ks_status_t s = ks_dhtrt_insert_id(bucket, node); - - if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS; + if (s == KS_STATUS_SUCCESS) { + ks_rwl_write_unlock(bucket->lock); + ks_rwl_write_unlock(internal->lock); + return KS_STATUS_SUCCESS; + } } /* @@ -310,6 +344,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no char buffer[100]; printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); #endif + ks_rwl_write_unlock(bucket->lock); + ks_rwl_write_unlock(internal->lock); return KS_STATUS_FAIL; } @@ -323,6 +359,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no char buffer[100]; printf(" nodeid %s was not inserted\n", 
ks_dhtrt_printableid(node->nodeid.id, buffer)); #endif + ks_rwl_write_unlock(bucket->lock); + ks_rwl_write_unlock(internal->lock); return KS_STATUS_FAIL; } @@ -343,9 +381,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no /* which bucket do care about */ if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) { bucket = newleft->bucket; + ks_rwl_write_lock(bucket->lock); /* lock new bucket */ + ks_rwl_write_unlock(header->right->bucket->lock); /* unlock old bucket */ header = newleft; } else { bucket = newright->bucket; + /* note: we still hold a lock on the bucket */ header = newright; } ++insanity; @@ -357,20 +398,49 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif - /* by this point we have a viable bucket */ - return ks_dhtrt_insert_id(bucket, node); + /* by this point we have a viable & locked bucket + so downgrade the internal lock to read. safe as we hold the bucket write lock + preventing it being split under us. 
+ */ + ks_rwl_write_unlock(internal->lock); + ks_rwl_read_lock(internal->lock); + + ks_status_t s = ks_dhtrt_insert_id(bucket, node); + ks_rwl_read_unlock(internal->lock); + ks_rwl_write_unlock(bucket->lock); + return s; } -KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) { +KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) +{ + + ks_dht_node_t* node = NULL; + + ks_dhtrt_internal_t* internal = table->internal; + ks_rwl_read_lock(internal->lock); /* grab read lock */ + ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); - if (header == 0) return NULL; + if (header != 0) { - ks_dhtrt_bucket_t *bucket = header->bucket; + ks_dhtrt_bucket_t *bucket = header->bucket; - if (bucket == 0) return NULL; /* probably a logic error ?*/ + if (bucket != 0) { /* probably a logic error ?*/ - return ks_dhtrt_find_nodeid(bucket, nodeid.id); + ks_rwl_read_lock(bucket->lock); + node = ks_dhtrt_find_nodeid(bucket, nodeid.id); + + if (node != NULL) { + ks_rwl_read_lock(node->reflock); + } + + ks_rwl_read_unlock(bucket->lock); + } + + } + + ks_rwl_read_unlock(internal->lock); + return node; } KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) @@ -378,9 +448,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh ks_status_t s = KS_STATUS_FAIL; ks_dhtrt_internal_t* internal = table->internal; ks_rwl_read_lock(internal->lock); /* grab read lock */ + ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); if (header != 0 && header->bucket != 0) { + ks_rwl_write_lock(header->bucket->lock); ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); if (e != 0) { @@ -394,9 +466,9 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh e->flags = DHTPEER_ACTIVE; s = KS_STATUS_SUCCESS; } - + 
ks_rwl_write_unlock(header->bucket->lock); } - ks_rwl_read_lock(internal->lock); /* release read lock */ + ks_rwl_read_unlock(internal->lock); /* release read lock */ return s; } @@ -407,15 +479,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dh ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); - if (header != 0) { - + if (header != 0 && header->bucket != 0) { + ks_rwl_write_lock(header->bucket->lock); ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); if (e != 0) { e->flags = DHTPEER_EXPIRED; s = KS_STATUS_SUCCESS; } - + ks_rwl_write_unlock(header->bucket->lock); } ks_rwl_read_unlock(internal->lock); /* release read lock */ return s; @@ -536,7 +608,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt if (lheader) { xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); - memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t)); if (tofree == 0) tofree = xortn; @@ -558,7 +629,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt if (rheader) { xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t)); - memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t)); prev->next = xortn1; prev = xortn1; cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family, @@ -598,7 +668,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node) { return KS_STATUS_SUCCESS; - /* return ks_rwl_read_unlock(node->reflock);*/ + return ks_rwl_read_unlock(node->reflock); } @@ -630,33 +700,66 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) stack[stackix++] = header; if (header->bucket) { + ks_dhtrt_bucket_t *b = header->bucket; - for (int ix=0; ixentries[ix]; + if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) { - if (e->inuse == 1) 
{ - /* more than n pings outstanding? */ +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("process_table: LOCKING bucket %s\n", + ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif - if (e->outstanding_pings >= KS_DHTRT_MAXPING) { - e->flags = DHTPEER_EXPIRED; - ++b->expired_count; - continue; - } - if (e->flags == DHTPEER_SUSPECT) { - ks_dhtrt_ping(e); - continue; - } + for (int ix=0; ixentries[ix]; - ks_time_t tdiff = t0 - e->tyme; + if (e->inuse == 1) { + /* more than n pings outstanding? */ - if (tdiff > KS_DHTRT_INACTIVETIME) { - e->flags = DHTPEER_SUSPECT; - ks_dhtrt_ping(e); - } - } - } /* end for each bucket_entry */ + if (e->outstanding_pings >= KS_DHTRT_MAXPING) { + e->flags = DHTPEER_EXPIRED; + ++b->expired_count; + continue; + } + + if (e->flags == DHTPEER_SUSPECT) { + ks_dhtrt_ping(e); + continue; + } + + ks_time_t tdiff = t0 - e->tyme; + + if (tdiff > KS_DHTRT_INACTIVETIME) { + e->flags = DHTPEER_SUSPECT; + ks_dhtrt_ping(e); + } + + } /* end if e->inuse */ + + } /* end for each bucket_entry */ + +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf1[100]; + printf("process_table: UNLOCKING bucket %s\n", + ks_dhtrt_printableid(header->mask, buf1)); + fflush(stdout); +#endif + + ks_rwl_write_unlock(b->lock); + + } /* end of if trywrite_lock successful */ + else { +#ifdef KS_DHT_DEBUGPRINTF_ + char buf2[100]; + printf("process_table: unble to LOCK bucket %s\n", + ks_dhtrt_printableid(header->mask, buf2)); + fflush(stdout); +#endif + + } } header = header->left; @@ -668,9 +771,46 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) } } ks_rwl_read_unlock(internal->lock); /* release read lock */ + + ks_dhtrt_process_deleted(table); + return; } +void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table) +{ + ks_dhtrt_internal_t* internal = table->internal; + ks_mutex_lock(internal->deleted_node_lock); + + ks_dhtrt_deletednode_t *deleted = internal->deleted_node; + ks_dhtrt_deletednode_t *prev = NULL, *temp=NULL; + + 
while(deleted) { + ks_dht_node_t* node = deleted->node; + + if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { + ks_rwl_destroy(&(node->reflock)); + ks_pool_free(table->pool, node); + temp = deleted; + deleted = deleted->next; + ks_pool_free(table->pool, temp); + if (prev != NULL) { + prev->next = deleted; + } + else { + internal->deleted_node = deleted; + } + } + else { + prev = deleted; + deleted = prev->next; + } + + } + + ks_mutex_unlock(internal->deleted_node_lock); +} + KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { /* dump buffer headers */ @@ -728,7 +868,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt { ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t)); - memset(header, 0, sizeof(ks_dhtrt_bucket_header_t)); memcpy(header->mask, mask, sizeof(header->mask)); header->parent = parent; @@ -745,9 +884,7 @@ static ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool) { ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t)); - - memset(bucket, 0, sizeof(ks_dhtrt_bucket_t)); - /*ks_rwl_create(&bucket->lock, pool);*/ + ks_rwl_create(&bucket->lock, pool); return bucket; } @@ -995,6 +1132,15 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, } + ks_rwl_read_lock(bucket->lock); /* get a read lock : released in load_query when the results are copied */ +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf[100]; + printf("closestbucketnodes: LOCKING bucket %s\n", + ks_dhtrt_printableid(header->mask, buf)); + fflush(stdout); +#endif + + for (uint8_t ix=0; ixentries[ix].inuse == 1 && (family == ifboth || bucket->entries[ix].family == family) && @@ -1064,14 +1210,35 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t query->nodes[ix] = current->bheader->bucket->entries[z].gptr; xorix = current->xort[xorix].nextix; ++loaded; - } + } +#ifdef KS_DHT_DEBUGLOCKPRINTF_ + char buf1[100]; + 
printf("load_query: UNLOCKING bucket %s\n", + ks_dhtrt_printableid(current->bheader->mask, buf1)); + fflush(stdout); +#endif + ks_rwl_read_unlock(current->bheader->bucket->lock); /* release the read lock from findclosest_bucketnodes */ + if (loaded >= query->max) break; current = current->next; } query->count = loaded; + return loaded; } +void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node) +{ + ks_dhtrt_internal_t* internal = table->internal; + ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t)); + deleted->node = node; + ks_mutex_lock(internal->deleted_node_lock); + deleted->next = internal->deleted_node; + internal->deleted_node = deleted; + ks_mutex_unlock(internal->deleted_node_lock); +} + + void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) { ++entry->outstanding_pings; /* @todo */