Reduced thread stack size. Proper signalling in H323. Changed status format

in Zaptel.


git-svn-id: http://yate.null.ro/svn/yate/trunk@150 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2004-12-31 01:09:21 +00:00
parent 07de9b28e5
commit 84a24b93d4
7 changed files with 596 additions and 79 deletions

View File

@ -128,7 +128,7 @@ SIPMessage::SIPMessage(SIPParty* ep, const char *buf, int len)
SIPMessage::SIPMessage(const SIPMessage* message, int _code, const char* _reason)
: code(_code), reason(_reason), body(0),
m_ep(0), m_valid(false),
m_answer(true), m_outgoing(true), m_cseq(-1)
m_answer(true), m_outgoing(true), m_ack(false), m_cseq(-1)
{
Debug(DebugAll,"SIPMessage::SIPMessage(%p,%d,'%s') [%p]",
message,_code,_reason,this);
@ -182,8 +182,14 @@ SIPMessage::~SIPMessage()
setBody();
}
void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domain)
void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domain, const char* dlgTag)
{
Debug("SIPMessage",DebugAll,"complete(%p,'%s','%s','%s')%s%s%s [%p]",
engine,user,domain,dlgTag,
isACK() ? " ACK" : "",
isOutgoing() ? " OUT" : "",
isAnswer() ? " ANS" : "",
this);
if (!engine)
return;
@ -212,6 +218,12 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai
tmp << (int)::random();
hl->addParam("branch",tmp);
}
if (isAnswer()) {
if (!hl->getParam("received"))
hl->addParam("received",getParty()->getPartyAddr());
if (!hl->getParam("rport"))
hl->addParam("rport",String(getParty()->getPartyPort()));
}
hl = const_cast<HeaderLine*>(getHeader("From"));
if (!hl) {
@ -223,11 +235,15 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai
if (!hl->getParam("tag"))
hl->addParam("tag",String((int)::random()));
if (!getHeader("To")) {
hl = const_cast<HeaderLine*>(getHeader("To"));
if (!hl) {
String tmp;
tmp << "<" << uri << ">";
addHeader("To",tmp);
hl = new HeaderLine("To",tmp);
header.append(hl);
}
if (dlgTag && !hl->getParam("tag"))
hl->addParam("tag",dlgTag);
if (!getHeader("Call-ID")) {
String tmp;
@ -247,6 +263,12 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai
addHeader("Max-Forwards",tmp);
}
if (!getHeader("Contact")) {
String tmp;
tmp << "<sip:" << user << "@" << getParty()->getLocalAddr() << ":" << getParty()->getLocalPort() << ">";
addHeader("Contact",tmp);
}
if (!(getHeader("User-Agent") || engine->getUserAgent().null()))
addHeader("User-Agent",engine->getUserAgent());
@ -365,6 +387,10 @@ bool SIPMessage::parse(const char* buf, int len)
if ((m_cseq < 0) && (name &= "CSeq")) {
String seq = *line;
seq >> m_cseq;
if (m_answer) {
seq.trimBlanks().toUpper();
method = seq;
}
}
line->destruct();
}

View File

@ -32,7 +32,7 @@ using namespace TelEngine;
SIPTransaction::SIPTransaction(SIPMessage* message, SIPEngine* engine, bool outgoing)
: m_outgoing(outgoing), m_invite(false), m_transmit(false), m_state(Invalid), m_timeout(0),
m_firstMessage(message), m_lastMessage(0), m_pending(0), m_engine(engine)
m_firstMessage(message), m_lastMessage(0), m_pending(0), m_engine(engine), m_private(0)
{
Debug(DebugAll,"SIPTransaction::SIPTransaction(%p,%p) [%p]",message,engine,this);
if (m_firstMessage) {
@ -42,6 +42,9 @@ SIPTransaction::SIPTransaction(SIPMessage* message, SIPEngine* engine, bool outg
m_branch = *ns;
if (!m_branch.startsWith("z9hG4bK"))
m_branch.clear();
ns = message->getParam("To","tag");
if (ns)
m_tag = *ns;
const HeaderLine* hl = message->getHeader("Call-ID");
if (hl)
m_callid = *hl;
@ -117,7 +120,9 @@ void SIPTransaction::setLatestMessage(SIPMessage* message)
m_lastMessage = message;
if (m_lastMessage) {
m_lastMessage->ref();
m_lastMessage->complete(m_engine);
if (message->isAnswer() && (message->code > 100) && m_tag.null())
m_tag = (int)::random();
message->complete(m_engine,0,0,m_tag);
}
}
@ -219,6 +224,15 @@ void SIPTransaction::setResponse(int code, const char* reason)
Debug(DebugWarn,"setResponse(%d,'%s') in client transaction [%p]",code,reason,this);
return;
}
switch (m_state) {
case Invalid:
case Retrans:
case Finish:
case Cleared:
Debug("SIPTransaction",DebugInfo,"Ignoring setResponse(%d) in state %s [%p]",
code,stateName(m_state),this);
return;
}
SIPMessage* msg = new SIPMessage(m_firstMessage, code, reason);
setResponse(msg);
msg->deref();
@ -226,7 +240,7 @@ void SIPTransaction::setResponse(int code, const char* reason)
bool SIPTransaction::processMessage(SIPMessage* message, const String& branch)
{
Debug("SIPTransaction",DebugAll,"processMessage(%p,'%s') [%p]",
DDebug("SIPTransaction",DebugAll,"processMessage(%p,'%s') [%p]",
message,branch.c_str(),this);
if (branch) {
if (branch != m_branch)
@ -240,6 +254,19 @@ bool SIPTransaction::processMessage(SIPMessage* message, const String& branch)
Debug("SIPTransaction",DebugWarn,"Non-branch matching not implemented!");
return false;
}
Debug("SIPTransaction",DebugAll,"Processing %s %p '%s' in [%p]",
message->isAnswer() ? "answer" : "request",
message,message->method.c_str(),this);
if (m_tag.null() && message->isAnswer()) {
const NamedString* ns = message->getParam("To","tag");
if (ns) {
m_tag = *ns;
Debug("SIPTransaction",DebugInfo,"Found dialog tag '%s' [%p]",
m_tag.c_str(),this);
}
}
if (isOutgoing())
processClientMessage(message,m_state);
else

View File

@ -204,7 +204,7 @@ public:
/**
* Complete missing fields with defaults taken from a SIP engine
*/
void complete(SIPEngine* engine, const char* user = 0, const char* domain = 0);
void complete(SIPEngine* engine, const char* user = 0, const char* domain = 0, const char* dlgTag = 0);
/**
* Copy an entire header line (including all parameters) from another message
@ -269,7 +269,7 @@ public:
{ return m_ep ? m_ep->isReliable() : false; }
/**
*
* Get the Command Sequence number from this message
*/
inline int getCSeq() const
{ return m_cseq; }

View File

@ -26,6 +26,10 @@
#include <pthread.h>
#include <errno.h>
#ifndef PTHREAD_STACK_MIN
#define PTHREAD_STACK_MIN 16384
#endif
namespace TelEngine {
class ThreadPrivate : public GenObject {
@ -67,12 +71,18 @@ ThreadPrivate *ThreadPrivate::create(Thread *t,const char *name)
{
ThreadPrivate *p = new ThreadPrivate(t,name);
int e = 0;
// Set a decent (256K) stack size that won't eat all virtual memory
pthread_attr_t attr;
::pthread_attr_init(&attr);
::pthread_attr_setstacksize(&attr, 16*PTHREAD_STACK_MIN);
for (int i=0; i<5; i++) {
e = ::pthread_create(&p->thread,0,startFunc,p);
e = ::pthread_create(&p->thread,&attr,startFunc,p);
if (e != EAGAIN)
break;
::usleep(20);
}
::pthread_attr_destroy(&attr);
if (e) {
Debug(DebugFail,"Error %d while creating pthread in '%s' [%p]",e,name,p);
p->m_thread = 0;
@ -349,3 +359,4 @@ void Thread::preExec()
::pthread_kill_other_threads_np();
#endif
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -234,6 +234,8 @@ public:
{ m_status = status; }
inline void setTarget(const char *target = 0)
{ m_targetid = target; }
inline const String& getTarget() const
{ return m_targetid; }
inline static int total()
{ return s_total; }
private:
@ -283,18 +285,16 @@ public:
virtual bool received(Message &msg);
};
class H323DTMF : public MessageHandler
class H323ConnHandler : public MessageReceiver
{
public:
H323DTMF(const char *name) : MessageHandler(name) { }
virtual bool received(Message &msg);
};
class H323Text : public MessageHandler
{
public:
H323Text(const char *name) : MessageHandler(name) { }
virtual bool received(Message &msg);
enum {
Ringing,
Answered,
DTMF,
Text,
};
virtual bool received(Message &msg, int id);
};
class H323Stopper : public MessageHandler
@ -380,7 +380,10 @@ bool H323MsgThread::route()
Debug(DebugInfo,"Routing H.323 call %s [%p] to '%s'",m_id.c_str(),conn,m_msg->getValue("callto"));
conn->setStatus("routed");
conn->setTarget(m_msg->getValue("targetid"));
conn->AnsweringCall(H323Connection::AnswerCallNow);
if (conn->getTarget().null()) {
Debug(DebugInfo,"Answering now H.323 call %s [%p] because we have no targetid",m_id.c_str(),conn);
conn->AnsweringCall(H323Connection::AnswerCallNow);
}
conn->deref();
}
else {
@ -1260,38 +1263,35 @@ bool H323Dropper::received(Message &msg)
return false;
};
bool H323DTMF::received(Message &msg)
bool H323ConnHandler::received(Message &msg, int id)
{
String id(msg.getValue("targetid"));
if (!id.startsWith("h323"))
String callid(msg.getValue("targetid"));
if (!callid.startsWith("h323"))
return false;
YateH323Connection *conn = hplugin.findConnectionLock(id);
if (!conn)
return false;
String text(msg.getValue("text"));
YateH323Connection *conn = hplugin.findConnectionLock(id);
if (conn) {
Debug("H323DTMF",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn);
for (unsigned int i = 0; i < text.length(); i++)
conn->SendUserInputTone(text[i]);
conn->Unlock();
return true;
switch (id) {
case Answered:
conn->AnsweringCall(H323Connection::AnswerCallNow);
break;
case Ringing:
conn->AnsweringCall(H323Connection::AnswerCallAlertWithMedia);
break;
case DTMF:
Debug("H323",DebugInfo,"DTMF '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn);
for (unsigned int i = 0; i < text.length(); i++)
conn->SendUserInputTone(text[i]);
break;
case Text:
Debug("H323",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn);
conn->SendUserInputIndicationString(text.safe());
break;
}
return false;
};
bool H323Text::received(Message &msg)
{
String id(msg.getValue("targetid"));
if (!id.startsWith("h323"))
return false;
String text(msg.getValue("text"));
YateH323Connection *conn = hplugin.findConnectionLock(id);
if (conn) {
Debug("H323Text",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn);
conn->SendUserInputIndicationString(text.safe());
conn->Unlock();
return true;
}
return false;
};
conn->Unlock();
return true;
}
bool StatusHandler::received(Message &msg)
{
@ -1411,10 +1411,13 @@ void H323Plugin::initialize()
s_externalRtp = s_cfg.getBoolValue("general","external_rtp",false);
if (m_first) {
m_first = false;
H323ConnHandler* ch = new H323ConnHandler;
Engine::install(new MessageRelay("call.ringing",ch,H323ConnHandler::Ringing));
Engine::install(new MessageRelay("call.answered",ch,H323ConnHandler::Answered));
Engine::install(new MessageRelay("chan.dtmf",ch,H323ConnHandler::DTMF));
Engine::install(new MessageRelay("chan.text",ch,H323ConnHandler::Text));
Engine::install(new H323Handler("call.execute"));
Engine::install(new H323Dropper("call.drop"));
Engine::install(new H323DTMF("chan.dtmf"));
Engine::install(new H323Text("chan.text"));
Engine::install(new H323Stopper("engine.halt"));
Engine::install(new StatusHandler);
}

View File

@ -44,8 +44,19 @@
using namespace TelEngine;
static Configuration s_cfg;
/* Payloads for the AV profile */
static TokenDict dict_payloads[] = {
{ "mulaw", 0 },
{ "gsm", 3 },
{ "lpc10", 7 },
{ "alaw", 8 },
{ "slin", 11 },
{ "g726", 2 },
{ "g722", 9 },
{ "g723", 12 },
};
static Configuration s_cfg;
class SIPHandler : public MessageHandler
{
@ -54,6 +65,17 @@ public:
virtual bool received(Message &msg);
};
class SIPConnHandler : public MessageReceiver
{
public:
enum {
Ringing,
Answered,
Drop,
};
virtual bool received(Message &msg, int id);
};
class YateUDPParty : public SIPParty
{
public:
@ -105,6 +127,62 @@ private:
};
class YateSIPConnection : public DataEndpoint
{
public:
YateSIPConnection(Message& msg, SIPTransaction* tr);
YateSIPConnection(Message& msg, const String& uri);
~YateSIPConnection();
virtual void disconnected(bool final, const char *reason);
virtual const String& toString() const
{ return m_id; }
bool process(SIPEvent* ev);
void ringing(Message* msg = 0);
void answered(Message* msg = 0);
inline const String& id() const
{ return m_id; }
inline const String& status() const
{ return m_status; }
inline void setStatus(const char *status)
{ m_status = status; }
inline void setTarget(const char *target = 0)
{ m_target = target; }
inline const String& getTarget() const
{ return m_target; }
inline SIPTransaction* getTransaction() const
{ return m_tr; }
static YateSIPConnection* find(const String& id);
private:
SDPBody* createPasstroughSDP(Message &msg);
SDPBody* createRtpSDP(SIPMessage* msg, const char* formats);
SIPTransaction* m_tr;
String m_id;
String m_target;
String m_status;
String m_rtpid;
};
class SipMsgThread : public Thread
{
public:
SipMsgThread(SIPTransaction* tr, Message* msg)
: Thread("SipMsgThread"), m_tr(tr), m_msg(msg)
{ m_tr->ref(); m_id = m_tr->getCallID(); }
virtual void run();
virtual void cleanup();
bool route();
inline static int count()
{ return s_count; }
inline static int routed()
{ return s_routed; }
private:
SIPTransaction* m_tr;
Message* m_msg;
String m_id;
static int s_count;
static int s_routed;
};
class SIPPlugin : public Plugin
{
public:
@ -114,11 +192,47 @@ public:
inline YateSIPEndPoint* ep() const
{ return m_endpoint; }
private:
SIPHandler *m_handler;
SIPConnHandler *m_handler;
YateSIPEndPoint *m_endpoint;
};
static void parseSDP(SDPBody* sdp, String& addr, String& port, String& formats)
{
const NamedString* c = sdp->getLine("c");
if (c) {
String tmp(*c);
if (tmp.startSkip("IN IP4")) {
tmp.trimBlanks();
addr = tmp;
}
}
c = sdp->getLine("m");
if (c) {
String tmp(*c);
if (tmp.startSkip("audio")) {
int var = 0;
tmp >> var >> " RTP/AVP";
if (var > 0)
port = var;
String fmt;
while (tmp[0] == ' ') {
var = -1;
tmp >> " " >> var;
const char* payload = lookup(var,dict_payloads);
if (payload) {
if (fmt)
fmt << ",";
fmt << payload;
}
}
formats = fmt;
}
}
}
static SIPPlugin plugin;
static ObjList s_calls;
static Mutex s_mutex;
YateUDPParty::YateUDPParty(int fd,struct sockaddr_in sin, int local)
: m_netfd(fd), m_sin(sin)
@ -316,7 +430,14 @@ void YateSIPEndPoint::run ()
// m_engine->process();
SIPEvent* e = m_engine->getEvent();
if (e) {
if ((e->getState() == SIPTransaction::Trying) && !e->isOutgoing()) {
YateSIPConnection* conn = static_cast<YateSIPConnection*>(e->getTransaction()->getUserData());
if (conn) {
if (conn->process(e))
delete e;
else
m_engine->processEvent(e);
}
else if ((e->getState() == SIPTransaction::Trying) && !e->isOutgoing()) {
incoming(e);
delete e;
}
@ -334,47 +455,372 @@ void YateSIPEndPoint::incoming(SIPEvent* e)
}
}
static int s_maxqueue = 5;
void YateSIPEndPoint::invite(SIPEvent* e)
{
if (Engine::exiting()) {
Debug(DebugWarn,"Dropping call, engine is exiting");
e->getTransaction()->setResponse(500, "Server Shutting Down");
return;
}
int cnt = SipMsgThread::count();
if (cnt > s_maxqueue) {
Debug(DebugWarn,"Dropping call, there are already %d waiting",cnt);
e->getTransaction()->setResponse(503, "Service Unavailable");
return;
}
String callid(e->getTransaction()->getCallID());
URI uri(e->getTransaction()->getURI());
const HeaderLine* hl = e->getMessage()->getHeader("From");
URI from(hl ? *hl : "");
Message *m = new Message("call.preroute");
m->addParam("driver","sip");
m->addParam("id","sip2/" + callid);
m->addParam("id","sip/" + callid);
m->addParam("caller",from.getUser());
m->addParam("called",uri.getUser());
m->addParam("SIP-CallID",callid);
Engine::dispatch(m);
*m = "call.route";
m->retValue().clear();
if (Engine::dispatch(m) && m->retValue()) {
e->getTransaction()->setResponse(183, "Call Progress");
// e->getTransaction()->setResponse(500, "Server Internal Error");
m->addParam("sip.callid",callid);
if (e->getMessage()->body && e->getMessage()->body->isSDP()) {
String addr,port,formats;
parseSDP(static_cast<SDPBody*>(e->getMessage()->body),addr,port,formats);
m->addParam("rtp.addr",addr);
m->addParam("rtp.port",port);
m->addParam("formats",formats);
}
else
e->getTransaction()->setResponse(404, "Not Found");
delete m;
SipMsgThread *t = new SipMsgThread(e->getTransaction(),m);
if (!t->startup()) {
Debug(DebugWarn,"Error starting routing thread! [%p]",this);
delete t;
e->getTransaction()->setResponse(500, "Server Internal Error");
}
}
static Mutex s_route;
int SipMsgThread::s_count = 0;
int SipMsgThread::s_routed = 0;
YateSIPConnection* YateSIPConnection::find(const String& id)
{
ObjList* l = s_calls.find(id);
return l ? static_cast<YateSIPConnection*>(l->get()) : 0;
}
// Incoming call constructor
YateSIPConnection::YateSIPConnection(Message& msg, SIPTransaction* tr)
: m_tr(tr)
{
Debug(DebugAll,"YateSIPConnection::YateSIPConnection(%p) [%p]",tr,this);
s_mutex.lock();
m_tr->ref();
m_id = m_tr->getCallID();
m_tr->setUserData(this);
s_calls.append(this);
s_mutex.unlock();
if (msg.getValue("rtp.forward")) {
}
}
// Outgoing call constructor
YateSIPConnection::YateSIPConnection(Message& msg, const String& uri)
: m_tr(0)
{
Debug(DebugAll,"YateSIPConnection::YateSIPConnection(%p,'%s') [%p]",
&msg,uri.c_str(),this);
SIPMessage* m = new SIPMessage("INVITE",uri);
plugin.ep()->buildParty(m);
// m->complete(plugin.ep()->engine());
SDPBody* sdp = createPasstroughSDP(msg);
if (!sdp)
sdp = createRtpSDP(m,msg.getValue("formats"));
m->setBody(sdp);
m_tr = plugin.ep()->engine()->addMessage(m);
m->deref();
if (m_tr) {
m_tr->ref();
m_id = m_tr->getCallID();
m_tr->setUserData(this);
}
Lock lock(s_mutex);
s_calls.append(this);
}
YateSIPConnection::~YateSIPConnection()
{
Debug(DebugAll,"YateSIPConnection::~YateSIPConnection() [%p]",this);
Lock lock(s_mutex);
s_calls.remove(this,false);
if (m_tr) {
m_tr->setUserData(0);
m_tr->setResponse(487, "Request Terminated");
m_tr->deref();
}
}
SDPBody* YateSIPConnection::createPasstroughSDP(Message &msg)
{
String tmp = msg.getValue("rtp.forward");
if (!tmp.toBoolean())
return 0;
tmp = msg.getValue("rtp.port");
int port = tmp.toInteger();
tmp = msg.getValue("rtp.addr");
if (port && tmp) {
tmp = "IN IP4 " + tmp;
String frm = msg.getValue("formats");
if (frm.null())
frm = "alaw,mulaw";
ObjList* l = tmp.split(',',false);
frm = "audio ";
frm << port << " RTP/AVP";
ObjList* f = l;
for (; f; f = f->next()) {
String* s = static_cast<String*>(f->get());
if (s) {
int payload = s->toInteger(dict_payloads,-1);
if (payload >= 0)
frm << " " << payload;
}
}
delete l;
String owner;
owner << "- " << port << " 1" << tmp;
SDPBody* sdp = new SDPBody;
sdp->addLine("v","0");
sdp->addLine("o",owner);
sdp->addLine("s","Call");
sdp->addLine("t","0 0");
sdp->addLine("c",tmp);
sdp->addLine("m",frm);
return sdp;
}
return 0;
}
SDPBody* YateSIPConnection::createRtpSDP(SIPMessage* msg, const char* formats)
{
Message m("chan.rtp");
m.addParam("direction","bidir");
m.addParam("remoteip",msg->getParty()->getPartyAddr());
m.userData(static_cast<DataEndpoint *>(this));
if (Engine::dispatch(m)) {
m_rtpid = m.getValue("rtpid");
String port(m.getValue("localport"));
String tmp(m.getValue("localip"));
tmp = "IN IP4 " + tmp;
String owner;
owner << "- " << port << " 1" << tmp;
String frm(formats);
if (frm.null())
frm = "alaw,mulaw";
ObjList* l = tmp.split(',',false);
frm = "audio ";
frm << port << " RTP/AVP";
ObjList* f = l;
for (; f; f = f->next()) {
String* s = static_cast<String*>(f->get());
if (s) {
int payload = s->toInteger(dict_payloads,-1);
if (payload >= 0)
frm << " " << payload;
}
}
delete l;
SDPBody* sdp = new SDPBody;
sdp->addLine("v","0");
sdp->addLine("o",owner);
sdp->addLine("s","Call");
sdp->addLine("t","0 0");
sdp->addLine("c",tmp);
sdp->addLine("m",frm);
return sdp;
}
return 0;
}
void YateSIPConnection::disconnected(bool final, const char *reason)
{
Debug(DebugAll,"YateSIPConnection::disconnected() '%s'",reason);
setStatus("disconnected");
setTarget();
}
bool YateSIPConnection::process(SIPEvent* ev)
{
Debug(DebugInfo,"YateSIPConnection::process(%p) %s [%p]",
ev,SIPTransaction::stateName(ev->getState()),this);
return false;
}
void YateSIPConnection::ringing(Message* msg)
{
if (m_tr && (m_tr->getState() == SIPTransaction::Process))
m_tr->setResponse(180, "Ringing");
setStatus("ringing");
}
void YateSIPConnection::answered(Message* msg)
{
if (m_tr && (m_tr->getState() == SIPTransaction::Process)) {
#if 0
SDPBody* sdp = new SDPBody;
sdp->addLine("v","0");
sdp->addLine("o","- 99 1 IN IP4 192.168.168.2");
sdp->addLine("s","Call");
sdp->addLine("t","0 0");
sdp->addLine("c","IN IP4 192.168.168.2");
sdp->addLine("m","audio 9090 RTP/AVP 0 8");
#endif
SIPMessage* m = new SIPMessage(m_tr->initialMessage(), 200, "OK");
SDPBody* sdp = 0;
if (m_rtpid) {
}
else if (msg)
sdp = createPasstroughSDP(*msg);
m->setBody(sdp);
m_tr->setResponse(m);
m->deref();
// m_tr->setResponse(200, "OK");
m_tr->deref();
m_tr = 0;
}
setStatus("answered");
}
bool SipMsgThread::route()
{
Debug(DebugAll,"Routing thread for %s [%p]",m_id.c_str(),this);
Engine::dispatch(m_msg);
*m_msg = "call.route";
m_msg->retValue().clear();
bool ok = Engine::dispatch(m_msg) && !m_msg->retValue().null();
if (m_tr->getState() != SIPTransaction::Process) {
Debug(DebugInfo,"SIP call %s (%p) vanished while routing!",m_id.c_str(),m_tr);
return false;
}
if (ok) {
*m_msg = "call.execute";
m_msg->addParam("callto",m_msg->retValue());
m_msg->retValue().clear();
YateSIPConnection* conn = new YateSIPConnection(*m_msg,m_tr);
m_msg->userData(conn);
if (Engine::dispatch(m_msg)) {
Debug(DebugInfo,"Routing SIP call %s (%p) to '%s' [%p]",
m_id.c_str(),m_tr,m_msg->getValue("callto"),this);
conn->setStatus("routed");
conn->setTarget(m_msg->getValue("targetid"));
if (conn->getTarget().null()) {
Debug(DebugInfo,"Answering now SIP call %s [%p] because we have no targetid",
conn->id().c_str(),conn);
conn->answered();
}
else
m_tr->setResponse(183, "Session Progress");
}
else {
Debug(DebugInfo,"Rejecting unconnected SIP call %s (%p) [%p]",
m_id.c_str(),m_tr,this);
m_tr->setResponse(500, "Server Internal Error");
conn->setStatus("rejected");
conn->destruct();
}
}
else {
Debug(DebugInfo,"Rejecting unrouted SIP call %s (%p) [%p]",
m_id.c_str(),m_tr,this);
m_tr->setResponse(404, "Not Found");
}
return ok;
}
void SipMsgThread::run()
{
s_route.lock();
s_count++;
s_route.unlock();
Debug(DebugAll,"Started routing thread for %s (%p) [%p]",
m_id.c_str(),m_tr,this);
bool ok = route();
s_route.lock();
s_count--;
if (ok)
s_routed++;
s_route.unlock();
}
void SipMsgThread::cleanup()
{
Debug(DebugAll,"Cleaning up routing thread for %s (%p) [%p]",
m_id.c_str(),m_tr,this);
delete m_msg;
m_tr->deref();
}
bool SIPHandler::received(Message &msg)
{
String dest(msg.getValue("callto"));
if (!dest.startSkip("sip2/",false))
if (!dest.startSkip("sip/",false))
return false;
SIPMessage* m = new SIPMessage("INVITE",dest);
// m->complete(plugin.ep()->engine());
SDPBody* sdp = new SDPBody;
sdp->addLine("v","0");
sdp->addLine("s","Call");
sdp->addLine("t","0 0");
m->setBody(sdp);
plugin.ep()->engine()->addMessage(m);
m->deref();
if (!msg.userData()) {
Debug(DebugWarn,"SIP call found but no data channel!");
return false;
}
YateSIPConnection* conn = new YateSIPConnection(msg,dest);
if (conn->getTransaction()) {
DataEndpoint *dd = static_cast<DataEndpoint *>(msg.userData());
if (dd && conn->connect(dd)) {
msg.addParam("targetid",conn->id());
conn->setTarget(msg.getValue("id"));
conn->deref();
return true;
}
}
conn->destruct();
return false;
}
bool SIPConnHandler::received(Message &msg, int id)
{
String callid;
switch (id) {
case Answered:
case Ringing:
callid = msg.getParam("targetid");
break;
case Drop:
callid = msg.getParam("id");
break;
default:
return false;
}
if (callid.null()) {
if (id == Drop) {
Debug("SIP",DebugInfo,"Dropping all calls");
s_calls.clear();
}
return false;
}
Lock lock(s_mutex);
YateSIPConnection* conn = YateSIPConnection::find(callid);
if (!conn)
return false;
switch (id) {
case Drop:
lock.drop();
conn->disconnect();
break;
case Ringing:
conn->ringing(&msg);
break;
case Answered:
conn->answered(&msg);
break;
default:
return false;
}
return true;
}
SIPPlugin::SIPPlugin()
: m_handler(0), m_endpoint(0)
{
@ -403,8 +849,12 @@ void SIPPlugin::initialize()
m_endpoint->startup();
}
if (!m_handler) {
m_handler = new SIPHandler("call.execute");
Engine::install(m_handler);
m_handler = new SIPConnHandler;
Engine::install(new MessageRelay("call.ringing",m_handler,SIPConnHandler::Ringing));
Engine::install(new MessageRelay("call.answered",m_handler,SIPConnHandler::Answered));
Engine::install(new MessageRelay("call.drop",m_handler,SIPConnHandler::Drop));
Engine::install(new SIPHandler("call.execute"));
// Engine::install(new StatusHandler("engine.status"));
}
}

View File

@ -1342,7 +1342,7 @@ bool StatusHandler::received(Message &msg)
const char *sel = msg.getValue("module");
if (sel && ::strcmp(sel,"zapchan") && ::strcmp(sel,"fixchans"))
return false;
String st("name=zapchan,type=fixchans,format=Span|Chan|Status");
String st("name=zapchan,type=fixchans,format=Status|Span/Chan");
zplugin.mutex.lock();
const ObjList *l = &zplugin.m_spans;
st << ",spans=" << l->count() << ",spanlen=";
@ -1371,7 +1371,7 @@ bool StatusHandler::received(Message &msg)
else
st << ",";
st << c->id() << "=";
st << s->span() << "|" << n << "|" << c->status();
st << c->status() << "|" << s->span() << "/" << n;
}
}
}