From 16d483f113cd713e6d42e6749d2c3de99ce23d13 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 21 Aug 2012 19:47:05 -0500 Subject: [PATCH] update to session thread cache --- src/switch_core_session.c | 71 +++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 25 deletions(-) diff --git a/src/switch_core_session.c b/src/switch_core_session.c index 8e02b2e325..0fcd2988b4 100644 --- a/src/switch_core_session.c +++ b/src/switch_core_session.c @@ -1466,6 +1466,8 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th session_manager.running++; switch_mutex_unlock(session_manager.mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Started\n", (long) thread); + while(session_manager.ready) { switch_status_t check_status; @@ -1485,8 +1487,14 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th session_manager.busy++; switch_mutex_unlock(session_manager.mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Processing session %ld %s\n", + (long) thread, session->id, switch_core_session_get_name(session)); + switch_core_session_thread(thread, (void *) session); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Done Processing session %ld %s\n", + (long) thread, session->id, switch_core_session_get_name(session)); + switch_mutex_lock(session_manager.mutex); session_manager.busy--; switch_mutex_unlock(session_manager.mutex); @@ -1499,6 +1507,8 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_th } } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "Worker Thread %ld Ended\n", (long) thread); + switch_mutex_lock(session_manager.mutex); session_manager.running--; switch_mutex_unlock(session_manager.mutex); @@ -1571,23 +1581,23 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_t SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session) { - if (session_manager.ready == 1) { - switch_thread_t *thread; - switch_threadattr_t *thd_attr; - - switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); - switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); - - switch_threadattr_create(&thd_attr, session_manager.memory_pool); - switch_threadattr_detach_set(thd_attr, 1); - switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool); - session_manager.ready++; + switch_status_t status = SWITCH_STATUS_INUSE; + + switch_mutex_lock(session->mutex); + if (switch_test_flag(session, SSF_THREAD_RUNNING)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot double-launch thread!\n"); + } else if (switch_test_flag(session, SSF_THREAD_STARTED)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_CRIT, "Cannot launch thread again after it has already been run!\n"); + } else { + status = SWITCH_STATUS_SUCCESS; + switch_set_flag(session, SSF_THREAD_RUNNING); + switch_set_flag(session, SSF_THREAD_STARTED); + switch_queue_push(session_manager.thread_queue, session); + check_queue(); } + switch_mutex_unlock(session->mutex); - switch_queue_push(session_manager.thread_queue, session); - check_queue(); - return SWITCH_STATUS_SUCCESS; + return status; } @@ -1596,19 +1606,17 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_se switch_status_t status = SWITCH_STATUS_FALSE; switch_thread_t *thread; switch_threadattr_t *thd_attr; - - if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) { - return switch_core_session_thread_pool_launch(session); - } - switch_threadattr_create(&thd_attr, session->pool); - switch_threadattr_detach_set(thd_attr, 1); - if (switch_test_flag(session, SSF_THREAD_RUNNING) || switch_test_flag(session, SSF_THREAD_STARTED)) { status = SWITCH_STATUS_INUSE; goto end; } + + if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) { + return switch_core_session_thread_pool_launch(session); + } + switch_mutex_lock(session->mutex); if (switch_test_flag(session, SSF_THREAD_RUNNING)) { @@ -1618,7 +1626,11 @@ SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_se } else { switch_set_flag(session, SSF_THREAD_RUNNING); switch_set_flag(session, SSF_THREAD_STARTED); + + switch_threadattr_create(&thd_attr, session->pool); + switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + if (switch_thread_create(&thread, thd_attr, switch_core_session_thread, session, session->pool) == SWITCH_STATUS_SUCCESS) { switch_set_flag(session, SSF_THREAD_STARTED); status = SWITCH_STATUS_SUCCESS; @@ -2186,15 +2198,24 @@ SWITCH_DECLARE(uint32_t) switch_core_sessions_per_second(uint32_t new_limit) void switch_core_session_init(switch_memory_pool_t *pool) { - - memset(&session_manager, 0, sizeof(session_manager)); session_manager.session_limit = 1000; session_manager.session_id = 1; session_manager.memory_pool = pool; switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool); - session_manager.ready = 1; + + if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) { + switch_thread_t *thread; + switch_threadattr_t *thd_attr; + switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool); + switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool); + switch_threadattr_create(&thd_attr, session_manager.memory_pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool); + session_manager.ready = 1; + } }