Fix a race condition when queues are deleted at a bad time

This commit is contained in:
Anthony Minessale 2011-10-25 11:54:01 -05:00
parent 3c9551ee90
commit 77fc08d7a9
1 changed file with 108 additions and 91 deletions

View File

@ -37,7 +37,7 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
#define MANUAL_QUEUE_NAME "manual_calls" #define MANUAL_QUEUE_NAME "manual_calls"
#define FIFO_EVENT "fifo::info" #define FIFO_EVENT "fifo::info"
#define FIFO_DELAY_DESTROY 100
static switch_status_t load_config(int reload, int del_all); static switch_status_t load_config(int reload, int del_all);
#define MAX_PRI 10 #define MAX_PRI 10
@ -312,6 +312,7 @@ struct fifo_node {
int ring_timeout; int ring_timeout;
int default_lag; int default_lag;
char *domain_name; char *domain_name;
struct fifo_node *next;
}; };
typedef struct fifo_node fifo_node_t; typedef struct fifo_node fifo_node_t;
@ -564,6 +565,8 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
struct fifo_node;
static struct { static struct {
switch_hash_t *caller_orig_hash; switch_hash_t *caller_orig_hash;
switch_hash_t *consumer_orig_hash; switch_hash_t *consumer_orig_hash;
@ -587,6 +590,7 @@ static struct {
int threads; int threads;
switch_thread_t *node_thread; switch_thread_t *node_thread;
int debug; int debug;
struct fifo_node *nodes;
} globals; } globals;
@ -856,6 +860,8 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
switch_core_hash_insert(globals.fifo_hash, name, node); switch_core_hash_insert(globals.fifo_hash, name, node);
node->next = globals.nodes;
globals.nodes = node;
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
switch_safe_free(domain_name); switch_safe_free(domain_name);
@ -1213,6 +1219,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, node_name); node = switch_core_hash_find(globals.fifo_hash, node_name);
switch_thread_rwlock_rdlock(node->rwlock);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
for (i = 0; i < cbh->rowcount; i++) { for (i = 0; i < cbh->rowcount; i++) {
@ -1498,6 +1505,7 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
node->ring_consumer_count = 0; node->ring_consumer_count = 0;
node->busy = 0; node->busy = 0;
switch_mutex_unlock(node->update_mutex); switch_mutex_unlock(node->update_mutex);
switch_thread_rwlock_unlock(node->rwlock);
} }
@ -1554,6 +1562,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, h->node_name); node = switch_core_hash_find(globals.fifo_hash, h->node_name);
switch_thread_rwlock_rdlock(node->rwlock);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
if (node) { if (node) {
@ -1661,6 +1670,7 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
} }
node->busy = 0; node->busy = 0;
switch_mutex_unlock(node->update_mutex); switch_mutex_unlock(node->update_mutex);
switch_thread_rwlock_unlock(node->rwlock);
} }
switch_core_destroy_memory_pool(&h->pool); switch_core_destroy_memory_pool(&h->pool);
@ -1792,73 +1802,93 @@ static void find_consumers(fifo_node_t *node)
static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj) static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
{ {
fifo_node_t *node; fifo_node_t *node, *last, *this_node;
int cur_priority = 1; int cur_priority = 1;
globals.node_thread_running = 1; globals.node_thread_running = 1;
while (globals.node_thread_running == 1) { while (globals.node_thread_running == 1) {
switch_hash_index_t *hi;
void *val;
const void *var;
int ppl_waiting, consumer_total, idle_consumers, found = 0; int ppl_waiting, consumer_total, idle_consumers, found = 0;
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority); if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
last = NULL;
node = globals.nodes;
restart: while(node) {
int x = 0;
switch_event_t *pop;
int nuke = 0;
this_node = node;
node = node->next;
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { if (this_node->ready == 0) {
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) { for (x = 0; x < MAX_PRI; x++) {
int x = 0; while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_t *pop; const char *caller_uuid = switch_event_get_header(pop, "unique-id");
switch_ivr_kill_uuid(caller_uuid, SWITCH_CAUSE_MANAGER_REQUEST);
if (node->ready == FIFO_DELAY_DESTROY) { switch_event_destroy(&pop);
if (switch_thread_rwlock_trywrlock(node->rwlock) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_core_hash_delete(globals.fifo_hash, node->name);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
node->ready = 0;
switch_core_hash_destroy(&node->consumer_hash);
switch_mutex_unlock(node->mutex);
switch_mutex_unlock(node->update_mutex);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
goto restart;
} }
} }
}
if (node->outbound_priority == 0) node->outbound_priority = 5;
if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
ppl_waiting = node_caller_count(node);
consumer_total = node->consumer_count;
idle_consumers = node_idle_consumers(node);
if (globals.debug) { if (this_node->ready == 0 && switch_thread_rwlock_trywrlock(this_node->rwlock) == SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", this_node->name);
"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
node->name, ppl_waiting, consumer_total, idle_consumers, node->ring_consumer_count, node->outbound_priority); for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
} }
}
if (last) {
last->next = this_node->next;
} else {
globals.nodes = this_node->next;
}
switch_core_hash_destroy(&this_node->consumer_hash);
switch_mutex_unlock(this_node->mutex);
switch_mutex_unlock(this_node->update_mutex);
switch_thread_rwlock_unlock(this_node->rwlock);
switch_core_destroy_memory_pool(&this_node->pool);
nuke++;
}
last = this_node;
if (nuke) continue;
if (this_node->outbound_priority == 0) this_node->outbound_priority = 5;
globals.debug = 1;
if (this_node->has_outbound && !this_node->busy && this_node->outbound_priority == cur_priority) {
ppl_waiting = node_caller_count(this_node);
consumer_total = this_node->consumer_count;
idle_consumers = node_idle_consumers(this_node);
if (globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
"%s waiting %d consumer_total %d idle_consumers %d ring_consumers %d pri %d\n",
this_node->name, ppl_waiting, consumer_total, idle_consumers, this_node->ring_consumer_count, this_node->outbound_priority);
}
if ((ppl_waiting - node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) { if ((ppl_waiting - this_node->ring_consumer_count > 0) && (!consumer_total || !idle_consumers)) {
found++; found++;
find_consumers(node); find_consumers(this_node);
switch_yield(1000000); switch_yield(1000000);
}
} }
} }
} }
if (++cur_priority > 10) { if (++cur_priority > 10) {
cur_priority = 1; cur_priority = 1;
@ -2001,7 +2031,9 @@ static void pres_event_handler(switch_event_t *event)
node->ready = 1; node->ready = 1;
} }
switch_thread_rwlock_rdlock(node->rwlock);
send_presence(node); send_presence(node);
switch_thread_rwlock_unlock(node->rwlock);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
@ -2013,6 +2045,7 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
{ {
fifo_node_t *node; fifo_node_t *node;
switch_event_t *call_event; switch_event_t *call_event;
uint32_t i = 0;
if (priority >= MAX_PRI) { if (priority >= MAX_PRI) {
priority = MAX_PRI - 1; priority = MAX_PRI - 1;
@ -2026,6 +2059,8 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
node = create_node(node_name, 0, globals.sql_mutex); node = create_node(node_name, 0, globals.sql_mutex);
} }
switch_thread_rwlock_rdlock(node->rwlock);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA); switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
@ -2034,7 +2069,11 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
fifo_queue_push(node->fifo_list[priority], call_event); fifo_queue_push(node->fifo_list[priority], call_event);
call_event = NULL; call_event = NULL;
return fifo_queue_size(node->fifo_list[priority]); i = fifo_queue_size(node->fifo_list[priority]);
switch_thread_rwlock_unlock(node->rwlock);
return i;
} }
@ -3190,7 +3229,7 @@ SWITCH_STANDARD_APP(fifo_function)
} }
switch_thread_rwlock_unlock(node->rwlock); switch_thread_rwlock_unlock(node->rwlock);
if (node->ready == 1 && do_destroy) { if (node->ready == 1 && do_destroy) {
node->ready = FIFO_DELAY_DESTROY; node->ready = 0;
} }
} }
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
@ -4005,8 +4044,8 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, NULL, NULL, &val); switch_hash_this(hi, NULL, NULL, &val);
if ((node = (fifo_node_t *) val) && node->is_static) { if ((node = (fifo_node_t *) val) && node->is_static && node->ready == 1) {
node->ready = 0; node->ready = -1;
} }
} }
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
@ -4190,41 +4229,18 @@ static switch_status_t load_config(int reload, int del_all)
done: done:
if (reload) { if (reload) {
switch_hash_index_t *hi;
void *val;
switch_event_t *pop;
fifo_node_t *node; fifo_node_t *node;
switch_mutex_lock(globals.mutex); switch_mutex_lock(globals.mutex);
top: for (node = globals.nodes; node; node = node->next) {
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { if (node->ready == -1) {
int x = 0; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s queued for removal\n", node->name);
switch_hash_this(hi, NULL, NULL, &val);
if (!(node = (fifo_node_t *) val) || !node->is_static || node->ready) {
continue;
}
if (node_caller_count(node) || node->consumer_count || node_idle_consumers(node)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s removal delayed, still in use.\n", node->name);
node->ready = FIFO_DELAY_DESTROY;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_mutex_lock(node->update_mutex);
for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
switch_core_hash_delete(globals.fifo_hash, node->name); switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->consumer_hash); node->ready = 0;
switch_mutex_unlock(node->update_mutex);
switch_core_destroy_memory_pool(&node->pool);
goto top;
} }
} }
fifo_caller_del(NULL);
switch_mutex_unlock(globals.mutex); switch_mutex_unlock(globals.mutex);
fifo_caller_del(NULL);
} }
@ -4472,10 +4488,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
*/ */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
{ {
switch_hash_index_t *hi;
void *val;
switch_event_t *pop = NULL; switch_event_t *pop = NULL;
fifo_node_t *node; fifo_node_t *node, *this_node;
switch_mutex_t *mutex = globals.mutex; switch_mutex_t *mutex = globals.mutex;
switch_event_unbind(&globals.node); switch_event_unbind(&globals.node);
@ -4492,24 +4506,27 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
switch_cond_next(); switch_cond_next();
} }
node = globals.nodes;
while ((hi = switch_hash_first(NULL, globals.fifo_hash))) { while(node) {
int x = 0; int x = 0;
switch_hash_this(hi, NULL, NULL, &val);
node = (fifo_node_t *) val;
switch_mutex_lock(node->update_mutex); this_node = node;
switch_mutex_lock(node->mutex); node = node->next;
switch_mutex_lock(this_node->update_mutex);
switch_mutex_lock(this_node->mutex);
for (x = 0; x < MAX_PRI; x++) { for (x = 0; x < MAX_PRI; x++) {
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { while (fifo_queue_pop(this_node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop); switch_event_destroy(&pop);
} }
} }
switch_mutex_unlock(node->mutex); switch_mutex_unlock(this_node->mutex);
switch_core_hash_delete(globals.fifo_hash, node->name); switch_core_hash_delete(globals.fifo_hash, this_node->name);
switch_core_hash_destroy(&node->consumer_hash); switch_core_hash_destroy(&this_node->consumer_hash);
switch_mutex_unlock(node->update_mutex); switch_mutex_unlock(this_node->update_mutex);
switch_core_destroy_memory_pool(&node->pool); switch_core_destroy_memory_pool(&this_node->pool);
} }
switch_core_hash_destroy(&globals.fifo_hash); switch_core_hash_destroy(&globals.fifo_hash);