FS-9775: Thread safe dht buckets

This commit is contained in:
colm 2016-12-13 20:47:55 -05:00 committed by Mike Jerris
parent e52a85eb8d
commit fbc46839d9
1 changed files with 86 additions and 69 deletions

View File

@ -65,8 +65,6 @@ typedef struct ks_dhtrt_bucket_s {
ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE];
uint8_t count;
uint8_t expired_count;
ks_rwl_t * lock;
uint8_t locked;
} ks_dhtrt_bucket_t;
@ -88,7 +86,6 @@ typedef struct ks_dhtrt_internal_s {
uint8_t localid[KS_DHT_NODEID_SIZE];
ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */
ks_rwl_t *lock; /* lock for safe traversal of the tree */
uint8_t locked;
} ks_dhtrt_internal_t;
typedef struct ks_dhtrt_xort_s {
@ -141,11 +138,14 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no
static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node);
static
void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id);
static
char *ks_dhtrt_printableid(uint8_t *id, char *buffer);
static
unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry);
static
uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query);
static
uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort);
static
@ -176,7 +176,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po
ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t));
/*ks_rwl_create(&internal->lock, pool);*/
ks_rwl_create(&internal->lock, pool);
table->internal = internal;
/* initialize root bucket */
@ -236,32 +236,42 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table,
tnode->type = type;
if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) ||
( ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS) ||
( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) {
( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) {
ks_pool_free(table->pool, tnode);
return KS_STATUS_FAIL;
}
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_write_lock(internal->lock); /* grab write lock and insert */
ks_status_t s = ks_dhtrt_insert_node(table, tnode);
ks_rwl_write_unlock(internal->lock); /* release write lock */
(*node) = tnode;
return KS_STATUS_SUCCESS;
return s;
}
KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node)
{
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_write_lock(internal->lock); /* grab write lock and delete */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id);
if (header != 0) {
ks_dhtrt_bucket_t *bucket = header->bucket;
if (bucket != 0) { /* we were not able to find a bucket*/
ks_dhtrt_delete_id(bucket, node->nodeid.id);
if (bucket != 0) { /* we found a bucket*/
s = ks_dhtrt_delete_id(bucket, node->nodeid.id);
}
}
ks_rwl_destroy(&node->reflock);
ks_rwl_write_unlock(internal->lock); /* release write lock */
ks_rwl_destroy(&(node->reflock));
ks_pool_free(table->pool, node);
return KS_STATUS_SUCCESS;
return s;
}
static
@ -365,40 +375,64 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_
KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t* internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
if (header == 0) return KS_STATUS_FAIL;
if (header->bucket == 0) return KS_STATUS_FAIL;
if (header != 0 && header->bucket != 0) {
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
e->tyme = ks_time_now();
e->outstanding_pings = 0;
if (e->flags == DHTPEER_EXPIRED) {
--header->bucket->expired_count;
}
e->flags = DHTPEER_ACTIVE;
s = KS_STATUS_SUCCESS;
}
if (e != 0) {
e->tyme = ks_time_now();
e->outstanding_pings = 0;
if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count;
e->flags = DHTPEER_ACTIVE;
return KS_STATUS_SUCCESS;
}
return KS_STATUS_FAIL;
ks_rwl_read_lock(internal->lock); /* release read lock */
return s;
}
KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid)
{
ks_status_t s = KS_STATUS_FAIL;
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id);
if (header == 0) return KS_STATUS_FAIL;
if (header != 0) {
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id);
if (e != 0) {
e->flags = DHTPEER_EXPIRED;
s = KS_STATUS_SUCCESS;
}
if (e != 0) {
e->flags = DHTPEER_EXPIRED;
return KS_STATUS_SUCCESS;
}
return KS_STATUS_FAIL;
ks_rwl_read_unlock(internal->lock); /* release read lock */
return s;
}
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
uint8_t count = 0;
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
count = ks_dhtrt_findclosest_locked_nodes(table, query);
ks_rwl_read_unlock(internal->lock); /* release read lock */
return count;
}
static
uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query)
{
uint8_t max = query->max;
uint8_t total = 0;
@ -416,9 +450,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_
printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer));
#endif
ks_dhtrt_sortedxors_t xort0;
memset(&xort0, 0 , sizeof(xort0));
ks_dhtrt_nodeid_t initid;
@ -586,6 +618,9 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
/* */
ks_dhtrt_internal_t *internal = table->internal;
ks_rwl_read_lock(internal->lock); /* grab read lock */
ks_dhtrt_bucket_header_t *header = internal->buckets;
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix=0;
@ -632,6 +667,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table)
header = header->right;
}
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
return;
}
@ -645,6 +681,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8];
int stackix = 0;
ks_rwl_read_lock(internal->lock); /* grab read lock */
while (header) {
stack[stackix++] = header;
/* walk and report left handsize */
@ -655,7 +692,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
ks_dhtrt_bucket_t *b = header->bucket;
printf(" bucket holds %d entries\n", b->count);
if (level == 7) {
if (b->count > 0 && level == 7) {
printf(" --------------------------\n");
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
@ -678,7 +715,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) {
header = header->right;
}
}
ks_rwl_read_unlock(internal->lock); /* release read lock */
return;
}
@ -774,12 +811,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
int lix = 0;
int rix = 0;
/* ****************** */
/* bucket write lock */
/* ****************** */
/*ks_rwl_write_lock(source->lock);*/
source->locked=1;
for ( ; rix<KS_DHT_BUCKETSIZE; ++rix) {
if (ks_dhtrt_ismasked(source->entries[rix].id, left->mask)) {
/* move it to the left */
@ -796,11 +827,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original,
--source->count;
}
}
/* *********************** */
/* end bucket write lock */
/* *********************** */
source->locked=0;
/*ks_rwl_write_unlock(source->lock);*/
/* give original bucket to the new left hand side header */
right->bucket = source;
@ -826,7 +852,7 @@ static
ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
{
/* sanity checks */
if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) {
if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) {
assert(0);
}
@ -848,7 +874,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) {
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix);
#endif
bucket->entries[ix].tyme = ks_time_now();
bucket->entries[ix].flags &= DHTPEER_ACTIVE;
@ -856,12 +882,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
}
}
/* ****************** */
/* bucket write lock */
/* ****************** */
/*ks_rwl_write_lock(bucket->lock);*/
bucket->locked = 1;
if (free == KS_DHT_BUCKETSIZE && expiredix<KS_DHT_BUCKETSIZE ) {
/* bump this one - but only if we have no other option */
free = expiredix;
@ -876,23 +896,18 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node)
bucket->entries[free].tyme = ks_time_now();
bucket->entries[free].flags &= DHTPEER_ACTIVE;
++bucket->count;
if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */
++bucket->count; /* yes: increment total count */
}
memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE);
bucket->locked = 0;
/*ks_rwl_write_unlock(bucket->lock);*/
#ifdef KS_DHT_DEBUGPRINTF_
char buffer[100];
printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer));
printf("Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free);
#endif
return KS_STATUS_SUCCESS;
}
bucket->locked = 0;
/*ks_rwl_write_unlock(bucket->lock);*/
/* ********************** */
/* end bucket write lock */
/* ********************** */
return KS_STATUS_FAIL;
}
@ -923,7 +938,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t
}
static
void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
{
#ifdef KS_DHT_DEBUGPRINTF_
@ -932,20 +947,22 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id)
#endif
for (int ix=0; ix<KS_DHT_BUCKETSIZE; ++ix) {
#ifdef KS_DHT_DEBUGPRINTF_
#ifdef KS_DHT_DEBUGPRINTFX_
char bufferx[100];_
printf("\nbucket->entries[%d].id = %s inuse=%c\n", ix,
ks_dhtrt_printableid(bucket->entries[ix].id, buffer),
ks_dhtrt_printableid(bucket->entries[ix].id, bufferx),
bucket->entries[ix].inuse );
#endif
if ( bucket->entries[ix].inuse == 1 &&
(!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) {
bucket->entries[ix].inuse = 0;
bucket->entries[ix].gptr = 0;
bucket->entries[ix].gptr = 0;
bucket->entries[ix].flags = 0;
return;
--bucket->count;
return KS_STATUS_SUCCESS;
}
}
return;
return KS_STATUS_FAIL;
}