Implemented an alternative way of destroying a ThreadedSource from the data

pumping thread itself, should fix most threading related problems.


git-svn-id: http://voip.null.ro/svn/yate@1162 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2007-01-12 22:16:38 +00:00
parent bec6531f02
commit fba53dfec5
4 changed files with 127 additions and 56 deletions

View File

@ -912,6 +912,8 @@ void DataEndpoint::clearSniffers()
ThreadedSource::~ThreadedSource() ThreadedSource::~ThreadedSource()
{ {
if (m_asyncDelete && m_thread)
Debug(DebugFail,"ThreadedSource destroyed holding thread %p [%p]",m_thread,this);
stop(); stop();
} }
@ -945,6 +947,16 @@ void ThreadedSource::stop()
void ThreadedSource::cleanup() void ThreadedSource::cleanup()
{ {
if (m_asyncDelete && !alive())
delete this;
}
void ThreadedSource::zeroRefs()
{
// let the data thread destroy us if possible
if (m_asyncDelete && m_thread && m_thread->running())
return;
DataSource::zeroRefs();
} }
Thread* ThreadedSource::thread() const Thread* ThreadedSource::thread() const
@ -952,6 +964,11 @@ Thread* ThreadedSource::thread() const
return m_thread; return m_thread;
} }
bool ThreadedSource::running() const
{
return m_thread && m_thread->running();
}
DataTranslator::DataTranslator(const char* sFormat, const char* dFormat) DataTranslator::DataTranslator(const char* sFormat, const char* dFormat)
: DataConsumer(sFormat) : DataConsumer(sFormat)

View File

