Delay stream termination on I/O error if we have pending xml elements.

git-svn-id: http://voip.null.ro/svn/yate@4585 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2011-09-01 15:04:31 +00:00
parent c76fe8afb8
commit 9084761f89
3 changed files with 168 additions and 40 deletions

View File

@ -902,7 +902,7 @@ JBEngine::JBEngine(const char* name)
m_setupTimeout(JB_SETUP_INTERVAL), m_startTimeout(JB_START_INTERVAL),
m_connectTimeout(JB_CONNECT_INTERVAL), m_srvTimeout(JB_SRV_INTERVAL),
m_pingInterval(JB_PING_INTERVAL), m_pingTimeout(JB_PING_TIMEOUT),
m_idleTimeout(0),
m_idleTimeout(0), m_pptTimeoutC2s(0), m_pptTimeout(0),
m_streamReadBuffer(JB_STREAMBUF), m_maxIncompleteXml(XMPP_MAX_INCOMPLETEXML),
m_hasClientTls(true), m_printXml(0), m_initialized(false)
{
@ -956,6 +956,8 @@ void JBEngine::initialize(const NamedList& params)
JB_PING_TIMEOUT,JB_PING_TIMEOUT_MIN,JB_PING_TIMEOUT_MAX);
m_idleTimeout = fixValue(params,"stream_idletimeout",
JB_IDLE_INTERVAL,JB_IDLE_INTERVAL_MIN,JB_IDLE_INTERVAL_MAX);
m_pptTimeoutC2s = params.getIntValue("stream_ppttimeout_c2s",10000,0,120000);
m_pptTimeout = params.getIntValue("stream_ppttimeout",60000,0,180000);
m_initialized = true;
}

View File

