add switch_sql_queue_manager_pause and switch_sql_queue_manager_resume

This commit is contained in:
Anthony Minessale 2014-03-12 11:42:37 -05:00
parent 60bb1602d2
commit efef505e26
2 changed files with 54 additions and 20 deletions

View File

@ -1355,7 +1355,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_hash_init_case(_Out_ switch_hash_t *
#define switch_core_hash_init_nocase(_hash) switch_core_hash_init_case(_hash, SWITCH_FALSE)
/*!
\brief Destroy an existing hash table
\param hash the hash to destroy
@ -2541,6 +2540,9 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session);
SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name);
SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush);
SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm);
SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup);

View File

@ -1266,6 +1266,7 @@ struct switch_sql_queue_manager {
switch_memory_pool_t *pool;
uint32_t max_trans;
uint32_t confirm;
uint8_t paused;
};
static int qm_wake(switch_sql_queue_manager_t *qm)
@ -1400,6 +1401,52 @@ SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_s
}
}
static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
{
void *pop = NULL;
switch_queue_t *q = qm->sql_queue[i];
switch_mutex_lock(qm->mutex);
while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) {
if (dbh) {
switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
}
free(pop);
}
}
switch_mutex_unlock(qm->mutex);
}
SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm)
{
switch_mutex_lock(qm->mutex);
qm->paused = 0;
switch_mutex_unlock(qm->mutex);
qm_wake(qm);
}
SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush)
{
uint32_t i;
switch_mutex_lock(qm->mutex);
qm->paused = 1;
switch_mutex_unlock(qm->mutex);
if (flush) {
for(i = 0; i < qm->numq; i++) {
do_flush(qm, i, NULL);
}
}
}
SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index)
{
int size = 0;
@ -1462,25 +1509,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
return SWITCH_STATUS_FALSE;
}
static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh)
{
void *pop = NULL;
switch_queue_t *q = qm->sql_queue[i];
switch_mutex_lock(qm->mutex);
while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) {
if (dbh) {
switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
}
free(pop);
}
}
switch_mutex_unlock(qm->mutex);
}
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
{
switch_sql_queue_manager_t *qm;
@ -1854,6 +1882,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
uint32_t i, lc;
uint32_t written = 0, iterations = 0;
if (qm->paused) {
goto check;
}
if (sql_manager.paused) {
for (i = 0; i < qm->numq; i++) {
do_flush(qm, i, NULL);