diff --git a/engine/DataBlock.cpp b/engine/DataBlock.cpp index 1f8a89df..4a104020 100644 --- a/engine/DataBlock.cpp +++ b/engine/DataBlock.cpp @@ -288,6 +288,11 @@ bool DataSource::detach(DataConsumer *consumer) return false; } +DataSource::~DataSource() +{ + while (detach(static_cast(m_consumers.get()))) ; +} + DataEndpoint::~DataEndpoint() { disconnect(); @@ -368,17 +373,25 @@ void DataEndpoint::setSource(DataSource *source) return; DataConsumer *consumer = m_peer ? m_peer->getConsumer() : 0; DataSource *temp = m_source; + if (consumer) + consumer->ref(); + m_source = 0; + if (temp) { + if (consumer) { + DataTranslator::detachChain(temp,consumer); + if (consumer->getConnSource()) + Debug(DebugWarn,"consumer source not cleared in %p",consumer); + } + temp->deref(); + } if (source) { source->ref(); if (consumer) DataTranslator::attachChain(source,consumer); } m_source = source; - if (temp) { - if (consumer) - DataTranslator::detachChain(temp,consumer); - temp->deref(); - } + if (consumer) + consumer->deref(); } void DataEndpoint::setConsumer(DataConsumer *consumer) diff --git a/modules/extmodule.cpp b/modules/extmodule.cpp index dad5cd33..1198711c 100644 --- a/modules/extmodule.cpp +++ b/modules/extmodule.cpp @@ -108,11 +108,15 @@ public: void run(); void cleanup(); void die(bool clearChan = true); + inline void use() + { m_use++; } + bool unuse(); private: ExtModReceiver(const char *script, const char *args, int ain = -1, int aout = -1, ExtModChan *chan = 0); bool create(const char *script, const char *args); bool m_dead; + int m_use; pid_t m_pid; int m_in, m_out, m_ain, m_aout; ExtModChan *m_chan; @@ -263,8 +267,10 @@ ExtModChan::ExtModChan(const char *file, const char *args, int type) ExtModChan::~ExtModChan() { - Debug(DebugAll,"ExtModChan::~ExtModChan() [%p]",this); + Debugger debug(DebugAll,"ExtModChan::~ExtModChan()"," [%p]",this); s_chans.remove(this,false); + setSource(); + setConsumer(); if (m_recv) m_recv->die(false); } @@ -298,8 +304,16 @@ ExtModReceiver* ExtModReceiver::build(const char *script, const char *args, return recv; } +bool ExtModReceiver::unuse() +{ + int u = --m_use; + if (!u) + destruct(); + return (u <= 0); +} + ExtModReceiver::ExtModReceiver(const char *script, const char *args, int ain, int aout, ExtModChan *chan) - : m_dead(false), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout), + : m_dead(false), m_use(0), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout), m_chan(chan), m_script(script), m_args(args) { Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this); @@ -335,6 +349,24 @@ void ExtModReceiver::die(bool clearChan) return; Debug(DebugAll,"ExtModReceiver::die() pid=%d",m_pid); m_dead = true; + use(); + /* Make sure we release all pending messages and not accept new ones */ + if (!Engine::exiting()) + m_relays.clear(); + else { + ObjList *p = &m_relays; + for (; p; p=p->next()) + p->setDelete(false); + } + if (m_waiting.get()) { + m_waiting.clear(); + Thread::yield(); + } + + ExtModChan *chan = m_chan; + m_chan = 0; + chan->setRecv(0); + /* Give the external script a chance to die gracefully */ if (m_out != -1) { ::close(m_out); @@ -350,31 +382,17 @@ void ExtModReceiver::die(bool clearChan) } if (m_pid > 0) Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit?",m_pid); - /* Make sure we release all pending messages and not accept new ones */ - if (!Engine::exiting()) - m_relays.clear(); - else { - ObjList *p = &m_relays; - for (; p; p=p->next()) - p->setDelete(false); - } - if (m_waiting.get()) { - m_waiting.clear(); - Thread::yield(); - } + /* Now terminate the process and close its stdout pipe */ - if (m_pid > 0) - ::kill(m_pid,SIGTERM); if (m_in != -1) { ::close(m_in); m_in = -1; } - if (m_chan) { - m_chan->setRecv(0); - if (clearChan) - m_chan->disconnect(); - m_chan = 0; - } + if (m_pid > 0) + ::kill(m_pid,SIGTERM); + if (chan && clearChan) + chan->disconnect(); + unuse(); } bool ExtModReceiver::received(Message &msg, int id) @@ -385,6 +403,7 @@ bool ExtModReceiver::received(Message &msg, int id) if (m_reenter.find(&msg)) return false; + use(); MsgHolder h(msg); m_waiting.append(&h)->setDelete(false); #ifdef DEBUG @@ -396,6 +415,7 @@ bool ExtModReceiver::received(Message &msg, int id) #ifdef DEBUG Debug(DebugAll,"ExtMod [%p] message '%s' [%p] returning %s",this,msg.c_str(),&msg, h.m_ret ? "true" : "false"); #endif + unuse(); return h.m_ret; } @@ -498,7 +518,7 @@ void ExtModReceiver::cleanup() Debug(DebugMild, "Failed waitpid on %d: %s",m_pid,strerror(errno)); m_pid = 0; } - destruct(); + unuse(); } void ExtModReceiver::run() @@ -507,16 +527,20 @@ void ExtModReceiver::run() m_pid = 0; return; } + use(); char buffer[1024]; int posinbuf = 0; #ifdef DEBUG Debug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this); #endif for (;;) { + use(); int readsize = (m_in >= 0) ? ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0; #ifdef DEBUG Debug(DebugAll,"ExtModReceiver::run() read %d",readsize); #endif + if (unuse()) + return; if (!readsize) { Debug("ExtModule",DebugInfo,"Read EOF on %d [%p]",m_in,this); if (m_chan && m_chan->running()) @@ -536,8 +560,11 @@ void ExtModReceiver::run() if (!eoline) break; *eoline=0; + use(); if (buffer[0]) processLine(buffer); + if (unuse()) + return; totalsize -= eoline-buffer+1; ::memmove(buffer,eoline+1,totalsize+1); } diff --git a/modules/iaxchan.cpp b/modules/iaxchan.cpp index 2102cfeb..d90531b8 100644 --- a/modules/iaxchan.cpp +++ b/modules/iaxchan.cpp @@ -64,7 +64,6 @@ private: unsigned m_total; unsigned long long m_time; }; - class YateIAXAudioConsumer : public DataConsumer { public: @@ -84,6 +83,7 @@ private: unsigned long long m_time; }; + class YateIAXEndPoint : public Thread { public: @@ -97,7 +97,7 @@ public: void run(void); void terminateall(void); YateIAXConnection *findconn(iax_session *session); - YateIAXConnection *findconn(String ourcallid); + YateIAXConnection *findconn(const String& ourcallid); void handleEvent(iax_event *event); inline ObjList &calls() @@ -179,6 +179,13 @@ public: virtual bool received(Message &msg); }; +class TransferHandler : public MessageHandler +{ +public: + TransferHandler(const char *name) : MessageHandler(name,100) { } + virtual bool received(Message &msg); +}; + static IAXPlugin iplugin; static void iax_err_cb(const char *s) @@ -192,7 +199,6 @@ static void iax_out_cb(const char *s) Debug("IAX",DebugInfo,"%s",s); } - YateIAXEndPoint::YateIAXEndPoint() : Thread("IAX EndPoint") { @@ -221,7 +227,7 @@ bool YateIAXEndPoint::Init(void) iax_set_output(iax_out_cb); int tos = s_cfg.getIntValue("general","tos",dict_tos,0); if (tos) - ::setsockopt(iax_get_fd(),IPPROTO_IP,IP_TOS,&tos,sizeof(tos)); + ::setsockopt(iax_get_fd(),IPPROTO_IP,IP_TOS,&tos,sizeof(tos)); return true; } @@ -258,7 +264,6 @@ void YateIAXEndPoint::Setup(void) void YateIAXEndPoint::terminateall(void) { - Debug(DebugInfo,"YateIAXEndPoint::terminateall()"); m_calls.clear(); } @@ -461,8 +466,8 @@ void YateIAXEndPoint::answer(iax_event *e) *m = "call"; m->userData(conn); m->addParam("callto",m->retValue()); - m->addParam("partycallid",conn->ourcallid); - m->retValue() = 0; + m->addParam("ourcallid",conn->ourcallid); + m->retValue().clear(); if (!Engine::dispatch(m)) { conn->reject("I haven't been able to connect you with the other module"); @@ -470,12 +475,11 @@ void YateIAXEndPoint::answer(iax_event *e) delete m; return; } - /* i do this to setup the peercallid by getting ourcallid - * from the other party */ - String ourcallid(m->getValue("ourcallid")); - Debug(DebugInfo,"partycallid %s",ourcallid.c_str()); - if (ourcallid) - conn->partycallid = ourcallid; + /* i do this to setup the peercallid by getting + * partycallid (that mean ourcallid from the other party) */ + String partycallid(m->getValue("partycallid")); + Debug(DebugInfo,"partycallid %s",partycallid.c_str()); + conn->partycallid = partycallid; conn->deref(); s_mutex.lock(); ::iax_answer(e->session); @@ -574,7 +578,7 @@ YateIAXConnection * YateIAXEndPoint::findconn(iax_session *session) return 0; } -YateIAXConnection * YateIAXEndPoint::findconn(String ourcallid) +YateIAXConnection * YateIAXEndPoint::findconn(const String& ourcallid) { ObjList *p = &m_calls; for (; p; p=p->next()) { @@ -599,7 +603,7 @@ YateIAXConnection::YateIAXConnection(iax_session *session) iplugin.m_endpoint->calls().append(this); ::iax_set_private(m_session,this); char buf[64]; - snprintf(buf,sizeof(buf),"%p",m_session); + snprintf(buf,sizeof(buf),"iax/%p",m_session); ourcallid=buf; } @@ -770,6 +774,18 @@ void YateIAXConnection::sourceAudio(void *buffer, int len, int format) void YateIAXConnection::disconnected() { Debug(DebugAll,"YateIAXConnection::disconnected()"); + // If we still have a connection this is the last chance to get transferred + if (!m_final) { + Message m("disconnected"); + m.addParam("ourcallid",ourcallid.c_str()); + if (partycallid) { + // Announce our old party but at this point it may be destroyed + m.addParam("partycallid",partycallid.c_str()); + partycallid.clear(); + } + m.userData(this); + Engine::dispatch(m); + } } IAXSource::~IAXSource() @@ -819,19 +835,18 @@ void YateIAXAudioConsumer::Consume(const DataBlock &data) bool SMSHandler::received(Message &msg) { - String partycallid(msg.getValue("partycallid")); - if (!partycallid) + String ourcallid(msg.getValue("partycallid")); + if (!ourcallid) return false; String text(msg.getValue("text")); if (!text) return false; - Debug(DebugInfo,"text %s partycallid %s",text.c_str(),partycallid.c_str()); - YateIAXConnection *conn= iplugin.m_endpoint->findconn(partycallid); - if (!conn) - Debug(DebugInfo,"conn is null"); - else { - text << "\0"; + Debug(DebugInfo,"text %s ourcallid %s",text.c_str(),ourcallid.c_str()); + YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid); + if (conn){ + s_mutex.lock(); ::iax_send_text(conn->session(),(char *)(text.c_str())); + s_mutex.unlock(); return true; } return false; @@ -839,19 +854,19 @@ bool SMSHandler::received(Message &msg) bool DTMFHandler::received(Message &msg) { - String partycallid(msg.getValue("partycallid")); - if (!partycallid) + String ourcallid(msg.getValue("partycallid")); + if (!ourcallid) return false; String text(msg.getValue("text")); if (!text) return false; - Debug(DebugInfo,"text %s partycallid %s",text.c_str(),partycallid.c_str()); - YateIAXConnection *conn= iplugin.m_endpoint->findconn(partycallid); - if (!conn) - Debug(DebugInfo,"conn is null"); - else { + Debug(DebugInfo,"text %s ourcallid %s",text.c_str(), ourcallid.c_str()); + YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid); + if (conn){ + s_mutex.lock(); for (unsigned int i=0;isession(),(text[i])); + s_mutex.unlock(); return true; } return false; @@ -873,9 +888,8 @@ bool IAXHandler::received(Message &msg) YateIAXConnection *conn = new YateIAXConnection(); /* i do this to setup the peercallid by getting ourcallid * from the other party */ - String ourcallid(msg.getValue("ourcallid")); - if (ourcallid) - conn->partycallid = ourcallid; + String partycallid(msg.getValue("ourcallid")); + conn->partycallid = partycallid; conn->calledaddress = dest; int i = conn->makeCall((char *)msg.getValue("caller"),(char *)msg.getValue("callername"),(char *)dest.matchString(1).safe()); if (i < 0) { @@ -885,7 +899,10 @@ bool IAXHandler::received(Message &msg) } DataEndpoint *dd = static_cast(msg.userData()); if (dd && conn->connect(dd)) + { + msg.addParam("partycallid",conn->ourcallid); conn->deref(); + } return true; }; @@ -900,7 +917,7 @@ bool StatusHandler::received(Message &msg) for (; l; l=l->next()) { YateIAXConnection *c = static_cast(l->get()); if (c) { - st << ",iax/" << c->ourcallid << "=" << c->calledaddress << "/" << c->partycallid; + st << "," << c->ourcallid << "=" << c->calledaddress << "/" << c->partycallid; } } msg.retValue() << st << "\n"; @@ -909,8 +926,8 @@ bool StatusHandler::received(Message &msg) bool DropHandler::received(Message &msg) { - String id(msg.getValue("id")); - if (id.null()) { + String ourcallid(msg.getValue("ourcallid")); + if (ourcallid.null()) { Debug("IAXDroper",DebugInfo,"Dropping all calls"); ObjList *l = &iplugin.m_endpoint->calls(); for (; l; l=l->next()) { @@ -919,16 +936,42 @@ bool DropHandler::received(Message &msg) delete c; } } - if (!id.startsWith("iax")) + if (!ourcallid.startsWith("iax/")) return false; - id >> "/"; - YateIAXConnection *conn = iplugin.m_endpoint->findconn(id); + YateIAXConnection *conn = iplugin.m_endpoint->findconn(ourcallid); if (conn) { Debug("IAXDropper",DebugInfo,"Dropping call '%s' [%p]",conn->ourcallid.c_str(),conn); delete conn; return true; } - Debug("IAXDropper",DebugInfo,"Could not find call '%s'",id.c_str()); + Debug("IAXDropper",DebugInfo,"Could not find call '%s'",ourcallid.c_str()); + return false; +} + +bool TransferHandler::received(Message &msg) +{ + String ourcallid(msg.getValue("partycallid")); + if (!ourcallid) + return false; + String callto(msg.getValue("callto")); + if (!callto) + return false; + YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid); + if (conn) { + Debug(DebugInfo,"Transferring connection '%s' [%p] to '%s'", + ourcallid.c_str(),conn,callto.c_str()); + Message m("call"); + m.addParam("callto",callto.c_str()); + m.addParam("ourcallid",conn->ourcallid); + m.userData(conn); + if (Engine::dispatch(m)) { + String partycallid(m.getValue("partycallid")); + Debug(DebugInfo,"IAX [%p] transferred, new partyid '%s'", + conn,partycallid.c_str()); + conn->partycallid = partycallid; + return true; + } + } return false; } @@ -971,6 +1014,7 @@ void IAXPlugin::initialize() Engine::install(new DTMFHandler("dtmf")); Engine::install(new StatusHandler("status")); Engine::install(new DropHandler("drop")); + Engine::install(new TransferHandler("transfer")); } } diff --git a/modules/wavefile.cpp b/modules/wavefile.cpp index 46fb292e..de39280d 100644 --- a/modules/wavefile.cpp +++ b/modules/wavefile.cpp @@ -21,16 +21,20 @@ using namespace TelEngine; class WaveSource : public ThreadedSource { public: - WaveSource(const String& file, DataEndpoint *chan); + WaveSource(const String& file, DataEndpoint *chan, bool autoclose = true); ~WaveSource(); virtual void run(); virtual void cleanup(); + inline void setNotify(const String& id) + { m_id = id; } private: DataEndpoint *m_chan; int m_fd; unsigned m_brate; unsigned m_total; unsigned long long m_time; + String m_id; + bool m_autoclose; }; class WaveConsumer : public DataConsumer @@ -39,12 +43,15 @@ public: WaveConsumer(const String& file, DataEndpoint *chan = 0, unsigned maxlen = 0); ~WaveConsumer(); virtual void Consume(const DataBlock &data); + inline void setNotify(const String& id) + { m_id = id; } private: DataEndpoint *m_chan; int m_fd; unsigned m_total; unsigned m_maxlen; unsigned long long m_time; + String m_id; }; class WaveChan : public DataEndpoint @@ -78,8 +85,8 @@ private: WaveHandler *m_handler; }; -WaveSource::WaveSource(const String& file, DataEndpoint *chan) - : m_chan(chan), m_fd(-1), m_brate(16000), m_total(0), m_time(0) +WaveSource::WaveSource(const String& file, DataEndpoint *chan, bool autoclose) + : m_chan(chan), m_fd(-1), m_brate(16000), m_total(0), m_time(0), m_autoclose(autoclose) { Debug(DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file.c_str(),chan,this); if (file.endsWith(".gsm")) { @@ -146,13 +153,20 @@ void WaveSource::run() m_total += r; tpos += (r*1000000ULL/m_brate); } while (r > 0); - Debug(DebugAll,"WaveSource [%p] end of data",this); + Debug(DebugAll,"WaveSource [%p] end of data [%p] [%s] ",this,m_chan,m_id.c_str()); + if (m_chan && !m_id.null()) { + Message *m = new Message("notify"); + m->addParam("id",m_id); + m->userData(m_chan); + Engine::enqueue(m); + m_chan->setSource(); + } } void WaveSource::cleanup() { Debug(DebugAll,"WaveSource [%p] cleanup, total=%u",this,m_total); - if (m_chan) + if (m_chan && m_autoclose) m_chan->disconnect(); } @@ -203,6 +217,13 @@ void WaveConsumer::Consume(const DataBlock &data) ::close(m_fd); m_fd = -1; } + if (m_chan && !m_id.null()) { + m_chan->setConsumer(); + Message *m = new Message("notify"); + m->addParam("id",m_id); + m->userData(m_chan); + Engine::enqueue(m); + } #if 0 // This is no good - this should be done in another thread if (m_chan) @@ -349,13 +370,15 @@ bool AttachHandler::received(Message &msg) } if (!src.null()) { - WaveSource* s = new WaveSource(src,dd); + WaveSource* s = new WaveSource(src,dd,false); + s->setNotify(msg.getValue("notify")); dd->setSource(s); s->deref(); } if (!cons.null()) { WaveConsumer* c = new WaveConsumer(cons,dd,maxlen); + c->setNotify(msg.getValue("notify")); dd->setConsumer(c); c->deref(); } diff --git a/yatephone.h b/yatephone.h index 7727c190..132a7e32 100644 --- a/yatephone.h +++ b/yatephone.h @@ -255,6 +255,9 @@ public: DataSource(const char *format = "slin") : DataNode(format), m_translator(0) { } + /** porma */ + ~DataSource(); + /** * Forwards the data to its consumers * @param data The raw data block to forward; an empty block ends data