Support for multiplexing both recorded parties into a single file or stream.

git-svn-id: http://voip.null.ro/svn/yate@5308 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2012-10-24 11:18:47 +00:00
parent fc78cb9ac8
commit 349584635f
1 changed files with 144 additions and 47 deletions

View File

@ -39,7 +39,7 @@ class MuxConsumer : public DataConsumer
{
friend class MuxSource;
public:
MuxConsumer(MuxSource* owner, unsigned int chan, const char* format);
MuxConsumer(MuxSource* owner, unsigned int chan, const char* format, bool reference);
virtual ~MuxConsumer()
{}
// Send data to the owner, if any
@ -48,6 +48,7 @@ protected:
virtual void destroyed();
private:
MuxSource* m_owner; // The owner of this consumer
RefPointer<RefObject> m_ref; // Reference keeper
unsigned int m_channel; // The channel allocated by the owner
unsigned int m_bufferFilled; // The number of samples in the buffer kept by the owner
unsigned int m_overErrors; // Buffer overrun errors
@ -59,7 +60,7 @@ class MuxSource : public DataSource, public DebugEnabler
friend class MuxConsumer;
public:
MuxSource(const String& id, const char* targetid, const char* format,
const NamedList& params, String& error);
const NamedList& params, String& error, bool reference);
virtual ~MuxSource()
{}
inline const String& id() const
@ -68,6 +69,10 @@ public:
{ return m_targetid; }
inline unsigned int channels()
{ return m_channels; }
// Get the consumer for a specific channel
inline MuxConsumer* getConsumer(unsigned int channel) {
return channel < m_channels ? m_consumers[channel] : 0;
}
// Check if a channel's consumer exists and has a source attached
inline bool hasSource(unsigned int channel) {
return channel < m_channels && m_consumers[channel] &&
@ -112,11 +117,13 @@ private:
class MuxModule : public Module
{
public:
enum {
Attach = Private,
Record = (Private << 1)
};
MuxModule();
~MuxModule();
virtual void initialize();
// Respond to a request to attach/change a multiplexer
bool chanAttach(Message& msg);
// Append source
inline void append(MuxSource* src) {
if (!src)
@ -130,6 +137,11 @@ public:
m_sources.remove(src,false);
}
protected:
// Respond to a request to attach/change a multiplexer
bool chanAttach(Message& msg);
// Create a multiplexer for bidirectional recording
bool chanRecord(Message& msg);
virtual bool received(Message& msg, int id);
virtual void statusParams(String& str);
virtual void statusDetail(String& str);
private:
@ -149,18 +161,8 @@ static unsigned char s_idleValue; // Idle value for source multiplexer to
static String s_defFormat; // Default format for MuxSource
static MuxModule plugin;
// chan.attach handler
class ChanAttachHandler : public MessageHandler
{
public:
inline ChanAttachHandler()
: MessageHandler("chan.attach",100,plugin.name())
{}
virtual bool received(Message& msg);
};
// Dictionary containig the supported formats and sample lengths
static TokenDict s_dictSampleLen[] = {
static const TokenDict s_dictSampleLen[] = {
{"mulaw", 1},
{"alaw", 1},
{"slin", 2},
@ -186,7 +188,7 @@ inline DataSource* getChannelSource(GenObject* target, unsigned int channel)
#define ENABLER (m_owner?(DebugEnabler*)m_owner:(DebugEnabler*)&plugin)
#define OWNERNAME (m_owner?m_owner->debugName():"(null)")
MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format)
MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format, bool reference)
: DataConsumer(format),
m_owner(owner),
m_channel(chan),
@ -195,6 +197,8 @@ MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format
{
if (m_owner)
m_owner->setConsumer(m_channel,this);
if (reference)
m_ref = owner;
DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) created [%p]",OWNERNAME,m_channel,this);
}
@ -216,6 +220,7 @@ void MuxConsumer::destroyed()
m_owner->setConsumer(m_channel);
m_owner = 0;
plugin.unlock();
m_ref = 0;
DataConsumer::destroyed();
}
@ -227,7 +232,7 @@ void MuxConsumer::destroyed()
* MuxSource
*/
MuxSource::MuxSource(const String& id, const char* targetid, const char* format,
const NamedList& params, String& error)
const NamedList& params, String& error, bool reference)
: DataSource(format),
m_lock(true,"MuxSource::lock"),
m_id(id),
@ -256,7 +261,7 @@ MuxSource::MuxSource(const String& id, const char* targetid, const char* format,
m_channels = (unsigned int)channels;
// Get the format of each channel and the sample length. Skip channel count and '*' from format
pos++;
if ((unsigned int)pos < getFormat().length()) {
if ((unsigned int)pos < getFormat().String::length()) {
channelFormat = getFormat().c_str() + pos;
m_sampleLen = lookup(channelFormat,s_dictSampleLen,0);
}
@ -282,7 +287,7 @@ MuxSource::MuxSource(const String& id, const char* targetid, const char* format,
m_consumers = new MuxConsumer*[m_channels];
::memset(m_consumers,0,m_channels * sizeof(MuxConsumer*));
for (unsigned int i = 0; i < m_channels; i++)
new MuxConsumer(this,i,channelFormat);
new MuxConsumer(this,i,channelFormat,reference);
Debug(this,DebugAll,
"Created channels=%u format=%s sample=%u buffer=%u targetid=%s [%p]",
@ -389,7 +394,7 @@ void MuxSource::consume(MuxConsumer& consumer, const DataBlock& data, unsigned l
fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,samples);
if (m_full == m_channels)
forwardBuffer();
XDebug(this,DebugAll,"Consumed %u bytes on channel %u [%p]",
XDebug(this,DebugAll,"Consumed all %u bytes on channel %u [%p]",
samples * m_sampleLen,consumer.m_channel,this);
return;
}
@ -398,7 +403,7 @@ void MuxSource::consume(MuxConsumer& consumer, const DataBlock& data, unsigned l
fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,freeSamples);
forwardBuffer();
unsigned int consumed = freeSamples * m_sampleLen;
Debug(this,DebugAll,"Consumed %u/%u bytes on channel %u [%p]",
DDebug(this,DebugAll,"Consumed only %u/%u bytes on channel %u [%p]",
consumed,data.length(),consumer.m_channel,this);
DataBlock rest(buf + consumed,data.length() - consumed,false);
consume(consumer,rest,tStamp);
@ -410,20 +415,22 @@ void MuxSource::destroyed()
Lock2 lock(plugin,m_lock);
plugin.remove(this);
for (unsigned int i = 0; i < m_channels; i++) {
if (!m_consumers[i])
continue;
setSource(i);
if (m_consumers[i]->m_overErrors > 10)
Debug(this,DebugMild,
"Removing consumer on channel %u with %u overrun errors [%p]",
i,m_consumers[i]->m_overErrors,this);
m_consumers[i]->m_overErrors = 0;
m_consumers[i]->m_owner = 0;
TelEngine::destruct(m_consumers[i]);
if (m_consumers) {
for (unsigned int i = 0; i < m_channels; i++) {
if (!m_consumers[i])
continue;
setSource(i);
if (m_consumers[i]->m_overErrors > 10)
Debug(this,DebugMild,
"Removing consumer on channel %u with %u overrun errors [%p]",
i,m_consumers[i]->m_overErrors,this);
m_consumers[i]->m_overErrors = 0;
m_consumers[i]->m_owner = 0;
TelEngine::destruct(m_consumers[i]);
}
delete[] m_consumers;
m_consumers = 0;
}
delete[] m_consumers;
m_consumers = 0;
lock.drop();
if (!m_error)
Debug(this,DebugAll,"Destroyed targetid=%s [%p]",m_targetid.c_str(),this);
@ -533,7 +540,8 @@ void MuxModule::initialize()
// Startup
if (m_first) {
setup();
Engine::install(new ChanAttachHandler);
installRelay(Attach,"chan.attach",100);
installRelay(Record,"chan.record",100);
}
s_chanBuffer = s_cfg.getIntValue("general","chanbuffer",160);
@ -551,6 +559,17 @@ void MuxModule::initialize()
m_first = false;
}
bool MuxModule::received(Message& msg, int id)
{
switch (id) {
case Attach:
return chanAttach(msg);
case Record:
return chanRecord(msg);
}
return Module::received(msg,id);
}
// Respond to a request to attach/change a multiplexer
bool MuxModule::chanAttach(Message& msg)
{
@ -572,11 +591,11 @@ bool MuxModule::chanAttach(Message& msg)
bool failOne = msg.getBoolValue("fail",false);
if (!id) {
plugin.lock();
lock();
id << m_prefix << m_id++;
plugin.unlock();
unlock();
src = new MuxSource(id,targetid,msg.getValue("format",s_defFormat),msg,error);
src = new MuxSource(id,targetid,msg.getValue("format",s_defFormat),msg,error,false);
// Set channel sources
if (!error) {
@ -635,6 +654,93 @@ bool MuxModule::chanAttach(Message& msg)
return !error;
}
// Create a multiplexer for bidirectional recording
bool MuxModule::chanRecord(Message& msg)
{
NamedString* both = msg.getParam(YSTRING("both"));
if (TelEngine::null(both))
return false;
CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
RefPointer<DataEndpoint> de = YOBJECT(DataEndpoint,msg.userData());
if (*both == "-") {
if (ch && !de)
de = ch->getEndpoint();
if (de) {
de->setCallRecord();
de->setPeerRecord();
}
return msg.getBoolValue(YSTRING("single"));
}
if (ch && !de)
de = ch->setEndpoint();
if (!de) {
Debug(DebugWarn,"Consumer '%s' both record with no data channel!",both->c_str());
return false;
}
const char* targetid = msg.getValue(YSTRING("notify"));
String format = msg.getValue(YSTRING("format"),s_defFormat);
String muxFormat = format;
muxFormat.startSkip("2*",false);
switch (muxFormat.toInteger(s_dictSampleLen)) {
case 1:
case 2:
break;
default:
format = "slin";
muxFormat = "slin";
}
muxFormat = "2*" + muxFormat;
Message m("chan.record");
m.addParam("call",*both);
if (targetid)
m.addParam("notify",targetid);
m.addParam("format",format);
m.copyParam(msg,YSTRING("append"));
m.copyParam(msg,YSTRING("maxlen"));
m.addParam("call_account",msg.getValue(YSTRING("both_account")),false);
m.addParam("call_query",msg.getValue(YSTRING("both_query")),false);
m.addParam("call_fallback",msg.getValue(YSTRING("both_fallback")),false);
m.addParam("single",String::boolText(true));
DataEndpoint* ep = new DataEndpoint;
m.userData(ep);
Engine::dispatch(m);
RefPointer<DataConsumer> c = ep->getCallRecord();
m.userData(0);
TelEngine::destruct(ep);
if (!c)
return false;
String error;
String id;
lock();
id << m_prefix << m_id++;
unlock();
MuxSource* s = new MuxSource(id,targetid,muxFormat,msg,error,true);
if (error.null()) {
if (DataTranslator::attachChain(s,c)) {
// Consumers are kept referenced by the DataEndpoint
DataConsumer* dc = s->getConsumer(0);
de->setCallRecord(dc);
TelEngine::destruct(dc);
dc = s->getConsumer(1);
de->setPeerRecord(dc);
TelEngine::destruct(dc);
}
else
error = "Translator chain attach failure";
}
// Source is kept referenced by the consumers
TelEngine::destruct(s);
c = 0;
if (error)
msg.setParam("error",error);
return error.null() && msg.getBoolValue(YSTRING("single"));
}
void MuxModule::statusParams(String& str)
{
Module::statusParams(str);
@ -650,15 +756,6 @@ void MuxModule::statusDetail(String& str)
}
}
/**
* ChanAttachHandler
*/
bool ChanAttachHandler::received(Message& msg)
{
return plugin.chanAttach(msg);
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */