packet transmission reporting

This commit is contained in:
jchavanton 2023-08-25 19:54:29 +00:00 committed by Julien Chavanton
parent d91947f587
commit 907a957662
8 changed files with 222 additions and 4 deletions

View File

@ -170,6 +170,8 @@ struct switch_core_session {
switch_buffer_t *text_line_buffer;
switch_mutex_t *text_mutex;
const char *external_id;
packet_stats_t stats;
};
struct switch_media_bug {

View File

@ -90,6 +90,30 @@ typedef struct device_uuid_node_s {
struct device_uuid_node_s *next;
} switch_device_node_t;
typedef struct packet_stats_io_info {
const char* in_callid;
char* in_codec;
uint32_t in_ssrc;
switch_sockaddr_t *in_remote_addr;
switch_sockaddr_t *in_local_addr;
const char* out_callid;
char* out_codec;
uint32_t out_ssrc;
switch_sockaddr_t *out_remote_addr;
switch_sockaddr_t *out_local_addr;
uint32_t count; // count of packets going out
} packet_stats_io_info_t;
typedef struct packet_stats {
int max;
float average;
uint32_t in_count;
uint32_t in_plc;
uint32_t count;
packet_stats_io_info_t io_info;
switch_bool_t reported;
} packet_stats_t;
typedef struct switch_device_stats_s {
uint32_t total;
uint32_t total_in;
@ -255,6 +279,10 @@ static inline void *switch_must_realloc(void *_b, size_t _z)
///\{
SWITCH_DECLARE(void) switch_core_session_increment_read(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_session_increment_plc(switch_core_session_t *session);
SWITCH_DECLARE(void) packet_stats_print(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_session_set_io_stats(switch_core_session_t *session, packet_stats_io_info_t *packet_stats_io_info);
SWITCH_DECLARE(void) switch_core_screen_size(int *x, int *y);
SWITCH_DECLARE(void) switch_core_session_sched_heartbeat(switch_core_session_t *session, uint32_t seconds);
SWITCH_DECLARE(void) switch_core_session_unsched_heartbeat(switch_core_session_t *session);

View File

@ -87,6 +87,7 @@ typedef struct switch_frame_geometry {
payload_map_t *pmap;
switch_image_t *img;
struct switch_frame_geometry geometry;
switch_time_t received_ts;
};
SWITCH_END_EXTERN_C

View File

@ -57,6 +57,7 @@ typedef struct {
char body[SWITCH_RTP_MAX_BUF_LEN+4+sizeof(char *)];
switch_rtp_hdr_ext_t *ext;
char *ebody;
switch_time_t received_ts;
} switch_rtp_packet_t;
typedef enum {

View File

@ -35,6 +35,7 @@
#include <switch_stun.h>
#include <switch_nat.h>
#include "private/switch_core_pvt.h"
#include <fspr_network_io.h>
#include <switch_curl.h>
#include <errno.h>
#include <sofia-sip/sdp.h>
@ -274,6 +275,71 @@ struct switch_media_handle_s {
};
void packet_stats_init(packet_stats_t *stats, int latency, int count) {
stats->max = latency;
stats->average = latency;
stats->count = count;
}
#define _VOR1(v) ((v)?(v):1)
void packet_stats_update(packet_stats_t *stats, int latency)
{
if (stats->count >= UINT32_MAX)
return;
stats->count++;
if (stats->count == 1)
packet_stats_init(stats, latency, 1);
if (stats->max < latency)
stats->max = latency;
if (stats->count > 1) {
float delta;
delta = latency - stats->average;
stats->average += delta/_VOR1(stats->count);
}
}
void packet_stats_print(switch_core_session_t *session) {
if (session->stats.io_info.in_remote_addr && session->stats.io_info.out_local_addr
&& !session->stats.reported) {
char in_ipbuf[48];
char in_l_ipbuf[48];
char out_ipbuf[48];
char out_l_ipbuf[48];
const char *v=NULL;
switch_channel_set_variable_printf(session->channel, "packet_stats_report",
"{"
"\"in\" : { \"ssrc\": \"0x%08X\", \"remote_socket\": \"%s:%u\", \"local_socket\": \"%s:%u\""
", \"codec\": \"%s\", \"count\": %u, \"plc\": %u}"
","
"\"out\" : { \"ssrc\": \"0x%08X\", \"remote_socket\": \"%s:%u\", \"local_socket\": \"%s:%u\""
", \"codec\": \"%s\", \"count\": %u, \"max\": %d, \"avg\": %.2f }}",
session->stats.io_info.in_ssrc,
switch_get_addr(in_ipbuf, sizeof(in_ipbuf), session->stats.io_info.in_remote_addr),
session->stats.io_info.in_remote_addr->port,
switch_get_addr(in_l_ipbuf, sizeof(in_ipbuf), session->stats.io_info.in_local_addr),
session->stats.io_info.in_local_addr->port,
session->stats.io_info.in_codec,
session->stats.in_count,
session->stats.in_plc,
session->stats.io_info.out_ssrc,
switch_get_addr(out_ipbuf, sizeof(out_ipbuf), session->stats.io_info.out_remote_addr),
session->stats.io_info.out_local_addr->port,
switch_get_addr(out_l_ipbuf, sizeof(out_ipbuf), session->stats.io_info.out_local_addr),
session->stats.io_info.out_remote_addr->port,
session->stats.io_info.out_codec,
session->stats.count,
session->stats.max,
session->stats.average
);
v = switch_channel_get_variable_dup(session->channel,"packet_stats_report", SWITCH_FALSE, -1);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "[packet_stats_report] %s\n", v);
session->stats.reported = true;
}
}
switch_srtp_crypto_suite_t SUITES[CRYPTO_INVALID] = {
{ "AEAD_AES_256_GCM_8", "", AEAD_AES_256_GCM_8, 44, 12},
{ "AEAD_AES_256_GCM", "", AEAD_AES_256_GCM, 44, 12},
@ -15763,7 +15829,6 @@ SWITCH_DECLARE(switch_msrp_session_t *) switch_core_media_get_msrp_session(switc
return session->media_handle->msrp_session;
}
SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_session_t *session, switch_frame_t *frame, switch_io_flag_t flags,
int stream_id)
{
@ -16122,6 +16187,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
goto done;
}
write_frame->received_ts = frame->received_ts;
if (session->write_codec) {
if (!ptime_mismatch && write_frame->codec && write_frame->codec->implementation &&
write_frame->codec->implementation->decoded_bytes_per_packet == session->write_impl.decoded_bytes_per_packet) {
@ -16397,6 +16463,16 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_write_frame(switch_core_sess
}
error:
if (frame->received_ts > 0) {
int64_t d = (switch_micro_time_now() - frame->received_ts)/1000;
if (d > 2000) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "excessive delay[%ld]-[%ld]=[%ldms] seq[%u]ssrc[0x%08X]\n",
(int64_t)(switch_micro_time_now()/1000), (int64_t)(frame->received_ts/1000), d, ntohs(frame->seq), frame->ssrc);
}
packet_stats_update(&session->stats, d);
}
session->stats.io_info.out_codec = write_frame->codec->implementation->iananame;
session->stats.io_info.in_codec = frame->codec->implementation->iananame;
switch_mutex_unlock(session->write_codec->mutex);
switch_mutex_unlock(frame->codec->mutex);

View File

@ -36,11 +36,43 @@
#include "switch.h"
#include "switch_core.h"
#include "private/switch_core_pvt.h"
#include <fspr_network_io.h>
#define DEBUG_THREAD_POOL
struct switch_session_manager session_manager;
SWITCH_DECLARE(void) switch_core_session_increment_plc(switch_core_session_t *session)
{
session->stats.in_plc++;
}
SWITCH_DECLARE(void) switch_core_session_increment_read(switch_core_session_t *session)
{
session->stats.in_count++;
}
SWITCH_DECLARE(void) switch_core_session_set_io_stats(switch_core_session_t *session, packet_stats_io_info_t *packet_stats_io_info)
{
if ((session->stats.io_info.out_ssrc != 0 && session->stats.io_info.out_ssrc != packet_stats_io_info->out_ssrc)
|| (session->stats.io_info.in_ssrc != 0 && session->stats.io_info.in_ssrc != packet_stats_io_info->in_ssrc)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "SSRC change[%u][%u] [%u][%u]\n",
session->stats.io_info.out_ssrc, packet_stats_io_info->out_ssrc,
session->stats.io_info.in_ssrc, packet_stats_io_info->in_ssrc
);
packet_stats_print(session);
memset(&session->stats, '\0', sizeof(packet_stats_t));
}
session->stats.io_info.out_remote_addr = packet_stats_io_info->out_remote_addr;
session->stats.io_info.out_local_addr = packet_stats_io_info->out_local_addr;
session->stats.io_info.out_ssrc = packet_stats_io_info->out_ssrc;
session->stats.io_info.out_callid = packet_stats_io_info->out_callid;
session->stats.io_info.in_remote_addr = packet_stats_io_info->in_remote_addr;
session->stats.io_info.in_local_addr = packet_stats_io_info->in_local_addr;
session->stats.io_info.in_ssrc = packet_stats_io_info->in_ssrc;
session->stats.io_info.in_callid = packet_stats_io_info->in_callid;
}
SWITCH_DECLARE(void) switch_core_session_set_dmachine(switch_core_session_t *session, switch_ivr_dmachine_t *dmachine, switch_digit_action_target_t target)
{
int i = (int) target;
@ -1493,6 +1525,7 @@ SWITCH_DECLARE(void) switch_core_session_signal_state_change(switch_core_session
}
}
}
packet_stats_print(session);
switch_core_session_kill_channel(session, SWITCH_SIG_BREAK);
}

View File

@ -814,6 +814,14 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
continue;
}
if (switch_test_flag((read_frame), SFF_PLC)) {
read_frame->received_ts = 0;
switch_core_session_increment_plc(session_b);
} else {
if (read_frame->packetlen > 0)
switch_core_session_increment_read(session_b);
}
if (status != SWITCH_STATUS_BREAK && !switch_channel_test_flag(chan_a, CF_HOLD) && !switch_channel_test_flag(chan_b, CF_LEG_HOLDING)) {
if (switch_core_session_write_frame(session_b, read_frame, SWITCH_IO_FLAG_NONE, stream_id) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session_a), SWITCH_LOG_DEBUG,

View File

@ -113,6 +113,7 @@ typedef struct {
char body[SWITCH_RTP_MAX_BUF_LEN+4+sizeof(char *)];
switch_rtp_hdr_ext_t *ext;
char *ebody;
switch_time_t received_ts;
} rtp_msg_t;
#define RTP_BODY(_s) (char *) (_s->recv_msg.ebody ? _s->recv_msg.ebody : _s->recv_msg.body)
@ -483,6 +484,7 @@ struct switch_rtp {
uint32_t last_max_vb_frames;
int skip_timer;
uint32_t prev_nacks_inflight;
switch_bool_t packet_stats_io_info_set;
};
struct switch_rtcp_report_block {
@ -4267,11 +4269,32 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_change_interval(switch_rtp_t *rtp_ses
return status;
}
static void reset_packet_stats_io_info(switch_rtp_t *rtp_session) {
switch_channel_t *channel = switch_core_session_get_channel(rtp_session->session);
const char *uuid=NULL;
switch_core_session_t *b_session=NULL;
switch_rtp_t *b_rtp_session=NULL;
rtp_session->packet_stats_io_info_set = SWITCH_FALSE;
if (channel) {
uuid = switch_channel_get_variable_dup(channel,"bridge_uuid", SWITCH_FALSE, -1);
}
if (uuid) {
b_session = switch_core_session_locate(uuid);
}
if (b_session) {
b_rtp_session = switch_core_media_get_rtp_session(b_session, SWITCH_MEDIA_TYPE_AUDIO);
b_rtp_session->packet_stats_io_info_set = SWITCH_FALSE;
switch_core_session_rwunlock(b_session);
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_INFO, ">>>> PACKET STATS IO INFO UNSET <<<<\n");
}
SWITCH_DECLARE(switch_status_t) switch_rtp_set_ssrc(switch_rtp_t *rtp_session, uint32_t ssrc)
{
rtp_session->ssrc = ssrc;
rtp_session->send_msg.header.ssrc = htonl(rtp_session->ssrc);
reset_packet_stats_io_info(rtp_session);
return SWITCH_STATUS_SUCCESS;
}
@ -4279,6 +4302,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_remote_ssrc(switch_rtp_t *rtp_ses
{
rtp_session->remote_ssrc = ssrc;
rtp_session->flags[SWITCH_RTP_FLAG_DETECT_SSRC] = 0;
reset_packet_stats_io_info(rtp_session);
return SWITCH_STATUS_SUCCESS;
}
@ -5651,6 +5675,46 @@ static int get_recv_payload(switch_rtp_t *rtp_session)
return r;
}
static void set_packet_stats_io_info(switch_rtp_t *rtp_session) {
switch_channel_t *channel = switch_core_session_get_channel(rtp_session->session);
const char *uuid=NULL;
switch_core_session_t *b_session=NULL;
switch_rtp_t *b_rtp_session=NULL;
switch_channel_t *b_channel=NULL;
if (rtp_session->packet_stats_io_info_set)
return;
if (channel) {
uuid = switch_channel_get_variable_dup(channel,"bridge_uuid", SWITCH_FALSE, -1);
}
if (uuid) {
b_session = switch_core_session_locate(uuid);
}
if (b_session) {
b_rtp_session = switch_core_media_get_rtp_session(b_session, SWITCH_MEDIA_TYPE_AUDIO);
b_channel = switch_core_session_get_channel(b_session);
switch_core_session_rwunlock(b_session);
if (b_rtp_session && b_channel) {
packet_stats_io_info_t packet_stats_io_info;
packet_stats_io_info.out_ssrc = b_rtp_session->ssrc;
packet_stats_io_info.out_codec = '\0';
packet_stats_io_info.out_remote_addr = b_rtp_session->remote_addr;
packet_stats_io_info.out_local_addr = b_rtp_session->local_addr;
packet_stats_io_info.in_ssrc = rtp_session->remote_ssrc;
packet_stats_io_info.in_codec = '\0';
packet_stats_io_info.in_remote_addr = rtp_session->remote_addr;
packet_stats_io_info.in_local_addr = rtp_session->local_addr;
if (rtp_session->remote_ssrc && b_rtp_session->ssrc)
rtp_session->packet_stats_io_info_set = SWITCH_TRUE;
switch_core_session_set_io_stats(b_rtp_session->session, &packet_stats_io_info);
}
}
}
#define return_cng_frame() do_cng = 1; goto timer_check
static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t *bytes, switch_frame_flag_t *flags,
@ -5719,6 +5783,9 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
if (poll_status == SWITCH_STATUS_SUCCESS) {
status = switch_socket_recvfrom(rtp_session->from_addr, rtp_session->sock_input, 0, (void *) &rtp_session->recv_msg, bytes);
if (*bytes) {
rtp_session->recv_msg.received_ts = switch_micro_time_now();
}
} else {
*bytes = 0;
}
@ -6357,7 +6424,7 @@ static switch_status_t read_rtp_packet(switch_rtp_t *rtp_session, switch_size_t
}
}
}
set_packet_stats_io_info(rtp_session);
return status;
}
@ -7891,6 +7958,8 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_zerocopy_read_frame(switch_rtp_t *rtp
switch_set_flag(frame, SFF_RFC2833);
}
frame->timestamp = ntohl(rtp_session->last_rtp_hdr.ts);
frame->received_ts = rtp_session->recv_msg.received_ts;
frame->seq = (uint16_t) ntohs((uint16_t) rtp_session->last_rtp_hdr.seq);
frame->ssrc = ntohl(rtp_session->last_rtp_hdr.ssrc);
frame->m = rtp_session->last_rtp_hdr.m ? SWITCH_TRUE : SWITCH_FALSE;
@ -8298,7 +8367,7 @@ static int rtp_common_write(switch_rtp_t *rtp_session,
rtp_session->seq += delta;
send_msg->header.seq = htons(rtp_session->seq);
if (rtp_session->flags[SWITCH_RTP_FLAG_BYTESWAP] && send_msg->header.pt == rtp_session->payload) {
switch_swap_linear((int16_t *)send_msg->body, (int) datalen);
}