Implement switch_queue_pop_timeout() and refactor sofia_profile_worker_thread_run() to use it so it doesn't wake up too often

This commit is contained in:
Mathieu Rene 2010-11-18 21:00:21 -05:00
parent 42a4a3dedf
commit de417e99f0
6 changed files with 148 additions and 46 deletions

View File

@ -1 +1 @@
Mon Dec 28 14:55:57 EST 2009
Thu 18 Nov 2010 20:56:38 EST

View File

@ -78,6 +78,22 @@ APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data);
*/
APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data);
/**
* pop/get an object from the queue, blocking if the queue is already empty
*
* @param queue the queue
* @param data the data
* @param timeout The amount of time in microseconds to wait. This is
* a maximum, not a minimum. If the condition is signaled, we
* will wake up before this time, otherwise the error APR_TIMEUP
* is returned.
* @returns APR_TIMEUP the request timed out
* @returns APR_EINTR the blocking was interrupted (try again)
* @returns APR_EOF if the queue has been terminated
* @returns APR_SUCCESS on a successful pop
*/
APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout);
/**
* push/add an object to the queue, returning immediately if the queue is full
*

View File

@ -313,6 +313,71 @@ APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
return rv;
}
/**
 * Retrieves the next item from the queue. If there are no
 * items available, it will block until one becomes available, or
 * until the timeout elapses. Once retrieved, the item is placed into
 * the address specified by 'data'.
 *
 * Returns APR_EOF if the queue was terminated, APR_TIMEUP when the wait
 * timed out, APR_EINTR on a spurious wakeup (caller should retry), or the
 * status of a failing mutex/condvar call. On APR_SUCCESS, *data holds the
 * dequeued element.
 */
