diff --git a/src/include/private/switch_core_pvt.h b/src/include/private/switch_core_pvt.h index 2df37b0e05..95fd120337 100644 --- a/src/include/private/switch_core_pvt.h +++ b/src/include/private/switch_core_pvt.h @@ -283,6 +283,11 @@ struct switch_session_manager { uint32_t session_count; uint32_t session_limit; switch_size_t session_id; + switch_queue_t *thread_queue; + switch_mutex_t *mutex; + int ready; + int running; + int busy; }; extern struct switch_session_manager session_manager; diff --git a/src/include/switch_core.h b/src/include/switch_core.h index dcaace2724..1fda42588c 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -690,6 +690,9 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_ */ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session); + +SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session); + /*! \brief Retrieve a pointer to the channel object associated with a given session \param session the session to retrieve from diff --git a/src/include/switch_types.h b/src/include/switch_types.h index 381ab22012..6a593a879e 100644 --- a/src/include/switch_types.h +++ b/src/include/switch_types.h @@ -323,7 +323,8 @@ typedef enum { SCF_SYNC_CLOCK_REQUESTED = (1 << 19), SCF_CORE_ODBC_REQ = (1 << 20), SCF_DEBUG_SQL = (1 << 21), - SCF_API_EXPANSION = (1 << 22) + SCF_API_EXPANSION = (1 << 22), + SCF_SESSION_THREAD_POOL = (1 << 23) } switch_core_flag_enum_t; typedef uint32_t switch_core_flag_t; diff --git a/src/switch_core.c b/src/switch_core.c index 49b84f998a..e259a2d8a8 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -1440,6 +1440,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc switch_set_flag((&runtime), SCF_AUTO_SCHEMAS); switch_set_flag((&runtime), SCF_CLEAR_SQL); switch_set_flag((&runtime), SCF_API_EXPANSION); + switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL); #ifdef WIN32 switch_set_flag((&runtime), SCF_THREADED_SYSTEM_EXEC); #endif @@ -1751,6 +1752,12 @@ static void switch_load_core_config(const char *file) } else { switch_clear_flag((&runtime), SCF_AUTO_SCHEMAS); } + } else if (!strcasecmp(var, "session-thread-pool")) { + if (switch_true(val)) { + switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL); + } else { + switch_clear_flag((&runtime), SCF_SESSION_THREAD_POOL); + } } else if (!strcasecmp(var, "auto-clear-sql")) { if (switch_true(val)) { switch_set_flag((&runtime), SCF_CLEAR_SQL); diff --git a/src/switch_core_session.c b/src/switch_core_session.c index 4113afd1e0..4b334315e2 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1451,12 +1451,156 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread(switch_thread_t *thre return NULL; } +typedef struct switch_thread_pool_node_s { + switch_memory_pool_t *pool; +} switch_thread_pool_node_t; + +static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_thread_t *thread, void *obj) +{ + switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj; + switch_memory_pool_t *pool = node->pool; + void *pop; + int check = 0; + + switch_mutex_lock(session_manager.mutex); + session_manager.running++; + switch_mutex_unlock(session_manager.mutex); + + while(session_manager.ready) { + switch_status_t check_status; + + if (check) { + check_status = switch_queue_trypop(session_manager.thread_queue, &pop); + } else { + check_status = switch_queue_pop(session_manager.thread_queue, &pop); + } + + if (check_status == SWITCH_STATUS_SUCCESS) { + switch_core_session_t *session = (switch_core_session_t *) pop; + + if (!session) break; + + + switch_mutex_lock(session_manager.mutex); + session_manager.busy++; + switch_mutex_unlock(session_manager.mutex); + + switch_core_session_thread(thread, (void *) session); + + switch_mutex_lock(session_manager.mutex); + session_manager.busy--; + switch_mutex_unlock(session_manager.mutex); + + } else { + if (check) { + break; + } + check++; + } + } + + switch_mutex_lock(session_manager.mutex); + session_manager.running--; + switch_mutex_unlock(session_manager.mutex); + + switch_core_destroy_memory_pool(&pool); + + return NULL; +} + + +static switch_status_t check_queue(void) +{ + switch_status_t status = SWITCH_STATUS_FALSE; + int ttl = 0; + int x = 0; + + switch_mutex_lock(session_manager.mutex); + ttl = switch_queue_size(session_manager.thread_queue); + x = (session_manager.running - session_manager.busy); + switch_mutex_unlock(session_manager.mutex); + + + + while (x < ttl) { + switch_thread_t *thread; + switch_threadattr_t *thd_attr; + switch_memory_pool_t *pool; + switch_thread_pool_node_t *node; + + switch_core_new_memory_pool(&pool); + node = switch_core_alloc(pool, sizeof(*node)); + node->pool = pool; + + switch_threadattr_create(&thd_attr, node->pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + + if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n"); + switch_core_destroy_memory_pool(&pool); + status = SWITCH_STATUS_GENERR; + } else { + status = SWITCH_STATUS_SUCCESS; + } + x++; + } + + return status; +} + + +static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj) +{ + int x = 0; + + while(session_manager.ready) { + switch_yield(100000); + + if (++x == 300) { + switch_queue_interrupt_all(session_manager.thread_queue); + x = 0; + } + + check_queue(); + } + + return NULL; +} + + +SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session) +{ + if (session_manager.ready == 1) { + switch_thread_t *thread; + switch_threadattr_t *thd_attr; + + switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); + switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); + + switch_threadattr_create(&thd_attr, session_manager.memory_pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool); + session_manager.ready++; + } + + switch_queue_push(session_manager.thread_queue, session); + check_queue(); + return SWITCH_STATUS_SUCCESS; +} + + SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_session_t *session) { switch_status_t status = SWITCH_STATUS_FALSE; switch_thread_t *thread; - switch_threadattr_t *thd_attr;; + switch_threadattr_t *thd_attr; + if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) { + return switch_core_session_thread_pool_launch(session); + } + switch_threadattr_create(&thd_attr, session->pool); switch_threadattr_detach_set(thd_attr, 1); @@ -2042,16 +2186,24 @@ SWITCH_DECLARE(uint32_t) switch_core_sessions_per_second(uint32_t new_limit) void switch_core_session_init(switch_memory_pool_t *pool) { + + memset(&session_manager, 0, sizeof(session_manager)); session_manager.session_limit = 1000; session_manager.session_id = 1; session_manager.memory_pool = pool; switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool); + session_manager.ready = 1; + + } void switch_core_session_uninit(void) { switch_core_hash_destroy(&session_manager.session_table); + session_manager.ready = 0; + switch_queue_interrupt_all(session_manager.thread_queue); + } SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session) diff --git a/src/switch_core_state_machine.c b/src/switch_core_state_machine.c index 46d49e6883..da00b6bca5 100644 --- a/src/switch_core_state_machine.c +++ b/src/switch_core_state_machine.c @@ -336,7 +336,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session) const switch_state_handler_table_t *driver_state_handler = NULL; const switch_state_handler_table_t *application_state_handler = NULL; int silly = 0; - uint32_t new_loops = 5000; + // uint32_t new_loops = 5000; /* Life of the channel. you have channel and pool in your session @@ -468,6 +468,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session) endstate = switch_channel_get_state(session->channel); if (endstate == switch_channel_get_running_state(session->channel)) { + /** if (endstate == CS_NEW) { switch_cond_next(); switch_ivr_parse_all_events(session); @@ -477,6 +478,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session) switch_channel_hangup(session->channel, SWITCH_CAUSE_INVALID_CALL_REFERENCE); } } else { + **/ switch_ivr_parse_all_events(session); switch_ivr_parse_all_events(session); @@ -490,7 +492,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session) switch_ivr_parse_all_events(session); switch_ivr_parse_all_events(session); - } + //} } } done: diff --git a/src/switch_event.c b/src/switch_event.c index 5b7891d496..4a53229af7 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -307,6 +307,8 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) { launch++; } + + switch_mutex_unlock(EVENT_QUEUE_MUTEX); if (launch) { if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) { @@ -314,8 +316,6 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp } } - switch_mutex_unlock(EVENT_QUEUE_MUTEX); - *eventp = NULL; switch_queue_push(EVENT_DISPATCH_QUEUE, event); event = NULL;