From 68d1c32ad145c712c1d7939e5649a0f187f20170 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 7 Sep 2010 10:51:02 -0500 Subject: [PATCH] FSCORE-668 --- src/switch_core_sqldb.c | 49 ++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index feb92b3c80..3a5e0bb6e2 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -35,6 +35,8 @@ #include #include "private/switch_core_pvt.h" +#define SQLLEN 32768 + static struct { switch_cache_db_handle_t *event_db; switch_queue_t *sql_queue[2]; @@ -537,7 +539,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_hand switch (dbh->type) { default: { - status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, 32768, err); + status = switch_cache_db_execute_sql_chunked(dbh, (char *) sql, SQLLEN, err); } break; } @@ -845,7 +847,7 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand -#define SQLLEN 1024 * 1024 + static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) { void *pop; @@ -853,13 +855,14 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, uint8_t trans = 0, nothing_in_queue = 0; uint32_t target = 100000; switch_size_t len = 0, sql_len = SQLLEN; - char *tmp, *sqlbuf = (char *) malloc(sql_len); - char *sql; + char *sqlbuf = (char *) malloc(sql_len); + char *sql = NULL; switch_size_t newlen; int lc = 0; uint32_t loops = 0, sec = 0; uint32_t l1 = 1000; uint32_t sanity = 120; + int item_remained = 0; switch_assert(sqlbuf); @@ -897,10 +900,17 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, continue; } - if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || - switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { - sql = (char *) pop; + //printf("SIZE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); + if (item_remained || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { + + if (item_remained) { + item_remained = 0; + } else { + sql = (char *) pop; + } + if (sql) { newlen = strlen(sql) + 2; @@ -911,20 +921,18 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, /* ignore abnormally large strings sql strings as potential buffer overflow */ if (newlen < SQLLEN) { itterations++; - if (len + newlen > sql_len) { - sql_len = len + SQLLEN; - if (!(tmp = realloc(sqlbuf, sql_len))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); - abort(); - break; - } - sqlbuf = tmp; + + if (len + newlen < sql_len) { + sprintf(sqlbuf + len, "%s;\n", sql); + len += newlen; + } else { + item_remained = 1; } - sprintf(sqlbuf + len, "%s;\n", sql); - len += newlen; - } - free(sql); + + if (!item_remained) { + free(sql); + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); break; @@ -934,7 +942,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } - if (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500))) { + if ((item_remained || (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500)))) && + (sql_manager.event_db->native_handle.core_db_dbh)) { if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); }