diff --git a/libs/libks/src/dht/ks_dht_bucket.c b/libs/libks/src/dht/ks_dht_bucket.c index 3a68bceadc..53f8a2e2b2 100644 --- a/libs/libks/src/dht/ks_dht_bucket.c +++ b/libs/libks/src/dht/ks_dht_bucket.c @@ -65,8 +65,6 @@ typedef struct ks_dhtrt_bucket_s { ks_dhtrt_bucket_entry_t entries[KS_DHT_BUCKETSIZE]; uint8_t count; uint8_t expired_count; - ks_rwl_t * lock; - uint8_t locked; } ks_dhtrt_bucket_t; @@ -88,7 +86,6 @@ typedef struct ks_dhtrt_internal_s { uint8_t localid[KS_DHT_NODEID_SIZE]; ks_dhtrt_bucket_header_t *buckets; /* root bucketheader */ ks_rwl_t *lock; /* lock for safe traversal of the tree */ - uint8_t locked; } ks_dhtrt_internal_t; typedef struct ks_dhtrt_xort_s { @@ -141,11 +138,14 @@ ks_status_t ks_dhtrt_insert_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *no static ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node); static -void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id); +ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id); static char *ks_dhtrt_printableid(uint8_t *id, char *buffer); static unsigned char ks_dhtrt_isactive(ks_dhtrt_bucket_entry_t *entry); + +static +uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query); static uint8_t ks_dhtrt_load_query(ks_dhtrt_querynodes_t *query, ks_dhtrt_sortedxors_t *xort); static @@ -176,7 +176,7 @@ KS_DECLARE(ks_status_t) ks_dhtrt_initroute(ks_dhtrt_routetable_t **tableP, ks_po ks_dhtrt_internal_t *internal = ks_pool_alloc(pool, sizeof(ks_dhtrt_internal_t)); - /*ks_rwl_create(&internal->lock, pool);*/ + ks_rwl_create(&internal->lock, pool); table->internal = internal; /* initialize root bucket */ @@ -236,32 +236,42 @@ KS_DECLARE(ks_status_t) ks_dhtrt_create_node( ks_dhtrt_routetable_t *table, tnode->type = type; if (( ks_addr_set(&tnode->addr, ip, port, tnode->family) != KS_STATUS_SUCCESS) || - ( ks_dhtrt_insert_node(table, tnode) != KS_STATUS_SUCCESS) || - ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) { + ( ks_rwl_create(&tnode->reflock, table->pool) != KS_STATUS_SUCCESS)) { ks_pool_free(table->pool, tnode); return KS_STATUS_FAIL; } + + ks_dhtrt_internal_t* internal = table->internal; + ks_rwl_write_lock(internal->lock); /* grab write lock and insert */ + ks_status_t s = ks_dhtrt_insert_node(table, tnode); + ks_rwl_write_unlock(internal->lock); /* release write lock */ (*node) = tnode; - return KS_STATUS_SUCCESS; + return s; } KS_DECLARE(ks_status_t) ks_dhtrt_delete_node(ks_dhtrt_routetable_t *table, ks_dht_node_t *node) { + ks_status_t s = KS_STATUS_FAIL; + ks_dhtrt_internal_t* internal = table->internal; + ks_rwl_write_lock(internal->lock); /* grab write lock and delete */ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, node->nodeid.id); if (header != 0) { ks_dhtrt_bucket_t *bucket = header->bucket; - if (bucket != 0) { /* we were not able to find a bucket*/ - ks_dhtrt_delete_id(bucket, node->nodeid.id); + if (bucket != 0) { /* we found a bucket*/ + s = ks_dhtrt_delete_id(bucket, node->nodeid.id); } + } - ks_rwl_destroy(&node->reflock); + ks_rwl_write_unlock(internal->lock); /* release write lock */ + + ks_rwl_destroy(&(node->reflock)); ks_pool_free(table->pool, node); - return KS_STATUS_SUCCESS; + return s; } static @@ -365,40 +375,64 @@ KS_DECLARE(ks_dht_node_t *) ks_dhtrt_find_node(ks_dhtrt_routetable_t *table, ks_ KS_DECLARE(ks_status_t) ks_dhtrt_touch_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) { + ks_status_t s = KS_STATUS_FAIL; + ks_dhtrt_internal_t* internal = table->internal; + ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); - if (header == 0) return KS_STATUS_FAIL; - if (header->bucket == 0) return KS_STATUS_FAIL; + if (header != 0 && header->bucket != 0) { + ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); - ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); + if (e != 0) { + e->tyme = ks_time_now(); + e->outstanding_pings = 0; + + if (e->flags == DHTPEER_EXPIRED) { + --header->bucket->expired_count; + } + + e->flags = DHTPEER_ACTIVE; + s = KS_STATUS_SUCCESS; + } - if (e != 0) { - e->tyme = ks_time_now(); - e->outstanding_pings = 0; - if (e->flags == DHTPEER_EXPIRED) --header->bucket->expired_count; - e->flags = DHTPEER_ACTIVE; - return KS_STATUS_SUCCESS; } - - return KS_STATUS_FAIL; + ks_rwl_read_lock(internal->lock); /* release read lock */ + return s; } KS_DECLARE(ks_status_t) ks_dhtrt_expire_node(ks_dhtrt_routetable_t *table, ks_dht_nodeid_t nodeid) { + ks_status_t s = KS_STATUS_FAIL; + ks_dhtrt_internal_t *internal = table->internal; + ks_rwl_read_lock(internal->lock); /* grab read lock */ ks_dhtrt_bucket_header_t *header = ks_dhtrt_find_bucketheader(table, nodeid.id); - if (header == 0) return KS_STATUS_FAIL; + if (header != 0) { - ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); + ks_dhtrt_bucket_entry_t *e = ks_dhtrt_find_bucketentry(header, nodeid.id); + + if (e != 0) { + e->flags = DHTPEER_EXPIRED; + s = KS_STATUS_SUCCESS; + } - if (e != 0) { - e->flags = DHTPEER_EXPIRED; - return KS_STATUS_SUCCESS; } - return KS_STATUS_FAIL; + ks_rwl_read_unlock(internal->lock); /* release read lock */ + return s; } -KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) +KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) +{ + uint8_t count = 0; + ks_dhtrt_internal_t *internal = table->internal; + ks_rwl_read_lock(internal->lock); /* grab read lock */ + count = ks_dhtrt_findclosest_locked_nodes(table, query); + ks_rwl_read_unlock(internal->lock); /* release read lock */ + return count; +} + +static +uint8_t ks_dhtrt_findclosest_locked_nodes(ks_dhtrt_routetable_t *table, ks_dhtrt_querynodes_t *query) { uint8_t max = query->max; uint8_t total = 0; @@ -416,9 +450,7 @@ KS_DECLARE(uint8_t) ks_dhtrt_findclosest_nodes(ks_dhtrt_routetable_t *table, ks_ printf(" starting at mask: %s\n", ks_dhtrt_printableid(header->mask, buffer)); #endif - ks_dhtrt_sortedxors_t xort0; - memset(&xort0, 0 , sizeof(xort0)); ks_dhtrt_nodeid_t initid; @@ -586,6 +618,9 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) /* */ ks_dhtrt_internal_t *internal = table->internal; + + ks_rwl_read_lock(internal->lock); /* grab read lock */ + ks_dhtrt_bucket_header_t *header = internal->buckets; ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8]; int stackix=0; @@ -632,6 +667,7 @@ KS_DECLARE(void) ks_dhtrt_process_table(ks_dhtrt_routetable_t *table) header = header->right; } } + ks_rwl_read_unlock(internal->lock); /* release read lock */ return; } @@ -645,6 +681,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { ks_dhtrt_bucket_header_t *stack[KS_DHT_NODEID_SIZE * 8]; int stackix = 0; + ks_rwl_read_lock(internal->lock); /* grab read lock */ while (header) { stack[stackix++] = header; /* walk and report left handsize */ @@ -655,7 +692,7 @@ KS_DECLARE(void) ks_dhtrt_dump(ks_dhtrt_routetable_t *table, int level) { ks_dhtrt_bucket_t *b = header->bucket; printf(" bucket holds %d entries\n", b->count); - if (level == 7) { + if (b->count > 0 && level == 7) { printf(" --------------------------\n"); for (int ix=0; ixright; } } - + ks_rwl_read_unlock(internal->lock); /* release read lock */ return; } @@ -774,12 +811,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, int lix = 0; int rix = 0; - /* ****************** */ - /* bucket write lock */ - /* ****************** */ - /*ks_rwl_write_lock(source->lock);*/ - source->locked=1; - for ( ; rixentries[rix].id, left->mask)) { /* move it to the left */ @@ -796,11 +827,6 @@ void ks_dhtrt_split_bucket(ks_dhtrt_bucket_header_t *original, --source->count; } } - /* *********************** */ - /* end bucket write lock */ - /* *********************** */ - source->locked=0; - /*ks_rwl_write_unlock(source->lock);*/ /* give original bucket to the new left hand side header */ right->bucket = source; @@ -826,7 +852,7 @@ static ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) { /* sanity checks */ - if (!bucket || bucket->count >= KS_DHT_BUCKETSIZE) { + if (!bucket || bucket->count > KS_DHT_BUCKETSIZE) { assert(0); } @@ -848,7 +874,7 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) else if (!memcmp(bucket->entries[ix].id, node->nodeid.id, KS_DHT_NODEID_SIZE)) { #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("duplicate peer %s found at %d ", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); + printf("duplicate peer %s found at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), ix); #endif bucket->entries[ix].tyme = ks_time_now(); bucket->entries[ix].flags &= DHTPEER_ACTIVE; @@ -856,12 +882,6 @@ ks_status_t ks_dhtrt_insert_id(ks_dhtrt_bucket_t *bucket, ks_dht_node_t *node) } } - /* ****************** */ - /* bucket write lock */ - /* ****************** */ - /*ks_rwl_write_lock(bucket->lock);*/ - bucket->locked = 1; - if (free == KS_DHT_BUCKETSIZE && expiredixentries[free].tyme = ks_time_now(); bucket->entries[free].flags &= DHTPEER_ACTIVE; - ++bucket->count; + if (free != expiredix) { /* are we are taking a free slot rather than replacing an expired node? */ + ++bucket->count; /* yes: increment total count */ + } + memcpy(bucket->entries[free].id, node->nodeid.id, KS_DHT_NODEID_SIZE); - bucket->locked = 0; - /*ks_rwl_write_unlock(bucket->lock);*/ #ifdef KS_DHT_DEBUGPRINTF_ char buffer[100]; - printf("Inserting node %s\n", ks_dhtrt_printableid(node->nodeid.id, buffer)); + printf("Inserting node %s at %d\n", ks_dhtrt_printableid(node->nodeid.id, buffer), free); #endif return KS_STATUS_SUCCESS; } - bucket->locked = 0; - /*ks_rwl_write_unlock(bucket->lock);*/ - /* ********************** */ - /* end bucket write lock */ - /* ********************** */ - return KS_STATUS_FAIL; } @@ -923,7 +938,7 @@ ks_dht_node_t *ks_dhtrt_find_nodeid(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t } static -void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) +ks_status_t ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) { #ifdef KS_DHT_DEBUGPRINTF_ @@ -932,20 +947,22 @@ void ks_dhtrt_delete_id(ks_dhtrt_bucket_t *bucket, ks_dhtrt_nodeid_t id) #endif for (int ix=0; ixentries[%d].id = %s inuse=%c\n", ix, - ks_dhtrt_printableid(bucket->entries[ix].id, buffer), + ks_dhtrt_printableid(bucket->entries[ix].id, bufferx), bucket->entries[ix].inuse ); #endif if ( bucket->entries[ix].inuse == 1 && (!memcmp(id, bucket->entries[ix].id, KS_DHT_NODEID_SIZE)) ) { bucket->entries[ix].inuse = 0; - bucket->entries[ix].gptr = 0; + bucket->entries[ix].gptr = 0; bucket->entries[ix].flags = 0; - return; + --bucket->count; + return KS_STATUS_SUCCESS; } } - return; + return KS_STATUS_FAIL; }