diff --git a/libs/libks/src/dht/ks_dht.c b/libs/libks/src/dht/ks_dht.c index f0050cf9b0..6484a297cf 100644 --- a/libs/libks/src/dht/ks_dht.c +++ b/libs/libks/src/dht/ks_dht.c @@ -385,11 +385,11 @@ KS_DECLARE(ks_status_t) ks_dht_bind(ks_dht_t *dht, const ks_dht_nodeid_t *nodeid // @todo initialize or add local nodeid to appropriate route table if (ep->addr.family == AF_INET) { if (!dht->rt_ipv4) { - ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool, ep->nodeid); + ks_dhtrt_initroute(&dht->rt_ipv4, dht->pool); } } else { if (!dht->rt_ipv6) { - ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool, ep->nodeid); + ks_dhtrt_initroute(&dht->rt_ipv6, dht->pool); } } diff --git a/libs/libks/src/dht/ks_dht.h b/libs/libks/src/dht/ks_dht.h index ac76542f9b..977e410dde 100644 --- a/libs/libks/src/dht/ks_dht.h +++ b/libs/libks/src/dht/ks_dht.h @@ -51,11 +51,15 @@ struct ks_dht_nodeid_s { }; enum ipfamily { ifv4=AF_INET, ifv6=AF_INET6, ifboth=AF_INET+AF_INET6}; +enum ks_dht_nodetype_t { ks_dht_remote_t=0x01, + ks_dht_local_t=0x02, + ks_dht_both_t=ks_dht_remote_t+ks_dht_local_t }; struct ks_dht_node_s { ks_dht_nodeid_t nodeid; ks_sockaddr_t addr; - enum ipfamily family; /* in: AF_INET or AF_INET6 or both */ + enum ipfamily family; /* AF_INET or AF_INET6 */ + enum ks_dht_nodetype_t type; /* local or remote */ ks_dhtrt_routetable_t* table; }; @@ -68,6 +72,7 @@ struct ks_dhtrt_routetable_s { struct ks_dhtrt_querynodes_s { ks_dht_nodeid_t nodeid; /* in: id to query */ enum ipfamily family; /* in: AF_INET or AF_INET6 or both */ + enum ks_dht_nodetype_t type; /* remote, local, or both */ uint8_t max; /* in: maximum to return */ uint8_t count; /* out: number returned */ ks_dht_node_t* nodes[ KS_DHT_MESSAGE_QUERY_MAX_SIZE]; /* out: array of peers (ks_dht_node_t* nodes[incount]) */ @@ -230,11 +235,12 @@ KS_DECLARE(ks_status_t) ks_dht_transaction_deinit(ks_dht_transaction_t *transact * route table methods * */ -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_dht_nodeid_t nodeid); +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool); KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table); KS_DECLARE(ks_status_t) ks_dhtrt_create_node(ks_dhtrt_routetable_t* table, ks_dht_nodeid_t nodeid, + enum ks_dht_nodetype_t type, char* ip, unsigned short port, ks_dht_node_t** node); diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 47365c42c9..31435842e7 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -53,7 +53,8 @@ typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE]; typedef struct ks_dhtrt_bucket_entry_s { ks_time_t tyme; uint8_t id[KS_DHT_NODEID_SIZE]; - ks_dht_node_t *gptr; /* ptr to peer */ + ks_dht_node_t *gptr; /* ptr to peer */ + enum ks_dht_nodetype_t type; uint8_t inuse; uint8_t outstanding_pings; uint8_t flags; /* active, suspect, expired */ @@ -71,10 +72,11 @@ typedef struct ks_dhtrt_bucket_s { #define BHF_LEFT 0x80 typedef struct ks_dhtrt_bucket_header_s { - struct ks_dhtrt_bucket_header_s* parent; - struct ks_dhtrt_bucket_header_s* left; - struct ks_dhtrt_bucket_header_s* right; + struct ks_dhtrt_bucket_header_s * parent; + struct ks_dhtrt_bucket_header_s * left; + struct ks_dhtrt_bucket_header_s * right; ks_dhtrt_bucket_t * bucket; + ks_dhtrt_bucket_t * bucketv6; ks_time_t tyme; /* last processed time */ unsigned char mask[KS_DHT_NODEID_SIZE]; /* node id mask */ unsigned char flags; @@ -84,8 +86,8 @@ typedef struct ks_dhtrt_bucket_header_s { typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ - ks_rwl_t * lock; /* lock for safe traversal of the tree */ - uint8_t locked; + ks_rwl_t *lock; /* lock for safe traversal of the tree */ + uint8_t locked; } ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { @@ -124,14 +126,14 @@ static ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t nodeid); -static void -ks_dhtrt_shiftright(uint8_t *id); +static +void ks_dhtrt_shiftright(uint8_t *id); static void ks_dhtrt_shiftleft(uint8_t *id); -static void -ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor); -static int -ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask); +static +void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor); +static +int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask); static ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node); @@ -147,6 +149,7 @@ static uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort); static uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid, + enum ks_dht_nodetype_t type, ks_dhtrt_bucket_header_t *header, ks_dhtrt_sortedxors_t *xors, unsigned char *hixor, @@ -162,29 +165,16 @@ void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry); /* # define KS_DHT_DEBUGPRINTFX_ very verbose */ -/* - Public interface - --------------- - ks_dhtrt_initroute - ks_dhtrt_drinitroute - - ks_dhtrt_insertnode - -*/ - -KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool, ks_dht_nodeid_t nodeid) +KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_pool_t *pool) { unsigned char initmask[KS_DHT_NODEID_SIZE]; memset(initmask, 0xff, sizeof(initmask)); ks_dhtrt_routetable_t *table = ks_pool_alloc(pool, sizeof(ks_dhtrt_routetable_t)); - memset(table, 0, sizeof(ks_dhtrt_routetable_t)); ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); - memset(internal, 0, sizeof(ks_dhtrt_internal_t)); /*ks_rwl_create(&internal->lock, pool);*/ - if (nodeid.id != 0) memcpy(internal->localid, nodeid.id, KS_DHT_NODEID_SIZE); table->internal = internal; /* initialize root bucket */ @@ -207,13 +197,13 @@ KS_DECLARE(void) ks_dhtrt_deinitroute(ks_dhtrt_routetable_t **table) ks_pool_t *pool = (*table)->pool; ks_pool_free(pool, *table); - *table = NULL; return; } KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, - ks_dht_nodeid_t nodeid, + ks_dht_nodeid_t nodeid, + enum ks_dht_nodetype_t type, char *ip, unsigned short port, ks_dht_node_t **node) @@ -233,7 +223,8 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, } } - memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE); + memcpy(tnode->nodeid.id, nodeid.id, KS_DHT_NODEID_SIZE); + tnode->type = type; if ((ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || (ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS)) { @@ -268,13 +259,6 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no ks_dhtrt_bucket_t *bucket = 0; int insanity = 0; - /* first see if it exists */ - ks_dht_node_t *peer = ks_dhtrt_find_node(table, node->nodeid); - - if (peer != 0) { - return KS_STATUS_FAIL; - } - ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); bucket = header->bucket; @@ -313,7 +297,7 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no if (newmask[KS_DHT_NODEID_SIZE-1] == 0) { /* no more bits to shift - is this possible */ #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(peer->nodeid.id, buffer)); + printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); #endif return KS_STATUS_FAIL; } @@ -430,7 +414,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ /* step 1 - look at immediate bucket */ /* --------------------------------- */ - cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort0, initid ,max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, header, &xort0, initid ,max); max -= cnt; total += cnt; @@ -438,7 +422,8 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ printf(" bucket header %s yielded %d nodes; total=%d\n", buffer, cnt, total); #endif - if (total >= query->max) { /* is query answered ? */ + if (total >= query->max || + !header->parent ) { /* is query answered ? */ return ks_dhtrt_load_query(query, &xort0); } @@ -462,7 +447,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ } } - cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, header, &xort1, initid ,max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, header, &xort1, initid ,max); max -= cnt; total += cnt; @@ -511,7 +496,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ prev->next = xortn; prev = xortn; - cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, lheader, xortn, leftid ,max); + cnt += ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, lheader, xortn, leftid ,max); max -= cnt; #ifdef KS_DHT_DEBUGPRINTF_ printf(" stage3: seaching left bucket header %s yielded %d nodes, total=%d\n", @@ -529,7 +514,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t)); prev->next = xortn1; prev = xortn1; - cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, rheader, xortn1, rightid , max); + cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, rheader, xortn1, rightid , max); max -= cnt; #ifdef KS_DHT_DEBUGPRINTF_ printf(" stage3: seaching right bucket header %s yielded %d nodes, total=%d\n", @@ -860,8 +845,9 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) if ( freeentries[free].inuse = 1; bucket->entries[free].gptr = node; - bucket->entries[free].tyme = ks_time_now(); - bucket->entries[free].flags &= DHTPEER_ACTIVE; + bucket->entries[free].type = node->type; + bucket->entries[free].tyme = ks_time_now(); + bucket->entries[free].flags &= DHTPEER_ACTIVE; ++bucket->count; memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE); @@ -938,6 +924,7 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) static uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, + enum ks_dht_nodetype_t type, ks_dhtrt_bucket_header_t *header, ks_dhtrt_sortedxors_t *xors, unsigned char *hixor, /*todo: remove */ @@ -963,8 +950,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id, } for (uint8_t ix=0; ixentries[ix].inuse == 1 && - ks_dhtrt_isactive( &(bucket->entries[ix])) ) { + if ( bucket->entries[ix].inuse == 1 && + bucket->entries[ix].type & type && + ks_dhtrt_isactive( &(bucket->entries[ix])) ) { /* calculate xor value */ ks_dhtrt_xor(bucket->entries[ix].id, id, xorvalue ); @@ -1022,7 +1010,7 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t ks_dhtrt_printableid(current->bheader->mask,buf), current->count); #endif int xorix = current->startix; - for (uint8_t ix = 0; ix<= current->count && loaded < query->max; ++ix ) { + for (uint8_t ix = 0; ix< current->count && loaded < query->max; ++ix ) { unsigned int z = current->xort[xorix].ix; query->nodes[ix] = current->bheader->bucket->entries[z].gptr; ++loaded; @@ -1069,7 +1057,6 @@ void ks_dhtrt_shiftright(uint8_t *id) } return; } - static void ks_dhtrt_shiftleft(uint8_t *id) {