Fixed connect and send for stream transports.

Cleaned up and improved some debugging messages.


git-svn-id: http://voip.null.ro/svn/yate@3365 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2010-06-08 14:59:40 +00:00
parent dd6513559a
commit 5ada883050
1 changed files with 69 additions and 42 deletions

View File

@ -113,14 +113,15 @@ class TReader : public TransportWorker, public Mutex
{ {
public: public:
inline TReader() inline TReader()
: Mutex(true,"TReader"), m_canSend(true), m_isSending(false) : Mutex(true,"TReader"),
m_sending(true,"TReader::sending"), m_canSend(true)
{ } { }
virtual ~TReader(); virtual ~TReader();
virtual void listen(int maxConn) = 0; virtual void listen(int maxConn) = 0;
virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0; virtual bool sendMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0) = 0;
virtual void setSocket(Socket* s) = 0; virtual void setSocket(Socket* s) = 0;
Mutex m_sending;
bool m_canSend; bool m_canSend;
bool m_isSending;
}; };
class Transport : public SIGTransport class Transport : public SIGTransport
@ -197,6 +198,7 @@ public:
virtual void setSocket(Socket* s); virtual void setSocket(Socket* s);
virtual void listen(int maxConn) virtual void listen(int maxConn)
{ } { }
virtual bool sendBuffer(int streamId = 0);
private: private:
Transport* m_transport; Transport* m_transport;
Socket* m_socket; Socket* m_socket;
@ -250,6 +252,13 @@ static const TokenDict s_transType[] = {
{ 0, 0 } { 0, 0 }
}; };
static const TokenDict s_transStatus[] = {
{ "up", Transport::Up },
{ "initiating", Transport::Initiating },
{ "down", Transport::Down },
{ 0, 0 }
};
static void resolveAddress(const String& addr, String& ip, int& port) static void resolveAddress(const String& addr, String& ip, int& port)
{ {
ObjList* o = addr.split(':'); ObjList* o = addr.split(':');
@ -344,7 +353,7 @@ bool ListenerThread::init(const NamedList& param)
Debug(DebugWarn,"Unable to bind to %s:%u %s",addr.host().c_str(),addr.port(),strerror(errno)); Debug(DebugWarn,"Unable to bind to %s:%u %s",addr.host().c_str(),addr.port(),strerror(errno));
return false; return false;
} else } else
DDebug("ListenerThread",DebugAll,"Socket bind to %s:%u", DDebug("ListenerThread",DebugAll,"Socket bound to %s:%u",
addr.host().c_str(),addr.port()); addr.host().c_str(),addr.port());
if (multi && !addAddress(param)) if (multi && !addAddress(param))
return false; return false;
@ -409,7 +418,7 @@ bool ListenerThread::addAddress(const NamedList &param)
DDebug("ListenerThread",DebugNote,"Failed to bindx sctp socket [%p] %s",s,strerror(errno)); DDebug("ListenerThread",DebugNote,"Failed to bindx sctp socket [%p] %s",s,strerror(errno));
return false; return false;
} else } else
Debug(DebugNote,"Socket binded to %d auxiliar addresses",o.count()); Debug(DebugNote,"Socket bound to %d auxiliary addresses",o.count());
return true; return true;
} }
@ -770,7 +779,7 @@ bool Transport::connectSocket()
delete s; delete s;
return false; return false;
} else } else
Debug(DebugNote,"Socket conected to %d addresses",o.count()); Debug(this,DebugNote,"Socket conected to %d addresses",o.count());
} }
m_reader->setSocket(sock); m_reader->setSocket(sock);
setStatus(Up); setStatus(Up);
@ -781,6 +790,8 @@ void Transport::setStatus(int status)
{ {
if (m_state == status) if (m_state == status)
return; return;
DDebug(this,DebugInfo,"State change: %s -> %s [%p]",
lookup(m_state,s_transStatus,"?"),lookup(status,s_transStatus,"?"),this);
m_state = status; m_state = status;
m_reader->m_canSend = true; m_reader->m_canSend = true;
notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown); notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown);
@ -789,14 +800,16 @@ void Transport::setStatus(int status)
u_int32_t Transport::getMsgLen(unsigned char* buf) u_int32_t Transport::getMsgLen(unsigned char* buf)
{ {
return ((unsigned int)buf[4] << 24) | ((unsigned int)buf[5] << 16) | return ((unsigned int)buf[4] << 24) | ((unsigned int)buf[5] << 16) |
((unsigned int)buf[6] << 8) | (unsigned int)buf[7]; ((unsigned int)buf[6] << 8) | (unsigned int)buf[7];
} }
bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId) bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{ {
if (!m_reader) if (!m_reader) {
DDebug(this,DebugMild,"Cannot send message, no reader set [%p]",this);
return false; return false;
}
bool ret = m_reader->sendMSG(header,msg,streamId); bool ret = m_reader->sendMSG(header,msg,streamId);
return ret; return ret;
} }
@ -884,50 +897,62 @@ void StreamReader::setSocket(Socket* s)
bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) bool StreamReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{ {
if (!m_canSend) if (!m_canSend) {
DDebug(m_transport,DebugNote,"Cannot send message at this time");
return false; return false;
m_isSending = true; }
bool ret = false;
Lock mylock(m_sending);
if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) { if (((m_sendBuffer.length() + msg.length()) + header.length()) < MAX_BUF_SIZE) {
m_sendBuffer += header; m_sendBuffer += header;
m_sendBuffer += msg; m_sendBuffer += msg;
ret = true;
} }
else else
Debug(m_transport,DebugAll,"Buffer Overrun"); Debug(m_transport,DebugWarn,"Buffer Overrun");
while (m_socket && m_sendBuffer.length()) { return sendBuffer(streamId) && ret;
bool sendOk = false, error = false; }
if (m_socket->select(0,&sendOk,&error,Thread::idleUsec())) {
if (error) { bool StreamReader::sendBuffer(int streamId)
DDebug(m_transport,DebugAll,"Error detected. %s",strerror(errno)); {
break; if (!m_socket)
} return needConnect();
if (!sendOk) if (m_sendBuffer.null())
break; return true;
int len = 0; bool sendOk = false, error = false;
if (m_transport->transType() == Transport::Sctp) { if (!m_socket->select(0,&sendOk,&error,Thread::idleUsec()) || error) {
SctpSocket* s = static_cast<SctpSocket*>(m_socket); DDebug(m_transport,DebugAll,"Error detected. %s",strerror(errno));
if (!s) { return false;
DDebug(m_transport,DebugGoOn,"Sctp conversion failed");
break;
}
int flags = 0;
len = s->sendMsg(m_sendBuffer.data(),m_sendBuffer.length(),streamId,flags);
}
else
len = m_socket->send(m_sendBuffer.data(),m_sendBuffer.length());
if (len <= 0)
break;
m_sendBuffer.cut(-len);
}
break;
} }
m_isSending = false; if (!sendOk)
return (m_sendBuffer.length() == 0); return true;
int len = 0;
if (m_transport->transType() == Transport::Sctp) {
SctpSocket* s = static_cast<SctpSocket*>(m_socket);
if (!s) {
DDebug(m_transport,DebugGoOn,"Sctp conversion failed");
return false;
}
int flags = 0;
len = s->sendMsg(m_sendBuffer.data(),m_sendBuffer.length(),streamId,flags);
}
else
len = m_socket->send(m_sendBuffer.data(),m_sendBuffer.length());
if (len <= 0) {
DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno));
return false;
}
m_sendBuffer.cut(-len);
return true;
} }
bool StreamReader::readData() bool StreamReader::readData()
{ {
if (!m_socket) if (!m_socket)
return false; return false;
m_sending.lock();
sendBuffer();
m_sending.unlock();
bool readOk = false, error = false; bool readOk = false, error = false;
if (!m_socket->select(&readOk,0,&error,Thread::idleUsec())) if (!m_socket->select(&readOk,0,&error,Thread::idleUsec()))
return false; return false;
@ -953,9 +978,10 @@ bool StreamReader::readData()
} }
DDebug(m_transport,DebugWarn,"Connection down [%p] %d",m_socket, flags); DDebug(m_transport,DebugWarn,"Connection down [%p] %d",m_socket, flags);
m_transport->setStatus(Transport::Down); m_transport->setStatus(Transport::Down);
while (m_isSending) while (!m_sending.lock(Thread::idleUsec()))
Thread::yield(); Thread::yield();
m_canSend = false; m_canSend = false;
m_sending.unlock();
m_socket->terminate(); m_socket->terminate();
if (m_transport->listen()) if (m_transport->listen())
stop(); stop();
@ -1055,11 +1081,13 @@ void MessageReader::setSocket(Socket* s)
bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId) bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{ {
if (!m_canSend) if (!m_canSend) {
DDebug(m_transport,DebugNote,"Cannot send message at this time");
return false; return false;
}
bool sendOk = false, error = false; bool sendOk = false, error = false;
bool ret = false; bool ret = false;
m_isSending = true; Lock mylock(m_sending);
while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) { while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) {
if (error) { if (error) {
DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno)); DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno));
@ -1099,7 +1127,6 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s
DDebug(m_transport,DebugMild,"Error sending message %d %d %s",len,totalLen,strerror(errno)); DDebug(m_transport,DebugMild,"Error sending message %d %d %s",len,totalLen,strerror(errno));
break; break;
} }
m_isSending = false;
return ret; return ret;
} }