diff --git a/src/mod/languages/mod_php/Makefile b/src/mod/languages/mod_php/Makefile index 2a661acfcd..111a07054e 100644 --- a/src/mod/languages/mod_php/Makefile +++ b/src/mod/languages/mod_php/Makefile @@ -6,7 +6,7 @@ LCFLAGS=-fPIC -DZTS -DPTHREADS CFLAGS += `$(PCFG) --includes` -g3 -fno-strict-aliasing MDIR += `$(PCFG) --extension-dir` PHPMOD=freeswitch -PHPLDFLAGS = `$(PCFG) --ldflags` -lcrypt -lresolv -lm -ldl -lnsl -lxml2 -lz -lphp5 +PHPLDFLAGS = `$(PCFG) --ldflags` -lm -ldl -lxml2 -lz -lphp5 MOD_CFLAGS += -fPIC all: depends $(MODNAME).$(DYNAMIC_LIB_EXTEN) $(PHPMOD).$(DYNAMIC_LIB_EXTEN) diff --git a/src/switch_core.c b/src/switch_core.c index 065fc9c274..3342f6fbe3 100644 --- a/src/switch_core.c +++ b/src/switch_core.c @@ -141,6 +141,11 @@ static void switch_core_standard_on_hold(switch_core_session_t *session); /* The main runtime obj we keep this hidden for ourselves */ static struct switch_core_runtime runtime; +/* Mutex and conditional for sql lite */ +static switch_mutex_t *SWITCH_SQL_MUTEX; +static switch_thread_cond_t *SWITCH_SQL_CONDITIONAL; + + static void db_pick_path(char *dbname, char *buf, switch_size_t size) { @@ -3208,20 +3213,23 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, uint8_t trans = 0; switch_time_t last_commit = switch_time_now(); uint32_t freq = 1000, target = 1000, diff = 0; + + //if these are null we have big problems + assert(SWITCH_SQL_MUTEX != NULL); + assert(SWITCH_SQL_CONDITIONAL != NULL); if (!runtime.event_db) { runtime.event_db = switch_core_db_handle(); } switch_queue_create(&runtime.sql_queue, SWITCH_SQL_QUEUE_LEN, runtime.memory_pool); + + switch_mutex_lock(SWITCH_SQL_MUTEX); for(;;) { - uint32_t work = 0; - if (switch_queue_trypop(runtime.sql_queue, &pop) == SWITCH_STATUS_SUCCESS) { char *sql = (char *) pop; if (sql) { - work++; if (itterations == 0) { char *isql = "begin transaction CORE1;"; if (switch_core_db_persistant_execute(runtime.event_db, isql, 25) != SWITCH_STATUS_SUCCESS) { @@ -3241,6 +3249,15 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n"); break; } + } else { + //Are we currently in a transaction, wait accordingly + if(trans) { + //we need to finish a transaction in a bit, wait around until we have more work or that time comes + switch_thread_cond_timedwait(SWITCH_SQL_CONDITIONAL, SWITCH_SQL_MUTEX, (freq * 1000) + last_commit - switch_time_now()); + } else { + //wait until we have more work + switch_thread_cond_wait(SWITCH_SQL_CONDITIONAL, SWITCH_SQL_MUTEX); + } } if (diff < freq) { @@ -3249,7 +3266,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, if (trans && (itterations == target || diff >= freq)) { char *sql = "end transaction CORE1"; - work++; if (switch_core_db_persistant_execute(runtime.event_db, sql, 25) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL exec error! [%s]\n", sql); } @@ -3258,9 +3274,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, trans = 0; diff = 0; } - if (!work) { - switch_yield(1000); - } + } return NULL; } @@ -3273,6 +3287,10 @@ static void switch_core_sql_thread_launch(void) assert(runtime.memory_pool != NULL); + //create the mutex and conditional + switch_mutex_init(&SWITCH_SQL_MUTEX, SWITCH_MUTEX_NESTED, runtime.memory_pool); + switch_thread_cond_create(&SWITCH_SQL_CONDITIONAL, runtime.memory_pool); + switch_threadattr_create(&thd_attr, runtime.memory_pool); switch_threadattr_detach_set(thd_attr, 1); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); @@ -3380,6 +3398,16 @@ static void core_event_handler(switch_event_t *event) if (sql) { switch_queue_push(runtime.sql_queue, sql); + + //See if we need to wake up the sql thread + if(switch_mutex_trylock(SWITCH_SQL_MUTEX) == SWITCH_STATUS_SUCCESS) { + //wake up the SQL thread + switch_thread_cond_signal(SWITCH_SQL_CONDITIONAL); + + //give up our lock + switch_mutex_unlock(SWITCH_SQL_MUTEX); + } + sql = NULL; } } @@ -3686,6 +3714,15 @@ SWITCH_DECLARE(switch_status_t) switch_core_destroy(void) switch_event_shutdown(); switch_queue_push(runtime.sql_queue, NULL); + //See if we need to wake up the sql thread + if(switch_mutex_trylock(SWITCH_SQL_MUTEX) == SWITCH_STATUS_SUCCESS) { + //wake up the SQL thread + switch_thread_cond_signal(SWITCH_SQL_CONDITIONAL); + + //give up our lock + switch_mutex_unlock(SWITCH_SQL_MUTEX); + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); while (switch_queue_size(runtime.sql_queue) > 0) { switch_yield(10000); diff --git a/src/switch_event.c b/src/switch_event.c index 54ba17e9b0..ac140bd2ce 100644 --- a/src/switch_event.c +++ b/src/switch_event.c @@ -35,6 +35,9 @@ static switch_event_node_t *EVENT_NODES[SWITCH_EVENT_ALL + 1] = { NULL }; static switch_mutex_t *BLOCK = NULL; static switch_mutex_t *POOL_LOCK = NULL; +static switch_mutex_t *EVENT_QUEUE_MUTEX = NULL; +static switch_mutex_t *EVENT_QUEUE_HAVEMORE_MUTEX = NULL; +static switch_thread_cond_t *EVENT_QUEUE_CONDITIONAL = NULL; static switch_memory_pool_t *RUNTIME_POOL = NULL; //static switch_memory_pool_t *APOOL = NULL; //static switch_memory_pool_t *BPOOL = NULL; @@ -44,6 +47,7 @@ static int POOL_COUNT_MAX = SWITCH_CORE_QUEUE_LEN; static switch_hash_t *CUSTOM_HASH = NULL; static int THREAD_RUNNING = 0; +static int EVENT_QUEUE_HAVEMORE = 0; #if 0 static void *locked_alloc(switch_size_t len) @@ -174,25 +178,56 @@ static void *SWITCH_THREAD_FUNC switch_event_thread(switch_thread_t *thread, voi assert(obj == NULL); assert(POOL_LOCK != NULL); assert(RUNTIME_POOL != NULL); + assert(EVENT_QUEUE_MUTEX != NULL); + assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL); + assert(EVENT_QUEUE_CONDITIONAL != NULL); THREAD_RUNNING = 1; queues[0] = EVENT_QUEUE[SWITCH_PRIORITY_HIGH]; queues[1] = EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]; queues[2] = EVENT_QUEUE[SWITCH_PRIORITY_LOW]; + + switch_mutex_lock(EVENT_QUEUE_MUTEX); for(;;) { int any; + len[1] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_NORMAL]); len[2] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_LOW]); len[0] = switch_queue_size(EVENT_QUEUE[SWITCH_PRIORITY_HIGH]); any = len[1] + len[2] + len[0]; + if (!any) { - if (THREAD_RUNNING != 1) { - break; + //lock on havemore so we are the only ones poking at it while we check it + //see if we saw anything in the queues or have a check again flag + switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); + if(!EVENT_QUEUE_HAVEMORE) { + //See if we need to quit + if (THREAD_RUNNING != 1) { + //give up our lock + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + + //Game over + break; + } + + //give up our lock + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + + //wait until someone tells us we have something to do + switch_thread_cond_wait(EVENT_QUEUE_CONDITIONAL, EVENT_QUEUE_MUTEX); + } else { + //Caught a race, one of the queues was updated after we looked at it + //reset our flag + EVENT_QUEUE_HAVEMORE = 0; + + //Give up our lock + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); } - switch_yield(1000); + + //go grab some events continue; } @@ -304,6 +339,27 @@ SWITCH_DECLARE(switch_status_t) switch_event_shutdown(void) if (THREAD_RUNNING > 0) { THREAD_RUNNING = -1; + //Lock on havemore to make sure he event thread, if currently running + // doesn't check the HAVEMORE flag before we set it + switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); + //See if the event thread is sitting + if(switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) { + //we don't need havemore anymore, the thread was sitting already + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + + //wake up the event thread + switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL); + + //give up our lock + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + } else { // it wasn't waiting which means we might have updated a queue it already looked at + //set a flag so it knows to read the queues again + EVENT_QUEUE_HAVEMORE = 1; + + //variable updated, give up the mutex + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + } + while (x < 100 && THREAD_RUNNING) { switch_yield(1000); if (THREAD_RUNNING == last) { @@ -344,6 +400,9 @@ SWITCH_DECLARE(switch_status_t) switch_event_init(switch_memory_pool_t *pool) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Activate Eventing Engine.\n"); switch_mutex_init(&BLOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); switch_mutex_init(&POOL_LOCK, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_mutex_init(&EVENT_QUEUE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_mutex_init(&EVENT_QUEUE_HAVEMORE_MUTEX, SWITCH_MUTEX_NESTED, RUNTIME_POOL); + switch_thread_cond_create(&EVENT_QUEUE_CONDITIONAL, RUNTIME_POOL); switch_core_hash_init(&CUSTOM_HASH, RUNTIME_POOL); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_thread_create(&thread, thd_attr, switch_event_thread, NULL, RUNTIME_POOL); @@ -662,6 +721,10 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(char *file, char *fun assert(BLOCK != NULL); assert(RUNTIME_POOL != NULL); + assert(EVENT_QUEUE_HAVEMORE_MUTEX != NULL); + assert(EVENT_QUEUE_MUTEX != NULL); + assert(EVENT_QUEUE_CONDITIONAL != NULL); + assert(RUNTIME_POOL != NULL); if (THREAD_RUNNING <= 0) { /* sorry we're closed */ @@ -690,6 +753,29 @@ SWITCH_DECLARE(switch_status_t) switch_event_fire_detailed(char *file, char *fun } switch_queue_push(EVENT_QUEUE[(*event)->priority], *event); + + //Lock on havemore to make sure he event thread, if currently running + // doesn't check the HAVEMORE flag before we set it + switch_mutex_lock(EVENT_QUEUE_HAVEMORE_MUTEX); + //See if the event thread is sitting + if(switch_mutex_trylock(EVENT_QUEUE_MUTEX) == SWITCH_STATUS_SUCCESS) { + //we don't need havemore anymore, the thread was sitting already + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + + //wake up the event thread + switch_thread_cond_signal(EVENT_QUEUE_CONDITIONAL); + + //give up our lock + switch_mutex_unlock(EVENT_QUEUE_MUTEX); + } else { // it wasn't waiting which means we might have updated a queue it already looked at + //set a flag so it knows to read the queues again + EVENT_QUEUE_HAVEMORE = 1; + + //variable updated, give up the mutex + switch_mutex_unlock(EVENT_QUEUE_HAVEMORE_MUTEX); + } + + *event = NULL; return SWITCH_STATUS_SUCCESS; diff --git a/src/switch_rtp.c b/src/switch_rtp.c index 98ee7b31d9..9f6c6a1771 100644 --- a/src/switch_rtp.c +++ b/src/switch_rtp.c @@ -483,12 +483,14 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_create(switch_rtp_t **new_rtp_session timer_name = "soft"; } - if (switch_core_timer_init(&rtp_session->timer, timer_name, ms_per_packet / 1000, packet_size, rtp_session->pool) == SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting timer [%s] %d bytes per %dms\n", timer_name, packet_size, ms_per_packet); - } else { - memset(&rtp_session->timer, 0, sizeof(rtp_session->timer)); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error starting timer [%s], async RTP disabled\n", timer_name); - switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_USE_TIMER); + if (timer_name) { + if (switch_core_timer_init(&rtp_session->timer, timer_name, ms_per_packet / 1000, packet_size, rtp_session->pool) == SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Starting timer [%s] %d bytes per %dms\n", timer_name, packet_size, ms_per_packet); + } else { + memset(&rtp_session->timer, 0, sizeof(rtp_session->timer)); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error starting timer [%s], async RTP disabled\n", timer_name); + switch_clear_flag_locked(rtp_session, SWITCH_RTP_FLAG_USE_TIMER); + } } rtp_session->ready++;