From 0a9ed88019d89d3590c63b1ce1bea69a1ab289fd Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 27 Sep 2013 23:37:05 +0500 Subject: [PATCH] improve thread pool logic --- src/include/private/switch_core_pvt.h | 4 ++ src/switch_core.c | 6 +++ src/switch_core_session.c | 68 ++++++++++++++++++++++----- 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 1b0ab28742..99dd7af55b 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -288,10 +288,14 @@ struct switch_session_manager { switch_queue_t *thread_queue; switch_thread_t *manager_thread; switch_mutex_t *mutex; + switch_thread_cond_t *cond; + switch_mutex_t *cond_mutex; + switch_mutex_t *cond2_mutex; int ready; int running; int busy; int popping; + int starting; }; extern struct switch_session_manager session_manager; diff --git a/src/switch_core.c b/src/switch_core.c index 3bc1a33d7b..6f382f3725 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1779,6 +1779,12 @@ static void switch_load_core_config(const char *file) switch_core_hash_insert(runtime.ptimes, "isac", &d_30); switch_core_hash_insert(runtime.ptimes, "G723", &d_30); + if (runtime.cpu_count == 1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, + "Implicitly setting events-use-dispatch based on a single CPU\n"); + runtime.events_use_dispatch = 1; + } + if ((xml = switch_xml_open_cfg(file, &cfg, NULL))) { switch_xml_t settings, param; diff --git a/src/switch_core_session.c b/src/switch_core_session.c index cd7d8a965e..f77ef60559 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1581,6 +1581,7 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th int check = 0; switch_mutex_lock(session_manager.mutex); + session_manager.starting--; session_manager.running++; switch_mutex_unlock(session_manager.mutex); #ifdef DEBUG_THREAD_POOL @@ -1675,6 +1676,33 @@ static void thread_launch_failure(void) switch_mutex_unlock(session_manager.mutex); } +static int wake_queue(void) +{ + switch_status_t status; + int tries = 0; + + top: + + status = switch_mutex_trylock(session_manager.cond_mutex); + + if (status == SWITCH_STATUS_SUCCESS) { + switch_thread_cond_signal(session_manager.cond); + switch_mutex_unlock(session_manager.cond_mutex); + return 1; + } else { + if (switch_mutex_trylock(session_manager.cond2_mutex) == SWITCH_STATUS_SUCCESS) { + switch_mutex_unlock(session_manager.cond2_mutex); + } else { + if (++tries < 10) { + switch_cond_next(); + goto top; + } + } + } + + return 0; +} + static switch_status_t check_queue(void) { switch_status_t status = SWITCH_STATUS_FALSE; @@ -1683,7 +1711,7 @@ static switch_status_t check_queue(void) switch_mutex_lock(session_manager.mutex); ttl = switch_queue_size(session_manager.thread_queue); - x = (session_manager.running - session_manager.busy); + x = ((session_manager.running + session_manager.starting) - session_manager.busy); switch_mutex_unlock(session_manager.mutex); @@ -1710,6 +1738,10 @@ static switch_status_t check_queue(void) } else { status = SWITCH_STATUS_SUCCESS; } + + switch_mutex_lock(session_manager.mutex); + session_manager.starting++; + switch_mutex_unlock(session_manager.mutex); x++; } @@ -1719,12 +1751,20 @@ static switch_status_t check_queue(void) static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj) { - int x = 0; + + uint32_t sleep = 10000000; + switch_time_t next = switch_micro_time_now() + sleep; + + switch_mutex_lock(session_manager.cond_mutex); while(session_manager.ready) { - switch_yield(100000); + int check = 1; - if (++x == 300) { + switch_mutex_lock(session_manager.cond2_mutex); + switch_thread_cond_timedwait(session_manager.cond, session_manager.cond_mutex, sleep); + switch_mutex_unlock(session_manager.cond2_mutex); + + if (switch_micro_time_now() >= next) { if (session_manager.popping) { #ifdef DEBUG_THREAD_POOL switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, @@ -1732,17 +1772,20 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t #endif switch_queue_interrupt_all(session_manager.thread_queue); - x--; - - continue; + sleep = 100000; + check = 0; } else { - x = 0; + sleep = 10000000; } } - check_queue(); + if (check) check_queue(); + + next = switch_micro_time_now() + sleep; } + switch_mutex_unlock(session_manager.cond_mutex); + while(session_manager.running) { switch_queue_interrupt_all(session_manager.thread_queue); switch_yield(20000); @@ -1763,7 +1806,7 @@ SWITCH_DECLARE(switch_status_t) switch_thread_pool_launch_thread(switch_thread_d *tdp = NULL; switch_queue_push(session_manager.thread_queue, td); - check_queue(); + wake_queue(); return status; } @@ -1786,7 +1829,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_co td->obj = session; td->func = switch_core_session_thread; switch_queue_push(session_manager.thread_queue, td); - check_queue(); + wake_queue(); } switch_mutex_unlock(session->mutex); @@ -2431,6 +2474,9 @@ void switch_core_session_init(switch_memory_pool_t *pool) switch_threadattr_t *thd_attr; switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); + switch_thread_cond_create(&session_manager.cond, session_manager.memory_pool); + switch_mutex_init(&session_manager.cond_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); + switch_mutex_init(&session_manager.cond2_mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); switch_threadattr_create(&thd_attr, session_manager.memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);