From f60962ae877762db4e9f494148a2b104482ddca2 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 7 Nov 2012 12:10:50 -0600 Subject: [PATCH] fix some contention issues under really high load...That doesn't mean you need to push it this hard and bug me about it =p --- src/mod/endpoints/mod_sofia/mod_sofia.h | 4 + src/mod/endpoints/mod_sofia/sofia.c | 46 ++--------- src/mod/endpoints/mod_sofia/sofia_glue.c | 20 +++++ src/mod/endpoints/mod_sofia/sofia_presence.c | 77 +++++++++++++------ src/mod/endpoints/mod_sofia/sofia_reg.c | 5 +- .../mod_event_socket/mod_event_socket.c | 10 --- src/switch_event.c | 5 +- 7 files changed, 89 insertions(+), 78 deletions(-) diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.h b/src/mod/endpoints/mod_sofia/mod_sofia.h index 864d3a1e6f..909516d63e 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.h +++ b/src/mod/endpoints/mod_sofia/mod_sofia.h @@ -389,6 +389,7 @@ struct mod_sofia_globals { int tracelevel; char *capture_server; int rewrite_multicasted_fs_path; + int presence_flush; }; extern struct mod_sofia_globals mod_sofia_globals; @@ -694,6 +695,7 @@ struct sofia_profile { int ireg_seconds; sofia_paid_type_t paid_type; uint32_t rtp_digit_delay; + switch_queue_t *event_queue; }; struct private_object { @@ -1206,6 +1208,8 @@ int sofia_recover_callback(switch_core_session_t *session); void sofia_glue_set_name(private_object_t *tech_pvt, const char *channame); private_object_t *sofia_glue_new_pvt(switch_core_session_t *session); switch_status_t sofia_init(void); +void sofia_glue_fire_events(sofia_profile_t *profile); +void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event); /* For Emacs: * Local Variables: diff --git a/src/mod/endpoints/mod_sofia/sofia.c b/src/mod/endpoints/mod_sofia/sofia.c index 1c14c473d7..6fcccabf53 100644 --- a/src/mod/endpoints/mod_sofia/sofia.c +++ b/src/mod/endpoints/mod_sofia/sofia.c @@ -1559,7 +1559,7 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) { sofia_dispatch_event_t *de = *dep; switch_memory_pool_t *pool; - sofia_profile_t *profile = (*dep)->profile; + //sofia_profile_t *profile = (*dep)->profile; switch_thread_data_t *td; switch_core_new_memory_pool(&pool); @@ -1571,45 +1571,10 @@ void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) td->func = sofia_msg_thread_run_once; td->obj = de; - switch_mutex_lock(profile->ireg_mutex); switch_thread_pool_launch_thread(&td); - switch_mutex_unlock(profile->ireg_mutex); + } -#if 0 -void sofia_process_dispatch_event_in_thread(sofia_dispatch_event_t **dep) -{ - sofia_dispatch_event_t *de = *dep; - switch_threadattr_t *thd_attr = NULL; - switch_memory_pool_t *pool; - switch_thread_t *thread; - sofia_profile_t *profile = (*dep)->profile; - switch_status_t status; - - switch_core_new_memory_pool(&pool); - - - *dep = NULL; - de->pool = pool; - - switch_mutex_lock(profile->ireg_mutex); - switch_threadattr_create(&thd_attr, de->pool); - switch_threadattr_detach_set(thd_attr, 1); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - status = switch_thread_create(&thread, - thd_attr, - sofia_msg_thread_run_once, - de, - de->pool); - switch_mutex_unlock(profile->ireg_mutex); - - if (status != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot create threads!\n"); - sofia_process_dispatch_event(&de); - } -} -#endif - void sofia_process_dispatch_event(sofia_dispatch_event_t **dep) { sofia_dispatch_event_t *de = *dep; @@ -1992,6 +1957,7 @@ void sofia_event_callback(nua_event_t event, sofia_queue_message(de); end: + //switch_cond_next(); return; } @@ -2133,7 +2099,7 @@ void event_handler(switch_event_t *event) contact_str = fixed_contact_str; } - switch_mutex_lock(profile->ireg_mutex); + sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); switch_find_local_ip(guess_ip4, sizeof(guess_ip4), NULL, AF_INET); @@ -2150,7 +2116,7 @@ void event_handler(switch_event_t *event) sofia_glue_execute_sql(profile, &sql, SWITCH_TRUE); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Propagating registration for %s@%s->%s\n", from_user, from_host, contact_str); } - switch_mutex_unlock(profile->ireg_mutex); + if (profile) { sofia_glue_release_profile(profile); @@ -2557,6 +2523,8 @@ void *SWITCH_THREAD_FUNC sofia_profile_thread_run(switch_thread_t *thread, void switch_mutex_init(&profile->ireg_mutex, SWITCH_MUTEX_NESTED, profile->pool); switch_mutex_init(&profile->gateway_mutex, SWITCH_MUTEX_NESTED, profile->pool); + switch_queue_create(&profile->event_queue, SOFIA_QUEUE_SIZE, profile->pool); + switch_snprintf(qname, sizeof(qname), "sofia:%s", profile->name); switch_sql_queue_manager_init_name(qname, diff --git a/src/mod/endpoints/mod_sofia/sofia_glue.c b/src/mod/endpoints/mod_sofia/sofia_glue.c index bbaa9ae347..7e55407962 100644 --- a/src/mod/endpoints/mod_sofia/sofia_glue.c +++ b/src/mod/endpoints/mod_sofia/sofia_glue.c @@ -6498,6 +6498,9 @@ switch_bool_t sofia_glue_execute_sql_callback(sofia_profile_t *profile, switch_cache_db_release_db_handle(&dbh); + + sofia_glue_fire_events(profile); + return ret; } @@ -7139,6 +7142,23 @@ char *sofia_glue_get_host(const char *str, switch_memory_pool_t *pool) return s; } +void sofia_glue_fire_events(sofia_profile_t *profile) +{ + void *pop = NULL; + + while (profile->event_queue && switch_queue_trypop(profile->event_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_event_t *event = (switch_event_t *) pop; + switch_event_fire(&event); + } + +} + +void sofia_event_fire(sofia_profile_t *profile, switch_event_t **event) +{ + switch_queue_push(profile->event_queue, *event); + *event = NULL; +} + /* For Emacs: * Local Variables: diff --git a/src/mod/endpoints/mod_sofia/sofia_presence.c b/src/mod/endpoints/mod_sofia/sofia_presence.c index 17ab6b6efa..ed156c5f05 100644 --- a/src/mod/endpoints/mod_sofia/sofia_presence.c +++ b/src/mod/endpoints/mod_sofia/sofia_presence.c @@ -1027,7 +1027,7 @@ static void conference_data_event_handler(switch_event_t *event) switch_safe_free(dup_domain); } -static void actual_sofia_presence_event_handler(switch_event_t *event) +static switch_event_t *actual_sofia_presence_event_handler(switch_event_t *event) { sofia_profile_t *profile = NULL; char *from = switch_event_get_header(event, "from"); @@ -1047,10 +1047,10 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) switch_console_callback_match_t *matches; struct presence_helper helper = { 0 }; int hup = 0; - + switch_event_t *s_event = NULL; if (!mod_sofia_globals.running) { - return; + goto done; } if (zstr(proto) || !strcasecmp(proto, "any")) { @@ -1091,7 +1091,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) if (!mod_sofia_globals.profile_hash) { - return; + goto done; } if (from) { @@ -1171,7 +1171,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) } switch_safe_free(sql); - return; + goto done; } if (zstr(event_type)) { @@ -1195,7 +1195,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) } } else { switch_safe_free(user); - return; + goto done; } if ((euser = strchr(user, '+'))) { euser++; @@ -1203,7 +1203,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) euser = user; } } else { - return; + goto done; } switch (event->event_id) { @@ -1462,8 +1462,7 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) if (hup && dh.hits < 1) { /* so many phones get confused when whe hangup we have to reprobe to get them all to reset to absolute states so the lights stay correct */ - switch_event_t *s_event; - + if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_PROBE) == SWITCH_STATUS_SUCCESS) { switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "proto", SOFIA_CHAT_PROTO); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "login", profile->name); @@ -1471,10 +1470,9 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) switch_event_add_header(s_event, SWITCH_STACK_BOTTOM, "to", "%s@%s", euser, host); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "alt_event_type", "dialog"); - switch_event_fire(&s_event); } } - + if (!zstr((char *) helper.stream.data)) { char *this_sql = (char *) helper.stream.data; @@ -1509,11 +1507,24 @@ static void actual_sofia_presence_event_handler(switch_event_t *event) switch_safe_free(sql); switch_safe_free(user); + + return s_event; } static int EVENT_THREAD_RUNNING = 0; static int EVENT_THREAD_STARTED = 0; +static void do_flush(void) +{ + void *pop = NULL; + + while (mod_sofia_globals.presence_queue && switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { + switch_event_t *event = (switch_event_t *) pop; + switch_event_destroy(&event); + } + +} + void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread, void *obj) { void *pop; @@ -1544,6 +1555,15 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread break; } + if (mod_sofia_globals.presence_flush) { + switch_mutex_lock(mod_sofia_globals.mutex); + if (mod_sofia_globals.presence_flush) { + do_flush(); + mod_sofia_globals.presence_flush = 0; + } + switch_mutex_unlock(mod_sofia_globals.mutex); + } + switch(event->event_id) { case SWITCH_EVENT_MESSAGE_WAITING: actual_sofia_presence_mwi_event_handler(event); @@ -1552,7 +1572,11 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread conference_data_event_handler(event); break; default: - actual_sofia_presence_event_handler(event); + do { + switch_event_t *ievent = event; + event = actual_sofia_presence_event_handler(ievent); + switch_event_destroy(&ievent); + } while (event); break; } @@ -1561,10 +1585,7 @@ void *SWITCH_THREAD_FUNC sofia_presence_event_thread_run(switch_thread_t *thread } } - while (switch_queue_trypop(mod_sofia_globals.presence_queue, &pop) == SWITCH_STATUS_SUCCESS && pop) { - switch_event_t *event = (switch_event_t *) pop; - switch_event_destroy(&event); - } + do_flush(); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Event Thread Ended\n"); @@ -1606,13 +1627,23 @@ void sofia_presence_event_handler(switch_event_t *event) { switch_event_t *cloned_event; - switch_event_dup(&cloned_event, event); - switch_assert(cloned_event); - switch_queue_push(mod_sofia_globals.presence_queue, cloned_event); - if (!EVENT_THREAD_STARTED) { sofia_presence_event_thread_start(); + switch_yield(500000); } + + switch_event_dup(&cloned_event, event); + switch_assert(cloned_event); + + if (switch_queue_trypush(mod_sofia_globals.presence_queue, cloned_event) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Presence queue overloaded.... Flushing queue\n"); + switch_mutex_lock(mod_sofia_globals.mutex); + mod_sofia_globals.presence_flush = 1; + switch_mutex_unlock(mod_sofia_globals.mutex); + switch_event_destroy(&cloned_event); + } + + } @@ -1640,7 +1671,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch } - switch_event_fire(&event); + sofia_event_fire(profile, &event); } return 0; } @@ -1653,7 +1684,7 @@ static int sofia_presence_sub_reg_callback(void *pArg, int argc, char **argv, ch switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "event_subtype", "probe"); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "proto-specific-event-name", event_name); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "expires", expires); - switch_event_fire(&event); + sofia_event_fire(profile, &event); } return 0; @@ -1777,7 +1808,7 @@ static int sofia_presence_resub_callback(void *pArg, int argc, char **argv, char } } - switch_event_fire(&event); + sofia_event_fire(profile, &event); } switch_safe_free(free_me); diff --git a/src/mod/endpoints/mod_sofia/sofia_reg.c b/src/mod/endpoints/mod_sofia/sofia_reg.c index 3e0e3965fe..4efa9fabe6 100644 --- a/src/mod/endpoints/mod_sofia/sofia_reg.c +++ b/src/mod/endpoints/mod_sofia/sofia_reg.c @@ -635,7 +635,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "contact", argv[3]); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "expires", argv[6]); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "user-agent", argv[7]); - switch_event_fire(&s_event); + sofia_event_fire(profile, &s_event); } if (switch_event_create(&s_event, SWITCH_EVENT_PRESENCE_IN) == SWITCH_STATUS_SUCCESS) { @@ -653,7 +653,7 @@ int sofia_reg_del_callback(void *pArg, int argc, char **argv, char **columnNames switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "status", "Unregistered"); switch_event_add_header_string(s_event, SWITCH_STACK_BOTTOM, "event_type", "presence"); - switch_event_fire(&s_event); + sofia_event_fire(profile, &s_event); } } @@ -859,7 +859,6 @@ void sofia_reg_check_sync(sofia_profile_t *profile) sql = switch_mprintf("delete from sip_registrations where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); - sql = switch_mprintf("delete from sip_presence where expires > 0 and hostname='%q'", mod_sofia_globals.hostname); sofia_glue_execute_sql_now(profile, &sql, SWITCH_TRUE); diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index f970a1c63e..f1dc5874e8 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -175,13 +175,8 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l if (switch_queue_trypush(l->log_queue, dnode) == SWITCH_STATUS_SUCCESS) { if (l->lost_logs) { int ll = l->lost_logs; - switch_event_t *event; l->lost_logs = 0; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Lost %d log lines!\n", ll); - if (switch_event_create(&event, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header(event, SWITCH_STACK_BOTTOM, "info", "lost %d log lines", ll); - switch_event_fire(&event); - } } } else { switch_log_node_free(&dnode); @@ -378,11 +373,6 @@ static void event_handler(switch_event_t *event) int le = l->lost_events; l->lost_events = 0; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost %d events!\n", le); - clone = NULL; - if (switch_event_create(&clone, SWITCH_EVENT_TRAP) == SWITCH_STATUS_SUCCESS) { - switch_event_add_header(clone, SWITCH_STACK_BOTTOM, "info", "lost %d events", le); - switch_event_fire(&clone); - } } } else { if (++l->lost_events > MAX_MISSED) { diff --git a/src/switch_event.c b/src/switch_event.c index 566ccc82f8..e589e02f43 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -472,7 +472,6 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queues\n"); - for(x = 0; x < (uint32_t)DISPATCH_THREAD_COUNT; x++) { switch_queue_trypush(EVENT_DISPATCH_QUEUE, NULL); } @@ -487,8 +486,8 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) } x = 0; - while (x < 10000 && THREAD_COUNT) { - switch_cond_next(); + while (x < 100 && THREAD_COUNT) { + switch_yield(100000); if (THREAD_COUNT == last) { x++; }