Added IVR support, fixed some bugs.

git-svn-id: http://voip.null.ro/svn/yate@65 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
diana 2004-09-25 18:22:40 +00:00
parent 800361b9ab
commit 48131b5c2a
5 changed files with 184 additions and 74 deletions

View File

@ -288,6 +288,11 @@ bool DataSource::detach(DataConsumer *consumer)
return false;
}
DataSource::~DataSource()
{
while (detach(static_cast<DataConsumer *>(m_consumers.get()))) ;
}
DataEndpoint::~DataEndpoint()
{
disconnect();
@ -368,17 +373,25 @@ void DataEndpoint::setSource(DataSource *source)
return;
DataConsumer *consumer = m_peer ? m_peer->getConsumer() : 0;
DataSource *temp = m_source;
if (consumer)
consumer->ref();
m_source = 0;
if (temp) {
if (consumer) {
DataTranslator::detachChain(temp,consumer);
if (consumer->getConnSource())
Debug(DebugWarn,"consumer source not cleared in %p",consumer);
}
temp->deref();
}
if (source) {
source->ref();
if (consumer)
DataTranslator::attachChain(source,consumer);
}
m_source = source;
if (temp) {
if (consumer)
DataTranslator::detachChain(temp,consumer);
temp->deref();
}
if (consumer)
consumer->deref();
}
void DataEndpoint::setConsumer(DataConsumer *consumer)

View File

@ -108,11 +108,15 @@ public:
void run();
void cleanup();
void die(bool clearChan = true);
inline void use()
{ m_use++; }
bool unuse();
private:
ExtModReceiver(const char *script, const char *args,
int ain = -1, int aout = -1, ExtModChan *chan = 0);
bool create(const char *script, const char *args);
bool m_dead;
int m_use;
pid_t m_pid;
int m_in, m_out, m_ain, m_aout;
ExtModChan *m_chan;
@ -263,8 +267,10 @@ ExtModChan::ExtModChan(const char *file, const char *args, int type)
ExtModChan::~ExtModChan()
{
Debug(DebugAll,"ExtModChan::~ExtModChan() [%p]",this);
Debugger debug(DebugAll,"ExtModChan::~ExtModChan()"," [%p]",this);
s_chans.remove(this,false);
setSource();
setConsumer();
if (m_recv)
m_recv->die(false);
}
@ -298,8 +304,16 @@ ExtModReceiver* ExtModReceiver::build(const char *script, const char *args,
return recv;
}
bool ExtModReceiver::unuse()
{
int u = --m_use;
if (!u)
destruct();
return (u <= 0);
}
ExtModReceiver::ExtModReceiver(const char *script, const char *args, int ain, int aout, ExtModChan *chan)
: m_dead(false), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout),
: m_dead(false), m_use(0), m_pid(-1), m_in(-1), m_out(-1), m_ain(ain), m_aout(aout),
m_chan(chan), m_script(script), m_args(args)
{
Debug(DebugAll,"ExtModReceiver::ExtModReceiver(\"%s\",\"%s\") [%p]",script,args,this);
@ -335,6 +349,24 @@ void ExtModReceiver::die(bool clearChan)
return;
Debug(DebugAll,"ExtModReceiver::die() pid=%d",m_pid);
m_dead = true;
use();
/* Make sure we release all pending messages and not accept new ones */
if (!Engine::exiting())
m_relays.clear();
else {
ObjList *p = &m_relays;
for (; p; p=p->next())
p->setDelete(false);
}
if (m_waiting.get()) {
m_waiting.clear();
Thread::yield();
}
ExtModChan *chan = m_chan;
m_chan = 0;
chan->setRecv(0);
/* Give the external script a chance to die gracefully */
if (m_out != -1) {
::close(m_out);
@ -350,31 +382,17 @@ void ExtModReceiver::die(bool clearChan)
}
if (m_pid > 0)
Debug(DebugInfo,"ExtModReceiver::die() pid=%d did not exit?",m_pid);
/* Make sure we release all pending messages and not accept new ones */
if (!Engine::exiting())
m_relays.clear();
else {
ObjList *p = &m_relays;
for (; p; p=p->next())
p->setDelete(false);
}
if (m_waiting.get()) {
m_waiting.clear();
Thread::yield();
}
/* Now terminate the process and close its stdout pipe */
if (m_pid > 0)
::kill(m_pid,SIGTERM);
if (m_in != -1) {
::close(m_in);
m_in = -1;
}
if (m_chan) {
m_chan->setRecv(0);
if (clearChan)
m_chan->disconnect();
m_chan = 0;
}
if (m_pid > 0)
::kill(m_pid,SIGTERM);
if (chan && clearChan)
chan->disconnect();
unuse();
}
bool ExtModReceiver::received(Message &msg, int id)
@ -385,6 +403,7 @@ bool ExtModReceiver::received(Message &msg, int id)
if (m_reenter.find(&msg))
return false;
use();
MsgHolder h(msg);
m_waiting.append(&h)->setDelete(false);
#ifdef DEBUG
@ -396,6 +415,7 @@ bool ExtModReceiver::received(Message &msg, int id)
#ifdef DEBUG
Debug(DebugAll,"ExtMod [%p] message '%s' [%p] returning %s",this,msg.c_str(),&msg, h.m_ret ? "true" : "false");
#endif
unuse();
return h.m_ret;
}
@ -498,7 +518,7 @@ void ExtModReceiver::cleanup()
Debug(DebugMild, "Failed waitpid on %d: %s",m_pid,strerror(errno));
m_pid = 0;
}
destruct();
unuse();
}
void ExtModReceiver::run()
@ -507,16 +527,20 @@ void ExtModReceiver::run()
m_pid = 0;
return;
}
use();
char buffer[1024];
int posinbuf = 0;
#ifdef DEBUG
Debug(DebugAll,"ExtModReceiver::run() entering loop [%p]",this);
#endif
for (;;) {
use();
int readsize = (m_in >= 0) ? ::read(m_in,buffer+posinbuf,sizeof(buffer)-posinbuf-1) : 0;
#ifdef DEBUG
Debug(DebugAll,"ExtModReceiver::run() read %d",readsize);
#endif
if (unuse())
return;
if (!readsize) {
Debug("ExtModule",DebugInfo,"Read EOF on %d [%p]",m_in,this);
if (m_chan && m_chan->running())
@ -536,8 +560,11 @@ void ExtModReceiver::run()
if (!eoline)
break;
*eoline=0;
use();
if (buffer[0])
processLine(buffer);
if (unuse())
return;
totalsize -= eoline-buffer+1;
::memmove(buffer,eoline+1,totalsize+1);
}

View File

@ -64,7 +64,6 @@ private:
unsigned m_total;
unsigned long long m_time;
};
class YateIAXAudioConsumer : public DataConsumer
{
public:
@ -84,6 +83,7 @@ private:
unsigned long long m_time;
};
class YateIAXEndPoint : public Thread
{
public:
@ -97,7 +97,7 @@ public:
void run(void);
void terminateall(void);
YateIAXConnection *findconn(iax_session *session);
YateIAXConnection *findconn(String ourcallid);
YateIAXConnection *findconn(const String& ourcallid);
void handleEvent(iax_event *event);
inline ObjList &calls()
@ -179,6 +179,13 @@ public:
virtual bool received(Message &msg);
};
class TransferHandler : public MessageHandler
{
public:
TransferHandler(const char *name) : MessageHandler(name,100) { }
virtual bool received(Message &msg);
};
static IAXPlugin iplugin;
static void iax_err_cb(const char *s)
@ -192,7 +199,6 @@ static void iax_out_cb(const char *s)
Debug("IAX",DebugInfo,"%s",s);
}
YateIAXEndPoint::YateIAXEndPoint()
: Thread("IAX EndPoint")
{
@ -221,7 +227,7 @@ bool YateIAXEndPoint::Init(void)
iax_set_output(iax_out_cb);
int tos = s_cfg.getIntValue("general","tos",dict_tos,0);
if (tos)
::setsockopt(iax_get_fd(),IPPROTO_IP,IP_TOS,&tos,sizeof(tos));
::setsockopt(iax_get_fd(),IPPROTO_IP,IP_TOS,&tos,sizeof(tos));
return true;
}
@ -258,7 +264,6 @@ void YateIAXEndPoint::Setup(void)
void YateIAXEndPoint::terminateall(void)
{
Debug(DebugInfo,"YateIAXEndPoint::terminateall()");
m_calls.clear();
}
@ -461,8 +466,8 @@ void YateIAXEndPoint::answer(iax_event *e)
*m = "call";
m->userData(conn);
m->addParam("callto",m->retValue());
m->addParam("partycallid",conn->ourcallid);
m->retValue() = 0;
m->addParam("ourcallid",conn->ourcallid);
m->retValue().clear();
if (!Engine::dispatch(m))
{
conn->reject("I haven't been able to connect you with the other module");
@ -470,12 +475,11 @@ void YateIAXEndPoint::answer(iax_event *e)
delete m;
return;
}
/* i do this to setup the peercallid by getting ourcallid
* from the other party */
String ourcallid(m->getValue("ourcallid"));
Debug(DebugInfo,"partycallid %s",ourcallid.c_str());
if (ourcallid)
conn->partycallid = ourcallid;
/* i do this to setup the peercallid by getting
* partycallid (that mean ourcallid from the other party) */
String partycallid(m->getValue("partycallid"));
Debug(DebugInfo,"partycallid %s",partycallid.c_str());
conn->partycallid = partycallid;
conn->deref();
s_mutex.lock();
::iax_answer(e->session);
@ -574,7 +578,7 @@ YateIAXConnection * YateIAXEndPoint::findconn(iax_session *session)
return 0;
}
YateIAXConnection * YateIAXEndPoint::findconn(String ourcallid)
YateIAXConnection * YateIAXEndPoint::findconn(const String& ourcallid)
{
ObjList *p = &m_calls;
for (; p; p=p->next()) {
@ -599,7 +603,7 @@ YateIAXConnection::YateIAXConnection(iax_session *session)
iplugin.m_endpoint->calls().append(this);
::iax_set_private(m_session,this);
char buf[64];
snprintf(buf,sizeof(buf),"%p",m_session);
snprintf(buf,sizeof(buf),"iax/%p",m_session);
ourcallid=buf;
}
@ -770,6 +774,18 @@ void YateIAXConnection::sourceAudio(void *buffer, int len, int format)
void YateIAXConnection::disconnected()
{
Debug(DebugAll,"YateIAXConnection::disconnected()");
// If we still have a connection this is the last chance to get transferred
if (!m_final) {
Message m("disconnected");
m.addParam("ourcallid",ourcallid.c_str());
if (partycallid) {
// Announce our old party but at this point it may be destroyed
m.addParam("partycallid",partycallid.c_str());
partycallid.clear();
}
m.userData(this);
Engine::dispatch(m);
}
}
IAXSource::~IAXSource()
@ -819,19 +835,18 @@ void YateIAXAudioConsumer::Consume(const DataBlock &data)
bool SMSHandler::received(Message &msg)
{
String partycallid(msg.getValue("partycallid"));
if (!partycallid)
String ourcallid(msg.getValue("partycallid"));
if (!ourcallid)
return false;
String text(msg.getValue("text"));
if (!text)
return false;
Debug(DebugInfo,"text %s partycallid %s",text.c_str(),partycallid.c_str());
YateIAXConnection *conn= iplugin.m_endpoint->findconn(partycallid);
if (!conn)
Debug(DebugInfo,"conn is null");
else {
text << "\0";
Debug(DebugInfo,"text %s ourcallid %s",text.c_str(),ourcallid.c_str());
YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid);
if (conn){
s_mutex.lock();
::iax_send_text(conn->session(),(char *)(text.c_str()));
s_mutex.unlock();
return true;
}
return false;
@ -839,19 +854,19 @@ bool SMSHandler::received(Message &msg)
bool DTMFHandler::received(Message &msg)
{
String partycallid(msg.getValue("partycallid"));
if (!partycallid)
String ourcallid(msg.getValue("partycallid"));
if (!ourcallid)
return false;
String text(msg.getValue("text"));
if (!text)
return false;
Debug(DebugInfo,"text %s partycallid %s",text.c_str(),partycallid.c_str());
YateIAXConnection *conn= iplugin.m_endpoint->findconn(partycallid);
if (!conn)
Debug(DebugInfo,"conn is null");
else {
Debug(DebugInfo,"text %s ourcallid %s",text.c_str(), ourcallid.c_str());
YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid);
if (conn){
s_mutex.lock();
for (unsigned int i=0;i<text.length();i++)
::iax_send_dtmf(conn->session(),(text[i]));
s_mutex.unlock();
return true;
}
return false;
@ -873,9 +888,8 @@ bool IAXHandler::received(Message &msg)
YateIAXConnection *conn = new YateIAXConnection();
/* i do this to setup the peercallid by getting ourcallid
* from the other party */
String ourcallid(msg.getValue("ourcallid"));
if (ourcallid)
conn->partycallid = ourcallid;
String partycallid(msg.getValue("ourcallid"));
conn->partycallid = partycallid;
conn->calledaddress = dest;
int i = conn->makeCall((char *)msg.getValue("caller"),(char *)msg.getValue("callername"),(char *)dest.matchString(1).safe());
if (i < 0) {
@ -885,7 +899,10 @@ bool IAXHandler::received(Message &msg)
}
DataEndpoint *dd = static_cast<DataEndpoint *>(msg.userData());
if (dd && conn->connect(dd))
{
msg.addParam("partycallid",conn->ourcallid);
conn->deref();
}
return true;
};
@ -900,7 +917,7 @@ bool StatusHandler::received(Message &msg)
for (; l; l=l->next()) {
YateIAXConnection *c = static_cast<YateIAXConnection *>(l->get());
if (c) {
st << ",iax/" << c->ourcallid << "=" << c->calledaddress << "/" << c->partycallid;
st << "," << c->ourcallid << "=" << c->calledaddress << "/" << c->partycallid;
}
}
msg.retValue() << st << "\n";
@ -909,8 +926,8 @@ bool StatusHandler::received(Message &msg)
bool DropHandler::received(Message &msg)
{
String id(msg.getValue("id"));
if (id.null()) {
String ourcallid(msg.getValue("ourcallid"));
if (ourcallid.null()) {
Debug("IAXDroper",DebugInfo,"Dropping all calls");
ObjList *l = &iplugin.m_endpoint->calls();
for (; l; l=l->next()) {
@ -919,16 +936,42 @@ bool DropHandler::received(Message &msg)
delete c;
}
}
if (!id.startsWith("iax"))
if (!ourcallid.startsWith("iax/"))
return false;
id >> "/";
YateIAXConnection *conn = iplugin.m_endpoint->findconn(id);
YateIAXConnection *conn = iplugin.m_endpoint->findconn(ourcallid);
if (conn) {
Debug("IAXDropper",DebugInfo,"Dropping call '%s' [%p]",conn->ourcallid.c_str(),conn);
delete conn;
return true;
}
Debug("IAXDropper",DebugInfo,"Could not find call '%s'",id.c_str());
Debug("IAXDropper",DebugInfo,"Could not find call '%s'",ourcallid.c_str());
return false;
}
bool TransferHandler::received(Message &msg)
{
String ourcallid(msg.getValue("partycallid"));
if (!ourcallid)
return false;
String callto(msg.getValue("callto"));
if (!callto)
return false;
YateIAXConnection *conn= iplugin.m_endpoint->findconn(ourcallid);
if (conn) {
Debug(DebugInfo,"Transferring connection '%s' [%p] to '%s'",
ourcallid.c_str(),conn,callto.c_str());
Message m("call");
m.addParam("callto",callto.c_str());
m.addParam("ourcallid",conn->ourcallid);
m.userData(conn);
if (Engine::dispatch(m)) {
String partycallid(m.getValue("partycallid"));
Debug(DebugInfo,"IAX [%p] transferred, new partyid '%s'",
conn,partycallid.c_str());
conn->partycallid = partycallid;
return true;
}
}
return false;
}
@ -971,6 +1014,7 @@ void IAXPlugin::initialize()
Engine::install(new DTMFHandler("dtmf"));
Engine::install(new StatusHandler("status"));
Engine::install(new DropHandler("drop"));
Engine::install(new TransferHandler("transfer"));
}
}

View File

@ -21,16 +21,20 @@ using namespace TelEngine;
class WaveSource : public ThreadedSource
{
public:
WaveSource(const String& file, DataEndpoint *chan);
WaveSource(const String& file, DataEndpoint *chan, bool autoclose = true);
~WaveSource();
virtual void run();
virtual void cleanup();
inline void setNotify(const String& id)
{ m_id = id; }
private:
DataEndpoint *m_chan;
int m_fd;
unsigned m_brate;
unsigned m_total;
unsigned long long m_time;
String m_id;
bool m_autoclose;
};
class WaveConsumer : public DataConsumer
@ -39,12 +43,15 @@ public:
WaveConsumer(const String& file, DataEndpoint *chan = 0, unsigned maxlen = 0);
~WaveConsumer();
virtual void Consume(const DataBlock &data);
inline void setNotify(const String& id)
{ m_id = id; }
private:
DataEndpoint *m_chan;
int m_fd;
unsigned m_total;
unsigned m_maxlen;
unsigned long long m_time;
String m_id;
};
class WaveChan : public DataEndpoint
@ -78,8 +85,8 @@ private:
WaveHandler *m_handler;
};
WaveSource::WaveSource(const String& file, DataEndpoint *chan)
: m_chan(chan), m_fd(-1), m_brate(16000), m_total(0), m_time(0)
WaveSource::WaveSource(const String& file, DataEndpoint *chan, bool autoclose)
: m_chan(chan), m_fd(-1), m_brate(16000), m_total(0), m_time(0), m_autoclose(autoclose)
{
Debug(DebugAll,"WaveSource::WaveSource(\"%s\",%p) [%p]",file.c_str(),chan,this);
if (file.endsWith(".gsm")) {
@ -146,13 +153,20 @@ void WaveSource::run()
m_total += r;
tpos += (r*1000000ULL/m_brate);
} while (r > 0);
Debug(DebugAll,"WaveSource [%p] end of data",this);
Debug(DebugAll,"WaveSource [%p] end of data [%p] [%s] ",this,m_chan,m_id.c_str());
if (m_chan && !m_id.null()) {
Message *m = new Message("notify");
m->addParam("id",m_id);
m->userData(m_chan);
Engine::enqueue(m);
m_chan->setSource();
}
}
void WaveSource::cleanup()
{
Debug(DebugAll,"WaveSource [%p] cleanup, total=%u",this,m_total);
if (m_chan)
if (m_chan && m_autoclose)
m_chan->disconnect();
}
@ -203,6 +217,13 @@ void WaveConsumer::Consume(const DataBlock &data)
::close(m_fd);
m_fd = -1;
}
if (m_chan && !m_id.null()) {
m_chan->setConsumer();
Message *m = new Message("notify");
m->addParam("id",m_id);
m->userData(m_chan);
Engine::enqueue(m);
}
#if 0
// This is no good - this should be done in another thread
if (m_chan)
@ -349,13 +370,15 @@ bool AttachHandler::received(Message &msg)
}
if (!src.null()) {
WaveSource* s = new WaveSource(src,dd);
WaveSource* s = new WaveSource(src,dd,false);
s->setNotify(msg.getValue("notify"));
dd->setSource(s);
s->deref();
}
if (!cons.null()) {
WaveConsumer* c = new WaveConsumer(cons,dd,maxlen);
c->setNotify(msg.getValue("notify"));
dd->setConsumer(c);
c->deref();
}

View File

@ -255,6 +255,9 @@ public:
DataSource(const char *format = "slin")
: DataNode(format), m_translator(0) { }
/** porma */
~DataSource();
/**
* Forwards the data to its consumers
* @param data The raw data block to forward; an empty block ends data