more jb improvements

This commit is contained in:
Anthony Minessale 2014-10-07 12:47:58 -05:00
parent 6860b41763
commit a4f840b947
1 changed files with 71 additions and 35 deletions

View File

@ -121,6 +121,7 @@ struct stfu_instance {
int32_t max_drift;
uint32_t drift_dropped_packets;
uint32_t drift_max_dropped;
uint32_t consecutive_drift;
int32_t ts_diff;
int32_t last_ts_diff;
@ -330,10 +331,6 @@ stfu_status_t _stfu_n_resize(stfu_instance_t *i, int32_t qlen, int line)
i->last_frame = NULL;
}
if (s == STFU_IT_WORKED) {
stfu_n_reset_counters(i);
}
return s;
}
@ -437,7 +434,8 @@ void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int l
i->miss_count = 0;
i->packet_count = 0;
i->ts_offset = 0;
i->ts_drift = 0;
i->consecutive_drift = 0;
}
stfu_status_t stfu_n_sync(stfu_instance_t *i, uint32_t packets)
@ -527,7 +525,7 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
i->last_ts_skew = abs((int)(i->last_ts_diff - (int)i->samples_per_packet));
}
if (i->out_queue->array_len && (i->last_time_skew > 1 || i->last_ts_skew)) {
if (i->ready && i->out_queue->array_len && (i->last_time_skew > 1 || i->last_ts_skew)) {
int time_ms = i->last_time_skew, ts_ms = (i->last_ts_skew / i->samples_per_packet) * i->ms_per_packet;
if (time_ms > i->period_jitter_size_time) {
@ -549,23 +547,42 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
}
}
if (timer_ts) {
if (timer_ts && i->ready && i->out_queue->array_len) {
if (ts && !i->ts_offset) {
i->ts_offset = timer_ts - ts;
}
i->ts_drift = ts + (i->ts_offset - timer_ts);
if (i->ts_offset && i->ts_drift > 0) {
i->ts_offset = timer_ts - ts;
if (i->ts_offset) {
i->ts_drift = ts + (i->ts_offset - timer_ts);
if (i->ts_drift > 0) {
i->ts_offset = timer_ts - ts;
i->ts_drift = ts + (i->ts_offset - timer_ts);
}
if (i->ts_drift) {
i->consecutive_drift++;
} else {
i->consecutive_drift = 0;
}
if (i->consecutive_drift >= 10 && abs(i->ts_drift) >= i->qlen * i->ms_per_packet) {
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s DRIFT %d beyond jitter size %d\n\n", i->name, abs(i->ts_drift), i->qlen * i->ms_per_packet);
}
i->period_jitter_size = abs(i->ts_drift);
stfu_n_auto_size(i, 0);
//stfu_n_reset(i);
}
}
if (i->max_drift) {
if (i->max_drift && i->consecutive_drift >= 10) {
if (i->ts_drift < i->max_drift) {
if (++i->drift_dropped_packets < i->drift_max_dropped) {
stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts);
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts);
}
stfu_n_reset(i);
//stfu_n_resize(i, 1);
//stfu_n_sync(i, 1);
@ -701,12 +718,16 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_
int best_diff = 1000000, cur_diff = 0;
stfu_frame_t *frame = NULL, *best_frame = NULL;
int newer = 0;
int was_read = 0;
stfu_assert(r_frame);
*r_frame = NULL;
top:
was_read = 0;
if (force) {
in->cur_ts = 0;
}
@ -714,6 +735,8 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_
for(i = 0; i < queue->real_array_size; i++) {
frame = &queue->array[i];
if (frame->was_read) was_read++;
if (!frame->was_read && in->cur_ts && frame->ts > in->cur_ts) {
newer++;
}
@ -727,6 +750,14 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_
}
}
if (was_read == queue->real_array_size) {
if (stfu_log != null_logger && in->debug) {
stfu_log(STFU_LOG_EMERG, "%s OUT QUEUE FULL RESETTING\n", in->name);
stfu_n_reset(in);
}
return 0;
}
if (!force && !best_frame && newer) {
force = 1;
goto top;
@ -773,6 +804,28 @@ static int stfu_n_find_frame(stfu_instance_t *in, stfu_queue_t *queue, uint32_t
return 0;
}
void stfu_n_dump(stfu_instance_t *i)
{
uint32_t y;
stfu_frame_t *frame = NULL;
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s OUT QUEUE: ------------\n", i->name);
for(y = 0; y < i->out_queue->array_len; y++) {
frame = &i->out_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u r:%d\n", i->name, frame->ts, frame->ts / i->samples_per_packet, frame->was_read);
}
stfu_log(STFU_LOG_EMERG, "%s\n\n", i->name);
stfu_log(STFU_LOG_EMERG, "%s IN QUEUE: ------------\n", i->name);
for(y = 0; y < i->in_queue->array_len; y++) {
frame = &i->in_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u r:%d\n", i->name, frame->ts, frame->ts / i->samples_per_packet, frame->was_read);
}
stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name);
}
}
stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
{
stfu_frame_t *rframe = NULL;
@ -849,9 +902,6 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
if (!found && i->samples_per_packet) {
uint32_t y;
stfu_frame_t *frame = NULL;
int32_t delay = i->cur_ts - i->last_rd_ts;
uint32_t need = abs(delay) / i->samples_per_packet;
@ -869,23 +919,7 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
i->packet_count = 0;
}
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name);
for(y = 0; y < i->out_queue->array_len; y++) {
frame = &i->out_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet);
}
stfu_log(STFU_LOG_EMERG, "%s ------------\n\n\n", i->name);
stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name);
for(y = 0; y < i->in_queue->array_len; y++) {
frame = &i->in_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet);
}
stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name);
}
stfu_n_dump(i);
}
if (stfu_log != null_logger && (i->debug & DBG_OUT)) {
@ -936,7 +970,9 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
} else {
if (force) {
stfu_log(STFU_LOG_EMERG, "%s NO PACKETS HARD RESETTING\n", i->name);
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s NO PACKETS HARD RESETTING\n", i->name);
}
stfu_n_reset(i);
} else {
i->last_wr_ts = i->cur_ts;