diff --git a/src/switch_core_session.c b/src/switch_core_session.c index 16351f2d59..974c4f2434 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -739,6 +739,7 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread(switch_thread_t * thr switch_assert(event_str); switch_core_memory_pool_tag(switch_core_session_get_pool(session), switch_core_session_strdup(session, event_str)); free(event_str); + switch_event_destroy(&event); } } diff --git a/src/switch_event.c b/src/switch_event.c index 60e306da7d..0136328776 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -35,7 +35,8 @@ #include #include #define MAX_DISPATCH 20 -#define DISPATCH_QUEUE_LEN 1000 +#define DISPATCH_QUEUE_LEN 5000 +//#define DEBUG_DISPATCH_QUEUES static int SOFT_MAX_DISPATCH = 0; static char hostname[128] = ""; @@ -44,18 +45,15 @@ static char guess_ip_v6[80] = ""; static switch_event_node_t *EVENT_NODES[SWITCH_EVENT_ALL + 1] = { NULL }; static switch_mutex_t *BLOCK = NULL; static switch_mutex_t *POOL_LOCK = NULL; -static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; -static switch_mutex_t *EVENT_QUEUE_HAVEMORE_MUTEX = NULL; -static switch_thread_cond_t *EVENT_QUEUE_CONDITIONAL = NULL; static switch_memory_pool_t *RUNTIME_POOL = NULL; static switch_memory_pool_t *THRUNTIME_POOL = NULL; static switch_queue_t *EVENT_QUEUE[3] = { 0, 0, 0 }; static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH] = { 0 }; static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN; - +static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; static switch_hash_t *CUSTOM_HASH = NULL; -static int THREAD_RUNNING = 0; -static int EVENT_QUEUE_HAVEMORE = 0; +static int THREAD_COUNT = 0; +static int SYSTEM_RUNNING = 0; static switch_queue_t *EVENT_RECYCLE_QUEUE = NULL; static switch_queue_t *EVENT_HEADER_RECYCLE_QUEUE = NULL; static void launch_dispatch_threads(int max, int len, switch_memory_pool_t *pool); @@ -176,9 +174,61 @@ static int switch_events_match(switch_event_t *event, switch_event_node_t *node) static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t * thread, void *obj) { switch_queue_t *queue = (switch_queue_t *) obj; - int THREAD_RUNNING = 1; + int my_id = 0; + switch_mutex_lock(EVENT_QUEUE_MUTEX); + THREAD_COUNT++; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + + for (my_id = 0; my_id < MAX_DISPATCH; my_id++) { + if (EVENT_DISPATCH_QUEUE[my_id] == queue) { + break; + } + } + + for(;;) { + void *pop = NULL; + switch_event_t *event = NULL; + + if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { + break; + } + + if (!pop) { + break; + } + + event = (switch_event_t *) pop; + switch_event_deliver(&event); + } + + + switch_mutex_lock(EVENT_QUEUE_MUTEX); + THREAD_COUNT--; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); - while (THREAD_RUNNING == 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Thread %d Ended.\n", my_id); + return NULL; + +} + + +static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t * thread, void *obj) +{ + switch_queue_t *queue = (switch_queue_t *) obj; + int index = 0; + int my_id = 0; + + switch_mutex_lock(EVENT_QUEUE_MUTEX); + THREAD_COUNT++; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + + for (my_id = 0; my_id < MAX_DISPATCH; my_id++) { + if (EVENT_QUEUE[my_id] == queue) { + break; + } + } + + for(;;) { void *pop = NULL; switch_event_t *event = NULL; @@ -191,135 +241,34 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t * t } event = (switch_event_t *) pop; - switch_event_deliver(&event); + + while(event) { + for (index = 0; index < SOFT_MAX_DISPATCH; index++) { + if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], event) == SWITCH_STATUS_SUCCESS) { + event = NULL; + break; + } + } + + if (event) { + if (SOFT_MAX_DISPATCH+1 < MAX_DISPATCH) { + switch_mutex_lock(EVENT_QUEUE_MUTEX); + launch_dispatch_threads(SOFT_MAX_DISPATCH+1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + } + } + } } - THREAD_RUNNING = 0; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Dispatch Ended.\n"); + switch_mutex_lock(EVENT_QUEUE_MUTEX); + THREAD_COUNT--; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread %d Ended.\n", my_id); return NULL; } -static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t * thread, void *obj) -{ - switch_event_t *out_event = NULL; - switch_queue_t *queue = NULL; - switch_queue_t *queues[3] = { 0, 0, 0 }; - void *pop; - int i, len[3] = { 0, 0, 0 }; - int index = 0; - - switch_assert(thread != NULL); - switch_assert(obj == NULL); - switch_assert(POOL_LOCK != NULL); - switch_assert(RUNTIME_POOL != NULL); - switch_assert(EVENT_QUEUE_MUTEX != NULL); - switch_assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL); - switch_assert(EVENT_QUEUE_CONDITIONAL != NULL); - THREAD_RUNNING = 1; - - queues[0] = EVENT_QUEUE[SWITCH_PRIORITY_HIGH]; - queues[1] = EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]; - queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW]; - - switch_mutex_lock(EVENT_QUEUE_MUTEX); - - for (;;) { - int any; - - len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]); - len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW]); - len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]); - any = len[1] + len[2] + len[0]; - - if (!any) { - /* lock on havemore so we are the only ones poking at it while we check it - * see if we saw anything in the queues or have a check again flag - */ - switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); - if (!EVENT_QUEUE_HAVEMORE) { - /* See if we need to quit */ - if (THREAD_RUNNING != 1) { - /* give up our lock */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - - /* game over */ - break; - } - - /* give up our lock */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - - /* wait until someone tells us we have something to do */ - switch_thread_cond_wait(EVENT_QUEUE_CONDITIONAL, EVENT_QUEUE_MUTEX); - } else { - /* Caught a race, one of the queues was updated after we looked at it - * reset our flag - */ - EVENT_QUEUE_HAVEMORE = 0; - - /* give up our lock */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - } - - /* go grab some events */ - continue; - } - - for (i = 0; i < 3; i++) { - if (len[i]) { - - queue = queues[i]; - - while (queue) { - if (switch_queue_trypop(queue, &pop) == SWITCH_STATUS_SUCCESS) { - out_event = pop; - - if (!pop) { - continue; - } - - retry: - - for (index = 0; index < SOFT_MAX_DISPATCH; index++) { - if (switch_queue_trypush(EVENT_DISPATCH_QUEUE[index], out_event) == SWITCH_STATUS_SUCCESS) { - out_event = NULL; - break; - } - } - - if (out_event) { - switch_yield(1000); - - if (SOFT_MAX_DISPATCH+1 < MAX_DISPATCH) { - launch_dispatch_threads(SOFT_MAX_DISPATCH+1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); - } - - goto retry; - } - -#ifdef DEBUG_DISPATCH_QUEUES - printf ("SIZE "); - for (index = 0; index < SOFT_MAX_DISPATCH; index++) { - printf ("%d ", switch_queue_size(EVENT_DISPATCH_QUEUE[index])); - } - printf("\n"); -#endif - } else { - break; - } - } - } - } - - if (THREAD_RUNNING < 0) { - THREAD_RUNNING--; - } - } - - THREAD_RUNNING = 0; - return NULL; -} SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event) { @@ -344,7 +293,7 @@ SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event) SWITCH_DECLARE(switch_status_t) switch_event_running(void) { - return THREAD_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; + return SYSTEM_RUNNING ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE; } SWITCH_DECLARE(char *) switch_event_name(switch_event_types_t event) @@ -417,47 +366,29 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) { int x = 0, last = 0; - if (THREAD_RUNNING > 0) { - THREAD_RUNNING = -1; + switch_mutex_lock(EVENT_QUEUE_MUTEX); + SYSTEM_RUNNING = 0; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); - /* lock on havemore to make sure the event thread, if currently running - * doesn't check the HAVEMORE flag before we set it - */ - switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); - /* see if the event thread is sitting */ - if (switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) { - /* we don't need havemore anymore, the thread was sitting already */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - - /* wake up the event thread */ - switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL); - - /* give up our lock */ - switch_mutex_unlock(EVENT_QUEUE_MUTEX); - } else { - /* it wasn't waiting which means we might have updated a queue it already looked at - * set a flag so it knows to read the queues again - */ - EVENT_QUEUE_HAVEMORE = 1; - - /* variable updated, give up the mutex */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - } - - for(x = 0; x < SOFT_MAX_DISPATCH; x++) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x); - switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL); - } - - while (x < 10000 && THREAD_RUNNING) { - switch_yield(1000); - if (THREAD_RUNNING == last) { - x++; - } - last = THREAD_RUNNING; - } + for(x = 0; x < 3; x++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x); + switch_queue_push(EVENT_QUEUE[x], NULL); } + for(x = 0; x < SOFT_MAX_DISPATCH; x++) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x); + switch_queue_push(EVENT_DISPATCH_QUEUE[x], NULL); + } + + while (x < 10000 && THREAD_COUNT) { + switch_yield(1000); + if (THREAD_COUNT == last) { + x++; + } + last = THREAD_COUNT; + } + + switch_core_hash_destroy(&CUSTOM_HASH); switch_core_memory_reclaim_events(); @@ -517,18 +448,24 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); - switch_mutex_init(&EVENT_QUEUE_HAVEMORE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); - switch_thread_cond_create(&EVENT_QUEUE_CONDITIONAL, RUNTIME_POOL); switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); - switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, RUNTIME_POOL); - - while (!THREAD_RUNNING) { + switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); + switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); + switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); + + while (!THREAD_COUNT) { switch_yield(1000); } + + + switch_mutex_lock(EVENT_QUEUE_MUTEX); + SYSTEM_RUNNING = 1; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + return SWITCH_STATUS_SUCCESS; } @@ -969,12 +906,10 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con switch_assert(BLOCK != NULL); switch_assert(RUNTIME_POOL != NULL); - switch_assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL); switch_assert(EVENT_QUEUE_MUTEX != NULL); - switch_assert(EVENT_QUEUE_CONDITIONAL != NULL); switch_assert(RUNTIME_POOL != NULL); - if (THREAD_RUNNING <= 0) { + if (SYSTEM_RUNNING <= 0) { /* sorry we're closed */ switch_event_destroy(event); return SWITCH_STATUS_FALSE; @@ -1005,32 +940,8 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con (*event)->event_user_data = user_data; } + switch_queue_push(EVENT_QUEUE[(*event)->priority], *event); - - /* lock on havemore to make sure he event thread, if currently running - * doesn't check the HAVEMORE flag before we set it - */ - switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); - /* see if the event thread is sitting */ - if (switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) { - /* we don't need havemore anymore, the thread was sitting already */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - - /* wake up the event thread */ - switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL); - - /* give up our lock */ - switch_mutex_unlock(EVENT_QUEUE_MUTEX); - } else { - /* it wasn't waiting which means we might have updated a queue it already looked at - * set a flag so it knows to read the queues again - */ - EVENT_QUEUE_HAVEMORE = 1; - - /* variable updated, give up the mutex */ - switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); - } - *event = NULL; return SWITCH_STATUS_SUCCESS;