diff --git a/src/include/switch_rtp.h b/src/include/switch_rtp.h index 3ec5926810..27a06296bd 100644 --- a/src/include/switch_rtp.h +++ b/src/include/switch_rtp.h @@ -474,6 +474,8 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, void *data, uint32_t datalen, uint8_t m, switch_payload_t payload, uint32_t ts, switch_frame_flag_t *flags); +SWITCH_DECLARE(switch_status_t) switch_rtp_write_raw(switch_rtp_t *rtp_session, void *data, switch_size_t *bytes, switch_bool_t process_encryption); + /*! \brief Retrieve the SSRC from a given RTP session \param rtp_session the RTP session to retrieve from diff --git a/src/include/switch_vidderbuffer.h b/src/include/switch_vidderbuffer.h index 5cfca7de2d..f885765f18 100644 --- a/src/include/switch_vidderbuffer.h +++ b/src/include/switch_vidderbuffer.h @@ -34,7 +34,7 @@ #define SWITCH_VIDDERBUFFER_H SWITCH_BEGIN_EXTERN_C -SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation); +SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len); SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp); SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb, switch_bool_t flush); SWITCH_DECLARE(void) switch_vb_debug_level(switch_vb_t *vb, uint8_t level); @@ -42,6 +42,8 @@ SWITCH_DECLARE(int) switch_vb_frame_count(switch_vb_t *vb); SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb); SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len); SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len); +SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb); +SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len); SWITCH_END_EXTERN_C #endif diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 83485701ee..05db7298dd 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -300,7 +300,7 @@ struct switch_rtp { uint8_t fir_seq; uint16_t fir_count; uint16_t pli_count; - + uint32_t cur_nack; ts_normalize_t ts_norm; switch_sockaddr_t *remote_addr, *rtcp_remote_addr; rtp_msg_t recv_msg; @@ -389,6 +389,7 @@ struct switch_rtp { uint8_t cn; stfu_instance_t *jb; switch_vb_t *vb; + switch_vb_t *vbw; uint32_t max_missed_packets; uint32_t missed_count; rtp_msg_t write_msg; @@ -1848,9 +1849,13 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session) rtcp_ok = 0; } + if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vb) { + rtp_session->cur_nack = switch_vb_pop_nack(rtp_session->vb); + } + if (rtp_session->rtcp_sock_output && rtp_session->flags[SWITCH_RTP_FLAG_ENABLE_RTCP] && !rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU] && - ((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count)) { + ((now - rtp_session->rtcp_last_sent) > rtp_session->rtcp_send_rate * 1000000 || rtp_session->pli_count || rtp_session->fir_count || rtp_session->cur_nack)) { switch_rtcp_numbers_t * stats = &rtp_session->stats.rtcp; struct switch_rtcp_receiver_report *rr; struct switch_rtcp_sender_report *sr; @@ -1886,7 +1891,7 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session) rtcp_generate_report_block(rtp_session, rtcp_report_block); rtp_session->rtcp_send_msg.header.length = htons((uint16_t)(rtcp_bytes / 4) - 1); - + if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) { if (rtp_session->remote_ssrc == 0) { rtp_session->remote_ssrc = rtp_session->stats.rtcp.peer_ssrc; @@ -1916,6 +1921,31 @@ static int check_rtcp_and_ice(switch_rtp_t *rtp_session) rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t); rtp_session->pli_count = 0; } + + if (rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->cur_nack) { + switch_rtcp_ext_hdr_t *ext_hdr; + uint32_t *nack; + p = (uint8_t *) (&rtp_session->rtcp_send_msg) + rtcp_bytes; + ext_hdr = (switch_rtcp_ext_hdr_t *) p; + + ext_hdr->version = 2; + ext_hdr->p = 0; + ext_hdr->fmt = 1; + ext_hdr->pt = 205; + ext_hdr->send_ssrc = htonl(rtp_session->ssrc); + ext_hdr->recv_ssrc = htonl(rtp_session->remote_ssrc); + ext_hdr->length = htons(3); + p += sizeof(switch_rtcp_ext_hdr_t); + nack = (uint32_t *) p; + *nack = rtp_session->cur_nack; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Sending RTCP NACK %u\n", + ntohs(*nack & 0xFFFF)); + + rtcp_bytes += sizeof(switch_rtcp_ext_hdr_t) + sizeof(rtp_session->cur_nack); + + rtp_session->cur_nack = 0; + } if (rtp_session->fir_count) { switch_rtcp_ext_hdr_t *ext_hdr; @@ -3489,8 +3519,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video timer.\n"); } - switch_vb_create(&rtp_session->vb, 5, 30, SWITCH_FALSE); - switch_vb_debug_level(rtp_session->vb, 10); + switch_vb_create(&rtp_session->vb, 5, 30); + //switch_vb_debug_level(rtp_session->vb, 10); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Starting video buffer.\n"); } else { @@ -4131,6 +4161,10 @@ SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session) switch_vb_destroy(&(*rtp_session)->vb); } + if ((*rtp_session)->vbw) { + switch_vb_destroy(&(*rtp_session)->vbw); + } + if ((*rtp_session)->dtls && (*rtp_session)->dtls == (*rtp_session)->rtcp_dtls) { (*rtp_session)->rtcp_dtls = NULL; } @@ -4824,23 +4858,36 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE, - "R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n", + "R %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n", rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "No-Name", (long) *bytes, my_host, switch_sockaddr_get_port(rtp_session->local_addr), old_host, rtp_session->remote_port, tx_host, switch_sockaddr_get_port(rtp_session->from_addr), - rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), rtp_session->recv_msg.header.m); + rtp_session->recv_msg.header.pt, ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq), + rtp_session->recv_msg.header.m); } - +#ifdef RTP_READ_PLOSS + { + int r = (rand() % 10000) + 1; + if (r <= 200) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT, + "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(rtp_session->recv_msg.header.ts), ntohs(rtp_session->recv_msg.header.seq)); + *bytes = 0; + } + } +#endif + + if (sync) { if (!rtp_session->flags[SWITCH_RTP_FLAG_USE_TIMER] && rtp_session->timer.interval) { switch_core_timer_sync(&rtp_session->timer); reset_jitter_seq(rtp_session); } rtp_session->hot_hits = 0; + goto more; } @@ -5049,7 +5096,7 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t stfu_n_destroy(&rtp_session->jb); } } - + if (rtp_session->recv_msg.header.version == 2 && *bytes) { if (rtp_session->vb) { @@ -5146,12 +5193,76 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t return status; } +static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack) +{ + switch_size_t bytes = 0; + rtp_msg_t send_msg[1] = {{{0}}}; + uint16_t seq = (uint16_t) (nack & 0xFFFF); + int i; + const char *tx_host = NULL; + const char *old_host = NULL; + const char *my_host = NULL; + char bufa[30], bufb[30], bufc[30]; + + if (!(rtp_session->flags[SWITCH_RTP_FLAG_NACK] && rtp_session->vbw)) { + return; /* not enabled */ + } + + if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) { + tx_host = switch_get_addr(bufa, sizeof(bufa), rtp_session->from_addr); + old_host = switch_get_addr(bufb, sizeof(bufb), rtp_session->remote_addr); + my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr); + } + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK [%u][0x%x] for seq %u\n", nack, nack, ntohs(seq)); + + if (switch_vb_get_packet_by_seq(rtp_session->vbw, seq, (switch_rtp_packet_t *) send_msg, &bytes) == SWITCH_STATUS_SUCCESS) { + + if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE, + "X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n", + rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName", + (long) bytes, + my_host, switch_sockaddr_get_port(rtp_session->local_addr), + old_host, rtp_session->remote_port, + tx_host, switch_sockaddr_get_port(rtp_session->from_addr), + send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m); + + } + switch_rtp_write_raw(rtp_session, (void *) send_msg, &bytes, SWITCH_FALSE); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq)); + } + + for (i = 0; i < 16; i++) { + if ((nack & (1 << (16 + i)))) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Also Got NACK for seq %u\n", ntohs(seq) + i); + if (switch_vb_get_packet_by_seq(rtp_session->vbw, htons(ntohs(seq) + i), (switch_rtp_packet_t *) &send_msg, &bytes) == SWITCH_STATUS_SUCCESS) { + if (rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_WRITE]) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE, + "X %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n", + rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName", + (long) bytes, + my_host, switch_sockaddr_get_port(rtp_session->local_addr), + old_host, rtp_session->remote_port, + tx_host, switch_sockaddr_get_port(rtp_session->from_addr), + send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m); + + } + switch_rtp_write_raw(rtp_session, (void *) &send_msg, &bytes, SWITCH_FALSE); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq) + i); + } + } + } +} + static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t *msg, switch_size_t bytes) { switch_status_t status = SWITCH_STATUS_FALSE; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT, + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1, "RTCP packet bytes %" SWITCH_SIZE_T_FMT " type %d pad %d\n", bytes, msg->header.type, msg->header.p); @@ -5159,13 +5270,26 @@ static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t (msg->header.type == 205 || //RTPFB msg->header.type == 206)) {//PSFB rtcp_ext_msg_t *extp = (rtcp_ext_msg_t *) msg; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_CRIT, "PICKED UP XRTCP type: %d fmt: %d\n", + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "PICKED UP XRTCP type: %d fmt: %d\n", msg->header.type, extp->header.fmt); - if ((extp->header.fmt == 4) || (extp->header.fmt == 1)) { /* FIR || PLI */ + if (msg->header.type == 206 && (extp->header.fmt == 4 || extp->header.fmt == 1)) { /* FIR || PLI */ switch_core_media_gen_key_frame(rtp_session->session); switch_channel_set_flag(switch_core_session_get_channel(rtp_session->session), CF_VIDEO_REFRESH_REQ); } + + if (msg->header.type == 205 && extp->header.fmt == 1) { /*NACK*/ + uint32_t *nack = (uint32_t *) extp->body; + int i; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Got NACK count %d\n", ntohs(extp->header.length) - 2); + + for (i = 0; i < ntohs(extp->header.length) - 2; i++) { + handle_nack(rtp_session, *nack); + nack++; + } + } + } else if (msg->header.type == 200 || msg->header.type == 201) { @@ -5218,7 +5342,7 @@ static switch_status_t process_rtcp_report(switch_rtp_t *rtp_session, rtcp_msg_t if (report_block->lsr && !rtp_session->flags[SWITCH_RTP_FLAG_RTCP_PASSTHRU]) { switch_time_exp_gmt(&now_hr,now); /* Calculating RTT = A - DLSR - LSR */ - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG1, "Receiving an RTCP packet\n[%04d-%02d-%02d %02d:%02d:%02d.%d] SSRC[%u]\n" "RTT[%f] A[%u] - DLSR[%u] - LSR[%u]\n", 1900 + now_hr.tm_year, now_hr.tm_mday, now_hr.tm_mon, now_hr.tm_hour, now_hr.tm_min, now_hr.tm_sec, now_hr.tm_usec, @@ -5587,10 +5711,10 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ pt = 200000; } - if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) { - pt = 1000; - force = 1; - } + //if (rtp_session->vb && switch_vb_poll(rtp_session->vb)) { + // pt = 1000; + // force = 1; + //} poll_status = switch_poll(rtp_session->read_pollfd, 1, &fdr, pt); @@ -6794,22 +6918,47 @@ static int rtp_common_write(switch_rtp_t *rtp_session, my_host = switch_get_addr(bufc, sizeof(bufc), rtp_session->local_addr); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG_CLEAN(rtp_session->session), SWITCH_LOG_CONSOLE, - "W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u m=%d\n", + "W %s b=%4ld %s:%u %s:%u %s:%u pt=%d ts=%u seq=%u m=%d\n", rtp_session->session ? switch_channel_get_name(switch_core_session_get_channel(rtp_session->session)) : "NoName", (long) bytes, my_host, switch_sockaddr_get_port(rtp_session->local_addr), old_host, rtp_session->remote_port, tx_host, switch_sockaddr_get_port(rtp_session->from_addr), - send_msg->header.pt, ntohl(send_msg->header.ts), send_msg->header.m); + send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m); } + + if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) { + if (!rtp_session->vbw) { + switch_vb_create(&rtp_session->vbw, 5, 5); + //switch_vb_debug_level(rtp_session->vbw, 10); + } + switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes); + } +#ifdef RTP_WRITE_PLOSS + { + int r = (rand() % 10000) + 1; + + if (r <= 200) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ALERT, + "Simulate dropping packet ......... ts: %u seq: %u\n", ntohl(send_msg->header.ts), ntohs(send_msg->header.seq)); + } else { + if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) { + rtp_session->seq--; + ret = -1; + goto end; + } + } + } +#else + if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) { rtp_session->seq--; ret = -1; goto end; } - +#endif rtp_session->last_write_ts = this_ts; rtp_session->flags[SWITCH_RTP_FLAG_RESET] = 0; @@ -7159,60 +7308,7 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, bytes = rtp_header_len + datalen; -#ifdef ENABLE_SRTP - if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) { - - int sbytes = (int) bytes; - err_status_t stat; - - if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) { - switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET); - srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]); - rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL; - if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n"); - ret = -1; - goto end; - } else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n"); - } - } - - stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes); - if (stat) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat); - } - bytes = sbytes; - } -#endif -#ifdef ENABLE_ZRTP - /* ZRTP Send */ - if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) { - unsigned int sbytes = (int) bytes; - zrtp_status_t stat = zrtp_status_fail; - - stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes); - - switch (stat) { - case zrtp_status_ok: - break; - case zrtp_status_drop: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat); - ret = (int) bytes; - goto end; - break; - case zrtp_status_fail: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat); - break; - default: - break; - } - - bytes = sbytes; - } -#endif - - if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) &rtp_session->write_msg, &bytes) != SWITCH_STATUS_SUCCESS) { + if (switch_rtp_write_raw(rtp_session, (void *) &rtp_session->write_msg, &bytes, SWITCH_TRUE) != SWITCH_STATUS_SUCCESS) { rtp_session->seq--; ret = -1; goto end; @@ -7232,6 +7328,89 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, return ret; } + + +SWITCH_DECLARE(switch_status_t) switch_rtp_write_raw(switch_rtp_t *rtp_session, void *data, switch_size_t *bytes, switch_bool_t process_encryption) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + + switch_assert(bytes); + + if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr || *bytes > SWITCH_RTP_MAX_BUF_LEN) { + return status; + } + + if (!rtp_write_ready(rtp_session, *bytes, __LINE__)) { + return SWITCH_STATUS_NOT_INITALIZED; + } + + WRITE_INC(rtp_session); + + if (process_encryption) { + process_encryption = SWITCH_FALSE; +#ifdef ENABLE_SRTP + if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND]) { + + int sbytes = (int) *bytes; + err_status_t stat; + + if (rtp_session->flags[SWITCH_RTP_FLAG_SECURE_SEND_RESET]) { + switch_rtp_clear_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_SEND_RESET); + srtp_dealloc(rtp_session->send_ctx[rtp_session->srtp_idx_rtp]); + rtp_session->send_ctx[rtp_session->srtp_idx_rtp] = NULL; + if ((stat = srtp_create(&rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->send_policy[rtp_session->srtp_idx_rtp]))) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n"); + status = SWITCH_STATUS_FALSE; + goto end; + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n"); + } + } + + stat = srtp_protect(rtp_session->send_ctx[rtp_session->srtp_idx_rtp], &rtp_session->write_msg.header, &sbytes); + if (stat) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_ERROR, "Error: SRTP protection failed with code %d\n", stat); + } + *bytes = sbytes; + } +#endif +#ifdef ENABLE_ZRTP + /* ZRTP Send */ + if (zrtp_on && !rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA]) { + unsigned int sbytes = (int) *bytes; + zrtp_status_t stat = zrtp_status_fail; + + stat = zrtp_process_rtp(rtp_session->zrtp_stream, (void *) &rtp_session->write_msg, &sbytes); + + switch (stat) { + case zrtp_status_ok: + break; + case zrtp_status_drop: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection drop with code %d\n", stat); + ret = SWITCH_STATUS_SUCCESS; + goto end; + break; + case zrtp_status_fail: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error: zRTP protection fail with code %d\n", stat); + break; + default: + break; + } + + *bytes = sbytes; + } +#endif + } + + status = switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, data, bytes); + + end: + + WRITE_DEC(rtp_session); + + return status; +} + SWITCH_DECLARE(uint32_t) switch_rtp_get_ssrc(switch_rtp_t *rtp_session) { return rtp_session->ssrc; diff --git a/src/switch_vidderbuffer.c b/src/switch_vidderbuffer.c index 6d9bb71c42..78a973fcde 100644 --- a/src/switch_vidderbuffer.c +++ b/src/switch_vidderbuffer.c @@ -34,54 +34,44 @@ #define MAX_MISSING_SEQ 20 #define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__) +struct switch_vb_s; + typedef struct switch_vb_node_s { - struct switch_vb_frame_s *parent; + struct switch_vb_s *parent; switch_rtp_packet_t packet; uint32_t len; uint8_t visible; struct switch_vb_node_s *next; } switch_vb_node_t; -typedef struct switch_vb_frame_s { - struct switch_vb_s *parent; - struct switch_vb_node_s *node_list; - uint32_t ts; - uint32_t visible_nodes; - uint8_t visible; - uint8_t complete; - uint8_t mark; - struct switch_vb_frame_s *next; - uint16_t min_seq; - uint16_t max_seq; -} switch_vb_frame_t; - struct switch_vb_s { - struct switch_vb_frame_s *frame_list; - struct switch_vb_frame_s *cur_read_frame; - struct switch_vb_frame_s *cur_write_frame; - uint32_t last_read_ts; - uint32_t last_read_seq; + struct switch_vb_node_s *node_list; uint32_t last_target_seq; - uint32_t last_wrote_ts; - uint32_t last_wrote_seq; + uint32_t highest_read_ts; + uint32_t highest_read_seq; + uint32_t highest_wrote_ts; + uint32_t highest_wrote_seq; uint16_t target_seq; uint16_t seq_out; - uint32_t visible_frames; + uint32_t visible_nodes; uint32_t total_frames; uint32_t complete_frames; uint32_t frame_len; uint32_t min_frame_len; uint32_t max_frame_len; + uint8_t write_init; + uint8_t read_init; uint8_t debug_level; - switch_timer_t timer; - int cur_errs; + uint16_t next_seq; + switch_inthash_t *missing_seq_hash; + switch_inthash_t *node_hash; }; -static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame) +static inline switch_vb_node_t *new_node(switch_vb_t *vb) { switch_vb_node_t *np, *last = NULL; - for (np = frame->node_list; np; np = np->next) { + for (np = vb->node_list; np; np = np->next) { if (!np->visible) { break; } @@ -91,12 +81,11 @@ static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame) if (!np) { switch_zmalloc(np, sizeof(*np)); - np->parent = frame; if (last) { last->next = np; } else { - frame->node_list = np; + vb->node_list = np; } } @@ -104,167 +93,113 @@ static inline switch_vb_node_t *new_node(switch_vb_frame_t *frame) switch_assert(np); np->visible = 1; - np->parent->visible_nodes++; + vb->visible_nodes++; + np->parent = vb; return np; } -static inline void add_node(switch_vb_frame_t *frame, switch_rtp_packet_t *packet, switch_size_t len) +static inline switch_vb_node_t *find_seq(switch_vb_t *vb, uint16_t seq) { - switch_vb_node_t *node = new_node(frame); - uint16_t seq = ntohs(packet->header.seq); + switch_vb_node_t *np; + for (np = vb->node_list; np; np = np->next) { + if (!np->visible) continue; + + if (ntohs(np->packet.header.seq) == ntohs(seq)) { + return np; + } + } + + return NULL; +} + +static inline void hide_node(switch_vb_node_t *node) +{ + node->visible = 0; + node->parent->visible_nodes--; + switch_core_inthash_delete(node->parent->node_hash, node->packet.header.seq); +} + +static inline void hide_nodes(switch_vb_t *vb) +{ + switch_vb_node_t *np; + + for (np = vb->node_list; np; np = np->next) { + hide_node(np); + } +} + +static inline void drop_ts(switch_vb_t *vb, uint32_t ts) +{ + switch_vb_node_t *np; + int x = 0; + + for (np = vb->node_list; np; np = np->next) { + if (!np->visible) continue; + + if (ts == np->packet.header.ts) { + hide_node(np); + x++; + } + } + + if (x) vb->complete_frames--; +} + +static inline uint32_t vb_find_lowest_ts(switch_vb_t *vb) +{ + switch_vb_node_t *np, *lowest = NULL; + + for (np = vb->node_list; np; np = np->next) { + if (!np->visible) continue; + + if (!lowest || ntohl(lowest->packet.header.ts) > ntohl(np->packet.header.ts)) { + lowest = np; + } + } + + return lowest ? lowest->packet.header.ts : 0; +} + +static inline void drop_oldest_frame(switch_vb_t *vb) +{ + uint32_t ts = vb_find_lowest_ts(vb); + + drop_ts(vb, ts); + vb_debug(vb, 1, "Dropping oldest frame ts:%u\n", ntohl(ts)); +} + +static inline void add_node(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len) +{ + switch_vb_node_t *node = new_node(vb); node->packet = *packet; node->len = len; memcpy(node->packet.body, packet->body, len); - if (!frame->min_seq ||seq < ntohs(frame->min_seq)) { - frame->min_seq = packet->header.seq; + switch_core_inthash_insert(vb->node_hash, node->packet.header.seq, node); + + vb_debug(vb, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n", + ntohl(vb->highest_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " " : ""); + + if (!vb->write_init || ntohs(packet->header.seq) > ntohs(vb->highest_wrote_seq) || + (ntohs(vb->highest_wrote_seq) > USHRT_MAX - 10 && ntohs(packet->header.seq) <= 10) ) { + vb->highest_wrote_seq = packet->header.seq; } - if (seq > ntohs(frame->max_seq)) { - frame->max_seq = packet->header.seq; - } - - vb_debug(frame->parent, (packet->header.m ? 1 : 2), "PUT packet last_ts:%u ts:%u seq:%u%s\n", - ntohl(frame->parent->last_wrote_ts), ntohl(node->packet.header.ts), ntohs(node->packet.header.seq), packet->header.m ? " " : ""); - - - if (packet->header.m) { - frame->mark = 1; + if (vb->write_init && htons(packet->header.seq) >= htons(vb->highest_wrote_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_wrote_ts))) { + vb->complete_frames++; + vb_debug(vb, 2, "WRITE frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes); + vb->highest_wrote_ts = packet->header.ts; + } else if (!vb->write_init) { + vb->highest_wrote_ts = packet->header.ts; } - if ((frame->parent->last_wrote_ts && frame->parent->last_wrote_ts != node->packet.header.ts)) { - frame->complete = 1; - frame->parent->complete_frames++; + if (!vb->write_init) vb->write_init = 1; + + if (vb->complete_frames > vb->max_frame_len) { + drop_oldest_frame(vb); } - - frame->parent->last_wrote_ts = packet->header.ts; - frame->parent->last_wrote_seq = packet->header.seq; -} - -static inline void hide_node(switch_vb_node_t *node) -{ - if (node->visible) { - node->visible = 0; - node->parent->visible_nodes--; - } -} - -static inline void hide_nodes(switch_vb_frame_t *frame) -{ - switch_vb_node_t *np; - - for (np = frame->node_list; np; np = np->next) { - hide_node(np); - } -} - -static inline void hide_frame(switch_vb_frame_t *frame) -{ - vb_debug(frame->parent, 2, "Hide frame ts: %u\n", ntohl(frame->ts)); - - if (frame->visible) { - frame->visible = 0; - frame->parent->visible_frames--; - } - - if (frame->complete) { - frame->parent->complete_frames--; - frame->complete = 0; - } - - frame->min_seq = frame->max_seq = 0; - - hide_nodes(frame); -} - -static inline switch_vb_frame_t *new_frame(switch_vb_t *vb, switch_rtp_packet_t *packet) -{ - switch_vb_frame_t *fp = NULL, *last = NULL; - int new = 1; - - if (vb->cur_write_frame) { - if (!vb->cur_write_frame->visible) { - vb->cur_write_frame = NULL; - return NULL; - } else if (vb->cur_write_frame->ts == packet->header.ts) { - fp = vb->cur_write_frame; - new = 0; - } - } - - if (!fp) { - for (fp = vb->frame_list; fp; fp = fp->next) { - if (fp->ts == packet->header.ts) { - if (!fp->visible) { - return NULL; - } else { - new = 0; - break; - } - } - } - } - - if (!fp) { - for (fp = vb->frame_list; fp; fp = fp->next) { - if (!fp->visible) { - break; - } - last = fp; - } - } - - if (!fp) { - switch_zmalloc(fp, sizeof(*fp)); - fp->parent = vb; - vb->total_frames++; - - if (last) { - last->next = fp; - } else { - vb->frame_list = fp; - } - } - - switch_assert(fp); - - if (new) { - vb->visible_frames++; - fp->visible = 1; - fp->complete = 0; - fp->ts = packet->header.ts; - fp->min_seq = fp->max_seq = 0; - fp->mark = 0; - } - - vb->cur_write_frame = fp; - - return fp; - -} - -static inline int frame_contains_seq(switch_vb_frame_t *frame, uint16_t target_seq, switch_vb_node_t **nodep) -{ - uint16_t seq = ntohs(target_seq); - switch_vb_node_t *np; - - for (np = frame->node_list; np; np = np->next) { - if (!np->visible) { - continue; - } - //vb_debug(frame->parent, 10, " CMP %u %u/%u\n", ntohl(frame->ts), ntohs(np->packet.header.seq), seq); - if (ntohs(np->packet.header.seq) == seq) { - //vb_debug(frame->parent, 10, " MATCH %u %u v:%d\n", ntohs(np->packet.header.seq), seq, np->visible); - if (nodep) { - *nodep = np; - } - return 1; - } - } - - return 0; } static inline void increment_seq(switch_vb_t *vb) @@ -278,155 +213,56 @@ static inline void set_read_seq(switch_vb_t *vb, uint16_t seq) vb->target_seq = htons((ntohs(vb->last_target_seq) + 1)); } -static inline switch_status_t next_frame(switch_vb_t *vb, switch_vb_node_t **nodep) -{ - switch_vb_frame_t *fp = NULL, *oldest = NULL, *frame_containing_seq = NULL; - - if ((fp = vb->cur_read_frame)) { - if (fp->visible_nodes == 0) { - hide_frame(fp); - vb->cur_read_frame = NULL; - } - } - - - if ((fp = vb->cur_read_frame)) { - int ok = 1; - - if (!fp->visible || fp->visible_nodes == 0) { - ok = 0; - } else { - if (vb->target_seq) { - if (frame_contains_seq(fp, vb->target_seq, nodep)) { - vb_debug(vb, 2, "CUR FRAME %u CONTAINS REQUESTED SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq)); - frame_containing_seq = fp; - goto end; - } else { - ok = 0; - } - } - } - - if (!ok) { - vb_debug(vb, 2, "DONE WITH CUR FRAME %u v: %d c: %d\n", ntohl(fp->ts), fp->visible, fp->complete); - vb->cur_read_frame = NULL; - } - } - - do { - *nodep = NULL; - - for (fp = vb->frame_list; fp; fp = fp->next) { - if (!fp->visible || !fp->complete) { - continue; - } - - if (vb->target_seq) { - if (frame_contains_seq(fp, vb->target_seq, nodep)) { - vb_debug(vb, 2, "FOUND FRAME %u CONTAINING SEQ %d\n", ntohl(fp->ts), ntohs(vb->target_seq)); - frame_containing_seq = fp; - goto end; - } - } - - if ((!oldest || htonl(oldest->ts) > htonl(fp->ts))) { - oldest = fp; - } - } - - if (!frame_containing_seq && vb->target_seq) { - if (ntohs(vb->target_seq) - ntohs(vb->last_target_seq) > MAX_MISSING_SEQ) { - vb_debug(vb, 1, "FOUND NO FRAMES CONTAINING SEQ %d. Too many failures....\n", ntohs(vb->target_seq)); - switch_vb_reset(vb, SWITCH_FALSE); - } else { - vb_debug(vb, 2, "FOUND NO FRAMES CONTAINING SEQ %d. Try next one\n", ntohs(vb->target_seq)); - increment_seq(vb); - vb->cur_errs++; - } - } - } while (!frame_containing_seq && vb->target_seq); - - end: - - if (frame_containing_seq) { - vb->cur_read_frame = frame_containing_seq; - if (nodep && *nodep) { - hide_node(*nodep); - set_read_seq(vb, (*nodep)->packet.header.seq); - } - } else if (oldest) { - vb->cur_read_frame = oldest; - } else { - vb->cur_read_frame = NULL; - } - - if (vb->cur_read_frame) { - return SWITCH_STATUS_SUCCESS; - } - - return SWITCH_STATUS_NOTFOUND; -} - -static inline switch_vb_node_t *frame_find_next_seq(switch_vb_frame_t *frame) -{ - switch_vb_node_t *np; - - for (np = frame->node_list; np; np = np->next) { - if (!np->visible) continue; - - if (ntohs(np->packet.header.seq) == ntohs(frame->parent->target_seq)) { - hide_node(np); - set_read_seq(frame->parent, np->packet.header.seq); - return np; - } - } - - return NULL; -} - - -static inline switch_vb_node_t *frame_find_lowest_seq(switch_vb_frame_t *frame) +static inline switch_vb_node_t *vb_find_lowest_seq(switch_vb_t *vb) { switch_vb_node_t *np, *lowest = NULL; - for (np = frame->node_list; np; np = np->next) { + for (np = vb->node_list; np; np = np->next) { if (!np->visible) continue; if (!lowest || ntohs(lowest->packet.header.seq) > ntohs(np->packet.header.seq)) { - hide_node(np); lowest = np; } } - if (lowest) { - set_read_seq(frame->parent, lowest->packet.header.seq); - } - return lowest; } -static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_t **nodep) +static inline switch_status_t vb_next_packet(switch_vb_t *vb, switch_vb_node_t **nodep) { - switch_vb_node_t *node = NULL; + switch_vb_node_t *np = NULL, *node = NULL; switch_status_t status; - if ((status = next_frame(vb, &node) != SWITCH_STATUS_SUCCESS)) { - return status; - } - - if (!node) { - if (vb->target_seq) { - vb_debug(vb, 2, "Search for next packet %u cur ts: %u\n", htons(vb->target_seq), htonl(vb->cur_read_frame->ts)); - node = frame_find_next_seq(vb->cur_read_frame); + if (np) status = 0, status++; + + if (!vb->target_seq) { + if ((node = vb_find_lowest_seq(vb))) { + vb_debug(vb, 2, "No target seq using seq: %u as a starting point\n", ntohs(node->packet.header.seq)); } else { - node = frame_find_lowest_seq(vb->cur_read_frame); - vb_debug(vb, 2, "Find lowest seq frame ts: %u seq: %u\n", ntohl(vb->cur_read_frame->ts), ntohs(node->packet.header.seq)); + vb_debug(vb, 1, "%s", "No nodes available....\n"); + } + } else if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) { + vb_debug(vb, 2, "FOUND desired seq: %u\n", ntohs(vb->target_seq)); + } else { + int x; + + vb_debug(vb, 2, "MISSING desired seq: %u\n", ntohs(vb->target_seq)); + + for (x = 0; x < 10; x++) { + increment_seq(vb); + if ((node = switch_core_inthash_find(vb->node_hash, vb->target_seq))) { + vb_debug(vb, 2, "FOUND incremental seq: %u\n", ntohs(vb->target_seq)); + break; + } else { + vb_debug(vb, 2, "MISSING incremental seq: %u\n", ntohs(vb->target_seq)); + } } } *nodep = node; if (node) { + set_read_seq(vb, node->packet.header.seq); return SWITCH_STATUS_SUCCESS; } @@ -434,9 +270,9 @@ static inline switch_status_t next_frame_packet(switch_vb_t *vb, switch_vb_node_ } -static inline void free_nodes(switch_vb_frame_t *frame) +static inline void free_nodes(switch_vb_t *vb) { - switch_vb_node_t *np = frame->node_list, *cur; + switch_vb_node_t *np = vb->node_list, *cur; while(np) { cur = np; @@ -444,32 +280,9 @@ static inline void free_nodes(switch_vb_frame_t *frame) free(cur); } - frame->node_list = NULL; + vb->node_list = NULL; } -static inline void free_frames(switch_vb_t *vb) -{ - switch_vb_frame_t *fp = vb->frame_list, *cur = NULL; - - while(fp) { - cur = fp; - fp = fp->next; - free_nodes(cur); - free(cur); - } - - vb->frame_list = NULL; -} - -static inline void do_flush(switch_vb_t *vb) -{ - switch_vb_frame_t *fp = vb->frame_list; - - while(fp) { - hide_frame(fp); - fp = fp->next; - } -} SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb) { @@ -490,21 +303,15 @@ SWITCH_DECLARE(void) switch_vb_reset(switch_vb_t *vb, switch_bool_t flush) { vb_debug(vb, 2, "RESET BUFFER flush: %d\n", (int)flush); - - if (vb->cur_read_frame) { - vb->cur_read_frame = NULL; - } - - vb->last_read_ts = 0; vb->last_target_seq = 0; vb->target_seq = 0; if (flush) { - do_flush(vb); + //do_flush(vb); } } -SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_bool_t timer_compensation) +SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len) { switch_vb_t *vb; switch_zmalloc(vb, sizeof(*vb)); @@ -512,10 +319,8 @@ SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min vb->min_frame_len = vb->frame_len = min_frame_len; vb->max_frame_len = max_frame_len; //vb->seq_out = (uint16_t) rand(); - - if (timer_compensation) { /* rewrite timestamps and seq as they are read to hide packet loss */ - switch_core_timer_init(&vb->timer, "soft", 1, 90, NULL); - } + switch_core_inthash_init(&vb->missing_seq_hash); + switch_core_inthash_init(&vb->node_hash); *vbp = vb; @@ -526,35 +331,105 @@ SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp) { switch_vb_t *vb = *vbp; *vbp = NULL; + + switch_core_inthash_destroy(&vb->missing_seq_hash); + switch_core_inthash_destroy(&vb->node_hash); - if (vb->timer.timer_interface) { - switch_core_timer_destroy(&vb->timer); + free_nodes(vb); + free(vb); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb) +{ + switch_hash_index_t *hi = NULL; + uint32_t nack = 0; + uint16_t least = 0; + int i = 0; + + void *val; + const void *var; + + for (hi = switch_core_hash_first(vb->missing_seq_hash); hi; hi = switch_core_hash_next(&hi)) { + uint16_t seq; + + switch_core_hash_this(hi, &var, NULL, &val); + seq = ntohs(*((uint16_t *) var)); + + vb_debug(vb, 3, "WTF ENTRY %u\n", seq); + + if (!least || seq < least) { + least = seq; + } } - free_frames(vb); - free(vb); + if (least && switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least))) { + vb_debug(vb, 3, "Found smallest NACKABLE seq %u\n", least); + nack = (uint32_t) htons(least); + + for (i = 1; i > 17; i++) { + if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least + i))) { + vb_debug(vb, 3, "Found addtl NACKABLE seq %u\n", least + i); + nack |= (1 << (16 + i)); + } else { + break; + } + } + } + + return nack; +} + +SWITCH_DECLARE(switch_status_t) switch_vb_push_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len) +{ + add_node(vb, packet, len); return SWITCH_STATUS_SUCCESS; } SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t len) { - switch_vb_frame_t *frame; + uint32_t i; + uint16_t want = ntohs(vb->next_seq), got = ntohs(packet->header.seq); + + if (!want) want = got; -#ifdef VB_PLOSS - int r = (rand() % 10000) + 1; - if (r <= 200) { - vb_debug(vb, 1, "Simulate dropped packet ......... ts: %u seq: %u\n", ntohl(packet->header.ts), ntohs(packet->header.seq)); - return SWITCH_STATUS_SUCCESS; - } -#endif - - if ((frame = new_frame(vb, packet))) { - add_node(frame, packet, len); - return SWITCH_STATUS_SUCCESS; + if (got > want) { + for (i = want; i < got; i++) { + vb_debug(vb, 2, "MARK SEQ MISSING %u\n", i); + switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)SWITCH_TRUE); + } + } else { + if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got))) { + vb_debug(vb, 2, "MARK SEQ FOUND %u\n", got); + } } - return SWITCH_STATUS_IGNORE; + if (got >= want) { + vb->next_seq = htons(ntohs(packet->header.seq) + 1); + } + + add_node(vb, packet, len); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len) +{ + switch_vb_node_t *node; + + if ((node = switch_core_inthash_find(vb->node_hash, seq))) { + vb_debug(vb, 2, "Found buffered seq: %u\n", ntohs(seq)); + *packet = node->packet; + *len = node->len; + memcpy(packet->body, node->packet.body, node->len); + return SWITCH_STATUS_SUCCESS; + } else { + vb_debug(vb, 2, "Missing buffered seq: %u\n", ntohs(seq)); + } + + return SWITCH_STATUS_NOTFOUND; } SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len) @@ -562,15 +437,32 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp switch_vb_node_t *node = NULL; switch_status_t status; - vb->cur_errs = 0; + vb_debug(vb, 2, "GET PACKET %u/%u n:%d\n", vb->complete_frames , vb->frame_len, vb->visible_nodes); if (vb->complete_frames < vb->frame_len) { vb_debug(vb, 2, "BUFFERING %u/%u\n", vb->complete_frames , vb->frame_len); return SWITCH_STATUS_MORE_DATA; } - if ((status = next_frame_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) { - vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(vb->cur_read_frame->ts), htons(node->packet.header.seq)); + if ((status = vb_next_packet(vb, &node)) == SWITCH_STATUS_SUCCESS) { + vb_debug(vb, 2, "Found next frame cur ts: %u seq: %u\n", htonl(node->packet.header.ts), htons(node->packet.header.seq)); + + if (!vb->read_init || ntohs(node->packet.header.seq) > ntohs(vb->highest_read_seq) || + (ntohs(vb->highest_read_seq) > USHRT_MAX - 10 && ntohs(node->packet.header.seq) <= 10) ) { + vb->highest_read_seq = node->packet.header.seq; + } + + if (vb->read_init && htons(node->packet.header.seq) >= htons(vb->highest_read_seq) && (ntohl(node->packet.header.ts) > ntohl(vb->highest_read_ts))) { + vb->complete_frames--; + vb_debug(vb, 2, "READ frame ts: %u complete=%u/%u n:%u\n", ntohl(node->packet.header.ts), vb->complete_frames , vb->frame_len, vb->visible_nodes); + vb->highest_read_ts = node->packet.header.ts; + } else if (!vb->read_init) { + vb->highest_read_ts = node->packet.header.ts; + } + + if (!vb->read_init) vb->read_init = 1; + + } else { switch_vb_reset(vb, SWITCH_FALSE); @@ -591,35 +483,11 @@ SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp *packet = node->packet; *len = node->len; memcpy(packet->body, node->packet.body, node->len); - - if (vb->cur_errs) { - vb_debug(vb, 1, "One or more Missing SEQ TS %u\n", ntohl(packet->header.ts)); - status = SWITCH_STATUS_BREAK; - } - - vb->last_read_ts = packet->header.ts; - vb->last_read_seq = packet->header.seq; - - if (vb->timer.timer_interface) { - if (packet->header.m || !vb->timer.samplecount) { - switch_core_timer_sync(&vb->timer); - } - } - - if (vb->cur_read_frame && vb->cur_read_frame->visible_nodes == 0 && !packet->header.m) { - /* force mark bit */ - vb_debug(vb, 1, "LAST PACKET %u WITH NO MARK BIT, ADDIONG MARK BIT\n", ntohl(packet->header.ts)); - packet->header.m = 1; - status = SWITCH_STATUS_BREAK; - } + hide_node(node); vb_debug(vb, 1, "GET packet ts:%u seq:%u~%u%s\n", ntohl(packet->header.ts), ntohs(packet->header.seq), vb->seq_out, packet->header.m ? " " : ""); //packet->header.seq = htons(vb->seq_out++); - if (vb->timer.timer_interface) { - packet->header.ts = htonl(vb->timer.samplecount); - } - return status; }