Changed the way ExtModReceivers are (de)referenced and destroyed.

Fixes destruction of external channels that have or had a watcher.


git-svn-id: http://yate.null.ro/svn/yate/trunk@6105 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2016-03-18 10:17:55 +00:00
parent 648123eb0a
commit d3ba52e3e0
1 changed files with 38 additions and 26 deletions

View File

@ -60,7 +60,7 @@ static Configuration s_cfg;
static ObjList s_chans;
static ObjList s_modules;
static Mutex s_mutex(true,"ExtModule");
static Mutex s_watchMutex(false,"ExtWatchers");
static Mutex s_uses(false,"ExtModUse");
static int s_waitFlush = WAIT_FLUSH;
static int s_timeout = MSG_TIMEOUT;
static bool s_timebomb = false;
@ -195,14 +195,13 @@ public:
inline MsgWatcher(ExtModReceiver* receiver)
: m_receiver(receiver)
{ }
virtual void destruct();
virtual void dispatched(const Message& msg, bool handled);
bool addWatched(const String& name);
bool delWatched(const String& name);
void clear();
protected:
virtual void destroyed();
private:
void clear();
ExtModReceiver* m_receiver;
ObjList m_watched;
};
@ -221,7 +220,7 @@ public:
static ExtModReceiver* build(const char* name, Stream* io, ExtModChan* chan = 0,
int role = RoleUnknown, const char* conn = 0);
static ExtModReceiver* find(const String& script);
~ExtModReceiver();
virtual void destruct();
virtual bool received(Message& msg, int id);
bool processLine(const char* line);
bool outputLine(const char* line);
@ -234,6 +233,7 @@ public:
void cleanup();
bool flush();
void die(bool clearChan = true);
bool useUnlocked();
bool use();
bool unuse();
inline const String& scriptFile() const
@ -245,7 +245,7 @@ public:
inline void setRestart(bool restart)
{ m_restart = restart; }
inline bool dead() const
{ return m_dead; }
{ return m_dead || m_quit || (m_use <= 0); }
void describe(String& rval) const;
private:
@ -642,12 +642,6 @@ void ExtMessage::dispatched(bool accepted)
}
void MsgWatcher::destruct()
{
clear();
MessagePostHook::destruct();
}
void MsgWatcher::destroyed()
{
clear();
@ -656,12 +650,14 @@ void MsgWatcher::destroyed()
void MsgWatcher::dispatched(const Message& msg, bool handled)
{
Lock mylock(s_watchMutex);
Lock lock(s_uses);
ExtModReceiver* recv = m_receiver;
Lock lock(recv);
if (!(recv && recv->use()))
if (!recv || recv->dead() || (recv->m_watcher != this) || !recv->useUnlocked())
return;
mylock.drop();
if (!lock.acquire(recv)) {
recv->unuse();
return;
}
if (!recv->selfWatch()) {
// check if the message was generated by ourselves - avoid reentrance
@ -710,10 +706,14 @@ bool MsgWatcher::delWatched(const String& name)
void MsgWatcher::clear()
{
Engine::self()->setHook(this,true);
if (!m_receiver)
return;
s_uses.lock();
ExtModReceiver* recv = m_receiver;
m_receiver = 0;
if (recv && (recv->m_watcher == this))
recv->m_watcher = 0;
s_uses.unlock();
}
@ -751,21 +751,29 @@ ExtModReceiver* ExtModReceiver::find(const String& script)
return 0;
}
bool ExtModReceiver::useUnlocked()
{
if (m_use <= 0)
return false;
++m_use;
return true;
}
bool ExtModReceiver::use()
{
lock();
s_uses.lock();
bool ok = (m_use > 0);
if (ok)
++m_use;
unlock();
s_uses.unlock();
return ok;
}
bool ExtModReceiver::unuse()
{
lock();
s_uses.lock();
int u = --m_use;
unlock();
s_uses.unlock();
if (!u)
destruct();
return (u <= 0);
@ -808,9 +816,9 @@ ExtModReceiver::ExtModReceiver(const char* name, Stream* io, ExtModChan* chan, i
s_mutex.unlock();
}
ExtModReceiver::~ExtModReceiver()
void ExtModReceiver::destruct()
{
Debug(DebugAll,"ExtModReceiver::~ExtModReceiver() pid=%d [%p]",m_pid,this);
Debug(DebugAll,"ExtModReceiver::destruct() pid=%d [%p]",m_pid,this);
Lock lock(this);
// One destruction is plenty enough
m_use = -1;
@ -819,7 +827,7 @@ ExtModReceiver::~ExtModReceiver()
s_mutex.unlock();
die();
if (m_pid > 1)
Debug(DebugWarn,"ExtModReceiver::~ExtModReceiver() pid=%d [%p]",m_pid,this);
Debug(DebugWarn,"ExtModReceiver::destruct() pid=%d [%p]",m_pid,this);
closeAudio();
Stream* tmp = m_in;
m_in = 0;
@ -875,11 +883,15 @@ bool ExtModReceiver::start()
bool ExtModReceiver::flush()
{
s_watchMutex.lock();
lock();
bool needWait = (0 != m_watcher);
TelEngine::destruct(m_watcher);
s_watchMutex.unlock();
MsgWatcher* w = m_watcher;
m_watcher = 0;
bool needWait = !!w;
if (w) {
w->clear();
Thread::yield();
TelEngine::destruct(w);
}
// Make sure we release all pending messages and not accept new ones
needWait = needWait || (m_relays.count() != 0);
if (s_pluginSafe)