Create yate cluster listener and connections. Send/receive yate messages on cluster streams.

git-svn-id: http://voip.null.ro/svn/yate@4127 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2011-02-23 12:40:40 +00:00
parent bf1e35151c
commit 0bbe7c0ad4
2 changed files with 391 additions and 45 deletions

View File

@ -109,6 +109,12 @@
; Defaults to zlib if missing
;compression_formats=zlib
; authcluster: boolean: True to authenticate incoming cluster stream requests
; If enabled, an user.auth message will be enqueued to request authentication
; If disabled, cluster streams will be automatically accepted
; Defaults to disable
;authcluster=disable
; printxml: boolean/string: Print sent/received XML data to output if debug
; level is at least 9
; Allowed values are boolean values or 'verbose' string

View File

@ -212,6 +212,9 @@ public:
const String& remote, String& key);
// Check if a domain is serviced by this engine
virtual bool hasDomain(const String& domain);
// Create a cluster stream or return an existing one
JBClusterStream* getClusterStream(const String& local, const NamedList& params,
bool create);
// Retrieve a serviced domain. Return a referenced object
LocalDomain* findDomain(const String& domain);
// Retrieve a serviced domain from an event 'to' or event stream
@ -261,6 +264,9 @@ public:
// The given event is always valid and carry a valid stream
// Set local domain and stream features to advertise to remote party
void processStartIn(JBEvent* ev);
// Process a stream start element received by an incoming cluster stream
// The given event is always valid and carry a valid stream
void processStartInCluster(JBEvent* ev);
// Process Auth events from incoming streams
// The given event is always valid and carry a valid stream
void processAuthIn(JBEvent* ev);
@ -270,6 +276,9 @@ public:
// Process stream Running, Destroy, Terminated events
// The given event is always valid and carry a valid stream
void processStreamEvent(JBEvent* ev);
// Process cluster stream Running, Destroy, Terminated events
// The given event is always valid and carry a valid stream
void processStreamEventCluster(JBEvent* ev);
// Process stream DbResult events
// The given event is always valid and carry a valid stream
void processDbResult(JBEvent* ev);
@ -341,6 +350,12 @@ public:
// Update serviced domains features
// This method should be called with engine unlocked
void updateDomainsFeatures();
// Build an xml from a message and sent it through cluster
bool sendCluster(Message& msg, ObjList* skipParams = 0);
// Send an xml element on all cluster streams or on a specified one
// Consume the element
// This method is thread safe
bool sendCluster(XmlElement* xml, const String& node = String::empty());
// Program name and version to be advertised on request
String m_progName;
String m_progVersion;
@ -450,6 +465,8 @@ private:
void processChat(JBPendingJob& job);
// Process iq jobs
void processIq(JBPendingJob& job);
// Process iq jobs for cluster streams
void processIqCluster(JBPendingJob& job);
// Reset the global index
bool resetIndex();
@ -490,6 +507,7 @@ public:
UserUpdate = -4, // YJBEngine::handleUserUpdate()
JabberItem = -5, // YJBEngine::handleJabberItem()
EngineStart = -6, // YJBEngine::handleEngineStart()
ClusterSend = -7,
JabberIq = 150, // YJBEngine::handleJabberIq()
};
JBMessageHandler(int handler);
@ -595,6 +613,7 @@ protected:
virtual bool commandComplete(Message& msg, const String& partLine,
const String& partWord);
virtual bool commandExecute(String& retVal, const String& line);
bool handleClusterControl(Message& msg);
// Build a listener from a list of parameters. Add it to the list and start it
bool buildListener(const String& name, NamedList& p);
// Add or remove a listener to/from list
@ -619,7 +638,10 @@ static bool s_s2sFeatures = true; // Offer RFC 3920 version=1 and stream
static bool s_dumpIq = false; // Dump 'iq' xml string in jabber.iq message
static bool s_engineStarted = false; // Engine started flag
static bool s_iqAuth = true; // Allow old style auth on c2s streams
static bool s_authCluster = false; // Use user.auth message for incoming cluster streams
static const String s_capsNode = "http://yate.null.ro/yate/server/caps"; // Server entity caps node
static const String s_yate = "yate";
static ObjList s_clusterControlSkip; // Params to skip from chan.control when sent in cluster
INIT_PLUGIN(JBModule); // The module
static YJBEntityCapsList s_entityCaps;
static YJBEngine* s_jabber = 0;
@ -649,6 +671,7 @@ static const TokenDict s_msgHandler[] = {
{"jabber.iq", JBMessageHandler::JabberIq},
{"jabber.item", JBMessageHandler::JabberItem},
{"engine.start", JBMessageHandler::EngineStart},
{"cluster.send", JBMessageHandler::ClusterSend},
{0,0}
};
@ -773,10 +796,9 @@ static XmlElement* buildRosterItem(NamedList& list, unsigned int index)
// Complete stream type
static void completeStreamType(String& buf, const String& part, bool addAll = false)
{
static const String t[] = {"c2s","s2s","comp", ""};
static const String all[] = {"all","*",""};
for (const String* d = t; !d->null(); d++)
Module::itemComplete(buf,*d,part);
for (const TokenDict* d = JBStream::s_typeName; d->token; d++)
Module::itemComplete(buf,d->token,part);
if (addAll)
for (const String* d = all; !d->null(); d++)
Module::itemComplete(buf,*d,part);
@ -805,6 +827,37 @@ static void fillStreamRemote(String& buf, JBStream& stream, const char* sep = "
buf << tmp;
}
// Add compression feature if available and not already compressed
static void addCompressFeature(JBStream* stream, XMPPFeatureList& features)
{
if (!stream || stream->flag(JBStream::StreamCompressed))
return;
String fmts = __plugin.compressFmts();
if (fmts)
features.add(new XMPPFeatureCompress(fmts));
}
// Build an XML element from a list of parameters
static XmlElement* list2xml(NamedList& list, const char* name, ObjList* skip = 0)
{
static const String s_clusterPrefix = "cluster.";
XmlElement* iq = XMPPUtils::createIq(XMPPUtils::IqSet);
XmlElement* m = XMPPUtils::createElement(s_yate,XMPPNamespace::YateCluster);
iq->addChild(m);
m->setAttributeValid("name",name);
const String& tag = XMPPUtils::s_tag[XmlTag::Item];
NamedIterator iter(list);
const NamedString* ns = 0;
while (0 != (ns = iter.get())) {
if (skip && skip->find(ns->name()))
continue;
if (ns->name().startsWith(s_clusterPrefix,false))
continue;
m->addChild(XmlElement::param2xml(const_cast<NamedString*>(ns),tag));
}
return iq;
}
/*
* LocalDomain
@ -908,12 +961,12 @@ bool YJBEntityCapsList::processCaps(String& capsId, XmlElement* xml, JBStream* s
bool ok = decodeCaps(*xml,version,node,ver,ext);
if (ok) {
JBEntityCaps::buildId(capsId,version,*node,*ver,ext);
JBEntityCapsList::requestCaps(s,from,to,capsId,version,*node,*ver);
JBEntityCapsList::requestCaps(s,from,to,capsId,version,*node,*ver);
}
TelEngine::destruct(s);
return ok;
}
// Load the entity caps file
void YJBEntityCapsList::load()
{
@ -957,6 +1010,8 @@ YJBEngine::YJBEngine()
m_s2sProcess = new YStreamSetProcess(this,0,"s2s/process");
m_compReceive = new YStreamSetReceive(this,0,"comp/recv");
m_compProcess = new YStreamSetProcess(this,0,"comp/process");
m_clusterReceive = new YStreamSetReceive(this,0,"cluster/recv");
m_clusterProcess = new YStreamSetProcess(this,0,"cluster/process");
}
YJBEngine::~YJBEngine()
@ -1093,12 +1148,16 @@ void YJBEngine::processEvent(JBEvent* ev)
case JBEvent::Iq:
if (!ev->element())
break;
if (!routeInternal(ev))
if (ev->clusterStream() || !routeInternal(ev))
JBPendingWorker::add(ev);
break;
case JBEvent::Start:
if (ev->stream()->incoming())
processStartIn(ev);
if (ev->stream()->incoming()) {
if (!ev->clusterStream())
processStartIn(ev);
else
processStartInCluster(ev);
}
else {
if (!checkDupId(ev->stream()))
ev->stream()->start();
@ -1116,7 +1175,10 @@ void YJBEngine::processEvent(JBEvent* ev)
case JBEvent::Running:
case JBEvent::Destroy:
case JBEvent::Terminated:
processStreamEvent(ev);
if (!ev->clusterStream())
processStreamEvent(ev);
else
processStreamEventCluster(ev);
break;
case JBEvent::DbResult:
processDbResult(ev);
@ -1197,6 +1259,32 @@ bool YJBEngine::hasDomain(const String& domain)
return 0 != findDomain(domain,true) || 0 != findDomain(domain,false);
}
// Create a cluster stream or return an existing one
JBClusterStream* YJBEngine::getClusterStream(const String& remote, const NamedList& params,
bool create)
{
if (!remote)
return 0;
if (remote == Engine::nodeName()) {
Debug(this,DebugInfo,"Request to create cluster stream to own node!");
return 0;
}
JBClusterStream* s = findClusterStream(remote);
if (s) {
// TODO: Check ip/port: it might change.
// Destroy existing and create a new one
return s;
}
if (!create)
return 0;
if (!params.getIntValue("port")) {
Debug(this,DebugNote,"Can't create cluster stream to '%s': port is missing",
remote.c_str());
return 0;
}
return createClusterStream(Engine::nodeName(),remote,&params);
}
// Retrieve a serviced domain
// Returns a refferenced object
LocalDomain* YJBEngine::findDomain(const String& domain)
@ -1948,16 +2036,69 @@ void YJBEngine::processStartIn(JBEvent* ev)
features.add(reg);
features.add(auth);
// Offer compression
if (setComp && !ev->stream()->flag(JBStream::StreamCompressed)) {
String fmts = __plugin.compressFmts();
if (fmts)
features.add(new XMPPFeatureCompress(fmts));
}
if (setComp)
addCompressFeature(ev->stream(),features);
features.add(bind);
ev->releaseStream();
ev->stream()->start(&features,caps);
}
// Process a stream start element received by an incoming cluster stream
// The given event is always valid and carry a valid stream
void YJBEngine::processStartInCluster(JBEvent* ev)
{
JBClusterStream* cluster = ev->clusterStream();
if (ev->to() != Engine::nodeName()) {
SocketAddr addr;
cluster->remoteAddr(addr);
Debug(&__plugin,DebugWarn,
"Got cluster stream from='%s' addr=%s:%d to invalid node '%s'",
ev->from().c_str(),addr.host().c_str(),addr.port(),
ev->to().c_str());
cluster->terminate(-1,true,0,XMPPError::HostUnknown);
return;
}
if (ev->from() == Engine::nodeName()) {
SocketAddr addr;
cluster->remoteAddr(addr);
Debug(&__plugin,DebugWarn,
"Got cluster stream from addr=%s:%d with the same node",
addr.host().c_str(),addr.port());
cluster->terminate(-1,true,0,XMPPError::BadAddressing);
return;
}
JBClusterStream* dup = findClusterStream(ev->from(),cluster);
if (dup && dup->outgoing()) {
// Higher name is the BOSS!
int cmp = XMPPUtils::cmpBytes(ev->to(),ev->from());
if (cmp >= 0) {
dup->terminate(-1,true,0,XMPPError::Conflict);
TelEngine::destruct(dup);
}
}
if (!dup) {
if (!s_authCluster || cluster->flag(JBStream::StreamAuthenticated)) {
XMPPFeatureList features;
addCompressFeature(cluster,features);
cluster->start(&features);
}
else
Engine::enqueue(new UserAuthMessage(ev));
return;
}
SocketAddr oldAddr;
SocketAddr newAddr;
dup->remoteAddr(oldAddr);
cluster->remoteAddr(newAddr);
int level = (oldAddr.host() == newAddr.host()) ? DebugInfo : DebugWarn;
Debug(&__plugin,level,
"Got duplicate cluster stream node='%s' addr=%s:%d existing=%s:%d",
ev->from().c_str(),newAddr.host().c_str(),newAddr.port(),
oldAddr.host().c_str(),oldAddr.port());
TelEngine::destruct(dup);
cluster->terminate(-1,true,0,XMPPError::Conflict);
}
// Process Auth events from incoming streams
// The given event is always valid and carry a valid stream
void YJBEngine::processAuthIn(JBEvent* ev)
@ -2174,6 +2315,43 @@ void YJBEngine::processStreamEvent(JBEvent* ev)
Engine::enqueue(m);
}
// Process cluster stream Running, Destroy, Terminated events
// The given event is always valid and carry a valid stream
void YJBEngine::processStreamEventCluster(JBEvent* ev)
{
JBClusterStream* s = ev->clusterStream();
if (!s)
return;
s->lock();
String node = s->remote();
s->unlock();
bool reg = ev->type() == JBEvent::Running;
// Check for another stream on termination: the notification may come
// from a conflicted stream
JBClusterStream* dup = reg ? 0 : findClusterStream(node,s);
Debug(this,DebugAll,"Cluster stream (%p,%s) node=%s event=%s",
s,s->name(),node.c_str(),ev->name());
if (dup) {
TelEngine::destruct(dup);
return;
}
Message* m = __plugin.message("cluster.node");
m->addParam("node",node);
m->addParam("registered",String::boolText(reg));
if (reg) {
SocketAddr addr;
if (s->remoteAddr(addr)) {
m->addParam("ip_host",addr.host());
m->addParam("ip_port",String(addr.port()));
}
}
else {
if (ev->text())
m->addParam("error",ev->text());
}
Engine::enqueue(m);
}
// Process stream DbResult events
// The given event is always valid and carry a valid stream
void YJBEngine::processDbResult(JBEvent* ev)
@ -2592,13 +2770,16 @@ Message* YJBEngine::userRegister(JBStream& stream, bool reg, const char* instanc
void YJBEngine::statusParams(String& str)
{
lock();
unsigned int c2s = m_c2sReceive ? m_c2sReceive->streamCount() : 0;
unsigned int s2s = m_s2sReceive ? m_s2sReceive->streamCount() : 0;
unsigned int comp = m_compReceive ? m_compReceive->streamCount() : 0;
RefPointer<JBStreamSetList> list[JBStream::TypeCount];
getStreamLists(list);
unlock();
str << lookup(JBStream::c2s,JBStream::s_typeName) << "=" << c2s;
str << "," << lookup(JBStream::s2s,JBStream::s_typeName) << "=" << s2s;
str << "," << lookup(JBStream::comp,JBStream::s_typeName) << "=" << comp;
for (int i = 0; i < JBStream::TypeCount; i++) {
if (i)
str << ",";
str << lookup(i,JBStream::s_typeName) << "=";
str << (list[i] ? list[i]->streamCount() : 0);
list[i] = 0;
}
}
// Fill module status detail
@ -2607,7 +2788,9 @@ unsigned int YJBEngine::statusDetail(String& str, JBStream::Type t, JabberID* re
XDebug(this,DebugAll,"statusDetail('%s','%s')",
lookup(t,JBStream::s_typeName),TelEngine::c_safe(remote));
RefPointer<JBStreamSetList> list[JBStream::TypeCount];
lock();
getStreamLists(list,t);
unlock();
str << "format=Direction|Type|Status|Local|Remote";
unsigned int n = 0;
for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
@ -2619,18 +2802,20 @@ unsigned int YJBEngine::statusDetail(String& str, JBStream::Type t, JabberID* re
for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
JBStream* stream = static_cast<JBStream*>(s->get());
Lock lock(stream);
if (!remote || (i == JBStream::c2s && stream->remote().match(*remote))) {
bool handle = !remote;
if (!handle) {
if (i == JBStream::c2s || i == JBStream::cluster)
handle = stream->remote().match(*remote);
else if (i == JBStream::s2s) {
JBServerStream* s2s = stream->serverStream();
handle = (s2s->outgoing() && s2s->remote() == *remote) ||
(s2s->incoming() && s2s->hasRemoteDomain(*remote,false));
}
}
if (handle) {
n++;
streamDetail(str,stream);
}
else if (i == JBStream::s2s) {
JBServerStream* s2s = stream->serverStream();
if ((s2s->outgoing() && s2s->remote() == *remote) ||
(s2s->incoming() && s2s->hasRemoteDomain(*remote,false))) {
n++;
streamDetail(str,stream);
}
}
}
}
list[i]->unlock();
@ -2683,10 +2868,7 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre
{
lock();
RefPointer<JBStreamSetList> list = 0;
if (t == JBStream::c2s)
list = m_c2sReceive;
else if (t == JBStream::s2s)
list = m_s2sReceive;
getStreamList(list,t);
unlock();
if (!list)
return;
@ -2696,7 +2878,7 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre
for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
JBStream* stream = static_cast<JBStream*>(s->get());
Lock lock(stream);
if (t == JBStream::c2s || stream->outgoing())
if (t == JBStream::c2s || t == JBStream::cluster || stream->outgoing())
Module::itemComplete(str,stream->remote(),partWord);
else if (t == JBStream::s2s && stream->incoming()) {
JBServerStream* s2s = stream->serverStream();
@ -2716,10 +2898,11 @@ void YJBEngine::completeStreamRemote(String& str, const String& partWord, JBStre
// Complete stream id starting with partWord
void YJBEngine::completeStreamName(String& str, const String& partWord)
{
RefPointer<JBStreamSetList> list[JBStream::TypeCount];
lock();
RefPointer<JBStreamSetList> list[2] = {m_c2sReceive,m_s2sReceive};
getStreamLists(list);
unlock();
for (unsigned int i = 0; i < 2; i++) {
for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
if (!list[i])
continue;
list[i]->lock();
@ -2758,6 +2941,53 @@ void YJBEngine::updateDomainsFeatures()
}
}
// Build an xml from a message and sent it through cluster
bool YJBEngine::sendCluster(Message& msg, ObjList* skipParams)
{
const char* name = msg.getValue("cluster.message");
XmlElement* xml = list2xml(msg,name,skipParams);
return sendCluster(xml,msg["cluster.node"]);
}
// Send an xml element on all cluster streams or on a specified one
// Consume the element
// This method is thread safe
bool YJBEngine::sendCluster(XmlElement* xml, const String& node)
{
if (!xml)
return false;
lock();
RefPointer<JBStreamSetList> list = m_clusterReceive;
unlock();
if (!list) {
TelEngine::destruct(xml);
return 0;
}
Debug(this,DebugAll,"Sending cluster xml (%p) nodes=%s",xml,node.c_str());
ObjList* nodes = node ? node.split(',',false) : 0;
bool ok = false;
Lock lock(list);
for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
JBClusterStream* stream = static_cast<JBClusterStream*>(s->get());
if (stream->state() == JBStream::Destroy)
continue;
if (nodes) {
Lock lock(stream);
if (!nodes->find(stream->remote()))
continue;
}
XmlElement* tmp = new XmlElement(*xml);
ok = stream->sendStanza(tmp) || ok;
}
}
list = 0;
TelEngine::destruct(nodes);
TelEngine::destruct(xml);
return ok;
}
// Notify an incoming s2s stream about a dialback verify response
void YJBEngine::notifyDbVerifyResult(const JabberID& local, const JabberID& remote,
const String& id, XMPPError::Type rsp)
@ -2807,11 +3037,12 @@ JBPendingJob::JBPendingJob(JBEvent* ev)
m_serverTarget(false),
m_serverItemTarget(false)
{
m_serverItemTarget = ev->to() && s_jabber->isServerItemDomain(ev->to().domain());
Lock lock(ev->stream());
m_local = ev->stream()->local().domain();
m_serverTarget = !m_serverItemTarget && (!ev->to() || ev->to() == ev->stream()->local());
lock.drop();
if (ev->stream()->type() != JBStream::cluster) {
m_serverItemTarget = ev->to() && s_jabber->isServerItemDomain(ev->to().domain());
Lock lock(ev->stream());
m_local = ev->stream()->local().domain();
m_serverTarget = !m_serverItemTarget && (!ev->to() || ev->to() == ev->stream()->local());
}
m_event->releaseStream(true);
}
@ -2896,7 +3127,10 @@ void JBPendingWorker::run()
processChat(*job);
break;
case XmlTag::Iq:
processIq(*job);
if (job->m_streamType != JBStream::cluster)
processIq(*job);
else
processIqCluster(*job);
break;
default:
DDebug(&__plugin,DebugStub,
@ -2984,7 +3218,7 @@ bool JBPendingWorker::add(JBEvent* ev)
String id(ev->from());
if (ev->stream()->type() == JBStream::s2s)
id << ev->to();
unsigned int index = id.hash() % s_threadCount;
unsigned int index = id.toLower().hash() % s_threadCount;
JBPendingWorker* th = s_threads[index];
if (th) {
Lock lock(th);
@ -3303,6 +3537,48 @@ void JBPendingWorker::processIq(JBPendingJob& job)
job.sendIqErrorStanza(XMPPError::ServiceUnavailable);
}
// Process iq jobs
void JBPendingWorker::processIqCluster(JBPendingJob& job)
{
JBEvent* ev = job.m_event;
XmlElement* service = ev->child();
XMPPUtils::IqType t = XMPPUtils::iqType(ev->stanzaType());
String* xmlns = 0;
int ns = XMPPNamespace::Count;
if (service) {
xmlns = service->xmlns();
if (xmlns)
ns = XMPPUtils::s_ns[*xmlns];
}
Debug(&__plugin,DebugAll,
"JBPendingWorker(%u) processing cluster (%p,%s) type=%s from=%s child=(%s,%s) [%p]",
m_index,ev->element(),ev->element()->tag(),ev->stanzaType().c_str(),
ev->from().c_str(),service ? service->tag() : "",
TelEngine::c_safe(xmlns),this);
if (!service || ns != XMPPNamespace::YateCluster)
return;
if (service->unprefixedTag() == s_yate) {
if (t != XMPPUtils::IqSet) {
Debug(&__plugin,DebugStub,"processIqCluster: unhandled iq type '%s'",
ev->stanzaType().c_str());
return;
}
const char* msg = service->attribute("name");
if (TelEngine::null(msg))
return;
Message* m = new Message(msg);
XmlElement::xml2param(*m,service,&(XMPPUtils::s_tag[XmlTag::Item]));
String module = m->getValue("module");
m->setParam("module",__plugin.name());
m->setParam("nodename",ev->from());
m->addParam(ev->from() + ".module",module,false);
Engine::enqueue(m);
return;
}
Debug(&__plugin,DebugStub,"processIqCluster: unhandled tag '%s'",
service->unprefixedTag().c_str());
}
// Reset the global index
bool JBPendingWorker::resetIndex()
{
@ -3329,6 +3605,8 @@ UserAuthMessage::UserAuthMessage(JBEvent* ev)
m_stream.c_str(),m_streamType,this);
__plugin.complete(*this);
addParam("streamtype",ev->stream()->typeName());
if (m_streamType == JBStream::cluster)
addParam("node",ev->from(),false);
SocketAddr addr;
if (ev->stream()->remoteAddr(addr)) {
addParam("ip_host",addr.host());
@ -3354,6 +3632,17 @@ void UserAuthMessage::dispatched(bool accepted)
JabberID username = getValue("username");
// Use a while() to break to the end
while (stream) {
if (stream->type() == JBStream::cluster) {
if (accepted) {
XMPPFeatureList features;
addCompressFeature(stream,features);
stream->start(&features);
}
else
stream->terminate(-1,true,0,XMPPError::NotAuthorized);
TelEngine::destruct(stream);
return;
}
Lock lock(stream);
// Returned value '-' means deny
if (accepted && retValue() == "-")
@ -3441,6 +3730,8 @@ bool JBMessageHandler::received(Message& msg)
case EngineStart:
s_jabber->handleEngineStart(msg);
return false;
case ClusterSend:
return s_jabber->sendCluster(msg);
default:
DDebug(&__plugin,DebugStub,"JBMessageHandler(%s) not handled!",msg.c_str());
}
@ -3582,11 +3873,17 @@ void JBModule::initialize()
Configuration cfg(Engine::configFile("jabberserver"));
if (!m_init) {
// Init some globals
s_clusterControlSkip.append(new String("targetid"));
s_clusterControlSkip.append(new String("component"));
s_clusterControlSkip.append(new String("operation"));
// Init module
m_init = true;
setup();
installRelay(Halt);
installRelay(Help);
installRelay(ImExecute);
installRelay(Control);
s_jabber = new YJBEngine;
s_jabber->debugChain(this);
// Install handlers
@ -3623,6 +3920,7 @@ void JBModule::initialize()
// (re)init globals
s_s2sFeatures = cfg.getBoolValue("general","s2s_offerfeatures",true);
s_dumpIq = cfg.getBoolValue("general","dump_iq");
s_authCluster = cfg.getBoolValue("general","authcluster");
// Init the engine
s_jabber->initialize(cfg.getSection("general"),!m_init);
@ -3800,6 +4098,11 @@ bool JBModule::received(Message& msg, int id)
msg.retValue() << "Show or set the debug level for a stream.\r\n";
return true;
}
if (id == Control) {
if (msg["targetid"] == "cluster")
return handleClusterControl(msg);
return Module::received(msg,id);
}
if (id == Halt) {
s_jabber->setExiting();
// Stop pending job workers
@ -3932,7 +4235,8 @@ bool JBModule::commandExecute(String& retVal, const String& line)
JabberID remote(l);
unsigned int n = 0;
if (remote.valid())
n = s_jabber->dropAll(t,JabberID::empty(),remote);
n = s_jabber->dropAll(t,JabberID::empty(),remote,
XMPPError::UndefinedCondition,"dropped");
retVal << "Dropped " << n << " stream(s)";
}
else {
@ -3941,7 +4245,7 @@ bool JBModule::commandExecute(String& retVal, const String& line)
n.append(l," ");
JBStream* stream = s_jabber->findStream(word);
if (stream) {
stream->terminate(-1,true,0,XMPPError::NoError);
stream->terminate(-1,true,0,XMPPError::UndefinedCondition,"dropped");
TelEngine::destruct(stream);
retVal << "Dropped stream '" << n << "'";
}
@ -4032,6 +4336,42 @@ bool JBModule::commandExecute(String& retVal, const String& line)
return true;
}
// Handle chan.control with targetid=cluster
bool JBModule::handleClusterControl(Message& msg)
{
const String& oper = msg["operation"];
Debug(this,DebugAll,"Handling cluster control oper=%s",oper.c_str());
// Send yate message
if (oper == "send")
return s_jabber->sendCluster(msg,&s_clusterControlSkip);
// Start/stop listener
if (oper == "listen") {
String name = msg.getValue("name","cluster");
if (msg.getBoolValue("enable")) {
NamedList p(msg);
p.setParam("type",lookup(JBStream::cluster,JBStream::s_typeName));
return buildListener(name,p);
}
cancelListener(name);
return false;
}
// Start/stop node connection
if (oper == "connect") {
const String& node = msg["node"];
if (!node)
return false;
bool enable = msg.getBoolValue("enable");
JBClusterStream* s = s_jabber->getClusterStream(node,msg,enable);
if (!s)
return false;
if (!enable)
s->terminate(-1,true,0,XMPPError::NoError,msg.getValue("reason","dropped"));
TelEngine::destruct(s);
return true;
}
return false;
}
// Build a listener from a list of parameters. Add it to the list and start it
bool JBModule::buildListener(const String& name, NamedList& p)
{