@ -153,7 +153,7 @@ JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
m_idleTimeout(0), m_connectTimeout(0),
m_restart(0), m_timeToFillRestart(0),
m_engine(engine), m_type(t),
m_incoming(true), m_terminateEvent(0),
m_incoming(true), m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start)
{
@ -184,7 +184,7 @@ JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const Jabber
m_restart(1), m_timeToFillRestart(0),
m_engine(engine), m_type(t),
m_incoming(false), m_name(name),
m_terminateEvent(0),
m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start)
{
@ -285,6 +285,23 @@ const String& JBStream::toString() const
return m_name;
}
// Check if the stream has valid pending data
bool JBStream::haveData()
{
Lock2 lck(this,&m_socketMutex);
// Pending data with socket available for writing
if (m_pending.skipNull() && socketCanWrite())
return true;
// Pending events
if (m_events.skipNull())
return true;
// Pending incoming XML
XmlDocument* doc = m_xmlDom ? m_xmlDom->document() : 0;
XmlElement* root = doc ? doc->root(false) : 0;
XmlElement* first = root ? root->findFirstChild() : 0;
return first && first->completed();
}
// Retrieve connection address(es), port and status
void JBStream::connectAddr(String& addr, int& port, String& localip, int& stat,
ObjList& srvs) const
@ -348,7 +365,7 @@ bool JBStream::readSocket(char* buf, unsigned int len)
int read = m_socket->readData(buf,len);
Lock lck(m_socketMutex);
// Check if the connection is waiting to be reset
if (0 != (m_socketFlags & SocketWaitReset)) {
if (socketWaitReset()) {
socketSetReading(false);
return false;
}
@ -437,12 +454,12 @@ bool JBStream::readSocket(char* buf, unsigned int len)
}
// Error
int location = 0;
const char* reason = 0;
String reason;
if (error != XMPPError::SocketError) {
if (error == XMPPError::Xml) {
reason = m_xmlDom->getError();
Debug(this,DebugNote,"Parser error='%s' buffer='%s' [%p]",
reason,m_xmlDom->buffer().c_str(),this);
reason << "Parser error '" << m_xmlDom->getError() << "'";
Debug(this,DebugNote,"%s buffer='%s' [%p]",
reason.c_str(),m_xmlDom->buffer().c_str(),this);
}
else if (error == XMPPError::UndefinedCondition) {
reason = "Decompression failure";
@ -453,23 +470,25 @@ bool JBStream::readSocket(char* buf, unsigned int len)
Debug(this,DebugNote,"No decompressor [%p]",this);
}
else {
reason = "XML element too long";
Debug(this,DebugNote,"Parser error='%s' overflow len=%u max= %u [%p]",
reason,m_xmlDom->buffer().length(),m_engine->m_maxIncompleteXml,this);
reason = "Parser error 'XML element too long'";
Debug(this,DebugNote,"Parser overflow len=%u max= %u [%p]",
m_xmlDom->buffer().length(),m_engine->m_maxIncompleteXml,this);
}
}
else if (read) {
String tmp;
Thread::errorString(tmp,m_socket->error());
Debug(this,DebugWarn,"Socket read error: %d: '%s' [%p]",m_socket->error(),
tmp.c_str(),this);
reason << "Socket read error: " << tmp << " (" << m_socket->error() << ")";
Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
}
else {
Debug(this,DebugInfo,"Stream EOF [%p]",this);
reason = "Stream EOF";
Debug(this,DebugInfo,"%s [%p]",reason.c_str(),this);
location = 1;
}
socketSetCanRead(false);
lck.drop();
terminate(location,m_incoming,0,error,reason);
postponeTerminate(location,m_incoming,error,reason);
return read > 0;
}
@ -749,6 +768,7 @@ void JBStream::terminate(int location, bool destroy, XmlElement* xml, int error,
Lock lock(this);
m_pending.clear();
m_outXmlCompress.clear();
resetPostponedTerminate();
// Already in destroy
if (state() == Destroy) {
TelEngine::destruct(xml);
@ -937,13 +957,22 @@ void JBStream::process(u_int64_t time)
XmlElement* xml = root->pop();
if (!xml) {
// No (more) children: check termination
if (root->completed()) {
lockDoc.drop();
DDebug(this,DebugAll,"Remote closed the stream in state %s [%p]",
stateName(),this);
terminate(1,false,0);
if (root->completed())
socketSetCanRead(false);
if (m_events.skipNull())
break;
if (!root->completed()) {
if (m_ppTerminate && !(m_pending.skipNull() && socketCanWrite())) {
lockDoc.drop();
postponedTerminate();
}
break;
}
DDebug(this,DebugAll,"Remote closed the stream in state %s [%p]",
stateName(),this);
lockDoc.drop();
resetPostponedTerminate();
terminate(1,false,0);
break;
}
lockDoc.drop();
@ -965,7 +994,7 @@ void JBStream::process(u_int64_t time)
if (!checkStanzaRecv(xml,from,to))
break;
XDebug(m_engine,DebugAll,"Processing (%p,%s) in state %s [%p]",
DDebug(this,DebugAll,"Processing (%p,%s) in state %s [%p]",
xml,xml->tag(),stateName(),this);
// Process here dialback verify
@ -1061,6 +1090,12 @@ bool JBStream::processRunning(XmlElement* xml, const JabberID& from, const Jabbe
// This method is called from getEvent() with the stream locked, after
void JBStream::checkTimeouts(u_int64_t time)
{
if (m_ppTerminateTimeout && m_ppTerminateTimeout <= time) {
m_ppTerminateTimeout = 0;
Debug(this,DebugAll,"Postponed termination timed out [%p]",this);
if (postponedTerminate())
return;
}
// Running: check ping and idle timers
if (m_state == Running) {
if (m_pingTimeout) {
@ -1135,6 +1170,7 @@ void JBStream::resetConnection(Socket* sock)
delete tmp;
}
}
resetPostponedTerminate();
if (sock) {
Lock lock(m_socketMutex);
if (m_socket) {
@ -1155,6 +1191,7 @@ void JBStream::resetConnection(Socket* sock)
m_socket->setReuse(true);
m_socket->setBlocking(false);
socketSetCanRead(true);
socketSetCanWrite(true);
}
}
@ -1517,7 +1554,7 @@ void JBStream::changeState(State newState, u_int64_t time)
{
if (newState == m_state)
return;
DDebug(this,DebugAll,"Changing state from '%s' to '%s' [%p]",
Debug(this,DebugAll,"Changing state from '%s' to '%s' [%p]",
stateName(),lookup(newState,s_stateName),this);
// Set/reset state depending data
switch (m_state) {
@ -1653,10 +1690,8 @@ bool JBStream::sendPending(bool streamOnly)
buf = m_outStreamXmlCompress.data();
len = m_outStreamXmlCompress.length();
}
if (!writeSocket(buf,len)) {
terminate(0,m_incoming,0,XMPPError::SocketError);
if (!writeSocket(buf,len))
return false;
}
bool all = false;
if (noComp) {
all = (len == m_outStreamXml.length());
@ -1722,6 +1757,8 @@ bool JBStream::sendPending(bool streamOnly)
if (!sent)
m_engine->printXml(this,true,*xml);
if (writeSocket(buf,len)) {
if (!len)
return true;
setIdleTimer();
// Adjust element's buffer. Remove it from list on completion
unsigned int rest = 0;
@ -1743,22 +1780,24 @@ bool JBStream::sendPending(bool streamOnly)
return true;
}
// Error
Debug(this,DebugNote,"Failed to send (%p,%s) in state=%s [%p]",
xml,xml->tag(),stateName(),this);
terminate(0,m_incoming,0,XMPPError::SocketError);
Debug(this,DebugNote,"Failed to send (%p,%s) [%p]",xml,xml->tag(),this);
return false;
}
// Write data to socket
bool JBStream::writeSocket(const void* data, unsigned int& len)
{
if (!(data && m_socket)) {
if (!(data && len)) {
len = 0;
return m_socket != 0;
return true;
}
Lock lock(m_socketMutex);
if (!m_socket || 0 != (m_socketFlags & SocketWaitReset)) {
if (!socketCanWrite()) {
len = 0;
if (0 != (m_socketFlags & SocketCanWrite)) {
socketSetCanWrite(false);
postponeTerminate(0,m_incoming,XMPPError::SocketError,"No socket");
}
return false;
}
socketSetWriting(true);
@ -1784,7 +1823,7 @@ bool JBStream::writeSocket(const void* data, unsigned int& len)
#endif
Lock lck(m_socketMutex);
// Check if the connection is waiting to be reset
if (0 != (m_socketFlags & SocketWaitReset)) {
if (socketWaitReset()) {
socketSetWriting(false);
return true;
}
@ -1796,13 +1835,14 @@ bool JBStream::writeSocket(const void* data, unsigned int& len)
socketSetWriting(false);
if (w != Socket::socketError() || m_socket->canRetry())
return true;
socketSetCanWrite(false);
String tmp;
Thread::errorString(tmp,m_socket->error());
Debug(this,DebugWarn,"Socket send error: %d: '%s' [%p]",
m_socket->error(),tmp.c_str(),this);
String reason;
reason << "Socket send error: " << tmp << " (" << m_socket->error() << ")";
Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
lck.drop();
// Terminate the connection now: avoid loop back
resetConnection();
postponeTerminate(0,m_incoming,XMPPError::SocketError,reason);
return false;
}
@ -2389,6 +2429,54 @@ void JBStream::resetConnectStatus()
m_connectSrvs.clear();
}
// Postpone stream terminate until all parsed elements are processed
// Terminate now if allowed
void JBStream::postponeTerminate(int location, bool destroy, int error, const char* reason)
{
lock();
XDebug(this,DebugAll,"postponeTerminate(%d,%u,%s,%s) state=%s [%p]",
location,destroy,XMPPUtils::s_error[error].c_str(),reason,stateName(),this);
if (!m_ppTerminate) {
int interval = 0;
if (type() == c2s)
interval = m_engine->m_pptTimeoutC2s;
else
interval = m_engine->m_pptTimeout;
if (interval && haveData()) {
m_ppTerminate = new NamedList("");
m_ppTerminate->addParam("location",String(location));
m_ppTerminate->addParam("destroy",String::boolText(destroy));
m_ppTerminate->addParam("error",String(error));
m_ppTerminate->addParam("reason",reason);
m_ppTerminateTimeout = Time::msecNow() + interval;
Debug(this,DebugInfo,
"Postponed termination location=%d destroy=%u error=%s reason=%s interval=%us [%p]",
location,destroy,XMPPUtils::s_error[error].c_str(),reason,interval,this);
}
}
bool postponed = m_ppTerminate != 0;
unlock();
if (!postponed)
terminate(location,destroy,0,error,reason);
}
// Handle postponed termination. Return true if found
bool JBStream::postponedTerminate()
{
if (!m_ppTerminate)
return false;
int location = m_ppTerminate->getIntValue("location");
int destroy = m_ppTerminate->getIntValue("destroy");
int error = m_ppTerminate->getIntValue("error");
String reason = m_ppTerminate->getValue("reason");
resetPostponedTerminate();
DDebug(this,DebugAll,"postponedTerminate(%d,%u,%s,%s) state=%s [%p]",
location,destroy,XMPPUtils::s_error[error].c_str(),reason.c_str(),
stateName(),this);
terminate(location,destroy,0,error,reason);
return true;
}
/*
* JBClientStream

View File

@ -711,6 +711,14 @@ public:
resetFlags(TlsRequired);
}
/**
* Check if the stream has valid pending data (received xml elements in queue or
* pending events or pending xml elements that can still be sent).
* This method is thread safe
* @return True if the stream have pending data, false otherwise
*/
bool haveData();
/**
* Retrieve connection address(es), port and status
* This method is not thread safe
@ -1120,7 +1128,7 @@ protected:
* Write data to socket. Terminate the stream on socket error
* @param data Buffer to sent
* @param len The number of bytes to send. Filled with actually sent bytes on exit
* @return True on success, false if stream termination was initiated
* @return True on success, false on failure
*/
bool writeSocket(const void* data, unsigned int& len);
@ -1243,11 +1251,24 @@ private:
bool compress(XmlElementOut* xml = 0);
// Reset connect status data
void resetConnectStatus();
// Postpone stream terminate until all parsed elements are processed
// Terminate now if allowed
// This method is thread safe
void postponeTerminate(int location, bool destroy, int error, const char* reason);
// Handle postponed termination. Return true if found
// This method is not thread safe
bool postponedTerminate();
// Reset postponed terminate data
inline void resetPostponedTerminate() {
m_ppTerminateTimeout = 0;
TelEngine::destruct(m_ppTerminate);
}
enum {
SocketCanRead = 0x01,
SocketReading = 0x02,
SocketWriting = 0x10,
SocketCanWrite = 0x10,
SocketWriting = 0x20,
SocketWaitReset = 0x80,
};
inline void socketSetCanRead(bool ok) {
@ -1263,6 +1284,13 @@ private:
else
m_socketFlags &= ~SocketReading;
}
inline void socketSetCanWrite(bool ok) {
Lock lock(m_socketMutex);
if (ok)
m_socketFlags |= SocketCanWrite;
else
m_socketFlags &= ~SocketCanWrite;
}
inline void socketSetWriting(bool ok) {
if (ok)
m_socketFlags |= SocketWriting;
@ -1271,18 +1299,26 @@ private:
}
inline bool socketCanRead() const {
return m_socket && (m_socketFlags & SocketCanRead) &&
0 == (m_socketFlags & SocketWaitReset);
!socketWaitReset();
}
inline bool socketCanWrite() const {
return m_socket && (m_socketFlags & SocketCanWrite) &&
!socketWaitReset();
}
inline bool socketReading() const
{ return (m_socketFlags & SocketReading) != 0; }
inline bool socketWriting() const
{ return (m_socketFlags & SocketWriting) != 0; }
inline bool socketWaitReset() const
{ return 0 != (m_socketFlags & SocketWaitReset); }
JBEngine* m_engine; // The owner of this stream
int m_type; // Stream type
bool m_incoming; // Stream direction
String m_name; // Local (internal) name
JBEvent* m_terminateEvent; // Pending terminate event
NamedList* m_ppTerminate; // Postponed terminate parameters
u_int64_t m_ppTerminateTimeout; // Postponed terminate timeout
// Pending outgoing XML
String m_outStreamXml;
DataBlock m_outStreamXmlCompress;
@ -2129,6 +2165,8 @@ protected:
unsigned int m_pingInterval; // Stream idle interval (no data received)
unsigned int m_pingTimeout; // Sent ping timeout
unsigned int m_idleTimeout; // Stream idle timeout (nothing sent or received)
unsigned int m_pptTimeoutC2s; // Client streams postpone termination intervals
unsigned int m_pptTimeout; // Non client streams postpone stream termination intervals
unsigned int m_streamReadBuffer; // Stream read buffer length
unsigned int m_maxIncompleteXml; // Maximum length of an incomplete xml
bool m_hasClientTls; // True if TLS is available for outgoing streams