diff --git a/modules/extmodule.cpp b/modules/extmodule.cpp index c4f8b5b1..8a62255e 100644 --- a/modules/extmodule.cpp +++ b/modules/extmodule.cpp @@ -60,7 +60,7 @@ static Configuration s_cfg; static ObjList s_chans; static ObjList s_modules; static Mutex s_mutex(true,"ExtModule"); -static Mutex s_watchMutex(false,"ExtWatchers"); +static Mutex s_uses(false,"ExtModUse"); static int s_waitFlush = WAIT_FLUSH; static int s_timeout = MSG_TIMEOUT; static bool s_timebomb = false; @@ -195,14 +195,13 @@ public: inline MsgWatcher(ExtModReceiver* receiver) : m_receiver(receiver) { } - virtual void destruct(); virtual void dispatched(const Message& msg, bool handled); bool addWatched(const String& name); bool delWatched(const String& name); + void clear(); protected: virtual void destroyed(); private: - void clear(); ExtModReceiver* m_receiver; ObjList m_watched; }; @@ -221,7 +220,7 @@ public: static ExtModReceiver* build(const char* name, Stream* io, ExtModChan* chan = 0, int role = RoleUnknown, const char* conn = 0); static ExtModReceiver* find(const String& script); - ~ExtModReceiver(); + virtual void destruct(); virtual bool received(Message& msg, int id); bool processLine(const char* line); bool outputLine(const char* line); @@ -234,6 +233,7 @@ public: void cleanup(); bool flush(); void die(bool clearChan = true); + bool useUnlocked(); bool use(); bool unuse(); inline const String& scriptFile() const @@ -245,7 +245,7 @@ public: inline void setRestart(bool restart) { m_restart = restart; } inline bool dead() const - { return m_dead; } + { return m_dead || m_quit || (m_use <= 0); } void describe(String& rval) const; private: @@ -642,12 +642,6 @@ void ExtMessage::dispatched(bool accepted) } -void MsgWatcher::destruct() -{ - clear(); - MessagePostHook::destruct(); -} - void MsgWatcher::destroyed() { clear(); @@ -656,12 +650,14 @@ void MsgWatcher::destroyed() void MsgWatcher::dispatched(const Message& msg, bool handled) { - Lock mylock(s_watchMutex); + Lock lock(s_uses); ExtModReceiver* recv = m_receiver; - Lock lock(recv); - if (!(recv && recv->use())) + if (!recv || recv->dead() || (recv->m_watcher != this) || !recv->useUnlocked()) return; - mylock.drop(); + if (!lock.acquire(recv)) { + recv->unuse(); + return; + } if (!recv->selfWatch()) { // check if the message was generated by ourselves - avoid reentrance @@ -710,10 +706,14 @@ bool MsgWatcher::delWatched(const String& name) void MsgWatcher::clear() { Engine::self()->setHook(this,true); + if (!m_receiver) + return; + s_uses.lock(); ExtModReceiver* recv = m_receiver; m_receiver = 0; if (recv && (recv->m_watcher == this)) recv->m_watcher = 0; + s_uses.unlock(); } @@ -751,21 +751,29 @@ ExtModReceiver* ExtModReceiver::find(const String& script) return 0; } +bool ExtModReceiver::useUnlocked() +{ + if (m_use <= 0) + return false; + ++m_use; + return true; +} + bool ExtModReceiver::use() { - lock(); + s_uses.lock(); bool ok = (m_use > 0); if (ok) ++m_use; - unlock(); + s_uses.unlock(); return ok; } bool ExtModReceiver::unuse() { - lock(); + s_uses.lock(); int u = --m_use; - unlock(); + s_uses.unlock(); if (!u) destruct(); return (u <= 0); @@ -808,9 +816,9 @@ ExtModReceiver::ExtModReceiver(const char* name, Stream* io, ExtModChan* chan, i s_mutex.unlock(); } -ExtModReceiver::~ExtModReceiver() +void ExtModReceiver::destruct() { - Debug(DebugAll,"ExtModReceiver::~ExtModReceiver() pid=%d [%p]",m_pid,this); + Debug(DebugAll,"ExtModReceiver::destruct() pid=%d [%p]",m_pid,this); Lock lock(this); // One destruction is plenty enough m_use = -1; @@ -819,7 +827,7 @@ ExtModReceiver::~ExtModReceiver() s_mutex.unlock(); die(); if (m_pid > 1) - Debug(DebugWarn,"ExtModReceiver::~ExtModReceiver() pid=%d [%p]",m_pid,this); + Debug(DebugWarn,"ExtModReceiver::destruct() pid=%d [%p]",m_pid,this); closeAudio(); Stream* tmp = m_in; m_in = 0; @@ -875,11 +883,15 @@ bool ExtModReceiver::start() bool ExtModReceiver::flush() { - s_watchMutex.lock(); lock(); - bool needWait = (0 != m_watcher); - TelEngine::destruct(m_watcher); - s_watchMutex.unlock(); + MsgWatcher* w = m_watcher; + m_watcher = 0; + bool needWait = !!w; + if (w) { + w->clear(); + Thread::yield(); + TelEngine::destruct(w); + } // Make sure we release all pending messages and not accept new ones needWait = needWait || (m_relays.count() != 0); if (s_pluginSafe)