/* 
 * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
 * Copyright (C) 2005-2009, Anthony Minessale II <anthm@freeswitch.org>
 *
 * Version: MPL 1.1
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
 *
 * The Initial Developer of the Original Code is
 * Anthony Minessale II <anthm@freeswitch.org>
 * Portions created by the Initial Developer are Copyright (C)
 * the Initial Developer. All Rights Reserved.
 *
 * Contributor(s):
 * 
 * Anthony Minessale II <anthm@freeswitch.org>
 * Michael Jerris <mike@jerris.com>
 * Paul D. Tinsley <pdt at jackhammer.org>
 *
 *
 * switch_core_sqldb.c -- Main Core Library (statistics tracker)
 *
 */

#include <switch.h>
#include "private/switch_core_pvt.h"

static struct {
	switch_cache_db_handle_t *event_db;
	switch_queue_t *sql_queue[2];
	switch_memory_pool_t *memory_pool;
	switch_event_node_t *event_node;
	switch_thread_t *thread;
	int thread_running;
	switch_bool_t manage;
} sql_manager;

static switch_mutex_t *dbh_mutex = NULL;
static switch_hash_t *dbh_hash = NULL;

#define SQL_CACHE_TIMEOUT 300

static void sql_close(time_t prune)
{
	switch_hash_index_t *hi;
	const void *var;
	void *val;
	switch_cache_db_handle_t *dbh = NULL;
	int locked = 0;
	char *key;
	
	switch_mutex_lock(dbh_mutex);
 top:
	locked = 0;
	
	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
		switch_hash_this(hi, &var, NULL, &val);
		key = (char *) var;

		if ((dbh = (switch_cache_db_handle_t *) val)) {
			time_t diff = 0;

			if (prune > 0 && prune > dbh->last_used) {
				diff = (time_t) prune - dbh->last_used;
			}
			
			if (prune > 0 && diff < SQL_CACHE_TIMEOUT) {
				continue;
			}

			if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Dropping idle DB connection %s\n", key);

				switch (dbh->type) {
				case SCDB_TYPE_ODBC:
					{
						switch_odbc_handle_destroy(&dbh->native_handle.odbc_dbh);
					}
					break;
				case SCDB_TYPE_CORE_DB:
					{
						switch_core_db_close(dbh->native_handle.core_db_dbh);
						dbh->native_handle.core_db_dbh = NULL;	
					}
					break;
				}

				switch_core_hash_delete(dbh_hash, key);
				switch_mutex_unlock(dbh->mutex);
				switch_core_destroy_memory_pool(&dbh->pool);
				goto top;
	
			} else {
				if (!prune) locked++;
				continue;
			}
		}
	}

	if (locked) {
		goto top;
	}

	switch_mutex_unlock(dbh_mutex);
}


SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh)
{
	if (dbh && *dbh) {
		switch_mutex_unlock((*dbh)->mutex);
		*dbh = NULL;
	}
}


SWITCH_DECLARE(void) switch_cache_db_destroy_db_handle(switch_cache_db_handle_t **dbh)
{
	if (dbh && *dbh) {
		switch_mutex_lock(dbh_mutex);
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting DB connection %s\n", (*dbh)->name);

		switch ((*dbh)->type) {
		case SCDB_TYPE_ODBC:
			{
				switch_odbc_handle_destroy(&(*dbh)->native_handle.odbc_dbh);
			}
			break;
		case SCDB_TYPE_CORE_DB:
			{
				switch_core_db_close((*dbh)->native_handle.core_db_dbh);
				(*dbh)->native_handle.core_db_dbh = NULL;		
			}
			break;
		}

		
		switch_core_hash_delete(dbh_hash, (*dbh)->name);		
		switch_mutex_unlock((*dbh)->mutex);
		switch_core_destroy_memory_pool(&(*dbh)->pool);
		*dbh = NULL;
		switch_mutex_unlock(dbh_mutex);
	}
}

SWITCH_DECLARE(void) switch_cache_db_detach(void)
{
	char thread_str[CACHE_DB_LEN] = "";
	switch_hash_index_t *hi;
	const void *var;
	void *val;
	char *key;
	switch_cache_db_handle_t *dbh = NULL;

	snprintf(thread_str, sizeof(thread_str) - 1, "%lu", (unsigned long)(intptr_t)switch_thread_self());
	switch_mutex_lock(dbh_mutex);

	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
		switch_hash_this(hi, &var, NULL, &val);
		key = (char *) var;
		if ((dbh = (switch_cache_db_handle_t *) val)) {
			if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
				if (strstr(dbh->name, thread_str)) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG10, 
									  "Detach cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(dbh->type));
					switch_clear_flag(dbh, CDF_INUSE);
				} 
				switch_mutex_unlock(dbh->mutex);
			}
		}
	}

	switch_mutex_unlock(dbh_mutex);
}

