[mod_event_socket] Add event stats, introduce FS CLI commands event_socket_debug, event_socket_queue_size, event_socket_show_stats.

This commit is contained in:
Dragos Oancea 2023-12-19 02:32:42 +02:00
parent c26b5cdfa3
commit f4431128a0
1 changed files with 120 additions and 5 deletions

View File

@ -32,8 +32,10 @@
*/ */
#include <switch.h> #include <switch.h>
#define CMD_BUFLEN 1024 * 1000 #define CMD_BUFLEN 1024 * 1000
#define MAX_QUEUE_LEN 100000 #define MAX_QUEUE_LEN SWITCH_CORE_QUEUE_LEN /*100000*/
#define MAX_MISSED 500 #define MAX_MISSED 500
#define QUEUE_SMOOTH_LIMIT 5000
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load); SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown); SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime); SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime);
@ -87,6 +89,8 @@ struct listener {
switch_core_session_t *session; switch_core_session_t *session;
int lost_events; int lost_events;
int lost_logs; int lost_logs;
uint64_t total_logs;
uint64_t event_stats[SWITCH_EVENT_ALL + 1];
time_t last_flush; time_t last_flush;
time_t expire_time; time_t expire_time;
uint32_t timeout; uint32_t timeout;
@ -108,6 +112,7 @@ static struct {
switch_mutex_t *listener_mutex; switch_mutex_t *listener_mutex;
switch_event_node_t *node; switch_event_node_t *node;
int debug; int debug;
uint8_t enable_stats;
} globals; } globals;
static struct { static struct {
@ -176,6 +181,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l
if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) {
switch_log_node_t *dnode = switch_log_node_dup(node); switch_log_node_t *dnode = switch_log_node_dup(node);
qstatus = switch_queue_trypush(l->log_queue, dnode); qstatus = switch_queue_trypush(l->log_queue, dnode);
l->total_logs++;
if (qstatus == SWITCH_STATUS_SUCCESS) { if (qstatus == SWITCH_STATUS_SUCCESS) {
if (l->lost_logs) { if (l->lost_logs) {
int ll = l->lost_logs; int ll = l->lost_logs;
@ -225,6 +231,26 @@ static void flush_listener(listener_t *listener, switch_bool_t flush_log, switch
} }
} }
static void count_events(listener_t *l, switch_event_t *event)
{
l->event_stats[SWITCH_EVENT_ALL]++;
l->event_stats[event->event_id]++;
}
static void print_event_stats(listener_t *l, switch_log_level_t level)
{
int i;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), level,
"Current Event Queue size: [%u/%u] Total events / session: [%lu]\n",
switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]);
for (i = 0; i < SWITCH_EVENT_ALL; i++) {
if ((l->event_stats[i]) != 0) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), level, "Event name: [%s] Count: [%lu]", switch_event_name(i), (unsigned long)l->event_stats[i]);
}
}
}
static switch_status_t expire_listener(listener_t ** listener) static switch_status_t expire_listener(listener_t ** listener)
{ {
listener_t *l; listener_t *l;
@ -384,21 +410,37 @@ static void event_handler(switch_event_t *event)
if (send) { if (send) {
if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) { if (switch_event_dup(&clone, event) == SWITCH_STATUS_SUCCESS) {
qstatus = switch_queue_trypush(l->event_queue, clone); qstatus = switch_queue_trypush(l->event_queue, clone);
if (globals.enable_stats) {
count_events(l, event);
}
if (qstatus == SWITCH_STATUS_SUCCESS) { if (qstatus == SWITCH_STATUS_SUCCESS) {
if (l->lost_events) { if (l->lost_events) {
int le = l->lost_events; int le = l->lost_events;
l->lost_events = 0; l->lost_events = 0;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u]\n", le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_CRIT, "Lost [%d] events! Event Queue size: [%u/%u] Total events / session: [%lu]\n",
le, switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]);
}
if (switch_queue_size(l->event_queue) > MAX_QUEUE_LEN/2) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_WARNING, "Event Queue surpassed its half length [%u/%u]\n",
switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
}
if (switch_queue_size(l->event_queue) > QUEUE_SMOOTH_LIMIT) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(l->session), SWITCH_LOG_WARNING,
"Event Queue over smooth limit [%u/%u] We're over uncharted territory now!\n", switch_queue_size(l->event_queue), MAX_QUEUE_LEN);
} }
} else { } else {
char errbuf[512] = {0}; char errbuf[512] = {0};
unsigned int qsize = switch_queue_size(l->event_queue); unsigned int qsize = switch_queue_size(l->event_queue);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n", "Event enqueue ERROR [%d] | [%s] | Queue size: [%u/%u] %s\n",
(int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN)?"Max queue size reached":""); (int)qstatus, switch_strerror(qstatus, errbuf, sizeof(errbuf)), qsize, MAX_QUEUE_LEN, (qsize == MAX_QUEUE_LEN) ? "Max queue size reached":"");
if (++l->lost_events > MAX_MISSED) { if (++l->lost_events > MAX_MISSED) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events. Lost [%d] Queue size[%u/%u]\n", l->lost_events, qsize, MAX_QUEUE_LEN); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Killing listener because of too many lost events! Lost [%d] Queue size[%u/%u] Total events / session: [%lu]\n",
l->lost_events, qsize, MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]);
kill_listener(l, "killed listener because of lost events\n"); kill_listener(l, "killed listener because of lost events\n");
} }
switch_event_destroy(&clone); switch_event_destroy(&clone);
@ -634,6 +676,13 @@ static void send_disconnect(listener_t *listener, const char *message)
char disco_buf[512] = ""; char disco_buf[512] = "";
switch_size_t len, mlen; switch_size_t len, mlen;
if (globals.enable_stats) {
if (!zstr(listener->remote_ip)) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(listener->session), SWITCH_LOG_INFO, "Event stats for %s:%d\n", listener->remote_ip, listener->remote_port);
}
print_event_stats(listener, SWITCH_LOG_INFO);
}
if (zstr(message)) { if (zstr(message)) {
message = "Disconnected.\n"; message = "Disconnected.\n";
} }
@ -1176,6 +1225,67 @@ SWITCH_STANDARD_API(event_sink_function)
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
#define EVENT_SOCKET_DEBUG_SYNTAX "<on|off>"
SWITCH_STANDARD_API(event_socket_debug)
{
if (zstr(cmd)) {
stream->write_function(stream, "-USAGE: %s\n", EVENT_SOCKET_DEBUG_SYNTAX);
} else {
if (!strcasecmp(cmd, "on")) {
globals.debug = 1;
stream->write_function(stream, "Event Socket Debug: on\n");
} else if (!strcasecmp(cmd, "off")) {
globals.debug = 0;
stream->write_function(stream, "Event Socket Debug: off\n");
} else {
stream->write_function(stream, "-USAGE: %s\n", EVENT_SOCKET_DEBUG_SYNTAX);
}
}
return SWITCH_STATUS_SUCCESS;
}
SWITCH_STANDARD_API(event_socket_q_size)
{
listener_t *l;
switch_mutex_lock(globals.listener_mutex);
for (l = listen_list.listeners; l; l = l->next) {
stream->write_function(stream,
"Current Event Queue [%u/%u] for Session %d %s:%d \n", switch_queue_size(l->event_queue), MAX_QUEUE_LEN, l->id, l->remote_ip, l->remote_port);
}
switch_mutex_unlock(globals.listener_mutex);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_STANDARD_API(event_socket_show_stats)
{
listener_t *l;
int i;
switch_mutex_lock(globals.listener_mutex);
for (l = listen_list.listeners; l; l = l->next) {
if (!zstr(l->remote_ip)) {
stream->write_function(stream,
"Event Stats for Session %d %s:%d \n", l->id, l->remote_ip, l->remote_port);
}
stream->write_function(stream,
"Current Event Queue size: [%u/%u] Total events / session: [%lu]\n",
switch_queue_size(l->event_queue), MAX_QUEUE_LEN, (unsigned long)l->event_stats[SWITCH_EVENT_ALL]);
for (i = 0; i < SWITCH_EVENT_ALL; i++) {
if (l->event_stats[i] != 0) {
stream->write_function(stream, "Event name: [%s] Count: [%lu]\n", switch_event_name(i), l->event_stats[i]);
}
}
}
switch_mutex_unlock(globals.listener_mutex);
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load) SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
{ {
@ -1200,6 +1310,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_event_socket_load)
*module_interface = switch_loadable_module_create_module_interface(pool, modname); *module_interface = switch_loadable_module_create_module_interface(pool, modname);
SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA); SWITCH_ADD_APP(app_interface, "socket", "Connect to a socket", "Connect to a socket", socket_function, "<ip>[:<port>]", SAF_SUPPORT_NOMEDIA);
SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "<web data>"); SWITCH_ADD_API(api_interface, "event_sink", "event_sink", event_sink_function, "<web data>");
SWITCH_ADD_API(api_interface, "event_socket_debug", "Set Event Socket Debug", event_socket_debug, EVENT_SOCKET_DEBUG_SYNTAX);
SWITCH_ADD_API(api_interface, "event_socket_queue_size", "Show Event Socket current queue size", event_socket_q_size, NULL);
SWITCH_ADD_API(api_interface, "event_socket_show_stats", "Show Event Socket current stats", event_socket_show_stats, NULL);
/* indicate that the module should continue to be loaded */ /* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@ -2887,6 +3000,8 @@ static int config(void)
set_pref_ip(val); set_pref_ip(val);
} else if (!strcmp(var, "debug")) { } else if (!strcmp(var, "debug")) {
globals.debug = atoi(val); globals.debug = atoi(val);
} else if (!strcmp(var, "enable_stats")) {
globals.enable_stats = atoi(val);
} else if (!strcmp(var, "nat-map")) { } else if (!strcmp(var, "nat-map")) {
if (switch_true(val) && switch_nat_get_type()) { if (switch_true(val) && switch_nat_get_type()) {
prefs.nat_map = 1; prefs.nat_map = 1;