diff --git a/src/include/switch_core.h b/src/include/switch_core.h index 564694dbac..01f4a87111 100644 --- a/src/include/switch_core.h +++ b/src/include/switch_core.h @@ -1355,7 +1355,6 @@ SWITCH_DECLARE(switch_status_t) switch_core_hash_init_case(_Out_ switch_hash_t * #define switch_core_hash_init_nocase(_hash) switch_core_hash_init_case(_hash, SWITCH_FALSE) - /*! \brief Destroy an existing hash table \param hash the hash to destroy @@ -2541,6 +2540,9 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session); SWITCH_DECLARE(void) switch_core_recovery_flush(const char *technology, const char *profile_name); +SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush); +SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm); + SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup); diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c index 054cfad2f5..0195e4ac6e 100644 --- a/src/switch_core_sqldb.c +++ b/src/switch_core_sqldb.c @@ -1266,6 +1266,7 @@ struct switch_sql_queue_manager { switch_memory_pool_t *pool; uint32_t max_trans; uint32_t confirm; + uint8_t paused; }; static int qm_wake(switch_sql_queue_manager_t *qm) @@ -1400,6 +1401,52 @@ SWITCH_DECLARE(void) switch_sql_queue_manger_execute_sql_event_callback(switch_s } } + +static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh) +{ + void *pop = NULL; + switch_queue_t *q = qm->sql_queue[i]; + + switch_mutex_lock(qm->mutex); + while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) { + if (pop) { + if (dbh) { + switch_cache_db_execute_sql(dbh, (char *) pop, NULL); + } + free(pop); + } + } + switch_mutex_unlock(qm->mutex); + +} + + +SWITCH_DECLARE(void) switch_sql_queue_manager_resume(switch_sql_queue_manager_t *qm) +{ + switch_mutex_lock(qm->mutex); + qm->paused = 0; + switch_mutex_unlock(qm->mutex); + + qm_wake(qm); + +} + +SWITCH_DECLARE(void) switch_sql_queue_manager_pause(switch_sql_queue_manager_t *qm, switch_bool_t flush) +{ + uint32_t i; + + switch_mutex_lock(qm->mutex); + qm->paused = 1; + switch_mutex_unlock(qm->mutex); + + if (flush) { + for(i = 0; i < qm->numq; i++) { + do_flush(qm, i, NULL); + } + } + +} + SWITCH_DECLARE(int) switch_sql_queue_manager_size(switch_sql_queue_manager_t *qm, uint32_t index) { int size = 0; @@ -1462,25 +1509,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_ return SWITCH_STATUS_FALSE; } - -static void do_flush(switch_sql_queue_manager_t *qm, int i, switch_cache_db_handle_t *dbh) -{ - void *pop = NULL; - switch_queue_t *q = qm->sql_queue[i]; - - switch_mutex_lock(qm->mutex); - while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) { - if (pop) { - if (dbh) { - switch_cache_db_execute_sql(dbh, (char *) pop, NULL); - } - free(pop); - } - } - switch_mutex_unlock(qm->mutex); - -} - SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp) { switch_sql_queue_manager_t *qm; @@ -1854,6 +1882,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, uint32_t i, lc; uint32_t written = 0, iterations = 0; + if (qm->paused) { + goto check; + } + if (sql_manager.paused) { for (i = 0; i < qm->numq; i++) { do_flush(qm, i, NULL);