break event parsing for mod_voicemail into its own internal queue (by Moc)

This commit is contained in:
Anthony Minessale 2011-08-17 11:27:20 -05:00
parent 25032153fb
commit 7531fed8d8
1 changed files with 122 additions and 3 deletions

View File

@ -45,6 +45,7 @@ SWITCH_MODULE_DEFINITION(mod_voicemail, mod_voicemail_load, mod_voicemail_shutdo
#define VM_EVENT_MAINT "vm::maintenance"
#define VM_MAX_GREETINGS 9
#define VM_EVENT_QUEUE_SIZE 50000
static switch_status_t voicemail_inject(const char *data, switch_core_session_t *session);
@ -53,6 +54,9 @@ static struct {
switch_hash_t *profile_hash;
int debug;
int message_query_exact_match;
int32_t threads;
int32_t running;
switch_queue_t *event_queue;
switch_mutex_t *mutex;
switch_memory_pool_t *pool;
} globals;
@ -3655,7 +3659,7 @@ SWITCH_STANDARD_API(prefs_api_function)
}
static void message_query_handler(switch_event_t *event)
static void actual_message_query_handler(switch_event_t *event)
{
char *account = switch_event_get_header(event, "message-account");
int created = 0;
@ -3727,6 +3731,101 @@ static void message_query_handler(switch_event_t *event)
}
static int EVENT_THREAD_RUNNING = 0;
static int EVENT_THREAD_STARTED = 0;
void *SWITCH_THREAD_FUNC vm_event_thread_run(switch_thread_t *thread, void *obj)
{
void *pop;
int done = 0;
switch_mutex_lock(globals.mutex);
if (!EVENT_THREAD_RUNNING) {
EVENT_THREAD_RUNNING++;
globals.threads++;
} else {
done = 1;
}
switch_mutex_unlock(globals.mutex);
if (done) {
return NULL;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Started\n");
while (globals.running == 1) {
int count = 0;
if (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
switch_event_t *event = (switch_event_t *) pop;
if (!pop) {
break;
}
actual_message_query_handler(event);
switch_event_destroy(&event);
count++;
}
if (!count) {
switch_yield(100000);
}
}
while (switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) {
switch_event_t *event = (switch_event_t *) pop;
switch_event_destroy(&event);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n");
switch_mutex_lock(globals.mutex);
globals.threads--;
EVENT_THREAD_RUNNING = EVENT_THREAD_STARTED = 0;
switch_mutex_unlock(globals.mutex);
return NULL;
}
void vm_event_thread_start(void)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
int done = 0;
switch_mutex_lock(globals.mutex);
if (!EVENT_THREAD_STARTED) {
EVENT_THREAD_STARTED++;
} else {
done = 1;
}
switch_mutex_unlock(globals.mutex);
if (done) {
return;
}
switch_threadattr_create(&thd_attr, globals.pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&thread, thd_attr, vm_event_thread_run, NULL, globals.pool);
}
void vm_event_handler(switch_event_t *event)
{
switch_event_t *cloned_event;
switch_event_dup(&cloned_event, event);
switch_assert(cloned_event);
switch_queue_push(globals.event_queue, cloned_event);
if (!EVENT_THREAD_STARTED) {
vm_event_thread_start();
}
}
struct holder {
vm_profile_t *profile;
@ -5502,14 +5601,20 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_voicemail_load)
switch_core_hash_init(&globals.profile_hash, globals.pool);
switch_mutex_init(&globals.mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_mutex_lock(globals.mutex);
globals.running = 1;
switch_mutex_unlock(globals.mutex);
switch_queue_create(&globals.event_queue, VM_EVENT_QUEUE_SIZE, globals.pool);
if ((status = load_config()) != SWITCH_STATUS_SUCCESS) {
globals.running = 0;
return status;
}
/* connect my internal structure to the blank pointer passed to me */
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, message_query_handler, NULL)
if (switch_event_bind(modname, SWITCH_EVENT_MESSAGE_QUERY, SWITCH_EVENT_SUBCLASS_ANY, vm_event_handler, NULL)
!= SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n");
return SWITCH_STATUS_GENERR;
@ -5554,9 +5659,23 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_voicemail_shutdown)
void *val = NULL;
const void *key;
switch_ssize_t keylen;
int sanity = 0;
switch_mutex_lock(globals.mutex);
if (globals.running == 1) {
globals.running = 0;
}
switch_mutex_unlock(globals.mutex);
switch_event_free_subclass(VM_EVENT_MAINT);
switch_event_unbind_callback(message_query_handler);
switch_event_unbind_callback(vm_event_handler);
while (globals.threads) {
switch_cond_next();
if (++sanity >= 60000) {
break;
}
}
switch_mutex_lock(globals.mutex);
while ((hi = switch_hash_first(NULL, globals.profile_hash))) {