Fixed bug in stream restart.

git-svn-id: http://voip.null.ro/svn/yate@1094 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2006-10-31 09:02:15 +00:00
parent 0df283ba65
commit 2e1b684ffa
3 changed files with 58 additions and 27 deletions

View File

@ -32,6 +32,7 @@ using namespace TelEngine;
// Default values
#define JB_STREAM_PARTIALRESTART 2
#define JB_STREAM_TOTALRESTART -1
#define JB_STREAM_WAITRESTART 5000
// Sleep time for threads
#define SLEEP_READSOCKET 2 // Read socket
@ -63,14 +64,24 @@ void JBEngine::initialize(const NamedList& params)
// Stream restart attempts
m_partialStreamRestart =
params.getIntValue("stream_partialrestart",JB_STREAM_PARTIALRESTART);
// sanity check to avoid perpetual retries
// Sanity check to avoid perpetual retries
if (m_partialStreamRestart < 1)
m_partialStreamRestart = 1;
m_totalStreamRestart =
params.getIntValue("stream_totalrestart",JB_STREAM_TOTALRESTART);
m_waitStreamRestart =
params.getIntValue("stream_waitrestart",JB_STREAM_WAITRESTART);
// XML parser max receive buffer
XMLParser::s_maxDataBuffer =
params.getIntValue("xmlparser_maxbuffer",XMLPARSER_MAXDATABUFFER);
if (debugAt(DebugAll)) {
String s;
s << "\r\nstream_partialrestart=" << m_partialStreamRestart;
s << "\r\nstream_totalrestart=" << m_totalStreamRestart;
s << "\r\nstream_waitrestart=" << m_waitStreamRestart;
s << "\r\nxmlparser_maxbuffer=" << XMLParser::s_maxDataBuffer;
Debug(this,DebugAll,"Initialized:%s",s.c_str());
}
}
void JBEngine::cleanup()
@ -530,7 +541,7 @@ JBEvent::~JBEvent()
}
if (m_element)
delete m_element;
XDebug(DebugAll,"JBEvent::~JBEvent [%p].",this);
XDebug(DebugAll,"JBEvent::~JBEvent. [%p]",this);
}
void JBEvent::releaseStream()
@ -543,14 +554,15 @@ void JBEvent::releaseStream()
bool JBEvent::init(JBComponentStream* stream, XMLElement* element)
{
XDebug(DebugAll,"JBEvent::JBEvent [%p]. Element: (%p).",
this,m_element);
bool bRet = true;
if (stream && stream->ref())
m_stream = stream;
else
return false;
bRet = false;
m_element = element;
return true;
XDebug(DebugAll,"JBEvent::JBEvent. Type: %u. Stream (%p). Element: (%p). [%p]",
m_type,m_stream,m_element,this);
return bRet;
}
/**
@ -592,7 +604,7 @@ JBPresence::JBPresence(JBEngine* engine)
Mutex(true)
{
debugName("jbpresence");
XDebug(this,DebugAll,"JBPresence [%p].",this);
XDebug(this,DebugAll,"JBPresence. [%p]",this);
if (m_engine)
m_engine->setPresenceServer(this);
}
@ -602,7 +614,7 @@ JBPresence::~JBPresence()
if (m_engine)
m_engine->unsetPresenceServer(this);
m_events.clear();
XDebug(this,DebugAll,"~JBPresence [%p].",this);
XDebug(this,DebugAll,"~JBPresence. [%p]",this);
}
bool JBPresence::receive(JBEvent* event)

View File

@ -62,7 +62,7 @@ JBComponentStream::JBComponentStream(JBEngine* engine, const String& remoteName,
m_totalRestart = m_engine->totalStreamRestartAttempts();
m_engine->getServerIdentity(m_localName,remoteName);
// Start
connect();
m_engine->connect(this);
}
JBComponentStream::~JBComponentStream()
@ -91,8 +91,11 @@ JBComponentStream::~JBComponentStream()
void JBComponentStream::connect()
{
Lock2 lock(*this,m_receiveMutex);
if (m_state != Terminated)
if (m_state != Terminated) {
Debug(m_engine,DebugNote,
"Stream::connect. Attempt to connect in non Terminated state. [%p]",this);
return;
}
m_state = WaitToConnect;
// Check restart counters: If both of them are 0 destroy the stream
Debug(m_engine,DebugAll,
@ -102,10 +105,6 @@ void JBComponentStream::connect()
terminate(true,false,0,false);
return;
}
// Update restart counters
if (m_partialRestart > 0)
m_partialRestart--;
m_waitBeforeConnect = (m_partialRestart == 0);
// Reset data
m_id = "";
m_parser.reset();
@ -116,6 +115,21 @@ void JBComponentStream::connect()
bool res = m_socket->connect(m_remoteAddr);
// Lock again to update stream
lock.lock(*this,m_receiveMutex);
// Update restart counters
m_waitBeforeConnect = false;
if (res)
m_partialRestart = m_engine->partialStreamRestartAttempts();
else {
if (m_partialRestart > 0)
m_partialRestart--;
if (!m_partialRestart && m_totalRestart > 0)
m_totalRestart--;
if (!m_partialRestart && m_totalRestart) {
m_waitBeforeConnect = true;
m_partialRestart = m_engine->partialStreamRestartAttempts();
}
}
// Check connect result
if (!res) {
Debug(m_engine,DebugWarn,
"Stream::connect. Failed to connect socket to '%s:%d'. Error: '%s' (%d). [%p]",
@ -126,12 +140,6 @@ void JBComponentStream::connect()
}
Debug(m_engine,DebugAll,"Stream::connect. Connected to '%s:%d'. [%p]",
m_remoteAddr.host().c_str(),m_remoteAddr.port(),this);
// Update restart data
if (m_partialRestart != -1)
m_partialRestart = m_engine->partialStreamRestartAttempts();
if (m_totalRestart > 0)
m_totalRestart--;
m_waitBeforeConnect = false;
// Connected
m_socket->setBlocking(false);
lock.drop();
@ -218,27 +226,30 @@ JBEvent* JBComponentStream::getEvent(u_int64_t time)
m_state == Destroy || m_state == Terminated) {
if (m_lastEvent)
return 0;
if (m_terminateEvent) {
m_lastEvent = m_terminateEvent;
m_terminateEvent = 0;
}
return m_lastEvent;
break;
}
// Send pending elements.
// If not terminated check received elements
// Again, if not terminated, get event from queue
sendXML();
if (m_terminateEvent)
continue;
break;
processIncomingXML();
if (m_terminateEvent)
continue;
break;
// Get first event from queue
ObjList* obj = m_events.skipNull();
if (!obj)
break;
m_lastEvent = static_cast<JBEvent*>(obj->get());
m_events.remove(m_lastEvent,false);
break;
}
if (m_lastEvent || m_terminateEvent) {
if (!m_lastEvent) {
m_lastEvent = m_terminateEvent;
m_terminateEvent = 0;
}
DDebug(m_engine,DebugAll,
"Stream::getEvent. Raise event (%p): %u. [%p]",
m_lastEvent,m_lastEvent->type(),this);

View File

@ -647,6 +647,13 @@ public:
inline int totalStreamRestartAttempts() const
{ return m_totalStreamRestart; }
/**
* Get the time to wait after m_partialStreamRestart reaches 0.
* @return time to wait after m_partialStreamRestart reaches 0.
*/
inline u_int32_t waitStreamRestart() const
{ return m_waitStreamRestart; }
/**
* Check if a stream to the given server exists.
* If the stream doesn't exists creates it.
@ -828,6 +835,7 @@ private:
ObjList m_features; // Remote peers' features
int m_partialStreamRestart; // Partial outgoing stream restart attempts counter
int m_totalStreamRestart; // Total outgoing stream restart attempts counter
u_int32_t m_waitStreamRestart; // How much time to wait after m_partialStreamRestart reaches 0
// ID generation data
u_int64_t m_streamID; // Stream id counter
// Server list