diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_api.c b/src/mod/event_handlers/mod_kazoo/kazoo_api.c index 180f3e3d48..96f9ff7bb4 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_api.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_api.c @@ -42,11 +42,13 @@ #define API_COMMAND_OPTION 4 #define API_NODE_OPTION_FRAMING 0 -#define API_NODE_OPTION_LEGACY 1 +#define API_NODE_OPTION_KEEPALIVE 1 +#define API_NODE_OPTION_LEGACY 2 #define API_NODE_OPTION_MAX 99 static const char *node_runtime_options[] = { "event-stream-framing", + "event-stream-keepalive", "enable-legacy", NULL }; @@ -69,6 +71,10 @@ static switch_status_t api_get_node_option(ei_node_t *ei_node, switch_stream_han stream->write_function(stream, "+OK %i", ei_node->event_stream_framing); break; + case API_NODE_OPTION_KEEPALIVE: + stream->write_function(stream, "+OK %i", ei_node->event_stream_keepalive); + break; + case API_NODE_OPTION_LEGACY: stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false"); break; @@ -98,6 +104,12 @@ static switch_status_t api_set_node_option(ei_node_t *ei_node, switch_stream_han } break; + case API_NODE_OPTION_KEEPALIVE: + val = switch_true(value); + stream->write_function(stream, "+OK %i", val); + ei_node->event_stream_keepalive = val; + break; + case API_NODE_OPTION_LEGACY: ei_node->legacy = switch_true(value); stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false"); @@ -263,9 +275,19 @@ static switch_status_t handle_node_api_event_stream(ei_event_stream_t *event_str stream->write_function(stream, "%s:%d -> disconnected\n" ,ip_addr, port); } else { - stream->write_function(stream, "%s:%d -> %s:%d\n" + unsigned int year, day, hour, min, sec, delta; + + delta = (switch_micro_time_now() - event_stream->connected_time) / 1000000; + sec = delta % 60; + min = delta / 60 % 60; + hour = delta / 3600 % 24; + day = delta / 86400 % 7; + year = delta / 31556926 % 12; + + stream->write_function(stream, "%s:%d -> %s:%d for %d years, %d days, %d hours, %d minutes, %d seconds\n" ,event_stream->local_ip, event_stream->local_port - ,event_stream->remote_ip, event_stream->remote_port); + ,event_stream->remote_ip, event_stream->remote_port + ,year, day, hour, min, sec); } binding = event_stream->bindings; diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c b/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c index 63fa9d5881..5a01650969 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_dptools.c @@ -503,10 +503,10 @@ SWITCH_STANDARD_APP(kz_moh_function) break; } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %ld\n", fh.duration); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %" SWITCH_INT64_T_FMT "\n", fh.duration); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH offset_pos %d\n", fh.offset_pos); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %ld\n", fh.pos); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %" SWITCH_INT64_T_FMT "\n", fh.pos); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %" SWITCH_SIZE_T_FMT "\n", fh.sample_count); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples); switch_safe_free(my_data); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h index 64f90c37a3..5f617b5f1d 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei.h +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei.h @@ -73,6 +73,7 @@ struct ei_event_stream_s { switch_socket_t *socket; switch_mutex_t *socket_mutex; switch_bool_t connected; + switch_time_t connected_time; char remote_ip[48]; uint16_t remote_port; char local_ip[48]; @@ -81,6 +82,7 @@ struct ei_event_stream_s { uint32_t flags; ei_node_t *node; short event_stream_framing; + short event_stream_keepalive; switch_interval_time_t queue_timeout; struct ei_event_stream_s *next; }; @@ -104,6 +106,7 @@ struct ei_node_s { uint32_t flags; int legacy; short event_stream_framing; + short event_stream_keepalive; switch_interval_time_t event_stream_queue_timeout; switch_interval_time_t receiver_queue_timeout; switch_interval_time_t sender_queue_timeout; @@ -182,6 +185,7 @@ struct kz_globals_s { int event_stream_preallocate; int send_msg_batch; short event_stream_framing; + short event_stream_keepalive; switch_interval_time_t event_stream_queue_timeout; switch_port_t port; int config_fetched; diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c index d49c54d023..b2f3baf963 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_ei_config.c @@ -123,6 +123,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE; kazoo_globals.send_msg_batch = 10; kazoo_globals.event_stream_framing = 2; + kazoo_globals.event_stream_keepalive = 1; kazoo_globals.event_stream_queue_timeout = 200000; kazoo_globals.node_receiver_queue_timeout = 100000; kazoo_globals.node_sender_queue_timeout = 0; @@ -209,6 +210,10 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val); kazoo_globals.event_stream_framing = atoi(val); + } else if (!strcmp(var, "event-stream-keep-alive")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-keep-alive: %s\n", val); + kazoo_globals.event_stream_keepalive = switch_true(val); + } else if (!strcmp(var, "io-fault-tolerance")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val); kazoo_globals.io_fault_tolerance = atoi(val); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c index 4f0148340d..3d1f18326b 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_event_stream.c @@ -242,6 +242,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void const char *ip_addr; void *pop; short event_stream_framing; + short event_stream_keepalive; short ok = 1; switch_atomic_inc(&kazoo_globals.threads); @@ -249,6 +250,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void switch_assert(event_stream != NULL); event_stream_framing = event_stream->event_stream_framing; + event_stream_keepalive = event_stream->event_stream_keepalive; /* figure out what socket we just opened */ switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor); @@ -277,6 +279,12 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n"); } + if (event_stream_keepalive) { + if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n"); + } + } + if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n"); } @@ -297,6 +305,8 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa); event_stream->connected = SWITCH_TRUE; + event_stream->connected_time = switch_micro_time_now(); + switch_mutex_unlock(event_stream->socket_mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port); @@ -404,6 +414,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from) event_stream->connected = SWITCH_FALSE; event_stream->node = ei_node; event_stream->event_stream_framing = ei_node->event_stream_framing; + event_stream->event_stream_keepalive = ei_node->event_stream_keepalive; event_stream->queue_timeout = ei_node->event_stream_queue_timeout; memcpy(&event_stream->pid, from, sizeof(erlang_pid)); switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool); diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index 13b916bd6f..fae09bee2f 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -1631,6 +1631,7 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) { ei_node->created_time = switch_micro_time_now(); ei_node->legacy = kazoo_globals.legacy_events; ei_node->event_stream_framing = kazoo_globals.event_stream_framing; + ei_node->event_stream_keepalive = kazoo_globals.event_stream_keepalive; ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout; ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout; ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;