fold recovery db to only use core db and use transactional stmts for recovery stmts

This commit is contained in:
Anthony Minessale 2012-10-08 17:59:38 -05:00
parent a0a584b0c9
commit 6998695f01
3 changed files with 41 additions and 95 deletions

View File

@ -2337,8 +2337,6 @@ SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh)
SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream);
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_db_handle(_a) _switch_core_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *db,
const char *test_sql, const char *drop_sql, const char *reactive_sql);

View File

@ -1921,18 +1921,6 @@ static void switch_load_core_config(const char *file)
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC AND PGSQL ARE NOT AVAILABLE!\n");
}
} else if (!strcasecmp(var, "core-recovery-db-dsn") && !zstr(val)) {
if (switch_odbc_available() || switch_pgsql_available()) {
runtime.recovery_odbc_dsn = switch_core_strdup(runtime.memory_pool, val);
if ((runtime.recovery_odbc_user = strchr(runtime.recovery_odbc_dsn, ':'))) {
*runtime.recovery_odbc_user++ = '\0';
if ((runtime.recovery_odbc_pass = strchr(runtime.recovery_odbc_user, ':'))) {
*runtime.recovery_odbc_pass++ = '\0';
}
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC AND PGSQL ARE NOT AVAILABLE!\n");
}
} else if (!strcasecmp(var, "core-non-sqlite-db-required") && !zstr(val)) {
switch_set_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ);
} else if (!strcasecmp(var, "core-dbtype") && !zstr(val)) {

View File

@ -57,7 +57,7 @@ struct switch_cache_db_handle {
static struct {
switch_cache_db_handle_t *event_db;
switch_queue_t *sql_queue[2];
switch_queue_t *sql_queue[4];
switch_memory_pool_t *memory_pool;
switch_thread_t *thread;
switch_thread_t *db_thread;
@ -275,53 +275,6 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h
}
#define SWITCH_CORE_RECOVERY_DB "core_recovery"
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
return SWITCH_STATUS_FALSE;
}
if (zstr(runtime.recovery_odbc_dsn)) {
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
return SWITCH_STATUS_FALSE;
}
if (runtime.recovery_dbname) {
options.core_db_options.db_path = runtime.recovery_dbname;
} else {
options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
char *dsn;
if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.recovery_odbc_dsn;
options.odbc_options.user = runtime.recovery_odbc_user;
options.odbc_options.pass = runtime.recovery_odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
}
*/
return r;
}
#define SQL_CACHE_TIMEOUT 30
#define SQL_REG_TIMEOUT 15
@ -1248,8 +1201,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
}
while (sql_manager.thread_running == 1) {
if (save_sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
if (save_sql ||
switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS
) {
if (save_sql) {
sql = save_sql;
@ -1284,7 +1241,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
} else {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
"SAVE %d %d %d %d\n",
switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]),
switch_queue_size(sql_manager.sql_queue[2]),
switch_queue_size(sql_manager.sql_queue[3])
);
}
save_sql = sql;
sql = NULL;
@ -1304,7 +1266,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
}
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
@ -1330,7 +1293,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
if (trans && iterations && (iterations > target || !lc)) {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations);
"RUN %d %d %d %d %d\n",
switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]),
switch_queue_size(sql_manager.sql_queue[2]),
switch_queue_size(sql_manager.sql_queue[3]),
iterations);
}
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
@ -1353,7 +1321,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
wrote = 1;
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
@ -1378,6 +1347,14 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
free(pop);
}
while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
free(sqlbuf);
sql_manager.thread_running = 0;
@ -2090,7 +2067,7 @@ SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const ch
char *sql = NULL;
switch_cache_db_handle_t *dbh;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
@ -2206,7 +2183,7 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
switch_cache_db_handle_t *dbh;
int r = 0;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return 0;
}
@ -2277,18 +2254,12 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
{
char *sql = NULL;
switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
if (!switch_channel_test_flag(channel, CF_TRACKABLE)) {
return;
}
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
if ((switch_channel_test_flag(channel, CF_RECOVERING))) {
return;
}
@ -2303,15 +2274,11 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
switch_core_get_uuid(), switch_core_session_get_uuid(session));
}
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_queue_push(sql_manager.sql_queue[3], sql);
switch_channel_clear_flag(channel, CF_TRACKED);
switch_safe_free(sql);
}
switch_cache_db_release_db_handle(&dbh);
}
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
@ -2319,7 +2286,6 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
switch_xml_t cdr = NULL;
char *xml_cdr_text = NULL;
char *sql = NULL;
switch_cache_db_handle_t *dbh;
switch_channel_t *channel = switch_core_session_get_channel(session);
const char *profile_name;
const char *technology;
@ -2328,16 +2294,9 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
return;
}
profile_name = switch_channel_get_variable_dup(channel, "recovery_profile_name", SWITCH_FALSE, -1);
technology = session->endpoint_interface->interface_name;
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return;
}
if (switch_ivr_generate_xml_cdr(session, &cdr) == SWITCH_STATUS_SUCCESS) {
xml_cdr_text = switch_xml_toxml_nolock(cdr, SWITCH_FALSE);
switch_xml_free(cdr);
@ -2353,16 +2312,13 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
}
switch_cache_db_execute_sql(dbh, sql, NULL);
switch_safe_free(sql);
switch_queue_push(sql_manager.sql_queue[2], sql);
free(xml_cdr_text);
switch_channel_set_flag(channel, CF_TRACKED);
}
switch_cache_db_release_db_handle(&dbh);
}
@ -2667,6 +2623,8 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
@ -2691,6 +2649,8 @@ SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
if (sql_manager.manage) {
switch_queue_push(sql_manager.sql_queue[0], NULL);
switch_queue_push(sql_manager.sql_queue[1], NULL);
switch_queue_push(sql_manager.sql_queue[2], NULL);
switch_queue_push(sql_manager.sql_queue[3], NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
wake_thread(0);
sql_manager.thread_running = -1;
@ -2713,7 +2673,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
switch_mutex_lock(sql_manager.ctl_mutex);
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {