Mutex related changes in engine and channels.

git-svn-id: http://yate.null.ro/svn/yate/trunk@467 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-08-02 02:20:00 +00:00
parent aac002b4f0
commit 215cd75a24
9 changed files with 126 additions and 36 deletions

View File

@ -27,6 +27,10 @@
using namespace TelEngine;
// this is to protect against two threads trying to (dis)connect a pair
// of call endpoints at the same time
static Mutex s_mutex(true);
CallEndpoint::CallEndpoint(const char* id)
: m_peer(0), m_id(id), m_mutex(0)
{
@ -52,7 +56,7 @@ void* CallEndpoint::getObject(const String& name) const
return RefObject::getObject(name);
}
bool CallEndpoint::connect(CallEndpoint* peer)
bool CallEndpoint::connect(CallEndpoint* peer, const char* reason)
{
if (!peer) {
disconnect();
@ -62,6 +66,14 @@ bool CallEndpoint::connect(CallEndpoint* peer)
return true;
DDebug(DebugAll,"CallEndpoint '%s' connecting peer %p to [%p]",m_id.c_str(),peer,this);
#if 0
if (!s_mutex.lock(5000000)) {
Debug(DebugFail,"Call connect failed - deadlock on call endpoint mutex!");
Engine::restart(0);
return false;
}
#endif
ref();
disconnect();
peer->ref();
@ -74,20 +86,33 @@ bool CallEndpoint::connect(CallEndpoint* peer)
}
m_peer = peer;
peer->setPeer(this);
connected();
peer->setPeer(this,reason);
connected(reason);
#if 0
s_mutex.unlock();
#endif
return true;
}
void CallEndpoint::disconnect(bool final, const char* reason)
bool CallEndpoint::disconnect(bool final, const char* reason)
{
if (!m_peer)
return;
return false;
DDebug(DebugAll,"CallEndpoint '%s' disconnecting peer %p from [%p]",m_id.c_str(),m_peer,this);
Lock lock(s_mutex,5000000);
if (!lock.mutex()) {
Debug(DebugFail,"Call disconnect failed - deadlock on call endpoint mutex!");
Engine::restart(0);
return false;
}
CallEndpoint *temp = m_peer;
m_peer = 0;
if (!temp)
return false;
ObjList* l = m_data.skipNull();
for (; l; l=l->skipNext()) {
@ -97,16 +122,18 @@ void CallEndpoint::disconnect(bool final, const char* reason)
}
temp->setPeer(0,reason);
if (final)
disconnected(true,reason);
lock.drop();
temp->deref();
deref();
return deref();
}
void CallEndpoint::setPeer(CallEndpoint* peer, const char* reason)
{
m_peer = peer;
if (m_peer)
connected();
connected(reason);
else
disconnected(false,reason);
}
@ -177,11 +204,7 @@ Channel::~Channel()
#ifdef DEBUG
Debugger debug(DebugAll,"Channel::~Channel()"," '%s' [%p]",m_id.c_str(),this);
#endif
m_timeout = 0;
status("deleted");
dropChan();
m_driver = 0;
m_mutex = 0;
cleanup();
}
void* Channel::getObject(const String& name) const
@ -196,7 +219,7 @@ void Channel::init()
status(direction());
m_mutex = m_driver;
if (m_driver) {
m_mutex->lock();
m_driver->lock();
debugName(m_driver->debugName());
debugChain(m_driver);
if (m_id.null())
@ -204,19 +227,29 @@ void Channel::init()
m_driver->m_total++;
m_driver->channels().append(this);
m_driver->changed();
m_mutex->unlock();
m_driver->unlock();
}
DDebug(this,DebugInfo,"Channel::init() '%s' [%p]",m_id.c_str(),this);
}
void Channel::cleanup()
{
m_timeout = 0;
status("deleted");
m_targetid.clear();
dropChan();
m_driver = 0;
m_mutex = 0;
}
void Channel::dropChan()
{
if (!m_driver)
return;
m_mutex->lock();
m_driver->lock();
if (m_driver->channels().remove(this,false))
m_driver->changed();
m_mutex->unlock();
m_driver->unlock();
}
void Channel::zeroRefs()

View File

