From da79946d6ecebcf6e3d4d74146c6808f923e950a Mon Sep 17 00:00:00 2001 From: paulc Date: Fri, 8 Jun 2007 18:33:33 +0000 Subject: [PATCH] Changed the unlocking order to prevent races. Perform cleanups in the destroyed() method instead of the destructor. git-svn-id: http://yate.null.ro/svn/yate/trunk@1357 acf43c95-373e-0410-b603-e72c3f656dc1 --- engine/Channel.cpp | 2 +- engine/DataFormat.cpp | 66 ++++++++++++++++++++++++++----------------- modules/tonegen.cpp | 15 +++++----- modules/wavefile.cpp | 65 +++++++++++++++++++++++++++++------------- yatephone.h | 29 +++++++++++-------- 5 files changed, 112 insertions(+), 65 deletions(-) diff --git a/engine/Channel.cpp b/engine/Channel.cpp index e418ba5a..270a4d19 100644 --- a/engine/Channel.cpp +++ b/engine/Channel.cpp @@ -73,7 +73,7 @@ CallEndpoint::CallEndpoint(const char* id) { } -CallEndpoint::~CallEndpoint() +void CallEndpoint::destroyed() { #ifdef DEBUG ObjList* l = m_data.skipNull(); diff --git a/engine/DataFormat.cpp b/engine/DataFormat.cpp index c1e5949d..34935f5e 100644 --- a/engine/DataFormat.cpp +++ b/engine/DataFormat.cpp @@ -110,7 +110,6 @@ static TranslatorCaps s_stereoCaps[] = { }; static Mutex s_dataMutex(true); -static Mutex s_sourceMutex(true); class ThreadedSourcePrivate : public Thread { @@ -123,20 +122,18 @@ protected: virtual void run() { m_source->run(); - s_sourceMutex.lock(); - ThreadedSource* source = m_source; - m_source = 0; - if (source) { - source->m_thread = 0; - source->cleanup(); - } - s_sourceMutex.unlock(); + // execute cleanup from this thread if possible + cleanup(); } virtual void cleanup() { - if (m_source) - m_source->cleanup(); + RefObject::refMutex().lock(); + ThreadedSource* source = m_source; + m_source = 0; + RefObject::refMutex().unlock(); + if (source) + source->cleanup(); } private: @@ -435,7 +432,7 @@ const FormatInfo* DataFormat::getInfo() const } -DataConsumer::~DataConsumer() +void DataConsumer::destroyed() { if (m_source || m_override) { // this should not happen - but scream bloody murder if so @@ -446,6 +443,7 @@ DataConsumer::~DataConsumer() m_source->detach(this); if (m_override) m_override->detach(this); + DataNode::destroyed(); } void* DataConsumer::getObject(const String& name) const @@ -597,9 +595,10 @@ bool DataSource::detachInternal(DataConsumer* consumer) return false; } -DataSource::~DataSource() +void DataSource::destroyed() { clear(); + DataNode::destroyed(); } void DataSource::clear() @@ -645,9 +644,9 @@ DataEndpoint::DataEndpoint(CallEndpoint* call, const char* name) m_call->m_data.append(this); } -DataEndpoint::~DataEndpoint() +void DataEndpoint::destroyed() { - DDebug(DebugAll,"DataEndpoint::~DataEndpoint() '%s' call=%p [%p]", + DDebug(DebugAll,"DataEndpoint::destroyed() '%s' call=%p [%p]", m_name.c_str(),m_call,this); if (m_call) m_call->m_data.remove(this,false); @@ -657,6 +656,7 @@ DataEndpoint::~DataEndpoint() clearSniffers(); setSource(); setConsumer(); + RefObject::destroyed(); } void* DataEndpoint::getObject(const String& name) const @@ -935,13 +935,14 @@ void DataEndpoint::clearSniffers() } -ThreadedSource::~ThreadedSource() +void ThreadedSource::destroyed() { if (m_asyncDelete && m_thread) Debug(DebugFail,"ThreadedSource destroyed holding thread %p [%p]",m_thread,this); m_asyncDelete = false; if (m_thread) stop(); + DataSource::destroyed(); } bool ThreadedSource::start(const char* name, Thread::Priority prio) @@ -964,28 +965,40 @@ void ThreadedSource::stop() Lock lock(mutex()); if (!m_thread) return; - s_sourceMutex.lock(); + RefObject::refMutex().lock(); ThreadedSourcePrivate* tmp = m_thread; m_thread = 0; if (tmp) { - tmp->m_source = 0; - delete tmp; + if (tmp->m_source == this) + tmp->m_source = 0; + else + tmp = 0; } - s_sourceMutex.unlock(); + RefObject::refMutex().unlock(); + if (tmp) + delete tmp; } void ThreadedSource::cleanup() { - if (m_asyncDelete && !alive()) - DataSource::zeroRefs(); + Lock lock(RefObject::refMutex()); + m_thread = 0; + if (m_asyncDelete && !alive()) { + lock.drop(); + zeroRefs(); + } } -void ThreadedSource::zeroRefs() +bool ThreadedSource::zeroRefsTest() { // let the data thread destroy us if possible - if (m_asyncDelete && m_thread && m_thread->running()) - return; - DataSource::zeroRefs(); + if (m_asyncDelete && m_thread && m_thread->running()) { + m_thread = 0; + return false; + } + // if async not possible make sure we are set up for synchronous destruction + m_asyncDelete = false; + return DataSource::zeroRefsTest(); } Thread* ThreadedSource::thread() const @@ -995,6 +1008,7 @@ Thread* ThreadedSource::thread() const bool ThreadedSource::running() const { + Lock lock(RefObject::refMutex()); return m_thread && m_thread->running(); } diff --git a/modules/tonegen.cpp b/modules/tonegen.cpp index 9623d885..f57d038b 100644 --- a/modules/tonegen.cpp +++ b/modules/tonegen.cpp @@ -90,7 +90,7 @@ private: class ToneSource : public ThreadedSource { public: - virtual ~ToneSource(); + virtual void destroyed(); virtual void run(); inline const String& name() { return m_name; } @@ -438,17 +438,18 @@ ToneSource::ToneSource(const ToneDesc* tone) asyncDelete(true); } -ToneSource::~ToneSource() +void ToneSource::destroyed() { - Lock lock(__plugin); - Debug(&__plugin,DebugAll,"ToneSource::~ToneSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp()); - stop(); + Debug(&__plugin,DebugAll,"ToneSource::destroyed() '%s' [%p] total=%u stamp=%lu", + m_name.c_str(),this,m_total,timeStamp()); + ThreadedSource::destroyed(); if (m_time) Debug(&__plugin,DebugInfo,"ToneSource rate=%u b/s",byteRate(m_time,m_total)); } void ToneSource::zeroRefs() { + Debug(&__plugin,DebugAll,"ToneSource::zeroRefs() '%s' [%p]",m_name.c_str(),this); __plugin.lock(); tones.remove(this,false); __plugin.unlock(); @@ -530,10 +531,8 @@ ToneSource* ToneSource::getTone(String& tone) ObjList* l = &tones; for (; l; l = l->next()) { ToneSource* t = static_cast(l->get()); - if (t && (t->name() == tone)) { - t->ref(); + if (t && (t->name() == tone) && t->ref()) return t; - } } ToneSource* t = new ToneSource(td); tones.append(t); diff --git a/modules/wavefile.cpp b/modules/wavefile.cpp index 8b321ff6..658d84df 100644 --- a/modules/wavefile.cpp +++ b/modules/wavefile.cpp @@ -40,6 +40,7 @@ public: ~WaveSource(); virtual void run(); virtual void cleanup(); + virtual bool zeroRefsTest(); void setNotify(const String& id); bool derefReady(); private: @@ -62,6 +63,7 @@ private: bool m_autoclose; bool m_autoclean; bool m_nodata; + bool m_insert; volatile bool m_derefOk; }; @@ -218,9 +220,11 @@ void WaveSource::init(const String& file, bool autorepeat) WaveSource::WaveSource(const char* file, CallEndpoint* chan, bool autoclose) : m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1), m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false), - m_nodata(false), m_derefOk(true) + m_nodata(false), m_insert(false), m_derefOk(true) { Debug(&__plugin,DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file,chan,this); + if (m_chan) + m_insert = true; } WaveSource::~WaveSource() @@ -371,9 +375,6 @@ void WaveSource::run() Thread::usleep((unsigned long)dly); } if (!alive()) { - m_autoclose = false; - // if this is a zombie it surely has no owner anymore - m_chan = 0; notify(0,"replaced"); return; } @@ -383,20 +384,30 @@ void WaveSource::run() tpos += (r*(u_int64_t)1000000/m_brate); } while (r > 0); Debug(&__plugin,DebugAll,"WaveSource '%s' end of data (%u played) chan=%p [%p]",m_id.c_str(),m_total,m_chan,this); + if (!ref()) { + notify(0,"replaced"); + return; + } // prevent disconnector thread from succeeding before notify returns m_derefOk = false; // at cleanup time deref the data source if we start no disconnector thread - m_derefOk = m_autoclean = !notify(this,"eof"); + m_autoclean = !notify(this,"eof"); + if (!deref()) + m_derefOk = m_autoclean; } void WaveSource::cleanup() { - Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u, alive=%s, autoclean=%s [%p]", - m_total,String::boolText(alive()),String::boolText(m_autoclean),this); + Lock lock(DataEndpoint::commonMutex()); + Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u, alive=%s, autoclean=%s chan=%p [%p]", + m_total,String::boolText(alive()),String::boolText(m_autoclean),m_chan,this); + clearThread(); if (m_autoclean) { asyncDelete(false); - if (m_chan && (m_chan->getSource() == this)) - m_chan->setSource(); + if (m_insert) { + if (m_chan && (m_chan->getSource() == this)) + m_chan->setSource(); + } else deref(); return; @@ -407,6 +418,21 @@ void WaveSource::cleanup() m_derefOk = true; } +bool WaveSource::zeroRefsTest() +{ + DDebug(&__plugin,DebugAll,"WaveSource::zeroRefsTest() chan=%p%s%s%s [%p]", + m_chan, + (thread() ? " thread" : ""), + (m_autoclose ? " close" : ""), + (m_autoclean ? " clean" : ""), + this); + // since this is a zombie it has no owner anymore and needs no removal + m_chan = 0; + m_autoclose = false; + m_autoclean = false; + return ThreadedSource::zeroRefsTest(); +} + void WaveSource::setNotify(const String& id) { m_id = id; @@ -440,8 +466,8 @@ bool WaveSource::notify(WaveSource* source, const char* reason) return false; } if (m_id || m_autoclose) { - DDebug(&__plugin,DebugInfo,"Preparing '%s' disconnector for '%s' chan '%s' source=%p [%p]", - reason,m_id.c_str(),(m_chan ? m_chan->id().c_str() : ""),source,this); + DDebug(&__plugin,DebugInfo,"Preparing '%s' disconnector for '%s' chan %p '%s' source=%p [%p]", + reason,m_id.c_str(),m_chan,(m_chan ? m_chan->id().c_str() : ""),source,this); Disconnector *disc = new Disconnector(m_chan,m_id,source,m_autoclose,reason); return disc->init(); } @@ -612,8 +638,8 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp) m_fd = -1; } if (m_chan) { - DDebug(&__plugin,DebugInfo,"Preparing 'maxlen' disconnector for '%s' chan '%s' in consumer [%p]", - m_id.c_str(),(m_chan ? m_chan->id().c_str() : ""),this); + DDebug(&__plugin,DebugInfo,"Preparing 'maxlen' disconnector for '%s' chan %p '%s' in consumer [%p]", + m_id.c_str(),m_chan,(m_chan ? m_chan->id().c_str() : ""),this); Disconnector *disc = new Disconnector(m_chan,m_id,0,false,"maxlen"); m_chan = 0; disc->init(); @@ -629,19 +655,20 @@ Disconnector::Disconnector(CallEndpoint* chan, const String& id, WaveSource* sou { if (id) { Message* m = new Message("chan.notify"); - if (chan) - m->addParam("id",chan->id()); + if (m_chan) + m->addParam("id",m_chan->id()); m->addParam("targetid",id); if (reason) m->addParam("reason",reason); - m->userData(chan); + m->userData(m_chan); m_msg = m; } if (source) { if (source->ref()) m_source = source; else { - Debug(&__plugin,DebugGoOn,"Disconnecting dead source %p",source); + Debug(&__plugin,DebugGoOn,"Disconnecting dead source %p, reason: '%s'", + source,reason); m_chan = 0; } } @@ -667,8 +694,8 @@ bool Disconnector::init() void Disconnector::run() { - DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p source=%s disc=%s [%p]", - (void*)m_chan,m_msg,String::boolText(m_source),String::boolText(m_disc),this); + DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p source=%p disc=%s [%p]", + (void*)m_chan,m_msg,m_source,String::boolText(m_disc),this); if (!m_chan) return; if (m_source) { diff --git a/yatephone.h b/yatephone.h index ce15a03b..02231081 100644 --- a/yatephone.h +++ b/yatephone.h @@ -366,9 +366,9 @@ public: { } /** - * Consumer's destructor - complains loudly if still attached to a source + * Destruct notification - complains loudly if still attached to a source */ - virtual ~DataConsumer(); + virtual void destroyed(); /** * Get a pointer to a derived class given that class name @@ -438,9 +438,9 @@ public: : DataNode(format), m_nextStamp(invalidStamp()), m_translator(0) { } /** - * Source's destructor - detaches all consumers + * Source's destruct notification - detaches all consumers */ - virtual ~DataSource(); + virtual void destroyed(); /** * Get a pointer to a derived class given that class name @@ -522,9 +522,9 @@ class YATE_API ThreadedSource : public DataSource friend class ThreadedSourcePrivate; public: /** - * The destructor, stops the thread + * The destruction notification, stops the thread */ - virtual ~ThreadedSource(); + virtual void destroyed(); /** * Starts the worker thread @@ -573,6 +573,12 @@ protected: inline void asyncDelete(bool async) { m_asyncDelete = async; } + /** + * Clear the worker thread pointer + */ + inline void clearThread() + { m_thread = 0; } + /** * The worker method. You have to reimplement it as you need */ @@ -587,8 +593,9 @@ protected: /** * Override so destruction can be delayed after all references were lost * to let the data pumping thread end normally + * @return True to delete the source right away, false to defer */ - virtual void zeroRefs(); + virtual bool zeroRefsTest(); private: ThreadedSourcePrivate* m_thread; @@ -851,9 +858,9 @@ public: DataEndpoint(CallEndpoint* call = 0, const char* name = "audio"); /** - * Destroys the endpoint, source and consumer + * Endpoint destruct notification, clears source and consumer */ - ~DataEndpoint(); + virtual void destroyed(); /** * Get a pointer to a derived class given that class name @@ -1033,9 +1040,9 @@ protected: public: /** - * Destructor + * Destruct notification, performs cleanups */ - virtual ~CallEndpoint(); + virtual void destroyed(); /** * Get a pointer to a derived class given that class name