prune
This commit is contained in:
parent
257c7edaf7
commit
ab0a2bfa03
|
@ -291,6 +291,7 @@ static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
|
||||||
struct fifo_node {
|
struct fifo_node {
|
||||||
char *name;
|
char *name;
|
||||||
switch_mutex_t *mutex;
|
switch_mutex_t *mutex;
|
||||||
|
switch_mutex_t *update_mutex;
|
||||||
fifo_queue_t *fifo_list[MAX_PRI];
|
fifo_queue_t *fifo_list[MAX_PRI];
|
||||||
switch_hash_t *consumer_hash;
|
switch_hash_t *consumer_hash;
|
||||||
int outbound_priority;
|
int outbound_priority;
|
||||||
|
@ -801,6 +802,7 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
|
||||||
switch_core_hash_init(&node->consumer_hash, node->pool);
|
switch_core_hash_init(&node->consumer_hash, node->pool);
|
||||||
switch_thread_rwlock_create(&node->rwlock, node->pool);
|
switch_thread_rwlock_create(&node->rwlock, node->pool);
|
||||||
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
|
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
|
||||||
|
switch_mutex_init(&node->update_mutex, SWITCH_MUTEX_NESTED, node->pool);
|
||||||
cbt.buf = outbound_count;
|
cbt.buf = outbound_count;
|
||||||
cbt.len = sizeof(outbound_count);
|
cbt.len = sizeof(outbound_count);
|
||||||
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
|
sql = switch_mprintf("select count(*) from fifo_outbound where fifo_name = '%q'", name);
|
||||||
|
@ -1193,10 +1195,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
|
||||||
|
|
||||||
|
|
||||||
if (node) {
|
if (node) {
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
node->busy = 0;
|
node->busy = 0;
|
||||||
node->ring_consumer_count = 1;
|
node->ring_consumer_count = 1;
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
} else {
|
} else {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1437,10 +1439,10 @@ static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void
|
||||||
cbh->ready = 1;
|
cbh->ready = 1;
|
||||||
|
|
||||||
if (node) {
|
if (node) {
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
node->ring_consumer_count = 0;
|
node->ring_consumer_count = 0;
|
||||||
node->busy = 0;
|
node->busy = 0;
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1501,10 +1503,10 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
|
||||||
switch_mutex_unlock(globals.mutex);
|
switch_mutex_unlock(globals.mutex);
|
||||||
|
|
||||||
if (node) {
|
if (node) {
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
node->ring_consumer_count++;
|
node->ring_consumer_count++;
|
||||||
node->busy = 0;
|
node->busy = 0;
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
|
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
|
||||||
|
@ -1601,12 +1603,12 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
|
||||||
|
|
||||||
switch_event_destroy(&ovars);
|
switch_event_destroy(&ovars);
|
||||||
if (node) {
|
if (node) {
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
if (node->ring_consumer_count-- < 0) {
|
if (node->ring_consumer_count-- < 0) {
|
||||||
node->ring_consumer_count = 0;
|
node->ring_consumer_count = 0;
|
||||||
}
|
}
|
||||||
node->busy = 0;
|
node->busy = 0;
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
}
|
}
|
||||||
switch_core_destroy_memory_pool(&h->pool);
|
switch_core_destroy_memory_pool(&h->pool);
|
||||||
|
|
||||||
|
@ -1754,9 +1756,35 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
|
||||||
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
|
if (globals.debug) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Trying priority: %d\n", cur_priority);
|
||||||
|
|
||||||
|
|
||||||
|
restart:
|
||||||
|
|
||||||
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
|
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
|
||||||
switch_hash_this(hi, &var, NULL, &val);
|
switch_hash_this(hi, &var, NULL, &val);
|
||||||
if ((node = (fifo_node_t *) val)) {
|
if ((node = (fifo_node_t *) val)) {
|
||||||
|
|
||||||
|
if (node->ready == FIFO_DELAY_DESTROY) {
|
||||||
|
int doit = 0;
|
||||||
|
|
||||||
|
switch_mutex_lock(node->update_mutex);
|
||||||
|
doit = node->consumer_count == 0 && node_caller_count(node) == 0;
|
||||||
|
switch_mutex_unlock(node->update_mutex);
|
||||||
|
|
||||||
|
if (doit) {
|
||||||
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
|
||||||
|
switch_core_hash_delete(globals.fifo_hash, node->name);
|
||||||
|
|
||||||
|
node->ready = 0;
|
||||||
|
switch_mutex_lock(node->mutex);
|
||||||
|
switch_core_hash_destroy(&node->consumer_hash);
|
||||||
|
switch_mutex_unlock(node->mutex);
|
||||||
|
switch_mutex_unlock(node->update_mutex);
|
||||||
|
switch_core_destroy_memory_pool(&node->pool);
|
||||||
|
goto restart;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (node->outbound_priority == 0) node->outbound_priority = 5;
|
if (node->outbound_priority == 0) node->outbound_priority = 5;
|
||||||
if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
|
if (node->has_outbound && node->ready && !node->busy && node->outbound_priority == cur_priority) {
|
||||||
ppl_waiting = node_caller_count(node);
|
ppl_waiting = node_caller_count(node);
|
||||||
|
@ -2219,6 +2247,7 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
|
if (!(node = switch_core_hash_find(globals.fifo_hash, nlist[i]))) {
|
||||||
node = create_node(nlist[i], importance, globals.sql_mutex);
|
node = create_node(nlist[i], importance, globals.sql_mutex);
|
||||||
node->ready = 1;
|
node->ready = 1;
|
||||||
|
switch_thread_rwlock_rdlock(node->rwlock);
|
||||||
}
|
}
|
||||||
node_list[node_count++] = node;
|
node_list[node_count++] = node;
|
||||||
}
|
}
|
||||||
|
@ -2306,7 +2335,7 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
|
|
||||||
switch_channel_answer(channel);
|
switch_channel_answer(channel);
|
||||||
|
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
|
|
||||||
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
|
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
|
||||||
p = atoi(pri);
|
p = atoi(pri);
|
||||||
|
@ -2344,7 +2373,7 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
switch_channel_set_variable(channel, "fifo_priority", tmp);
|
switch_channel_set_variable(channel, "fifo_priority", tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
|
|
||||||
ts = switch_micro_time_now();
|
ts = switch_micro_time_now();
|
||||||
switch_time_exp_lt(&tm, ts);
|
switch_time_exp_lt(&tm, ts);
|
||||||
|
@ -2450,9 +2479,9 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch_mutex_lock(globals.mutex);
|
switch_mutex_lock(globals.mutex);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
node_remove_uuid(node, uuid);
|
node_remove_uuid(node, uuid);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
send_presence(node);
|
send_presence(node);
|
||||||
check_cancel(node);
|
check_cancel(node);
|
||||||
switch_mutex_unlock(globals.mutex);
|
switch_mutex_unlock(globals.mutex);
|
||||||
|
@ -2682,9 +2711,9 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pop && !node_caller_count(node)) {
|
if (pop && !node_caller_count(node)) {
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
node->start_waiting = 0;
|
node->start_waiting = 0;
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3074,30 +3103,10 @@ SWITCH_STANDARD_APP(fifo_function)
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
|
||||||
switch_mutex_lock(globals.mutex);
|
if (node) {
|
||||||
if (node && node->ready == FIFO_DELAY_DESTROY) {
|
|
||||||
int doit = 0;
|
|
||||||
|
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
|
||||||
doit = node->consumer_count == 0 && node_caller_count(node) == 0;
|
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_thread_rwlock_unlock(node->rwlock);
|
||||||
|
|
||||||
if (doit) {
|
|
||||||
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
|
|
||||||
switch_core_hash_delete(globals.fifo_hash, node->name);
|
|
||||||
|
|
||||||
node->ready = 0;
|
|
||||||
switch_mutex_lock(node->mutex);
|
|
||||||
switch_core_hash_destroy(&node->consumer_hash);
|
|
||||||
switch_mutex_unlock(node->mutex);
|
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
|
||||||
switch_core_destroy_memory_pool(&node->pool);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
switch_mutex_unlock(globals.mutex);
|
|
||||||
|
|
||||||
|
|
||||||
switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
|
switch_channel_clear_app_flag_key(FIFO_APP_KEY, channel, FIFO_APP_BRIDGE_TAG);
|
||||||
|
|
||||||
switch_core_media_bug_resume(session);
|
switch_core_media_bug_resume(session);
|
||||||
|
@ -3707,9 +3716,9 @@ SWITCH_STANDARD_API(fifo_api_function)
|
||||||
switch_hash_this(hi, &var, NULL, &val);
|
switch_hash_this(hi, &var, NULL, &val);
|
||||||
node = (fifo_node_t *) val;
|
node = (fifo_node_t *) val;
|
||||||
len = node_caller_count(node);
|
len = node_caller_count(node);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), len);
|
stream->write_function(stream, "%s:%d:%d:%d\n", (char *) var, node->consumer_count, node_caller_count(node), len);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
x++;
|
x++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3718,9 +3727,9 @@ SWITCH_STANDARD_API(fifo_api_function)
|
||||||
}
|
}
|
||||||
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
|
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
|
||||||
len = node_caller_count(node);
|
len = node_caller_count(node);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), len);
|
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node_caller_count(node), len);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "none\n");
|
stream->write_function(stream, "none\n");
|
||||||
}
|
}
|
||||||
|
@ -3730,9 +3739,9 @@ SWITCH_STANDARD_API(fifo_api_function)
|
||||||
switch_hash_this(hi, &var, NULL, &val);
|
switch_hash_this(hi, &var, NULL, &val);
|
||||||
node = (fifo_node_t *) val;
|
node = (fifo_node_t *) val;
|
||||||
len = node_caller_count(node);
|
len = node_caller_count(node);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
|
stream->write_function(stream, "%s:%d\n", (char *) var, node->has_outbound);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
x++;
|
x++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3741,9 +3750,9 @@ SWITCH_STANDARD_API(fifo_api_function)
|
||||||
}
|
}
|
||||||
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
|
} else if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
|
||||||
len = node_caller_count(node);
|
len = node_caller_count(node);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
|
stream->write_function(stream, "%s:%d\n", argv[1], node->has_outbound);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
} else {
|
} else {
|
||||||
stream->write_function(stream, "none\n");
|
stream->write_function(stream, "none\n");
|
||||||
}
|
}
|
||||||
|
@ -4116,7 +4125,7 @@ static switch_status_t load_config(int reload, int del_all)
|
||||||
node->ready = FIFO_DELAY_DESTROY;
|
node->ready = FIFO_DELAY_DESTROY;
|
||||||
} else {
|
} else {
|
||||||
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
|
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
for (x = 0; x < MAX_PRI; x++) {
|
for (x = 0; x < MAX_PRI; x++) {
|
||||||
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
|
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
|
||||||
switch_event_destroy(&pop);
|
switch_event_destroy(&pop);
|
||||||
|
@ -4125,7 +4134,7 @@ static switch_status_t load_config(int reload, int del_all)
|
||||||
|
|
||||||
switch_core_hash_delete(globals.fifo_hash, node->name);
|
switch_core_hash_delete(globals.fifo_hash, node->name);
|
||||||
switch_core_hash_destroy(&node->consumer_hash);
|
switch_core_hash_destroy(&node->consumer_hash);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
switch_core_destroy_memory_pool(&node->pool);
|
switch_core_destroy_memory_pool(&node->pool);
|
||||||
goto top;
|
goto top;
|
||||||
}
|
}
|
||||||
|
@ -4401,7 +4410,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
|
||||||
switch_hash_this(hi, NULL, NULL, &val);
|
switch_hash_this(hi, NULL, NULL, &val);
|
||||||
node = (fifo_node_t *) val;
|
node = (fifo_node_t *) val;
|
||||||
|
|
||||||
switch_thread_rwlock_wrlock(node->rwlock);
|
switch_mutex_lock(node->update_mutex);
|
||||||
switch_mutex_lock(node->mutex);
|
switch_mutex_lock(node->mutex);
|
||||||
for (x = 0; x < MAX_PRI; x++) {
|
for (x = 0; x < MAX_PRI; x++) {
|
||||||
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
|
while (fifo_queue_pop(node->fifo_list[x], &pop, 2) == SWITCH_STATUS_SUCCESS) {
|
||||||
|
@ -4411,7 +4420,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
|
||||||
switch_mutex_unlock(node->mutex);
|
switch_mutex_unlock(node->mutex);
|
||||||
switch_core_hash_delete(globals.fifo_hash, node->name);
|
switch_core_hash_delete(globals.fifo_hash, node->name);
|
||||||
switch_core_hash_destroy(&node->consumer_hash);
|
switch_core_hash_destroy(&node->consumer_hash);
|
||||||
switch_thread_rwlock_unlock(node->rwlock);
|
switch_mutex_unlock(node->update_mutex);
|
||||||
switch_core_destroy_memory_pool(&node->pool);
|
switch_core_destroy_memory_pool(&node->pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue