diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 7963be03a0..172c4a4ffc 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -44,6 +44,10 @@ #include #include +#define READ_INC(rtp_session) switch_mutex_lock(rtp_session->read_mutex); rtp_session->reading++ +#define READ_DEC(rtp_session) switch_mutex_unlock(rtp_session->read_mutex); rtp_session->reading-- +#define WRITE_INC(rtp_session) switch_mutex_lock(rtp_session->write_mutex); rtp_session->writing++ +#define WRITE_DEC(rtp_session) switch_mutex_unlock(rtp_session->write_mutex); rtp_session->writing-- #include "stfu.h" @@ -167,6 +171,8 @@ struct switch_rtp { switch_payload_t te; switch_payload_t cng_pt; switch_mutex_t *flag_mutex; + switch_mutex_t *read_mutex; + switch_mutex_t *write_mutex; switch_timer_t timer; uint8_t ready; uint8_t cn; @@ -176,6 +182,8 @@ struct switch_rtp { uint32_t missed_count; rtp_msg_t write_msg; switch_rtp_crypto_key_t *crypto_keys[SWITCH_RTP_CRYPTO_MAX]; + int reading; + int writing; }; static int global_init = 0; @@ -194,13 +202,16 @@ static switch_status_t ice_out(switch_rtp_t *rtp_session) switch_stun_packet_t *packet; unsigned int elapsed; switch_size_t bytes; + switch_status_t status = SWITCH_STATUS_SUCCESS; + WRITE_INC(rtp_session); + switch_assert(rtp_session != NULL); switch_assert(rtp_session->ice_user != NULL); if (rtp_session->stuncount != 0) { rtp_session->stuncount--; - return SWITCH_STATUS_SUCCESS; + goto end; } if (rtp_session->last_stun) { @@ -208,7 +219,8 @@ static switch_status_t ice_out(switch_rtp_t *rtp_session) if (elapsed > 30000) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "No stun for a long time (PUNT!)\n"); - return SWITCH_STATUS_FALSE; + status = SWITCH_STATUS_FALSE; + goto end; } } @@ -218,6 +230,10 @@ static switch_status_t ice_out(switch_rtp_t *rtp_session) switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) packet, &bytes); rtp_session->stuncount = 25; + end: + WRITE_DEC(rtp_session); + + return SWITCH_STATUS_SUCCESS; } @@ -228,6 +244,10 @@ static void handle_ice(switch_rtp_t *rtp_session, void *data, switch_size_t len) char username[33] = { 0 }; unsigned char buf[512] = { 0 }; switch_size_t cpylen = len; + + + READ_INC(rtp_session); + WRITE_INC(rtp_session); if (cpylen > 512) { cpylen = 512; @@ -271,6 +291,9 @@ static void handle_ice(switch_rtp_t *rtp_session, void *data, switch_size_t len) bytes = switch_stun_packet_length(rpacket); switch_socket_sendto(rtp_session->sock, rtp_session->from_addr, 0, (void *) rpacket, &bytes); } + + READ_DEC(rtp_session); + WRITE_DEC(rtp_session); } @@ -383,6 +406,9 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s int x; #endif + WRITE_INC(rtp_session); + READ_INC(rtp_session); + *err = NULL; if (switch_sockaddr_info_get(&rtp_session->local_addr, host, SWITCH_UNSPEC, port, 0, rtp_session->pool) != SWITCH_STATUS_SUCCESS) { @@ -460,6 +486,9 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s if (old_sock) { switch_socket_close(old_sock); } + + WRITE_DEC(rtp_session); + READ_DEC(rtp_session); return status; } @@ -609,8 +638,11 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session rtp_session->pool = pool; rtp_session->te = 101; + rtp_session->ready = 1; switch_mutex_init(&rtp_session->flag_mutex, SWITCH_MUTEX_NESTED, pool); + switch_mutex_init(&rtp_session->read_mutex, SWITCH_MUTEX_NESTED, pool); + switch_mutex_init(&rtp_session->write_mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&rtp_session->dtmf_data.dtmf_mutex, SWITCH_MUTEX_NESTED, pool); switch_queue_create(&rtp_session->dtmf_data.dtmf_queue, 100, rtp_session->pool); switch_queue_create(&rtp_session->dtmf_data.dtmf_inqueue, 100, rtp_session->pool); @@ -703,7 +735,7 @@ SWITCH_DECLARE(switch_rtp_t *) switch_rtp_new(const char *rx_host, end: if (rtp_session) { - rtp_session->ready = 1; + rtp_session->ready = 2; rtp_session->rx_host = switch_core_strdup(rtp_session->pool, rx_host); rtp_session->rx_port = rx_port; } else { @@ -755,27 +787,42 @@ SWITCH_DECLARE(void) switch_rtp_kill_socket(switch_rtp_t *rtp_session) switch_assert(rtp_session != NULL); switch_mutex_lock(rtp_session->flag_mutex); if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO)) { + switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_IO); switch_assert(rtp_session->sock != NULL); switch_socket_shutdown(rtp_session->sock, SWITCH_SHUTDOWN_READWRITE); - switch_clear_flag(rtp_session, SWITCH_RTP_FLAG_IO); } switch_mutex_unlock(rtp_session->flag_mutex); } SWITCH_DECLARE(uint8_t) switch_rtp_ready(switch_rtp_t *rtp_session) { - return (rtp_session != NULL && rtp_session->sock && rtp_session->ready) ? 1 : 0; + return (rtp_session != NULL && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) && rtp_session->sock && rtp_session->ready == 2) ? 1 : 0; } SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session) { void *pop; switch_socket_t *sock; + int sanity = 0; - if (!switch_rtp_ready(*rtp_session)) { + switch_mutex_lock((*rtp_session)->flag_mutex); + + if (!rtp_session || !*rtp_session || !(*rtp_session)->ready) { return; } + (*rtp_session)->ready = 0; + + + while((*rtp_session)->reading || (*rtp_session)->writing) { + switch_yield(10000); + if (++sanity > 1000) { + break; + } + } + + switch_rtp_kill_socket(*rtp_session); + while(switch_queue_trypop((*rtp_session)->dtmf_data.dtmf_inqueue, &pop) == SWITCH_STATUS_SUCCESS) { free(pop); } @@ -783,22 +830,16 @@ SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session) while(switch_queue_trypop((*rtp_session)->dtmf_data.dtmf_queue, &pop) == SWITCH_STATUS_SUCCESS) { free(pop); } - - (*rtp_session)->ready = 0; - - switch_mutex_lock((*rtp_session)->flag_mutex); if ((*rtp_session)->jb) { stfu_n_destroy(&(*rtp_session)->jb); } - switch_rtp_kill_socket(*rtp_session); sock = (*rtp_session)->sock; (*rtp_session)->sock = NULL; switch_socket_close(sock); - if (switch_test_flag((*rtp_session), SWITCH_RTP_FLAG_VAD)) { switch_rtp_disable_vad(*rtp_session); } @@ -824,8 +865,8 @@ SWITCH_DECLARE(void) switch_rtp_destroy(switch_rtp_t **rtp_session) } switch_rtp_release_port((*rtp_session)->rx_host, (*rtp_session)->rx_port); - switch_mutex_unlock((*rtp_session)->flag_mutex); + return; } @@ -993,11 +1034,14 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_status_t status; uint8_t check = 1; stfu_frame_t *jb_frame; + int ret = -1; if (!rtp_session->timer.interval) { rtp_session->last_time = switch_time_now(); } + READ_INC(rtp_session); + while (switch_rtp_ready(rtp_session)) { bytes = sizeof(rtp_msg_t); status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock, 0, (void *) &rtp_session->recv_msg, &bytes); @@ -1006,10 +1050,6 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_core_timer_step(&rtp_session->timer); } - if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO)) { - return -1; - } - if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_SECURE_RECV)) { int sbytes = (int) bytes; err_status_t stat = 0; @@ -1020,7 +1060,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ rtp_session->recv_ctx = NULL; if ((stat = srtp_create(&rtp_session->recv_ctx, &rtp_session->recv_policy))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP RECV\n"); - return -1; + ret = -1; + goto end; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP RECV\n"); rtp_session->srtp_errs = 0; @@ -1035,7 +1076,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "error: srtp unprotection failed with code %d%s\n", stat, stat == err_status_replay_fail ? " (replay check failed)" : stat == err_status_auth_fail ? " (auth check failed)" : ""); - return -1; + ret = -1; + goto end; } else { sbytes = 0; } @@ -1075,11 +1117,13 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ *flags |= SFF_CNG; /* Return a CNG frame */ *payload_type = SWITCH_RTP_CNG_PAYLOAD; - return 2 + rtp_header_len; + ret = 2 + rtp_header_len; + goto end; } if (bytes < 0) { - return (int) bytes; + ret = (int) bytes; + goto end; } if (rtp_session->timer.interval) { @@ -1101,7 +1145,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ do_2833(rtp_session); if (!bytes && rtp_session->max_missed_packets) { if (++rtp_session->missed_count >= rtp_session->max_missed_packets) { - return -2; + ret = -2; + goto end; } } @@ -1126,7 +1171,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ rtp_session->recv_msg.header.pt = (uint32_t) rtp_session->cng_pt ? rtp_session->cng_pt : SWITCH_RTP_CNG_PAYLOAD; *flags |= SFF_CNG; *payload_type = (switch_payload_t)rtp_session->recv_msg.header.pt; - return 2 + rtp_header_len; + ret = 2 + rtp_header_len; + goto end; } } @@ -1146,7 +1192,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ rtp_session->recv_msg.header.pt = (uint32_t) rtp_session->cng_pt ? rtp_session->cng_pt : SWITCH_RTP_CNG_PAYLOAD; *flags |= SFF_CNG; *payload_type = (switch_payload_t)rtp_session->recv_msg.header.pt; - return 2 + rtp_header_len; + ret = 2 + rtp_header_len; + goto end; } if (bytes > 0) { @@ -1157,7 +1204,8 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ if (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_DATAWAIT)) { goto do_continue; } - return 0; + ret = 0; + goto end; } if (bytes && switch_test_flag(rtp_session, SWITCH_RTP_FLAG_AUTOADJ) && switch_sockaddr_get_port(rtp_session->from_addr)) { @@ -1255,7 +1303,12 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ do_continue: - switch_yield(1000); + if (rtp_session->ms_per_packet) { + switch_yield((rtp_session->ms_per_packet / 1000) * 750); + } else { + switch_yield(1000); + } + } *payload_type = (switch_payload_t) rtp_session->recv_msg.header.pt; @@ -1268,7 +1321,13 @@ static int rtp_common_read(switch_rtp_t *rtp_session, switch_payload_t *payload_ do_2833(rtp_session); } - return (int) bytes; + ret = (int) bytes; + + end: + + READ_DEC(rtp_session); + + return ret; } SWITCH_DECLARE(switch_size_t) switch_rtp_has_dtmf(switch_rtp_t *rtp_session) @@ -1453,11 +1512,14 @@ static int rtp_common_write(switch_rtp_t *rtp_session, switch_size_t bytes; uint8_t send = 1; uint32_t this_ts = 0; + int ret; if (!switch_rtp_ready(rtp_session)) { return SWITCH_STATUS_FALSE; } + WRITE_INC(rtp_session); + if (send_msg) { bytes = datalen; if (flags && *flags & SFF_RFC2833) { @@ -1600,7 +1662,8 @@ static int rtp_common_write(switch_rtp_t *rtp_session, } } } else { - return SWITCH_STATUS_GENERR; + ret = -1; + goto end; } } @@ -1625,7 +1688,8 @@ static int rtp_common_write(switch_rtp_t *rtp_session, rtp_session->send_ctx = NULL; if ((stat = srtp_create(&rtp_session->send_ctx, &rtp_session->send_policy))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n"); - return -1; + ret = -1; + goto end; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n"); } @@ -1642,7 +1706,8 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) { rtp_session->seq--; - return -1; + ret = -1; + goto end; } if (rtp_session->timer.interval) { @@ -1654,11 +1719,19 @@ static int rtp_common_write(switch_rtp_t *rtp_session, if (rtp_session->ice_user) { if (ice_out(rtp_session) != SWITCH_STATUS_SUCCESS) { - return -1; + ret = -1; + goto end; } } - return (int) bytes; + ret = (int) bytes; + + end: + + WRITE_DEC(rtp_session); + + return ret; + } @@ -1729,16 +1802,12 @@ SWITCH_DECLARE(int) switch_rtp_write_frame(switch_rtp_t *rtp_session, switch_fra switch_payload_t payload; rtp_msg_t *send_msg = NULL; - if (!switch_rtp_ready(rtp_session)) { + if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr) { return -1; } fwd = (switch_test_flag(rtp_session, SWITCH_RTP_FLAG_RAW_WRITE) && switch_test_flag(frame, SFF_RAW_RTP)) ? 1 : 0; - if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) || !rtp_session->remote_addr) { - return -1; - } - switch_assert(frame != NULL); if (switch_test_flag(frame, SFF_CNG)) { @@ -1771,18 +1840,13 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, uint8_t m, switch_payload_t payload, uint32_t ts, switch_frame_flag_t *flags) { switch_size_t bytes; + int ret = -1; - if (!switch_rtp_ready(rtp_session)) { + if (!switch_rtp_ready(rtp_session) || !rtp_session->remote_addr || datalen > SWITCH_RTP_MAX_BUF_LEN) { return -1; } - if (!switch_test_flag(rtp_session, SWITCH_RTP_FLAG_IO) || !rtp_session->remote_addr) { - return -1; - } - - if (datalen > SWITCH_RTP_MAX_BUF_LEN) { - return -1; - } + WRITE_INC(rtp_session); rtp_session->write_msg = rtp_session->send_msg; rtp_session->write_msg.header.seq = htons(++rtp_session->seq); @@ -1803,7 +1867,8 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, rtp_session->send_ctx = NULL; if ((stat = srtp_create(&rtp_session->send_ctx, &rtp_session->send_policy))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error! RE-Activating Secure RTP SEND\n"); - return -1; + ret = -1; + goto end; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "RE-Activating Secure RTP SEND\n"); } @@ -1818,12 +1883,20 @@ SWITCH_DECLARE(int) switch_rtp_write_manual(switch_rtp_t *rtp_session, if (switch_socket_sendto(rtp_session->sock, rtp_session->remote_addr, 0, (void *) &rtp_session->write_msg, &bytes) != SWITCH_STATUS_SUCCESS) { rtp_session->seq--; - return -1; + ret = -1; + goto end; } rtp_session->last_write_ts = ts; - return (int) bytes; + ret = (int) bytes; + + end: + + WRITE_DEC(rtp_session); + + return ret; + } SWITCH_DECLARE(uint32_t) switch_rtp_get_ssrc(switch_rtp_t *rtp_session)