APU_DECLARE(apr_status_t) apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout)
{
    apr_status_t rv;

    /* Unlocked fast-path check: once terminated, the queue never produces again. */
    if (queue->terminated) {
        return APR_EOF; /* no more elements ever again */
    }

    rv = apr_thread_mutex_lock(queue->one_big_mutex);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    /* Keep waiting until we wake up and find that the queue is not empty. */
    if (apr_queue_empty(queue)) {
        if (!queue->terminated) {
            /* Track waiters so producers know whether to signal not_empty. */
            queue->empty_waiters++;
            /* Wait at most 'timeout' microseconds; the mutex is released
             * while blocked and re-acquired before returning. */
            rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout);
            queue->empty_waiters--;

            /* In the event of a timeout, APR_TIMEUP will be returned */
            if (rv != APR_SUCCESS) {
                apr_thread_mutex_unlock(queue->one_big_mutex);
                return rv;
            }
        }

        /* If we wake up and it's still empty, then we were interrupted */
        if (apr_queue_empty(queue)) {
            Q_DBG("queue empty (intr)", queue);
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
            if (rv != APR_SUCCESS) {
                return rv;
            }
            if (queue->terminated) {
                return APR_EOF; /* no more elements ever again */
            }
            else {
                return APR_EINTR;
            }
        }
    }

    /* Dequeue from the head of the circular buffer. */
    *data = queue->data[queue->out];
    queue->nelts--;

    queue->out = (queue->out + 1) % queue->bounds;

    /* We just freed a slot: wake one producer blocked on a full queue. */
    if (queue->full_waiters) {
        Q_DBG("signal !full", queue);
        rv = apr_thread_cond_signal(queue->not_full);
        if (rv != APR_SUCCESS) {
            apr_thread_mutex_unlock(queue->one_big_mutex);
            return rv;
        }
    }

    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
    return rv;
}
/**
* Retrieves the next item from the queue. If there are no
* items available, return APR_EAGAIN. Once retrieved,

View File

@ -592,6 +592,22 @@ SWITCH_DECLARE(switch_status_t) switch_queue_create(switch_queue_t ** queue, uns
*/
SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **data);
/**
* pop/get an object from the queue, blocking if the queue is already empty
*
* @param queue the queue
* @param data the data
* @param timeout The amount of time in microseconds to wait. This is
* a maximum, not a minimum. If the condition is signaled, we
* will wake up before this time, otherwise the error APR_TIMEUP
* is returned.
* @returns APR_TIMEUP the request timed out
* @returns APR_EINTR the blocking was interrupted (try again)
* @returns APR_EOF if the queue has been terminated
* @returns APR_SUCCESS on a successful pop
*/
SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout);
/**
* push/add an object to the queue, blocking if the queue is already full
*

View File

@ -1230,43 +1230,42 @@ static void sofia_perform_profile_start_failure(sofia_profile_t *profile, char *
void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread, void *obj)
{
sofia_profile_t *profile = (sofia_profile_t *) obj;
uint32_t ireg_loops = 0;
uint32_t gateway_loops = 0;
int loops = 0;
uint32_t qsize;
void *pop = NULL;
int loop_count = 0;
switch_size_t sql_len = 1024 * 32;
char *tmp, *sqlbuf = NULL;
char *sql = NULL;
uint32_t ireg_loops = IREG_SECONDS; /* Number of loop iterations done when we haven't checked for registrations */
uint32_t gateway_loops = GATEWAY_SECONDS; /* Number of loop iterations done when we haven't checked for gateways */
void *pop = NULL; /* queue_pop placeholder */
switch_size_t sql_len = 1024 * 32; /* length of sqlbuf */
char *tmp, *sqlbuf = NULL; /* Buffer for SQL statements */
char *sql = NULL; /* Current SQL statement */
switch_time_t last_commit; /* Last time we committed stuff to the DB */
switch_time_t last_check; /* Last time we did the second-resolution loop that checks various stuff */
switch_size_t len = 0; /* Current length of sqlbuf */
uint32_t statements = 0; /* Number of statements in the current sql buffer */
last_commit = last_check = switch_micro_time_now();
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
sqlbuf = (char *) malloc(sql_len);
}
ireg_loops = IREG_SECONDS;
gateway_loops = GATEWAY_SECONDS;
sofia_set_pflag_locked(profile, PFLAG_WORKER_RUNNING);
switch_queue_create(&profile->sql_queue, SOFIA_QUEUE_SIZE, profile->pool);
qsize = switch_queue_size(profile->sql_queue);
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || qsize) {
/* While we're running, or there is a pending sql statment that we haven't appended to sqlbuf yet, because of a lack of buffer space */
while ((mod_sofia_globals.running == 1 && sofia_test_pflag(profile, PFLAG_RUNNING)) || sql) {
if (sofia_test_pflag(profile, PFLAG_SQL_IN_TRANS)) {
if (qsize > 0 && (qsize >= 1024 || ++loop_count >= (int)profile->trans_timeout)) {
switch_size_t newlen;
uint32_t iterations = 0;
switch_size_t len = 0;
switch_mutex_lock(profile->ireg_mutex);
/* Do we have enough statements or is the timeout expired */
while (sql || (sofia_test_pflag(profile, PFLAG_RUNNING) && mod_sofia_globals.running == 1 &&
(statements == 0 || (statements <= 1024 && (switch_micro_time_now() - last_commit)/1000 < profile->trans_timeout)))) {
while (sql || (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop)) {
switch_interval_time_t sleepy_time = !statements ? 1000000 : switch_micro_time_now() - last_commit - profile->trans_timeout*1000;
if (sql || (switch_queue_pop_timeout(profile->sql_queue, &pop, sleepy_time) == SWITCH_STATUS_SUCCESS && pop)) {
switch_size_t newlen;
if (!sql) sql = (char *) pop;
newlen = strlen(sql) + 2;
iterations++;
newlen = strlen(sql) + 2 /* strlen(";\n") */ ;
if (len + newlen + 10 > sql_len) {
switch_size_t new_mlen = len + newlen + 10 + 10240;
@ -1280,7 +1279,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
}
sqlbuf = tmp;
} else {
goto skip;
break;
}
}
@ -1288,31 +1287,32 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
len += newlen;
free(sql);
sql = NULL;
statements++;
}
skip:
}
/* Execute here */
last_commit = switch_micro_time_now();
if (len) {
//printf("TRANS:\n%s\n", sqlbuf);
switch_mutex_lock(profile->ireg_mutex);
sofia_glue_actually_execute_sql_trans(profile, sqlbuf, NULL);
//sofia_glue_actually_execute_sql(profile, "commit;\n", NULL);
switch_mutex_unlock(profile->ireg_mutex);
loop_count = 0;
statements = 0;
len = 0;
}
} else {
if (qsize) {
//switch_mutex_lock(profile->ireg_mutex);
while (switch_queue_trypop(profile->sql_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
free(pop);
}
//switch_mutex_unlock(profile->ireg_mutex);
if (switch_queue_pop_timeout(profile->sql_queue, &pop, 1000000) == SWITCH_STATUS_SUCCESS && pop) {
sofia_glue_actually_execute_sql(profile, (char *) pop, profile->ireg_mutex);
free(pop);
}
}
if (++loops >= 1000) {
if (switch_micro_time_now() - last_check >= 1000000) {
if (profile->watchdog_enabled) {
uint32_t event_diff = 0, step_diff = 0, event_fail = 0, step_fail = 0;
@ -1339,7 +1339,7 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
if (event_fail || step_fail) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile %s: SIP STACK FAILURE DETECTED!\n"
"GOODBYE CRUEL WORLD, I'M LEAVING YOU TODAY....GOODBYE, GOODBYE, GOOD BYE\n", profile->name);
switch_yield(2000);
switch_yield(2000000);
abort();
}
}
@ -1354,12 +1354,11 @@ void *SWITCH_THREAD_FUNC sofia_profile_worker_thread_run(switch_thread_t *thread
sofia_reg_check_gateway(profile, switch_epoch_time_now(NULL));
gateway_loops = 0;
}
sofia_sub_check_gateway(profile, time(NULL));
loops = 0;
last_check = switch_micro_time_now();
}
switch_cond_next();
qsize = switch_queue_size(profile->sql_queue);
}
switch_mutex_lock(profile->ireg_mutex);

View File

@ -986,6 +986,12 @@ SWITCH_DECLARE(switch_status_t) switch_queue_pop(switch_queue_t *queue, void **d
return apr_queue_pop(queue, data);
}
SWITCH_DECLARE(switch_status_t) switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
{
    /* Thin wrapper: delegate the timed pop straight to the APR queue layer. */
    apr_status_t status = apr_queue_pop_timeout(queue, data, timeout);

    return status;
}
SWITCH_DECLARE(switch_status_t) switch_queue_push(switch_queue_t *queue, void *data)
{
apr_status_t s;