fix fifo issues

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@13047 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2009-04-15 22:01:27 +00:00
parent 6810445738
commit 00139e9502
1 changed files with 48 additions and 10 deletions

View File

@ -38,7 +38,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load);
SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL); SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
#define FIFO_EVENT "fifo::info" #define FIFO_EVENT "fifo::info"
#define FIFO_DELAY_DESTROY 100
static switch_status_t load_config(int reload, int del_all); static switch_status_t load_config(int reload, int del_all);
#define MAX_PRI 10 #define MAX_PRI 10
@ -369,13 +369,16 @@ static fifo_node_t *create_node(const char *name, uint32_t importance)
{ {
fifo_node_t *node; fifo_node_t *node;
int x = 0; int x = 0;
switch_memory_pool_t *pool;
if (!globals.running) { if (!globals.running) {
return NULL; return NULL;
} }
node = switch_core_alloc(globals.pool, sizeof(*node)); switch_core_new_memory_pool(&pool);
node->pool = globals.pool;
node = switch_core_alloc(pool, sizeof(*node));
node->pool = pool;
node->name = switch_core_strdup(node->pool, name); node->name = switch_core_strdup(node->pool, name);
for (x = 0; x < MAX_PRI; x++) { for (x = 0; x < MAX_PRI; x++) {
@ -387,7 +390,9 @@ static fifo_node_t *create_node(const char *name, uint32_t importance)
switch_thread_rwlock_create(&node->rwlock, node->pool); switch_thread_rwlock_create(&node->rwlock, node->pool);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
node->importance = importance; node->importance = importance;
switch_mutex_lock(globals.mutex);
switch_core_hash_insert(globals.fifo_hash, name, node); switch_core_hash_insert(globals.fifo_hash, name, node);
switch_mutex_unlock(globals.mutex);
return node; return node;
} }
@ -526,7 +531,7 @@ static void find_consumers(fifo_node_t *node)
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, " sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname " "next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
"from fifo_outbound where (fifo_name = '%s') and (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) " "from fifo_outbound where (fifo_name = '%q') and (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) "
"order by outbound_call_count", node->name, (long) switch_epoch_time_now(NULL)); "order by outbound_call_count", node->name, (long) switch_epoch_time_now(NULL));
switch_assert(sql); switch_assert(sql);
@ -747,6 +752,7 @@ SWITCH_STANDARD_APP(fifo_function)
} }
} }
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) { if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
node = create_node(nlist[i], importance); node = create_node(nlist[i], importance);
} }
@ -963,12 +969,15 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort"); switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "FIFO-Action", cd.do_orbit ? "timeout" : "abort");
switch_event_fire(&event); switch_event_fire(&event);
} }
switch_mutex_lock(globals.mutex);
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
node_remove_uuid(node, uuid); node_remove_uuid(node, uuid);
node->caller_count--; node->caller_count--;
switch_core_hash_delete(node->caller_hash, uuid); switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
send_presence(node); send_presence(node);
switch_mutex_unlock(globals.mutex);
} }
@ -1406,6 +1415,19 @@ SWITCH_STANDARD_APP(fifo_function)
} }
done: done:
switch_mutex_lock(globals.mutex);
if (node && node->ready == FIFO_DELAY_DESTROY && node->consumer_count == 0 && node->caller_count == 0) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed. (delayed)\n", node->name);
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
}
switch_mutex_unlock(globals.mutex);
switch_channel_clear_app_flag(channel, CF_APP_TAGGED); switch_channel_clear_app_flag(channel, CF_APP_TAGGED);
switch_core_media_bug_resume(session); switch_core_media_bug_resume(session);
@ -1460,7 +1482,9 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose) static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
{ {
struct xml_helper h; struct xml_helper h;
char *sql = "select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname from fifo_outbound"; char *sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
"lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, "
"hostname from fifo_outbound where fifo_name = '%q'", node->name);
h.xml = xml; h.xml = xml;
h.node = node; h.node = node;
@ -1471,6 +1495,8 @@ static int xml_outbound(switch_xml_t xml, fifo_node_t *node, char *container, ch
fifo_execute_sql_callback(globals.sql_mutex, sql, xml_callback, &h); fifo_execute_sql_callback(globals.sql_mutex, sql, xml_callback, &h);
switch_safe_free(sql);
return h.cc_off; return h.cc_off;
} }
@ -1763,12 +1789,14 @@ static switch_status_t load_config(int reload, int del_all)
switch_hash_index_t *hi; switch_hash_index_t *hi;
fifo_node_t *node; fifo_node_t *node;
void *val; void *val;
switch_mutex_lock(globals.mutex);
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, NULL, NULL, &val); switch_hash_this(hi, NULL, NULL, &val);
if ((node = (fifo_node_t *) val) && node->is_static) { if ((node = (fifo_node_t *) val) && node->is_static) {
node->ready = 0; node->ready = 0;
} }
} }
switch_mutex_unlock(globals.mutex);
} }
if (del_all) { if (del_all) {
@ -1869,6 +1897,8 @@ static switch_status_t load_config(int reload, int del_all)
switch_hash_index_t *hi; switch_hash_index_t *hi;
void *val, *pop; void *val, *pop;
fifo_node_t *node; fifo_node_t *node;
switch_mutex_lock(globals.mutex);
top:
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
int x = 0; int x = 0;
switch_hash_this(hi, NULL, NULL, &val); switch_hash_this(hi, NULL, NULL, &val);
@ -1877,8 +1907,8 @@ static switch_status_t load_config(int reload, int del_all)
} }
if (node_consumer_wait_count(node) || node->consumer_count || node_idle_consumers(node)) { if (node_consumer_wait_count(node) || node->consumer_count || node_idle_consumers(node)) {
node->ready = 1; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s removal delayed, still in use.\n", node->name);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s not removed, still in use.\n", node->name); node->ready = FIFO_DELAY_DESTROY;
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_thread_rwlock_wrlock(node->rwlock); switch_thread_rwlock_wrlock(node->rwlock);
@ -1888,11 +1918,15 @@ static switch_status_t load_config(int reload, int del_all)
} }
} }
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash); switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock); switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
goto top;
} }
} }
switch_mutex_unlock(globals.mutex);
} }
@ -1924,7 +1958,8 @@ static void fifo_member_add(char *fifo_name, char *originate_string, int simo_co
node->has_outbound = 1; node->has_outbound = 1;
sql = switch_mprintf("insert into fifo_outbound " sql = switch_mprintf("insert into fifo_outbound "
"(uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) " "(uuid, fifo_name, originate_string, simo_count, use_count, timeout, "
"lag, next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname) "
"values ('%q','%q','%q',%d,%d,%d,%d,%d,%ld,0,0,0,'%q')", "values ('%q','%q','%q',%d,%d,%d,%d,%d,%ld,0,0,0,'%q')",
digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long)expires, globals.hostname digest, fifo_name, originate_string, simo_count, 0, timeout, lag, 0, (long)expires, globals.hostname
); );
@ -2088,7 +2123,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
stop_node_thread(); stop_node_thread();
} }
top:
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
int x = 0; int x = 0;
switch_hash_this(hi, NULL, NULL, &val); switch_hash_this(hi, NULL, NULL, &val);
@ -2101,9 +2136,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
} }
} }
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash); switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock); switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
goto top;
} }
switch_core_hash_destroy(&globals.fifo_hash); switch_core_hash_destroy(&globals.fifo_hash);
memset(&globals, 0, sizeof(globals)); memset(&globals, 0, sizeof(globals));