Added common pre-answer timeout in class Channel.

Fixed bugs in disconnecting wave sources.
Added option settings for external modules.


git-svn-id: http://yate.null.ro/svn/yate/trunk@559 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-11-04 19:30:47 +00:00
parent 2bb1415c8c
commit d8c52bc7bc
12 changed files with 247 additions and 72 deletions

View File

@ -143,6 +143,26 @@ properly or not.<br />
&lt;success&gt; - boolean (&quot;true&quot; or &quot;false&quot;) success of operation<br />
</p>
<p><b>Keyword: %%&gt;setlocal</b><br />
%%&gt;setlocal:&lt;name&gt;:&lt;value&gt;<br />
<b>Direction: Application to engine</b><br />
Always from the application to the engine, requests the change of a local
parameter<br />
The answer to the change request is delivered asynchronously (see below).<br />
&lt;name&gt; - name of the parameter to modify, must not be empty<br />
&lt;value&gt; - new value to set in the local module instance<br />
</p>
<p><b>Keyword: %%&lt;setlocal</b><br />
%%&lt;setlocal:&lt;name&gt;:&lt;value&gt;:&lt;success&gt;<br />
<b>Direction: Engine to application</b><br />
Confirmation from engine to the application that the local parameter has been
changed successfully or not.<br />
&lt;name&gt; - name of the modified parameter<br />
&lt;value&gt; - new value of the local parameter<br />
&lt;success&gt; - boolean (&quot;true&quot; or &quot;false&quot;) success of operation<br />
</p>
<p><b>Keyword: %%&gt;connect</b><br />
%%&gt;connect:&lt;role&gt;[:&lt;id&gt;][:&lt;type&gt;]<br />
<b>Direction: Application to engine</b><br />

View File

