mirror of
https://github.com/signalwire/freeswitch.git
synced 2025-03-06 18:30:01 +00:00
mod_mongo: use C++ driver pooling, added support for socket timeout
This commit is contained in:
parent
c21cd495d8
commit
7521254025
conf/vanilla/autoload_configs
src/mod/applications/mod_mongo
@ -7,8 +7,9 @@
|
|||||||
foo/server:port,server:port SET
|
foo/server:port,server:port SET
|
||||||
-->
|
-->
|
||||||
<param name="connection-string" value="127.0.0.1:27017"/>
|
<param name="connection-string" value="127.0.0.1:27017"/>
|
||||||
<param name="min-connections" value="10"/>
|
|
||||||
<param name="max-connections" value="100"/>
|
<param name="max-connections" value="100"/>
|
||||||
|
<!-- Timeout in seconds. 0 means no timeout -->
|
||||||
|
<param name="socket-timeout" value="0"/>
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
<param name="map" value="function() { emit(this.a, 1); }"/>
|
<param name="map" value="function() { emit(this.a, 1); }"/>
|
||||||
|
@ -6,12 +6,13 @@ MONGO_CXX_DRIVER_TARBALL=mongodb-linux-x86_64-$(MONGO_CXX_DRIVER_VERSION)-latest
|
|||||||
MONGO_CXX_DRIVER_SRC=$(BASE)/libs/mongo-cxx-driver-$(MONGO_CXX_DRIVER_VERSION)
|
MONGO_CXX_DRIVER_SRC=$(BASE)/libs/mongo-cxx-driver-$(MONGO_CXX_DRIVER_VERSION)
|
||||||
LIBMONGOCLIENT_A =$(MONGO_CXX_DRIVER_SRC)/libmongoclient.a
|
LIBMONGOCLIENT_A =$(MONGO_CXX_DRIVER_SRC)/libmongoclient.a
|
||||||
|
|
||||||
LOCAL_SOURCES=mongo_conn.cpp
|
LOCAL_SOURCES=
|
||||||
LOCAL_OBJS=mongo_conn.o
|
LOCAL_OBJS=
|
||||||
|
|
||||||
LOCAL_CFLAGS=-I$(MONGO_CXX_DRIVER_SRC)/src
|
LOCAL_CFLAGS=-I$(MONGO_CXX_DRIVER_SRC)/src
|
||||||
LOCAL_LIBADD=$(LIBMONGOCLIENT_A)
|
LOCAL_LIBADD=$(LIBMONGOCLIENT_A)
|
||||||
LOCAL_LDFLAGS=-lboost_thread -lboost_filesystem-mt -lboost_system-mt
|
#LOCAL_LDFLAGS=-lboost_thread -lboost_filesystem-mt -lboost_system-mt
|
||||||
|
LOCAL_LDFLAGS=-lboost_thread-mt -lboost_filesystem-mt -lboost_system-mt
|
||||||
MODDIR=$(shell pwd)
|
MODDIR=$(shell pwd)
|
||||||
|
|
||||||
|
|
||||||
|
@ -7,8 +7,9 @@
|
|||||||
foo/server:port,server:port SET
|
foo/server:port,server:port SET
|
||||||
-->
|
-->
|
||||||
<param name="connection-string" value="127.0.0.1:27017"/>
|
<param name="connection-string" value="127.0.0.1:27017"/>
|
||||||
<param name="min-connections" value="10"/>
|
|
||||||
<param name="max-connections" value="100"/>
|
<param name="max-connections" value="100"/>
|
||||||
|
<!-- Timeout in seconds. 0 means no timeout -->
|
||||||
|
<param name="socket-timeout" value="0"/>
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
<param name="map" value="function() { emit(this.a, 1); }"/>
|
<param name="map" value="function() { emit(this.a, 1); }"/>
|
||||||
|
@ -31,17 +31,20 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <switch.h>
|
#include <switch.h>
|
||||||
#include "mod_mongo.h"
|
#include <mongo/client/dbclient.h>
|
||||||
|
|
||||||
|
using namespace mongo;
|
||||||
|
|
||||||
#define DELIMITER ';'
|
#define DELIMITER ';'
|
||||||
#define FIND_ONE_SYNTAX "mongo_find_one ns; query; fields; options"
|
#define FIND_ONE_SYNTAX "mongo_find_one ns; query; fields; options"
|
||||||
#define MAPREDUCE_SYNTAX "mongo_mapreduce ns; query"
|
#define MAPREDUCE_SYNTAX "mongo_mapreduce ns; query"
|
||||||
|
|
||||||
static struct {
|
static struct {
|
||||||
mongo_connection_pool_t *conn_pool;
|
const char *map;
|
||||||
char *map;
|
const char *reduce;
|
||||||
char *reduce;
|
const char *finalize;
|
||||||
char *finalize;
|
const char *conn_str;
|
||||||
|
double socket_timeout;
|
||||||
} globals;
|
} globals;
|
||||||
|
|
||||||
static int parse_query_options(char *query_options_str)
|
static int parse_query_options(char *query_options_str)
|
||||||
@ -86,6 +89,7 @@ SWITCH_STANDARD_API(mongo_mapreduce_function)
|
|||||||
|
|
||||||
if (!zstr(ns) && !zstr(json_query)) {
|
if (!zstr(ns) && !zstr(json_query)) {
|
||||||
try {
|
try {
|
||||||
|
scoped_ptr<ScopedDbConnection> conn(ScopedDbConnection::getScopedDbConnection(string(globals.conn_str, globals.socket_timeout)));
|
||||||
BSONObj query = fromjson(json_query);
|
BSONObj query = fromjson(json_query);
|
||||||
BSONObj out;
|
BSONObj out;
|
||||||
BSONObjBuilder cmd;
|
BSONObjBuilder cmd;
|
||||||
@ -105,20 +109,19 @@ SWITCH_STANDARD_API(mongo_mapreduce_function)
|
|||||||
}
|
}
|
||||||
cmd.append("out", BSON("inline" << 1));
|
cmd.append("out", BSON("inline" << 1));
|
||||||
|
|
||||||
conn = mongo_connection_pool_get(globals.conn_pool);
|
try {
|
||||||
if (conn) {
|
conn->get()->runCommand(nsGetDB(ns), cmd.done(), out);
|
||||||
conn->runCommand(nsGetDB(ns), cmd.done(), out);
|
|
||||||
mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_FALSE);
|
|
||||||
|
|
||||||
stream->write_function(stream, "-OK\n%s\n", out.jsonString().c_str());
|
stream->write_function(stream, "-OK\n%s\n", out.jsonString().c_str());
|
||||||
} else {
|
} catch (DBException &e) {
|
||||||
stream->write_function(stream, "-ERR\nNo connection\n");
|
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
||||||
|
} catch (...) {
|
||||||
|
stream->write_function(stream, "-ERR\nUnknown exception!\n");
|
||||||
}
|
}
|
||||||
|
conn->done();
|
||||||
} catch (DBException &e) {
|
} catch (DBException &e) {
|
||||||
if (conn) {
|
|
||||||
mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_TRUE);
|
|
||||||
}
|
|
||||||
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
||||||
|
} catch (...) {
|
||||||
|
stream->write_function(stream, "-ERR\nUnknown exception!\n");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "-ERR\n%s\n", MAPREDUCE_SYNTAX);
|
stream->write_function(stream, "-ERR\n%s\n", MAPREDUCE_SYNTAX);
|
||||||
@ -134,7 +137,7 @@ SWITCH_STANDARD_API(mongo_find_one_function)
|
|||||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||||
char *ns = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
|
char *ns = NULL, *json_query = NULL, *json_fields = NULL, *query_options_str = NULL;
|
||||||
int query_options = 0;
|
int query_options = 0;
|
||||||
|
|
||||||
ns = strdup(cmd);
|
ns = strdup(cmd);
|
||||||
switch_assert(ns != NULL);
|
switch_assert(ns != NULL);
|
||||||
|
|
||||||
@ -152,29 +155,24 @@ SWITCH_STANDARD_API(mongo_find_one_function)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!zstr(ns) && !zstr(json_query) && !zstr(json_fields)) {
|
if (!zstr(ns) && !zstr(json_query) && !zstr(json_fields)) {
|
||||||
|
|
||||||
DBClientBase *conn = NULL;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
scoped_ptr<ScopedDbConnection> conn(ScopedDbConnection::getScopedDbConnection(string(globals.conn_str), globals.socket_timeout));
|
||||||
BSONObj query = fromjson(json_query);
|
BSONObj query = fromjson(json_query);
|
||||||
BSONObj fields = fromjson(json_fields);
|
BSONObj fields = fromjson(json_fields);
|
||||||
|
try {
|
||||||
conn = mongo_connection_pool_get(globals.conn_pool);
|
BSONObj res = conn->get()->findOne(ns, Query(query), &fields, query_options);
|
||||||
if (conn) {
|
|
||||||
BSONObj res = conn->findOne(ns, Query(query), &fields, query_options);
|
|
||||||
mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_FALSE);
|
|
||||||
|
|
||||||
stream->write_function(stream, "-OK\n%s\n", res.jsonString().c_str());
|
stream->write_function(stream, "-OK\n%s\n", res.jsonString().c_str());
|
||||||
} else {
|
} catch (DBException &e) {
|
||||||
stream->write_function(stream, "-ERR\nNo connection\n");
|
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
||||||
|
} catch (...) {
|
||||||
|
stream->write_function(stream, "-ERR\nUnknown exception!\n");
|
||||||
}
|
}
|
||||||
|
conn->done();
|
||||||
} catch (DBException &e) {
|
} catch (DBException &e) {
|
||||||
if (conn) {
|
|
||||||
mongo_connection_pool_put(globals.conn_pool, conn, SWITCH_TRUE);
|
|
||||||
}
|
|
||||||
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
stream->write_function(stream, "-ERR\n%s\n", e.toString().c_str());
|
||||||
|
} catch (...) {
|
||||||
|
stream->write_function(stream, "-ERR\nUnknown exception!\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "-ERR\n%s\n", FIND_ONE_SYNTAX);
|
stream->write_function(stream, "-ERR\n%s\n", FIND_ONE_SYNTAX);
|
||||||
}
|
}
|
||||||
@ -184,13 +182,18 @@ SWITCH_STANDARD_API(mongo_find_one_function)
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static switch_status_t config(void)
|
static switch_status_t config(switch_memory_pool_t *pool)
|
||||||
{
|
{
|
||||||
const char *cf = "mongo.conf";
|
const char *cf = "mongo.conf";
|
||||||
switch_xml_t cfg, xml, settings, param;
|
switch_xml_t cfg, xml, settings, param;
|
||||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
||||||
const char *conn_str = "127.0.0.1";
|
|
||||||
switch_size_t min_connections = 1, max_connections = 1;
|
/* set defaults */
|
||||||
|
globals.map = "";
|
||||||
|
globals.reduce = "";
|
||||||
|
globals.finalize = "";
|
||||||
|
globals.conn_str = "";
|
||||||
|
globals.socket_timeout = 0.0;
|
||||||
|
|
||||||
if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
|
if (!(xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
|
||||||
@ -201,38 +204,71 @@ static switch_status_t config(void)
|
|||||||
for (param = switch_xml_child(settings, "param"); param; param = param->next) {
|
for (param = switch_xml_child(settings, "param"); param; param = param->next) {
|
||||||
char *var = (char *) switch_xml_attr_soft(param, "name");
|
char *var = (char *) switch_xml_attr_soft(param, "name");
|
||||||
char *val = (char *) switch_xml_attr_soft(param, "value");
|
char *val = (char *) switch_xml_attr_soft(param, "value");
|
||||||
int tmp;
|
|
||||||
|
|
||||||
if (!strcmp(var, "host")) {
|
if (!strcmp(var, "connection-string")) {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "'host' is deprecated. use 'connection-string'\n");
|
if (zstr(val)) {
|
||||||
conn_str = val;
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "missing connection-string value\n");
|
||||||
} else if (!strcmp(var, "connection-string")) {
|
status = SWITCH_STATUS_GENERR;
|
||||||
conn_str = val;
|
} else {
|
||||||
} else if (!strcmp(var, "min-connections")) {
|
try {
|
||||||
if ((tmp = atoi(val)) > 0) {
|
string errmsg;
|
||||||
min_connections = tmp;
|
ConnectionString cs = ConnectionString::parse(string(val), errmsg);
|
||||||
}
|
if (!cs.isValid()) {
|
||||||
} else if (!strcmp(var, "max-connections")) {
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid: %s\n", val, errmsg.c_str());
|
||||||
if ((tmp = atoi(val)) > 0) {
|
status = SWITCH_STATUS_GENERR;
|
||||||
max_connections = tmp;
|
} else {
|
||||||
|
globals.conn_str = switch_core_strdup(pool, val);
|
||||||
|
}
|
||||||
|
} catch (DBException &e) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid: %s\n", val, e.toString().c_str());
|
||||||
|
status = SWITCH_STATUS_GENERR;
|
||||||
|
} catch (...) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "connection-string \"%s\" is not valid\n", val);
|
||||||
|
status = SWITCH_STATUS_GENERR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (!strcmp(var, "map")) {
|
} else if (!strcmp(var, "map")) {
|
||||||
globals.map = strdup(val);
|
if (!zstr(val)) {
|
||||||
|
globals.map = switch_core_strdup(pool, val);
|
||||||
|
}
|
||||||
} else if (!strcmp(var, "reduce")) {
|
} else if (!strcmp(var, "reduce")) {
|
||||||
globals.reduce = strdup(val);
|
if (!zstr(val)) {
|
||||||
|
globals.reduce = switch_core_strdup(pool, val);
|
||||||
|
}
|
||||||
} else if (!strcmp(var, "finalize")) {
|
} else if (!strcmp(var, "finalize")) {
|
||||||
globals.finalize = strdup(val);
|
if (!zstr(val)) {
|
||||||
|
globals.finalize = switch_core_strdup(pool, val);
|
||||||
|
}
|
||||||
|
} else if (!strcmp(var, "socket-timeout")) {
|
||||||
|
if (!zstr(val)) {
|
||||||
|
if (switch_is_number(val)) {
|
||||||
|
double timeout = atof(val);
|
||||||
|
if (timeout >= 0.0) {
|
||||||
|
globals.socket_timeout = timeout;
|
||||||
|
} else {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "socket-timeout \"%s\" is not valid\n", val);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "socket-timeout \"%s\" is not valid\n", val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (!strcmp(var, "max-connections")) {
|
||||||
|
if (!zstr(val)) {
|
||||||
|
if (switch_is_number(val)) {
|
||||||
|
int max_connections = atoi(val);
|
||||||
|
if (max_connections > 0) {
|
||||||
|
PoolForHost::setMaxPerHost(max_connections);
|
||||||
|
} else {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "max-connections \"%s\" is not valid\n", val);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "max-connections \"%s\" is not valid\n", val);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mongo_connection_pool_create(&globals.conn_pool, min_connections, max_connections, conn_str) != SWITCH_STATUS_SUCCESS) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Can't create connection pool\n");
|
|
||||||
status = SWITCH_STATUS_GENERR;
|
|
||||||
} else {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Mongo connection pool created [%s %d/%d]\n", conn_str, (int)min_connections, (int)max_connections);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_xml_free(xml);
|
switch_xml_free(xml);
|
||||||
|
|
||||||
return status;
|
return status;
|
||||||
@ -255,7 +291,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load)
|
|||||||
|
|
||||||
memset(&globals, 0, sizeof(globals));
|
memset(&globals, 0, sizeof(globals));
|
||||||
|
|
||||||
if (config() != SWITCH_STATUS_SUCCESS) {
|
if (config(pool) != SWITCH_STATUS_SUCCESS) {
|
||||||
return SWITCH_STATUS_TERM;
|
return SWITCH_STATUS_TERM;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,11 +303,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_mongo_load)
|
|||||||
|
|
||||||
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown)
|
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mongo_shutdown)
|
||||||
{
|
{
|
||||||
mongo_connection_pool_destroy(&globals.conn_pool);
|
ScopedDbConnection::clearPool();
|
||||||
switch_safe_free(globals.map);
|
|
||||||
switch_safe_free(globals.reduce);
|
|
||||||
switch_safe_free(globals.finalize);
|
|
||||||
|
|
||||||
return SWITCH_STATUS_SUCCESS;
|
return SWITCH_STATUS_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,75 +0,0 @@
|
|||||||
/*
|
|
||||||
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
|
|
||||||
* Copyright (C) 2005-2013, Anthony Minessale II <anthm@freeswitch.org>
|
|
||||||
*
|
|
||||||
* Version: MPL 1.1
|
|
||||||
*
|
|
||||||
* The contents of this file are subject to the Mozilla Public License Version
|
|
||||||
* 1.1 (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
* http://www.mozilla.org/MPL/
|
|
||||||
*
|
|
||||||
* Software distributed under the License is distributed on an "AS IS" basis,
|
|
||||||
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
|
||||||
* for the specific language governing rights and limitations under the
|
|
||||||
* License.
|
|
||||||
*
|
|
||||||
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
|
|
||||||
*
|
|
||||||
* The Initial Developer of the Original Code is
|
|
||||||
* Anthony Minessale II <anthm@freeswitch.org>
|
|
||||||
* Portions created by the Initial Developer are Copyright (C)
|
|
||||||
* the Initial Developer. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Contributor(s):
|
|
||||||
*
|
|
||||||
* Tamas Cseke <cstomi.levlist@gmail.com>
|
|
||||||
*
|
|
||||||
* mod_mongo.h -- API for MongoDB
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef MOD_MONGO_H
|
|
||||||
#define MOD_MONGO_H
|
|
||||||
|
|
||||||
#include <mongo/client/dbclient.h>
|
|
||||||
|
|
||||||
using namespace mongo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char *conn_str;
|
|
||||||
|
|
||||||
switch_size_t min_connections;
|
|
||||||
switch_size_t max_connections;
|
|
||||||
switch_size_t size;
|
|
||||||
switch_queue_t *connections;
|
|
||||||
switch_mutex_t *mutex;
|
|
||||||
switch_memory_pool_t *pool;
|
|
||||||
|
|
||||||
} mongo_connection_pool_t;
|
|
||||||
|
|
||||||
|
|
||||||
switch_status_t mongo_connection_create(DBClientBase **connection, const char *conn_str);
|
|
||||||
void mongo_connection_destroy(DBClientBase **conn);
|
|
||||||
|
|
||||||
switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections,
|
|
||||||
const char *conn_str);
|
|
||||||
void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool);
|
|
||||||
|
|
||||||
|
|
||||||
DBClientBase *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool);
|
|
||||||
switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientBase *conn, switch_bool_t destroy);
|
|
||||||
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* For Emacs:
|
|
||||||
* Local Variables:
|
|
||||||
* mode:c
|
|
||||||
* indent-tabs-mode:t
|
|
||||||
* tab-width:4
|
|
||||||
* c-basic-offset:4
|
|
||||||
* End:
|
|
||||||
* For VIM:
|
|
||||||
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
|
|
||||||
*/
|
|
@ -1,211 +0,0 @@
|
|||||||
/*
|
|
||||||
* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
|
|
||||||
* Copyright (C) 2005-2013, Anthony Minessale II <anthm@freeswitch.org>
|
|
||||||
*
|
|
||||||
* Version: MPL 1.1
|
|
||||||
*
|
|
||||||
* The contents of this file are subject to the Mozilla Public License Version
|
|
||||||
* 1.1 (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
* http://www.mozilla.org/MPL/
|
|
||||||
*
|
|
||||||
* Software distributed under the License is distributed on an "AS IS" basis,
|
|
||||||
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
|
|
||||||
* for the specific language governing rights and limitations under the
|
|
||||||
* License.
|
|
||||||
*
|
|
||||||
* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
|
|
||||||
*
|
|
||||||
* The Initial Developer of the Original Code is
|
|
||||||
* Anthony Minessale II <anthm@freeswitch.org>
|
|
||||||
* Portions created by the Initial Developer are Copyright (C)
|
|
||||||
* the Initial Developer. All Rights Reserved.
|
|
||||||
*
|
|
||||||
* Contributor(s):
|
|
||||||
*
|
|
||||||
* Tamas Cseke <cstomi.levlist@gmail.com>
|
|
||||||
*
|
|
||||||
* mongo_conn.cpp -- MongoDB connection pool
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
#include <switch.h>
|
|
||||||
#include "mod_mongo.h"
|
|
||||||
|
|
||||||
/*
|
|
||||||
we could use the driver's connection pool,
|
|
||||||
if we could set the max connections (PoolForHost::setMaxPerHost)
|
|
||||||
|
|
||||||
ScopedDbConnection scoped_conn("host");
|
|
||||||
DBClientConnection *conn = dynamic_cast< DBClientConnection* >(&scoped_conn.conn());
|
|
||||||
scoped_conn.done();
|
|
||||||
*/
|
|
||||||
|
|
||||||
switch_status_t mongo_connection_create(DBClientBase **connection, const char *conn_str)
|
|
||||||
{
|
|
||||||
DBClientBase *conn = NULL;
|
|
||||||
string conn_string(conn_str), err_msg;
|
|
||||||
ConnectionString cs = ConnectionString::parse(conn_string, err_msg);
|
|
||||||
switch_status_t status = SWITCH_STATUS_FALSE;
|
|
||||||
|
|
||||||
if (!cs.isValid()) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't parse url: %s\n", err_msg.c_str());
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
conn = cs.connect(err_msg);
|
|
||||||
} catch (DBException &e) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Can't connect to mongo [%s]: %s\n", conn_str, err_msg.c_str());
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (conn) {
|
|
||||||
*connection = conn;
|
|
||||||
status = SWITCH_STATUS_SUCCESS;
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Connected to mongo [%s]\n", conn_str);
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mongo_connection_destroy(DBClientBase **conn)
|
|
||||||
{
|
|
||||||
switch_assert(*conn != NULL);
|
|
||||||
delete *conn;
|
|
||||||
|
|
||||||
*conn = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_status_t mongo_connection_pool_create(mongo_connection_pool_t **conn_pool, switch_size_t min_connections, switch_size_t max_connections,
|
|
||||||
const char *conn_str)
|
|
||||||
{
|
|
||||||
switch_memory_pool_t *pool = NULL;
|
|
||||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
|
||||||
mongo_connection_pool_t *cpool = NULL;
|
|
||||||
DBClientBase *conn = NULL;
|
|
||||||
|
|
||||||
if ((status = switch_core_new_memory_pool(&pool)) != SWITCH_STATUS_SUCCESS) {
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(cpool = (mongo_connection_pool_t *)switch_core_alloc(pool, sizeof(mongo_connection_pool_t)))) {
|
|
||||||
switch_goto_status(SWITCH_STATUS_MEMERR, done);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((status = switch_mutex_init(&cpool->mutex, SWITCH_MUTEX_NESTED, pool)) != SWITCH_STATUS_SUCCESS) {
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((status = switch_queue_create(&cpool->connections, max_connections, pool)) != SWITCH_STATUS_SUCCESS) {
|
|
||||||
goto done;
|
|
||||||
}
|
|
||||||
|
|
||||||
cpool->min_connections = min_connections;
|
|
||||||
cpool->max_connections = max_connections;
|
|
||||||
cpool->conn_str = switch_core_strdup(pool, conn_str);
|
|
||||||
cpool->pool = pool;
|
|
||||||
|
|
||||||
for (cpool->size = 0; cpool->size < min_connections; cpool->size++) {
|
|
||||||
|
|
||||||
if (mongo_connection_create(&conn, conn_str) == SWITCH_STATUS_SUCCESS) {
|
|
||||||
mongo_connection_pool_put(cpool, conn, SWITCH_FALSE);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
done:
|
|
||||||
|
|
||||||
if (status == SWITCH_STATUS_SUCCESS) {
|
|
||||||
*conn_pool = cpool;
|
|
||||||
} else {
|
|
||||||
switch_core_destroy_memory_pool(&pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mongo_connection_pool_destroy(mongo_connection_pool_t **conn_pool)
|
|
||||||
{
|
|
||||||
mongo_connection_pool_t *cpool = *conn_pool;
|
|
||||||
void *data = NULL;
|
|
||||||
|
|
||||||
switch_assert(cpool != NULL);
|
|
||||||
|
|
||||||
while (switch_queue_trypop(cpool->connections, &data) == SWITCH_STATUS_SUCCESS) {
|
|
||||||
mongo_connection_destroy((DBClientBase **)&data);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_mutex_destroy(cpool->mutex);
|
|
||||||
switch_core_destroy_memory_pool(&cpool->pool);
|
|
||||||
|
|
||||||
*conn_pool = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
DBClientBase *mongo_connection_pool_get(mongo_connection_pool_t *conn_pool)
|
|
||||||
{
|
|
||||||
DBClientBase *conn = NULL;
|
|
||||||
void *data = NULL;
|
|
||||||
|
|
||||||
switch_assert(conn_pool != NULL);
|
|
||||||
|
|
||||||
switch_mutex_lock(conn_pool->mutex);
|
|
||||||
|
|
||||||
if (switch_queue_trypop(conn_pool->connections, &data) == SWITCH_STATUS_SUCCESS) {
|
|
||||||
conn = (DBClientBase *) data;
|
|
||||||
} else if (mongo_connection_create(&conn, conn_pool->conn_str) == SWITCH_STATUS_SUCCESS) {
|
|
||||||
if (++conn_pool->size > conn_pool->max_connections) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Connection pool is empty. You may want to increase 'max-connections'\n");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_mutex_unlock(conn_pool->mutex);
|
|
||||||
|
|
||||||
#ifdef MONGO_POOL_DEBUG
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL get: size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return conn;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_status_t mongo_connection_pool_put(mongo_connection_pool_t *conn_pool, DBClientBase *conn, switch_bool_t destroy)
|
|
||||||
{
|
|
||||||
switch_status_t status = SWITCH_STATUS_SUCCESS;
|
|
||||||
|
|
||||||
switch_assert(conn_pool != NULL);
|
|
||||||
switch_assert(conn != NULL);
|
|
||||||
|
|
||||||
switch_mutex_lock(conn_pool->mutex);
|
|
||||||
if (destroy || conn_pool->size > conn_pool->max_connections) {
|
|
||||||
#ifdef MONGO_POOL_DEBUG
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: Destroy connection %p\n", conn);
|
|
||||||
#endif
|
|
||||||
mongo_connection_destroy(&conn);
|
|
||||||
conn_pool->size--;
|
|
||||||
} else {
|
|
||||||
#ifdef MONGO_POOL_DEBUG
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: push connection %p\n", conn);
|
|
||||||
#endif
|
|
||||||
status = switch_queue_push(conn_pool->connections, conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch_mutex_unlock(conn_pool->mutex);
|
|
||||||
|
|
||||||
#ifdef MONGO_POOL_DEBUG
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "POOL: put size %d conn: %p\n", (int) switch_queue_size(conn_pool->connections), conn);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* For Emacs:
|
|
||||||
* Local Variables:
|
|
||||||
* mode:c
|
|
||||||
* indent-tabs-mode:t
|
|
||||||
* tab-width:4
|
|
||||||
* c-basic-offset:4
|
|
||||||
* End:
|
|
||||||
* For VIM:
|
|
||||||
* vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet
|
|
||||||
*/
|
|
Loading…
x
Reference in New Issue
Block a user