Work in progress

git-svn-id: http://yate.null.ro/svn/yate/trunk@1238 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2007-03-28 16:17:41 +00:00
parent 60741080f8
commit e93ebd0643
2 changed files with 552 additions and 145 deletions

View File

@ -495,13 +495,13 @@ WpInterface::WpInterface(const NamedList& params)
m_thread(0), m_received(false), m_overRead(0)
{
setName(params.getValue("debugname","WpInterface"));
DDebug(this,DebugAll,"WpInterface::WpInterface() [%p]",this);
XDebug(this,DebugAll,"WpInterface::WpInterface() [%p]",this);
}
WpInterface::~WpInterface()
{
control(Disable,0);
DDebug(this,DebugAll,"WpInterface::~WpInterface() [%p]",this);
XDebug(this,DebugAll,"WpInterface::~WpInterface() [%p]",this);
}
bool WpInterface::init(NamedList& params)
@ -812,6 +812,7 @@ bool WpCircuit::status(Status newStat, bool sync)
bool enableData = false;
if (SignallingCircuit::status() == Connected)
enableData = true;
// Don't put this message for final states
DDebug(group(),DebugAll,
"WpCircuit %u. Changed status to '%u'. %s data transfer [%p]",
code(),newStat,enableData ? "Enable" : "Disable",this);
@ -835,6 +836,8 @@ bool WpCircuit::status(Status newStat, bool sync)
m_consumer->m_total = 0;
}
if (m_sourceValid) {
// Remove consumer
m_source->attach(0,true);
m_sourceValid = 0;
XDebug(group(),DebugAll,"WpCircuit %u. Source transferred %u byte(s) [%p]",
code(),m_source->m_total,this);
@ -878,9 +881,9 @@ void* WpCircuit::getObject(const String& name) const
if (!group())
return 0;
if (name == "DataSource")
return m_source;
return m_sourceValid;
if (name == "DataConsumer")
return m_consumer;
return m_consumerValid;
return 0;
}
@ -945,7 +948,7 @@ WpData::~WpData()
// Initialize
bool WpData::init(NamedList& params)
{
DDebug(m_group,DebugAll,"WpData('%s'). Initializing [%p]",id().safe(),this);
XDebug(m_group,DebugAll,"WpData('%s'). Initializing [%p]",id().safe(),this);
if (!m_group) {
Debug(DebugNote,"WpData('%s'). Circuit group is missing [%p]",
id().safe(),this);
@ -981,6 +984,13 @@ bool WpData::init(NamedList& params)
if (!m_samples)
m_samples = 50;
}
else if (type == "T1") {
m_chans = 23;
if (cics.null())
cics = "1-23";
if (!m_samples)
m_samples = 64;
}
else {
Debug(m_group,DebugNote,"WpData('%s'). Invalid voice group type '%s' [%p]",
id().safe(),type.safe(),this);

View File

@ -25,12 +25,11 @@
#include <yatephone.h>
#include <yatess7.h>
#include <string.h>
using namespace TelEngine;
namespace { // anonymous
//TODO: Delete. It's used only for testing
static String s_out_CallControl;
class SigChannel; // Signalling channel
class SigDriver; // Signalling driver
class SigParams; // Named list containing creator data (pointers)
@ -39,6 +38,9 @@ class SigCircuitGroup; // Used to create a signalling circuit
class SigLink; // Keep a signalling link
class SigIsdn; // ISDN (Q.931 over HDLC interface) call control
class SigIsdnMonitor; // ISDN (Q.931 over HDLC interface) call control monitor
class SigConsumerMux; // Consumer used to push data to SigSourceMux
class SigSourceMux; // A data source multiplexer with 2 channels
class SigIsdnCallRecord; // Record an ISDN call monitor
class SigLinkThread; // Get events and check timeout for links that have a call controller
class SigChannel : public Channel
@ -94,7 +96,6 @@ private:
bool m_hungup; // Hang up flag
String m_reason; // Hangup reason
bool m_inband; // True to try to send in-band tones
};
class SigDriver : public Driver
@ -154,6 +155,7 @@ protected:
{}
};
// Signalling link
class SigLink : public RefObject
{
friend class SigLinkThread; // The thread must set m_thread to 0 on terminate
@ -185,6 +187,9 @@ public:
m_init = true;
return create(params);
}
// Handle events received from call controller
// Default action: calls the driver's handleEvent()
virtual void handleEvent(SignallingEvent* event);
// Cancel thread if any. Call release
void cleanup();
// Type names
@ -196,9 +201,12 @@ protected:
virtual void release() {}
// Start worker thread
bool startThread();
// Build the signalling interface
// Build the signalling interface and insert it in the engine
static SignallingInterface* buildInterface(const String& device,
const String& debugName, String& error);
// Build a signalling circuit group and insert it in the engine
static SigCircuitGroup* buildCircuits(const String& device,
const String& debugName, String& error);
SignallingCallControl* m_controller; // Call controller, if any
bool m_init; // True if already initialized
bool m_inband; // True to send in-band tones through this link
@ -225,7 +233,7 @@ protected:
inline ISDNQ931* q931()
{ return static_cast<ISDNQ931*>(m_controller); }
// Build component debug name
inline void buildCompName(String& dest, const char* name)
inline void buildName(String& dest, const char* name)
{ dest = ""; dest << this->name() << '/' << name; }
private:
ISDNQ921* m_q921;
@ -237,12 +245,15 @@ private:
class SigIsdnMonitor : public SigLink
{
public:
inline SigIsdnMonitor(const char* name)
: SigLink(name,IsdnMonitor),
m_q921Net(0), m_q921Cpe(0), m_ifaceNet(0), m_ifaceCpe(0), m_groupNet(0), m_groupCpe(0)
{}
virtual ~SigIsdnMonitor()
{ release(); }
SigIsdnMonitor(const char* name);
virtual ~SigIsdnMonitor();
virtual void handleEvent(SignallingEvent* event);
unsigned int sample() const
{ return m_sample; }
unsigned char idleValue() const
{ return m_idleValue; }
// Remove a call and it's call monitor
void removeCall(SigIsdnCallRecord* call, ISDNQ931CallMonitor* mon);
protected:
virtual bool create(NamedList& params);
virtual bool reload(NamedList& params);
@ -250,9 +261,15 @@ protected:
inline ISDNQ931Monitor* q931()
{ return static_cast<ISDNQ931Monitor*>(m_controller); }
// Build component debug name
inline void buildCompName(String& dest, const char* name, bool net)
inline void buildName(String& dest, const char* name, bool net)
{ dest = ""; dest << this->name() << '/' << name << (net ? "/Net" : "/CPE"); }
private:
Mutex m_monitorMutex; // Lock monitor list operations
ObjList m_monitors; // Monitor list
unsigned int m_id; // ID generator
unsigned int m_sample; // The sample length of one channel of a data source multiplexer
unsigned char m_idleValue; // Idle value for source multiplexer to fill when no data
ISDNQ921Pasive* m_q921Net;
ISDNQ921Pasive* m_q921Cpe;
SignallingInterface* m_ifaceNet;
@ -261,6 +278,74 @@ private:
SigCircuitGroup* m_groupCpe;
};
// Consumer used to push data to SigSourceMux
class SigConsumerMux : public DataConsumer
{
friend class SigSourceMux;
public:
virtual ~SigConsumerMux()
{}
virtual void Consume(const DataBlock& data, unsigned long tStamp);
protected:
inline SigConsumerMux(SigSourceMux* owner, bool first, const char* format)
: DataConsumer(format), m_owner(owner), m_first(first)
{}
private:
SigSourceMux* m_owner; // The owner of this consumer
bool m_first; // The channel allocated by the owner
};
// A data source multiplexer with 2 channels
class SigSourceMux : public DataSource
{
public:
// Create consumers
// @param sample The number of bytes at which the buffer of a consumer is assumed to be full
// @param idleValue Value to fill missing data when forwarded
SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue);
virtual ~SigSourceMux();
bool hasSource(bool first)
{ return first ? (m_firstSrc != 0) : (m_secondSrc != 0); }
// Replace the consumer of the given source. Remove current consumer's source before
// @param first True to replace with the first channel, false for the second
// Return false if source is 0 or has invalid format (other then ours)
bool attach(bool first, DataSource* source);
// Multiplex received data from consumers and forward it
void consume(bool first, const DataBlock& data, unsigned long tStamp);
// Forward the buffer if at least one channel is filled. Reset data
// If one channel is empty, fill it with idle value
void forwardBuffer();
// Remove the source for the appropriate consumer
void removeSource(bool first);
private:
Mutex m_lock; // Lock consumers changes and data processing
unsigned char m_idleValue; // Filling value for missing data
bool m_sampleError; // Flag to show sample length violation error
unsigned int m_sample; // Maximum expected block length for data pushed by consumers
DataSource* m_firstSrc; // First consumer's source
DataSource* m_secondSrc; // Second consumer's source
SigConsumerMux* m_firstChan; // First channel
SigConsumerMux* m_secondChan; // Second channel
unsigned char m_filled; // Filled buffer flags (0: none, 1: first , 2: second, 3: both)
DataBlock m_buffer; // Multiplex buffer
};
// Record an ISDN call monitor
class SigIsdnCallRecord : public CallEndpoint
{
public:
SigIsdnCallRecord(SigIsdnMonitor* monitor, const char* id, ISDNQ931CallMonitor* mon);
virtual ~SigIsdnCallRecord();
bool init(SignallingEvent* event);
bool update(SignallingEvent* event);
private:
Mutex m_lock;
String m_caller;
String m_called;
SigIsdnMonitor* m_monitor; // The owner of this recorder
ISDNQ931CallMonitor* m_call; // The call monitor
};
// Get events from call controller. Check timeouts
class SigLinkThread : public Thread
{
@ -371,8 +456,6 @@ SigChannel::~SigChannel()
DDebug(this,DebugCall,"Destroyed with reason '%s' [%p]",m_reason.c_str(),this);
}
#define EVENT_NAME SignallingEvent::typeName(event->type())
void SigChannel::handleEvent(SignallingEvent* event)
{
if (!event)
@ -386,7 +469,7 @@ void SigChannel::handleEvent(SignallingEvent* event)
case SignallingEvent::Ringing: evRinging(event); break;
default:
DDebug(this,DebugStub,"No handler for event '%s' [%p]",
EVENT_NAME,this);
event->name(),this);
}
}
@ -631,7 +714,7 @@ void SigChannel::evInfo(SignallingEvent* event)
if (!tmp.null()) {
bool inband = msg->params().getBoolValue("inband");
DDebug(this,DebugCall,"Event: '%s'. DTMF: '%s'. In band: %s [%p]",
EVENT_NAME,tmp.c_str(),String::boolText(inband),this);
event->name(),tmp.c_str(),String::boolText(inband),this);
Message* m = message("chan.dtmf");
m->addParam("text",tmp);
Engine::enqueue(m);
@ -640,7 +723,7 @@ void SigChannel::evInfo(SignallingEvent* event)
void SigChannel::evProgress(SignallingEvent* event)
{
DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this);
DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this);
status("progressing");
Engine::enqueue(message("call.progress"));
}
@ -652,12 +735,12 @@ void SigChannel::evRelease(SignallingEvent* event)
else
m_reason = "";
Debug(this,DebugCall,"Event: '%s'. Reason: '%s' [%p]",
EVENT_NAME,m_reason.c_str(),this);
event->name(),m_reason.c_str(),this);
}
void SigChannel::evAccept(SignallingEvent* event)
{
DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this);
DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this);
const char* format = 0;
bool cicChange = false;
if (event->message()) {
@ -670,7 +753,7 @@ void SigChannel::evAccept(SignallingEvent* event)
void SigChannel::evAnswer(SignallingEvent* event)
{
DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this);
DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this);
status("answered");
const char* format = 0;
bool cicChange = false;
@ -685,7 +768,7 @@ void SigChannel::evAnswer(SignallingEvent* event)
void SigChannel::evRinging(SignallingEvent* event)
{
DDebug(this,DebugCall,"Event: '%s' [%p]",EVENT_NAME,this);
DDebug(this,DebugCall,"Event: '%s' [%p]",event->name(),this);
status("ringing");
const char* format = 0;
bool cicChange = false;
@ -854,18 +937,11 @@ void SigDriver::handleEvent(SignallingEvent* event)
return;
}
// No channel
//TODO: ????????????
if (event->call()->getObject("ISDNQ931CallMonitor")) {
Debug(this,DebugStub,"ISDNQ931CallMonitor event");
return;
}
if (event->type() == SignallingEvent::NewCall) {
ch = new SigChannel(event);
if (!ch->route(event)) {
//TODO: Send release with congestion
event->call()->userdata(0);
ch->hangup("temporary-failure");
ch->disconnect();
}
}
else
@ -990,13 +1066,16 @@ void SigDriver::initialize()
switch (type) {
case SigLink::IsdnPriNet:
case SigLink::IsdnPriCpe:
case SigLink::IsdnMonitor:
break;
default:
if (stype)
Debug(this,DebugNote,"Link '%s'. Unknown type '%s'",sect->c_str(),stype);
continue;
}
// Disable ?
if (!sect->getBoolValue("enable",true)) {
clearLink(*sect,sect->getBoolValue("wait-call-end",false));
clearLink(*sect,false);
continue;
}
// Create or initialize
@ -1008,6 +1087,9 @@ void SigDriver::initialize()
case SigLink::IsdnPriCpe:
if (!link)
link = new SigIsdn(*sect,type == SigLink::IsdnPriNet);
case SigLink::IsdnMonitor:
if (!link)
link = new SigIsdnMonitor(*sect);
break;
default:
continue;
@ -1072,6 +1154,11 @@ void SigLink::setExiting(unsigned int msec)
m_thread->m_timeout = Time::msecNow() + msec;
}
void SigLink::handleEvent(SignallingEvent* event)
{
plugin.handleEvent(event);
}
void SigLink::cleanup()
{
if (m_thread) {
@ -1094,12 +1181,14 @@ bool SigLink::startThread()
return ok;
}
SignallingInterface* SigLink::buildInterface(const String& device, const String& debugName, String& error)
SignallingInterface* SigLink::buildInterface(const String& device,
const String& debugName, String& error)
{
NamedList ifaceDefs("sig");
ifaceDefs.addParam("debugname",debugName);
ifaceDefs.addParam("sig",device);
SignallingInterface* iface = static_cast<SignallingInterface*>(SignallingFactory::build(ifaceDefs,&ifaceDefs));
SignallingInterface* iface = static_cast<SignallingInterface*>
(SignallingFactory::build(ifaceDefs,&ifaceDefs));
if (iface) {
plugin.engine()->insert(iface);
return iface;
@ -1109,6 +1198,44 @@ SignallingInterface* SigLink::buildInterface(const String& device, const String&
return 0;
}
SigCircuitGroup* SigLink::buildCircuits(const String& device,
const String& debugName, String& error)
{
ObjList* voice = device.split(',',false);
if (!voice) {
error = "Missing or invalid voice parameter";
return 0;
}
SigCircuitGroup* group = new SigCircuitGroup(debugName);
int start = 0;
for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) {
String* s = static_cast<String*>(o->get());
if (s->null())
continue;
String tmp = debugName + "/" + *s;
SigParams spanParams("voice",group);
spanParams.addParam("debugname",tmp);
spanParams.addParam("voice",*s);
if (start)
spanParams.addParam("start",String(start));
SignallingCircuitSpan* span = static_cast<SignallingCircuitSpan*>(
SignallingFactory::build(spanParams,&spanParams));
if (!span) {
error << "Failed to build voice span '" << *s << "'";
break;
}
int chans = spanParams.getIntValue("chans");
start += chans;
}
delete voice;
if (error.null()) {
plugin.engine()->insert(group);
return group;
}
delete group;
return 0;
}
/**
* SigIsdn
*/
@ -1127,62 +1254,30 @@ bool SigIsdn::create(NamedList& params)
m_inband = params.getBoolValue("dtmfinband",s_cfg.getBoolValue("general","dtmfinband",false));
// Signalling interface
NamedList ifaceDefs("sig");
buildCompName(compName,"D");
buildName(compName,"D");
m_iface = buildInterface(params.getValue("sig"),compName,error);
if (!m_iface)
break;
// Voice transfer: circuit group, spans, circuits
buildCompName(compName,"B");
m_group = new SigCircuitGroup(compName);
String tmp = params.getValue("voice");
// Use the same span as the signalling channel if missing
if (tmp.null())
tmp = params.getValue("sig");
ObjList* voice = tmp.split(',',false);
if (!voice) {
error = "Missing or invalid 'voice' parameter";
buildName(compName,"B");
const char* device = params.getValue("voice",params.getValue("sig"));
m_group = buildCircuits(device,compName,error);
if (!m_group)
break;
}
int start = 0;
for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) {
String* s = static_cast<String*>(o->get());
if (s->null())
continue;
tmp = compName + "/" + *s;
SigParams spanParams("voice",m_group);
spanParams.addParam("debugname",tmp);
spanParams.addParam("voice",*s);
if (start)
spanParams.addParam("start",String(start));
SignallingCircuitSpan* span = static_cast<SignallingCircuitSpan*>(
SignallingFactory::build(spanParams,&spanParams));
if (!span) {
error << "Failed to build voice span '" << *s << "'";
break;
}
int chans = spanParams.getIntValue("chans");
start += chans;
}
delete voice;
if (!error.null())
break;
plugin.engine()->insert(m_group);
// Q921
buildCompName(compName,"Q921");
buildName(compName,"Q921");
params.setParam("network",String::boolText(IsdnPriNet == type()));
params.setParam("print-frames",params.getValue("print-layer2PDU"));
m_q921 = new ISDNQ921(params,compName);
m_q921->setDebug(params.getBoolValue("print-layer2PDU",false),
params.getBoolValue("extended-debug",false));
plugin.engine()->insert(m_q921);
// Q931
buildCompName(compName,"Q931");
buildName(compName,"Q931");
params.setParam("print-messages",params.getValue("print-layer3PDU"));
m_controller = new ISDNQ931(params,compName);
q931()->setDebug(params.getBoolValue("print-layer3PDU",false),
params.getBoolValue("extended-debug",false));
plugin.engine()->insert(q931());
// Create links between components and enable them
@ -1255,6 +1350,91 @@ void SigIsdn::release()
/**
* SigIsdnMonitor
*/
SigIsdnMonitor::SigIsdnMonitor(const char* name)
: SigLink(name,IsdnMonitor),
m_monitorMutex(true),
m_id(0),
m_sample(160),
m_idleValue(255),
m_q921Net(0), m_q921Cpe(0), m_ifaceNet(0), m_ifaceCpe(0), m_groupNet(0), m_groupCpe(0)
{
}
SigIsdnMonitor::~SigIsdnMonitor()
{
release();
}
void SigIsdnMonitor::handleEvent(SignallingEvent* event)
{
if (!event)
return;
if (!event->call()) {
XDebug(&plugin,DebugNote,
"SigIsdnMonitor('%s'). Received event (%p): '%s' without call [%p]",
name().c_str(),event,event->name(),this);
return;
}
Lock lock(m_monitorMutex);
SigIsdnCallRecord* rec = 0;
ISDNQ931CallMonitor* mon = static_cast<ISDNQ931CallMonitor*>(event->call());
// Find monitor
for (ObjList* o = m_monitors.skipNull(); o; o = o->skipNext()) {
rec = static_cast<SigIsdnCallRecord*>(o->get());
if (rec == mon->userdata())
break;
rec = 0;
}
if (rec) {
switch (event->type()) {
case SignallingEvent::Accept:
case SignallingEvent::Ringing:
case SignallingEvent::Answer:
if (rec->update(event))
break;
// Fall through to release if update failed
case SignallingEvent::Release:
mon->userdata(0);
m_monitors.remove(rec,true);
break;
default:
DDebug(&plugin,DebugStub,
"SigIsdnMonitor('%s'). No handler for event '%s' [%p]",
name().c_str(),event->name(),this);
}
return;
}
if (event->type() == SignallingEvent::NewCall) {
String id;
id << name() << "/rec/" << ++m_id;
rec = new SigIsdnCallRecord(this,id,mon);
if (rec->init(event)) {
mon->userdata(rec);
m_monitors.append(rec);
}
else {
mon->userdata(0);
rec->deref();
}
}
else
XDebug(&plugin,DebugNote,
"SigIsdnMonitor('%s'). Received event (%p) with invalid user data (%p) [%p]",
name().c_str(),event,mon->userdata(),this);
}
void SigIsdnMonitor::removeCall(SigIsdnCallRecord* call, ISDNQ931CallMonitor* mon)
{
if (mon)
q931()->terminateMonitor(mon,0);
Lock lock(m_monitorMutex);
m_monitors.remove(call,false);
}
bool SigIsdnMonitor::create(NamedList& params)
{
release();
@ -1267,97 +1447,81 @@ bool SigIsdnMonitor::create(NamedList& params)
break;
}
m_sample = params.getIntValue("sample",160);
if (!m_sample)
m_sample = 160;
m_idleValue = params.getIntValue("idlevalue");
// Signalling interfaces
buildCompName(compName,"D",true);
buildName(compName,"D",true);
m_ifaceNet = buildInterface(params.getValue("sig-net"),compName,error);
if (!m_ifaceNet)
break;
buildCompName(compName,"D",false);
buildName(compName,"D",false);
m_ifaceCpe = buildInterface(params.getValue("sig-cpe"),compName,error);
if (!m_ifaceCpe)
break;
#if 0
ISDNQ921Pasive* m_q921Net;
ISDNQ921Pasive* m_q921Cpe;
SigCircuitGroup* m_groupNet;
SigCircuitGroup* m_groupCpe;
// Voice transfer: circuit group, spans, circuits
buildCompName(compName,"B");
m_group = new SigCircuitGroup(compName);
String tmp = params.getValue("voice");
// Voice transfer: circuit groups, spans, circuits
// Use the same span as the signalling channel if missing
if (tmp.null())
tmp = params.getValue("sig");
ObjList* voice = tmp.split(',',false);
if (!voice) {
error = "Missing or invalid 'voice' parameter";
buildName(compName,"B",true);
const char* device = params.getValue("voice-net",params.getValue("sig-net"));
m_groupNet = buildCircuits(device,compName,error);
if (!m_groupNet)
break;
}
int start = 0;
for (ObjList* o = voice->skipNull(); o; o = o->skipNext()) {
String* s = static_cast<String*>(o->get());
if (s->null())
continue;
tmp = compName + "/" + *s;
SigParams spanParams("voice",m_group);
spanParams.addParam("debugname",tmp);
spanParams.addParam("voice",*s);
if (start)
spanParams.addParam("start",String(start));
SignallingCircuitSpan* span = static_cast<SignallingCircuitSpan*>(
SignallingFactory::build(spanParams,&spanParams));
if (!span) {
error << "Failed to build voice span '" << *s << "'";
break;
}
int chans = spanParams.getIntValue("chans");
start += chans;
}
delete voice;
if (!error.null())
buildName(compName,"B",false);
device = params.getValue("voice-cpe",params.getValue("sig-cpe"));
m_groupCpe = buildCircuits(device,compName,error);
if (!m_groupCpe)
break;
plugin.engine()->insert(m_group);
// Q921
buildCompName(compName,"Q921");
if (buildQ921Pasive
params.setParam("network",String::boolText(IsdnPriNet == type()));
m_q921 = new ISDNQ921(params,compName);
m_q921->setDebug(params.getBoolValue("print-layer2PDU",false),
params.getBoolValue("extended-debug",false));
plugin.engine()->insert(m_q921);
params.setParam("t203",params.getValue("idletimeout"));
buildName(compName,"Q921",true);
params.setParam("network",String::boolText(true));
params.setParam("print-frames",params.getValue("print-layer2PDU"));
m_q921Net = new ISDNQ921Pasive(params,compName);
plugin.engine()->insert(m_q921Net);
buildName(compName,"Q921",false);
params.setParam("network",String::boolText(false));
m_q921Cpe = new ISDNQ921Pasive(params,compName);
plugin.engine()->insert(m_q921Cpe);
// Q931
compName = "";
compName << this->name() << '/' << "Q931";
m_controller = new ISDNQ931(params,compName);
q931()->setDebug(params.getBoolValue("print-layer3PDU",false),
params.getBoolValue("extended-debug",false));
compName << name() << '/' << "Q931";
params.setParam("print-messages",params.getValue("print-layer3PDU"));
m_controller = new ISDNQ931Monitor(params,compName);
plugin.engine()->insert(q931());
// Create links between components and enable them
m_q921->SignallingReceiver::attach(m_iface);
m_iface->control(SignallingInterface::Enable);
q931()->attach(m_group);
m_q921->ISDNLayer2::attach(q931());
q931()->attach(m_q921);
m_q921->multipleFrame(true,false);
q931()->attach(m_groupNet,true);
q931()->attach(m_groupCpe,false);
m_q921Net->SignallingReceiver::attach(m_ifaceNet);
m_q921Cpe->SignallingReceiver::attach(m_ifaceCpe);
m_ifaceNet->control(SignallingInterface::Enable);
m_ifaceCpe->control(SignallingInterface::Enable);
m_q921Net->ISDNLayer2::attach(q931());
m_q921Cpe->ISDNLayer2::attach(q931());
q931()->attach(m_q921Net,true);
q931()->attach(m_q921Cpe,false);
// Start thread
if (!startThread())
error = "Failed to start worker thread";
#endif
break;
}
if (error.null())
if (error.null()) {
if (debugAt(DebugInfo)) {
String tmp;
tmp << "\r\nSample size: " << m_sample;
tmp << "\r\nIdle fill value: " << (int)m_idleValue;
Debug(&plugin,DebugInfo,"SigIsdnMonitor('%s'). Initialized: [%p]%s",
name().c_str(),this,tmp.c_str());
}
return true;
}
Debug(&plugin,DebugNote,"SigIsdnMonitor('%s'). Create failure. %s [%p]",
name().c_str(),error.c_str(),this);
return false;
@ -1383,6 +1547,9 @@ bool SigIsdnMonitor::reload(NamedList& params)
void SigIsdnMonitor::release()
{
m_monitorMutex.lock();
m_monitors.clear();
m_monitorMutex.unlock();
// *** Cleanup / Disable components
if (q931())
q931()->cleanup();
@ -1425,6 +1592,236 @@ void SigIsdnMonitor::release()
XDebug(&plugin,DebugAll,"SigIsdnMonitor('%s'). Released [%p]",name().c_str(),this);
}
/**
* SigConsumerMux
*/
void SigConsumerMux::Consume(const DataBlock& data, unsigned long tStamp)
{
if (m_owner)
m_owner->consume(m_first,data,tStamp);
}
/**
* SigSourceMux
*/
SigSourceMux::SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue)
: DataSource(format),
m_lock(true),
m_idleValue(idleValue),
m_sampleError(false),
m_sample(sample),
m_firstSrc(0),
m_secondSrc(0),
m_firstChan(0),
m_secondChan(0),
m_filled(0)
{
XDebug(&plugin,DebugAll,"SigSourceMux::SigSourceMux() [%p]",this);
if (!m_sample)
m_sample = 1;
m_buffer.assign(0,2 * m_sample,false);
m_firstChan = new SigConsumerMux(this,true,format);
m_secondChan = new SigConsumerMux(this,false,format);
}
SigSourceMux::~SigSourceMux()
{
Lock lock(m_lock);
removeSource(true);
removeSource(false);
if (m_firstChan)
m_firstChan->deref();
if (m_secondChan)
m_secondChan->deref();
XDebug(&plugin,DebugAll,"SigSourceMux::~SigSourceMux() [%p]",this);
}
// Replace the consumer of the given source. Remove current consumer's source before
// @param first True to replace with the first channel, false for the second
// Return false if source is 0 or has invalid format (other then ours)
bool SigSourceMux::attach(bool first, DataSource* source)
{
Lock lock(m_lock);
removeSource(first);
if (!(source && source->getFormat() == getFormat() && source->ref()))
return false;
if (first) {
m_firstSrc = source;
source->attach(m_firstChan);
}
else {
m_secondSrc = source;
source->attach(m_secondChan);
}
return true;
}
// Multiplex received data from consumers and forward it
// Forward multiplexed buffer if chan already filled
// If received data is not greater then expected channel sample:
// Fill chan buffer with data
// If received data length is lesser then expected channel sample,
// fill the rest of the buffer with idle value
// If all channels are filled, forward the multiplexed buffer
// Otherwise:
// Fill chan buffer with m_sample data
// Forward data and consume the rest
void SigSourceMux::consume(bool first, const DataBlock& data, unsigned long tStamp)
{
Lock lock(m_lock);
if (data.length() > m_sample && !m_sampleError) {
Debug(&plugin,DebugWarn,
"SigSourceMux::consume(). Received data length %u > %u from channel %c [%p]",
data.length(),m_sample,first ? '1' : '2',this);
m_sampleError = true;
}
// Check if already filled. Set filled flag
if ((first && (m_filled & 1)) || (!first && (m_filled & 2)))
forwardBuffer();
unsigned char* buffer = (unsigned char*)m_buffer.data();
if (!first)
buffer += m_sample;
m_filled &= first ? 1 : 2;
// Received data length is lesser then or equal to a sample
if (data.length() <= m_sample) {
::memcpy(buffer,data.data(),data.length());
if (data.length() != m_sample)
::memset(buffer + data.length(),m_idleValue,m_sample - data.length());
if (m_filled == 3)
forwardBuffer();
return;
}
// Received data length is greater then a sample
::memcpy(buffer,data.data(),m_sample);
DataBlock rest((unsigned char*)data.data() + m_sample,data.length() - m_sample);
consume(first,rest,tStamp);
}
// Forward the buffer if at least one channel is filled. Reset data
// If one channel is empty, fill it with idle value
void SigSourceMux::forwardBuffer()
{
if (!m_filled)
return;
if (m_filled < 3) {
unsigned char* data = (unsigned char*)m_buffer.data();
if (!(m_filled & 1))
data += m_sample;
::memset(data,m_idleValue,m_sample);
}
m_filled = 0;
Forward(m_buffer);
}
// Remove the source for the appropriate consumer
void SigSourceMux::removeSource(bool first)
{
if (first && m_firstSrc) {
m_firstSrc->attach(0,true);
m_firstSrc->deref();
m_firstSrc = 0;
}
if (!first && m_secondSrc) {
m_secondSrc->attach(0,true);
m_secondSrc->deref();
m_secondSrc = 0;
}
}
/**
* SigIsdnCallRecord
*/
SigIsdnCallRecord::SigIsdnCallRecord(SigIsdnMonitor* monitor, const char* id,
ISDNQ931CallMonitor* mon)
: CallEndpoint(id),
m_lock(true),
m_monitor(monitor),
m_call(mon)
{
}
SigIsdnCallRecord::~SigIsdnCallRecord()
{
if (m_monitor)
m_monitor->removeCall(this,m_call);
}
bool SigIsdnCallRecord::init(SignallingEvent* event)
{
if (!(event && event->message()))
return false;
Lock lock(m_lock);
SignallingMessage* msg = event->message();
m_caller = msg->params().getValue("caller");
m_called = msg->params().getValue("called");
return update(event);
}
// Create the multiplexer if missing
// Update sources for the multiplexer. Change them if circuit changed
// Start recording if the multiplexer has at least one source
bool SigIsdnCallRecord::update(SignallingEvent* event)
{
Lock lock(m_lock);
if (!(event && event->call() && event->message()))
return true;
SignallingCall* call = event->call();
SignallingMessage* msg = event->message();
bool chg = msg->params().getValue("circuit-change");
const char* format = msg->params().getValue("format");
SigSourceMux* source = static_cast<SigSourceMux*>(getSource());
if (!source) {
if (!format)
return true;
source = new SigSourceMux(format,
m_monitor ? m_monitor->sample() : 160,
m_monitor ? m_monitor->idleValue() : 255);
setSource(source);
source->deref();
if (!getSource()) {
Debug(id(),DebugWarn,"No data source. Terminate [%p]",this);
return false;
}
// Start recording
Message m("call.execute");
m.userData(this);
m.addParam("caller",m_caller);
m.addParam("called",m_called);
m.addParam("maxlen","60000");
m.addParam("target","record/home/marian/Desktop/test.wav");
if (!Engine::dispatch(m)) {
Debug(id(),DebugWarn,"Failed to start recording. Terminate [%p]",this);
return false;
}
}
if (format && source->getFormat() != format) {
Debug(id(),DebugWarn,"Data format changed. Terminate [%p]",this);
setSource();
return false;
}
if (chg) {
source->removeSource(true);
source->removeSource(false);
}
bool first = true;
while (true) {
if (!source->hasSource(first)) {
SignallingCircuit* cic = static_cast<SignallingCircuit*>(call->getObject(
first ? "SignallingCircuitCaller" : "SignallingCircuitCalled"));
DataSource* src = cic ? static_cast<DataSource*>(cic->getObject("DataSource")) : 0;
if (src) {
source->attach(first,src);
DDebug(id(),DebugAll,"Data source for channel %c set to (%p) [%p]",
first ? '1' : '2',src,this);
}
}
if (!first)
break;
first = false;
}
return true;
}
/**
* SigLinkThread
*/
@ -1443,7 +1840,7 @@ void SigLinkThread::run()
Time time;
event = m_link->controller()->getEvent(time);
if (event) {
plugin.handleEvent(event);
m_link->handleEvent(event);
delete event;
}
// Check timeout if waiting to terminate