@ -208,14 +208,14 @@ DataConsumer* CallEndpoint::getConsumer(const char* type) const
Channel::Channel(Driver* driver, const char* id, bool outgoing)
: CallEndpoint(id),
m_driver(driver), m_outgoing(outgoing), m_timeout(0)
m_driver(driver), m_outgoing(outgoing), m_timeout(0), m_maxcall(0)
{
init();
}
Channel::Channel(Driver& driver, const char* id, bool outgoing)
: CallEndpoint(id),
m_driver(&driver), m_outgoing(outgoing), m_timeout(0)
m_driver(&driver), m_outgoing(outgoing), m_timeout(0), m_maxcall(0)
{
init();
}
@ -256,6 +256,7 @@ void Channel::init()
void Channel::cleanup()
{
m_timeout = 0;
m_maxcall = 0;
status("deleted");
m_targetid.clear();
dropChan();
@ -299,6 +300,15 @@ const char* Channel::direction() const
return m_outgoing ? "outgoing" : "incoming";
}
void Channel::setMaxcall(const Message* msg)
{
int tout = msg ? msg->getIntValue("maxcall") : 0;
if (tout > 0)
maxcall(Time::now() + tout*(u_int64_t)1000);
else
maxcall(0);
}
void Channel::complete(Message& msg, bool minimal) const
{
msg.setParam("id",m_id);
@ -366,6 +376,7 @@ bool Channel::msgRinging(Message& msg)
bool Channel::msgAnswered(Message& msg)
{
m_maxcall = 0;
status("answered");
if (m_billid.null())
m_billid = msg.getValue("billid");
@ -384,7 +395,8 @@ bool Channel::msgText(Message& msg, const char* text)
bool Channel::msgDrop(Message& msg, const char* reason)
{
status("dropped");
m_timeout = m_maxcall = 0;
status(null(reason) ? "dropped" : reason);
disconnect(reason);
return true;
}
@ -713,12 +725,15 @@ bool Driver::received(Message &msg, int id)
ObjList* l = &m_chans;
while (l) {
Channel* c = static_cast<Channel*>(l->get());
if (c && c->timeout() && (c->timeout() < t)) {
c->msgDrop(msg,"timeout");
if (l->get() != c)
break;
if (c) {
if (c->timeout() && (c->timeout() < t))
c->msgDrop(msg,"timeout");
else if (c->maxcall() && (c->maxcall() < t))
c->msgDrop(msg,"noanswer");
}
l = l->next();
// advance the pointer only if not dropped synchronously
if (l->get() == c)
l = l->next();
}
unlock();
}

View File

@ -238,7 +238,14 @@ bool MessageDispatcher::install(MessageHandler* handler)
int pos = 0;
for (l=&m_handlers; l; l=l->next(),pos++) {
MessageHandler *h = static_cast<MessageHandler *>(l->get());
if (h && (h->priority() > p))
if (!h)
continue;
if (h->priority() < p)
continue;
if (h->priority() > p)
break;
// at the same priority we sort them in pointer address order
if (h > handler)
break;
}
m_changes++;
@ -305,25 +312,20 @@ bool MessageDispatcher::dispatch(Message& msg)
// the handler list has changed - find again
NDebug(DebugAll,"Rescanning handler list for '%s' [%p] at priority %u",
msg.c_str(),&msg,p);
for (ObjList* l2 = l = &m_handlers; l2; l2=l2->next()) {
MessageHandler *mh = static_cast<MessageHandler*>(l2->get());
for (l = &m_handlers; l; l=l->next()) {
MessageHandler *mh = static_cast<MessageHandler*>(l->get());
if (!mh)
continue;
if (mh == h) {
// exact match - continue where we left
l = l2;
if (mh == h)
// exact match - silently continue where we left
break;
}
// gone past last handler priority - exit with last handler
if (mh->priority() > p) {
unsigned int p2 = l->get() ? static_cast<MessageHandler*>(l->get())->priority() : 0;
Debug(DebugAll,"Handler list for '%s' [%p] changed, skipping back from %u to %u",
msg.c_str(),&msg,p,p2);
if ((mh->priority() > p) || ((mh->priority() == p) && (mh > h))) {
Debug(DebugAll,"Handler list for '%s' [%p] changed, skipping from %p (%u) to %p (%u)",
msg.c_str(),&msg,h,p,mh,mh->priority());
break;
}
// update pointer past already used handlers
if (mh->priority() < p)
l = l2;
}
}
}

View File

@ -235,6 +235,8 @@ bool CdrHandler::received(Message &msg)
cdrs.clear();
return false;
}
if (!msg.getBoolValue("cdrtrack",true))
return false;
String id(msg.getValue("id"));
if (id.null()) {
id = msg.getValue("module");

View File

@ -91,11 +91,18 @@ public:
{ m_running = running; }
inline bool running() const
{ return m_running; }
inline void setDisconn(bool disconn)
{ m_disconn = disconn; }
inline bool disconn() const
{ return m_disconn; }
inline void setId(const String& id)
{ m_id = id; }
private:
ExtModChan(const char *file, const char *args, int type);
ExtModReceiver *m_recv;
int m_type;
bool m_running;
bool m_disconn;
};
class MsgHolder : public GenObject
@ -127,9 +134,9 @@ public:
inline void use()
{ m_use++; }
bool unuse();
inline const String &scriptFile() const
inline const String& scriptFile() const
{ return m_script; }
inline const String &commandArg() const
inline const String& commandArg() const
{ return m_args; }
private:
ExtModReceiver(const char *script, const char *args,
@ -282,7 +289,7 @@ ExtModChan* ExtModChan::build(const char *file, const char *args, int type)
}
ExtModChan::ExtModChan(const char *file, const char *args, int type)
: CallEndpoint("ExtModule"), m_recv(0), m_type(type)
: CallEndpoint("ExtModule"), m_recv(0), m_type(type), m_disconn(false)
{
Debug(DebugAll,"ExtModChan::ExtModChan(%d) [%p]",type,this);
File* reader = 0;
@ -335,7 +342,22 @@ ExtModChan::~ExtModChan()
void ExtModChan::disconnected(bool final, const char *reason)
{
Debugger debug("ExtModChan::disconnected()"," '%s' [%p]",reason,this);
Debug(DebugAll,"ExtModChan::disconnected() '%s' [%p]",reason,this);
if (final || Engine::exiting())
return;
if (m_disconn) {
Message* m = new Message("chan.disconnected");
m->userData(this);
m->addParam("id",m_id);
m->addParam("module","external");
if (m_recv)
m->addParam("address",m_recv->scriptFile());
if (reason)
m->addParam("reason",reason);
if (getPeer())
m->addParam("peerid",getPeer()->id());
Engine::enqueue(m);
}
}
MsgHolder::MsgHolder(Message &msg)
@ -516,7 +538,7 @@ void ExtModReceiver::die(bool clearChan)
if (m_pid > 0)
::kill(m_pid,SIGTERM);
if (chan && clearChan)
chan->disconnect();
chan->disconnect();
unuse();
}
@ -786,6 +808,29 @@ void ExtModReceiver::processLine(const char *line)
outputLine(out);
return;
}
else if (startSkip(id,"%%>setlocal:")) {
int col = id.find(':');
if (col > 0) {
String val(id.substr(col+1));
id = id.substr(0,col);
bool ok = false;
if (m_chan && (id == "id")) {
m_chan->setId(val);
ok = true;
}
else if (m_chan && (id == "disconnected")) {
m_chan->setDisconn(val.toBoolean(m_chan->disconn()));
val = m_chan->disconn();
ok = true;
}
Debug("ExtModReceiver",DebugAll,"Set '%s'='%s' %s",
id.c_str(),val.c_str(),ok ? "ok" : "failed");
String out("%%<setlocal:");
out << id << ":" << val << ":" << ok;
outputLine(out);
return;
}
}
else {
Message* m = new Message("");
if (m->decode(line,id) == -2) {
@ -856,10 +901,6 @@ bool ExtModHandler::received(Message &msg)
dest.matchString(3).trimBlanks().c_str());
return r ? r->received(msg,1) : false;
}
if (typ != ExtModChan::DataNone && !ch) {
Debug(DebugGoOn,"ExtMod '%s' call found but no data channel!",t.c_str());
return false;
}
ExtModChan *em = ExtModChan::build(dest.matchString(2).c_str(),
dest.matchString(3).c_str(),typ);
if (!em) {
@ -871,8 +912,9 @@ bool ExtModHandler::received(Message &msg)
em->deref();
return false;
}
if (ch && ch->connect(em))
em->deref();
if (ch)
ch->connect(em);
em->deref();
return true;
}

