Fixed bugs in sigtransport.

Notify sigtran when a new sctp connection has been established.
Set custom names to transport thread and transport mutex for debugging.


git-svn-id: http://yate.null.ro/svn/yate/trunk@5338 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2012-11-22 12:50:16 +00:00
parent 7f62a2c8fc
commit 7d449a804e
4 changed files with 351 additions and 68 deletions

View File

@ -49,6 +49,10 @@
; linger: int; How much to block waiting for socket to close.
;linger = 0
; listen-notify: boolean: True to notify the upper layer that a new incoming connection has been established
; Default to true
;listen-notify=true
; SCTP parameters
; NOTE these parameters will be set only for SCTP sockets

View File

@ -219,6 +219,25 @@ bool SIGTRAN::getSocketParams(const String& params, NamedList& result)
return true;
}
bool SIGTRAN::hasTransportThread()
{
m_transMutex.lock();
RefPointer<SIGTransport> trans = m_trans;
m_transMutex.unlock();
if (!trans)
return false;
return trans->hasThread();
}
void SIGTRAN::stopTransportThread()
{
m_transMutex.lock();
RefPointer<SIGTransport> trans = m_trans;
m_transMutex.unlock();
if (trans)
trans->stopThread();
}
// Attach or detach an user adaptation layer
void SIGTransport::attach(SIGTRAN* sigtran)
{
@ -238,9 +257,9 @@ u_int32_t SIGTransport::defPort() const
bool SIGTransport::processMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId) const
{
XDebug(this,DebugAll,"Received message class %s type %s (0x%02X)",
XDebug(this,DebugAll,"Received message class %s type %s (0x%02X) on stream %d",
lookup(msgClass,s_classes,"Unknown"),
SIGTRAN::typeName(msgClass,msgType,"Unknown"),msgType);
SIGTRAN::typeName(msgClass,msgType,"Unknown"),msgType,streamId);
return alive() && m_sigtran && m_sigtran->processMSG(msgVersion,msgClass,msgType,msg,streamId);
}
@ -256,9 +275,9 @@ bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass,
{
if (!alive())
return false;
XDebug(this,DebugAll,"Sending message class %s type %s (0x%02X)",
XDebug(this,DebugAll,"Sending message class %s type %s (0x%02X) on stream %d",
lookup(msgClass,s_classes,"Unknown"),
SIGTRAN::typeName(msgClass,msgType,"Unknown"),msgType);
SIGTRAN::typeName(msgClass,msgType,"Unknown"),msgType,streamId);
if (!connected(streamId)) {
Debug(this,DebugMild,"Cannot send message, stream %d not connected [%p]",
@ -283,6 +302,15 @@ bool SIGTransport::transmitMSG(unsigned char msgVersion, unsigned char msgClass,
return ok;
}
bool SIGTransport::transportNotify(SIGTransport* newTransport, const SocketAddr& addr)
{
if (alive() && m_sigtran) {
return m_sigtran->transportNotify(newTransport,addr);
}
TelEngine::destruct(newTransport);
return false;
}
/**
* Class SIGAdaptation
@ -501,6 +529,36 @@ static const TokenDict s_clientStates[] = {
};
#undef MAKE_NAME
static const TokenDict s_uaErrors[] = {
{ "Invalid Version", SIGAdaptation::InvalidVersion },
{ "Invalid Interface Identifier", SIGAdaptation::InvalidIID },
{ "Unsupported Message Class", SIGAdaptation::UnsupportedMessageClass },
{ "Unsupported Message Type", SIGAdaptation::UnsupportedMessageType },
{ "Unsupported Traffic Handling Mode", SIGAdaptation::UnsupportedTrafficMode },
{ "Unexpected Message", SIGAdaptation::UnexpectedMessage },
{ "Protocol Error", SIGAdaptation::ProtocolError },
{ "Unsupported Interface Identifier Type", SIGAdaptation::UnsupportedIIDType },
{ "Invalid Stream Identifier", SIGAdaptation::InvalidStreamIdentifier },
{ "Unassigned TEI", SIGAdaptation::UnassignedTEI },
{ "Unrecognized SAPI", SIGAdaptation::UnrecognizedSAPI },
{ "Invalid TEI, SAPI combination", SIGAdaptation::InvalidTEISAPI },
{ "Refused - Management Blocking", SIGAdaptation::ManagementBlocking },
{ "ASP Identifier Required", SIGAdaptation::ASPIDRequired },
{ "Invalid ASP Identifier", SIGAdaptation::InvalidASPID },
{ "ASP Active for Interface Identifier(s)", SIGAdaptation::ASPActiveIID },
{ "Invalid Parameter Value ", SIGAdaptation::InvalidParameterValue },
{ "Parameter Field Error", SIGAdaptation::ParameterFieldError },
{ "Unexpected Parameter", SIGAdaptation::UnexpectedParameter },
{ "Destination Status Unknown", SIGAdaptation::DestinationStatusUnknown },
{ "Invalid Network Appearance", SIGAdaptation::InvalidNetworkAppearance },
{ "Missing Parameter", SIGAdaptation::MissingParameter },
{ "Invalid Routing Context", SIGAdaptation::InvalidRoutingContext },
{ "No Configured AS for ASP", SIGAdaptation::NotConfiguredAS },
{ "Subsystem Status Unknown", SIGAdaptation::SubsystemStatusUnknown },
{ "Invalid loadsharing label", SIGAdaptation::InvalidLoadsharingLabel },
{ 0, 0 }
};
static const TokenDict s_trafficModes[] = {
{ "unused", SIGAdaptation::TrafficUnused },
{ "override", SIGAdaptation::TrafficOverride },
@ -682,7 +740,7 @@ bool SIGAdaptClient::processMgmtMSG(unsigned char msgType, const DataBlock& msg,
setState(AspDown);
return true;
default:
Debug(this,DebugWarn,"SG reported error %u",errCode);
Debug(this,DebugWarn,"SG reported error %u: %s",errCode,lookup(errCode,s_uaErrors,"Unknown"));
return true;
}
}
@ -907,6 +965,13 @@ SS7M2PA::~SS7M2PA()
DDebug(this,DebugAll,"Destroying SS7M2PA [%p]",this);
}
void SS7M2PA::destroyed()
{
stopTransportThread();
SIGTRAN::attach(0);
SS7Layer2::destroyed();
}
bool SS7M2PA::initialize(const NamedList* config)
{
#ifdef DEBUG
@ -924,6 +989,7 @@ bool SS7M2PA::initialize(const NamedList* config)
resolveConfig(YSTRING("basename"),params,config)) {
params.addParam("basename",params);
params.addParam("protocol","ss7");
params.addParam("listen-notify","false");
SIGTransport* tr = YSIGCREATE(SIGTransport,&params);
if (!tr)
return false;
@ -1930,7 +1996,7 @@ bool SS7M2UA::processMGMT(unsigned char msgType, const DataBlock& msg, int strea
m_linkState = LinkDown;
return true;
default:
Debug(this,DebugWarn,"M2UA SG reported error %u",errCode);
Debug(this,DebugWarn,"M2UA SG reported error %u: %s",errCode,lookup(errCode,s_uaErrors,"Unknown"));
return true;
}
}
@ -2101,6 +2167,9 @@ bool SS7M2UA::operational() const
return (m_linkState >= LinkUp) && !m_rpo;
}
/**
* ISDNIUAClient
*/
bool ISDNIUAClient::processMSG(unsigned char msgVersion, unsigned char msgClass,
unsigned char msgType, const DataBlock& msg, int streamId)
@ -2240,7 +2309,7 @@ bool ISDNIUA::processMGMT(unsigned char msgType, const DataBlock& msg, int strea
multipleFrameReleased(localTei(),false,true);
return true;
default:
Debug(this,DebugWarn,"IUA SG reported error %u",errCode);
Debug(this,DebugWarn,"IUA SG reported error %u: %s",errCode,lookup(errCode,s_uaErrors,"Unknown"));
return true;
}
}

View File

@ -4355,6 +4355,27 @@ public:
virtual bool getSocketParams(const String& params, NamedList& result)
{ return false; }
/**
* Notification that a new incomming connection has been made
* NOTE newTransport needs to be destroyed if will not be used
* @param newTransport The new created transport
* @param addr The newly created transport socket address
* @return True if the newTransport will be used.
*/
virtual bool transportNotify(SIGTransport* newTransport, const SocketAddr& addr);
/**
* Check if the transport thread is still running
* @return True if the thread is still running.
*/
virtual bool hasThread()
{ return false; }
/**
* Stop the transport thread
*/
virtual void stopThread()
{ }
protected:
/**
* Constructor
@ -4598,6 +4619,27 @@ public:
* @return True if operation was successful, false if an error occurred
*/
bool getSocketParams(const String& params, NamedList& result);
/**
* Notification that a new incomming connection has been made
* @param newTransport The new created transport
* @param addr The newly created transport socket address
* @return True if the newTransport will be used.
*/
virtual bool transportNotify(SIGTransport* newTransport, const SocketAddr& addr)
{ TelEngine::destruct(newTransport); return false; }
/**
* Check if the transport thread is running
* @return true if the transport thread is running
*/
bool hasTransportThread();
/**
* Stop the transport thread
*/
void stopTransportThread();
protected:
/**
* Process a complete message
@ -4636,6 +4678,35 @@ public:
TrafficBroadcast = 3,
};
enum Errors {
InvalidVersion = 0x01,
InvalidIID = 0x02,
UnsupportedMessageClass = 0x03,
UnsupportedMessageType = 0x04,
UnsupportedTrafficMode = 0x05,
UnexpectedMessage = 0x06,
ProtocolError = 0x07,
UnsupportedIIDType = 0x08,
InvalidStreamIdentifier = 0x09,
UnassignedTEI = 0x0a,
UnrecognizedSAPI = 0x0b,
InvalidTEISAPI = 0x0c,
ManagementBlocking = 0x0d,
ASPIDRequired = 0x0e,
InvalidASPID = 0x0f,
ASPActiveIID = 0x10,
InvalidParameterValue = 0x11,
ParameterFieldError = 0x12,
UnexpectedParameter = 0x13,
DestinationStatusUnknown = 0x14,
InvalidNetworkAppearance = 0x15,
MissingParameter = 0x16,
InvalidRoutingContext = 0x19,
NotConfiguredAS = 0x1a,
SubsystemStatusUnknown = 0x1b,
InvalidLoadsharingLabel = 0x1c
};
/**
* Destructor
*/
@ -5535,7 +5606,6 @@ protected:
/**
* Process a notification generated by the attached data link
* @param link Data link that generated the notification
* @return True if notification was processed
*/
virtual void notify(SS7Layer2* link) = 0;
};
@ -7087,6 +7157,17 @@ public:
*/
virtual bool control(M2PAOperations oper, NamedList* params = 0);
/**
* Execute a control operation. Operations can change the link status or
* can query the aligned status.
* @param oper Operation to execute
* @param params Optional parameters for the operation
* @return True if the command completed successfully, for query operations
* also indicates the data link is aligned and operational
*/
virtual bool control(SS7Layer2::Operation oper, NamedList* params = 0)
{ return control((M2PAOperations)oper,params); }
/**
* Retrieve the current link status indications
* @return Link status indication bits
@ -7235,6 +7316,7 @@ protected:
*/
void retransData();
virtual void destroyed();
private:
void dumpMsg(u_int8_t version, u_int8_t mClass, u_int8_t type,
const DataBlock& data, int stream, bool send);

View File

@ -51,7 +51,7 @@ class TransportWorker
friend class TransportThread;
public:
inline TransportWorker()
: m_thread(0)
: m_thread(0), m_threadMutex(true,"TransportThread")
{ }
virtual ~TransportWorker() { stop(); }
virtual bool readData() = 0;
@ -61,11 +61,22 @@ public:
bool running();
bool start(Thread::Priority prio = Thread::Normal);
inline void resetThread()
{ m_thread = 0; }
{
Lock myLock(m_threadMutex);
m_thread = 0;
}
virtual const char* getTransportName() = 0;
inline bool hasThread()
{
Lock myLock(m_threadMutex);
return m_thread != 0;
}
void exitThread();
protected:
void stop();
private:
TransportThread* m_thread;
Mutex m_threadMutex;
};
class SockRef : public RefObject
@ -88,16 +99,23 @@ class TransportThread : public Thread
{
friend class TransportWorker;
public:
inline TransportThread(TransportWorker* worker, Priority prio = Normal)
: Thread("SignallingTransporter",prio), m_worker(worker), m_exit(false)
inline TransportThread(TransportWorker* worker, String* tName, Priority prio = Normal)
: Thread(*tName,prio), m_worker(worker), m_exit(false), m_threadName(tName), m_cleanWorker(true)
{ }
virtual ~TransportThread();
virtual void run();
void exitThread()
{ m_exit = true; }
{
m_cleanWorker = false;
m_exit = true;
}
void resetWorker()
{ m_worker = 0; }
private:
TransportWorker* m_worker;
bool m_exit;
String* m_threadName;
bool m_cleanWorker;
};
class ListenerThread : public Thread
@ -116,7 +134,7 @@ private:
bool m_stream;
};
class TReader : public TransportWorker, public Mutex
class TReader : public TransportWorker, public Mutex, public RefObject
{
public:
inline TReader()
@ -158,7 +176,8 @@ public:
Down
};
virtual bool initialize(const NamedList* config);
Transport(const NamedList& params);
Transport(const NamedList& params, String* mutexName);
Transport(TransportType type, String* mutexName);
~Transport();
inline unsigned char getVersion(unsigned char* buf) const
{ return buf[0]; }
@ -179,6 +198,8 @@ public:
{ return m_state; }
inline void resetListener()
{ m_listener = 0; }
inline void startReading()
{ if (m_reader) m_reader->start(); }
bool addSocket(Socket* socket,SocketAddr& adress);
virtual bool reliable() const
{ return m_type == Sctp || m_type == Tcp; }
@ -196,6 +217,10 @@ public:
static SignallingComponent* create(const String& type, NamedList& params);
virtual bool transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId = 0);
virtual bool getSocketParams(const String& params, NamedList& result);
virtual void destroyed();
virtual bool hasThread();
virtual void stopThread();
private:
TReader* m_reader;
Mutex m_readerMutex;
@ -203,9 +228,11 @@ private:
int m_type;
int m_state;
ListenerThread* m_listener;
const NamedList m_config;
NamedList m_config;
bool m_endpoint;
bool m_supportEvents;
bool m_listenNotify;
String* m_mutexName;
};
class StreamReader : public TReader
@ -224,9 +251,11 @@ public:
virtual bool sendBuffer(int streamId = 0);
virtual bool getSocketParams(const String& params, NamedList& result);
virtual void reset()
{ m_transport->resetReader(this); }
{ if (m_transport) m_transport->resetReader(this); }
void connectionDown(bool stop = true);
void stopThread();
virtual const char* getTransportName()
{ return m_transport ? m_transport->debugName() : ""; }
private:
Transport* m_transport;
Socket* m_socket;
@ -258,6 +287,8 @@ public:
bool bindSocket();
void reconnectSocket();
void updateTransportStatus(int status);
virtual const char* getTransportName()
{ return m_transport ? m_transport->debugName() : ""; }
private:
Transport* m_transport;
Socket* m_socket;
@ -279,7 +310,20 @@ private:
static TransportModule plugin;
YSIGFACTORY2(Transport);
static long s_maxDownAllowed = 10000000;
static ObjList s_names;
Mutex s_namesMutex(false,"TransportNames");
static void addName(String* name)
{
Lock myLock(s_namesMutex);
s_names.append(name);
}
static void removeName(String* name)
{
Lock myLock(s_namesMutex);
s_names.remove(name);
}
static const TokenDict s_transType[] = {
{ "none", Transport::None },
@ -359,6 +403,7 @@ bool ListenerThread::init(const NamedList& param)
Debug("ListenerThread",DebugConf,"Could not obtain SctpSocket");
return false;
}
delete m_socket;
m_socket = soc;
m_socket->create(AF_INET,m_stream ? SOCK_STREAM : SOCK_SEQPACKET,IPPROTO_SCTP);
break;
@ -472,9 +517,10 @@ bool ListenerThread::addAddress(const NamedList &param)
TransportThread::~TransportThread()
{
DDebug("TransportThread",DebugAll,"Destroying TransportThread [%p]",this);
if (m_worker)
m_worker->resetThread();
DDebug(DebugAll,"Destroying Transport Thread [%p]",this);
removeName(m_threadName);
}
void TransportThread::run()
@ -490,27 +536,35 @@ void TransportThread::run()
else
Thread::msleep(5,true);
}
if (!m_worker)
return;
m_worker->resetThread();
m_worker->reset();
if (m_cleanWorker)
m_worker->reset();
m_worker = 0;
}
TReader::~TReader()
{
Debug(DebugAll,"Destroying TReader [%p]",this);
DDebug(DebugAll,"Destroying TReader [%p]",this);
}
/** TransportWorker class */
bool TransportWorker::running()
{
Lock myLock(m_threadMutex);
return m_thread && m_thread->running();
}
bool TransportWorker::start(Thread::Priority prio)
{
if (!m_thread)
m_thread = new TransportThread(this,prio);
Lock myLock(m_threadMutex);
if (!m_thread) {
String* name = new String(getTransportName());
addName(name);
m_thread = new TransportThread(this,name,prio);
}
if (m_thread->running() || m_thread->startup())
return true;
m_thread->cancel(true);
@ -520,15 +574,31 @@ bool TransportWorker::start(Thread::Priority prio)
void TransportWorker::stop()
{
Lock myLock(m_threadMutex);
if (!(m_thread && m_thread->running()))
return;
m_thread->exitThread();
for (unsigned int i = 2 * Thread::idleMsec(); i--; ) {
if (m_thread == Thread::current()) {
m_thread->resetWorker();
m_thread = 0;
DDebug(DebugWarn,"Stopping TransportWorker from itself!! %p ", this);
return;
}
myLock.drop();
while (true) {
Thread::msleep(1);
if (!m_thread)
return;
}
m_thread = 0;
}
void TransportWorker::exitThread()
{
Lock myLock(m_threadMutex);
if (!m_thread)
return;
m_thread->exitThread();
}
/**
@ -542,6 +612,7 @@ SignallingComponent* Transport::create(const String& type, NamedList& name)
Configuration cfg(Engine::configFile("sigtransport"));
cfg.load();
NamedString* listenNotify = name.getParam(YSTRING("listen-notify"));
const char* sectName = name.getValue("basename");
NamedList* config = cfg.getSection(sectName);
if (!name.getBoolValue(YSTRING("local-config"),false))
@ -552,15 +623,30 @@ SignallingComponent* Transport::create(const String& type, NamedList& name)
} else
name.copyParams(*config);
return new Transport(*config);
if (listenNotify)
config->setParam(listenNotify->name(),*listenNotify);
String* mName = new String("TransportReader:");
mName->append(*config);
addName(mName);
return new Transport(*config,mName);
}
Transport::Transport(const NamedList &param)
: m_reader(0), m_readerMutex(true,"TransportReader"), m_state(Down),
m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true)
Transport::Transport(const NamedList &param, String* mutexName)
: m_reader(0), m_readerMutex(true,*mutexName), m_streamer(false), m_state(Down),
m_listener(0), m_config(param), m_endpoint(true), m_supportEvents(true),
m_listenNotify(true), m_mutexName(mutexName)
{
setName("Transport:" + param);
Debug(this,DebugAll,"Transport created (%p)",this);
DDebug(this,DebugAll,"Transport created (%p)",this);
m_listenNotify = param.getBoolValue("listen-notify",true);
}
Transport::Transport(TransportType type, String* mutexName)
: m_reader(0), m_readerMutex(true,*mutexName), m_streamer(true), m_type(type), m_state(Down),
m_listener(0), m_config(""), m_endpoint(true), m_supportEvents(true),
m_listenNotify(false), m_mutexName(mutexName)
{
DDebug(this,DebugInfo,"Creating new Transport [%p]",this);
}
Transport::~Transport()
@ -570,19 +656,31 @@ Transport::~Transport()
while (m_listener)
Thread::yield();
Debug(this,DebugAll,"Destroying Transport [%p]",this);
delete m_reader;
TelEngine::destruct(m_reader);
removeName(m_mutexName);
}
void Transport::destroyed()
{
m_readerMutex.lock();
TReader* tmp = m_reader;
m_reader = 0;
m_readerMutex.unlock();
TelEngine::destruct(tmp);
}
void Transport::resetReader(TReader* caller)
{
if (!caller)
return;
m_readerMutex.lock();
if (caller == m_reader)
m_reader = 0;
m_readerMutex.unlock();
delete caller;
if (m_reader) {
m_readerMutex.lock();
if (caller == m_reader)
m_reader = 0;
m_readerMutex.unlock();
}
if (m_listener)
TelEngine::destruct(caller);
}
bool Transport::control(const NamedList &param)
@ -655,6 +753,7 @@ bool Transport::initialize(const NamedList* params)
bindSocket();
}
m_reader->start();
lock.drop();
return true;
}
@ -724,7 +823,7 @@ bool Transport::bindSocket()
addr.host(address);
addr.port(port);
if (!socket->bind(addr)) {
Debug(DebugMild,"Unable to bind to %s:%u: %d: %s",
Debug(this,DebugMild,"Unable to bind to %s:%u: %d: %s",
addr.host().c_str(),addr.port(),errno,strerror(errno));
socket->terminate();
delete socket;
@ -737,10 +836,8 @@ bool Transport::bindSocket()
return false;
}
Lock lock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugFail,"Bind socket null reader!!");
if (!m_reader)
return false;
}
m_reader->setSocket(socket);
int linger = m_config.getIntValue("linger",0);
socket->setLinger(linger);
@ -919,10 +1016,8 @@ void Transport::setStatus(int status)
lookup(m_state,s_transStatus,"?"),lookup(status,s_transStatus,"?"),this);
m_state = status;
Lock mylock(m_readerMutex);
if (!m_reader) {
Debug(this,DebugFail,"setStatus null reader");
if (!m_reader)
return;
}
m_reader->m_canSend = true;
mylock.drop();
notifyLayer((status == Up) ? SignallingInterface::LinkUp : SignallingInterface::LinkDown);
@ -938,36 +1033,51 @@ u_int32_t Transport::getMsgLen(unsigned char* buf)
bool Transport::transmitMSG(const DataBlock& header, const DataBlock& msg, int streamId)
{
Lock lock(m_readerMutex);
if (!m_reader) {
DDebug(this,DebugMild,"Cannot send message, no reader set [%p]",this);
RefPointer<TReader> reader = m_reader;
if (!reader)
return false;
}
bool ret = m_reader->sendMSG(header,msg,streamId);
return ret;
lock.drop();
return reader->sendMSG(header,msg,streamId);;
}
bool Transport::addSocket(Socket* socket,SocketAddr& adress)
bool Transport::addSocket(Socket* socket,SocketAddr& socketAddress)
{
if (m_listenNotify) {
String* name = new String("Transport:");
name->append(socketAddress.host() + ":");
name->append((int)socketAddress.port());
addName(name);
Transport* newTrans = new Transport((TransportType)m_type,name);
if (!transportNotify(newTrans,socketAddress)) {
DDebug(this,DebugInfo,"New transport wasn't accepted!");
return false;
}
if (!newTrans->addSocket(socket,socketAddress)) {
newTrans->setStatus(Down);
return false;
}
return true;
}
Lock lock(m_readerMutex);
if (transType() == Up)
return false;
if (m_reader) {
StreamReader* sr = static_cast<StreamReader*>(m_reader);
m_reader = 0;
sr->reconnect();
TelEngine::destruct(sr);
}
SocketAddr addr(AF_INET);
String address, adr = m_config.getValue("remote");
int port = defPort();
resolveAddress(adr,address,port);
addr.host(address);
addr.port(port);
if (!m_config.c_str()) {
m_config.assign(String(socketAddress.host().c_str()) << ":" << socketAddress.port());
setName(m_config);
}
socket->setBlocking(false);
switch (m_type) {
case Sctp :
{
Socket* sock = 0;
Message m("socket.sctp");
m.addParam("handle",String(socket->handle()));
m.addParam("handle",String(socket->detach()));
delete socket;
SockRef* s = new SockRef(&sock);
m.userData(s);
TelEngine::destruct(s);
@ -988,10 +1098,11 @@ bool Transport::addSocket(Socket* socket,SocketAddr& adress)
ppid = m_config.getIntValue("payload",ppid);
if (ppid > 0)
soc->setPayload(ppid);
soc->setBlocking(false);
if (m_streamer)
m_reader = new StreamReader(this,soc);
else
m_reader = new MessageReader(this,soc,addr);
Debug(this,DebugStub,"Add socket requested to create sctp message reader!");
break;
}
case Unix :
@ -999,17 +1110,33 @@ bool Transport::addSocket(Socket* socket,SocketAddr& adress)
m_reader = new StreamReader(this,socket);
break;
case Udp :
m_reader = new MessageReader(this,socket,addr);
Debug(this,DebugStub,"Add socket requested to create message reader for UDP socket type!");
break;
default:
Debug(this,DebugWarn,"Unknown socket type %d",m_type);
return false;
}
m_reader->start();
setStatus(Up);
m_reader->start();
return true;
}
bool Transport::hasThread()
{
Lock lock(m_readerMutex);
if (!m_reader)
return false;
return m_reader->hasThread();
}
void Transport::stopThread()
{
Lock lock(m_readerMutex);
if (m_reader)
m_reader->exitThread();
}
/**
* class StreamReader
*/
@ -1080,8 +1207,8 @@ bool StreamReader::sendBuffer(int streamId)
}
if (error) {
if (m_socket->updateError() && !m_socket->canRetry()) {
mylock.drop();
connectionDown();
m_reconnect = true;
m_canSend = false;
}
return false;
}
@ -1096,8 +1223,8 @@ bool StreamReader::sendBuffer(int streamId)
}
if (m_transport->status() == Transport::Up)
if (!s->valid()) {
mylock.drop();
connectionDown();
m_reconnect = true;
m_canSend = false;
return false;
}
int flags = 0;
@ -1108,8 +1235,8 @@ bool StreamReader::sendBuffer(int streamId)
if (len <= 0) {
if (!m_socket->canRetry()) {
Debug(m_transport,DebugMild,"Send error detected. %s",strerror(errno));
mylock.drop();
connectionDown();
m_reconnect = true;
m_canSend = false;
}
return false;
}
@ -1200,7 +1327,7 @@ bool StreamReader::readData()
m_totalPacketLen = m_transport->getMsgLen(auxBuf);
if (m_totalPacketLen >= 8 && m_totalPacketLen < MAX_BUF_SIZE) {
m_totalPacketLen -= 8;
XDebug(m_transport,DebugAll,"Expecting %d bytes of packet data",m_totalPacketLen);
XDebug(m_transport,DebugAll,"Expecting %d bytes of packet data %d",m_totalPacketLen,stream);
if (!m_totalPacketLen) {
m_transport->setStatus(Transport::Up);
m_transport->processMSG(m_transport->getVersion((unsigned char*)m_headerBuffer.data()),
@ -1381,6 +1508,7 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s
while (m_socket && m_socket->select(0,&sendOk,&error,Thread::idleUsec())) {
if (error) {
DDebug(m_transport,DebugAll,"Send error detected. %s",strerror(errno));
mylock.drop();
updateTransportStatus(Transport::Down);
break;
}
@ -1414,7 +1542,7 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s
ret = true;
break;
}
DDebug(m_transport,DebugMild,"Error sending message %d %d %s",len,totalLen,strerror(errno));
DDebug(m_transport,DebugMild,"Error sending message %d %d %s %s %d",len,totalLen,strerror(errno),m_remote.host().c_str(),m_remote.port());
break;
}
return ret;
@ -1485,8 +1613,8 @@ bool MessageReader::readData()
DDebug(m_transport,DebugNote,"Message error [%p] %d",m_socket,flags);
if (m_transport->status() != Transport::Up)
return false;
Lock lock(m_sending);
updateTransportStatus(Transport::Initiating);
Lock lock(m_sending);
Debug(m_transport,DebugInfo,"Terminating socket [%p] Reason: connection down!",m_socket);
m_socket->terminate();
delete m_socket;