Merge pull request #528 from lazedo/kazoo-eventstream-time

[mod_kazoo] add event stream connected time
This commit is contained in:
Andrey Volk 2020-03-24 20:15:54 +04:00 committed by GitHub
commit 890dac9972
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 6 deletions

View File

@ -42,11 +42,13 @@
#define API_COMMAND_OPTION 4
#define API_NODE_OPTION_FRAMING 0
#define API_NODE_OPTION_LEGACY 1
#define API_NODE_OPTION_KEEPALIVE 1
#define API_NODE_OPTION_LEGACY 2
#define API_NODE_OPTION_MAX 99
static const char *node_runtime_options[] = {
"event-stream-framing",
"event-stream-keepalive",
"enable-legacy",
NULL
};
@ -69,6 +71,10 @@ static switch_status_t api_get_node_option(ei_node_t *ei_node, switch_stream_han
stream->write_function(stream, "+OK %i", ei_node->event_stream_framing);
break;
case API_NODE_OPTION_KEEPALIVE:
stream->write_function(stream, "+OK %i", ei_node->event_stream_keepalive);
break;
case API_NODE_OPTION_LEGACY:
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
break;
@ -98,6 +104,12 @@ static switch_status_t api_set_node_option(ei_node_t *ei_node, switch_stream_han
}
break;
case API_NODE_OPTION_KEEPALIVE:
val = switch_true(value);
stream->write_function(stream, "+OK %i", val);
ei_node->event_stream_keepalive = val;
break;
case API_NODE_OPTION_LEGACY:
ei_node->legacy = switch_true(value);
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
@ -263,9 +275,19 @@ static switch_status_t handle_node_api_event_stream(ei_event_stream_t *event_str
stream->write_function(stream, "%s:%d -> disconnected\n"
,ip_addr, port);
} else {
stream->write_function(stream, "%s:%d -> %s:%d\n"
unsigned int year, day, hour, min, sec, delta;
delta = (switch_micro_time_now() - event_stream->connected_time) / 1000000;
sec = delta % 60;
min = delta / 60 % 60;
hour = delta / 3600 % 24;
day = delta / 86400 % 7;
year = delta / 31556926 % 12;
stream->write_function(stream, "%s:%d -> %s:%d for %d years, %d days, %d hours, %d minutes, %d seconds\n"
,event_stream->local_ip, event_stream->local_port
,event_stream->remote_ip, event_stream->remote_port);
,event_stream->remote_ip, event_stream->remote_port
,year, day, hour, min, sec);
}
binding = event_stream->bindings;

View File

@ -503,10 +503,10 @@ SWITCH_STANDARD_APP(kz_moh_function)
break;
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %ld\n", fh.duration);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %" SWITCH_INT64_T_FMT "\n", fh.duration);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH offset_pos %d\n", fh.offset_pos);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %ld\n", fh.pos);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %" SWITCH_INT64_T_FMT "\n", fh.pos);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %" SWITCH_SIZE_T_FMT "\n", fh.sample_count);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples);
switch_safe_free(my_data);

View File

@ -73,6 +73,7 @@ struct ei_event_stream_s {
switch_socket_t *socket;
switch_mutex_t *socket_mutex;
switch_bool_t connected;
switch_time_t connected_time;
char remote_ip[48];
uint16_t remote_port;
char local_ip[48];
@ -81,6 +82,7 @@ struct ei_event_stream_s {
uint32_t flags;
ei_node_t *node;
short event_stream_framing;
short event_stream_keepalive;
switch_interval_time_t queue_timeout;
struct ei_event_stream_s *next;
};
@ -104,6 +106,7 @@ struct ei_node_s {
uint32_t flags;
int legacy;
short event_stream_framing;
short event_stream_keepalive;
switch_interval_time_t event_stream_queue_timeout;
switch_interval_time_t receiver_queue_timeout;
switch_interval_time_t sender_queue_timeout;
@ -182,6 +185,7 @@ struct kz_globals_s {
int event_stream_preallocate;
int send_msg_batch;
short event_stream_framing;
short event_stream_keepalive;
switch_interval_time_t event_stream_queue_timeout;
switch_port_t port;
int config_fetched;

View File

@ -123,6 +123,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
kazoo_globals.send_msg_batch = 10;
kazoo_globals.event_stream_framing = 2;
kazoo_globals.event_stream_keepalive = 1;
kazoo_globals.event_stream_queue_timeout = 200000;
kazoo_globals.node_receiver_queue_timeout = 100000;
kazoo_globals.node_sender_queue_timeout = 0;
@ -209,6 +210,10 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val);
kazoo_globals.event_stream_framing = atoi(val);
} else if (!strcmp(var, "event-stream-keep-alive")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-keep-alive: %s\n", val);
kazoo_globals.event_stream_keepalive = switch_true(val);
} else if (!strcmp(var, "io-fault-tolerance")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
kazoo_globals.io_fault_tolerance = atoi(val);

View File

@ -242,6 +242,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
const char *ip_addr;
void *pop;
short event_stream_framing;
short event_stream_keepalive;
short ok = 1;
switch_atomic_inc(&kazoo_globals.threads);
@ -249,6 +250,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
switch_assert(event_stream != NULL);
event_stream_framing = event_stream->event_stream_framing;
event_stream_keepalive = event_stream->event_stream_keepalive;
/* figure out what socket we just opened */
switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
@ -277,6 +279,12 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
}
if (event_stream_keepalive) {
if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n");
}
}
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
}
@ -297,6 +305,8 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
event_stream->connected = SWITCH_TRUE;
event_stream->connected_time = switch_micro_time_now();
switch_mutex_unlock(event_stream->socket_mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
@ -404,6 +414,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from)
event_stream->connected = SWITCH_FALSE;
event_stream->node = ei_node;
event_stream->event_stream_framing = ei_node->event_stream_framing;
event_stream->event_stream_keepalive = ei_node->event_stream_keepalive;
event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
memcpy(&event_stream->pid, from, sizeof(erlang_pid));
switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);

View File

@ -1631,6 +1631,7 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) {
ei_node->created_time = switch_micro_time_now();
ei_node->legacy = kazoo_globals.legacy_events;
ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
ei_node->event_stream_keepalive = kazoo_globals.event_stream_keepalive;
ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;