From a4f840b9476b749ed15617114b2f87ed8cc29431 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 7 Oct 2014 12:47:58 -0500 Subject: [PATCH] more jb improvements --- src/switch_stfu.c | 106 +++++++++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 35 deletions(-) diff --git a/src/switch_stfu.c b/src/switch_stfu.c index 6472bd86aa..2452a708dc 100644 --- a/src/switch_stfu.c +++ b/src/switch_stfu.c @@ -121,6 +121,7 @@ struct stfu_instance { int32_t max_drift; uint32_t drift_dropped_packets; uint32_t drift_max_dropped; + uint32_t consecutive_drift; int32_t ts_diff; int32_t last_ts_diff; @@ -330,10 +331,6 @@ stfu_status_t _stfu_n_resize(stfu_instance_t *i, int32_t qlen, int line) i->last_frame = NULL; } - if (s == STFU_IT_WORKED) { - stfu_n_reset_counters(i); - } - return s; } @@ -437,7 +434,8 @@ void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int l i->miss_count = 0; i->packet_count = 0; i->ts_offset = 0; - + i->ts_drift = 0; + i->consecutive_drift = 0; } stfu_status_t stfu_n_sync(stfu_instance_t *i, uint32_t packets) @@ -527,7 +525,7 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin i->last_ts_skew = abs((int)(i->last_ts_diff - (int)i->samples_per_packet)); } - if (i->out_queue->array_len && (i->last_time_skew > 1 || i->last_ts_skew)) { + if (i->ready && i->out_queue->array_len && (i->last_time_skew > 1 || i->last_ts_skew)) { int time_ms = i->last_time_skew, ts_ms = (i->last_ts_skew / i->samples_per_packet) * i->ms_per_packet; if (time_ms > i->period_jitter_size_time) { @@ -549,23 +547,42 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin } } - if (timer_ts) { + if (timer_ts && i->ready && i->out_queue->array_len) { if (ts && !i->ts_offset) { i->ts_offset = timer_ts - ts; } - i->ts_drift = ts + (i->ts_offset - timer_ts); - - if (i->ts_offset && i->ts_drift > 0) { - i->ts_offset = timer_ts - ts; + if (i->ts_offset) { i->ts_drift = ts + (i->ts_offset - timer_ts); + + if (i->ts_drift > 0) { + i->ts_offset = timer_ts - ts; + i->ts_drift = ts + (i->ts_offset - timer_ts); + } + + if (i->ts_drift) { + i->consecutive_drift++; + } else { + i->consecutive_drift = 0; + } + + if (i->consecutive_drift >= 10 && abs(i->ts_drift) >= i->qlen * i->ms_per_packet) { + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s DRIFT %d beyond jitter size %d\n\n", i->name, abs(i->ts_drift), i->qlen * i->ms_per_packet); + } + i->period_jitter_size = abs(i->ts_drift); + stfu_n_auto_size(i, 0); + //stfu_n_reset(i); + } } + - - if (i->max_drift) { + if (i->max_drift && i->consecutive_drift >= 10) { if (i->ts_drift < i->max_drift) { if (++i->drift_dropped_packets < i->drift_max_dropped) { - stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts); + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts); + } stfu_n_reset(i); //stfu_n_resize(i, 1); //stfu_n_sync(i, 1); @@ -701,12 +718,16 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_ int best_diff = 1000000, cur_diff = 0; stfu_frame_t *frame = NULL, *best_frame = NULL; int newer = 0; + int was_read = 0; + stfu_assert(r_frame); *r_frame = NULL; top: + was_read = 0; + if (force) { in->cur_ts = 0; } @@ -714,6 +735,8 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_ for(i = 0; i < queue->real_array_size; i++) { frame = &queue->array[i]; + if (frame->was_read) was_read++; + if (!frame->was_read && in->cur_ts && frame->ts > in->cur_ts) { newer++; } @@ -727,6 +750,14 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_ } } + if (was_read == queue->real_array_size) { + if (stfu_log != null_logger && in->debug) { + stfu_log(STFU_LOG_EMERG, "%s OUT QUEUE FULL RESETTING\n", in->name); + stfu_n_reset(in); + } + return 0; + } + if (!force && !best_frame && newer) { force = 1; goto top; @@ -773,6 +804,28 @@ static int stfu_n_find_frame(stfu_instance_t *in, stfu_queue_t *queue, uint32_t return 0; } +void stfu_n_dump(stfu_instance_t *i) +{ + uint32_t y; + stfu_frame_t *frame = NULL; + + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s OUT QUEUE: ------------\n", i->name); + for(y = 0; y < i->out_queue->array_len; y++) { + frame = &i->out_queue->array[y]; + stfu_log(STFU_LOG_EMERG, "%s\t%u:%u r:%d\n", i->name, frame->ts, frame->ts / i->samples_per_packet, frame->was_read); + } + stfu_log(STFU_LOG_EMERG, "%s\n\n", i->name); + stfu_log(STFU_LOG_EMERG, "%s IN QUEUE: ------------\n", i->name); + for(y = 0; y < i->in_queue->array_len; y++) { + frame = &i->in_queue->array[y]; + stfu_log(STFU_LOG_EMERG, "%s\t%u:%u r:%d\n", i->name, frame->ts, frame->ts / i->samples_per_packet, frame->was_read); + } + stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name); + } +} + + stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) { stfu_frame_t *rframe = NULL; @@ -849,9 +902,6 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) if (!found && i->samples_per_packet) { - uint32_t y; - stfu_frame_t *frame = NULL; - int32_t delay = i->cur_ts - i->last_rd_ts; uint32_t need = abs(delay) / i->samples_per_packet; @@ -869,23 +919,7 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) i->packet_count = 0; } - if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name); - for(y = 0; y < i->out_queue->array_len; y++) { - frame = &i->out_queue->array[y]; - stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet); - } - stfu_log(STFU_LOG_EMERG, "%s ------------\n\n\n", i->name); - - - stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name); - for(y = 0; y < i->in_queue->array_len; y++) { - frame = &i->in_queue->array[y]; - stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet); - } - stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name); - - } + stfu_n_dump(i); } if (stfu_log != null_logger && (i->debug & DBG_OUT)) { @@ -936,7 +970,9 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) } else { if (force) { - stfu_log(STFU_LOG_EMERG, "%s NO PACKETS HARD RESETTING\n", i->name); + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s NO PACKETS HARD RESETTING\n", i->name); + } stfu_n_reset(i); } else { i->last_wr_ts = i->cur_ts;