View File

@ -1089,6 +1089,7 @@ void YateH323Connection::OnEstablished()
return;
}
m_chan->status("answered");
m_chan->maxcall(0);
Message *m = m_chan->message("call.answered",false,true);
if (m_passtrough) {
if (m_remotePort) {
@ -1762,6 +1763,7 @@ YateH323Chan::YateH323Chan(YateH323Connection* conn,Message* msg,const char* add
conn,addr,direction(),this);
m_address = addr;
m_address.startSkip("ip$",false);
setMaxcall(msg);
Message* s = message("chan.startup");
if (msg) {
s->setParam("caller",msg->getValue("caller"));

View File

@ -496,6 +496,7 @@ IAXConnection::IAXConnection(Driver* driver, const char* addr, iax_session* sess
m_session = ::iax_session_new();
::iax_set_private(m_session,this);
s_mutex.unlock();
setMaxcall(msg);
Message* s = message("chan.startup");
s->setParam("direction",status());
if (msg) {
@ -611,6 +612,7 @@ void IAXConnection::handleEvent(iax_event *event)
break;
case IAX_EVENT_ANSWER:
Debug(this,DebugInfo,"IAX ANSWER inside a call [%p]",this);
maxcall(0);
Engine::enqueue(message("call.answered"));
startAudio(event->ies.format,event->ies.capability);
break;

View File

@ -755,6 +755,7 @@ void PriChan::answered()
m_timeout = 0;
status(chanStatus());
Debug(this,DebugInfo,"Remote answered on %s (%s)",id().c_str(),address().c_str());
maxcall(0);
Message *m = message("call.answered");
m->addParam("span",String(m_span->span()));
m->addParam("channel",String(m_chan));
@ -833,6 +834,7 @@ bool PriChan::call(Message &msg, const char *called)
#endif
setTimeout(30000000);
status(chanStatus());
setMaxcall(msg);
Message *m = message("chan.startup");
m->setParam("caller",msg.getValue("caller"));
m->setParam("called",msg.getValue("called"));

View File

@ -39,13 +39,13 @@ public:
~WaveSource();
virtual void run();
virtual void cleanup();
inline void setNotify(const String& id)
{ m_id = id; }
void setNotify(const String& id);
private:
void detectAuFormat();
void detectWavFormat();
void detectIlbcFormat();
bool computeDataRate();
void notify();
CallEndpoint* m_chan;
DataBlock m_data;
int m_fd;
@ -81,15 +81,18 @@ public:
~WaveChan();
};
class ConsDisconnector : public Thread
class Disconnector : public Thread
{
public:
ConsDisconnector(CallEndpoint* chan, const String& id)
: m_chan(chan), m_id(id) { }
Disconnector(CallEndpoint* chan, const String& id, bool source, bool disc);
virtual ~Disconnector();
virtual void run();
bool init();
private:
CallEndpoint* m_chan;
String m_id;
Message* m_msg;
bool m_source;
bool m_disc;
};
class AttachHandler : public MessageHandler
@ -129,9 +132,10 @@ WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose)
}
m_fd = ::open(file.safe(),O_RDONLY|O_NOCTTY);
if (m_fd < 0) {
Debug(DebugGoOn,"Opening '%s': error %d: %s",
Debug(DebugWarn,"Opening '%s': error %d: %s",
file.c_str(), errno, ::strerror(errno));
m_format.clear();
notify();
return;
}
if (file.endsWith(".gsm"))
@ -154,8 +158,10 @@ WaveSource::WaveSource(const String& file, CallEndpoint* chan, bool autoclose)
Debug(DebugMild,"Unknown format for file '%s', assuming signed linear",file.c_str());
if (computeDataRate())
start("WaveSource");
else
else {
Debug(DebugWarn,"Unable to compute data rate for file '%s'",file.c_str());
notify();
}
}
WaveSource::~WaveSource()
@ -295,23 +301,27 @@ void WaveSource::run()
tpos += (r*(u_int64_t)1000000/m_brate);
} while (r > 0);
Debug(&__plugin,DebugAll,"WaveSource [%p] end of data [%p] [%s] ",this,m_chan,m_id.c_str());
if (m_id) {
Message *m = new Message("chan.notify");
m->addParam("targetid",m_id);
m->userData(m_chan);
Engine::enqueue(m);
if (m_chan && (m_chan->getSource() == this))
m_chan->setSource();
}
notify();
}
void WaveSource::cleanup()
{
Debug(&__plugin,DebugAll,"WaveSource [%p] cleanup, total=%u",this,m_total);
if (m_chan && m_autoclose)
m_chan->disconnect("eof");
if (!m_chan)
deref();
}
void WaveSource::setNotify(const String& id)
{
m_id = id;
if (m_fd < 0)
notify();
}
void WaveSource::notify()
{
if (m_id || m_autoclose) {
Disconnector *disc = new Disconnector(m_chan,m_id,true,m_autoclose);
disc->init();
}
}
WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen)
@ -333,7 +343,7 @@ WaveConsumer::WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxl
m_format = "ilbc30";
m_fd = ::creat(file.safe(),S_IRUSR|S_IWUSR);
if (m_fd < 0)
Debug(DebugGoOn,"Creating '%s': error %d: %s",
Debug(DebugWarn,"Creating '%s': error %d: %s",
file.c_str(), errno, ::strerror(errno));
}
@ -368,31 +378,58 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_fd = -1;
}
if (m_chan) {
ConsDisconnector *disc = new ConsDisconnector(m_chan,m_id);
Disconnector *disc = new Disconnector(m_chan,m_id,false,false);
m_chan = 0;
if (disc->error()) {
Debug(DebugFail,"Error creating disconnector thread %p",disc);
delete disc;
}
else
disc->startup();
disc->init();
}
}
}
}
void ConsDisconnector::run()
Disconnector::Disconnector(CallEndpoint* chan, const String& id, bool source, bool disc)
: m_chan(chan), m_msg(0), m_source(source), m_disc(disc)
{
DDebug(&__plugin,DebugAll,"ConsDisconnector chan=%p id='%s'",m_chan,m_id.c_str());
if (m_id) {
m_chan->setConsumer();
if (id) {
Message *m = new Message("chan.notify");
m->addParam("targetid",m_id);
m->userData(m_chan);
Engine::enqueue(m);
m->addParam("targetid",id);
m->userData(chan);
m_msg = m;
}
}
Disconnector::~Disconnector()
{
if (m_msg)
Engine::enqueue(m_msg);
}
bool Disconnector::init()
{
if (error()) {
Debug(DebugFail,"Error creating disconnector thread %p",this);
delete this;
return false;
}
startup();
return true;
}
void Disconnector::run()
{
DDebug(&__plugin,DebugAll,"Disconnector::run() chan=%p msg=%p",m_chan,m_msg);
if (!m_chan)
return;
if (m_source) {
m_chan->setSource();
if (m_disc)
m_chan->disconnect("eof");
}
else {
if (m_msg)
m_chan->setConsumer();
else
m_chan->disconnect();
}
else
m_chan->disconnect();
}
WaveChan::WaveChan(const String& file, bool record, unsigned maxlen)
@ -498,8 +535,8 @@ bool AttachHandler::received(Message &msg)
if (!src.null()) {
WaveSource* s = new WaveSource(src,ch,false);
s->setNotify(msg.getValue("notify"));
ch->setSource(s);
s->setNotify(msg.getValue("notify"));
s->deref();
msg.clearParam("source");
}

View File

@ -1296,7 +1296,10 @@ YateSIPConnection::YateSIPConnection(SIPEvent* ev, SIPTransaction* tr)
}
DDebug(this,DebugAll,"RTP addr '%s' [%p]",m_rtpAddr.c_str(),this);
m_route = m;
Engine::enqueue(message("chan.startup"));
Message* s = message("chan.startup");
s->addParam("caller",m_uri.getUser());
s->addParam("called",uri.getUser());
Engine::enqueue(s);
}
// Outgoing call constructor - in call.execute handler
@ -1364,6 +1367,7 @@ YateSIPConnection::YateSIPConnection(Message& msg, const String& uri, const char
m_tr->setUserData(this);
}
m->deref();
setMaxcall(msg);
Message* s = message("chan.startup");
s->setParam("caller",msg.getValue("caller"));
s->setParam("called",msg.getValue("called"));
@ -1929,6 +1933,7 @@ bool YateSIPConnection::process(SIPEvent* ev)
}
setReason("",0);
setStatus("answered",Established);
maxcall(0);
Message *m = message("call.answered");
addRtpParams(*m,natAddr);
Engine::enqueue(m);

