diff --git a/conf/autoload_configs/redis.conf.xml b/conf/autoload_configs/redis.conf.xml new file mode 100644 index 0000000000..1a1f474036 --- /dev/null +++ b/conf/autoload_configs/redis.conf.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/src/mod/applications/mod_redis/Makefile b/src/mod/applications/mod_redis/Makefile new file mode 100644 index 0000000000..9ff4816a96 --- /dev/null +++ b/src/mod/applications/mod_redis/Makefile @@ -0,0 +1,3 @@ +BASE=../../../.. +LOCAL_OBJS=credis.o +include $(BASE)/build/modmake.rules diff --git a/src/mod/applications/mod_redis/credis.c b/src/mod/applications/mod_redis/credis.c new file mode 100644 index 0000000000..bb71904005 --- /dev/null +++ b/src/mod/applications/mod_redis/credis.c @@ -0,0 +1,1146 @@ +/* credis.c -- a C client library for Redis + * + * Copyright (c) 2009-2010, Jonas Romfelt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "credis.h" + +#define CR_ERROR '-' +#define CR_INLINE '+' +#define CR_BULK '$' +#define CR_MULTIBULK '*' +#define CR_INT ':' + +#define CR_BUFFER_SIZE 4096 +#define CR_BUFFER_WATERMARK ((CR_BUFFER_SIZE)/10+1) +#define CR_MULTIBULK_SIZE 256 + +#define _STRINGIF(arg) #arg +#define STRINGIFY(arg) _STRINGIF(arg) + +#define CR_VERSION_STRING_SIZE_STR STRINGIFY(CREDIS_VERSION_STRING_SIZE) + +#ifdef PRINTDEBUG +/* add -DPRINTDEBUG to CPPFLAGS in Makefile for debug outputs */ +#define DEBUG(...) \ + do { \ + printf("%s() @ %d: ", __FUNCTION__, __LINE__); \ + printf(__VA_ARGS__); \ + printf("\n"); \ + } while (0) +#else +#define DEBUG(...) +#endif + +typedef struct _cr_buffer { + char *data; + int idx; + int len; + int size; +} cr_buffer; + +typedef struct _cr_multibulk { + char **bulks; + int *idxs; + int size; + int len; +} cr_multibulk; + +typedef struct _cr_reply { + int integer; + char *line; + char *bulk; + cr_multibulk multibulk; +} cr_reply; + +typedef struct _cr_redis { + int fd; + char *ip; + int port; + int timeout; + cr_buffer buf; + cr_reply reply; + int error; +} cr_redis; + + +/* Returns pointer to the '\r' of the first occurence of "\r\n", or NULL + * if not found */ +static char * cr_findnl(char *buf, int len) { + while (--len >= 0) { + if (*(buf++) == '\r') + if (*buf == '\n') + return --buf; + } + return NULL; +} + +/* Allocate at least `size' bytes more buffer memory, keeping content of + * previously allocated memory untouched. + * Returns: + * 0 on success + * -1 on error, i.e. more memory not available */ +static int cr_moremem(cr_buffer *buf, int size) +{ + char *ptr; + int total, n; + + n = size / CR_BUFFER_SIZE + 1; + total = buf->size + n * CR_BUFFER_SIZE; + + DEBUG("allocate %d x CR_BUFFER_SIZE, total %d bytes", n, total); + + ptr = realloc(buf->data, total); + if (ptr == NULL) + return -1; + + buf->data = ptr; + buf->size = total; + return 0; +} + +/* Allocate at least `size' more multibulk storage, keeping content of + * previously allocated memory untouched. + * Returns: + * 0 on success + * -1 on error, i.e. more memory not available */ +static int cr_morebulk(cr_multibulk *mb, int size) +{ + char **cptr; + int *iptr; + int total, n; + + n = (size / CR_MULTIBULK_SIZE + 1) * CR_MULTIBULK_SIZE; + total = mb->size + n; + + DEBUG("allocate %d x CR_MULTIBULK_SIZE, total %d (%lu bytes)", + n, total, total * ((sizeof(char *)+sizeof(int)))); + cptr = realloc(mb->bulks, total * sizeof(char *)); + iptr = realloc(mb->idxs, total * sizeof(int)); + + if (cptr == NULL || iptr == NULL) + return CREDIS_ERR_NOMEM; + + mb->bulks = cptr; + mb->idxs = iptr; + mb->size = total; + return 0; +} + +/* Appends a string `str' to the end of buffer `buf'. If available memory + * in buffer is not enough to hold `str' more memory is allocated to the + * buffer. If `space' is not 0 `str' is padded with a space. + * Returns: + * 0 on success + * <0 on error, i.e. more memory not available */ +static int cr_appendstr(cr_buffer *buf, const char *str, int space) +{ + int rc, avail; + char *format = (space==0?"%s":" %s"); + + /* TODO instead of using formatted print use memcpy() and don't + blindly add a space before `str' */ + + avail = buf->size - buf->len; + rc = snprintf(buf->data + buf->len, avail, format, str); + if (rc >= avail) { + DEBUG("truncated, get more memory and try again"); + if (cr_moremem(buf, rc - avail + 1)) + return CREDIS_ERR_NOMEM; + + avail = buf->size - buf->len; + rc = snprintf(buf->data + buf->len, avail, format, str); + } + buf->len += rc; + + return 0; +} + +/* Appends an array of strings `strv' to the end of buffer `buf', each + * separated with a space. If `newline' is not 0 "\r\n" is added last + * to buffer. + * Returns: + * 0 on success + * <0 on error, i.e. more memory not available */ +int cr_appendstrarray(cr_buffer *buf, int strc, const char **strv, int newline) +{ + int rc, i; + + for (i = 0; i < strc; i++) { + if ((rc = cr_appendstr(buf, strv[i], 1)) != 0) + return rc; + } + + if (newline) { + if ((rc = cr_appendstr(buf, "\r\n", 0)) != 0) + return rc; + } + + return 0; +} + +/* Receives at most `size' bytes from socket `fd' to `buf'. Times out after + * `msecs' milliseconds if no data has yet arrived. + * Returns: + * >0 number of read bytes on success + * 0 server closed connection + * -1 on error + * -2 on timeout */ +static int cr_receivedata(int fd, unsigned int msecs, char *buf, int size) +{ + fd_set fds; + struct timeval tv; + int rc; + + tv.tv_sec = msecs/1000; + tv.tv_usec = (msecs%1000)*1000; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + rc = select(fd+1, &fds, NULL, NULL, &tv); + + if (rc > 0) + return recv(fd, buf, size, 0); + else if (rc == 0) + return -2; + else + return -1; +} + +/* Sends `size' bytes from `buf' to socket `fd' and times out after `msecs' + * milliseconds if not all data has been sent. + * Returns: + * >0 number of bytes sent; if less than `size' it means that timeout occurred + * -1 on error */ +static int cr_senddata(int fd, unsigned int msecs, char *buf, int size) +{ + fd_set fds; + struct timeval tv; + int rc, sent=0; + + /* NOTE: On Linux, select() modifies timeout to reflect the amount + * of time not slept, on other systems it is likely not the same */ + tv.tv_sec = msecs/1000; + tv.tv_usec = (msecs%1000)*1000; + + while (sent < size) { + FD_ZERO(&fds); + FD_SET(fd, &fds); + + rc = select(fd+1, NULL, &fds, NULL, &tv); + + if (rc > 0) { + rc = send(fd, buf+sent, size-sent, 0); + if (rc < 0) + return -1; + sent += rc; + } + else if (rc == 0) /* timeout */ + break; + else + return -1; + } + + return sent; +} + +/* Buffered read line, returns pointer to zero-terminated string + * and length of that string. `start' specifies from which byte + * to start looking for "\r\n". + * Returns: + * >0 length of string to which pointer `line' refers. `idx' is + * an optional pointer for returning start index of line with + * respect to buffer. + * 0 connection to Redis server was closed + * -1 on error, i.e. a string is not available */ +static int cr_readln(REDIS rhnd, int start, char **line, int *idx) +{ + cr_buffer *buf = &(rhnd->buf); + char *nl; + int rc, len, avail, more; + + /* do we need more data before we expect to find "\r\n"? */ + if ((more = buf->idx + start + 2 - buf->len) < 0) + more = 0; + + while (more > 0 || + (nl = cr_findnl(buf->data + buf->idx + start, buf->len - (buf->idx + start))) == NULL) { + avail = buf->size - buf->len; + if (avail < CR_BUFFER_WATERMARK || avail < more) { + DEBUG("available buffer memory is low, get more memory"); + if (cr_moremem(buf, more>0?more:1)) + return CREDIS_ERR_NOMEM; + + avail = buf->size - buf->len; + } + + rc = cr_receivedata(rhnd->fd, rhnd->timeout, buf->data + buf->len, avail); + if (rc > 0) { + DEBUG("received %d bytes: %s", rc, buf->data + buf->len); + buf->len += rc; + } + else if (rc == 0) + return 0; /* EOF reached, connection terminated */ + else + return -1; /* error */ + + /* do we need more data before we expect to find "\r\n"? */ + if ((more = buf->idx + start + 2 - buf->len) < 0) + more = 0; + } + + *nl = '\0'; /* zero terminate */ + + *line = buf->data + buf->idx; + if (idx) + *idx = buf->idx; + len = nl - *line; + buf->idx = (nl - buf->data) + 2; /* skip "\r\n" */ + + DEBUG("size=%d, len=%d, idx=%d, start=%d, line=%s", + buf->size, buf->len, buf->idx, start, *line); + + return len; +} + +static int cr_receivemultibulk(REDIS rhnd, char *line) +{ + int bnum, blen, i, rc=0, idx; + + bnum = atoi(line); + + if (bnum == -1) { + rhnd->reply.multibulk.len = 0; /* no data or key didn't exist */ + return 0; + } + else if (bnum > rhnd->reply.multibulk.size) { + DEBUG("available multibulk storage is low, get more memory"); + if (cr_morebulk(&(rhnd->reply.multibulk), bnum - rhnd->reply.multibulk.size)) + return CREDIS_ERR_NOMEM; + } + + for (i = 0; bnum > 0 && (rc = cr_readln(rhnd, 0, &line, NULL)) > 0; i++, bnum--) { + if (*(line++) != CR_BULK) + return CREDIS_ERR_PROTOCOL; + + blen = atoi(line); + if (blen == -1) + rhnd->reply.multibulk.idxs[i] = -1; + else { + if ((rc = cr_readln(rhnd, blen, &line, &idx)) != blen) + return CREDIS_ERR_PROTOCOL; + + rhnd->reply.multibulk.idxs[i] = idx; + } + } + + if (bnum != 0) { + DEBUG("bnum != 0, bnum=%d, rc=%d", bnum, rc); + return CREDIS_ERR_PROTOCOL; + } + + rhnd->reply.multibulk.len = i; + for (i = 0; i < rhnd->reply.multibulk.len; i++) { + if (rhnd->reply.multibulk.idxs[i] > 0) + rhnd->reply.multibulk.bulks[i] = rhnd->buf.data + rhnd->reply.multibulk.idxs[i]; + else + rhnd->reply.multibulk.bulks[i] = NULL; + } + + return 0; +} + +static int cr_receivebulk(REDIS rhnd, char *line) +{ + int blen; + + blen = atoi(line); + if (blen == -1) { + rhnd->reply.bulk = NULL; /* key didn't exist */ + return 0; + } + if (cr_readln(rhnd, blen, &line, NULL) >= 0) { + rhnd->reply.bulk = line; + return 0; + } + + return CREDIS_ERR_PROTOCOL; +} + +static int cr_receiveinline(REDIS rhnd, char *line) +{ + rhnd->reply.line = line; + return 0; +} + +static int cr_receiveint(REDIS rhnd, char *line) +{ + rhnd->reply.integer = atoi(line); + return 0; +} + +static int cr_receiveerror(REDIS rhnd, char *line) +{ + rhnd->reply.line = line; + return CREDIS_ERR_PROTOCOL; +} + +static int cr_receivereply(REDIS rhnd, char recvtype) +{ + char *line, prefix=0; + + /* reset common send/receive buffer */ + rhnd->buf.len = 0; + rhnd->buf.idx = 0; + + if (cr_readln(rhnd, 0, &line, NULL) > 0) { + prefix = *(line++); + + if (prefix != recvtype && prefix != CR_ERROR) + return CREDIS_ERR_PROTOCOL; + + switch(prefix) { + case CR_ERROR: + return cr_receiveerror(rhnd, line); + case CR_INLINE: + return cr_receiveinline(rhnd, line); + case CR_INT: + return cr_receiveint(rhnd, line); + case CR_BULK: + return cr_receivebulk(rhnd, line); + case CR_MULTIBULK: + return cr_receivemultibulk(rhnd, line); + } + } + + return CREDIS_ERR_RECV; +} + +static void cr_delete(REDIS rhnd) +{ + if (rhnd->reply.multibulk.bulks != NULL) + free(rhnd->reply.multibulk.bulks); + if (rhnd->reply.multibulk.idxs != NULL) + free(rhnd->reply.multibulk.idxs); + if (rhnd->buf.data != NULL) + free(rhnd->buf.data); + if (rhnd->ip != NULL) + free(rhnd->ip); + if (rhnd != NULL) + free(rhnd); +} + +REDIS cr_new(void) +{ + REDIS rhnd; + + if ((rhnd = calloc(sizeof(cr_redis), 1)) == NULL || + (rhnd->ip = malloc(32)) == NULL || + (rhnd->buf.data = malloc(CR_BUFFER_SIZE)) == NULL || + (rhnd->reply.multibulk.bulks = malloc(sizeof(char *)*CR_MULTIBULK_SIZE)) == NULL || + (rhnd->reply.multibulk.idxs = malloc(sizeof(int)*CR_MULTIBULK_SIZE)) == NULL) { + cr_delete(rhnd); + return NULL; + } + + rhnd->buf.size = CR_BUFFER_SIZE; + rhnd->reply.multibulk.size = CR_MULTIBULK_SIZE; + + return rhnd; +} + +/* Send message that has been prepared in message buffer prior to the call + * to this function. Wait and receive reply. */ +static int cr_sendandreceive(REDIS rhnd, char recvtype) +{ + int rc; + + DEBUG("Sending message: len=%d, data=%s", rhnd->buf.len, rhnd->buf.data); + + rc = cr_senddata(rhnd->fd, rhnd->timeout, rhnd->buf.data, rhnd->buf.len); + + if (rc != rhnd->buf.len) { + if (rc < 0) + return CREDIS_ERR_SEND; + return CREDIS_ERR_TIMEOUT; + } + + return cr_receivereply(rhnd, recvtype); +} + +/* Prepare message buffer for sending using a printf()-style formatting. */ +static int cr_sendfandreceive(REDIS rhnd, char recvtype, const char *format, ...) +{ + int rc; + va_list ap; + cr_buffer *buf = &(rhnd->buf); + + va_start(ap, format); + rc = vsnprintf(buf->data, buf->size, format, ap); + va_end(ap); + + if (rc < 0) + return -1; + + if (rc >= buf->size) { + DEBUG("truncated, get more memory and try again"); + if (cr_moremem(buf, rc - buf->size + 1)) + return CREDIS_ERR_NOMEM; + + va_start(ap, format); + rc = vsnprintf(buf->data, buf->size, format, ap); + va_end(ap); + } + + buf->len = rc; + + return cr_sendandreceive(rhnd, recvtype); +} + +void credis_close(REDIS rhnd) +{ + if (rhnd->fd > 0) + close(rhnd->fd); + cr_delete(rhnd); +} + +REDIS credis_connect(const char *host, int port, int timeout) +{ + int fd, yes = 1; + struct sockaddr_in sa; + REDIS rhnd; + + if ((rhnd = cr_new()) == NULL) + return NULL; + + if (host == NULL) + host = "127.0.0.1"; + if (port == 0) + port = 6379; + + if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1 || + setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1 || + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) + goto error; + + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + if (inet_aton(host, &sa.sin_addr) == 0) { + struct hostent *he = gethostbyname(host); + if (he == NULL) + goto error; + memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr)); + } + + if (connect(fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) + goto error; + + strcpy(rhnd->ip, inet_ntoa(sa.sin_addr)); + rhnd->port = port; + rhnd->fd = fd; + rhnd->timeout = timeout; + + return rhnd; + + error: + if (fd > 0) + close(fd); + cr_delete(rhnd); + return NULL; +} + +int credis_set(REDIS rhnd, const char *key, const char *val) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "SET %s %d\r\n%s\r\n", + key, strlen(val), val); +} + +int credis_get(REDIS rhnd, const char *key, char **val) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "GET %s\r\n", key); + + if (rc == 0) + if ((*val = rhnd->reply.bulk) == NULL) + return -1; + + return rc; +} + +int credis_getset(REDIS rhnd, const char *key, const char *set_val, char **get_val) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "GETSET %s %d\r\n%s\r\n", + key, strlen(set_val), set_val); + + if (rc == 0) + if ((*get_val = rhnd->reply.bulk) == NULL) + return -1; + + return rc; +} + +int credis_ping(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "PING\r\n"); +} + +int credis_auth(REDIS rhnd, const char *password) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "AUTH %s\r\n", password); +} + +int cr_multikeybulkcommand(REDIS rhnd, const char *cmd, int keyc, + const char **keyv, char ***valv) +{ + cr_buffer *buf = &(rhnd->buf); + int rc; + + buf->len = 0; + if ((rc = cr_appendstr(buf, cmd, 0)) != 0) + return rc; + if ((rc = cr_appendstrarray(buf, keyc, keyv, 1)) != 0) + return rc; + if ((rc = cr_sendandreceive(rhnd, CR_MULTIBULK)) == 0) { + *valv = rhnd->reply.multibulk.bulks; + rc = rhnd->reply.multibulk.len; + } + + return rc; +} + +int cr_multikeystorecommand(REDIS rhnd, const char *cmd, const char *destkey, + int keyc, const char **keyv) +{ + cr_buffer *buf = &(rhnd->buf); + int rc; + + buf->len = 0; + if ((rc = cr_appendstr(buf, cmd, 0)) != 0) + return rc; + if ((rc = cr_appendstr(buf, destkey, 1)) != 0) + return rc; + if ((rc = cr_appendstrarray(buf, keyc, keyv, 1)) != 0) + return rc; + + return cr_sendandreceive(rhnd, CR_INLINE); +} + +int credis_mget(REDIS rhnd, int keyc, const char **keyv, char ***valv) +{ + return cr_multikeybulkcommand(rhnd, "MGET", keyc, keyv, valv); +} + +int credis_setnx(REDIS rhnd, const char *key, const char *val) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "SETNX %s %d\r\n%s\r\n", + key, strlen(val), val); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +static int cr_incr(REDIS rhnd, int incr, int decr, const char *key, int *new_val) +{ + int rc = 0; + + if (incr == 1 || decr == 1) + rc = cr_sendfandreceive(rhnd, CR_INT, "%s %s\r\n", + incr>0?"INCR":"DECR", key); + else if (incr > 1 || decr > 1) + rc = cr_sendfandreceive(rhnd, CR_INT, "%s %s %d\r\n", + incr>0?"INCRBY":"DECRBY", key, incr>0?incr:decr); + + if (rc == 0 && new_val != NULL) + *new_val = rhnd->reply.integer; + + return rc; +} + +int credis_incr(REDIS rhnd, const char *key, int *new_val) +{ + return cr_incr(rhnd, 1, 0, key, new_val); +} + +int credis_decr(REDIS rhnd, const char *key, int *new_val) +{ + return cr_incr(rhnd, 0, 1, key, new_val); +} + +int credis_incrby(REDIS rhnd, const char *key, int incr_val, int *new_val) +{ + return cr_incr(rhnd, incr_val, 0, key, new_val); +} + +int credis_decrby(REDIS rhnd, const char *key, int decr_val, int *new_val) +{ + return cr_incr(rhnd, 0, decr_val, key, new_val); +} + +int credis_exists(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "EXISTS %s\r\n", key); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_del(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "DEL %s\r\n", key); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_type(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INLINE, "TYPE %s\r\n", key); + + if (rc == 0) { + char *t = rhnd->reply.bulk; + if (!strcmp("string", t)) + rc = CREDIS_TYPE_STRING; + else if (!strcmp("list", t)) + rc = CREDIS_TYPE_LIST; + else if (!strcmp("set", t)) + rc = CREDIS_TYPE_SET; + else + rc = CREDIS_TYPE_NONE; + } + + return rc; +} + +int credis_keys(REDIS rhnd, const char *pattern, char **keyv, int len) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "KEYS %s\r\n", pattern); + char *p = rhnd->reply.bulk; + int i = 0; + + if (rc != 0) { + return -1; + } + + if (!*p) { + return 0; + } + + keyv[i++] = p; + + while ((p = strchr(p, ' ')) && (i < len)) { + *p++ = '\0'; + keyv[i++] = p; + } + return i; +} + +int credis_randomkey(REDIS rhnd, char **key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INLINE, "RANDOMKEY\r\n"); + + if (rc == 0) + *key = rhnd->reply.line; + + return rc; +} + +int credis_rename(REDIS rhnd, const char *key, const char *new_key_name) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "RENAME %s %s\r\n", + key, new_key_name); +} + +int credis_renamenx(REDIS rhnd, const char *key, const char *new_key_name) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "RENAMENX %s %s\r\n", + key, new_key_name); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_dbsize(REDIS rhnd) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "DBSIZE\r\n"); + + if (rc == 0) + rc = rhnd->reply.integer; + + return rc; +} + +int credis_expire(REDIS rhnd, const char *key, int secs) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "EXPIRE %s %d\r\n", key, secs); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_ttl(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "TTL %s\r\n", key); + + if (rc == 0) + rc = rhnd->reply.integer; + + return rc; +} + +int cr_push(REDIS rhnd, int left, const char *key, const char *val) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "%s %s %d\r\n%s\r\n", + left==1?"LPUSH":"RPUSH", key, strlen(val), val); +} + +int credis_rpush(REDIS rhnd, const char *key, const char *val) +{ + return cr_push(rhnd, 0, key, val); +} + +int credis_lpush(REDIS rhnd, const char *key, const char *val) +{ + return cr_push(rhnd, 1, key, val); +} + +int credis_llen(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "LLEN %s\r\n", key); + + if (rc == 0) + rc = rhnd->reply.integer; + + return rc; +} + +int credis_lrange(REDIS rhnd, const char *key, int start, int end, char ***valv) +{ + int rc; + + if ((rc = cr_sendfandreceive(rhnd, CR_MULTIBULK, "LRANGE %s %d %d\r\n", + key, start, end)) == 0) { + *valv = rhnd->reply.multibulk.bulks; + rc = rhnd->reply.multibulk.len; + } + + return rc; +} + +int credis_ltrim(REDIS rhnd, const char *key, int start, int end) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "LTRIM %s %d %d\r\n", + key, start, end); +} + +int credis_lindex(REDIS rhnd, const char *key, int index, char **val) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "LINDEX %s %d\r\n", key, index); + + if (rc == 0) + if ((*val = rhnd->reply.bulk) == NULL) + return -1; + + return rc; +} + +int credis_lset(REDIS rhnd, const char *key, int index, const char *val) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "LSET %s %d %s\r\n", key, index, val); +} + +int credis_lrem(REDIS rhnd, const char *key, int count, const char *val) +{ + return cr_sendfandreceive(rhnd, CR_INT, "LREM %s %d %d\r\n", key, count, val); +} + +static int cr_pop(REDIS rhnd, int left, const char *key, char **val) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "%s %s\r\n", + left==1?"LPOP":"RPOP", key); + + if (rc == 0) + if ((*val = rhnd->reply.bulk) == NULL) + return -1; + + return rc; +} + +int credis_lpop(REDIS rhnd, const char *key, char **val) +{ + return cr_pop(rhnd, 1, key, val); +} + +int credis_rpop(REDIS rhnd, const char *key, char **val) +{ + return cr_pop(rhnd, 0, key, val); +} + +int credis_select(REDIS rhnd, int index) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "SELECT %d\r\n", index); +} + +int credis_move(REDIS rhnd, const char *key, int index) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "MOVE %s %d\r\n", key, index); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_flushdb(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "FLUSHDB\r\n"); +} + +int credis_flushall(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "FLUSHALL\r\n"); +} + +int credis_sort(REDIS rhnd, const char *query, char ***elementv) +{ + int rc; + + if ((rc = cr_sendfandreceive(rhnd, CR_MULTIBULK, "SORT %s\r\n", query)) == 0) { + *elementv = rhnd->reply.multibulk.bulks; + rc = rhnd->reply.multibulk.len; + } + + return rc; +} + +int credis_save(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "SAVE\r\n"); +} + +int credis_bgsave(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "BGSAVE\r\n"); +} + +int credis_lastsave(REDIS rhnd) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "LASTSAVE\r\n"); + + if (rc == 0) + rc = rhnd->reply.integer; + + return rc; +} + +int credis_shutdown(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "SHUTDOWN\r\n"); +} + +#define CR_NUMBER_OF_ITEMS 12 + +int credis_info(REDIS rhnd, REDIS_INFO *info) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "INFO\r\n"); + + if (rc == 0) { + char role[CREDIS_VERSION_STRING_SIZE]; + int items = sscanf(rhnd->reply.bulk, + "redis_version:%"CR_VERSION_STRING_SIZE_STR"s\r\n" \ + "uptime_in_seconds:%d\r\n" \ + "uptime_in_days:%d\r\n" \ + "connected_clients:%d\r\n" \ + "connected_slaves:%d\r\n" \ + "used_memory:%u\r\n" \ + "changes_since_last_save:%lld\r\n" \ + "bgsave_in_progress:%d\r\n" \ + "last_save_time:%d\r\n" \ + "total_connections_received:%lld\r\n" \ + "total_commands_processed:%lld\r\n" \ + "role:%"CR_VERSION_STRING_SIZE_STR"s\r\n", + info->redis_version, + &(info->uptime_in_seconds), + &(info->uptime_in_days), + &(info->connected_clients), + &(info->connected_slaves), + &(info->used_memory), + &(info->changes_since_last_save), + &(info->bgsave_in_progress), + &(info->last_save_time), + &(info->total_connections_received), + &(info->total_commands_processed), + role); + + if (items != CR_NUMBER_OF_ITEMS) + return CREDIS_ERR_PROTOCOL; /* not enough input items returned */ + + info->role = ((role[0]=='m')?CREDIS_SERVER_MASTER:CREDIS_SERVER_SLAVE); + } + + return rc; +} + +int credis_monitor(REDIS rhnd) +{ + return cr_sendfandreceive(rhnd, CR_INLINE, "MONITOR\r\n"); +} + +int credis_slaveof(REDIS rhnd, const char *host, int port) +{ + if (host == NULL || port == 0) + return cr_sendfandreceive(rhnd, CR_INLINE, "SLAVEOF no one\r\n"); + else + return cr_sendfandreceive(rhnd, CR_INLINE, "SLAVEOF %s %d\r\n", host, port); +} + +static int cr_setaddrem(REDIS rhnd, const char *cmd, const char *key, const char *member) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "%s %s %d\r\n%s\r\n", + cmd, key, strlen(member), member); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = 1; + + return rc; +} + +int credis_sadd(REDIS rhnd, const char *key, const char *member) +{ + return cr_setaddrem(rhnd, "SADD", key, member); +} + +int credis_srem(REDIS rhnd, const char *key, const char *member) +{ + return cr_setaddrem(rhnd, "SREM", key, member); +} + +int credis_spop(REDIS rhnd, const char *key, char **member) +{ + int rc = cr_sendfandreceive(rhnd, CR_BULK, "SPOP %s\r\n", key); + + if (rc == 0) + if ((*member = rhnd->reply.bulk) == NULL) + rc = -1; + + return rc; +} + +int credis_smove(REDIS rhnd, const char *sourcekey, const char *destkey, + const char *member) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "SMOVE %s %s %s\r\n", + sourcekey, destkey, member); + + if (rc == 0) + if (rhnd->reply.integer == 0) + rc = -1; + + return rc; +} + +int credis_scard(REDIS rhnd, const char *key) +{ + int rc = cr_sendfandreceive(rhnd, CR_INT, "SCARD %s\r\n", key); + + if (rc == 0) + rc = rhnd->reply.integer; + + return rc; +} + +int credis_sinter(REDIS rhnd, int keyc, const char **keyv, char ***members) +{ + return cr_multikeybulkcommand(rhnd, "SINTER", keyc, keyv, members); +} + +int credis_sunion(REDIS rhnd, int keyc, const char **keyv, char ***members) +{ + return cr_multikeybulkcommand(rhnd, "SUNION", keyc, keyv, members); +} + +int credis_sdiff(REDIS rhnd, int keyc, const char **keyv, char ***members) +{ + return cr_multikeybulkcommand(rhnd, "SDIFF", keyc, keyv, members); +} + +int credis_sinterstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv) +{ + return cr_multikeystorecommand(rhnd, "SINTERSTORE", destkey, keyc, keyv); +} + +int credis_sunionstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv) +{ + return cr_multikeystorecommand(rhnd, "SUNIONSTORE", destkey, keyc, keyv); +} + +int credis_sdiffstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv) +{ + return cr_multikeystorecommand(rhnd, "SDIFFSTORE", destkey, keyc, keyv); +} + +int credis_sismember(REDIS rhnd, const char *key, const char *member) +{ + return cr_setaddrem(rhnd, "SISMEMBER", key, member); +} + +int credis_smembers(REDIS rhnd, const char *key, char ***members) +{ + return cr_multikeybulkcommand(rhnd, "SMEMBERS", 1, &key, members); +} diff --git a/src/mod/applications/mod_redis/credis.h b/src/mod/applications/mod_redis/credis.h new file mode 100644 index 0000000000..9760ccc8fd --- /dev/null +++ b/src/mod/applications/mod_redis/credis.h @@ -0,0 +1,327 @@ +/* credis.h -- a C client library for Redis, public API. + * + * Copyright (c) 2009-2010, Jonas Romfelt + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __CREDIS_H +#define __CREDIS_H + +#ifdef __cplusplus +extern "C" { +#endif + +/* Functions below should map quite nicely to Redis 1.02 command set. + * Refer to the official Redis documentation for further explanation of + * each command. See credis examples that show how functions can be used. + * Here is a brief example that connects to a Redis server and sets value + * of key `fruit' to `banana': + * + * REDIS rh = credis_connect("localhost", 6789, 2000); + * credis_set(rh, "fruit", "banana"); + * credis_close(rh); + * + * In general, functions return 0 on success or a negative value on + * error. Refer to CREDIS_ERR_* codes. The return code -1 is typically + * used when for instance a key is not found. + * + * IMPORTANT! Memory buffers are allocated, used and managed by credis + * internally. Subsequent calls to credis functions _will_ destroy the + * data to which returned values reference to. If for instance the + * returned value by a call to credis_get() is to be used later in the + * program, a strdup() is highly recommended. However, each `REDIS' + * handle has its own state and manages its own memory buffer + * independently. That means that one of two handles can be destroyed + * while the other keeps its connection and data. + * + * TODO + * - Currently only support for zero-terminated strings, not for storing + * abritary binary data as bulk data. Basically an API issue since it + * is partially supported internally. + * - Support for Redis >= 1.1 protocol + */ + +/* handle to a Redis server connection */ +typedef struct _cr_redis* REDIS; + +#define CREDIS_OK 0 +#define CREDIS_ERR -90 +#define CREDIS_ERR_NOMEM -91 +#define CREDIS_ERR_RESOLVE -92 +#define CREDIS_ERR_CONNECT -93 +#define CREDIS_ERR_SEND -94 +#define CREDIS_ERR_RECV -95 +#define CREDIS_ERR_TIMEOUT -96 +#define CREDIS_ERR_PROTOCOL -97 + +#define CREDIS_TYPE_NONE 1 +#define CREDIS_TYPE_STRING 2 +#define CREDIS_TYPE_LIST 3 +#define CREDIS_TYPE_SET 4 + +#define CREDIS_SERVER_MASTER 1 +#define CREDIS_SERVER_SLAVE 2 + +#define CREDIS_VERSION_STRING_SIZE 32 + +typedef struct _cr_info { + char redis_version[CREDIS_VERSION_STRING_SIZE]; + int bgsave_in_progress; + int connected_clients; + int connected_slaves; + unsigned int used_memory; + long long changes_since_last_save; + int last_save_time; + long long total_connections_received; + long long total_commands_processed; + int uptime_in_seconds; + int uptime_in_days; + int role; +} REDIS_INFO; + + +/* + * Connection handling + */ + +/* setting host to NULL will use "localhost". setting port to 0 will use + * default port 6379 */ +REDIS credis_connect(const char *host, int port, int timeout); + +void credis_close(REDIS rhnd); + +void credis_quit(REDIS rhnd); + +int credis_auth(REDIS rhnd, const char *password); + +int credis_ping(REDIS rhnd); + +/* + * Commands operating on string values + */ + +int credis_set(REDIS rhnd, const char *key, const char *val); + +/* returns -1 if the key doesn't exists */ +int credis_get(REDIS rhnd, const char *key, char **val); + +/* returns -1 if the key doesn't exists */ +int credis_getset(REDIS rhnd, const char *key, const char *set_val, char **get_val); + +/* returns number of values returned in vector `valv'. `keyc' is the number of + * keys stored in `keyv'. */ +int credis_mget(REDIS rhnd, int keyc, const char **keyv, char ***valv); + +/* returns -1 if the key already exists and hence not set */ +int credis_setnx(REDIS rhnd, const char *key, const char *val); + +int credis_incr(REDIS rhnd, const char *key, int *new_val); + +int credis_incrby(REDIS rhnd, const char *key, int incr_val, int *new_val); + +int credis_decr(REDIS rhnd, const char *key, int *new_val); + +int credis_decrby(REDIS rhnd, const char *key, int decr_val, int *new_val); + +/* returns -1 if the key doesn't exists and 0 if it does */ +int credis_exists(REDIS rhnd, const char *key); + +/* returns -1 if the key doesn't exists and 0 if it was removed */ +int credis_del(REDIS rhnd, const char *key); + +/* returns type, refer to CREDIS_TYPE_* defines */ +int credis_type(REDIS rhnd, const char *key); + +/* TODO for Redis >= 1.1 + * MSET key1 value1 key2 value2 ... keyN valueN set a multiple keys to multiple values in a single atomic operation + * MSETNX key1 value1 key2 value2 ... keyN valueN set a multiple keys to multiple values in a single atomic operation if none of + * DEL key1 key2 ... keyN remove multiple keys + */ + +/* + * Commands operating on key space + */ + +int credis_keys(REDIS rhnd, const char *pattern, char **keyv, int len); + +int credis_randomkey(REDIS rhnd, char **key); + +int credis_rename(REDIS rhnd, const char *key, const char *new_key_name); + +/* returns -1 if the key already exists */ +int credis_renamenx(REDIS rhnd, const char *key, const char *new_key_name); + +/* returns size of db */ +int credis_dbsize(REDIS rhnd); + +/* returns -1 if the timeout was not set; either due to key already has + an associated timeout or key does not exist */ +int credis_expire(REDIS rhnd, const char *key, int secs); + +/* returns time to live seconds or -1 if key does not exists or does not + * have expire set */ +int credis_ttl(REDIS rhnd, const char *key); + +/* + * Commands operating on lists + */ + +int credis_rpush(REDIS rhnd, const char *key, const char *element); + +int credis_lpush(REDIS rhnd, const char *key, const char *element); + +/* returns length of list */ +int credis_llen(REDIS rhnd, const char *key); + +/* returns number of elements returned in vector `elementv' */ +int credis_lrange(REDIS rhnd, const char *key, int start, int range, char ***elementv); + +int credis_ltrim(REDIS rhnd, const char *key, int start, int end); + +/* returns -1 if the key doesn't exists */ +int credis_lindex(REDIS rhnd, const char *key, int index, char **element); + +int credis_lset(REDIS rhnd, const char *key, int index, const char *element); + +/* returns number of elements removed */ +int credis_lrem(REDIS rhnd, const char *key, int count, const char *element); + +/* returns -1 if the key doesn't exists */ +int credis_lpop(REDIS rhnd, const char *key, char **val); + +/* returns -1 if the key doesn't exists */ +int credis_rpop(REDIS rhnd, const char *key, char **val); + +/* TODO for Redis >= 1.1 + * RPOPLPUSH srckey dstkey + * + * TODO for Redis >= 1.3.1 + * BLPOP key1 key2 ... keyN timeout + * BRPOP key1 key2 ... keyN timeout + */ + +/* + * Commands operating on sets + */ + +/* returns -1 if the given member was already a member of the set */ +int credis_sadd(REDIS rhnd, const char *key, const char *member); + +/* returns -1 if the given member is not a member of the set */ +int credis_srem(REDIS rhnd, const char *key, const char *member); + +/* returns -1 if the key doesn't exists and 0 if it does */ +int credis_sismember(REDIS rhnd, const char *key, const char *member); + +/* returns -1 if the given key doesn't exists else value is returned in `member' */ +int credis_spop(REDIS rhnd, const char *key, char **member); + +/* returns -1 if the member doesn't exists in the source set */ +int credis_smove(REDIS rhnd, const char *sourcekey, const char *destkey, + const char *member); + +/* returns cardinality (number of members) or 0 if the given key doesn't exists */ +int credis_scard(REDIS rhnd, const char *key); + +/* returns number of members returned in vector `members'. `keyc' is the number of + * keys stored in `keyv'. */ +int credis_sinter(REDIS rhnd, int keyc, const char **keyv, char ***members); + +/* `keyc' is the number of keys stored in `keyv' */ +int credis_sinterstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv); + +/* returns number of members returned in vector `members'. `keyc' is the number of + * keys stored in `keyv'. */ +int credis_sunion(REDIS rhnd, int keyc, const char **keyv, char ***members); + +/* `keyc' is the number of keys stored in `keyv' */ +int credis_sunionstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv); + +/* returns number of members returned in vector `members'. `keyc' is the number of + * keys stored in `keyv'. */ +int credis_sdiff(REDIS rhnd, int keyc, const char **keyv, char ***members); + +/* `keyc' is the number of keys stored in `keyv' */ +int credis_sdiffstore(REDIS rhnd, const char *destkey, int keyc, const char **keyv); + +/* returns number of members returned in vector `members' */ +int credis_smembers(REDIS rhnd, const char *key, char ***members); + +/* TODO Redis >= 1.1 + * SRANDMEMBER key Return a random member of the Set value at key + */ + +/* + * Multiple databases handling commands + */ + +int credis_select(REDIS rhnd, int index); + +/* returns -1 if the key was not moved; already present at target + * or not found on current db */ +int credis_move(REDIS rhnd, const char *key, int index); + +int credis_flushdb(REDIS rhnd); + +int credis_flushall(REDIS rhnd); + +/* + * Sorting + */ + +/* returns number of elements returned in vector `elementv' */ +int credis_sort(REDIS rhnd, const char *query, char ***elementv); + +/* + * Persistence control commands + */ + +int credis_save(REDIS rhnd); + +int credis_bgsave(REDIS rhnd); + +/* returns UNIX time stamp of last successfull save to disk */ +int credis_lastsave(REDIS rhnd); + +int credis_shutdown(REDIS rhnd); + +/* + * Remote server control commands + */ + +int credis_info(REDIS rhnd, REDIS_INFO *info); + +int credis_monitor(REDIS rhnd); + +/* setting host to NULL and/or port to 0 will turn off replication */ +int credis_slaveof(REDIS rhnd, const char *host, int port); + +#ifdef __cplusplus +} +#endif + +#endif /* __CREDIS_H */ diff --git a/src/mod/applications/mod_redis/mod_redis.c b/src/mod/applications/mod_redis/mod_redis.c new file mode 100755 index 0000000000..ca509520c0 --- /dev/null +++ b/src/mod/applications/mod_redis/mod_redis.c @@ -0,0 +1,325 @@ +/* + * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * Copyright (C) 2005-2010, Anthony Minessale II + * + * Version: MPL 1.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application + * + * The Initial Developer of the Original Code is + * Anthony Minessale II + * Portions created by the Initial Developer are Copyright (C) + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * Kevin Morizur + * Mathieu Rene + * + * mod_redis.c -- Redis limit backend + * + */ + +#include +#include "credis.h" + +SWITCH_MODULE_LOAD_FUNCTION(mod_redis_load); +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_redis_shutdown); +SWITCH_MODULE_DEFINITION(mod_redis, mod_redis_load, NULL, mod_redis_shutdown); + +static struct{ + char *host; + int port; + int timeout; +} globals; + +static switch_xml_config_item_t instructions[] = { + /* parameter name type reloadable pointer default value options structure */ + SWITCH_CONFIG_ITEM_STRING_STRDUP("host", CONFIG_RELOAD, &globals.host, NULL, "localhost", "Hostname for redis server"), + SWITCH_CONFIG_ITEM("port", SWITCH_CONFIG_INT, CONFIG_RELOADABLE, &globals.port, (void *) 6379, NULL,NULL, NULL), + SWITCH_CONFIG_ITEM("timeout", SWITCH_CONFIG_INT, CONFIG_RELOADABLE, &globals.timeout, (void *) 10000, NULL,NULL, NULL), + SWITCH_CONFIG_ITEM_END() +}; + +/* HASH STUFF */ +typedef struct { + switch_hash_t *hash; + switch_mutex_t *mutex; +} limit_redis_private_t; + +static switch_status_t redis_factory(REDIS *redis) +{ + if (!((*redis) = credis_connect(globals.host, globals.port, globals.timeout))) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't connect to redis server at %s:%d timeout:%d\n", globals.host, globals.port, globals.timeout); + return SWITCH_STATUS_FALSE; + } + return SWITCH_STATUS_SUCCESS; +} + +/* \brief Enforces limit_redis restrictions + * \param session current session + * \param realm limit realm + * \param id limit id + * \param max maximum count + * \param interval interval for rate limiting + * \return SWITCH_TRUE if the access is allowed, SWITCH_FALSE if it isnt + */ +SWITCH_LIMIT_INCR(limit_incr_redis) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + limit_redis_private_t *pvt = NULL; + int val,uuid_val; + char *rediskey = NULL; + char *uuid_rediskey = NULL; + uint8_t increment = 1; + switch_status_t status = SWITCH_STATUS_SUCCESS; + REDIS redis; + + if (redis_factory(&redis) != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_FALSE; + } + + /* Get the keys for redis server */ + uuid_rediskey = switch_core_session_sprintf(session,"%s_%s_%s", switch_core_get_variable("hostname"), realm, resource); + rediskey = switch_core_session_sprintf(session, "%s_%s", realm, resource); + + if ((pvt = switch_channel_get_private(channel, "limit_redis"))) { + increment = !switch_core_hash_find_locked(pvt->hash, rediskey, pvt->mutex); + } else { + /* This is the first limit check on this channel, create a hashtable, set our prviate data and add a state handler */ + pvt = (limit_redis_private_t *) switch_core_session_alloc(session, sizeof(limit_redis_private_t)); + switch_core_hash_init(&pvt->hash, switch_core_session_get_pool(session)); + switch_mutex_init(&pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + switch_channel_set_private(channel, "limit_redis", pvt); + } + + if (!(switch_core_hash_find_locked(pvt->hash, rediskey, pvt->mutex))) { + switch_core_hash_insert_locked(pvt->hash, rediskey, rediskey, pvt->mutex); + } + + if (increment) { + if (credis_incr(redis, rediskey, &val) != 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't increment value corresponding to %s\n", rediskey); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + + if (max > 0) { + if (val > max){ + if (credis_decr(redis, rediskey, &val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't decrement value corresponding to %s\n", rediskey); + switch_goto_status(SWITCH_STATUS_GENERR, end); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Usage for %s exceeds maximum rate of %d\n", + rediskey, max); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + } else { + if (credis_incr(redis, uuid_rediskey, &uuid_val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't increment value corresponding to %s\n", uuid_rediskey); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + } + } + } +/* + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Limit incr redis : rediskey : %s val : %d max : %d\n", rediskey, val, max); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Limit incr redis : uuid_rediskey : %s uuid_val : %d max : %d\n", uuid_rediskey,uuid_val,max); +*/ +end: + if (redis) { + credis_close(redis); + } + return status; +} + +/* !\brief Releases usage of a limit_redis-controlled ressource */ +SWITCH_LIMIT_RELEASE(limit_release_redis) +{ + switch_channel_t *channel = switch_core_session_get_channel(session); + limit_redis_private_t *pvt = switch_channel_get_private(channel, "limit_redis"); + int val, uuid_val; + switch_hash_index_t *hi; + char *rediskey = NULL; + char *uuid_rediskey = NULL; + int status = SWITCH_STATUS_SUCCESS; + REDIS redis; + + if (!pvt || !pvt->hash) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "No hashtable for channel %s\n", switch_channel_get_name(channel)); + return SWITCH_STATUS_SUCCESS; + } + + if (redis_factory(&redis) != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_FALSE; + } + + switch_mutex_lock(pvt->mutex); + + /* clear for uuid */ + if (realm == NULL && resource == NULL) { + /* Loop through the channel's hashtable which contains mapping to all the limit_redis_item_t referenced by that channel */ + while ((hi = switch_hash_first(NULL, pvt->hash))) { + void *p_val = NULL; + const void *p_key; + char *p_uuid_key = NULL; + switch_ssize_t keylen; + + switch_hash_this(hi, &p_key, &keylen, &p_val); + + if (credis_decr(redis, (const char*)p_key, &val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't decrement value corresponding to %s\n", (char *)p_key); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + p_uuid_key = switch_core_session_sprintf(session, "%s_%s", switch_core_get_variable("hostname"), (char *)p_key); + if (credis_decr(redis,p_uuid_key,&uuid_val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't decrement value corresponding to %s\n", p_uuid_key); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + switch_core_hash_delete(pvt->hash, (const char *) p_key); + /* + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Limit release redis : rediskey : %s val : %d\n", (char *)p_val,val); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG10, "Limit incr redis : uuid_rediskey : %s uuid_val : %d\n", + p_uuid_key, uuid_val);*/ + } + + } else { + rediskey = switch_core_session_sprintf(session, "%s_%s", realm, resource); + uuid_rediskey = switch_core_session_sprintf(session, "%s_%s_%s", switch_core_get_variable("hostname"), realm, resource); + switch_core_hash_delete(pvt->hash, (const char *) rediskey); + + if (credis_decr(redis, rediskey, &val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't decrement value corresponding to %s\n", rediskey); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + if (credis_decr(redis, uuid_rediskey, &uuid_val) != 0) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Couldn't decrement value corresponding to %s\n", uuid_rediskey); + switch_goto_status(SWITCH_STATUS_FALSE, end); + } + +/* + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Limit release redis : rediskey : %s val : %d\n", rediskey,val); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Limit incr redis : uuid_rediskey : %s uuid_val : %d\n", uuid_rediskey,uuid_val); +*/ + } +end: + switch_mutex_unlock(pvt->mutex); + if (redis) { + credis_close(redis); + } + return status; +} + +SWITCH_LIMIT_USAGE(limit_usage_redis) +{ + char *redis_key; + char *str; + REDIS redis; + int usage; + + if (redis_factory(&redis) != SWITCH_STATUS_SUCCESS) { + return 0; + } + + redis_key = switch_mprintf("%s_%s", realm, resource); + + if (credis_get(redis, redis_key, &str) != 0){ + usage = 0; + } else { + usage = atoi(str); + } + + if (redis) { + credis_close(redis); + } + + switch_safe_free(redis_key); + return usage; +} + +SWITCH_LIMIT_RESET(limit_reset_redis) +{ + REDIS redis; + if (redis_factory(&redis) == SWITCH_STATUS_SUCCESS) { + char *rediskey = switch_mprintf("%s_*", switch_core_get_variable("hostname")); + int dec = 0, val = 0, keyc; + char *uuids[2000]; + + if ((keyc = credis_keys(redis, rediskey, uuids, switch_arraylen(uuids))) > 0) { + int i = 0; + int hostnamelen = strlen(switch_core_get_variable("hostname"))+1; + + for (i = 0; i < keyc && uuids[i]; i++){ + const char *key = uuids[i] + hostnamelen; + char *value; + + if (strlen(uuids[i]) <= hostnamelen) { + continue; /* Sanity check */ + } + + credis_get(redis, key, &value); + dec = atoi(value); + credis_decrby(redis, key, dec, &val); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "DECR %s by %d. value is now %d\n", key, dec, val); + } + } + switch_safe_free(rediskey); + credis_close(redis); + return SWITCH_STATUS_SUCCESS; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Couldn't check/clear old redis entries\n"); + return SWITCH_STATUS_FALSE; + } +} + +SWITCH_LIMIT_STATUS(limit_status_redis) +{ + char *ret = switch_mprintf("This function is not yet available for Redis DB"); + return ret; +} + +SWITCH_MODULE_LOAD_FUNCTION(mod_redis_load) +{ + switch_limit_interface_t *limit_interface = NULL; + + *module_interface = switch_loadable_module_create_module_interface(pool, modname); + + if (switch_xml_config_parse_module_settings("redis.conf", SWITCH_FALSE, instructions) != SWITCH_STATUS_SUCCESS) { + return SWITCH_STATUS_FALSE; + } + + /* If FreeSWITCH was restarted and we still have active calls, decrement them so our global count stays valid */ + limit_reset_redis(); + + SWITCH_ADD_LIMIT(limit_interface, "redis", limit_incr_redis, limit_release_redis, limit_usage_redis, limit_reset_redis, limit_status_redis); + + return SWITCH_STATUS_SUCCESS; +} + +SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_redis_shutdown) +{ + + switch_xml_config_cleanup(instructions); + + return SWITCH_STATUS_SUCCESS; +} + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4: + */