From d6d7773ca4dba2f69bb6b18e58e03d74a2f6a0e8 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Thu, 29 Jul 2010 17:41:23 -0500 Subject: [PATCH] FSCORE-639 with some additional changes --- src/include/switch_types.h | 3 +- src/mod/endpoints/mod_sofia/mod_sofia.c | 2 +- src/switch_core_timer.c | 2 +- src/switch_ivr_play_say.c | 70 +++++++------ src/switch_rtp.c | 128 ++++++++++++++++++------ src/switch_time.c | 5 +- 6 files changed, 144 insertions(+), 66 deletions(-) diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 1808aa167e..f719c108b2 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -1289,7 +1289,8 @@ typedef uint32_t switch_file_flag_t; typedef enum { SWITCH_IO_FLAG_NONE = 0, - SWITCH_IO_FLAG_NOBLOCK = (1 << 0) + SWITCH_IO_FLAG_NOBLOCK = (1 << 0), + SWITCH_IO_FLAG_SINGLE_READ = (1 << 1) } switch_io_flag_enum_t; typedef uint32_t switch_io_flag_t; diff --git a/src/mod/endpoints/mod_sofia/mod_sofia.c b/src/mod/endpoints/mod_sofia/mod_sofia.c index 79a393a464..ba8f7b71c7 100644 --- a/src/mod/endpoints/mod_sofia/mod_sofia.c +++ b/src/mod/endpoints/mod_sofia/mod_sofia.c @@ -1369,7 +1369,7 @@ static switch_status_t sofia_receive_message(switch_core_session_t *session, swi } else { ok = sofia_test_pflag(tech_pvt->profile, PFLAG_RTP_AUTOFLUSH_DURING_BRIDGE); } - + if (ok) { rtp_flush_read_buffer(tech_pvt->rtp_session, SWITCH_RTP_FLUSH_STICK); } else { diff --git a/src/switch_core_timer.c b/src/switch_core_timer.c index e7f908cc0e..8d563b08b5 100644 --- a/src/switch_core_timer.c +++ b/src/switch_core_timer.c @@ -48,7 +48,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_timer_init(switch_timer_t *timer, co timer->interval = interval; timer->samples = samples; - timer->samplecount = 0; + timer->samplecount = samples; timer->timer_interface = timer_interface; if (pool) { diff --git a/src/switch_ivr_play_say.c b/src/switch_ivr_play_say.c index bc1fbadd00..200126c178 100644 --- a/src/switch_ivr_play_say.c +++ b/src/switch_ivr_play_say.c @@ -915,6 +915,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess int done = 0; int timeout_samples = 0; const char *var; + int more_data = 0; if (switch_channel_pre_answer(channel) != SWITCH_STATUS_SUCCESS) { return SWITCH_STATUS_FALSE; @@ -1190,6 +1191,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess status = SWITCH_STATUS_GENERR; continue; } + switch_core_timer_sync(&timer); // Sync timer switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Setup timer success %u bytes per %d ms!\n", len, interval); } write_frame.rate = fh->samplerate; @@ -1384,6 +1386,41 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess olen = llen; } + if (!more_data) { + if (timer_name) { + if (switch_core_timer_next(&timer) != SWITCH_STATUS_SUCCESS) { + break; + } + } else { /* time off the channel (if you must) */ + switch_frame_t *read_frame; + switch_status_t tstatus; + + while (switch_channel_ready(channel) && switch_channel_test_flag(channel, CF_HOLD)) { + switch_yield(10000); + } + + tstatus = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_SINGLE_READ, 0); + + if (!SWITCH_READ_ACCEPTABLE(tstatus)) { + break; + } + + if (args && (args->read_frame_callback)) { + int ok = 1; + switch_set_flag(fh, SWITCH_FILE_CALLBACK); + if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) { + ok = 0; + } + switch_clear_flag(fh, SWITCH_FILE_CALLBACK); + if (!ok) { + break; + } + } + } + } + + more_data = 0; + write_frame.samples = (uint32_t) olen; if (asis) { @@ -1424,6 +1461,7 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess if (status == SWITCH_STATUS_MORE_DATA) { status = SWITCH_STATUS_SUCCESS; + more_data = 1; continue; } else if (status != SWITCH_STATUS_SUCCESS) { done = 1; @@ -1433,36 +1471,6 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_play_file(switch_core_session_t *sess if (done) { break; } - - if (timer_name) { - if (switch_core_timer_next(&timer) != SWITCH_STATUS_SUCCESS) { - break; - } - } else { /* time off the channel (if you must) */ - switch_frame_t *read_frame; - switch_status_t tstatus; - while (switch_channel_ready(channel) && switch_channel_test_flag(channel, CF_HOLD)) { - switch_yield(10000); - } - - tstatus = switch_core_session_read_frame(session, &read_frame, SWITCH_IO_FLAG_NONE, 0); - - if (!SWITCH_READ_ACCEPTABLE(tstatus)) { - break; - } - - if (args && (args->read_frame_callback)) { - int ok = 1; - switch_set_flag(fh, SWITCH_FILE_CALLBACK); - if (args->read_frame_callback(session, read_frame, args->user_data) != SWITCH_STATUS_SUCCESS) { - ok = 0; - } - switch_clear_flag(fh, SWITCH_FILE_CALLBACK); - if (!ok) { - break; - } - } - } } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "done playing file\n"); @@ -2177,6 +2185,8 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_speak_text(switch_core_session_t *ses switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Setup timer success %u bytes per %d ms!\n", sh->samples * 2, interval); } + switch_core_timer_sync(timer); // Sync timer + /* start a thread to absorb incoming audio */ switch_core_service_session(session); diff --git a/src/switch_rtp.c b/src/switch_rtp.c index a123d93eea..cec5c6be35 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -135,6 +135,17 @@ struct switch_rtp_rfc2833_data { switch_mutex_t *dtmf_mutex; }; + +#define FLUSH_MAX 5 +#define MAX_MSG 6 + +struct rtp_packet { + rtp_msg_t recv_msg; + switch_size_t bytes; +}; + +typedef struct rtp_packet rtp_packet_t; + struct switch_rtp { /* * Two sockets are needed because we might be transcoding protocol families @@ -151,7 +162,12 @@ struct switch_rtp { rtcp_msg_t rtcp_send_msg; switch_sockaddr_t *remote_addr, *rtcp_remote_addr; + rtp_msg_t recv_msg; + rtp_packet_t recv_msg_array[MAX_MSG]; + int recv_msg_idx; + + rtcp_msg_t rtcp_recv_msg; switch_sockaddr_t *remote_stun_addr; @@ -225,7 +241,7 @@ struct switch_rtp { uint32_t cng_count; switch_rtp_bug_flag_t rtp_bugs; switch_rtp_stats_t stats; - uint32_t hot_hits; + //uint32_t hot_hits; uint32_t sync_packets; int rtcp_interval; switch_bool_t rtcp_fresh_frame; @@ -239,6 +255,7 @@ struct switch_rtp { #endif switch_time_t send_time; + //int more_data; }; struct switch_rtcp_senderinfo { @@ -2057,13 +2074,59 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t { switch_status_t status = SWITCH_STATUS_FALSE; stfu_frame_t *jb_frame; + int i = 0; + switch_assert(bytes); - *bytes = sizeof(rtp_msg_t); - status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes); + *bytes = 0; + + top: + + if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH))) { + if (rtp_session->recv_msg_idx) { + rtp_session->recv_msg = rtp_session->recv_msg_array[0].recv_msg; + *bytes = rtp_session->recv_msg_array[0].bytes; + + for (i = 1; i < MAX_MSG - 1; i++) { + rtp_session->recv_msg_array[i-1] = rtp_session->recv_msg_array[i]; + } + rtp_session->recv_msg_idx--; + status = SWITCH_STATUS_SUCCESS; + goto got_data; + } + + + while(rtp_session->recv_msg_idx < MAX_MSG) { + switch_status_t rstatus; + switch_size_t rb = sizeof(rtp_msg_t); + + rstatus = switch_socket_recvfrom(rtp_session->from_addr, + rtp_session->sock_input, 0, + (void *) &rtp_session->recv_msg_array[rtp_session->recv_msg_idx].recv_msg, + &rb); + + if ((rstatus != SWITCH_STATUS_SUCCESS && rstatus != SWITCH_STATUS_BREAK) || rb < 0) { + *bytes = rb; + return rstatus; + } + + if (!rb) break; + + rtp_session->recv_msg_array[rtp_session->recv_msg_idx].bytes = rb; + rtp_session->recv_msg_idx++; + } + + if (!*bytes && rtp_session->recv_msg_idx) goto top; + } else { + *bytes = sizeof(rtp_msg_t); + status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes); + } + + got_data: if (*bytes) { + rtp_session->stats.inbound.raw_bytes += *bytes; if (rtp_session->recv_te && rtp_session->recv_msg.header.pt == rtp_session->recv_te) { rtp_session->stats.inbound.dtmf_packet_count++; @@ -2235,6 +2298,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ int fdr = 0; int rtcp_fdr = 0; int hot_socket = 0; + int read_loops = 0; if (session) { channel = switch_core_session_get_channel(session); @@ -2253,36 +2317,36 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ while (switch_rtp_ready(rtp_session)) { int do_cng = 0; bytes = 0; + read_loops++; if (rtp_session->timer.interval) { - if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) && - rtp_session->read_pollfd) { - if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) { - rtp_session->hot_hits += rtp_session->samples_per_interval; - - if (rtp_session->hot_hits >= rtp_session->samples_per_second * 5) { - switch_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH); - hot_socket = 1; - } - } else { - rtp_session->hot_hits = 0; - } + if ((switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOFLUSH) || switch_test_flag(rtp_session, SWITCH_RTP_FLAG_STICKY_FLUSH)) && + rtp_session->recv_msg_idx > FLUSH_MAX) { + hot_socket = 1; + } else { + hot_socket = 0; } - + if (hot_socket) { rtp_session->sync_packets++; switch_core_timer_sync(&rtp_session->timer); } else { if (rtp_session->sync_packets) { #if 0 - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Auto-Flush catching up %d packets (%d)ms.\n", rtp_session->sync_packets, (rtp_session->ms_per_packet * rtp_session->sync_packets) / 1000); #endif rtp_session->sync_packets = 0; } + switch_core_timer_next(&rtp_session->timer); } + + + + + } recvfrom: @@ -2405,7 +2469,13 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ goto end; } - if (rtp_session->max_missed_packets) { + if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) { + rtp_session->missed_count = 0; + ret = 0; + goto end; + } + + if (rtp_session->max_missed_packets && read_loops == 1) { if (bytes) { rtp_session->missed_count = 0; } else if (++rtp_session->missed_count >= rtp_session->max_missed_packets) { @@ -2553,10 +2623,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ } } - if (!bytes && (io_flags & SWITCH_IO_FLAG_NOBLOCK)) { - return_cng_frame(); - } - if (check && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTO_CNG) && rtp_session->timer.samplecount >= (rtp_session->last_write_samplecount + (rtp_session->samples_per_interval * 50))) { @@ -2814,17 +2880,16 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_cond_next(); continue; } - + return_cng_frame(); } } - + if (status == SWITCH_STATUS_BREAK || bytes == 0) { - if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) { + if (!(io_flags & SWITCH_IO_FLAG_SINGLE_READ) && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) { goto do_continue; } - ret = 0; - goto end; + return_cng_frame(); } if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_GOOGLEHACK) && rtp_session->recv_msg.header.pt == 102) { @@ -2856,7 +2921,7 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ } end: - + READ_DEC(rtp_session); return ret; @@ -2960,6 +3025,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_read(switch_rtp_t *rtp_session, void bytes = rtp_common_read(rtp_session, payload_type, flags, io_flags); + if (bytes < 0) { *datalen = 0; return bytes == -2 ? SWITCH_STATUS_TIMEOUT : SWITCH_STATUS_GENERR; @@ -3153,10 +3219,14 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (timestamp) { rtp_session->ts = (uint32_t) timestamp; + /* Send marker bit if timestamp is lower/same as before (resetted/new timer) */ + if (rtp_session->ts <= rtp_session->last_write_ts) { + m++; + } } else if (rtp_session->timer.timer_interface) { rtp_session->ts = rtp_session->timer.samplecount; - if (rtp_session->ts <= rtp_session->last_write_ts) { + if (rtp_session->ts <= rtp_session->last_write_ts && rtp_session->ts > 0) { rtp_session->ts = rtp_session->last_write_ts + rtp_session->samples_per_interval; } } else { diff --git a/src/switch_time.c b/src/switch_time.c index 5c07409d93..7ffdf4e4d7 100644 --- a/src/switch_time.c +++ b/src/switch_time.c @@ -550,10 +550,7 @@ static switch_status_t timer_sync(switch_timer_t *timer) private_info->reference = timer->tick = TIMER_MATRIX[timer->interval].tick; /* apply timestamp */ - if (timer_step(timer) == SWITCH_STATUS_SUCCESS) { - /* push the reference into the future to prevent collision */ - private_info->reference++; - } + timer_step(timer); return SWITCH_STATUS_SUCCESS; }