From b6558d5271cea9459d5eeaa290df3eb77a13a802 Mon Sep 17 00:00:00 2001 From: paulc Date: Fri, 24 Apr 2009 11:18:35 +0000 Subject: [PATCH] Stdio pipe or socket operation are executed with the mutex locked. Wait a configurable time after flushing relays and pending messages. git-svn-id: http://voip.null.ro/svn/yate@2593 acf43c95-373e-0410-b603-e72c3f656dc1 --- conf.d/extmodule.conf.sample | 4 ++ modules/extmodule.cpp | 113 +++++++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 31 deletions(-) diff --git a/conf.d/extmodule.conf.sample b/conf.d/extmodule.conf.sample index 453c98c3..03386ffa 100644 --- a/conf.d/extmodule.conf.sample +++ b/conf.d/extmodule.conf.sample @@ -16,6 +16,10 @@ ; timebomb: bool: Kill the module instance if it timed out ;timebomb=false +; waitflush: int: Milliseconds to wait at script shutdown after waiting messages +; and message relays are flushed, valid range 1-100 ms +;waitflush=5 + ;[listener sample] ; For each socket listener there should be a section starting with the diff --git a/modules/extmodule.cpp b/modules/extmodule.cpp index b36b3f3c..de48a255 100644 --- a/modules/extmodule.cpp +++ b/modules/extmodule.cpp @@ -52,12 +52,17 @@ namespace { // anonymous // Default message timeout in milliseconds #define MSG_TIMEOUT 10000 +// Safety wait time after we flushed watchers, relays or messages (in ms) +#define WAIT_FLUSH 5 + static Configuration s_cfg; static ObjList s_chans; static ObjList s_modules; static Mutex s_mutex(true); +static int s_waitFlush = WAIT_FLUSH; static int s_timeout = MSG_TIMEOUT; static bool s_timebomb = false; +static bool s_pluginSafe = true; static const char* s_cmds[] = { "info", @@ -730,10 +735,12 @@ void ExtModReceiver::closeIn() { if (!m_in) return; + lock(); Stream* tmp = m_in; m_in = 0; if (m_out == tmp) m_out = 0; + unlock(); if (tmp) delete tmp; } @@ -742,10 +749,12 @@ void ExtModReceiver::closeOut() { if (!m_out) return; + lock(); Stream* tmp = m_out; m_out = 0; if (m_in == tmp) m_in = 0; + unlock(); if (tmp) delete tmp; } @@ -777,28 +786,35 @@ bool ExtModReceiver::start() bool ExtModReceiver::flush() { + lock(); + bool needWait = (0 != m_watcher); TelEngine::destruct(m_watcher); // Make sure we release all pending messages and not accept new ones - bool relays = m_relays.count() != 0; - if (!Engine::exiting()) + needWait = needWait || (m_relays.count() != 0); + if (s_pluginSafe) m_relays.clear(); else { ObjList *p = &m_relays; for (; p; p=p->next()) p->setDelete(false); } - if (relays) - Thread::yield(); - Lock lock(this); + bool flushed = false; if (m_waiting.get()) { Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]", m_waiting.count(),this); m_waiting.clear(); - lock.drop(); - Thread::yield(); - return true; + needWait = flushed = true; } - return false; + unlock(); + if (needWait && s_pluginSafe) { + int ms = s_waitFlush; + // During shutdown longer delays are not acceptable + if ((ms > WAIT_FLUSH) && Engine::exiting()) + ms = WAIT_FLUSH; + DDebug(DebugAll,"ExtModReceiver sleeping %d ms [%p]",ms,this); + Thread::msleep(ms); + } + return flushed; } void ExtModReceiver::die(bool clearChan) @@ -810,16 +826,19 @@ void ExtModReceiver::die(bool clearChan) Debug(DebugAll,"ExtModReceiver::die() pid=%d dead=%s [%p]", m_pid,m_dead ? "yes" : "no",this); #endif + if (m_dead) + return; + Lock mylock(this); if (m_dead) return; m_dead = true; use(); - flush(); - ExtModChan *chan = m_chan; + RefPointer chan = m_chan; m_chan = 0; if (chan) chan->setRecv(0); + mylock.drop(); // Give the external script a chance to die gracefully closeOut(); @@ -834,8 +853,10 @@ void ExtModReceiver::die(bool clearChan) if (m_pid > 1) Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit? [%p]",m_pid,this); - // Now terminate the process and close its stdout pipe + // Close the stdout pipe before terminating the process closeIn(); + // Release relays and messages since no confirmation can be received anymore + flush(); #ifndef _WINDOWS if (m_pid > 1) ::kill(m_pid,SIGTERM); @@ -855,7 +876,7 @@ bool ExtModReceiver::received(Message &msg, int id) return false; lock(); // check if we are no longer running - bool ok = (m_pid > 0) && m_in && m_out; + bool ok = (m_pid > 0) && m_in && m_out && !m_dead; if (ok && !m_reenter) { // check if the message was generated by ourselves - avoid reentrance ExtMessage* m = YOBJECT(ExtMessage,&msg); @@ -1027,29 +1048,31 @@ void ExtModReceiver::run() m_pid = 0; return; } - if (m_in) - m_in->setBlocking(false); + if (m_in && !m_in->setBlocking(false)) + Debug("ExtModule",DebugWarn,"Failed to set nonblocking mode, expect trouble [%p]",this); char buffer[MAX_INCOMING_LINE]; int posinbuf = 0; DDebug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this); for (;;) { use(); + lock(); int readsize = m_in ? m_in->readData(buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0; + unlock(); if (unuse()) return; if (!readsize) { - lock(); if (m_in) Debug("ExtModule",DebugInfo,"Read EOF on %p [%p]",m_in,this); closeIn(); flush(); - unlock(); if (m_chan && m_chan->running()) Thread::sleep(1); break; } else if (readsize < 0) { + Lock mylock(this); if (m_in && m_in->canRetry()) { + mylock.drop(); Thread::msleep(5); continue; } @@ -1083,37 +1106,50 @@ void ExtModReceiver::run() bool ExtModReceiver::outputLine(const char* line) { + lock(); DDebug("ExtModReceiver",DebugAll,"%soutputLine '%s'", - (m_out ? "" : "failing "), line); - if (m_dead || !m_out) + ((m_out && !m_dead) ? "" : "failing "), line); + if (m_dead || !m_out) { + unlock(); return false; + } // since m_out can be non-blocking (the socket) we have to loop int len = ::strlen(line); while (m_out && (len > 0) && !m_dead) { int w = m_out->writeData(line,len); if (w < 0) { - if (m_dead || !m_out || !m_out->canRetry()) + if (m_dead || !m_out || !m_out->canRetry()) { + unlock(); return false; + } } else { line += w; len -= w; } - if (len > 0) + if (len > 0) { + unlock(); Thread::yield(); + lock(); + } } char nl = '\n'; - while (m_out && !m_dead) { + for (;;) { + if (m_dead || !m_out) { + unlock(); + return false; + } int w = m_out->writeData(&nl,1); - if (m_dead || !m_out) - return false; - if ((w < 0) && !m_out->canRetry()) - return false; + if ((w < 0) && m_out->canRetry()) + w = 0; + unlock(); if (w > 0) return true; + if (w < 0) + return false; Thread::yield(); + lock(); } - return false; } void ExtModReceiver::reportError(const char* line) @@ -1125,14 +1161,14 @@ void ExtModReceiver::reportError(const char* line) void ExtModReceiver::returnMsg(const Message* msg, const char* id, bool accepted) { String ret(msg->encode(accepted,id)); - lock(); outputLine(ret); - unlock(); } bool ExtModReceiver::addWatched(const String& name) { Lock mylock(this); + if (m_dead) + return false; if (!m_watcher) { m_watcher = new MsgWatcher(this); Engine::self()->setHook(m_watcher); @@ -1143,6 +1179,8 @@ bool ExtModReceiver::addWatched(const String& name) bool ExtModReceiver::delWatched(const String& name) { Lock mylock(this); + if (m_dead) + return false; return m_watcher && m_watcher->delWatched(name); } @@ -1210,7 +1248,6 @@ bool ExtModReceiver::processLine(const char* line) else if (id.startSkip("%%>install:",false)) { int prio = 100; id >> prio >> ":"; - bool ok = true; String fname; String fvalue; Regexp r("^\\([^:]*\\):\\([^:]*\\):\\?\\(.*\\)"); @@ -1221,7 +1258,8 @@ bool ExtModReceiver::processLine(const char* line) id = id.matchString(1); } // sanity checks - ok = ok && id && !m_relays.find(id); + lock(); + bool ok = id && !m_dead && !m_relays.find(id); if (ok) { MessageRelay *r = new MessageRelay(id,this,0,prio); if (fname) @@ -1229,6 +1267,7 @@ bool ExtModReceiver::processLine(const char* line) m_relays.append(r); Engine::install(r); } + unlock(); if (debugAt(DebugAll)) { String tmp; if (fname) @@ -1245,6 +1284,7 @@ bool ExtModReceiver::processLine(const char* line) else if (id.startSkip("%%>uninstall:",false)) { int prio = 0; bool ok = false; + lock(); ObjList *p = &m_relays; for (; p; p=p->next()) { MessageRelay *r = static_cast(p->get()); @@ -1255,6 +1295,7 @@ bool ExtModReceiver::processLine(const char* line) break; } } + unlock(); Debug("ExtModReceiver",DebugAll,"Uninstall '%s' %s", id.c_str(),ok ? "ok" : "failed"); String out("%%id(); @@ -1675,6 +1719,7 @@ ExtModulePlugin::~ExtModulePlugin() { Output("Unloading module ExtModule"); s_mutex.lock(); + s_pluginSafe = false; s_modules.clear(); // the receivers destroyed above should also clear chans but better be sure s_chans.clear(); @@ -1694,6 +1739,12 @@ void ExtModulePlugin::initialize() s_cfg.load(); s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT); s_timebomb = s_cfg.getBoolValue("general","timebomb",false); + int wf = s_cfg.getIntValue("general","waitflush",WAIT_FLUSH); + if (wf < 1) + wf = 1; + else if (wf > 100) + wf = 100; + s_waitFlush = wf; if (!m_handler) { m_handler = new ExtModHandler("call.execute",s_cfg.getIntValue("general","priority",100)); Engine::install(m_handler);