View File

@ -140,6 +140,18 @@ class Yate
print "%%>uninstall:$name\n";
}
/**
* Changes a local module parameter
* @param $name Name of the parameter to modify
* @param $value New value to set in the parameter
*/
function SetLocal($name, $value)
{
$name=Yate::Escape($name);
$value=Yate::Escape($value);
print "%%>setlocal:$name:$value\n";
}
/**
* Constructor. Creates a new outgoing message
* @param $name Name of the new message
@ -278,6 +290,12 @@ class Yate
$ev->type="uninstalled";
$ev->handled=Yate::Str2bool($part[3]);
break;
case "%%<setlocal":
/* local parameter answer str_name:str_value:bool_success */
$ev=new Yate(Yate::Unescape($part[1]),Yate::Unescape($part[2]));
$ev->type="setlocal";
$ev->handled=Yate::Str2bool($part[3]);
break;
case "Error in":
/* We are already in error so better stay quiet */
break;

View File

@ -1127,6 +1127,7 @@ private:
Driver* m_driver;
bool m_outgoing;
u_int64_t m_timeout;
u_int64_t m_maxcall;
protected:
String m_status;
@ -1297,6 +1298,33 @@ public:
inline void timeout(u_int64_t tout)
{ m_timeout = tout; }
/**
* Get the time this channel will time out on outgoing calls
* @return Timeout time or zero if no timeout
*/
inline u_int64_t maxcall() const
{ return m_maxcall; }
/**
* Set the time this channel will time out on outgoing calls
* @param tout New timeout time or zero to disable
*/
inline void maxcall(u_int64_t tout)
{ m_maxcall = tout; }
/**
* Set the time this channel will time out on outgoing calls
* @param msg Reference of message possibly holding "maxcall" parameter
*/
inline void setMaxcall(const Message& msg)
{ setMaxcall(&msg); }
/**
* Set the time this channel will time out on outgoing calls
* @param msg Pointer to message possibly holding "maxcall" parameter
*/
void setMaxcall(const Message* msg);
/**
* Get the connected channel identifier.
* @return A String holding the unique channel id of the target or an empty