Fixed a race bug in in removing the data source.

git-svn-id: http://yate.null.ro/svn/yate/trunk@1065 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2006-10-03 16:42:49 +00:00
parent 08403476e8
commit f47eaf8153
1 changed files with 15 additions and 10 deletions

View File

@ -87,14 +87,14 @@ public:
class Disconnector : public Thread class Disconnector : public Thread
{ {
public: public:
Disconnector(CallEndpoint* chan, const String& id, bool source, bool disc); Disconnector(CallEndpoint* chan, const String& id, DataSource* source, bool disc);
virtual ~Disconnector(); virtual ~Disconnector();
virtual void run(); virtual void run();
bool init(); bool init();
private: private:
RefPointer<CallEndpoint> m_chan; RefPointer<CallEndpoint> m_chan;
Message* m_msg; Message* m_msg;
bool m_source; DataSource* m_source;
bool m_disc; bool m_disc;
}; };
@ -276,6 +276,7 @@ void WaveSource::run()
if (!alive()) if (!alive())
return; return;
} }
DDebug(&__plugin,DebugAll,"Consumer found, starting to play data with rate %d [%p]",m_brate,this);
m_data.assign(0,(m_brate*20)/1000); m_data.assign(0,(m_brate*20)/1000);
// start counting time from now // start counting time from now
u_int64_t tpos = Time::now(); u_int64_t tpos = Time::now();
@ -304,13 +305,13 @@ void WaveSource::run()
Thread::usleep((unsigned long)dly); Thread::usleep((unsigned long)dly);
} }
if (!alive()) if (!alive())
break; return;
Forward(m_data,ts); Forward(m_data,ts);
ts += m_data.length()*8000/m_brate; ts += m_data.length()*8000/m_brate;
m_total += r; m_total += r;
tpos += (r*(u_int64_t)1000000/m_brate); tpos += (r*(u_int64_t)1000000/m_brate);
} while (r > 0); } while (r > 0);
Debug(&__plugin,DebugAll,"WaveSource [%p] end of data [%p] [%s] ",this,m_chan,m_id.c_str()); Debug(&__plugin,DebugAll,"WaveSource '%s' end of data (%u played) chan=%p [%p]",m_id.c_str(),m_total,m_chan,this);
// at cleanup time deref the data source if we start no disconnector thread // at cleanup time deref the data source if we start no disconnector thread
m_autoclean = !notify(); m_autoclean = !notify();
} }
@ -336,7 +337,7 @@ void WaveSource::setNotify(const String& id)
bool WaveSource::notify() bool WaveSource::notify()
{ {
if (m_id || m_autoclose) { if (m_id || m_autoclose) {
Disconnector *disc = new Disconnector(m_chan,m_id,true,m_autoclose); Disconnector *disc = new Disconnector(m_chan,m_id,this,m_autoclose);
return disc->init(); return disc->init();
} }
return false; return false;
@ -396,7 +397,7 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_fd = -1; m_fd = -1;
} }
if (m_chan) { if (m_chan) {
Disconnector *disc = new Disconnector(m_chan,m_id,false,false); Disconnector *disc = new Disconnector(m_chan,m_id,0,false);
m_chan = 0; m_chan = 0;
disc->init(); disc->init();
} }
@ -404,7 +405,7 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
} }
} }
Disconnector::Disconnector(CallEndpoint* chan, const String& id, bool source, bool disc) Disconnector::Disconnector(CallEndpoint* chan, const String& id, DataSource* source, bool disc)
: m_chan(chan), m_msg(0), m_source(source), m_disc(disc) : m_chan(chan), m_msg(0), m_source(source), m_disc(disc)
{ {
if (id) { if (id) {
@ -433,12 +434,16 @@ bool Disconnector::init()
void Disconnector::run() void Disconnector::run()
{ {
DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p", DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p source=%s disc=%s",
(void*)m_chan,m_msg); (void*)m_chan,m_msg,String::boolText(m_source),String::boolText(m_disc));
if (!m_chan) if (!m_chan)
return; return;
if (m_source) { if (m_source) {
if (m_chan->getSource() == m_source)
m_chan->setSource(); m_chan->setSource();
else
Debug(&__plugin,DebugMild,"Source %p in channel %p was replaced with %p",
m_source,(void*)m_chan,m_chan->getSource());
if (m_disc) if (m_disc)
m_chan->disconnect("eof"); m_chan->disconnect("eof");
} }