SWITCH_DECLARE(switch_status_t)_switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
															  switch_cache_db_handle_type_t type,
															  switch_cache_db_connection_options_t *connection_options,
															  const char *file, const char *func, int line)
{
	switch_thread_id_t self = switch_thread_self();
	char thread_str[CACHE_DB_LEN] = "";
	char db_str[CACHE_DB_LEN] = "";
	char db_callsite_str[CACHE_DB_LEN] = "";
	switch_cache_db_handle_t *new_dbh = NULL;
	switch_ssize_t hlen = -1;

	const char *db_name = NULL;
	const char *db_user = NULL;
	const char *db_pass = NULL;

	switch (type) {
	case SCDB_TYPE_ODBC:
		{
			db_name = connection_options->odbc_options.dsn;
			db_user = connection_options->odbc_options.user;
			db_pass = connection_options->odbc_options.pass;
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			db_name = connection_options->core_db_options.db_path;
			db_user = "";
			db_pass = "";
		}
		break;
	}

	if (!db_name) {
		return SWITCH_STATUS_FALSE;
	}

	snprintf(db_str, sizeof(db_str) - 1, "db=\"%s\";user=\"%s\";pass=\"%s\"", db_name, db_user, db_pass);
	snprintf(thread_str, sizeof(thread_str) - 1, "%s;thread=\"%lu\"", db_str, (unsigned long)(intptr_t)self);
	snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
	
	switch_mutex_lock(dbh_mutex);
	if ((new_dbh = switch_core_hash_find(dbh_hash, thread_str))) {
		switch_set_string(new_dbh->last_user, db_callsite_str);
		switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, 
						  "Reuse Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type));
	} else {
		switch_hash_index_t *hi;
		const void *var;
		void *val;
		char *key;
		unsigned long hash = 0;
		
		hash = switch_ci_hashfunc_default(db_str, &hlen);

		for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
			switch_hash_this(hi, &var, NULL, &val);
			key = (char *) var;
			
			if ((new_dbh = (switch_cache_db_handle_t *) val)) {
				if (hash == new_dbh->hash && !strncasecmp(new_dbh->name, db_str, strlen(db_str)) && 
					!switch_test_flag(new_dbh, CDF_INUSE) && switch_mutex_trylock(new_dbh->mutex) == SWITCH_STATUS_SUCCESS) {
					switch_set_flag(new_dbh, CDF_INUSE);
					switch_set_string(new_dbh->name, thread_str);
					new_dbh->hash = switch_ci_hashfunc_default(db_str, &hlen);
					switch_set_string(new_dbh->last_user, db_callsite_str);
					switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, 
									  "Reuse Unused Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(new_dbh->type));
					break;
				}
			}
			new_dbh = NULL;
		}
	}

	if (!new_dbh) {
		switch_memory_pool_t *pool = NULL;
		switch_core_db_t *db = NULL;
		switch_odbc_handle_t *odbc_dbh = NULL;

		switch (type) {
		case SCDB_TYPE_ODBC:
			{

				if (!switch_odbc_available()) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! OBDC NOT AVAILABLE!\n");
					goto end;
				}

				if ((odbc_dbh = switch_odbc_handle_new(connection_options->odbc_options.dsn, 
													   connection_options->odbc_options.user,
													   connection_options->odbc_options.pass))) {
					if (switch_odbc_handle_connect(odbc_dbh) != SWITCH_STATUS_SUCCESS) {
						switch_odbc_handle_destroy(&odbc_dbh);
					}
				}
				

			}
			break;
		case SCDB_TYPE_CORE_DB:
			{
				db = switch_core_db_open_file(connection_options->core_db_options.db_path);
			}
			break;

		default:
			goto end;
		}

		if (!db && !odbc_dbh) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure!\n");
			goto end;
		}

		switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10, 
						  "Create Cached DB handle %s [%s]\n", thread_str, switch_cache_db_type_name(type));

		switch_core_new_memory_pool(&pool);
		new_dbh = switch_core_alloc(pool, sizeof(*new_dbh));
		new_dbh->pool = pool;
		new_dbh->type = type;
		switch_set_string(new_dbh->name, thread_str);
		switch_set_flag(new_dbh, CDF_INUSE);
		new_dbh->hash = switch_ci_hashfunc_default(db_str, &hlen);


		if (db) new_dbh->native_handle.core_db_dbh = db; else new_dbh->native_handle.odbc_dbh = odbc_dbh;
		switch_mutex_init(&new_dbh->mutex, SWITCH_MUTEX_UNNESTED, new_dbh->pool);
		switch_set_string(new_dbh->creator, db_callsite_str);
		switch_mutex_lock(new_dbh->mutex);

		switch_core_hash_insert(dbh_hash, new_dbh->name, new_dbh);
	}

 end:

	if (new_dbh) new_dbh->last_used = switch_epoch_time_now(NULL);

	switch_mutex_unlock(dbh_mutex);

	*dbh = new_dbh;

	return *dbh ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
}


static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t *dbh, const char *sql, char **err)
{
	switch_status_t status = SWITCH_STATUS_FALSE;
	char *errmsg = NULL;

	if (err) *err = NULL;
	
	switch (dbh->type) {
	case SCDB_TYPE_ODBC:
		{
			switch_odbc_statement_handle_t stmt = NULL;
			if ((status = switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, sql, &stmt)) != SWITCH_ODBC_SUCCESS) {
				errmsg = switch_odbc_handle_get_error(dbh->native_handle.odbc_dbh, stmt);
			}
			switch_odbc_statement_handle_free(&stmt);
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			status = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, NULL, NULL, &errmsg);
		}
		break;
	}

	if (errmsg) {
		if (!switch_stristr("already exists", errmsg)) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n%s\n", errmsg, sql);
		}
		if (err) {
			*err = errmsg;
		} else {
			free(errmsg);
		}
	}
	
	return status;
}

/**
   OMFG you cruel bastards.  Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
 **/
