FS-7564 #resolve #comment [mod_rayo] Added new algorithms for offering calls to clients.

Two new params added to autoload_configs/rayo.conf.xml
     offer-algorithm
       all: offer to all clients (default and old behavior)
       first: offer to first client, fails over to next client in list
       random: offer to random client, fails over to next random client

     offer-timeout-ms
       0: disable
       > 0 and < 120000: time to wait for reply from offer.  On timeout, next client is offered call.
                         If no other clients available, call is rejected.  5000 is default.
This commit is contained in:
Chris Rienzo 2015-06-02 10:48:57 -04:00
parent 6bfa299216
commit d04e1f03ff
3 changed files with 228 additions and 16 deletions

View File

@ -8,6 +8,11 @@
<param name="mixer-conf-profile" value="sla"/>
<!-- if true, to attribute in offer uses URI instead of name/number -->
<param name="offer-uri" value="true"/>
<!-- how offers are distributed to clients (all, first, random). -->
<param name="offer-algorithm" value="all"/>
<!-- If offer is not answered after timeout, next client is offered (based on algorithm picked).
If no other clients are available, the call is rejected. Set to 0 to disable -->
<param name="offer-timeout-ms" value="5000"/>
<!-- if true, channel variables are added to rayo client offer -->
<param name="add-variables-to-offer" value="false"/>
<!-- if true, channel variables are added to offer, ringing, answered, and end events sent to rayo clients -->

View File

@ -8,6 +8,11 @@
<param name="mixer-conf-profile" value="sla"/>
<!-- if true, to attribute in offer uses URI instead of name/number -->
<param name="offer-uri" value="true"/>
<!-- how offers are distributed to clients (all, first, random). -->
<param name="offer-algorithm" value="all"/>
<!-- If offer is not answered after timeout, next client is offered (based on algorithm picked).
If no other clients are available, the call is rejected. Set to 0 to disable -->
<param name="offer-timeout-ms" value="5000"/>
<!-- if true, channel variables are added to rayo client offer -->
<param name="add-variables-to-offer" value="false"/>
<!-- if true, channel variables are added to offer, ringing, answered, and end events sent to rayo clients -->

View File

