Use the exponential backoff timer for connecting stream transports too.

Renamed the timeout constants to reflect the change.


git-svn-id: http://voip.null.ro/svn/yate@3499 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2010-08-23 23:33:49 +00:00
parent 5b782a4171
commit d11940f0ed
1 changed files with 33 additions and 13 deletions

View File

@ -30,8 +30,8 @@
#define MAX_BUF_SIZE 48500
#define SCTP_RETRY_MIN 100000
#define SCTP_RETRY_MAX 5000000
#define CONN_RETRY_MIN 100000
#define CONN_RETRY_MAX 5000000
using namespace TelEngine;
namespace { // anonymous
@ -117,7 +117,8 @@ class TReader : public TransportWorker, public Mutex
public:
inline TReader()
: Mutex(true,"TReader"),
m_sending(true,"TReader::sending"), m_canSend(true)
m_sending(true,"TReader::sending"), m_canSend(true),
m_tryAgain(0), m_interval(CONN_RETRY_MIN)
{ }
virtual ~TReader();
virtual void listen(int maxConn) = 0;
@ -125,6 +126,9 @@ public:
virtual void setSocket(Socket* s) = 0;
Mutex m_sending;
bool m_canSend;
protected:
u_int64_t m_tryAgain;
u_int32_t m_interval;
};
class Transport : public SIGTransport
@ -193,8 +197,7 @@ public:
StreamReader(Transport* transport, Socket* sock);
~StreamReader();
virtual bool readData();
virtual bool connectSocket()
{ return m_transport->connectSocket(); }
virtual bool connectSocket();
virtual bool needConnect()
{ return m_transport && m_transport->status() == Transport::Down && !m_transport->listen(); }
virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0);
@ -231,8 +234,6 @@ private:
Socket* m_socket;
Transport* m_transport;
SocketAddr m_remote;
u_int64_t m_tryAgain;
u_int32_t m_interval;
};
class TransportModule : public Module
@ -753,7 +754,7 @@ bool Transport::connectSocket()
addr.host(address);
addr.port(port);
if (m_endpoint && !sock->connect(addr)) {
DDebug(this,DebugNote,"Unable to connect to %s:%u. %s",
Debug(this,DebugNote,"Unable to connect to %s:%u. %s",
addr.host().c_str(),addr.port(),strerror(errno));
sock->terminate();
delete sock;
@ -959,6 +960,26 @@ bool StreamReader::sendBuffer(int streamId)
return true;
}
bool StreamReader::connectSocket()
{
Time t;
if (t < m_tryAgain) {
Thread::yield(true);
return false;
}
m_tryAgain = t + m_interval;
if (m_transport->connectSocket()) {
m_interval = CONN_RETRY_MIN;
return true;
}
m_tryAgain = Time::now() + m_interval;
// exponential backoff
m_interval *= 2;
if (m_interval > CONN_RETRY_MAX)
m_interval = CONN_RETRY_MAX;
return false;
}
bool StreamReader::readData()
{
if (!m_socket)
@ -1077,8 +1098,7 @@ bool StreamReader::readData()
*/
MessageReader::MessageReader(Transport* transport, Socket* sock, SocketAddr& addr)
: m_socket(sock), m_transport(transport), m_remote(addr),
m_tryAgain(0), m_interval(SCTP_RETRY_MIN)
: m_socket(sock), m_transport(transport), m_remote(addr)
{
DDebug(DebugAll,"Creating MessageReader [%p]",this);
}
@ -1195,8 +1215,8 @@ bool MessageReader::readData()
m_tryAgain = Time::now() + m_interval;
// exponential backoff
m_interval *= 2;
if (m_interval > SCTP_RETRY_MAX)
m_interval = SCTP_RETRY_MAX;
if (m_interval > CONN_RETRY_MAX)
m_interval = CONN_RETRY_MAX;
return false;
}
}
@ -1205,7 +1225,7 @@ bool MessageReader::readData()
if (r <= 0)
return false;
m_interval = SCTP_RETRY_MIN;
m_interval = CONN_RETRY_MIN;
u_int32_t len = m_transport->getMsgLen(buffer);
if ((unsigned int)r != len) {