static switch_status_t switch_cache_db_execute_sql_chunked(switch_cache_db_handle_t *dbh, char *sql, uint32_t chunk_size, char **err)
{
	switch_status_t status = SWITCH_STATUS_FALSE;
	char *p, *s, *e;
	int chunk_count;
	switch_size_t len;

	switch_assert(chunk_size);

	if (err) *err = NULL;

	len = strlen(sql);

	if (chunk_size > len) {
		return switch_cache_db_execute_sql_real(dbh, sql, err);
	}

	if (!(chunk_count = strlen(sql) / chunk_size)) {
		return SWITCH_STATUS_FALSE;
	}
	
	e = end_of_p(sql);
	s = sql;

	while(s && s < e) {
		p = s + chunk_size;

		if (p > e) {
			p = e;
		}
		
		while (p > s) {
			if (*p == '\n' && *(p-1) == ';') {
				*p = '\0';
				*(p-1) = '\0';
				break;
			}
			
			p--;
		}

		if (p <= s) break;
		
		
		status = switch_cache_db_execute_sql_real(dbh, s, err);
		if (status != SWITCH_STATUS_SUCCESS || (err && *err)) {
			break;
		}

		s = p;
	
	}

	return status;

}


SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql(switch_cache_db_handle_t *dbh, char *sql, char **err)
{
	switch_status_t status = SWITCH_STATUS_FALSE;


	switch (dbh->type) {
	default:
		{
			status = switch_cache_db_execute_sql_chunked(dbh, (char *)sql, 32768, err);
		}
		break;
	}

	return status;

}


SWITCH_DECLARE(char *) switch_cache_db_execute_sql2str(switch_cache_db_handle_t *dbh, char *sql, char *str, size_t len, char **err)
{
	switch_status_t status = SWITCH_STATUS_FALSE;


	switch (dbh->type) {
	case SCDB_TYPE_CORE_DB:
		{
			switch_core_db_stmt_t *stmt;
			
			if (switch_core_db_prepare(dbh->native_handle.core_db_dbh, sql, -1, &stmt, 0)) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Statement Error [%s]!\n", sql);
				return NULL;
			} else {
				int running = 1;
				int colcount;

				while (running < 5000) {
					int result = switch_core_db_step(stmt);
					const unsigned char *txt;
					
					if (result == SWITCH_CORE_DB_ROW) {
						if ((colcount = switch_core_db_column_count(stmt)) > 0) {
							if ((txt = switch_core_db_column_text(stmt, 0))) {
								switch_copy_string(str, (char *) txt, len);
								status = SWITCH_STATUS_SUCCESS;
							} else {
								goto end;
							}
						}
						break;
					} else if (result == SWITCH_CORE_DB_BUSY) {
						running++;
						switch_cond_next();
						continue;
					}
					break;
				}
				
				switch_core_db_finalize(stmt);
			}
		}
		break;
	case SCDB_TYPE_ODBC:
		{
			status = switch_odbc_handle_exec_string(dbh->native_handle.odbc_dbh, sql, str, len);
		}
		break;
	}

 end:

	return status == SWITCH_STATUS_SUCCESS ? str : NULL;

}

SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute(switch_cache_db_handle_t *dbh, const char *sql, uint32_t retries)
{
	char *errmsg = NULL;
	switch_status_t status = SWITCH_STATUS_FALSE;
	uint8_t forever = 0;

	if (!retries) {
		forever = 1;
		retries = 1000;
	}

	while (retries > 0) {
		switch_cache_db_execute_sql_real(dbh, sql, &errmsg);
		if (errmsg) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
			switch_safe_free(errmsg);
			switch_yield(100000);
			retries--;
			if (retries == 0 && forever) {
				retries = 1000;
				continue;
			}
		} else {
			status = SWITCH_STATUS_SUCCESS;
			break;
		}
	}

	return status;
}


SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_cache_db_handle_t *dbh, char *sql, uint32_t retries)
{
	char *errmsg = NULL;
	switch_status_t status = SWITCH_STATUS_FALSE;
	uint8_t forever = 0;
	unsigned begin_retries = 100;
	uint8_t again = 0;
	
	if (!retries) {
		forever = 1;
		retries = 1000;
	}

again:

	while (begin_retries > 0) {
		again = 0;

		switch_cache_db_execute_sql_real(dbh,  "BEGIN", &errmsg);

		if (errmsg) {
			begin_retries--;
			if (strstr(errmsg, "cannot start a transaction within a transaction")) {
				again = 1;
			} else {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL Retry [%s]\n", errmsg);
			}
			free(errmsg);
			errmsg = NULL;

			if (again) {
				switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
				goto again;
			}

			switch_yield(100000);

			if (begin_retries == 0) {
				goto done;
			}
		} else {
			break;
		}

	}

	while (retries > 0) {

		switch_cache_db_execute_sql(dbh, sql, &errmsg);


		if (errmsg) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR [%s]\n", errmsg);
			free(errmsg);
			errmsg = NULL;
			switch_yield(100000);
			retries--;
			if (retries == 0 && forever) {
				retries = 1000;
				continue;
			}
		} else {
			status = SWITCH_STATUS_SUCCESS;
			break;
		}
	}

done:

	switch_cache_db_execute_sql_real(dbh,  "COMMIT", NULL);

	return status;
}

SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cache_db_handle_t *dbh, 
																	 const char *sql, switch_core_db_callback_func_t callback, void *pdata, char **err)
	
{
	switch_status_t status = SWITCH_STATUS_FALSE;
	char *errmsg = NULL;

	if (err) *err = NULL;



	switch (dbh->type) {
	case SCDB_TYPE_ODBC:
		{
			status = switch_odbc_handle_callback_exec(dbh->native_handle.odbc_dbh, sql, callback, pdata, err);
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			status = switch_core_db_exec(dbh->native_handle.core_db_dbh, sql, callback, pdata, &errmsg);
			
			if (errmsg) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
				free(errmsg);
			}
		}
		break;
	}
	
	return status;
}

