From a7465e6cbffd3f9d062244bbbb44b029ddfeb568 Mon Sep 17 00:00:00 2001 From: Chris Rienzo Date: Mon, 11 Apr 2016 11:54:38 -0400 Subject: [PATCH] FS-9052 [mod_hiredis] add connection pooling, improve dropped connection resiliency, and allow 0.10.0 of hiredis for CentOS 6. --- configure.ac | 2 +- .../mod_hiredis/hiredis_profile.c | 193 +++++++++++++----- .../applications/mod_hiredis/hiredis_utils.c | 9 +- .../applications/mod_hiredis/mod_hiredis.c | 2 +- .../applications/mod_hiredis/mod_hiredis.h | 12 +- 5 files changed, 161 insertions(+), 57 deletions(-) diff --git a/configure.ac b/configure.ac index 8746b0c75f..cac1f02b8c 100644 --- a/configure.ac +++ b/configure.ac @@ -1406,7 +1406,7 @@ PKG_CHECK_MODULES([SMPP34], [libsmpp34 >= 1.10],[ AM_CONDITIONAL([HAVE_SMPP34],[true])],[ AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_SMPP34],[false])]) -PKG_CHECK_MODULES([HIREDIS], [hiredis >= 0.11.0],[ +PKG_CHECK_MODULES([HIREDIS], [hiredis >= 0.10.0],[ AM_CONDITIONAL([HAVE_HIREDIS],[true])],[ AC_MSG_RESULT([no]); AM_CONDITIONAL([HAVE_HIREDIS],[false])]) diff --git a/src/mod/applications/mod_hiredis/hiredis_profile.c b/src/mod/applications/mod_hiredis/hiredis_profile.c index ab2d152e0c..ce63d881d6 100644 --- a/src/mod/applications/mod_hiredis/hiredis_profile.c +++ b/src/mod/applications/mod_hiredis/hiredis_profile.c @@ -1,6 +1,6 @@ /* * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application -* Copyright (C) 2005-2015, Anthony Minessale II +* Copyright (C) 2005-2016, Anthony Minessale II * * Version: MPL 1.1 * @@ -24,31 +24,92 @@ * Contributor(s): * * William King +* Christopher Rienzo * * mod_hiredis.c -- redis client built using the C client library hiredis * */ #include - + +/* reconnect to redis server */ +static switch_status_t hiredis_context_reconnect(hiredis_context_t *context) +{ + redisFree(context->context); + context->context = redisConnectWithTimeout(context->connection->host, context->connection->port, context->connection->timeout); + if ( context->context && !context->context->err ) { + return SWITCH_STATUS_SUCCESS; + } + return SWITCH_STATUS_FALSE; +} + +/* Return a context back to the pool */ +static void hiredis_context_release(hiredis_context_t *context) +{ + if (switch_queue_push(context->connection->context_pool, context) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: failed to release back to pool [%s, %d]\n", context->connection->host, context->connection->port); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: release back to pool [%s, %d]\n", context->connection->host, context->connection->port); + } +} + +/* Grab a context from the pool, reconnect/connect as needed */ +static hiredis_context_t *hiredis_connection_get_context(hiredis_connection_t *conn) +{ + void *val = NULL; + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: waiting for [%s, %d]\n", conn->host, conn->port); + if ( switch_queue_pop_timeout(conn->context_pool, &val, conn->timeout_us ) == SWITCH_STATUS_SUCCESS ) { + hiredis_context_t *context = (hiredis_context_t *)val; + if ( !context->context ) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: attempting[%s, %d]\n", conn->host, conn->port); + context->context = redisConnectWithTimeout(conn->host, conn->port, conn->timeout); + if ( context->context && !context->context->err ) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: connection success[%s, %d]\n", conn->host, conn->port); + return context; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: connection error[%s, %d] (%s)\n", conn->host, conn->port, context->context->errstr); + hiredis_context_release(context); + return NULL; + } + } else if ( context->context->err ) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: reconnecting[%s, %d]\n", conn->host, conn->port); + if (hiredis_context_reconnect(context) == SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: reconnection success[%s, %d]\n", conn->host, conn->port); + return context; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: reconnection error[%s, %d] (%s)\n", conn->host, conn->port, context->context->errstr); + hiredis_context_release(context); + return NULL; + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: recycled from pool[%s, %d]\n", conn->host, conn->port); + return context; + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: timed out waiting for [%s, %d]\n", conn->host, conn->port); + } + + return NULL; +} + switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t port) { hiredis_profile_t *profile = NULL; switch_memory_pool_t *pool = NULL; - switch_core_new_memory_pool(&pool); + switch_core_new_memory_pool(&pool); profile = switch_core_alloc(pool, sizeof(hiredis_profile_t)); profile->pool = pool; profile->name = name ? switch_core_strdup(profile->pool, name) : "default"; - profile->conn = NULL; profile->conn_head = NULL; switch_core_hash_insert(mod_hiredis_globals.profiles, name, (void *) profile); *new_profile = profile; - + return SWITCH_STATUS_SUCCESS; } @@ -69,7 +130,7 @@ switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile) return SWITCH_STATUS_SUCCESS; } -switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms) +switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_contexts) { hiredis_connection_t *connection = NULL, *new_conn = NULL; @@ -77,21 +138,39 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char new_conn->host = host ? switch_core_strdup(profile->pool, host) : "localhost"; new_conn->password = password ? switch_core_strdup(profile->pool, password) : NULL; new_conn->port = port ? port : 6379; + new_conn->pool = profile->pool; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: adding conn[%d]\n", new_conn->port); + /* create fixed size context pool */ + max_contexts = max_contexts > 0 ? max_contexts : 3; + if (switch_queue_create(&new_conn->context_pool, max_contexts, new_conn->pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: failed to allocate context pool\n"); + return SWITCH_STATUS_GENERR; + } else { + int i = 0; + for (i = 0; i < max_contexts; i++) { + hiredis_context_t *new_context = switch_core_alloc(new_conn->pool, sizeof(hiredis_context_t)); + new_context->connection = new_conn; + new_context->context = NULL; + switch_queue_push(new_conn->context_pool, new_context); + } + } if ( timeout_ms ) { - new_conn->timeout.tv_sec = 0; - new_conn->timeout.tv_usec = timeout_ms * 1000; + new_conn->timeout_us = timeout_ms * 1000; + new_conn->timeout.tv_sec = timeout_ms / 1000; + new_conn->timeout.tv_usec = (timeout_ms % 1000) * 1000; } else { + new_conn->timeout_us = 500 * 1000; new_conn->timeout.tv_sec = 0; new_conn->timeout.tv_usec = 500 * 1000; } - + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "hiredis: adding conn[%s,%d], pool size = %d\n", new_conn->host, new_conn->port, max_contexts); + if ( profile->conn_head != NULL ){ /* Adding 'another' connection */ connection = profile->conn_head; - while ( connection->next != NULL ){ + while ( connection->next != NULL ) { connection = connection->next; } connection->next = new_conn; @@ -102,72 +181,88 @@ switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char return SWITCH_STATUS_SUCCESS; } -switch_status_t hiredis_profile_reconnect(hiredis_profile_t *profile) +static hiredis_context_t *hiredis_profile_get_context(hiredis_profile_t *profile, hiredis_connection_t *initial_conn) { - hiredis_connection_t *conn = profile->conn_head; - profile->conn = NULL; + hiredis_connection_t *conn = initial_conn ? initial_conn : profile->conn_head; + hiredis_context_t *context; - /* TODO: Needs thorough expansion to handle all disconnection scenarios */ - while ( conn ) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: attempting[%s, %d]\n", conn->host, conn->port); - conn->context = redisConnectWithTimeout(conn->host, conn->port, conn->timeout); - - if ( conn->context && conn->context->err) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: connection error[%s]\n", conn->context->errstr); - conn = conn->next; - continue; + context = hiredis_connection_get_context(conn); + if (context) { + /* successful redis connection */ + return context; } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "hiredis: connection success[%s]\n", conn->host); - - /* successful redis connection */ - profile->conn = conn; - return SWITCH_STATUS_SUCCESS; + conn = conn->next; } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to reconnect\n"); - return SWITCH_STATUS_GENERR; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: unable to connect\n"); + return NULL; } -switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **resp) +static switch_status_t hiredis_context_execute_sync(hiredis_context_t *context, const char *data, char **resp) { - char *str = NULL; - redisReply *response = NULL; - - /* Check connection */ - if ( !profile->conn && hiredis_profile_reconnect(profile) != SWITCH_STATUS_SUCCESS ) { - *resp = strdup("hiredis profile unable to establish connection"); - return SWITCH_STATUS_GENERR; - } - - response = redisCommand(profile->conn->context, data); - + redisReply *response = redisCommand(context->context, data); if ( !response ) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: empty response received\n"); + *resp = NULL; return SWITCH_STATUS_GENERR; } - + switch(response->type) { case REDIS_REPLY_STATUS: /* fallthrough */ case REDIS_REPLY_STRING: - str = strdup(response->str); + *resp = strdup(response->str); break; case REDIS_REPLY_INTEGER: - str = switch_mprintf("%lld", response->integer); + *resp = switch_mprintf("%lld", response->integer); break; default: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: response error[%s][%d]\n", response->str, response->type); freeReplyObject(response); + *resp = NULL; return SWITCH_STATUS_GENERR; } freeReplyObject(response); - - *resp = str; return SWITCH_STATUS_SUCCESS; } +switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **resp) +{ + hiredis_context_t *context = NULL; + int reconnected = 0; + + context = hiredis_profile_get_context(profile, NULL); + while (context) { + if (hiredis_context_execute_sync(context, data, resp) == SWITCH_STATUS_SUCCESS) { + /* got result */ + hiredis_context_release(context); + return SWITCH_STATUS_SUCCESS; + } else if (context->context->err) { + /* have a bad connection, try a single reconnect attempt before moving on to alternate connection */ + if (reconnected || hiredis_context_reconnect(context) != SWITCH_STATUS_SUCCESS) { + /* try alternate connection */ + hiredis_context_t *new_context = hiredis_profile_get_context(profile, context->connection); + hiredis_context_release(context); + context = new_context; + if (context) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: got alternate connection to [%s, %d]\n", context->connection->host, context->connection->port); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: no more alternate connections to try\n"); + } + reconnected = 0; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "hiredis: reconnection success[%s, %d]\n", context->connection->host, context->connection->port); + reconnected = 1; + } + } else { + /* no problem with context, so don't retry */ + hiredis_context_release(context); + return SWITCH_STATUS_GENERR; + } + } + return SWITCH_STATUS_GENERR; +} + /* For Emacs: * Local Variables: diff --git a/src/mod/applications/mod_hiredis/hiredis_utils.c b/src/mod/applications/mod_hiredis/hiredis_utils.c index 8b552af569..ad6d625e41 100644 --- a/src/mod/applications/mod_hiredis/hiredis_utils.c +++ b/src/mod/applications/mod_hiredis/hiredis_utils.c @@ -24,6 +24,7 @@ * Contributor(s): * * William King +* Christopher Rienzo * * mod_hiredis.c -- redis client built using the C client library hiredis * @@ -67,7 +68,7 @@ switch_status_t mod_hiredis_do_config() if ( (connections = switch_xml_child(profile, "connections")) != NULL) { for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { char *host = NULL, *password = NULL; - uint32_t port = 0, timeout_ms = 0; + uint32_t port = 0, timeout_ms = 0, max_connections = 0; for (param = switch_xml_child(connection, "param"); param; param = param->next) { char *var = (char *) switch_xml_attr_soft(param, "name"); @@ -75,15 +76,17 @@ switch_status_t mod_hiredis_do_config() host = (char *) switch_xml_attr_soft(param, "value"); } else if ( !strncmp(var, "port", 4) ) { port = atoi(switch_xml_attr_soft(param, "value")); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "hiredis: adding conn[%u == %s]\n", port, switch_xml_attr_soft(param, "value")); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "hiredis: adding conn[%u == %s]\n", port, switch_xml_attr_soft(param, "value")); } else if ( !strncmp(var, "timeout_ms", 10) ) { timeout_ms = atoi(switch_xml_attr_soft(param, "value")); } else if ( !strncmp(var, "password", 8) ) { password = (char *) switch_xml_attr_soft(param, "value"); + } else if ( !strncmp(var, "max-connections", 15) ) { + max_connections = atoi(switch_xml_attr_soft(param, "value")); } } - if ( hiredis_profile_connection_add(new_profile, host, password, port, timeout_ms) == SWITCH_STATUS_SUCCESS) { + if ( hiredis_profile_connection_add(new_profile, host, password, port, timeout_ms, max_connections) == SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Created profile[%s]\n", name); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to create profile[%s]\n", name); diff --git a/src/mod/applications/mod_hiredis/mod_hiredis.c b/src/mod/applications/mod_hiredis/mod_hiredis.c index d3b5ab7e11..efe0a03443 100644 --- a/src/mod/applications/mod_hiredis/mod_hiredis.c +++ b/src/mod/applications/mod_hiredis/mod_hiredis.c @@ -103,7 +103,7 @@ SWITCH_STANDARD_API(raw_api) } if ( hiredis_profile_execute_sync(profile, data, &response) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "hiredis: profile[%s] error executing [%s] reason:[%s]\n", input, data, response ? response : ""); switch_goto_status(SWITCH_STATUS_GENERR, done); } diff --git a/src/mod/applications/mod_hiredis/mod_hiredis.h b/src/mod/applications/mod_hiredis/mod_hiredis.h index a73a508688..2403564b06 100644 --- a/src/mod/applications/mod_hiredis/mod_hiredis.h +++ b/src/mod/applications/mod_hiredis/mod_hiredis.h @@ -13,12 +13,19 @@ typedef struct mod_hiredis_global_s { extern mod_hiredis_global_t mod_hiredis_globals; +typedef struct mod_hiredis_context_s { + struct hiredis_connection_s *connection; + redisContext *context; +} hiredis_context_t; + typedef struct hiredis_connection_s { char *host; char *password; uint32_t port; - redisContext *context; + switch_interval_time_t timeout_us; struct timeval timeout; + switch_memory_pool_t *pool; + switch_queue_t *context_pool; struct hiredis_connection_s *next; } hiredis_connection_t; @@ -28,7 +35,6 @@ typedef struct hiredis_profile_s { char *name; int debug; - hiredis_connection_t *conn; hiredis_connection_t *conn_head; } hiredis_profile_t; @@ -44,7 +50,7 @@ typedef struct hiredis_limit_pvt_s { switch_status_t mod_hiredis_do_config(); switch_status_t hiredis_profile_create(hiredis_profile_t **new_profile, char *name, uint8_t port); switch_status_t hiredis_profile_destroy(hiredis_profile_t **old_profile); -switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms); +switch_status_t hiredis_profile_connection_add(hiredis_profile_t *profile, char *host, char *password, uint32_t port, uint32_t timeout_ms, uint32_t max_connections); switch_status_t hiredis_profile_execute_sync(hiredis_profile_t *profile, const char *data, char **response);