From a365fb636ad9e2f4bb5dee43eddc305560699114 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 11 Jan 2012 17:49:35 -0600 Subject: [PATCH] mailing list 36bc584d980ce80fe6a6f6e7d7383db9.squirrel@my.tomp.co.uk [Freeswitch-users] audo sync issues with record_session to mp3 I redid the stream recording with timestamps and headers to try to keep it more synced --- src/include/private/switch_core_pvt.h | 3 + src/include/switch_types.h | 6 ++ src/switch_core_io.c | 22 ++++- src/switch_core_media_bug.c | 117 ++++++++++++++++++++------ src/switch_ivr_async.c | 2 +- 5 files changed, 118 insertions(+), 32 deletions(-) diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 8d2a8122b9..3256d0ae6f 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -193,6 +193,9 @@ struct switch_media_bug { switch_thread_id_t thread_id; char *function; char *target; + switch_codec_implementation_t read_impl; + switch_codec_implementation_t write_impl; + switch_timer_t timer; struct switch_media_bug *next; }; diff --git a/src/include/switch_types.h b/src/include/switch_types.h index ea65319924..acc7722637 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -785,6 +785,12 @@ typedef struct { #pragma pack(pop, r1) #endif +typedef struct audio_buffer_header_s { + uint32_t ts; + uint32_t len; +} audio_buffer_header_t; + + /*! \enum switch_priority_t \brief Priority Indication diff --git a/src/switch_core_io.c b/src/switch_core_io.c index b0e525b511..bd3d649656 100644 --- a/src/switch_core_io.c +++ b/src/switch_core_io.c @@ -484,8 +484,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi } if (bp->ready && switch_test_flag(bp, SMBF_READ_STREAM)) { + audio_buffer_header_t h = { 0 }; + switch_mutex_lock(bp->read_mutex); - switch_buffer_write(bp->raw_read_buffer, read_frame->data, read_frame->datalen); + h.ts = bp->timer.samplecount; + h.len = read_frame->datalen; + switch_buffer_write(bp->raw_read_buffer, &h, sizeof(h)); + switch_buffer_write(bp->raw_read_buffer, read_frame->data, h.len); + if (bp->callback) { ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_READ); } @@ -646,6 +652,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_read_frame(switch_core_sessi continue; } + if (bp->ready && bp->timer.timer_interface) { + switch_core_timer_sync(&bp->timer); + } + if (bp->ready && switch_test_flag(bp, SMBF_READ_PING)) { switch_mutex_lock(bp->read_mutex); if (bp->callback) { @@ -970,10 +980,16 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess } if (switch_test_flag(bp, SMBF_WRITE_STREAM)) { - + audio_buffer_header_t h = { 0 }; + switch_mutex_lock(bp->write_mutex); - switch_buffer_write(bp->raw_write_buffer, write_frame->data, write_frame->datalen); + h.ts = bp->timer.samplecount; + h.len = write_frame->datalen; + + switch_buffer_write(bp->raw_write_buffer, &h, sizeof(h)); + switch_buffer_write(bp->raw_write_buffer, write_frame->data, h.len); switch_mutex_unlock(bp->write_mutex); + if (bp->callback) { ok = bp->callback(bp, bp->user_data, SWITCH_ABC_TYPE_WRITE); } diff --git a/src/switch_core_media_bug.c b/src/switch_core_media_bug.c index 7e75ae9e96..fb6068b94e 100644 --- a/src/switch_core_media_bug.c +++ b/src/switch_core_media_bug.c @@ -47,6 +47,10 @@ static void switch_core_media_bug_destroy(switch_media_bug_t *bug) switch_buffer_destroy(&bug->raw_write_buffer); } + if (bug->timer.timer_interface) { + switch_core_timer_destroy(&bug->timer); + } + if (switch_event_create(&event, SWITCH_EVENT_MEDIA_BUG_STOP) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Media-Bug-Function", "%s", bug->function); switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Media-Bug-Target", "%s", bug->target); @@ -140,6 +144,20 @@ SWITCH_DECLARE(void) switch_core_media_bug_inuse(switch_media_bug_t *bug, switch } } +static switch_size_t do_peek(switch_buffer_t *buffer, audio_buffer_header_t *h) +{ + const void *vp = NULL; + audio_buffer_header_t *hp; + switch_size_t r; + + if ((r = switch_buffer_peek_zerocopy(buffer, &vp))) { + hp = (audio_buffer_header_t *) vp; + *h = *hp; + } + + return r; +} + SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *bug, switch_frame_t *frame, switch_bool_t fill) { switch_size_t bytes = 0, datalen = 0, ttl = 0; @@ -150,6 +168,9 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b uint32_t blen; switch_codec_implementation_t read_impl = { 0 }; int16_t *tp; + audio_buffer_header_t rh = { 0 }, wh = { 0 }; + int do_read = 0, do_write = 0; + switch_size_t ur = 0, uw = 0; switch_core_session_get_read_impl(bug->session, &read_impl); @@ -167,29 +188,72 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b return SWITCH_STATUS_FALSE; } + + frame->flags = 0; frame->datalen = 0; - if (!switch_buffer_inuse(bug->raw_read_buffer)) { - return SWITCH_STATUS_FALSE; + if (switch_test_flag(bug, SMBF_READ_STREAM)) { + switch_mutex_lock(bug->read_mutex); + do_peek(bug->raw_read_buffer, &rh); + ur = switch_buffer_inuse(bug->raw_read_buffer); + switch_mutex_unlock(bug->read_mutex); } - switch_mutex_lock(bug->read_mutex); - frame->datalen = (uint32_t) switch_buffer_read(bug->raw_read_buffer, frame->data, bytes); - ttl += frame->datalen; - switch_mutex_unlock(bug->read_mutex); - if (switch_test_flag(bug, SMBF_WRITE_STREAM)) { switch_assert(bug->raw_write_buffer); - switch_mutex_lock(bug->write_mutex); - datalen = (uint32_t) switch_buffer_read(bug->raw_write_buffer, bug->data, bytes); - ttl += datalen; - if (fill && datalen < bytes) { - memset(((unsigned char *) bug->data) + datalen, 0, bytes - datalen); - datalen = bytes; - } + do_peek(bug->raw_write_buffer, &wh); + uw = switch_buffer_inuse(bug->raw_write_buffer); switch_mutex_unlock(bug->write_mutex); } + + + if (ur && uw && ur > uw) { + do_write = 1; + } else if (ur && uw && uw > ur) { + do_read = 1; + } else { + do_read = !!ur; + do_write = !!uw; + } + + if (do_read && do_write) { + if (rh.ts > wh.ts) { + do_read = 0; + } else if (wh.ts > rh.ts) { + do_write = 0; + } + } + + if (!(do_read || do_write)) { + return SWITCH_STATUS_FALSE; + } + + if (do_read) { + switch_mutex_lock(bug->read_mutex); + switch_buffer_read(bug->raw_read_buffer, &rh, sizeof(rh)); + frame->datalen = (uint32_t) switch_buffer_read(bug->raw_read_buffer, frame->data, rh.len); + ttl += frame->datalen; + switch_mutex_unlock(bug->read_mutex); + } else { + memset(frame->data, 255, bytes); + frame->datalen = bytes; + ttl += bytes; + } + + if (do_write) { + switch_assert(bug->raw_write_buffer); + switch_mutex_lock(bug->write_mutex); + switch_buffer_read(bug->raw_write_buffer, &wh, sizeof(wh)); + + datalen = (uint32_t) switch_buffer_read(bug->raw_write_buffer, bug->data, wh.len); + ttl += datalen; + switch_mutex_unlock(bug->write_mutex); + } else { + memset(bug->data, 255, bytes); + datalen = bytes; + ttl += bytes; + } tp = bug->tmp; dp = (int16_t *) bug->data; @@ -198,14 +262,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_read(switch_media_bug_t *b wlen = datalen / 2; blen = bytes / 2; - if (!fill && rlen == 0 && wlen == 0) { - frame->datalen = 0; - frame->samples = 0; - frame->rate = read_impl.actual_samples_per_second; - frame->codec = NULL; - return SWITCH_STATUS_FALSE; - } - if (switch_test_flag(bug, SMBF_STEREO)) { int16_t *left, *right; size_t left_len, right_len; @@ -271,8 +327,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_add(switch_core_session_t { switch_media_bug_t *bug; //, *bp; switch_size_t bytes; - switch_codec_implementation_t read_impl = { 0 }; - switch_codec_implementation_t write_impl = { 0 }; switch_event_t *event; const char *p; @@ -283,8 +337,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_add(switch_core_session_t } } - switch_core_session_get_read_impl(session, &read_impl); - switch_core_session_get_write_impl(session, &write_impl); + *new_bug = NULL; @@ -329,6 +382,9 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_add(switch_core_session_t bug->function = "N/A"; bug->target = "N/A"; + switch_core_session_get_read_impl(session, &bug->read_impl); + switch_core_session_get_write_impl(session, &bug->write_impl); + if (function) { bug->function = switch_core_session_strdup(session, function); } @@ -338,7 +394,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_add(switch_core_session_t } bug->stop_time = stop_time; - bytes = read_impl.decoded_bytes_per_packet; + bytes = bug->read_impl.decoded_bytes_per_packet; if (!bug->flags) { bug->flags = (SMBF_READ_STREAM | SMBF_WRITE_STREAM); @@ -349,13 +405,18 @@ SWITCH_DECLARE(switch_status_t) switch_core_media_bug_add(switch_core_session_t switch_mutex_init(&bug->read_mutex, SWITCH_MUTEX_NESTED, session->pool); } - bytes = write_impl.decoded_bytes_per_packet; + bytes = bug->write_impl.decoded_bytes_per_packet; if (switch_test_flag(bug, SMBF_WRITE_STREAM)) { switch_buffer_create_dynamic(&bug->raw_write_buffer, bytes * SWITCH_BUFFER_BLOCK_FRAMES, bytes * SWITCH_BUFFER_START_FRAMES, MAX_BUG_BUFFER); switch_mutex_init(&bug->write_mutex, SWITCH_MUTEX_NESTED, session->pool); } + if (switch_test_flag(bug, SMBF_READ_STREAM) || switch_test_flag(bug, SMBF_WRITE_STREAM)) { + switch_core_timer_init(&bug->timer, "soft", bug->read_impl.microseconds_per_packet / 1000, bug->read_impl.samples_per_packet, + switch_core_session_get_pool(session)); + } + if ((bug->flags & SMBF_THREAD_LOCK)) { bug->thread_id = switch_thread_self(); } diff --git a/src/switch_ivr_async.c b/src/switch_ivr_async.c index bea3229df6..2e96c8aa0b 100644 --- a/src/switch_ivr_async.c +++ b/src/switch_ivr_async.c @@ -1091,7 +1091,7 @@ static switch_bool_t record_callback(switch_media_bug_t *bug, void *user_data, s } break; - case SWITCH_ABC_TYPE_READ: + case SWITCH_ABC_TYPE_READ_PING: if (rh->fh) { switch_size_t len;