revamp fifo with a ringall strategy so you can see the stupid callers' caller id ....

This commit is contained in:
Anthony Minessale 2010-06-30 21:46:38 -05:00
parent 5b11c81731
commit 7f9beb96cc

View File

@ -39,11 +39,203 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
static switch_status_t load_config(int reload, int del_all);
#define MAX_PRI 10
typedef enum {
NODE_STRATEGY_INVALID = -1,
NODE_STRATEGY_RINGALL = 0,
NODE_STRATEGY_ENTERPRISE
} outbound_strategy_t;
static outbound_strategy_t default_strategy = NODE_STRATEGY_ENTERPRISE;
typedef struct {
int nelm;
int idx;
switch_event_t **data;
switch_memory_pool_t *pool;
switch_mutex_t *mutex;
} fifo_queue_t;
switch_status_t fifo_queue_create(fifo_queue_t **queue, int size, switch_memory_pool_t *pool)
{
fifo_queue_t *q;
q = switch_core_alloc(pool, sizeof(*q));
q->pool = pool;
q->nelm = size - 1;
q->data = switch_core_alloc(pool, size * sizeof(switch_event_t *));
switch_mutex_init(&q->mutex, SWITCH_MUTEX_NESTED, pool);
*queue = q;
return SWITCH_STATUS_SUCCESS;
}
static void change_pos(switch_event_t *event, int pos)
{
const char *uuid = switch_event_get_header(event, "unique-id");
switch_core_session_t *session;
switch_channel_t *channel;
char tmp[30] = "";
if (zstr(uuid)) return;
if (!(session = switch_core_session_locate(uuid))) {
return;
}
channel = switch_core_session_get_channel(session);
switch_snprintf(tmp, sizeof(tmp), "%d", pos);
switch_channel_set_variable(channel, "fifo_position", tmp);
switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "fifo_position", tmp);
switch_core_session_rwunlock(session);
}
static switch_status_t fifo_queue_push(fifo_queue_t *queue, switch_event_t *ptr)
{
switch_mutex_lock(queue->mutex);
if (queue->idx == queue->nelm) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
queue->data[queue->idx++] = ptr;
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
static int fifo_queue_size(fifo_queue_t *queue)
{
int s;
switch_mutex_lock(queue->mutex);
s = queue->idx;
switch_mutex_unlock(queue->mutex);
return s;
}
static switch_status_t fifo_queue_pop(fifo_queue_t *queue, switch_event_t **pop, switch_bool_t remove)
{
int i;
switch_mutex_lock(queue->mutex);
if (queue->idx == 0) {
switch_mutex_unlock(queue->mutex);
*pop = NULL;
return SWITCH_STATUS_FALSE;
}
*pop = queue->data[0];
if (remove) {
for (i = 1; i < queue->idx; i++) {
queue->data[i-1] = queue->data[i];
queue->data[i] = NULL;
change_pos(queue->data[i-1], i);
}
queue->idx--;
}
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t fifo_queue_pop_nameval(fifo_queue_t *queue, const char *name, const char *val, switch_event_t **pop, switch_bool_t remove)
{
int i, j;
switch_mutex_lock(queue->mutex);
if (queue->idx == 0 || zstr(name) || zstr(val)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
for (j = 0; j < queue->idx; j++) {
const char *j_val = switch_event_get_header(queue->data[j], name);
if (j_val && val && !strcmp(j_val, val)) {
*pop = queue->data[j];
break;
}
}
if (j == queue->idx) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
if (remove) {
for (i = j+1; i < queue->idx; i++) {
queue->data[i-1] = queue->data[i];
queue->data[i] = NULL;
change_pos(queue->data[i-1], i);
}
queue->idx--;
}
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t fifo_queue_popfly(fifo_queue_t *queue, const char *uuid)
{
int i, j;
switch_mutex_lock(queue->mutex);
if (queue->idx == 0 || zstr(uuid)) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
for (j = 0; j < queue->idx; j++) {
const char *j_uuid = switch_event_get_header(queue->data[j], "unique-id");
if (j_uuid && !strcmp(j_uuid, uuid)) break;
}
if (j == queue->idx) {
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_FALSE;
}
for (i = j+1; i < queue->idx; i++) {
queue->data[i-1] = queue->data[i];
queue->data[i] = NULL;
change_pos(queue->data[i-1], i);
}
queue->idx--;
switch_mutex_unlock(queue->mutex);
return SWITCH_STATUS_SUCCESS;
}
struct fifo_node {
char *name;
switch_mutex_t *mutex;
switch_queue_t *fifo_list[MAX_PRI];
switch_hash_t *caller_hash;
fifo_queue_t *fifo_list[MAX_PRI];
switch_hash_t *consumer_hash;
int caller_count;
int consumer_count;
@ -54,7 +246,9 @@ struct fifo_node {
switch_memory_pool_t *pool;
int has_outbound;
int ready;
int busy;
int is_static;
outbound_strategy_t outbound_strategy;
};
typedef struct fifo_node fifo_node_t;
@ -66,6 +260,33 @@ struct callback {
};
typedef struct callback callback_t;
static const char *strat_parse(outbound_strategy_t s)
{
switch (s) {
case NODE_STRATEGY_RINGALL:
return "ringall";
case NODE_STRATEGY_ENTERPRISE:
return "enterprise";
default:
break;
}
return "invalid";
}
static outbound_strategy_t parse_strat(const char *name)
{
if (!strcasecmp(name, "ringall")) {
return NODE_STRATEGY_RINGALL;
}
if (!strcasecmp(name, "enterprise")) {
return NODE_STRATEGY_ENTERPRISE;
}
return NODE_STRATEGY_INVALID;
}
static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
{
callback_t *cbt = (callback_t *) pArg;
@ -161,7 +382,7 @@ static int node_consumer_wait_count(fifo_node_t *node)
int i, len = 0;
for (i = 0; i < MAX_PRI; i++) {
len += switch_queue_size(node->fifo_list[i]);
len += fifo_queue_size(node->fifo_list[i]);
}
return len;
@ -169,24 +390,10 @@ static int node_consumer_wait_count(fifo_node_t *node)
static void node_remove_uuid(fifo_node_t *node, const char *uuid)
{
int i, len = 0, done = 0;
void *pop = NULL;
int i = 0;
for (i = 0; i < MAX_PRI; i++) {
if (!(len = switch_queue_size(node->fifo_list[i]))) {
continue;
}
while (len) {
if (switch_queue_trypop(node->fifo_list[i], &pop) == SWITCH_STATUS_SUCCESS && pop) {
if (!done && !strcmp((char *) pop, uuid)) {
free(pop);
done++;
} else {
switch_queue_push(node->fifo_list[i], pop);
}
}
len--;
}
fifo_queue_popfly(node->fifo_list[i], uuid);
}
if (!node_consumer_wait_count(node)) {
@ -265,7 +472,7 @@ static switch_status_t consumer_read_frame_callback(switch_core_session_t *sessi
break;
}
for (x = 0; x < MAX_PRI; x++) {
total += switch_queue_size(node->fifo_list[x]);
total += fifo_queue_size(node->fifo_list[x]);
}
}
@ -303,13 +510,15 @@ switch_cache_db_handle_t *fifo_get_db_handle(void)
options.odbc_options.user = globals.odbc_user;
options.odbc_options.pass = globals.odbc_pass;
if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS)
if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_ODBC, &options) != SWITCH_STATUS_SUCCESS) {
dbh = NULL;
}
return dbh;
} else {
options.core_db_options.db_path = globals.dbname;
if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS)
if (switch_cache_db_get_db_handle(&dbh, SCDB_TYPE_CORE_DB, &options) != SWITCH_STATUS_SUCCESS) {
dbh = NULL;
}
return dbh;
}
}
@ -392,13 +601,13 @@ static fifo_node_t *create_node(const char *name, uint32_t importance, switch_mu
node = switch_core_alloc(pool, sizeof(*node));
node->pool = pool;
node->outbound_strategy = default_strategy;
node->name = switch_core_strdup(node->pool, name);
for (x = 0; x < MAX_PRI; x++) {
switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool);
fifo_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, node->pool);
switch_assert(node->fifo_list[x]);
}
switch_core_hash_init(&node->caller_hash, node->pool);
switch_core_hash_init(&node->consumer_hash, node->pool);
switch_thread_rwlock_create(&node->rwlock, node->pool);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, node->pool);
@ -471,6 +680,179 @@ struct call_helper {
switch_memory_pool_t *pool;
};
#define MAX_ROWS 2048
struct callback_helper {
int need;
switch_memory_pool_t *pool;
struct call_helper *rows[MAX_ROWS];
int rowcount;
};
static void *SWITCH_THREAD_FUNC ringall_thread_run(switch_thread_t *thread, void *obj)
{
struct callback_helper *cbh = (struct callback_helper *) obj;
char *node_name;
int i = 0;
int timeout = 0;
char sql[256] = "";
switch_stream_handle_t stream = { 0 };
fifo_node_t *node = NULL;
char *originate_string;
switch_event_t *ovars = NULL;
switch_status_t status;
switch_core_session_t *session = NULL;
switch_call_cause_t cause = SWITCH_CAUSE_NONE;
char *app_name = NULL, *arg = NULL;
switch_caller_extension_t *extension = NULL;
switch_channel_t *channel;
char *cid_name = NULL, *cid_num = NULL, *id = NULL;
switch_event_t *pop = NULL;
fifo_queue_t *q = NULL;
int x = 0;
if (!cbh->rowcount) {
goto end;
}
node_name = cbh->rows[0]->node_name;
switch_mutex_lock(globals.mutex);
node = switch_core_hash_find(globals.fifo_hash, node_name);
switch_mutex_unlock(globals.mutex);
if (node) {
switch_mutex_lock(node->mutex);
node->busy = 1;
node->ring_consumer_count++;
switch_mutex_unlock(node->mutex);
} else {
goto end;
}
SWITCH_STANDARD_STREAM(stream);
switch_event_create(&ovars, SWITCH_EVENT_REQUEST_PARAMS);
switch_assert(ovars);
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
char *parsed = NULL;
switch_event_create_brackets(h->originate_string, '{', '}', ',', &ovars, &parsed);
if (!h->timeout) h->timeout = 60;
if (timeout < h->timeout) timeout = h->timeout;
stream.write_function(&stream, "[leg_timeout=%d,fifo_outbound_uuid=%s]%s,", h->timeout, h->uuid, parsed ? parsed : h->originate_string);
switch_safe_free(parsed);
switch_snprintf(sql, sizeof(sql), "update fifo_outbound set use_count=use_count+1 where uuid='%s'", h->uuid);
fifo_execute_sql(sql, globals.sql_mutex);
}
originate_string = (char *) stream.data;
if (originate_string) {
end_of(originate_string) = '\0';
}
if (!timeout) timeout = 60;
for (x = 0; x < MAX_PRI; x++) {
q = node->fifo_list[x];
switch_mutex_lock(q->mutex);
if (fifo_queue_pop_nameval(q, "variable_fifo_vip", "true", &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) {
goto found;
}
switch_mutex_unlock(q->mutex);
q = NULL;
}
if (!pop) {
for (x = 0; x < MAX_PRI; x++) {
q = node->fifo_list[x];
switch_mutex_lock(q->mutex);
if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS && pop) {
goto found;
}
}
switch_mutex_unlock(q->mutex);
q = NULL;
}
found:
if (!q) goto end;
if (!pop) {
if (q) switch_mutex_unlock(q->mutex);
goto end;
}
if ((cid_name = switch_event_get_header(pop, "caller-caller-id-name"))) {
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_name", cid_name);
}
if ((cid_num = switch_event_get_header(pop, "caller-caller-id-number"))) {
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "origination_caller_id_number", cid_num);
}
if ((id = switch_event_get_header(pop, "unique-id"))) {
switch_event_add_header_string(ovars, SWITCH_STACK_BOTTOM, "fifo_bridge_uuid", id);
}
switch_mutex_unlock(q->mutex);
status = switch_ivr_originate(NULL, &session, &cause, originate_string, timeout, NULL, NULL, NULL, NULL, ovars, SOF_NONE, NULL);
free(originate_string);
if (status != SWITCH_STATUS_SUCCESS) {
for (i = 0; i < cbh->rowcount; i++) {
struct call_helper *h = cbh->rows[i];
switch_snprintf(sql, sizeof(sql),
"update fifo_outbound set use_count=use_count-1, outbound_fail_count=outbound_fail_count+1, next_avail=%ld + lag where uuid='%s'",
(long) switch_epoch_time_now(NULL), h->uuid);
fifo_execute_sql(sql, globals.sql_mutex);
}
goto end;
}
channel = switch_core_session_get_channel(session);
switch_channel_set_variable(channel, "fifo_pop_order", NULL);
switch_core_event_hook_add_state_change(session, hanguphook);
app_name = "fifo";
arg = switch_core_session_sprintf(session, "%s out wait", node_name);
extension = switch_caller_extension_new(session, app_name, arg);
switch_caller_extension_add_application(session, extension, app_name, arg);
switch_channel_set_caller_extension(channel, extension);
switch_channel_set_state(channel, CS_EXECUTE);
switch_core_session_rwunlock(session);
end:
switch_event_destroy(&ovars);
if (node) {
switch_mutex_lock(node->mutex);
if (node->ring_consumer_count-- < 0) {
node->ring_consumer_count = 0;
}
node->busy = 0;
switch_mutex_unlock(node->mutex);
}
switch_core_destroy_memory_pool(&cbh->pool);
return NULL;
}
static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
{
struct call_helper *h = (struct call_helper *) obj;
@ -565,7 +947,29 @@ static void *SWITCH_THREAD_FUNC o_thread_run(switch_thread_t *thread, void *obj)
return NULL;
}
static int place_call_callback(void *pArg, int argc, char **argv, char **columnNames)
static int place_call_ringall_callback(void *pArg, int argc, char **argv, char **columnNames)
{
struct callback_helper *cbh = (struct callback_helper *) pArg;
struct call_helper *h;
h = switch_core_alloc(cbh->pool, sizeof(*h));
h->pool = cbh->pool;
h->uuid = switch_core_strdup(h->pool, argv[0]);
h->node_name = switch_core_strdup(h->pool, argv[1]);
h->originate_string = switch_core_strdup(h->pool, argv[2]);
h->timeout = atoi(argv[5]);
cbh->rows[cbh->rowcount++] = h;
cbh->need--;
if (cbh->rowcount == MAX_ROWS) return -1;
return cbh->need ? 0 : -1;
}
static int place_call_enterprise_callback(void *pArg, int argc, char **argv, char **columnNames)
{
int *need = (int *) pArg;
@ -596,18 +1000,50 @@ static int place_call_callback(void *pArg, int argc, char **argv, char **columnN
static void find_consumers(fifo_node_t *node)
{
int need = node_consumer_wait_count(node);
char *sql;
sql = switch_mprintf("select uuid, fifo_name, originate_string, simo_count, use_count, timeout, lag, "
"next_avail, expires, static, outbound_call_count, outbound_fail_count, hostname "
"from fifo_outbound where taking_calls = 1 and "
"(fifo_name = '%q') and (use_count < simo_count) and (next_avail = 0 or next_avail <= %ld) "
"order by next_avail", node->name, (long) switch_epoch_time_now(NULL));
switch_assert(sql);
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_callback, &need);
free(sql);
switch(node->outbound_strategy) {
case NODE_STRATEGY_ENTERPRISE:
{
int need = node_consumer_wait_count(node);
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_enterprise_callback, &need);
}
break;
case NODE_STRATEGY_RINGALL:
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
struct callback_helper *cbh;
switch_memory_pool_t *pool;
switch_core_new_memory_pool(&pool);
cbh = switch_core_alloc(pool, sizeof(*cbh));
cbh->pool = pool;
cbh->need = node_consumer_wait_count(node);
fifo_execute_sql_callback(globals.sql_mutex, sql, place_call_ringall_callback, cbh);
switch_threadattr_create(&thd_attr, cbh->pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, ringall_thread_run, cbh, cbh->pool);
}
break;
default:
break;
}
switch_safe_free(sql);
}
static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *obj)
@ -626,7 +1062,7 @@ static void *SWITCH_THREAD_FUNC node_thread_run(switch_thread_t *thread, void *o
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
switch_hash_this(hi, &var, NULL, &val);
if ((node = (fifo_node_t *) val)) {
if (node->has_outbound && node->ready) {
if (node->has_outbound && node->ready && !node->busy) {
switch_mutex_lock(node->mutex);
ppl_waiting = node_consumer_wait_count(node);
consumer_total = node->consumer_count;
@ -759,7 +1195,7 @@ static void pres_event_handler(switch_event_t *event)
static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32_t priority)
{
fifo_node_t *node;
char *str;
switch_event_t *call_event;
if (priority >= MAX_PRI) {
priority = MAX_PRI - 1;
@ -773,10 +1209,13 @@ static uint32_t fifo_add_outbound(const char *node_name, const char *url, uint32
switch_mutex_unlock(globals.mutex);
str = switch_mprintf("dial:%s", url);
switch_queue_push(node->fifo_list[priority], (void *) str);
switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
switch_event_add_header_string(call_event, SWITCH_STACK_BOTTOM, "dial-url", url);
fifo_queue_push(node->fifo_list[priority], call_event);
call_event = NULL;
return switch_queue_size(node->fifo_list[priority]);
return fifo_queue_size(node->fifo_list[priority]);
}
@ -970,6 +1409,7 @@ SWITCH_STANDARD_APP(fifo_function)
int freq = 30;
int ftmp = 0;
int to = 60;
switch_event_t *call_event;
if (orbit_exten) {
char *ot;
@ -998,8 +1438,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_mutex_lock(node->mutex);
node->caller_count++;
switch_core_hash_insert(node->caller_hash, uuid, session);
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
p = atoi(pri);
}
@ -1020,8 +1458,13 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_fire(&event);
}
switch_queue_push(node->fifo_list[p], (void *) strdup(uuid));
switch_snprintf(tmp, sizeof(tmp), "%d", switch_queue_size(node->fifo_list[p]));
switch_event_create(&call_event, SWITCH_EVENT_CHANNEL_DATA);
switch_channel_event_set_data(channel, call_event);
fifo_queue_push(node->fifo_list[p], call_event);
call_event = NULL;
switch_snprintf(tmp, sizeof(tmp), "%d", fifo_queue_size(node->fifo_list[p]));
switch_channel_set_variable(channel, "fifo_position", tmp);
if (!pri) {
@ -1136,7 +1579,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_mutex_lock(node->mutex);
node_remove_uuid(node, uuid);
node->caller_count--;
switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
@ -1154,10 +1596,9 @@ SWITCH_STANDARD_APP(fifo_function)
goto done;
} else { /* consumer */
void *pop = NULL;
switch_event_t *pop = NULL;
switch_frame_t *read_frame;
switch_status_t status;
char *uuid;
switch_core_session_t *other_session;
switch_input_args_t args = { 0 };
const char *pop_order = NULL;
@ -1173,8 +1614,10 @@ SWITCH_STANDARD_APP(fifo_function)
char buf[5] = "";
const char *strat_str = switch_channel_get_variable(channel, "fifo_strategy");
fifo_strategy_t strat = STRAT_WAITING_LONGER;
char *url = NULL;
const char *url = NULL;
const char *caller_uuid = NULL;
switch_event_t *call_event;
if (!zstr(strat_str)) {
if (!strcasecmp(strat_str, "more_ppl")) {
strat = STRAT_MORE_PPL;
@ -1247,7 +1690,7 @@ SWITCH_STANDARD_APP(fifo_function)
int x = 0, winner = -1;
switch_time_t longest = (0xFFFFFFFFFFFFFFFFULL / 2);
uint32_t importance = 0, waiting = 0, most_waiting = 0;
pop = NULL;
if (moh && do_wait) {
@ -1296,20 +1739,50 @@ SWITCH_STANDARD_APP(fifo_function)
}
if (node) {
if (custom_pop) {
const char *varval;
if ((varval = switch_channel_get_variable(channel, "fifo_bridge_uuid"))) {
for (x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) {
if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "unique-id", varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
} else {
}
if (!pop && (varval = switch_channel_get_variable(channel, "fifo_target_skill"))) {
for (x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_skill",
varval, &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
}
if (!pop) {
for (x = 0; x < MAX_PRI; x++) {
if (fifo_queue_pop_nameval(node->fifo_list[pop_array[x]], "variable_fifo_vip", "true",
&pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
}
if (!pop) {
if (custom_pop) {
for (x = 0; x < MAX_PRI; x++) {
if (fifo_queue_pop(node->fifo_list[pop_array[x]], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
} else {
for (x = 0; x < MAX_PRI; x++) {
if (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
}
}
if (pop && !node_consumer_wait_count(node)) {
switch_mutex_lock(node->mutex);
node->start_waiting = 0;
@ -1331,14 +1804,16 @@ SWITCH_STANDARD_APP(fifo_function)
continue;
}
uuid = (char *) pop;
call_event = (switch_event_t *) pop;
pop = NULL;
url = switch_event_get_header(call_event, "dial-url");
caller_uuid = switch_event_get_header(call_event, "unique-id");
if (!strncasecmp(uuid, "dial:", 5)) {
if (url) {
switch_call_cause_t cause = SWITCH_CAUSE_NONE;
const char *o_announce = NULL;
url = uuid + 5;
if ((o_announce = switch_channel_get_variable(channel, "fifo_outbound_announce"))) {
switch_ivr_play_file(session, NULL, o_announce, NULL);
}
@ -1367,12 +1842,11 @@ SWITCH_STANDARD_APP(fifo_function)
switch_event_fire(&event);
}
url = NULL;
free(uuid);
uuid = strdup(switch_core_session_get_uuid(other_session));
caller_uuid = switch_core_session_strdup(session, switch_core_session_get_uuid(other_session));
}
} else {
if ((other_session = switch_core_session_locate(uuid))) {
if ((other_session = switch_core_session_locate(caller_uuid))) {
switch_channel_t *other_channel = switch_core_session_get_channel(other_session);
if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, FIFO_EVENT) == SWITCH_STATUS_SUCCESS) {
switch_channel_event_set_data(other_channel, event);
@ -1427,7 +1901,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_mutex_lock(node->mutex);
node->caller_count--;
switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
@ -1460,7 +1933,7 @@ SWITCH_STANDARD_APP(fifo_function)
switch_time_exp_lt(&tm, ts);
switch_strftime_nocheck(date, &retsize, sizeof(date), "%Y-%m-%d %T", &tm);
switch_channel_set_variable(channel, "fifo_status", "TALKING");
switch_channel_set_variable(channel, "fifo_target", uuid);
switch_channel_set_variable(channel, "fifo_target", caller_uuid);
switch_channel_set_variable(channel, "fifo_timestamp", date);
switch_channel_set_variable(other_channel, "fifo_status", "TALKING");
@ -1512,12 +1985,13 @@ SWITCH_STANDARD_APP(fifo_function)
switch_mutex_lock(node->mutex);
node->caller_count--;
switch_core_hash_delete(node->caller_hash, uuid);
switch_mutex_unlock(node->mutex);
send_presence(node);
check_cancel(node);
switch_core_session_rwunlock(other_session);
switch_safe_free(uuid);
if (call_event) {
switch_event_destroy(&call_event);
}
if (!do_wait || !switch_channel_ready(channel)) {
break;
@ -1626,7 +2100,6 @@ SWITCH_STANDARD_APP(fifo_function)
switch_thread_rwlock_wrlock(node->rwlock);
node->ready = 0;
switch_mutex_lock(node->mutex);
switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_mutex_unlock(node->mutex);
switch_thread_rwlock_unlock(node->rwlock);
@ -1680,8 +2153,8 @@ static int xml_callback(void *pArg, int argc, char **argv, char **columnNames)
switch_xml_set_attr_d(x_out, "lag", argv[6]);
switch_xml_set_attr_d(x_out, "outbound-call-count", argv[10]);
switch_xml_set_attr_d(x_out, "outbound-fail-count", argv[11]);
switch_xml_set_attr_d(x_out, "taking-calls", argv[12]);
switch_xml_set_attr_d(x_out, "status", argv[13]);
switch_xml_set_attr_d(x_out, "taking-calls", argv[13]);
switch_xml_set_attr_d(x_out, "status", argv[14]);
switch_xml_set_attr_d(x_out, "next-available", expires);
switch_xml_set_txt_d(x_out, argv[2]);
@ -1764,6 +2237,82 @@ static int xml_hash(switch_xml_t xml, switch_hash_t *hash, char *container, char
return cc_off;
}
static int xml_caller(switch_xml_t xml, fifo_node_t *node, char *container, char *tag, int cc_off, int verbose)
{
switch_xml_t x_tmp, x_caller, x_cp, variables;
int i, x;
switch_core_session_t *session;
switch_channel_t *channel;
x_tmp = switch_xml_add_child_d(xml, container, cc_off++);
switch_assert(x_tmp);
for (x = 0; x < MAX_PRI; x++) {
fifo_queue_t *q = node->fifo_list[x];
switch_mutex_lock(q->mutex);
for (i = 0; i < q->idx; i++) {
int c_off = 0, d_off = 0;
const char *status;
const char *ts;
const char *uuid = switch_event_get_header(q->data[i], "unique-id");
if (!uuid) {
continue;
}
if (!(session = switch_core_session_locate(uuid))) {
continue;
}
channel = switch_core_session_get_channel(session);
x_caller = switch_xml_add_child_d(x_tmp, tag, c_off++);
switch_assert(x_caller);
switch_xml_set_attr_d(x_caller, "uuid", switch_core_session_get_uuid(session));
if ((status = switch_channel_get_variable(channel, "fifo_status"))) {
switch_xml_set_attr_d(x_caller, "status", status);
}
if ((ts = switch_channel_get_variable(channel, "fifo_timestamp"))) {
switch_xml_set_attr_d(x_caller, "timestamp", ts);
}
if ((ts = switch_channel_get_variable(channel, "fifo_target"))) {
switch_xml_set_attr_d(x_caller, "target", ts);
}
if ((ts = switch_channel_get_variable(channel, "fifo_position"))) {
switch_xml_set_attr_d(x_caller, "position", ts);
}
if (!(x_cp = switch_xml_add_child_d(x_caller, "caller_profile", d_off++))) {
abort();
}
if (verbose) {
d_off += switch_ivr_set_xml_profile_data(x_cp, switch_channel_get_caller_profile(channel), d_off);
if (!(variables = switch_xml_add_child_d(x_caller, "variables", c_off++))) {
abort();
}
switch_ivr_set_xml_chan_vars(variables, channel, c_off);
}
switch_core_session_rwunlock(session);
session = NULL;
}
switch_mutex_unlock(q->mutex);
}
return cc_off;
}
static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int verbose)
{
switch_xml_t x_fifo;
@ -1784,8 +2333,10 @@ static void list_node(fifo_node_t *node, switch_xml_t x_report, int *off, int ve
switch_snprintf(tmp, sizeof(buffer), "%u", node->importance);
switch_xml_set_attr_d(x_fifo, "importance", tmp);
switch_xml_set_attr_d(x_fifo, "outbound_strategy", strat_parse(node->outbound_strategy));
cc_off = xml_outbound(x_fifo, node, "outbound", "member", cc_off, verbose);
cc_off = xml_hash(x_fifo, node->caller_hash, "callers", "caller", cc_off, verbose);
cc_off = xml_caller(x_fifo, node, "callers", "caller", cc_off, verbose);
cc_off = xml_hash(x_fifo, node->consumer_hash, "consumers", "consumer", cc_off, verbose);
}
@ -1965,6 +2516,10 @@ static switch_status_t load_config(int reload, int del_all)
var = (char *) switch_xml_attr_soft(param, "name");
val = (char *) switch_xml_attr_soft(param, "value");
if (!strcasecmp(var, "outbound-strategy") && !zstr(val)) {
default_strategy = parse_strat(val);
}
if (!strcasecmp(var, "odbc-dsn") && !zstr(val)) {
if (switch_odbc_available()) {
globals.odbc_dsn = switch_core_strdup(globals.pool, val);
@ -2024,7 +2579,7 @@ static switch_status_t load_config(int reload, int del_all)
if ((fifos = switch_xml_child(cfg, "fifos"))) {
for (fifo = switch_xml_child(fifos, "fifo"); fifo; fifo = fifo->next) {
const char *name;
const char *name, *outbound_strategy;
const char *importance;
int imp = 0;
int simo_i = 1;
@ -2033,6 +2588,7 @@ static switch_status_t load_config(int reload, int del_all)
fifo_node_t *node;
name = switch_xml_attr(fifo, "name");
outbound_strategy = switch_xml_attr(fifo, "outbound_strategy");
if ((importance = switch_xml_attr(fifo, "importance"))) {
if ((imp = atoi(importance)) < 0) {
@ -2055,6 +2611,10 @@ static switch_status_t load_config(int reload, int del_all)
switch_mutex_lock(node->mutex);
if (outbound_strategy) {
node->outbound_strategy = parse_strat(outbound_strategy);
}
for (member = switch_xml_child(fifo, "member"); member; member = member->next) {
const char *simo = switch_xml_attr_soft(member, "simo");
const char *lag = switch_xml_attr_soft(member, "lag");
@ -2113,7 +2673,8 @@ static switch_status_t load_config(int reload, int del_all)
if (reload) {
switch_hash_index_t *hi;
void *val, *pop;
void *val;
switch_event_t *pop;
fifo_node_t *node;
switch_mutex_lock(globals.mutex);
top:
@ -2131,13 +2692,12 @@ static switch_status_t load_config(int reload, int del_all)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s removed.\n", node->name);
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);
@ -2361,7 +2921,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_fifo_load)
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
{
switch_hash_index_t *hi;
void *val, *pop;
void *val;
switch_event_t *pop = NULL;
fifo_node_t *node;
switch_memory_pool_t *pool = globals.pool;
switch_mutex_t *mutex = globals.mutex;
@ -2385,13 +2946,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
switch_thread_rwlock_wrlock(node->rwlock);
for (x = 0; x < MAX_PRI; x++) {
while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
while (fifo_queue_pop(node->fifo_list[x], &pop, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
switch_event_destroy(&pop);
}
}
switch_core_hash_delete(globals.fifo_hash, node->name);
switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash);
switch_thread_rwlock_unlock(node->rwlock);
switch_core_destroy_memory_pool(&node->pool);