Fixed race condition when (re)setting connection transaction pointer.

git-svn-id: http://yate.null.ro/svn/yate/trunk@5546 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2013-06-14 14:54:22 +00:00
parent 2cf95d0cf8
commit 266819937c
4 changed files with 157 additions and 84 deletions

View File

@ -665,16 +665,23 @@ void IAXEngine::releaseCallNo(u_int16_t lcallno)
}
IAXTransaction* IAXEngine::startLocalTransaction(IAXTransaction::Type type,
const SocketAddr& addr, IAXIEList& ieList)
const SocketAddr& addr, IAXIEList& ieList, bool refTrans, bool startTrans)
{
Lock lock(this);
Lock lck(this);
u_int16_t lcn = generateCallNo();
if (!lcn)
return 0;
IAXTransaction* tr = IAXTransaction::factoryOut(this,type,lcn,addr,ieList);
if (tr)
m_incompleteTransList.append(tr);
else
if (tr) {
if (!refTrans || tr->ref()) {
m_incompleteTransList.append(tr);
if (startTrans)
tr->start();
}
else
TelEngine::destruct(tr);
}
if (!tr)
releaseCallNo(lcn);
return tr;
}

View File

@ -120,7 +120,8 @@ IAXTransaction::IAXTransaction(IAXEngine* engine, IAXFullFrame* frame, u_int16_t
m_trunkInStartTime(0),
m_trunkInTsDelta(0),
m_trunkInTsDiffRestart(5000),
m_trunkInFirstTs(0)
m_trunkInFirstTs(0),
m_startIEs(0)
{
switch (frame->subclass()) {
case IAXControl::New:
@ -193,7 +194,8 @@ IAXTransaction::IAXTransaction(IAXEngine* engine, Type type, u_int16_t lcallno,
m_trunkInStartTime(0),
m_trunkInTsDelta(0),
m_trunkInTsDiffRestart(5000),
m_trunkInFirstTs(0)
m_trunkInFirstTs(0),
m_startIEs(0)
{
// Init data members
if (!m_addr.port()) {
@ -203,56 +205,56 @@ IAXTransaction::IAXTransaction(IAXEngine* engine, Type type, u_int16_t lcallno,
m_addr.port(4569);
}
init(ieList);
IAXControl::Type frametype;
IAXIEList* ies = new IAXIEList;
m_startIEs = new IAXIEList;
// Create IE list to send
switch (type) {
case New:
ies->insertVersion();
ies->appendString(IAXInfoElement::USERNAME,m_username);
ies->appendString(IAXInfoElement::CALLING_NUMBER,m_callingNo);
if (!ies->appendIE(ieList,IAXInfoElement::CALLINGTON))
ies->appendNumeric(IAXInfoElement::CALLINGTON,m_engine->callerNumType(),1);
if (!ies->appendIE(ieList,IAXInfoElement::CALLINGPRES))
ies->appendNumeric(IAXInfoElement::CALLINGPRES,m_engine->callingPres(),1);
if (!ies->appendIE(ieList,IAXInfoElement::CALLINGTNS))
ies->appendNumeric(IAXInfoElement::CALLINGTNS,0,2);
ies->appendString(IAXInfoElement::CALLING_NAME,m_callingName);
ies->appendString(IAXInfoElement::CALLED_NUMBER,m_calledNo);
ies->appendString(IAXInfoElement::CALLED_CONTEXT,m_calledContext);
ies->appendNumeric(IAXInfoElement::FORMAT,m_format.format() | m_formatVideo.format(),4);
ies->appendNumeric(IAXInfoElement::CAPABILITY,m_capability,4);
ies->appendString(IAXInfoElement::CODEC_PREFS,String::empty());
m_startIEs->insertVersion();
if (m_username)
m_startIEs->appendString(IAXInfoElement::USERNAME,m_username);
m_startIEs->appendString(IAXInfoElement::CALLING_NUMBER,m_callingNo);
if (!m_startIEs->appendIE(ieList,IAXInfoElement::CALLINGTON))
m_startIEs->appendNumeric(IAXInfoElement::CALLINGTON,m_engine->callerNumType(),1);
if (!m_startIEs->appendIE(ieList,IAXInfoElement::CALLINGPRES))
m_startIEs->appendNumeric(IAXInfoElement::CALLINGPRES,m_engine->callingPres(),1);
if (!m_startIEs->appendIE(ieList,IAXInfoElement::CALLINGTNS))
m_startIEs->appendNumeric(IAXInfoElement::CALLINGTNS,0,2);
if (m_callingName)
m_startIEs->appendString(IAXInfoElement::CALLING_NAME,m_callingName);
m_startIEs->appendString(IAXInfoElement::CALLED_NUMBER,m_calledNo);
if (m_calledContext)
m_startIEs->appendString(IAXInfoElement::CALLED_CONTEXT,m_calledContext);
m_startIEs->appendNumeric(IAXInfoElement::FORMAT,m_format.format() | m_formatVideo.format(),4);
m_startIEs->appendNumeric(IAXInfoElement::CAPABILITY,m_capability,4);
m_startIEs->appendString(IAXInfoElement::CODEC_PREFS,String::empty());
if (m_callToken)
ies->appendBinary(IAXInfoElement::CALLTOKEN,0,0);
frametype = IAXControl::New;
m_startIEs->appendBinary(IAXInfoElement::CALLTOKEN,0,0);
break;
case RegReq:
case RegRel:
ies->appendString(IAXInfoElement::USERNAME,m_username);
m_startIEs->appendString(IAXInfoElement::USERNAME,m_username);
if (type == RegReq)
ies->appendNumeric(IAXInfoElement::REFRESH,m_expire,2);
m_startIEs->appendNumeric(IAXInfoElement::REFRESH,m_expire,2);
if (m_callToken)
ies->appendBinary(IAXInfoElement::CALLTOKEN,0,0);
frametype = (type == RegReq ? IAXControl::RegReq : IAXControl::RegRel);
m_startIEs->appendBinary(IAXInfoElement::CALLTOKEN,0,0);
break;
case Poke:
frametype = IAXControl::Poke;
break;
default:
Debug(m_engine,DebugStub,"Transaction(%u,%u) outgoing with unsupported type %u [%p]",
localCallNo(),remoteCallNo(),m_type,this);
delete ies;
delete m_startIEs;
m_startIEs = 0;
m_type = Incorrect;
return;
}
init();
postFrameIes(IAXFrame::IAX,frametype,ies);
changeState(NewLocalInvite);
}
IAXTransaction::~IAXTransaction()
{
if (m_startIEs)
delete m_startIEs;
XDebug(m_engine,DebugAll,"IAXTransaction::~IAXTransaction(%u,%u). [%p]",
localCallNo(),remoteCallNo(),this);
}
@ -277,6 +279,31 @@ IAXTransaction* IAXTransaction::factoryOut(IAXEngine* engine, Type type, u_int16
return 0;
}
// Start an outgoing transaction
void IAXTransaction::start()
{
Lock lck(this);
if (!(outgoing() && state() == Unknown && m_startIEs))
return;
Debug(m_engine,DebugAll,"Transaction(%u) starting [%p]",localCallNo(),this);
switch (m_type) {
#define IAXTRANS_START(transtype,frmtype) \
case transtype: postFrameIes(IAXFrame::IAX,frmtype,m_startIEs); break
IAXTRANS_START(New,IAXControl::New);
IAXTRANS_START(RegReq,IAXControl::RegReq);
IAXTRANS_START(RegRel,IAXControl::RegRel);
IAXTRANS_START(Poke,IAXControl::Poke);
#undef IAXTRANS_START
default:
Debug(m_engine,DebugStub,"Transaction(%u,%u) outgoing with unsupported type %u [%p]",
localCallNo(),remoteCallNo(),m_type,this);
setDestroy();
return;
}
m_startIEs = 0;
changeState(NewLocalInvite);
}
IAXTransaction* IAXTransaction::processFrame(IAXFrame* frame)
{
if (!frame)
@ -593,7 +620,12 @@ unsigned int IAXTransaction::sendMedia(const DataBlock& data, unsigned int tStam
if (m_trunkOutSend)
m_trunkFrame->send();
}
// Release lock while sending full frame to avoid deadlock with transaction
// mutex
// There are places when this mutex is taken after transaction mutex
lck.drop();
postFrame(IAXFrame::Voice,fmt->out(),data.data(),data.length(),ts,true);
lck.acquire(d->m_outMutex);
sent = data.length();
}
else if (m_trunkFrame) {
@ -649,6 +681,9 @@ IAXEvent* IAXTransaction::getEvent(const Time& now)
return 0;
return keepEvent(terminate(IAXEvent::Terminated,true));
}
// Outgoing waiting to start
if (outgoing() && state() == Unknown)
return 0;
// Send ack for received frames
ackInFrames();
// Do we have a generated event ?

View File

@ -2017,8 +2017,14 @@ public:
{ m_destroy = true; }
/**
* Process a frame from remote peer
* This method is thread safe.
* Start an outgoing transaction.
* This method is thread safe
*/
void start();
/**
* Process a frame from remote peer.
* This method is thread safe
* @param frame IAX frame belonging to this transaction to process
* @return 'this' if successful or NULL if the frame is invalid
*/
@ -2672,6 +2678,8 @@ private:
u_int32_t m_trunkInTsDelta; // Value used to re-build ts: last voice timestamp
u_int32_t m_trunkInTsDiffRestart; // Incoming trunk without timestamp: diff between timestamps at which we restart
u_int32_t m_trunkInFirstTs; // Incoming trunk without timestamp: first trunk timestamp
// Postponed start
IAXIEList* m_startIEs; // Postponed start
};
/**
@ -3246,10 +3254,13 @@ protected:
* @param type Transaction type
* @param addr Remote address to send the request
* @param ieList First frame IE list
* @return IAXTransaction pointer on success.
* @param refTrans Return a refferenced transaction pointer
* @param startTrans Start transaction
* @return IAXTransaction pointer on success
*/
IAXTransaction* startLocalTransaction(IAXTransaction::Type type,
const SocketAddr& addr, IAXIEList& ieList);
const SocketAddr& addr, IAXIEList& ieList,
bool refTrans = false, bool startTrans = true);
private:
Socket m_socket; // Socket

View File

@ -396,6 +396,14 @@ public:
inline YIAXEngine* getEngine() const
{ return m_iaxEngine; }
// Find a channel. Return a refferenced pointer
inline Channel* findChan(Channel* chan) {
if (!chan)
return 0;
Lock lck(this);
return channels().find(chan) && chan->ref() ? chan : 0;
}
// Update codecs a list of parameters
// @return False if the result is 0 (no intersection)
bool updateCodecsFromRoute(u_int32_t& codecs, const NamedList& params, int type);
@ -536,7 +544,7 @@ protected:
bool safeRefIncrease();
private:
YIAXEngine* m_iaxEngine; // IAX engine owning the transaction
IAXTransaction* m_transaction; // IAX transaction
RefPointer<IAXTransaction> m_transaction; // IAX transaction
String m_password; // Password for client authentication
bool m_mutedIn; // No remote media accepted
bool m_mutedOut; // No local media accepted
@ -1113,20 +1121,20 @@ bool YIAXEngine::reg(YIAXLine* line, bool regreq)
if (line->callToken())
ieList.appendBinary(IAXInfoElement::CALLTOKEN,0,0);
// Make it !
IAXTransaction::Type t = regreq ? IAXTransaction::RegReq : IAXTransaction::RegRel;
IAXTransaction* tr = startLocalTransaction(t,addr,ieList,true,false);
if (!tr)
return false;
// Lock lines to protect line transaction pointer
// Lock engine to safe ref the transaction
Lock lck(s_lines);
lock();
line->m_transaction = startLocalTransaction(
regreq ? IAXTransaction::RegReq : IAXTransaction::RegRel,addr,ieList);
unlock();
if (line->m_transaction) {
line->m_transaction->setUserData(line);
Debug(&iplugin,DebugAll,"Line(%s) set transaction (%p) lCallNo=%u [%p]",
line->toString().c_str(),(IAXTransaction*)line->m_transaction,
line->m_transaction->localCallNo(),line);
}
return line->m_transaction != 0;
tr->setUserData(line);
line->m_transaction = tr;
Debug(&iplugin,DebugAll,"Line(%s) set transaction (%p) callno=%u [%p]",
line->toString().c_str(),tr,tr->localCallNo(),line);
lck.drop();
tr->start();
TelEngine::destruct(tr);
return true;
}
// Create a new call transaction from target address and message params
@ -1174,7 +1182,7 @@ IAXTransaction* YIAXEngine::call(SocketAddr& addr, NamedList& params)
ieList.appendNumeric(IAXInfoElement::CAPABILITY,codecs,4);
if (params.getBoolValue("calltoken_out",s_callTokenOut))
ieList.appendBinary(IAXInfoElement::CALLTOKEN,0,0);
return startLocalTransaction(IAXTransaction::New,addr,ieList);
return startLocalTransaction(IAXTransaction::New,addr,ieList,true,false);
}
// Create a POKE transaction
@ -1253,46 +1261,55 @@ void YIAXEngine::initFormats(NamedList* params)
// Process all IAX events
void YIAXEngine::processEvent(IAXEvent* event)
{
YIAXConnection* connection = 0;
switch (event->getTransaction()->type()) {
case IAXTransaction::New:
connection = static_cast<YIAXConnection*>(event->getTransaction()->getUserData());
if (connection) {
IAXTransaction* tr = event ? event->getTransaction() : 0;
if (!tr) {
if (event)
delete event;
return;
}
if (tr->type() == IAXTransaction::New) {
if (tr->getUserData()) {
Channel* chan = static_cast<Channel*>(tr->getUserData());
YIAXConnection* conn = static_cast<YIAXConnection*>(iplugin.findChan(chan));
if (conn) {
// We already have a channel for this call
connection->handleEvent(event);
conn->handleEvent(event);
if (event->final()) {
// Final event: disconnect
DDebug(this,DebugAll,"processEvent. Disconnecting (%p): '%s'",
connection,connection->id().c_str());
connection->disconnect();
conn,conn->id().c_str());
conn->disconnect();
}
TelEngine::destruct(conn);
}
else {
if (event->type() == IAXEvent::New) {
// Incoming request for a new call
if (iplugin.canAccept(true)) {
connection = new YIAXConnection(this,event->getTransaction());
connection->initChan();
event->getTransaction()->setUserData(connection);
if (!connection->route(event))
event->getTransaction()->setUserData(0);
}
else {
Debug(&iplugin,DebugWarn,"Refusing new IAX call, full or exiting");
// Cause code 42: switch congestion
event->getTransaction()->sendReject(0,42);
}
}
Debug(this,DebugNote,"No connection (%p) for transaction (%p) callno=%u",
conn,tr,tr->localCallNo());
tr->setDestroy();
}
break;
case IAXTransaction::RegReq:
case IAXTransaction::RegRel:
if (event->getTransaction()->outgoing())
s_lines.handleEvent(event);
else if (event->type() == IAXEvent::New || event->type() == IAXEvent::AuthRep)
processRemoteReg(event,(event->type() == IAXEvent::New));
break;
default: ;
}
else if (event->type() == IAXEvent::New) {
// Incoming request for a new call
if (iplugin.canAccept(true)) {
YIAXConnection* conn = new YIAXConnection(this,tr);
conn->initChan();
tr->setUserData(conn);
if (!conn->route(event))
tr->setUserData(0);
}
else {
Debug(&iplugin,DebugWarn,"Refusing new IAX call, full or exiting");
// Cause code 42: switch congestion
tr->sendReject(0,42);
}
}
}
else if (tr->type() == IAXTransaction::RegReq ||
tr->type() == IAXTransaction::RegRel) {
if (tr->outgoing())
s_lines.handleEvent(event);
else if (event->type() == IAXEvent::New || event->type() == IAXEvent::AuthRep)
processRemoteReg(event,(event->type() == IAXEvent::New));
}
delete event;
}
@ -1565,6 +1582,7 @@ bool YIAXDriver::msgExecute(Message& msg, String& dest)
conn->initChan();
tr->setUserData(conn);
m_iaxEngine->initTransaction(tr,msg,line);
tr->start();
if (conn->connect(ch,msg.getValue("reason"))) {
conn->callConnect(msg);
msg.setParam("peerid",conn->id());
@ -1574,6 +1592,7 @@ bool YIAXDriver::msgExecute(Message& msg, String& dest)
tr->setUserData(0);
tr->setDestroy();
}
TelEngine::destruct(tr);
line = 0;
conn->deref();
return true;
@ -1819,6 +1838,7 @@ YIAXConnection::~YIAXConnection()
setConsumer();
setSource();
hangup(0);
m_transaction = 0;
Debug(this,DebugAll,"Destroyed with reason '%s' [%p]",m_reason.safe(),this);
}