diff --git a/conf.d/mgcpca.conf.sample b/conf.d/mgcpca.conf.sample index 2ac46903..685ced32 100644 --- a/conf.d/mgcpca.conf.sample +++ b/conf.d/mgcpca.conf.sample @@ -51,4 +51,6 @@ ;port=2427 ; default: bool: This is the default gateway to send commands to +; The first gateway which is not explicitely disabled or the last that is +; explicitely enabled will become default ;default=no diff --git a/modules/server/mgcpca.cpp b/modules/server/mgcpca.cpp index fe1d6c3b..8b931f69 100644 --- a/modules/server/mgcpca.cpp +++ b/modules/server/mgcpca.cpp @@ -49,7 +49,7 @@ class MGCPWrapper : public DataEndpoint { YCLASS(MGCPWrapper,DataEndpoint) public: - MGCPWrapper(CallEndpoint* conn, const char* media, Message& msg); + MGCPWrapper(CallEndpoint* conn, const char* media, Message& msg, const char* epId); ~MGCPWrapper(); bool sendDTMF(const String& tones); void gotDTMF(char tone); @@ -94,8 +94,19 @@ public: virtual ~MGCPSpan(); inline const String& ntfyId() const { return m_notify; } + inline const MGCPEndpointId& epId() const + { return m_epId; } + inline MGCPEndpointId& epId() + { return m_epId; } inline bool operational() const { return m_operational; } + inline const String& address() const + { return m_address; } + inline bool fxo() const + { return m_fxo; } + inline bool fxs() const + { return m_fxs; } + bool ownsId(const String& rqId) const; static void* create(const String& type, const NamedList& name); static MGCPSpan* findNotify(const String& id); bool matchEndpoint(const MGCPEndpointId& ep); @@ -107,25 +118,72 @@ private: void clearCircuits(); MGCPCircuit* findCircuit(const String& epId, const String& rqId) const; void operational(bool active); + void operational(const SocketAddr& address); MGCPCircuit** m_circuits; unsigned int m_count; MGCPEndpointId m_epId; bool m_operational; + bool m_fxo; + bool m_fxs; String m_notify; + String m_address; }; class MGCPCircuit : public SignallingCircuit { public: - MGCPCircuit(unsigned int code, MGCPSpan* span); + MGCPCircuit(unsigned int code, MGCPSpan* span, const char* id); virtual ~MGCPCircuit(); + virtual void* getObject(const String& name) const; virtual bool status(Status newStat, bool sync); + virtual bool updateFormat(const char* format, int direction); + virtual bool sendEvent(SignallingCircuitEvent::Type type, NamedList* params); + inline const String& epId() const + { return m_epId; } inline const String& ntfyId() const { return m_notify; } - bool processNotify(const String& event); + inline const String& connId() const + { return m_connId; } + inline bool hasRtp() const + { return m_rtpId && (m_source || m_consumer); } + inline MGCPSpan* mySpan() + { return static_cast(span()); } + inline bool fxo() + { return mySpan()->fxo(); } + inline bool fxs() + { return mySpan()->fxs(); } + bool processEvent(MGCPTransaction* tr, MGCPMessage* mm); + bool processNotify(const String& package, const String& event, const String& fullName); + void clearConn(); private: + MGCPMessage* message(const char* cmd); + bool sendAsync(MGCPMessage* mm); + RefPointer sendSync(MGCPMessage* mm); + bool sendRequest(const char* sigReq, const char* reqEvt = 0, const char* digitMap = 0); + bool enqueueEvent(SignallingCircuitEvent::Type type, const char* name, const char* dtmf = 0); + void cleanupRtp(); + bool createRtp(); + bool startRtp(); + bool setupConn(); + String m_epId; Status m_statusReq; String m_notify; + // Connection data + String m_connId; + String m_callId; + // Local RTP related data + RefPointer m_source; + RefPointer m_consumer; + String m_rtpId; + String m_localIp; + int m_localPort; + // Remote (MGCP GW side) RTP data + String m_remoteIp; + int m_remotePort; + int m_remotePayload; + // Synchronous transaction data + MGCPTransaction* m_tr; + RefPointer m_msg; }; class RtpHandler : public MessageHandler @@ -172,7 +230,7 @@ static Mutex s_mutex; // Copy one parameter (if present) with new name -bool copyRename(NamedList& dest, const char* dname, const NamedList& src, const String& sname) +static bool copyRename(NamedList& dest, const char* dname, const NamedList& src, const String& sname) { if (!sname) return false; @@ -183,6 +241,31 @@ bool copyRename(NamedList& dest, const char* dname, const NamedList& src, const return true; } +// Increment the number at the end of a name by an offset +static bool increment(String& name, unsigned int offs) +{ + unsigned int i = name.length(); + while (i--) { + char c = name.at(i); + if ((c < '0') || (c > '9')) + return false; + int d = offs % 10; + offs /= 10; + if (d) { + c += d; + if (c > '9') { + c -= 10; + offs++; + } + const_cast(name.c_str())[i] = c; + } + if (0 == offs) + return true; + } + return false; +} + + YMGCPEngine::~YMGCPEngine() { s_engine = 0; @@ -194,14 +277,17 @@ bool YMGCPEngine::processEvent(MGCPTransaction* trans, MGCPMessage* msg, void* d { MGCPWrapper* wrap = YOBJECT(MGCPWrapper,static_cast(data)); MGCPSpan* span = YOBJECT(MGCPSpan,static_cast(data)); - Debug(this,DebugAll,"YMGCPEngine::processEvent(%p,%p,%p) wrap=%p span=%p [%p]", - trans,msg,data,wrap,span,this); + MGCPCircuit* circ = YOBJECT(MGCPCircuit,static_cast(data)); + Debug(this,DebugAll,"YMGCPEngine::processEvent(%p,%p,%p) wrap=%p span=%p circ=%p [%p]", + trans,msg,data,wrap,span,circ,this); if (!trans) return false; if (wrap) return wrap->processEvent(trans,msg); if (span) return span->processEvent(trans,msg); + if (circ) + return circ->processEvent(trans,msg); if (!msg) return false; if (!data && !trans->outgoing() && msg->isCommand()) { @@ -290,19 +376,18 @@ bool YMGCPEngine::processEvent(MGCPTransaction* trans, MGCPMessage* msg, void* d } -MGCPWrapper::MGCPWrapper(CallEndpoint* conn, const char* media, Message& msg) +MGCPWrapper::MGCPWrapper(CallEndpoint* conn, const char* media, Message& msg, const char* epId) : DataEndpoint(conn,media), - m_tr(0) + m_tr(0), m_connEp(epId) { - Debug(&splugin,DebugAll,"MGCPWrapper::MGCPWrapper(%p,'%s') [%p]", - conn,media,this); + Debug(&splugin,DebugAll,"MGCPWrapper::MGCPWrapper(%p,'%s','%s') [%p]", + conn,media,epId,this); m_id = "mgcp/"; m_id << (unsigned int)::random(); if (conn) m_master = conn->id(); m_master = msg.getValue("id",(conn ? conn->id().c_str() : (const char*)0)); m_audio = (name() == "audio"); - m_connEp = msg.getValue("mgcp_endpoint",s_defaultEp); s_mutex.lock(); s_wrappers.append(this); // setupRTP(localip,rtcp); @@ -351,9 +436,6 @@ bool MGCPWrapper::processEvent(MGCPTransaction* tr, MGCPMessage* mm) // Process incoming notify events for this wrapper bool MGCPWrapper::processNotify(MGCPTransaction* tr, MGCPMessage* mm, const String& event) { - Debug(&splugin,DebugStub,"MGCPWrapper::processNotify(%p,%p,'%s') [%p]", - tr,mm,event.c_str(),this); - if (event.null()) return false; else if (event.find(',') >= 0) { @@ -365,8 +447,11 @@ bool MGCPWrapper::processNotify(MGCPTransaction* tr, MGCPMessage* mm, const Stri delete l; return ok; } - else + else { + Debug(&splugin,DebugStub,"MGCPWrapper::processNotify(%p,%p,'%s') [%p]", + tr,mm,event.c_str(),this); return false; + } return true; } @@ -530,7 +615,7 @@ MGCPWrapper* MGCPWrapper::findNotify(const String& id) // Send a DTMF as a sequence of package D events bool MGCPWrapper::sendDTMF(const String& tones) { - Debug(&splugin,DebugStub,"MGCPWrapper::sendDTMF('%s') [%p]", + DDebug(&splugin,DebugInfo,"MGCPWrapper::sendDTMF('%s') [%p]", tones.c_str(),this); MGCPEpInfo* ep = s_endpoint->find(m_connEp); if (!ep) @@ -549,7 +634,7 @@ bool MGCPWrapper::sendDTMF(const String& tones) void MGCPWrapper::gotDTMF(char tone) { - Debug(&splugin,DebugInfo,"MGCPWrapper::gotDTMF('%c') [%p]",tone,this); + DDebug(&splugin,DebugInfo,"MGCPWrapper::gotDTMF('%c') [%p]",tone,this); if (m_master.null()) return; char buf[2]; @@ -576,7 +661,7 @@ bool MGCPWrapper::nativeConnect(DataEndpoint* peer) Debug(&splugin,DebugWarn,"Not bridging to uninitialized %p [%p]",other,this); return false; } - Debug(&splugin,DebugStub,"Native bridging to %p [%p]",other,this); + Debug(&splugin,DebugNote,"Native bridging to %p [%p]",other,this); MGCPEpInfo* ep = s_endpoint->find(m_connEp); if (!ep) return false; @@ -614,7 +699,7 @@ MGCPSpan* MGCPSpan::findNotify(const String& id) ObjList* l = &s_spans; for (; l; l=l->next()) { MGCPSpan* s = static_cast(l->get()); - if (s && (s->ntfyId() == id)) + if (s && s->ownsId(id)) return s; } return 0; @@ -623,12 +708,26 @@ MGCPSpan* MGCPSpan::findNotify(const String& id) MGCPSpan::MGCPSpan(const NamedList& params, const char* name, const MGCPEpInfo& ep) : SignallingCircuitSpan(params.getValue("debugname",name), static_cast(params.getObject("SignallingCircuitGroup"))), - m_circuits(0), m_count(1), m_epId(ep), m_operational(false) + m_circuits(0), m_count(1), m_epId(ep), m_operational(false), + m_fxo(false), m_fxs(false) { Debug(&splugin,DebugAll,"MGCPSpan::MGCPSpan(%p,'%s') [%p]", ¶ms,name,this); u_int32_t ntfy = (u_int32_t)::random(); m_notify.hexify(&ntfy,sizeof(ntfy)); + const AnalogLineGroup* analog = YOBJECT(AnalogLineGroup,group()); + if (analog) { + switch (analog->type()) { + case AnalogLine::FXO: + m_fxo = true; + break; + case AnalogLine::FXS: + m_fxs = true; + break; + default: + break; + } + } s_mutex.lock(); s_spans.append(this); s_mutex.unlock(); @@ -672,10 +771,18 @@ bool MGCPSpan::init(const NamedList& params) m_circuits[i] = 0; bool ok = true; for (i = 0; i < m_count; i++) { - MGCPCircuit* circuit = new MGCPCircuit(cicStart + i,this); + String name = epId().id(); + if (!increment(name,i)) { + Debug(m_group,DebugWarn,"MGCPSpan('%s'). Failed to increment name by %u. Rollback [%p]", + id().safe(),i,this); + clearCircuits(); + ok = false; + break; + } + MGCPCircuit* circuit = new MGCPCircuit(cicStart + i,this,name); m_circuits[i] = circuit; if (!m_group->insert(circuit)) { - Debug(m_group,DebugNote,"MGCPSpan('%s'). Failed to create/insert circuit %u. Rollback [%p]", + Debug(m_group,DebugWarn,"MGCPSpan('%s'). Failed to create/insert circuit %u. Rollback [%p]", id().safe(),cicStart + i,this); clearCircuits(); ok = false; @@ -693,15 +800,23 @@ void MGCPSpan::operational(bool active) { if (active == m_operational) return; - Debug(&splugin,DebugNote,"MGCPSpan '%s' is%s operational [%p]", + Debug(&splugin,DebugCall,"MGCPSpan '%s' is%s operational [%p]", id().c_str(),(active ? "" : " not"),this); m_operational = active; } +// Set the operational state and copy GW address +void MGCPSpan::operational(const SocketAddr& address) +{ + if (address.valid() && address.host()) + m_address = address.host(); + operational(true); +} + // Check if this span matches an endpoint ID bool MGCPSpan::matchEndpoint(const MGCPEndpointId& ep) { - if (ep.port() != m_epId.port()) + if (ep.port() && (ep.port() != m_epId.port())) return false; if (ep.host() |= m_epId.host()) return false; @@ -709,6 +824,26 @@ bool MGCPSpan::matchEndpoint(const MGCPEndpointId& ep) return true; if (ep.user() == "*") return true; + String tmp = ep.user(); + Regexp r("^\\(.*\\)\\[\\([0-9]\\+\\)-\\([0-9]\\+\\)\\]$"); + if (!(tmp.matches(r) && m_epId.user().startsWith(tmp.matchString(1),false,true))) + return false; + int idx = m_epId.user().substr(tmp.matchLength(1)).toInteger(-1,10); + if (idx < 0) + return false; + return (tmp.matchString(2).toInteger(idx+1,10) <= idx) && (idx <= tmp.matchString(3).toInteger(-1,10)); +} + +// Check if a request Id is for this span or one of its circuits +bool MGCPSpan::ownsId(const String& rqId) const +{ + if (ntfyId() == rqId) + return true; + for (unsigned int i = 0; i < m_count; i++) { + MGCPCircuit* circuit = m_circuits[i]; + if (circuit && (circuit->ntfyId() == rqId)) + return true; + } return false; } @@ -736,7 +871,7 @@ MGCPCircuit* MGCPSpan::findCircuit(const String& epId, const String& rqId) const // Process incoming events for this span bool MGCPSpan::processEvent(MGCPTransaction* tr, MGCPMessage* mm) { - Debug(&splugin,DebugStub,"MGCPSpan::processEvent(%p,%p) '%s' [%p]", + DDebug(&splugin,DebugInfo,"MGCPSpan::processEvent(%p,%p) '%s' [%p]", tr,mm,mm->name().c_str(),this); if (mm->name() == "NTFY") { @@ -756,7 +891,7 @@ bool MGCPSpan::processEvent(MGCPTransaction* tr, MGCPMessage* mm) // Process incoming notify events for this span bool MGCPSpan::processNotify(MGCPTransaction* tr, MGCPMessage* mm, const String& event, const String& requestId) { - Debug(&splugin,DebugStub,"MGCPSpan::processNotify(%p,%p,'%s','%s') [%p]", + DDebug(&splugin,DebugInfo,"MGCPSpan::processNotify(%p,%p,'%s','%s') [%p]", tr,mm,event.c_str(),requestId.c_str(),this); if (event.null()) @@ -772,30 +907,23 @@ bool MGCPSpan::processNotify(MGCPTransaction* tr, MGCPMessage* mm, const String& } else { MGCPCircuit* circuit = findCircuit(mm->endpointId(),requestId); - return circuit && circuit->processNotify(event); + if (!circuit) + return false; + int pos = event.find('/'); + if (pos <= 0) + return false; + return circuit->processNotify(event.substr(0,pos).trimBlanks().toUpper(), + event.substr(pos+1).trimBlanks(),event); } -#if 0 - else if (event &= "L/hd") { - // Line off-hook - } - else if (event &= "L/hu") { - // Line on-hook - } - else - return false; - return true; -#endif } // Process gateway restart events for this span bool MGCPSpan::processRestart(MGCPTransaction* tr, MGCPMessage* mm, const String& method) { - Debug(&splugin,DebugStub,"MGCPSpan::processRestart(%p,%p,'%s') [%p]", + DDebug(&splugin,DebugInfo,"MGCPSpan::processRestart(%p,%p,'%s') [%p]", tr,mm,method.c_str(),this); - if (method &= "X-KeepAlive") - return true; - if (method &= "disconnected") { - operational(true); + if ((method &= "X-KeepAlive") || (method &= "disconnected") || (method &= "restart")) { + operational(tr->addr()); } else if (method &= "graceful") operational(false); @@ -808,14 +936,17 @@ bool MGCPSpan::processRestart(MGCPTransaction* tr, MGCPMessage* mm, const String } -MGCPCircuit::MGCPCircuit(unsigned int code, MGCPSpan* span) +MGCPCircuit::MGCPCircuit(unsigned int code, MGCPSpan* span, const char* id) : SignallingCircuit(RTP,code,Missing,span->group(),span), - m_statusReq(Missing) + m_epId(id), m_statusReq(Missing), + m_localPort(0), m_remotePort(0), m_remotePayload(-1), m_tr(0) { - Debug(&splugin,DebugAll,"MGCPCircuit::MGCPCircuit(%u,%p) [%p]", - code,span,this); + Debug(&splugin,DebugAll,"MGCPCircuit::MGCPCircuit(%u,%p,'%s') [%p]", + code,span,id,this); u_int32_t cic = code; m_notify.hexify(&cic,sizeof(cic)); + m_callId.hexify(this,sizeof(this)); + m_callId += m_notify; m_notify = span->ntfyId() + m_notify; } @@ -823,6 +954,229 @@ MGCPCircuit::~MGCPCircuit() { Debug(&splugin,DebugAll,"MGCPCircuit::~MGCPCircuit() %u [%p]", code(),this); + s_mutex.lock(); + if (m_tr) { + m_tr->userData(0); + m_tr = 0; + } + s_mutex.unlock(); + m_msg = 0; + cleanupRtp(); + clearConn(); +} + +void* MGCPCircuit::getObject(const String& name) const +{ + if (SignallingCircuit::status() == Connected) { + if (name == "DataSource") + return m_source; + if (name == "DataConsumer") + return m_consumer; + } + if (name == "MGCPCircuit") + return (void*)this; + return SignallingCircuit::getObject(name); +} + +// Clean up any RTP we may still hold +void MGCPCircuit::cleanupRtp() +{ + if (m_rtpId) { + m_rtpId.clear(); + m_localIp.clear(); + m_localPort = 0; + } + m_source = 0; + m_consumer = 0; +} + +// Create a local RTP instance +bool MGCPCircuit::createRtp() +{ + if (hasRtp()) + return true; + cleanupRtp(); + Message m("chan.rtp"); + RefPointer de = new DataEndpoint; + de->deref(); + m.userData(de); + m.addParam("direction","bidir"); + if (m_remoteIp && m_remotePort) { + m.addParam("remoteip",m_remoteIp); + m.addParam("remoteport",String(m_remotePort)); + } + else + m.addParam("remoteip",mySpan()->address()); + m.addParam("mgcp_allowed",String::boolText(false)); + if (Engine::dispatch(m)) { + m_source = de->getSource(); + m_consumer = de->getConsumer(); + m_rtpId = m.getValue("rtpid"); + m_localIp = m.getValue("localip"); + m_localPort = m.getIntValue("localport"); + if (m_localIp && m_localPort && hasRtp()) + return true; + } + m_localIp.clear(); + m_localPort = 0; + Debug(&splugin,DebugWarn,"MGCPCircuit::createRtp() failed [%p]",this); + cleanupRtp(); + return false; +} + +// Start the local RTP instance +bool MGCPCircuit::startRtp() +{ + if (!(m_remoteIp && m_remotePort && (m_remotePayload >= 0) && hasRtp())) + return false; + Message m("chan.rtp"); + m.addParam("direction","bidir"); + m.addParam("rtpid",m_rtpId); + m.addParam("remoteip",m_remoteIp); + m.addParam("remoteport",String(m_remotePort)); + m.addParam("payload",String(m_remotePayload)); + if (m_localIp) + m.addParam("localip",m_localIp); + if (m_localPort) + m.addParam("localport",String(m_localPort)); + m.addParam("mgcp_allowed",String::boolText(false)); + if (Engine::dispatch(m)) + return true; + Debug(&splugin,DebugWarn,"MGCPCircuit::startRtp() failed [%p]",this); + return false; +} + +// Create or update remote connection +bool MGCPCircuit::setupConn() +{ + RefPointer mm = message(m_connId.null() ? "CRCX" : "MDCX"); + mm->params.addParam("C",m_callId); + if (m_connId) + mm->params.addParam("I",m_connId); + if (m_localIp && m_localPort) { + String mLine("audio "); + mLine << m_localPort << " RTP/AVP 0 8"; + MimeSdpBody* sdp = new MimeSdpBody; + sdp->addLine("v","0"); + sdp->addLine("o",m_localIp); + sdp->addLine("s","PSTN Circuit"); + sdp->addLine("c","IN IP4 " + m_localIp); + sdp->addLine("t","0 0"); + sdp->addLine("m",mLine); + mm->sdp.append(sdp); + } + mm = sendSync(mm); + if (!mm) + return false; + if (m_connId.null()) + m_connId = mm->params.getParam("i"); + if (m_connId.null()) + return false; + MimeSdpBody* sdp = static_cast(mm->sdp[0]); + if (sdp) { + m_remoteIp.clear(); + m_remotePort = 0; + const NamedString* c = sdp->getLine("c"); + if (c) { + String tmp(*c); + if (tmp.startSkip("IN IP4")) { + tmp.trimBlanks(); + if (tmp == "0.0.0.0") + tmp.clear(); + m_remoteIp = tmp; + } + } + c = sdp->getLine("m"); + for (; c; c = sdp->getNextLine(c)) { + String tmp(*c); + if (!tmp.startSkip("audio",true)) + continue; + int port = 0; + tmp >> port >> " "; + if (!tmp.startSkip("RTP/AVP",true,true)) + continue; + m_remotePort = port; + tmp >> m_remotePayload; + return (m_remotePayload >= 0) && (m_remotePayload <= 255); + } + } + return true; +} + +// Delete remote connection if any +void MGCPCircuit::clearConn() +{ + if (m_connId.null()) + return; + MGCPMessage* mm = message("DLCX"); + mm->params.addParam("C",m_callId); + mm->params.addParam("I",m_connId); + m_connId.clear(); + m_remoteIp.clear(); + m_remotePort = 0; + sendAsync(mm); +} + +// Build a MGCP message +MGCPMessage* MGCPCircuit::message(const char* cmd) +{ + return new MGCPMessage(s_engine,cmd,epId()); +} + +// Send a MGCP message asynchronously +bool MGCPCircuit::sendAsync(MGCPMessage* mm) +{ + if (!mm) + return false; + MGCPEpInfo* ep = s_endpoint->find(mySpan()->epId().id()); + if (ep && s_engine->sendCommand(mm,ep->address)) + return true; + TelEngine::destruct(mm); + return false; +} + +// Send a MGCP message, wait for an answer and return it +RefPointer MGCPCircuit::sendSync(MGCPMessage* mm) +{ + if (!mm) + return 0; + MGCPEpInfo* ep = s_endpoint->find(mySpan()->epId().id()); + if (!ep) { + TelEngine::destruct(mm); + return 0; + } + while (m_msg) { + if (Thread::check(false)) + return 0; + Thread::msleep(10); + } + MGCPTransaction* tr = s_engine->sendCommand(mm,ep->address); + tr->userData(static_cast(this)); + m_tr = tr; + while (m_tr == tr) + Thread::msleep(10); + RefPointer tmp = m_msg; + m_msg = 0; + if (tmp) + Debug(&splugin,DebugNote,"MGCPCircuit::sendSync() returning %d '%s' [%p]", + tmp->code(),tmp->comment().c_str(),this); + else + Debug(&splugin,DebugMild,"MGCPCircuit::sendSync() returning NULL [%p]",this); + return tmp; +} + +// Send asynchronously a notification request +bool MGCPCircuit::sendRequest(const char* sigReq, const char* reqEvt, const char* digitMap) +{ + MGCPMessage* mm = message("RQNT"); + mm->params.addParam("X",m_notify); + if (sigReq) + mm->params.addParam("S",sigReq); + if (reqEvt) + mm->params.addParam("R",reqEvt); + if (digitMap) + mm->params.addParam("D",digitMap); + return sendAsync(mm); } // Circuit status change request @@ -832,23 +1186,125 @@ bool MGCPCircuit::status(Status newStat, bool sync) lookupStatus(newStat),String::boolText(sync),this); if ((newStat == m_statusReq) && ((SignallingCircuit::status() == newStat) || !sync)) return true; - if (!static_cast(span())->operational()) { + if (!mySpan()->operational()) { if (newStat >= Idle) return false; } m_statusReq = newStat; - // !!! + switch (newStat) { + case Connected: + if (createRtp() && setupConn() && startRtp()) + break; + m_statusReq = SignallingCircuit::status(); + return false; + default: + cleanupRtp(); + clearConn(); + } return SignallingCircuit::status(newStat,sync); } -// Process notifications for this circuit -bool MGCPCircuit::processNotify(const String& event) +// Change the format of this circuit +bool MGCPCircuit::updateFormat(const char* format, int direction) { - Debug(&splugin,DebugStub,"MGCPCircuit::processNotify('%s') %u [%p]", - event.c_str(),code(),this); + if (!format) + return false; + Debug(&splugin,DebugStub,"MGCPCircuit::updateFormat('%s',%d) %u [%p]", + format,direction,code(),this); return false; } +// Send out an event on this circuit +bool MGCPCircuit::sendEvent(SignallingCircuitEvent::Type type, NamedList* params) +{ + DDebug(&splugin,DebugAll,"MGCPCircuit::sendEvent(%u,%p) %u [%p]", + type,params,code(),this); + switch (type) { + case SignallingCircuitEvent::RingBegin: + return fxs() && sendRequest("L/rg"); +// case SignallingCircuitEvent::RingEnd: +// return fxs() && sendRequest("L/rg(-)"); + case SignallingCircuitEvent::Polarity: + return fxs() && sendRequest("L/lsa"); + case SignallingCircuitEvent::OffHook: + return fxo() && sendRequest("L/hd","L/lsa(N)"); + case SignallingCircuitEvent::OnHook: + return fxo() && sendRequest("L/hu"); + case SignallingCircuitEvent::Flash: + return fxo() && sendRequest("L/hf"); + case SignallingCircuitEvent::Dtmf: + if (params) + return sendRequest("D/" + *params); + break; + default: + ; + } + return SignallingCircuit::sendEvent(type,params); +} + +// Process incoming events for this circuit +bool MGCPCircuit::processEvent(MGCPTransaction* tr, MGCPMessage* mm) +{ + Debug(&splugin,DebugAll,"MGCPCircuit::processEvent(%p,%p) [%p]", + tr,mm,this); + if (tr == m_tr) { + if (!mm || (tr->msgResponse())) { + tr->userData(0); + m_msg = mm; + m_tr = 0; + } + } + return false; +} + +// Process notifications for this circuit +bool MGCPCircuit::processNotify(const String& package, const String& event, const String& fullName) +{ + DDebug(&splugin,DebugAll,"MGCPCircuit::processNotify('%s','%s') %u [%p]", + package.c_str(),event.c_str(),code(),this); + if (package.null() || event.null()) + return false; + if ((package == "L") || (package == "H")) { + // Line or Handset events + if (event &= "hd") { + if (!mySpan()->operational()) { + Debug(&splugin,DebugMild,"Got Off-Hook on non-operational span '%s' [%p]", + mySpan()->id().c_str(),this); + return false; + } + if (fxs()) + sendRequest(0,"L/hu(N),D/[0-9#*](N)"); + return enqueueEvent(SignallingCircuitEvent::OffHook,fullName); + } + else if (event &= "hu") { + if (SignallingCircuit::status() == Connected) + status(Idle,false); + return enqueueEvent(SignallingCircuitEvent::OnHook,fullName); + } + else if (event &= "hf") + return enqueueEvent(SignallingCircuitEvent::Flash,fullName); + else if (event &= "lsa") + return enqueueEvent(SignallingCircuitEvent::Polarity,fullName); + } + else if (package == "D") { + // DTMF events + if (event.length() == 1) + return enqueueEvent(SignallingCircuitEvent::Dtmf,fullName,event); + } + return false; +} + +// Enqueue an event detected by this circuit +bool MGCPCircuit::enqueueEvent(SignallingCircuitEvent::Type type, const char* name, const char* dtmf) +{ + DDebug(&splugin,DebugAll,"Enqueueing event %u '%s' '%s'",type,name,dtmf); + SignallingCircuitEvent* ev = new SignallingCircuitEvent(this,type,name); + if (dtmf) + ev->addParam("tone",dtmf); + addEvent(ev); + return true; +} + // Handler for chan.rtp messages - one per media type bool RtpHandler::received(Message& msg) @@ -861,15 +1317,12 @@ bool RtpHandler::received(Message& msg) return false; Debug(&splugin,DebugAll,"RTP message received"); - MGCPWrapper* w = 0; + CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData()); const char* media = msg.getValue("media","audio"); - CallEndpoint *ch = static_cast(msg.userData()); - if (ch) { - w = MGCPWrapper::find(ch,media); - if (w) - Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint",w); - } - if (!w) { + MGCPWrapper* w = MGCPWrapper::find(ch,media); + if (w) + Debug(&splugin,DebugAll,"Wrapper %p found by CallEndpoint",w); + else { w = MGCPWrapper::find(msg.getValue("rtpid")); if (w) Debug(&splugin,DebugAll,"Wrapper %p found by ID",w); @@ -882,9 +1335,13 @@ bool RtpHandler::received(Message& msg) if (w) return w->rtpMessage(msg); + const char* epId = msg.getValue("mgcp_endpoint",s_defaultEp); + if (!epId) + return false; + if (ch) ch->clearEndpoint(media); - w = new MGCPWrapper(ch,media,msg); + w = new MGCPWrapper(ch,media,msg,epId); if (!w->rtpMessage(msg)) return false; if (ch && ch->getPeer()) @@ -991,7 +1448,7 @@ void MGCPPlugin::initialize() ); if (ep) { ep->alias = sect->getValue("name",name); - if (s_defaultEp.null() || sect->getBoolValue("default")) + if (sect->getBoolValue("default",s_defaultEp.null())) s_defaultEp = ep->toString(); } else @@ -1003,7 +1460,8 @@ void MGCPPlugin::initialize() Debug(this,DebugAll,"No gateways defined so module not initialized."); break; } - Debug(this,DebugNote,"Default remote endpoint: '%s'",s_defaultEp.c_str()); + if (s_defaultEp) + Debug(this,DebugCall,"Default remote endpoint: '%s'",s_defaultEp.c_str()); int prio = cfg.getIntValue("general","priority",80); if (prio > 0) { Engine::install(new RtpHandler(prio));