diff --git a/engine/DataFormat.cpp b/engine/DataFormat.cpp index f38ae667..949135c5 100644 --- a/engine/DataFormat.cpp +++ b/engine/DataFormat.cpp @@ -912,6 +912,8 @@ void DataEndpoint::clearSniffers() ThreadedSource::~ThreadedSource() { + if (m_asyncDelete && m_thread) + Debug(DebugFail,"ThreadedSource destroyed holding thread %p [%p]",m_thread,this); stop(); } @@ -945,6 +947,16 @@ void ThreadedSource::stop() void ThreadedSource::cleanup() { + if (m_asyncDelete && !alive()) + delete this; +} + +void ThreadedSource::zeroRefs() +{ + // let the data thread destroy us if possible + if (m_asyncDelete && m_thread && m_thread->running()) + return; + DataSource::zeroRefs(); } Thread* ThreadedSource::thread() const @@ -952,6 +964,11 @@ Thread* ThreadedSource::thread() const return m_thread; } +bool ThreadedSource::running() const +{ + return m_thread && m_thread->running(); +} + DataTranslator::DataTranslator(const char* sFormat, const char* dFormat) : DataConsumer(sFormat) diff --git a/modules/tonegen.cpp b/modules/tonegen.cpp index 3e7188b9..2cc61970 100644 --- a/modules/tonegen.cpp +++ b/modules/tonegen.cpp @@ -101,6 +101,7 @@ public: static Tone* buildDtmf(const String& dtmf, int len = DTMF_LEN, int gap = DTMF_GAP); protected: ToneSource(const ToneDesc* tone = 0); + virtual void zeroRefs(); String m_name; const Tone* m_tone; int m_repeat; @@ -258,6 +259,7 @@ static const ToneDesc s_desc[] = { { 0, 0, 0 } }; + ToneData::ToneData(const char* desc) : m_f1(0), m_f2(0), m_mod(false), m_data(0) { @@ -383,6 +385,7 @@ ToneData* ToneData::getData(const char* desc) return d; } + ToneSource::ToneSource(const ToneDesc* tone) : m_tone(0), m_repeat(tone == 0), m_data(0,320), m_brate(16000), m_total(0), m_time(0) @@ -393,13 +396,13 @@ ToneSource::ToneSource(const ToneDesc* tone) } Debug(&__plugin,DebugAll,"ToneSource::ToneSource(%p) '%s' [%p]", tone,m_name.c_str(),this); + asyncDelete(true); } ToneSource::~ToneSource() { Lock lock(__plugin); Debug(&__plugin,DebugAll,"ToneSource::~ToneSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp()); - tones.remove(this,false); stop(); if (m_time) { m_time = Time::now() - m_time; @@ -410,6 +413,14 @@ ToneSource::~ToneSource() } } +void ToneSource::zeroRefs() +{ + __plugin.lock(); + tones.remove(this,false); + __plugin.unlock(); + ThreadedSource::zeroRefs(); +} + bool ToneSource::startup() { return m_tone && start("ToneSource"); @@ -553,6 +564,7 @@ void ToneSource::run() m_time = 0; } + TempSource::TempSource(String& desc) : m_single(0) { @@ -604,6 +616,7 @@ void TempSource::cleanup() deref(); } + ToneChan::ToneChan(String& tone) : Channel(__plugin) { @@ -626,58 +639,6 @@ ToneChan::~ToneChan() Debug(this,DebugAll,"ToneChan::~ToneChan() %s [%p]",id().c_str(),this); } -bool ToneGenDriver::msgExecute(Message& msg, String& dest) -{ - CallEndpoint* ch = static_cast(msg.userData()); - if (ch) { - ToneChan *tc = new ToneChan(dest); - if (ch->connect(tc,msg.getValue("reason"))) { - msg.setParam("peerid",tc->id()); - tc->deref(); - } - else { - tc->destruct(); - return false; - } - } - else { - Message m("call.route"); - m.addParam("module",name()); - String callto(msg.getValue("direct")); - if (callto.null()) { - const char *targ = msg.getValue("target"); - if (!targ) { - Debug(DebugWarn,"Tone outgoing call with no target!"); - return false; - } - callto = msg.getValue("caller"); - if (callto.null()) - callto << prefix() << dest; - m.addParam("called",targ); - m.addParam("caller",callto); - if (!Engine::dispatch(m)) { - Debug(DebugWarn,"Tone outgoing call but no route!"); - return false; - } - callto = m.retValue(); - m.retValue().clear(); - } - m = "call.execute"; - m.addParam("callto",callto); - ToneChan *tc = new ToneChan(dest); - m.setParam("id",tc->id()); - m.userData(tc); - if (Engine::dispatch(m)) { - msg.setParam("id",tc->id()); - tc->deref(); - return true; - } - Debug(DebugWarn,"Tone outgoing call not accepted!"); - tc->destruct(); - return false; - } - return true; -} bool AttachHandler::received(Message& msg) { @@ -737,6 +698,60 @@ bool AttachHandler::received(Message& msg) return ret; } + +bool ToneGenDriver::msgExecute(Message& msg, String& dest) +{ + CallEndpoint* ch = static_cast(msg.userData()); + if (ch) { + ToneChan *tc = new ToneChan(dest); + if (ch->connect(tc,msg.getValue("reason"))) { + msg.setParam("peerid",tc->id()); + tc->deref(); + } + else { + tc->destruct(); + return false; + } + } + else { + Message m("call.route"); + m.addParam("module",name()); + String callto(msg.getValue("direct")); + if (callto.null()) { + const char *targ = msg.getValue("target"); + if (!targ) { + Debug(DebugWarn,"Tone outgoing call with no target!"); + return false; + } + callto = msg.getValue("caller"); + if (callto.null()) + callto << prefix() << dest; + m.addParam("called",targ); + m.addParam("caller",callto); + if (!Engine::dispatch(m)) { + Debug(DebugWarn,"Tone outgoing call but no route!"); + return false; + } + callto = m.retValue(); + m.retValue().clear(); + } + m = "call.execute"; + m.addParam("callto",callto); + ToneChan *tc = new ToneChan(dest); + m.setParam("id",tc->id()); + m.userData(tc); + if (Engine::dispatch(m)) { + msg.setParam("id",tc->id()); + tc->deref(); + return true; + } + Debug(DebugWarn,"Tone outgoing call not accepted!"); + tc->destruct(); + return false; + } + return true; +} + void ToneGenDriver::statusModule(String& str) { Module::statusModule(str); diff --git a/modules/wavefile.cpp b/modules/wavefile.cpp index 3495a651..f03deb99 100644 --- a/modules/wavefile.cpp +++ b/modules/wavefile.cpp @@ -125,6 +125,7 @@ private: INIT_PLUGIN(WaveFileDriver); + WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, bool autorepeat) : m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1), m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false), @@ -166,6 +167,7 @@ WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, b if (computeDataRate()) { if (autorepeat) m_repeatPos = ::lseek(m_fd,0,SEEK_CUR); + asyncDelete(true); start("WaveSource"); } else { @@ -335,13 +337,16 @@ void WaveSource::run() void WaveSource::cleanup() { - Debug(&__plugin,DebugAll,"WaveSource [%p] cleanup, total=%u",this,m_total); + Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u [%p]",m_total,this); if (m_autoclean) { + asyncDelete(false); if (m_chan && (m_chan->getSource() == this)) m_chan->setSource(); else deref(); + return; } + ThreadedSource::cleanup(); } void WaveSource::setNotify(const String& id) @@ -362,6 +367,7 @@ bool WaveSource::notify(DataSource* source, const char* reason) return false; } + WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen) : m_chan(chan), m_fd(-1), m_total(0), m_maxlen(maxlen), m_time(0) { @@ -426,6 +432,7 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp) } } + Disconnector::Disconnector(CallEndpoint* chan, const String& id, DataSource* source, bool disc, const char* reason) : m_chan(chan), m_msg(0), m_source(source), m_disc(disc) { @@ -482,6 +489,7 @@ void Disconnector::run() } } + WaveChan::WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat) : Channel(__plugin) { @@ -501,6 +509,7 @@ WaveChan::~WaveChan() Debug(this,DebugAll,"WaveChan::~WaveChan() %s [%p]",id().c_str(),this); } + bool AttachHandler::received(Message &msg) { int more = 3; @@ -622,6 +631,7 @@ bool AttachHandler::received(Message &msg) return ret && !more; } + bool RecordHandler::received(Message &msg) { int more = 2; @@ -703,6 +713,7 @@ bool RecordHandler::received(Message &msg) return !more; } + bool WaveFileDriver::msgExecute(Message& msg, String& dest) { Regexp r("^\\([^/]*\\)/\\(.*\\)$"); diff --git a/yatephone.h b/yatephone.h index 02ec2157..d12f2d8d 100644 --- a/yatephone.h +++ b/yatephone.h @@ -545,13 +545,33 @@ public: */ Thread* thread() const; + /** + * Check if the data thread is running + * @return True if the data thread was started and is running + */ + bool running() const; + + /** + * Get the current status of the asynchronous deletion flag + */ + inline bool asyncDelete() const + { return m_asyncDelete; } + protected: /** * Threaded Source constructor * @param format Name of the data format, default "slin" (Signed Linear) */ inline ThreadedSource(const char* format = "slin") - : DataSource(format), m_thread(0) { } + : DataSource(format), m_thread(0), m_asyncDelete(false) + { } + + /** + * Derived classes should call this method to let the source to be + * destroyed asynchronously in the data thread + */ + inline void asyncDelete(bool async) + { m_asyncDelete = async; } /** * The worker method. You have to reimplement it as you need @@ -559,12 +579,20 @@ protected: virtual void run() = 0; /** - * The cleanup after thread method + * The cleanup after thread method, deletes the source if already + * dereferenced and set for asynchronous deletion */ virtual void cleanup(); + /** + * Override so destruction can be delayed after all references were lost + * to let the data pumping thread end normally + */ + virtual void zeroRefs(); + private: ThreadedSourcePrivate* m_thread; + bool m_asyncDelete; }; /**