diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index a5322751cd..c6b7f8d970 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -34,6 +34,7 @@ #include #include "private/switch_core_pvt.h" +//*#define DEBUG_SQL 1 static struct { switch_cache_db_handle_t *event_db; @@ -41,7 +42,9 @@ static struct { switch_memory_pool_t *memory_pool; switch_event_node_t *event_node; switch_thread_t *thread; + switch_thread_t *db_thread; int thread_running; + int db_thread_running; switch_bool_t manage; switch_mutex_t *io_mutex; switch_mutex_t *dbh_mutex; @@ -859,7 +862,24 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand } +static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj) +{ + int sec = 0; + sql_manager.db_thread_running = 1; + + while (sql_manager.db_thread_running == 1) { + if (++sec == SQL_CACHE_TIMEOUT) { + sql_close(switch_epoch_time_now(NULL)); + wake_thread(1); + sec = 0; + } + switch_yield(1000); + } + + + return NULL; +} static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) { @@ -872,16 +892,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, char *sql = NULL; switch_size_t newlen; int lc = 0; - uint32_t loops = 0, sec = 0; - uint32_t l1 = 1000; uint32_t sanity = 120; switch_assert(sqlbuf); - if (!sql_manager.manage) { - l1 = 10; - } - while (!sql_manager.event_db) { if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db) break; @@ -895,26 +909,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, return NULL; } - sql_manager.thread_running = 1; while (sql_manager.thread_running == 1) { switch_mutex_lock(sql_manager.cond_mutex); - if (++loops == l1) { - if (++sec == SQL_CACHE_TIMEOUT) { - sql_close(switch_epoch_time_now(NULL)); - sec = 0; - } - loops = 0; - } - - if (!sql_manager.manage) { - switch_yield(100000); - continue; - } - if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { @@ -932,9 +932,11 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, if (new_mlen < runtime.max_sql_buffer_len) { sql_len = new_mlen; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, +#ifdef DEBUG_SQL + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); +#endif if (!(tmp = realloc(sqlbuf, sql_len))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); abort(); @@ -942,8 +944,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, } sqlbuf = tmp; } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, +#ifdef DEBUG_SQL + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); +#endif goto skip; } } @@ -964,12 +968,16 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); if (trans && iterations && (iterations > target || !lc)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, +#ifdef DEBUG_SQL + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations); +#endif if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n"); +#ifdef DEBUG_SQL + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n"); +#endif iterations = 0; trans = 0; len = 0; @@ -978,6 +986,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, switch_yield(400000); } + lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); + if (!lc) { switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); } @@ -1412,7 +1422,7 @@ static void core_event_handler(switch_event_t *event) switch_queue_push(sql_manager.sql_queue[0], sql[i]); } sql[i] = NULL; - wake_thread(0); + wake_thread(1); } } } @@ -1663,7 +1673,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_ switch_threadattr_create(&thd_attr, sql_manager.memory_pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); + if (sql_manager.manage) { + switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); + } + switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool); while (!sql_manager.thread_running) { switch_yield(10000); @@ -1693,6 +1706,12 @@ void switch_core_sqldb_stop(void) switch_thread_join(&st, sql_manager.thread); } + + if (sql_manager.thread && sql_manager.db_thread_running) { + sql_manager.db_thread_running = -1; + switch_thread_join(&st, sql_manager.db_thread); + } + switch_cache_db_flush_handles(); sql_close(0);