From 0bbe7c0ad4e6a1dd03801efe6ef3ad0147ae3035 Mon Sep 17 00:00:00 2001 From: marian Date: Wed, 23 Feb 2011 12:40:40 +0000 Subject: [PATCH] Create yate cluster listener and connections. Send/receive yate messages on cluster streams. git-svn-id: http://voip.null.ro/svn/yate@4127 acf43c95-373e-0410-b603-e72c3f656dc1 --- conf.d/jabberserver.conf.sample | 6 + modules/jabber/jabberserver.cpp | 430 ++++++++++++++++++++++++++++---- 2 files changed, 391 insertions(+), 45 deletions(-) diff --git a/conf.d/jabberserver.conf.sample b/conf.d/jabberserver.conf.sample index 7957e935..e662e69e 100644 --- a/conf.d/jabberserver.conf.sample +++ b/conf.d/jabberserver.conf.sample @@ -109,6 +109,12 @@ ; Defaults to zlib if missing ;compression_formats=zlib +; authcluster: boolean: True to authenticate incoming cluster stream requests +; If enabled, an user.auth message will be enqueued to request authentication +; If disabled, cluster streams will be automatically accepted +; Defaults to disable +;authcluster=disable + ; printxml: boolean/string: Print sent/received XML data to output if debug ; level is at least 9 ; Allowed values are boolean values or 'verbose' string diff --git a/modules/jabber/jabberserver.cpp b/modules/jabber/jabberserver.cpp index 7c29ccd4..44f3a367 100644 --- a/modules/jabber/jabberserver.cpp +++ b/modules/jabber/jabberserver.cpp @@ -212,6 +212,9 @@ public: const String& remote, String& key); // Check if a domain is serviced by this engine virtual bool hasDomain(const String& domain); + // Create a cluster stream or return an existing one + JBClusterStream* getClusterStream(const String& local, const NamedList& params, + bool create); // Retrieve a serviced domain. Return a referenced object LocalDomain* findDomain(const String& domain); // Retrieve a serviced domain from an event 'to' or event stream @@ -261,6 +264,9 @@ public: // The given event is always valid and carry a valid stream // Set local domain and stream features to advertise to remote party void processStartIn(JBEvent* ev); + // Process a stream start element received by an incoming cluster stream + // The given event is always valid and carry a valid stream + void processStartInCluster(JBEvent* ev); // Process Auth events from incoming streams // The given event is always valid and carry a valid stream void processAuthIn(JBEvent* ev); @@ -270,6 +276,9 @@ public: // Process stream Running, Destroy, Terminated events // The given event is always valid and carry a valid stream void processStreamEvent(JBEvent* ev); + // Process cluster stream Running, Destroy, Terminated events + // The given event is always valid and carry a valid stream + void processStreamEventCluster(JBEvent* ev); // Process stream DbResult events // The given event is always valid and carry a valid stream void processDbResult(JBEvent* ev); @@ -341,6 +350,12 @@ public: // Update serviced domains features // This method should be called with engine unlocked void updateDomainsFeatures(); + // Build an xml from a message and sent it through cluster + bool sendCluster(Message& msg, ObjList* skipParams = 0); + // Send an xml element on all cluster streams or on a specified one + // Consume the element + // This method is thread safe + bool sendCluster(XmlElement* xml, const String& node = String::empty()); // Program name and version to be advertised on request String m_progName; String m_progVersion; @@ -450,6 +465,8 @@ private: void processChat(JBPendingJob& job); // Process iq jobs void processIq(JBPendingJob& job); + // Process iq jobs for cluster streams + void processIqCluster(JBPendingJob& job); // Reset the global index bool resetIndex(); @@ -490,6 +507,7 @@ public: UserUpdate = -4, // YJBEngine::handleUserUpdate() JabberItem = -5, // YJBEngine::handleJabberItem() EngineStart = -6, // YJBEngine::handleEngineStart() + ClusterSend = -7, JabberIq = 150, // YJBEngine::handleJabberIq() }; JBMessageHandler(int handler); @@ -595,6 +613,7 @@ protected: virtual bool commandComplete(Message& msg, const String& partLine, const String& partWord); virtual bool commandExecute(String& retVal, const String& line); + bool handleClusterControl(Message& msg); // Build a listener from a list of parameters. Add it to the list and start it bool buildListener(const String& name, NamedList& p); // Add or remove a listener to/from list @@ -619,7 +638,10 @@ static bool s_s2sFeatures = true; // Offer RFC 3920 version=1 and stream static bool s_dumpIq = false; // Dump 'iq' xml string in jabber.iq message static bool s_engineStarted = false; // Engine started flag static bool s_iqAuth = true; // Allow old style auth on c2s streams +static bool s_authCluster = false; // Use user.auth message for incoming cluster streams static const String s_capsNode = "http://yate.null.ro/yate/server/caps"; // Server entity caps node +static const String s_yate = "yate"; +static ObjList s_clusterControlSkip; // Params to skip from chan.control when sent in cluster INIT_PLUGIN(JBModule); // The module static YJBEntityCapsList s_entityCaps; static YJBEngine* s_jabber = 0; @@ -649,6 +671,7 @@ static const TokenDict s_msgHandler[] = { {"jabber.iq", JBMessageHandler::JabberIq}, {"jabber.item", JBMessageHandler::JabberItem}, {"engine.start", JBMessageHandler::EngineStart}, + {"cluster.send", JBMessageHandler::ClusterSend}, {0,0} }; @@ -773,10 +796,9 @@ static XmlElement* buildRosterItem(NamedList& list, unsigned int index) // Complete stream type static void completeStreamType(String& buf, const String& part, bool addAll = false) { - static const String t[] = {"c2s","s2s","comp", ""}; static const String all[] = {"all","*",""}; - for (const String* d = t; !d->null(); d++) - Module::itemComplete(buf,*d,part); + for (const TokenDict* d = JBStream::s_typeName; d->token; d++) + Module::itemComplete(buf,d->token,part); if (addAll) for (const String* d = all; !d->null(); d++) Module::itemComplete(buf,*d,part); @@ -805,6 +827,37 @@ static void fillStreamRemote(String& buf, JBStream& stream, const char* sep = " buf << tmp; } +// Add compression feature if available and not already compressed +static void addCompressFeature(JBStream* stream, XMPPFeatureList& features) +{ + if (!stream || stream->flag(JBStream::StreamCompressed)) + return; + String fmts = __plugin.compressFmts(); + if (fmts) + features.add(new XMPPFeatureCompress(fmts)); +} + +// Build an XML element from a list of parameters +static XmlElement* list2xml(NamedList& list, const char* name, ObjList* skip = 0) +{ + static const String s_clusterPrefix = "cluster."; + XmlElement* iq = XMPPUtils::createIq(XMPPUtils::IqSet); + XmlElement* m = XMPPUtils::createElement(s_yate,XMPPNamespace::YateCluster); + iq->addChild(m); + m->setAttributeValid("name",name); + const String& tag = XMPPUtils::s_tag[XmlTag::Item]; + NamedIterator iter(list); + const NamedString* ns = 0; + while (0 != (ns = iter.get())) { + if (skip && skip->find(ns->name())) + continue; + if (ns->name().startsWith(s_clusterPrefix,false)) + continue; + m->addChild(XmlElement::param2xml(const_cast(ns),tag)); + } + return iq; +} + /* * LocalDomain @@ -908,12 +961,12 @@ bool YJBEntityCapsList::processCaps(String& capsId, XmlElement* xml, JBStream* s bool ok = decodeCaps(*xml,version,node,ver,ext); if (ok) { JBEntityCaps::buildId(capsId,version,*node,*ver,ext); - JBEntityCapsList::requestCaps(s,from,to,capsId,version,*node,*ver); + JBEntityCapsList::requestCaps(s,from,to,capsId,version,*node,*ver); } TelEngine::destruct(s); return ok; } - + // Load the entity caps file void YJBEntityCapsList::load() { @@ -957,6 +1010,8 @@ YJBEngine::YJBEngine() m_s2sProcess = new YStreamSetProcess(this,0,"s2s/process"); m_compReceive = new YStreamSetReceive(this,0,"comp/recv"); m_compProcess = new YStreamSetProcess(this,0,"comp/process"); + m_clusterReceive = new YStreamSetReceive(this,0,"cluster/recv"); + m_clusterProcess = new YStreamSetProcess(this,0,"cluster/process"); } YJBEngine::~YJBEngine() @@ -1093,12 +1148,16 @@ void YJBEngine::processEvent(JBEvent* ev) case JBEvent::Iq: if (!ev->element()) break; - if (!routeInternal(ev)) + if (ev->clusterStream() || !routeInternal(ev)) JBPendingWorker::add(ev); break; case JBEvent::Start: - if (ev->stream()->incoming()) - processStartIn(ev); + if (ev->stream()->incoming()) { + if (!ev->clusterStream()) + processStartIn(ev); + else + processStartInCluster(ev); + } else { if (!checkDupId(ev->stream())) ev->stream()->start(); @@ -1116,7 +1175,10 @@ void YJBEngine::processEvent(JBEvent* ev) case JBEvent::Running: case JBEvent::Destroy: case JBEvent::Terminated: - processStreamEvent(ev); + if (!ev->clusterStream()) + processStreamEvent(ev); + else + processStreamEventCluster(ev); break; case JBEvent::DbResult: processDbResult(ev); @@ -1197,6 +1259,32 @@ bool YJBEngine::hasDomain(const String& domain) return 0 != findDomain(domain,true) || 0 != findDomain(domain,false); } +// Create a cluster stream or return an existing one +JBClusterStream* YJBEngine::getClusterStream(const String& remote, const NamedList& params, + bool create) +{ + if (!remote) + return 0; + if (remote == Engine::nodeName()) { + Debug(this,DebugInfo,"Request to create cluster stream to own node!"); + return 0; + } + JBClusterStream* s = findClusterStream(remote); + if (s) { + // TODO: Check ip/port: it might change. + // Destroy existing and create a new one + return s; + } + if (!create) + return 0; + if (!params.getIntValue("port")) { + Debug(this,DebugNote,"Can't create cluster stream to '%s': port is missing", + remote.c_str()); + return 0; + } + return createClusterStream(Engine::nodeName(),remote,¶ms); +} + // Retrieve a serviced domain // Returns a refferenced object LocalDomain* YJBEngine::findDomain(const String& domain) @@ -1948,16 +2036,69 @@ void YJBEngine::processStartIn(JBEvent* ev) features.add(reg); features.add(auth); // Offer compression - if (setComp && !ev->stream()->flag(JBStream::StreamCompressed)) { - String fmts = __plugin.compressFmts(); - if (fmts) - features.add(new XMPPFeatureCompress(fmts)); - } + if (setComp) + addCompressFeature(ev->stream(),features); features.add(bind); ev->releaseStream(); ev->stream()->start(&features,caps); } +// Process a stream start element received by an incoming cluster stream +// The given event is always valid and carry a valid stream +void YJBEngine::processStartInCluster(JBEvent* ev) +{ + JBClusterStream* cluster = ev->clusterStream(); + if (ev->to() != Engine::nodeName()) { + SocketAddr addr; + cluster->remoteAddr(addr); + Debug(&__plugin,DebugWarn, + "Got cluster stream from='%s' addr=%s:%d to invalid node '%s'", + ev->from().c_str(),addr.host().c_str(),addr.port(), + ev->to().c_str()); + cluster->terminate(-1,true,0,XMPPError::HostUnknown); + return; + } + if (ev->from() == Engine::nodeName()) { + SocketAddr addr; + cluster->remoteAddr(addr); + Debug(&__plugin,DebugWarn, + "Got cluster stream from addr=%s:%d with the same node", + addr.host().c_str(),addr.port()); + cluster->terminate(-1,true,0,XMPPError::BadAddressing); + return; + } + JBClusterStream* dup = findClusterStream(ev->from(),cluster); + if (dup && dup->outgoing()) { + // Higher name is the BOSS! + int cmp = XMPPUtils::cmpBytes(ev->to(),ev->from()); + if (cmp >= 0) { + dup->terminate(-1,true,0,XMPPError::Conflict); + TelEngine::destruct(dup); + } + } + if (!dup) { + if (!s_authCluster || cluster->flag(JBStream::StreamAuthenticated)) { + XMPPFeatureList features; + addCompressFeature(cluster,features); + cluster->start(&features); + } + else + Engine::enqueue(new UserAuthMessage(ev)); + return; + } + SocketAddr oldAddr; + SocketAddr newAddr; + dup->remoteAddr(oldAddr); + cluster->remoteAddr(newAddr); + int level = (oldAddr.host() == newAddr.host()) ? DebugInfo : DebugWarn; + Debug(&__plugin,level, + "Got duplicate cluster stream node='%s' addr=%s:%d existing=%s:%d", + ev->from().c_str(),newAddr.host().c_str(),newAddr.port(), + oldAddr.host().c_str(),oldAddr.port()); + TelEngine::destruct(dup); + cluster->terminate(-1,true,0,XMPPError::Conflict); +} + // Process Auth events from incoming streams // The given event is always valid and carry a valid stream void YJBEngine::processAuthIn(JBEvent* ev) @@ -2174,6 +2315,43 @@ void YJBEngine::processStreamEvent(JBEvent* ev) Engine::enqueue(m); } +// Process cluster stream Running, Destroy, Terminated events +// The given event is always valid and carry a valid stream +void YJBEngine::processStreamEventCluster(JBEvent* ev) +{ + JBClusterStream* s = ev->clusterStream(); + if (!s) + return; + s->lock(); + String node = s->remote(); + s->unlock(); + bool reg = ev->type() == JBEvent::Running; + // Check for another stream on termination: the notification may come + // from a conflicted stream + JBClusterStream* dup = reg ? 0 : findClusterStream(node,s); + Debug(this,DebugAll,"Cluster stream (%p,%s) node=%s event=%s", + s,s->name(),node.c_str(),ev->name()); + if (dup) { + TelEngine::destruct(dup); + return; + } + Message* m = __plugin.message("cluster.node"); + m->addParam("node",node); + m->addParam("registered",String::boolText(reg)); + if (reg) { + SocketAddr addr; + if (s->remoteAddr(addr)) { + m->addParam("ip_host",addr.host()); + m->addParam("ip_port",String(addr.port())); + } + } + else { + if (ev->text()) + m->addParam("error",ev->text()); + } + Engine::enqueue(m); +} + // Process stream DbResult events // The given event is always valid and carry a valid stream void YJBEngine::processDbResult(JBEvent* ev) @@ -2592,13 +2770,16 @@ Message* YJBEngine::userRegister(JBStream& stream, bool reg, const char* instanc void YJBEngine::statusParams(String& str) { lock(); - unsigned int c2s = m_c2sReceive ? m_c2sReceive->streamCount() : 0; - unsigned int s2s = m_s2sReceive ? m_s2sReceive->streamCount() : 0; - unsigned int comp = m_compReceive ? m_compReceive->streamCount() : 0; + RefPointer list[JBStream::TypeCount]; + getStreamLists(list); unlock(); - str << lookup(JBStream::c2s,JBStream::s_typeName) << "=" << c2s; - str << "," << lookup(JBStream::s2s,JBStream::s_typeName) << "=" << s2s; - str << "," << lookup(JBStream::comp,JBStream::s_typeName) << "=" << comp; + for (int i = 0; i < JBStream::TypeCount; i++) { + if (i) + str << ","; + str << lookup(i,JBStream::s_typeName) << "="; + str << (list[i] ? list[i]->streamCount() : 0); + list[i] = 0; + } } // Fill module status detail @@ -2607,7 +2788,9 @@ unsigned int YJBEngine::statusDetail(String& str, JBStream::Type t, JabberID* re XDebug(this,DebugAll,"statusDetail('%s','%s')", lookup(t,JBStream::s_typeName),TelEngine::c_safe(remote)); RefPointer list[JBStream::TypeCount]; + lock(); getStreamLists(list,t); + unlock(); str << "format=Direction|Type|Status|Local|Remote"; unsigned int n = 0; for (unsigned int i = 0; i < JBStream::TypeCount; i++) { @@ -2619,18 +2802,20 @@ unsigned int YJBEngine::statusDetail(String& str, JBStream::Type t, JabberID* re for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) { JBStream* stream = static_cast(s->get()); Lock lock(stream); - if (!remote || (i == JBStream::c2s && stream->remote().match(*remote))) { + bool handle = !remote; + if (!handle) { + if (i == JBStream::c2s || i == JBStream::cluster) + handle = stream->remote().match(*remote); + else if (i == JBStream::s2s) { + JBServerStream* s2s = stream->serverStream(); + handle = (s2s->outgoing() && s2s->remote() == *remote) || + (s2s->incoming() && s2s->hasRemoteDomain(*remote,false)); + } + } + if (handle) { n++; streamDetail(str,stream); } - else if (i == JBStream::s2s) { - JBServerStream* s2s = stream->serverStream(); - if ((s2s->outgoing() && s2s->remote() == *remote) || - (s2s->incoming() && s2s->hasRemoteDomain(*remote,false))) { - n++; - streamDetail(str,stream); - } - } } } list[i]->unlock(); @@ -2683,10 +2868,7 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre { lock(); RefPointer list = 0; - if (t == JBStream::c2s) - list = m_c2sReceive; - else if (t == JBStream::s2s) - list = m_s2sReceive; + getStreamList(list,t); unlock(); if (!list) return; @@ -2696,7 +2878,7 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) { JBStream* stream = static_cast(s->get()); Lock lock(stream); - if (t == JBStream::c2s || stream->outgoing()) + if (t == JBStream::c2s || t == JBStream::cluster || stream->outgoing()) Module::itemComplete(str,stream->remote(),partWord); else if (t == JBStream::s2s && stream->incoming()) { JBServerStream* s2s = stream->serverStream(); @@ -2716,10 +2898,11 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre // Complete stream id starting with partWord void YJBEngine::completeStreamName(String& str, const String& partWord) { + RefPointer list[JBStream::TypeCount]; lock(); - RefPointer list[2] = {m_c2sReceive,m_s2sReceive}; + getStreamLists(list); unlock(); - for (unsigned int i = 0; i < 2; i++) { + for (unsigned int i = 0; i < JBStream::TypeCount; i++) { if (!list[i]) continue; list[i]->lock(); @@ -2758,6 +2941,53 @@ void YJBEngine::updateDomainsFeatures() } } +// Build an xml from a message and sent it through cluster +bool YJBEngine::sendCluster(Message& msg, ObjList* skipParams) +{ + const char* name = msg.getValue("cluster.message"); + XmlElement* xml = list2xml(msg,name,skipParams); + return sendCluster(xml,msg["cluster.node"]); +} + +// Send an xml element on all cluster streams or on a specified one +// Consume the element +// This method is thread safe +bool YJBEngine::sendCluster(XmlElement* xml, const String& node) +{ + if (!xml) + return false; + lock(); + RefPointer list = m_clusterReceive; + unlock(); + if (!list) { + TelEngine::destruct(xml); + return 0; + } + Debug(this,DebugAll,"Sending cluster xml (%p) nodes=%s",xml,node.c_str()); + ObjList* nodes = node ? node.split(',',false) : 0; + bool ok = false; + Lock lock(list); + for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) { + JBStreamSet* set = static_cast(o->get()); + for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) { + JBClusterStream* stream = static_cast(s->get()); + if (stream->state() == JBStream::Destroy) + continue; + if (nodes) { + Lock lock(stream); + if (!nodes->find(stream->remote())) + continue; + } + XmlElement* tmp = new XmlElement(*xml); + ok = stream->sendStanza(tmp) || ok; + } + } + list = 0; + TelEngine::destruct(nodes); + TelEngine::destruct(xml); + return ok; +} + // Notify an incoming s2s stream about a dialback verify response void YJBEngine::notifyDbVerifyResult(const JabberID& local, const JabberID& remote, const String& id, XMPPError::Type rsp) @@ -2807,11 +3037,12 @@ JBPendingJob::JBPendingJob(JBEvent* ev) m_serverTarget(false), m_serverItemTarget(false) { - m_serverItemTarget = ev->to() && s_jabber->isServerItemDomain(ev->to().domain()); - Lock lock(ev->stream()); - m_local = ev->stream()->local().domain(); - m_serverTarget = !m_serverItemTarget && (!ev->to() || ev->to() == ev->stream()->local()); - lock.drop(); + if (ev->stream()->type() != JBStream::cluster) { + m_serverItemTarget = ev->to() && s_jabber->isServerItemDomain(ev->to().domain()); + Lock lock(ev->stream()); + m_local = ev->stream()->local().domain(); + m_serverTarget = !m_serverItemTarget && (!ev->to() || ev->to() == ev->stream()->local()); + } m_event->releaseStream(true); } @@ -2896,7 +3127,10 @@ void JBPendingWorker::run() processChat(*job); break; case XmlTag::Iq: - processIq(*job); + if (job->m_streamType != JBStream::cluster) + processIq(*job); + else + processIqCluster(*job); break; default: DDebug(&__plugin,DebugStub, @@ -2984,7 +3218,7 @@ bool JBPendingWorker::add(JBEvent* ev) String id(ev->from()); if (ev->stream()->type() == JBStream::s2s) id << ev->to(); - unsigned int index = id.hash() % s_threadCount; + unsigned int index = id.toLower().hash() % s_threadCount; JBPendingWorker* th = s_threads[index]; if (th) { Lock lock(th); @@ -3303,6 +3537,48 @@ void JBPendingWorker::processIq(JBPendingJob& job) job.sendIqErrorStanza(XMPPError::ServiceUnavailable); } +// Process iq jobs +void JBPendingWorker::processIqCluster(JBPendingJob& job) +{ + JBEvent* ev = job.m_event; + XmlElement* service = ev->child(); + XMPPUtils::IqType t = XMPPUtils::iqType(ev->stanzaType()); + String* xmlns = 0; + int ns = XMPPNamespace::Count; + if (service) { + xmlns = service->xmlns(); + if (xmlns) + ns = XMPPUtils::s_ns[*xmlns]; + } + Debug(&__plugin,DebugAll, + "JBPendingWorker(%u) processing cluster (%p,%s) type=%s from=%s child=(%s,%s) [%p]", + m_index,ev->element(),ev->element()->tag(),ev->stanzaType().c_str(), + ev->from().c_str(),service ? service->tag() : "", + TelEngine::c_safe(xmlns),this); + if (!service || ns != XMPPNamespace::YateCluster) + return; + if (service->unprefixedTag() == s_yate) { + if (t != XMPPUtils::IqSet) { + Debug(&__plugin,DebugStub,"processIqCluster: unhandled iq type '%s'", + ev->stanzaType().c_str()); + return; + } + const char* msg = service->attribute("name"); + if (TelEngine::null(msg)) + return; + Message* m = new Message(msg); + XmlElement::xml2param(*m,service,&(XMPPUtils::s_tag[XmlTag::Item])); + String module = m->getValue("module"); + m->setParam("module",__plugin.name()); + m->setParam("nodename",ev->from()); + m->addParam(ev->from() + ".module",module,false); + Engine::enqueue(m); + return; + } + Debug(&__plugin,DebugStub,"processIqCluster: unhandled tag '%s'", + service->unprefixedTag().c_str()); +} + // Reset the global index bool JBPendingWorker::resetIndex() { @@ -3329,6 +3605,8 @@ UserAuthMessage::UserAuthMessage(JBEvent* ev) m_stream.c_str(),m_streamType,this); __plugin.complete(*this); addParam("streamtype",ev->stream()->typeName()); + if (m_streamType == JBStream::cluster) + addParam("node",ev->from(),false); SocketAddr addr; if (ev->stream()->remoteAddr(addr)) { addParam("ip_host",addr.host()); @@ -3354,6 +3632,17 @@ void UserAuthMessage::dispatched(bool accepted) JabberID username = getValue("username"); // Use a while() to break to the end while (stream) { + if (stream->type() == JBStream::cluster) { + if (accepted) { + XMPPFeatureList features; + addCompressFeature(stream,features); + stream->start(&features); + } + else + stream->terminate(-1,true,0,XMPPError::NotAuthorized); + TelEngine::destruct(stream); + return; + } Lock lock(stream); // Returned value '-' means deny if (accepted && retValue() == "-") @@ -3441,6 +3730,8 @@ bool JBMessageHandler::received(Message& msg) case EngineStart: s_jabber->handleEngineStart(msg); return false; + case ClusterSend: + return s_jabber->sendCluster(msg); default: DDebug(&__plugin,DebugStub,"JBMessageHandler(%s) not handled!",msg.c_str()); } @@ -3582,11 +3873,17 @@ void JBModule::initialize() Configuration cfg(Engine::configFile("jabberserver")); if (!m_init) { + // Init some globals + s_clusterControlSkip.append(new String("targetid")); + s_clusterControlSkip.append(new String("component")); + s_clusterControlSkip.append(new String("operation")); + // Init module m_init = true; setup(); installRelay(Halt); installRelay(Help); installRelay(ImExecute); + installRelay(Control); s_jabber = new YJBEngine; s_jabber->debugChain(this); // Install handlers @@ -3623,6 +3920,7 @@ void JBModule::initialize() // (re)init globals s_s2sFeatures = cfg.getBoolValue("general","s2s_offerfeatures",true); s_dumpIq = cfg.getBoolValue("general","dump_iq"); + s_authCluster = cfg.getBoolValue("general","authcluster"); // Init the engine s_jabber->initialize(cfg.getSection("general"),!m_init); @@ -3800,6 +4098,11 @@ bool JBModule::received(Message& msg, int id) msg.retValue() << "Show or set the debug level for a stream.\r\n"; return true; } + if (id == Control) { + if (msg["targetid"] == "cluster") + return handleClusterControl(msg); + return Module::received(msg,id); + } if (id == Halt) { s_jabber->setExiting(); // Stop pending job workers @@ -3932,7 +4235,8 @@ bool JBModule::commandExecute(String& retVal, const String& line) JabberID remote(l); unsigned int n = 0; if (remote.valid()) - n = s_jabber->dropAll(t,JabberID::empty(),remote); + n = s_jabber->dropAll(t,JabberID::empty(),remote, + XMPPError::UndefinedCondition,"dropped"); retVal << "Dropped " << n << " stream(s)"; } else { @@ -3941,7 +4245,7 @@ bool JBModule::commandExecute(String& retVal, const String& line) n.append(l," "); JBStream* stream = s_jabber->findStream(word); if (stream) { - stream->terminate(-1,true,0,XMPPError::NoError); + stream->terminate(-1,true,0,XMPPError::UndefinedCondition,"dropped"); TelEngine::destruct(stream); retVal << "Dropped stream '" << n << "'"; } @@ -4032,6 +4336,42 @@ bool JBModule::commandExecute(String& retVal, const String& line) return true; } +// Handle chan.control with targetid=cluster +bool JBModule::handleClusterControl(Message& msg) +{ + const String& oper = msg["operation"]; + Debug(this,DebugAll,"Handling cluster control oper=%s",oper.c_str()); + // Send yate message + if (oper == "send") + return s_jabber->sendCluster(msg,&s_clusterControlSkip); + // Start/stop listener + if (oper == "listen") { + String name = msg.getValue("name","cluster"); + if (msg.getBoolValue("enable")) { + NamedList p(msg); + p.setParam("type",lookup(JBStream::cluster,JBStream::s_typeName)); + return buildListener(name,p); + } + cancelListener(name); + return false; + } + // Start/stop node connection + if (oper == "connect") { + const String& node = msg["node"]; + if (!node) + return false; + bool enable = msg.getBoolValue("enable"); + JBClusterStream* s = s_jabber->getClusterStream(node,msg,enable); + if (!s) + return false; + if (!enable) + s->terminate(-1,true,0,XMPPError::NoError,msg.getValue("reason","dropped")); + TelEngine::destruct(s); + return true; + } + return false; +} + // Build a listener from a list of parameters. Add it to the list and start it bool JBModule::buildListener(const String& name, NamedList& p) {