FS-9775: DHT Repopulate empty buckets

This commit is contained in:
colm 2016-12-30 20:30:11 -05:00 committed by Mike Jerris
parent 4a0132f7b7
commit a698651018
6 changed files with 289 additions and 82 deletions

View File

@ -296,6 +296,11 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
ks_dht_token_t *token,
int64_t cas,
ks_dht_storageitem_t *item);
KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
ks_dht_nodeid_t *target,
uint32_t family,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback);
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job);

View File

@ -3046,6 +3046,43 @@ KS_DECLARE(ks_status_t) ks_dht_process_response_put(ks_dht_t *dht, ks_dht_job_t
return ret;
}
KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job)
{
return ks_dht_search_findnode(dht,
job->query_family,
&job->query_target,
NULL,
NULL);
}
KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
ks_dhtrt_routetable_t *rt,
ks_dht_nodeid_t *target,
ks_dht_job_callback_t callback)
{
ks_dht_job_t *job = NULL;
ks_status_t ret = KS_STATUS_SUCCESS;
ks_assert(dht);
ks_assert(rt);
ks_assert(target);
ks_sockaddr_t taddr; /* just to satisfy the api */
if ((ret = ks_dht_job_create(&job, dht->pool, &taddr, 3)) == KS_STATUS_SUCCESS) {
int32_t family = AF_INET;
if (rt == dht->rt_ipv6) {
family = AF_INET6;
}
ks_dht_job_build_search_findnode(job, target, family, ks_dht_exec_search_findnode, callback);
}
return ret;
}
/* For Emacs:
* Local Variables:
* mode:c

View File

@ -145,6 +145,7 @@ struct ks_dht_job_s {
int64_t query_cas;
ks_dht_token_t query_token;
ks_dht_storageitem_t *query_storageitem;
uint32_t query_family;
// job specific response parameters
ks_dht_nodeid_t response_id;
@ -480,6 +481,14 @@ KS_DECLARE(ks_status_t) ks_dht_search_findnode(ks_dht_t *dht,
ks_dht_search_callback_t callback,
ks_dht_search_t **search);
KS_DECLARE(ks_status_t) ks_dht_queue_search_findnode(ks_dht_t* dht,
ks_dhtrt_routetable_t *rt,
ks_dht_nodeid_t *target,
ks_dht_job_callback_t callback);
KS_DECLARE(ks_status_t) ks_dht_exec_search_findnode(ks_dht_t *dht, ks_dht_job_t *job);
/**
* route table methods

View File

@ -58,6 +58,7 @@ typedef uint8_t ks_dhtrt_nodeid_t[KS_DHT_NODEID_SIZE];
/* internal structures */
typedef struct ks_dhtrt_bucket_entry_s {
ks_time_t tyme;
ks_time_t ping_tyme;
uint8_t id[KS_DHT_NODEID_SIZE];
ks_dht_node_t *gptr; /* ptr to peer */
uint8_t inuse;
@ -186,9 +187,9 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(unsigned char *nodeid,
unsigned int max);
static
void ks_dhtrt_ping(ks_dhtrt_internal_t *table, ks_dhtrt_bucket_entry_t *entry);
void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry);
static
void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid);
/* debugging */
@ -484,20 +485,20 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
return KS_STATUS_FAIL;
}
/* shift right x bits : todo 1 bit for the moment */
/* shift right 1 bit */
ks_dhtrt_shiftright(newmask);
/* create the new bucket structures */
ks_dhtrt_bucket_header_t *newleft = ks_dhtrt_create_bucketheader(table->pool, header, newmask);
header->right1bit = newleft;
newleft->left1bit = header;
newleft->bucket = ks_dhtrt_create_bucket(table->pool);
newleft->flags = BHF_LEFT; /* flag as left hand side - therefore splitable */
ks_dhtrt_bucket_header_t *newright = ks_dhtrt_create_bucketheader(table->pool, header, header->mask);
newright->right1bit = newleft;
newleft->left1bit = newright;
ks_dhtrt_split_bucket(header, newleft, newright);
/* ok now we need to try again to see if the bucket has capacity */
@ -938,6 +939,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
ks_time_t t0 = ks_time_now_sec();
if (t0 - internal->last_process_table < internal->next_process_table_delta) {
/*printf("process table: next scan not scheduled\n");*/
return;
}
@ -967,87 +969,90 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
if (b->count == 0) {
if (t0 - b->findtyme >= KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */
if (t0 - b->findtyme >= (ks_time_t)KS_DHTRT_EXPIREDTIME) { /* bucket has been empty for a while */
ks_dht_nodeid_t targetid;
if (header->left1bit) {
ks_dhtrt_midmask(header->left1bit->mask, header->mask, targetid.id);
}
else if (header->right1bit) {
if (header->right1bit) {
ks_dhtrt_midmask(header->mask, header->right1bit->mask, targetid.id);
}
else {
ks_dhtrt_shiftright(targetid.id);
ks_dhtrt_nodeid_t rightid;
memcpy(rightid, header->mask, KS_DHT_NODEID_SIZE);
ks_dhtrt_shiftright(rightid);
ks_dhtrt_midmask(header->mask, rightid, targetid.id);
}
ks_dhtrt_find(internal, &targetid);
continue;
ks_dhtrt_find(table, internal, &targetid);
b->findtyme = t0;
}
}
else {
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
if (e->inuse == 1) {
if (e->inuse == 1) {
ks_time_t tdiff = t0 - e->tyme;
if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */
if (e->gptr->type != KS_DHT_LOCAL) { /* 'local' nodes do not get expired */
/* more than n pings outstanding? */
/* more than n pings outstanding? */
if (e->flags == DHTPEER_DUBIOUS) {
continue;
}
if ( e->flags != DHTPEER_EXPIRED &&
e->outstanding_pings >= KS_DHTRT_MAXPING ) {
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
ks_dhtrt_printableid(e->id, buf));
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
e->outstanding_pings = 0; /* extinguish all hope: do not retry again */
continue;
}
/* if not on shortest interval and there are any outstanding pings - send another */
if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120 &&
e->outstanding_pings > 0) {
ks_dhtrt_ping(internal, e);
if (e->outstanding_pings == 2) {
++ping2_count; /* return in 60 seconds for final check */
}
else {
++ping_count;
if (e->flags == DHTPEER_DUBIOUS) {
continue; /* nothin' to see here */
}
continue;
}
/* refresh empty buckets */
if ( e->flags != DHTPEER_EXPIRED &&
tdiff >= KS_DHTRT_EXPIREDTIME && /* beyond expired time */
e->outstanding_pings >= KS_DHTRT_MAXPING ) { /* has been retried */
ks_log(KS_LOG_DEBUG,"process_table: expiring node %s\n",
ks_dhtrt_printableid(e->id, buf));
e->flags = DHTPEER_EXPIRED;
++b->expired_count;
e->outstanding_pings = 0; /* extinguish all hope: do not retry again */
continue;
}
/* if on shortest interval and there are two outstanding pings - send another and final */
if ( internal->next_process_table_delta == KS_DHTRT_PROCESSTABLE_SHORTINTERVAL60 &&
e->outstanding_pings >= 2) {
ks_dhtrt_ping(internal, e);
++ping_count;
continue;
}
ks_time_t tdiff = t0 - e->tyme;
/* re ping in-doubt nodes */
if ( e->outstanding_pings > 0) {
ks_time_t tping = t0 - e->ping_tyme; /* time since we last pinged */
if (tdiff > KS_DHTRT_EXPIREDTIME) {
e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
ks_dhtrt_ping(internal, e); /* final effort to activate */
continue;
}
if (e->outstanding_pings == KS_DHTRT_MAXPING - 1) { /* final ping */
ks_dhtrt_ping(internal, e);
e->ping_tyme = t0;
++ping2_count;
}
else if (tping >= KS_DHTRT_PROCESSTABLE_SHORTINTERVAL120) {
ks_dhtrt_ping(internal, e);
e->ping_tyme = t0;
++ping_count;
}
continue;
}
if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
ks_dhtrt_ping(internal, e); /* kick */
++ping_count;
continue;
}
/* look for newly expired nodes */
if (tdiff > KS_DHTRT_EXPIREDTIME) {
e->flags = DHTPEER_DUBIOUS; /* mark as dubious */
ks_dhtrt_ping(internal, e); /* final effort to activate */
e->ping_tyme = t0;
continue;
}
} /* end if not local */
if (tdiff > KS_DHTRT_INACTIVETIME) { /* inactive for suspicious length */
ks_dhtrt_ping(internal, e); /* kick */
e->ping_tyme = t0;
++ping_count;
continue;
}
} /* end if e->inuse */
} /* end if not local */
} /* end for each bucket_entry */
} /* end if e->inuse */
} /* end for each bucket_entry */
} /* if bucket->count == 0 .... else */
#ifdef KS_DHT_DEBUGLOCKPRINTF_
char buf[100];
@ -1664,6 +1669,7 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
char buf[100];
ks_log(KS_LOG_DEBUG, "Ping queued for nodeid %s count %d\n",
ks_dhtrt_printableid(entry->id,buf), entry->outstanding_pings);
/*printf("ping: %s\n", buf); fflush(stdout);*/
#endif
ks_dht_node_t* node = entry->gptr;
ks_log(KS_LOG_DEBUG, "Node addr %s %d\n", node->addr.host, node->addr.port);
@ -1673,19 +1679,14 @@ void ks_dhtrt_ping(ks_dhtrt_internal_t *internal, ks_dhtrt_bucket_entry_t *entry
}
static
void ks_dhtrt_find(ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *nodeid) {
void ks_dhtrt_find(ks_dhtrt_routetable_t *table, ks_dhtrt_internal_t *internal, ks_dht_nodeid_t *target) {
#ifdef KS_DHT_DEBUGPRINTF_
char buf[100];
ks_log(KS_LOG_DEBUG, "Find queued for mask %s\n", ks_dhtrt_printableid(nodeid->id, buf));
#endif
ks_log(KS_LOG_DEBUG, "Find queued for target %s\n", ks_dhtrt_printableid(target->id, buf));
ks_dht_queue_search_findnode(internal->dht, table, target, NULL);
return;
}
/*
strictly for shifting the bucketheader mask
so format must be a right filled mask (hex: ..ffffffff)
@ -1723,7 +1724,8 @@ void ks_dhtrt_shiftleft(uint8_t *id) {
static
void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) {
int i = 0;
uint8_t i = 0;
memset(midpt, 0, sizeof KS_DHT_NODEID_SIZE);
for ( ; i < KS_DHT_NODEID_SIZE; ++i) {
@ -1731,17 +1733,28 @@ void ks_dhtrt_midmask(uint8_t *leftid, uint8_t *rightid, uint8_t *midpt) {
if (leftid[i] == 0 && rightid[i] == 0) {
continue;
}
break; /* first non zero */
else if (leftid[i] == 0 || rightid[i] == 0) {
midpt[i] = leftid[i] | rightid[i];
continue;
}
else {
if (leftid[i] == rightid[i]) {
midpt[i] = leftid[i] >> 1;
i++;
}
else {
uint16_t x = leftid[i] + rightid[i];
x >>= 1;
midpt[i++] = (uint8_t)x;
}
break;
}
}
if (i == KS_DHT_NODEID_SIZE) {
return;
}
uint16_t x = leftid[i] + rightid[i];
x >>= 1;
midpt[i++] = (uint8_t)x;
if ( i < KS_DHT_NODEID_SIZE ) {
memcpy(&midpt[i], &rightid[i], KS_DHT_NODEID_SIZE-i);
}

