Added jabber stream compression support.

git-svn-id: http://yate.null.ro/svn/yate/trunk@3436 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2010-07-16 11:11:50 +00:00
parent 6fb5c2aea9
commit 7f13563d83
8 changed files with 634 additions and 63 deletions

View File

@ -102,6 +102,13 @@
; Defaults to no
;dump_iq=no
; compression_formats: string: Comma separated list of supported compression formats
; This parameter configures the formats to be offered on incoming streams
; This parameter is not applied on reload
; Set it to empty string to disable stream compression on incoming streams
; Defaults to zlib if missing
;compression_formats=zlib
; printxml: boolean/string: Print sent/received XML data to output if debug
; level is at least 9
; Allowed values are boolean values or 'verbose' string

View File

@ -1012,6 +1012,12 @@ void JBEngine::connectStream(JBStream* stream)
Debug(this,DebugStub,"JBEngine::connectStream() not implemented!");
}
// Start stream compression
void JBEngine::compressStream(JBStream* stream, const String& formats)
{
Debug(this,DebugStub,"JBEngine::compressStream() not implemented!");
}
// Build a dialback key
void JBEngine::buildDialbackKey(const String& id, const String& local,
const String& remote, String& key)

View File

@ -26,6 +26,14 @@
using namespace TelEngine;
#ifdef XDEBUG
#define JBSTREAM_DEBUG_COMPRESS
#define JBSTREAM_DEBUG_SOCKET
#else
// #define JBSTREAM_DEBUG_COMPRESS // Show (de)compress debug
// #define JBSTREAM_DEBUG_SOCKET // Show socket read/write debug
#endif
static const String s_dbVerify = "verify";
static const String s_dbResult = "result";
@ -87,6 +95,7 @@ const TokenDict JBStream::s_stateName[] = {
{"Auth", Auth},
{"Challenge", Challenge},
{"Securing", Securing},
{"Compressing", Compressing},
{"Register", Register},
{"Destroy", Destroy},
{0,0},
@ -98,17 +107,20 @@ const TokenDict JBStream::s_flagName[] = {
{"dialback", DialbackOnly},
{"allowplainauth", AllowPlainAuth},
{"register", RegisterUser},
{"compress", Compress},
{"error", InError},
// Internal flags
{"roster_requested", RosterRequested},
{"online", AvailableResource},
{"secured", StreamTls | StreamSecured},
{"encrypted", StreamTls},
{"authenticated", StreamAuthenticated},
{"waitbindrsp", StreamWaitBindRsp},
{"waitsessrsp", StreamWaitSessRsp},
{"waitchallenge", StreamWaitChallenge},
{"waitchallengersp", StreamWaitChgRsp},
{"version1", StreamRemoteVer1},
{"compressed", StreamCompressed},
{0,0}
};
@ -141,7 +153,7 @@ JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
m_engine(engine), m_type(t),
m_incoming(true), m_terminateEvent(0),
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
m_connectPort(0)
m_connectPort(0), m_compress(0)
{
if (ssl)
setFlags(StreamSecured | StreamTls);
@ -172,7 +184,7 @@ JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const Jabber
m_incoming(false), m_name(name),
m_terminateEvent(0),
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
m_connectPort(0)
m_connectPort(0), m_compress(0)
{
if (!m_name)
m_engine->buildStreamName(m_name,this);
@ -186,6 +198,9 @@ JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const Jabber
}
else
updateFromRemoteDef();
// Compress always defaults to true if not explicitly disabled
if (!flag(Compress) && !(params && params->getBoolValue("nocompression")))
setFlags(Compress);
Debug(this,DebugAll,"JBStream::JBStream(%p,%s,%s,%s) outgoing [%p]",
engine,typeName(),local.c_str(),remote.c_str(),this);
setXmlns();
@ -230,6 +245,8 @@ void* JBStream::getObject(const String& name) const
{
if (name == "Socket*")
return state() == Securing ? (void*)&m_socket : 0;
if (name == "Compressor*")
return (void*)&m_compress;
if (name == "JBStream")
return (void*)this;
return RefObject::getObject(name);
@ -303,14 +320,48 @@ bool JBStream::readSocket(char* buf, unsigned int len)
return false;
}
if (read && read != Socket::socketError()) {
buf[read] = 0;
XDebug(this,DebugInfo,"Received %s [%p]",buf,this);
if (!m_xmlDom->parse(buf)) {
if (m_xmlDom->error() != XmlSaxParser::Incomplete)
error = XMPPError::Xml;
else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
error = XMPPError::Policy;
if (!flag(StreamCompressed)) {
buf[read] = 0;
#ifdef JBSTREAM_DEBUG_SOCKET
Debug(this,DebugInfo,"Received %s [%p]",buf,this);
#endif
if (!m_xmlDom->parse(buf)) {
if (m_xmlDom->error() != XmlSaxParser::Incomplete)
error = XMPPError::Xml;
else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
error = XMPPError::Policy;
}
}
else if (m_compress) {
#ifdef JBSTREAM_DEBUG_SOCKET
Debug(this,DebugInfo,"Received %d compressed bytes [%p]",read,this);
#endif
DataBlock d;
int res = m_compress->decompress(buf,read,d);
if (res == read) {
#ifdef JBSTREAM_DEBUG_COMPRESS
Debug(this,DebugInfo,"Decompressed %d --> %u [%p]",read,d.length(),this);
#endif
if (d.length()) {
char c = 0;
d.append(&c,1);
buf = (char*)d.data();
#ifdef JBSTREAM_DEBUG_SOCKET
Debug(this,DebugInfo,"Received compressed %s [%p]",buf,this);
#endif
if (!m_xmlDom->parse(buf)) {
if (m_xmlDom->error() != XmlSaxParser::Incomplete)
error = XMPPError::Xml;
else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
error = XMPPError::Policy;
}
}
}
else
error = XMPPError::UndefinedCondition;
}
else
error = XMPPError::Internal;
}
socketSetReading(false);
if (read) {
@ -350,18 +401,24 @@ bool JBStream::readSocket(char* buf, unsigned int len)
int location = 0;
const char* reason = 0;
if (error != XMPPError::SocketError) {
String tmp;
if (error == XMPPError::Xml) {
reason = m_xmlDom->getError();
tmp = m_xmlDom->buffer();
Debug(this,DebugNote,"Parser error='%s' buffer='%s' [%p]",
reason,m_xmlDom->buffer().c_str(),this);
}
else if (error == XMPPError::UndefinedCondition) {
reason = "Decompression failure";
Debug(this,DebugNote,"Decompressor failure [%p]",this);
}
else if (error == XMPPError::Internal) {
reason = "Decompression failure";
Debug(this,DebugNote,"No decompressor [%p]",this);
}
else {
tmp << "overflow len=" << m_xmlDom->buffer().length() << " max=" <<
m_engine->m_maxIncompleteXml;
reason = "XML element too long";
Debug(this,DebugNote,"Parser error='%s' overflow len=%u max= %u [%p]",
reason,m_xmlDom->buffer().length(),m_engine->m_maxIncompleteXml,this);
}
Debug(this,DebugNote,"Parser error='%s' buffer='%s' [%p]",
reason,tmp.c_str(),this);
}
else if (read) {
String tmp;
@ -435,8 +492,8 @@ bool JBStream::sendStreamXml(State newState, XmlElement* first, XmlElement* seco
Lock lock(this);
bool ok = false;
XmlFragment frag;
// Use a while() to break to the end: safe cleanup
while (true) {
// Use a do while() to break to the end: safe cleanup
do {
if (m_state == Idle || m_state == Destroy)
break;
// Check if we have unsent stream xml
@ -464,10 +521,13 @@ bool JBStream::sendStreamXml(State newState, XmlElement* first, XmlElement* seco
}
}
first = second = third = 0;
if (flag(StreamCompressed) && !compress()) {
ok = false;
break;
}
m_engine->printXml(this,true,frag);
ok = sendPending(true);
break;
}
} while (false);
TelEngine::destruct(first);
TelEngine::destruct(second);
TelEngine::destruct(third);
@ -516,7 +576,7 @@ void JBStream::start(XMPPFeatureList* features, XmlElement* caps, bool useVer1)
if (features)
m_features.add(*features);
if (useVer1 && flag(StreamRemoteVer1))
m_flags |= StreamLocalVer1;
setFlags(StreamLocalVer1);
if (flag(StreamRemoteVer1) && flag(StreamLocalVer1)) {
// Set secured flag if we don't advertise TLS
if (!(flag(StreamSecured) || m_features.get(XMPPNamespace::Tls)))
@ -638,6 +698,7 @@ void JBStream::terminate(int location, bool destroy, XmlElement* xml, int error,
location,destroy,xml,error,reason,final,stateName(),this);
Lock lock(this);
m_pending.clear();
m_outXmlCompress.clear();
// Already in destroy
if (state() == Destroy) {
TelEngine::destruct(xml);
@ -696,6 +757,7 @@ void JBStream::terminate(int location, bool destroy, XmlElement* xml, int error,
}
resetConnection();
m_outStreamXml.clear();
m_outStreamXmlCompress.clear();
// Always set termination event, except when called from destructor
if (!(final || m_terminateEvent)) {
@ -893,6 +955,9 @@ void JBStream::process(u_int64_t time)
case Register:
processRegister(xml,from,to);
break;
case Compressing:
processCompressing(xml,from,to);
break;
default:
dropXml(xml,"unhandled stream state in process()");
}
@ -994,6 +1059,7 @@ void JBStream::resetConnection(Socket* sock)
delete m_xmlDom;
m_xmlDom = 0;
}
TelEngine::destruct(m_compress);
break;
}
lock.drop();
@ -1055,6 +1121,63 @@ bool JBStream::processStart(const XmlElement* xml, const JabberID& from,
return true;
}
// Process elements in Compressing state
// Return false if stream termination was initiated
bool JBStream::processCompressing(XmlElement* xml, const JabberID& from,
const JabberID& to)
{
XDebug(this,DebugAll,"JBStream::processCompressing() [%p]",this);
int t = XmlTag::Count;
int n = XMPPNamespace::Count;
XMPPUtils::getTag(*xml,t,n);
if (n != XMPPNamespace::Compress)
return dropXml(xml,"expecting compression namespace");
if (outgoing()) {
// Expecting: compressed/failure
bool ok = (t == XmlTag::Compressed);
if (!ok && t != XmlTag::Failure)
return dropXml(xml,"expecting compress response (compressed/failure)");
if (ok) {
if (m_compress)
setFlags(StreamCompressed);
else
return destroyDropXml(xml,XMPPError::Internal,"no compressor");
}
else {
XmlElement* ch = xml->findFirstChild();
Debug(this,DebugInfo,"Compress failed at remote party error=%s [%p]",
ch ? ch->tag() : "",this);
TelEngine::destruct(m_compress);
}
TelEngine::destruct(xml);
// Restart the stream on success
if (ok) {
XmlElement* s = buildStreamStart();
return sendStreamXml(WaitStart,s);
}
// Compress failed: continue
JBServerStream* server = serverStream();
if (server)
return server->sendDialback();
JBClientStream* client = clientStream();
if (client)
return client->bind();
Debug(this,DebugNote,"Unhandled stream type in %s state [%p]",stateName(),this);
terminate(0,true,0,XMPPError::Internal);
return true;
}
// Incoming s2s waiting for compression or any other element
if (type() == s2s && m_features.get(XMPPNamespace::CompressFeature)) {
if (t == XmlTag::Compress && n == XMPPNamespace::Compress)
return handleCompressReq(xml);
// Change state to Running
changeState(Running);
return processRunning(xml,from,to);
}
return dropXml(xml,"not implemented");
}
// Process elements in Register state
bool JBStream::processRegister(XmlElement* xml, const JabberID& from,
const JabberID& to)
@ -1160,6 +1283,39 @@ bool JBStream::processStreamStart(const XmlElement* xml)
return false;
}
// Handle an already checked (tag and namespace) compress request
// Respond to it. Change stream state on success
bool JBStream::handleCompressReq(XmlElement* xml)
{
XMPPError::Type error = XMPPError::UnsupportedMethod;
State newState = state();
XmlElement* rsp = 0;
XmlElement* m = XMPPUtils::findFirstChild(*xml,XmlTag::Method,
XMPPNamespace::Compress);
if (m) {
// Get and check the method
const String& method = m->getText();
XMPPFeatureCompress* c = m_features.getCompress();
if (method && c && c->hasMethod(method)) {
// Build the (de)compressor
Lock lock(m_socketMutex);
m_engine->compressStream(this,method);
if (m_compress) {
newState = WaitStart;
setFlags(SetCompressed);
m_features.remove(XMPPNamespace::CompressFeature);
rsp = XMPPUtils::createElement(XmlTag::Compressed,XMPPNamespace::Compress);
}
else
error = XMPPError::SetupFailed;
}
}
TelEngine::destruct(xml);
if (!rsp)
rsp = XMPPUtils::createFailure(XMPPNamespace::Compress,error);
return sendStreamXml(newState,rsp);
}
// Check if a received element is a stream error one
bool JBStream::streamError(XmlElement* xml)
{
@ -1368,6 +1524,28 @@ void JBStream::changeState(State newState, u_int64_t time)
setIdleTimer(time);
}
// Check if the stream compress flag is set and compression was offered by remote party
XmlElement* JBStream::checkCompress()
{
if (flag(StreamCompressed) || !flag(Compress))
return 0;
XMPPFeatureCompress* c = m_features.getCompress();
if (!c)
return 0;
if (!(c && c->methods()))
return 0;
XmlElement* x = 0;
Lock lock(m_socketMutex);
m_engine->compressStream(this,c->methods());
if (m_compress && m_compress->format()) {
x = XMPPUtils::createElement(XmlTag::Compress,XMPPNamespace::Compress);
x->addChild(XMPPUtils::createElement(XmlTag::Method,m_compress->format()));
}
else
TelEngine::destruct(m_compress);
return x;
}
// Check for pending events. Set the last event
void JBStream::checkPendingEvent()
{
@ -1398,18 +1576,40 @@ bool JBStream::sendPending(bool streamOnly)
if (!m_socket)
return false;
XDebug(this,DebugAll,"JBStream::sendPending() [%p]",this);
bool noComp = !flag(StreamCompressed);
// Always try to send pending stream XML first
if (m_outStreamXml) {
unsigned int len = m_outStreamXml.length();
if (!writeSocket(m_outStreamXml.c_str(),len)) {
const void* buf = 0;
unsigned int len = 0;
if (noComp) {
buf = m_outStreamXml.c_str();
len = m_outStreamXml.length();
}
else {
buf = m_outStreamXmlCompress.data();
len = m_outStreamXmlCompress.length();
}
if (!writeSocket(buf,len)) {
terminate(0,m_incoming,0,XMPPError::SocketError);
return false;
}
bool all = (len == m_outStreamXml.length());
if (all)
m_outStreamXml.clear();
else
m_outStreamXml = m_outStreamXml.substr(len);
bool all = false;
if (noComp) {
all = (len == m_outStreamXml.length());
if (all)
m_outStreamXml.clear();
else
m_outStreamXml = m_outStreamXml.substr(len);
}
else {
all = (len == m_outStreamXmlCompress.length());
if (all) {
m_outStreamXml.clear();
m_outStreamXmlCompress.clear();
}
else
m_outStreamXmlCompress.cut(-len);
}
// Start TLS now for incoming streams
if (m_incoming && m_state == Securing) {
if (all) {
@ -1419,6 +1619,9 @@ bool JBStream::sendPending(bool streamOnly)
}
return true;
}
// Check set StreamCompressed flag if all data sent
if (all && flag(SetCompressed))
setFlags(StreamCompressed);
if (streamOnly || !all)
return true;
}
@ -1435,16 +1638,37 @@ bool JBStream::sendPending(bool streamOnly)
m_pending.remove(eout,true);
return true;
}
bool sent = eout->sent();
const void* buf = 0;
unsigned int len = 0;
if (noComp)
buf = (const void*)eout->getData(len);
else {
if (!sent) {
// Make sure the buffer is prepared for sending
eout->getData(len);
m_outXmlCompress.clear();
if (!compress(eout))
return false;
}
buf = m_outXmlCompress.data();
len = m_outXmlCompress.length();
}
// Print the element only if it's the first time we try to send it
if (!eout->sent())
if (!sent)
m_engine->printXml(this,true,*xml);
u_int32_t len;
const char* data = eout->getData(len);
if (writeSocket(data,len)) {
if (writeSocket(buf,len)) {
setIdleTimer();
// Adjust element's buffer. Remove it from list on completion
eout->dataSent(len);
unsigned int rest = eout->dataCount();
unsigned int rest = 0;
if (noComp) {
eout->dataSent(len);
rest = eout->dataCount();
}
else {
m_outXmlCompress.cut(-len);
rest = m_outXmlCompress.length();
}
if (!rest) {
DDebug(this,DebugAll,"Sent element (%p,%s) [%p]",xml,xml->tag(),this);
m_pending.remove(eout,true);
@ -1462,7 +1686,7 @@ bool JBStream::sendPending(bool streamOnly)
}
// Write data to socket
bool JBStream::writeSocket(const char* data, unsigned int& len)
bool JBStream::writeSocket(const void* data, unsigned int& len)
{
if (!(data && m_socket)) {
len = 0;
@ -1475,15 +1699,24 @@ bool JBStream::writeSocket(const char* data, unsigned int& len)
}
socketSetWriting(true);
lock.drop();
XDebug(this,DebugInfo,"Sending %s [%p]",data,this);
#ifdef JBSTREAM_DEBUG_SOCKET
if (!flag(StreamCompressed))
Debug(this,DebugInfo,"Sending %s [%p]",(const char*)data,this);
else
Debug(this,DebugInfo,"Sending %u compressed bytes [%p]",len,this);
#endif
int w = m_socket->writeData(data,len);
if (w != Socket::socketError())
len = w;
else
len = 0;
#ifdef XDEBUG
String sent(data,len);
Debug(this,DebugInfo,"Sent %s [%p]",sent.c_str(),this);
#ifdef JBSTREAM_DEBUG_SOCKET
if (!flag(StreamCompressed)) {
String sent((const char*)data,len);
Debug(this,DebugInfo,"Sent %s [%p]",sent.c_str(),this);
}
else
Debug(this,DebugInfo,"Sent %u compressed bytes [%p]",len,this);
#endif
Lock lck(m_socketMutex);
// Check if the connection is waiting to be reset
@ -1546,6 +1779,34 @@ bool JBStream::dropXml(XmlElement*& xml, const char* reason)
return true;
}
// Set stream flag mask
void JBStream::setFlags(int mask)
{
#ifdef XDEBUG
String f;
XMPPUtils::buildFlags(f,mask,s_flagName);
Debug(this,DebugAll,"Setting flags 0x%X (%s) current=0x%X [%p]",
mask,f.c_str(),m_flags,this);
#endif
m_flags |= mask;
#ifdef DEBUG
if (0 != (mask & StreamCompressed))
Debug(this,DebugAll,"Stream is using compression [%p]",this);
#endif
}
// Reset stream flag mask
void JBStream::resetFlags(int mask)
{
#ifdef XDEBUG
String f;
XMPPUtils::buildFlags(f,mask,s_flagName);
Debug(this,DebugAll,"Resetting flags 0x%X (%s) current=0x%X [%p]",
mask,f.c_str(),m_flags,this);
#endif
m_flags &= ~mask;
}
// Set the idle timer in Running state
void JBStream::setIdleTimer(u_int64_t msecNow)
{
@ -1618,7 +1879,7 @@ bool JBStream::processSaslAuth(XmlElement* xml, const JabberID& from, const Jabb
return true;
if (!XMPPUtils::isTag(*xml,XmlTag::Auth,XMPPNamespace::Sasl))
return dropXml(xml,"expecting 'auth' in sasl namespace");
XMPPFeatureSasl* sasl = static_cast<XMPPFeatureSasl*>(m_features.get(XMPPNamespace::Sasl));
XMPPFeatureSasl* sasl = m_features.getSasl();
TelEngine::destruct(m_sasl);
XMPPError::Type error = XMPPError::NoError;
const char* mName = xml->attribute("mechanism");
@ -1719,8 +1980,15 @@ bool JBStream::processFeaturesIn(XmlElement* xml, const JabberID& from, const Ja
return true;
}
XMPPFeature* f = 0;
// Stream compression feature and compression namespace are not the same!
if (ns != XMPPNamespace::Compress)
f = m_features.get(ns);
else
f = m_features.get(XMPPNamespace::CompressFeature);
// Check if received unexpected feature
if (!m_features.get(ns)) {
if (!f) {
// Check for some features that can be negotiated via 'iq' elements
if (m_type == c2s && *t == XMPPUtils::s_tag[XmlTag::Iq] && ns == m_xmlns) {
int chTag = XmlTag::Count;
@ -1875,6 +2143,12 @@ bool JBStream::processFeaturesIn(XmlElement* xml, const JabberID& from, const Ja
}
return processSaslAuth(xml,from,to);
}
// Stream compression
if (ns == XMPPNamespace::Compress) {
if (*t != XMPPUtils::s_tag[XmlTag::Compress])
return dropXml(xml,"expecting stream compression 'compress' element");
return handleCompressReq(xml);
}
return dropXml(xml,"unhandled stream feature");
}
@ -1923,11 +2197,21 @@ bool JBStream::processFeaturesOut(XmlElement* xml, const JabberID& from,
return client->requestRegister(false);
}
}
// Check compression
XmlElement* x = checkCompress();
if (x)
return sendStreamXml(Compressing,x);
JBClientStream* client = clientStream();
if (client) {
TelEngine::destruct(xml);
return client->bind();
}
JBServerStream* server = serverStream();
if (server) {
TelEngine::destruct(xml);
changeState(Running);
return true;
}
return dropXml(xml,"incomplete features process for outgoing stream");
}
@ -1997,6 +2281,32 @@ void JBStream::eventTerminated(const JBEvent* ev)
}
}
// Compress data to be sent (the pending stream xml buffer or pending stanza)
// Return false on failure
bool JBStream::compress(XmlElementOut* xml)
{
DataBlock& buf = xml ? m_outXmlCompress : m_outStreamXmlCompress;
const String& xmlBuf = xml ? xml->buffer() : m_outStreamXml;
m_socketMutex.lock();
int res = m_compress ? m_compress->compress(xmlBuf.c_str(),xmlBuf.length(),buf) : -1000;
m_socketMutex.unlock();
const char* s = xml ? "pending" : "stream";
if (res >= 0) {
if ((unsigned int)res == xmlBuf.length()) {
#ifdef JBSTREAM_DEBUG_COMPRESS
Debug(this,DebugInfo,"Compressed %s xml %u --> %u [%p]",
s,xmlBuf.length(),buf.length(),this);
#endif
return true;
}
Debug(this,DebugNote,"Partially compressed %s xml %d/%u [%p]",
s,res,xmlBuf.length(),this);
}
else
Debug(this,DebugNote,"Failed to compress %s xml: %d [%p]",s,res,this);
return false;
}
/*
* JBClientStream
@ -2335,7 +2645,7 @@ bool JBClientStream::processAuth(XmlElement* xml, const JabberID& from,
return false;
}
TelEngine::destruct(xml);
resetFlags(StreamWaitBindRsp);
resetFlags(StreamWaitSessRsp);
changeState(Running);
return true;
}
@ -2420,10 +2730,7 @@ bool JBClientStream::startAuth()
TelEngine::destruct(m_sasl);
XMPPFeature* f = m_features.get(XMPPNamespace::Sasl);
XMPPFeatureSasl* sasl = 0;
if (f)
sasl = static_cast<XMPPFeatureSasl*>(f->getObject("XMPPFeatureSasl"));
XMPPFeatureSasl* sasl = m_features.getSasl();
if (!sasl) {
terminate(0,true,0,XMPPError::NoError,"Missing authentication data");
return false;
@ -2532,7 +2839,13 @@ bool JBServerStream::sendDbResult(const JabberID& from, const JabberID& to, XMPP
DDebug(this,DebugAll,"Sending '%s' db:result response from %s to %s [%p]",
result->attribute("type"),from.c_str(),to.c_str(),this);
if (m_state < Running) {
ok = sendStreamXml(Running,result);
// Authenticated, incoming, not compressed which might still be compressed:
// change state to Compressing
if (valid && !flag(StreamCompressed) &&
m_features.get(XMPPNamespace::CompressFeature))
ok = sendStreamXml(Compressing,result);
else
ok = sendStreamXml(Running,result);
// Remove features and set the authenticated flag
if (ok && valid) {
m_features.remove(XMPPNamespace::Sasl);
@ -2697,6 +3010,10 @@ bool JBServerStream::processAuth(XmlElement* xml, const JabberID& from,
// Stream authenticated
TelEngine::destruct(xml);
setFlags(StreamAuthenticated);
// Check compression
XmlElement* x = checkCompress();
if (x)
return sendStreamXml(Compressing,x);
changeState(Running);
return true;
}

View File

@ -232,6 +232,8 @@ const String XMPPNamespace::s_array[Count] = {
"http://jabber.org/protocol/muc#owner", // MucOwner
"http://jabber.org/protocol/muc#user", // MucUser
"urn:xmpp:features:dialback", // DialbackFeature
"http://jabber.org/protocol/compress", // Compress
"http://jabber.org/features/compress", // CompressFeature
};
const String XMPPError::s_array[Count] = {
@ -291,6 +293,8 @@ const String XMPPError::s_array[Count] = {
"subscription-required", // Subscription
"unexpected-request", // Request
"", // SocketError
"unsupported-method", // UnsupportedMethod
"setup-failed", // SetupFailed
"cancel", // TypeCancel
"continue", // TypeContinue
"modify", // TypeModify
@ -367,6 +371,10 @@ const String XmlTag::s_array[Count] = {
"c", // EntityCapsTag
"handshake", // Handshake
"dialback", // Dialback
"method", // Method
"compress", // Compress
"compressed", // Compressed
"compression", // Compression
};
XMPPNamespace XMPPUtils::s_ns;
@ -620,7 +628,9 @@ void XMPPFeature::addReqChild(XmlElement& xml)
// Build a feature from a stream:features child
XMPPFeature* XMPPFeature::fromStreamFeature(XmlElement& xml)
{
int t = XMPPUtils::tag(xml);
int t = XmlTag::Count;
int n = XMPPNamespace::Count;
XMPPUtils::getTag(xml,t,n);
if (t == XmlTag::Count) {
DDebug(DebugStub,"XMPPFeature::fromStreamFeature() unhandled tag '%s'",
xml.tag());
@ -628,7 +638,9 @@ XMPPFeature* XMPPFeature::fromStreamFeature(XmlElement& xml)
}
XMPPFeature* f = 0;
bool required = XMPPUtils::required(xml);
if (t == XmlTag::Mechanisms && XMPPUtils::hasXmlns(xml,XMPPNamespace::Sasl)) {
DDebug(DebugAll,"XMPPFeature::fromStreamFeature() processing '%s' ns=%s",
xml.tag(),TelEngine::c_safe(xml.xmlns()));
if (t == XmlTag::Mechanisms && n == XMPPNamespace::Sasl) {
int mech = 0;
// Get mechanisms
XmlElement* x = XMPPUtils::findFirstChild(xml,XmlTag::Mechanism);
@ -645,6 +657,14 @@ XMPPFeature* XMPPFeature::fromStreamFeature(XmlElement& xml)
}
f = new XMPPFeatureSasl(mech,required);
}
else if (t == XmlTag::Compression && n == XMPPNamespace::CompressFeature) {
String meth;
// Get methods
XmlElement* x = 0;
while (0 != (x = XMPPUtils::findNextChild(xml,x,XmlTag::Method)))
meth.append(x->getText(),false);
f = new XMPPFeatureCompress(meth,required);
}
else {
String* xmlns = xml.xmlns();
if (!TelEngine::null(xmlns))
@ -677,6 +697,34 @@ XmlElement* XMPPFeatureSasl::build(bool addReq)
}
/*
* XMPPFeatureCompress
*/
// Check if a given method is supported by this feature
bool XMPPFeatureCompress::hasMethod(const String& method) const
{
ObjList* list = m_methods.split(',',false);
bool ok = 0 != list->find(method);
TelEngine::destruct(list);
return ok;
}
// Build an xml element from this feature
XmlElement* XMPPFeatureCompress::build(bool addReq)
{
if (!m_methods)
return 0;
XmlElement* xml = XMPPFeature::build(false);
ObjList* list = m_methods.split(',',false);
for (ObjList* o = list->skipNull(); o; o = o->skipNext())
xml->addChild(XMPPUtils::createElement(XmlTag::Method,o->get()->toString()));
TelEngine::destruct(list);
if (addReq)
addReqChild(*xml);
return xml;
}
/*
* XMPPFeatureList
*/

View File

@ -60,6 +60,7 @@ class JIDIdentity; // A JID's identity
class JIDIdentityList; // A list of JID identities
class XMPPFeature; // A feature (stream or JID)
class XMPPFeatureSasl; // A SASL feature
class XMPPFeatureCompress; // A stream compression feature
class XMPPFeatureList; // Feature list
class XMPPUtils; // Utilities
class XMPPDirVal; // Direction flags
@ -221,7 +222,9 @@ public:
MucOwner = 50, // http://jabber.org/protocol/muc#owner
MucUser = 51, // http://jabber.org/protocol/muc#user
DialbackFeature = 52, // urn:xmpp:features:dialback
Count = 53,
Compress = 53, // http://jabber.org/protocol/compress
CompressFeature = 54, // http://jabber.org/features/compress
Count = 55,
};
/**
@ -303,7 +306,9 @@ public:
Subscription = 53, // subscription-required
Request = 54, // unexpected-request
SocketError = 55, // Don't send any error or stream end tag to remote party
TypeCount = 56
UnsupportedMethod = 56, // unsupported-method
SetupFailed = 57, // setup-failed
TypeCount = 58
};
/**
@ -408,7 +413,11 @@ public:
EntityCapsTag = 65, // c
Handshake = 66, // handshake
Dialback = 67, // dialback
Count = 68
Method = 68, // method
Compress = 69, // compress
Compressed = 70, // compressed
Compression = 71, // compression
Count = 72
};
/**
@ -862,6 +871,50 @@ private:
};
/**
* This class holds a compression feature along with compression methods
* @short A compression feature
*/
class YJABBER_API XMPPFeatureCompress : public XMPPFeature
{
YCLASS(XMPPFeatureCompress,XMPPFeature)
public:
/**
* Constructor
* @param meth Comma separated list of compression methods
* @param required Required flag
*/
inline XMPPFeatureCompress(const String& meth, bool required = false)
: XMPPFeature(XmlTag::Compression,XMPPNamespace::CompressFeature,required),
m_methods(meth)
{}
/**
* Get the compression method(s)
* @return Comma separated list of compression methods
*/
inline const String& methods() const
{ return m_methods; }
/**
* Check if a given method is supported by this feature
* @param method Method to check
* @return True if the method was found in feature's list
*/
bool hasMethod(const String& method) const;
/**
* Build an xml element from this feature
* @param addReq True to add the required/optional child
* @return XmlElement pointer or 0
*/
virtual XmlElement* build(bool addReq = true);
private:
String m_methods; // Compression methods
};
/**
* This class holds a list of JID features
* @short JID feature list
@ -961,13 +1014,20 @@ public:
return o ? static_cast<XMPPFeature*>(o->get()) : 0;
}
/**
/**
* Get a XMPPFeatureSasl feature from list
* @return XMPPFeatureSasl pointer or 0 if not found
*/
inline XMPPFeatureSasl* getSasl()
{ return YOBJECT(XMPPFeatureSasl,get(XMPPNamespace::Sasl)); }
/**
* Get a XMPPFeatureCompress feature from list
* @return XMPPFeatureCompress pointer or 0 if not found
*/
inline XMPPFeatureCompress* getCompress()
{ return YOBJECT(XMPPFeatureCompress,get(XMPPNamespace::CompressFeature)); }
/**
* Build stream features from this list
* @return XmlElement pointer

View File

@ -492,6 +492,10 @@ public:
Auth = 11, // Auth element (db:result for s2s streams) sent
// Incoming comp: handshake received
Challenge = 12, // 'challenge' element sent/received
Compressing = 15, // Stream is negotiating compression
// outgoing: compress element sent, wait for response
// incoming: waiting for <compressed> element to be sent or
// s2s still waiting for compress
Register = 20, // A new user is currently registering
Running = 100, // Established. Allow XML stanzas to pass over the stream
Destroy, // Stream is destroying. No more traffic allowed
@ -508,12 +512,15 @@ public:
// offered by server the stream will be terminated
DialbackOnly = 0x00000008,// Outgoing s2s dialback stream
RegisterUser = 0x00000010,// Outgoing c2s register new user
Compress = 0x00000020,// Offer/handle compression
InError = 0x00000080,// The stream was terminated with error
// Flags to be managed by the upper layer
RosterRequested = 0x00000100,// c2s: the roster was already requested
AvailableResource = 0x00000200,// c2s: available presence was sent/received
PositivePriority = 0x00000400,// c2s: the resource advertised by the client has priority >= 0
// Internal flags (cleared when the stream is re-started)
SetCompressed = 0x00010000,// Set StreamCompressed flag after succesfully sending
// the current stream xml buffer
StreamSecured = 0x00020000,// TLS stage was done (possible without using TLS)
StreamTls = 0x00040000,// The stream is using TLS
StreamAuthenticated = 0x00080000,// Stream already authenticated
@ -524,6 +531,7 @@ public:
StreamWaitChallenge = 0x04000000,// Outgoing waiting for auth challenge
StreamWaitChgRsp = 0x08000000,// Outgoing waiting challenge response confirmation
StreamRfc3920Chg = 0x10000000,// Outgoing sent empty response to challenge with rspauth (RFC3920)
StreamCompressed = 0x20000000,// The stream is using compression
// Flag masks
StreamFlags = 0x000000ff,
InternalFlags = 0xffff0000,
@ -955,6 +963,16 @@ protected:
virtual bool processAuth(XmlElement* xml, const JabberID& from,
const JabberID& to);
/**
* Process elements in Compressing state
* @param xml Received element (will be consumed)
* @param from Already parsed source JID
* @param to Already parsed destination JID
* @return False if stream termination was initiated
*/
virtual bool processCompressing(XmlElement* xml, const JabberID& from,
const JabberID& to);
/**
* Process elements in Register state
* @param xml Received element (will be consumed)
@ -976,6 +994,14 @@ protected:
*/
bool processStreamStart(const XmlElement* xml);
/**
* Handle an already checked (tag and namespace) compress request
* Respond to it. Change stream state on success
* @param xml Received xml element (will be consumed)
* @return False if stream termination was initiated
*/
bool handleCompressReq(XmlElement* xml);
/**
* Check if a received element is a stream error one
* @param xml Received xml element
@ -1012,6 +1038,12 @@ protected:
*/
void changeState(State newState, u_int64_t time = Time::msecNow());
/**
* Check if the stream compress flag is set and compression was offered by remote party
* @return Compress request XmlElement pointer or 0
*/
XmlElement* checkCompress();
/**
* Check for pending events. Set the last event
*/
@ -1031,7 +1063,7 @@ protected:
* @param len The number of bytes to send. Filled with actually sent bytes on exit
* @return True on success, false if stream termination was initiated
*/
bool writeSocket(const char* data, unsigned int& len);
bool writeSocket(const void* data, unsigned int& len);
/**
* Update stream flags and remote connection data from engine
@ -1069,20 +1101,13 @@ protected:
* Set stream flag mask
* @param mask The bit mask to set
*/
inline void setFlags(int mask) {
XDebug(this,DebugAll,"Setting flags 0x%X current=0x%X [%p]",mask,m_flags,this);
m_flags |= mask;
}
void setFlags(int mask);
/**
* Reset stream flag mask
* @param mask The bit mask to reset
*/
void resetFlags(int mask) {
XDebug(this,DebugAll,"Resetting flags 0x%X current=0x%X [%p]",mask,m_flags,this);
m_flags &= ~mask;
}
void resetFlags(int mask);
/**
* Set secured flag. Remove feature from list
@ -1154,6 +1179,9 @@ private:
// Event termination notification
// @param event The notifier. Ignored if it's not m_lastEvent
void eventTerminated(const JBEvent* event);
// Compress data to be sent (the pending stream xml buffer or pending stanza)
// Return false on failure
bool compress(XmlElementOut* xml = 0);
enum {
SocketCanRead = 0x01,
@ -1196,6 +1224,8 @@ private:
JBEvent* m_terminateEvent; // Pending terminate event
// Pending outgoing XML
String m_outStreamXml;
DataBlock m_outStreamXmlCompress;
DataBlock m_outXmlCompress;
// Connection related data
XmlDomParser* m_xmlDom;
Socket* m_socket;
@ -1203,6 +1233,7 @@ private:
Mutex m_socketMutex; // Protect the socket and parser
String m_connectAddr; // Remote ip to connect to
int m_connectPort; // Remote port to connect to
Compressor* m_compress;
};
@ -1839,6 +1870,13 @@ public:
*/
virtual void connectStream(JBStream* stream);
/**
* Start stream compression
* @param stream The stream to compress
* @param formats Supported formats
*/
virtual void compressStream(JBStream* stream, const String& formats);
/**
* Build a dialback key
* @param id The stream id

View File

@ -234,6 +234,8 @@ public:
virtual void encryptStream(JBStream* stream);
// Connect an outgoing stream
virtual void connectStream(JBStream* stream);
// Start stream compression
virtual void compressStream(JBStream* stream, const String& formats);
// Process 'user.roster' messages
bool handleUserRoster(Message& msg, const String& line);
// Process 'user.update' messages
@ -645,6 +647,21 @@ void YJBEngine::connectStream(JBStream* stream)
(new YJBConnectThread(*stream))->startup();
}
// Start stream compression
void YJBEngine::compressStream(JBStream* stream, const String& formats)
{
if (!stream)
return;
DDebug(this,DebugAll,"compressStream(%p,'%s') formats=%s",
stream,stream->toString().c_str(),formats.c_str());
Message msg("engine.compress");
msg.userData(stream);
msg.addParam("formats",formats,false);
msg.addParam("name",stream->toString());
msg.addParam("data_type","text");
Engine::dispatch(msg);
}
// Process 'user.roster' messages
bool YJBEngine::handleUserRoster(Message& msg, const String& line)
{

View File

@ -203,6 +203,8 @@ public:
virtual void encryptStream(JBStream* stream);
// Connect an outgoing stream
virtual void connectStream(JBStream* stream);
// Start stream compression
virtual void compressStream(JBStream* stream, const String& formats);
// Build a dialback key
virtual void buildDialbackKey(const String& id, const String& local,
const String& remote, String& key);
@ -573,6 +575,13 @@ public:
msg.addParam("module",name());
msg.addParam("protocol","jabber");
}
// Retrieve the compression formats
inline String compressFmts() {
Lock lock(this);
return m_compressFmts;
}
// Check if compression formats are supported. Update the list
void checkCompressFmts();
// Check if client/server TLS is available
bool checkTls(bool server, const String& domain = String::empty());
protected:
@ -592,6 +601,7 @@ private:
ObjList m_handlers; // Message handlers list
String m_domain; // Default domain served by the jabber engine
ObjList m_streamListeners;
String m_compressFmts; // Supported compression formats
};
@ -1120,6 +1130,20 @@ void YJBEngine::connectStream(JBStream* stream)
(new YJBConnectThread(*stream))->startup();
}
// Start stream compression
void YJBEngine::compressStream(JBStream* stream, const String& formats)
{
if (!stream)
return;
DDebug(this,DebugAll,"compressStream(%p,'%s') formats=%s",
stream,stream->toString().c_str(),formats.c_str());
Message msg("engine.compress");
msg.userData(stream);
msg.addParam("formats",formats);
msg.addParam("name",stream->toString());
Engine::dispatch(msg);
}
// Build a dialback key
void YJBEngine::buildDialbackKey(const String& id, const String& local,
const String& remote, String& key)
@ -1627,6 +1651,8 @@ bool YJBEngine::handleJabberItem(Message& msg)
void YJBEngine::handleEngineStart(Message& msg)
{
s_engineStarted = true;
// Check configured compression formats
__plugin.checkCompressFmts();
// Check client TLS
m_hasClientTls = __plugin.checkTls(false);
if (!m_hasClientTls)
@ -1842,6 +1868,7 @@ void YJBEngine::processStartIn(JBEvent* ev)
XMPPFeature* auth = 0;
XMPPFeature* bind = 0;
XmlElement* caps = 0;
bool setComp = false;
bool c2s = ev->stream()->type() == JBStream::c2s;
// Add TLS if not secured
if (!secured && (c2s || s_s2sFeatures) && canTls)
@ -1849,11 +1876,16 @@ void YJBEngine::processStartIn(JBEvent* ev)
ev->stream()->flag(JBStream::TlsRequired));
bool authenticated = ev->stream()->flag(JBStream::StreamAuthenticated);
if (ev->stream()->type() == JBStream::s2s) {
if (!authenticated)
if (!authenticated) {
auth = new XMPPFeature(XmlTag::Dialback,XMPPNamespace::DialbackFeature);
setComp = true;
}
}
else if (c2s) {
bool tlsReq = tls && tls->required();
// NOTE: We should offer compression after authentication (XEP-0170)
// There are clients who ignore compression offered after auth
setComp = !tlsReq;
bool addReg = !authenticated && domain &&
domain->hasFeature(XMPPNamespace::Register,true);
// Add entity caps 'c' element
@ -1881,6 +1913,12 @@ void YJBEngine::processStartIn(JBEvent* ev)
features.add(tls);
features.add(reg);
features.add(auth);
// Offer compression
if (setComp && !ev->stream()->flag(JBStream::StreamCompressed)) {
String fmts = __plugin.compressFmts();
if (fmts)
features.add(new XMPPFeatureCompress(fmts));
}
features.add(bind);
ev->releaseStream();
ev->stream()->start(&features,caps);
@ -3518,6 +3556,15 @@ void JBModule::initialize()
s_entityCaps.load();
else
Debug(this,DebugAll,"Entity capability is disabled");
// Compression formats
String* fmts = cfg.getKey("general","compression_formats");
lock();
if (!fmts)
m_compressFmts = "zlib";
else
m_compressFmts = *fmts;
unlock();
}
// (re)init globals
@ -3591,6 +3638,37 @@ void JBModule::cancelListener(const String& name)
Debug(this,DebugInfo,"Listener '%s' terminated",name.c_str());
}
// Check if compression formats are supported. Update the list
void JBModule::checkCompressFmts()
{
static bool s_checking = false;
if (!s_engineStarted)
return;
Lock lock1(this);
if (s_checking)
return;
s_checking = true;
ObjList* list = m_compressFmts.split(',',false);
lock1.drop();
String tmp;
for (ObjList* o = list->skipNull(); o; o = o->skipNext()) {
String* s = static_cast<String*>(o->get());
Message m("engine.compress");
m.addParam("test",String::boolText(true));
m.addParam("format",*s);
if (Engine::dispatch(m))
tmp.append(*s,",");
}
TelEngine::destruct(list);
Lock lck(this);
if (m_compressFmts != tmp) {
Debug(this,DebugNote,"Changing supported compression formats to '%s' old='%s'",
tmp.c_str(),m_compressFmts.c_str());
m_compressFmts = tmp;
}
s_checking = false;
}
// Check if client/server TLS is available
bool JBModule::checkTls(bool server, const String& domain)
{