SWITCH_DECLARE(void) switch_cache_db_test_reactive(switch_cache_db_handle_t *dbh, const char *test_sql, const char *drop_sql, const char *reactive_sql)
{
	char *errmsg;

	switch (dbh->type) {
	case SCDB_TYPE_ODBC:
		{
			if (switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, test_sql, NULL) != SWITCH_ODBC_SUCCESS) {
				if (drop_sql) {
					switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, drop_sql, NULL);
				}
				switch_odbc_handle_exec(dbh->native_handle.odbc_dbh, reactive_sql, NULL);
			}
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			if (test_sql) {
				switch_core_db_exec(dbh->native_handle.core_db_dbh, test_sql, NULL, NULL, &errmsg);

				if (errmsg) {
					switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\nAuto Generating Table!\n", errmsg, test_sql);
					switch_core_db_free(errmsg);
					errmsg = NULL;
					if (drop_sql) {
						switch_core_db_exec(dbh->native_handle.core_db_dbh, drop_sql, NULL, NULL, &errmsg);
					}
					if (errmsg) {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\n", errmsg, reactive_sql);
						switch_core_db_free(errmsg);
						errmsg = NULL;
					}
					switch_core_db_exec(dbh->native_handle.core_db_dbh, reactive_sql, NULL, NULL, &errmsg);
					if (errmsg) {
						switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "SQL ERR [%s]\n[%s]\n", errmsg, reactive_sql);
						switch_core_db_free(errmsg);
						errmsg = NULL;
					}
				}
			}
		}
		break;
	}
}



#define SQLLEN 1024 * 64
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t * thread, void *obj)
{
	void *pop;
	uint32_t itterations = 0;
	uint8_t trans = 0, nothing_in_queue = 0;
	uint32_t target = 50000;
	switch_size_t len = 0, sql_len = SQLLEN;
	char *tmp, *sqlbuf = (char *) malloc(sql_len);
	char *sql;
	switch_size_t newlen;
	int lc = 0;
	uint32_t loops = 0, sec = 0;
	uint32_t l1 = 1000;

	switch_assert(sqlbuf);

	if (!sql_manager.manage) {
		l1 = 10;
	}

	if (!sql_manager.event_db) {
		switch_core_db_handle(&sql_manager.event_db);
	}

	sql_manager.thread_running = 1;

	while(sql_manager.thread_running == 1) {
		if (++loops == l1) {
			if (++sec == SQL_CACHE_TIMEOUT) {
				sql_close(switch_epoch_time_now(NULL));
				sec = 0;
			}
			loops = 0;
		}
		
		if (!sql_manager.manage) {
			switch_yield(100000);
			continue;
		}
		
		if (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS || 
			switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
			sql = (char *) pop;

			if (sql) {
				newlen = strlen(sql) + 2;

				if (itterations == 0) {
					trans = 1;
				}
				
				/* ignore abnormally large strings sql strings as potential buffer overflow */
				if (newlen < SQLLEN) {
					itterations++;
					if (len + newlen > sql_len) {
						sql_len = len + SQLLEN;
						if (!(tmp = realloc(sqlbuf, sql_len))) {
							switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
							abort();
							break;
						}
						sqlbuf = tmp;
					}
					sprintf(sqlbuf + len, "%s;\n", sql);
					len += newlen;

				}
				switch_core_db_free(sql);
			} else {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
				break;
			}
		} else {
			nothing_in_queue = 1;
		}
		
		
		if (trans && ((itterations == target) || (nothing_in_queue && ++lc >= 500))) {
			if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
			}
			itterations = 0;
			trans = 0;
			nothing_in_queue = 0;
			len = 0;
			*sqlbuf = '\0';
			lc = 0;
		}
		
		if (nothing_in_queue) {
			switch_cond_next();
		}
	}

	while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
		free(pop);
	}

	while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
		free(pop);
	}

	free(sqlbuf);

	sql_manager.thread_running = 0;

	switch_cache_db_release_db_handle(&sql_manager.event_db);

	return NULL;
}

