diff --git a/libs/esl/src/esl.c b/libs/esl/src/esl.c index 1cb717c799..2686451c20 100644 --- a/libs/esl/src/esl.c +++ b/libs/esl/src/esl.c @@ -643,7 +643,7 @@ ESL_DECLARE(esl_status_t) esl_connect(esl_handle_t *handle, const char *host, es hval = esl_event_get_header(handle->last_event, "content-type"); - if (strcasecmp(hval, "auth/request")) { + if (esl_safe_strcasecmp(hval, "auth/request")) { snprintf(handle->err, sizeof(handle->err), "Connection Error"); goto fail; } @@ -660,7 +660,7 @@ ESL_DECLARE(esl_status_t) esl_connect(esl_handle_t *handle, const char *host, es hval = esl_event_get_header(handle->last_event, "reply-text"); - if (strcasecmp(hval, "+OK accepted")) { + if (esl_safe_strcasecmp(hval, "+OK accepted")) { snprintf(handle->err, sizeof(handle->err), "Authentication Error"); goto fail; } @@ -876,7 +876,7 @@ ESL_DECLARE(esl_status_t) esl_recv_event(esl_handle_t *handle, esl_event_t **sav hval = esl_event_get_header(revent, "content-type"); - if (!esl_strlen_zero(hval) && !strcasecmp(hval, "text/event-plain") && revent->body) { + if (!esl_strlen_zero(hval) && !esl_safe_strcasecmp(hval, "text/event-plain") && revent->body) { const char *en; esl_event_types_t et = ESL_EVENT_COMMAND; char *body = strdup(revent->body); diff --git a/libs/esl/src/include/esl.h b/libs/esl/src/include/esl.h index 40ecdc2265..a37c3cc868 100644 --- a/libs/esl/src/include/esl.h +++ b/libs/esl/src/include/esl.h @@ -341,7 +341,7 @@ ESL_DECLARE(esl_status_t) esl_events(esl_handle_t *handle, esl_event_type_t etyp #define esl_recv(_h) esl_recv_event(_h, NULL) #define esl_recv_timed(_h, _ms) esl_recv_event_timed(_h, _ms, NULL) - +#define esl_safe_strcasecmp(_s1, _s2) ((_s1) && (_s2)) ? strcasecmp((_s1), (_s2)) : 1 #ifdef __cplusplus } diff --git a/src/include/switch_apr.h b/src/include/switch_apr.h index 967ece209d..349258847d 100644 --- a/src/include/switch_apr.h +++ b/src/include/switch_apr.h @@ -620,6 +620,8 @@ SWITCH_DECLARE(unsigned int) switch_queue_size(switch_queue_t *queue); */ SWITCH_DECLARE(switch_status_t) switch_queue_trypop(switch_queue_t *queue, void **data); +SWITCH_DECLARE(switch_status_t) switch_queue_interrupt_all(switch_queue_t *queue); + /** * push/add a object to the queue, returning immediatly if the queue is full * diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 49d80088e4..3934a41b3d 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -1671,7 +1671,8 @@ SWITCH_DECLARE(FILE *) switch_core_get_console(void); /*! \brief Launch a thread */ -SWITCH_DECLARE(void) switch_core_launch_thread(void *(SWITCH_THREAD_FUNC *func) (switch_thread_t *, void *), void *obj, switch_memory_pool_t *pool); +SWITCH_DECLARE(switch_thread_t *) switch_core_launch_thread(void *(SWITCH_THREAD_FUNC *func) (switch_thread_t *, void *), + void *obj, switch_memory_pool_t *pool); #endif /*! diff --git a/src/mod/applications/mod_commands/mod_commands.c b/src/mod/applications/mod_commands/mod_commands.c index 29fac29a90..dbd27750eb 100644 --- a/src/mod/applications/mod_commands/mod_commands.c +++ b/src/mod/applications/mod_commands/mod_commands.c @@ -40,7 +40,9 @@ #include SWITCH_MODULE_LOAD_FUNCTION(mod_commands_load); -SWITCH_MODULE_DEFINITION(mod_commands, mod_commands_load, NULL, NULL); +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_commands_shutdown); +SWITCH_MODULE_DEFINITION(mod_commands, mod_commands_load, mod_commands_shutdown, NULL); + SWITCH_STANDARD_API(time_test_function) { @@ -2289,6 +2291,8 @@ SWITCH_STANDARD_API(sched_api_function) return SWITCH_STATUS_SUCCESS; } +static switch_thread_rwlock_t *bgapi_rwlock = NULL; + struct bg_job { char *cmd; char *arg; @@ -2309,6 +2313,8 @@ static void *SWITCH_THREAD_FUNC bgapi_exec(switch_thread_t *thread, void *obj) if (!job) return NULL; + switch_thread_rwlock_rdlock(bgapi_rwlock); + pool = job->pool; SWITCH_STANDARD_STREAM(stream); @@ -2345,6 +2351,9 @@ static void *SWITCH_THREAD_FUNC bgapi_exec(switch_thread_t *thread, void *obj) job = NULL; switch_core_destroy_memory_pool(&pool); pool = NULL; + + switch_thread_rwlock_unlock(bgapi_rwlock); + return NULL; } @@ -3183,11 +3192,37 @@ SWITCH_STANDARD_API(hupall_api_function) return SWITCH_STATUS_SUCCESS; } +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_commands_shutdown) +{ + int x; + + + for (x = 30; x > 0; x--) { + if (switch_thread_rwlock_trywrlock(bgapi_rwlock) == SWITCH_STATUS_SUCCESS) { + switch_thread_rwlock_unlock(bgapi_rwlock); + break; + } + if (x == 30) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for bgapi threads.\n"); + } + switch_yield(1000000); + } + + if (!x) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up waiting for bgapi threads.\n"); + } + + return SWITCH_STATUS_SUCCESS; +} + + SWITCH_MODULE_LOAD_FUNCTION(mod_commands_load) { switch_api_interface_t *commands_api_interface; *module_interface = switch_loadable_module_create_module_interface(pool, modname); + switch_thread_rwlock_create(&bgapi_rwlock, pool); + SWITCH_ADD_API(commands_api_interface, "group_call", "Generate a dial string to call a group", group_call_function, "[@]"); SWITCH_ADD_API(commands_api_interface, "in_group", "determine if a user is in a group", in_group_function, "[@] "); diff --git a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c index 09b6e18a0c..c1e7edf4c6 100644 --- a/src/mod/event_handlers/mod_event_socket/mod_event_socket.c +++ b/src/mod/event_handlers/mod_event_socket/mod_event_socket.c @@ -116,6 +116,7 @@ static struct { static void remove_listener(listener_t *listener); +static void kill_all_listeners(void); static uint32_t next_id(void) { @@ -452,32 +453,25 @@ static void close_socket(switch_socket_t **sock) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_event_socket_shutdown) { - listener_t *l; int sanity = 0; prefs.done = 1; + kill_all_listeners(); switch_log_unbind_logger(socket_logger); close_socket(&listen_list.sock); while (prefs.threads) { - switch_yield(10000); - if (++sanity == 1000) { + switch_yield(100000); + kill_all_listeners(); + if (++sanity >= 200) { break; } } + switch_event_unbind(&globals.node); - switch_mutex_lock(globals.listener_mutex); - for (l = listen_list.listeners; l; l = l->next) { - close_socket(&l->sock); - } - switch_mutex_unlock(globals.listener_mutex); - - switch_yield(1000000); - - return SWITCH_STATUS_SUCCESS; } @@ -509,6 +503,22 @@ static void remove_listener(listener_t *listener) } +static void kill_all_listeners(void) +{ + listener_t *l; + + switch_mutex_lock(globals.listener_mutex); + for (l = listen_list.listeners; l; l = l->next) { + switch_clear_flag(l, LFLAG_RUNNING); + if (l->sock) { + switch_socket_shutdown(l->sock, SWITCH_SHUTDOWN_READWRITE); + switch_socket_close(l->sock); + } + } + switch_mutex_unlock(globals.listener_mutex); +} + + static listener_t *find_listener(uint32_t id) { listener_t *l, *r = NULL; @@ -961,12 +971,16 @@ static switch_status_t read_packet(listener_t *listener, switch_event_t **event, uint32_t max_len = sizeof(mbuf); switch_channel_t *channel = NULL; int clen = 0; - + *event = NULL; + + if (prefs.done) { + return SWITCH_STATUS_FALSE; + } + start = switch_epoch_time_now(NULL); ptr = mbuf; - if (listener->session) { channel = switch_core_session_get_channel(listener->session); } @@ -976,7 +990,7 @@ static switch_status_t read_packet(listener_t *listener, switch_event_t **event, mlen = 1; status = switch_socket_recv(listener->sock, ptr, &mlen); - if (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS) { + if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) { return SWITCH_STATUS_FALSE; } @@ -1044,7 +1058,7 @@ static switch_status_t read_packet(listener_t *listener, switch_event_t **event, status = switch_socket_recv(listener->sock, p, &mlen); - if (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS) { + if (prefs.done || (!SWITCH_STATUS_IS_BREAK(status) && status != SWITCH_STATUS_SUCCESS)) { return SWITCH_STATUS_FALSE; } @@ -1211,10 +1225,15 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) switch_stream_handle_t stream = { 0 }; char *reply, *freply = NULL; switch_status_t status; + + switch_mutex_lock(globals.listener_mutex); + prefs.threads++; + switch_mutex_unlock(globals.listener_mutex); + if (!acs) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Internal error.\n"); - return NULL; + goto cleanup; } if (!acs->listener || !switch_test_flag(acs->listener, LFLAG_RUNNING) || @@ -1291,6 +1310,11 @@ static void *SWITCH_THREAD_FUNC api_exec(switch_thread_t *thread, void *obj) } + cleanup: + switch_mutex_lock(globals.listener_mutex); + prefs.threads--; + switch_mutex_unlock(globals.listener_mutex); + return NULL; } @@ -1934,7 +1958,7 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) } } - while (switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) { + while (!prefs.done && switch_test_flag(listener, LFLAG_RUNNING) && listen_list.ready) { len = sizeof(buf); memset(buf, 0, len); status = read_packet(listener, &revent, 0); @@ -2106,7 +2130,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) config(); - for (;;) { + while(!prefs.done) { rv = switch_sockaddr_info_get(&sa, prefs.ip, SWITCH_INET, prefs.port, 0, pool); if (rv) goto fail; @@ -2132,7 +2156,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(mod_event_socket_runtime) listen_list.ready = 1; - for (;;) { + while(!prefs.done) { if (switch_core_new_memory_pool(&listener_pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "OH OH no pool\n"); goto fail; diff --git a/src/mod/formats/mod_local_stream/mod_local_stream.c b/src/mod/formats/mod_local_stream/mod_local_stream.c index b5f3b7551d..ff04395107 100644 --- a/src/mod/formats/mod_local_stream/mod_local_stream.c +++ b/src/mod/formats/mod_local_stream/mod_local_stream.c @@ -222,13 +222,13 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void if (!used && !is_open) { break; } - + if (!is_open || used >= source->prebuf || (source->total && used > source->samples * 2)) { used = switch_buffer_read(audio_buffer, dist_buf, source->samples * 2); if (source->total) { switch_mutex_lock(source->mutex); - for (cp = source->context_list; cp; cp = cp->next) { + for (cp = source->context_list; cp && RUNNING; cp = cp->next) { if (switch_test_flag(cp->handle, SWITCH_FILE_CALLBACK)) { continue; } diff --git a/src/switch_apr.c b/src/switch_apr.c index b404aaf5e8..d841060fdf 100644 --- a/src/switch_apr.c +++ b/src/switch_apr.c @@ -895,6 +895,11 @@ SWITCH_DECLARE(switch_status_t) switch_queue_trypop(switch_queue_t *queue, void return apr_queue_trypop(queue, data); } +SWITCH_DECLARE(switch_status_t) switch_queue_interrupt_all(switch_queue_t *queue) +{ + return apr_queue_interrupt_all(queue); +} + SWITCH_DECLARE(switch_status_t) switch_queue_trypush(switch_queue_t *queue, void *data) { apr_status_t s; diff --git a/src/switch_core.c b/src/switch_core.c index 72f005747e..30b1353bc2 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -345,9 +345,9 @@ SWITCH_DECLARE(void) switch_core_service_session(switch_core_session_t *session) */ -SWITCH_DECLARE(void) switch_core_launch_thread(switch_thread_start_t func, void *obj, switch_memory_pool_t *pool) +SWITCH_DECLARE(switch_thread_t *) switch_core_launch_thread(switch_thread_start_t func, void *obj, switch_memory_pool_t *pool) { - switch_thread_t *thread; + switch_thread_t *thread = NULL; switch_threadattr_t *thd_attr = NULL; switch_core_thread_session_t *ts; int mypool; @@ -356,11 +356,10 @@ SWITCH_DECLARE(void) switch_core_launch_thread(switch_thread_start_t func, void if (!pool && switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not allocate memory pool\n"); - return; + return NULL; } switch_threadattr_create(&thd_attr, pool); - switch_threadattr_detach_set(thd_attr, 1); if ((ts = switch_core_alloc(pool, sizeof(*ts))) == 0) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Could not allocate memory\n"); @@ -369,10 +368,13 @@ SWITCH_DECLARE(void) switch_core_launch_thread(switch_thread_start_t func, void ts->pool = pool; } ts->objs[0] = obj; + ts->objs[1] = thread; switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); switch_thread_create(&thread, thd_attr, func, ts, pool); } + + return thread; } SWITCH_DECLARE(void) switch_core_set_globals(void) @@ -1457,6 +1459,7 @@ SWITCH_DECLARE(switch_bool_t) switch_core_ready(void) SWITCH_DECLARE(switch_status_t) switch_core_destroy(void) { switch_event_t *event; + if (switch_event_create(&event, SWITCH_EVENT_SHUTDOWN) == SWITCH_STATUS_SUCCESS) { switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Event-Info", "System Shutting Down"); switch_event_fire(&event); @@ -1464,26 +1467,29 @@ SWITCH_DECLARE(switch_status_t) switch_core_destroy(void) switch_set_flag((&runtime), SCF_NO_NEW_SESSIONS); switch_set_flag((&runtime), SCF_SHUTTING_DOWN); - + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "End existing sessions\n"); switch_core_session_hupall(SWITCH_CAUSE_SYSTEM_SHUTDOWN); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Clean up modules.\n"); - switch_core_memory_stop(); + switch_loadable_module_shutdown(); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Closing Event Engine.\n"); - switch_event_shutdown(); - + if (switch_test_flag((&runtime), SCF_USE_SQL)) { switch_core_sqldb_stop(); } switch_scheduler_task_thread_stop(); - + switch_rtp_shutdown(); switch_xml_destroy(); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Closing Event Engine.\n"); + switch_event_shutdown(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Finalizing Shutdown.\n"); switch_log_shutdown(); + switch_core_memory_stop(); + if (runtime.console && runtime.console != stdout && runtime.console != stderr) { fclose(runtime.console); runtime.console = NULL; diff --git a/src/switch_core_session.c b/src/switch_core_session.c index d7abf0e60a..ca7bb5239b 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -58,6 +58,9 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_locate(const char *u /* Acquire a read lock on the session */ #ifdef SWITCH_DEBUG_RWLOCKS if (switch_core_session_perform_read_lock(session, file, func, line) != SWITCH_STATUS_SUCCESS) { +#if EMACS_CC_MODE_IS_BUGGY + } +#endif #else if (switch_core_session_read_lock(session) != SWITCH_STATUS_SUCCESS) { #endif @@ -127,21 +130,20 @@ SWITCH_DECLARE(void) switch_core_session_hupall(switch_call_cause_t cause) void *val; switch_core_session_t *session; uint32_t loops = 0; - - switch_mutex_lock(runtime.throttle_mutex); - for (hi = switch_hash_first(NULL, session_manager.session_table); hi; hi = switch_hash_next(hi)) { - switch_hash_this(hi, NULL, NULL, &val); - if (val) { - session = (switch_core_session_t *) val; - if (switch_core_session_read_lock(session) == SWITCH_STATUS_SUCCESS) { - switch_channel_hangup(switch_core_session_get_channel(session), cause); - switch_core_session_rwunlock(session); + + while (session_manager.session_count > 0) { + switch_mutex_lock(runtime.throttle_mutex); + for (hi = switch_hash_first(NULL, session_manager.session_table); hi; hi = switch_hash_next(hi)) { + switch_hash_this(hi, NULL, NULL, &val); + if (val) { + session = (switch_core_session_t *) val; + if (switch_core_session_read_lock(session) == SWITCH_STATUS_SUCCESS) { + switch_channel_hangup(switch_core_session_get_channel(session), cause); + switch_core_session_rwunlock(session); + } } } - } - switch_mutex_unlock(runtime.throttle_mutex); - - while (session_manager.session_count > 0) { + switch_mutex_unlock(runtime.throttle_mutex); switch_yield(1000000); if (++loops == 30) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Giving up with %d session%s remaining\n", @@ -151,6 +153,7 @@ SWITCH_DECLARE(void) switch_core_session_hupall(switch_call_cause_t cause) } } + SWITCH_DECLARE(switch_status_t) switch_core_session_message_send(const char *uuid_str, switch_core_session_message_t *message) { switch_core_session_t *session = NULL; diff --git a/src/switch_event.c b/src/switch_event.c index e17f3c4a03..68c8955650 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -74,7 +74,9 @@ static switch_mutex_t *POOL_LOCK = NULL; static switch_memory_pool_t *RUNTIME_POOL = NULL; static switch_memory_pool_t *THRUNTIME_POOL = NULL; #define NUMBER_OF_QUEUES 3 +static switch_thread_t *EVENT_QUEUE_THREADS[NUMBER_OF_QUEUES] = { 0 }; static switch_queue_t *EVENT_QUEUE[NUMBER_OF_QUEUES] = { 0 }; +static switch_thread_t *EVENT_DISPATCH_QUEUE_THREADS[MAX_DISPATCH_VAL] = { 0 }; static switch_queue_t *EVENT_DISPATCH_QUEUE[MAX_DISPATCH_VAL] = { 0 }; static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN; static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; @@ -174,15 +176,15 @@ static char *EVENT_NAMES[] = { static int switch_events_match(switch_event_t *event, switch_event_node_t *node) { int match = 0; - + if (node->event_id == SWITCH_EVENT_ALL) { match++; - + if (!node->subclass) { return match; } } - + if (match || event->event_id == node->event_id) { if (event->subclass_name && node->subclass) { @@ -227,6 +229,10 @@ static void *SWITCH_THREAD_FUNC switch_event_dispatch_thread(switch_thread_t *th void *pop = NULL; switch_event_t *event = NULL; + if (!SYSTEM_RUNNING) { + break; + } + if (switch_queue_pop(queue, &pop) != SWITCH_STATUS_SUCCESS) { break; } @@ -278,6 +284,10 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi break; } + if (!SYSTEM_RUNNING) { + break; + } + event = (switch_event_t *) pop; while (event) { @@ -313,20 +323,22 @@ SWITCH_DECLARE(void) switch_event_deliver(switch_event_t **event) switch_event_types_t e; switch_event_node_t *node; - switch_thread_rwlock_rdlock(RWLOCK); - for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) { - for (node = EVENT_NODES[e]; node; node = node->next) { - if (switch_events_match(*event, node)) { - (*event)->bind_user_data = node->user_data; - node->callback(*event); + if (SYSTEM_RUNNING) { + switch_thread_rwlock_rdlock(RWLOCK); + for (e = (*event)->event_id;; e = SWITCH_EVENT_ALL) { + for (node = EVENT_NODES[e]; node; node = node->next) { + if (switch_events_match(*event, node)) { + (*event)->bind_user_data = node->user_data; + node->callback(*event); + } + } + + if (e == SWITCH_EVENT_ALL) { + break; } } - - if (e == SWITCH_EVENT_ALL) { - break; - } + switch_thread_rwlock_unlock(RWLOCK); } - switch_thread_rwlock_unlock(RWLOCK); switch_event_destroy(event); } @@ -416,10 +428,10 @@ SWITCH_DECLARE(void) switch_core_memory_reclaim_events(void) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Returning %d recycled event header(s) %d bytes\n", size, (int) sizeof(switch_event_header_t) * size); - while (switch_queue_trypop(EVENT_HEADER_RECYCLE_QUEUE, &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_queue_trypop(EVENT_HEADER_RECYCLE_QUEUE, &pop) == SWITCH_STATUS_SUCCESS && pop) { free(pop); } - while (switch_queue_trypop(EVENT_RECYCLE_QUEUE, &pop) == SWITCH_STATUS_SUCCESS) { + while (switch_queue_trypop(EVENT_RECYCLE_QUEUE, &pop) == SWITCH_STATUS_SUCCESS && pop) { free(pop); } } @@ -438,12 +450,14 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) for (x = 0; x < 3; x++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping event queue %d\n", x); - switch_queue_push(EVENT_QUEUE[x], NULL); + switch_queue_trypush(EVENT_QUEUE[x], NULL); + switch_queue_interrupt_all(EVENT_QUEUE[x]); } for (x = 0; x < SOFT_MAX_DISPATCH; x++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch queue %d\n", x); - switch_queue_push(EVENT_DISPATCH_QUEUE[x], NULL); + switch_queue_trypush(EVENT_DISPATCH_QUEUE[x], NULL); + switch_queue_interrupt_all(EVENT_DISPATCH_QUEUE[x]); } while (x < 10000 && THREAD_COUNT) { @@ -453,7 +467,35 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) } last = THREAD_COUNT; } + + for (x = 0; x < SOFT_MAX_DISPATCH; x++) { + void *pop = NULL; + switch_event_t *event = NULL; + switch_status_t st; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping dispatch thread %d\n", x); + switch_thread_join(&st, EVENT_DISPATCH_QUEUE_THREADS[x]); + + while (switch_queue_trypop(EVENT_DISPATCH_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + event = (switch_event_t *) pop; + switch_event_destroy(&event); + } + } + + for (x = 0; x < NUMBER_OF_QUEUES; x++) { + void *pop = NULL; + switch_event_t *event = NULL; + switch_status_t st; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping queue thread %d\n", x); + switch_thread_join(&st, EVENT_QUEUE_THREADS[x]); + + while (switch_queue_trypop(EVENT_QUEUE[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + event = (switch_event_t *) pop; + switch_event_destroy(&event); + } + } + for (hi = switch_hash_first(NULL, CUSTOM_HASH); hi; hi = switch_hash_next(hi)) { switch_event_subclass_t *subclass; switch_hash_this(hi, &var, NULL, &val); @@ -472,7 +514,6 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t *pool) { - switch_thread_t *thread; switch_threadattr_t *thd_attr; uint32_t index = 0; @@ -492,8 +533,7 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t switch_threadattr_create(&thd_attr, pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); - switch_threadattr_detach_set(thd_attr, 1); - switch_thread_create(&thread, thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool); + switch_thread_create(&EVENT_DISPATCH_QUEUE_THREADS[index], thd_attr, switch_event_dispatch_thread, EVENT_DISPATCH_QUEUE[index], pool); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Create event dispatch thread %d\n", index); } @@ -502,18 +542,26 @@ static void launch_dispatch_threads(uint32_t max, int len, switch_memory_pool_t SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) { - switch_thread_t *thread; switch_threadattr_t *thd_attr;; switch_assert(pool != NULL); + THRUNTIME_POOL = RUNTIME_POOL = pool; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n"); + switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL); + switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); + + switch_mutex_lock(EVENT_QUEUE_MUTEX); + SYSTEM_RUNNING = -1; + switch_mutex_unlock(EVENT_QUEUE_MUTEX); switch_threadattr_create(&thd_attr, pool); - switch_threadattr_detach_set(thd_attr, 1); gethostname(hostname, sizeof(hostname)); switch_find_local_ip(guess_ip_v4, sizeof(guess_ip_v4), AF_INET); switch_find_local_ip(guess_ip_v6, sizeof(guess_ip_v6), AF_INET6); - THRUNTIME_POOL = RUNTIME_POOL = pool; switch_queue_create(&EVENT_QUEUE[0], POOL_COUNT_MAX + 10, THRUNTIME_POOL); switch_queue_create(&EVENT_QUEUE[1], POOL_COUNT_MAX + 10, THRUNTIME_POOL); @@ -521,21 +569,13 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) switch_queue_create(&EVENT_RECYCLE_QUEUE, 250000, THRUNTIME_POOL); switch_queue_create(&EVENT_HEADER_RECYCLE_QUEUE, 250000, THRUNTIME_POOL); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n"); - switch_thread_rwlock_create(&RWLOCK, RUNTIME_POOL); - switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); - switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); - switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); - switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_priority_increase(thd_attr); - switch_threadattr_detach_set(thd_attr, 1); launch_dispatch_threads(1, DISPATCH_QUEUE_LEN, RUNTIME_POOL); - switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); - switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); - switch_thread_create(&thread, thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); + switch_thread_create(&EVENT_QUEUE_THREADS[0], thd_attr, switch_event_thread, EVENT_QUEUE[0], RUNTIME_POOL); + switch_thread_create(&EVENT_QUEUE_THREADS[1], thd_attr, switch_event_thread, EVENT_QUEUE[1], RUNTIME_POOL); + switch_thread_create(&EVENT_QUEUE_THREADS[2], thd_attr, switch_event_thread, EVENT_QUEUE[2], RUNTIME_POOL); while (!THREAD_COUNT) { switch_cond_next(); @@ -1029,7 +1069,7 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(const char *file, con if (SYSTEM_RUNNING <= 0) { /* sorry we're closed */ switch_event_destroy(event); - return SWITCH_STATUS_FALSE; + return SWITCH_STATUS_SUCCESS; } switch_event_add_header_string(*event, SWITCH_STACK_BOTTOM, "Event-Name", switch_event_name((*event)->event_id)); diff --git a/src/switch_loadable_module.c b/src/switch_loadable_module.c index db0e56bf99..095754051f 100644 --- a/src/switch_loadable_module.c +++ b/src/switch_loadable_module.c @@ -52,6 +52,7 @@ struct switch_loadable_module { switch_module_shutdown_t switch_module_shutdown; switch_memory_pool_t *pool; switch_status_t status; + switch_thread_t *thread; }; struct switch_loadable_module_container { @@ -117,7 +118,7 @@ static void switch_loadable_module_runtime(void) if (module->switch_module_runtime) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting runtime thread for %s\n", module->module_interface->module_name); - switch_core_launch_thread(switch_loadable_module_exec, module, loadable_modules.pool); + module->thread = switch_core_launch_thread(switch_loadable_module_exec, module, loadable_modules.pool); } } switch_mutex_unlock(loadable_modules.mutex); @@ -910,7 +911,7 @@ static switch_status_t switch_loadable_module_load_module_ex(char *dir, char *fn } else if ((status = switch_loadable_module_load_file(path, file, global, &new_module)) == SWITCH_STATUS_SUCCESS) { if ((status = switch_loadable_module_process(file, new_module)) == SWITCH_STATUS_SUCCESS && runtime) { if (new_module->switch_module_runtime) { - switch_core_launch_thread(switch_loadable_module_exec, new_module, new_module->pool); + new_module->thread = switch_core_launch_thread(switch_loadable_module_exec, new_module, new_module->pool); } } else if (status != SWITCH_STATUS_SUCCESS) { *err = "module load routine returned an error"; @@ -1053,7 +1054,7 @@ SWITCH_DECLARE(switch_status_t) switch_loadable_module_build_dynamic(char *filen module->switch_module_runtime = switch_module_runtime; } if (runtime && module->switch_module_runtime) { - switch_core_launch_thread(switch_loadable_module_exec, module, module->pool); + module->thread = switch_core_launch_thread(switch_loadable_module_exec, module, module->pool); } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Successfully Loaded [%s]\n", module_interface->module_name); return switch_loadable_module_process((char *) module->filename, module); @@ -1241,6 +1242,13 @@ static switch_status_t do_shutdown(switch_loadable_module_t *module, switch_bool if (unload && module->status != SWITCH_STATUS_NOUNLOAD && !(flags & SCF_VG)) { switch_memory_pool_t *pool; + switch_status_t st; + + if (module->thread) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "%s stopping runtime thread.\n", module->module_interface->module_name); + switch_thread_join(&st, module->thread); + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "%s unloaded.\n", module->module_interface->module_name); switch_dso_destroy(&module->lib); if ((pool = module->pool)) { diff --git a/src/switch_log.c b/src/switch_log.c index 6f0e6519ba..86532f9c02 100644 --- a/src/switch_log.c +++ b/src/switch_log.c @@ -193,6 +193,8 @@ SWITCH_DECLARE(switch_status_t) switch_log_bind_logger(switch_log_function_t fun return SWITCH_STATUS_SUCCESS; } +static switch_thread_t *thread; + static void *SWITCH_THREAD_FUNC log_thread(switch_thread_t *thread, void *obj) { @@ -382,7 +384,6 @@ SWITCH_DECLARE(void) switch_log_printf(switch_text_channel_t channel, const char SWITCH_DECLARE(switch_status_t) switch_log_init(switch_memory_pool_t *pool, switch_bool_t colorize) { - switch_thread_t *thread; switch_threadattr_t *thd_attr;; switch_assert(pool != NULL); @@ -397,7 +398,6 @@ SWITCH_DECLARE(switch_status_t) switch_log_init(switch_memory_pool_t *pool, swit switch_queue_create(&LOG_RECYCLE_QUEUE, SWITCH_CORE_QUEUE_LEN, LOG_POOL); switch_mutex_init(&BINDLOCK, SWITCH_MUTEX_NESTED, LOG_POOL); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_threadattr_detach_set(thd_attr, 1); switch_thread_create(&thread, thd_attr, log_thread, NULL, LOG_POOL); while (!THREAD_RUNNING) { @@ -433,12 +433,16 @@ SWITCH_DECLARE(void) switch_core_memory_reclaim_logger(void) SWITCH_DECLARE(switch_status_t) switch_log_shutdown(void) { + switch_status_t st; THREAD_RUNNING = -1; switch_queue_push(LOG_QUEUE, NULL); while (THREAD_RUNNING) { switch_cond_next(); } + + switch_thread_join(&st, thread); + switch_core_memory_reclaim_logger(); return SWITCH_STATUS_SUCCESS;