Detect zombie sctp sockets and reconnect them.

Fixed sctp stream listener logic.


git-svn-id: http://yate.null.ro/svn/yate/trunk@5130 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2012-06-18 17:09:11 +00:00
parent 99c6af539f
commit 9826917545
2 changed files with 183 additions and 44 deletions

View File

@ -1,3 +1,10 @@
;[general]
; max_down: long: The time in seconds after a sctp socket is in connection
; hanging state. After this time expires the connection is re-established.
; If the connection is not re established the timer will exponential back off
; until reaches 60 seconds
;max_down=10
; Each section in this file describes a SIGTRAN connection
; Connections are referenced from other configurations describing the upper layer

View File

@ -57,6 +57,7 @@ public:
virtual bool readData() = 0;
virtual bool connectSocket() = 0;
virtual bool needConnect() = 0;
virtual void reset() = 0;
bool running();
bool start(Thread::Priority prio = Thread::Normal);
inline void resetThread()
@ -88,12 +89,15 @@ class TransportThread : public Thread
friend class TransportWorker;
public:
inline TransportThread(TransportWorker* worker, Priority prio = Normal)
: Thread("SignallingTransporter",prio), m_worker(worker)
: Thread("SignallingTransporter",prio), m_worker(worker), m_exit(false)
{ }
virtual ~TransportThread();
virtual void run();
void exitThread()
{ m_exit = true; }
private:
TransportWorker* m_worker;
bool m_exit;
};
class ListenerThread : public Thread
@ -118,12 +122,13 @@ public:
inline TReader()
: Mutex(true,"TReader"),
m_sending(true,"TReader::sending"), m_canSend(true), m_reconnect(false),
m_tryAgain(0), m_interval(CONN_RETRY_MIN)
m_tryAgain(0), m_interval(CONN_RETRY_MIN), m_downTime(0)
{ }
virtual ~TReader();
virtual void listen(int maxConn) = 0;
virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0;
virtual void setSocket(Socket* s) = 0;
virtual void reset() = 0;
void reconnect()
{ m_reconnect = true; }
Mutex m_sending;
@ -132,6 +137,7 @@ protected:
bool m_reconnect;
u_int64_t m_tryAgain;
u_int32_t m_interval;
u_int64_t m_downTime;
};
class Transport : public SIGTransport
@ -182,6 +188,7 @@ public:
{ }
virtual void reconnect(bool force);
bool connectSocket();
void resetReader(TReader* reader);
u_int32_t getMsgLen(unsigned char* buf);
bool addAddress(const NamedList &param, Socket* socket);
bool bindSocket();
@ -189,6 +196,7 @@ public:
virtual bool transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0);
private:
TReader* m_reader;
Mutex m_readerMutex;
bool m_streamer;
int m_type;
int m_state;
@ -212,7 +220,10 @@ public:
virtual void listen(int maxConn)
{ }
virtual bool sendBuffer(int streamId = 0);
void connectionDown();
virtual void reset()
{ m_transport->resetReader(this); }
void connectionDown(bool stop = true);
void stopThread();
private:
Transport* m_transport;
Socket* m_socket;
@ -238,11 +249,17 @@ public:
virtual void setSocket(Socket* s);
virtual void listen(int maxConn)
{ m_socket->listen(maxConn); }
virtual void reset()
{ m_transport->resetReader(this); }
bool bindSocket();
void reconnectSocket();
void updateTransportStatus(int status);
private:
Socket* m_socket;
Transport* m_transport;
Socket* m_socket;
SocketAddr m_remote;
u_int32_t m_reconnectInterval;
u_int64_t m_reconnectTryAgain;
};
class TransportModule : public Module
@ -257,6 +274,8 @@ private:
static TransportModule plugin;
YSIGFACTORY2(Transport);
static long s_maxDownAllowed = 10000000;
static const TokenDict s_transType[] = {
{ "none", Transport::None },
@ -400,10 +419,12 @@ void ListenerThread::run()
DDebug("ListenerThread",DebugNote, "Accept error: %s", strerror(m_socket->error()));
continue;
} else {
if (!m_transport->addSocket(newSoc,address))
if (!m_transport->addSocket(newSoc,address)) {
DDebug("ListenerThread",DebugNote,"Connection rejected for %s",
address.host().c_str());
Debug(DebugStub,"See if should be done peeloff for new incomming connections");
newSoc->terminate();
delete newSoc;
}
}
}
if (m_transport) {
@ -454,9 +475,7 @@ TransportThread::~TransportThread()
void TransportThread::run()
{
if (!m_worker)
return;
while (true) {
while (!m_exit) {
bool ret = false;
if (m_worker->needConnect())
ret = m_worker->connectSocket();
@ -467,6 +486,9 @@ void TransportThread::run()
else
Thread::msleep(5,true);
}
m_worker->resetThread();
m_worker->reset();
m_worker = 0;
}
TReader::~TReader()
@ -496,13 +518,12 @@ void TransportWorker::stop()
{
if (!(m_thread && m_thread->running()))
return;
m_thread->cancel();
m_thread->exitThread();
for (unsigned int i = 2 * Thread::idleMsec(); i--; ) {
Thread::msleep(1);
if (!m_thread)
return;
}
m_thread->cancel(true);
m_thread = 0;
}
@ -527,9 +548,10 @@ SignallingComponent* Transport::create(const String& type, const NamedList& name
}
Transport::Transport(const NamedList &param)
: m_reader(0), m_state(Down), m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true)
: m_reader(0), m_readerMutex(true,"TransportReader"), m_state(Down),
m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true)
{
setName("Transport");
setName("Transport:" + param);
Debug(this,DebugAll,"Transport created (%p)",this);
}
@ -541,6 +563,18 @@ Transport::~Transport()
Thread::yield();
Debug(this,DebugAll,"Destroying Transport [%p]",this);
delete m_reader;
m_reader = 0;
}
void Transport::resetReader(TReader* caller)
{
if (!caller)
return;
m_readerMutex.lock();
if (caller == m_reader)
m_reader = 0;
m_readerMutex.unlock();
delete caller;
}
bool Transport::control(const NamedList &param)
@ -563,12 +597,16 @@ bool Transport::control(const NamedList &param)
void Transport::reconnect(bool force)
{
Lock lock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugWarn,"Request to reconnect but the transport is not initialized!!");
return;
}
if (m_state == Up && !force)
if (m_state == Up && !force) {
Debug(this,DebugInfo,
"Skipped transport restart. Transport is UP and force restart was not requested.");
return;
}
Debug(this,DebugInfo,"Transport reconnect requested");
m_reader->reconnect();
}
@ -595,6 +633,7 @@ bool Transport::initialize(const NamedList* params)
m_listener->startup();
return true;
}
Lock lock(m_readerMutex);
if (m_streamer)
m_reader = new StreamReader(this,0);
else {
@ -689,10 +728,15 @@ bool Transport::bindSocket()
delete socket;
return false;
}
Lock lock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugFail,"Bind socket null reader!!");
return false;
}
m_reader->setSocket(socket);
int linger = m_config.getIntValue("linger",0);
socket->setLinger(linger);
setStatus(Up);
if (m_type == Sctp) {
// Send a dummy MGMT NTFY message to create the connection
static const unsigned char dummy[8] =
@ -702,7 +746,8 @@ bool Transport::bindSocket()
if (m_reader->sendMSG(hdr,DataBlock::empty(),1))
m_reader->listen(1);
hdr.clear(false);
}
} else
setStatus(Up);
return true;
}
@ -846,6 +891,13 @@ bool Transport::connectSocket()
Debug(this,DebugNote,"Socket conected to %d addresses",o.count());
}
sock->setBlocking(false);
Lock mylock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugFail,"Connect socket null reader");
sock->terminate();
delete sock;
return false;
}
m_reader->setSocket(sock);
setStatus(Up);
return true;
@ -858,7 +910,13 @@ void Transport::setStatus(int status)
DDebug(this,DebugInfo,"State change: %s -> %s [%p]",
lookup(m_state,s_transStatus,"?"),lookup(status,s_transStatus,"?"),this);
m_state = status;
Lock mylock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugFail,"setStatus null reader");
return;
}
m_reader->m_canSend = true;
mylock.drop();
notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown);
}
@ -871,6 +929,7 @@ u_int32_t Transport::getMsgLen(unsigned char* buf)
bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{
Lock lock(m_readerMutex);
if (!m_reader) {
DDebug(this,DebugMild,"Cannot send message, no reader set [%p]",this);
return false;
@ -881,8 +940,14 @@ bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int s
bool Transport::addSocket(Socket* socket,SocketAddr& adress)
{
Lock lock(m_readerMutex);
if (transType() == Up)
return false;
if (m_reader) {
StreamReader* sr = static_cast<StreamReader*>(m_reader);
m_reader = 0;
sr->reconnect();
}
SocketAddr addr(AF_INET);
String address, adr = m_config.getValue("remote");
int port = defPort();
@ -972,12 +1037,12 @@ void StreamReader::setSocket(Socket* s)
bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{
Lock mylock(m_sending);
if (!m_canSend) {
DDebug(m_transport,DebugNote,"Cannot send message at this time");
return false;
}
bool ret = false;
Lock mylock(m_sending);
if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) {
m_sendBuffer += header;
m_sendBuffer += msg;
@ -985,11 +1050,17 @@ bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int st
}
else
Debug(m_transport,DebugWarn,"Buffer Overrun");
mylock.drop();
return sendBuffer(streamId) && ret;
}
bool StreamReader::sendBuffer(int streamId)
{
Lock mylock(m_sending);
if (!m_canSend) {
DDebug(m_transport,DebugNote,"Cannot send message at this time");
return false;
}
if (!m_socket)
return needConnect();
if (m_sendBuffer.null())
@ -1000,8 +1071,10 @@ bool StreamReader::sendBuffer(int streamId)
return false;
}
if (error) {
if (m_socket->updateError() && !m_socket->canRetry())
if (m_socket->updateError() && !m_socket->canRetry()) {
mylock.drop();
connectionDown();
}
return false;
}
if (!sendOk)
@ -1015,6 +1088,7 @@ bool StreamReader::sendBuffer(int streamId)
}
if (m_transport->status() == Transport::Up)
if (!s->valid()) {
mylock.drop();
connectionDown();
return false;
}
@ -1026,6 +1100,7 @@ bool StreamReader::sendBuffer(int streamId)
if (len <= 0) {
if (!m_socket->canRetry()) {
Debug(m_transport,DebugMild,"Send error detected. %s",strerror(errno));
mylock.drop();
connectionDown();
}
return false;
@ -1037,13 +1112,18 @@ bool StreamReader::sendBuffer(int streamId)
bool StreamReader::connectSocket()
{
Time t;
if (t < m_tryAgain) {
if (t < m_tryAgain && !m_reconnect) {
Thread::yield(true);
return false;
}
if (m_reconnect) {
m_reconnect = false;
m_interval = CONN_RETRY_MIN;
}
m_tryAgain = t + m_interval;
if (m_transport->connectSocket()) {
m_interval = CONN_RETRY_MIN;
m_tryAgain = 0;
return true;
}
m_tryAgain = Time::now() + m_interval;
@ -1056,14 +1136,17 @@ bool StreamReader::connectSocket()
bool StreamReader::readData()
{
if (!(m_socket && m_sending.lock(SignallingEngine::maxLockWait())))
if (!(m_sending.lock(SignallingEngine::maxLockWait()) && m_socket))
return false;
if (m_reconnect) {
connectionDown();
connectionDown(false);
m_sending.unlock();
stopThread();
return false;
}
sendBuffer();
if (!m_socket)
return false;
m_sending.unlock();
int stream = 0, len = 0;
SocketAddr addr;
@ -1172,32 +1255,40 @@ bool StreamReader::readData()
return false;
}
void StreamReader::connectionDown() {
void StreamReader::connectionDown(bool stopTh) {
Debug(m_transport,DebugMild,"Connection down [%p]",m_socket);
m_transport->setStatus(Transport::Down);
while (!m_sending.lock(Thread::idleUsec()))
Thread::yield();
m_canSend = false;
m_sendBuffer.clear();
if (!m_socket) {
m_sending.unlock();
return;
if (m_socket) {
m_socket->terminate();
delete m_socket;
m_socket = 0;
}
m_socket->terminate();
if (m_transport->listen())
stop();
delete m_socket;
m_socket = 0;
m_sending.unlock();
if (stopTh) {
m_transport->setStatus(Transport::Down);
stopThread();
}
}
void StreamReader::stopThread()
{
m_transport->setStatus(Transport::Down);
if (!m_transport->listen())
return;
stop();
}
/**
* class MessageReader
*/
MessageReader::MessageReader(Transport* transport, Socket* sock, SocketAddr& addr)
: m_socket(sock), m_transport(transport), m_remote(addr)
: m_transport(transport), m_socket(sock), m_remote(addr),
m_reconnectInterval(s_maxDownAllowed),m_reconnectTryAgain(0)
{
DDebug(DebugAll,"Creating MessageReader [%p]",this);
}
@ -1231,7 +1322,6 @@ bool MessageReader::bindSocket()
Thread::yield(true);
return false;
}
m_tryAgain = t + m_interval;
Lock mylock(m_sending);
if (m_transport->bindSocket()) {
m_interval = CONN_RETRY_MIN;
@ -1245,6 +1335,22 @@ bool MessageReader::bindSocket()
return false;
}
void MessageReader::reconnectSocket()
{
u_int64_t t = Time::now();
if (t < m_reconnectTryAgain) {
Thread::yield(true);
return;
}
m_reconnect = true;
m_reconnectTryAgain = t + m_reconnectInterval;
// exponential backoff
m_reconnectInterval *= 2;
if (m_reconnectInterval > CONN_RETRY_MAX)
m_reconnectInterval = CONN_RETRY_MAX;
}
bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{
if (!m_canSend) {
@ -1257,7 +1363,7 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s
while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) {
if (error) {
DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno));
m_transport->setStatus(Transport::Down);
updateTransportStatus(Transport::Down);
break;
}
if (!sendOk)
@ -1303,22 +1409,33 @@ bool MessageReader::readData()
if (m_socket && m_reconnect) {
if (!reconLock.locked())
return false;
if (m_transport->status() != Transport::Up)
return false; // We are already in reconnecting state
m_transport->setStatus(Transport::Initiating);
if (m_downTime != 0) {
int sec = (int)((Time::now() - m_downTime) / 1000000); // usec to sec
Debug(m_transport,DebugNote,"Reconnecting sctp socket! is down for %d seconds.",sec);
}
m_socket->terminate();
delete m_socket;
m_socket = 0;
reconLock.drop();
updateTransportStatus(Transport::Initiating);
return false;
}
reconLock.drop();
if (!m_socket && !bindSocket())
return false;
bool readOk = false,error = false;
if (!(running() && m_socket && m_socket->select(&readOk,0,&error,Thread::idleUsec())))
if (!(running() && m_socket))
return false;
if (!readOk || error)
if (!m_socket->select(&readOk,0,&error,Thread::idleUsec())) {
if (m_transport->status() == Transport::Initiating)
reconnectSocket();
return false;
}
if ((!readOk || error)) {
if (!readOk && m_transport->status() == Transport::Initiating)
reconnectSocket();
return false;
}
unsigned char buffer[MAX_BUF_SIZE];
int stream = 0;
@ -1344,14 +1461,14 @@ bool MessageReader::readData()
if (flags) {
if (flags == 2) {
DDebug(m_transport,DebugAll,"Sctp connection is Up");
m_transport->setStatus(Transport::Up);
updateTransportStatus(Transport::Up);
return true;
}
DDebug(m_transport,DebugNote,"Message error [%p] %d",m_socket,flags);
if (m_transport->status() != Transport::Up)
return false;
Lock lock(m_sending);
m_transport->setStatus(Transport::Initiating);
updateTransportStatus(Transport::Initiating);
Debug(m_transport,DebugInfo,"Terminating socket [%p] Reason: connection down!",m_socket);
m_socket->terminate();
delete m_socket;
@ -1361,17 +1478,17 @@ bool MessageReader::readData()
}
else
r = m_socket->recvFrom((void*)buffer,MAX_BUF_SIZE,addr);
if (r <= 0)
return false;
m_interval = CONN_RETRY_MIN;
m_reconnectInterval = s_maxDownAllowed;
u_int32_t len = m_transport->getMsgLen(buffer);
if ((unsigned int)r != len) {
Debug(m_transport,DebugNote,"Protocol read error read: %d, expected %d",r,len);
return false;
}
m_transport->setStatus(Transport::Up);
updateTransportStatus(Transport::Up);
DataBlock packet(buffer,r);
packet.cut(-8);
m_transport->processMSG(m_transport->getVersion((unsigned char*)buffer),
@ -1379,6 +1496,17 @@ bool MessageReader::readData()
return true;
}
void MessageReader::updateTransportStatus(int status)
{
if (status == Transport::Up)
m_downTime = 0;
else if (m_downTime == 0) {
m_downTime = Time::now();
m_reconnectTryAgain = m_downTime + m_reconnectInterval;
}
m_transport->setStatus(status);
}
/**
* class TransportModule
*/
@ -1397,8 +1525,12 @@ TransportModule::~TransportModule()
void TransportModule::initialize()
{
Output("Initializing module SigTransport");
Configuration cfg(Engine::configFile("sigtransport"));
cfg.load();
s_maxDownAllowed = cfg.getIntValue(YSTRING("general"),YSTRING("max_down"),10);
s_maxDownAllowed *=1000000;
if (!m_init) {
Output("Initializing module SigTransport");
m_init = true;
setup();
}