Changed the unlocking order to prevent races. Perform cleanups in the

destroyed() method instead of the destructor.


git-svn-id: http://yate.null.ro/svn/yate/trunk@1357 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2007-06-08 18:33:33 +00:00
parent 52bfb29082
commit da79946d6e
5 changed files with 112 additions and 65 deletions

View File

@ -73,7 +73,7 @@ CallEndpoint::CallEndpoint(const char* id)
{ {
} }
CallEndpoint::~CallEndpoint() void CallEndpoint::destroyed()
{ {
#ifdef DEBUG #ifdef DEBUG
ObjList* l = m_data.skipNull(); ObjList* l = m_data.skipNull();

View File

@ -110,7 +110,6 @@ static TranslatorCaps s_stereoCaps[] = {
}; };
static Mutex s_dataMutex(true); static Mutex s_dataMutex(true);
static Mutex s_sourceMutex(true);
class ThreadedSourcePrivate : public Thread class ThreadedSourcePrivate : public Thread
{ {
@ -123,20 +122,18 @@ protected:
virtual void run() virtual void run()
{ {
m_source->run(); m_source->run();
s_sourceMutex.lock(); // execute cleanup from this thread if possible
ThreadedSource* source = m_source; cleanup();
m_source = 0;
if (source) {
source->m_thread = 0;
source->cleanup();
}
s_sourceMutex.unlock();
} }
virtual void cleanup() virtual void cleanup()
{ {
if (m_source) RefObject::refMutex().lock();
m_source->cleanup(); ThreadedSource* source = m_source;
m_source = 0;
RefObject::refMutex().unlock();
if (source)
source->cleanup();
} }
private: private:
@ -435,7 +432,7 @@ const FormatInfo* DataFormat::getInfo() const
} }
DataConsumer::~DataConsumer() void DataConsumer::destroyed()
{ {
if (m_source || m_override) { if (m_source || m_override) {
// this should not happen - but scream bloody murder if so // this should not happen - but scream bloody murder if so
@ -446,6 +443,7 @@ DataConsumer::~DataConsumer()
m_source->detach(this); m_source->detach(this);
if (m_override) if (m_override)
m_override->detach(this); m_override->detach(this);
DataNode::destroyed();
} }
void* DataConsumer::getObject(const String& name) const void* DataConsumer::getObject(const String& name) const
@ -597,9 +595,10 @@ bool DataSource::detachInternal(DataConsumer* consumer)
return false; return false;
} }
DataSource::~DataSource() void DataSource::destroyed()
{ {
clear(); clear();
DataNode::destroyed();
} }
void DataSource::clear() void DataSource::clear()
@ -645,9 +644,9 @@ DataEndpoint::DataEndpoint(CallEndpoint* call, const char* name)
m_call->m_data.append(this); m_call->m_data.append(this);
} }
DataEndpoint::~DataEndpoint() void DataEndpoint::destroyed()
{ {
DDebug(DebugAll,"DataEndpoint::~DataEndpoint() '%s' call=%p [%p]", DDebug(DebugAll,"DataEndpoint::destroyed() '%s' call=%p [%p]",
m_name.c_str(),m_call,this); m_name.c_str(),m_call,this);
if (m_call) if (m_call)
m_call->m_data.remove(this,false); m_call->m_data.remove(this,false);
@ -657,6 +656,7 @@ DataEndpoint::~DataEndpoint()
clearSniffers(); clearSniffers();
setSource(); setSource();
setConsumer(); setConsumer();
RefObject::destroyed();
} }
void* DataEndpoint::getObject(const String& name) const void* DataEndpoint::getObject(const String& name) const
@ -935,13 +935,14 @@ void DataEndpoint::clearSniffers()
} }
ThreadedSource::~ThreadedSource() void ThreadedSource::destroyed()
{ {
if (m_asyncDelete && m_thread) if (m_asyncDelete && m_thread)
Debug(DebugFail,"ThreadedSource destroyed holding thread %p [%p]",m_thread,this); Debug(DebugFail,"ThreadedSource destroyed holding thread %p [%p]",m_thread,this);
m_asyncDelete = false; m_asyncDelete = false;
if (m_thread) if (m_thread)
stop(); stop();
DataSource::destroyed();
} }
bool ThreadedSource::start(const char* name, Thread::Priority prio) bool ThreadedSource::start(const char* name, Thread::Priority prio)
@ -964,28 +965,40 @@ void ThreadedSource::stop()
Lock lock(mutex()); Lock lock(mutex());
if (!m_thread) if (!m_thread)
return; return;
s_sourceMutex.lock(); RefObject::refMutex().lock();
ThreadedSourcePrivate* tmp = m_thread; ThreadedSourcePrivate* tmp = m_thread;
m_thread = 0; m_thread = 0;
if (tmp) { if (tmp) {
tmp->m_source = 0; if (tmp->m_source == this)
delete tmp; tmp->m_source = 0;
else
tmp = 0;
} }
s_sourceMutex.unlock(); RefObject::refMutex().unlock();
if (tmp)
delete tmp;
} }
void ThreadedSource::cleanup() void ThreadedSource::cleanup()
{ {
if (m_asyncDelete && !alive()) Lock lock(RefObject::refMutex());
DataSource::zeroRefs(); m_thread = 0;
if (m_asyncDelete && !alive()) {
lock.drop();
zeroRefs();
}
} }
void ThreadedSource::zeroRefs() bool ThreadedSource::zeroRefsTest()
{ {
// let the data thread destroy us if possible // let the data thread destroy us if possible
if (m_asyncDelete && m_thread && m_thread->running()) if (m_asyncDelete && m_thread && m_thread->running()) {
return; m_thread = 0;
DataSource::zeroRefs(); return false;
}
// if async not possible make sure we are set up for synchronous destruction
m_asyncDelete = false;
return DataSource::zeroRefsTest();
} }
Thread* ThreadedSource::thread() const Thread* ThreadedSource::thread() const
@ -995,6 +1008,7 @@ Thread* ThreadedSource::thread() const
bool ThreadedSource::running() const bool ThreadedSource::running() const
{ {
Lock lock(RefObject::refMutex());
return m_thread && m_thread->running(); return m_thread && m_thread->running();
} }