static void core_event_handler(switch_event_t *event)
{
	char *sql = NULL;

	switch_assert(event);

	switch (event->event_id) {
	case SWITCH_EVENT_ADD_SCHEDULE:
		{
			const char *id = switch_event_get_header(event, "task-id");
			const char *manager = switch_event_get_header(event, "task-sql_manager");
			
			if (id) {
				sql = switch_mprintf("insert into tasks values(%q,'%q','%q',%q, '%q')",
									 id,
									 switch_event_get_header_nil(event, "task-desc"),
									 switch_event_get_header_nil(event, "task-group"), 
									 manager ? manager : "0",
									 switch_core_get_variable("hostname")
									 );
			}
		}
		break;
	case SWITCH_EVENT_DEL_SCHEDULE:
	case SWITCH_EVENT_EXE_SCHEDULE:
		sql = switch_mprintf("delete from tasks where task_id=%q and hostname='%q'", 
							 switch_event_get_header_nil(event, "task-id"), switch_core_get_variable("hostname"));
		break;
	case SWITCH_EVENT_RE_SCHEDULE:
		{
			const char *id = switch_event_get_header(event, "task-id");
			const char *manager = switch_event_get_header(event, "task-sql_manager");
		
			if (id) {
				sql = switch_mprintf("update tasks set task_desc='%q',task_group='%q', task_sql_manager=%q where task_id=%q and hostname='%q'",
									 switch_event_get_header_nil(event, "task-desc"), 
									 switch_event_get_header_nil(event, "task-group"), 
									 manager ? manager : "0",
									 id, switch_core_get_variable("hostname"));
			}
		}
		break;
	case SWITCH_EVENT_CHANNEL_DESTROY:
		sql = switch_mprintf("delete from channels where uuid='%q' and hostname='%q'", 
							 switch_event_get_header_nil(event, "unique-id"), switch_core_get_variable("hostname"));
		break;
	case SWITCH_EVENT_CHANNEL_UUID:
		{
			sql = switch_mprintf(
								 "update channels set uuid='%q' where uuid='%q' and hostname='%q';"
								 "update calls set caller_uuid='%q' where caller_uuid='%q' and hostname='%q';"
								 "update calls set callee_uuid='%q' where callee_uuid='%q' and hostname='%q'",
								 switch_event_get_header_nil(event, "unique-id"),
								 switch_event_get_header_nil(event, "old-unique-id"),
								 switch_core_get_variable("hostname"),
								 switch_event_get_header_nil(event, "unique-id"),
								 switch_event_get_header_nil(event, "old-unique-id"),
								 switch_core_get_variable("hostname"),
								 switch_event_get_header_nil(event, "unique-id"),
								 switch_event_get_header_nil(event, "old-unique-id"),
								 switch_core_get_variable("hostname")
								 );
			break;
		}
	case SWITCH_EVENT_CHANNEL_CREATE:
		sql = switch_mprintf("insert into channels (uuid,direction,created,created_epoch, name,state,dialplan,context,hostname) "
							 "values('%q','%q','%q','%ld','%q','%q','%q','%q','%q')",
							 switch_event_get_header_nil(event, "unique-id"),
							 switch_event_get_header_nil(event, "call-direction"),
							 switch_event_get_header_nil(event, "event-date-local"),
							 (long)switch_epoch_time_now(NULL),							 
							 switch_event_get_header_nil(event, "channel-name"),
							 switch_event_get_header_nil(event, "channel-state"),
							 switch_event_get_header_nil(event, "caller-dialplan"),
							 switch_event_get_header_nil(event, "caller-context"),
							 switch_core_get_variable("hostname")
							 );
		break;
	case SWITCH_EVENT_CODEC:
		sql =
			switch_mprintf
			("update channels set read_codec='%q',read_rate='%q',write_codec='%q',write_rate='%q' where uuid='%q' and hostname='%q'",
			 switch_event_get_header_nil(event, "channel-read-codec-name"), 
			 switch_event_get_header_nil(event, "channel-read-codec-rate"),
			 switch_event_get_header_nil(event, "channel-write-codec-name"), 
			 switch_event_get_header_nil(event, "channel-write-codec-rate"),
			 switch_event_get_header_nil(event, "unique-id"), switch_core_get_variable("hostname"));
		break;
	case SWITCH_EVENT_CHANNEL_EXECUTE:
		sql = switch_mprintf("update channels set application='%q',application_data='%q',"
							 "presence_id='%q',presence_data='%q' where uuid='%q' and hostname='%q'",
							 switch_event_get_header_nil(event, "application"),
							 switch_event_get_header_nil(event, "application-data"), 
							 switch_event_get_header_nil(event, "channel-presence-id"),
							 switch_event_get_header_nil(event, "channel-presence-data"),
							 switch_event_get_header_nil(event, "unique-id"), switch_core_get_variable("hostname")

							 );

		break;
	case SWITCH_EVENT_CHANNEL_STATE:
		{
			char *state = switch_event_get_header_nil(event, "channel-state-number");
			switch_channel_state_t state_i = CS_DESTROY;

			if (!zstr(state)) {
				state_i = atoi(state);
			}

			switch (state_i) {
			case CS_HANGUP:
			case CS_DESTROY:
				break;
			case CS_ROUTING:
				sql = switch_mprintf("update channels set state='%s',cid_name='%q',cid_num='%q',"
									 "ip_addr='%s',dest='%q',dialplan='%q',context='%q',presence_id='%q',presence_data='%q' "
									 "where uuid='%s' and hostname='%q'",
									 switch_event_get_header_nil(event, "channel-state"),
									 switch_event_get_header_nil(event, "caller-caller-id-name"),
									 switch_event_get_header_nil(event, "caller-caller-id-number"),
									 switch_event_get_header_nil(event, "caller-network-addr"),
									 switch_event_get_header_nil(event, "caller-destination-number"), 
									 switch_event_get_header_nil(event, "caller-dialplan"), 
									 switch_event_get_header_nil(event, "caller-context"),
									 switch_event_get_header_nil(event, "channel-presence-id"),
									 switch_event_get_header_nil(event, "channel-presence-data"),
									 switch_event_get_header_nil(event, "unique-id"), switch_core_get_variable("hostname"));
				break;
			default:
				sql = switch_mprintf("update channels set state='%s' where uuid='%s' and hostname='%q'",
									 switch_event_get_header_nil(event, "channel-state"), 
									 switch_event_get_header_nil(event, "unique-id"), switch_core_get_variable("hostname"));
				break;
			}
			break;
		}
	case SWITCH_EVENT_CHANNEL_BRIDGE:
		sql = switch_mprintf("insert into calls values ('%s', '%ld', '%s','%q','%q','%q','%q','%s','%q','%q','%q','%q','%s','%q')",
							 switch_event_get_header_nil(event, "event-date-local"),
							 (long)switch_epoch_time_now(NULL),
							 switch_event_get_header_nil(event, "event-calling-function"),
							 switch_event_get_header_nil(event, "caller-caller-id-name"),
							 switch_event_get_header_nil(event, "caller-caller-id-number"),
							 switch_event_get_header_nil(event, "caller-destination-number"),
							 switch_event_get_header_nil(event, "caller-channel-name"),
							 switch_event_get_header_nil(event, "caller-unique-id"),
							 switch_event_get_header_nil(event, "Other-Leg-caller-id-name"),
							 switch_event_get_header_nil(event, "Other-Leg-caller-id-number"),
							 switch_event_get_header_nil(event, "Other-Leg-destination-number"),
							 switch_event_get_header_nil(event, "Other-Leg-channel-name"), 
							 switch_event_get_header_nil(event, "Other-Leg-unique-id"),
							 switch_core_get_variable("hostname")
			);
		break;
	case SWITCH_EVENT_CHANNEL_UNBRIDGE:
		sql = switch_mprintf("delete from calls where caller_uuid='%s' and hostname='%q'", 
							 switch_event_get_header_nil(event, "caller-unique-id"), switch_core_get_variable("hostname"));
		break;
	case SWITCH_EVENT_SHUTDOWN:
		sql = switch_mprintf("delete from channels where hostname='%q';"
							 "delete from interfaces where hostname='%q';"
							 "delete from calls where hostname='%q'",
							 switch_core_get_variable("hostname"),
							 switch_core_get_variable("hostname"),
							 switch_core_get_variable("hostname")
							 );
		break;
	case SWITCH_EVENT_LOG:
		return;
	case SWITCH_EVENT_MODULE_LOAD:
		{
			const char *type = switch_event_get_header_nil(event, "type");
			const char *name = switch_event_get_header_nil(event, "name");
			const char *description = switch_event_get_header_nil(event, "description");
			const char *syntax = switch_event_get_header_nil(event, "syntax");
			const char *key = switch_event_get_header_nil(event, "key");
			const char *filename = switch_event_get_header_nil(event, "filename");
			if (!zstr(type) && !zstr(name)) {
				sql =
					switch_mprintf("insert into interfaces (type,name,description,syntax,key,filename,hostname) values('%q','%q','%q','%q','%q','%q','%q')",
								   type, name, switch_str_nil(description), switch_str_nil(syntax), 
								   switch_str_nil(key), switch_str_nil(filename), switch_core_get_variable("hostname")
								   );
			}
			break;
		}
	case SWITCH_EVENT_MODULE_UNLOAD:
		{
			const char *type = switch_event_get_header_nil(event, "type");
			const char *name = switch_event_get_header_nil(event, "name");
			if (!zstr(type) && !zstr(name)) {
				sql = switch_mprintf("delete from interfaces where type='%q' and name='%q' and hostname='%q'", type, name, 
									 switch_core_get_variable("hostname"));
			}
			break;
		}
	case SWITCH_EVENT_CALL_SECURE:
		{
			const char *type = switch_event_get_header_nil(event, "secure_type");
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Secure Type: %s\n", type);
			if (zstr(type)) {
				break;
			}
			sql = switch_mprintf("update channels set secure='%s' where uuid='%s' and hostname='%q'",
								 type,
								 switch_event_get_header_nil(event, "caller-unique-id"),
								 switch_core_get_variable("hostname")
								 );
			break;
		}
	case SWITCH_EVENT_NAT:
		{
			const char *op = switch_event_get_header_nil(event, "op");
			switch_bool_t sticky = switch_true(switch_event_get_header_nil(event, "sticky"));
			if (!strcmp("add", op)) {
				sql = switch_mprintf("insert into nat (port, proto, sticky, hostname) values (%s, %s, %d,'%q')",
									 switch_event_get_header_nil(event, "port"),
									 switch_event_get_header_nil(event, "proto"),
									 sticky,
									 switch_core_get_variable("hostname")
									 );
			} else if (!strcmp("del", op)) {
				sql = switch_mprintf("delete from nat where port=%s and proto=%s and hostname='%q'",
									 switch_event_get_header_nil(event, "port"),
									 switch_event_get_header_nil(event, "proto"),
									 switch_core_get_variable("hostname"));
			} else if (!strcmp("status", op)) {
				/* call show nat api */
			} else if (!strcmp("status_response", op)) {
				/* ignore */
			} else {
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown op for SWITCH_EVENT_NAT: %s\n", op);
			}
			break;
		}
	default:
		break;
	}

	if (sql) {
		if (switch_stristr("update channels", sql)) {
			switch_queue_push(sql_manager.sql_queue[1], sql);
		} else {
			switch_queue_push(sql_manager.sql_queue[0], sql);
		}
		sql = NULL;
	}
}


