Changed im.execute relay id. Removed im.route message relay. Route chat using call.route message.

git-svn-id: http://yate.null.ro/svn/yate/trunk@5618 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2013-08-13 07:54:00 +00:00
parent a8e03e577b
commit e0942e7e65
11 changed files with 183 additions and 108 deletions

View File

@ -131,6 +131,21 @@
; Defaults to no
;dump_iq=no
; message_route_external: boolean: Route (send call.route) for incoming chat to
; external domains (components or server items) serviced by us
; NOTE: A module may register a server item having a domain from our configured domains.
; In this case the domain will become external and message stanzas addressed to it
; won't be routed unless message_route_external is enabled
; If enabled and call.route fails the message stanza will be dropped or rejected
; Defaults to no
;message_route_external=no
; message_route_foreign: boolean: Route (send call.route) for incoming chat to
; foreign domains (any domain not serviced by us)
; If enabled and call.route fails the message stanza will be dropped or rejected
; Defaults to no
;message_route_foreign=no
; compression_formats: string: Comma separated list of supported compression formats
; This parameter configures the formats to be offered on incoming streams
; This parameter is not applied on reload

View File

@ -43,3 +43,16 @@
; contact_delete: string: Database query used to delete a specific contact
;contact_delete=DELETE FROM roster WHERE username='${username}' AND contact='${contact}'
; route_callto: string: Target to set when successfully handled a call.route message
; This parameter is applied on reload
;route_callto=jabber/${called}
[priorities]
; Message handlers priorities
; call.route: integer: call.route message handler priority
; Defaults to 100
; This parameter is not applied on reload
;call.route=100

View File

@ -968,8 +968,7 @@ TokenDict Module::s_messages[] = {
{ "chan.locate", Module::Locate },
{ "chan.transfer", Module::Transfer },
{ "chan.control", Module::Control },
{ "msg.route", Module::ImRoute },
{ "msg.execute", Module::ImExecute },
{ "msg.execute", Module::MsgExecute },
{ 0, 0 }
};

View File

