FS-8811 #resolve [FS 1.7 crashes intermittently]

This commit is contained in:
Anthony Minessale 2016-02-17 15:15:03 -06:00
parent a0841e8606
commit 2b5f40b38e
3 changed files with 66 additions and 70 deletions

View File

@ -114,10 +114,28 @@ struct local_stream_source {
switch_image_t *cover_art;
char *banner_txt;
int serno;
switch_size_t abuflen;
switch_byte_t *abuf;
};
typedef struct local_stream_source local_stream_source_t;
local_stream_source_t *get_source(const char *path)
{
local_stream_source_t *source = NULL;
switch_mutex_lock(globals.mutex);
if ((source = switch_core_hash_find(globals.source_hash, path))) {
if (!RUNNING || source->stopped || switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) {
source = NULL;
}
}
switch_mutex_unlock(globals.mutex);
return source;
}
switch_status_t list_streams_full(const char *line, const char *cursor, switch_console_callback_match_t **matches, switch_bool_t show_aliases)
{
local_stream_source_t *source;
@ -183,9 +201,9 @@ static void flush_video_queue(switch_queue_t *q)
static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void *obj)
{
local_stream_source_t *source = obj;
volatile local_stream_source_t *s = (local_stream_source_t *) obj;
local_stream_source_t *source = (local_stream_source_t *) s;
switch_file_handle_t fh = { 0 };
local_stream_context_t *cp;
char file_buf[128] = "", path_buf[512] = "", last_path[512], png_buf[512] = "", tmp_buf[512] = "";
switch_timer_t timer = { 0 };
int fd = -1;
@ -216,13 +234,13 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
switch_thread_rwlock_create(&source->rwlock, source->pool);
if (RUNNING) {
source->ready = 1;
switch_mutex_lock(globals.mutex);
switch_core_hash_insert(globals.source_hash, source->name, source);
switch_mutex_unlock(globals.mutex);
source->ready = 1;
}
while (RUNNING && !source->stopped) {
while (RUNNING && !source->stopped && source->ready) {
const char *fname;
if (temp_pool) {
@ -258,7 +276,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
while (RUNNING && !source->stopped) {
switch_size_t olen;
uint8_t abuf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
const char *artist = NULL, *title = NULL;
if (fd > -1) {
@ -452,11 +469,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
if (use_fh == &source->chime_fh) {
olen = source->samples;
switch_core_file_read(&fh, abuf, &olen);
switch_core_file_read(&fh, source->abuf, &olen);
olen = source->samples;
}
if (switch_core_file_read(use_fh, abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) {
if (switch_core_file_read(use_fh, source->abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) {
switch_core_file_close(use_fh);
flush_video_queue(source->video_q);
@ -480,7 +497,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
}
if (source->total) {
switch_buffer_write(audio_buffer, abuf, olen * 2 * source->channels);
switch_buffer_write(audio_buffer, source->abuf, olen * 2 * source->channels);
} else {
switch_buffer_zero(audio_buffer);
}
@ -502,6 +519,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
//if (!is_open || used >= source->prebuf || (source->total && used > source->samples * 2 * source->channels)) {
void *pop;
uint32_t bused;
local_stream_context_t *cp = NULL;
used = switch_buffer_read(audio_buffer, dist_buf, source->samples * 2 * source->channels);
@ -509,7 +527,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
switch_mutex_lock(source->mutex);
for (cp = source->context_list; cp && RUNNING; cp = cp->next) {
if (source->has_video) {
switch_set_flag(cp->handle, SWITCH_FILE_FLAG_VIDEO);
} else {
@ -621,6 +638,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "local_stream://%s fully reloaded.\n",source->name);
switch_thread_rwlock_unlock(source->rwlock);
launch_streams(source->name);
goto done;
}
@ -683,22 +701,18 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
return SWITCH_STATUS_FALSE;
}
switch_mutex_lock(globals.mutex);
top:
alt_path = switch_mprintf("%s/%d", path, handle->samplerate);
if ((source = switch_core_hash_find(globals.source_hash, alt_path))) {
if ((source = get_source(alt_path))) {
path = alt_path;
} else {
source = switch_core_hash_find(globals.source_hash, path);
source = get_source(path);
}
if (source) {
if (switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) {
source = NULL;
}
} else {
if (!source) {
if (!switch_stristr("default", alt_path) && !switch_stristr("default", path)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown source %s, trying 'default'\n", path);
free(alt_path);
@ -706,7 +720,6 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
goto top;
}
}
switch_mutex_unlock(globals.mutex);
if (!source) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown source %s\n", path);
@ -715,8 +728,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
}
if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) {
status = SWITCH_STATUS_MEMERR;
goto end;
abort();
}
switch_queue_create(&context->video_q, 500, handle->memory_pool);
@ -760,6 +772,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
switch_mutex_unlock(source->mutex);
end:
switch_safe_free(alt_path);
return status;
}
@ -783,7 +796,7 @@ static switch_status_t local_stream_file_close(switch_file_handle_t *handle)
last = cp;
}
if (context->video_q) {
if (context->source->has_video) {
flush_video_queue(context->video_q);
switch_queue_trypush(context->video_q, NULL);
switch_queue_interrupt_all(context->video_q);
@ -1046,6 +1059,8 @@ static void launch_thread(const char *name, const char *path, switch_xml_t direc
}
source->samples = switch_samples_per_packet(source->rate, source->interval);
source->abuflen = (source->samples * 2 * source->channels) + 1024;
source->abuf = switch_core_alloc(source->pool, source->abuflen);
switch_mutex_init(&source->mutex, SWITCH_MUTEX_NESTED, source->pool);
switch_threadattr_create(&thd_attr, source->pool);
switch_threadattr_detach_set(thd_attr, 1);
@ -1108,55 +1123,37 @@ SWITCH_STANDARD_API(local_stream_function)
local_stream_name = argv[1];
if (!strcasecmp(argv[0], "hup") && local_stream_name) {
switch_mutex_lock(globals.mutex);
source = switch_core_hash_find(globals.source_hash, local_stream_name);
switch_mutex_unlock(globals.mutex);
if (source) {
if ((source = get_source(local_stream_name))) {
source->hup = 1;
stream->write_function(stream, "+OK hup stream: %s", source->name);
goto done;
switch_thread_rwlock_unlock(source->rwlock);
}
} else if (!strcasecmp(argv[0], "stop") && local_stream_name) {
switch_mutex_lock(globals.mutex);
source = switch_core_hash_find(globals.source_hash, local_stream_name);
switch_mutex_unlock(globals.mutex);
if (!source) {
if ((source = get_source(local_stream_name))) {
source->stopped = 1;
stream->write_function(stream, "+OK");
switch_thread_rwlock_unlock(source->rwlock);
} else {
stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
goto done;
}
source->stopped = 1;
stream->write_function(stream, "+OK");
} else if (!strcasecmp(argv[0], "reload") && local_stream_name) {
switch_mutex_lock(globals.mutex);
source = switch_core_hash_find(globals.source_hash, local_stream_name);
switch_mutex_unlock(globals.mutex);
if (!source) {
if ((source = get_source(local_stream_name))) {
source->full_reload = 1;
source->part_reload = 1;
stream->write_function(stream, "+OK");
} else {
stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
goto done;
}
source->full_reload = 1;
source->part_reload = 1;
stream->write_function(stream, "+OK");
} else if (!strcasecmp(argv[0], "start") && local_stream_name) {
switch_mutex_lock(globals.mutex);
source = switch_core_hash_find(globals.source_hash, local_stream_name);
switch_mutex_unlock(globals.mutex);
if (source) {
if ((source = get_source(local_stream_name))) {
source->stopped = 0;
stream->write_function(stream, "+OK stream: %s", source->name);
goto done;
}
if ((ok = launch_streams(local_stream_name))) {
stream->write_function(stream, "+OK stream: %s", local_stream_name);
goto done;
} else {
if ((ok = launch_streams(local_stream_name))) {
stream->write_function(stream, "+OK stream: %s", local_stream_name);
}
}
} else if (!strcasecmp(argv[0], "show")) {
@ -1165,22 +1162,21 @@ SWITCH_STANDARD_API(local_stream_function)
void *val;
switch_bool_t xml = SWITCH_FALSE;
switch_mutex_lock(globals.mutex);
if (argc == 1) {
switch_mutex_lock(globals.mutex);
for (hi = switch_core_hash_first(globals.source_hash); hi; hi = switch_core_hash_next(&hi)) {
switch_core_hash_this(hi, &var, NULL, &val);
if ((source = (local_stream_source_t *) val)) {
stream->write_function(stream, "%s,%s\n", source->name, source->location);
}
}
switch_mutex_unlock(globals.mutex);
} else {
if (argc == 4 && !strcasecmp("xml", argv[3])) {
xml = SWITCH_TRUE;
}
source = switch_core_hash_find(globals.source_hash, local_stream_name);
if (source) {
if ((source = get_source(local_stream_name))) {
if (xml) {
stream->write_function(stream, "<?xml version=\"1.0\"?>\n<local_stream name=\"%s\">\n", source->name);
stream->write_function(stream, " <location>%s</location>\n", source->location);
@ -1210,13 +1206,11 @@ SWITCH_STANDARD_API(local_stream_function)
stream->write_function(stream, " stopped: %s\n", (source->stopped) ? "true" : "false");
stream->write_function(stream, " reloading: %s\n", (source->full_reload) ? "true" : "false");
}
switch_thread_rwlock_unlock(source->rwlock);
} else {
stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
}
}
switch_mutex_unlock(globals.mutex);
goto done;
}
goto done;

View File

@ -292,7 +292,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file,
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Spool dir is set. Make sure [%s] is also a valid path\n", fh->spool_path);
}
UNPROTECT_INTERFACE(fh->file_interface);
switch_goto_status(status, fail);
goto fail;
}
fh->real_channels = fh->channels;
@ -305,7 +305,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file,
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "File [%s] not created!\n", file_path);
fh->file_interface->file_close(fh);
UNPROTECT_INTERFACE(fh->file_interface);
switch_goto_status(status, fail);
goto fail;
}
if (to) {

View File

@ -333,6 +333,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
time_t answer_limit = 0;
const char *exec_app = NULL;
const char *exec_data = NULL;
switch_codec_implementation_t read_impl = { 0 };
#ifdef SWITCH_VIDEO_IN_THREADS
struct vid_helper vh = { 0 };
@ -345,6 +346,9 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
return NULL;
}
switch_core_session_get_read_impl(session_a, &read_impl);
input_callback = data->input_callback;
user_data = data->session_data;
stream_id = data->stream_id;
@ -405,8 +409,6 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
}
if ((silence_var = switch_channel_get_variable(chan_a, "bridge_generate_comfort_noise"))) {
switch_codec_implementation_t read_impl = { 0 };
switch_core_session_get_read_impl(session_a, &read_impl);
if (!switch_channel_media_up(chan_a)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session_a), SWITCH_LOG_ERROR, "Channel has no media!\n");
@ -683,7 +685,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
if (switch_test_flag(read_frame, SFF_CNG)) {
if (silence_val) {
switch_generate_sln_silence((int16_t *) silence_frame.data, silence_frame.samples,
read_frame->codec->implementation->number_of_channels, silence_val);
read_impl.number_of_channels, silence_val);
read_frame = &silence_frame;
} else if (!switch_channel_test_flag(chan_b, CF_ACCEPT_CNG)) {
continue;