feed all packets to jitterbuffer when enabled to absorb bursts and improve smoothing and delay protection

This commit is contained in:
Anthony Minessale 2014-03-07 02:48:50 +05:00
parent 8a973cf5bc
commit e9847afe22
3 changed files with 114 additions and 70 deletions

View File

@ -339,10 +339,10 @@ static void stfu_n_reset_counters(stfu_instance_t *i)
}
void stfu_n_reset(stfu_instance_t *i)
void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int line)
{
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s RESET\n", i->name);
stfu_log(file, func, line, STFU_LOG_LEVEL_EMERG, "%s RESET\n", i->name);
}
i->ready = 0;
@ -447,14 +447,11 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
if (i->max_drift) {
if (i->drift_dropped_packets > 500) {
stfu_n_reset(i);
}
if (i->ts_drift < i->max_drift) {
if (++i->drift_dropped_packets < i->drift_max_dropped) {
stfu_log(STFU_LOG_EMERG, "%s TOO LATE !!! %u \n\n\n", i->name, ts);
return STFU_ITS_TOO_LATE;
stfu_n_sync(i, 1);
//return STFU_ITS_TOO_LATE;
}
} else {
i->drift_dropped_packets = 0;
@ -479,7 +476,8 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
if (i->in_queue->array_len < i->in_queue->array_size) {
i->in_queue->array_len++;
}
return STFU_ITS_TOO_LATE;
stfu_n_sync(i, 1);
//return STFU_ITS_TOO_LATE;
}
}
}
@ -522,10 +520,6 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
i->diff_total += i->diff;
if ((i->period_packet_in_count > i->period_time)) {
//uint32_t avg;
//avg = i->diff_total / least1(i->period_packet_in_count);
i->period_packet_in_count = 0;
if (i->period_missing_count == 0 && i->qlen > i->orig_qlen) {
@ -535,10 +529,7 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
stfu_n_reset_counters(i);
}
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "I: %s %u/%u i=%u/%u - g:%u/%u c:%u/%u b:%u - %u:%u - %u %d %u %u %d %d %d/%d\n", i->name,
i->qlen, i->max_qlen, i->period_packet_in_count, i->period_time, i->consecutive_good_count,
@ -584,8 +575,9 @@ stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uin
static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_frame_t **r_frame)
{
uint32_t i = 0;
stfu_frame_t *frame = NULL;
uint32_t i = 0, best_index = 0;
int best_diff = 1000000, cur_diff = 0;
stfu_frame_t *frame = NULL, *best_frame = NULL;
stfu_assert(r_frame);
@ -593,16 +585,25 @@ static int stfu_n_find_any_frame(stfu_instance_t *in, stfu_queue_t *queue, stfu_
for(i = 0; i < queue->real_array_size; i++) {
frame = &queue->array[i];
if (!frame->was_read) {
*r_frame = frame;
queue->last_index = i;
frame->was_read = 1;
in->period_packet_out_count++;
in->session_packet_out_count++;
return 1;
cur_diff = abs(frame->ts - in->cur_ts);
if (!frame->was_read && cur_diff < best_diff) {
best_diff = cur_diff;
best_frame = frame;
best_index = i;
}
}
if (best_frame) {
*r_frame = best_frame;
queue->last_index = best_index;
best_frame->was_read = 1;
in->period_packet_out_count++;
in->session_packet_out_count++;
return 1;
}
return 0;
}
@ -735,21 +736,20 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
}
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s ", i->name);
stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name);
for(y = 0; y < i->out_queue->array_size; y++) {
if ((y % 5) == 0) stfu_log(STFU_LOG_EMERG, "\n%s ", i->name);
frame = &i->out_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%u:%u\t", frame->ts, frame->ts / i->samples_per_packet);
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet);
}
stfu_log(STFU_LOG_EMERG, "\n%s ", i->name);
stfu_log(STFU_LOG_EMERG, "%s ------------\n\n\n", i->name);
stfu_log(STFU_LOG_EMERG, "%s ------------\n", i->name);
for(y = 0; y < i->in_queue->array_size; y++) {
if ((y % 5) == 0) stfu_log(STFU_LOG_EMERG, "\n%s ", i->name);
frame = &i->in_queue->array[y];
stfu_log(STFU_LOG_EMERG, "%u:%u\t", frame->ts, frame->ts / i->samples_per_packet);
stfu_log(STFU_LOG_EMERG, "%s\t%u:%u\n", i->name, frame->ts, frame->ts / i->samples_per_packet);
}
stfu_log(STFU_LOG_EMERG, "\n%s\n\n\n", i->name);
stfu_log(STFU_LOG_EMERG, "%s\n\n\n", i->name);
}
@ -788,23 +788,37 @@ stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i)
i->plc_pt = rframe->pt;
} else {
i->last_wr_ts = i->cur_ts;
rframe = &i->out_queue->int_frame;
rframe->dlen = i->plc_len;
rframe->pt = i->plc_pt;
rframe->ts = i->cur_ts;
rframe->seq = i->cur_seq;
i->miss_count++;
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s PLC %d %d %ld %u:%u\n", i->name,
i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet);
if (stfu_n_find_any_frame(i, i->out_queue, &rframe)) {
i->cur_ts = rframe->ts;
i->cur_seq = rframe->seq;
i->last_wr_ts = i->cur_ts;
i->miss_count++;
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s AUTOCORRECT %d %d %ld %u:%u\n", i->name,
i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet);
}
} else {
i->last_wr_ts = i->cur_ts;
rframe = &i->out_queue->int_frame;
rframe->dlen = i->plc_len;
rframe->pt = i->plc_pt;
rframe->ts = i->cur_ts;
rframe->seq = i->cur_seq;
i->miss_count++;
if (stfu_log != null_logger && i->debug) {
stfu_log(STFU_LOG_EMERG, "%s PLC %d %d %ld %u:%u\n", i->name,
i->miss_count, rframe->plc, rframe->dlen, rframe->ts, rframe->ts / i->samples_per_packet);
}
}
if (i->miss_count > i->max_plc) {
stfu_n_reset(i);
rframe = NULL;
}
}
return rframe;