@ -3889,8 +3889,7 @@ void ClientDriver::setup()
installRelay(Progress);
installRelay(Route,200);
installRelay(Text);
installRelay(ImRoute);
installRelay(ImExecute);
installRelay(MsgExecute);
}
// if we receive a message for an incoming call, we pass the message on
@ -3913,9 +3912,20 @@ void ClientDriver::msgTimer(Message& msg)
// Routing handler
bool ClientDriver::msgRoute(Message& msg)
{
// don't route here our own calls
if (name() == msg.getValue(YSTRING("module")))
// don't route here our own messages
if (name() == msg[YSTRING("module")])
return false;
String* routeType = msg.getParam(YSTRING("route_type"));
if (routeType) {
if (*routeType == YSTRING("msg")) {
if (!(Client::self() && Client::self()->imRouting(msg)))
return false;
msg.retValue() = name() + "/*";
return true;
}
if (*routeType != YSTRING("call"))
return Driver::msgRoute(msg);
}
if (Client::self() && Client::self()->callRouting(msg)) {
msg.retValue() = name() + "/*";
return true;
@ -3925,16 +3935,7 @@ bool ClientDriver::msgRoute(Message& msg)
bool ClientDriver::received(Message& msg, int id)
{
if (id == ImRoute) {
// don't route here our own messages
if (name() == msg.getValue(YSTRING("module")))
return false;
if (!(Client::self() && Client::self()->imRouting(msg)))
return false;
msg.retValue() = name() + "/*";
return true;
}
if (id == ImExecute || id == Text) {
if (id == MsgExecute || id == Text) {
if (Client::isClientMsg(msg))
return false;
return Client::self() && Client::self()->imExecute(msg);

View File

@ -691,7 +691,7 @@ void ForkModule::initialize()
case Text:
case Update:
case Control:
case ImExecute:
case MsgExecute:
break;
default:
Debug(this,DebugWarn,"Refusing to fork message '%s'",item->name().c_str());

View File

@ -3084,7 +3084,7 @@ void JBModule::initialize()
setup();
installRelay(Halt);
installRelay(Help);
installRelay(ImExecute);
installRelay(MsgExecute);
installRelay(EngineStart,"engine.start");
s_jabber = new YJBEngine;
s_jabber->debugChain(this);
@ -3115,7 +3115,7 @@ void JBModule::initialize()
// Message handler
bool JBModule::received(Message& msg, int id)
{
if (id == ImExecute) {
if (id == MsgExecute) {
if (isModule(msg))
return false;
String* line = getLine(msg);

View File

@ -251,7 +251,7 @@ public:
// Process 'resource.notify' messages
bool handleResNotify(Message& msg);
// Process 'msg.execute' messages
bool handleMsgExecute(Message& msg);
bool handleMsgExecute(Message& msg, const String& target);
// Process 'jabber.item' messages
bool handleJabberItem(Message& msg);
// Process 'engine.start' messages
@ -581,6 +581,8 @@ class JBModule : public Module
public:
JBModule();
virtual ~JBModule();
inline const String& prefix() const
{ return m_prefix; }
// Inherited methods
virtual void initialize();
// Cancel a given listener or all listeners if name is empty
@ -625,6 +627,7 @@ protected:
void listener(TcpListener* l, bool add);
private:
bool m_init;
String m_prefix;
ObjList m_handlers; // Message handlers list
String m_domain; // Default domain served by the jabber engine
ObjList m_streamListeners;
@ -644,6 +647,8 @@ static bool s_dumpIq = false; // Dump 'iq' xml string in jabber.iq me
static bool s_engineStarted = false; // Engine started flag
static bool s_iqAuth = true; // Allow old style auth on c2s streams
static bool s_authCluster = false; // Use user.auth message for incoming cluster streams
static bool s_msgRouteExternal = false; // Send call.route for non configured serviced domains
static bool s_msgRouteForeign = false; // Send call.route for foreign (unknown) domains
static const String s_capsNode = "http://yate.null.ro/yate/server/caps"; // Server entity caps node
static const String s_yate = "yate";
static ObjList s_clusterControlSkip; // Params to skip from chan.control when sent in cluster
@ -1679,14 +1684,17 @@ bool YJBEngine::handleResNotify(Message& msg)
}
// Process 'msg.execute' messages
bool YJBEngine::handleMsgExecute(Message& msg)
bool YJBEngine::handleMsgExecute(Message& msg, const String& target)
{
JabberID caller(msg.getValue("caller"));
JabberID called(msg.getValue("called"));
JabberID called(target);
if (!caller.resource())
caller.resource(msg.getValue("caller_instance"));
Debug(this,DebugAll,"Processing %s caller=%s called=%s",
Debug(this,called.domain() ? DebugAll : DebugNote,
"Processing %s caller=%s called=%s",
msg.c_str(),caller.c_str(),called.c_str());
if (!called.domain())
return false;
if (hasDomain(called.domain()) && !hasComponent(called.domain())) {
// RFC 3921 11.1: Broadcast chat only to clients with non-negative resource priority
bool ok = false;
@ -3348,9 +3356,14 @@ void JBPendingWorker::processChat(JBPendingJob& job)
return;
}
XMPPError::Type error = XMPPError::NoError;
bool localTarget = s_jabber->hasDomain(ev->to().domain()) &&
!s_jabber->hasComponent(ev->to().domain()) &&
!s_jabber->isServerItemDomain(ev->to().domain());
bool localTarget = s_jabber->hasDomain(ev->to().domain());
bool externalTarget = false;
if (localTarget && (s_jabber->hasComponent(ev->to().domain()) ||
s_jabber->isServerItemDomain(ev->to().domain()))) {
localTarget = false;
externalTarget = true;
}
bool foreignTarget = !(localTarget || externalTarget);
// NOTE: RFC3921bis recommends to broadcast only 'headline' messages
// for bare jid target (or target resource not found)
@ -3370,70 +3383,73 @@ void JBPendingWorker::processChat(JBPendingJob& job)
}
}
Message m("msg.route");
Message m("call.route");
while (true) {
m.addParam("route_type","msg");
__plugin.complete(m);
const char* tStr = ev->stanzaType();
m.addParam("type",tStr ? tStr : XMPPUtils::msgText(XMPPUtils::Normal));
if (localTarget)
m.addParam("localdomain",String::boolText(localTarget));
if (externalTarget)
m.addParam("externaldomain",String::boolText(externalTarget));
addValidParam(m,"id",ev->id());
m.addParam("caller",ev->from().bare());
addValidParam(m,"called",ev->to().bare());
addValidParam(m,"caller_instance",ev->from().resource());
addValidParam(m,"called_instance",ev->to().resource());
if (localTarget) {
bool ok = Engine::dispatch(m);
if (!ok || (m.retValue() == "-") || (m.retValue() == "error")) {
// Check if an 'instance.count' parameter was returned:
// the target exists
if (m.getParam("instance.count"))
if (localTarget || (externalTarget && s_msgRouteExternal) ||
(foreignTarget && s_msgRouteForeign)) {
// Directed message with offline resource: try to retrieve online resources
// for non error/groupchat type
if (localTarget && ev->to().resource() && mType != XMPPUtils::MsgError &&
mType != XMPPUtils::GroupChat)
m.addParam("fallback_online_instances",String::boolText(true));
if (!(Engine::dispatch(m) && m.retValue() &&
m.retValue() != "-" && m.retValue() != "error")) {
// See RFC3921bis 8.2.2
// Discard errors, reject with error if type is groupchat
if (mType == XMPPUtils::MsgError)
break;
if (mType == XMPPUtils::GroupChat) {
error = XMPPError::ServiceUnavailable;
break;
}
if (localTarget && m.getParam(YSTRING("instance.count")))
// instance.count present means the sender is allowed to send chat
error = XMPPError::ItemNotFound;
else
error = XMPPError::ServiceUnavailable;
break;
}
// Directed message with instance not found
m.clearParam(YSTRING("error"));
m.clearParam(YSTRING("reason"));
m.clearParam(YSTRING("handlers"));
// Clear instance.count for directed chat if confirmed
// The absence of instance.count is an indication of directed chat
if (ev->to().resource()) {
if (m.getIntValue("instance.count")) {
// Clear instance.count to signal directed message
m.clearParam("instance.count");
}
else {
// See RFC3921bis 8.2.2
// Discard errors
if (mType == XMPPUtils::MsgError)
break;
// Deny groupchat
if (mType == XMPPUtils::GroupChat) {
error = XMPPError::ServiceUnavailable;
break;
NamedString* n = m.getParam(YSTRING("instance.count"));
if (n && n->toInteger() == 1) {
NamedString* inst = m.getParam(YSTRING("instance.1"));
if (inst && *inst == ev->to().resource()) {
m.clearParam(n);
m.clearParam(inst);
}
// Broadcast all other types
m.clearParam("called_instance");
ok = Engine::dispatch(m);
if (!ok || (m.retValue() == "-") || (m.retValue() == "error")) {
// Check if an 'instance.count' parameter was returned: the target exists
if (m.getParam("instance.count"))
error = XMPPError::ItemNotFound;
else
error = XMPPError::ServiceUnavailable;
break;
}
// Add again the called_instance param to signal directed message
m.addParam("called_instance",ev->to().resource());
}
}
m.setParam("callto",m.retValue());
m.retValue().clear();
}
// Check route(s)
else
m.addParam("callto",__plugin.prefix() + ev->to().bare());
// Execute
m = "msg.execute";
// m.setParam("callto",m.retValue());
m.clearParam("error");
m.retValue().clear();
XmlElement* xml = ev->releaseXml();
addValidParam(m,"subject",XMPPUtils::subject(*xml));
addValidParam(m,"body",XMPPUtils::body(*xml));
m.addParam(new NamedPointer("xml",xml));
if (!Engine::dispatch(m))
error = XMPPError::Gone;
error = XMPPError::Gone;
break;
}
if (error == XMPPError::NoError)
@ -3972,6 +3988,7 @@ JBModule::JBModule()
m_init(false)
{
Output("Loaded module Jabber Server");
m_prefix << name() << "/";
}
JBModule::~JBModule()
@ -3996,7 +4013,7 @@ void JBModule::initialize()
setup();
installRelay(Halt);
installRelay(Help);
installRelay(ImExecute);
installRelay(MsgExecute);
installRelay(Control);
s_jabber = new YJBEngine;
s_jabber->debugChain(this);
@ -4035,6 +4052,8 @@ void JBModule::initialize()
s_s2sFeatures = cfg.getBoolValue("general","s2s_offerfeatures",true);
s_dumpIq = cfg.getBoolValue("general","dump_iq");
s_authCluster = cfg.getBoolValue("general","authcluster");
s_msgRouteExternal = cfg.getBoolValue("general","message_route_external");
s_msgRouteForeign = cfg.getBoolValue("general","message_route_foreign");
// Init the engine
s_jabber->initialize(cfg.getSection("general"),!m_init);
@ -4152,8 +4171,11 @@ bool JBModule::checkTls(bool server, const String& domain)
// Message handler
bool JBModule::received(Message& msg, int id)
{
if (id == ImExecute)
return s_jabber->handleMsgExecute(msg);
if (id == MsgExecute) {
const String& dest = msg[YSTRING("callto")];
return dest.startsWith(prefix()) &&
s_jabber->handleMsgExecute(msg,dest.substr(prefix().length()));
}
if (id == Status) {
String target = msg.getValue("module");
// Target is the module

View File

@ -733,12 +733,13 @@ bool RouteHandler::received(Message &msg)
const char *context = msg.getValue(YSTRING("context"),"default");
Lock lock(s_mutex);
if (oneContext(msg,called,context,msg.retValue())) {
Debug(DebugInfo,"Routing call to '%s' in context '%s' via '%s' in " FMT64U " usec",
called.c_str(),context,msg.retValue().c_str(),Time::now()-tmr);
Debug(DebugInfo,"Routing %s to '%s' in context '%s' via '%s' in " FMT64U " usec",
msg.getValue(YSTRING("route_type"),"call"),called.c_str(),context,
msg.retValue().c_str(),Time::now()-tmr);
return true;
}
Debug(DebugInfo,"Could not route call to '%s' in context '%s', wasted " FMT64U " usec",
called.c_str(),context,Time::now()-tmr);
Debug(DebugInfo,"Could not route %s to '%s' in context '%s', wasted " FMT64U " usec",
msg.getValue(YSTRING("route_type"),"call"),called.c_str(),context,Time::now()-tmr);
return false;
};

View File

@ -497,8 +497,8 @@ public:
bool handleUserRosterDelete(const String& user, const String& contact, Message& msg);
// Handle 'user.update' messages with operation 'delete'
void handleUserUpdateDelete(const String& user, Message& msg);
// Handle 'msg.route' messages
bool imRoute(Message& msg);
// Handle 'call.route' messages
bool imRoute(Message& msg, const String& type);
void expireSubscriptions();
// Build a database message from account and query.
// Replace query params. Return Message pointer on success
@ -518,6 +518,7 @@ public:
String m_contactSetFullQuery;
String m_contactDeleteQuery;
String m_genericUserLoadQuery;
String m_routeCallto;
UserList m_users;
Mutex m_eventsMutex;
ObjList m_events;
@ -1729,13 +1730,12 @@ SubscriptionModule::~SubscriptionModule()
void SubscriptionModule::initialize()
{
Output("Initializing module Subscriptions");
Configuration cfg(Engine::configFile("subscription"));
if (m_handlers.skipNull()) {
// Reload generic users (wait engine.start for the first load)
m_genericUsers.load();
}
else {
Configuration cfg(Engine::configFile("subscription"));
m_account = cfg.getValue("general","account");
m_userLoadQuery = cfg.getValue("general","user_roster_load");
m_userEventQuery = cfg.getValue("general","user_event_auth");
@ -1753,7 +1753,7 @@ void SubscriptionModule::initialize()
// Install relays
setup();
installRelay(Halt);
installRelay(ImRoute);
installRelay(Route,cfg.getIntValue("priorities","call.route",100));
// Install handlers
for (const TokenDict* d = s_msgHandler; d->token; d++) {
if (d->value == SubMessageHandler::CallCdr && !m_userEventQuery)
@ -1763,6 +1763,10 @@ void SubscriptionModule::initialize()
m_handlers.append(h);
}
}
Lock lck(this);
m_routeCallto = cfg.getValue("general","route_callto","jabber/${called}");
if (!m_routeCallto)
Debug(this,DebugConf,"Empty 'route_callto' in config");
}
// Enqueue a resource.notify for a given instance
@ -2788,8 +2792,8 @@ void SubscriptionModule::handleUserUpdateDelete(const String& user, Message& msg
TelEngine::destruct(queryDb(m));
}
// Handle 'msg.route' messages
bool SubscriptionModule::imRoute(Message& msg)
// Handle 'call.route' messages
bool SubscriptionModule::imRoute(Message& msg, const String& type)
{
String* caller = msg.getParam("caller");
String* called = msg.getParam("called");
@ -2799,16 +2803,30 @@ bool SubscriptionModule::imRoute(Message& msg)
msg.c_str(),caller->c_str(),called->c_str());
PresenceUser* u = m_users.getUser(*called);
if (!u) {
Debug(this,DebugStub,"%s caller=%s called=%s destination is an unknown user",
msg.c_str(),caller->c_str(),called->c_str());
XDebug(this,DebugAll,
"%s '%s' caller=%s called=%s destination is an unknown user",
msg.c_str(),type.c_str(),caller->c_str(),called->c_str());
return false;
}
bool auth = msg.getBoolValue("auth");
bool ok = true;
unsigned int n = 0;
u->lock();
String* tmp = msg.getParam("called_instance");
if (TelEngine::null(tmp)) {
bool haveInst = !TelEngine::null(tmp);
u->lock();
// An instance was given
if (haveInst) {
if (!auth || u->findContact(*caller) || *caller == *called) {
Instance* inst = u->instances().findInstance(*tmp);
if (inst)
inst->addListParam(msg,++n);
}
else
ok = false;
}
// No instance given or requested to fallback to online instances
if (ok && !n &&
(!haveInst || msg.getBoolValue(YSTRING("fallback_online_instances")))) {
String* skip = 0;
if (*caller == *called)
skip = msg.getParam("caller_instance");
@ -2817,18 +2835,22 @@ bool SubscriptionModule::imRoute(Message& msg)
if (ok)
n = u->instances().addListParam(msg,skip);
}
else if (!auth || u->findContact(*caller) || *caller == *called) {
Instance* inst = u->instances().findInstance(*tmp);
if (inst)
inst->addListParam(msg,++n);
}
else
ok = false;
u->unlock();
TelEngine::destruct(u);
if (ok)
msg.addParam("instance.count",String(n));
return ok && n != 0;
if (!ok)
return false;
msg.addParam("instance.count",String(n));
if (n) {
lock();
msg.retValue() = m_routeCallto;
unlock();
msg.replaceParams(msg.retValue());
if (!msg.retValue())
return false;
Debug(this,DebugAll,"Routing '%s' caller=%s called=%s to '%s' instances=%u",
type.c_str(),caller->c_str(),called->c_str(),msg.retValue().c_str(),n);
}
return n != 0;
}
void SubscriptionModule::expireSubscriptions()
@ -2916,8 +2938,13 @@ bool SubscriptionModule::received(Message& msg, int id)
case Timer:
s_check = true;
break;
case ImRoute:
return imRoute(msg);
case Route:
{
if (!m_routeCallto)
return false;
String* t = msg.getParam(YSTRING("route_type"));
return t && (*t == YSTRING("msg")) && imRoute(msg,*t);
}
case Halt:
Lock lock(this);
if (m_expire)

View File

@ -3869,7 +3869,7 @@ void YJGDriver::initialize()
installRelay(Route);
installRelay(Update);
installRelay(Transfer);
installRelay(ImExecute);
installRelay(MsgExecute);
installRelay(Progress);
// Install handlers
for (const TokenDict* d = s_msgHandler; d->token; d++) {
@ -4273,7 +4273,7 @@ bool YJGDriver::msgExecute(Message& msg, String& dest)
// Message handler: Disconnect channels, destroy streams, clear rosters
bool YJGDriver::received(Message& msg, int id)
{
if (id == ImExecute)
if (id == MsgExecute)
return !isModule(msg) && handleImExecute(msg);
if (id == Execute) {
// Client only: handle call.execute with target starting jabber/
@ -4567,18 +4567,16 @@ bool YJGDriver::handleImExecute(Message& msg)
// Set local (target) from callto/called parameter
JabberID local;
String* callto = msg.getParam("callto");
if (TelEngine::null(callto)) {
if (TelEngine::null(callto))
local.set(msg.getValue("called"));
if (local && !local.resource())
local.resource(msg.getValue("called_instance"));
}
else {
if (!callto->startsWith(prefix()))
return false;
else if (callto->startsWith(prefix()))
local.set(callto->substr(prefix().length()));
}
else
return false;
if (!local)
return false;
if (!local.resource())
local.resource(msg.getValue("called_instance"));
Message* m = 0;
Lock lock(this);
// Check if target is in our domain(s)

View File

@ -1414,9 +1414,8 @@ protected:
Update = 0x00020000,
Transfer = 0x00040000,
Control = 0x00080000,
// Instant messenging related
ImRoute = 0x00100000,
ImExecute = 0x00200000,
// Instant messaging related
MsgExecute = 0x00100000,
// Last possible public ID
PubLast = 0x0fffffff,
// Private messages base ID