From de417e99f0c41421f701f86ee5e4e507868be81f Mon Sep 17 00:00:00 2001 From: Mathieu Rene Date: Thu, 18 Nov 2010 21:00:21 -0500 Subject: [PATCH] Implement switch_queue_pop_timeout() and refactor sofia_profile_worker_thread_run() to use it so it doesn't wake up too often --- libs/apr-util/.update | 2 +- libs/apr-util/include/apr_queue.h | 16 ++++++ libs/apr-util/misc/apr_queue.c | 65 +++++++++++++++++++++ src/include/switch_apr.h | 16 ++++++ src/mod/endpoints/mod_sofia/sofia.c | 89 ++++++++++++++--------------- src/switch_apr.c | 6 ++ 6 files changed, 148 insertions(+), 46 deletions(-) diff --git a/libs/apr-util/.update b/libs/apr-util/.update index f430e64b3b..a573c80504 100644 --- a/libs/apr-util/.update +++ b/libs/apr-util/.update @@ -1 +1 @@ -Mon Dec 28 14:55:57 EST 2009 +Thu 18 Nov 2010 20:56:38 EST diff --git a/libs/apr-util/include/apr_queue.h b/libs/apr-util/include/apr_queue.h index 5a0181b29e..dcf0c137ed 100644 --- a/libs/apr-util/include/apr_queue.h +++ b/libs/apr-util/include/apr_queue.h @@ -78,6 +78,22 @@ APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data); */ APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data); +/** + * pop/get an object from the queue, blocking if the queue is already empty + * + * @param queue the queue + * @param data the data + * @param timeout The amount of time in microseconds to wait. This is + * a maximum, not a minimum. If the condition is signaled, we + * will wake up before this time, otherwise the error APR_TIMEUP + * is returned. + * @returns APR_TIMEUP the request timed out + * @returns APR_EINTR the blocking was interrupted (try again) + * @returns APR_EOF if the queue has been terminated + * @returns APR_SUCCESS on a successfull pop + */ +APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout); + /** * push/add a object to the queue, returning immediatly if the queue is full * diff --git a/libs/apr-util/misc/apr_queue.c b/libs/apr-util/misc/apr_queue.c index 28d79afcb5..e905a53ebb 100644 --- a/libs/apr-util/misc/apr_queue.c +++ b/libs/apr-util/misc/apr_queue.c @@ -313,6 +313,71 @@ APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) return rv; } +/** + * Retrieves the next item from the queue. If there are no + * items available, it will block until one becomes available, or + * until timeout is elapsed. Once retrieved, the item is placed into + * the address specified by'data'. + */ +APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout) +{ + apr_status_t rv; + + if (queue->terminated) { + return APR_EOF; /* no more elements ever again */ + } + + rv = apr_thread_mutex_lock(queue->one_big_mutex); + if (rv != APR_SUCCESS) { + return rv; + } + + /* Keep waiting until we wake up and find that the queue is not empty. */ + if (apr_queue_empty(queue)) { + if (!queue->terminated) { + queue->empty_waiters++; + rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout); + queue->empty_waiters--; + /* In the event of a timemout, APR_TIMEUP will be returned */ + if (rv != APR_SUCCESS) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; + } + } + /* If we wake up and it's still empty, then we were interrupted */ + if (apr_queue_empty(queue)) { + Q_DBG("queue empty (intr)", queue); + rv = apr_thread_mutex_unlock(queue->one_big_mutex); + if (rv != APR_SUCCESS) { + return rv; + } + if (queue->terminated) { + return APR_EOF; /* no more elements ever again */ + } + else { + return APR_EINTR; + } + } + } + + *data = queue->data[queue->out]; + queue->nelts--; + + queue->out = (queue->out + 1) % queue->bounds; + if (queue->full_waiters) { + Q_DBG("signal !full", queue); + rv = apr_thread_cond_signal(queue->not_full); + if (rv != APR_SUCCESS) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; + } + } + + rv = apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; +} + + /** * Retrieves the next item from the queue. If there are no * items available, return APR_EAGAIN. Once retrieved, diff --git a/src/include/switch_apr.h b/src/include/switch_apr.h index f82b81a7f8..dc81d1a522 100644 --- a/src/include/switch_apr.h +++ b/src/include/switch_apr.h @@ -592,6 +592,22 @@ SWITCH_DECLARE(switch_status_t) switch_queue_create(switch_queue_t ** queue, uns */ SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **data); +/** + * pop/get an object from the queue, blocking if the queue is already empty + * + * @param queue the queue + * @param data the data + * @param timeout The amount of time in microseconds to wait. This is + * a maximum, not a minimum. If the condition is signaled, we + * will wake up before this time, otherwise the error APR_TIMEUP + * is returned. + * @returns APR_TIMEUP the request timed out + * @returns APR_EINTR the blocking was interrupted (try again) + * @returns APR_EOF if the queue has been terminated + * @returns APR_SUCCESS on a successfull pop + */ +SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout); + /** * push/add a object to the queue, blocking if the queue is already full * diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 5bb2ff0d66..9fadced17f 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1230,43 +1230,42 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char * void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj) { sofia_profile_t *profile = (sofia_profile_t *) obj; - uint32_t ireg_loops = 0; - uint32_t gateway_loops = 0; - int loops = 0; - uint32_t qsize; - void *pop = NULL; - int loop_count = 0; - switch_size_t sql_len = 1024 * 32; - char *tmp, *sqlbuf = NULL; - char *sql = NULL; + uint32_t ireg_loops = IREG_SECONDS; /* Number of loop iterations done when we haven't checked for registrations */ + uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */ + void *pop = NULL; /* queue_pop placeholder */ + switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */ + char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */ + char *sql = NULL; /* Current SQL statement */ + switch_time_t last_commit; /* Last time we committed stuff to the DB */ + switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */ + switch_size_t len = 0; /* Current length of sqlbuf */ + uint32_t statements = 0; /* Number of statements in the current sql buffer */ + + last_commit = last_check = switch_micro_time_now(); if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { sqlbuf = (char *) malloc(sql_len); } - ireg_loops = IREG_SECONDS; - gateway_loops = GATEWAY_SECONDS; - sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING); switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool); - qsize = switch_queue_size(profile->sql_queue); - - while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) { + /* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */ + while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) { if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) { - if (qsize > 0 && (qsize >= 1024 || ++loop_count >= (int)profile->trans_timeout)) { - switch_size_t newlen; - uint32_t iterations = 0; - switch_size_t len = 0; - - switch_mutex_lock(profile->ireg_mutex); + /* Do we have enough statements or is the timeout expired */ + while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 && + (statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) { - while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) { + switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000; + + if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) { + switch_size_t newlen; + if (!sql) sql = (char *) pop; - newlen = strlen(sql) + 2; - iterations++; + newlen = strlen(sql) + 2 /* strlen(";\n") */ ; if (len + newlen + 10 > sql_len) { switch_size_t new_mlen = len + newlen + 10 + 10240; @@ -1280,7 +1279,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread } sqlbuf = tmp; } else { - goto skip; + break; } } @@ -1288,31 +1287,32 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread len += newlen; free(sql); sql = NULL; + + statements++; } - - skip: - + } + + /* Execute here */ + last_commit = switch_micro_time_now(); + + if (len) { //printf("TRANS:\n%s\n", sqlbuf); + switch_mutex_lock(profile->ireg_mutex); sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL); //sofia_glue_actually_execute_sql(profile, "commit;\n", NULL); switch_mutex_unlock(profile->ireg_mutex); - loop_count = 0; + statements = 0; + len = 0; } + } else { - if (qsize) { - //switch_mutex_lock(profile->ireg_mutex); - while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex); - free(pop); - } - //switch_mutex_unlock(profile->ireg_mutex); + if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) { + sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex); + free(pop); } } - if (++loops >= 1000) { - - - + if (switch_micro_time_now() - last_check >= 1000000) { if (profile->watchdog_enabled) { uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0; @@ -1339,7 +1339,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread if (event_fail || step_fail) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED!\n" "GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name); - switch_yield(2000); + switch_yield(2000000); abort(); } } @@ -1354,12 +1354,11 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL)); gateway_loops = 0; } + sofia_sub_check_gateway(profile, time(NULL)); - loops = 0; + + last_check = switch_micro_time_now(); } - - switch_cond_next(); - qsize = switch_queue_size(profile->sql_queue); } switch_mutex_lock(profile->ireg_mutex); diff --git a/src/switch_apr.c b/src/switch_apr.c index bdff179b4a..dccc37be96 100644 --- a/src/switch_apr.c +++ b/src/switch_apr.c @@ -986,6 +986,12 @@ SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **d return apr_queue_pop(queue, data); } +SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout) +{ + return apr_queue_pop_timeout(queue, data, timeout); +} + + SWITCH_DECLARE(switch_status_t) switch_queue_push(switch_queue_t *queue, void *data) { apr_status_t s;