From 7055d02750cbd1b954c534a991d3115658cc6576 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 16 Jul 2010 11:43:23 -0500 Subject: [PATCH] fifo tweaks --- src/mod/applications/mod_fifo/mod_fifo.c | 43 +++++++++++++++++------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 2e10e4fcea..14192851c7 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -255,7 +255,7 @@ struct fifo_node { switch_memory_pool_t *pool; int has_outbound; int ready; - int busy; + long busy; int is_static; int outbound_per_cycle; char *outbound_name; @@ -968,6 +968,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void char uuid_str[SWITCH_UUID_FORMATTED_LENGTH + 1]; switch_call_cause_t cancel_cause = 0; char *uuid_list = NULL; + int connected = 0; switch_uuid_get(&uuid); switch_uuid_format(uuid_str, &uuid); @@ -984,8 +985,8 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (node) { switch_mutex_lock(node->mutex); - node->busy++; - node->ring_consumer_count = cbh->rowcount; + node->busy = switch_epoch_time_now(NULL) + 600; + node->ring_consumer_count = 1; switch_mutex_unlock(node->mutex); } else { goto end; @@ -1161,6 +1162,8 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void goto end; } + connected = 1; + channel = switch_core_session_get_channel(session); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { @@ -1209,7 +1212,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (node) { switch_mutex_lock(node->mutex); node->ring_consumer_count = 0; - if (node->busy) node->busy--; + node->busy = switch_epoch_time_now(NULL) + connected; switch_mutex_unlock(node->mutex); } @@ -1233,7 +1236,8 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_status_t status = SWITCH_STATUS_FALSE; switch_event_t *event = NULL; char *sql = NULL; - + int connected = 0; + switch_mutex_lock(globals.mutex); node = switch_core_hash_find(globals.fifo_hash, h->node_name); switch_mutex_unlock(globals.mutex); @@ -1241,7 +1245,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) if (node) { switch_mutex_lock(node->mutex); node->ring_consumer_count++; - node->busy++; + node->busy = switch_epoch_time_now(NULL) + 600; switch_mutex_unlock(node->mutex); } @@ -1304,6 +1308,8 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) goto end; } + connected = 1; + channel = switch_core_session_get_channel(session); if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) { @@ -1341,7 +1347,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) if (node->ring_consumer_count-- < 0) { node->ring_consumer_count = 0; } - if (node->busy) node->busy--; + node->busy = switch_epoch_time_now(NULL) + connected; switch_mutex_unlock(node->mutex); } switch_core_destroy_memory_pool(&h->pool); @@ -1482,8 +1488,8 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { - if (node->has_outbound && node->ready && !node->busy) { - switch_mutex_lock(node->mutex); + switch_mutex_lock(node->mutex); + if (node->has_outbound && node->ready && switch_epoch_time_now(NULL) > node->busy) { ppl_waiting = node_consumer_wait_count(node); consumer_total = node->consumer_count; idle_consumers = node_idle_consumers(node); @@ -1494,8 +1500,8 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { find_consumers(node); } - switch_mutex_unlock(node->mutex); } + switch_mutex_unlock(node->mutex); } } switch_mutex_unlock(globals.mutex); @@ -1552,7 +1558,13 @@ static void check_ocancel(switch_core_session_t *session) static void check_cancel(fifo_node_t *node) { - int ppl_waiting = node_consumer_wait_count(node); + int ppl_waiting; + + if (node->outbound_strategy != NODE_STRATEGY_ENTERPRISE) { + return; + } + + ppl_waiting = node_consumer_wait_count(node); if (node->ring_consumer_count > 0 && ppl_waiting < 1) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Outbound call count (%d) exceeds required value for queue %s (%d), " @@ -1694,7 +1706,8 @@ SWITCH_STANDARD_API(fifo_add_outbound_function) SWITCH_STANDARD_APP(fifo_member_usage_function) { switch_channel_t *channel = switch_core_session_get_channel(session); - + char *sql; + if (zstr(data)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid!\n"); return; @@ -1702,6 +1715,10 @@ SWITCH_STANDARD_APP(fifo_member_usage_function) switch_channel_set_variable(channel, "fifo_outbound_uuid", data); + sql = switch_mprintf("update fifo_outbound set next_avail=%ld + lag where uuid='%q'", (long) switch_epoch_time_now(NULL), data); + fifo_execute_sql(sql, globals.sql_mutex); + switch_safe_free(sql); + switch_core_event_hook_add_receive_message(session, messagehook); } @@ -3304,7 +3321,7 @@ static switch_status_t load_config(int reload, int del_all) switch_cache_db_test_reactive(dbh, "delete from fifo_bridge", "drop table fifo_bridge", bridge_sql); switch_cache_db_release_db_handle(&dbh); - fifo_execute_sql("update fifo_outbound set ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0", globals.sql_mutex); + fifo_execute_sql("update fifo_outbound set ring_count=0,use_count=0,outbound_call_count=0,outbound_fail_count=0 where static=0", globals.sql_mutex); if (reload) { switch_hash_index_t *hi;