fix regression in core event system

This commit is contained in:
Anthony Minessale 2010-09-02 12:30:26 -05:00
parent 6eca026342
commit 2002f38b4c
1 changed files with 21 additions and 15 deletions

View File

@ -79,6 +79,7 @@ static switch_memory_pool_t *THRUNTIME_POOL = NULL;
static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 };
static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 };
static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 };
static uint8_t EVENT_DISPATCH_QUEUE_RUNNING[MAX_DISPATCH_VAL] = { 0 };
static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 };
static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
@ -238,9 +239,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
{
switch_queue_t *queue = (switch_queue_t *) obj;
int my_id = 0;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
THREAD_COUNT++;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
for (my_id = 0; my_id < NUMBER_OF_QUEUES; my_id++) {
if (EVENT_DISPATCH_QUEUE[my_id] == queue) {
@ -248,6 +249,9 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
}
}
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
for (;;) {
void *pop = NULL;
switch_event_t *event = NULL;
@ -270,6 +274,7 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th
switch_mutex_lock(EVENT_QUEUE_MUTEX);
EVENT_DISPATCH_QUEUE_RUNNING[my_id] = 1;
THREAD_COUNT--;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
@ -298,6 +303,7 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
for (;;) {
void *pop = NULL;
switch_event_t *event = NULL;
int loops = 0;
if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) {
break;
@ -314,13 +320,13 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
event = (switch_event_t *) pop;
while (event) {
int max;
switch_mutex_lock(EVENT_QUEUE_MUTEX);
max = SOFT_MAX_DISPATCH;
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
if (++loops > 2) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event system overloading\n");
switch_yield(1000000);
}
for (index = 0; (int)index < max; index++) {
for (index = 0; index < SOFT_MAX_DISPATCH; index++) {
if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) {
event = NULL;
break;
@ -328,19 +334,15 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
}
if (event) {
switch_mutex_lock(EVENT_QUEUE_MUTEX);
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Adding a new event thread #%d\n", SOFT_MAX_DISPATCH + 1);
switch_mutex_lock(EVENT_QUEUE_MUTEX);
launch_dispatch_threads(SOFT_MAX_DISPATCH + 1, DISPATCH_QUEUE_LEN, RUNTIME_POOL);
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event threads maxed out at %d.\n", SOFT_MAX_DISPATCH);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Out of threads!\n");
switch_yield(1000000);
}
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
}
switch_cond_next();
}
}
@ -566,6 +568,8 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
{
switch_threadattr_t *thd_attr;
uint32_t index = 0;
int launched = 0;
uint32_t sanity = 200;
if (max > MAX_DISPATCH) {
return;
@ -584,8 +588,10 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Create event dispatch thread %d\n", index);
switch_yield(100000);
while(--sanity && !EVENT_DISPATCH_QUEUE_RUNNING[index]) switch_yield(10000);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index);
launched++;
break;
}
SOFT_MAX_DISPATCH = index;