diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 73607926fa..d62cd8058d 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2337,8 +2337,6 @@ SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh) SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream); SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); #define switch_core_db_handle(_a) _switch_core_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__) -SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line); -#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__) SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *db, const char *test_sql, const char *drop_sql, const char *reactive_sql); diff --git a/src/switch_core.c b/src/switch_core.c index 8e1a6567d7..389bbde756 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1921,18 +1921,6 @@ static void switch_load_core_config(const char *file) } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC AND PGSQL ARE NOT AVAILABLE!\n"); } - } else if (!strcasecmp(var, "core-recovery-db-dsn") && !zstr(val)) { - if (switch_odbc_available() || switch_pgsql_available()) { - runtime.recovery_odbc_dsn = switch_core_strdup(runtime.memory_pool, val); - if ((runtime.recovery_odbc_user = strchr(runtime.recovery_odbc_dsn, ':'))) { - *runtime.recovery_odbc_user++ = '\0'; - if ((runtime.recovery_odbc_pass = strchr(runtime.recovery_odbc_user, ':'))) { - *runtime.recovery_odbc_pass++ = '\0'; - } - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC AND PGSQL ARE NOT AVAILABLE!\n"); - } } else if (!strcasecmp(var, "core-non-sqlite-db-required") && !zstr(val)) { switch_set_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ); } else if (!strcasecmp(var, "core-dbtype") && !zstr(val)) { diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index bb82202fbc..a557a867ff 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -57,7 +57,7 @@ struct switch_cache_db_handle { static struct { switch_cache_db_handle_t *event_db; - switch_queue_t *sql_queue[2]; + switch_queue_t *sql_queue[4]; switch_memory_pool_t *memory_pool; switch_thread_t *thread; switch_thread_t *db_thread; @@ -275,53 +275,6 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h } -#define SWITCH_CORE_RECOVERY_DB "core_recovery" -SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line) -{ - switch_cache_db_connection_options_t options = { {0} }; - switch_status_t r; - - if (!sql_manager.manage) { - return SWITCH_STATUS_FALSE; - } - - if (zstr(runtime.recovery_odbc_dsn)) { - if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) { - return SWITCH_STATUS_FALSE; - } - - if (runtime.recovery_dbname) { - options.core_db_options.db_path = runtime.recovery_dbname; - } else { - options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB; - } - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line); - - } else { - char *dsn; - if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) { - options.pgsql_options.dsn = (char*)(dsn + 6); - - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line); - } else { - options.odbc_options.dsn = runtime.recovery_odbc_dsn; - options.odbc_options.user = runtime.recovery_odbc_user; - options.odbc_options.pass = runtime.recovery_odbc_pass; - - r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line); - } - } - - /* I *think* we can do without this now, if not let me know - if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) { - (*dbh)->io_mutex = sql_manager.io_mutex; - } - */ - - return r; -} - - #define SQL_CACHE_TIMEOUT 30 #define SQL_REG_TIMEOUT 15 @@ -1248,8 +1201,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } while (sql_manager.thread_running == 1) { - if (save_sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { + if (save_sql || + switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS + ) { if (save_sql) { sql = save_sql; @@ -1284,7 +1241,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } else { if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); + "SAVE %d %d %d %d\n", + switch_queue_size(sql_manager.sql_queue[0]), + switch_queue_size(sql_manager.sql_queue[1]), + switch_queue_size(sql_manager.sql_queue[2]), + switch_queue_size(sql_manager.sql_queue[3]) + ); } save_sql = sql; sql = NULL; @@ -1304,7 +1266,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } } - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) + + switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]); if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) { @@ -1330,7 +1293,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, if (trans && iterations && (iterations > target || !lc)) { if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, - "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations); + "RUN %d %d %d %d %d\n", + switch_queue_size(sql_manager.sql_queue[0]), + switch_queue_size(sql_manager.sql_queue[1]), + switch_queue_size(sql_manager.sql_queue[2]), + switch_queue_size(sql_manager.sql_queue[3]), + iterations); } if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); @@ -1353,7 +1321,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, wrote = 1; } - lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) + + switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]); if (!lc) { switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); @@ -1378,6 +1347,14 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, free(pop); } + while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) { + free(pop); + } + + while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) { + free(pop); + } + free(sqlbuf); sql_manager.thread_running = 0; @@ -2090,7 +2067,7 @@ SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const ch char *sql = NULL; switch_cache_db_handle_t *dbh; - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { + if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); return; } @@ -2206,7 +2183,7 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c switch_cache_db_handle_t *dbh; int r = 0; - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { + if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); return 0; } @@ -2277,18 +2254,12 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force) { char *sql = NULL; - switch_cache_db_handle_t *dbh; switch_channel_t *channel = switch_core_session_get_channel(session); if (!switch_channel_test_flag(channel, CF_TRACKABLE)) { return; } - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); - return; - } - if ((switch_channel_test_flag(channel, CF_RECOVERING))) { return; } @@ -2303,15 +2274,11 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session switch_core_get_uuid(), switch_core_session_get_uuid(session)); } - switch_cache_db_execute_sql(dbh, sql, NULL); + switch_queue_push(sql_manager.sql_queue[3], sql); switch_channel_clear_flag(channel, CF_TRACKED); - - switch_safe_free(sql); } - switch_cache_db_release_db_handle(&dbh); - } SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) @@ -2319,7 +2286,6 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) switch_xml_t cdr = NULL; char *xml_cdr_text = NULL; char *sql = NULL; - switch_cache_db_handle_t *dbh; switch_channel_t *channel = switch_core_session_get_channel(session); const char *profile_name; const char *technology; @@ -2328,16 +2294,9 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) return; } - profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1); technology = session->endpoint_interface->interface_name; - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); - return; - } - - if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) { xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE); switch_xml_free(cdr); @@ -2353,16 +2312,13 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session) switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text); } - switch_cache_db_execute_sql(dbh, sql, NULL); - switch_safe_free(sql); - + switch_queue_push(sql_manager.sql_queue[2], sql); + free(xml_cdr_text); switch_channel_set_flag(channel, CF_TRACKED); } - switch_cache_db_release_db_handle(&dbh); - } @@ -2667,6 +2623,8 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); + switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); + switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); switch_threadattr_create(&thd_attr, sql_manager.memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); @@ -2691,6 +2649,8 @@ SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void) if (sql_manager.manage) { switch_queue_push(sql_manager.sql_queue[0], NULL); switch_queue_push(sql_manager.sql_queue[1], NULL); + switch_queue_push(sql_manager.sql_queue[2], NULL); + switch_queue_push(sql_manager.sql_queue[3], NULL); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); wake_thread(0); sql_manager.thread_running = -1; @@ -2713,7 +2673,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void) switch_mutex_lock(sql_manager.ctl_mutex); - if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { + if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {