freeswitch/libs/libks/test/testq.c

226 lines
3.6 KiB
C

#include "ks.h"
#include "tap.h"
#define MAX 200
static void *test1_thread(ks_thread_t *thread, void *data)
{
ks_q_t *q = (ks_q_t *) data;
void *pop;
while(ks_q_pop(q, &pop) == KS_STATUS_SUCCESS) {
//int *i = (int *)pop;
//printf("POP %d\n", *i);
ks_pool_free(&pop);
}
return NULL;
}
static void do_flush(ks_q_t *q, void *ptr, void *flush_data)
{
//ks_pool_t *pool = (ks_pool_t *)flush_data;
ks_pool_free(&ptr);
}
int qtest1(int loops)
{
ks_thread_t *thread;
ks_q_t *q;
ks_pool_t *pool;
int i;
int r = 1;
void *pop;
ks_pool_open(&pool);
ks_q_create(&q, pool, loops);
ks_thread_create(&thread, test1_thread, q, pool);
if (ks_q_pop_timeout(q, &pop, 500) != KS_STATUS_TIMEOUT) {
r = 0;
goto end;
}
for (i = 0; i < 10000; i++) {
int *val = (int *)ks_pool_alloc(pool, sizeof(int));
*val = i;
ks_q_push(q, val);
}
ks_q_wait(q);
ks_q_term(q);
ks_thread_join(thread);
ks_q_destroy(&q);
ks_q_create(&q, pool, loops);
ks_q_set_flush_fn(q, do_flush, pool);
for (i = 0; i < loops; i++) {
int *val = (int *)ks_pool_alloc(pool, sizeof(int));
*val = i;
ks_q_push(q, val);
}
end:
ks_q_destroy(&q);
ks_pool_close(&pool);
return r;
}
struct test2_data {
ks_q_t *q;
int try;
int ready;
int running;
};
static void *test2_thread(ks_thread_t *thread, void *data)
{
struct test2_data *t2 = (struct test2_data *) data;
void *pop;
ks_status_t status;
int popped = 0;
while (t2->running && (t2->try && !t2->ready)) {
ks_sleep(10000);
continue;
}
while (t2->running || ks_q_size(t2->q)) {
if (t2->try) {
status = ks_q_trypop(t2->q, &pop);
} else {
status = ks_q_pop(t2->q, &pop);
}
if (status == KS_STATUS_SUCCESS) {
//int *i = (int *)pop;
//printf("%p POP %d\n", (void *)pthread_self(), *i);
popped++;
ks_pool_free(&pop);
} else if (status == KS_STATUS_INACTIVE) {
break;
} else if (t2->try && ks_q_size(t2->q)) {
int s = rand() % 100;
ks_sleep(s * 1000);
}
}
return (void *) (intptr_t)popped;
}
ks_size_t qtest2(int ttl, int try, int loops)
{
ks_thread_t *threads[MAX];
ks_q_t *q;
ks_pool_t *pool;
int i;
struct test2_data t2 = { 0 };
ks_size_t r;
int dropped = 0;
int qlen = loops / 2;
int total_popped = 0;
ks_pool_open(&pool);
ks_q_create(&q, pool, qlen);
t2.q = q;
t2.try = try;
t2.running = 1;
for (i = 0; i < ttl; i++) {
ks_thread_create(&threads[i], test2_thread, &t2, pool);
}
//ks_sleep(loops00);
for (i = 0; i < loops; i++) {
int *val = (int *)ks_pool_alloc(pool, sizeof(int));
*val = i;
if (try > 1) {
if (ks_q_trypush(q, val) != KS_STATUS_SUCCESS) {
dropped++;
}
} else {
ks_q_push(q, val);
}
if (i > qlen / 2) {
t2.ready = 1;
}
}
t2.running = 0;
if (!try) {
ks_q_wait(q);
ks_q_term(q);
}
for (i = 0; i < ttl; i++) {
int popped;
ks_thread_join(threads[i]);
popped = (int)(intptr_t)threads[i]->return_data;
if (popped) {
printf("%d/%d POPPED %d\n", i, ttl, popped);
}
total_popped += popped;
}
r = ks_q_size(q);
ks_assert(r == 0);
ks_q_destroy(&q);
printf("TOTAL POPPED: %d DROPPED %d SUM: %d\n", total_popped, dropped, total_popped + dropped);fflush(stdout);
if (try < 2) {
ks_assert(total_popped == loops);
} else {
ks_assert(total_popped + dropped == loops);
}
ks_pool_close(&pool);
return r;
}
int main(int argc, char **argv)
{
int ttl;
int size = 100000;
int runs = 1;
int i;
ks_init();
plan(4 * runs);
ttl = ks_cpu_count() * 5;
//ttl = 5;
for(i = 0; i < runs; i++) {
ok(qtest1(size));
ok(qtest2(ttl, 0, size) == 0);
ok(qtest2(ttl, 1, size) == 0);
ok(qtest2(ttl, 2, size) == 0);
}
printf("TTL %d RUNS %d\n", ttl, runs);
ks_shutdown();
done_testing();
}