[mod_kazoo] add event stream connected time
This commit is contained in:
parent
5c52b174f2
commit
343f031915
|
@ -42,11 +42,13 @@
|
||||||
#define API_COMMAND_OPTION 4
|
#define API_COMMAND_OPTION 4
|
||||||
|
|
||||||
#define API_NODE_OPTION_FRAMING 0
|
#define API_NODE_OPTION_FRAMING 0
|
||||||
#define API_NODE_OPTION_LEGACY 1
|
#define API_NODE_OPTION_KEEPALIVE 1
|
||||||
|
#define API_NODE_OPTION_LEGACY 2
|
||||||
#define API_NODE_OPTION_MAX 99
|
#define API_NODE_OPTION_MAX 99
|
||||||
|
|
||||||
static const char *node_runtime_options[] = {
|
static const char *node_runtime_options[] = {
|
||||||
"event-stream-framing",
|
"event-stream-framing",
|
||||||
|
"event-stream-keepalive",
|
||||||
"enable-legacy",
|
"enable-legacy",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
@ -69,6 +71,10 @@ static switch_status_t api_get_node_option(ei_node_t *ei_node, switch_stream_han
|
||||||
stream->write_function(stream, "+OK %i", ei_node->event_stream_framing);
|
stream->write_function(stream, "+OK %i", ei_node->event_stream_framing);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case API_NODE_OPTION_KEEPALIVE:
|
||||||
|
stream->write_function(stream, "+OK %i", ei_node->event_stream_keepalive);
|
||||||
|
break;
|
||||||
|
|
||||||
case API_NODE_OPTION_LEGACY:
|
case API_NODE_OPTION_LEGACY:
|
||||||
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
|
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
|
||||||
break;
|
break;
|
||||||
|
@ -98,6 +104,12 @@ static switch_status_t api_set_node_option(ei_node_t *ei_node, switch_stream_han
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case API_NODE_OPTION_KEEPALIVE:
|
||||||
|
val = switch_true(value);
|
||||||
|
stream->write_function(stream, "+OK %i", val);
|
||||||
|
ei_node->event_stream_keepalive = val;
|
||||||
|
break;
|
||||||
|
|
||||||
case API_NODE_OPTION_LEGACY:
|
case API_NODE_OPTION_LEGACY:
|
||||||
ei_node->legacy = switch_true(value);
|
ei_node->legacy = switch_true(value);
|
||||||
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
|
stream->write_function(stream, "+OK %s", ei_node->legacy ? "true" : "false");
|
||||||
|
@ -263,9 +275,19 @@ static switch_status_t handle_node_api_event_stream(ei_event_stream_t *event_str
|
||||||
stream->write_function(stream, "%s:%d -> disconnected\n"
|
stream->write_function(stream, "%s:%d -> disconnected\n"
|
||||||
,ip_addr, port);
|
,ip_addr, port);
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "%s:%d -> %s:%d\n"
|
unsigned int year, day, hour, min, sec, delta;
|
||||||
|
|
||||||
|
delta = (switch_micro_time_now() - event_stream->connected_time) / 1000000;
|
||||||
|
sec = delta % 60;
|
||||||
|
min = delta / 60 % 60;
|
||||||
|
hour = delta / 3600 % 24;
|
||||||
|
day = delta / 86400 % 7;
|
||||||
|
year = delta / 31556926 % 12;
|
||||||
|
|
||||||
|
stream->write_function(stream, "%s:%d -> %s:%d for %d years, %d days, %d hours, %d minutes, %d seconds\n"
|
||||||
,event_stream->local_ip, event_stream->local_port
|
,event_stream->local_ip, event_stream->local_port
|
||||||
,event_stream->remote_ip, event_stream->remote_port);
|
,event_stream->remote_ip, event_stream->remote_port
|
||||||
|
,year, day, hour, min, sec);
|
||||||
}
|
}
|
||||||
|
|
||||||
binding = event_stream->bindings;
|
binding = event_stream->bindings;
|
||||||
|
|
|
@ -503,10 +503,10 @@ SWITCH_STANDARD_APP(kz_moh_function)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %ld\n", fh.duration);
|
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH duration %" SWITCH_INT64_T_FMT "\n", fh.duration);
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH offset_pos %d\n", fh.offset_pos);
|
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH offset_pos %d\n", fh.offset_pos);
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %ld\n", fh.pos);
|
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH pos %" SWITCH_INT64_T_FMT "\n", fh.pos);
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %ld\n", fh.sample_count);
|
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH sample_count %" SWITCH_SIZE_T_FMT "\n", fh.sample_count);
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples);
|
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG1, "MOH samples %d\n", fh.samples);
|
||||||
|
|
||||||
switch_safe_free(my_data);
|
switch_safe_free(my_data);
|
||||||
|
|
|
@ -73,6 +73,7 @@ struct ei_event_stream_s {
|
||||||
switch_socket_t *socket;
|
switch_socket_t *socket;
|
||||||
switch_mutex_t *socket_mutex;
|
switch_mutex_t *socket_mutex;
|
||||||
switch_bool_t connected;
|
switch_bool_t connected;
|
||||||
|
switch_time_t connected_time;
|
||||||
char remote_ip[48];
|
char remote_ip[48];
|
||||||
uint16_t remote_port;
|
uint16_t remote_port;
|
||||||
char local_ip[48];
|
char local_ip[48];
|
||||||
|
@ -81,6 +82,7 @@ struct ei_event_stream_s {
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
ei_node_t *node;
|
ei_node_t *node;
|
||||||
short event_stream_framing;
|
short event_stream_framing;
|
||||||
|
short event_stream_keepalive;
|
||||||
switch_interval_time_t queue_timeout;
|
switch_interval_time_t queue_timeout;
|
||||||
struct ei_event_stream_s *next;
|
struct ei_event_stream_s *next;
|
||||||
};
|
};
|
||||||
|
@ -104,6 +106,7 @@ struct ei_node_s {
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
int legacy;
|
int legacy;
|
||||||
short event_stream_framing;
|
short event_stream_framing;
|
||||||
|
short event_stream_keepalive;
|
||||||
switch_interval_time_t event_stream_queue_timeout;
|
switch_interval_time_t event_stream_queue_timeout;
|
||||||
switch_interval_time_t receiver_queue_timeout;
|
switch_interval_time_t receiver_queue_timeout;
|
||||||
switch_interval_time_t sender_queue_timeout;
|
switch_interval_time_t sender_queue_timeout;
|
||||||
|
@ -182,6 +185,7 @@ struct kz_globals_s {
|
||||||
int event_stream_preallocate;
|
int event_stream_preallocate;
|
||||||
int send_msg_batch;
|
int send_msg_batch;
|
||||||
short event_stream_framing;
|
short event_stream_framing;
|
||||||
|
short event_stream_keepalive;
|
||||||
switch_interval_time_t event_stream_queue_timeout;
|
switch_interval_time_t event_stream_queue_timeout;
|
||||||
switch_port_t port;
|
switch_port_t port;
|
||||||
int config_fetched;
|
int config_fetched;
|
||||||
|
|
|
@ -123,6 +123,7 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
|
||||||
kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
|
kazoo_globals.event_stream_preallocate = KZ_DEFAULT_STREAM_PRE_ALLOCATE;
|
||||||
kazoo_globals.send_msg_batch = 10;
|
kazoo_globals.send_msg_batch = 10;
|
||||||
kazoo_globals.event_stream_framing = 2;
|
kazoo_globals.event_stream_framing = 2;
|
||||||
|
kazoo_globals.event_stream_keepalive = 1;
|
||||||
kazoo_globals.event_stream_queue_timeout = 200000;
|
kazoo_globals.event_stream_queue_timeout = 200000;
|
||||||
kazoo_globals.node_receiver_queue_timeout = 100000;
|
kazoo_globals.node_receiver_queue_timeout = 100000;
|
||||||
kazoo_globals.node_sender_queue_timeout = 0;
|
kazoo_globals.node_sender_queue_timeout = 0;
|
||||||
|
@ -209,6 +210,10 @@ switch_status_t kazoo_ei_config(switch_xml_t cfg) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val);
|
||||||
kazoo_globals.event_stream_framing = atoi(val);
|
kazoo_globals.event_stream_framing = atoi(val);
|
||||||
|
|
||||||
|
} else if (!strcmp(var, "event-stream-keep-alive")) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-keep-alive: %s\n", val);
|
||||||
|
kazoo_globals.event_stream_keepalive = switch_true(val);
|
||||||
|
|
||||||
} else if (!strcmp(var, "io-fault-tolerance")) {
|
} else if (!strcmp(var, "io-fault-tolerance")) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val);
|
||||||
kazoo_globals.io_fault_tolerance = atoi(val);
|
kazoo_globals.io_fault_tolerance = atoi(val);
|
||||||
|
|
|
@ -242,6 +242,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
|
||||||
const char *ip_addr;
|
const char *ip_addr;
|
||||||
void *pop;
|
void *pop;
|
||||||
short event_stream_framing;
|
short event_stream_framing;
|
||||||
|
short event_stream_keepalive;
|
||||||
short ok = 1;
|
short ok = 1;
|
||||||
|
|
||||||
switch_atomic_inc(&kazoo_globals.threads);
|
switch_atomic_inc(&kazoo_globals.threads);
|
||||||
|
@ -249,6 +250,7 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
|
||||||
switch_assert(event_stream != NULL);
|
switch_assert(event_stream != NULL);
|
||||||
|
|
||||||
event_stream_framing = event_stream->event_stream_framing;
|
event_stream_framing = event_stream->event_stream_framing;
|
||||||
|
event_stream_keepalive = event_stream->event_stream_keepalive;
|
||||||
|
|
||||||
/* figure out what socket we just opened */
|
/* figure out what socket we just opened */
|
||||||
switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
|
switch_socket_addr_get(&sa, SWITCH_FALSE, event_stream->acceptor);
|
||||||
|
@ -277,6 +279,12 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket as non-blocking\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (event_stream_keepalive) {
|
||||||
|
if (switch_socket_opt_set(newsocket, SWITCH_SO_KEEPALIVE, TRUE)) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't set socket keep-alive\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
|
if (switch_socket_opt_set(newsocket, SWITCH_SO_TCP_NODELAY, 1)) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't disable Nagle.\n");
|
||||||
}
|
}
|
||||||
|
@ -297,6 +305,8 @@ static void *SWITCH_THREAD_FUNC event_stream_loop(switch_thread_t *thread, void
|
||||||
switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
|
switch_get_addr(event_stream->local_ip, sizeof (event_stream->local_ip), sa);
|
||||||
|
|
||||||
event_stream->connected = SWITCH_TRUE;
|
event_stream->connected = SWITCH_TRUE;
|
||||||
|
event_stream->connected_time = switch_micro_time_now();
|
||||||
|
|
||||||
switch_mutex_unlock(event_stream->socket_mutex);
|
switch_mutex_unlock(event_stream->socket_mutex);
|
||||||
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Erlang event stream %p client %s:%u\n", (void *)event_stream, event_stream->remote_ip, event_stream->remote_port);
|
||||||
|
@ -404,6 +414,7 @@ ei_event_stream_t *new_event_stream(ei_node_t *ei_node, const erlang_pid *from)
|
||||||
event_stream->connected = SWITCH_FALSE;
|
event_stream->connected = SWITCH_FALSE;
|
||||||
event_stream->node = ei_node;
|
event_stream->node = ei_node;
|
||||||
event_stream->event_stream_framing = ei_node->event_stream_framing;
|
event_stream->event_stream_framing = ei_node->event_stream_framing;
|
||||||
|
event_stream->event_stream_keepalive = ei_node->event_stream_keepalive;
|
||||||
event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
|
event_stream->queue_timeout = ei_node->event_stream_queue_timeout;
|
||||||
memcpy(&event_stream->pid, from, sizeof(erlang_pid));
|
memcpy(&event_stream->pid, from, sizeof(erlang_pid));
|
||||||
switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
|
switch_queue_create(&event_stream->queue, MAX_QUEUE_LEN, pool);
|
||||||
|
|
|
@ -1631,6 +1631,7 @@ switch_status_t new_kazoo_node(int nodefd, ErlConnect *conn) {
|
||||||
ei_node->created_time = switch_micro_time_now();
|
ei_node->created_time = switch_micro_time_now();
|
||||||
ei_node->legacy = kazoo_globals.legacy_events;
|
ei_node->legacy = kazoo_globals.legacy_events;
|
||||||
ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
|
ei_node->event_stream_framing = kazoo_globals.event_stream_framing;
|
||||||
|
ei_node->event_stream_keepalive = kazoo_globals.event_stream_keepalive;
|
||||||
ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
|
ei_node->event_stream_queue_timeout = kazoo_globals.event_stream_queue_timeout;
|
||||||
ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
|
ei_node->receiver_queue_timeout = kazoo_globals.node_receiver_queue_timeout;
|
||||||
ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;
|
ei_node->sender_queue_timeout = kazoo_globals.node_sender_queue_timeout;
|
||||||
|
|
Loading…
Reference in New Issue