@ -1,6 +1,6 @@
/*
* mod_rayo for FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
* Copyright (C) 2013-2014, Grasshopper
* Copyright (C) 2013-2015, Grasshopper
*
* Version: MPL 1.1
*
@ -61,6 +61,10 @@ SWITCH_MODULE_DEFINITION(mod_rayo, mod_rayo_load, mod_rayo_shutdown, mod_rayo_ru
#define JOINED_CALL 1
#define JOINED_MIXER 2
#define OFFER_ALL 0
#define OFFER_FIRST 1
#define OFFER_RANDOM 2
struct rayo_actor;
struct rayo_client;
struct rayo_call;
@ -123,8 +127,12 @@ struct rayo_call {
struct rayo_actor base;
/** Definitive controlling party JID */
char *dcp_jid;
/** Potential controlling parties */
/** Potential controlling parties (have sent offers to) */
switch_hash_t *pcps;
/** Available controlling parties (not sent offers to) */
switch_hash_t *acps;
/** Number of available controlling parties */
int num_acps;
/** current idle start time */
switch_time_t idle_start_time;
/** true if fax is in progress */
@ -223,6 +231,8 @@ static struct {
int num_message_threads;
/** message delivery queue */
switch_queue_t *msg_queue;
/** in progress offer queue */
switch_queue_t *offer_queue;
/** shutdown flag */
int shutdown;
/** prevents context shutdown until all threads are finished */
@ -237,6 +247,10 @@ static struct {
int add_variables_to_offer;
/** if true, channel variables are added to answered, ringing, end events */
int add_variables_to_events;
/** How to distribute offers to clients */
int offer_algorithm;
/** How long to wait for offer response before retrying */
int offer_timeout_us;
} globals;
/**
@ -866,12 +880,13 @@ static void start_deliver_message_thread(switch_memory_pool_t *pool)
}
/**
* Stop all message threads
* Stop all threads
*/
static void stop_deliver_message_threads(void)
static void stop_all_threads(void)
{
globals.shutdown = 1;
switch_queue_interrupt_all(globals.msg_queue);
switch_queue_interrupt_all(globals.offer_queue);
switch_thread_rwlock_wrlock(globals.shutdown_rwlock);
}
@ -1219,6 +1234,7 @@ done:
switch_event_destroy(&call->answer_event);
}
switch_core_hash_destroy(&call->pcps);
switch_core_hash_destroy(&call->acps);
}
/**
@ -1404,6 +1420,8 @@ static struct rayo_call *rayo_call_init(struct rayo_call *call, switch_memory_po
call->rayo_app_started = 0;
call->answer_event = NULL;
switch_core_hash_init(&call->pcps);
switch_core_hash_init(&call->acps);
call->num_acps = 0;
}
switch_safe_free(call_jid);
@ -3825,6 +3843,171 @@ static int should_offer_to_client(struct rayo_client *rclient, char **offer_filt
return 0;
}
/**
* Offered call information
*/
struct offered_call_info {
/** Call JID */
char *call_jid;
/** Time this offer expires */
switch_time_t offer_time;
};
/**
* Deliver offer message to next available client(s)
*/
static int send_offer_to_clients(struct rayo_call *from_call, switch_core_session_t *session)
{
int i = 0;
int selection = 0;
int sent = 0;
switch_hash_index_t *hi = NULL;
iks *offer = NULL;
if (from_call->num_acps <= 0) {
return 0;
}
if (globals.offer_algorithm == OFFER_RANDOM) {
/* pick client at (not really) random */
selection = rand() % from_call->num_acps;
} else if (globals.offer_algorithm == OFFER_FIRST) {
/* send to first client */
selection = 0;
} else {
/* send to all clients */
selection = -1;
}
for (hi = switch_core_hash_first(from_call->acps); hi; hi = switch_core_hash_next(&hi)) {
if (i++ == selection || selection == -1) {
const char *to_client_jid = NULL;
const void *key;
void *val;
/* get client jid to send to */
switch_core_hash_this(hi, &key, NULL, &val);
to_client_jid = (const char *)key;
switch_assert(to_client_jid);
/* send offer to client, remembering jid as PCP */
if (!offer) {
offer = rayo_create_offer(from_call, session);
}
switch_core_hash_insert(from_call->pcps, to_client_jid, "1");
iks_insert_attrib(offer, "to", to_client_jid);
RAYO_SEND_MESSAGE_DUP(from_call, to_client_jid, offer);
/* remove client JID from list of available clients */
switch_core_hash_delete(from_call->acps, to_client_jid);
from_call->num_acps--;
sent = 1;
if (selection != -1) {
break;
}
}
}
switch_safe_free(hi);
/* queue offer information */
if (globals.offer_timeout_us > 0 && sent) {
struct offered_call_info *offered_call;
switch_zmalloc(offered_call, sizeof(*offered_call));
offered_call->offer_time = switch_micro_time_now();
offered_call->call_jid = strdup(RAYO_JID(from_call));
if (switch_queue_trypush(globals.offer_queue, offered_call) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue offered call info! Offer timeout won't work on this call\n");
switch_safe_free(offered_call->call_jid);
switch_safe_free(offered_call);
}
}
if (offer) {
iks_delete(offer);
}
return sent;
}
/**
* Thread that monitors for timed out offers
* @param thread this thread
* @param obj unused
* @return NULL
*/
static void *SWITCH_THREAD_FUNC offer_timeout_thread(switch_thread_t *thread, void *obj)
{
struct offered_call_info *next_offer;
switch_thread_rwlock_rdlock(globals.shutdown_rwlock);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "New offer timeout thread\n");
while (!globals.shutdown) {
if (switch_queue_pop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
switch_time_t now = switch_micro_time_now();
switch_time_t offer_timeout = next_offer->offer_time + globals.offer_timeout_us;
/* wait for timeout */
while (offer_timeout > now && !globals.shutdown) {
switch_time_t remain = offer_timeout - now;
remain = remain > 500000 ? 500000 : remain;
switch_sleep(remain);
now = switch_micro_time_now();
}
/* check if offer was accepted - it is accepted if the call has a DCP (definitive controlling party) */
if (!globals.shutdown) {
struct rayo_call *call = RAYO_CALL_LOCATE(next_offer->call_jid);
if (call) {
switch_mutex_lock(RAYO_ACTOR(call)->mutex);
if (zstr(rayo_call_get_dcp_jid(call))) {
switch_core_session_t *session = switch_core_session_locate(rayo_call_get_uuid(call));
if (session) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, offer timeout\n", RAYO_JID(call));
if (!send_offer_to_clients(call, session)) {
/* nobody to offer to, end call */
switch_channel_t *channel = switch_core_session_get_channel(session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s, no more clients to offer, ending call\n", RAYO_JID(call));
switch_channel_hangup(channel, SWITCH_CAUSE_NORMAL_TEMPORARY_FAILURE);
}
switch_core_session_rwunlock(session);
}
}
switch_mutex_unlock(RAYO_ACTOR(call)->mutex);
RAYO_RELEASE(call);
}
}
switch_safe_free(next_offer->call_jid);
switch_safe_free(next_offer);
}
}
/* clean up queue */
while(switch_queue_trypop(globals.offer_queue, (void *)&next_offer) == SWITCH_STATUS_SUCCESS) {
switch_safe_free(next_offer->call_jid);
switch_safe_free(next_offer);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Offer timeout thread finished\n");
switch_thread_rwlock_unlock(globals.shutdown_rwlock);
return NULL;
}
/**
* Create a new offer timeout thread
* @param pool to use
*/
static void start_offer_timeout_thread(switch_memory_pool_t *pool)
{
switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
switch_threadattr_create(&thd_attr, pool);
switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&thread, thd_attr, offer_timeout_thread, NULL, pool);
}
#define RAYO_USAGE "[client username 1,client username n]"
/**
* Offer call and park channel
@ -3874,7 +4057,6 @@ SWITCH_STANDARD_APP(rayo_app)
if (!call) {
/* offer control */
switch_hash_index_t *hi = NULL;
iks *offer = NULL;
char *clients_to_offer[16] = { 0 };
int clients_to_offer_count = 0;
@ -3888,7 +4070,6 @@ SWITCH_STANDARD_APP(rayo_app)
switch_channel_set_variable(switch_core_session_get_channel(session), "rayo_call_jid", RAYO_JID(call));
offer = rayo_create_offer(call, session);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "Offering call for Rayo 3PCC\n");
if (!zstr(data)) {
@ -3904,7 +4085,6 @@ SWITCH_STANDARD_APP(rayo_app)
}
/* Offer call to all (or specified) ONLINE clients */
/* TODO load balance offers so first session doesn't always get offer first? */
switch_mutex_lock(globals.clients_mutex);
for (hi = switch_core_hash_first(globals.clients_roster); hi; hi = switch_core_hash_next(&hi)) {
struct rayo_client *rclient;
@ -3914,16 +4094,15 @@ SWITCH_STANDARD_APP(rayo_app)
rclient = (struct rayo_client *)val;
switch_assert(rclient);
/* is session available to take call? */
/* find clients available to take calls */
if (should_offer_to_client(rclient, clients_to_offer, clients_to_offer_count)) {
ok = 1;
switch_core_hash_insert(call->pcps, RAYO_JID(rclient), "1");
iks_insert_attrib(offer, "to", RAYO_JID(rclient));
RAYO_SEND_MESSAGE_DUP(call, RAYO_JID(rclient), offer);
switch_core_hash_insert(call->acps, RAYO_JID(rclient), "1");
call->num_acps++;
}
}
ok = send_offer_to_clients(call, session);
switch_mutex_unlock(globals.clients_mutex);
iks_delete(offer);
/* nobody to offer to */
if (!ok) {
@ -4158,6 +4337,8 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
globals.pause_when_offline = 0;
globals.add_variables_to_offer = 0;
globals.add_variables_to_events = 0;
globals.offer_timeout_us = 5000000;
globals.offer_algorithm = OFFER_ALL;
/* get params */
{
@ -4203,6 +4384,25 @@ static switch_status_t do_config(switch_memory_pool_t *pool, const char *config_
globals.add_variables_to_offer = 1;
globals.add_variables_to_events = 1;
}
} else if (!strcasecmp(var, "offer-timeout-ms")) {
int offer_timeout_ms = 0;
if (switch_is_number(val) && (offer_timeout_ms = atoi(val)) >= 0 && offer_timeout_ms < 120000) {
globals.offer_timeout_us = offer_timeout_ms * 1000;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-timeout-ms \"%s\"\n", val);
}
} else if (!strcasecmp(var, "offer-algorithm")) {
if (zstr(val)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "No value for offer-algorithm\n");
} else if (!strcasecmp(val, "all")) {
globals.offer_algorithm = OFFER_ALL;
} else if (!strcasecmp(val, "first")) {
globals.offer_algorithm = OFFER_FIRST;
} else if (!strcasecmp(val, "random")) {
globals.offer_algorithm = OFFER_RANDOM;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Ignoring invalid value for offer-algorithm \"%s\"\n", val);
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
}
@ -4881,6 +5081,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
switch_core_hash_init(&globals.cmd_aliases);
switch_thread_rwlock_create(&globals.shutdown_rwlock, pool);
switch_queue_create(&globals.msg_queue, 25000, pool);
switch_queue_create(&globals.offer_queue, 25000, pool);
globals.offline_logged = 1;
/* server commands */
@ -4930,6 +5131,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_rayo_load)
start_deliver_message_thread(pool);
}
}
start_offer_timeout_thread(pool);
/* create admin client */
globals.console = rayo_console_client_create();
@ -4979,9 +5181,9 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_rayo_shutdown)
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for XMPP threads to stop\n");
xmpp_stream_context_destroy(globals.xmpp_context);
/* stop message threads */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message threads to stop\n");
stop_deliver_message_threads();
/* stop threads */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for message and offer timeout threads to stop\n");
stop_all_threads();
if (globals.console) {
RAYO_RELEASE(globals.console);