Stdio pipe or socket operation are executed with the mutex locked.
Wait a configurable time after flushing relays and pending messages. git-svn-id: http://yate.null.ro/svn/yate/trunk@2593 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
bdc79c79c4
commit
b745abe0f5
|
@ -16,6 +16,10 @@
|
|||
; timebomb: bool: Kill the module instance if it timed out
|
||||
;timebomb=false
|
||||
|
||||
; waitflush: int: Milliseconds to wait at script shutdown after waiting messages
|
||||
; and message relays are flushed, valid range 1-100 ms
|
||||
;waitflush=5
|
||||
|
||||
|
||||
;[listener sample]
|
||||
; For each socket listener there should be a section starting with the
|
||||
|
|
|
@ -52,12 +52,17 @@ namespace { // anonymous
|
|||
// Default message timeout in milliseconds
|
||||
#define MSG_TIMEOUT 10000
|
||||
|
||||
// Safety wait time after we flushed watchers, relays or messages (in ms)
|
||||
#define WAIT_FLUSH 5
|
||||
|
||||
static Configuration s_cfg;
|
||||
static ObjList s_chans;
|
||||
static ObjList s_modules;
|
||||
static Mutex s_mutex(true);
|
||||
static int s_waitFlush = WAIT_FLUSH;
|
||||
static int s_timeout = MSG_TIMEOUT;
|
||||
static bool s_timebomb = false;
|
||||
static bool s_pluginSafe = true;
|
||||
|
||||
static const char* s_cmds[] = {
|
||||
"info",
|
||||
|
@ -730,10 +735,12 @@ void ExtModReceiver::closeIn()
|
|||
{
|
||||
if (!m_in)
|
||||
return;
|
||||
lock();
|
||||
Stream* tmp = m_in;
|
||||
m_in = 0;
|
||||
if (m_out == tmp)
|
||||
m_out = 0;
|
||||
unlock();
|
||||
if (tmp)
|
||||
delete tmp;
|
||||
}
|
||||
|
@ -742,10 +749,12 @@ void ExtModReceiver::closeOut()
|
|||
{
|
||||
if (!m_out)
|
||||
return;
|
||||
lock();
|
||||
Stream* tmp = m_out;
|
||||
m_out = 0;
|
||||
if (m_in == tmp)
|
||||
m_in = 0;
|
||||
unlock();
|
||||
if (tmp)
|
||||
delete tmp;
|
||||
}
|
||||
|
@ -777,28 +786,35 @@ bool ExtModReceiver::start()
|
|||
|
||||
bool ExtModReceiver::flush()
|
||||
{
|
||||
lock();
|
||||
bool needWait = (0 != m_watcher);
|
||||
TelEngine::destruct(m_watcher);
|
||||
// Make sure we release all pending messages and not accept new ones
|
||||
bool relays = m_relays.count() != 0;
|
||||
if (!Engine::exiting())
|
||||
needWait = needWait || (m_relays.count() != 0);
|
||||
if (s_pluginSafe)
|
||||
m_relays.clear();
|
||||
else {
|
||||
ObjList *p = &m_relays;
|
||||
for (; p; p=p->next())
|
||||
p->setDelete(false);
|
||||
}
|
||||
if (relays)
|
||||
Thread::yield();
|
||||
Lock lock(this);
|
||||
bool flushed = false;
|
||||
if (m_waiting.get()) {
|
||||
Debug(DebugInfo,"ExtModReceiver releasing %u pending messages [%p]",
|
||||
m_waiting.count(),this);
|
||||
m_waiting.clear();
|
||||
lock.drop();
|
||||
Thread::yield();
|
||||
return true;
|
||||
needWait = flushed = true;
|
||||
}
|
||||
return false;
|
||||
unlock();
|
||||
if (needWait && s_pluginSafe) {
|
||||
int ms = s_waitFlush;
|
||||
// During shutdown longer delays are not acceptable
|
||||
if ((ms > WAIT_FLUSH) && Engine::exiting())
|
||||
ms = WAIT_FLUSH;
|
||||
DDebug(DebugAll,"ExtModReceiver sleeping %d ms [%p]",ms,this);
|
||||
Thread::msleep(ms);
|
||||
}
|
||||
return flushed;
|
||||
}
|
||||
|
||||
void ExtModReceiver::die(bool clearChan)
|
||||
|
@ -810,16 +826,19 @@ void ExtModReceiver::die(bool clearChan)
|
|||
Debug(DebugAll,"ExtModReceiver::die() pid=%d dead=%s [%p]",
|
||||
m_pid,m_dead ? "yes" : "no",this);
|
||||
#endif
|
||||
if (m_dead)
|
||||
return;
|
||||
Lock mylock(this);
|
||||
if (m_dead)
|
||||
return;
|
||||
m_dead = true;
|
||||
use();
|
||||
flush();
|
||||
|
||||
ExtModChan *chan = m_chan;
|
||||
RefPointer<ExtModChan> chan = m_chan;
|
||||
m_chan = 0;
|
||||
if (chan)
|
||||
chan->setRecv(0);
|
||||
mylock.drop();
|
||||
|
||||
// Give the external script a chance to die gracefully
|
||||
closeOut();
|
||||
|
@ -834,8 +853,10 @@ void ExtModReceiver::die(bool clearChan)
|
|||
if (m_pid > 1)
|
||||
Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit? [%p]",m_pid,this);
|
||||
|
||||
// Now terminate the process and close its stdout pipe
|
||||
// Close the stdout pipe before terminating the process
|
||||
closeIn();
|
||||
// Release relays and messages since no confirmation can be received anymore
|
||||
flush();
|
||||
#ifndef _WINDOWS
|
||||
if (m_pid > 1)
|
||||
::kill(m_pid,SIGTERM);
|
||||
|
@ -855,7 +876,7 @@ bool ExtModReceiver::received(Message &msg, int id)
|
|||
return false;
|
||||
lock();
|
||||
// check if we are no longer running
|
||||
bool ok = (m_pid > 0) && m_in && m_out;
|
||||
bool ok = (m_pid > 0) && m_in && m_out && !m_dead;
|
||||
if (ok && !m_reenter) {
|
||||
// check if the message was generated by ourselves - avoid reentrance
|
||||
ExtMessage* m = YOBJECT(ExtMessage,&msg);
|
||||
|
@ -1027,29 +1048,31 @@ void ExtModReceiver::run()
|
|||
m_pid = 0;
|
||||
return;
|
||||
}
|
||||
if (m_in)
|
||||
m_in->setBlocking(false);
|
||||
if (m_in && !m_in->setBlocking(false))
|
||||
Debug("ExtModule",DebugWarn,"Failed to set nonblocking mode, expect trouble [%p]",this);
|
||||
char buffer[MAX_INCOMING_LINE];
|
||||
int posinbuf = 0;
|
||||
DDebug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this);
|
||||
for (;;) {
|
||||
use();
|
||||
lock();
|
||||
int readsize = m_in ? m_in->readData(buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0;
|
||||
unlock();
|
||||
if (unuse())
|
||||
return;
|
||||
if (!readsize) {
|
||||
lock();
|
||||
if (m_in)
|
||||
Debug("ExtModule",DebugInfo,"Read EOF on %p [%p]",m_in,this);
|
||||
closeIn();
|
||||
flush();
|
||||
unlock();
|
||||
if (m_chan && m_chan->running())
|
||||
Thread::sleep(1);
|
||||
break;
|
||||
}
|
||||
else if (readsize < 0) {
|
||||
Lock mylock(this);
|
||||
if (m_in && m_in->canRetry()) {
|
||||
mylock.drop();
|
||||
Thread::msleep(5);
|
||||
continue;
|
||||
}
|
||||
|
@ -1083,37 +1106,50 @@ void ExtModReceiver::run()
|
|||
|
||||
bool ExtModReceiver::outputLine(const char* line)
|
||||
{
|
||||
lock();
|
||||
DDebug("ExtModReceiver",DebugAll,"%soutputLine '%s'",
|
||||
(m_out ? "" : "failing "), line);
|
||||
if (m_dead || !m_out)
|
||||
((m_out && !m_dead) ? "" : "failing "), line);
|
||||
if (m_dead || !m_out) {
|
||||
unlock();
|
||||
return false;
|
||||
}
|
||||
// since m_out can be non-blocking (the socket) we have to loop
|
||||
int len = ::strlen(line);
|
||||
while (m_out && (len > 0) && !m_dead) {
|
||||
int w = m_out->writeData(line,len);
|
||||
if (w < 0) {
|
||||
if (m_dead || !m_out || !m_out->canRetry())
|
||||
if (m_dead || !m_out || !m_out->canRetry()) {
|
||||
unlock();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else {
|
||||
line += w;
|
||||
len -= w;
|
||||
}
|
||||
if (len > 0)
|
||||
if (len > 0) {
|
||||
unlock();
|
||||
Thread::yield();
|
||||
lock();
|
||||
}
|
||||
}
|
||||
char nl = '\n';
|
||||
while (m_out && !m_dead) {
|
||||
for (;;) {
|
||||
if (m_dead || !m_out) {
|
||||
unlock();
|
||||
return false;
|
||||
}
|
||||
int w = m_out->writeData(&nl,1);
|
||||
if (m_dead || !m_out)
|
||||
return false;
|
||||
if ((w < 0) && !m_out->canRetry())
|
||||
return false;
|
||||
if ((w < 0) && m_out->canRetry())
|
||||
w = 0;
|
||||
unlock();
|
||||
if (w > 0)
|
||||
return true;
|
||||
if (w < 0)
|
||||
return false;
|
||||
Thread::yield();
|
||||
lock();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void ExtModReceiver::reportError(const char* line)
|
||||
|
@ -1125,14 +1161,14 @@ void ExtModReceiver::reportError(const char* line)
|
|||
void ExtModReceiver::returnMsg(const Message* msg, const char* id, bool accepted)
|
||||
{
|
||||
String ret(msg->encode(accepted,id));
|
||||
lock();
|
||||
outputLine(ret);
|
||||
unlock();
|
||||
}
|
||||
|
||||
bool ExtModReceiver::addWatched(const String& name)
|
||||
{
|
||||
Lock mylock(this);
|
||||
if (m_dead)
|
||||
return false;
|
||||
if (!m_watcher) {
|
||||
m_watcher = new MsgWatcher(this);
|
||||
Engine::self()->setHook(m_watcher);
|
||||
|
@ -1143,6 +1179,8 @@ bool ExtModReceiver::addWatched(const String& name)
|
|||
bool ExtModReceiver::delWatched(const String& name)
|
||||
{
|
||||
Lock mylock(this);
|
||||
if (m_dead)
|
||||
return false;
|
||||
return m_watcher && m_watcher->delWatched(name);
|
||||
}
|
||||
|
||||
|
@ -1210,7 +1248,6 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
else if (id.startSkip("%%>install:",false)) {
|
||||
int prio = 100;
|
||||
id >> prio >> ":";
|
||||
bool ok = true;
|
||||
String fname;
|
||||
String fvalue;
|
||||
Regexp r("^\\([^:]*\\):\\([^:]*\\):\\?\\(.*\\)");
|
||||
|
@ -1221,7 +1258,8 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
id = id.matchString(1);
|
||||
}
|
||||
// sanity checks
|
||||
ok = ok && id && !m_relays.find(id);
|
||||
lock();
|
||||
bool ok = id && !m_dead && !m_relays.find(id);
|
||||
if (ok) {
|
||||
MessageRelay *r = new MessageRelay(id,this,0,prio);
|
||||
if (fname)
|
||||
|
@ -1229,6 +1267,7 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
m_relays.append(r);
|
||||
Engine::install(r);
|
||||
}
|
||||
unlock();
|
||||
if (debugAt(DebugAll)) {
|
||||
String tmp;
|
||||
if (fname)
|
||||
|
@ -1245,6 +1284,7 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
else if (id.startSkip("%%>uninstall:",false)) {
|
||||
int prio = 0;
|
||||
bool ok = false;
|
||||
lock();
|
||||
ObjList *p = &m_relays;
|
||||
for (; p; p=p->next()) {
|
||||
MessageRelay *r = static_cast<MessageRelay *>(p->get());
|
||||
|
@ -1255,6 +1295,7 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
break;
|
||||
}
|
||||
}
|
||||
unlock();
|
||||
Debug("ExtModReceiver",DebugAll,"Uninstall '%s' %s", id.c_str(),ok ? "ok" : "failed");
|
||||
String out("%%<uninstall:");
|
||||
out << prio << ":" << id << ":" << ok;
|
||||
|
@ -1288,6 +1329,9 @@ bool ExtModReceiver::processLine(const char* line)
|
|||
String val(id.substr(col+1));
|
||||
id = id.substr(0,col);
|
||||
bool ok = false;
|
||||
Lock mylock(this);
|
||||
if (m_dead)
|
||||
return false;
|
||||
if (m_chan && (id == "id")) {
|
||||
if (val.null())
|
||||
val = m_chan->id();
|
||||
|
@ -1675,6 +1719,7 @@ ExtModulePlugin::~ExtModulePlugin()
|
|||
{
|
||||
Output("Unloading module ExtModule");
|
||||
s_mutex.lock();
|
||||
s_pluginSafe = false;
|
||||
s_modules.clear();
|
||||
// the receivers destroyed above should also clear chans but better be sure
|
||||
s_chans.clear();
|
||||
|
@ -1694,6 +1739,12 @@ void ExtModulePlugin::initialize()
|
|||
s_cfg.load();
|
||||
s_timeout = s_cfg.getIntValue("general","timeout",MSG_TIMEOUT);
|
||||
s_timebomb = s_cfg.getBoolValue("general","timebomb",false);
|
||||
int wf = s_cfg.getIntValue("general","waitflush",WAIT_FLUSH);
|
||||
if (wf < 1)
|
||||
wf = 1;
|
||||
else if (wf > 100)
|
||||
wf = 100;
|
||||
s_waitFlush = wf;
|
||||
if (!m_handler) {
|
||||
m_handler = new ExtModHandler("call.execute",s_cfg.getIntValue("general","priority",100));
|
||||
Engine::install(m_handler);
|
||||
|
|
Loading…
Reference in New Issue