trying paul's patch

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@2721 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2006-09-17 18:03:32 +00:00
parent 1fdc0a26a7
commit 32198c36e4
4 changed files with 142 additions and 17 deletions

View File

@ -6,7 +6,7 @@ LCFLAGS=-fPIC -DZTS -DPTHREADS
CFLAGS += `$(PCFG) --includes` -g3 -fno-strict-aliasing
MDIR += `$(PCFG) --extension-dir`
PHPMOD=freeswitch
PHPLDFLAGS = `$(PCFG) --ldflags` -lcrypt -lresolv -lm -ldl -lnsl -lxml2 -lz -lphp5
PHPLDFLAGS = `$(PCFG) --ldflags` -lm -ldl -lxml2 -lz -lphp5
MOD_CFLAGS += -fPIC
all: depends $(MODNAME).$(DYNAMIC_LIB_EXTEN) $(PHPMOD).$(DYNAMIC_LIB_EXTEN)

View File

@ -141,6 +141,11 @@ static void switch_core_standard_on_hold(switch_core_session_t *session);
/* The main runtime obj we keep this hidden for ourselves */
static struct switch_core_runtime runtime;
/* Mutex and conditional for sql lite */
static switch_mutex_t *SWITCH_SQL_MUTEX;
static switch_thread_cond_t *SWITCH_SQL_CONDITIONAL;
static void db_pick_path(char *dbname, char *buf, switch_size_t size)
{
@ -3208,20 +3213,23 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
uint8_t trans = 0;
switch_time_t last_commit = switch_time_now();
uint32_t freq = 1000, target = 1000, diff = 0;
//if these are null we have big problems
assert(SWITCH_SQL_MUTEX != NULL);
assert(SWITCH_SQL_CONDITIONAL != NULL);
if (!runtime.event_db) {
runtime.event_db = switch_core_db_handle();
}
switch_queue_create(&runtime.sql_queue, SWITCH_SQL_QUEUE_LEN, runtime.memory_pool);
switch_mutex_lock(SWITCH_SQL_MUTEX);
for(;;) {
uint32_t work = 0;
if (switch_queue_trypop(runtime.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) {
char *sql = (char *) pop;
if (sql) {
work++;
if (itterations == 0) {
char *isql = "begin transaction CORE1;";
if (switch_core_db_persistant_execute(runtime.event_db, isql, 25) != SWITCH_STATUS_SUCCESS) {
@ -3241,6 +3249,15 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
break;
}
} else {
//Are we currently in a transaction, wait accordingly
if(trans) {
//we need to finish a transaction in a bit, wait around until we have more work or that time comes
switch_thread_cond_timedwait(SWITCH_SQL_CONDITIONAL, SWITCH_SQL_MUTEX, (freq * 1000) + last_commit - switch_time_now());
} else {
//wait until we have more work
switch_thread_cond_wait(SWITCH_SQL_CONDITIONAL, SWITCH_SQL_MUTEX);
}
}
if (diff < freq) {
@ -3249,7 +3266,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
if (trans && (itterations == target || diff >= freq)) {
char *sql = "end transaction CORE1";
work++;
if (switch_core_db_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql);
}
@ -3258,9 +3274,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
trans = 0;
diff = 0;
}
if (!work) {
switch_yield(1000);
}
}
return NULL;
}
@ -3273,6 +3287,10 @@ static void switch_core_sql_thread_launch(void)
assert(runtime.memory_pool != NULL);
//create the mutex and conditional
switch_mutex_init(&SWITCH_SQL_MUTEX, SWITCH_MUTEX_NESTED, runtime.memory_pool);
switch_thread_cond_create(&SWITCH_SQL_CONDITIONAL, runtime.memory_pool);
switch_threadattr_create(&thd_attr, runtime.memory_pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
@ -3380,6 +3398,16 @@ static void core_event_handler(switch_event_t *event)
if (sql) {
switch_queue_push(runtime.sql_queue, sql);
//See if we need to wake up the sql thread
if(switch_mutex_trylock(SWITCH_SQL_MUTEX) == SWITCH_STATUS_SUCCESS) {
//wake up the SQL thread
switch_thread_cond_signal(SWITCH_SQL_CONDITIONAL);
//give up our lock
switch_mutex_unlock(SWITCH_SQL_MUTEX);
}
sql = NULL;
}
}
@ -3686,6 +3714,15 @@ SWITCH_DECLARE(switch_status_t) switch_core_destroy(void)
switch_event_shutdown();
switch_queue_push(runtime.sql_queue, NULL);
//See if we need to wake up the sql thread
if(switch_mutex_trylock(SWITCH_SQL_MUTEX) == SWITCH_STATUS_SUCCESS) {
//wake up the SQL thread
switch_thread_cond_signal(SWITCH_SQL_CONDITIONAL);
//give up our lock
switch_mutex_unlock(SWITCH_SQL_MUTEX);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
while (switch_queue_size(runtime.sql_queue) > 0) {
switch_yield(10000);

View File

@ -35,6 +35,9 @@
static switch_event_node_t *EVENT_NODES[SWITCH_EVENT_ALL + 1] = { NULL };
static switch_mutex_t *BLOCK = NULL;
static switch_mutex_t *POOL_LOCK = NULL;
static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL;
static switch_mutex_t *EVENT_QUEUE_HAVEMORE_MUTEX = NULL;
static switch_thread_cond_t *EVENT_QUEUE_CONDITIONAL = NULL;
static switch_memory_pool_t *RUNTIME_POOL = NULL;
//static switch_memory_pool_t *APOOL = NULL;
//static switch_memory_pool_t *BPOOL = NULL;
@ -44,6 +47,7 @@ static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN;
static switch_hash_t *CUSTOM_HASH = NULL;
static int THREAD_RUNNING = 0;
static int EVENT_QUEUE_HAVEMORE = 0;
#if 0
static void *locked_alloc(switch_size_t len)
@ -174,25 +178,56 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi
assert(obj == NULL);
assert(POOL_LOCK != NULL);
assert(RUNTIME_POOL != NULL);
assert(EVENT_QUEUE_MUTEX != NULL);
assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL);
assert(EVENT_QUEUE_CONDITIONAL != NULL);
THREAD_RUNNING = 1;
queues[0] = EVENT_QUEUE[SWITCH_PRIORITY_HIGH];
queues[1] = EVENT_QUEUE[SWITCH_PRIORITY_NORMAL];
queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW];
switch_mutex_lock(EVENT_QUEUE_MUTEX);
for(;;) {
int any;
len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]);
len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW]);
len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]);
any = len[1] + len[2] + len[0];
if (!any) {
if (THREAD_RUNNING != 1) {
break;
//lock on havemore so we are the only ones poking at it while we check it
//see if we saw anything in the queues or have a check again flag
switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
if(!EVENT_QUEUE_HAVEMORE) {
//See if we need to quit
if (THREAD_RUNNING != 1) {
//give up our lock
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
//Game over
break;
}
//give up our lock
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
//wait until someone tells us we have something to do
switch_thread_cond_wait(EVENT_QUEUE_CONDITIONAL, EVENT_QUEUE_MUTEX);
} else {
//Caught a race, one of the queues was updated after we looked at it
//reset our flag
EVENT_QUEUE_HAVEMORE = 0;
//Give up our lock
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
}
switch_yield(1000);
//go grab some events
continue;
}
@ -304,6 +339,27 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void)
if (THREAD_RUNNING > 0) {
THREAD_RUNNING = -1;
//Lock on havemore to make sure he event thread, if currently running
// doesn't check the HAVEMORE flag before we set it
switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
//See if the event thread is sitting
if(switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) {
//we don't need havemore anymore, the thread was sitting already
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
//wake up the event thread
switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL);
//give up our lock
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
} else { // it wasn't waiting which means we might have updated a queue it already looked at
//set a flag so it knows to read the queues again
EVENT_QUEUE_HAVEMORE = 1;
//variable updated, give up the mutex
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
}
while (x < 100 && THREAD_RUNNING) {
switch_yield(1000);
if (THREAD_RUNNING == last) {
@ -344,6 +400,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n");
switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_mutex_init(&EVENT_QUEUE_HAVEMORE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL);
switch_thread_cond_create(&EVENT_QUEUE_CONDITIONAL, RUNTIME_POOL);
switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, RUNTIME_POOL);
@ -662,6 +721,10 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(char *file, char *fun
assert(BLOCK != NULL);
assert(RUNTIME_POOL != NULL);
assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL);
assert(EVENT_QUEUE_MUTEX != NULL);
assert(EVENT_QUEUE_CONDITIONAL != NULL);
assert(RUNTIME_POOL != NULL);
if (THREAD_RUNNING <= 0) {
/* sorry we're closed */
@ -690,6 +753,29 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(char *file, char *fun
}
switch_queue_push(EVENT_QUEUE[(*event)->priority], *event);
//Lock on havemore to make sure he event thread, if currently running
// doesn't check the HAVEMORE flag before we set it
switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX);
//See if the event thread is sitting
if(switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) {
//we don't need havemore anymore, the thread was sitting already
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
//wake up the event thread
switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL);
//give up our lock
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
} else { // it wasn't waiting which means we might have updated a queue it already looked at
//set a flag so it knows to read the queues again
EVENT_QUEUE_HAVEMORE = 1;
//variable updated, give up the mutex
switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX);
}
*event = NULL;
return SWITCH_STATUS_SUCCESS;

View File

@ -483,12 +483,14 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session
timer_name = "soft";
}
if (switch_core_timer_init(&rtp_session->timer, timer_name, ms_per_packet / 1000, packet_size, rtp_session->pool) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting timer [%s] %d bytes per %dms\n", timer_name, packet_size, ms_per_packet);
} else {
memset(&rtp_session->timer, 0, sizeof(rtp_session->timer));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error starting timer [%s], async RTP disabled\n", timer_name);
switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_USE_TIMER);
if (timer_name) {
if (switch_core_timer_init(&rtp_session->timer, timer_name, ms_per_packet / 1000, packet_size, rtp_session->pool) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting timer [%s] %d bytes per %dms\n", timer_name, packet_size, ms_per_packet);
} else {
memset(&rtp_session->timer, 0, sizeof(rtp_session->timer));
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error starting timer [%s], async RTP disabled\n", timer_name);
switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_USE_TIMER);
}
}
rtp_session->ready++;