static char create_complete_sql[] =
	"CREATE TABLE complete (\n"
	"   sticky  INTEGER,\n" 
	"   a1  VARCHAR(4096),\n" 
	"   a2  VARCHAR(4096),\n" 
	"   a3  VARCHAR(4096),\n" 
	"   a4  VARCHAR(4096),\n" 
	"   a5  VARCHAR(4096),\n" 
	"   a6  VARCHAR(4096),\n" 
	"   a7  VARCHAR(4096),\n" 
	"   a8  VARCHAR(4096),\n" 
	"   a9  VARCHAR(4096),\n" 
	"   a10 VARCHAR(4096),\n" 
	"   hostname VARCHAR(4096)\n" 
	");\n";

static char create_alias_sql[] =
	"CREATE TABLE aliases (\n"
	"   sticky  INTEGER,\n" 
	"   alias  VARCHAR(4096),\n" 
	"   command  VARCHAR(4096),\n" 
	"   hostname VARCHAR(4096)\n" 
	");\n";

static char create_channels_sql[] =
	"CREATE TABLE channels (\n"
	"   uuid  VARCHAR(4096),\n"
	"   direction  VARCHAR(4096),\n"
	"   created  VARCHAR(4096),\n"
	"   created_epoch  INTEGER,\n"
	"   name  VARCHAR(4096),\n"
	"   state  VARCHAR(4096),\n"
	"   cid_name  VARCHAR(4096),\n"
	"   cid_num  VARCHAR(4096),\n"
	"   ip_addr  VARCHAR(4096),\n"
	"   dest  VARCHAR(4096),\n"
	"   application  VARCHAR(4096),\n"
	"   application_data  VARCHAR(4096),\n"
	"   dialplan VARCHAR(4096),\n"
	"   context VARCHAR(4096),\n"
	"   read_codec  VARCHAR(4096),\n" 
	"   read_rate  VARCHAR(4096),\n" 
	"   write_codec  VARCHAR(4096),\n" 
	"   write_rate  VARCHAR(4096),\n" 
	"   secure VARCHAR(4096),\n"
	"   hostname VARCHAR(4096),\n" 
	"   presence_id VARCHAR(4096),\n" 
	"   presence_data VARCHAR(4096)\n" 
	");\ncreate index uuindex on channels (uuid,hostname);\n";
