Thread termination more forgiving to long cleanups.

External module fixed - it now works properly in channels.
Non-string parameters of messages are stringified by libyate.php


git-svn-id: http://voip.null.ro/svn/yate@41 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2004-09-08 15:54:56 +00:00
parent 38b1cbbd50
commit 47d3a2bbb1
3 changed files with 168 additions and 75 deletions

View File

@ -167,9 +167,18 @@ void ThreadPrivate::killall()
Debug(DebugInfo,"Trying to kill ThreadPrivate '%s' [%p], attempt %d",t->m_name,t,c); Debug(DebugInfo,"Trying to kill ThreadPrivate '%s' [%p], attempt %d",t->m_name,t,c);
tmutex.unlock(); tmutex.unlock();
bool ok = t->cancel(); bool ok = t->cancel();
if (ok) {
// delay a little so threads have a chance to clean up
for (int i=0; i<5; i++) {
tmutex.lock();
bool done = (t != l->get());
tmutex.unlock();
if (done)
break;
::usleep(10);
}
}
tmutex.lock(); tmutex.lock();
if (ok)
::usleep(10);
if (t != l->get()) if (t != l->get())
c = 1; c = 1;
else { else {

View File

@ -30,20 +30,19 @@ static ObjList s_chans;
static ObjList s_modules; static ObjList s_modules;
class ExtModReceiver; class ExtModReceiver;
class ExtModChan;
class ExtModSource : public ThreadedSource class ExtModSource : public ThreadedSource
{ {
public: public:
ExtModSource(int fd); ExtModSource(int fd, ExtModChan* chan);
~ExtModSource(); ~ExtModSource();
virtual void run(); virtual void run();
inline bool running() const
{ return m_running; }
private: private:
int m_fd; int m_fd;
unsigned m_brate; unsigned m_brate;
unsigned m_total; unsigned m_total;
bool m_running; ExtModChan* m_chan;
}; };
class ExtModConsumer : public DataConsumer class ExtModConsumer : public DataConsumer
@ -67,12 +66,22 @@ public:
DataWrite, DataWrite,
DataBoth DataBoth
}; };
ExtModChan(const char *file, const char *args, int type); static ExtModChan* build(const char *file, const char *args, int type);
~ExtModChan(); ~ExtModChan();
virtual void disconnected(); virtual void disconnected();
inline ExtModReceiver* receiver() const
{ return m_recv; }
inline void setRecv(ExtModReceiver* recv)
{ m_recv = recv; }
inline void setRunning(bool running)
{ m_running = running; }
inline bool running() const
{ return m_running; }
private: private:
ExtModChan(const char *file, const char *args, int type);
ExtModReceiver *m_recv; ExtModReceiver *m_recv;
int m_type; int m_type;
bool m_running;
}; };
class MsgHolder : public GenObject class MsgHolder : public GenObject
@ -88,21 +97,22 @@ public:
class ExtModReceiver : public MessageReceiver class ExtModReceiver : public MessageReceiver
{ {
public: public:
ExtModReceiver(const char *script, const char *args, static ExtModReceiver* build(const char *script, const char *args,
int ain = -1, int aout = -1, ExtModChan *chan = 0); int ain = -1, int aout = -1, ExtModChan *chan = 0);
~ExtModReceiver(); ~ExtModReceiver();
virtual bool received(Message &msg, int id); virtual bool received(Message &msg, int id);
void processLine(const char *line); void processLine(const char *line);
bool outputLine(const char *line); bool outputLine(const char *line);
void reportError(const char *line); void reportError(const char *line);
inline bool busy() const bool start();
{ return (m_pid < 0); }
inline bool ok() const
{ return (m_pid > 0); }
void run(); void run();
void die(); void cleanup();
void die(bool clearChan = true);
private: private:
ExtModReceiver(const char *script, const char *args,
int ain = -1, int aout = -1, ExtModChan *chan = 0);
bool create(const char *script, const char *args); bool create(const char *script, const char *args);
bool m_dead;
pid_t m_pid; pid_t m_pid;
int m_in, m_out, m_ain, m_aout; int m_in, m_out, m_ain, m_aout;
ExtModChan *m_chan; ExtModChan *m_chan;
@ -118,6 +128,8 @@ public:
{ } { }
virtual void run() virtual void run()
{ m_receiver->run(); } { m_receiver->run(); }
virtual void cleanup()
{ m_receiver->cleanup(); }
private: private:
ExtModReceiver *m_receiver; ExtModReceiver *m_receiver;
}; };
@ -140,12 +152,12 @@ private:
}; };
ExtModSource::ExtModSource(int fd) ExtModSource::ExtModSource(int fd, ExtModChan* chan)
: m_fd(fd), m_brate(16000), m_total(0), m_running(false) : m_fd(fd), m_brate(16000), m_total(0), m_chan(chan)
{ {
Debug(DebugAll,"ExtModSource::ExtModSource(%d) [%p]",fd,this); Debug(DebugAll,"ExtModSource::ExtModSource(%d) [%p]",fd,this);
if (m_fd >= 0) { if (m_fd >= 0) {
m_running = true; chan->setRunning(true);
start("ExtModSource"); start("ExtModSource");
} }
} }
@ -153,7 +165,7 @@ ExtModSource::ExtModSource(int fd)
ExtModSource::~ExtModSource() ExtModSource::~ExtModSource()
{ {
Debug(DebugAll,"ExtModSource::~ExtModSource() [%p] total=%u",this,m_total); Debug(DebugAll,"ExtModSource::~ExtModSource() [%p] total=%u",this,m_total);
m_running = false; m_chan->setRunning(false);
if (m_fd >= 0) { if (m_fd >= 0) {
::close(m_fd); ::close(m_fd);
m_fd = -1; m_fd = -1;
@ -188,7 +200,7 @@ void ExtModSource::run()
tpos += (r*1000000ULL/m_brate); tpos += (r*1000000ULL/m_brate);
} while (r > 0); } while (r > 0);
Debug(DebugAll,"ExtModSource [%p] end of data total=%u",this,m_total); Debug(DebugAll,"ExtModSource [%p] end of data total=%u",this,m_total);
m_running = false; m_chan->setRunning(false);
} }
ExtModConsumer::ExtModConsumer(int fd) ExtModConsumer::ExtModConsumer(int fd)
@ -214,6 +226,16 @@ void ExtModConsumer::Consume(const DataBlock &data)
} }
} }
ExtModChan* ExtModChan::build(const char *file, const char *args, int type)
{
ExtModChan* chan = new ExtModChan(file,args,type);
if (!chan->m_recv) {
chan->destruct();
return 0;
}
return chan;
}
ExtModChan::ExtModChan(const char *file, const char *args, int type) ExtModChan::ExtModChan(const char *file, const char *args, int type)
: DataEndpoint("ExtModule"), m_recv(0), m_type(type) : DataEndpoint("ExtModule"), m_recv(0), m_type(type)
{ {
@ -231,17 +253,19 @@ ExtModChan::ExtModChan(const char *file, const char *args, int type)
case DataRead: case DataRead:
case DataBoth: case DataBoth:
::pipe(rfifo); ::pipe(rfifo);
setSource(new ExtModSource(rfifo[0])); setSource(new ExtModSource(rfifo[0],this));
getSource()->deref(); getSource()->deref();
} }
s_chans.append(this); s_chans.append(this);
m_recv = new ExtModReceiver(file,args,wfifo[0],rfifo[1],this); m_recv = ExtModReceiver::build(file,args,wfifo[0],rfifo[1],this);
} }
ExtModChan::~ExtModChan() ExtModChan::~ExtModChan()
{ {
Debug(DebugAll,"ExtModChan::~ExtModChan() [%p]",this); Debug(DebugAll,"ExtModChan::~ExtModChan() [%p]",this);
s_chans.remove(this,false); s_chans.remove(this,false);
if (m_recv)
m_recv->die(false);
} }
void ExtModChan::disconnected() void ExtModChan::disconnected()
@ -261,28 +285,69 @@ bool MsgHolder::decode(const char *s)
return (m_msg.decode(s,m_ret,m_id) == -2); return (m_msg.decode(s,m_ret,m_id) == -2);
} }
ExtModReceiver* ExtModReceiver::build(const char *script, const char *args,
int ain, int aout, ExtModChan *chan)
{
ExtModReceiver* recv = new ExtModReceiver(script,args,ain,aout,chan);
if (!recv->start()) {
recv->destruct();
return 0;
}
return recv;
}
ExtModReceiver::ExtModReceiver(const char *script, const char *args, int ain, int aout, ExtModChan *chan) ExtModReceiver::ExtModReceiver(const char *script, const char *args, int ain, int aout, ExtModChan *chan)
: m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout), : m_dead(false), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout),
m_chan(chan), m_script(script), m_args(args) m_chan(chan), m_script(script), m_args(args)
{ {
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this); Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this);
s_modules.append(this); s_modules.append(this);
new ExtThread(this);
} }
ExtModReceiver::~ExtModReceiver() ExtModReceiver::~ExtModReceiver()
{ {
Debug(DebugAll,"ExtModReceiver::~ExtModReceiver()"," [%p] pid=%d",this,m_pid); Debug(DebugAll,"ExtModReceiver::~ExtModReceiver() [%p] pid=%d",this,m_pid);
s_modules.remove(this,false); s_modules.remove(this,false);
die(); die();
if (m_pid > 0)
Debug(DebugWarn,"ExtModReceiver::~ExtModReceiver() [%p] pid=%d",this,m_pid);
} }
void ExtModReceiver::die() bool ExtModReceiver::start()
{ {
Debug(DebugAll,"ExtModReceiver::die()"," [%p] pid=%d",this,m_pid); if (m_pid < 0) {
pid_t pid = m_pid; new ExtThread(this);
bool seppuku = (m_chan != 0); while (m_pid < 0)
m_pid = -1; Thread::yield();
}
return (m_pid > 0);
}
void ExtModReceiver::die(bool clearChan)
{
#ifdef DEBUG
Debugger debug(DebugAll,"ExtModReceiver::die()"," [%p]",this);
#endif
if (m_dead)
return;
Debug(DebugAll,"ExtModReceiver::die() pid=%d",m_pid);
m_dead = true;
/* Give the external script a chance to die gracefully */
if (m_out != -1) {
::close(m_out);
m_out = -1;
}
if (m_pid > 0) {
Debug(DebugAll,"ExtModReceiver::die() waiting for pid=%d to die",m_pid);
for (int i=0; i<100; i++) {
Thread::yield();
if (m_pid <= 0)
break;
}
}
if (m_pid > 0)
Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit?",m_pid);
/* Make sure we release all pending messages and not accept new ones */ /* Make sure we release all pending messages and not accept new ones */
if (!Engine::exiting()) if (!Engine::exiting())
m_relays.clear(); m_relays.clear();
@ -295,31 +360,19 @@ void ExtModReceiver::die()
m_waiting.clear(); m_waiting.clear();
Thread::yield(); Thread::yield();
} }
/* Give the external script a chance to die gracefully */ /* Now terminate the process and close its stdout pipe */
if (pid > 0) if (m_pid > 0)
::kill(pid,SIGTERM); ::kill(m_pid,SIGTERM);
if (m_in != -1) { if (m_in != -1) {
::close(m_in); ::close(m_in);
m_in = -1; m_in = -1;
} }
if (m_out != -1) {
::close(m_out);
m_out = -1;
}
if (m_chan) { if (m_chan) {
m_chan->disconnect(); m_chan->setRecv(0);
if (clearChan)
m_chan->disconnect();
m_chan = 0; m_chan = 0;
} }
if (pid > 0) {
int w = ::waitpid(pid, 0, WNOHANG);
if (w == 0)
Debug(DebugWarn, "Process %d has not exited yet?",pid);
else if (w < 0)
Debug(DebugMild, "Failed waitpid on %d: %s",pid,strerror(errno));
}
m_pid = 0;
if (seppuku)
destruct();
} }
bool ExtModReceiver::received(Message &msg, int id) bool ExtModReceiver::received(Message &msg, int id)
@ -336,8 +389,6 @@ bool ExtModReceiver::received(Message &msg, int id)
Debug(DebugAll,"ExtMod [%p] queued message '%s' [%p]",this,msg.c_str(),&msg); Debug(DebugAll,"ExtMod [%p] queued message '%s' [%p]",this,msg.c_str(),&msg);
#endif #endif
/* We use id to signal a call directly from the "call" message handler */ /* We use id to signal a call directly from the "call" message handler */
if (id && (m_out < 0))
Thread::yield();
outputLine(msg.encode(h.m_id)); outputLine(msg.encode(h.m_id));
while (m_waiting.find(&h)) while (m_waiting.find(&h))
Thread::yield(); Thread::yield();
@ -360,20 +411,21 @@ bool ExtModReceiver::create(const char *script, const char *args)
script = tmp.c_str(); script = tmp.c_str();
if (::pipe(ext2yate)) { if (::pipe(ext2yate)) {
Debug(DebugWarn, "Unable to create ext->yate pipe: %s",strerror(errno)); Debug(DebugWarn, "Unable to create ext->yate pipe: %s",strerror(errno));
m_pid = 0;
return false; return false;
} }
if (pipe(yate2ext)) { if (pipe(yate2ext)) {
Debug(DebugWarn, "unable to create yate->ext pipe: %s", strerror(errno)); Debug(DebugWarn, "unable to create yate->ext pipe: %s", strerror(errno));
::close(ext2yate[0]); ::close(ext2yate[0]);
::close(ext2yate[1]); ::close(ext2yate[1]);
m_pid = 0;
return false; return false;
} }
pid = ::fork(); pid = ::fork();
if (pid < 0) { if (pid < 0) {
Debug(DebugWarn, "Failed to fork(): %s", strerror(errno)); Debug(DebugWarn, "Failed to fork(): %s", strerror(errno));
m_pid = 0; ::close(yate2ext[0]);
::close(yate2ext[1]);
::close(ext2yate[0]);
::close(ext2yate[1]);
return false; return false;
} }
if (!pid) { if (!pid) {
@ -419,28 +471,60 @@ bool ExtModReceiver::create(const char *script, const char *args)
return true; return true;
} }
void ExtModReceiver::cleanup()
{
#ifdef DEBUG
Debugger debug(DebugAll,"ExtModReceiver::cleanup()"," [%p]",this);
#endif
/* We must call waitpid from here - same thread we started the child */
if (m_pid > 0) {
/* No thread switching if possible */
if (m_out != -1) {
::close(m_out);
m_out = -1;
}
Thread::yield();
int w = ::waitpid(m_pid, 0, WNOHANG);
if (w == 0) {
Debug(DebugWarn, "Process %d has not exited on closing stdin - we'll kill it",m_pid);
::kill(m_pid,SIGTERM);
Thread::yield();
w = ::waitpid(m_pid, 0, WNOHANG);
}
if (w == 0)
Debug(DebugWarn, "Process %d has still not exited yet?",m_pid);
else if (w < 0)
Debug(DebugMild, "Failed waitpid on %d: %s",m_pid,strerror(errno));
m_pid = 0;
}
destruct();
}
void ExtModReceiver::run() void ExtModReceiver::run()
{ {
if (!create(m_script.safe(),m_args.safe())) { if (!create(m_script.safe(),m_args.safe())) {
die(); m_pid = 0;
return; return;
} }
char buffer[1024]; char buffer[1024];
int posinbuf = 0; int posinbuf = 0;
#ifdef DEBUG
Debug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this);
#endif
for (;;) { for (;;) {
int readsize = ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1); int readsize = (m_in >= 0) ? ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0;
#ifdef DEBUG
Debug(DebugAll,"ExtModReceiver::run() read %d",readsize);
#endif
if (!readsize) { if (!readsize) {
if (m_chan && m_chan->getSource() && Debug("ExtModule",DebugInfo,"Read EOF on %d [%p]",m_in,this);
static_cast<ExtModSource*>(m_chan->getSource())->running()) { if (m_chan && m_chan->running())
::usleep(1000000); ::usleep(1000000);
} break;
die();
return;
} }
else if (readsize < 0) { else if (readsize < 0) {
Debug("ExtModule",DebugWarn,"Read error %d on %d",errno,m_in); Debug("ExtModule",DebugWarn,"Read error %d on %d [%p]",errno,m_in,this);
die(); break;
return;
} }
int totalsize = readsize + posinbuf; int totalsize = readsize + posinbuf;
buffer[totalsize]=0; buffer[totalsize]=0;
@ -591,23 +675,23 @@ bool ExtModHandler::received(Message &msg)
return false; return false;
} }
if (typ == ExtModChan::NoChannel) { if (typ == ExtModChan::NoChannel) {
ExtModReceiver *recv = new ExtModReceiver(dest.matchString(2).c_str(), ExtModReceiver *r = ExtModReceiver::build(dest.matchString(2).c_str(),
dest.matchString(3).trimBlanks().c_str()); dest.matchString(3).trimBlanks().c_str());
while (recv->busy()) return r ? r->received(msg,1) : false;
Thread::yield();
bool retv = recv->received(msg,1);
if (!recv->ok())
recv->destruct();
return retv;
} }
if (typ != ExtModChan::DataNone && !dd) { if (typ != ExtModChan::DataNone && !dd) {
Debug(DebugFail,"ExtMod '%s' call found but no data channel!",t.c_str()); Debug(DebugGoOn,"ExtMod '%s' call found but no data channel!",t.c_str());
return false; return false;
} }
ExtModChan *em = new ExtModChan(dest.matchString(2).c_str(), ExtModChan *em = ExtModChan::build(dest.matchString(2).c_str(),
dest.matchString(3).c_str(),typ); dest.matchString(3).c_str(),typ);
if (!em) { if (!em) {
Debug(DebugFail,"Failed to create ExtMod for '%s'",dest.matchString(2).c_str()); Debug(DebugGoOn,"Failed to create ExtMod for '%s'",dest.matchString(2).c_str());
return false;
}
if (!(em->receiver() && em->receiver()->received(msg,1))) {
Debug(DebugWarn,"ExtMod '%s' did not handle call message",dest.matchString(2).c_str());
em->deref();
return false; return false;
} }
if (dd && dd->connect(em)) if (dd && dd->connect(em))
@ -643,9 +727,8 @@ void ExtModulePlugin::initialize()
unsigned int len = list->length(); unsigned int len = list->length();
for (unsigned int i=0; i<len; i++) { for (unsigned int i=0; i<len; i++) {
NamedString *n = list->getParam(i); NamedString *n = list->getParam(i);
if (n) { if (n)
new ExtModReceiver(n->name(),*n); ExtModReceiver::build(n->name(),*n);
}
} }
} }
} }

View File

@ -92,6 +92,7 @@ class Yate
*/ */
function Escape($str, $extra = "") function Escape($str, $extra = "")
{ {
$str = $str . "";
$s = ""; $s = "";
$n = strlen($str); $n = strlen($str);
for ($i=0; $i<$n; $i++) { for ($i=0; $i<$n; $i++) {