git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@7929 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2008-03-19 16:03:51 +00:00
parent 391afc48e5
commit 11d03864ad
1 changed files with 102 additions and 33 deletions

View File

@ -36,6 +36,20 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL);
#define FIFO_EVENT "fifo::info" #define FIFO_EVENT "fifo::info"
#define MAX_PRI 10
struct fifo_node {
char *name;
switch_mutex_t *mutex;
switch_queue_t *fifo_list[MAX_PRI];
switch_hash_t *caller_hash;
switch_hash_t *consumer_hash;
int caller_count;
int waiting_count;
int consumer_count;
};
typedef struct fifo_node fifo_node_t;
static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen) static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen)
@ -66,10 +80,17 @@ static switch_status_t on_dtmf(switch_core_session_t *session, void *input, swit
static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data)
{ {
switch_queue_t *fifo = (switch_queue_t *) user_data; fifo_node_t *node = (fifo_node_t *) user_data;
if (switch_queue_size(fifo)) { int x = 0, total = 0;
for (x = 0; x < MAX_PRI; x++) {
total += switch_queue_size(node->fifo_list[x]);
}
if (total) {
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
@ -81,23 +102,10 @@ static struct {
} globals; } globals;
struct fifo_node {
char *name;
switch_mutex_t *mutex;
switch_queue_t *fifo;
switch_hash_t *caller_hash;
switch_hash_t *consumer_hash;
int caller_count;
int waiting_count;
int consumer_count;
};
typedef struct fifo_node fifo_node_t;
static fifo_node_t *create_node(const char *name) static fifo_node_t *create_node(const char *name)
{ {
fifo_node_t *node; fifo_node_t *node;
int x = 0;
if (!globals.running) { if (!globals.running) {
return NULL; return NULL;
@ -105,11 +113,13 @@ static fifo_node_t *create_node(const char *name)
node = switch_core_alloc(globals.pool, sizeof(*node)); node = switch_core_alloc(globals.pool, sizeof(*node));
node->name = switch_core_strdup(globals.pool, name); node->name = switch_core_strdup(globals.pool, name);
for (x = 0; x < MAX_PRI; x++) {
switch_queue_create(&node->fifo, SWITCH_CORE_QUEUE_LEN, globals.pool); switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, globals.pool);
switch_assert(node->fifo_list[x]);
}
switch_core_hash_init(&node->caller_hash, globals.pool); switch_core_hash_init(&node->caller_hash, globals.pool);
switch_core_hash_init(&node->consumer_hash, globals.pool); switch_core_hash_init(&node->consumer_hash, globals.pool);
switch_assert(node->fifo);
switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool);
switch_core_hash_insert(globals.fifo_hash, name, node); switch_core_hash_insert(globals.fifo_hash, name, node);
@ -228,6 +238,8 @@ SWITCH_STANDARD_APP(fifo_function)
if (!strcasecmp(argv[1], "in")) { if (!strcasecmp(argv[1], "in")) {
const char *uuid = strdup(switch_core_session_get_uuid(session)); const char *uuid = strdup(switch_core_session_get_uuid(session));
const char *pri;
int p = 0;
switch_channel_answer(channel); switch_channel_answer(channel);
@ -253,7 +265,17 @@ SWITCH_STANDARD_APP(fifo_function)
node->waiting_count++; node->waiting_count++;
send_presence(node); send_presence(node);
switch_core_hash_insert(node->caller_hash, uuid, session); switch_core_hash_insert(node->caller_hash, uuid, session);
switch_queue_push(node->fifo, (void *)uuid);
if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) {
p = atoi(pri);
}
if (p >= MAX_PRI) {
p = MAX_PRI - 1;
}
switch_queue_push(node->fifo_list[p], (void *)uuid);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
ts = switch_timestamp_now(); ts = switch_timestamp_now();
@ -311,6 +333,10 @@ SWITCH_STANDARD_APP(fifo_function)
int done = 0; int done = 0;
switch_core_session_t *other_session; switch_core_session_t *other_session;
switch_input_args_t args = { 0 }; switch_input_args_t args = { 0 };
const char *pop_order = NULL;
int custom_pop = 0;
int pop_array[MAX_PRI] = { 0 };
char *pop_list[MAX_PRI] = { 0 };
if (argc > 3) { if (argc > 3) {
announce = argv[3]; announce = argv[3];
@ -344,26 +370,57 @@ SWITCH_STANDARD_APP(fifo_function)
switch_channel_set_variable(channel, "fifo_status", "WAITING"); switch_channel_set_variable(channel, "fifo_status", "WAITING");
switch_channel_set_variable(channel, "fifo_timestamp", date); switch_channel_set_variable(channel, "fifo_timestamp", date);
if ((pop_order = switch_channel_get_variable(channel, "fifo_pop_order"))) {
char *tmp = switch_core_session_strdup(session, pop_order);
int x;
custom_pop = switch_separate_string(tmp, ',', pop_list, (sizeof(pop_list) / sizeof(pop_list[0])));
if (custom_pop >= MAX_PRI) {
custom_pop = MAX_PRI -1;
}
for (x = 0; x < custom_pop; x++) {
int tmp = atoi(pop_list[x]);
if (tmp > -1 && tmp < MAX_PRI) {
pop_array[x] = tmp;
}
}
}
while(switch_channel_ready(channel)) { while(switch_channel_ready(channel)) {
int x = 0 ;
if (moh) { if (moh) {
args.read_frame_callback = read_frame_callback; args.read_frame_callback = read_frame_callback;
args.user_data = node->fifo; args.user_data = node;
switch_ivr_play_file(session, NULL, moh, &args); switch_ivr_play_file(session, NULL, moh, &args);
} }
if (switch_queue_trypop(node->fifo, &pop) != SWITCH_STATUS_SUCCESS) { if (custom_pop) {
for(x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
} else {
for(x = 0; x < MAX_PRI; x++) {
if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) {
break;
}
}
}
if (!pop) {
if (nowait) { if (nowait) {
break; break;
} }
status = switch_core_session_read_frame(session, &read_frame, -1, 0); status = switch_core_session_read_frame(session, &read_frame, -1, 0);
if (!SWITCH_READ_ACCEPTABLE(status)) { if (!SWITCH_READ_ACCEPTABLE(status)) {
break; break;
} }
continue; continue;
} }
if (!pop) {
break;
}
uuid = (char *) pop; uuid = (char *) pop;
@ -615,9 +672,13 @@ SWITCH_STANDARD_API(fifo_api_function)
} else if (!strcasecmp(argv[0], "count")) { } else if (!strcasecmp(argv[0], "count")) {
if (argc < 2) { if (argc < 2) {
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
int x = 0;
switch_hash_this(hi, &var, NULL, &val); switch_hash_this(hi, &var, NULL, &val);
node = (fifo_node_t *) val; node = (fifo_node_t *) val;
len = switch_queue_size(node->fifo); len = 0;
for (x = 0 ;x < MAX_PRI; x++) {
len += switch_queue_size(node->fifo_list[x]);
}
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len);
switch_mutex_unlock(node->mutex); switch_mutex_unlock(node->mutex);
@ -629,7 +690,12 @@ SWITCH_STANDARD_API(fifo_api_function)
} }
} else { } else {
if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) {
len = switch_queue_size(node->fifo); int x = 0;
len = 0;
for (x = 0 ;x < MAX_PRI; x++) {
len += switch_queue_size(node->fifo_list[x]);
}
} }
switch_mutex_lock(node->mutex); switch_mutex_lock(node->mutex);
stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len);
@ -689,11 +755,14 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown)
globals.running = 0; globals.running = 0;
/* Cleanup*/ /* Cleanup*/
for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) {
int x = 0 ;
switch_hash_this(hi, NULL, NULL, &val); switch_hash_this(hi, NULL, NULL, &val);
node = (fifo_node_t *) val; node = (fifo_node_t *) val;
while (switch_queue_trypop(node->fifo, &pop) == SWITCH_STATUS_SUCCESS) { for (x = 0; x < MAX_PRI; x++) {
free(pop); while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) {
} free(pop);
}
}
switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->caller_hash);
switch_core_hash_destroy(&node->consumer_hash); switch_core_hash_destroy(&node->consumer_hash);
} }