diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 78e8950b40..8970803f44 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -594,6 +594,11 @@ static struct { switch_thread_t *node_thread; int debug; struct fifo_node *nodes; + char *pre_trans_execute; + char *post_trans_execute; + char *inner_pre_trans_execute; + char *inner_post_trans_execute; + switch_sql_queue_manager_t *qm; } globals; @@ -742,7 +747,29 @@ switch_cache_db_handle_t *fifo_get_db_handle(void) return dbh; } +static switch_status_t fifo_execute_sql_queued(char **sqlp, switch_bool_t sql_already_dynamic, switch_bool_t block) +{ + int index = 1; + char *sql; + switch_assert(sqlp && *sqlp); + sql = *sqlp; + + + if (switch_stristr("insert", sql)) { + index = 0; + } + + switch_sql_queue_manager_push(globals.qm, sql, index, !sql_already_dynamic); + + if (sql_already_dynamic) { + *sqlp = NULL; + } + + return SWITCH_STATUS_SUCCESS; + +} +#if 0 static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex) { switch_cache_db_handle_t *dbh = NULL; @@ -771,6 +798,7 @@ static switch_status_t fifo_execute_sql(char *sql, switch_mutex_t *mutex) return status; } +#endif static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata) { @@ -937,9 +965,7 @@ static void do_unbridge(switch_core_session_t *consumer_session, switch_core_ses switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm); sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(consumer_session)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); - + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); switch_channel_set_variable(consumer_channel, "fifo_status", "WAITING"); @@ -1025,8 +1051,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ switch_str_nil(msg->string_array_arg[0]), switch_str_nil(msg->string_array_arg[1]), switch_core_session_get_uuid(session)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); goto end; default: goto end; @@ -1124,8 +1149,7 @@ static switch_status_t messagehook (switch_core_session_t *session, switch_core_ ); } - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); epoch_start = (long)switch_epoch_time_now(NULL); @@ -1381,8 +1405,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void struct call_helper *h = cbh->rows[i]; char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } @@ -1424,8 +1447,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void struct call_helper *h = cbh->rows[i]; char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 " "where uuid='%q' and ring_count > 0", h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } } @@ -1439,8 +1461,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void "outbound_fail_total_count = outbound_fail_total_count+1, " "next_avail=%ld + lag + 1 where uuid='%q' and ring_count > 0", (long) switch_epoch_time_now(NULL), h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } } @@ -1497,8 +1518,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void for (i = 0; i < cbh->rowcount; i++) { struct call_helper *h = cbh->rows[i]; char *sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1 where uuid='%q' and ring_count > 0", h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } end: @@ -1608,8 +1628,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) sql = switch_mprintf("update fifo_outbound set ring_count=ring_count+1 where uuid='%s'", h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); status = switch_ivr_originate(NULL, &session, &cause, originate_string, h->timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL); free(originate_string); @@ -1619,8 +1638,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) sql = switch_mprintf("update fifo_outbound set ring_count=ring_count-1, " "outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag + 1 where uuid='%q'", (long) switch_epoch_time_now(NULL), h->uuid); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Name", node->name); @@ -2123,15 +2141,12 @@ static void dec_use_count(switch_core_session_t *session, switch_bool_t send_eve sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); del_bridge_call(outbound_id); sql = switch_mprintf("update fifo_outbound set use_count=use_count-1, stop_time=%ld, next_avail=%ld + lag + 1 where use_count > 0 and uuid='%q'", now, now, outbound_id); - - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } if (send_event) { @@ -2198,9 +2213,7 @@ SWITCH_STANDARD_APP(fifo_track_call_function) sql = switch_mprintf("update fifo_outbound set stop_time=0,start_time=%ld,outbound_fail_count=0,use_count=use_count+1,%s=%s+1,%s=%s+1 where uuid='%q'", (long) switch_epoch_time_now(NULL), col1, col1, col2, col2, data); - fifo_execute_sql(sql, globals.sql_mutex); - - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); if (switch_channel_direction(channel) == SWITCH_CALL_DIRECTION_INBOUND) { @@ -2235,8 +2248,7 @@ static void fifo_caller_add(fifo_node_t *node, switch_core_session_t *session) switch_str_nil(switch_channel_get_variable(channel, "caller_id_number")), switch_epoch_time_now(NULL)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); } static void fifo_caller_del(const char *uuid) @@ -2249,8 +2261,7 @@ static void fifo_caller_del(const char *uuid) sql = switch_mprintf("delete from fifo_callers"); } - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); } @@ -3018,8 +3029,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_epoch_time_now(NULL), outbound_id); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); } add_bridge_call(switch_core_session_get_uuid(other_session)); @@ -3038,8 +3048,7 @@ SWITCH_STANDARD_APP(fifo_function) ); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); switch_channel_set_variable(channel, SWITCH_SIGNAL_BOND_VARIABLE, switch_core_session_get_uuid(other_session)); @@ -3055,8 +3064,7 @@ SWITCH_STANDARD_APP(fifo_function) "outbound_call_count=outbound_call_count+1, next_avail=%ld + lag + 1 where uuid='%s' and use_count > 0", now, now, outbound_id); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); del_bridge_call(outbound_id); @@ -3088,8 +3096,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable_printf(other_channel, "fifo_bridge_seconds", "%d", epoch_end - epoch_start); sql = switch_mprintf("delete from fifo_bridge where consumer_uuid='%q'", switch_core_session_get_uuid(session)); - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_FALSE); switch_core_media_bug_pause(session); @@ -4009,6 +4016,14 @@ static switch_status_t load_config(int reload, int del_all) } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n"); } + } else if (!strcasecmp(var, "db-pre-trans-execute") && !zstr(val)) { + globals.pre_trans_execute = switch_core_strdup(globals.pool, val); + } else if (!strcasecmp(var, "db-post-trans-execute") && !zstr(val)) { + globals.post_trans_execute = switch_core_strdup(globals.pool, val); + } else if (!strcasecmp(var, "db-inner-pre-trans-execute") && !zstr(val)) { + globals.inner_pre_trans_execute = switch_core_strdup(globals.pool, val); + } else if (!strcasecmp(var, "db-inner-post-trans-execute") && !zstr(val)) { + globals.inner_post_trans_execute = switch_core_strdup(globals.pool, val); } else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) { delete_all_outbound_member_on_startup = switch_true(val); } @@ -4019,6 +4034,18 @@ static switch_status_t load_config(int reload, int del_all) globals.dbname = "fifo"; } + switch_sql_queue_manager_init_name("fifo", + &globals.qm, + 2, + globals.odbc_dsn ? globals.odbc_dsn : globals.dbname, + SWITCH_MAX_TRANS, + globals.pre_trans_execute, + globals.post_trans_execute, + globals.inner_pre_trans_execute, + globals.inner_post_trans_execute); + + switch_sql_queue_manager_start(globals.qm); + if (!(dbh = fifo_get_db_handle())) { @@ -4036,8 +4063,8 @@ static switch_status_t load_config(int reload, int del_all) switch_cache_db_release_db_handle(&dbh); if (!reload) { - fifo_execute_sql("update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0," - "outbound_call_count=0,outbound_fail_count=0 where static=0", globals.sql_mutex); + char *sql= "update fifo_outbound set start_time=0,stop_time=0,ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0"; + fifo_execute_sql_queued(&sql, SWITCH_FALSE, SWITCH_TRUE); } if (reload) { @@ -4060,8 +4087,7 @@ static switch_status_t load_config(int reload, int del_all) sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname); } - fifo_execute_sql(sql, globals.sql_mutex); - switch_safe_free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); if (!(node = switch_core_hash_find(globals.fifo_hash, MANUAL_QUEUE_NAME))) { node = create_node(MANUAL_QUEUE_NAME, 0, globals.sql_mutex); @@ -4214,8 +4240,7 @@ static switch_status_t load_config(int reload, int del_all) (long) switch_epoch_time_now(NULL)); switch_assert(sql); - fifo_execute_sql(sql, globals.sql_mutex); - free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); free(name_dup); node->has_outbound = 1; node->member_count++; @@ -4270,8 +4295,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q'", fifo_name, digest); switch_assert(sql); - fifo_execute_sql(sql, globals.sql_mutex); - free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); switch_mutex_lock(globals.mutex); @@ -4293,8 +4317,7 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long) expires, globals.hostname, taking_calls, (long)switch_epoch_time_now(NULL)); switch_assert(sql); - fifo_execute_sql(sql, globals.sql_mutex); - free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); free(name_dup); cbt.buf = outbound_count; @@ -4329,8 +4352,7 @@ static void fifo_member_del(char *fifo_name, char *originate_string) sql = switch_mprintf("delete from fifo_outbound where fifo_name='%q' and uuid = '%q' and hostname='%q'", fifo_name, digest, globals.hostname); switch_assert(sql); - fifo_execute_sql(sql, globals.sql_mutex); - free(sql); + fifo_execute_sql_queued(&sql, SWITCH_TRUE, SWITCH_TRUE); switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) {