View File

@ -194,7 +194,8 @@ stfu_status_t stfu_n_resize(stfu_instance_t *i, uint32_t qlen);
stfu_status_t stfu_n_add_data(stfu_instance_t *i, uint32_t ts, uint16_t seq, uint32_t pt, void *data, size_t datalen, uint32_t timer_ts, int last);
stfu_frame_t *stfu_n_read_a_frame(stfu_instance_t *i);
STFU_DECLARE(int32_t) stfu_n_copy_next_frame(stfu_instance_t *jb, uint32_t timestamp, uint16_t seq, uint16_t distance, stfu_frame_t *next_frame);
void stfu_n_reset(stfu_instance_t *i);
void _stfu_n_reset(stfu_instance_t *i, const char *file, const char *func, int line);
#define stfu_n_reset(_i) _stfu_n_reset(_i, STFU_PRE)
stfu_status_t stfu_n_sync(stfu_instance_t *i, uint32_t packets);
void stfu_n_call_me(stfu_instance_t *i, stfu_n_call_me_t callback, void *udata);
void stfu_n_debug(stfu_instance_t *i, const char *name);

View File

@ -3594,7 +3594,7 @@ static void jb_logger(const char *file, const char *func, int line, int level, c
va_start(ap, fmt);
ret = switch_vasprintf(&data, fmt, ap);
if (ret != -1) {
switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_CONSOLE, "%s", data);
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_CONSOLE, "%s", data);
free(data);
}
@ -4331,6 +4331,22 @@ SWITCH_DECLARE(void) rtp_flush_read_buffer(switch_rtp_t *rtp_session, switch_rtp
}
}
static int jb_valid(switch_rtp_t *rtp_session)
{
if (rtp_session->ice.ice_user) {
if (!rtp_session->ice.ready && rtp_session->ice.rready) {
return 0;
}
}
if (rtp_session->dtls && rtp_session->dtls->state != DS_READY) {
return 0;
}
return 1;
}
static void do_flush(switch_rtp_t *rtp_session, int force)
{
int was_blocking = 0;
@ -4354,8 +4370,12 @@ static void do_flush(switch_rtp_t *rtp_session, int force)
READ_INC(rtp_session);
if (switch_rtp_ready(rtp_session)) {
if (switch_rtp_ready(rtp_session) ) {
if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) {
return;
}
if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_READ]) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session),
SWITCH_LOG_CONSOLE, "%s FLUSH\n",
@ -4396,10 +4416,12 @@ static void do_flush(switch_rtp_t *rtp_session, int force)
}
} while (bytes > 0);
#if 0
if (rtp_session->jb && flushed) {
stfu_n_sync(rtp_session->jb, flushed);
reset_jitter_seq(rtp_session);
}
#endif
if (was_blocking && switch_rtp_ready(rtp_session)) {
switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_NOBLOCK);
@ -4410,21 +4432,6 @@ static void do_flush(switch_rtp_t *rtp_session, int force)
READ_DEC(rtp_session);
}
static int jb_valid(switch_rtp_t *rtp_session)
{
if (rtp_session->ice.ice_user) {
if (!rtp_session->ice.ready && rtp_session->ice.rready) {
return 0;
}
}
if (rtp_session->dtls && rtp_session->dtls->state != DS_READY) {
return 0;
}
return 1;
}
static int check_recv_payload(switch_rtp_t *rtp_session)
{
int ok = 1;
@ -4804,10 +4811,12 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
}
status = SWITCH_STATUS_FALSE;
*bytes = 0;
if (!return_jb_packet) {
return status;
}
*bytes = 0;
}
if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) {
@ -5047,15 +5056,35 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_
bytes = 0;
if (rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER]) {
if ((rtp_session->flags[SWITCH_RTP_FLAG_AUTOFLUSH] || rtp_session->flags[SWITCH_RTP_FLAG_STICKY_FLUSH]) &&
!rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] &&
!rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] &&
!rtp_session->flags[SWITCH_RTP_FLAG_UDPTL] &&
//!rtp_session->flags[SWITCH_RTP_FLAG_RTCP_MUX] &&
//!rtp_session->dtls &&
rtp_session->read_pollfd) {
if (rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER] &&
!rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] &&
!rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] &&
!rtp_session->flags[SWITCH_RTP_FLAG_UDPTL] &&
rtp_session->read_pollfd) {
if (rtp_session->jb && !rtp_session->pause_jb && jb_valid(rtp_session)) {
while (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) {
status = read_rtp_packet(rtp_session, &bytes, flags, SWITCH_FALSE);
if (status == SWITCH_STATUS_GENERR) {
ret = -1;
goto end;
}
if ((*flags & SFF_RTCP)) {
*flags &= ~SFF_RTCP;
has_rtcp = 1;
read_pretriggered = 0;
goto rtcp;
}
if (status != SWITCH_STATUS_FALSE) {
read_pretriggered = 1;
}
}
} else if ((rtp_session->flags[SWITCH_RTP_FLAG_AUTOFLUSH] || rtp_session->flags[SWITCH_RTP_FLAG_STICKY_FLUSH])) {
if (switch_poll(rtp_session->read_pollfd, 1, &fdr, 0) == SWITCH_STATUS_SUCCESS) {
status = read_rtp_packet(rtp_session, &bytes, flags, SWITCH_FALSE);
if (status == SWITCH_STATUS_GENERR) {