diff --git a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c index ac89597bd5..62ffc8bf7c 100644 --- a/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c +++ b/src/mod/event_handlers/mod_erlang_event/mod_erlang_event.c @@ -91,7 +91,6 @@ typedef struct listener listener_t; static struct { int sockfd; - switch_mutex_t *mutex; switch_mutex_t *sock_mutex; listener_t *listeners; uint8_t ready; @@ -124,6 +123,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) static void launch_listener_thread(listener_t *listener); static struct { + switch_mutex_t *listener_mutex; switch_event_node_t *node; } globals; @@ -132,7 +132,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l { listener_t *l; - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); for (l = listen_list.listeners; l; l = l->next) { if (switch_test_flag(l, LFLAG_LOG) && l->level >= node->level) { char *data = strdup(node->data); @@ -157,7 +157,7 @@ static switch_status_t socket_logger(const switch_log_node_t *node, switch_log_l } } } - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); return SWITCH_STATUS_SUCCESS; } @@ -223,7 +223,7 @@ static void event_handler(switch_event_t *event) lp = listen_list.listeners; - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); while(lp) { uint8_t send = 0; @@ -279,7 +279,19 @@ static void event_handler(switch_event_t *event) } } - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); +} + + +static void close_socket(int *sock) +{ + switch_mutex_lock(listen_list.sock_mutex); + if (*sock) { + shutdown(*sock, SHUT_RDWR); + close(*sock); + sock = NULL; + } + switch_mutex_unlock(listen_list.sock_mutex); } @@ -292,23 +304,24 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) switch_log_unbind_logger(socket_logger); - close(listen_list.sockfd); + /*close_socket(&listen_list.sockfd);*/ - switch_yield(500); - - while (prefs.threads) { + while (prefs.threads || prefs.done == 1) { switch_yield(10000); if (++sanity == 1000) { break; } } + switch_event_unbind(&globals.node); - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { - close(l->sockfd); + close_socket(&l->sockfd); } - switch_mutex_unlock(listen_list.mutex); + + switch_mutex_unlock(globals.listener_mutex); return SWITCH_STATUS_SUCCESS; } @@ -317,10 +330,10 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_erlang_event_shutdown) static void add_listener(listener_t *listener) { /* add me to the listeners so I get events */ - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); listener->next = listen_list.listeners; listen_list.listeners = listener; - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); } @@ -328,7 +341,7 @@ static void remove_listener(listener_t *listener) { listener_t *l, *last = NULL; - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); for (l = listen_list.listeners; l; l = l->next) { if (l == listener) { if (last) { @@ -339,11 +352,21 @@ static void remove_listener(listener_t *listener) } last = l; } - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); } SWITCH_MODULE_LOAD_FUNCTION(mod_erlang_event_load) { + switch_mutex_init(&globals.listener_mutex, SWITCH_MUTEX_NESTED, pool); + + if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); + close_socket(&listen_list.sockfd); + return SWITCH_STATUS_GENERR; + } + + switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE); + /* connect my internal structure to the blank pointer passed to me */ *module_interface = switch_loadable_module_create_module_interface(pool, modname); @@ -830,9 +853,9 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) int status = 1; void *pop; - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); prefs.threads++; - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); switch_assert(listener != NULL); @@ -984,7 +1007,7 @@ done: switch_thread_rwlock_wrlock(listener->rwlock); if (listener->sockfd) { - close(listener->sockfd); + close_socket(&listener->sockfd); } switch_thread_rwlock_unlock(listener->rwlock); @@ -1001,9 +1024,9 @@ done: switch_core_destroy_memory_pool(&pool); } - switch_mutex_lock(listen_list.mutex); + switch_mutex_lock(globals.listener_mutex); prefs.threads--; - switch_mutex_unlock(listen_list.mutex); + switch_mutex_unlock(globals.listener_mutex); return NULL; } @@ -1092,6 +1115,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) struct sockaddr_in server_addr; int on = 1; int clientfd; + int epmdfd; memset(&listen_list, 0, sizeof(listen_list)); config(); @@ -1101,7 +1125,6 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) return SWITCH_STATUS_TERM; } - switch_mutex_init(&listen_list.mutex, SWITCH_MUTEX_NESTED, pool); switch_mutex_init(&listen_list.sock_mutex, SWITCH_MUTEX_NESTED, pool); /* zero out the struct before we use it */ @@ -1129,7 +1152,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) goto sock_fail; } - if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) { + if (setsockopt(listen_list.sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to enable SO_REUSEADDR for socket on %s:%u : %s\n", prefs.ip, prefs.port, strerror(errno)); goto sock_fail; } @@ -1170,12 +1193,15 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) /* init the ei stuff */ if (ei_connect_xinit(&ec, thishostname, prefs.nodename, thisnodename, (Erl_IpAddr)(&server_addr.sin_addr.s_addr), prefs.cookie, 0) < 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to init ei connection\n"); + close_socket(&listen_list.sockfd); return SWITCH_STATUS_GENERR; } /* return value is -1 for error, a descriptor pointing to epmd otherwise */ - if (ei_publish(&ec, prefs.port) == -1) { + if ((epmdfd = ei_publish(&ec, prefs.port)) == -1) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to publish port to empd\n"); + /* TODO - start epmd? */ + close_socket(&listen_list.sockfd); return SWITCH_STATUS_GENERR; } @@ -1183,13 +1209,6 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) listen_list.ready = 1; - if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, NULL, &globals.node) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind!\n"); - return SWITCH_STATUS_GENERR; - } - - switch_log_bind_logger(socket_logger, SWITCH_LOG_DEBUG, SWITCH_FALSE); - for (;;) { /* zero out errno because ei_accept doesn't differentiate between a * failed authentication or a socket failure, or a client version @@ -1239,7 +1258,11 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) } - close(listen_list.sockfd); + /* cleanup epmd registration */ + ei_unpublish(&ec); + close(epmdfd); + + close_socket(&listen_list.sockfd); if (pool) { switch_core_destroy_memory_pool(&pool); @@ -1254,6 +1277,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_erlang_event_runtime) switch_safe_free(prefs.acl[x]); } + prefs.done = 2; fail: return SWITCH_STATUS_TERM; }