add cache_db handle api (odbc/sqlite abstraction)

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@15473 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2009-11-14 21:59:01 +00:00
parent 4e91bc95eb
commit f06f3a5de7
5 changed files with 218 additions and 6 deletions

View File

@ -116,6 +116,7 @@ struct switch_core_session;
struct switch_core_runtime; struct switch_core_runtime;
struct switch_core_port_allocator; struct switch_core_port_allocator;
/*! /*!
\defgroup core1 Core Library \defgroup core1 Core Library
\ingroup FREESWITCH \ingroup FREESWITCH
@ -1403,7 +1404,7 @@ SWITCH_DECLARE(switch_codec_t *) switch_core_session_get_video_write_codec(_In_
\param filename the path to the db file to open \param filename the path to the db file to open
\return the db handle \return the db handle
*/ */
SWITCH_DECLARE(switch_core_db_t *) switch_core_db_open_file(char *filename); SWITCH_DECLARE(switch_core_db_t *) switch_core_db_open_file(const char *filename);
/*! /*!
\brief Execute a sql stmt until it is accepted \brief Execute a sql stmt until it is accepted
@ -1920,6 +1921,22 @@ SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_sessio
\} \}
*/ */
typedef struct {
switch_core_db_t *db;
switch_odbc_handle_t *odbc_dbh;
time_t last_used;
switch_mutex_t *mutex;
switch_memory_pool_t *pool;
} switch_cache_db_handle_t;
SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh);
SWITCH_DECLARE(switch_cache_db_handle_t *)switch_cache_db_get_db_handle(const char *db_name, const char *odbc_user, const char *odbc_pass);
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_handle_t *dbh, char *sql);
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh, char *sql,
switch_core_db_callback_func_t callback, void *pdata);
SWITCH_END_EXTERN_C SWITCH_END_EXTERN_C
#endif #endif
/* For Emacs: /* For Emacs:

View File

@ -51,7 +51,7 @@ typedef enum {
SWITCH_ODBC_FAIL = -1 SWITCH_ODBC_FAIL = -1
} switch_odbc_status_t; } switch_odbc_status_t;
SWITCH_DECLARE(switch_odbc_handle_t *) switch_odbc_handle_new(char *dsn, char *username, char *password); SWITCH_DECLARE(switch_odbc_handle_t *) switch_odbc_handle_new(const char *dsn, const char *username, const char *password);
SWITCH_DECLARE(switch_odbc_status_t) switch_odbc_handle_disconnect(switch_odbc_handle_t *handle); SWITCH_DECLARE(switch_odbc_status_t) switch_odbc_handle_disconnect(switch_odbc_handle_t *handle);
SWITCH_DECLARE(switch_odbc_status_t) switch_odbc_handle_connect(switch_odbc_handle_t *handle); SWITCH_DECLARE(switch_odbc_status_t) switch_odbc_handle_connect(switch_odbc_handle_t *handle);
SWITCH_DECLARE(void) switch_odbc_handle_destroy(switch_odbc_handle_t **handlep); SWITCH_DECLARE(void) switch_odbc_handle_destroy(switch_odbc_handle_t **handlep);

View File

@ -195,12 +195,12 @@ SWITCH_DECLARE(char *) switch_vmprintf(const char *zFormat, va_list ap)
return sqlite3_vmprintf(zFormat, ap); return sqlite3_vmprintf(zFormat, ap);
} }
SWITCH_DECLARE(switch_core_db_t *) switch_core_db_open_file(char *filename) SWITCH_DECLARE(switch_core_db_t *) switch_core_db_open_file(const char *filename)
{ {
switch_core_db_t *db; switch_core_db_t *db;
char path[1024]; char path[1024];
db_pick_path(filename, path, sizeof(path)); db_pick_path((char *)filename, path, sizeof(path));
if (switch_core_db_open(path, &db)) { if (switch_core_db_open(path, &db)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", switch_core_db_errmsg(db)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", switch_core_db_errmsg(db));
switch_core_db_close(db); switch_core_db_close(db);

View File

@ -45,6 +45,180 @@ static struct {
int thread_running; int thread_running;
} sql_manager; } sql_manager;
static switch_mutex_t *dbh_mutex = NULL;
static switch_hash_t *dbh_hash = NULL;
#define SQL_CACHE_TIMEOUT 300
static void sql_close(time_t prune)
{
switch_hash_index_t *hi;
const void *var;
void *val;
switch_cache_db_handle_t *dbh = NULL;
int locked = 0;
char *key;
switch_mutex_lock(dbh_mutex);
top:
locked = 0;
for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
key = (char *) var;
if ((dbh = (switch_cache_db_handle_t *) val)) {
time_t diff = 0;
if (prune > 0 && prune > dbh->last_used) {
diff = (time_t) prune - dbh->last_used;
}
if (diff < SQL_CACHE_TIMEOUT) {
continue;
}
if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
if (dbh->db) {
switch_core_db_close(dbh->db);
dbh->db = NULL;
} else if (switch_odbc_available() && dbh->odbc_dbh) {
switch_odbc_handle_destroy(&dbh->odbc_dbh);
}
switch_core_hash_delete(dbh_hash, key);
switch_mutex_unlock(dbh->mutex);
switch_core_destroy_memory_pool(&dbh->pool);
goto top;
} else {
if (!prune) locked++;
continue;
}
}
}
if (locked) {
goto top;
}
switch_mutex_unlock(dbh_mutex);
}
SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh)
{
if (dbh && *dbh) {
switch_mutex_unlock((*dbh)->mutex);
*dbh = NULL;
}
}
SWITCH_DECLARE(switch_cache_db_handle_t *)switch_cache_db_get_db_handle(const char *db_name, const char *odbc_user, const char *odbc_pass)
{
switch_thread_id_t self = switch_thread_self();
char thread_str[256] = "";
switch_cache_db_handle_t *dbh = NULL;
switch_assert(db_name);
snprintf(thread_str, sizeof(thread_str) - 1, "%s_%lu", db_name, (unsigned long)(intptr_t)self);
switch_mutex_lock(dbh_mutex);
if (!(dbh = switch_core_hash_find(dbh_hash, thread_str))) {
switch_memory_pool_t *pool = NULL;
switch_core_db_t *db = NULL;
switch_odbc_handle_t *odbc_dbh = NULL;
if (switch_odbc_available() && db_name && odbc_user && odbc_pass) {
if ((odbc_dbh = switch_odbc_handle_new(db_name, odbc_user, odbc_pass))) {
if (switch_odbc_handle_connect(odbc_dbh) != SWITCH_STATUS_SUCCESS) {
switch_odbc_handle_destroy(&odbc_dbh);
}
}
} else {
db = switch_core_db_open_file(db_name);
}
if (!db && !odbc_dbh) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure!\n");
goto end;
}
switch_core_new_memory_pool(&pool);
dbh = switch_core_alloc(pool, sizeof(*dbh));
dbh->pool = pool;
if (db) dbh->db = db; else dbh->odbc_dbh = odbc_dbh;
switch_mutex_init(&dbh->mutex, SWITCH_MUTEX_NESTED, dbh->pool);
switch_mutex_lock(dbh->mutex);
switch_core_hash_insert(dbh_hash, thread_str, dbh);
}
end:
if (dbh) dbh->last_used = switch_epoch_time_now(NULL);
switch_mutex_unlock(dbh_mutex);
return dbh;
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_handle_t *dbh, char *sql)
{
switch_status_t status = SWITCH_STATUS_FALSE;
if (switch_odbc_available() && dbh->odbc_dbh) {
switch_odbc_statement_handle_t stmt;
if ((status = switch_odbc_handle_exec(dbh->odbc_dbh, sql, &stmt)) != SWITCH_ODBC_SUCCESS) {
char *err_str;
err_str = switch_odbc_handle_get_error(dbh->odbc_dbh, stmt);
if (!zstr(err_str)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ERR: [%s]\n[%s]\n", sql, err_str);
}
switch_safe_free(err_str);
}
switch_odbc_statement_handle_free(&stmt);
} else {
char *errmsg;
status = switch_core_db_exec(dbh->db, sql, NULL, NULL, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n%s\n", errmsg, sql);
switch_core_db_free(errmsg);
}
}
return status;
}
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh,
char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL;
if (switch_odbc_available() && dbh->odbc_dbh) {
status = switch_odbc_handle_callback_exec(dbh->odbc_dbh, sql, callback, pdata);
} else {
status = switch_core_db_exec(dbh->db, sql, callback, pdata, &errmsg);
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
free(errmsg);
}
}
return status;
}
static switch_status_t switch_core_db_persistant_execute_trans(switch_core_db_t *db, char *sql, uint32_t retries) static switch_status_t switch_core_db_persistant_execute_trans(switch_core_db_t *db, char *sql, uint32_t retries)
{ {
char *errmsg; char *errmsg;
@ -147,6 +321,12 @@ SWITCH_DECLARE(switch_status_t) switch_core_db_persistant_execute(switch_core_db
return status; return status;
} }
#define SQLLEN 1024 * 64 #define SQLLEN 1024 * 64
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj) static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj)
{ {
@ -159,6 +339,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
char *sql; char *sql;
switch_size_t newlen; switch_size_t newlen;
int lc = 0; int lc = 0;
uint32_t loops = 0, sec = 0;
switch_assert(sqlbuf); switch_assert(sqlbuf);
@ -169,6 +350,14 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread,
sql_manager.thread_running = 1; sql_manager.thread_running = 1;
for (;;) { for (;;) {
if (++loops == 1000) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
sec = 0;
}
loops = 0;
}
if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) { switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
sql = (char *) pop; sql = (char *) pop;
@ -446,6 +635,9 @@ void switch_core_sqldb_start(switch_memory_pool_t *pool)
sql_manager.memory_pool = pool; sql_manager.memory_pool = pool;
switch_mutex_init(&dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_core_hash_init(&dbh_hash, sql_manager.memory_pool);
/* Activate SQL database */ /* Activate SQL database */
if ((sql_manager.db = switch_core_db_handle()) == 0) { if ((sql_manager.db = switch_core_db_handle()) == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
@ -597,10 +789,13 @@ void switch_core_sqldb_stop(void)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
switch_thread_join(&st, sql_manager.thread); switch_thread_join(&st, sql_manager.thread);
sql_close(0);
switch_core_db_close(sql_manager.db); switch_core_db_close(sql_manager.db);
switch_core_db_close(sql_manager.event_db); switch_core_db_close(sql_manager.event_db);
switch_core_hash_destroy(&dbh_hash);
} }
/* For Emacs: /* For Emacs:

View File

@ -59,7 +59,7 @@ struct switch_odbc_handle {
}; };
#endif #endif
SWITCH_DECLARE(switch_odbc_handle_t *) switch_odbc_handle_new(char *dsn, char *username, char *password) SWITCH_DECLARE(switch_odbc_handle_t *) switch_odbc_handle_new(const char *dsn, const char *username, const char *password)
{ {
#ifdef SWITCH_HAVE_ODBC #ifdef SWITCH_HAVE_ODBC
switch_odbc_handle_t *new_handle; switch_odbc_handle_t *new_handle;