diff --git a/src/mod/event_handlers/mod_kazoo/kazoo_node.c b/src/mod/event_handlers/mod_kazoo/kazoo_node.c index cf7f044346..de8b16420a 100644 --- a/src/mod/event_handlers/mod_kazoo/kazoo_node.c +++ b/src/mod/event_handlers/mod_kazoo/kazoo_node.c @@ -1281,6 +1281,7 @@ static void *SWITCH_THREAD_FUNC receive_handler(switch_thread_t *thread, void *o static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) { ei_node_t *ei_node = (ei_node_t *) obj; ei_received_msg_t *received_msg = NULL; + int fault_count = 0; switch_atomic_inc(&globals.threads); @@ -1332,13 +1333,17 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) /* erlang nodes send ticks to eachother to validate they are still reachable, we dont have to do anything here */ break; case ERL_MSG: + fault_count = 0; + if (switch_queue_trypush(ei_node->received_msgs, received_msg) != SWITCH_STATUS_SUCCESS) { ei_x_free(&received_msg->buf); switch_safe_free(received_msg); } + if (globals.receive_msg_preallocate > 0 && received_msg->buf.buffsz > globals.receive_msg_preallocate) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "increased received message buffer size to %d\n", received_msg->buf.buffsz); } + received_msg = NULL; break; case ERL_ERROR: @@ -1353,8 +1358,12 @@ static void *SWITCH_THREAD_FUNC handle_node(switch_thread_t *thread, void *obj) switch_clear_flag(ei_node, LFLAG_RUNNING); break; case EIO: - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Erlang communication fault with node %p %s (%s:%d): socket closed or I/O error\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port); - switch_clear_flag(ei_node, LFLAG_RUNNING); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Erlang communication fault with node %p %s (%s:%d): socket closed or I/O error [fault count %d]\n", (void *)ei_node, ei_node->peer_nodename, ei_node->remote_ip, ei_node->remote_port, ++fault_count); + + if (fault_count >= globals.io_fault_tolerance) { + switch_clear_flag(ei_node, LFLAG_RUNNING); + } + break; default: /* OH NOS! something has gone horribly wrong, shutdown the connection if status set by ei_xreceive_msg_tmo is less than or equal to 0 */ diff --git a/src/mod/event_handlers/mod_kazoo/mod_kazoo.c b/src/mod/event_handlers/mod_kazoo/mod_kazoo.c index 174559405e..e283cb5a96 100644 --- a/src/mod/event_handlers/mod_kazoo/mod_kazoo.c +++ b/src/mod/event_handlers/mod_kazoo/mod_kazoo.c @@ -349,6 +349,7 @@ static switch_status_t config(void) { globals.send_msg_batch = 10; globals.event_stream_framing = 2; globals.port = 0; + globals.io_fault_tolerance = 10; if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to open configuration file %s\n", cf); @@ -410,6 +411,9 @@ static switch_status_t config(void) { } else if (!strcmp(var, "event-stream-framing")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set event-stream-framing: %s\n", val); globals.event_stream_framing = atoi(val); + } else if (!strcmp(var, "io-fault-tolerance")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Set io-fault-tolerance: %s\n", val); + globals.io_fault_tolerance = atoi(val); } } } @@ -444,6 +448,11 @@ static switch_status_t config(void) { globals.send_msg_batch = 10; } + if (globals.io_fault_tolerance < 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid I/O fault tolerance, reverting to default\n"); + globals.io_fault_tolerance = 10; + } + if (!globals.event_filter) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Event filter not found in configuration, using default\n"); globals.event_filter = create_default_filter(); diff --git a/src/mod/event_handlers/mod_kazoo/mod_kazoo.h b/src/mod/event_handlers/mod_kazoo/mod_kazoo.h index 8c41dc6402..9e28707c38 100644 --- a/src/mod/event_handlers/mod_kazoo/mod_kazoo.h +++ b/src/mod/event_handlers/mod_kazoo/mod_kazoo.h @@ -119,6 +119,7 @@ struct globals_s { short event_stream_framing; switch_port_t port; int config_filters_fetched; + int io_fault_tolerance; }; typedef struct globals_s globals_t; extern globals_t globals;