From d59cae60c70c0e4f99f0a3040b3ce6aaf69c8958 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 26 Apr 2006 17:18:33 +0000 Subject: [PATCH] add nitrous oxide tank to core event sql backend git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@1260 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- Makefile.am | 2 +- Makefile.in | 2 +- .../endpoints/mod_dingaling/mod_dingaling.c | 1 + src/mod/endpoints/mod_wanpipe/mod_wanpipe.c | 1 + .../mod_spidermonkey/mod_spidermonkey.c | 1 + src/switch_core.c | 144 ++++++++++++++---- src/switch_event.c | 23 ++- src/switch_log.c | 1 + 8 files changed, 139 insertions(+), 36 deletions(-) diff --git a/Makefile.am b/Makefile.am index 9427700c2f..9bd8bdeb04 100644 --- a/Makefile.am +++ b/Makefile.am @@ -150,7 +150,7 @@ depends: @rm -f build/freeswitch.env @./build/addenv.sh build/freeswitch.env PREFIX $(PREFIX) @./build/addenv.sh build/freeswitch.env MAKE $(MAKE) - ./build/buildlib.sh . install sqlite-3.2.8.tar.gz --prefix=$(PREFIX) --disable-tcl --enable-threadsafe + ./build/buildlib.sh . install sqlite-3.3.5.tar.gz --prefix=$(PREFIX) --disable-tcl --enable-threadsafe ./build/buildlib.sh . install apr-1.2.6.tar.gz --prefix=$(PREFIX) ./build/buildlib.sh . install apr-util-1.2.6.tar.gz --with-apr=../apr-1.2.6 --prefix=$(PREFIX) ./build/buildlib.sh . libresample-0.1.3.tgz --prefix=$(PREFIX) diff --git a/Makefile.in b/Makefile.in index 16e9647a20..e5cfbe61f6 100644 --- a/Makefile.in +++ b/Makefile.in @@ -1050,7 +1050,7 @@ depends: @rm -f build/freeswitch.env @./build/addenv.sh build/freeswitch.env PREFIX $(PREFIX) @./build/addenv.sh build/freeswitch.env MAKE $(MAKE) - ./build/buildlib.sh . install sqlite-3.2.8.tar.gz --prefix=$(PREFIX) --disable-tcl --enable-threadsafe + ./build/buildlib.sh . install sqlite-3.3.5.tar.gz --prefix=$(PREFIX) --disable-tcl --enable-threadsafe ./build/buildlib.sh . install apr-1.2.6.tar.gz --prefix=$(PREFIX) ./build/buildlib.sh . install apr-util-1.2.6.tar.gz --with-apr=../apr-1.2.6 --prefix=$(PREFIX) ./build/buildlib.sh . libresample-0.1.3.tgz --prefix=$(PREFIX) diff --git a/src/mod/endpoints/mod_dingaling/mod_dingaling.c b/src/mod/endpoints/mod_dingaling/mod_dingaling.c index 16eb1e5ad8..5d6ad33dc2 100644 --- a/src/mod/endpoints/mod_dingaling/mod_dingaling.c +++ b/src/mod/endpoints/mod_dingaling/mod_dingaling.c @@ -220,6 +220,7 @@ static void handle_thread_launch(ldl_handle_t *handle) switch_threadattr_create(&thd_attr, module_pool); switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, handle_thread_run, handle, module_pool); } diff --git a/src/mod/endpoints/mod_wanpipe/mod_wanpipe.c b/src/mod/endpoints/mod_wanpipe/mod_wanpipe.c index 53543c5c0a..53aa5b7f1e 100644 --- a/src/mod/endpoints/mod_wanpipe/mod_wanpipe.c +++ b/src/mod/endpoints/mod_wanpipe/mod_wanpipe.c @@ -1276,6 +1276,7 @@ static void pri_thread_launch(struct sangoma_pri *spri) switch_threadattr_create(&thd_attr, module_pool); switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, pri_thread_run, spri, module_pool); } diff --git a/src/mod/languages/mod_spidermonkey/mod_spidermonkey.c b/src/mod/languages/mod_spidermonkey/mod_spidermonkey.c index cada70221a..9cda046007 100644 --- a/src/mod/languages/mod_spidermonkey/mod_spidermonkey.c +++ b/src/mod/languages/mod_spidermonkey/mod_spidermonkey.c @@ -2139,6 +2139,7 @@ static void js_thread_launch(char *text) switch_threadattr_create(&thd_attr, module_pool); switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, js_thread_run, strdup(text), module_pool); } diff --git a/src/switch_core.c b/src/switch_core.c index 2c6b62d260..80eed61f69 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -35,6 +35,7 @@ //#define DEBUG_ALLOC #define SWITCH_EVENT_QUEUE_LEN 256 +#define SWITCH_SQL_QUEUE_LEN 2000 struct switch_core_session { uint32_t id; @@ -87,6 +88,7 @@ struct switch_core_runtime { const struct switch_state_handler_table *state_handlers[SWITCH_MAX_STATE_HANDLERS]; int state_handler_index; FILE *console; + switch_queue_t *sql_queue; }; /* Prototypes */ @@ -2092,7 +2094,7 @@ SWITCH_DECLARE(void) switch_core_session_destroy(switch_core_session **session) switch_event *event; if (switch_event_create(&event, SWITCH_EVENT_CHANNEL_DESTROY) == SWITCH_STATUS_SUCCESS) { - switch_channel_event_set_data(session->channel, event); + switch_channel_event_set_data((*session)->channel, event); switch_event_fire(&event); } @@ -2324,16 +2326,116 @@ SWITCH_DECLARE(switch_core_session *) switch_core_session_request_by_name(char * return switch_core_session_request(endpoint_interface, pool); } +static switch_status switch_core_sql_persistant_execute(switch_core_db *db, char *sql, uint32_t retries) +{ + char *errmsg; + switch_status status = SWITCH_STATUS_FALSE; + + while(retries > 0) { + switch_core_db_exec( + db, + sql, + NULL, + NULL, + &errmsg + ); + if (errmsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg); + switch_core_db_free(errmsg); + switch_yield(100000); + retries--; + } else { + status = SWITCH_STATUS_SUCCESS; + break; + } + } + + return status; +} + +static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread *thread, void *obj) +{ + void *pop; + uint32_t itterations = 0; + uint8_t trans = 0; + switch_time_t last_commit = switch_time_now(); + uint32_t work = 0, freq = 500, target = 500, diff = 0; + + if (!runtime.event_db) { + runtime.event_db = switch_core_db_handle(); + } + + for(;;) { + if (switch_queue_trypop(runtime.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) { + char *sql = (char *) pop; + + if (sql) { + work++; + if (itterations == 0) { + char *isql = "begin transaction CORE1;"; + if (switch_core_sql_persistant_execute(runtime.event_db, isql, 25) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", isql); + } else { + trans = 1; + } + } + + itterations++; + + if (switch_core_sql_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql); + } + free(sql); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); + break; + } + } + + if (diff < freq) { + diff = (switch_time_now() - last_commit) / 1000; + } + + if (trans && (itterations == target || diff >= freq)) { + char *sql = "end transaction CORE1"; + work++; + if (switch_core_sql_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql); + } + last_commit = switch_time_now(); + itterations = 0; + trans = 0; + diff = 0; + } + if (!work) { + switch_yield(1000); + } + } + return NULL; +} + + +static void switch_core_sql_thread_launch(void) +{ + switch_thread *thread; + switch_threadattr_t *thd_attr;; + + assert(runtime.memory_pool != NULL); + + switch_threadattr_create(&thd_attr, runtime.memory_pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, switch_core_sql_thread, NULL, runtime.memory_pool); + +} + static void core_event_handler(switch_event *event) { char buf[1024]; char *sql = NULL; - char *errmsg; - if (!runtime.event_db) { - runtime.event_db = switch_core_db_handle(); - } + switch (event->event_id) { @@ -2419,32 +2521,12 @@ static void core_event_handler(switch_event *event) default: //buf[0] = '\0'; //switch_event_serialize(event, buf, sizeof(buf), NULL); - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "\nCORE EVENT\n--------------------------------\n%s\n", buf); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "\nCORE EVENT\n--------------------------------\n%s\n", buf); break; } - - if (sql) { - uint8_t max = 25; - while(max > 0) { - switch_core_db_exec( - runtime.event_db, - sql, - NULL, - NULL, - &errmsg - ); - if (errmsg) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg); - switch_core_db_free(errmsg); - switch_yield(100000); - max--; - } else { - break; - } - } - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL [%s]\n", sql); + switch_queue_push(runtime.sql_queue, strdup(sql)); } } @@ -2494,6 +2576,7 @@ SWITCH_DECLARE(switch_status) switch_core_init(char *console) } assert(runtime.memory_pool != NULL); switch_log_init(runtime.memory_pool); + switch_core_sql_thread_launch(); if(console) { if (*console != '/') { @@ -2507,7 +2590,7 @@ SWITCH_DECLARE(switch_status) switch_core_init(char *console) } - + switch_queue_create(&runtime.sql_queue, SWITCH_SQL_QUEUE_LEN, runtime.memory_pool); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Allocated memory pool. Sessions are %u bytes\n", sizeof(struct switch_core_session)); @@ -2574,6 +2657,11 @@ SWITCH_DECLARE(switch_status) switch_core_destroy(void) switch_event_shutdown(); switch_log_shutdown(); + switch_queue_push(runtime.sql_queue, NULL); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); + while (switch_queue_size(runtime.sql_queue) > 0) { + switch_yield(1000); + } switch_core_db_close(runtime.db); if (runtime.memory_pool) { diff --git a/src/switch_event.c b/src/switch_event.c index 6bd49af7c0..e169c5456d 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -164,20 +164,31 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW]; for(;;) { + int any; + len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]); len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW]); len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]); + any = len[1] + len[2] + len[0]; - if (THREAD_RUNNING != 1 && !len[0] && !len[1] && !len[2]) { - break; + if (!any) { + switch_yield(1000); + if (THREAD_RUNNING != 1) { + break; + } + continue; } for(i = 0; i < 3; i++) { if (len[i]) { queue = queues[i]; - while (queue && switch_queue_trypop(queue, &pop) == SWITCH_STATUS_SUCCESS) { - out_event = pop; - switch_event_deliver(&out_event); + while(queue) { + if (switch_queue_trypop(queue, &pop) == SWITCH_STATUS_SUCCESS) { + out_event = pop; + switch_event_deliver(&out_event); + } else { + break; + } } } } @@ -185,7 +196,6 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread *thread, void if (THREAD_RUNNING < 0) { THREAD_RUNNING--; } - switch_yield(1000); } THREAD_RUNNING = 0; return NULL; @@ -299,6 +309,7 @@ SWITCH_DECLARE(switch_status) switch_event_init(switch_memory_pool *pool) switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, RUNTIME_POOL); while (!THREAD_RUNNING) { diff --git a/src/switch_log.c b/src/switch_log.c index d7f123bdd4..7bb08d2b5c 100644 --- a/src/switch_log.c +++ b/src/switch_log.c @@ -264,6 +264,7 @@ SWITCH_DECLARE(switch_status) switch_log_init(switch_memory_pool *pool) switch_queue_create(&LOG_QUEUE, 2000, LOG_POOL); switch_mutex_init(&BINDLOCK, SWITCH_MUTEX_NESTED, LOG_POOL); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, log_thread, NULL, LOG_POOL); while (!THREAD_RUNNING) {