@ -272,6 +272,11 @@ const String& DataEndpoint::toString() const
return m_name;
}
Mutex* DataEndpoint::mutex() const
{
return m_call ? m_call->mutex() : 0;
}
bool DataEndpoint::connect(DataEndpoint* peer)
{
if (!peer) {
@ -312,10 +317,10 @@ bool DataEndpoint::connect(DataEndpoint* peer)
return true;
}
void DataEndpoint::disconnect()
bool DataEndpoint::disconnect()
{
if (!m_peer)
return;
return false;
DDebug(DebugInfo,"DataEndpoint '%s' disconnecting peer %p from [%p]",m_name.c_str(),m_peer,this);
DataSource *s = getSource();
@ -338,7 +343,7 @@ void DataEndpoint::disconnect()
m_peer = 0;
temp->m_peer = 0;
temp->deref();
deref();
return deref();
}
void DataEndpoint::setSource(DataSource* source)

View File

@ -519,7 +519,8 @@ int Engine::run()
// Create worker thread if we didn't hear about any of them in a while
if (s_makeworker && (EnginePrivate::count < s_maxworkers)) {
Debug(DebugMild,"Creating new message dispatching thread (%d running)",EnginePrivate::count);
Debug(EnginePrivate::count ? DebugMild : DebugInfo,
"Creating new message dispatching thread (%d running)",EnginePrivate::count);
EnginePrivate *prv = new EnginePrivate;
prv->startup();
}
@ -805,6 +806,7 @@ static void usage(bool client, FILE* f)
#ifndef NDEBUG
" -D[options] Special debugging options\n"
" a Abort if bugs are encountered\n"
" m Attempt to debug mutex deadlocks\n"
" c Call dlclose() until it gets an error\n"
" i Reinitialize after 1st initialization\n"
" x Exit immediately after initialization\n"
@ -979,6 +981,9 @@ int Engine::main(int argc, const char** argv, const char** env, RunMode mode, bo
case 'a':
s_sigabrt = true;
break;
case 'm':
Mutex::wait(10000000);
break;
case 'c':
s_keepclosing = true;
break;

View File

@ -81,6 +81,7 @@ private:
using namespace TelEngine;
GlobalMutex s_global;
unsigned long s_maxwait = 0;
volatile int MutexPrivate::s_count = 0;
volatile int MutexPrivate::s_locks = 0;
@ -179,6 +180,11 @@ MutexPrivate::~MutexPrivate()
bool MutexPrivate::lock(long maxwait)
{
bool rval = false;
bool warn = false;
if (s_maxwait && (maxwait < 0)) {
maxwait = (long)s_maxwait;
warn = true;
}
GlobalMutex::lock();
ref();
GlobalMutex::unlock();
@ -213,6 +219,8 @@ bool MutexPrivate::lock(long maxwait)
else
deref();
GlobalMutex::unlock();
if (warn && !rval)
Debug(DebugFail,"Mutex lock failed for %lu usec!",maxwait);
return rval;
}
@ -316,4 +324,9 @@ int Mutex::locks()
return MutexPrivate::s_locks;
}
void Mutex::wait(unsigned long maxwait)
{
s_maxwait = maxwait;
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -391,7 +391,7 @@ public:
BOOL startExternalRTP(const char* remoteIP, WORD remotePort, H323Channel::Directions dir, YateH323_ExternalRTPChannel* chan);
void stoppedExternal(H323Channel::Directions dir);
void setRemoteAddress(const char* remoteIP, WORD remotePort);
void cleanups();
void cleanups(bool closeChans = true);
bool sendTone(Message& msg, const char* tone);
void setCallerID(const char* number, const char* name);
void rtpExecuted(Message& msg);
@ -894,11 +894,13 @@ void YateH323Connection::CleanUpOnCallEnd()
H323Connection::CleanUpOnCallEnd();
}
void YateH323Connection::cleanups()
void YateH323Connection::cleanups(bool closeChans)
{
m_chan = 0;
CloseAllLogicalChannels(true);
CloseAllLogicalChannels(false);
if (closeChans) {
CloseAllLogicalChannels(true);
CloseAllLogicalChannels(false);
}
}
H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PString &caller,
@ -1705,6 +1707,7 @@ void YateH323Chan::zeroRefs()
// to block until the native data threads terminate
dropChan();
hangup();
cleanup();
return;
}
Channel::zeroRefs();
@ -1742,6 +1745,7 @@ void YateH323Chan::hangup()
m->setParam("error",err);
if (txt)
m->setParam("reason",txt);
tmp->cleanups(false);
tmp->ClearCall();
}
Engine::enqueue(m);
@ -1940,9 +1944,11 @@ bool H323Driver::msgExecute(Message& msg, String& dest)
dest.c_str());
PString p;
YateH323EndPoint* ep = hplugin.findEndpoint(msg.getValue("line"));
lock();
YateH323Connection* conn = ep ? static_cast<YateH323Connection*>(
ep->MakeCallLocked(dest.c_str(),p,&msg)
) : 0;
unlock();
if (conn) {
conn->Unlock();
return true;

View File

@ -187,7 +187,7 @@ void Connection::run()
// For the sake of responsiveness try to turn off the tcp assembly timer
int arg = 1;
if (!m_socket->setOption(SOL_SOCKET, TCP_NODELAY, &arg, sizeof(arg)))
Debug("RManager",DebugWarn, "Failed to set tcp socket to TCP_NODELAY mode: %s\n", strerror(m_socket->error()));
Debug("RManager",DebugMild, "Failed to set tcp socket to TCP_NODELAY mode: %s\n", strerror(m_socket->error()));
Output("Remote connection from %s",m_address.c_str());
m_auth = !s_cfg.getValue("general","password");

3
yate.8
View File

@ -68,6 +68,9 @@ Special debugging options
.B \-Da
Abort (coredumps if allowed) if bugs are encountered
.TP
.B \-Dm
Attempt to debug mutex deadlocks by setting a maximum 10s limit
.TP
.B \-Dc
Call
.I dlclose()

View File

@ -176,7 +176,9 @@ enum DebugLevel {
DebugFail = 0,
DebugGoOn = 2,
DebugWarn = 5,
DebugMild = 7,
DebugMild = 6,
DebugCall = 7,
DebugNote = 8,
DebugInfo = 9,
DebugAll = 10
};
@ -2265,7 +2267,7 @@ public:
/**
* Attempt to lock the mutex and eventually wait for it
* @param maxait Time in microseconds to wait for the mutex, -1 wait forever
* @param maxwait Time in microseconds to wait for the mutex, -1 wait forever
* @return True if successfully locked, false on failure
*/
bool lock(long maxwait = -1);
@ -2284,7 +2286,7 @@ public:
/**
* Check if the mutex is unlocked (try to lock and unlock the mutex)
* @param maxait Time in microseconds to wait for the mutex, -1 wait forever
* @param maxwait Time in microseconds to wait for the mutex, -1 wait forever
* @return True if successfully locked and unlocked, false on failure
*/
bool check(long maxwait = -1);
@ -2307,6 +2309,13 @@ public:
*/
static int locks();
/**
* Set a maximum mutex wait time for debugging purposes
* @param maxwait Maximum time in microseconds to wait for any mutex
* when no time limit was requested, zero to disable limit
*/
static void wait(unsigned long maxwait);
private:
MutexPrivate* privDataCopy() const;
MutexPrivate* m_private;

View File

@ -651,6 +651,12 @@ public:
*/
virtual const String& toString() const;
/**
* Get the mutex that serializes access to this data endpoint, if any
* @return Pointer to the call's mutex object or NULL
*/
Mutex* mutex() const;
/**
* Connect the source and consumer of the endpoint to a peer
* @param peer Pointer to the peer data endpoint
@ -660,8 +666,9 @@ public:
/**
* Disconnect from the connected endpoint
* @return True if the object was deleted, false if it still exists
*/
void disconnect();
bool disconnect();
/**
* Set the data source of this object
@ -818,22 +825,24 @@ public:
* Get the mutex that serializes access to this call endpoint, if any
* @return Pointer to the call's mutex object or NULL
*/
inline Mutex* mutex()
inline Mutex* mutex() const
{ return m_mutex; }
/**
* Connect the call endpoint to a peer.
* @param peer Pointer to the peer call endpoint.
* @param reason Text that describes connect reason.
* @return True if connected, false if an error occured.
*/
bool connect(CallEndpoint* peer);
bool connect(CallEndpoint* peer, const char* reason = 0);
/**
* Disconnect from the connected peer call endpoint.
* @param reason Text that describes disconnect reason.
* @return True if the object was deleted, false if it still exists
*/
inline void disconnect(const char* reason = 0)
{ disconnect(false,reason); }
inline bool disconnect(const char* reason = 0)
{ return disconnect(false,reason); }
/**
* Get a data endpoint of this object
@ -885,8 +894,9 @@ protected:
/**
* Connect notification method.
* @param reason Text that describes connect reason.
*/
virtual void connected() { }
virtual void connected(const char* reason) { }
/**
* Disconnect notification method.
@ -903,7 +913,7 @@ protected:
void setPeer(CallEndpoint* peer, const char* reason = 0);
private:
void disconnect(bool final, const char* reason);
bool disconnect(bool final, const char* reason);
};
/**
@ -1306,6 +1316,12 @@ protected:
*/
Channel(Driver& driver, const char* id = 0, bool outgoing = false);
/**
* Perform destruction time cleanup. You can call this method earlier
* if destruction is to be postponed.
*/
void cleanup();
/**
* Remove the channel from the parent driver list
*/