From c8de5ee105a76fae6866c49d6775f5a96f623b1e Mon Sep 17 00:00:00 2001 From: Marc Olivier Chouinard Date: Fri, 25 Sep 2009 21:18:09 +0000 Subject: [PATCH] mod_fifo: allow to call outbound member on on-the-fly fifo, also add a settings params to delete or keep all dynamic fifo entry (MODAPP-332) git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@14989 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- conf/autoload_configs/fifo.conf.xml | 3 ++ src/mod/applications/mod_fifo/mod_fifo.c | 61 +++++++++++++++++++----- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/conf/autoload_configs/fifo.conf.xml b/conf/autoload_configs/fifo.conf.xml index ff99134848..b1db4dd2bd 100644 --- a/conf/autoload_configs/fifo.conf.xml +++ b/conf/autoload_configs/fifo.conf.xml @@ -1,4 +1,7 @@ + + + diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index b06a441dbe..e53a264711 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -59,6 +59,23 @@ struct fifo_node { typedef struct fifo_node fifo_node_t; +struct callback { + char *buf; + size_t len; + int matches; +}; +typedef struct callback callback_t; + +static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames) +{ + callback_t *cbt = (callback_t *) pArg; + + switch_copy_string(cbt->buf, argv[0], cbt->len); + cbt->matches++; + return 0; +} + + static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen) { switch_core_session_t *bleg = (switch_core_session_t *) buf; @@ -349,11 +366,14 @@ static switch_bool_t fifo_execute_sql_callback(switch_mutex_t *mutex, char *sql, return ret; } -static fifo_node_t *create_node(const char *name, uint32_t importance) +static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mutex_t *mutex) { fifo_node_t *node; int x = 0; switch_memory_pool_t *pool; + char outbound_count[80] = ""; + callback_t cbt = { 0 }; + char *sql = NULL; if (!globals.running) { return NULL; @@ -373,7 +393,17 @@ static fifo_node_t *create_node(const char *name, uint32_t importance) switch_core_hash_init(&node->consumer_hash, node->pool); switch_thread_rwlock_create(&node->rwlock, node->pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); + cbt.buf = outbound_count; + cbt.len = sizeof(outbound_count); + sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name); + fifo_execute_sql_callback(mutex, sql, sql2str_callback, &cbt); + if (atoi(outbound_count) > 0) { + node->has_outbound = 1; + } + switch_safe_free(sql); + node->importance = importance; + switch_mutex_lock(globals.mutex); switch_core_hash_insert(globals.fifo_hash, name, node); switch_mutex_unlock(globals.mutex); @@ -560,20 +590,22 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o switch_mutex_lock(globals.mutex); for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); - if ((node = (fifo_node_t *) val) && node->has_outbound && node->ready) { + if ((node = (fifo_node_t *) val)) { + if (node->has_outbound && node->ready) { switch_mutex_lock(node->mutex); ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; idle_consumers = node_idle_consumers(node); - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - //"%s waiting %d consumer_total %d idle_consumers %d ring_consumers\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); + /* switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d\n", node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count); */ if ((ppl_waiting - node->ring_consumer_count > 0)&& (!consumer_total || !idle_consumers)) { find_consumers(node); } switch_mutex_unlock(node->mutex); } + } } switch_mutex_unlock(globals.mutex); @@ -678,7 +710,8 @@ static void pres_event_handler(switch_event_t *event) switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) { - node = create_node(node_name, 0); + node = create_node(node_name, 0, globals.sql_mutex); + node->ready = 1; } send_presence(node); @@ -700,7 +733,7 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, node_name))) { - node = create_node(node_name, 0); + node = create_node(node_name, 0, globals.sql_mutex); } switch_mutex_unlock(globals.mutex); @@ -839,7 +872,8 @@ SWITCH_STANDARD_APP(fifo_function) if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) { - node = create_node(nlist[i], importance); + node = create_node(nlist[i], importance, globals.sql_mutex); + node->ready = 1; } node_list[node_count++] = node; } @@ -1846,6 +1880,7 @@ static switch_status_t load_config(int reload, int del_all) switch_core_db_t *db; switch_status_t status = SWITCH_STATUS_SUCCESS; char *sql; + switch_bool_t delete_all_outbound_member_on_startup = SWITCH_FALSE; gethostname(globals.hostname, sizeof(globals.hostname)); @@ -1874,11 +1909,12 @@ static switch_status_t load_config(int reload, int del_all) } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n"); } + } else if (!strcasecmp(var, "delete-all-outbound-member-on-startup")) { + delete_all_outbound_member_on_startup = switch_true(val); } } } - if (switch_strlen_zero(globals.odbc_dsn) || switch_strlen_zero(odbc_user) || switch_strlen_zero(odbc_pass)) { globals.dbname = "fifo"; } @@ -1903,7 +1939,7 @@ static switch_status_t load_config(int reload, int del_all) } } else { if ((db = switch_core_db_open_file(globals.dbname))) { - switch_core_db_test_reactive(db, "delete from fifo_outbound", NULL, (char *)outbound_sql); + switch_core_db_test_reactive(db, "delete from fifo_outbound where static = 1", NULL, (char *)outbound_sql); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot Open SQL Database!\n"); status = SWITCH_STATUS_FALSE; @@ -1926,7 +1962,7 @@ static switch_status_t load_config(int reload, int del_all) switch_mutex_unlock(globals.mutex); } - if (del_all) { + if ((reload && del_all) || (!reload && delete_all_outbound_member_on_startup) ) { sql = switch_mprintf("delete from fifo_outbound where hostname='%q'", globals.hostname); } else { sql = switch_mprintf("delete from fifo_outbound where static=1 and hostname='%q'", globals.hostname); @@ -1962,7 +1998,7 @@ static switch_status_t load_config(int reload, int del_all) switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, name))) { - node = create_node(name, imp); + node = create_node(name, imp, globals.sql_mutex); } switch_mutex_unlock(globals.mutex); @@ -2086,7 +2122,8 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co switch_mutex_lock(globals.mutex); if (!(node = switch_core_hash_find(globals.fifo_hash, fifo_name))) { - node = create_node(fifo_name, 0); + node = create_node(fifo_name, 0, globals.sql_mutex); + node->ready = 1; } switch_mutex_unlock(globals.mutex);