From f4431128a0b1d75e86a95fd4814133cd4e96fb64 Mon Sep 17 00:00:00 2001 From: Dragos Oancea Date: Tue, 19 Dec 2023 02:32:42 +0200 Subject: [PATCH] [mod_event_socket] Add event stats, introduce FS CLI commands event_socket_debug, event_socket_queue_size, event_socket_show_stats. --- .../mod_event_socket/mod_event_socket.c | 125 +++++++++++++++++- 1 file changed, 120 insertions(+), 5 deletions(-) diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index 520bd92ac7..43f4ed8866 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -32,8 +32,10 @@ */ #include #define CMD_BUFLEN 1024 * 1000 -#define MAX_QUEUE_LEN 100000 +#define MAX_QUEUE_LEN SWITCH_CORE_QUEUE_LEN /*100000*/ #define MAX_MISSED 500 +#define QUEUE_SMOOTH_LIMIT 5000 + SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown); SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime); @@ -87,6 +89,8 @@ struct listener { switch_core_session_t *session; int lost_events; int lost_logs; + uint64_t total_logs; + uint64_t event_stats[SWITCH_EVENT_ALL + 1]; time_t last_flush; time_t expire_time; uint32_t timeout; @@ -108,6 +112,7 @@ static struct { switch_mutex_t *listener_mutex; switch_event_node_t *node; int debug; + uint8_t enable_stats; } globals; static struct { @@ -176,6 +181,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { switch_log_node_t *dnode = switch_log_node_dup(node); qstatus = switch_queue_trypush(l->log_queue, dnode); + l->total_logs++; if (qstatus == SWITCH_STATUS_SUCCESS) { if (l->lost_logs) { int ll = l->lost_logs; @@ -225,6 +231,26 @@ static void flush_listener(listener_t *listener, switch_bool_t flush_log, switch } } +static void count_events(listener_t *l, switch_event_t *event) +{ + l->event_stats[SWITCH_EVENT_ALL]++; + l->event_stats[event->event_id]++; +} + +static void print_event_stats(listener_t *l, switch_log_level_t level) +{ + int i; + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), level, + "Current Event Queue size: [%u/%u] Total events / session: [%lu]\n", + switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]); + + for (i = 0; i < SWITCH_EVENT_ALL; i++) { + if ((l->event_stats[i]) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), level, "Event name: [%s] Count: [%lu]", switch_event_name(i), (unsigned long)l->event_stats[i]); + } + } +} + static switch_status_t expire_listener(listener_t ** listener) { listener_t *l; @@ -384,21 +410,37 @@ static void event_handler(switch_event_t *event) if (send) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { - qstatus = switch_queue_trypush(l->event_queue, clone); + qstatus = switch_queue_trypush(l->event_queue, clone); + if (globals.enable_stats) { + count_events(l, event); + } + if (qstatus == SWITCH_STATUS_SUCCESS) { if (l->lost_events) { int le = l->lost_events; l->lost_events = 0; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u] Total events / session: [%lu]\n", + le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]); + } + + if (switch_queue_size(l->event_queue) > MAX_QUEUE_LEN/2) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_WARNING, "Event Queue surpassed its half length [%u/%u]\n", + switch_queue_size(l->event_queue), MAX_QUEUE_LEN); + } + + if (switch_queue_size(l->event_queue) > QUEUE_SMOOTH_LIMIT) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_WARNING, + "Event Queue over smooth limit [%u/%u] We're over uncharted territory now!\n", switch_queue_size(l->event_queue), MAX_QUEUE_LEN); } } else { char errbuf[512] = {0}; unsigned int qsize = switch_queue_size(l->event_queue); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n", - (int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":""); + (int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN) ? "Max queue size reached":""); if (++l->lost_events > MAX_MISSED) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events! Lost [%d] Queue size[%u/%u] Total events / session: [%lu]\n", + l->lost_events, qsize, MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]); kill_listener(l, "killed listener because of lost events\n"); } switch_event_destroy(&clone); @@ -634,6 +676,13 @@ static void send_disconnect(listener_t *listener, const char *message) char disco_buf[512] = ""; switch_size_t len, mlen; + if (globals.enable_stats) { + if (!zstr(listener->remote_ip)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_INFO, "Event stats for %s:%d\n", listener->remote_ip, listener->remote_port); + } + print_event_stats(listener, SWITCH_LOG_INFO); + } + if (zstr(message)) { message = "Disconnected.\n"; } @@ -1176,6 +1225,67 @@ SWITCH_STANDARD_API(event_sink_function) return SWITCH_STATUS_SUCCESS; } +#define EVENT_SOCKET_DEBUG_SYNTAX "" + +SWITCH_STANDARD_API(event_socket_debug) +{ + if (zstr(cmd)) { + stream->write_function(stream, "-USAGE: %s\n", EVENT_SOCKET_DEBUG_SYNTAX); + } else { + if (!strcasecmp(cmd, "on")) { + globals.debug = 1; + stream->write_function(stream, "Event Socket Debug: on\n"); + } else if (!strcasecmp(cmd, "off")) { + globals.debug = 0; + stream->write_function(stream, "Event Socket Debug: off\n"); + } else { + stream->write_function(stream, "-USAGE: %s\n", EVENT_SOCKET_DEBUG_SYNTAX); + } + } + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_STANDARD_API(event_socket_q_size) +{ + listener_t *l; + + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + stream->write_function(stream, + "Current Event Queue [%u/%u] for Session %d %s:%d \n", switch_queue_size(l->event_queue), MAX_QUEUE_LEN, l->id, l->remote_ip, l->remote_port); + } + switch_mutex_unlock(globals.listener_mutex); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_STANDARD_API(event_socket_show_stats) +{ + listener_t *l; + int i; + + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + if (!zstr(l->remote_ip)) { + stream->write_function(stream, + "Event Stats for Session %d %s:%d \n", l->id, l->remote_ip, l->remote_port); + } + + stream->write_function(stream, + "Current Event Queue size: [%u/%u] Total events / session: [%lu]\n", + switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]); + + for (i = 0; i < SWITCH_EVENT_ALL; i++) { + if (l->event_stats[i] != 0) { + stream->write_function(stream, "Event name: [%s] Count: [%lu]\n", switch_event_name(i), l->event_stats[i]); + } + } + } + switch_mutex_unlock(globals.listener_mutex); + + return SWITCH_STATUS_SUCCESS; +} SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) { @@ -1200,6 +1310,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) *module_interface = switch_loadable_module_create_module_interface(pool, modname); SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "[:]", SAF_SUPPORT_NOMEDIA); SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, ""); + SWITCH_ADD_API(api_interface, "event_socket_debug", "Set Event Socket Debug", event_socket_debug, EVENT_SOCKET_DEBUG_SYNTAX); + SWITCH_ADD_API(api_interface, "event_socket_queue_size", "Show Event Socket current queue size", event_socket_q_size, NULL); + SWITCH_ADD_API(api_interface, "event_socket_show_stats", "Show Event Socket current stats", event_socket_show_stats, NULL); /* indicate that the module should continue to be loaded */ return SWITCH_STATUS_SUCCESS; @@ -2887,6 +3000,8 @@ static int config(void) set_pref_ip(val); } else if (!strcmp(var, "debug")) { globals.debug = atoi(val); + } else if (!strcmp(var, "enable_stats")) { + globals.enable_stats = atoi(val); } else if (!strcmp(var, "nat-map")) { if (switch_true(val) && switch_nat_get_type()) { prefs.nat_map = 1;