static char create_calls_sql[] =
	"CREATE TABLE calls (\n"
	"   call_created  VARCHAR(4096),\n"
	"   call_created_epoch  INTEGER,\n"
	"   function  VARCHAR(4096),\n"
	"   caller_cid_name  VARCHAR(4096),\n"
	"   caller_cid_num   VARCHAR(4096),\n"
	"   caller_dest_num  VARCHAR(4096),\n"
	"   caller_chan_name VARCHAR(4096),\n"
	"   caller_uuid      VARCHAR(4096),\n"
	"   callee_cid_name  VARCHAR(4096),\n"
	"   callee_cid_num   VARCHAR(4096),\n"
	"   callee_dest_num  VARCHAR(4096),\n" 
	"   callee_chan_name VARCHAR(4096),\n" 
	"   callee_uuid      VARCHAR(4096),\n" 
	"   hostname VARCHAR(4096)\n" 
	");\n"
	"create index eruuindex on calls (caller_uuid,hostname);\n"
	"create index eeuuindex on calls (callee_uuid,hostname);\n";
static char create_interfaces_sql[] =
	"CREATE TABLE interfaces (\n"
	"   type             VARCHAR(4096),\n"
	"   name             VARCHAR(4096),\n" 
	"   description      VARCHAR(4096),\n" 
	"   key              VARCHAR(4096),\n" 
	"   filename         VARCHAR(4096),\n" 
	"   syntax           VARCHAR(4096),\n" 
	"   hostname VARCHAR(4096)\n" 
	");\n";
static char create_tasks_sql[] =
	"CREATE TABLE tasks (\n"
	"   task_id             INTEGER,\n"
	"   task_desc           VARCHAR(4096),\n" 
	"   task_group          VARCHAR(4096),\n" 
	"   task_sql_manager    INTEGER,\n" 
	"   hostname VARCHAR(4096)\n" 
	");\n";
static char create_nat_sql[] = 
	"CREATE TABLE nat (\n"
	"   sticky  INTEGER,\n" 
	"	port	INTEGER,\n"
	"	proto	INTEGER,\n"
	"   hostname VARCHAR(4096)\n" 
	");\n";

switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
{
	switch_threadattr_t *thd_attr;
	switch_cache_db_handle_t *dbh;

	sql_manager.memory_pool = pool;
	sql_manager.manage = manage;
	
	switch_mutex_init(&dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
	switch_core_hash_init(&dbh_hash, sql_manager.memory_pool);

 top:
	
	/* Activate SQL database */
	if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");

		if (runtime.odbc_dsn) {
			runtime.odbc_dsn = NULL;
			runtime.odbc_user = NULL;
			runtime.odbc_pass = NULL;
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
			goto top;
		}


		switch_clear_flag((&runtime), SCF_USE_SQL);
		return SWITCH_STATUS_FALSE;
	}

	
	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n");

	switch (dbh->type) {
	case SCDB_TYPE_ODBC:
		{
			char sql[512] = "";
			char *tables[] = {"channels", "calls", "interfaces", "tasks", NULL};
			int i;
			const char *hostname = switch_core_get_variable("hostname");

			for(i = 0; tables[i]; i++) {
				switch_snprintf(sql, sizeof(sql), "delete from %s where hostname='%s'", tables[i], hostname);
				switch_cache_db_execute_sql(dbh, sql, NULL);
			}
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			switch_cache_db_execute_sql(dbh, "drop table channels", NULL);
			switch_cache_db_execute_sql(dbh, "drop table calls", NULL);
			switch_cache_db_execute_sql(dbh, "drop table interfaces", NULL);
			switch_cache_db_execute_sql(dbh, "drop table tasks", NULL);
			switch_cache_db_execute_sql(dbh, "PRAGMA synchronous=OFF;", NULL);
			switch_cache_db_execute_sql(dbh, "PRAGMA count_changes=OFF;", NULL);
			switch_cache_db_execute_sql(dbh, "PRAGMA cache_size=8000", NULL);
			switch_cache_db_execute_sql(dbh, "PRAGMA temp_store=MEMORY;", NULL);
		}
		break;
	}

		
	switch_cache_db_test_reactive(dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql);
	switch_cache_db_test_reactive(dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql);
	switch_cache_db_test_reactive(dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql);


	switch (dbh->type) {
	case SCDB_TYPE_ODBC:
		{
			char *err;
			switch_cache_db_test_reactive(dbh, "select hostname from channels", "DROP TABLE channels", create_channels_sql);
			switch_cache_db_test_reactive(dbh, "select hostname from calls", "DROP TABLE calls", create_calls_sql);
			switch_cache_db_test_reactive(dbh, "select hostname from interfaces", "DROP TABLE interfaces", create_interfaces_sql);
			switch_cache_db_test_reactive(dbh, "select hostname from tasks", "DROP TABLE tasks", create_tasks_sql);
			
			switch_cache_db_execute_sql(dbh, "begin;delete from channels where hostname='';delete from channels where hostname='';commit;", &err);
			
			if (err) {
				runtime.odbc_dsn = NULL;
				runtime.odbc_user = NULL;
				runtime.odbc_pass = NULL;
				switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling ODBC\n");
				switch_cache_db_release_db_handle(&dbh);
				free(err);
				goto top;
			}
		}
		break;
	case SCDB_TYPE_CORE_DB:
		{
			switch_cache_db_execute_sql(dbh, create_channels_sql, NULL);
			switch_cache_db_execute_sql(dbh, create_calls_sql, NULL);
			switch_cache_db_execute_sql(dbh, create_interfaces_sql, NULL);
			switch_cache_db_execute_sql(dbh, create_tasks_sql, NULL);
		}
		break;
	}
	

	switch_cache_db_execute_sql(dbh, "delete from complete where sticky=0", NULL);
	switch_cache_db_execute_sql(dbh, "delete from aliases where sticky=0", NULL);
	switch_cache_db_execute_sql(dbh, "delete from nat where sticky=0", NULL);
	switch_cache_db_execute_sql(dbh, "create index alias1 on aliases (alias)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete1 on complete (a1,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete2 on complete (a2,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete3 on complete (a3,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete4 on complete (a4,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete5 on complete (a5,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete6 on complete (a6,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete7 on complete (a7,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete8 on complete (a8,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete9 on complete (a9,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index complete10 on complete (a10,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create unique index nat_map_port_proto on nat (port,proto,hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index channels1 on channels(hostname)", NULL);
	switch_cache_db_execute_sql(dbh, "create index calls1 on calls(hostname)", NULL);


	if (sql_manager.manage) {
		if (switch_event_bind_removable("core_db", SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, 
										core_event_handler, NULL, &sql_manager.event_node) != SWITCH_STATUS_SUCCESS) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't bind event handler!\n");
		}

		switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
		switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
	}

	switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
	switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
	switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);

	while (!sql_manager.thread_running) {
		switch_yield(10000);
	}
	
	switch_cache_db_release_db_handle(&dbh);

	return SWITCH_STATUS_SUCCESS;
}

void switch_core_sqldb_stop(void)
{
	switch_status_t st;

	switch_event_unbind(&sql_manager.event_node);
	
	if (sql_manager.thread && sql_manager.thread_running) {

		if (sql_manager.manage) {
			switch_queue_push(sql_manager.sql_queue[0], NULL);
			switch_queue_push(sql_manager.sql_queue[1], NULL);
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
		}

		sql_manager.thread_running = -1;
		switch_thread_join(&st, sql_manager.thread);
	}

	sql_close(0);

	switch_core_hash_destroy(&dbh_hash);

}

SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
{
	/* return some status info suitable for the cli */
	switch_hash_index_t *hi;
	switch_cache_db_handle_t *dbh = NULL;
	void *val;
	const void *var;
	char *key;
	switch_bool_t locked = SWITCH_FALSE;
	time_t now = switch_epoch_time_now(NULL);
	char cleankey_str[CACHE_DB_LEN];
	char *pos1 = NULL;
	char *pos2 = NULL;
	
	switch_mutex_lock(dbh_mutex);
	
	for (hi = switch_hash_first(NULL, dbh_hash); hi; hi = switch_hash_next(hi)) {
		switch_hash_this(hi, &var, NULL, &val);
		key = (char *) var;

		if ((dbh = (switch_cache_db_handle_t *) val)) {
			char *needle = "pass=\"";
			time_t diff = 0;

			diff = now - dbh->last_used;
			
			if (switch_mutex_trylock(dbh->mutex) == SWITCH_STATUS_SUCCESS) {
				switch_mutex_unlock(dbh->mutex);
				locked = SWITCH_FALSE;
			} else {
				locked = SWITCH_TRUE;
			}

			/* sanitize password */
			memset(cleankey_str, 0, sizeof(cleankey_str));
			pos1 = strstr(key, needle) + strlen(needle);
			pos2 = strstr(pos1, "\"");
			strncpy(cleankey_str, key, pos1-key);
			strcpy(&cleankey_str[pos1-key], pos2);
			
			
			stream->write_function(stream, "%s\n\tType: %s\n\tLast used: %d\n\tFlags: %s, %s\n"
											"\tCreator: %s\n\tLast User: %s\n",
				cleankey_str,
				switch_cache_db_type_name(dbh->type),
				diff,
				locked ? "Locked" : "Unlocked",
				switch_test_flag(dbh, CDF_INUSE) ? "Attached" : "Detached",
				dbh->creator,
				dbh->last_user);
		}
	}
	switch_mutex_unlock(dbh_mutex);
}

/* For Emacs:
 * Local Variables:
 * mode:c
 * indent-tabs-mode:t
 * tab-width:4
 * c-basic-offset:4
 * End:
 * For VIM:
 * vim:set softtabstop=4 shiftwidth=4 tabstop=4:
 */