diff --git a/modules/server/sigtransport.cpp b/modules/server/sigtransport.cpp index 4d1d2faa..ddce01c0 100644 --- a/modules/server/sigtransport.cpp +++ b/modules/server/sigtransport.cpp @@ -113,14 +113,15 @@ class TReader : public TransportWorker, public Mutex { public: inline TReader() - : Mutex(true,"TReader"), m_canSend(true), m_isSending(false) + : Mutex(true,"TReader"), + m_sending(true,"TReader::sending"), m_canSend(true) { } virtual ~TReader(); virtual void listen(int maxConn) = 0; virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0; virtual void setSocket(Socket* s) = 0; + Mutex m_sending; bool m_canSend; - bool m_isSending; }; class Transport : public SIGTransport @@ -197,6 +198,7 @@ public: virtual void setSocket(Socket* s); virtual void listen(int maxConn) { } + virtual bool sendBuffer(int streamId = 0); private: Transport* m_transport; Socket* m_socket; @@ -250,6 +252,13 @@ static const TokenDict s_transType[] = { { 0, 0 } }; +static const TokenDict s_transStatus[] = { + { "up", Transport::Up }, + { "initiating", Transport::Initiating }, + { "down", Transport::Down }, + { 0, 0 } +}; + static void resolveAddress(const String& addr, String& ip, int& port) { ObjList* o = addr.split(':'); @@ -344,7 +353,7 @@ bool ListenerThread::init(const NamedList& param) Debug(DebugWarn,"Unable to bind to %s:%u %s",addr.host().c_str(),addr.port(),strerror(errno)); return false; } else - DDebug("ListenerThread",DebugAll,"Socket bind to %s:%u", + DDebug("ListenerThread",DebugAll,"Socket bound to %s:%u", addr.host().c_str(),addr.port()); if (multi && !addAddress(param)) return false; @@ -409,7 +418,7 @@ bool ListenerThread::addAddress(const NamedList ¶m) DDebug("ListenerThread",DebugNote,"Failed to bindx sctp socket [%p] %s",s,strerror(errno)); return false; } else - Debug(DebugNote,"Socket binded to %d auxiliar addresses",o.count()); + Debug(DebugNote,"Socket bound to %d auxiliary addresses",o.count()); return true; } @@ -770,7 +779,7 @@ bool Transport::connectSocket() delete s; return false; } else - Debug(DebugNote,"Socket conected to %d addresses",o.count()); + Debug(this,DebugNote,"Socket conected to %d addresses",o.count()); } m_reader->setSocket(sock); setStatus(Up); @@ -781,6 +790,8 @@ void Transport::setStatus(int status) { if (m_state == status) return; + DDebug(this,DebugInfo,"State change: %s -> %s [%p]", + lookup(m_state,s_transStatus,"?"),lookup(status,s_transStatus,"?"),this); m_state = status; m_reader->m_canSend = true; notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown); @@ -789,14 +800,16 @@ void Transport::setStatus(int status) u_int32_t Transport::getMsgLen(unsigned char* buf) { return ((unsigned int)buf[4] << 24) | ((unsigned int)buf[5] << 16) | - ((unsigned int)buf[6] << 8) | (unsigned int)buf[7]; + ((unsigned int)buf[6] << 8) | (unsigned int)buf[7]; } bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId) { - if (!m_reader) + if (!m_reader) { + DDebug(this,DebugMild,"Cannot send message, no reader set [%p]",this); return false; + } bool ret = m_reader->sendMSG(header,msg,streamId); return ret; } @@ -884,50 +897,62 @@ void StreamReader::setSocket(Socket* s) bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) { - if (!m_canSend) + if (!m_canSend) { + DDebug(m_transport,DebugNote,"Cannot send message at this time"); return false; - m_isSending = true; + } + bool ret = false; + Lock mylock(m_sending); if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) { m_sendBuffer += header; m_sendBuffer += msg; + ret = true; } else - Debug(m_transport,DebugAll,"Buffer Overrun"); - while (m_socket && m_sendBuffer.length()) { - bool sendOk = false, error = false; - if (m_socket->select(0,&sendOk,&error,Thread::idleUsec())) { - if (error) { - DDebug(m_transport,DebugAll,"Error detected. %s",strerror(errno)); - break; - } - if (!sendOk) - break; - int len = 0; - if (m_transport->transType() == Transport::Sctp) { - SctpSocket* s = static_cast(m_socket); - if (!s) { - DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); - break; - } - int flags = 0; - len = s->sendMsg(m_sendBuffer.data(),m_sendBuffer.length(),streamId,flags); - } - else - len = m_socket->send(m_sendBuffer.data(),m_sendBuffer.length()); - if (len <= 0) - break; - m_sendBuffer.cut(-len); - } - break; + Debug(m_transport,DebugWarn,"Buffer Overrun"); + return sendBuffer(streamId) && ret; +} + +bool StreamReader::sendBuffer(int streamId) +{ + if (!m_socket) + return needConnect(); + if (m_sendBuffer.null()) + return true; + bool sendOk = false, error = false; + if (!m_socket->select(0,&sendOk,&error,Thread::idleUsec()) || error) { + DDebug(m_transport,DebugAll,"Error detected. %s",strerror(errno)); + return false; } - m_isSending = false; - return (m_sendBuffer.length() == 0); + if (!sendOk) + return true; + int len = 0; + if (m_transport->transType() == Transport::Sctp) { + SctpSocket* s = static_cast(m_socket); + if (!s) { + DDebug(m_transport,DebugGoOn,"Sctp conversion failed"); + return false; + } + int flags = 0; + len = s->sendMsg(m_sendBuffer.data(),m_sendBuffer.length(),streamId,flags); + } + else + len = m_socket->send(m_sendBuffer.data(),m_sendBuffer.length()); + if (len <= 0) { + DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno)); + return false; + } + m_sendBuffer.cut(-len); + return true; } bool StreamReader::readData() { if (!m_socket) return false; + m_sending.lock(); + sendBuffer(); + m_sending.unlock(); bool readOk = false, error = false; if (!m_socket->select(&readOk,0,&error,Thread::idleUsec())) return false; @@ -953,9 +978,10 @@ bool StreamReader::readData() } DDebug(m_transport,DebugWarn,"Connection down [%p] %d",m_socket, flags); m_transport->setStatus(Transport::Down); - while (m_isSending) + while (!m_sending.lock(Thread::idleUsec())) Thread::yield(); m_canSend = false; + m_sending.unlock(); m_socket->terminate(); if (m_transport->listen()) stop(); @@ -1055,11 +1081,13 @@ void MessageReader::setSocket(Socket* s) bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) { - if (!m_canSend) + if (!m_canSend) { + DDebug(m_transport,DebugNote,"Cannot send message at this time"); return false; + } bool sendOk = false, error = false; bool ret = false; - m_isSending = true; + Lock mylock(m_sending); while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) { if (error) { DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno)); @@ -1099,7 +1127,6 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s DDebug(m_transport,DebugMild,"Error sending message %d %d %s",len,totalLen,strerror(errno)); break; } - m_isSending = false; return ret; }