From b881188c1c4125bdd7176f81053d70e6cbaa843d Mon Sep 17 00:00:00 2001 From: Chris Rienzo Date: Tue, 2 Jun 2015 10:48:57 -0400 Subject: [PATCH] FS-7564 #resolve #comment [mod_rayo] Added new algorithms for offering calls to clients. Two new params added to autoload_configs/rayo.conf.xml offer-algorithm all: offer to all clients (default and old behavior) first: offer to first client, fails over to next client in list random: offer to random client, fails over to next random client offer-timeout-ms 0: disable > 0 and < 120000: time to wait for reply from offer. On timeout, next client is offered call. If no other clients available, call is rejected. 5000 is default. --- conf/rayo/autoload_configs/rayo.conf.xml | 5 + .../conf/autoload_configs/rayo.conf.xml | 5 + src/mod/event_handlers/mod_rayo/mod_rayo.c | 234 ++++++++++++++++-- 3 files changed, 228 insertions(+), 16 deletions(-) diff --git a/conf/rayo/autoload_configs/rayo.conf.xml b/conf/rayo/autoload_configs/rayo.conf.xml index 54aa388675..2ebcdbcffd 100644 --- a/conf/rayo/autoload_configs/rayo.conf.xml +++ b/conf/rayo/autoload_configs/rayo.conf.xml @@ -8,6 +8,11 @@ + + + + diff --git a/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml b/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml index 54aa388675..2ebcdbcffd 100644 --- a/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml +++ b/src/mod/event_handlers/mod_rayo/conf/autoload_configs/rayo.conf.xml @@ -8,6 +8,11 @@ + + + + diff --git a/src/mod/event_handlers/mod_rayo/mod_rayo.c b/src/mod/event_handlers/mod_rayo/mod_rayo.c index aa213ff168..28d7ae1013 100644 --- a/src/mod/event_handlers/mod_rayo/mod_rayo.c +++ b/src/mod/event_handlers/mod_rayo/mod_rayo.c @@ -1,6 +1,6 @@ /* * mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application - * Copyright (C) 2013-2014, Grasshopper + * Copyright (C) 2013-2015, Grasshopper * * Version: MPL 1.1 * @@ -61,6 +61,10 @@ SWITCH_MODULE_DEFINITION(mod_rayo, mod_rayo_load, mod_rayo_shutdown, mod_rayo_ru #define JOINED_CALL 1 #define JOINED_MIXER 2 +#define OFFER_ALL 0 +#define OFFER_FIRST 1 +#define OFFER_RANDOM 2 + struct rayo_actor; struct rayo_client; struct rayo_call; @@ -123,8 +127,12 @@ struct rayo_call { struct rayo_actor base; /** Definitive controlling party JID */ char *dcp_jid; - /** Potential controlling parties */ + /** Potential controlling parties (have sent offers to) */ switch_hash_t *pcps; + /** Available controlling parties (not sent offers to) */ + switch_hash_t *acps; + /** Number of available controlling parties */ + int num_acps; /** current idle start time */ switch_time_t idle_start_time; /** true if fax is in progress */ @@ -223,6 +231,8 @@ static struct { int num_message_threads; /** message delivery queue */ switch_queue_t *msg_queue; + /** in progress offer queue */ + switch_queue_t *offer_queue; /** shutdown flag */ int shutdown; /** prevents context shutdown until all threads are finished */ @@ -237,6 +247,10 @@ static struct { int add_variables_to_offer; /** if true, channel variables are added to answered, ringing, end events */ int add_variables_to_events; + /** How to distribute offers to clients */ + int offer_algorithm; + /** How long to wait for offer response before retrying */ + int offer_timeout_us; } globals; /** @@ -866,12 +880,13 @@ static void start_deliver_message_thread(switch_memory_pool_t *pool) } /** - * Stop all message threads + * Stop all threads */ -static void stop_deliver_message_threads(void) +static void stop_all_threads(void) { globals.shutdown = 1; switch_queue_interrupt_all(globals.msg_queue); + switch_queue_interrupt_all(globals.offer_queue); switch_thread_rwlock_wrlock(globals.shutdown_rwlock); } @@ -1219,6 +1234,7 @@ done: switch_event_destroy(&call->answer_event); } switch_core_hash_destroy(&call->pcps); + switch_core_hash_destroy(&call->acps); } /** @@ -1404,6 +1420,8 @@ static struct rayo_call *rayo_call_init(struct rayo_call *call, switch_memory_po call->rayo_app_started = 0; call->answer_event = NULL; switch_core_hash_init(&call->pcps); + switch_core_hash_init(&call->acps); + call->num_acps = 0; } switch_safe_free(call_jid); @@ -3825,6 +3843,171 @@ static int should_offer_to_client(struct rayo_client *rclient, char **offer_filt return 0; } +/** + * Offered call information + */ +struct offered_call_info { + /** Call JID */ + char *call_jid; + /** Time this offer expires */ + switch_time_t offer_time; +}; + +/** + * Deliver offer message to next available client(s) + */ +static int send_offer_to_clients(struct rayo_call *from_call, switch_core_session_t *session) +{ + int i = 0; + int selection = 0; + int sent = 0; + switch_hash_index_t *hi = NULL; + iks *offer = NULL; + + if (from_call->num_acps <= 0) { + return 0; + } + + if (globals.offer_algorithm == OFFER_RANDOM) { + /* pick client at (not really) random */ + selection = rand() % from_call->num_acps; + } else if (globals.offer_algorithm == OFFER_FIRST) { + /* send to first client */ + selection = 0; + } else { + /* send to all clients */ + selection = -1; + } + + for (hi = switch_core_hash_first(from_call->acps); hi; hi = switch_core_hash_next(&hi)) { + if (i++ == selection || selection == -1) { + const char *to_client_jid = NULL; + const void *key; + void *val; + + /* get client jid to send to */ + switch_core_hash_this(hi, &key, NULL, &val); + to_client_jid = (const char *)key; + switch_assert(to_client_jid); + + /* send offer to client, remembering jid as PCP */ + if (!offer) { + offer = rayo_create_offer(from_call, session); + } + switch_core_hash_insert(from_call->pcps, to_client_jid, "1"); + iks_insert_attrib(offer, "to", to_client_jid); + RAYO_SEND_MESSAGE_DUP(from_call, to_client_jid, offer); + + /* remove client JID from list of available clients */ + switch_core_hash_delete(from_call->acps, to_client_jid); + from_call->num_acps--; + sent = 1; + + if (selection != -1) { + break; + } + } + } + switch_safe_free(hi); + + /* queue offer information */ + if (globals.offer_timeout_us > 0 && sent) { + struct offered_call_info *offered_call; + switch_zmalloc(offered_call, sizeof(*offered_call)); + offered_call->offer_time = switch_micro_time_now(); + offered_call->call_jid = strdup(RAYO_JID(from_call)); + if (switch_queue_trypush(globals.offer_queue, offered_call) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue offered call info! Offer timeout won't work on this call\n"); + switch_safe_free(offered_call->call_jid); + switch_safe_free(offered_call); + } + } + + if (offer) { + iks_delete(offer); + } + + return sent; +} + +/** + * Thread that monitors for timed out offers + * @param thread this thread + * @param obj unused + * @return NULL + */ +static void *SWITCH_THREAD_FUNC offer_timeout_thread(switch_thread_t *thread, void *obj) +{ + struct offered_call_info *next_offer; + switch_thread_rwlock_rdlock(globals.shutdown_rwlock); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "New offer timeout thread\n"); + while (!globals.shutdown) { + if (switch_queue_pop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) { + switch_time_t now = switch_micro_time_now(); + switch_time_t offer_timeout = next_offer->offer_time + globals.offer_timeout_us; + + /* wait for timeout */ + while (offer_timeout > now && !globals.shutdown) { + switch_time_t remain = offer_timeout - now; + remain = remain > 500000 ? 500000 : remain; + switch_sleep(remain); + now = switch_micro_time_now(); + } + + /* check if offer was accepted - it is accepted if the call has a DCP (definitive controlling party) */ + if (!globals.shutdown) { + struct rayo_call *call = RAYO_CALL_LOCATE(next_offer->call_jid); + if (call) { + switch_mutex_lock(RAYO_ACTOR(call)->mutex); + if (zstr(rayo_call_get_dcp_jid(call))) { + switch_core_session_t *session = switch_core_session_locate(rayo_call_get_uuid(call)); + if (session) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, offer timeout\n", RAYO_JID(call)); + if (!send_offer_to_clients(call, session)) { + /* nobody to offer to, end call */ + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no more clients to offer, ending call\n", RAYO_JID(call)); + switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_TEMPORARY_FAILURE); + } + switch_core_session_rwunlock(session); + } + } + switch_mutex_unlock(RAYO_ACTOR(call)->mutex); + RAYO_RELEASE(call); + } + } + + switch_safe_free(next_offer->call_jid); + switch_safe_free(next_offer); + } + } + + /* clean up queue */ + while(switch_queue_trypop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) { + switch_safe_free(next_offer->call_jid); + switch_safe_free(next_offer); + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Offer timeout thread finished\n"); + switch_thread_rwlock_unlock(globals.shutdown_rwlock); + + return NULL; +} + +/** + * Create a new offer timeout thread + * @param pool to use + */ +static void start_offer_timeout_thread(switch_memory_pool_t *pool) +{ + switch_thread_t *thread; + switch_threadattr_t *thd_attr = NULL; + switch_threadattr_create(&thd_attr, pool); + switch_threadattr_detach_set(thd_attr, 1); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + switch_thread_create(&thread, thd_attr, offer_timeout_thread, NULL, pool); +} + #define RAYO_USAGE "[client username 1,client username n]" /** * Offer call and park channel @@ -3874,7 +4057,6 @@ SWITCH_STANDARD_APP(rayo_app) if (!call) { /* offer control */ switch_hash_index_t *hi = NULL; - iks *offer = NULL; char *clients_to_offer[16] = { 0 }; int clients_to_offer_count = 0; @@ -3888,7 +4070,6 @@ SWITCH_STANDARD_APP(rayo_app) switch_channel_set_variable(switch_core_session_get_channel(session), "rayo_call_jid", RAYO_JID(call)); - offer = rayo_create_offer(call, session); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Offering call for Rayo 3PCC\n"); if (!zstr(data)) { @@ -3904,7 +4085,6 @@ SWITCH_STANDARD_APP(rayo_app) } /* Offer call to all (or specified) ONLINE clients */ - /* TODO load balance offers so first session doesn't always get offer first? */ switch_mutex_lock(globals.clients_mutex); for (hi = switch_core_hash_first(globals.clients_roster); hi; hi = switch_core_hash_next(&hi)) { struct rayo_client *rclient; @@ -3914,16 +4094,15 @@ SWITCH_STANDARD_APP(rayo_app) rclient = (struct rayo_client *)val; switch_assert(rclient); - /* is session available to take call? */ + /* find clients available to take calls */ if (should_offer_to_client(rclient, clients_to_offer, clients_to_offer_count)) { - ok = 1; - switch_core_hash_insert(call->pcps, RAYO_JID(rclient), "1"); - iks_insert_attrib(offer, "to", RAYO_JID(rclient)); - RAYO_SEND_MESSAGE_DUP(call, RAYO_JID(rclient), offer); + switch_core_hash_insert(call->acps, RAYO_JID(rclient), "1"); + call->num_acps++; } } + ok = send_offer_to_clients(call, session); + switch_mutex_unlock(globals.clients_mutex); - iks_delete(offer); /* nobody to offer to */ if (!ok) { @@ -4158,6 +4337,8 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_ globals.pause_when_offline = 0; globals.add_variables_to_offer = 0; globals.add_variables_to_events = 0; + globals.offer_timeout_us = 5000000; + globals.offer_algorithm = OFFER_ALL; /* get params */ { @@ -4203,6 +4384,25 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_ globals.add_variables_to_offer = 1; globals.add_variables_to_events = 1; } + } else if (!strcasecmp(var, "offer-timeout-ms")) { + int offer_timeout_ms = 0; + if (switch_is_number(val) && (offer_timeout_ms = atoi(val)) >= 0 && offer_timeout_ms < 120000) { + globals.offer_timeout_us = offer_timeout_ms * 1000; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-timeout-ms \"%s\"\n", val); + } + } else if (!strcasecmp(var, "offer-algorithm")) { + if (zstr(val)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No value for offer-algorithm\n"); + } else if (!strcasecmp(val, "all")) { + globals.offer_algorithm = OFFER_ALL; + } else if (!strcasecmp(val, "first")) { + globals.offer_algorithm = OFFER_FIRST; + } else if (!strcasecmp(val, "random")) { + globals.offer_algorithm = OFFER_RANDOM; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-algorithm \"%s\"\n", val); + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var); } @@ -4881,6 +5081,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load) switch_core_hash_init(&globals.cmd_aliases); switch_thread_rwlock_create(&globals.shutdown_rwlock, pool); switch_queue_create(&globals.msg_queue, 25000, pool); + switch_queue_create(&globals.offer_queue, 25000, pool); globals.offline_logged = 1; /* server commands */ @@ -4930,6 +5131,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load) start_deliver_message_thread(pool); } } + start_offer_timeout_thread(pool); /* create admin client */ globals.console = rayo_console_client_create(); @@ -4979,9 +5181,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rayo_shutdown) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for XMPP threads to stop\n"); xmpp_stream_context_destroy(globals.xmpp_context); - /* stop message threads */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message threads to stop\n"); - stop_deliver_message_threads(); + /* stop threads */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message and offer timeout threads to stop\n"); + stop_all_threads(); if (globals.console) { RAYO_RELEASE(globals.console);