@ -101,6 +101,7 @@ public:
static Tone* buildDtmf(const String& dtmf, int len = DTMF_LEN, int gap = DTMF_GAP); static Tone* buildDtmf(const String& dtmf, int len = DTMF_LEN, int gap = DTMF_GAP);
protected: protected:
ToneSource(const ToneDesc* tone = 0); ToneSource(const ToneDesc* tone = 0);
virtual void zeroRefs();
String m_name; String m_name;
const Tone* m_tone; const Tone* m_tone;
int m_repeat; int m_repeat;
@ -258,6 +259,7 @@ static const ToneDesc s_desc[] = {
{ 0, 0, 0 } { 0, 0, 0 }
}; };
ToneData::ToneData(const char* desc) ToneData::ToneData(const char* desc)
: m_f1(0), m_f2(0), m_mod(false), m_data(0) : m_f1(0), m_f2(0), m_mod(false), m_data(0)
{ {
@ -383,6 +385,7 @@ ToneData* ToneData::getData(const char* desc)
return d; return d;
} }
ToneSource::ToneSource(const ToneDesc* tone) ToneSource::ToneSource(const ToneDesc* tone)
: m_tone(0), m_repeat(tone == 0), : m_tone(0), m_repeat(tone == 0),
m_data(0,320), m_brate(16000), m_total(0), m_time(0) m_data(0,320), m_brate(16000), m_total(0), m_time(0)
@ -393,13 +396,13 @@ ToneSource::ToneSource(const ToneDesc* tone)
} }
Debug(&__plugin,DebugAll,"ToneSource::ToneSource(%p) '%s' [%p]", Debug(&__plugin,DebugAll,"ToneSource::ToneSource(%p) '%s' [%p]",
tone,m_name.c_str(),this); tone,m_name.c_str(),this);
asyncDelete(true);
} }
ToneSource::~ToneSource() ToneSource::~ToneSource()
{ {
Lock lock(__plugin); Lock lock(__plugin);
Debug(&__plugin,DebugAll,"ToneSource::~ToneSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp()); Debug(&__plugin,DebugAll,"ToneSource::~ToneSource() [%p] total=%u stamp=%lu",this,m_total,timeStamp());
tones.remove(this,false);
stop(); stop();
if (m_time) { if (m_time) {
m_time = Time::now() - m_time; m_time = Time::now() - m_time;
@ -410,6 +413,14 @@ ToneSource::~ToneSource()
} }
} }
void ToneSource::zeroRefs()
{
__plugin.lock();
tones.remove(this,false);
__plugin.unlock();
ThreadedSource::zeroRefs();
}
bool ToneSource::startup() bool ToneSource::startup()
{ {
return m_tone && start("ToneSource"); return m_tone && start("ToneSource");
@ -553,6 +564,7 @@ void ToneSource::run()
m_time = 0; m_time = 0;
} }
TempSource::TempSource(String& desc) TempSource::TempSource(String& desc)
: m_single(0) : m_single(0)
{ {
@ -604,6 +616,7 @@ void TempSource::cleanup()
deref(); deref();
} }
ToneChan::ToneChan(String& tone) ToneChan::ToneChan(String& tone)
: Channel(__plugin) : Channel(__plugin)
{ {
@ -626,58 +639,6 @@ ToneChan::~ToneChan()
Debug(this,DebugAll,"ToneChan::~ToneChan() %s [%p]",id().c_str(),this); Debug(this,DebugAll,"ToneChan::~ToneChan() %s [%p]",id().c_str(),this);
} }
bool ToneGenDriver::msgExecute(Message& msg, String& dest)
{
CallEndpoint* ch = static_cast<CallEndpoint*>(msg.userData());
if (ch) {
ToneChan *tc = new ToneChan(dest);
if (ch->connect(tc,msg.getValue("reason"))) {
msg.setParam("peerid",tc->id());
tc->deref();
}
else {
tc->destruct();
return false;
}
}
else {
Message m("call.route");
m.addParam("module",name());
String callto(msg.getValue("direct"));
if (callto.null()) {
const char *targ = msg.getValue("target");
if (!targ) {
Debug(DebugWarn,"Tone outgoing call with no target!");
return false;
}
callto = msg.getValue("caller");
if (callto.null())
callto << prefix() << dest;
m.addParam("called",targ);
m.addParam("caller",callto);
if (!Engine::dispatch(m)) {
Debug(DebugWarn,"Tone outgoing call but no route!");
return false;
}
callto = m.retValue();
m.retValue().clear();
}
m = "call.execute";
m.addParam("callto",callto);
ToneChan *tc = new ToneChan(dest);
m.setParam("id",tc->id());
m.userData(tc);
if (Engine::dispatch(m)) {
msg.setParam("id",tc->id());
tc->deref();
return true;
}
Debug(DebugWarn,"Tone outgoing call not accepted!");
tc->destruct();
return false;
}
return true;
}
bool AttachHandler::received(Message& msg) bool AttachHandler::received(Message& msg)
{ {
@ -737,6 +698,60 @@ bool AttachHandler::received(Message& msg)
return ret; return ret;
} }
bool ToneGenDriver::msgExecute(Message& msg, String& dest)
{
CallEndpoint* ch = static_cast<CallEndpoint*>(msg.userData());
if (ch) {
ToneChan *tc = new ToneChan(dest);
if (ch->connect(tc,msg.getValue("reason"))) {
msg.setParam("peerid",tc->id());
tc->deref();
}
else {
tc->destruct();
return false;
}
}
else {
Message m("call.route");
m.addParam("module",name());
String callto(msg.getValue("direct"));
if (callto.null()) {
const char *targ = msg.getValue("target");
if (!targ) {
Debug(DebugWarn,"Tone outgoing call with no target!");
return false;
}
callto = msg.getValue("caller");
if (callto.null())
callto << prefix() << dest;
m.addParam("called",targ);
m.addParam("caller",callto);
if (!Engine::dispatch(m)) {
Debug(DebugWarn,"Tone outgoing call but no route!");
return false;
}
callto = m.retValue();
m.retValue().clear();
}
m = "call.execute";
m.addParam("callto",callto);
ToneChan *tc = new ToneChan(dest);
m.setParam("id",tc->id());
m.userData(tc);
if (Engine::dispatch(m)) {
msg.setParam("id",tc->id());
tc->deref();
return true;
}
Debug(DebugWarn,"Tone outgoing call not accepted!");
tc->destruct();
return false;
}
return true;
}
void ToneGenDriver::statusModule(String& str) void ToneGenDriver::statusModule(String& str)
{ {
Module::statusModule(str); Module::statusModule(str);

View File

@ -125,6 +125,7 @@ private:
INIT_PLUGIN(WaveFileDriver); INIT_PLUGIN(WaveFileDriver);
WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, bool autorepeat) WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, bool autorepeat)
: m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1), : m_chan(chan), m_fd(-1), m_swap(false), m_brate(0), m_repeatPos(-1),
m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false), m_total(0), m_time(0), m_autoclose(autoclose), m_autoclean(false),
@ -166,6 +167,7 @@ WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose, b
if (computeDataRate()) { if (computeDataRate()) {
if (autorepeat) if (autorepeat)
m_repeatPos = ::lseek(m_fd,0,SEEK_CUR); m_repeatPos = ::lseek(m_fd,0,SEEK_CUR);
asyncDelete(true);
start("WaveSource"); start("WaveSource");
} }
else { else {
@ -335,13 +337,16 @@ void WaveSource::run()
void WaveSource::cleanup() void WaveSource::cleanup()
{ {
Debug(&__plugin,DebugAll,"WaveSource [%p] cleanup, total=%u",this,m_total); Debug(&__plugin,DebugAll,"WaveSource cleanup, total=%u [%p]",m_total,this);
if (m_autoclean) { if (m_autoclean) {
asyncDelete(false);
if (m_chan && (m_chan->getSource() == this)) if (m_chan && (m_chan->getSource() == this))
m_chan->setSource(); m_chan->setSource();
else else
deref(); deref();
return;
} }
ThreadedSource::cleanup();
} }
void WaveSource::setNotify(const String& id) void WaveSource::setNotify(const String& id)
@ -362,6 +367,7 @@ bool WaveSource::notify(DataSource* source, const char* reason)
return false; return false;
} }
WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen) WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen)
: m_chan(chan), m_fd(-1), m_total(0), m_maxlen(maxlen), m_time(0) : m_chan(chan), m_fd(-1), m_total(0), m_maxlen(maxlen), m_time(0)
{ {
@ -426,6 +432,7 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
} }
} }
Disconnector::Disconnector(CallEndpoint* chan, const String& id, DataSource* source, bool disc, const char* reason) Disconnector::Disconnector(CallEndpoint* chan, const String& id, DataSource* source, bool disc, const char* reason)
: m_chan(chan), m_msg(0), m_source(source), m_disc(disc) : m_chan(chan), m_msg(0), m_source(source), m_disc(disc)
{ {
@ -482,6 +489,7 @@ void Disconnector::run()
} }
} }
WaveChan::WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat) WaveChan::WaveChan(const String& file, bool record, unsigned maxlen, bool autorepeat)
: Channel(__plugin) : Channel(__plugin)
{ {
@ -501,6 +509,7 @@ WaveChan::~WaveChan()
Debug(this,DebugAll,"WaveChan::~WaveChan() %s [%p]",id().c_str(),this); Debug(this,DebugAll,"WaveChan::~WaveChan() %s [%p]",id().c_str(),this);
} }
bool AttachHandler::received(Message &msg) bool AttachHandler::received(Message &msg)
{ {
int more = 3; int more = 3;
@ -622,6 +631,7 @@ bool AttachHandler::received(Message &msg)
return ret && !more; return ret && !more;
} }
bool RecordHandler::received(Message &msg) bool RecordHandler::received(Message &msg)
{ {
int more = 2; int more = 2;
@ -703,6 +713,7 @@ bool RecordHandler::received(Message &msg)
return !more; return !more;
} }
bool WaveFileDriver::msgExecute(Message& msg, String& dest) bool WaveFileDriver::msgExecute(Message& msg, String& dest)
{ {
Regexp r("^\\([^/]*\\)/\\(.*\\)$"); Regexp r("^\\([^/]*\\)/\\(.*\\)$");

View File

@ -545,13 +545,33 @@ public:
*/ */
Thread* thread() const; Thread* thread() const;
/**
* Check if the data thread is running
* @return True if the data thread was started and is running
*/
bool running() const;
/**
* Get the current status of the asynchronous deletion flag
*/
inline bool asyncDelete() const
{ return m_asyncDelete; }
protected: protected:
/** /**
* Threaded Source constructor * Threaded Source constructor
* @param format Name of the data format, default "slin" (Signed Linear) * @param format Name of the data format, default "slin" (Signed Linear)
*/ */
inline ThreadedSource(const char* format = "slin") inline ThreadedSource(const char* format = "slin")
: DataSource(format), m_thread(0) { } : DataSource(format), m_thread(0), m_asyncDelete(false)
{ }
/**
* Derived classes should call this method to let the source to be
* destroyed asynchronously in the data thread
*/
inline void asyncDelete(bool async)
{ m_asyncDelete = async; }
/** /**
* The worker method. You have to reimplement it as you need * The worker method. You have to reimplement it as you need
@ -559,12 +579,20 @@ protected:
virtual void run() = 0; virtual void run() = 0;
/** /**
* The cleanup after thread method * The cleanup after thread method, deletes the source if already
* dereferenced and set for asynchronous deletion
*/ */
virtual void cleanup(); virtual void cleanup();
/**
* Override so destruction can be delayed after all references were lost
* to let the data pumping thread end normally
*/
virtual void zeroRefs();
private: private:
ThreadedSourcePrivate* m_thread; ThreadedSourcePrivate* m_thread;
bool m_asyncDelete;
}; };
/** /**