diff --git a/libs/stfu/stfu.c b/libs/stfu/stfu.c index 374185ac9f..d83baed017 100644 --- a/libs/stfu/stfu.c +++ b/libs/stfu/stfu.c @@ -339,10 +339,10 @@ static void stfu_n_reset_counters(stfu_instance_t *i) } -void stfu_n_reset(stfu_instance_t *i) +void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int line) { if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s RESET\n", i->name); + stfu_log(file, func, line, STFU_LOG_LEVEL_EMERG, "%s RESET\n", i->name); } i->ready = 0; @@ -447,14 +447,11 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin if (i->max_drift) { - if (i->drift_dropped_packets > 500) { - stfu_n_reset(i); - } - if (i->ts_drift < i->max_drift) { if (++i->drift_dropped_packets < i->drift_max_dropped) { stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts); - return STFU_ITS_TOO_LATE; + stfu_n_sync(i, 1); + //return STFU_ITS_TOO_LATE; } } else { i->drift_dropped_packets = 0; @@ -479,7 +476,8 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin if (i->in_queue->array_len < i->in_queue->array_size) { i->in_queue->array_len++; } - return STFU_ITS_TOO_LATE; + stfu_n_sync(i, 1); + //return STFU_ITS_TOO_LATE; } } } @@ -522,10 +520,6 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin i->diff_total += i->diff; if ((i->period_packet_in_count > i->period_time)) { - //uint32_t avg; - - //avg = i->diff_total / least1(i->period_packet_in_count); - i->period_packet_in_count = 0; if (i->period_missing_count == 0 && i->qlen > i->orig_qlen) { @@ -535,10 +529,7 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin stfu_n_reset_counters(i); } - - - if (stfu_log != null_logger && i->debug) { stfu_log(STFU_LOG_EMERG, "I: %s %u/%u i=%u/%u - g:%u/%u c:%u/%u b:%u - %u:%u - %u %d %u %u %d %d %d/%d\n", i->name, i->qlen, i->max_qlen, i->period_packet_in_count, i->period_time, i->consecutive_good_count, @@ -584,8 +575,9 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_frame_t **r_frame) { - uint32_t i = 0; - stfu_frame_t *frame = NULL; + uint32_t i = 0, best_index = 0; + int best_diff = 1000000, cur_diff = 0; + stfu_frame_t *frame = NULL, *best_frame = NULL; stfu_assert(r_frame); @@ -593,16 +585,25 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_ for(i = 0; i < queue->real_array_size; i++) { frame = &queue->array[i]; - if (!frame->was_read) { - *r_frame = frame; - queue->last_index = i; - frame->was_read = 1; - in->period_packet_out_count++; - in->session_packet_out_count++; - return 1; + cur_diff = abs(frame->ts - in->cur_ts); + + if (!frame->was_read && cur_diff < best_diff) { + best_diff = cur_diff; + best_frame = frame; + best_index = i; } } + if (best_frame) { + *r_frame = best_frame; + queue->last_index = best_index; + best_frame->was_read = 1; + in->period_packet_out_count++; + in->session_packet_out_count++; + return 1; + } + + return 0; } @@ -735,21 +736,20 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) } if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s ", i->name); + stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name); for(y = 0; y < i->out_queue->array_size; y++) { - if ((y % 5) == 0) stfu_log(STFU_LOG_EMERG, "\n%s ", i->name); frame = &i->out_queue->array[y]; - stfu_log(STFU_LOG_EMERG, "%u:%u\t", frame->ts, frame->ts / i->samples_per_packet); + stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet); } - stfu_log(STFU_LOG_EMERG, "\n%s ", i->name); + stfu_log(STFU_LOG_EMERG, "%s ------------\n\n\n", i->name); + stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name); for(y = 0; y < i->in_queue->array_size; y++) { - if ((y % 5) == 0) stfu_log(STFU_LOG_EMERG, "\n%s ", i->name); frame = &i->in_queue->array[y]; - stfu_log(STFU_LOG_EMERG, "%u:%u\t", frame->ts, frame->ts / i->samples_per_packet); + stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet); } - stfu_log(STFU_LOG_EMERG, "\n%s\n\n\n", i->name); + stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name); } @@ -788,23 +788,37 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i) i->plc_pt = rframe->pt; } else { - i->last_wr_ts = i->cur_ts; - rframe = &i->out_queue->int_frame; - rframe->dlen = i->plc_len; - rframe->pt = i->plc_pt; - rframe->ts = i->cur_ts; - rframe->seq = i->cur_seq; - i->miss_count++; - - if (stfu_log != null_logger && i->debug) { - stfu_log(STFU_LOG_EMERG, "%s PLC %d %d %ld %u:%u\n", i->name, - i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet); + if (stfu_n_find_any_frame(i, i->out_queue, &rframe)) { + i->cur_ts = rframe->ts; + i->cur_seq = rframe->seq; + i->last_wr_ts = i->cur_ts; + i->miss_count++; + + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s AUTOCORRECT %d %d %ld %u:%u\n", i->name, + i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet); + } + + } else { + i->last_wr_ts = i->cur_ts; + rframe = &i->out_queue->int_frame; + rframe->dlen = i->plc_len; + rframe->pt = i->plc_pt; + rframe->ts = i->cur_ts; + rframe->seq = i->cur_seq; + i->miss_count++; + + if (stfu_log != null_logger && i->debug) { + stfu_log(STFU_LOG_EMERG, "%s PLC %d %d %ld %u:%u\n", i->name, + i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet); + } } if (i->miss_count > i->max_plc) { stfu_n_reset(i); rframe = NULL; } + } return rframe; diff --git a/libs/stfu/stfu.h b/libs/stfu/stfu.h index 3ff5326271..895c0911aa 100644 --- a/libs/stfu/stfu.h +++ b/libs/stfu/stfu.h @@ -194,7 +194,8 @@ stfu_status_t stfu_n_resize(stfu_instance_t *i, uint32_t qlen); stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uint32_t pt, void *data, size_t datalen, uint32_t timer_ts, int last); stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i); STFU_DECLARE(int32_t) stfu_n_copy_next_frame(stfu_instance_t *jb, uint32_t timestamp, uint16_t seq, uint16_t distance, stfu_frame_t *next_frame); -void stfu_n_reset(stfu_instance_t *i); +void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int line); +#define stfu_n_reset(_i) _stfu_n_reset(_i, STFU_PRE) stfu_status_t stfu_n_sync(stfu_instance_t *i, uint32_t packets); void stfu_n_call_me(stfu_instance_t *i, stfu_n_call_me_t callback, void *udata); void stfu_n_debug(stfu_instance_t *i, const char *name); diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 07a26eaa46..801fa9a7b8 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -3594,7 +3594,7 @@ static void jb_logger(const char *file, const char *func, int line, int level, c va_start(ap, fmt); ret = switch_vasprintf(&data, fmt, ap); if (ret != -1) { - switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_CONSOLE, "%s", data); + switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CONSOLE, "%s", data); free(data); } @@ -4331,6 +4331,22 @@ SWITCH_DECLARE(void) rtp_flush_read_buffer(switch_rtp_t *rtp_session, switch_rtp } } +static int jb_valid(switch_rtp_t *rtp_session) +{ + if (rtp_session->ice.ice_user) { + if (!rtp_session->ice.ready && rtp_session->ice.rready) { + return 0; + } + } + + if (rtp_session->dtls && rtp_session->dtls->state != DS_READY) { + return 0; + } + + return 1; +} + + static void do_flush(switch_rtp_t *rtp_session, int force) { int was_blocking = 0; @@ -4354,8 +4370,12 @@ static void do_flush(switch_rtp_t *rtp_session, int force) READ_INC(rtp_session); - if (switch_rtp_ready(rtp_session)) { - + if (switch_rtp_ready(rtp_session) ) { + + if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) { + return; + } + if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_READ]) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CONSOLE, "%s FLUSH\n", @@ -4396,10 +4416,12 @@ static void do_flush(switch_rtp_t *rtp_session, int force) } } while (bytes > 0); +#if 0 if (rtp_session->jb && flushed) { stfu_n_sync(rtp_session->jb, flushed); reset_jitter_seq(rtp_session); } +#endif if (was_blocking && switch_rtp_ready(rtp_session)) { switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK); @@ -4410,21 +4432,6 @@ static void do_flush(switch_rtp_t *rtp_session, int force) READ_DEC(rtp_session); } -static int jb_valid(switch_rtp_t *rtp_session) -{ - if (rtp_session->ice.ice_user) { - if (!rtp_session->ice.ready && rtp_session->ice.rready) { - return 0; - } - } - - if (rtp_session->dtls && rtp_session->dtls->state != DS_READY) { - return 0; - } - - return 1; -} - static int check_recv_payload(switch_rtp_t *rtp_session) { int ok = 1; @@ -4804,10 +4811,12 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t } status = SWITCH_STATUS_FALSE; + *bytes = 0; + if (!return_jb_packet) { return status; } - *bytes = 0; + } if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) { @@ -5047,15 +5056,35 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ bytes = 0; - if (rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER]) { - if ((rtp_session->flags[SWITCH_RTP_FLAG_AUTOFLUSH] || rtp_session->flags[SWITCH_RTP_FLAG_STICKY_FLUSH]) && - !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] && - !rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] && - !rtp_session->flags[SWITCH_RTP_FLAG_UDPTL] && - //!rtp_session->flags[SWITCH_RTP_FLAG_RTCP_MUX] && - //!rtp_session->dtls && - rtp_session->read_pollfd) { + if (rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER] && + !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] && + !rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] && + !rtp_session->flags[SWITCH_RTP_FLAG_UDPTL] && + rtp_session->read_pollfd) { + + if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) { + while (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) { + status = read_rtp_packet(rtp_session, &bytes, flags, SWITCH_FALSE); + if (status == SWITCH_STATUS_GENERR) { + ret = -1; + goto end; + } + + if ((*flags & SFF_RTCP)) { + *flags &= ~SFF_RTCP; + has_rtcp = 1; + read_pretriggered = 0; + goto rtcp; + } + + if (status != SWITCH_STATUS_FALSE) { + read_pretriggered = 1; + } + } + + } else if ((rtp_session->flags[SWITCH_RTP_FLAG_AUTOFLUSH] || rtp_session->flags[SWITCH_RTP_FLAG_STICKY_FLUSH])) { + if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) { status = read_rtp_packet(rtp_session, &bytes, flags, SWITCH_FALSE); if (status == SWITCH_STATUS_GENERR) {