diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 575e4601ae..c53446fb3e 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -2226,7 +2226,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_resume(void); \} */ -typedef int (*switch_db_event_callback_func_t) (void *pArg, switch_event_t *event); +typedef int (*switch_core_db_event_callback_func_t) (void *pArg, switch_event_t *event); #define CACHE_DB_LEN 256 typedef enum { @@ -2461,7 +2461,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh, - const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err); + const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err); SWITCH_DECLARE(pid_t) switch_fork(void); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 64e72c1bbb..a4d16a3f15 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -1017,7 +1017,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(sw } struct helper { - switch_db_event_callback_func_t callback; + switch_core_db_event_callback_func_t callback; void *pdata; }; @@ -1037,7 +1037,7 @@ static int helper_callback(void *pArg, int argc, char **argv, char **columnNames } SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh, - const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err) + const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err) { switch_status_t status = SWITCH_STATUS_FALSE; char *errmsg = NULL; @@ -1287,6 +1287,99 @@ static uint32_t qm_ttl(switch_sql_queue_manager_t *qm) return ttl; } +struct db_job { + switch_sql_queue_manager_t *qm; + char *sql; + switch_core_db_callback_func_t callback; + switch_core_db_event_callback_func_t event_callback; + void *pdata; + int event; + switch_memory_pool_t *pool; +}; + +static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj) +{ + struct db_job *job = (struct db_job *) obj; + switch_memory_pool_t *pool = job->pool; + char *err = NULL; + switch_cache_db_handle_t *dbh; + + + if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn); + return NULL; + } + + if (job->callback) { + switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err); + } else if (job->event_callback) { + switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err); + } + + if (err) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err); + free(err); + } + + switch_cache_db_release_db_handle(&dbh); + + if (pool) { + switch_core_destroy_memory_pool(&pool); + } + + return NULL; +} + +static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql, + switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata) +{ + switch_memory_pool_t *pool; + switch_thread_data_t *td; + struct db_job *job; + switch_core_new_memory_pool(&pool); + + td = switch_core_alloc(pool, sizeof(*td)); + job = switch_core_alloc(pool, sizeof(*job)); + + td->func = sql_in_thread; + td->obj = job; + + job->sql = switch_core_strdup(pool, sql); + job->qm = qm; + + if (callback) { + job->callback = callback; + } else if (event_callback) { + job->event_callback = event_callback; + } + + job->pdata = pdata; + job->pool = pool; + + return td; +} + + +SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm, + const char *sql, switch_core_db_callback_func_t callback, void *pdata) +{ + + switch_thread_data_t *td; + if ((td = new_job(qm, sql, callback, NULL, pdata))) { + switch_thread_pool_launch_thread(&td); + } +} + +SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm, + const char *sql, switch_core_db_event_callback_func_t callback, void *pdata) +{ + + switch_thread_data_t *td; + if ((td = new_job(qm, sql, NULL, callback, pdata))) { + switch_thread_pool_launch_thread(&td); + } +} + SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index) { int size = 0;