FS-9775: Thread-safe DHT buckets (continued)

This commit is contained in:
colm 2016-12-14 21:07:04 -05:00 committed by Mike Jerris
parent fbc46839d9
commit 6013808781
1 changed file with 220 additions and 53 deletions


@@ -65,6 +65,7 @@ typedef struct ks_dhtrt_bucket_s {
ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE];
uint8_t count;
uint8_t expired_count;
+ks_rwl_t *lock; /* lock for safe traversal of the entry array */
} ks_dhtrt_bucket_t;
@@ -81,11 +82,17 @@ typedef struct ks_dhtrt_bucket_header_s {
unsigned char flags;
} ks_dhtrt_bucket_header_t;
+typedef struct ks_dhtrt_deletednode_s {
+ks_dht_node_t* node;
+struct ks_dhtrt_deletednode_s *next;
+} ks_dhtrt_deletednode_t;
typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_rwl_t *lock; /* lock for safe traversal of the tree */
+ks_mutex_t *deleted_node_lock;
+ks_dhtrt_deletednode_t *deleted_node;
} ks_dhtrt_internal_t;
typedef struct ks_dhtrt_xort_s {
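The two structs above carry the whole locking scheme: a tree-wide rwlock (internal->lock) pins the bucket-header tree, a per-bucket rwlock guards each entry array, and a mutex-protected list holds nodes whose free had to be deferred. A minimal sketch of the intended lock ordering, using POSIX rwlocks in place of the ks_rwl_t wrapper (the names here are illustrative, not from the commit):

```c
#include <pthread.h>
#include <stdio.h>

/* Illustrative stand-ins for internal->lock and one bucket->lock. */
static pthread_rwlock_t tree_lock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t bucket_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Read path: tree lock first, then bucket lock, released in reverse order.
   Writers (insert, bucket split) take the same locks in the same order,
   which is what keeps the hierarchy deadlock-free. */
static void read_bucket(void)
{
    pthread_rwlock_rdlock(&tree_lock);   /* pin the tree shape */
    pthread_rwlock_rdlock(&bucket_lock); /* pin this bucket's entries */
    puts("scanning bucket entries...");
    pthread_rwlock_unlock(&bucket_lock);
    pthread_rwlock_unlock(&tree_lock);
}

int main(void)
{
    read_bucket();
    return 0;
}
```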
@@ -132,6 +139,10 @@ static
void ks_dhtrt_xor(const uint8_t *id1, const uint8_t *id2, uint8_t *xor);
static
int ks_dhtrt_ismasked(const uint8_t *id1, const uint8_t *mask);
+static
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t *table, ks_dht_node_t* node);
+static
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table);
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node);
@@ -177,6 +188,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
ks_rwl_create(&internal->lock, pool);
+ks_mutex_create(&internal->deleted_node_lock, KS_MUTEX_FLAG_DEFAULT, pool);
table->internal = internal;
/* initialize root bucket */
@@ -210,15 +222,20 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
unsigned short port,
ks_dht_node_t **node)
{
+ks_dhtrt_internal_t* internal = table->internal;
+ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
assert(header != NULL); /* should always find a header */
ks_dhtrt_bucket_entry_t *bentry = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (bentry != 0) {
-bentry->type = ks_time_now_sec();
-(*node) = bentry->gptr;
-return KS_STATUS_SUCCESS;
+bentry->type = ks_time_now_sec();
+(*node) = bentry->gptr;
+ks_rwl_read_unlock(internal->lock);
+return KS_STATUS_SUCCESS;
}
+ks_rwl_read_unlock(internal->lock);
/* @todo - replace with reusable memory pool */
ks_dht_node_t *tnode = ks_pool_alloc(table->pool, sizeof(ks_dht_node_t));
@@ -241,10 +258,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
return KS_STATUS_FAIL;
}
-ks_dhtrt_internal_t* internal = table->internal;
-ks_rwl_write_lock(internal->lock); /* grab write lock and insert */
ks_status_t s = ks_dhtrt_insert_node(table, tnode);
-ks_rwl_write_unlock(internal->lock); /* release write lock */
(*node) = tnode;
@@ -255,39 +269,56 @@ KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dh
{
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
-ks_rwl_write_lock(internal->lock); /* grab write lock and delete */
+ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
if (header != 0) {
ks_dhtrt_bucket_t *bucket = header->bucket;
if (bucket != 0) { /* we found a bucket*/
+ks_rwl_write_lock(bucket->lock);
s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
+ks_rwl_write_unlock(bucket->lock);
}
}
-ks_rwl_write_unlock(internal->lock); /* release write lock */
+ks_rwl_read_unlock(internal->lock); /* release read lock */
+/* At this point no subsequent find/query will return the node, so we can
+   safely free it if we can grab its write lock.
+   Having held the write lock on the bucket, we know no other thread is
+   still waiting on a read/write lock on the node.
+*/
-ks_rwl_destroy(&(node->reflock));
-ks_pool_free(table->pool, node);
+if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) { /* grab exclusive lock on node */
+    ks_rwl_destroy(&(node->reflock));
+    ks_pool_free(table->pool, node);
+}
+else {
+    ks_dhtrt_queue_node_fordelete(table, node);
+}
return s;
}
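The delete path above is the crux of the change: the node disappears from the table under the bucket write lock, but it can only be freed once no reader still holds node->reflock. A sketch of that try-lock-or-defer pattern using POSIX primitives as an assumption (node_t and delete_or_defer are hypothetical names, not the library's):

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct node_s {
    pthread_rwlock_t reflock;    /* readers hold this while using the node */
    struct node_s *next_deleted; /* links the deferred-free list */
} node_t;

static node_t *deleted_head = NULL;
static pthread_mutex_t deleted_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Free immediately if no reader holds the node; otherwise park it on a
   deleted list so a later maintenance pass can retry. */
static void delete_or_defer(node_t *node)
{
    if (pthread_rwlock_trywrlock(&node->reflock) == 0) {
        pthread_rwlock_unlock(&node->reflock); /* POSIX requires unlock before destroy */
        pthread_rwlock_destroy(&node->reflock);
        free(node);
    } else {
        pthread_mutex_lock(&deleted_mutex);
        node->next_deleted = deleted_head;
        deleted_head = node;
        pthread_mutex_unlock(&deleted_mutex);
    }
}
```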
static
ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
+ks_dhtrt_internal_t* internal = table->internal;
ks_dhtrt_bucket_t *bucket = 0;
int insanity = 0;
+ks_rwl_write_lock(internal->lock);
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
assert(header != NULL); /* should always find a header */
bucket = header->bucket;
if (bucket == 0) {
+ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL; /* we were not able to find a bucket*/
}
+ks_rwl_write_lock(bucket->lock);
while (bucket->count == KS_DHT_BUCKETSIZE) {
if (insanity > 3200) assert(insanity < 3200);
@@ -295,8 +326,11 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
/* first - seek a stale entry to eject */
if (bucket->expired_count) {
ks_status_t s = ks_dhtrt_insert_id(bucket, node);
-if (s == KS_STATUS_SUCCESS) return KS_STATUS_SUCCESS;
+if (s == KS_STATUS_SUCCESS) {
+    ks_rwl_write_unlock(bucket->lock);
+    ks_rwl_write_unlock(internal->lock);
+    return KS_STATUS_SUCCESS;
+}
}
/*
@@ -310,6 +344,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
char buffer[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
+ks_rwl_write_unlock(bucket->lock);
+ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL;
}
@@ -323,6 +359,8 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
char buffer[100];
printf(" nodeid %s was not inserted\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
#endif
+ks_rwl_write_unlock(bucket->lock);
+ks_rwl_write_unlock(internal->lock);
return KS_STATUS_FAIL;
}
@@ -343,9 +381,12 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
/* which bucket do we care about? */
if (ks_dhtrt_ismasked(node->nodeid.id, newleft->mask)) {
bucket = newleft->bucket;
+ks_rwl_write_lock(bucket->lock); /* lock new bucket */
+ks_rwl_write_unlock(header->right->bucket->lock); /* unlock old bucket */
header = newleft;
} else {
bucket = newright->bucket;
+/* note: we still hold a lock on the bucket */
header = newright;
}
++insanity;
@@ -357,20 +398,49 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
printf("into bucket %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
-/* by this point we have a viable bucket */
-return ks_dhtrt_insert_id(bucket, node);
+/* By this point we have a viable and locked bucket, so downgrade the
+   internal lock to read. This is safe because we hold the bucket write
+   lock, preventing the bucket being split under us.
+*/
+ks_rwl_write_unlock(internal->lock);
+ks_rwl_read_lock(internal->lock);
+ks_status_t s = ks_dhtrt_insert_id(bucket, node);
+ks_rwl_read_unlock(internal->lock);
+ks_rwl_write_unlock(bucket->lock);
+return s;
}
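The downgrade at the end of insert_node is not atomic: the write lock is dropped and a read lock reacquired, so another writer could slip in between. A sketch of why that window is harmless here, again with POSIX rwlocks standing in for ks_rwl_t (illustrative names, not the library's):

```c
#include <pthread.h>

static pthread_rwlock_t tree_lock = PTHREAD_RWLOCK_INITIALIZER;
static pthread_rwlock_t bucket_lock = PTHREAD_RWLOCK_INITIALIZER;

/* Downgrade the tree lock from write to read. The unlock/rdlock pair
   leaves a window for another writer, but the bucket write lock is still
   held, so the one bucket we are about to modify cannot be split or
   emptied out from under us. */
static void insert_with_downgrade(void)
{
    pthread_rwlock_wrlock(&tree_lock);   /* may need to split buckets */
    pthread_rwlock_wrlock(&bucket_lock); /* target bucket chosen and locked */

    pthread_rwlock_unlock(&tree_lock);   /* downgrade: drop write... */
    pthread_rwlock_rdlock(&tree_lock);   /* ...retake as read */

    /* ... insert the entry into the locked bucket ... */

    pthread_rwlock_unlock(&tree_lock);
    pthread_rwlock_unlock(&bucket_lock);
}
```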
-KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) {
+KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
+{
+    ks_dht_node_t* node = NULL;
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_rwl_read_lock(internal->lock); /* grab read lock */
     ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
-    if (header == 0) return NULL;
+    if (header != 0) {
         ks_dhtrt_bucket_t *bucket = header->bucket;
-        if (bucket == 0) return NULL; /* probably a logic error ?*/
+        if (bucket != 0) { /* bucket == 0 would probably be a logic error */
-            return ks_dhtrt_find_nodeid(bucket, nodeid.id);
+            ks_rwl_read_lock(bucket->lock);
+            node = ks_dhtrt_find_nodeid(bucket, nodeid.id);
+            if (node != NULL) {
+                ks_rwl_read_lock(node->reflock);
+            }
+            ks_rwl_read_unlock(bucket->lock);
+        }
+    }
+    ks_rwl_read_unlock(internal->lock);
+    return node;
}
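With this change a successful ks_dhtrt_find_node returns the node already read-locked on node->reflock, and the caller is expected to hand it back through ks_dhtrt_release_node. A minimal caller sketch (lookup_and_use is a hypothetical name; table setup and error handling are assumed):

```c
/* Caller side: the returned node is read-locked, so a concurrent
   delete_node cannot free it underneath us; it would be parked on the
   deleted list until we release. */
static void lookup_and_use(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
    ks_dht_node_t *node = ks_dhtrt_find_node(table, nodeid); /* returned read-locked */
    if (node != NULL) {
        /* ... read node fields safely ... */
        ks_dhtrt_release_node(node); /* drops the read lock on node->reflock */
    }
}
```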
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
@@ -378,9 +448,11 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
if (header != 0 && header->bucket != 0) {
+ks_rwl_write_lock(header->bucket->lock);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
@@ -394,9 +466,9 @@ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dh
e->flags = DHTPEER_ACTIVE;
s = KS_STATUS_SUCCESS;
}
+ks_rwl_write_unlock(header->bucket->lock);
}
-ks_rwl_read_lock(internal->lock); /* release read lock */
+ks_rwl_read_unlock(internal->lock); /* release read lock */
return s;
}
@@ -407,15 +479,15 @@ KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dh
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
-if (header != 0) {
+if (header != 0 && header->bucket != 0) {
+ks_rwl_write_lock(header->bucket->lock);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
e->flags = DHTPEER_EXPIRED;
s = KS_STATUS_SUCCESS;
}
+ks_rwl_write_unlock(header->bucket->lock);
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
return s;
@@ -536,7 +608,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
if (lheader) {
xortn = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
memset(xortn, 0, sizeof(ks_dhtrt_sortedxors_t));
if (tofree == 0) tofree = xortn;
@@ -558,7 +629,6 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
if (rheader) {
xortn1 = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_sortedxors_t));
memset(xortn1, 0, sizeof(ks_dhtrt_sortedxors_t));
prev->next = xortn1;
prev = xortn1;
cnt = ks_dhtrt_findclosest_bucketnodes(query->nodeid.id, query->type, query->family,
@@ -598,7 +668,7 @@ uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt
KS_DECLARE(ks_status_t) ks_dhtrt_release_node(ks_dht_node_t* node)
{
-return KS_STATUS_SUCCESS;
-/* return ks_rwl_read_unlock(node->reflock);*/
+return ks_rwl_read_unlock(node->reflock);
}
@@ -630,33 +700,66 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
stack[stackix++] = header;
if (header->bucket) {
ks_dhtrt_bucket_t *b = header->bucket;
-for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
-    ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
-    if (e->inuse == 1) {
-        /* more than n pings outstanding? */
-        if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
-            e->flags = DHTPEER_EXPIRED;
-            ++b->expired_count;
-            continue;
-        }
-        if (e->flags == DHTPEER_SUSPECT) {
-            ks_dhtrt_ping(e);
-            continue;
-        }
-        ks_time_t tdiff = t0 - e->tyme;
-        if (tdiff > KS_DHTRT_INACTIVETIME) {
-            e->flags = DHTPEER_SUSPECT;
-            ks_dhtrt_ping(e);
-        }
-    }
-} /* end for each bucket_entry */
+if (ks_rwl_try_write_lock(b->lock) == KS_STATUS_SUCCESS) {
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+    char buf[100];
+    printf("process_table: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+    fflush(stdout);
+#endif
+    for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
+        ks_dhtrt_bucket_entry_t *e = &b->entries[ix];
+        if (e->inuse == 1) {
+            /* more than n pings outstanding? */
+            if (e->outstanding_pings >= KS_DHTRT_MAXPING) {
+                e->flags = DHTPEER_EXPIRED;
+                ++b->expired_count;
+                continue;
+            }
+            if (e->flags == DHTPEER_SUSPECT) {
+                ks_dhtrt_ping(e);
+                continue;
+            }
+            ks_time_t tdiff = t0 - e->tyme;
+            if (tdiff > KS_DHTRT_INACTIVETIME) {
+                e->flags = DHTPEER_SUSPECT;
+                ks_dhtrt_ping(e);
+            }
+        } /* end if e->inuse */
+    } /* end for each bucket_entry */
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+    char buf1[100];
+    printf("process_table: UNLOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf1));
+    fflush(stdout);
+#endif
+    ks_rwl_write_unlock(b->lock);
+} /* end of if trywrite_lock successful */
+else {
+#ifdef KS_DHT_DEBUGPRINTF_
+    char buf2[100];
+    printf("process_table: unable to LOCK bucket %s\n", ks_dhtrt_printableid(header->mask, buf2));
+    fflush(stdout);
+#endif
+}
}
header = header->left;
@@ -668,9 +771,46 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
}
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
+ks_dhtrt_process_deleted(table);
return;
}
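process_table deliberately uses ks_rwl_try_write_lock rather than blocking: the maintenance sweep must never stall foreground queries, so a busy bucket is simply skipped and caught on the next pass. The same pattern with POSIX primitives as an assumption (sweep_bucket is a hypothetical name):

```c
#include <pthread.h>
#include <stdio.h>

/* Non-blocking maintenance: try the bucket's write lock; if a reader or
   writer holds it, skip this bucket and revisit it on the next sweep. */
static void sweep_bucket(pthread_rwlock_t *bucket_lock)
{
    if (pthread_rwlock_trywrlock(bucket_lock) == 0) {
        /* ... expire stale entries and ping suspects, as process_table does ... */
        pthread_rwlock_unlock(bucket_lock);
    } else {
        puts("bucket busy; skip until next pass");
    }
}
```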
+void ks_dhtrt_process_deleted(ks_dhtrt_routetable_t *table)
+{
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_mutex_lock(internal->deleted_node_lock);
+    ks_dhtrt_deletednode_t *deleted = internal->deleted_node;
+    ks_dhtrt_deletednode_t *prev = NULL, *temp = NULL;
+    while (deleted) {
+        ks_dht_node_t* node = deleted->node;
+        if (ks_rwl_try_write_lock(node->reflock) == KS_STATUS_SUCCESS) {
+            ks_rwl_destroy(&(node->reflock));
+            ks_pool_free(table->pool, node);
+            temp = deleted;
+            deleted = deleted->next;
+            ks_pool_free(table->pool, temp);
+            if (prev != NULL) {
+                prev->next = deleted;
+            }
+            else {
+                internal->deleted_node = deleted;
+            }
+        }
+        else {
+            prev = deleted;
+            deleted = prev->next;
+        }
+    }
+    ks_mutex_unlock(internal->deleted_node_lock);
+}
KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
/* dump buffer headers */
@@ -728,7 +868,6 @@ ks_dhtrt_bucket_header_t *ks_dhtrt_create_bucketheader(ks_pool_t *pool, ks_dhtrt
{
ks_dhtrt_bucket_header_t *header = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_header_t));
memset(header, 0, sizeof(ks_dhtrt_bucket_header_t));
memcpy(header->mask, mask, sizeof(header->mask));
header->parent = parent;
@@ -745,9 +884,7 @@ static
ks_dhtrt_bucket_t *ks_dhtrt_create_bucket(ks_pool_t *pool)
{
ks_dhtrt_bucket_t *bucket = ks_pool_alloc(pool, sizeof(ks_dhtrt_bucket_t));
memset(bucket, 0, sizeof(ks_dhtrt_bucket_t));
-/*ks_rwl_create(&bucket->lock, pool);*/
+ks_rwl_create(&bucket->lock, pool);
return bucket;
}
@@ -995,6 +1132,15 @@ uint8_t ks_dhtrt_findclosest_bucketnodes(ks_dhtrt_nodeid_t id,
}
+ks_rwl_read_lock(bucket->lock); /* get a read lock: released in load_query when the results are copied */
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+char buf[100];
+printf("closestbucketnodes: LOCKING bucket %s\n", ks_dhtrt_printableid(header->mask, buf));
+fflush(stdout);
+#endif
for (uint8_t ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
if ( bucket->entries[ix].inuse == 1 &&
(family == ifboth || bucket->entries[ix].family == family) &&
@@ -1064,14 +1210,35 @@ uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t
query->nodes[ix] = current->bheader->bucket->entries[z].gptr;
xorix = current->xort[xorix].nextix;
++loaded;
}
}
+#ifdef KS_DHT_DEBUGLOCKPRINTF_
+char buf1[100];
+printf("load_query: UNLOCKING bucket %s\n", ks_dhtrt_printableid(current->bheader->mask, buf1));
+fflush(stdout);
+#endif
+ks_rwl_read_unlock(current->bheader->bucket->lock); /* release the read lock from findclosest_bucketnodes */
if (loaded >= query->max) break;
current = current->next;
}
query->count = loaded;
return loaded;
}
+void ks_dhtrt_queue_node_fordelete(ks_dhtrt_routetable_t* table, ks_dht_node_t* node)
+{
+    ks_dhtrt_internal_t* internal = table->internal;
+    ks_dhtrt_deletednode_t* deleted = ks_pool_alloc(table->pool, sizeof(ks_dhtrt_deletednode_t));
+    deleted->node = node;
+    ks_mutex_lock(internal->deleted_node_lock);
+    deleted->next = internal->deleted_node;
+    internal->deleted_node = deleted;
+    ks_mutex_unlock(internal->deleted_node_lock);
+}
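Taken together with ks_dhtrt_process_deleted above, this gives a simple producer/consumer pair: the delete path pushes a still-referenced node onto the mutex-guarded list, and the periodic sweep reaps it once its last reader has gone. A rough caller-side sketch (defer_delete and maintenance_pass are hypothetical names; surrounding context assumed):

```c
/* Producer (inside ks_dhtrt_delete_node): the node is still read-locked
   somewhere, so it is queued instead of freed. */
static void defer_delete(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
    ks_dhtrt_queue_node_fordelete(table, node);
}

/* Consumer: the periodic sweep; ks_dhtrt_process_table ends by calling
   ks_dhtrt_process_deleted, which frees whatever is no longer referenced. */
static void maintenance_pass(ks_dhtrt_routetable_t *table)
{
    ks_dhtrt_process_table(table);
}
```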
void ks_dhtrt_ping(ks_dhtrt_bucket_entry_t *entry) {
++entry->outstanding_pings;
/* @todo */