From 77fc08d7a99b347dc84702efde1cc35d47edae89 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Tue, 25 Oct 2011 11:54:01 -0500 Subject: [PATCH] fix race deleting queues at a bad time --- src/mod/applications/mod_fifo/mod_fifo.c | 199 ++++++++++++----------- 1 file changed, 108 insertions(+), 91 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index f86c7686b8..e45684134a 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -37,7 +37,7 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL); #define MANUAL_QUEUE_NAME "manual_calls" #define FIFO_EVENT "fifo::info" -#define FIFO_DELAY_DESTROY 100 + static switch_status_t load_config(int reload, int del_all); #define MAX_PRI 10 @@ -312,6 +312,7 @@ struct fifo_node { int ring_timeout; int default_lag; char *domain_name; + struct fifo_node *next; }; typedef struct fifo_node fifo_node_t; @@ -564,6 +565,8 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi return SWITCH_STATUS_SUCCESS; } +struct fifo_node; + static struct { switch_hash_t *caller_orig_hash; switch_hash_t *consumer_orig_hash; @@ -587,6 +590,7 @@ static struct { int threads; switch_thread_t *node_thread; int debug; + struct fifo_node *nodes; } globals; @@ -856,6 +860,8 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu switch_mutex_lock(globals.mutex); switch_core_hash_insert(globals.fifo_hash, name, node); + node->next = globals.nodes; + globals.nodes = node; switch_mutex_unlock(globals.mutex); switch_safe_free(domain_name); @@ -1213,6 +1219,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void switch_mutex_lock(globals.mutex); node = switch_core_hash_find(globals.fifo_hash, node_name); + switch_thread_rwlock_rdlock(node->rwlock); switch_mutex_unlock(globals.mutex); for (i = 0; i < cbh->rowcount; i++) { @@ -1498,6 +1505,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void node->ring_consumer_count = 0; node->busy = 0; switch_mutex_unlock(node->update_mutex); + switch_thread_rwlock_unlock(node->rwlock); } @@ -1554,6 +1562,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_mutex_lock(globals.mutex); node = switch_core_hash_find(globals.fifo_hash, h->node_name); + switch_thread_rwlock_rdlock(node->rwlock); switch_mutex_unlock(globals.mutex); if (node) { @@ -1661,6 +1670,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) } node->busy = 0; switch_mutex_unlock(node->update_mutex); + switch_thread_rwlock_unlock(node->rwlock); } switch_core_destroy_memory_pool(&h->pool); @@ -1792,73 +1802,93 @@ static void find_consumers(fifo_node_t *node) static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj) { - fifo_node_t *node; + fifo_node_t *node, *last, *this_node; int cur_priority = 1; globals.node_thread_running = 1; while (globals.node_thread_running == 1) { - switch_hash_index_t *hi; - void *val; - const void *var; int ppl_waiting, consumer_total, idle_consumers, found = 0; switch_mutex_lock(globals.mutex); if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority); + last = NULL; + node = globals.nodes; - restart: + while(node) { + int x = 0; + switch_event_t *pop; + int nuke = 0; + + this_node = node; + node = node->next; - for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { - switch_hash_this(hi, &var, NULL, &val); - if ((node = (fifo_node_t *) val)) { - int x = 0; - switch_event_t *pop; - - if (node->ready == FIFO_DELAY_DESTROY) { - if (switch_thread_rwlock_trywrlock(node->rwlock) == SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); - switch_core_hash_delete(globals.fifo_hash, node->name); - - for (x = 0; x < MAX_PRI; x++) { - while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { - switch_event_destroy(&pop); - } - } - - node->ready = 0; - switch_core_hash_destroy(&node->consumer_hash); - switch_mutex_unlock(node->mutex); - switch_mutex_unlock(node->update_mutex); - switch_thread_rwlock_unlock(node->rwlock); - switch_core_destroy_memory_pool(&node->pool); - goto restart; + if (this_node->ready == 0) { + + for (x = 0; x < MAX_PRI; x++) { + while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { + const char *caller_uuid = switch_event_get_header(pop, "unique-id"); + switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST); + switch_event_destroy(&pop); } } + } - if (node->outbound_priority == 0) node->outbound_priority = 5; - if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) { - ppl_waiting = node_caller_count(node); - consumer_total = node->consumer_count; - idle_consumers = node_idle_consumers(node); - if (globals.debug) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n", - node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count, node->outbound_priority); + if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name); + + for (x = 0; x < MAX_PRI; x++) { + while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { + switch_event_destroy(&pop); } + } + + if (last) { + last->next = this_node->next; + } else { + globals.nodes = this_node->next; + } + + switch_core_hash_destroy(&this_node->consumer_hash); + switch_mutex_unlock(this_node->mutex); + switch_mutex_unlock(this_node->update_mutex); + switch_thread_rwlock_unlock(this_node->rwlock); + switch_core_destroy_memory_pool(&this_node->pool); + nuke++; + } + + last = this_node; + + if (nuke) continue; + + if (this_node->outbound_priority == 0) this_node->outbound_priority = 5; + + globals.debug = 1; + + if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) { + ppl_waiting = node_caller_count(this_node); + consumer_total = this_node->consumer_count; + idle_consumers = node_idle_consumers(this_node); + + if (globals.debug) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + "%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n", + this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority); + } - if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { - found++; - find_consumers(node); - switch_yield(1000000); - } + if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { + found++; + find_consumers(this_node); + switch_yield(1000000); } } } + if (++cur_priority > 10) { cur_priority = 1; @@ -2001,7 +2031,9 @@ static void pres_event_handler(switch_event_t *event) node->ready = 1; } + switch_thread_rwlock_rdlock(node->rwlock); send_presence(node); + switch_thread_rwlock_unlock(node->rwlock); switch_mutex_unlock(globals.mutex); @@ -2013,6 +2045,7 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 { fifo_node_t *node; switch_event_t *call_event; + uint32_t i = 0; if (priority >= MAX_PRI) { priority = MAX_PRI - 1; @@ -2026,6 +2059,8 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 node = create_node(node_name, 0, globals.sql_mutex); } + switch_thread_rwlock_rdlock(node->rwlock); + switch_mutex_unlock(globals.mutex); switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA); @@ -2034,7 +2069,11 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32 fifo_queue_push(node->fifo_list[priority], call_event); call_event = NULL; - return fifo_queue_size(node->fifo_list[priority]); + i = fifo_queue_size(node->fifo_list[priority]); + + switch_thread_rwlock_unlock(node->rwlock); + + return i; } @@ -3190,7 +3229,7 @@ SWITCH_STANDARD_APP(fifo_function) } switch_thread_rwlock_unlock(node->rwlock); if (node->ready == 1 && do_destroy) { - node->ready = FIFO_DELAY_DESTROY; + node->ready = 0; } } switch_mutex_unlock(globals.mutex); @@ -4005,8 +4044,8 @@ static switch_status_t load_config(int reload, int del_all) switch_mutex_lock(globals.mutex); for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, NULL, NULL, &val); - if ((node = (fifo_node_t *) val) && node->is_static) { - node->ready = 0; + if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) { + node->ready = -1; } } switch_mutex_unlock(globals.mutex); @@ -4190,41 +4229,18 @@ static switch_status_t load_config(int reload, int del_all) done: if (reload) { - switch_hash_index_t *hi; - void *val; - switch_event_t *pop; fifo_node_t *node; + switch_mutex_lock(globals.mutex); - top: - for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { - int x = 0; - switch_hash_this(hi, NULL, NULL, &val); - if (!(node = (fifo_node_t *) val) || !node->is_static || node->ready) { - continue; - } - - if (node_caller_count(node) || node->consumer_count || node_idle_consumers(node)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s removal delayed, still in use.\n", node->name); - node->ready = FIFO_DELAY_DESTROY; - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); - switch_mutex_lock(node->update_mutex); - for (x = 0; x < MAX_PRI; x++) { - while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { - switch_event_destroy(&pop); - } - } - + for (node = globals.nodes; node; node = node->next) { + if (node->ready == -1) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name); switch_core_hash_delete(globals.fifo_hash, node->name); - switch_core_hash_destroy(&node->consumer_hash); - switch_mutex_unlock(node->update_mutex); - switch_core_destroy_memory_pool(&node->pool); - goto top; + node->ready = 0; } } - - fifo_caller_del(NULL); switch_mutex_unlock(globals.mutex); + fifo_caller_del(NULL); } @@ -4472,10 +4488,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load) */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) { - switch_hash_index_t *hi; - void *val; switch_event_t *pop = NULL; - fifo_node_t *node; + fifo_node_t *node, *this_node; switch_mutex_t *mutex = globals.mutex; switch_event_unbind(&globals.node); @@ -4492,24 +4506,27 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_cond_next(); } + node = globals.nodes; - while ((hi = switch_hash_first(NULL, globals.fifo_hash))) { + while(node) { int x = 0; - switch_hash_this(hi, NULL, NULL, &val); - node = (fifo_node_t *) val; - switch_mutex_lock(node->update_mutex); - switch_mutex_lock(node->mutex); + this_node = node; + node = node->next; + + + switch_mutex_lock(this_node->update_mutex); + switch_mutex_lock(this_node->mutex); for (x = 0; x < MAX_PRI; x++) { - while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { + while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { switch_event_destroy(&pop); } } - switch_mutex_unlock(node->mutex); - switch_core_hash_delete(globals.fifo_hash, node->name); - switch_core_hash_destroy(&node->consumer_hash); - switch_mutex_unlock(node->update_mutex); - switch_core_destroy_memory_pool(&node->pool); + switch_mutex_unlock(this_node->mutex); + switch_core_hash_delete(globals.fifo_hash, this_node->name); + switch_core_hash_destroy(&this_node->consumer_hash); + switch_mutex_unlock(this_node->update_mutex); + switch_core_destroy_memory_pool(&this_node->pool); } switch_core_hash_destroy(&globals.fifo_hash);