Stdio pipe or socket operation are executed with the mutex locked.

Wait a configurable time after flushing relays and pending messages.


git-svn-id: http://voip.null.ro/svn/yate@2593 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2009-04-24 11:18:35 +00:00
parent 5814bc741c
commit b6558d5271
2 changed files with 86 additions and 31 deletions

View File

@ -16,6 +16,10 @@
; timebomb: bool: Kill the module instance if it timed out
;timebomb=false
; waitflush: int: Milliseconds to wait at script shutdown after waiting messages
; and message relays are flushed, valid range 1-100 ms
;waitflush=5
;[listener sample]
; For each socket listener there should be a section starting with the

View File

@ -52,12 +52,17 @@ namespace { // anonymous
// Default message timeout in milliseconds
#define MSG_TIMEOUT 10000
// Safety wait time after we flushed watchers, relays or messages (in ms)
#define WAIT_FLUSH 5
static Configuration s_cfg;
static ObjList s_chans;
static ObjList s_modules;
static Mutex s_mutex(true);
static int s_waitFlush = WAIT_FLUSH;
static int s_timeout = MSG_TIMEOUT;
static bool s_timebomb = false;
static bool s_pluginSafe = true;
static const char* s_cmds[] = {
"info",
@ -730,10 +735,12 @@ void ExtModReceiver::closeIn()
{
if (!m_in)
return;
lock();
Stream* tmp = m_in;
m_in = 0;
if (m_out == tmp)
m_out = 0;
unlock();
if (tmp)
delete tmp;
}
@ -742,10 +749,12 @@ void ExtModReceiver::closeOut()
{
if (!m_out)
return;
lock();
Stream* tmp = m_out;
m_out = 0;
if (m_in == tmp)
m_in = 0;
unlock();
if (tmp)
delete tmp;
}
@ -777,28 +786,35 @@ bool ExtModReceiver::start()
bool ExtModReceiver::flush()
{
lock();
bool needWait = (0 != m_watcher);
TelEngine::destruct(m_watcher);
// Make sure we release all pending messages and not accept new ones
bool relays = m_relays.count() != 0;
if (!Engine::exiting())
needWait = needWait || (m_relays.count() != 0);
if (s_pluginSafe)
m_relays.clear();
else {
ObjList *p = &m_relays;
for (; p; p=p->next())
p->setDelete(false);
}
if (relays)
Thread::yield();
Lock lock(this);
bool flushed = false;
if (m_waiting.get()) {
Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]",
m_waiting.count(),this);
m_waiting.clear();
lock.drop();
Thread::yield();
return true;
needWait = flushed = true;
}
return false;
unlock();
if (needWait && s_pluginSafe) {
int ms = s_waitFlush;
// During shutdown longer delays are not acceptable
if ((ms > WAIT_FLUSH) && Engine::exiting())
ms = WAIT_FLUSH;
DDebug(DebugAll,"ExtModReceiver sleeping %d ms [%p]",ms,this);
Thread::msleep(ms);
}
return flushed;
}
void ExtModReceiver::die(bool clearChan)
@ -810,16 +826,19 @@ void ExtModReceiver::die(bool clearChan)
Debug(DebugAll,"ExtModReceiver::die() pid=%d dead=%s [%p]",
m_pid,m_dead ? "yes" : "no",this);
#endif
if (m_dead)
return;
Lock mylock(this);
if (m_dead)
return;
m_dead = true;
use();
flush();
ExtModChan *chan = m_chan;
RefPointer<ExtModChan> chan = m_chan;
m_chan = 0;
if (chan)
chan->setRecv(0);
mylock.drop();
// Give the external script a chance to die gracefully
closeOut();
@ -834,8 +853,10 @@ void ExtModReceiver::die(bool clearChan)
if (m_pid > 1)
Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit? [%p]",m_pid,this);
// Now terminate the process and close its stdout pipe
// Close the stdout pipe before terminating the process
closeIn();
// Release relays and messages since no confirmation can be received anymore
flush();
#ifndef _WINDOWS
if (m_pid > 1)
::kill(m_pid,SIGTERM);
@ -855,7 +876,7 @@ bool ExtModReceiver::received(Message &msg, int id)
return false;
lock();
// check if we are no longer running
bool ok = (m_pid > 0) && m_in && m_out;
bool ok = (m_pid > 0) && m_in && m_out && !m_dead;
if (ok && !m_reenter) {
// check if the message was generated by ourselves - avoid reentrance
ExtMessage* m = YOBJECT(ExtMessage,&msg);
@ -1027,29 +1048,31 @@ void ExtModReceiver::run()
m_pid = 0;
return;
}
if (m_in)
m_in->setBlocking(false);
if (m_in && !m_in->setBlocking(false))
Debug("ExtModule",DebugWarn,"Failed to set nonblocking mode, expect trouble [%p]",this);
char buffer[MAX_INCOMING_LINE];
int posinbuf = 0;
DDebug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this);
for (;;) {
use();
lock();
int readsize = m_in ? m_in->readData(buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0;
unlock();
if (unuse())
return;
if (!readsize) {
lock();
if (m_in)
Debug("ExtModule",DebugInfo,"Read EOF on %p [%p]",m_in,this);
closeIn();
flush();
unlock();
if (m_chan && m_chan->running())
Thread::sleep(1);
break;
}
else if (readsize < 0) {
Lock mylock(this);
if (m_in && m_in->canRetry()) {
mylock.drop();
Thread::msleep(5);
continue;
}
@ -1083,37 +1106,50 @@ void ExtModReceiver::run()
bool ExtModReceiver::outputLine(const char* line)
{
lock();
DDebug("ExtModReceiver",DebugAll,"%soutputLine '%s'",
(m_out ? "" : "failing "), line);
if (m_dead || !m_out)
((m_out && !m_dead) ? "" : "failing "), line);
if (m_dead || !m_out) {
unlock();
return false;
}
// since m_out can be non-blocking (the socket) we have to loop
int len = ::strlen(line);
while (m_out && (len > 0) && !m_dead) {
int w = m_out->writeData(line,len);
if (w < 0) {
if (m_dead || !m_out || !m_out->canRetry())
if (m_dead || !m_out || !m_out->canRetry()) {
unlock();
return false;
}
}
else {
line += w;
len -= w;
}
if (len > 0)
if (len > 0) {
unlock();
Thread::yield();
lock();
}
}
char nl = '\n';
while (m_out && !m_dead) {
for (;;) {
if (m_dead || !m_out) {
unlock();
return false;
}
int w = m_out->writeData(&nl,1);
if (m_dead || !m_out)
return false;
if ((w < 0) && !m_out->canRetry())
return false;
if ((w < 0) && m_out->canRetry())
w = 0;
unlock();
if (w > 0)
return true;
if (w < 0)
return false;
Thread::yield();
lock();
}
return false;
}
void ExtModReceiver::reportError(const char* line)
@ -1125,14 +1161,14 @@ void ExtModReceiver::reportError(const char* line)
void ExtModReceiver::returnMsg(const Message* msg, const char* id, bool accepted)
{
String ret(msg->encode(accepted,id));
lock();
outputLine(ret);
unlock();
}
bool ExtModReceiver::addWatched(const String& name)
{
Lock mylock(this);
if (m_dead)
return false;
if (!m_watcher) {
m_watcher = new MsgWatcher(this);
Engine::self()->setHook(m_watcher);
@ -1143,6 +1179,8 @@ bool ExtModReceiver::addWatched(const String& name)
bool ExtModReceiver::delWatched(const String& name)
{
Lock mylock(this);
if (m_dead)
return false;
return m_watcher && m_watcher->delWatched(name);
}
@ -1210,7 +1248,6 @@ bool ExtModReceiver::processLine(const char* line)
else if (id.startSkip("%%>install:",false)) {
int prio = 100;
id >> prio >> ":";
bool ok = true;
String fname;
String fvalue;
Regexp r("^\\([^:]*\\):\\([^:]*\\):\\?\\(.*\\)");
@ -1221,7 +1258,8 @@ bool ExtModReceiver::processLine(const char* line)
id = id.matchString(1);
}
// sanity checks
ok = ok && id && !m_relays.find(id);
lock();
bool ok = id && !m_dead && !m_relays.find(id);
if (ok) {
MessageRelay *r = new MessageRelay(id,this,0,prio);
if (fname)
@ -1229,6 +1267,7 @@ bool ExtModReceiver::processLine(const char* line)
m_relays.append(r);
Engine::install(r);
}
unlock();
if (debugAt(DebugAll)) {
String tmp;
if (fname)
@ -1245,6 +1284,7 @@ bool ExtModReceiver::processLine(const char* line)
else if (id.startSkip("%%>uninstall:",false)) {
int prio = 0;
bool ok = false;
lock();
ObjList *p = &m_relays;
for (; p; p=p->next()) {
MessageRelay *r = static_cast<MessageRelay *>(p->get());
@ -1255,6 +1295,7 @@ bool ExtModReceiver::processLine(const char* line)
break;
}
}
unlock();
Debug("ExtModReceiver",DebugAll,"Uninstall '%s' %s", id.c_str(),ok ? "ok" : "failed");
String out("%%<uninstall:");
out << prio << ":" << id << ":" << ok;
@ -1288,6 +1329,9 @@ bool ExtModReceiver::processLine(const char* line)
String val(id.substr(col+1));
id = id.substr(0,col);
bool ok = false;
Lock mylock(this);
if (m_dead)
return false;
if (m_chan && (id == "id")) {
if (val.null())
val = m_chan->id();
@ -1675,6 +1719,7 @@ ExtModulePlugin::~ExtModulePlugin()
{
Output("Unloading module ExtModule");
s_mutex.lock();
s_pluginSafe = false;
s_modules.clear();
// the receivers destroyed above should also clear chans but better be sure
s_chans.clear();
@ -1694,6 +1739,12 @@ void ExtModulePlugin::initialize()
s_cfg.load();
s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT);
s_timebomb = s_cfg.getBoolValue("general","timebomb",false);
int wf = s_cfg.getIntValue("general","waitflush",WAIT_FLUSH);
if (wf < 1)
wf = 1;
else if (wf > 100)
wf = 100;
s_waitFlush = wf;
if (!m_handler) {
m_handler = new ExtModHandler("call.execute",s_cfg.getIntValue("general","priority",100));
Engine::install(m_handler);