Added external component support to jabber server.

git-svn-id: http://voip.null.ro/svn/yate@2893 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2009-11-04 14:29:28 +00:00
parent d6fceae1dc
commit a5a3672ead
7 changed files with 417 additions and 105 deletions

View File

@ -88,6 +88,7 @@
; Allowed values: ; Allowed values:
; c2s Client to server connection ; c2s Client to server connection
; s2s Server to server connection ; s2s Server to server connection
; comp External component connection
;type= ;type=
; address: string: IP address to listen ; address: string: IP address to listen
@ -98,6 +99,7 @@
; These are the default values for some known types (only if this parameter is missing) ; These are the default values for some known types (only if this parameter is missing)
; c2s 5222 ; c2s 5222
; s2s 5269 ; s2s 5269
; Tehre is no default value for external component listeners
;port= ;port=
; backlog: integer: Maximum length of the queue of pending connections ; backlog: integer: Maximum length of the queue of pending connections

View File

@ -819,7 +819,9 @@ bool JBEngine::acceptConn(Socket* sock, SocketAddr& remote, JBStream::Type t)
if (t == JBStream::c2s) if (t == JBStream::c2s)
s = new JBClientStream(this,sock); s = new JBClientStream(this,sock);
else if (t == JBStream::s2s) else if (t == JBStream::s2s)
s = new JBServerStream(this,sock); s = new JBServerStream(this,sock,false);
else if (t == JBStream::comp)
s = new JBServerStream(this,sock,true);
if (s) if (s)
addStream(s); addStream(s);
else else
@ -1096,23 +1098,6 @@ void JBEngine::removeStream(JBStream* stream, bool delObj)
unlock(); unlock();
} }
// Find a stream by its name in a given set list
JBStream* JBEngine::findStream(const String& id, JBStreamSetList* list)
{
if (!list)
return 0;
Lock lock(list);
ObjList* found = 0;
for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
found = set->clients().find(id);
}
JBStream* stream = found ? static_cast<JBStream*>(found->get()) : 0;
if (stream && !stream->ref())
stream = 0;
return stream;
}
// Add/remove a connect stream thread when started/stopped // Add/remove a connect stream thread when started/stopped
void JBEngine::connectStatus(JBConnect* conn, bool started) void JBEngine::connectStatus(JBConnect* conn, bool started)
{ {
@ -1130,6 +1115,23 @@ void JBEngine::connectStatus(JBConnect* conn, bool started)
} }
} }
// Find a stream by its name in a given set list
JBStream* JBEngine::findStream(const String& id, JBStreamSetList* list)
{
if (!list)
return 0;
Lock lock(list);
ObjList* found = 0;
for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
found = set->clients().find(id);
}
JBStream* stream = found ? static_cast<JBStream*>(found->get()) : 0;
if (stream && !stream->ref())
stream = 0;
return stream;
}
/* /*
* JBServerEngine * JBServerEngine
@ -1137,7 +1139,8 @@ void JBEngine::connectStatus(JBConnect* conn, bool started)
JBServerEngine::JBServerEngine(const char* name) JBServerEngine::JBServerEngine(const char* name)
: JBEngine(name), : JBEngine(name),
m_streamIndex(0), m_streamIndex(0),
m_c2sReceive(0), m_c2sProcess(0), m_s2sReceive(0), m_s2sProcess(0) m_c2sReceive(0), m_c2sProcess(0), m_s2sReceive(0), m_s2sProcess(0),
m_compReceive(0), m_compProcess(0)
{ {
} }
@ -1158,6 +1161,8 @@ void JBServerEngine::cleanup(bool final, bool waitTerminate)
TelEngine::destruct(m_c2sProcess); TelEngine::destruct(m_c2sProcess);
TelEngine::destruct(m_s2sReceive); TelEngine::destruct(m_s2sReceive);
TelEngine::destruct(m_s2sProcess); TelEngine::destruct(m_s2sProcess);
TelEngine::destruct(m_compReceive);
TelEngine::destruct(m_compProcess);
} }
// Stop all stream sets // Stop all stream sets
@ -1166,23 +1171,14 @@ void JBServerEngine::stopStreamSets(bool waitTerminate)
XDebug(this,DebugAll,"JBServerEngine::stopStreamSets() wait=%s", XDebug(this,DebugAll,"JBServerEngine::stopStreamSets() wait=%s",
String::boolText(waitTerminate)); String::boolText(waitTerminate));
lock(); lock();
RefPointer<JBStreamSetList> c2sReceive = m_c2sReceive; RefPointer<JBStreamSetList> sets[6] = {m_c2sReceive,m_c2sProcess,
RefPointer<JBStreamSetList> c2sProcess = m_c2sProcess; m_s2sReceive,m_s2sProcess,m_compReceive,m_compProcess};
RefPointer<JBStreamSetList> s2sReceive = m_s2sReceive;
RefPointer<JBStreamSetList> s2sProcess = m_s2sProcess;
unlock(); unlock();
if (c2sReceive) for (int i = 0; i < 6; i++)
c2sReceive->stop(0,waitTerminate); if (sets[i])
if (c2sProcess) sets[i]->stop(0,waitTerminate);
c2sProcess->stop(0,waitTerminate); for (int j = 0; j < 6; j++)
if (s2sReceive) sets[j] = 0;
s2sReceive->stop(0,waitTerminate);
if (s2sProcess)
s2sProcess->stop(0,waitTerminate);
c2sReceive = 0;
c2sProcess = 0;
s2sReceive = 0;
s2sProcess = 0;
} }
// Retrieve the list of streams of a given type // Retrieve the list of streams of a given type
@ -1193,9 +1189,11 @@ void JBServerEngine::getStreamList(RefPointer<JBStreamSetList>& list, int type)
list = m_c2sReceive; list = m_c2sReceive;
else if (type == JBStream::s2s) else if (type == JBStream::s2s)
list = m_s2sReceive; list = m_s2sReceive;
else if (type == JBStream::comp)
list = m_compReceive;
} }
// Find a server to server stream by local/remote domain. // Find a server to server or component stream by local/remote domain.
// Skip over outgoing dialback streams // Skip over outgoing dialback streams
JBServerStream* JBServerEngine::findServerStream(const String& local, const String& remote, JBServerStream* JBServerEngine::findServerStream(const String& local, const String& remote,
bool out) bool out)
@ -1203,17 +1201,17 @@ JBServerStream* JBServerEngine::findServerStream(const String& local, const Stri
if (!(local && remote)) if (!(local && remote))
return 0; return 0;
lock(); lock();
RefPointer<JBStreamSetList> list = m_s2sReceive; RefPointer<JBStreamSetList> list[2] = {m_s2sReceive,m_compReceive};
unlock(); unlock();
if (!list)
return 0;
JBServerStream* stream = 0; JBServerStream* stream = 0;
list->lock(); for (int i = 0; i < 2; i++) {
for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) { list[i]->lock();
for (ObjList* o = list[i]->sets().skipNull(); o; o = o->skipNext()) {
JBStreamSet* set = static_cast<JBStreamSet*>(o->get()); JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) { for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
stream = static_cast<JBServerStream*>(s->get()); stream = static_cast<JBServerStream*>(s->get());
if (out == stream->outgoing() && !stream->dialback()) { if (stream->type() == JBStream::comp ||
(out == stream->outgoing() && !stream->dialback())) {
// Lock the stream: remote jid might change // Lock the stream: remote jid might change
Lock lock(stream); Lock lock(stream);
if (local == stream->local() && remote == stream->remote()) { if (local == stream->local() && remote == stream->remote()) {
@ -1223,9 +1221,14 @@ JBServerStream* JBServerEngine::findServerStream(const String& local, const Stri
} }
stream = 0; stream = 0;
} }
if (stream)
break;
} }
list->unlock(); list[i]->unlock();
list = 0; if (stream)
break;
}
list[0] = list[1] = 0;
return stream; return stream;
} }
@ -1286,6 +1289,10 @@ void JBServerEngine::addStream(JBStream* stream)
recv = m_s2sReceive; recv = m_s2sReceive;
process = m_s2sProcess; process = m_s2sProcess;
} }
else if (stream->type() == JBStream::comp) {
recv = m_compReceive;
process = m_compProcess;
}
unlock(); unlock();
if (recv && process) { if (recv && process) {
recv->add(stream); recv->add(stream);
@ -1318,6 +1325,10 @@ void JBServerEngine::removeStream(JBStream* stream, bool delObj)
recv = m_s2sReceive; recv = m_s2sReceive;
process = m_s2sProcess; process = m_s2sProcess;
} }
else if (stream->type() == JBStream::comp) {
recv = m_compReceive;
process = m_compProcess;
}
unlock(); unlock();
if (recv) if (recv)
recv->remove(stream,delObj); recv->remove(stream,delObj);

View File

@ -113,6 +113,7 @@ const TokenDict JBStream::s_flagName[] = {
const TokenDict JBStream::s_typeName[] = { const TokenDict JBStream::s_typeName[] = {
{"c2s", c2s}, {"c2s", c2s},
{"s2s", s2s}, {"s2s", s2s},
{"comp", comp},
{0,0} {0,0}
}; };
@ -536,6 +537,10 @@ bool JBStream::authenticated(bool ok, const String& rsp, XMPPError::Type error)
XmlElement* rsp = XMPPUtils::createDialbackResult(m_local,m_remote,true); XmlElement* rsp = XMPPUtils::createDialbackResult(m_local,m_remote,true);
ok = sendStreamXml(Running,rsp); ok = sendStreamXml(Running,rsp);
} }
else if (m_type == comp) {
XmlElement* rsp = XMPPUtils::createElement(XmlTag::Handshake);
ok = sendStreamXml(Running,rsp);
}
} }
else { else {
if (m_type == c2s) { if (m_type == c2s) {
@ -548,6 +553,8 @@ bool JBStream::authenticated(bool ok, const String& rsp, XMPPError::Type error)
if (ok) if (ok)
terminate(0,true,0,XMPPError::NotAuthorized); terminate(0,true,0,XMPPError::NotAuthorized);
} }
else if (m_type == comp)
terminate(0,true,0,XMPPError::NotAuthorized);
} }
TelEngine::destruct(m_sasl); TelEngine::destruct(m_sasl);
return ok; return ok;
@ -738,7 +745,7 @@ void JBStream::process(u_int64_t time)
JabberID to; JabberID to;
if (!getJids(root,from,to)) if (!getJids(root,from,to))
break; break;
XDebug(this,DebugAll,"Processing (%p,%s) in state %s [%p]", Debug(this,DebugAll,"Processing (%p,%s) in state %s [%p]",
root,root->tag(),stateName(),this); root,root->tag(),stateName(),this);
processStart(root,from,to); processStart(root,from,to);
break; break;
@ -988,6 +995,7 @@ bool JBStream::processAuth(XmlElement* xml, const JabberID& from,
// Check if a received start start element's namespaces are correct. // Check if a received start start element's namespaces are correct.
bool JBStream::processStreamStart(const XmlElement* xml) bool JBStream::processStreamStart(const XmlElement* xml)
{ {
XDebug(this,DebugAll,"JBStream::processStreamStart() [%p]",this);
if (m_state == Starting) if (m_state == Starting)
return true; return true;
changeState(Starting); changeState(Starting);
@ -1000,7 +1008,7 @@ bool JBStream::processStreamStart(const XmlElement* xml)
XMPPError::Type error = XMPPError::NoError; XMPPError::Type error = XMPPError::NoError;
const char* reason = 0; const char* reason = 0;
while (true) { while (true) {
if (m_type != c2s && m_type != s2s) { if (m_type != c2s && m_type != s2s && m_type != comp) {
Debug(this,DebugStub,"processStreamStart() type %u not handled!",m_type); Debug(this,DebugStub,"processStreamStart() type %u not handled!",m_type);
error = XMPPError::Internal; error = XMPPError::Internal;
break; break;
@ -1039,7 +1047,7 @@ bool JBStream::processStreamStart(const XmlElement* xml)
else else
error = XMPPError::EncryptionRequired; error = XMPPError::EncryptionRequired;
} }
else else if (m_type != comp)
error = XMPPError::Internal; error = XMPPError::Internal;
} }
else if (remoteVersion > 1) else if (remoteVersion > 1)
@ -1162,14 +1170,16 @@ bool JBStream::checkStanzaRecv(XmlElement* xml, JabberID& from, JabberID& to)
"Possible checkStanzaRecv() unhandled outgoing c2s stream [%p]",this); "Possible checkStanzaRecv() unhandled outgoing c2s stream [%p]",this);
} }
break; break;
case comp:
case s2s: case s2s:
// RFC 3920bis 9.1.1.2 and 9.1.2.1: // RFC 3920bis 9.1.1.2 and 9.1.2.1:
// Validate 'to' and 'from' // Validate 'to' and 'from'
// Accept anything for component streams
if (!(to && from)) { if (!(to && from)) {
terminate(0,m_incoming,xml,XMPPError::BadAddressing); terminate(0,m_incoming,xml,XMPPError::BadAddressing);
return false; return false;
} }
if (!m_engine->hasDomain(to.domain())) { if (m_type == s2s && !m_engine->hasDomain(to.domain())) {
terminate(0,m_incoming,xml,XMPPError::HostUnknown); terminate(0,m_incoming,xml,XMPPError::HostUnknown);
return false; return false;
} }
@ -1582,6 +1592,20 @@ bool JBStream::processFeaturesIn(XmlElement* xml, const JabberID& from, const Ja
if (!xml->getTag(t,nsName)) if (!xml->getTag(t,nsName))
return dropXml(xml,"invalid tag namespace prefix"); return dropXml(xml,"invalid tag namespace prefix");
int ns = nsName ? XMPPUtils::s_ns[*nsName] : XMPPNamespace::Count; int ns = nsName ? XMPPUtils::s_ns[*nsName] : XMPPNamespace::Count;
// Component: Waiting for handshake in the stream namespace
if (type() == comp) {
if (outgoing())
return dropXml(xml,"invalid state for incoming stream");
if (*t != XMPPUtils::s_tag[XmlTag::Handshake] || ns != m_xmlns)
return dropXml(xml,"expecting handshake in stream's namespace");
JBEvent* ev = new JBEvent(JBEvent::Auth,this,xml,from,to);
ev->m_text = xml->getText();
m_events.append(ev);
changeState(Auth);
return true;
}
// Check if received unexpected feature // Check if received unexpected feature
if (!m_features.get(ns)) { if (!m_features.get(ns)) {
// Check for some features that can be negotiated via 'iq' elements // Check for some features that can be negotiated via 'iq' elements
@ -1812,6 +1836,9 @@ void JBStream::setXmlns()
case s2s: case s2s:
m_xmlns = XMPPNamespace::Server; m_xmlns = XMPPNamespace::Server;
break; break;
case comp:
m_xmlns = XMPPNamespace::ComponentAccept;
break;
} }
} }
@ -2308,8 +2335,8 @@ bool JBClientStream::bind()
* JBServerStream * JBServerStream
*/ */
// Build an incoming stream from a socket // Build an incoming stream from a socket
JBServerStream::JBServerStream(JBEngine* engine, Socket* socket) JBServerStream::JBServerStream(JBEngine* engine, Socket* socket, bool component)
: JBStream(engine,socket,s2s), : JBStream(engine,socket,component ? comp : s2s),
m_dbKey(0) m_dbKey(0)
{ {
} }
@ -2387,7 +2414,7 @@ bool JBServerStream::processRunning(XmlElement* xml, const JabberID& from,
// Check the tags of known dialback elements: // Check the tags of known dialback elements:
// there are servers who don't stamp them with the namespace // there are servers who don't stamp them with the namespace
// Let other elements stamped with dialback namespace go the upper layer // Let other elements stamped with dialback namespace go the upper layer
if (isDbResult(*xml)) { if (type() != comp && isDbResult(*xml)) {
if (outgoing()) if (outgoing())
return dropXml(xml,"dialback result on outgoing stream"); return dropXml(xml,"dialback result on outgoing stream");
const char* key = xml->getText(); const char* key = xml->getText();
@ -2422,6 +2449,7 @@ XmlElement* JBServerStream::buildStreamStart()
start->setAttribute("id",m_id); start->setAttribute("id",m_id);
XMPPUtils::setStreamXmlns(*start); XMPPUtils::setStreamXmlns(*start);
start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]); start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
if (type() == s2s) {
start->setAttribute(XmlElement::s_nsPrefix + "db",XMPPUtils::s_ns[XMPPNamespace::Dialback]); start->setAttribute(XmlElement::s_nsPrefix + "db",XMPPUtils::s_ns[XMPPNamespace::Dialback]);
if (!dialback()) { if (!dialback()) {
start->setAttributeValid("from",m_local.bare()); start->setAttributeValid("from",m_local.bare());
@ -2431,6 +2459,11 @@ XmlElement* JBServerStream::buildStreamStart()
start->setAttribute("xml:lang","en"); start->setAttribute("xml:lang","en");
} }
} }
}
else if (type() == comp) {
if (incoming())
start->setAttributeValid("from",m_remote.domain());
}
return start; return start;
} }
@ -2441,9 +2474,22 @@ XmlElement* JBServerStream::buildStreamStart()
bool JBServerStream::processStart(const XmlElement* xml, const JabberID& from, bool JBServerStream::processStart(const XmlElement* xml, const JabberID& from,
const JabberID& to) const JabberID& to)
{ {
XDebug(this,DebugAll,"JBServerStream::processStart() [%p]",this);
if (!processStreamStart(xml)) if (!processStreamStart(xml))
return true; return true;
if (type() == comp) {
if (incoming()) {
changeState(Starting);
m_events.append(new JBEvent(JBEvent::Start,this,0,to,JabberID::empty()));
return true;
}
Debug(this,DebugStub,"JBComponentStream::processStart() not implemented for outgoing [%p]",this);
terminate(0,true,0,XMPPError::NoError);
return false;
}
if (outgoing()) { if (outgoing()) {
// Wait features ? // Wait features ?
if (flag(StreamRemoteVer1)) { if (flag(StreamRemoteVer1)) {
@ -2506,4 +2552,17 @@ bool JBServerStream::processAuth(XmlElement* xml, const JabberID& from,
return dropXml(xml,"incomplete state process"); return dropXml(xml,"incomplete state process");
} }
// Start the stream (reply to received stream start)
bool JBServerStream::startComp(const String& local, const String& remote)
{
if (state() != Starting || type() != comp)
return false;
Lock lock(this);
m_local.set(local);
m_remote.set(remote);
setSecured();
XmlElement* s = buildStreamStart();
return sendStreamXml(Features,s);
}
/* vi: set ts=8 sw=4 sts=4 noet: */ /* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -223,6 +223,7 @@ const String XMPPNamespace::s_array[Count] = {
"jabber:x:oob", // XOob "jabber:x:oob", // XOob
"http://jabber.org/protocol/command", // Command "http://jabber.org/protocol/command", // Command
"msgoffline", // MsgOffline "msgoffline", // MsgOffline
"jabber:component:accept", // ComponentAccept
}; };
const String XMPPError::s_array[Count] = { const String XMPPError::s_array[Count] = {
@ -356,6 +357,7 @@ const String XmlTag::s_array[Count] = {
"identity", // Identity "identity", // Identity
"priority", // Priority "priority", // Priority
"c", // EntityCapsTag "c", // EntityCapsTag
"handshake", // Handshake
}; };
XMPPNamespace XMPPUtils::s_ns; XMPPNamespace XMPPUtils::s_ns;

View File

@ -214,7 +214,8 @@ public:
XOob = 44, // jabber:x:oob XOob = 44, // jabber:x:oob
Command= 45, // http://jabber.org/protocol/command Command= 45, // http://jabber.org/protocol/command
MsgOffline= 46, // msgoffline MsgOffline= 46, // msgoffline
Count = 47, ComponentAccept = 47, // jabber:component:accept
Count = 48,
}; };
/** /**
@ -399,7 +400,8 @@ public:
Identity = 63, // identity Identity = 63, // identity
Priority = 64, // priority Priority = 64, // priority
EntityCapsTag = 65, // c EntityCapsTag = 65, // c
Count = 66 Handshake = 66, // handshake
Count = 67
}; };
/** /**

View File

@ -31,6 +31,7 @@
*/ */
namespace TelEngine { namespace TelEngine {
class SASL; // SASL authentication mechanism
class JBEvent; // A Jabber event class JBEvent; // A Jabber event
class JBStream; // A Jabber stream class JBStream; // A Jabber stream
class JBClientStream; // A client to server stream class JBClientStream; // A client to server stream
@ -230,6 +231,7 @@ public:
Start, Start,
// Incoming stream need auth: when processing this event, the upper // Incoming stream need auth: when processing this event, the upper
// layer must call stream's authenticated() method // layer must call stream's authenticated() method
// Component: the event's text contains the handshake data
Auth, Auth,
// The event's element is an 'iq' with a child qualified by bind namespace // The event's element is an 'iq' with a child qualified by bind namespace
// This event is generated by an incoming client stream without a bound resource // This event is generated by an incoming client stream without a bound resource
@ -470,7 +472,8 @@ public:
enum Type { enum Type {
c2s = 0, // Client to server c2s = 0, // Client to server
s2s = 1, // Server to server s2s = 1, // Server to server
TypeCount = 2 // Unknown comp = 2, // External component
TypeCount = 3 // Unknown
}; };
/** /**
@ -487,6 +490,7 @@ public:
WaitTlsRsp = 5, // 'starttls' sent: waiting for response WaitTlsRsp = 5, // 'starttls' sent: waiting for response
Securing = 10, // Stream is currently negotiating the TLS Securing = 10, // Stream is currently negotiating the TLS
Auth = 11, // Auth element (db:result for s2s streams) sent Auth = 11, // Auth element (db:result for s2s streams) sent
// Incoming comp: handshake received
Challenge = 12, // 'challenge' element sent/received Challenge = 12, // 'challenge' element sent/received
Register = 20, // A new user is currently registering Register = 20, // A new user is currently registering
Running = 100, // Established. Allow XML stanzas to pass over the stream Running = 100, // Established. Allow XML stanzas to pass over the stream
@ -1042,6 +1046,14 @@ protected:
return false; return false;
} }
/**
* Set secured flag. Remove feature from list
*/
inline void setSecured() {
m_flags |= StreamSecured;
m_features.remove(XMPPNamespace::Tls);
}
State m_state; // Stream state State m_state; // Stream state
String m_id; // Stream id String m_id; // Stream id
JabberID m_local; // Local peer's jid JabberID m_local; // Local peer's jid
@ -1093,11 +1105,6 @@ private:
// Return false if stream termination was initiated // Return false if stream termination was initiated
bool processWaitTlsRsp(XmlElement* xml, const JabberID& from, bool processWaitTlsRsp(XmlElement* xml, const JabberID& from,
const JabberID& to); const JabberID& to);
// Set secured flag. Remove feature from list
inline void setSecured() {
m_flags |= StreamSecured;
m_features.remove(XMPPNamespace::Tls);
}
// Set stream namespace from type // Set stream namespace from type
void setXmlns(); void setXmlns();
// Event termination notification // Event termination notification
@ -1309,8 +1316,9 @@ public:
* Constructor. Build an incoming stream from a socket * Constructor. Build an incoming stream from a socket
* @param engine Engine owning this stream * @param engine Engine owning this stream
* @param socket The socket * @param socket The socket
* @param component True to build an external component stream
*/ */
JBServerStream(JBEngine* engine, Socket* socket); JBServerStream(JBEngine* engine, Socket* socket, bool component = false);
/** /**
* Constructor. Build an outgoing stream * Constructor. Build an outgoing stream
@ -1380,6 +1388,25 @@ public:
*/ */
bool sendDialback(); bool sendDialback();
/**
* Build a component handshake from stream id and secret as defined in XEP 0114
* @param buf Destination buffer
* @param secret The secret
*/
inline void buildHandshake(String& buf, const String& secret) {
SHA1 sha(id() + secret);
buf = sha.hexDigest();
buf.toLower();
}
/**
* Start a component stream (reply to received stream start)
* @param local Local domain
* @param remote Remote domain
* @return True on success
*/
bool startComp(const String& local, const String& remote);
protected: protected:
/** /**
* Release memory * Release memory
@ -1782,6 +1809,8 @@ protected:
getStreamList(list[JBStream::c2s],JBStream::c2s); getStreamList(list[JBStream::c2s],JBStream::c2s);
if (type == JBStream::s2s || type == JBStream::TypeCount) if (type == JBStream::s2s || type == JBStream::TypeCount)
getStreamList(list[JBStream::s2s],JBStream::s2s); getStreamList(list[JBStream::s2s],JBStream::s2s);
if (type == JBStream::comp || type == JBStream::TypeCount)
getStreamList(list[JBStream::comp],JBStream::comp);
} }
/** /**
@ -1849,12 +1878,13 @@ public:
{ name << "stream/" << getStreamIndex(); } { name << "stream/" << getStreamIndex(); }
/** /**
* Find a server to server stream by local/remote domain. * Find a server to server or component stream by local/remote domain.
* Skip over outgoing dialback only streams * Skip over outgoing dialback only streams
* This method is thread safe * This method is thread safe
* @param local Local domain * @param local Local domain
* @param remote Remote domain * @param remote Remote domain
* @param out True to find an outgoing stream, false to find an incoming one * @param out True to find an outgoing stream, false to find an incoming one.
* Ignored for component streams
* @return Referenced JBServerStream pointer or 0 * @return Referenced JBServerStream pointer or 0
*/ */
JBServerStream* findServerStream(const String& local, const String& remote, bool out); JBServerStream* findServerStream(const String& local, const String& remote, bool out);
@ -1924,6 +1954,8 @@ protected:
JBStreamSetList* m_c2sProcess; // c2s streams process list JBStreamSetList* m_c2sProcess; // c2s streams process list
JBStreamSetList* m_s2sReceive; // s2s streams receive list JBStreamSetList* m_s2sReceive; // s2s streams receive list
JBStreamSetList* m_s2sProcess; // s2s streams process list JBStreamSetList* m_s2sProcess; // s2s streams process list
JBStreamSetList* m_compReceive; // comp streams receive list
JBStreamSetList* m_compProcess; // comp streams process list
}; };
/** /**

View File

@ -181,8 +181,19 @@ public:
} }
// Replace the list of domains service by this engine // Replace the list of domains service by this engine
void setDomains(const String& list); void setDomains(const String& list);
// Retrieve a subdomain of a serviced domain
void getSubDomain(String& subdomain, const String& domain);
// Add or remove a component to/from serviced domains and components list
void setComponent(const String& domain, bool add);
// Check if a component is serviced by this engine
bool hasComponent(const String& domain);
// Check if a resource name is retricted // Check if a resource name is retricted
bool restrictedResource(const String& name); bool restrictedResource(const String& name);
// Check if a domain is serviced by a server item
bool isServerItemDomain(const String& domain);
// Internally route c2s <--> comp stanzas
// Return true if handled
bool routeInternal(JBEvent* ev);
// Process 'user.roster' notification messages // Process 'user.roster' notification messages
void handleUserRoster(Message& msg); void handleUserRoster(Message& msg);
// Process 'user.update' messages // Process 'user.update' messages
@ -283,6 +294,7 @@ private:
ObjList m_domains; // Domains serviced by this engine ObjList m_domains; // Domains serviced by this engine
ObjList m_restrictedResources; // Resource names the users can't use ObjList m_restrictedResources; // Resource names the users can't use
ObjList m_items; ObjList m_items;
ObjList m_components;
XMPPFeatureList m_c2sFeatures; // Server features to advertise on c2s streams XMPPFeatureList m_c2sFeatures; // Server features to advertise on c2s streams
XMPPFeatureList m_features; // Server features to advertise on non c2s streams XMPPFeatureList m_features; // Server features to advertise on non c2s streams
String m_dialbackSecret; // Server dialback secret used to build keys String m_dialbackSecret; // Server dialback secret used to build keys
@ -651,7 +663,7 @@ static XmlElement* buildRosterItem(NamedList& list, unsigned int index)
// Complete stream type // Complete stream type
static void completeStreamType(String& buf, const String& part, bool addAll = false) static void completeStreamType(String& buf, const String& part, bool addAll = false)
{ {
static const String t[] = {"c2s","s2s",""}; static const String t[] = {"c2s","s2s","comp", ""};
static const String all[] = {"all","*",""}; static const String all[] = {"all","*",""};
for (const String* d = t; !d->null(); d++) for (const String* d = t; !d->null(); d++)
Module::itemComplete(buf,*d,part); Module::itemComplete(buf,*d,part);
@ -704,6 +716,8 @@ YJBEngine::YJBEngine()
m_c2sProcess = new YStreamSetProcess(this,10,"c2s/process"); m_c2sProcess = new YStreamSetProcess(this,10,"c2s/process");
m_s2sReceive = new YStreamSetReceive(this,0,"s2s/recv"); m_s2sReceive = new YStreamSetReceive(this,0,"s2s/recv");
m_s2sProcess = new YStreamSetProcess(this,0,"s2s/process"); m_s2sProcess = new YStreamSetProcess(this,0,"s2s/process");
m_compReceive = new YStreamSetReceive(this,0,"comp/recv");
m_compProcess = new YStreamSetProcess(this,0,"comp/process");
// c2s features // c2s features
m_c2sFeatures.add(XMPPNamespace::DiscoInfo); m_c2sFeatures.add(XMPPNamespace::DiscoInfo);
m_c2sFeatures.add(XMPPNamespace::DiscoItems); m_c2sFeatures.add(XMPPNamespace::DiscoItems);
@ -792,13 +806,21 @@ void YJBEngine::processEvent(JBEvent* ev)
XDebug(this,DebugInfo,"Processing event (%p,%s)",ev,ev->name()); XDebug(this,DebugInfo,"Processing event (%p,%s)",ev,ev->name());
switch (ev->type()) { switch (ev->type()) {
case JBEvent::Message: case JBEvent::Message:
if (!ev->element())
break;
if (!routeInternal(ev))
JBPendingWorker::add(ev); JBPendingWorker::add(ev);
break; break;
case JBEvent::Presence: case JBEvent::Presence:
if (ev->element()) if (!ev->element())
break;
if (!routeInternal(ev))
processPresenceStanza(ev); processPresenceStanza(ev);
break; break;
case JBEvent::Iq: case JBEvent::Iq:
if (!ev->element())
break;
if (!routeInternal(ev))
JBPendingWorker::add(ev); JBPendingWorker::add(ev);
break; break;
case JBEvent::Start: case JBEvent::Start:
@ -872,8 +894,15 @@ void YJBEngine::buildDialbackKey(const String& id, String& key)
// Check if a domain is serviced by this engine // Check if a domain is serviced by this engine
bool YJBEngine::hasDomain(const String& domain) bool YJBEngine::hasDomain(const String& domain)
{ {
if (!domain)
return false;
Lock lock(this); Lock lock(this);
return domain && m_domains.find(domain); for (ObjList* o = m_domains.skipNull(); o; o = o->skipNext()) {
String* tmp = static_cast<String*>(o->get());
if (*tmp == domain)
return true;
}
return false;
} }
// Replace the list of domains service by this engine // Replace the list of domains service by this engine
@ -902,6 +931,48 @@ void YJBEngine::setDomains(const String& list)
Debug(this,DebugGoOn,"No domains configured"); Debug(this,DebugGoOn,"No domains configured");
} }
// Add or remove a component to/from serviced domains and components list
void YJBEngine::setComponent(const String& domain, bool add)
{
Lock lock(this);
ObjList* oc = m_components.skipNull();
for (; oc; oc = oc->skipNext()) {
String* tmp = static_cast<String*>(oc->get());
if (*tmp == domain)
break;
}
ObjList* od = m_domains.skipNull();
for (; od; od = od->skipNext()) {
String* tmp = static_cast<String*>(od->get());
if (*tmp == domain)
break;
}
if (add) {
if (!oc)
m_components.append(new String(domain));
if (!od)
m_domains.append(new String(domain));
}
else {
if (oc)
oc->remove();
if (od)
od->remove();
}
}
// Check if a component is serviced by this engine
bool YJBEngine::hasComponent(const String& domain)
{
Lock lock(this);
for (ObjList* o = m_components.skipNull(); o; o = o->skipNext()) {
String* tmp = static_cast<String*>(o->get());
if (*tmp == domain)
return true;
}
return false;
}
// Check if a resource name is retricted // Check if a resource name is retricted
bool YJBEngine::restrictedResource(const String& name) bool YJBEngine::restrictedResource(const String& name)
{ {
@ -914,6 +985,60 @@ bool YJBEngine::restrictedResource(const String& name)
return false; return false;
} }
// Check if a domain is serviced by a server item
bool YJBEngine::isServerItemDomain(const String& domain)
{
Lock lock(this);
for (ObjList* o = m_items.skipNull(); o; o = o->skipNext()) {
JabberID* jid = static_cast<JabberID*>(o->get());
if (domain == jid->domain())
return true;
}
return false;
}
// Internally route c2s <--> comp stanzas
// Return true if handled
bool YJBEngine::routeInternal(JBEvent* ev)
{
JBStream* s = 0;
if (ev->stream()->type() == JBStream::s2s) {
// Incoming on s2s: check if it should be routed to a component
if (!hasComponent(ev->to().domain()))
return false;
String comp;
getSubDomain(comp,ev->to().domain());
if (comp) {
String local = ev->to().domain().substr(comp.length() + 1);
s = findServerStream(local,ev->to().domain(),true);
}
}
else if (ev->stream()->type() == JBStream::comp) {
// Incoming on comp: check if it should be routed to a remote domain
if (hasDomain(ev->to().domain()))
return false;
s = findServerStream(ev->from().domain(),ev->to().domain(),true);
}
else
return false;
DDebug(this,DebugAll,"routeInternal() src=%s from=%s to=%s stream=%p",
ev->stream()->typeName(),ev->from().c_str(),ev->to().c_str(),s);
if (s) {
XmlElement* xml = ev->releaseXml();
bool ok = false;
if (xml) {
xml->removeAttribute(XmlElement::s_ns);
ok = s->sendStanza(xml);
}
if (!ok)
ev->sendStanzaError(XMPPError::Internal);
}
else
ev->sendStanzaError(XMPPError::NoRemote,0,XMPPError::TypeCancel);
return true;
}
// Process an 'user.roster' messages // Process an 'user.roster' messages
void YJBEngine::handleUserRoster(Message& msg) void YJBEngine::handleUserRoster(Message& msg)
{ {
@ -976,7 +1101,7 @@ bool YJBEngine::handleJabberIq(Message& msg)
return false; return false;
DDebug(this,DebugAll,"YJBEngine::handleJabberIq() from=%s to=%s",from.c_str(),to.c_str()); DDebug(this,DebugAll,"YJBEngine::handleJabberIq() from=%s to=%s",from.c_str(),to.c_str());
JBStream* stream = 0; JBStream* stream = 0;
if (hasDomain(to.domain())) { if (hasDomain(to.domain()) && !hasComponent(to.domain())) {
stream = findClientStream(true,to); stream = findClientStream(true,to);
if (!(stream && stream->flag(JBStream::AvailableResource))) if (!(stream && stream->flag(JBStream::AvailableResource)))
TelEngine::destruct(stream); TelEngine::destruct(stream);
@ -1014,7 +1139,7 @@ bool YJBEngine::handleResSubscribe(Message& msg)
msg.c_str(),from.bare().c_str(),to.bare().c_str(),oper->c_str()); msg.c_str(),from.bare().c_str(),to.bare().c_str(),oper->c_str());
XmlElement* xml = getPresenceXml(msg,from.bare(),presType); XmlElement* xml = getPresenceXml(msg,from.bare(),presType);
bool ok = false; bool ok = false;
if (hasDomain(to.domain())) { if (hasDomain(to.domain()) && !hasComponent(to.domain())) {
xml->removeAttribute("to"); xml->removeAttribute("to");
// RFC 3921: (un)subscribe requests are sent only to available resources // RFC 3921: (un)subscribe requests are sent only to available resources
String* instance = msg.getParam("instance"); String* instance = msg.getParam("instance");
@ -1054,7 +1179,7 @@ bool YJBEngine::handleResNotify(Message& msg)
Debug(this,DebugAll,"Processing %s from=%s to=%s oper=%s", Debug(this,DebugAll,"Processing %s from=%s to=%s oper=%s",
msg.c_str(),from.c_str(),to.c_str(),oper->c_str()); msg.c_str(),from.c_str(),to.c_str(),oper->c_str());
XmlElement* xml = 0; XmlElement* xml = 0;
bool c2s = hasDomain(to.domain()); bool c2s = hasDomain(to.domain()) && !hasComponent(to.domain());
bool online = (*oper == "online" || *oper == "update"); bool online = (*oper == "online" || *oper == "update");
if (online || *oper == "offline" || *oper == "delete") { if (online || *oper == "offline" || *oper == "delete") {
if (!from.resource()) if (!from.resource())
@ -1135,7 +1260,7 @@ bool YJBEngine::handleMsgExecute(Message& msg)
if (!(caller.resource())) if (!(caller.resource()))
return false; return false;
DDebug(this,DebugAll,"handleMsgExecute() caller=%s called=%s",caller.c_str(),called.c_str()); DDebug(this,DebugAll,"handleMsgExecute() caller=%s called=%s",caller.c_str(),called.c_str());
if (hasDomain(called.domain())) { if (hasDomain(called.domain()) && !hasComponent(called.domain())) {
// RFC 3921 11.1: Send chat only to clients with non-negative resource priority // RFC 3921 11.1: Send chat only to clients with non-negative resource priority
bool ok = false; bool ok = false;
unsigned int n = msg.getIntValue("instance.count"); unsigned int n = msg.getIntValue("instance.count");
@ -1210,7 +1335,20 @@ bool YJBEngine::handleJabberItem(Message& msg)
Debug(this,DebugAll,"Removed item '%s'",jid); Debug(this,DebugAll,"Removed item '%s'",jid);
} }
else if (!o) { else if (!o) {
m_items.append(new String(jid)); JabberID* j = new JabberID(jid);
String comp;
getSubDomain(comp,j->domain());
if (comp) {
String local(j->domain().substr(comp.length() + 1));
if (findServerStream(local,j->domain(),true)) {
Debug(this,DebugMild,
"Request to add server item '%s' while already having a component in the same domain",
jid);
TelEngine::destruct(j);
return false;
}
}
m_items.append(j);
Debug(this,DebugAll,"Added item '%s'",jid); Debug(this,DebugAll,"Added item '%s'",jid);
} }
return false; return false;
@ -1324,6 +1462,19 @@ void YJBEngine::processPresenceStanza(JBEvent* ev)
ev->sendStanzaError(XMPPError::ServiceUnavailable); ev->sendStanzaError(XMPPError::ServiceUnavailable);
} }
// Retrieve a subdomain of a serviced domain
void YJBEngine::getSubDomain(String& subdomain, const String& domain)
{
Lock lock(this);
for (ObjList* o = m_domains.skipNull(); o; o = o->skipNext()) {
String cmp("." + o->get()->toString());
if (domain.endsWith(cmp) && domain.length() > cmp.length()) {
subdomain = domain.substr(0,domain.length() - cmp.length());
return;
}
}
}
// Process a stream start element received by an incoming stream // Process a stream start element received by an incoming stream
// The given event is always valid and carry a valid stream // The given event is always valid and carry a valid stream
// Set local domain and stream features to advertise to remote party // Set local domain and stream features to advertise to remote party
@ -1331,6 +1482,30 @@ void YJBEngine::processStartIn(JBEvent* ev)
{ {
static const char* node = "http://yate.null.ro/yate/server/caps"; static const char* node = "http://yate.null.ro/yate/server/caps";
JBServerStream* comp = ev->serverStream();
if (comp && comp->type() == JBStream::comp) {
String sub;
if (ev->from() && !ev->from().node() && !ev->from().resource())
getSubDomain(sub,ev->from().domain());
if (!sub) {
comp->terminate(-1,true,0,XMPPError::HostUnknown);
return;
}
String local(ev->from().substr(sub.length() + 1));
bool isItem = isServerItemDomain(ev->from().domain());
if (isItem || findServerStream(local,ev->from(),false)) {
if (isItem)
Debug(this,DebugMild,"Component request for server item domain '%s'",
ev->from().domain().c_str());
comp->terminate(-1,true,0,XMPPError::Conflict);
return;
}
// Add component to serviced domain
setComponent(ev->from(),true);
comp->startComp(local,ev->from());
return;
}
// Set c2s stream TLS required flag // Set c2s stream TLS required flag
if (ev->stream()->type() == JBStream::c2s) if (ev->stream()->type() == JBStream::c2s)
ev->stream()->setTlsRequired(m_c2sTlsRequired); ev->stream()->setTlsRequired(m_c2sTlsRequired);
@ -1485,6 +1660,9 @@ void YJBEngine::processStreamEvent(JBEvent* ev)
// TODO: notify offline for all users in remote domain // TODO: notify offline for all users in remote domain
m = userRegister(*s,false); m = userRegister(*s,false);
} }
// Remove component from serviced domain
if (ev->stream()->type() == JBStream::comp)
setComponent(ev->stream()->remote(),false);
} }
} }
else { else {
@ -1787,12 +1965,16 @@ bool YJBEngine::sendStanza(XmlElement*& xml, ObjList*& streams)
// Build a new one if not found // Build a new one if not found
JBStream* YJBEngine::getServerStream(const JabberID& from, const JabberID& to) JBStream* YJBEngine::getServerStream(const JabberID& from, const JabberID& to)
{ {
// Avoid streams to internal components
if (m_items.find(to.domain()) || !hasDomain(from.domain()))
return 0;
JBServerStream* s = findServerStream(from.domain(),to.domain(),true); JBServerStream* s = findServerStream(from.domain(),to.domain(),true);
if (s) if (s)
return s; return s;
// Avoid streams to internal components or (sub)domains
if (m_items.find(to.domain()) || !hasDomain(from.domain()))
return 0;
String comp;
getSubDomain(comp,to.domain());
if (comp)
return 0;
return createServerStream(from.domain(),to.domain()); return createServerStream(from.domain(),to.domain());
} }
@ -1891,9 +2073,11 @@ void YJBEngine::statusParams(String& str)
lock(); lock();
unsigned int c2s = m_c2sReceive ? m_c2sReceive->streamCount() : 0; unsigned int c2s = m_c2sReceive ? m_c2sReceive->streamCount() : 0;
unsigned int s2s = m_s2sReceive ? m_s2sReceive->streamCount() : 0; unsigned int s2s = m_s2sReceive ? m_s2sReceive->streamCount() : 0;
unsigned int comp = m_compReceive ? m_compReceive->streamCount() : 0;
unlock(); unlock();
str << lookup(JBStream::c2s,JBStream::s_typeName) << "=" << c2s; str << lookup(JBStream::c2s,JBStream::s_typeName) << "=" << c2s;
str << "," << lookup(JBStream::s2s,JBStream::s_typeName) << "=" << s2s; str << "," << lookup(JBStream::s2s,JBStream::s_typeName) << "=" << s2s;
str << "," << lookup(JBStream::comp,JBStream::s_typeName) << "=" << comp;
} }
// Fill module status detail // Fill module status detail
@ -2235,7 +2419,8 @@ void JBPendingWorker::processChat(JBPendingJob& job)
return; return;
} }
XMPPError::Type error = XMPPError::NoError; XMPPError::Type error = XMPPError::NoError;
bool localTarget = s_jabber->hasDomain(ev->to().domain()); bool localTarget = s_jabber->hasDomain(ev->to().domain()) &&
!s_jabber->hasComponent(ev->to().domain());
Message m("msg.route"); Message m("msg.route");
while (true) { while (true) {
__plugin.complete(m); __plugin.complete(m);
@ -2426,7 +2611,8 @@ void JBPendingWorker::processIq(JBPendingJob& job)
bool respond = (t == XMPPUtils::IqGet || t == XMPPUtils::IqSet); bool respond = (t == XMPPUtils::IqGet || t == XMPPUtils::IqSet);
// Destination at local domain: deny the request if the sender is not // Destination at local domain: deny the request if the sender is not
// the target's roster // the target's roster
if (s_jabber->hasDomain(ev->to().domain())) { if (s_jabber->hasDomain(ev->to().domain()) &&
!s_jabber->hasComponent(ev->to().domain())) {
// Check auth // Check auth
Message auth("resource.subscribe"); Message auth("resource.subscribe");
auth.addParam("module",__plugin.name()); auth.addParam("module",__plugin.name());
@ -2486,6 +2672,8 @@ UserAuthMessage::UserAuthMessage(JBEvent* ev)
: Message("user.auth"), : Message("user.auth"),
m_stream(ev->stream()->toString()), m_streamType((JBStream::Type)ev->stream()->type()) m_stream(ev->stream()->toString()), m_streamType((JBStream::Type)ev->stream()->type())
{ {
XDebug(&__plugin,DebugAll,"UserAuthMessage stream=%s type=%u",
m_stream.c_str(),m_streamType);
__plugin.complete(*this); __plugin.complete(*this);
addParam("streamtype",ev->stream()->typeName()); addParam("streamtype",ev->stream()->typeName());
ev->stream()->lock(); ev->stream()->lock();
@ -2498,6 +2686,10 @@ UserAuthMessage::UserAuthMessage(JBEvent* ev)
user << ev->stream()->local().domain(); user << ev->stream()->local().domain();
setParam("username",user); setParam("username",user);
} }
else if (m_streamType == JBStream::comp) {
setParam("username",ev->stream()->remote());
setParam("handshake",ev->text());
}
ev->stream()->unlock(); ev->stream()->unlock();
SocketAddr addr; SocketAddr addr;
if (ev->stream()->remoteAddr(addr)) { if (ev->stream()->remoteAddr(addr)) {
@ -2510,13 +2702,13 @@ UserAuthMessage::UserAuthMessage(JBEvent* ev)
void UserAuthMessage::dispatched(bool accepted) void UserAuthMessage::dispatched(bool accepted)
{ {
JBStream* stream = s_jabber->findStream(m_stream,m_streamType); JBStream* stream = s_jabber->findStream(m_stream,m_streamType);
XDebug(&__plugin,DebugAll,"UserAuthMessage::dispatch(%u) stream=(%p,%s) type=%u",
accepted,stream,m_stream.c_str(),m_streamType);
bool ok = false; bool ok = false;
String rspValue; String rspValue;
// Use a while() to break to the end // Use a while() to break to the end
while (stream) { while (stream) {
Lock lock(stream); Lock lock(stream);
if (!stream->m_sasl)
break;
// Returned value '-' means deny // Returned value '-' means deny
if (accepted && retValue() == "-") if (accepted && retValue() == "-")
break; break;
@ -2527,8 +2719,11 @@ void UserAuthMessage::dispatched(bool accepted)
if (!getValue("username")) if (!getValue("username"))
break; break;
// Check credentials // Check credentials
if (m_streamType != JBStream::comp) {
String* rsp = getParam("response"); String* rsp = getParam("response");
if (rsp) { if (rsp) {
if (!stream->m_sasl)
break;
if (stream->m_sasl->m_plain) if (stream->m_sasl->m_plain)
ok = (*rsp == retValue()); ok = (*rsp == retValue());
else { else {
@ -2539,6 +2734,15 @@ void UserAuthMessage::dispatched(bool accepted)
stream->m_sasl->buildMD5Digest(rspValue,retValue(),false); stream->m_sasl->buildMD5Digest(rspValue,retValue(),false);
} }
} }
}
else {
JBServerStream* comp = stream->serverStream();
if (comp) {
String digest;
comp->buildHandshake(digest,retValue());
ok = (digest == getValue("handshake"));
}
}
break; break;
} }
if (stream) if (stream)