From 84a24b93d427ae55c52df440058d0b4d2707e38c Mon Sep 17 00:00:00 2001 From: paulc Date: Fri, 31 Dec 2004 01:09:21 +0000 Subject: [PATCH] Reduced thread stack size. Proper signalling in H323. Changed status format in Zaptel. git-svn-id: http://yate.null.ro/svn/yate/trunk@150 acf43c95-373e-0410-b603-e72c3f656dc1 --- contrib/ysip/message.cpp | 34 ++- contrib/ysip/transaction.cpp | 33 ++- contrib/ysip/yatesip.h | 4 +- engine/Thread.cpp | 13 +- modules/h323chan.cpp | 85 +++--- modules/ysipchan.cpp | 502 +++++++++++++++++++++++++++++++++-- modules/zapchan.cpp | 4 +- 7 files changed, 596 insertions(+), 79 deletions(-) diff --git a/contrib/ysip/message.cpp b/contrib/ysip/message.cpp index c3362a56..f94557b7 100644 --- a/contrib/ysip/message.cpp +++ b/contrib/ysip/message.cpp @@ -128,7 +128,7 @@ SIPMessage::SIPMessage(SIPParty* ep, const char *buf, int len) SIPMessage::SIPMessage(const SIPMessage* message, int _code, const char* _reason) : code(_code), reason(_reason), body(0), m_ep(0), m_valid(false), - m_answer(true), m_outgoing(true), m_cseq(-1) + m_answer(true), m_outgoing(true), m_ack(false), m_cseq(-1) { Debug(DebugAll,"SIPMessage::SIPMessage(%p,%d,'%s') [%p]", message,_code,_reason,this); @@ -182,8 +182,14 @@ SIPMessage::~SIPMessage() setBody(); } -void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domain) +void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domain, const char* dlgTag) { + Debug("SIPMessage",DebugAll,"complete(%p,'%s','%s','%s')%s%s%s [%p]", + engine,user,domain,dlgTag, + isACK() ? " ACK" : "", + isOutgoing() ? " OUT" : "", + isAnswer() ? " ANS" : "", + this); if (!engine) return; @@ -212,6 +218,12 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai tmp << (int)::random(); hl->addParam("branch",tmp); } + if (isAnswer()) { + if (!hl->getParam("received")) + hl->addParam("received",getParty()->getPartyAddr()); + if (!hl->getParam("rport")) + hl->addParam("rport",String(getParty()->getPartyPort())); + } hl = const_cast(getHeader("From")); if (!hl) { @@ -223,11 +235,15 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai if (!hl->getParam("tag")) hl->addParam("tag",String((int)::random())); - if (!getHeader("To")) { + hl = const_cast(getHeader("To")); + if (!hl) { String tmp; tmp << "<" << uri << ">"; - addHeader("To",tmp); + hl = new HeaderLine("To",tmp); + header.append(hl); } + if (dlgTag && !hl->getParam("tag")) + hl->addParam("tag",dlgTag); if (!getHeader("Call-ID")) { String tmp; @@ -247,6 +263,12 @@ void SIPMessage::complete(SIPEngine* engine, const char* user, const char* domai addHeader("Max-Forwards",tmp); } + if (!getHeader("Contact")) { + String tmp; + tmp << "getLocalAddr() << ":" << getParty()->getLocalPort() << ">"; + addHeader("Contact",tmp); + } + if (!(getHeader("User-Agent") || engine->getUserAgent().null())) addHeader("User-Agent",engine->getUserAgent()); @@ -365,6 +387,10 @@ bool SIPMessage::parse(const char* buf, int len) if ((m_cseq < 0) && (name &= "CSeq")) { String seq = *line; seq >> m_cseq; + if (m_answer) { + seq.trimBlanks().toUpper(); + method = seq; + } } line->destruct(); } diff --git a/contrib/ysip/transaction.cpp b/contrib/ysip/transaction.cpp index e7c01250..d5755c21 100644 --- a/contrib/ysip/transaction.cpp +++ b/contrib/ysip/transaction.cpp @@ -32,7 +32,7 @@ using namespace TelEngine; SIPTransaction::SIPTransaction(SIPMessage* message, SIPEngine* engine, bool outgoing) : m_outgoing(outgoing), m_invite(false), m_transmit(false), m_state(Invalid), m_timeout(0), - m_firstMessage(message), m_lastMessage(0), m_pending(0), m_engine(engine) + m_firstMessage(message), m_lastMessage(0), m_pending(0), m_engine(engine), m_private(0) { Debug(DebugAll,"SIPTransaction::SIPTransaction(%p,%p) [%p]",message,engine,this); if (m_firstMessage) { @@ -42,6 +42,9 @@ SIPTransaction::SIPTransaction(SIPMessage* message, SIPEngine* engine, bool outg m_branch = *ns; if (!m_branch.startsWith("z9hG4bK")) m_branch.clear(); + ns = message->getParam("To","tag"); + if (ns) + m_tag = *ns; const HeaderLine* hl = message->getHeader("Call-ID"); if (hl) m_callid = *hl; @@ -117,7 +120,9 @@ void SIPTransaction::setLatestMessage(SIPMessage* message) m_lastMessage = message; if (m_lastMessage) { m_lastMessage->ref(); - m_lastMessage->complete(m_engine); + if (message->isAnswer() && (message->code > 100) && m_tag.null()) + m_tag = (int)::random(); + message->complete(m_engine,0,0,m_tag); } } @@ -219,6 +224,15 @@ void SIPTransaction::setResponse(int code, const char* reason) Debug(DebugWarn,"setResponse(%d,'%s') in client transaction [%p]",code,reason,this); return; } + switch (m_state) { + case Invalid: + case Retrans: + case Finish: + case Cleared: + Debug("SIPTransaction",DebugInfo,"Ignoring setResponse(%d) in state %s [%p]", + code,stateName(m_state),this); + return; + } SIPMessage* msg = new SIPMessage(m_firstMessage, code, reason); setResponse(msg); msg->deref(); @@ -226,7 +240,7 @@ void SIPTransaction::setResponse(int code, const char* reason) bool SIPTransaction::processMessage(SIPMessage* message, const String& branch) { - Debug("SIPTransaction",DebugAll,"processMessage(%p,'%s') [%p]", + DDebug("SIPTransaction",DebugAll,"processMessage(%p,'%s') [%p]", message,branch.c_str(),this); if (branch) { if (branch != m_branch) @@ -240,6 +254,19 @@ bool SIPTransaction::processMessage(SIPMessage* message, const String& branch) Debug("SIPTransaction",DebugWarn,"Non-branch matching not implemented!"); return false; } + Debug("SIPTransaction",DebugAll,"Processing %s %p '%s' in [%p]", + message->isAnswer() ? "answer" : "request", + message,message->method.c_str(),this); + + if (m_tag.null() && message->isAnswer()) { + const NamedString* ns = message->getParam("To","tag"); + if (ns) { + m_tag = *ns; + Debug("SIPTransaction",DebugInfo,"Found dialog tag '%s' [%p]", + m_tag.c_str(),this); + } + } + if (isOutgoing()) processClientMessage(message,m_state); else diff --git a/contrib/ysip/yatesip.h b/contrib/ysip/yatesip.h index 9cbf8cf5..a3ea50a9 100644 --- a/contrib/ysip/yatesip.h +++ b/contrib/ysip/yatesip.h @@ -204,7 +204,7 @@ public: /** * Complete missing fields with defaults taken from a SIP engine */ - void complete(SIPEngine* engine, const char* user = 0, const char* domain = 0); + void complete(SIPEngine* engine, const char* user = 0, const char* domain = 0, const char* dlgTag = 0); /** * Copy an entire header line (including all parameters) from another message @@ -269,7 +269,7 @@ public: { return m_ep ? m_ep->isReliable() : false; } /** - * + * Get the Command Sequence number from this message */ inline int getCSeq() const { return m_cseq; } diff --git a/engine/Thread.cpp b/engine/Thread.cpp index d810dfb7..786fb19c 100644 --- a/engine/Thread.cpp +++ b/engine/Thread.cpp @@ -26,6 +26,10 @@ #include #include +#ifndef PTHREAD_STACK_MIN +#define PTHREAD_STACK_MIN 16384 +#endif + namespace TelEngine { class ThreadPrivate : public GenObject { @@ -67,12 +71,18 @@ ThreadPrivate *ThreadPrivate::create(Thread *t,const char *name) { ThreadPrivate *p = new ThreadPrivate(t,name); int e = 0; + // Set a decent (256K) stack size that won't eat all virtual memory + pthread_attr_t attr; + ::pthread_attr_init(&attr); + ::pthread_attr_setstacksize(&attr, 16*PTHREAD_STACK_MIN); + for (int i=0; i<5; i++) { - e = ::pthread_create(&p->thread,0,startFunc,p); + e = ::pthread_create(&p->thread,&attr,startFunc,p); if (e != EAGAIN) break; ::usleep(20); } + ::pthread_attr_destroy(&attr); if (e) { Debug(DebugFail,"Error %d while creating pthread in '%s' [%p]",e,name,p); p->m_thread = 0; @@ -349,3 +359,4 @@ void Thread::preExec() ::pthread_kill_other_threads_np(); #endif } +/* vi: set ts=8 sw=4 sts=4 noet: */ diff --git a/modules/h323chan.cpp b/modules/h323chan.cpp index 369d6f82..bfdc2f7d 100644 --- a/modules/h323chan.cpp +++ b/modules/h323chan.cpp @@ -234,6 +234,8 @@ public: { m_status = status; } inline void setTarget(const char *target = 0) { m_targetid = target; } + inline const String& getTarget() const + { return m_targetid; } inline static int total() { return s_total; } private: @@ -283,18 +285,16 @@ public: virtual bool received(Message &msg); }; -class H323DTMF : public MessageHandler +class H323ConnHandler : public MessageReceiver { public: - H323DTMF(const char *name) : MessageHandler(name) { } - virtual bool received(Message &msg); -}; - -class H323Text : public MessageHandler -{ -public: - H323Text(const char *name) : MessageHandler(name) { } - virtual bool received(Message &msg); + enum { + Ringing, + Answered, + DTMF, + Text, + }; + virtual bool received(Message &msg, int id); }; class H323Stopper : public MessageHandler @@ -380,7 +380,10 @@ bool H323MsgThread::route() Debug(DebugInfo,"Routing H.323 call %s [%p] to '%s'",m_id.c_str(),conn,m_msg->getValue("callto")); conn->setStatus("routed"); conn->setTarget(m_msg->getValue("targetid")); - conn->AnsweringCall(H323Connection::AnswerCallNow); + if (conn->getTarget().null()) { + Debug(DebugInfo,"Answering now H.323 call %s [%p] because we have no targetid",m_id.c_str(),conn); + conn->AnsweringCall(H323Connection::AnswerCallNow); + } conn->deref(); } else { @@ -1260,38 +1263,35 @@ bool H323Dropper::received(Message &msg) return false; }; -bool H323DTMF::received(Message &msg) +bool H323ConnHandler::received(Message &msg, int id) { - String id(msg.getValue("targetid")); - if (!id.startsWith("h323")) + String callid(msg.getValue("targetid")); + if (!callid.startsWith("h323")) + return false; + YateH323Connection *conn = hplugin.findConnectionLock(id); + if (!conn) return false; String text(msg.getValue("text")); - YateH323Connection *conn = hplugin.findConnectionLock(id); - if (conn) { - Debug("H323DTMF",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn); - for (unsigned int i = 0; i < text.length(); i++) - conn->SendUserInputTone(text[i]); - conn->Unlock(); - return true; + switch (id) { + case Answered: + conn->AnsweringCall(H323Connection::AnswerCallNow); + break; + case Ringing: + conn->AnsweringCall(H323Connection::AnswerCallAlertWithMedia); + break; + case DTMF: + Debug("H323",DebugInfo,"DTMF '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn); + for (unsigned int i = 0; i < text.length(); i++) + conn->SendUserInputTone(text[i]); + break; + case Text: + Debug("H323",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn); + conn->SendUserInputIndicationString(text.safe()); + break; } - return false; -}; - -bool H323Text::received(Message &msg) -{ - String id(msg.getValue("targetid")); - if (!id.startsWith("h323")) - return false; - String text(msg.getValue("text")); - YateH323Connection *conn = hplugin.findConnectionLock(id); - if (conn) { - Debug("H323Text",DebugInfo,"Text '%s' for %s [%p]",text.c_str(),conn->id().c_str(),conn); - conn->SendUserInputIndicationString(text.safe()); - conn->Unlock(); - return true; - } - return false; -}; + conn->Unlock(); + return true; +} bool StatusHandler::received(Message &msg) { @@ -1411,10 +1411,13 @@ void H323Plugin::initialize() s_externalRtp = s_cfg.getBoolValue("general","external_rtp",false); if (m_first) { m_first = false; + H323ConnHandler* ch = new H323ConnHandler; + Engine::install(new MessageRelay("call.ringing",ch,H323ConnHandler::Ringing)); + Engine::install(new MessageRelay("call.answered",ch,H323ConnHandler::Answered)); + Engine::install(new MessageRelay("chan.dtmf",ch,H323ConnHandler::DTMF)); + Engine::install(new MessageRelay("chan.text",ch,H323ConnHandler::Text)); Engine::install(new H323Handler("call.execute")); Engine::install(new H323Dropper("call.drop")); - Engine::install(new H323DTMF("chan.dtmf")); - Engine::install(new H323Text("chan.text")); Engine::install(new H323Stopper("engine.halt")); Engine::install(new StatusHandler); } diff --git a/modules/ysipchan.cpp b/modules/ysipchan.cpp index 4b470781..7a853a39 100644 --- a/modules/ysipchan.cpp +++ b/modules/ysipchan.cpp @@ -44,8 +44,19 @@ using namespace TelEngine; -static Configuration s_cfg; +/* Payloads for the AV profile */ +static TokenDict dict_payloads[] = { + { "mulaw", 0 }, + { "gsm", 3 }, + { "lpc10", 7 }, + { "alaw", 8 }, + { "slin", 11 }, + { "g726", 2 }, + { "g722", 9 }, + { "g723", 12 }, +}; +static Configuration s_cfg; class SIPHandler : public MessageHandler { @@ -54,6 +65,17 @@ public: virtual bool received(Message &msg); }; +class SIPConnHandler : public MessageReceiver +{ +public: + enum { + Ringing, + Answered, + Drop, + }; + virtual bool received(Message &msg, int id); +}; + class YateUDPParty : public SIPParty { public: @@ -105,6 +127,62 @@ private: }; +class YateSIPConnection : public DataEndpoint +{ +public: + YateSIPConnection(Message& msg, SIPTransaction* tr); + YateSIPConnection(Message& msg, const String& uri); + ~YateSIPConnection(); + virtual void disconnected(bool final, const char *reason); + virtual const String& toString() const + { return m_id; } + bool process(SIPEvent* ev); + void ringing(Message* msg = 0); + void answered(Message* msg = 0); + inline const String& id() const + { return m_id; } + inline const String& status() const + { return m_status; } + inline void setStatus(const char *status) + { m_status = status; } + inline void setTarget(const char *target = 0) + { m_target = target; } + inline const String& getTarget() const + { return m_target; } + inline SIPTransaction* getTransaction() const + { return m_tr; } + static YateSIPConnection* find(const String& id); +private: + SDPBody* createPasstroughSDP(Message &msg); + SDPBody* createRtpSDP(SIPMessage* msg, const char* formats); + SIPTransaction* m_tr; + String m_id; + String m_target; + String m_status; + String m_rtpid; +}; + +class SipMsgThread : public Thread +{ +public: + SipMsgThread(SIPTransaction* tr, Message* msg) + : Thread("SipMsgThread"), m_tr(tr), m_msg(msg) + { m_tr->ref(); m_id = m_tr->getCallID(); } + virtual void run(); + virtual void cleanup(); + bool route(); + inline static int count() + { return s_count; } + inline static int routed() + { return s_routed; } +private: + SIPTransaction* m_tr; + Message* m_msg; + String m_id; + static int s_count; + static int s_routed; +}; + class SIPPlugin : public Plugin { public: @@ -114,11 +192,47 @@ public: inline YateSIPEndPoint* ep() const { return m_endpoint; } private: - SIPHandler *m_handler; + SIPConnHandler *m_handler; YateSIPEndPoint *m_endpoint; }; +static void parseSDP(SDPBody* sdp, String& addr, String& port, String& formats) +{ + const NamedString* c = sdp->getLine("c"); + if (c) { + String tmp(*c); + if (tmp.startSkip("IN IP4")) { + tmp.trimBlanks(); + addr = tmp; + } + } + c = sdp->getLine("m"); + if (c) { + String tmp(*c); + if (tmp.startSkip("audio")) { + int var = 0; + tmp >> var >> " RTP/AVP"; + if (var > 0) + port = var; + String fmt; + while (tmp[0] == ' ') { + var = -1; + tmp >> " " >> var; + const char* payload = lookup(var,dict_payloads); + if (payload) { + if (fmt) + fmt << ","; + fmt << payload; + } + } + formats = fmt; + } + } +} + static SIPPlugin plugin; +static ObjList s_calls; +static Mutex s_mutex; YateUDPParty::YateUDPParty(int fd,struct sockaddr_in sin, int local) : m_netfd(fd), m_sin(sin) @@ -316,7 +430,14 @@ void YateSIPEndPoint::run () // m_engine->process(); SIPEvent* e = m_engine->getEvent(); if (e) { - if ((e->getState() == SIPTransaction::Trying) && !e->isOutgoing()) { + YateSIPConnection* conn = static_cast(e->getTransaction()->getUserData()); + if (conn) { + if (conn->process(e)) + delete e; + else + m_engine->processEvent(e); + } + else if ((e->getState() == SIPTransaction::Trying) && !e->isOutgoing()) { incoming(e); delete e; } @@ -334,47 +455,372 @@ void YateSIPEndPoint::incoming(SIPEvent* e) } } +static int s_maxqueue = 5; + void YateSIPEndPoint::invite(SIPEvent* e) { + if (Engine::exiting()) { + Debug(DebugWarn,"Dropping call, engine is exiting"); + e->getTransaction()->setResponse(500, "Server Shutting Down"); + return; + } + int cnt = SipMsgThread::count(); + if (cnt > s_maxqueue) { + Debug(DebugWarn,"Dropping call, there are already %d waiting",cnt); + e->getTransaction()->setResponse(503, "Service Unavailable"); + return; + } + String callid(e->getTransaction()->getCallID()); URI uri(e->getTransaction()->getURI()); const HeaderLine* hl = e->getMessage()->getHeader("From"); URI from(hl ? *hl : ""); Message *m = new Message("call.preroute"); m->addParam("driver","sip"); - m->addParam("id","sip2/" + callid); + m->addParam("id","sip/" + callid); m->addParam("caller",from.getUser()); m->addParam("called",uri.getUser()); - m->addParam("SIP-CallID",callid); - Engine::dispatch(m); - *m = "call.route"; - m->retValue().clear(); - if (Engine::dispatch(m) && m->retValue()) { - e->getTransaction()->setResponse(183, "Call Progress"); -// e->getTransaction()->setResponse(500, "Server Internal Error"); + m->addParam("sip.callid",callid); + if (e->getMessage()->body && e->getMessage()->body->isSDP()) { + String addr,port,formats; + parseSDP(static_cast(e->getMessage()->body),addr,port,formats); + m->addParam("rtp.addr",addr); + m->addParam("rtp.port",port); + m->addParam("formats",formats); } - else - e->getTransaction()->setResponse(404, "Not Found"); - delete m; + SipMsgThread *t = new SipMsgThread(e->getTransaction(),m); + if (!t->startup()) { + Debug(DebugWarn,"Error starting routing thread! [%p]",this); + delete t; + e->getTransaction()->setResponse(500, "Server Internal Error"); + } +} + +static Mutex s_route; +int SipMsgThread::s_count = 0; +int SipMsgThread::s_routed = 0; + +YateSIPConnection* YateSIPConnection::find(const String& id) +{ + ObjList* l = s_calls.find(id); + return l ? static_cast(l->get()) : 0; +} + +// Incoming call constructor +YateSIPConnection::YateSIPConnection(Message& msg, SIPTransaction* tr) + : m_tr(tr) +{ + Debug(DebugAll,"YateSIPConnection::YateSIPConnection(%p) [%p]",tr,this); + s_mutex.lock(); + m_tr->ref(); + m_id = m_tr->getCallID(); + m_tr->setUserData(this); + s_calls.append(this); + s_mutex.unlock(); + if (msg.getValue("rtp.forward")) { + } +} + +// Outgoing call constructor +YateSIPConnection::YateSIPConnection(Message& msg, const String& uri) + : m_tr(0) +{ + Debug(DebugAll,"YateSIPConnection::YateSIPConnection(%p,'%s') [%p]", + &msg,uri.c_str(),this); + SIPMessage* m = new SIPMessage("INVITE",uri); + plugin.ep()->buildParty(m); +// m->complete(plugin.ep()->engine()); + SDPBody* sdp = createPasstroughSDP(msg); + if (!sdp) + sdp = createRtpSDP(m,msg.getValue("formats")); + m->setBody(sdp); + m_tr = plugin.ep()->engine()->addMessage(m); + m->deref(); + if (m_tr) { + m_tr->ref(); + m_id = m_tr->getCallID(); + m_tr->setUserData(this); + } + Lock lock(s_mutex); + s_calls.append(this); +} + +YateSIPConnection::~YateSIPConnection() +{ + Debug(DebugAll,"YateSIPConnection::~YateSIPConnection() [%p]",this); + Lock lock(s_mutex); + s_calls.remove(this,false); + if (m_tr) { + m_tr->setUserData(0); + m_tr->setResponse(487, "Request Terminated"); + m_tr->deref(); + } +} + +SDPBody* YateSIPConnection::createPasstroughSDP(Message &msg) +{ + String tmp = msg.getValue("rtp.forward"); + if (!tmp.toBoolean()) + return 0; + tmp = msg.getValue("rtp.port"); + int port = tmp.toInteger(); + tmp = msg.getValue("rtp.addr"); + if (port && tmp) { + tmp = "IN IP4 " + tmp; + String frm = msg.getValue("formats"); + if (frm.null()) + frm = "alaw,mulaw"; + ObjList* l = tmp.split(',',false); + frm = "audio "; + frm << port << " RTP/AVP"; + ObjList* f = l; + for (; f; f = f->next()) { + String* s = static_cast(f->get()); + if (s) { + int payload = s->toInteger(dict_payloads,-1); + if (payload >= 0) + frm << " " << payload; + } + } + delete l; + String owner; + owner << "- " << port << " 1" << tmp; + SDPBody* sdp = new SDPBody; + sdp->addLine("v","0"); + sdp->addLine("o",owner); + sdp->addLine("s","Call"); + sdp->addLine("t","0 0"); + sdp->addLine("c",tmp); + sdp->addLine("m",frm); + return sdp; + } + return 0; +} + +SDPBody* YateSIPConnection::createRtpSDP(SIPMessage* msg, const char* formats) +{ + Message m("chan.rtp"); + m.addParam("direction","bidir"); + m.addParam("remoteip",msg->getParty()->getPartyAddr()); + m.userData(static_cast(this)); + if (Engine::dispatch(m)) { + m_rtpid = m.getValue("rtpid"); + String port(m.getValue("localport")); + String tmp(m.getValue("localip")); + tmp = "IN IP4 " + tmp; + String owner; + owner << "- " << port << " 1" << tmp; + String frm(formats); + if (frm.null()) + frm = "alaw,mulaw"; + ObjList* l = tmp.split(',',false); + frm = "audio "; + frm << port << " RTP/AVP"; + ObjList* f = l; + for (; f; f = f->next()) { + String* s = static_cast(f->get()); + if (s) { + int payload = s->toInteger(dict_payloads,-1); + if (payload >= 0) + frm << " " << payload; + } + } + delete l; + SDPBody* sdp = new SDPBody; + sdp->addLine("v","0"); + sdp->addLine("o",owner); + sdp->addLine("s","Call"); + sdp->addLine("t","0 0"); + sdp->addLine("c",tmp); + sdp->addLine("m",frm); + return sdp; + } + return 0; +} + +void YateSIPConnection::disconnected(bool final, const char *reason) +{ + Debug(DebugAll,"YateSIPConnection::disconnected() '%s'",reason); + setStatus("disconnected"); + setTarget(); +} + +bool YateSIPConnection::process(SIPEvent* ev) +{ + Debug(DebugInfo,"YateSIPConnection::process(%p) %s [%p]", + ev,SIPTransaction::stateName(ev->getState()),this); + return false; +} + +void YateSIPConnection::ringing(Message* msg) +{ + if (m_tr && (m_tr->getState() == SIPTransaction::Process)) + m_tr->setResponse(180, "Ringing"); + setStatus("ringing"); +} + +void YateSIPConnection::answered(Message* msg) +{ + if (m_tr && (m_tr->getState() == SIPTransaction::Process)) { +#if 0 + SDPBody* sdp = new SDPBody; + sdp->addLine("v","0"); + sdp->addLine("o","- 99 1 IN IP4 192.168.168.2"); + sdp->addLine("s","Call"); + sdp->addLine("t","0 0"); + sdp->addLine("c","IN IP4 192.168.168.2"); + sdp->addLine("m","audio 9090 RTP/AVP 0 8"); +#endif + SIPMessage* m = new SIPMessage(m_tr->initialMessage(), 200, "OK"); + SDPBody* sdp = 0; + if (m_rtpid) { + } + else if (msg) + sdp = createPasstroughSDP(*msg); + m->setBody(sdp); + m_tr->setResponse(m); + m->deref(); +// m_tr->setResponse(200, "OK"); + m_tr->deref(); + m_tr = 0; + } + setStatus("answered"); +} + +bool SipMsgThread::route() +{ + Debug(DebugAll,"Routing thread for %s [%p]",m_id.c_str(),this); + Engine::dispatch(m_msg); + *m_msg = "call.route"; + m_msg->retValue().clear(); + bool ok = Engine::dispatch(m_msg) && !m_msg->retValue().null(); + if (m_tr->getState() != SIPTransaction::Process) { + Debug(DebugInfo,"SIP call %s (%p) vanished while routing!",m_id.c_str(),m_tr); + return false; + } + if (ok) { + *m_msg = "call.execute"; + m_msg->addParam("callto",m_msg->retValue()); + m_msg->retValue().clear(); + YateSIPConnection* conn = new YateSIPConnection(*m_msg,m_tr); + m_msg->userData(conn); + if (Engine::dispatch(m_msg)) { + Debug(DebugInfo,"Routing SIP call %s (%p) to '%s' [%p]", + m_id.c_str(),m_tr,m_msg->getValue("callto"),this); + conn->setStatus("routed"); + conn->setTarget(m_msg->getValue("targetid")); + if (conn->getTarget().null()) { + Debug(DebugInfo,"Answering now SIP call %s [%p] because we have no targetid", + conn->id().c_str(),conn); + conn->answered(); + } + else + m_tr->setResponse(183, "Session Progress"); + } + else { + Debug(DebugInfo,"Rejecting unconnected SIP call %s (%p) [%p]", + m_id.c_str(),m_tr,this); + m_tr->setResponse(500, "Server Internal Error"); + conn->setStatus("rejected"); + conn->destruct(); + } + } + else { + Debug(DebugInfo,"Rejecting unrouted SIP call %s (%p) [%p]", + m_id.c_str(),m_tr,this); + m_tr->setResponse(404, "Not Found"); + } + return ok; +} + +void SipMsgThread::run() +{ + s_route.lock(); + s_count++; + s_route.unlock(); + Debug(DebugAll,"Started routing thread for %s (%p) [%p]", + m_id.c_str(),m_tr,this); + bool ok = route(); + s_route.lock(); + s_count--; + if (ok) + s_routed++; + s_route.unlock(); +} + +void SipMsgThread::cleanup() +{ + Debug(DebugAll,"Cleaning up routing thread for %s (%p) [%p]", + m_id.c_str(),m_tr,this); + delete m_msg; + m_tr->deref(); } bool SIPHandler::received(Message &msg) { String dest(msg.getValue("callto")); - if (!dest.startSkip("sip2/",false)) + if (!dest.startSkip("sip/",false)) return false; - SIPMessage* m = new SIPMessage("INVITE",dest); -// m->complete(plugin.ep()->engine()); - SDPBody* sdp = new SDPBody; - sdp->addLine("v","0"); - sdp->addLine("s","Call"); - sdp->addLine("t","0 0"); - m->setBody(sdp); - plugin.ep()->engine()->addMessage(m); - m->deref(); + if (!msg.userData()) { + Debug(DebugWarn,"SIP call found but no data channel!"); + return false; + } + YateSIPConnection* conn = new YateSIPConnection(msg,dest); + if (conn->getTransaction()) { + DataEndpoint *dd = static_cast(msg.userData()); + if (dd && conn->connect(dd)) { + msg.addParam("targetid",conn->id()); + conn->setTarget(msg.getValue("id")); + conn->deref(); + return true; + } + } + conn->destruct(); return false; } +bool SIPConnHandler::received(Message &msg, int id) +{ + String callid; + switch (id) { + case Answered: + case Ringing: + callid = msg.getParam("targetid"); + break; + case Drop: + callid = msg.getParam("id"); + break; + default: + return false; + } + if (callid.null()) { + if (id == Drop) { + Debug("SIP",DebugInfo,"Dropping all calls"); + s_calls.clear(); + } + return false; + } + Lock lock(s_mutex); + YateSIPConnection* conn = YateSIPConnection::find(callid); + if (!conn) + return false; + switch (id) { + case Drop: + lock.drop(); + conn->disconnect(); + break; + case Ringing: + conn->ringing(&msg); + break; + case Answered: + conn->answered(&msg); + break; + default: + return false; + } + return true; +} + SIPPlugin::SIPPlugin() : m_handler(0), m_endpoint(0) { @@ -403,8 +849,12 @@ void SIPPlugin::initialize() m_endpoint->startup(); } if (!m_handler) { - m_handler = new SIPHandler("call.execute"); - Engine::install(m_handler); + m_handler = new SIPConnHandler; + Engine::install(new MessageRelay("call.ringing",m_handler,SIPConnHandler::Ringing)); + Engine::install(new MessageRelay("call.answered",m_handler,SIPConnHandler::Answered)); + Engine::install(new MessageRelay("call.drop",m_handler,SIPConnHandler::Drop)); + Engine::install(new SIPHandler("call.execute")); +// Engine::install(new StatusHandler("engine.status")); } } diff --git a/modules/zapchan.cpp b/modules/zapchan.cpp index a4760c52..f66608f3 100644 --- a/modules/zapchan.cpp +++ b/modules/zapchan.cpp @@ -1342,7 +1342,7 @@ bool StatusHandler::received(Message &msg) const char *sel = msg.getValue("module"); if (sel && ::strcmp(sel,"zapchan") && ::strcmp(sel,"fixchans")) return false; - String st("name=zapchan,type=fixchans,format=Span|Chan|Status"); + String st("name=zapchan,type=fixchans,format=Status|Span/Chan"); zplugin.mutex.lock(); const ObjList *l = &zplugin.m_spans; st << ",spans=" << l->count() << ",spanlen="; @@ -1371,7 +1371,7 @@ bool StatusHandler::received(Message &msg) else st << ","; st << c->id() << "="; - st << s->span() << "|" << n << "|" << c->status(); + st << c->status() << "|" << s->span() << "/" << n; } } }