This commit is contained in:
Anthony Minessale 2010-09-17 16:33:47 -05:00
parent 62d275dd9d
commit ce01c76c56
1 changed files with 45 additions and 26 deletions

View File

@ -34,6 +34,7 @@
#include <switch.h> #include <switch.h>
#include "private/switch_core_pvt.h" #include "private/switch_core_pvt.h"
//*#define DEBUG_SQL 1
static struct { static struct {
switch_cache_db_handle_t *event_db; switch_cache_db_handle_t *event_db;
@ -41,7 +42,9 @@ static struct {
switch_memory_pool_t *memory_pool; switch_memory_pool_t *memory_pool;
switch_event_node_t *event_node; switch_event_node_t *event_node;
switch_thread_t *thread; switch_thread_t *thread;
switch_thread_t *db_thread;
int thread_running; int thread_running;
int db_thread_running;
switch_bool_t manage; switch_bool_t manage;
switch_mutex_t *io_mutex; switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex; switch_mutex_t *dbh_mutex;
@ -859,7 +862,24 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
} }
static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj)
{
int sec = 0;
sql_manager.db_thread_running = 1;
while (sql_manager.db_thread_running == 1) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
wake_thread(1);
sec = 0;
}
switch_yield(1000);
}
return NULL;
}
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
{ {
@ -872,16 +892,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
char *sql = NULL; char *sql = NULL;
switch_size_t newlen; switch_size_t newlen;
int lc = 0; int lc = 0;
uint32_t loops = 0, sec = 0;
uint32_t l1 = 1000;
uint32_t sanity = 120; uint32_t sanity = 120;
switch_assert(sqlbuf); switch_assert(sqlbuf);
if (!sql_manager.manage) {
l1 = 10;
}
while (!sql_manager.event_db) { while (!sql_manager.event_db) {
if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db) if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
break; break;
@ -895,26 +909,12 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
return NULL; return NULL;
} }
sql_manager.thread_running = 1; sql_manager.thread_running = 1;
while (sql_manager.thread_running == 1) { while (sql_manager.thread_running == 1) {
switch_mutex_lock(sql_manager.cond_mutex); switch_mutex_lock(sql_manager.cond_mutex);
if (++loops == l1) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
sec = 0;
}
loops = 0;
}
if (!sql_manager.manage) {
switch_yield(100000);
continue;
}
if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || if (sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
@ -932,9 +932,11 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
if (new_mlen < runtime.max_sql_buffer_len) { if (new_mlen < runtime.max_sql_buffer_len) {
sql_len = new_mlen; sql_len = new_mlen;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, #ifdef DEBUG_SQL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]), "REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1])); switch_queue_size(sql_manager.sql_queue[1]));
#endif
if (!(tmp = realloc(sqlbuf, sql_len))) { if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
abort(); abort();
@ -942,8 +944,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
} }
sqlbuf = tmp; sqlbuf = tmp;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, #ifdef DEBUG_SQL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1])); "SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
#endif
goto skip; goto skip;
} }
} }
@ -964,12 +968,16 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]); lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
if (trans && iterations && (iterations > target || !lc)) { if (trans && iterations && (iterations > target || !lc)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, #ifdef DEBUG_SQL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations); "RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations);
#endif
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) { if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
} }
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, "DONE\n"); #ifdef DEBUG_SQL
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
#endif
iterations = 0; iterations = 0;
trans = 0; trans = 0;
len = 0; len = 0;
@ -978,6 +986,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
switch_yield(400000); switch_yield(400000);
} }
lc = sql ? 1 : 0 + switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
if (!lc) { if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex); switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
} }
@ -1412,7 +1422,7 @@ static void core_event_handler(switch_event_t *event)
switch_queue_push(sql_manager.sql_queue[0], sql[i]); switch_queue_push(sql_manager.sql_queue[0], sql[i]);
} }
sql[i] = NULL; sql[i] = NULL;
wake_thread(0); wake_thread(1);
} }
} }
} }
@ -1663,7 +1673,10 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_threadattr_create(&thd_attr, sql_manager.memory_pool); switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool); if (sql_manager.manage) {
switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
}
switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
while (!sql_manager.thread_running) { while (!sql_manager.thread_running) {
switch_yield(10000); switch_yield(10000);
@ -1693,6 +1706,12 @@ void switch_core_sqldb_stop(void)
switch_thread_join(&st, sql_manager.thread); switch_thread_join(&st, sql_manager.thread);
} }
if (sql_manager.thread && sql_manager.db_thread_running) {
sql_manager.db_thread_running = -1;
switch_thread_join(&st, sql_manager.db_thread);
}
switch_cache_db_flush_handles(); switch_cache_db_flush_handles();
sql_close(0); sql_close(0);