From 58cae7723faa0ae2f82c74f4c510c1f36311883f Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 17 Dec 2008 20:43:13 +0000 Subject: [PATCH] fix bottleneck in sql thread git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@10858 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- src/switch_core_db.c | 2 +- src/switch_core_sqldb.c | 83 +++++++++++++++++++++++++++-------------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/src/switch_core_db.c b/src/switch_core_db.c index 2bfc877563..3b0ae3bfbe 100644 --- a/src/switch_core_db.c +++ b/src/switch_core_db.c @@ -86,7 +86,7 @@ SWITCH_DECLARE(int) switch_core_db_exec(switch_core_db_t *db, const char *sql, s if (ret == SQLITE_BUSY || ret == SQLITE_LOCKED) { if (sane > 1) { switch_safe_free(err); - switch_yield(20000); + switch_cond_next(); continue; } } else { diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 1bc23c8335..acac6b1f89 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -38,7 +38,7 @@ static struct { switch_core_db_t *db; switch_core_db_t *event_db; - switch_queue_t *sql_queue; + switch_queue_t *sql_queue[2]; switch_memory_pool_t *memory_pool; int thread_running; } sql_manager; @@ -61,7 +61,7 @@ again: while (begin_retries > 0) { again = 0; - switch_core_db_exec(db, "begin transaction", NULL, NULL, &errmsg); + switch_core_db_exec(db, "BEGIN", NULL, NULL, &errmsg); if (errmsg) { begin_retries--; @@ -74,7 +74,7 @@ again: errmsg = NULL; if (again) { - switch_core_db_exec(db, "end transaction", NULL, NULL, NULL); + switch_core_db_exec(db, "COMMIT", NULL, NULL, NULL); goto again; } @@ -109,7 +109,7 @@ again: done: - switch_core_db_exec(db, "end transaction", NULL, NULL, NULL); + switch_core_db_exec(db, "COMMIT", NULL, NULL, NULL); return status; } @@ -151,13 +151,13 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *pop; uint32_t itterations = 0; uint8_t trans = 0, nothing_in_queue = 0; - uint32_t target = 1000; + uint32_t target = 50000; switch_size_t len = 0, sql_len = SQLLEN; char *tmp, *sqlbuf = (char *) malloc(sql_len); char *sql; switch_size_t newlen; int lc = 0; - + switch_assert(sqlbuf); if (!sql_manager.event_db) { @@ -167,7 +167,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, sql_manager.thread_running = 1; for (;;) { - if (switch_queue_trypop(sql_manager.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) { + if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || + switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { sql = (char *) pop; if (sql) { @@ -201,9 +202,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, } else { nothing_in_queue = 1; } - - - if (trans && ((itterations == target) || nothing_in_queue)) { + + + if (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500))) { if (switch_core_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 100) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); } @@ -212,19 +213,19 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, nothing_in_queue = 0; len = 0; *sqlbuf = '\0'; - } - - if (++lc == 300000) { - switch_core_db_exec(sql_manager.db, "vacuum;", NULL, NULL, NULL); lc = 0; } - + if (nothing_in_queue) { switch_cond_next(); } } - while (switch_queue_trypop(sql_manager.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) { + free(pop); + } + + while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { free(pop); } @@ -288,17 +289,20 @@ static void core_event_handler(switch_event_t *event) sql = switch_mprintf ("update channels set read_codec='%q',read_rate='%q',write_codec='%q',write_rate='%q' where uuid='%q'", - switch_event_get_header_nil(event, "channel-read-codec-name"), switch_event_get_header_nil(event, - "channel-read-codec-rate"), - switch_event_get_header_nil(event, "channel-write-codec-name"), switch_event_get_header_nil(event, - "channel-write-codec-rate"), + switch_event_get_header_nil(event, "channel-read-codec-name"), + switch_event_get_header_nil(event, "channel-read-codec-rate"), + switch_event_get_header_nil(event, "channel-write-codec-name"), + switch_event_get_header_nil(event, "channel-write-codec-rate"), switch_event_get_header_nil(event, "unique-id")); break; case SWITCH_EVENT_CHANNEL_EXECUTE: sql = switch_mprintf("update channels set application='%q',application_data='%q' where uuid='%q'", switch_event_get_header_nil(event, "application"), - switch_event_get_header_nil(event, "application-data"), switch_event_get_header_nil(event, "unique-id") - ); + switch_event_get_header_nil(event, "application-data"), + switch_event_get_header_nil(event, "unique-id") + + ); + break; case SWITCH_EVENT_CHANNEL_STATE: { @@ -384,7 +388,11 @@ static void core_event_handler(switch_event_t *event) } if (sql) { - switch_queue_push(sql_manager.sql_queue, sql); + if (switch_stristr("update channels", sql)) { + switch_queue_push(sql_manager.sql_queue[1], sql); + } else { + switch_queue_push(sql_manager.sql_queue[0], sql); + } sql = NULL; } } @@ -442,7 +450,7 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool) " read_rate VARCHAR(255),\n" " write_codec VARCHAR(255),\n" " write_rate VARCHAR(255)\n" - ");\n"; + ");\ncreate index uuindex on channels (uuid);\n"; char create_calls_sql[] = "CREATE TABLE calls (\n" " created VARCHAR(255),\n" @@ -458,7 +466,9 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool) " callee_dest_num VARCHAR(255),\n" " callee_chan_name VARCHAR(255),\n" " callee_uuid VARCHAR(255)\n" - ");\n"; + ");\n" + "create index eruuindex on calls (caller_uuid);\n" + "create index eeuuindex on calls (callee_uuid);\n"; char create_interfaces_sql[] = "CREATE TABLE interfaces (\n" " type VARCHAR(255),\n" @@ -488,6 +498,17 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool) switch_core_db_test_reactive(sql_manager.db, "select sticky from aliases", "DROP TABLE aliases", create_alias_sql); switch_core_db_exec(sql_manager.db, "delete from complete where sticky=0", NULL, NULL, NULL); switch_core_db_exec(sql_manager.db, "delete from aliases where sticky=0", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists alias1 on aliases (alias)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete1 on complete (a1)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete2 on complete (a2)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete3 on complete (a3)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete4 on complete (a4)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete5 on complete (a5)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete6 on complete (a6)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete7 on complete (a7)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete8 on complete (a8)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete9 on complete (a9)", NULL, NULL, NULL); + switch_core_db_exec(sql_manager.db, "create index if not exists complete10 on complete (a10)", NULL, NULL, NULL); switch_core_db_exec(sql_manager.db, create_channels_sql, NULL, NULL, NULL); switch_core_db_exec(sql_manager.db, create_calls_sql, NULL, NULL, NULL); switch_core_db_exec(sql_manager.db, create_interfaces_sql, NULL, NULL, NULL); @@ -497,12 +518,14 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool) } } - switch_queue_create(&sql_manager.sql_queue, SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); + switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); + switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool); switch_threadattr_create(&thd_attr, sql_manager.memory_pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); + while (!sql_manager.thread_running) { switch_yield(10000); } @@ -510,10 +533,14 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool) void switch_core_sqldb_stop(void) { - switch_queue_push(sql_manager.sql_queue, NULL); + switch_queue_push(sql_manager.sql_queue[0], NULL); + switch_queue_push(sql_manager.sql_queue[1], NULL); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); - while (switch_queue_size(sql_manager.sql_queue) > 0) { + while (switch_queue_size(sql_manager.sql_queue[0]) > 0) { + switch_yield(10000); + } + while (switch_queue_size(sql_manager.sql_queue[1]) > 0) { switch_yield(10000); }