View File

@ -90,7 +90,7 @@ private:
class ToneSource : public ThreadedSource class ToneSource : public ThreadedSource
{ {
public: public:
virtual ~ToneSource(); virtual void destroyed();
virtual void run(); virtual void run();
inline const String& name() inline const String& name()
{ return m_name; } { return m_name; }
@ -438,17 +438,18 @@ ToneSource::ToneSource(const ToneDesc* tone)
asyncDelete(true); asyncDelete(true);
} }
ToneSource::~ToneSource() void ToneSource::destroyed()
{ {
Lock lock(__plugin); Debug(&__plugin,DebugAll,"ToneSource::destroyed() '%s' [%p] total=%u stamp=%lu",
Debug(&__plugin,DebugAll,"ToneSource::~ToneSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp()); m_name.c_str(),this,m_total,timeStamp());
stop(); ThreadedSource::destroyed();
if (m_time) if (m_time)
Debug(&__plugin,DebugInfo,"ToneSource rate=%u b/s",byteRate(m_time,m_total)); Debug(&__plugin,DebugInfo,"ToneSource rate=%u b/s",byteRate(m_time,m_total));
} }
void ToneSource::zeroRefs() void ToneSource::zeroRefs()
{ {
Debug(&__plugin,DebugAll,"ToneSource::zeroRefs() '%s' [%p]",m_name.c_str(),this);
__plugin.lock(); __plugin.lock();
tones.remove(this,false); tones.remove(this,false);
__plugin.unlock(); __plugin.unlock();
@ -530,10 +531,8 @@ ToneSource* ToneSource::getTone(String& tone)
ObjList* l = &tones; ObjList* l = &tones;
for (; l; l = l->next()) { for (; l; l = l->next()) {
ToneSource* t = static_cast<ToneSource*>(l->get()); ToneSource* t = static_cast<ToneSource*>(l->get());
if (t && (t->name() == tone)) { if (t && (t->name() == tone) && t->ref())
t->ref();
return t; return t;
}
} }
ToneSource* t = new ToneSource(td); ToneSource* t = new ToneSource(td);
tones.append(t); tones.append(t);

View File

@ -40,6 +40,7 @@ public:
~WaveSource(); ~WaveSource();
virtual void run(); virtual void run();
virtual void cleanup(); virtual void cleanup();
virtual bool zeroRefsTest();
void setNotify(const String& id); void setNotify(const String& id);
bool derefReady(); bool derefReady();
private: private:
@ -62,6 +63,7 @@ private:
bool m_autoclose; bool m_autoclose;
bool m_autoclean; bool m_autoclean;
bool m_nodata; bool m_nodata;
bool m_insert;
volatile bool m_derefOk; volatile bool m_derefOk;
}; };
@ -218,9 +220,11 @@ void WaveSource::init(const String& file, bool autorepeat)
WaveSource::WaveSource(const char* file, CallEndpoint* chan, bool autoclose) WaveSource::WaveSource(const char* file, CallEndpoint* chan, bool autoclose)
: m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1), : m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1),
m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false), m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false),
m_nodata(false), m_derefOk(true) m_nodata(false), m_insert(false), m_derefOk(true)
{ {
Debug(&__plugin,DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file,chan,this); Debug(&__plugin,DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file,chan,this);
if (m_chan)
m_insert = true;
} }
WaveSource::~WaveSource() WaveSource::~WaveSource()
@ -371,9 +375,6 @@ void WaveSource::run()
Thread::usleep((unsigned long)dly); Thread::usleep((unsigned long)dly);
} }
if (!alive()) { if (!alive()) {
m_autoclose = false;
// if this is a zombie it surely has no owner anymore
m_chan = 0;
notify(0,"replaced"); notify(0,"replaced");
return; return;
} }
@ -383,20 +384,30 @@ void WaveSource::run()
tpos += (r*(u_int64_t)1000000/m_brate); tpos += (r*(u_int64_t)1000000/m_brate);
} while (r > 0); } while (r > 0);
Debug(&__plugin,DebugAll,"WaveSource '%s' end of data (%u played) chan=%p [%p]",m_id.c_str(),m_total,m_chan,this); Debug(&__plugin,DebugAll,"WaveSource '%s' end of data (%u played) chan=%p [%p]",m_id.c_str(),m_total,m_chan,this);
if (!ref()) {
notify(0,"replaced");
return;
}
// prevent disconnector thread from succeeding before notify returns // prevent disconnector thread from succeeding before notify returns
m_derefOk = false; m_derefOk = false;
// at cleanup time deref the data source if we start no disconnector thread // at cleanup time deref the data source if we start no disconnector thread
m_derefOk = m_autoclean = !notify(this,"eof"); m_autoclean = !notify(this,"eof");
if (!deref())
m_derefOk = m_autoclean;
} }
void WaveSource::cleanup() void WaveSource::cleanup()
{ {
Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u, alive=%s, autoclean=%s [%p]", Lock lock(DataEndpoint::commonMutex());
m_total,String::boolText(alive()),String::boolText(m_autoclean),this); Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u, alive=%s, autoclean=%s chan=%p [%p]",
m_total,String::boolText(alive()),String::boolText(m_autoclean),m_chan,this);
clearThread();
if (m_autoclean) { if (m_autoclean) {
asyncDelete(false); asyncDelete(false);
if (m_chan && (m_chan->getSource() == this)) if (m_insert) {
m_chan->setSource(); if (m_chan && (m_chan->getSource() == this))
m_chan->setSource();
}
else else
deref(); deref();
return; return;
@ -407,6 +418,21 @@ void WaveSource::cleanup()
m_derefOk = true; m_derefOk = true;
} }
bool WaveSource::zeroRefsTest()
{
DDebug(&__plugin,DebugAll,"WaveSource::zeroRefsTest() chan=%p%s%s%s [%p]",
m_chan,
(thread() ? " thread" : ""),
(m_autoclose ? " close" : ""),
(m_autoclean ? " clean" : ""),
this);
// since this is a zombie it has no owner anymore and needs no removal
m_chan = 0;
m_autoclose = false;
m_autoclean = false;
return ThreadedSource::zeroRefsTest();
}
void WaveSource::setNotify(const String& id) void WaveSource::setNotify(const String& id)
{ {
m_id = id; m_id = id;
@ -440,8 +466,8 @@ bool WaveSource::notify(WaveSource* source, const char* reason)
return false; return false;
} }
if (m_id || m_autoclose) { if (m_id || m_autoclose) {
DDebug(&__plugin,DebugInfo,"Preparing '%s' disconnector for '%s' chan '%s' source=%p [%p]", DDebug(&__plugin,DebugInfo,"Preparing '%s' disconnector for '%s' chan %p '%s' source=%p [%p]",
reason,m_id.c_str(),(m_chan ? m_chan->id().c_str() : ""),source,this); reason,m_id.c_str(),m_chan,(m_chan ? m_chan->id().c_str() : ""),source,this);
Disconnector *disc = new Disconnector(m_chan,m_id,source,m_autoclose,reason); Disconnector *disc = new Disconnector(m_chan,m_id,source,m_autoclose,reason);
return disc->init(); return disc->init();
} }
@ -612,8 +638,8 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_fd = -1; m_fd = -1;
} }
if (m_chan) { if (m_chan) {
DDebug(&__plugin,DebugInfo,"Preparing 'maxlen' disconnector for '%s' chan '%s' in consumer [%p]", DDebug(&__plugin,DebugInfo,"Preparing 'maxlen' disconnector for '%s' chan %p '%s' in consumer [%p]",
m_id.c_str(),(m_chan ? m_chan->id().c_str() : ""),this); m_id.c_str(),m_chan,(m_chan ? m_chan->id().c_str() : ""),this);
Disconnector *disc = new Disconnector(m_chan,m_id,0,false,"maxlen"); Disconnector *disc = new Disconnector(m_chan,m_id,0,false,"maxlen");
m_chan = 0; m_chan = 0;
disc->init(); disc->init();
@ -629,19 +655,20 @@ Disconnector::Disconnector(CallEndpoint* chan, const String& id, WaveSource* sou
{ {
if (id) { if (id) {
Message* m = new Message("chan.notify"); Message* m = new Message("chan.notify");
if (chan) if (m_chan)
m->addParam("id",chan->id()); m->addParam("id",m_chan->id());
m->addParam("targetid",id); m->addParam("targetid",id);
if (reason) if (reason)
m->addParam("reason",reason); m->addParam("reason",reason);
m->userData(chan); m->userData(m_chan);
m_msg = m; m_msg = m;
} }
if (source) { if (source) {
if (source->ref()) if (source->ref())
m_source = source; m_source = source;
else { else {
Debug(&__plugin,DebugGoOn,"Disconnecting dead source %p",source); Debug(&__plugin,DebugGoOn,"Disconnecting dead source %p, reason: '%s'",
source,reason);
m_chan = 0; m_chan = 0;
} }
} }
@ -667,8 +694,8 @@ bool Disconnector::init()
void Disconnector::run() void Disconnector::run()
{ {
DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p source=%s disc=%s [%p]", DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p source=%p disc=%s [%p]",
(void*)m_chan,m_msg,String::boolText(m_source),String::boolText(m_disc),this); (void*)m_chan,m_msg,m_source,String::boolText(m_disc),this);
if (!m_chan) if (!m_chan)
return; return;
if (m_source) { if (m_source) {

View File

@ -366,9 +366,9 @@ public:
{ } { }
/** /**
* Consumer's destructor - complains loudly if still attached to a source * Destruct notification - complains loudly if still attached to a source
*/ */
virtual ~DataConsumer(); virtual void destroyed();
/** /**
* Get a pointer to a derived class given that class name * Get a pointer to a derived class given that class name
@ -438,9 +438,9 @@ public:
: DataNode(format), m_nextStamp(invalidStamp()), m_translator(0) { } : DataNode(format), m_nextStamp(invalidStamp()), m_translator(0) { }
/** /**
* Source's destructor - detaches all consumers * Source's destruct notification - detaches all consumers
*/ */
virtual ~DataSource(); virtual void destroyed();
/** /**
* Get a pointer to a derived class given that class name * Get a pointer to a derived class given that class name
@ -522,9 +522,9 @@ class YATE_API ThreadedSource : public DataSource
friend class ThreadedSourcePrivate; friend class ThreadedSourcePrivate;
public: public:
/** /**
* The destructor, stops the thread * The destruction notification, stops the thread
*/ */
virtual ~ThreadedSource(); virtual void destroyed();
/** /**
* Starts the worker thread * Starts the worker thread
@ -573,6 +573,12 @@ protected:
inline void asyncDelete(bool async) inline void asyncDelete(bool async)
{ m_asyncDelete = async; } { m_asyncDelete = async; }
/**
* Clear the worker thread pointer
*/
inline void clearThread()
{ m_thread = 0; }
/** /**
* The worker method. You have to reimplement it as you need * The worker method. You have to reimplement it as you need
*/ */
@ -587,8 +593,9 @@ protected:
/** /**
* Override so destruction can be delayed after all references were lost * Override so destruction can be delayed after all references were lost
* to let the data pumping thread end normally * to let the data pumping thread end normally
* @return True to delete the source right away, false to defer
*/ */
virtual void zeroRefs(); virtual bool zeroRefsTest();
private: private:
ThreadedSourcePrivate* m_thread; ThreadedSourcePrivate* m_thread;
@ -851,9 +858,9 @@ public:
DataEndpoint(CallEndpoint* call = 0, const char* name = "audio"); DataEndpoint(CallEndpoint* call = 0, const char* name = "audio");
/** /**
* Destroys the endpoint, source and consumer * Endpoint destruct notification, clears source and consumer
*/ */
~DataEndpoint(); virtual void destroyed();
/** /**
* Get a pointer to a derived class given that class name * Get a pointer to a derived class given that class name
@ -1033,9 +1040,9 @@ protected:
public: public:
/** /**
* Destructor * Destruct notification, performs cleanups
*/ */
virtual ~CallEndpoint(); virtual void destroyed();
/** /**
* Get a pointer to a derived class given that class name * Get a pointer to a derived class given that class name