From ab0a2bfa034aa535de9dae1cf7cbf03224ad2bf3 Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Fri, 17 Dec 2010 17:08:24 -0600 Subject: [PATCH] prune --- src/mod/applications/mod_fifo/mod_fifo.c | 103 ++++++++++++----------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index f275363706..9d9896956e 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -291,6 +291,7 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid) struct fifo_node { char *name; switch_mutex_t *mutex; + switch_mutex_t *update_mutex; fifo_queue_t *fifo_list[MAX_PRI]; switch_hash_t *consumer_hash; int outbound_priority; @@ -801,6 +802,7 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu switch_core_hash_init(&node->consumer_hash, node->pool); switch_thread_rwlock_create(&node->rwlock, node->pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool); + switch_mutex_init(&node->update_mutex, SWITCH_MUTEX_NESTED, node->pool); cbt.buf = outbound_count; cbt.len = sizeof(outbound_count); sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name); @@ -1193,10 +1195,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void if (node) { - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); node->busy = 0; node->ring_consumer_count = 1; - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } else { goto end; } @@ -1437,10 +1439,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void cbh->ready = 1; if (node) { - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); node->ring_consumer_count = 0; node->busy = 0; - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } @@ -1501,10 +1503,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_mutex_unlock(globals.mutex); if (node) { - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); node->ring_consumer_count++; node->busy = 0; - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS); @@ -1601,12 +1603,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj) switch_event_destroy(&ovars); if (node) { - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); if (node->ring_consumer_count-- < 0) { node->ring_consumer_count = 0; } node->busy = 0; - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } switch_core_destroy_memory_pool(&h->pool); @@ -1754,9 +1756,35 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority); + restart: + for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { switch_hash_this(hi, &var, NULL, &val); if ((node = (fifo_node_t *) val)) { + + if (node->ready == FIFO_DELAY_DESTROY) { + int doit = 0; + + switch_mutex_lock(node->update_mutex); + doit = node->consumer_count == 0 && node_caller_count(node) == 0; + switch_mutex_unlock(node->update_mutex); + + if (doit) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); + switch_core_hash_delete(globals.fifo_hash, node->name); + + node->ready = 0; + switch_mutex_lock(node->mutex); + switch_core_hash_destroy(&node->consumer_hash); + switch_mutex_unlock(node->mutex); + switch_mutex_unlock(node->update_mutex); + switch_core_destroy_memory_pool(&node->pool); + goto restart; + } + + } + + if (node->outbound_priority == 0) node->outbound_priority = 5; if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) { ppl_waiting = node_caller_count(node); @@ -2219,6 +2247,7 @@ SWITCH_STANDARD_APP(fifo_function) if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) { node = create_node(nlist[i], importance, globals.sql_mutex); node->ready = 1; + switch_thread_rwlock_rdlock(node->rwlock); } node_list[node_count++] = node; } @@ -2306,7 +2335,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_answer(channel); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) { p = atoi(pri); @@ -2344,7 +2373,7 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(channel, "fifo_priority", tmp); } - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); ts = switch_micro_time_now(); switch_time_exp_lt(&tm, ts); @@ -2450,9 +2479,9 @@ SWITCH_STANDARD_APP(fifo_function) } switch_mutex_lock(globals.mutex); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); node_remove_uuid(node, uuid); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); send_presence(node); check_cancel(node); switch_mutex_unlock(globals.mutex); @@ -2682,9 +2711,9 @@ SWITCH_STANDARD_APP(fifo_function) } if (pop && !node_caller_count(node)) { - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); node->start_waiting = 0; - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } } @@ -3074,29 +3103,9 @@ SWITCH_STANDARD_APP(fifo_function) done: - switch_mutex_lock(globals.mutex); - if (node && node->ready == FIFO_DELAY_DESTROY) { - int doit = 0; - - switch_thread_rwlock_wrlock(node->rwlock); - doit = node->consumer_count == 0 && node_caller_count(node) == 0; + if (node) { switch_thread_rwlock_unlock(node->rwlock); - - if (doit) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "%s removed.\n", node->name); - switch_core_hash_delete(globals.fifo_hash, node->name); - - node->ready = 0; - switch_mutex_lock(node->mutex); - switch_core_hash_destroy(&node->consumer_hash); - switch_mutex_unlock(node->mutex); - switch_thread_rwlock_unlock(node->rwlock); - switch_core_destroy_memory_pool(&node->pool); - } - } - switch_mutex_unlock(globals.mutex); - switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG); @@ -3707,9 +3716,9 @@ SWITCH_STANDARD_API(fifo_api_function) switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; len = node_caller_count(node); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), len); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); x++; } @@ -3718,9 +3727,9 @@ SWITCH_STANDARD_API(fifo_api_function) } } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { len = node_caller_count(node); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), len); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } else { stream->write_function(stream, "none\n"); } @@ -3730,9 +3739,9 @@ SWITCH_STANDARD_API(fifo_api_function) switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; len = node_caller_count(node); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); x++; } @@ -3741,9 +3750,9 @@ SWITCH_STANDARD_API(fifo_api_function) } } else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { len = node_caller_count(node); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); } else { stream->write_function(stream, "none\n"); } @@ -4116,7 +4125,7 @@ static switch_status_t load_config(int reload, int del_all) node->ready = FIFO_DELAY_DESTROY; } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name); - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); for (x = 0; x < MAX_PRI; x++) { while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { switch_event_destroy(&pop); @@ -4125,7 +4134,7 @@ static switch_status_t load_config(int reload, int del_all) switch_core_hash_delete(globals.fifo_hash, node->name); switch_core_hash_destroy(&node->consumer_hash); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); switch_core_destroy_memory_pool(&node->pool); goto top; } @@ -4401,7 +4410,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_hash_this(hi, NULL, NULL, &val); node = (fifo_node_t *) val; - switch_thread_rwlock_wrlock(node->rwlock); + switch_mutex_lock(node->update_mutex); switch_mutex_lock(node->mutex); for (x = 0; x < MAX_PRI; x++) { while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) { @@ -4411,7 +4420,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) switch_mutex_unlock(node->mutex); switch_core_hash_delete(globals.fifo_hash, node->name); switch_core_hash_destroy(&node->consumer_hash); - switch_thread_rwlock_unlock(node->rwlock); + switch_mutex_unlock(node->update_mutex); switch_core_destroy_memory_pool(&node->pool); }