Merge pull request #475 in FS/freeswitch from ~ARTURZ/freeswitch:FS-8142-switch_core_session-thread-cache-races to master

* commit 'cd4c3188e4f715ff129dc4eea1a4ba50140c2a42':
  FS-8142 Fix a thread cache thread-safety and caching
This commit is contained in:
Anthony Minessale II 2015-09-09 11:38:14 -05:00 committed by Anthony Minessale
parent 4892f95216
commit b72c89cd2c
2 changed files with 35 additions and 177 deletions

View File

@ -290,16 +290,10 @@ struct switch_session_manager {
uint32_t session_limit;
switch_size_t session_id;
switch_queue_t *thread_queue;
switch_thread_t *manager_thread;
switch_mutex_t *mutex;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
switch_mutex_t *cond2_mutex;
int ready;
int running;
int busy;
int popping;
int starting;
};
extern struct switch_session_manager session_manager;

View File

@ -1655,47 +1655,21 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
{
switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
switch_memory_pool_t *pool = node->pool;
void *pop;
int check = 0;
switch_mutex_lock(session_manager.mutex);
session_manager.starting--;
session_manager.running++;
switch_mutex_unlock(session_manager.mutex);
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) thread);
#endif
while(session_manager.ready) {
switch_status_t check_status;
pop = NULL;
if (check) {
check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
} else {
switch_mutex_lock(session_manager.mutex);
session_manager.popping++;
switch_mutex_unlock(session_manager.mutex);
check_status = switch_queue_pop(session_manager.thread_queue, &pop);
switch_mutex_lock(session_manager.mutex);
session_manager.popping--;
switch_mutex_unlock(session_manager.mutex);
}
if (check_status == SWITCH_STATUS_SUCCESS && pop) {
while (1) {
void *pop;
switch_status_t check_status = switch_queue_pop_timeout(session_manager.thread_queue, &pop, apr_time_from_sec(5));
if (check_status == SWITCH_STATUS_SUCCESS) {
switch_thread_data_t *td = (switch_thread_data_t *) pop;
if (!td) break;
switch_mutex_lock(session_manager.mutex);
session_manager.busy++;
switch_mutex_unlock(session_manager.mutex);
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing\n", (long) thread);
#endif
td->func(thread, td->obj);
if (td->pool) {
@ -1711,23 +1685,22 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th
switch_mutex_lock(session_manager.mutex);
session_manager.busy--;
switch_mutex_unlock(session_manager.mutex);
} else {
if (check) {
switch_mutex_lock(session_manager.mutex);
if (!switch_status_is_timeup(check_status) || session_manager.running > session_manager.busy) {
if (!--session_manager.running) {
switch_thread_cond_signal(session_manager.cond);
}
switch_mutex_unlock(session_manager.mutex);
break;
}
check++;
switch_mutex_unlock(session_manager.mutex);
}
}
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long) thread);
#endif
switch_mutex_lock(session_manager.mutex);
session_manager.running--;
switch_mutex_unlock(session_manager.mutex);
switch_core_destroy_memory_pool(&pool);
return NULL;
}
@ -1754,46 +1727,18 @@ static void thread_launch_failure(void)
switch_mutex_unlock(session_manager.mutex);
}
static int wake_queue(void)
{
switch_status_t status;
int tries = 0;
top:
status = switch_mutex_trylock(session_manager.cond_mutex);
if (status == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(session_manager.cond);
switch_mutex_unlock(session_manager.cond_mutex);
return 1;
} else {
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
switch_mutex_unlock(session_manager.cond2_mutex);
} else {
if (++tries < 10) {
switch_cond_next();
goto top;
}
}
}
return 0;
}
static switch_status_t check_queue(void)
{
switch_status_t status = SWITCH_STATUS_FALSE;
int ttl = 0;
int x = 0;
switch_mutex_lock(session_manager.mutex);
ttl = switch_queue_size(session_manager.thread_queue);
x = ((session_manager.running + session_manager.starting) - session_manager.busy);
if (session_manager.running >= ++session_manager.busy) {
switch_mutex_unlock(session_manager.mutex);
return SWITCH_STATUS_SUCCESS;
}
++session_manager.running;
switch_mutex_unlock(session_manager.mutex);
while (x < ttl) {
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr;
switch_memory_pool_t *pool;
@ -1809,6 +1754,11 @@ static switch_status_t check_queue(void)
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_LOW);
if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
switch_mutex_lock(session_manager.mutex);
if (!--session_manager.running) {
switch_thread_cond_signal(session_manager.cond);
}
switch_mutex_unlock(session_manager.mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
switch_core_destroy_memory_pool(&pool);
status = SWITCH_STATUS_GENERR;
@ -1816,76 +1766,11 @@ static switch_status_t check_queue(void)
} else {
status = SWITCH_STATUS_SUCCESS;
}
switch_mutex_lock(session_manager.mutex);
session_manager.starting++;
switch_mutex_unlock(session_manager.mutex);
x++;
}
return status;
}
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
{
uint32_t sleep = 10000000;
switch_time_t next = switch_micro_time_now() + sleep;
switch_mutex_lock(session_manager.cond_mutex);
while(session_manager.ready) {
int check = 1;
int ttl = 0;
uint32_t xsleep = sleep;
switch_mutex_lock(session_manager.mutex);
ttl = switch_queue_size(session_manager.thread_queue);
switch_mutex_unlock(session_manager.mutex);
if (!ttl) {
xsleep = 10000;
}
if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, xsleep);
switch_mutex_unlock(session_manager.cond2_mutex);
}
if (switch_micro_time_now() >= next) {
if (session_manager.popping) {
#ifdef DEBUG_THREAD_POOL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10,
"Thread pool: running:%d busy:%d popping:%d\n", session_manager.running, session_manager.busy, session_manager.popping);
#endif
switch_queue_interrupt_all(session_manager.thread_queue);
sleep = 100000;
check = 0;
} else {
sleep = 10000000;
}
}
if (check) check_queue();
next = switch_micro_time_now() + sleep;
}
switch_mutex_unlock(session_manager.cond_mutex);
while(session_manager.running) {
switch_queue_interrupt_all(session_manager.thread_queue);
switch_yield(20000);
}
return NULL;
}
SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_data_t **tdp)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
@ -1896,8 +1781,8 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d
td = *tdp;
*tdp = NULL;
switch_queue_push(session_manager.thread_queue, td);
wake_queue();
status = switch_queue_push(session_manager.thread_queue, td);
check_queue();
return status;
}
@ -1913,14 +1798,13 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co
} else if (switch_test_flag(session, SSF_THREAD_STARTED)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n");
} else {
status = SWITCH_STATUS_SUCCESS;
switch_set_flag(session, SSF_THREAD_RUNNING);
switch_set_flag(session, SSF_THREAD_STARTED);
td = switch_core_session_alloc(session, sizeof(*td));
td->obj = session;
td->func = switch_core_session_thread;
switch_queue_push(session_manager.thread_queue, td);
wake_queue();
status = switch_queue_push(session_manager.thread_queue, td);
check_queue();
}
switch_mutex_unlock(session->mutex);
@ -2560,39 +2444,19 @@ void switch_core_session_init(switch_memory_pool_t *pool)
session_manager.session_id = 1;
session_manager.memory_pool = pool;
switch_core_hash_init(&session_manager.session_table);
if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
switch_threadattr_t *thd_attr;
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
session_manager.ready = 1;
switch_thread_create(&session_manager.manager_thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
}
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_DEFAULT, session_manager.memory_pool);
switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool);
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
}
void switch_core_session_uninit(void)
{
int sanity = 100;
switch_status_t st = SWITCH_STATUS_FALSE;
session_manager.ready = 0;
wake_queue();
while(session_manager.running && --sanity > 0) {
switch_queue_interrupt_all(session_manager.thread_queue);
switch_yield(100000);
}
switch_thread_join(&st, session_manager.manager_thread);
switch_queue_term(session_manager.thread_queue);
switch_mutex_lock(session_manager.mutex);
if (session_manager.running)
switch_thread_cond_timedwait(session_manager.cond, session_manager.mutex, apr_time_from_sec(10));
switch_mutex_unlock(session_manager.mutex);
switch_core_hash_destroy(&session_manager.session_table);
}
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
@ -3049,7 +2913,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_refresh_video(switch_core_se
SWITCH_DECLARE(void) switch_core_session_debug_pool(switch_stream_handle_t *stream)
{
stream->write_function(stream, "Thread pool: running:%d busy:%d popping:%d\n",
session_manager.running, session_manager.busy, session_manager.popping);
session_manager.running, session_manager.busy, session_manager.running - session_manager.busy);
}
/* For Emacs: