From 8778cda16d503c9845f6956c27fd4fc253f56a02 Mon Sep 17 00:00:00 2001
From: Andrey Volk <andywolk@gmail.com>
Date: Thu, 9 Jan 2020 22:48:34 +0400
Subject: [PATCH] [Core] Wait sql threads to spin up. Fixes module_exists,
 show_codec commands.

---
 src/switch_core_sqldb.c     | 17 ++++++++++---
 tests/unit/switch_core_db.c | 50 +++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/src/switch_core_sqldb.c b/src/switch_core_sqldb.c
index 7f5c2aff52..98c50c0d8d 100644
--- a/src/switch_core_sqldb.c
+++ b/src/switch_core_sqldb.c
@@ -1616,6 +1616,7 @@ struct switch_sql_queue_manager {
 	uint32_t numq;
 	char *dsn;
 	switch_thread_t *thread;
+	int thread_initiated;
 	int thread_running;
 	switch_thread_cond_t *cond;
 	switch_mutex_t *cond_mutex;
@@ -1899,8 +1900,15 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
 		switch_threadattr_create(&thd_attr, qm->pool);
 		switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
 		switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
-		switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool);
-		return SWITCH_STATUS_SUCCESS;
+		if (switch_thread_create(&qm->thread, thd_attr, switch_user_sql_thread, qm, qm->pool) == SWITCH_STATUS_SUCCESS) {
+			while (!qm->thread_initiated) {
+				switch_cond_next();
+			}
+
+			if (qm->event_db) {
+				return SWITCH_STATUS_SUCCESS;
+			}
+		}
 	}
 
 	return SWITCH_STATUS_FALSE;
@@ -2274,11 +2282,10 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 
 	if (!qm->event_db) {
 		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
+		qm->thread_initiated = 1;
 		return NULL;
 	}
 
-	qm->thread_running = 1;
-
 	switch_mutex_lock(qm->cond_mutex);
 
 	switch (qm->event_db->type) {
@@ -2296,6 +2303,8 @@ static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread,
 		break;
 	}
 
+	qm->thread_initiated = 1;
+	qm->thread_running = 1;
 
 	while (qm->thread_running == 1) {
 		uint32_t i, lc;
diff --git a/tests/unit/switch_core_db.c b/tests/unit/switch_core_db.c
index 244ab38398..5ebf41c8ab 100644
--- a/tests/unit/switch_core_db.c
+++ b/tests/unit/switch_core_db.c
@@ -34,6 +34,18 @@
 
 #include <test/switch_test.h>
 
+int max_rows = 150;
+
+int status = 0;
+
+int table_count_func(void *pArg, int argc, char **argv, char **columnNames){
+	if (argc > 0) {
+		status = atoi(argv[0]);
+	}
+
+	return -1;
+}
+
 FST_CORE_DB_BEGIN("./conf")
 {
 	FST_SUITE_BEGIN(switch_core_db)
@@ -69,6 +81,44 @@ FST_CORE_DB_BEGIN("./conf")
 			fst_check_string_equals(res2, "");
 		}
 		FST_TEST_END()
+
+		FST_TEST_BEGIN(test_switch_cache_db_queue_manager_race)
+		{
+			int i;
+			switch_sql_queue_manager_t *qm = NULL;
+
+			switch_sql_queue_manager_init_name("TEST",
+				&qm,
+				4,
+				"test_switch_cache_db_queue_manager_race",
+				SWITCH_MAX_TRANS,
+				NULL, NULL, NULL, NULL);
+
+			switch_sql_queue_manager_start(qm);
+
+			switch_sql_queue_manager_push_confirm(qm, "DROP TABLE IF EXISTS t;", 0, SWITCH_TRUE);
+			switch_sql_queue_manager_push_confirm(qm, "CREATE TABLE t (col1 INT);", 0, SWITCH_TRUE);
+
+			for (i = 0; i < max_rows; i++) {
+				switch_sql_queue_manager_push(qm, "INSERT INTO t (col1) VALUES (1);", 0, SWITCH_TRUE);
+			}
+
+			switch_sleep(1 * 1000 * 1000);
+
+			switch_sql_queue_manager_execute_sql_callback(qm, "SELECT COUNT(col1) FROM t;", table_count_func, NULL);
+
+			while (switch_sql_queue_manager_size(qm, 0)) {
+				switch_cond_next();
+			}
+
+			switch_sql_queue_manager_stop(qm);
+			switch_sql_queue_manager_destroy(&qm);
+
+			fst_check_int_equals(status, max_rows);
+		}
+		FST_TEST_END()
+
+
 	}
 	FST_SUITE_END()
 }