View File

@ -98,6 +98,23 @@ KS_DECLARE(void) ks_dht_job_build_put(ks_dht_job_t *job,
job->query_storageitem = item;
}
KS_DECLARE(void) ks_dht_job_build_search_findnode(ks_dht_job_t *job,
ks_dht_nodeid_t *target,
uint32_t family,
ks_dht_job_callback_t query_callback,
ks_dht_job_callback_t finish_callback)
{
ks_assert(job);
ks_assert(target);
ks_assert(family);
job->search = NULL;
job->query_callback = query_callback;
job->finish_callback = finish_callback;
job->query_target = *target;
job->query_family = family;
}
KS_DECLARE(void) ks_dht_job_destroy(ks_dht_job_t **job)
{
ks_dht_job_t *j;

View File

@ -548,6 +548,117 @@ void test07()
}
void test08()
{
printf("**** testbuckets - test08 start\n"); fflush(stdout);
ks_dht_node_t *peer;
memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
/* build a delete queue */
int cix=0;
for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
if (i0%20 == 0) {
g_nodeid2.id[cix]>>=1;
//ks_dhtrt_dump(rt, 7);
if ( g_nodeid2.id[cix] == 0) ++cix;
g_nodeid2.id[19] = 0;
}
else {
++g_nodeid2.id[19];
}
ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
ks_dhtrt_touch_node(rt, g_nodeid2);
ks_dhtrt_release_node(peer);
}
cix = 0;
memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
for (int i0=0, i1=0; i0<150; ++i0, ++i1) {
if (i0%20 == 0) {
g_nodeid2.id[cix]>>=1;
if ( g_nodeid2.id[cix] == 0) ++cix;
g_nodeid2.id[19] = 0;
}
else {
++g_nodeid2.id[19];
}
ks_dht_node_t* n = ks_dhtrt_find_node(rt, g_nodeid2);
ks_dhtrt_release_node(n);
ks_dhtrt_delete_node(rt, n);
}
/* this should drive the search_findnode */
for(int i=0; i<45; ++i) {
printf("firing process table\n");
ks_dhtrt_process_table(rt);
ks_sleep(1000 * 1000 * 60); /* sleep one minutes */
}
printf("**** testbuckets - test08 ended\n"); fflush(stdout);
}
void test09()
{
printf("**** testbuckets - test09 start\n"); fflush(stdout);
ks_dht_node_t *peer;
memset(g_nodeid1.id, 0xef, KS_DHT_NODEID_SIZE);
memset(g_nodeid2.id, 0xef, KS_DHT_NODEID_SIZE);
char ipv6[] = "1234:1234:1234:1234";
char ipv4[] = "123.123.123.123";
unsigned short port = 7000;
/* build a delete queue */
int cix=0;
for(int i0=0, i1=0; i0<150; ++i0, ++i1) {
if (i0%20 == 0) {
g_nodeid2.id[cix]>>=1;
//ks_dhtrt_dump(rt, 7);
if ( g_nodeid2.id[cix] == 0) ++cix;
g_nodeid2.id[19] = 0;
}
else {
++g_nodeid2.id[19];
}
ks_dhtrt_create_node(rt, g_nodeid2, KS_DHT_REMOTE, ipv4, port, KS_DHTRT_CREATE_DEFAULT, &peer);
ks_dhtrt_touch_node(rt, g_nodeid2);
ks_dhtrt_release_node(peer);
}
/* this should expire all nodes after 15 minutes and 3 pings */
printf("\n\n\n\n");
for(int i=0; i<45; ++i) {
printf("firing process table\n");
ks_dhtrt_process_table(rt);
ks_sleep(1000 * 1000 * 30); /* sleep 30 seconds */
}
printf("**** testbuckets - test09 ended\n"); fflush(stdout);
}
static int gindex = 1;
static ks_mutex_t *glock;
static int gstop = 0;
@ -1020,7 +1131,6 @@ int main(int argc, char *argv[]) {
continue;
}
if (tests[tix] == 7) {
ks_dhtrt_initroute(&rt, dht, pool);
test07();
@ -1028,6 +1138,22 @@ int main(int argc, char *argv[]) {
continue;
}
if (tests[tix] == 8) {
ks_dhtrt_initroute(&rt, dht, pool);
test08();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 9) {
ks_dhtrt_initroute(&rt, dht, pool);
test09();
ks_dhtrt_deinitroute(&rt);
continue;
}
if (tests[tix] == 30) {
ks_dhtrt_initroute(&rt, dht, pool);
test30();