cache session threads and reuse them if possible
This commit is contained in:
parent
b6f64dc1c2
commit
142c051d68
|
@ -283,6 +283,11 @@ struct switch_session_manager {
|
||||||
uint32_t session_count;
|
uint32_t session_count;
|
||||||
uint32_t session_limit;
|
uint32_t session_limit;
|
||||||
switch_size_t session_id;
|
switch_size_t session_id;
|
||||||
|
switch_queue_t *thread_queue;
|
||||||
|
switch_mutex_t *mutex;
|
||||||
|
int ready;
|
||||||
|
int running;
|
||||||
|
int busy;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern struct switch_session_manager session_manager;
|
extern struct switch_session_manager session_manager;
|
||||||
|
|
|
@ -690,6 +690,9 @@ SWITCH_DECLARE(switch_core_session_t *) switch_core_session_request_by_name(_In_
|
||||||
*/
|
*/
|
||||||
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
|
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(_In_ switch_core_session_t *session);
|
||||||
|
|
||||||
|
|
||||||
|
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
\brief Retrieve a pointer to the channel object associated with a given session
|
\brief Retrieve a pointer to the channel object associated with a given session
|
||||||
\param session the session to retrieve from
|
\param session the session to retrieve from
|
||||||
|
|
|
@ -323,7 +323,8 @@ typedef enum {
|
||||||
SCF_SYNC_CLOCK_REQUESTED = (1 << 19),
|
SCF_SYNC_CLOCK_REQUESTED = (1 << 19),
|
||||||
SCF_CORE_ODBC_REQ = (1 << 20),
|
SCF_CORE_ODBC_REQ = (1 << 20),
|
||||||
SCF_DEBUG_SQL = (1 << 21),
|
SCF_DEBUG_SQL = (1 << 21),
|
||||||
SCF_API_EXPANSION = (1 << 22)
|
SCF_API_EXPANSION = (1 << 22),
|
||||||
|
SCF_SESSION_THREAD_POOL = (1 << 23)
|
||||||
} switch_core_flag_enum_t;
|
} switch_core_flag_enum_t;
|
||||||
typedef uint32_t switch_core_flag_t;
|
typedef uint32_t switch_core_flag_t;
|
||||||
|
|
||||||
|
|
|
@ -1440,6 +1440,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
|
||||||
switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
|
switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
|
||||||
switch_set_flag((&runtime), SCF_CLEAR_SQL);
|
switch_set_flag((&runtime), SCF_CLEAR_SQL);
|
||||||
switch_set_flag((&runtime), SCF_API_EXPANSION);
|
switch_set_flag((&runtime), SCF_API_EXPANSION);
|
||||||
|
switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL);
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
switch_set_flag((&runtime), SCF_THREADED_SYSTEM_EXEC);
|
switch_set_flag((&runtime), SCF_THREADED_SYSTEM_EXEC);
|
||||||
#endif
|
#endif
|
||||||
|
@ -1751,6 +1752,12 @@ static void switch_load_core_config(const char *file)
|
||||||
} else {
|
} else {
|
||||||
switch_clear_flag((&runtime), SCF_AUTO_SCHEMAS);
|
switch_clear_flag((&runtime), SCF_AUTO_SCHEMAS);
|
||||||
}
|
}
|
||||||
|
} else if (!strcasecmp(var, "session-thread-pool")) {
|
||||||
|
if (switch_true(val)) {
|
||||||
|
switch_set_flag((&runtime), SCF_SESSION_THREAD_POOL);
|
||||||
|
} else {
|
||||||
|
switch_clear_flag((&runtime), SCF_SESSION_THREAD_POOL);
|
||||||
|
}
|
||||||
} else if (!strcasecmp(var, "auto-clear-sql")) {
|
} else if (!strcasecmp(var, "auto-clear-sql")) {
|
||||||
if (switch_true(val)) {
|
if (switch_true(val)) {
|
||||||
switch_set_flag((&runtime), SCF_CLEAR_SQL);
|
switch_set_flag((&runtime), SCF_CLEAR_SQL);
|
||||||
|
|
|
@ -1451,12 +1451,156 @@ static void *SWITCH_THREAD_FUNC switch_core_session_thread(switch_thread_t *thre
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct switch_thread_pool_node_s {
|
||||||
|
switch_memory_pool_t *pool;
|
||||||
|
} switch_thread_pool_node_t;
|
||||||
|
|
||||||
|
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_worker(switch_thread_t *thread, void *obj)
|
||||||
|
{
|
||||||
|
switch_thread_pool_node_t *node = (switch_thread_pool_node_t *) obj;
|
||||||
|
switch_memory_pool_t *pool = node->pool;
|
||||||
|
void *pop;
|
||||||
|
int check = 0;
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.running++;
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
while(session_manager.ready) {
|
||||||
|
switch_status_t check_status;
|
||||||
|
|
||||||
|
if (check) {
|
||||||
|
check_status = switch_queue_trypop(session_manager.thread_queue, &pop);
|
||||||
|
} else {
|
||||||
|
check_status = switch_queue_pop(session_manager.thread_queue, &pop);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (check_status == SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_core_session_t *session = (switch_core_session_t *) pop;
|
||||||
|
|
||||||
|
if (!session) break;
|
||||||
|
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.busy++;
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
switch_core_session_thread(thread, (void *) session);
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.busy--;
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if (check) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
check++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
session_manager.running--;
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
switch_core_destroy_memory_pool(&pool);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static switch_status_t check_queue(void)
|
||||||
|
{
|
||||||
|
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||||
|
int ttl = 0;
|
||||||
|
int x = 0;
|
||||||
|
|
||||||
|
switch_mutex_lock(session_manager.mutex);
|
||||||
|
ttl = switch_queue_size(session_manager.thread_queue);
|
||||||
|
x = (session_manager.running - session_manager.busy);
|
||||||
|
switch_mutex_unlock(session_manager.mutex);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
while (x < ttl) {
|
||||||
|
switch_thread_t *thread;
|
||||||
|
switch_threadattr_t *thd_attr;
|
||||||
|
switch_memory_pool_t *pool;
|
||||||
|
switch_thread_pool_node_t *node;
|
||||||
|
|
||||||
|
switch_core_new_memory_pool(&pool);
|
||||||
|
node = switch_core_alloc(pool, sizeof(*node));
|
||||||
|
node->pool = pool;
|
||||||
|
|
||||||
|
switch_threadattr_create(&thd_attr, node->pool);
|
||||||
|
switch_threadattr_detach_set(thd_attr, 1);
|
||||||
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
|
|
||||||
|
if (switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_worker, node, node->pool) != SWITCH_STATUS_SUCCESS) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Thread Failure!\n");
|
||||||
|
switch_core_destroy_memory_pool(&pool);
|
||||||
|
status = SWITCH_STATUS_GENERR;
|
||||||
|
} else {
|
||||||
|
status = SWITCH_STATUS_SUCCESS;
|
||||||
|
}
|
||||||
|
x++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void *SWITCH_THREAD_FUNC switch_core_session_thread_pool_manager(switch_thread_t *thread, void *obj)
|
||||||
|
{
|
||||||
|
int x = 0;
|
||||||
|
|
||||||
|
while(session_manager.ready) {
|
||||||
|
switch_yield(100000);
|
||||||
|
|
||||||
|
if (++x == 300) {
|
||||||
|
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||||
|
x = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
check_queue();
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_pool_launch(switch_core_session_t *session)
|
||||||
|
{
|
||||||
|
if (session_manager.ready == 1) {
|
||||||
|
switch_thread_t *thread;
|
||||||
|
switch_threadattr_t *thd_attr;
|
||||||
|
|
||||||
|
switch_queue_create(&session_manager.thread_queue, 100000, session_manager.memory_pool);
|
||||||
|
switch_mutex_init(&session_manager.mutex, SWITCH_MUTEX_NESTED, session_manager.memory_pool);
|
||||||
|
|
||||||
|
switch_threadattr_create(&thd_attr, session_manager.memory_pool);
|
||||||
|
switch_threadattr_detach_set(thd_attr, 1);
|
||||||
|
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
|
||||||
|
switch_thread_create(&thread, thd_attr, switch_core_session_thread_pool_manager, NULL, session_manager.memory_pool);
|
||||||
|
session_manager.ready++;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch_queue_push(session_manager.thread_queue, session);
|
||||||
|
check_queue();
|
||||||
|
return SWITCH_STATUS_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_session_t *session)
|
SWITCH_DECLARE(switch_status_t) switch_core_session_thread_launch(switch_core_session_t *session)
|
||||||
{
|
{
|
||||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
switch_status_t status = SWITCH_STATUS_FALSE;
|
||||||
switch_thread_t *thread;
|
switch_thread_t *thread;
|
||||||
switch_threadattr_t *thd_attr;;
|
switch_threadattr_t *thd_attr;
|
||||||
|
|
||||||
|
if (switch_test_flag((&runtime), SCF_SESSION_THREAD_POOL)) {
|
||||||
|
return switch_core_session_thread_pool_launch(session);
|
||||||
|
}
|
||||||
|
|
||||||
switch_threadattr_create(&thd_attr, session->pool);
|
switch_threadattr_create(&thd_attr, session->pool);
|
||||||
switch_threadattr_detach_set(thd_attr, 1);
|
switch_threadattr_detach_set(thd_attr, 1);
|
||||||
|
|
||||||
|
@ -2042,16 +2186,24 @@ SWITCH_DECLARE(uint32_t) switch_core_sessions_per_second(uint32_t new_limit)
|
||||||
|
|
||||||
void switch_core_session_init(switch_memory_pool_t *pool)
|
void switch_core_session_init(switch_memory_pool_t *pool)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
memset(&session_manager, 0, sizeof(session_manager));
|
memset(&session_manager, 0, sizeof(session_manager));
|
||||||
session_manager.session_limit = 1000;
|
session_manager.session_limit = 1000;
|
||||||
session_manager.session_id = 1;
|
session_manager.session_id = 1;
|
||||||
session_manager.memory_pool = pool;
|
session_manager.memory_pool = pool;
|
||||||
switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool);
|
switch_core_hash_init(&session_manager.session_table, session_manager.memory_pool);
|
||||||
|
session_manager.ready = 1;
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void switch_core_session_uninit(void)
|
void switch_core_session_uninit(void)
|
||||||
{
|
{
|
||||||
switch_core_hash_destroy(&session_manager.session_table);
|
switch_core_hash_destroy(&session_manager.session_table);
|
||||||
|
session_manager.ready = 0;
|
||||||
|
switch_queue_interrupt_all(session_manager.thread_queue);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
|
SWITCH_DECLARE(switch_app_log_t *) switch_core_session_get_app_log(switch_core_session_t *session)
|
||||||
|
|
|
@ -336,7 +336,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
|
||||||
const switch_state_handler_table_t *driver_state_handler = NULL;
|
const switch_state_handler_table_t *driver_state_handler = NULL;
|
||||||
const switch_state_handler_table_t *application_state_handler = NULL;
|
const switch_state_handler_table_t *application_state_handler = NULL;
|
||||||
int silly = 0;
|
int silly = 0;
|
||||||
uint32_t new_loops = 5000;
|
// uint32_t new_loops = 5000;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Life of the channel. you have channel and pool in your session
|
Life of the channel. you have channel and pool in your session
|
||||||
|
@ -468,6 +468,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
|
||||||
endstate = switch_channel_get_state(session->channel);
|
endstate = switch_channel_get_state(session->channel);
|
||||||
|
|
||||||
if (endstate == switch_channel_get_running_state(session->channel)) {
|
if (endstate == switch_channel_get_running_state(session->channel)) {
|
||||||
|
/**
|
||||||
if (endstate == CS_NEW) {
|
if (endstate == CS_NEW) {
|
||||||
switch_cond_next();
|
switch_cond_next();
|
||||||
switch_ivr_parse_all_events(session);
|
switch_ivr_parse_all_events(session);
|
||||||
|
@ -477,6 +478,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
|
||||||
switch_channel_hangup(session->channel, SWITCH_CAUSE_INVALID_CALL_REFERENCE);
|
switch_channel_hangup(session->channel, SWITCH_CAUSE_INVALID_CALL_REFERENCE);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
**/
|
||||||
switch_ivr_parse_all_events(session);
|
switch_ivr_parse_all_events(session);
|
||||||
switch_ivr_parse_all_events(session);
|
switch_ivr_parse_all_events(session);
|
||||||
|
|
||||||
|
@ -490,7 +492,7 @@ SWITCH_DECLARE(void) switch_core_session_run(switch_core_session_t *session)
|
||||||
|
|
||||||
switch_ivr_parse_all_events(session);
|
switch_ivr_parse_all_events(session);
|
||||||
switch_ivr_parse_all_events(session);
|
switch_ivr_parse_all_events(session);
|
||||||
}
|
//}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
done:
|
done:
|
||||||
|
|
|
@ -307,6 +307,8 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp
|
||||||
if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
|
if (switch_queue_size(EVENT_DISPATCH_QUEUE) > (unsigned int)(DISPATCH_QUEUE_LEN * DISPATCH_THREAD_COUNT)) {
|
||||||
launch++;
|
launch++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
||||||
|
|
||||||
if (launch) {
|
if (launch) {
|
||||||
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
|
if (SOFT_MAX_DISPATCH + 1 < MAX_DISPATCH) {
|
||||||
|
@ -314,8 +316,6 @@ static switch_status_t switch_event_queue_dispatch_event(switch_event_t **eventp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_unlock(EVENT_QUEUE_MUTEX);
|
|
||||||
|
|
||||||
*eventp = NULL;
|
*eventp = NULL;
|
||||||
switch_queue_push(EVENT_DISPATCH_QUEUE, event);
|
switch_queue_push(EVENT_DISPATCH_QUEUE, event);
|
||||||
event = NULL;
|
event = NULL;
|
||||||
|
|
Loading…
Reference in New Issue