yate/modules/mux.cpp

772 lines
23 KiB
C++

/**
* mux.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Data multiplex
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004-2023 Null Team
*
* This software is distributed under multiple licenses;
* see the COPYING file in the main directory for licensing
* information for this specific distribution.
*
* This use of this software may be subject to additional restrictions.
* See the LEGAL file in the main directory for details.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*/
#include <yatephone.h>
#include <string.h>
using namespace TelEngine;
namespace { // anonymous
class MuxConsumer; // Consumer used to push data a source multiplexer
class MuxSource; // A data source multiplexer
class MuxModule; // The module
class ChanAttachHandler; // chan.attach handler
// Consumer used to push data to a MuxSource
class MuxConsumer : public DataConsumer
{
friend class MuxSource;
public:
MuxConsumer(MuxSource* owner, unsigned int chan, const char* format, bool reference);
virtual ~MuxConsumer()
{}
// Send data to the owner, if any
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
virtual void destroyed();
private:
MuxSource* m_owner; // The owner of this consumer
RefPointer<RefObject> m_ref; // Reference keeper
unsigned int m_channel; // The channel allocated by the owner
unsigned int m_bufferFilled; // The number of samples in the buffer kept by the owner
unsigned int m_overErrors; // Buffer overrun errors
};
// A data source multiplexer
class MuxSource : public DataSource, public DebugEnabler
{
friend class MuxConsumer;
public:
MuxSource(const String& id, const char* targetid, const char* format,
const NamedList& params, String& error, bool reference);
virtual ~MuxSource()
{}
inline const String& id() const
{ return m_id; }
inline const String& targetid() const
{ return m_targetid; }
inline unsigned int channels()
{ return m_channels; }
// Get the consumer for a specific channel
inline MuxConsumer* getConsumer(unsigned int channel) {
return channel < m_channels ? m_consumers[channel] : 0;
}
// Check if a channel's consumer exists and has a source attached
inline bool hasSource(unsigned int channel) {
return channel < m_channels && m_consumers[channel] &&
m_consumers[channel]->getConnSource();
}
// Set/remove the source for a channel's consumer
// channel: The channel whose source will be replaced
bool setSource(unsigned int channel, DataSource* source = 0);
// Multiplex received data from consumers and forward it
void consume(MuxConsumer& cons, const DataBlock& data, unsigned long tStamp);
virtual const String& toString() const
{ return m_id; }
protected:
// Set/remove a consumer. Fill it's buffer with idle values if removed
void setConsumer(unsigned int channel, MuxConsumer* pCons = 0);
// Clear consumers list (remove their owner before)
virtual void destroyed();
private:
// Forward the buffer if at least one channel is filled. Reset data
// Fill incomplete channel buffers with idle value before forwarding the data
void forwardBuffer();
// Fill (interlaced samples) buffer with samples of received data
// If no data, fill the free space with idle value
void fillBuffer(unsigned int channel, unsigned int& filled,
unsigned char* data = 0, unsigned int samples = 0);
Mutex m_lock; // Lock consumers changes and data processing
String m_id; // The id wthin this module
String m_targetid; // The id of the target (user)
MuxConsumer** m_consumers; // The consumers
unsigned int m_channels; // The number of consumers
unsigned int m_full; // The number of consumers with full buffers
unsigned char m_idleValue; // Filling value for missing data
unsigned int m_sampleLen; // The format sample length
unsigned int m_maxSamples; // Maximum samples in a channel buffer
unsigned int m_delta; // Offset to write samples for each channel (m_channels * m_sampleLen)
DataBlock m_buffer; // Multiplex buffer
unsigned int m_error; // The number of data length violation error
};
// The module
class MuxModule : public Module
{
public:
enum {
Attach = Private,
Record = (Private << 1)
};
MuxModule();
~MuxModule();
virtual void initialize();
// Append source
inline void append(MuxSource* src) {
if (!src)
return;
Lock lock(this);
m_sources.append(src)->setDelete(false);
}
// Remove source
inline void remove(MuxSource* src) {
Lock lock(this);
m_sources.remove(src,false);
}
protected:
// Respond to a request to attach/change a multiplexer
bool chanAttach(Message& msg);
// Create a multiplexer for bidirectional recording
bool chanRecord(Message& msg);
virtual bool received(Message& msg, int id);
virtual void statusParams(String& str);
virtual void statusDetail(String& str);
private:
bool m_first; // First init flag
String m_prefix; // Module's prefix (name/)
unsigned int m_id; // Next sources's id
ObjList m_sources; // Source list
};
/**
* Module data and function
*/
static Configuration s_cfg; // Configuration file
static unsigned int s_chanBuffer; // The buffer length of one channel of a data source multiplexer
static unsigned char s_idleValue; // Idle value for source multiplexer to fill when no data
static String s_defFormat; // Default format for MuxSource
static MuxModule plugin;
// Dictionary containig the supported formats and sample lengths
static const TokenDict s_dictSampleLen[] = {
{"mulaw", 1},
{"mulaw/16000", 1},
{"mulaw/32000", 1},
{"alaw", 1},
{"alaw/16000", 1},
{"alaw/32000", 1},
{"slin", 2},
{"slin/16000", 2},
{"slin/32000", 2},
{0,0},
};
// Request a data source for a channel
inline DataSource* getChannelSource(GenObject* target, unsigned int channel)
{
if (!target)
return 0;
String chNo;
chNo << "DataSource" << channel;
GenObject* ret = (GenObject*)target->getObject(chNo);
chNo.clear();
return ret ? static_cast<DataSource*>(ret->getObject(YATOM("DataSource"))) : 0;
}
/**
* MuxConsumer
*/
#define ENABLER (m_owner?(DebugEnabler*)m_owner:(DebugEnabler*)&plugin)
#define OWNERNAME (m_owner?m_owner->debugName():"(null)")
MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format, bool reference)
: DataConsumer(format),
m_owner(owner),
m_channel(chan),
m_bufferFilled(0),
m_overErrors(0)
{
if (m_owner)
m_owner->setConsumer(m_channel,this);
if (reference)
m_ref = owner;
DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) created [%p]",OWNERNAME,m_channel,this);
}
unsigned long MuxConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_owner) {
m_owner->consume(*this,data,tStamp);
return invalidStamp();
}
return 0;
}
void MuxConsumer::destroyed()
{
plugin.lock();
DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) destroyed [%p]",
OWNERNAME,m_channel,this);
if (m_owner)
m_owner->setConsumer(m_channel);
m_owner = 0;
plugin.unlock();
m_ref = 0;
DataConsumer::destroyed();
}
#undef ENABLER
#undef OWNERNAME
/**
* MuxSource
*/
MuxSource::MuxSource(const String& id, const char* targetid, const char* format,
const NamedList& params, String& error, bool reference)
: DataSource(format),
m_lock(true,"MuxSource::lock"),
m_id(id),
m_targetid(targetid),
m_consumers(0),
m_channels(0),
m_full(0),
m_idleValue(0),
m_sampleLen(0),
m_maxSamples(0),
m_delta(0),
m_error(0)
{
debugName(m_id);
debugChain(&plugin);
const char* channelFormat = 0;
// Set the number of channels and the format
while (true) {
int pos = getFormat().find("*");
if (pos < 1)
break;
int channels = getFormat().substr(0,pos).toInteger();
if (channels < 2)
break;
m_channels = (unsigned int)channels;
// Get the format of each channel and the sample length. Skip channel count and '*' from format
pos++;
if ((unsigned int)pos < getFormat().String::length()) {
channelFormat = getFormat().c_str() + pos;
m_sampleLen = lookup(channelFormat,s_dictSampleLen,0);
}
break;
}
if (!m_sampleLen) {
error << "Unsupported format '" << getFormat() << "'";
return;
}
m_delta = m_channels * m_sampleLen;
m_idleValue = params.getIntValue("idlevalue",s_idleValue);
unsigned int chanBuffer = params.getIntValue("chanbuffer",s_chanBuffer);
if (getFormat().endsWith("/16000"))
chanBuffer *= 2;
else if (getFormat().endsWith("/32000"))
chanBuffer *= 4;
// Adjust channel buffer to be multiple of sample length and not lesser then it
if (chanBuffer < m_sampleLen)
chanBuffer = m_sampleLen;
m_maxSamples = chanBuffer / m_sampleLen;
chanBuffer = m_maxSamples * m_sampleLen;
m_buffer.assign(0,m_channels * chanBuffer);
// Create consumers
m_consumers = new MuxConsumer*[m_channels];
::memset(m_consumers,0,m_channels * sizeof(MuxConsumer*));
for (unsigned int i = 0; i < m_channels; i++)
new MuxConsumer(this,i,channelFormat,reference);
Debug(this,DebugAll,
"Created channels=%u format=%s sample=%u buffer=%u targetid=%s [%p]",
m_channels,getFormat().c_str(),m_sampleLen,m_buffer.length(),targetid,this);
plugin.append(this);
}
// Set/remove a consumer. Fill it's buffer with idle value if removed
void MuxSource::setConsumer(unsigned int channel, MuxConsumer* pCons)
{
Lock lock(m_lock);
if (!m_consumers || channel >= m_channels || (pCons && pCons->m_owner != this))
return;
MuxConsumer* old = m_consumers[channel];
if (old == pCons)
return;
plugin.lock();
m_consumers[channel] = 0;
if (old) {
old->m_overErrors = 0;
old->m_owner = 0;
}
plugin.unlock();
m_consumers[channel] = pCons;
if (pCons)
Debug(this,DebugAll,"Consumer for channel %u set to (%p) [%p]",
channel,pCons,this);
else {
unsigned int tmp = 0;
fillBuffer(channel,tmp);
Debug(this,DebugAll,"Removed consumer (%p) for channel %u [%p]",
old,channel,this);
}
}
// Set/remove the source for a channel's consumer
bool MuxSource::setSource(unsigned int channel, DataSource* source)
{
Lock lock(m_lock);
if (channel >= m_channels || !m_consumers[channel])
return false;
DataSource* old = m_consumers[channel]->getConnSource();
if (old == source)
return true;
if (old) {
old->detach(m_consumers[channel]);
if (!m_consumers[channel]) {
Debug(this,DebugNote,
"Channel %u consumer vanished after detaching from source (%p) [%p]",
channel,old,this);
return false;
}
else
Debug(this,DebugAll,"Channel %u detached from source (%p) [%p]",channel,old,this);
}
if (!source)
return true;
source->attach(m_consumers[channel]);
Debug(this,DebugAll,"Channel %u attached to source (%p) [%p]",channel,source,this);
return true;
}
// Multiplex received data from consumers and forward it
// Forward multiplexed buffer if chan already filled
// If received data is not greater then free space:
// Fill chan buffer with data
// If all channels are filled, forward the multiplexed buffer
// Otherwise:
// Fill free chan buffer
// Forward buffer and consume the rest
void MuxSource::consume(MuxConsumer& consumer, const DataBlock& data, unsigned long tStamp)
{
if (!data.length() || consumer.m_owner != this)
return;
Lock lock(m_lock,100000);
if (!(lock.locked() && alive())) {
Debug(this,DebugMild,"Locking failed, dropping %u bytes [%p]",data.length(),this);
return;
}
XDebug(this,DebugAll,"Consuming %u bytes on channel %u [%p]",
data.length(),consumer.m_channel,this);
if ((data.length() % m_sampleLen) && !m_error) {
Debug(this,DebugWarn,"Wrong sample (received %u bytes) on channel %u [%p]",
data.length(),consumer.m_channel,this);
m_error++;
}
unsigned int samples = data.length() / m_sampleLen;
// Forward buffer if already filled for this channel
if (consumer.m_bufferFilled == m_maxSamples) {
consumer.m_overErrors++;
if (0 == consumer.m_overErrors % 5)
DDebug(this,DebugMild,"Buffer overrun on channel %u [%p]",
consumer.m_channel,this);
forwardBuffer();
}
unsigned int freeSamples = m_maxSamples - consumer.m_bufferFilled;
unsigned char* buf = (unsigned char*)data.data();
if (samples <= freeSamples) {
fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,samples);
if (m_full == m_channels)
forwardBuffer();
XDebug(this,DebugAll,"Consumed all %u bytes on channel %u [%p]",
samples * m_sampleLen,consumer.m_channel,this);
return;
}
// Received more samples that free space in buffer
fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,freeSamples);
forwardBuffer();
unsigned int consumed = freeSamples * m_sampleLen;
DDebug(this,DebugAll,"Consumed only %u/%u bytes on channel %u [%p]",
consumed,data.length(),consumer.m_channel,this);
DataBlock rest(buf + consumed,data.length() - consumed,false);
consume(consumer,rest,tStamp);
rest.clear(false);
}
void MuxSource::destroyed()
{
Lock2 lock(plugin,m_lock);
plugin.remove(this);
if (m_consumers) {
for (unsigned int i = 0; i < m_channels; i++) {
if (!m_consumers[i])
continue;
setSource(i);
if (m_consumers[i]->m_overErrors > 10)
Debug(this,DebugMild,
"Removing consumer on channel %u with %u overrun errors [%p]",
i,m_consumers[i]->m_overErrors,this);
m_consumers[i]->m_overErrors = 0;
m_consumers[i]->m_owner = 0;
TelEngine::destruct(m_consumers[i]);
}
delete[] m_consumers;
m_consumers = 0;
}
lock.drop();
if (!m_error)
Debug(this,DebugAll,"Destroyed targetid=%s [%p]",m_targetid.c_str(),this);
else
Debug(this,DebugMild,"Destroyed targetid=%s data length errors=%u [%p]",
m_targetid.c_str(),m_error,this);
DataSource::destroyed();
}
// Forward the buffer if at least one channel is filled. Reset data
// Fill incomplete channel buffers with idle value before forwarding the data
void MuxSource::forwardBuffer()
{
if (!m_full)
return;
// Fill incomplete buffers. Reset data
for (unsigned int i = 0; i < m_channels; i++) {
if (!m_consumers[i])
continue;
if (m_consumers[i]->m_bufferFilled < m_maxSamples) {
XDebug(this,DebugAll,"Filling %u idle values on channel %u [%p]",
m_sampleLen * (m_maxSamples - m_consumers[i]->m_bufferFilled),i,this);
fillBuffer(m_consumers[i]->m_channel,m_consumers[i]->m_bufferFilled);
}
m_consumers[i]->m_bufferFilled = 0;
}
m_full = 0;
XDebug(this,DebugAll,"Forwarding buffer [%p]",this);
Forward(m_buffer);
}
// Fill interlaced samples buffer with samples of received data
// If no data, fill the free space with idle value
void MuxSource::fillBuffer(unsigned int channel, unsigned int& filled,
unsigned char* data, unsigned int samples)
{
unsigned char* buf = (unsigned char*)m_buffer.data();
buf += m_sampleLen * (channel + filled * m_channels);
// Fill received data
if (data) {
if (samples > m_maxSamples - filled)
samples = m_maxSamples - filled;
filled += samples;
if (filled == m_maxSamples)
m_full++;
switch (m_sampleLen) {
case 1:
for (; samples; samples--, buf += m_delta)
*buf = *data++;
break;
case 2:
for (; samples; samples--, buf += m_delta) {
buf[0] = *data++;
buf[1] = *data++;
}
break;
default:
for (; samples; samples--, buf += m_delta, data += m_sampleLen)
::memcpy(buf,data,m_sampleLen);
}
return;
}
// Fill with idle value
samples = m_maxSamples - filled;
filled = m_maxSamples;
m_full++;
switch (m_sampleLen) {
case 1:
for (; samples; samples--, buf += m_delta)
*buf = m_idleValue;
break;
case 2:
for (; samples; samples--, buf += m_delta)
buf[0] = buf[1] = m_idleValue;
break;
default:
for (; samples; samples--, buf += m_delta, data += m_sampleLen)
::memset(buf,m_idleValue,m_sampleLen);
}
}
/**
* MuxModule
* Early init, late cleanup since we provide services to other modules
*/
MuxModule::MuxModule()
: Module("mux","misc",true),
m_first(true),
m_id(1)
{
Output("Loaded module MUX");
m_prefix << debugName() << "/";
}
MuxModule::~MuxModule()
{
Output("Unloading module MUX");
}
void MuxModule::initialize()
{
Output("Initializing module MUX");
s_cfg = Engine::configFile("mux");
s_cfg.load();
// Startup
if (m_first) {
setup();
installRelay(Attach,"chan.attach",100);
installRelay(Record,"chan.record",100);
}
s_chanBuffer = s_cfg.getIntValue("general","chanbuffer",160);
if (s_chanBuffer < 1)
s_chanBuffer = 1;
unsigned int ui = s_cfg.getIntValue("general","idlevalue",255);
s_idleValue = (ui <= 255 ? ui : 255);
const char* format = s_cfg.getValue("general","format");
if (!lookup(format,s_dictSampleLen))
format = "alaw";
s_defFormat.clear();
s_defFormat << "2*" << format;
m_first = false;
}
bool MuxModule::received(Message& msg, int id)
{
switch (id) {
case Attach:
return chanAttach(msg);
case Record:
return chanRecord(msg);
}
return Module::received(msg,id);
}
// Respond to a request to attach/change a multiplexer
bool MuxModule::chanAttach(Message& msg)
{
String id = msg.getValue("source");
if (Engine::exiting() || !id.startSkip(m_prefix,false))
return false;
GenObject* sender = msg.userData();
if (!sender) {
msg.setParam("error","No userdata");
return false;
}
MuxSource* src = 0;
String error;
const char* targetid = msg.getValue("notify");
// Check if should fail on channel source attach failure
bool failOne = msg.getBoolValue("fail",false);
if (!id) {
lock();
id << m_prefix << m_id++;
unlock();
src = new MuxSource(id,targetid,msg.getValue("format",s_defFormat),msg,error,false);
// Set channel sources
if (!error) {
unsigned int count = 0;
for (unsigned int channel = 0; channel < src->channels(); channel++) {
if (src->setSource(channel,getChannelSource(sender,channel)))
count++;
else if (failOne) {
error << "Attach failure on channel " << channel;
break;
}
}
if (!error && !count && msg.getBoolValue("failempty",false))
error = "Attach failure on all channels";
}
if (!error) {
msg.userData(src);
msg.setParam("id",src->debugName());
}
}
else {
Lock lock(plugin);
ObjList* o = m_sources.find(id);
src = o ? static_cast<MuxSource*>(o->get()) : 0;
if (!(src && src->ref()))
return false;
lock.drop();
id = src->debugName();
// Modify sources
unsigned int n = msg.count();
for (unsigned int i = 0; i < n; i++) {
NamedString* ns = msg.getParam(i);
if (!(ns && ns->name() == "channel"))
continue;
unsigned int channel = ns->toInteger(src->channels());
if (channel < src->channels() &&
src->setSource(channel,getChannelSource(sender,channel)))
continue;
if (failOne) {
if (channel >= src->channels())
error << "Invalid channel=" << *ns;
else
error << "Attach failure on channel " << channel;
break;
}
}
}
if (error) {
Debug(this,DebugNote,"MuxSource failure id=%s targetid=%s error='%s'",
id.c_str(),targetid,error.c_str());
msg.setParam("error",error);
}
TelEngine::destruct(src);
return !error;
}
// Create a multiplexer for bidirectional recording
bool MuxModule::chanRecord(Message& msg)
{
NamedString* both = msg.getParam(YSTRING("both"));
if (TelEngine::null(both))
return false;
CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
RefPointer<DataEndpoint> de = YOBJECT(DataEndpoint,msg.userData());
if (*both == "-") {
if (ch && !de)
de = ch->getEndpoint();
if (de) {
de->setCallRecord();
de->setPeerRecord();
}
return msg.getBoolValue(YSTRING("single"));
}
if (ch && !de) {
de = ch->setEndpoint();
if (!de)
return false;
}
if (!de) {
Debug(DebugWarn,"Consumer '%s' both record with no data channel!",both->c_str());
return false;
}
const char* targetid = msg.getValue(YSTRING("notify"));
String format = msg.getValue(YSTRING("format"),s_defFormat);
String muxFormat = format;
muxFormat.startSkip("2*",false);
switch (muxFormat.toInteger(s_dictSampleLen)) {
case 1:
case 2:
break;
default:
format = "slin";
muxFormat = "slin";
}
muxFormat = "2*" + muxFormat;
Message m("chan.record");
m.addParam("call",*both);
if (targetid)
m.addParam("notify",targetid);
m.addParam("format",format);
m.copyParam(msg,YSTRING("append"));
m.copyParam(msg,YSTRING("maxlen"));
m.addParam("call_account",msg.getValue(YSTRING("both_account")),false);
m.addParam("call_query",msg.getValue(YSTRING("both_query")),false);
m.addParam("call_fallback",msg.getValue(YSTRING("both_fallback")),false);
m.addParam("single",String::boolText(true));
DataEndpoint* ep = new DataEndpoint;
m.userData(ep);
Engine::dispatch(m);
RefPointer<DataConsumer> c = ep->getCallRecord();
m.userData(0);
TelEngine::destruct(ep);
if (!c)
return false;
String error;
String id;
lock();
id << m_prefix << m_id++;
unlock();
MuxSource* s = new MuxSource(id,targetid,muxFormat,msg,error,true);
if (error.null()) {
if (DataTranslator::attachChain(s,c)) {
// Consumers are kept referenced by the DataEndpoint
DataConsumer* dc = s->getConsumer(0);
de->setCallRecord(dc);
TelEngine::destruct(dc);
dc = s->getConsumer(1);
de->setPeerRecord(dc);
TelEngine::destruct(dc);
}
else
error = "Translator chain attach failure";
}
// Source is kept referenced by the consumers
TelEngine::destruct(s);
c = 0;
if (error)
msg.setParam("error",error);
return error.null() && msg.getBoolValue(YSTRING("single"));
}
void MuxModule::statusParams(String& str)
{
Module::statusParams(str);
str.append("count=",",") << m_sources.count() << ",format=channels|targetid";
}
void MuxModule::statusDetail(String& str)
{
Module::statusDetail(str);
for (ObjList* o = m_sources.skipNull(); o; o = o->skipNext()) {
MuxSource* s = static_cast<MuxSource*>(o->get());
str.append(s->id(),",") << "=" << s->channels() << "|" << s->targetid();
}
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */