From 47d3a2bbb1363c16d86d879fc87e02cfc00e89c0 Mon Sep 17 00:00:00 2001 From: paulc Date: Wed, 8 Sep 2004 15:54:56 +0000 Subject: [PATCH] Thread termination more forgiving to long cleanups. External module fixed - it now works properly in channels. Non-string parameters of messages are stringified by libyate.php git-svn-id: http://voip.null.ro/svn/yate@41 acf43c95-373e-0410-b603-e72c3f656dc1 --- engine/Thread.cpp | 13 ++- modules/extmodule.cpp | 229 ++++++++++++++++++++++++++++-------------- scripts/libyate.php | 1 + 3 files changed, 168 insertions(+), 75 deletions(-) diff --git a/engine/Thread.cpp b/engine/Thread.cpp index e3fd6c11..6eebdf5f 100644 --- a/engine/Thread.cpp +++ b/engine/Thread.cpp @@ -167,9 +167,18 @@ void ThreadPrivate::killall() Debug(DebugInfo,"Trying to kill ThreadPrivate '%s' [%p], attempt %d",t->m_name,t,c); tmutex.unlock(); bool ok = t->cancel(); + if (ok) { + // delay a little so threads have a chance to clean up + for (int i=0; i<5; i++) { + tmutex.lock(); + bool done = (t != l->get()); + tmutex.unlock(); + if (done) + break; + ::usleep(10); + } + } tmutex.lock(); - if (ok) - ::usleep(10); if (t != l->get()) c = 1; else { diff --git a/modules/extmodule.cpp b/modules/extmodule.cpp index 6c0d02d5..5ee41add 100644 --- a/modules/extmodule.cpp +++ b/modules/extmodule.cpp @@ -30,20 +30,19 @@ static ObjList s_chans; static ObjList s_modules; class ExtModReceiver; +class ExtModChan; class ExtModSource : public ThreadedSource { public: - ExtModSource(int fd); + ExtModSource(int fd, ExtModChan* chan); ~ExtModSource(); virtual void run(); - inline bool running() const - { return m_running; } private: int m_fd; unsigned m_brate; unsigned m_total; - bool m_running; + ExtModChan* m_chan; }; class ExtModConsumer : public DataConsumer @@ -67,12 +66,22 @@ public: DataWrite, DataBoth }; - ExtModChan(const char *file, const char *args, int type); + static ExtModChan* build(const char *file, const char *args, int type); ~ExtModChan(); virtual void disconnected(); + inline ExtModReceiver* receiver() const + { return m_recv; } + inline void setRecv(ExtModReceiver* recv) + { m_recv = recv; } + inline void setRunning(bool running) + { m_running = running; } + inline bool running() const + { return m_running; } private: + ExtModChan(const char *file, const char *args, int type); ExtModReceiver *m_recv; int m_type; + bool m_running; }; class MsgHolder : public GenObject @@ -88,21 +97,22 @@ public: class ExtModReceiver : public MessageReceiver { public: - ExtModReceiver(const char *script, const char *args, + static ExtModReceiver* build(const char *script, const char *args, int ain = -1, int aout = -1, ExtModChan *chan = 0); ~ExtModReceiver(); virtual bool received(Message &msg, int id); void processLine(const char *line); bool outputLine(const char *line); void reportError(const char *line); - inline bool busy() const - { return (m_pid < 0); } - inline bool ok() const - { return (m_pid > 0); } + bool start(); void run(); - void die(); + void cleanup(); + void die(bool clearChan = true); private: + ExtModReceiver(const char *script, const char *args, + int ain = -1, int aout = -1, ExtModChan *chan = 0); bool create(const char *script, const char *args); + bool m_dead; pid_t m_pid; int m_in, m_out, m_ain, m_aout; ExtModChan *m_chan; @@ -118,6 +128,8 @@ public: { } virtual void run() { m_receiver->run(); } + virtual void cleanup() + { m_receiver->cleanup(); } private: ExtModReceiver *m_receiver; }; @@ -140,12 +152,12 @@ private: }; -ExtModSource::ExtModSource(int fd) - : m_fd(fd), m_brate(16000), m_total(0), m_running(false) +ExtModSource::ExtModSource(int fd, ExtModChan* chan) + : m_fd(fd), m_brate(16000), m_total(0), m_chan(chan) { Debug(DebugAll,"ExtModSource::ExtModSource(%d) [%p]",fd,this); if (m_fd >= 0) { - m_running = true; + chan->setRunning(true); start("ExtModSource"); } } @@ -153,7 +165,7 @@ ExtModSource::ExtModSource(int fd) ExtModSource::~ExtModSource() { Debug(DebugAll,"ExtModSource::~ExtModSource() [%p] total=%u",this,m_total); - m_running = false; + m_chan->setRunning(false); if (m_fd >= 0) { ::close(m_fd); m_fd = -1; @@ -188,7 +200,7 @@ void ExtModSource::run() tpos += (r*1000000ULL/m_brate); } while (r > 0); Debug(DebugAll,"ExtModSource [%p] end of data total=%u",this,m_total); - m_running = false; + m_chan->setRunning(false); } ExtModConsumer::ExtModConsumer(int fd) @@ -214,6 +226,16 @@ void ExtModConsumer::Consume(const DataBlock &data) } } +ExtModChan* ExtModChan::build(const char *file, const char *args, int type) +{ + ExtModChan* chan = new ExtModChan(file,args,type); + if (!chan->m_recv) { + chan->destruct(); + return 0; + } + return chan; +} + ExtModChan::ExtModChan(const char *file, const char *args, int type) : DataEndpoint("ExtModule"), m_recv(0), m_type(type) { @@ -231,17 +253,19 @@ ExtModChan::ExtModChan(const char *file, const char *args, int type) case DataRead: case DataBoth: ::pipe(rfifo); - setSource(new ExtModSource(rfifo[0])); + setSource(new ExtModSource(rfifo[0],this)); getSource()->deref(); } s_chans.append(this); - m_recv = new ExtModReceiver(file,args,wfifo[0],rfifo[1],this); + m_recv = ExtModReceiver::build(file,args,wfifo[0],rfifo[1],this); } ExtModChan::~ExtModChan() { Debug(DebugAll,"ExtModChan::~ExtModChan() [%p]",this); s_chans.remove(this,false); + if (m_recv) + m_recv->die(false); } void ExtModChan::disconnected() @@ -261,28 +285,69 @@ bool MsgHolder::decode(const char *s) return (m_msg.decode(s,m_ret,m_id) == -2); } +ExtModReceiver* ExtModReceiver::build(const char *script, const char *args, + int ain, int aout, ExtModChan *chan) +{ + ExtModReceiver* recv = new ExtModReceiver(script,args,ain,aout,chan); + if (!recv->start()) { + recv->destruct(); + return 0; + } + return recv; +} + ExtModReceiver::ExtModReceiver(const char *script, const char *args, int ain, int aout, ExtModChan *chan) - : m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout), + : m_dead(false), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout), m_chan(chan), m_script(script), m_args(args) { Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this); s_modules.append(this); - new ExtThread(this); } + ExtModReceiver::~ExtModReceiver() { - Debug(DebugAll,"ExtModReceiver::~ExtModReceiver()"," [%p] pid=%d",this,m_pid); + Debug(DebugAll,"ExtModReceiver::~ExtModReceiver() [%p] pid=%d",this,m_pid); s_modules.remove(this,false); die(); + if (m_pid > 0) + Debug(DebugWarn,"ExtModReceiver::~ExtModReceiver() [%p] pid=%d",this,m_pid); } -void ExtModReceiver::die() +bool ExtModReceiver::start() { - Debug(DebugAll,"ExtModReceiver::die()"," [%p] pid=%d",this,m_pid); - pid_t pid = m_pid; - bool seppuku = (m_chan != 0); - m_pid = -1; + if (m_pid < 0) { + new ExtThread(this); + while (m_pid < 0) + Thread::yield(); + } + return (m_pid > 0); +} + +void ExtModReceiver::die(bool clearChan) +{ +#ifdef DEBUG + Debugger debug(DebugAll,"ExtModReceiver::die()"," [%p]",this); +#endif + if (m_dead) + return; + Debug(DebugAll,"ExtModReceiver::die() pid=%d",m_pid); + m_dead = true; + /* Give the external script a chance to die gracefully */ + if (m_out != -1) { + ::close(m_out); + m_out = -1; + } + if (m_pid > 0) { + Debug(DebugAll,"ExtModReceiver::die() waiting for pid=%d to die",m_pid); + for (int i=0; i<100; i++) { + Thread::yield(); + if (m_pid <= 0) + break; + } + } + if (m_pid > 0) + Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit?",m_pid); /* Make sure we release all pending messages and not accept new ones */ if (!Engine::exiting()) m_relays.clear(); @@ -295,31 +360,19 @@ void ExtModReceiver::die() m_waiting.clear(); Thread::yield(); } - /* Give the external script a chance to die gracefully */ - if (pid > 0) - ::kill(pid,SIGTERM); + /* Now terminate the process and close its stdout pipe */ + if (m_pid > 0) + ::kill(m_pid,SIGTERM); if (m_in != -1) { ::close(m_in); m_in = -1; } - if (m_out != -1) { - ::close(m_out); - m_out = -1; - } if (m_chan) { - m_chan->disconnect(); + m_chan->setRecv(0); + if (clearChan) + m_chan->disconnect(); m_chan = 0; } - if (pid > 0) { - int w = ::waitpid(pid, 0, WNOHANG); - if (w == 0) - Debug(DebugWarn, "Process %d has not exited yet?",pid); - else if (w < 0) - Debug(DebugMild, "Failed waitpid on %d: %s",pid,strerror(errno)); - } - m_pid = 0; - if (seppuku) - destruct(); } bool ExtModReceiver::received(Message &msg, int id) @@ -336,8 +389,6 @@ bool ExtModReceiver::received(Message &msg, int id) Debug(DebugAll,"ExtMod [%p] queued message '%s' [%p]",this,msg.c_str(),&msg); #endif /* We use id to signal a call directly from the "call" message handler */ - if (id && (m_out < 0)) - Thread::yield(); outputLine(msg.encode(h.m_id)); while (m_waiting.find(&h)) Thread::yield(); @@ -360,20 +411,21 @@ bool ExtModReceiver::create(const char *script, const char *args) script = tmp.c_str(); if (::pipe(ext2yate)) { Debug(DebugWarn, "Unable to create ext->yate pipe: %s",strerror(errno)); - m_pid = 0; return false; } if (pipe(yate2ext)) { Debug(DebugWarn, "unable to create yate->ext pipe: %s", strerror(errno)); ::close(ext2yate[0]); ::close(ext2yate[1]); - m_pid = 0; return false; } pid = ::fork(); if (pid < 0) { Debug(DebugWarn, "Failed to fork(): %s", strerror(errno)); - m_pid = 0; + ::close(yate2ext[0]); + ::close(yate2ext[1]); + ::close(ext2yate[0]); + ::close(ext2yate[1]); return false; } if (!pid) { @@ -419,28 +471,60 @@ bool ExtModReceiver::create(const char *script, const char *args) return true; } +void ExtModReceiver::cleanup() +{ +#ifdef DEBUG + Debugger debug(DebugAll,"ExtModReceiver::cleanup()"," [%p]",this); +#endif + /* We must call waitpid from here - same thread we started the child */ + if (m_pid > 0) { + /* No thread switching if possible */ + if (m_out != -1) { + ::close(m_out); + m_out = -1; + } + Thread::yield(); + int w = ::waitpid(m_pid, 0, WNOHANG); + if (w == 0) { + Debug(DebugWarn, "Process %d has not exited on closing stdin - we'll kill it",m_pid); + ::kill(m_pid,SIGTERM); + Thread::yield(); + w = ::waitpid(m_pid, 0, WNOHANG); + } + if (w == 0) + Debug(DebugWarn, "Process %d has still not exited yet?",m_pid); + else if (w < 0) + Debug(DebugMild, "Failed waitpid on %d: %s",m_pid,strerror(errno)); + m_pid = 0; + } + destruct(); +} + void ExtModReceiver::run() { if (!create(m_script.safe(),m_args.safe())) { - die(); + m_pid = 0; return; } char buffer[1024]; int posinbuf = 0; +#ifdef DEBUG + Debug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this); +#endif for (;;) { - int readsize = ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1); + int readsize = (m_in >= 0) ? ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0; +#ifdef DEBUG + Debug(DebugAll,"ExtModReceiver::run() read %d",readsize); +#endif if (!readsize) { - if (m_chan && m_chan->getSource() && - static_cast(m_chan->getSource())->running()) { + Debug("ExtModule",DebugInfo,"Read EOF on %d [%p]",m_in,this); + if (m_chan && m_chan->running()) ::usleep(1000000); - } - die(); - return; + break; } else if (readsize < 0) { - Debug("ExtModule",DebugWarn,"Read error %d on %d",errno,m_in); - die(); - return; + Debug("ExtModule",DebugWarn,"Read error %d on %d [%p]",errno,m_in,this); + break; } int totalsize = readsize + posinbuf; buffer[totalsize]=0; @@ -591,23 +675,23 @@ bool ExtModHandler::received(Message &msg) return false; } if (typ == ExtModChan::NoChannel) { - ExtModReceiver *recv = new ExtModReceiver(dest.matchString(2).c_str(), + ExtModReceiver *r = ExtModReceiver::build(dest.matchString(2).c_str(), dest.matchString(3).trimBlanks().c_str()); - while (recv->busy()) - Thread::yield(); - bool retv = recv->received(msg,1); - if (!recv->ok()) - recv->destruct(); - return retv; + return r ? r->received(msg,1) : false; } if (typ != ExtModChan::DataNone && !dd) { - Debug(DebugFail,"ExtMod '%s' call found but no data channel!",t.c_str()); + Debug(DebugGoOn,"ExtMod '%s' call found but no data channel!",t.c_str()); return false; } - ExtModChan *em = new ExtModChan(dest.matchString(2).c_str(), - dest.matchString(3).c_str(),typ); + ExtModChan *em = ExtModChan::build(dest.matchString(2).c_str(), + dest.matchString(3).c_str(),typ); if (!em) { - Debug(DebugFail,"Failed to create ExtMod for '%s'",dest.matchString(2).c_str()); + Debug(DebugGoOn,"Failed to create ExtMod for '%s'",dest.matchString(2).c_str()); + return false; + } + if (!(em->receiver() && em->receiver()->received(msg,1))) { + Debug(DebugWarn,"ExtMod '%s' did not handle call message",dest.matchString(2).c_str()); + em->deref(); return false; } if (dd && dd->connect(em)) @@ -643,9 +727,8 @@ void ExtModulePlugin::initialize() unsigned int len = list->length(); for (unsigned int i=0; igetParam(i); - if (n) { - new ExtModReceiver(n->name(),*n); - } + if (n) + ExtModReceiver::build(n->name(),*n); } } } diff --git a/scripts/libyate.php b/scripts/libyate.php index 5eec8968..5a4070ea 100644 --- a/scripts/libyate.php +++ b/scripts/libyate.php @@ -92,6 +92,7 @@ class Yate */ function Escape($str, $extra = "") { + $str = $str . ""; $s = ""; $n = strlen($str); for ($i=0; $i<$n; $i++) {