From e93ebd064373c881f247082a73d21118bc206fa3 Mon Sep 17 00:00:00 2001 From: marian Date: Wed, 28 Mar 2007 16:17:41 +0000 Subject: [PATCH] Work in progress git-svn-id: http://yate.null.ro/svn/yate/trunk@1238 acf43c95-373e-0410-b603-e72c3f656dc1 --- modules/wpcard.cpp | 20 +- modules/ysigchan.cpp | 677 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 552 insertions(+), 145 deletions(-) diff --git a/modules/wpcard.cpp b/modules/wpcard.cpp index 09fca3cd..5f795d07 100644 --- a/modules/wpcard.cpp +++ b/modules/wpcard.cpp @@ -495,13 +495,13 @@ WpInterface::WpInterface(const NamedList& params) m_thread(0), m_received(false), m_overRead(0) { setName(params.getValue("debugname","WpInterface")); - DDebug(this,DebugAll,"WpInterface::WpInterface() [%p]",this); + XDebug(this,DebugAll,"WpInterface::WpInterface() [%p]",this); } WpInterface::~WpInterface() { control(Disable,0); - DDebug(this,DebugAll,"WpInterface::~WpInterface() [%p]",this); + XDebug(this,DebugAll,"WpInterface::~WpInterface() [%p]",this); } bool WpInterface::init(NamedList& params) @@ -812,6 +812,7 @@ bool WpCircuit::status(Status newStat, bool sync) bool enableData = false; if (SignallingCircuit::status() == Connected) enableData = true; + // Don't put this message for final states DDebug(group(),DebugAll, "WpCircuit %u. Changed status to '%u'. %s data transfer [%p]", code(),newStat,enableData ? "Enable" : "Disable",this); @@ -835,6 +836,8 @@ bool WpCircuit::status(Status newStat, bool sync) m_consumer->m_total = 0; } if (m_sourceValid) { + // Remove consumer + m_source->attach(0,true); m_sourceValid = 0; XDebug(group(),DebugAll,"WpCircuit %u. Source transferred %u byte(s) [%p]", code(),m_source->m_total,this); @@ -878,9 +881,9 @@ void* WpCircuit::getObject(const String& name) const if (!group()) return 0; if (name == "DataSource") - return m_source; + return m_sourceValid; if (name == "DataConsumer") - return m_consumer; + return m_consumerValid; return 0; } @@ -945,7 +948,7 @@ WpData::~WpData() // Initialize bool WpData::init(NamedList& params) { - DDebug(m_group,DebugAll,"WpData('%s'). Initializing [%p]",id().safe(),this); + XDebug(m_group,DebugAll,"WpData('%s'). Initializing [%p]",id().safe(),this); if (!m_group) { Debug(DebugNote,"WpData('%s'). Circuit group is missing [%p]", id().safe(),this); @@ -981,6 +984,13 @@ bool WpData::init(NamedList& params) if (!m_samples) m_samples = 50; } + else if (type == "T1") { + m_chans = 23; + if (cics.null()) + cics = "1-23"; + if (!m_samples) + m_samples = 64; + } else { Debug(m_group,DebugNote,"WpData('%s'). Invalid voice group type '%s' [%p]", id().safe(),type.safe(),this); diff --git a/modules/ysigchan.cpp b/modules/ysigchan.cpp index b7b5d5ea..4a5ecca8 100644 --- a/modules/ysigchan.cpp +++ b/modules/ysigchan.cpp @@ -25,12 +25,11 @@ #include #include +#include + using namespace TelEngine; namespace { // anonymous -//TODO: Delete. It's used only for testing -static String s_out_CallControl; - class SigChannel; // Signalling channel class SigDriver; // Signalling driver class SigParams; // Named list containing creator data (pointers) @@ -39,6 +38,9 @@ class SigCircuitGroup; // Used to create a signalling circuit class SigLink; // Keep a signalling link class SigIsdn; // ISDN (Q.931 over HDLC interface) call control class SigIsdnMonitor; // ISDN (Q.931 over HDLC interface) call control monitor +class SigConsumerMux; // Consumer used to push data to SigSourceMux +class SigSourceMux; // A data source multiplexer with 2 channels +class SigIsdnCallRecord; // Record an ISDN call monitor class SigLinkThread; // Get events and check timeout for links that have a call controller class SigChannel : public Channel @@ -94,7 +96,6 @@ private: bool m_hungup; // Hang up flag String m_reason; // Hangup reason bool m_inband; // True to try to send in-band tones - }; class SigDriver : public Driver @@ -154,6 +155,7 @@ protected: {} }; +// Signalling link class SigLink : public RefObject { friend class SigLinkThread; // The thread must set m_thread to 0 on terminate @@ -185,6 +187,9 @@ public: m_init = true; return create(params); } + // Handle events received from call controller + // Default action: calls the driver's handleEvent() + virtual void handleEvent(SignallingEvent* event); // Cancel thread if any. Call release void cleanup(); // Type names @@ -196,9 +201,12 @@ protected: virtual void release() {} // Start worker thread bool startThread(); - // Build the signalling interface + // Build the signalling interface and insert it in the engine static SignallingInterface* buildInterface(const String& device, const String& debugName, String& error); + // Build a signalling circuit group and insert it in the engine + static SigCircuitGroup* buildCircuits(const String& device, + const String& debugName, String& error); SignallingCallControl* m_controller; // Call controller, if any bool m_init; // True if already initialized bool m_inband; // True to send in-band tones through this link @@ -225,7 +233,7 @@ protected: inline ISDNQ931* q931() { return static_cast(m_controller); } // Build component debug name - inline void buildCompName(String& dest, const char* name) + inline void buildName(String& dest, const char* name) { dest = ""; dest << this->name() << '/' << name; } private: ISDNQ921* m_q921; @@ -237,12 +245,15 @@ private: class SigIsdnMonitor : public SigLink { public: - inline SigIsdnMonitor(const char* name) - : SigLink(name,IsdnMonitor), - m_q921Net(0), m_q921Cpe(0), m_ifaceNet(0), m_ifaceCpe(0), m_groupNet(0), m_groupCpe(0) - {} - virtual ~SigIsdnMonitor() - { release(); } + SigIsdnMonitor(const char* name); + virtual ~SigIsdnMonitor(); + virtual void handleEvent(SignallingEvent* event); + unsigned int sample() const + { return m_sample; } + unsigned char idleValue() const + { return m_idleValue; } + // Remove a call and it's call monitor + void removeCall(SigIsdnCallRecord* call, ISDNQ931CallMonitor* mon); protected: virtual bool create(NamedList& params); virtual bool reload(NamedList& params); @@ -250,9 +261,15 @@ protected: inline ISDNQ931Monitor* q931() { return static_cast(m_controller); } // Build component debug name - inline void buildCompName(String& dest, const char* name, bool net) + inline void buildName(String& dest, const char* name, bool net) { dest = ""; dest << this->name() << '/' << name << (net ? "/Net" : "/CPE"); } private: + Mutex m_monitorMutex; // Lock monitor list operations + ObjList m_monitors; // Monitor list + unsigned int m_id; // ID generator + unsigned int m_sample; // The sample length of one channel of a data source multiplexer + unsigned char m_idleValue; // Idle value for source multiplexer to fill when no data + ISDNQ921Pasive* m_q921Net; ISDNQ921Pasive* m_q921Cpe; SignallingInterface* m_ifaceNet; @@ -261,6 +278,74 @@ private: SigCircuitGroup* m_groupCpe; }; +// Consumer used to push data to SigSourceMux +class SigConsumerMux : public DataConsumer +{ + friend class SigSourceMux; +public: + virtual ~SigConsumerMux() + {} + virtual void Consume(const DataBlock& data, unsigned long tStamp); +protected: + inline SigConsumerMux(SigSourceMux* owner, bool first, const char* format) + : DataConsumer(format), m_owner(owner), m_first(first) + {} +private: + SigSourceMux* m_owner; // The owner of this consumer + bool m_first; // The channel allocated by the owner +}; + +// A data source multiplexer with 2 channels +class SigSourceMux : public DataSource +{ +public: + // Create consumers + // @param sample The number of bytes at which the buffer of a consumer is assumed to be full + // @param idleValue Value to fill missing data when forwarded + SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue); + virtual ~SigSourceMux(); + bool hasSource(bool first) + { return first ? (m_firstSrc != 0) : (m_secondSrc != 0); } + // Replace the consumer of the given source. Remove current consumer's source before + // @param first True to replace with the first channel, false for the second + // Return false if source is 0 or has invalid format (other then ours) + bool attach(bool first, DataSource* source); + // Multiplex received data from consumers and forward it + void consume(bool first, const DataBlock& data, unsigned long tStamp); + // Forward the buffer if at least one channel is filled. Reset data + // If one channel is empty, fill it with idle value + void forwardBuffer(); + // Remove the source for the appropriate consumer + void removeSource(bool first); +private: + Mutex m_lock; // Lock consumers changes and data processing + unsigned char m_idleValue; // Filling value for missing data + bool m_sampleError; // Flag to show sample length violation error + unsigned int m_sample; // Maximum expected block length for data pushed by consumers + DataSource* m_firstSrc; // First consumer's source + DataSource* m_secondSrc; // Second consumer's source + SigConsumerMux* m_firstChan; // First channel + SigConsumerMux* m_secondChan; // Second channel + unsigned char m_filled; // Filled buffer flags (0: none, 1: first , 2: second, 3: both) + DataBlock m_buffer; // Multiplex buffer +}; + +// Record an ISDN call monitor +class SigIsdnCallRecord : public CallEndpoint +{ +public: + SigIsdnCallRecord(SigIsdnMonitor* monitor, const char* id, ISDNQ931CallMonitor* mon); + virtual ~SigIsdnCallRecord(); + bool init(SignallingEvent* event); + bool update(SignallingEvent* event); +private: + Mutex m_lock; + String m_caller; + String m_called; + SigIsdnMonitor* m_monitor; // The owner of this recorder + ISDNQ931CallMonitor* m_call; // The call monitor +}; + // Get events from call controller. Check timeouts class SigLinkThread : public Thread { @@ -371,8 +456,6 @@ SigChannel::~SigChannel() DDebug(this,DebugCall,"Destroyed with reason '%s' [%p]",m_reason.c_str(),this); } -#define EVENT_NAME SignallingEvent::typeName(event->type()) - void SigChannel::handleEvent(SignallingEvent* event) { if (!event) @@ -386,7 +469,7 @@ void SigChannel::handleEvent(SignallingEvent* event) case SignallingEvent::Ringing: evRinging(event); break; default: DDebug(this,DebugStub,"No handler for event '%s' [%p]", - EVENT_NAME,this); + event->name(),this); } } @@ -631,7 +714,7 @@ void SigChannel::evInfo(SignallingEvent* event) if (!tmp.null()) { bool inband = msg->params().getBoolValue("inband"); DDebug(this,DebugCall,"Event: '%s'. DTMF: '%s'. In band: %s [%p]", - EVENT_NAME,tmp.c_str(),String::boolText(inband),this); + event->name(),tmp.c_str(),String::boolText(inband),this); Message* m = message("chan.dtmf"); m->addParam("text",tmp); Engine::enqueue(m); @@ -640,7 +723,7 @@ void SigChannel::evInfo(SignallingEvent* event) void SigChannel::evProgress(SignallingEvent* event) { - DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this); + DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this); status("progressing"); Engine::enqueue(message("call.progress")); } @@ -652,12 +735,12 @@ void SigChannel::evRelease(SignallingEvent* event) else m_reason = ""; Debug(this,DebugCall,"Event: '%s'. Reason: '%s' [%p]", - EVENT_NAME,m_reason.c_str(),this); + event->name(),m_reason.c_str(),this); } void SigChannel::evAccept(SignallingEvent* event) { - DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this); + DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this); const char* format = 0; bool cicChange = false; if (event->message()) { @@ -670,7 +753,7 @@ void SigChannel::evAccept(SignallingEvent* event) void SigChannel::evAnswer(SignallingEvent* event) { - DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this); + DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this); status("answered"); const char* format = 0; bool cicChange = false; @@ -685,7 +768,7 @@ void SigChannel::evAnswer(SignallingEvent* event) void SigChannel::evRinging(SignallingEvent* event) { - DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this); + DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this); status("ringing"); const char* format = 0; bool cicChange = false; @@ -854,18 +937,11 @@ void SigDriver::handleEvent(SignallingEvent* event) return; } // No channel - -//TODO: ???????????? - if (event->call()->getObject("ISDNQ931CallMonitor")) { - Debug(this,DebugStub,"ISDNQ931CallMonitor event"); - return; - } - if (event->type() == SignallingEvent::NewCall) { ch = new SigChannel(event); if (!ch->route(event)) { - //TODO: Send release with congestion - event->call()->userdata(0); + ch->hangup("temporary-failure"); + ch->disconnect(); } } else @@ -990,13 +1066,16 @@ void SigDriver::initialize() switch (type) { case SigLink::IsdnPriNet: case SigLink::IsdnPriCpe: + case SigLink::IsdnMonitor: break; default: + if (stype) + Debug(this,DebugNote,"Link '%s'. Unknown type '%s'",sect->c_str(),stype); continue; } // Disable ? if (!sect->getBoolValue("enable",true)) { - clearLink(*sect,sect->getBoolValue("wait-call-end",false)); + clearLink(*sect,false); continue; } // Create or initialize @@ -1008,6 +1087,9 @@ void SigDriver::initialize() case SigLink::IsdnPriCpe: if (!link) link = new SigIsdn(*sect,type == SigLink::IsdnPriNet); + case SigLink::IsdnMonitor: + if (!link) + link = new SigIsdnMonitor(*sect); break; default: continue; @@ -1072,6 +1154,11 @@ void SigLink::setExiting(unsigned int msec) m_thread->m_timeout = Time::msecNow() + msec; } +void SigLink::handleEvent(SignallingEvent* event) +{ + plugin.handleEvent(event); +} + void SigLink::cleanup() { if (m_thread) { @@ -1094,12 +1181,14 @@ bool SigLink::startThread() return ok; } -SignallingInterface* SigLink::buildInterface(const String& device, const String& debugName, String& error) +SignallingInterface* SigLink::buildInterface(const String& device, + const String& debugName, String& error) { NamedList ifaceDefs("sig"); ifaceDefs.addParam("debugname",debugName); ifaceDefs.addParam("sig",device); - SignallingInterface* iface = static_cast(SignallingFactory::build(ifaceDefs,&ifaceDefs)); + SignallingInterface* iface = static_cast + (SignallingFactory::build(ifaceDefs,&ifaceDefs)); if (iface) { plugin.engine()->insert(iface); return iface; @@ -1109,6 +1198,44 @@ SignallingInterface* SigLink::buildInterface(const String& device, const String& return 0; } +SigCircuitGroup* SigLink::buildCircuits(const String& device, + const String& debugName, String& error) +{ + ObjList* voice = device.split(',',false); + if (!voice) { + error = "Missing or invalid voice parameter"; + return 0; + } + SigCircuitGroup* group = new SigCircuitGroup(debugName); + int start = 0; + for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) { + String* s = static_cast(o->get()); + if (s->null()) + continue; + String tmp = debugName + "/" + *s; + SigParams spanParams("voice",group); + spanParams.addParam("debugname",tmp); + spanParams.addParam("voice",*s); + if (start) + spanParams.addParam("start",String(start)); + SignallingCircuitSpan* span = static_cast( + SignallingFactory::build(spanParams,&spanParams)); + if (!span) { + error << "Failed to build voice span '" << *s << "'"; + break; + } + int chans = spanParams.getIntValue("chans"); + start += chans; + } + delete voice; + if (error.null()) { + plugin.engine()->insert(group); + return group; + } + delete group; + return 0; +} + /** * SigIsdn */ @@ -1127,62 +1254,30 @@ bool SigIsdn::create(NamedList& params) m_inband = params.getBoolValue("dtmfinband",s_cfg.getBoolValue("general","dtmfinband",false)); // Signalling interface - NamedList ifaceDefs("sig"); - buildCompName(compName,"D"); + buildName(compName,"D"); m_iface = buildInterface(params.getValue("sig"),compName,error); if (!m_iface) break; // Voice transfer: circuit group, spans, circuits - buildCompName(compName,"B"); - m_group = new SigCircuitGroup(compName); - String tmp = params.getValue("voice"); // Use the same span as the signalling channel if missing - if (tmp.null()) - tmp = params.getValue("sig"); - ObjList* voice = tmp.split(',',false); - if (!voice) { - error = "Missing or invalid 'voice' parameter"; + buildName(compName,"B"); + const char* device = params.getValue("voice",params.getValue("sig")); + m_group = buildCircuits(device,compName,error); + if (!m_group) break; - } - int start = 0; - for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) { - String* s = static_cast(o->get()); - if (s->null()) - continue; - tmp = compName + "/" + *s; - SigParams spanParams("voice",m_group); - spanParams.addParam("debugname",tmp); - spanParams.addParam("voice",*s); - if (start) - spanParams.addParam("start",String(start)); - SignallingCircuitSpan* span = static_cast( - SignallingFactory::build(spanParams,&spanParams)); - if (!span) { - error << "Failed to build voice span '" << *s << "'"; - break; - } - int chans = spanParams.getIntValue("chans"); - start += chans; - } - delete voice; - if (!error.null()) - break; - plugin.engine()->insert(m_group); // Q921 - buildCompName(compName,"Q921"); + buildName(compName,"Q921"); params.setParam("network",String::boolText(IsdnPriNet == type())); + params.setParam("print-frames",params.getValue("print-layer2PDU")); m_q921 = new ISDNQ921(params,compName); - m_q921->setDebug(params.getBoolValue("print-layer2PDU",false), - params.getBoolValue("extended-debug",false)); plugin.engine()->insert(m_q921); // Q931 - buildCompName(compName,"Q931"); + buildName(compName,"Q931"); + params.setParam("print-messages",params.getValue("print-layer3PDU")); m_controller = new ISDNQ931(params,compName); - q931()->setDebug(params.getBoolValue("print-layer3PDU",false), - params.getBoolValue("extended-debug",false)); plugin.engine()->insert(q931()); // Create links between components and enable them @@ -1255,6 +1350,91 @@ void SigIsdn::release() /** * SigIsdnMonitor */ +SigIsdnMonitor::SigIsdnMonitor(const char* name) + : SigLink(name,IsdnMonitor), + m_monitorMutex(true), + m_id(0), + m_sample(160), + m_idleValue(255), + m_q921Net(0), m_q921Cpe(0), m_ifaceNet(0), m_ifaceCpe(0), m_groupNet(0), m_groupCpe(0) +{ +} + +SigIsdnMonitor::~SigIsdnMonitor() +{ + release(); +} + +void SigIsdnMonitor::handleEvent(SignallingEvent* event) +{ + if (!event) + return; + if (!event->call()) { + XDebug(&plugin,DebugNote, + "SigIsdnMonitor('%s'). Received event (%p): '%s' without call [%p]", + name().c_str(),event,event->name(),this); + return; + } + + Lock lock(m_monitorMutex); + SigIsdnCallRecord* rec = 0; + ISDNQ931CallMonitor* mon = static_cast(event->call()); + + // Find monitor + for (ObjList* o = m_monitors.skipNull(); o; o = o->skipNext()) { + rec = static_cast(o->get()); + if (rec == mon->userdata()) + break; + rec = 0; + } + + if (rec) { + switch (event->type()) { + case SignallingEvent::Accept: + case SignallingEvent::Ringing: + case SignallingEvent::Answer: + if (rec->update(event)) + break; + // Fall through to release if update failed + case SignallingEvent::Release: + mon->userdata(0); + m_monitors.remove(rec,true); + break; + default: + DDebug(&plugin,DebugStub, + "SigIsdnMonitor('%s'). No handler for event '%s' [%p]", + name().c_str(),event->name(),this); + } + return; + } + + if (event->type() == SignallingEvent::NewCall) { + String id; + id << name() << "/rec/" << ++m_id; + rec = new SigIsdnCallRecord(this,id,mon); + if (rec->init(event)) { + mon->userdata(rec); + m_monitors.append(rec); + } + else { + mon->userdata(0); + rec->deref(); + } + } + else + XDebug(&plugin,DebugNote, + "SigIsdnMonitor('%s'). Received event (%p) with invalid user data (%p) [%p]", + name().c_str(),event,mon->userdata(),this); +} + +void SigIsdnMonitor::removeCall(SigIsdnCallRecord* call, ISDNQ931CallMonitor* mon) +{ + if (mon) + q931()->terminateMonitor(mon,0); + Lock lock(m_monitorMutex); + m_monitors.remove(call,false); +} + bool SigIsdnMonitor::create(NamedList& params) { release(); @@ -1267,97 +1447,81 @@ bool SigIsdnMonitor::create(NamedList& params) break; } + m_sample = params.getIntValue("sample",160); + if (!m_sample) + m_sample = 160; + m_idleValue = params.getIntValue("idlevalue"); + // Signalling interfaces - buildCompName(compName,"D",true); + buildName(compName,"D",true); m_ifaceNet = buildInterface(params.getValue("sig-net"),compName,error); if (!m_ifaceNet) break; - buildCompName(compName,"D",false); + buildName(compName,"D",false); m_ifaceCpe = buildInterface(params.getValue("sig-cpe"),compName,error); if (!m_ifaceCpe) break; -#if 0 - ISDNQ921Pasive* m_q921Net; - ISDNQ921Pasive* m_q921Cpe; - SigCircuitGroup* m_groupNet; - SigCircuitGroup* m_groupCpe; - - - - - // Voice transfer: circuit group, spans, circuits - buildCompName(compName,"B"); - m_group = new SigCircuitGroup(compName); - String tmp = params.getValue("voice"); + // Voice transfer: circuit groups, spans, circuits // Use the same span as the signalling channel if missing - if (tmp.null()) - tmp = params.getValue("sig"); - ObjList* voice = tmp.split(',',false); - if (!voice) { - error = "Missing or invalid 'voice' parameter"; + buildName(compName,"B",true); + const char* device = params.getValue("voice-net",params.getValue("sig-net")); + m_groupNet = buildCircuits(device,compName,error); + if (!m_groupNet) break; - } - int start = 0; - for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) { - String* s = static_cast(o->get()); - if (s->null()) - continue; - tmp = compName + "/" + *s; - SigParams spanParams("voice",m_group); - spanParams.addParam("debugname",tmp); - spanParams.addParam("voice",*s); - if (start) - spanParams.addParam("start",String(start)); - SignallingCircuitSpan* span = static_cast( - SignallingFactory::build(spanParams,&spanParams)); - if (!span) { - error << "Failed to build voice span '" << *s << "'"; - break; - } - int chans = spanParams.getIntValue("chans"); - start += chans; - } - delete voice; - if (!error.null()) + buildName(compName,"B",false); + device = params.getValue("voice-cpe",params.getValue("sig-cpe")); + m_groupCpe = buildCircuits(device,compName,error); + if (!m_groupCpe) break; - plugin.engine()->insert(m_group); // Q921 - buildCompName(compName,"Q921"); - if (buildQ921Pasive - - params.setParam("network",String::boolText(IsdnPriNet == type())); - m_q921 = new ISDNQ921(params,compName); - m_q921->setDebug(params.getBoolValue("print-layer2PDU",false), - params.getBoolValue("extended-debug",false)); - plugin.engine()->insert(m_q921); + params.setParam("t203",params.getValue("idletimeout")); + buildName(compName,"Q921",true); + params.setParam("network",String::boolText(true)); + params.setParam("print-frames",params.getValue("print-layer2PDU")); + m_q921Net = new ISDNQ921Pasive(params,compName); + plugin.engine()->insert(m_q921Net); + buildName(compName,"Q921",false); + params.setParam("network",String::boolText(false)); + m_q921Cpe = new ISDNQ921Pasive(params,compName); + plugin.engine()->insert(m_q921Cpe); // Q931 compName = ""; - compName << this->name() << '/' << "Q931"; - m_controller = new ISDNQ931(params,compName); - q931()->setDebug(params.getBoolValue("print-layer3PDU",false), - params.getBoolValue("extended-debug",false)); + compName << name() << '/' << "Q931"; + params.setParam("print-messages",params.getValue("print-layer3PDU")); + m_controller = new ISDNQ931Monitor(params,compName); plugin.engine()->insert(q931()); // Create links between components and enable them - m_q921->SignallingReceiver::attach(m_iface); - m_iface->control(SignallingInterface::Enable); - q931()->attach(m_group); - m_q921->ISDNLayer2::attach(q931()); - q931()->attach(m_q921); - m_q921->multipleFrame(true,false); + q931()->attach(m_groupNet,true); + q931()->attach(m_groupCpe,false); + m_q921Net->SignallingReceiver::attach(m_ifaceNet); + m_q921Cpe->SignallingReceiver::attach(m_ifaceCpe); + m_ifaceNet->control(SignallingInterface::Enable); + m_ifaceCpe->control(SignallingInterface::Enable); + m_q921Net->ISDNLayer2::attach(q931()); + m_q921Cpe->ISDNLayer2::attach(q931()); + q931()->attach(m_q921Net,true); + q931()->attach(m_q921Cpe,false); // Start thread if (!startThread()) error = "Failed to start worker thread"; -#endif break; } - if (error.null()) + if (error.null()) { + if (debugAt(DebugInfo)) { + String tmp; + tmp << "\r\nSample size: " << m_sample; + tmp << "\r\nIdle fill value: " << (int)m_idleValue; + Debug(&plugin,DebugInfo,"SigIsdnMonitor('%s'). Initialized: [%p]%s", + name().c_str(),this,tmp.c_str()); + } return true; + } Debug(&plugin,DebugNote,"SigIsdnMonitor('%s'). Create failure. %s [%p]", name().c_str(),error.c_str(),this); return false; @@ -1383,6 +1547,9 @@ bool SigIsdnMonitor::reload(NamedList& params) void SigIsdnMonitor::release() { + m_monitorMutex.lock(); + m_monitors.clear(); + m_monitorMutex.unlock(); // *** Cleanup / Disable components if (q931()) q931()->cleanup(); @@ -1425,6 +1592,236 @@ void SigIsdnMonitor::release() XDebug(&plugin,DebugAll,"SigIsdnMonitor('%s'). Released [%p]",name().c_str(),this); } +/** + * SigConsumerMux + */ +void SigConsumerMux::Consume(const DataBlock& data, unsigned long tStamp) +{ + if (m_owner) + m_owner->consume(m_first,data,tStamp); +} + +/** + * SigSourceMux + */ +SigSourceMux::SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue) + : DataSource(format), + m_lock(true), + m_idleValue(idleValue), + m_sampleError(false), + m_sample(sample), + m_firstSrc(0), + m_secondSrc(0), + m_firstChan(0), + m_secondChan(0), + m_filled(0) +{ + XDebug(&plugin,DebugAll,"SigSourceMux::SigSourceMux() [%p]",this); + if (!m_sample) + m_sample = 1; + m_buffer.assign(0,2 * m_sample,false); + m_firstChan = new SigConsumerMux(this,true,format); + m_secondChan = new SigConsumerMux(this,false,format); +} + +SigSourceMux::~SigSourceMux() +{ + Lock lock(m_lock); + removeSource(true); + removeSource(false); + if (m_firstChan) + m_firstChan->deref(); + if (m_secondChan) + m_secondChan->deref(); + XDebug(&plugin,DebugAll,"SigSourceMux::~SigSourceMux() [%p]",this); +} + +// Replace the consumer of the given source. Remove current consumer's source before +// @param first True to replace with the first channel, false for the second +// Return false if source is 0 or has invalid format (other then ours) +bool SigSourceMux::attach(bool first, DataSource* source) +{ + Lock lock(m_lock); + removeSource(first); + if (!(source && source->getFormat() == getFormat() && source->ref())) + return false; + if (first) { + m_firstSrc = source; + source->attach(m_firstChan); + } + else { + m_secondSrc = source; + source->attach(m_secondChan); + } + return true; +} + +// Multiplex received data from consumers and forward it +// Forward multiplexed buffer if chan already filled +// If received data is not greater then expected channel sample: +// Fill chan buffer with data +// If received data length is lesser then expected channel sample, +// fill the rest of the buffer with idle value +// If all channels are filled, forward the multiplexed buffer +// Otherwise: +// Fill chan buffer with m_sample data +// Forward data and consume the rest +void SigSourceMux::consume(bool first, const DataBlock& data, unsigned long tStamp) +{ + Lock lock(m_lock); + if (data.length() > m_sample && !m_sampleError) { + Debug(&plugin,DebugWarn, + "SigSourceMux::consume(). Received data length %u > %u from channel %c [%p]", + data.length(),m_sample,first ? '1' : '2',this); + m_sampleError = true; + } + // Check if already filled. Set filled flag + if ((first && (m_filled & 1)) || (!first && (m_filled & 2))) + forwardBuffer(); + unsigned char* buffer = (unsigned char*)m_buffer.data(); + if (!first) + buffer += m_sample; + m_filled &= first ? 1 : 2; + // Received data length is lesser then or equal to a sample + if (data.length() <= m_sample) { + ::memcpy(buffer,data.data(),data.length()); + if (data.length() != m_sample) + ::memset(buffer + data.length(),m_idleValue,m_sample - data.length()); + if (m_filled == 3) + forwardBuffer(); + return; + } + // Received data length is greater then a sample + ::memcpy(buffer,data.data(),m_sample); + DataBlock rest((unsigned char*)data.data() + m_sample,data.length() - m_sample); + consume(first,rest,tStamp); +} + +// Forward the buffer if at least one channel is filled. Reset data +// If one channel is empty, fill it with idle value +void SigSourceMux::forwardBuffer() +{ + if (!m_filled) + return; + if (m_filled < 3) { + unsigned char* data = (unsigned char*)m_buffer.data(); + if (!(m_filled & 1)) + data += m_sample; + ::memset(data,m_idleValue,m_sample); + } + m_filled = 0; + Forward(m_buffer); +} + +// Remove the source for the appropriate consumer +void SigSourceMux::removeSource(bool first) +{ + if (first && m_firstSrc) { + m_firstSrc->attach(0,true); + m_firstSrc->deref(); + m_firstSrc = 0; + } + if (!first && m_secondSrc) { + m_secondSrc->attach(0,true); + m_secondSrc->deref(); + m_secondSrc = 0; + } +} + +/** + * SigIsdnCallRecord + */ +SigIsdnCallRecord::SigIsdnCallRecord(SigIsdnMonitor* monitor, const char* id, + ISDNQ931CallMonitor* mon) + : CallEndpoint(id), + m_lock(true), + m_monitor(monitor), + m_call(mon) +{ +} + +SigIsdnCallRecord::~SigIsdnCallRecord() +{ + if (m_monitor) + m_monitor->removeCall(this,m_call); +} + +bool SigIsdnCallRecord::init(SignallingEvent* event) +{ + if (!(event && event->message())) + return false; + Lock lock(m_lock); + SignallingMessage* msg = event->message(); + m_caller = msg->params().getValue("caller"); + m_called = msg->params().getValue("called"); + return update(event); +} + +// Create the multiplexer if missing +// Update sources for the multiplexer. Change them if circuit changed +// Start recording if the multiplexer has at least one source +bool SigIsdnCallRecord::update(SignallingEvent* event) +{ + Lock lock(m_lock); + if (!(event && event->call() && event->message())) + return true; + SignallingCall* call = event->call(); + SignallingMessage* msg = event->message(); + bool chg = msg->params().getValue("circuit-change"); + const char* format = msg->params().getValue("format"); + SigSourceMux* source = static_cast(getSource()); + if (!source) { + if (!format) + return true; + source = new SigSourceMux(format, + m_monitor ? m_monitor->sample() : 160, + m_monitor ? m_monitor->idleValue() : 255); + setSource(source); + source->deref(); + if (!getSource()) { + Debug(id(),DebugWarn,"No data source. Terminate [%p]",this); + return false; + } + // Start recording + Message m("call.execute"); + m.userData(this); + m.addParam("caller",m_caller); + m.addParam("called",m_called); + m.addParam("maxlen","60000"); + m.addParam("target","record/home/marian/Desktop/test.wav"); + if (!Engine::dispatch(m)) { + Debug(id(),DebugWarn,"Failed to start recording. Terminate [%p]",this); + return false; + } + } + if (format && source->getFormat() != format) { + Debug(id(),DebugWarn,"Data format changed. Terminate [%p]",this); + setSource(); + return false; + } + if (chg) { + source->removeSource(true); + source->removeSource(false); + } + bool first = true; + while (true) { + if (!source->hasSource(first)) { + SignallingCircuit* cic = static_cast(call->getObject( + first ? "SignallingCircuitCaller" : "SignallingCircuitCalled")); + DataSource* src = cic ? static_cast(cic->getObject("DataSource")) : 0; + if (src) { + source->attach(first,src); + DDebug(id(),DebugAll,"Data source for channel %c set to (%p) [%p]", + first ? '1' : '2',src,this); + } + } + if (!first) + break; + first = false; + } + return true; +} + /** * SigLinkThread */ @@ -1443,7 +1840,7 @@ void SigLinkThread::run() Time time; event = m_link->controller()->getEvent(time); if (event) { - plugin.handleEvent(event); + m_link->handleEvent(event); delete event; } // Check timeout if waiting to terminate