add async select feature to sql queue manager api

This commit is contained in:
Anthony Minessale 2013-01-31 08:59:21 -06:00
parent f0bf3b917d
commit 203f727e1f
2 changed files with 97 additions and 4 deletions

View File

@ -2226,7 +2226,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_resume(void);
\} \}
*/ */
typedef int (*switch_db_event_callback_func_t) (void *pArg, switch_event_t *event); typedef int (*switch_core_db_event_callback_func_t) (void *pArg, switch_event_t *event);
#define CACHE_DB_LEN 256 #define CACHE_DB_LEN 256
typedef enum { typedef enum {
@ -2461,7 +2461,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh, SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err); const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err);
SWITCH_DECLARE(pid_t) switch_fork(void); SWITCH_DECLARE(pid_t) switch_fork(void);

View File

@ -1017,7 +1017,7 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans_full(sw
} }
struct helper { struct helper {
switch_db_event_callback_func_t callback; switch_core_db_event_callback_func_t callback;
void *pdata; void *pdata;
}; };
@ -1037,7 +1037,7 @@ static int helper_callback(void *pArg, int argc, char **argv, char **columnNames
} }
SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh, SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_event_callback(switch_cache_db_handle_t *dbh,
const char *sql, switch_db_event_callback_func_t callback, void *pdata, char **err) const char *sql, switch_core_db_event_callback_func_t callback, void *pdata, char **err)
{ {
switch_status_t status = SWITCH_STATUS_FALSE; switch_status_t status = SWITCH_STATUS_FALSE;
char *errmsg = NULL; char *errmsg = NULL;
@ -1287,6 +1287,99 @@ static uint32_t qm_ttl(switch_sql_queue_manager_t *qm)
return ttl; return ttl;
} }
struct db_job {
switch_sql_queue_manager_t *qm;
char *sql;
switch_core_db_callback_func_t callback;
switch_core_db_event_callback_func_t event_callback;
void *pdata;
int event;
switch_memory_pool_t *pool;
};
static void *SWITCH_THREAD_FUNC sql_in_thread (switch_thread_t *thread, void *obj)
{
struct db_job *job = (struct db_job *) obj;
switch_memory_pool_t *pool = job->pool;
char *err = NULL;
switch_cache_db_handle_t *dbh;
if (switch_cache_db_get_db_handle_dsn(&dbh, job->qm->dsn) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot connect DSN %s\n", job->qm->dsn);
return NULL;
}
if (job->callback) {
switch_cache_db_execute_sql_callback(dbh, job->sql, job->callback, job->pdata, &err);
} else if (job->event_callback) {
switch_cache_db_execute_sql_event_callback(dbh, job->sql, job->event_callback, job->pdata, &err);
}
if (err) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", job->sql, err);
free(err);
}
switch_cache_db_release_db_handle(&dbh);
if (pool) {
switch_core_destroy_memory_pool(&pool);
}
return NULL;
}
static switch_thread_data_t *new_job(switch_sql_queue_manager_t *qm, const char *sql,
switch_core_db_callback_func_t callback, switch_core_db_event_callback_func_t event_callback, void *pdata)
{
switch_memory_pool_t *pool;
switch_thread_data_t *td;
struct db_job *job;
switch_core_new_memory_pool(&pool);
td = switch_core_alloc(pool, sizeof(*td));
job = switch_core_alloc(pool, sizeof(*job));
td->func = sql_in_thread;
td->obj = job;
job->sql = switch_core_strdup(pool, sql);
job->qm = qm;
if (callback) {
job->callback = callback;
} else if (event_callback) {
job->event_callback = event_callback;
}
job->pdata = pdata;
job->pool = pool;
return td;
}
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, callback, NULL, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}
SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_sql_queue_manager_t *qm,
const char *sql, switch_core_db_event_callback_func_t callback, void *pdata)
{
switch_thread_data_t *td;
if ((td = new_job(qm, sql, NULL, callback, pdata))) {
switch_thread_pool_launch_thread(&td);
}
}
SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index) SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
{ {
int size = 0; int size = 0;