diff --git a/conf.d/sigtransport.conf.sample b/conf.d/sigtransport.conf.sample index 57115148..5b9bf570 100644 --- a/conf.d/sigtransport.conf.sample +++ b/conf.d/sigtransport.conf.sample @@ -1,3 +1,10 @@ +;[general] +; max_down: long: The time in seconds after a sctp socket is in connection +; hanging state. After this time expires the connection is re-established. +; If the connection is not re established the timer will exponential back off +; until reaches 60 seconds +;max_down=10 + ; Each section in this file describes a SIGTRAN connection ; Connections are referenced from other configurations describing the upper layer diff --git a/modules/server/sigtransport.cpp b/modules/server/sigtransport.cpp index 714a9090..20f629f7 100644 --- a/modules/server/sigtransport.cpp +++ b/modules/server/sigtransport.cpp @@ -57,6 +57,7 @@ public: virtual bool readData() = 0; virtual bool connectSocket() = 0; virtual bool needConnect() = 0; + virtual void reset() = 0; bool running(); bool start(Thread::Priority prio = Thread::Normal); inline void resetThread() @@ -88,12 +89,15 @@ class TransportThread : public Thread friend class TransportWorker; public: inline TransportThread(TransportWorker* worker, Priority prio = Normal) - : Thread("SignallingTransporter",prio), m_worker(worker) + : Thread("SignallingTransporter",prio), m_worker(worker), m_exit(false) { } virtual ~TransportThread(); virtual void run(); + void exitThread() + { m_exit = true; } private: TransportWorker* m_worker; + bool m_exit; }; class ListenerThread : public Thread @@ -118,12 +122,13 @@ public: inline TReader() : Mutex(true,"TReader"), m_sending(true,"TReader::sending"), m_canSend(true), m_reconnect(false), - m_tryAgain(0), m_interval(CONN_RETRY_MIN) + m_tryAgain(0), m_interval(CONN_RETRY_MIN), m_downTime(0) { } virtual ~TReader(); virtual void listen(int maxConn) = 0; virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0; virtual void setSocket(Socket* s) = 0; + virtual void reset() = 0; void reconnect() { m_reconnect = true; } Mutex m_sending; @@ -132,6 +137,7 @@ protected: bool m_reconnect; u_int64_t m_tryAgain; u_int32_t m_interval; + u_int64_t m_downTime; }; class Transport : public SIGTransport @@ -182,6 +188,7 @@ public: { } virtual void reconnect(bool force); bool connectSocket(); + void resetReader(TReader* reader); u_int32_t getMsgLen(unsigned char* buf); bool addAddress(const NamedList ¶m, Socket* socket); bool bindSocket(); @@ -189,6 +196,7 @@ public: virtual bool transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0); private: TReader* m_reader; + Mutex m_readerMutex; bool m_streamer; int m_type; int m_state; @@ -212,7 +220,10 @@ public: virtual void listen(int maxConn) { } virtual bool sendBuffer(int streamId = 0); - void connectionDown(); + virtual void reset() + { m_transport->resetReader(this); } + void connectionDown(bool stop = true); + void stopThread(); private: Transport* m_transport; Socket* m_socket; @@ -238,11 +249,17 @@ public: virtual void setSocket(Socket* s); virtual void listen(int maxConn) { m_socket->listen(maxConn); } + virtual void reset() + { m_transport->resetReader(this); } bool bindSocket(); + void reconnectSocket(); + void updateTransportStatus(int status); private: - Socket* m_socket; Transport* m_transport; + Socket* m_socket; SocketAddr m_remote; + u_int32_t m_reconnectInterval; + u_int64_t m_reconnectTryAgain; }; class TransportModule : public Module @@ -257,6 +274,8 @@ private: static TransportModule plugin; YSIGFACTORY2(Transport); +static long s_maxDownAllowed = 10000000; + static const TokenDict s_transType[] = { { "none", Transport::None }, @@ -400,10 +419,12 @@ void ListenerThread::run() DDebug("ListenerThread",DebugNote, "Accept error: %s", strerror(m_socket->error())); continue; } else { - if (!m_transport->addSocket(newSoc,address)) + if (!m_transport->addSocket(newSoc,address)) { DDebug("ListenerThread",DebugNote,"Connection rejected for %s", address.host().c_str()); - Debug(DebugStub,"See if should be done peeloff for new incomming connections"); + newSoc->terminate(); + delete newSoc; + } } } if (m_transport) { @@ -454,9 +475,7 @@ TransportThread::~TransportThread() void TransportThread::run() { - if (!m_worker) - return; - while (true) { + while (!m_exit) { bool ret = false; if (m_worker->needConnect()) ret = m_worker->connectSocket(); @@ -467,6 +486,9 @@ void TransportThread::run() else Thread::msleep(5,true); } + m_worker->resetThread(); + m_worker->reset(); + m_worker = 0; } TReader::~TReader() @@ -496,13 +518,12 @@ void TransportWorker::stop() { if (!(m_thread && m_thread->running())) return; - m_thread->cancel(); + m_thread->exitThread(); for (unsigned int i = 2 * Thread::idleMsec(); i--; ) { Thread::msleep(1); if (!m_thread) return; } - m_thread->cancel(true); m_thread = 0; } @@ -527,9 +548,10 @@ SignallingComponent* Transport::create(const String& type, const NamedList& name } Transport::Transport(const NamedList ¶m) - : m_reader(0), m_state(Down), m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true) + : m_reader(0), m_readerMutex(true,"TransportReader"), m_state(Down), + m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true) { - setName("Transport"); + setName("Transport:" + param); Debug(this,DebugAll,"Transport created (%p)",this); } @@ -541,6 +563,18 @@ Transport::~Transport() Thread::yield(); Debug(this,DebugAll,"Destroying Transport [%p]",this); delete m_reader; + m_reader = 0; +} + +void Transport::resetReader(TReader* caller) +{ + if (!caller) + return; + m_readerMutex.lock(); + if (caller == m_reader) + m_reader = 0; + m_readerMutex.unlock(); + delete caller; } bool Transport::control(const NamedList ¶m) @@ -563,12 +597,16 @@ bool Transport::control(const NamedList ¶m) void Transport::reconnect(bool force) { + Lock lock(m_readerMutex); if (!m_reader) { Debug(this,DebugWarn,"Request to reconnect but the transport is not initialized!!"); return; } - if (m_state == Up && !force) + if (m_state == Up && !force) { + Debug(this,DebugInfo, + "Skipped transport restart. Transport is UP and force restart was not requested."); return; + } Debug(this,DebugInfo,"Transport reconnect requested"); m_reader->reconnect(); } @@ -595,6 +633,7 @@ bool Transport::initialize(const NamedList* params) m_listener->startup(); return true; } + Lock lock(m_readerMutex); if (m_streamer) m_reader = new StreamReader(this,0); else { @@ -689,10 +728,15 @@ bool Transport::bindSocket() delete socket; return false; } + Lock lock(m_readerMutex); + if (!m_reader) { + Debug(this,DebugFail,"Bind socket null reader!!"); + return false; + } m_reader->setSocket(socket); int linger = m_config.getIntValue("linger",0); socket->setLinger(linger); - setStatus(Up); + if (m_type == Sctp) { // Send a dummy MGMT NTFY message to create the connection static const unsigned char dummy[8] = @@ -702,7 +746,8 @@ bool Transport::bindSocket() if (m_reader->sendMSG(hdr,DataBlock::empty(),1)) m_reader->listen(1); hdr.clear(false); - } + } else + setStatus(Up); return true; } @@ -846,6 +891,13 @@ bool Transport::connectSocket() Debug(this,DebugNote,"Socket conected to %d addresses",o.count()); } sock->setBlocking(false); + Lock mylock(m_readerMutex); + if (!m_reader) { + Debug(this,DebugFail,"Connect socket null reader"); + sock->terminate(); + delete sock; + return false; + } m_reader->setSocket(sock); setStatus(Up); return true; @@ -858,7 +910,13 @@ void Transport::setStatus(int status) DDebug(this,DebugInfo,"State change: %s -> %s [%p]", lookup(m_state,s_transStatus,"?"),lookup(status,s_transStatus,"?"),this); m_state = status; + Lock mylock(m_readerMutex); + if (!m_reader) { + Debug(this,DebugFail,"setStatus null reader"); + return; + } m_reader->m_canSend = true; + mylock.drop(); notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown); } @@ -871,6 +929,7 @@ u_int32_t Transport::getMsgLen(unsigned char* buf) bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId) { + Lock lock(m_readerMutex); if (!m_reader) { DDebug(this,DebugMild,"Cannot send message, no reader set [%p]",this); return false; @@ -881,8 +940,14 @@ bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int s bool Transport::addSocket(Socket* socket,SocketAddr& adress) { + Lock lock(m_readerMutex); if (transType() == Up) return false; + if (m_reader) { + StreamReader* sr = static_cast(m_reader); + m_reader = 0; + sr->reconnect(); + } SocketAddr addr(AF_INET); String address, adr = m_config.getValue("remote"); int port = defPort(); @@ -972,12 +1037,12 @@ void StreamReader::setSocket(Socket* s) bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) { + Lock mylock(m_sending); if (!m_canSend) { DDebug(m_transport,DebugNote,"Cannot send message at this time"); return false; } bool ret = false; - Lock mylock(m_sending); if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) { m_sendBuffer += header; m_sendBuffer += msg; @@ -985,11 +1050,17 @@ bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int st } else Debug(m_transport,DebugWarn,"Buffer Overrun"); + mylock.drop(); return sendBuffer(streamId) && ret; } bool StreamReader::sendBuffer(int streamId) { + Lock mylock(m_sending); + if (!m_canSend) { + DDebug(m_transport,DebugNote,"Cannot send message at this time"); + return false; + } if (!m_socket) return needConnect(); if (m_sendBuffer.null()) @@ -1000,8 +1071,10 @@ bool StreamReader::sendBuffer(int streamId) return false; } if (error) { - if (m_socket->updateError() && !m_socket->canRetry()) + if (m_socket->updateError() && !m_socket->canRetry()) { + mylock.drop(); connectionDown(); + } return false; } if (!sendOk) @@ -1015,6 +1088,7 @@ bool StreamReader::sendBuffer(int streamId) } if (m_transport->status() == Transport::Up) if (!s->valid()) { + mylock.drop(); connectionDown(); return false; } @@ -1026,6 +1100,7 @@ bool StreamReader::sendBuffer(int streamId) if (len <= 0) { if (!m_socket->canRetry()) { Debug(m_transport,DebugMild,"Send error detected. %s",strerror(errno)); + mylock.drop(); connectionDown(); } return false; @@ -1037,13 +1112,18 @@ bool StreamReader::sendBuffer(int streamId) bool StreamReader::connectSocket() { Time t; - if (t < m_tryAgain) { + if (t < m_tryAgain && !m_reconnect) { Thread::yield(true); return false; } + if (m_reconnect) { + m_reconnect = false; + m_interval = CONN_RETRY_MIN; + } m_tryAgain = t + m_interval; if (m_transport->connectSocket()) { m_interval = CONN_RETRY_MIN; + m_tryAgain = 0; return true; } m_tryAgain = Time::now() + m_interval; @@ -1056,14 +1136,17 @@ bool StreamReader::connectSocket() bool StreamReader::readData() { - if (!(m_socket && m_sending.lock(SignallingEngine::maxLockWait()))) + if (!(m_sending.lock(SignallingEngine::maxLockWait()) && m_socket)) return false; if (m_reconnect) { - connectionDown(); + connectionDown(false); m_sending.unlock(); + stopThread(); return false; } sendBuffer(); + if (!m_socket) + return false; m_sending.unlock(); int stream = 0, len = 0; SocketAddr addr; @@ -1172,32 +1255,40 @@ bool StreamReader::readData() return false; } -void StreamReader::connectionDown() { +void StreamReader::connectionDown(bool stopTh) { Debug(m_transport,DebugMild,"Connection down [%p]",m_socket); - m_transport->setStatus(Transport::Down); while (!m_sending.lock(Thread::idleUsec())) Thread::yield(); m_canSend = false; m_sendBuffer.clear(); - if (!m_socket) { - m_sending.unlock(); - return; + if (m_socket) { + m_socket->terminate(); + delete m_socket; + m_socket = 0; } - m_socket->terminate(); - if (m_transport->listen()) - stop(); - delete m_socket; - m_socket = 0; m_sending.unlock(); + + if (stopTh) { + m_transport->setStatus(Transport::Down); + stopThread(); + } } +void StreamReader::stopThread() +{ + m_transport->setStatus(Transport::Down); + if (!m_transport->listen()) + return; + stop(); +} /** * class MessageReader */ MessageReader::MessageReader(Transport* transport, Socket* sock, SocketAddr& addr) - : m_socket(sock), m_transport(transport), m_remote(addr) + : m_transport(transport), m_socket(sock), m_remote(addr), + m_reconnectInterval(s_maxDownAllowed),m_reconnectTryAgain(0) { DDebug(DebugAll,"Creating MessageReader [%p]",this); } @@ -1231,7 +1322,6 @@ bool MessageReader::bindSocket() Thread::yield(true); return false; } - m_tryAgain = t + m_interval; Lock mylock(m_sending); if (m_transport->bindSocket()) { m_interval = CONN_RETRY_MIN; @@ -1245,6 +1335,22 @@ bool MessageReader::bindSocket() return false; } + +void MessageReader::reconnectSocket() +{ + u_int64_t t = Time::now(); + if (t < m_reconnectTryAgain) { + Thread::yield(true); + return; + } + m_reconnect = true; + m_reconnectTryAgain = t + m_reconnectInterval; + // exponential backoff + m_reconnectInterval *= 2; + if (m_reconnectInterval > CONN_RETRY_MAX) + m_reconnectInterval = CONN_RETRY_MAX; +} + bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) { if (!m_canSend) { @@ -1257,7 +1363,7 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) { if (error) { DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno)); - m_transport->setStatus(Transport::Down); + updateTransportStatus(Transport::Down); break; } if (!sendOk) @@ -1303,22 +1409,33 @@ bool MessageReader::readData() if (m_socket && m_reconnect) { if (!reconLock.locked()) return false; - if (m_transport->status() != Transport::Up) - return false; // We are already in reconnecting state - m_transport->setStatus(Transport::Initiating); + if (m_downTime != 0) { + int sec = (int)((Time::now() - m_downTime) / 1000000); // usec to sec + Debug(m_transport,DebugNote,"Reconnecting sctp socket! is down for %d seconds.",sec); + } m_socket->terminate(); delete m_socket; m_socket = 0; + reconLock.drop(); + updateTransportStatus(Transport::Initiating); return false; } reconLock.drop(); if (!m_socket && !bindSocket()) return false; bool readOk = false,error = false; - if (!(running() && m_socket && m_socket->select(&readOk,0,&error,Thread::idleUsec()))) + if (!(running() && m_socket)) return false; - if (!readOk || error) + if (!m_socket->select(&readOk,0,&error,Thread::idleUsec())) { + if (m_transport->status() == Transport::Initiating) + reconnectSocket(); return false; + } + if ((!readOk || error)) { + if (!readOk && m_transport->status() == Transport::Initiating) + reconnectSocket(); + return false; + } unsigned char buffer[MAX_BUF_SIZE]; int stream = 0; @@ -1344,14 +1461,14 @@ bool MessageReader::readData() if (flags) { if (flags == 2) { DDebug(m_transport,DebugAll,"Sctp connection is Up"); - m_transport->setStatus(Transport::Up); + updateTransportStatus(Transport::Up); return true; } DDebug(m_transport,DebugNote,"Message error [%p] %d",m_socket,flags); if (m_transport->status() != Transport::Up) return false; Lock lock(m_sending); - m_transport->setStatus(Transport::Initiating); + updateTransportStatus(Transport::Initiating); Debug(m_transport,DebugInfo,"Terminating socket [%p] Reason: connection down!",m_socket); m_socket->terminate(); delete m_socket; @@ -1361,17 +1478,17 @@ bool MessageReader::readData() } else r = m_socket->recvFrom((void*)buffer,MAX_BUF_SIZE,addr); - if (r <= 0) return false; m_interval = CONN_RETRY_MIN; + m_reconnectInterval = s_maxDownAllowed; u_int32_t len = m_transport->getMsgLen(buffer); if ((unsigned int)r != len) { Debug(m_transport,DebugNote,"Protocol read error read: %d, expected %d",r,len); return false; } - m_transport->setStatus(Transport::Up); + updateTransportStatus(Transport::Up); DataBlock packet(buffer,r); packet.cut(-8); m_transport->processMSG(m_transport->getVersion((unsigned char*)buffer), @@ -1379,6 +1496,17 @@ bool MessageReader::readData() return true; } +void MessageReader::updateTransportStatus(int status) +{ + if (status == Transport::Up) + m_downTime = 0; + else if (m_downTime == 0) { + m_downTime = Time::now(); + m_reconnectTryAgain = m_downTime + m_reconnectInterval; + } + m_transport->setStatus(status); +} + /** * class TransportModule */ @@ -1397,8 +1525,12 @@ TransportModule::~TransportModule() void TransportModule::initialize() { + Output("Initializing module SigTransport"); + Configuration cfg(Engine::configFile("sigtransport")); + cfg.load(); + s_maxDownAllowed = cfg.getIntValue(YSTRING("general"),YSTRING("max_down"),10); + s_maxDownAllowed *=1000000; if (!m_init) { - Output("Initializing module SigTransport"); m_init = true; setup(); }