diff --git a/conf/rayo/autoload_configs/rayo.conf.xml b/conf/rayo/autoload_configs/rayo.conf.xml
index 54aa388675..2ebcdbcffd 100644
--- a/conf/rayo/autoload_configs/rayo.conf.xml
+++ b/conf/rayo/autoload_configs/rayo.conf.xml
@@ -8,6 +8,11 @@
+
+
+
+
diff --git a/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml b/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml
index 54aa388675..2ebcdbcffd 100644
--- a/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml
+++ b/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml
@@ -8,6 +8,11 @@
+
+
+
+
diff --git a/src/mod/event_handlers/mod_rayo/mod_rayo.c b/src/mod/event_handlers/mod_rayo/mod_rayo.c
index aa213ff168..28d7ae1013 100644
--- a/src/mod/event_handlers/mod_rayo/mod_rayo.c
+++ b/src/mod/event_handlers/mod_rayo/mod_rayo.c
@@ -1,6 +1,6 @@
/*
* mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
- * Copyright (C) 2013-2014, Grasshopper
+ * Copyright (C) 2013-2015, Grasshopper
*
* Version: MPL 1.1
*
@@ -61,6 +61,10 @@ SWITCH_MODULE_DEFINITION(mod_rayo, mod_rayo_load, mod_rayo_shutdown, mod_rayo_ru
#define JOINED_CALL 1
#define JOINED_MIXER 2
+#define OFFER_ALL 0
+#define OFFER_FIRST 1
+#define OFFER_RANDOM 2
+
struct rayo_actor;
struct rayo_client;
struct rayo_call;
@@ -123,8 +127,12 @@ struct rayo_call {
struct rayo_actor base;
/** Definitive controlling party JID */
char *dcp_jid;
- /** Potential controlling parties */
+ /** Potential controlling parties (have sent offers to) */
switch_hash_t *pcps;
+ /** Available controlling parties (not sent offers to) */
+ switch_hash_t *acps;
+ /** Number of available controlling parties */
+ int num_acps;
/** current idle start time */
switch_time_t idle_start_time;
/** true if fax is in progress */
@@ -223,6 +231,8 @@ static struct {
int num_message_threads;
/** message delivery queue */
switch_queue_t *msg_queue;
+ /** in progress offer queue */
+ switch_queue_t *offer_queue;
/** shutdown flag */
int shutdown;
/** prevents context shutdown until all threads are finished */
@@ -237,6 +247,10 @@ static struct {
int add_variables_to_offer;
/** if true, channel variables are added to answered, ringing, end events */
int add_variables_to_events;
+ /** How to distribute offers to clients */
+ int offer_algorithm;
+ /** How long to wait for offer response before retrying */
+ int offer_timeout_us;
} globals;
/**
@@ -866,12 +880,13 @@ static void start_deliver_message_thread(switch_memory_pool_t *pool)
}
/**
- * Stop all message threads
+ * Stop all threads
*/
-static void stop_deliver_message_threads(void)
+static void stop_all_threads(void)
{
globals.shutdown = 1;
switch_queue_interrupt_all(globals.msg_queue);
+ switch_queue_interrupt_all(globals.offer_queue);
switch_thread_rwlock_wrlock(globals.shutdown_rwlock);
}
@@ -1219,6 +1234,7 @@ done:
switch_event_destroy(&call->answer_event);
}
switch_core_hash_destroy(&call->pcps);
+ switch_core_hash_destroy(&call->acps);
}
/**
@@ -1404,6 +1420,8 @@ static struct rayo_call *rayo_call_init(struct rayo_call *call, switch_memory_po
call->rayo_app_started = 0;
call->answer_event = NULL;
switch_core_hash_init(&call->pcps);
+ switch_core_hash_init(&call->acps);
+ call->num_acps = 0;
}
switch_safe_free(call_jid);
@@ -3825,6 +3843,171 @@ static int should_offer_to_client(struct rayo_client *rclient, char **offer_filt
return 0;
}
+/**
+ * Offered call information
+ */
+struct offered_call_info {
+ /** Call JID */
+ char *call_jid;
+ /** Time this offer expires */
+ switch_time_t offer_time;
+};
+
+/**
+ * Deliver offer message to next available client(s)
+ */
+static int send_offer_to_clients(struct rayo_call *from_call, switch_core_session_t *session)
+{
+ int i = 0;
+ int selection = 0;
+ int sent = 0;
+ switch_hash_index_t *hi = NULL;
+ iks *offer = NULL;
+
+ if (from_call->num_acps <= 0) {
+ return 0;
+ }
+
+ if (globals.offer_algorithm == OFFER_RANDOM) {
+ /* pick client at (not really) random */
+ selection = rand() % from_call->num_acps;
+ } else if (globals.offer_algorithm == OFFER_FIRST) {
+ /* send to first client */
+ selection = 0;
+ } else {
+ /* send to all clients */
+ selection = -1;
+ }
+
+ for (hi = switch_core_hash_first(from_call->acps); hi; hi = switch_core_hash_next(&hi)) {
+ if (i++ == selection || selection == -1) {
+ const char *to_client_jid = NULL;
+ const void *key;
+ void *val;
+
+ /* get client jid to send to */
+ switch_core_hash_this(hi, &key, NULL, &val);
+ to_client_jid = (const char *)key;
+ switch_assert(to_client_jid);
+
+ /* send offer to client, remembering jid as PCP */
+ if (!offer) {
+ offer = rayo_create_offer(from_call, session);
+ }
+ switch_core_hash_insert(from_call->pcps, to_client_jid, "1");
+ iks_insert_attrib(offer, "to", to_client_jid);
+ RAYO_SEND_MESSAGE_DUP(from_call, to_client_jid, offer);
+
+ /* remove client JID from list of available clients */
+ switch_core_hash_delete(from_call->acps, to_client_jid);
+ from_call->num_acps--;
+ sent = 1;
+
+ if (selection != -1) {
+ break;
+ }
+ }
+ }
+ switch_safe_free(hi);
+
+ /* queue offer information */
+ if (globals.offer_timeout_us > 0 && sent) {
+ struct offered_call_info *offered_call;
+ switch_zmalloc(offered_call, sizeof(*offered_call));
+ offered_call->offer_time = switch_micro_time_now();
+ offered_call->call_jid = strdup(RAYO_JID(from_call));
+ if (switch_queue_trypush(globals.offer_queue, offered_call) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue offered call info! Offer timeout won't work on this call\n");
+ switch_safe_free(offered_call->call_jid);
+ switch_safe_free(offered_call);
+ }
+ }
+
+ if (offer) {
+ iks_delete(offer);
+ }
+
+ return sent;
+}
+
+/**
+ * Thread that monitors for timed out offers
+ * @param thread this thread
+ * @param obj unused
+ * @return NULL
+ */
+static void *SWITCH_THREAD_FUNC offer_timeout_thread(switch_thread_t *thread, void *obj)
+{
+ struct offered_call_info *next_offer;
+ switch_thread_rwlock_rdlock(globals.shutdown_rwlock);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "New offer timeout thread\n");
+ while (!globals.shutdown) {
+ if (switch_queue_pop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
+ switch_time_t now = switch_micro_time_now();
+ switch_time_t offer_timeout = next_offer->offer_time + globals.offer_timeout_us;
+
+ /* wait for timeout */
+ while (offer_timeout > now && !globals.shutdown) {
+ switch_time_t remain = offer_timeout - now;
+ remain = remain > 500000 ? 500000 : remain;
+ switch_sleep(remain);
+ now = switch_micro_time_now();
+ }
+
+ /* check if offer was accepted - it is accepted if the call has a DCP (definitive controlling party) */
+ if (!globals.shutdown) {
+ struct rayo_call *call = RAYO_CALL_LOCATE(next_offer->call_jid);
+ if (call) {
+ switch_mutex_lock(RAYO_ACTOR(call)->mutex);
+ if (zstr(rayo_call_get_dcp_jid(call))) {
+ switch_core_session_t *session = switch_core_session_locate(rayo_call_get_uuid(call));
+ if (session) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, offer timeout\n", RAYO_JID(call));
+ if (!send_offer_to_clients(call, session)) {
+ /* nobody to offer to, end call */
+ switch_channel_t *channel = switch_core_session_get_channel(session);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no more clients to offer, ending call\n", RAYO_JID(call));
+ switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_TEMPORARY_FAILURE);
+ }
+ switch_core_session_rwunlock(session);
+ }
+ }
+ switch_mutex_unlock(RAYO_ACTOR(call)->mutex);
+ RAYO_RELEASE(call);
+ }
+ }
+
+ switch_safe_free(next_offer->call_jid);
+ switch_safe_free(next_offer);
+ }
+ }
+
+ /* clean up queue */
+ while(switch_queue_trypop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
+ switch_safe_free(next_offer->call_jid);
+ switch_safe_free(next_offer);
+ }
+
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Offer timeout thread finished\n");
+ switch_thread_rwlock_unlock(globals.shutdown_rwlock);
+
+ return NULL;
+}
+
+/**
+ * Create a new offer timeout thread
+ * @param pool to use
+ */
+static void start_offer_timeout_thread(switch_memory_pool_t *pool)
+{
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr = NULL;
+ switch_threadattr_create(&thd_attr, pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&thread, thd_attr, offer_timeout_thread, NULL, pool);
+}
+
#define RAYO_USAGE "[client username 1,client username n]"
/**
* Offer call and park channel
@@ -3874,7 +4057,6 @@ SWITCH_STANDARD_APP(rayo_app)
if (!call) {
/* offer control */
switch_hash_index_t *hi = NULL;
- iks *offer = NULL;
char *clients_to_offer[16] = { 0 };
int clients_to_offer_count = 0;
@@ -3888,7 +4070,6 @@ SWITCH_STANDARD_APP(rayo_app)
switch_channel_set_variable(switch_core_session_get_channel(session), "rayo_call_jid", RAYO_JID(call));
- offer = rayo_create_offer(call, session);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Offering call for Rayo 3PCC\n");
if (!zstr(data)) {
@@ -3904,7 +4085,6 @@ SWITCH_STANDARD_APP(rayo_app)
}
/* Offer call to all (or specified) ONLINE clients */
- /* TODO load balance offers so first session doesn't always get offer first? */
switch_mutex_lock(globals.clients_mutex);
for (hi = switch_core_hash_first(globals.clients_roster); hi; hi = switch_core_hash_next(&hi)) {
struct rayo_client *rclient;
@@ -3914,16 +4094,15 @@ SWITCH_STANDARD_APP(rayo_app)
rclient = (struct rayo_client *)val;
switch_assert(rclient);
- /* is session available to take call? */
+ /* find clients available to take calls */
if (should_offer_to_client(rclient, clients_to_offer, clients_to_offer_count)) {
- ok = 1;
- switch_core_hash_insert(call->pcps, RAYO_JID(rclient), "1");
- iks_insert_attrib(offer, "to", RAYO_JID(rclient));
- RAYO_SEND_MESSAGE_DUP(call, RAYO_JID(rclient), offer);
+ switch_core_hash_insert(call->acps, RAYO_JID(rclient), "1");
+ call->num_acps++;
}
}
+ ok = send_offer_to_clients(call, session);
+
switch_mutex_unlock(globals.clients_mutex);
- iks_delete(offer);
/* nobody to offer to */
if (!ok) {
@@ -4158,6 +4337,8 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
globals.pause_when_offline = 0;
globals.add_variables_to_offer = 0;
globals.add_variables_to_events = 0;
+ globals.offer_timeout_us = 5000000;
+ globals.offer_algorithm = OFFER_ALL;
/* get params */
{
@@ -4203,6 +4384,25 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
globals.add_variables_to_offer = 1;
globals.add_variables_to_events = 1;
}
+ } else if (!strcasecmp(var, "offer-timeout-ms")) {
+ int offer_timeout_ms = 0;
+ if (switch_is_number(val) && (offer_timeout_ms = atoi(val)) >= 0 && offer_timeout_ms < 120000) {
+ globals.offer_timeout_us = offer_timeout_ms * 1000;
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-timeout-ms \"%s\"\n", val);
+ }
+ } else if (!strcasecmp(var, "offer-algorithm")) {
+ if (zstr(val)) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No value for offer-algorithm\n");
+ } else if (!strcasecmp(val, "all")) {
+ globals.offer_algorithm = OFFER_ALL;
+ } else if (!strcasecmp(val, "first")) {
+ globals.offer_algorithm = OFFER_FIRST;
+ } else if (!strcasecmp(val, "random")) {
+ globals.offer_algorithm = OFFER_RANDOM;
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-algorithm \"%s\"\n", val);
+ }
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
}
@@ -4881,6 +5081,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
switch_core_hash_init(&globals.cmd_aliases);
switch_thread_rwlock_create(&globals.shutdown_rwlock, pool);
switch_queue_create(&globals.msg_queue, 25000, pool);
+ switch_queue_create(&globals.offer_queue, 25000, pool);
globals.offline_logged = 1;
/* server commands */
@@ -4930,6 +5131,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
start_deliver_message_thread(pool);
}
}
+ start_offer_timeout_thread(pool);
/* create admin client */
globals.console = rayo_console_client_create();
@@ -4979,9 +5181,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rayo_shutdown)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for XMPP threads to stop\n");
xmpp_stream_context_destroy(globals.xmpp_context);
- /* stop message threads */
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message threads to stop\n");
- stop_deliver_message_threads();
+ /* stop threads */
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message and offer timeout threads to stop\n");
+ stop_all_threads();
if (globals.console) {
RAYO_RELEASE(globals.console);