Added flags to data forwarded through DataNodes.

The amount of consumed data is returned to the source.


git-svn-id: http://yate.null.ro/svn/yate/trunk@2745 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2009-07-02 09:24:33 +00:00
parent db25093aa1
commit 8520218d2d
29 changed files with 193 additions and 112 deletions

View File

@ -146,10 +146,11 @@ class SimpleTranslator : public DataTranslator
public:
SimpleTranslator(const DataFormat& sFormat, const DataFormat& dFormat)
: DataTranslator(sFormat,dFormat) { }
virtual void Consume(const DataBlock& data, unsigned long tStamp)
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!ref())
return;
return 0;
unsigned long len = 0;
while (getTransSource()) {
int nchan = m_format.numChannels();
if (nchan != getTransSource()->getFormat().numChannels())
@ -163,18 +164,19 @@ public:
}
DataBlock oblock;
if (oblock.convert(data, sFmt, dFmt)) {
if (tStamp == (unsigned long)-1) {
if (tStamp == invalidStamp()) {
unsigned int delta = data.length();
if (delta > oblock.length())
delta = oblock.length();
tStamp = m_timestamp + delta;
}
m_timestamp = tStamp;
getTransSource()->Forward(oblock, tStamp);
len = getTransSource()->Forward(oblock, tStamp, flags);
}
break;
}
deref();
return len;
}
};
@ -188,11 +190,12 @@ public:
: DataTranslator(sFormat,dFormat),
m_sRate(sFormat.sampleRate()), m_dRate(dFormat.sampleRate())
{ }
virtual void Consume(const DataBlock& data, unsigned long tStamp)
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
unsigned int n = data.length();
if (!n || (n & 1) || !m_sRate || !m_dRate || !ref())
return;
return 0;
unsigned long len = 0;
n /= 2;
DataSource* src = getTransSource();
if (src) {
@ -235,9 +238,10 @@ public:
}
if (src->timeStamp() != invalidStamp())
delta += src->timeStamp();
src->Forward(oblock, delta);
len = src->Forward(oblock, delta, flags);
}
deref();
return len;
}
};
@ -251,11 +255,12 @@ public:
: DataTranslator(sFormat,dFormat),
m_sChans(sFormat.numChannels()), m_dChans(dFormat.numChannels())
{ }
virtual void Consume(const DataBlock& data, unsigned long tStamp)
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
unsigned int n = data.length();
if (!n || (n & 1) || !ref())
return;
return 0;
unsigned long len = 0;
n /= 2;
if (getTransSource()) {
short* s = (short*) data.data();
@ -286,9 +291,10 @@ public:
*d++ = v;
}
}
getTransSource()->Forward(oblock, tStamp);
len = getTransSource()->Forward(oblock, tStamp, flags);
}
deref();
return len;
}
};
@ -453,18 +459,19 @@ void* DataConsumer::getObject(const String& name) const
return DataNode::getObject(name);
}
void DataConsumer::Consume(const DataBlock& data, unsigned long tStamp, DataSource* source)
unsigned long DataConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags, DataSource* source)
{
if (source == m_override)
tStamp += m_overrideTsDelta;
else if (m_override || (source != m_source))
return;
return 0;
else
tStamp += m_regularTsDelta;
u_int64_t tsTime = Time::now();
Consume(data,tStamp);
unsigned long len = Consume(data,tStamp,flags);
m_timestamp = tStamp;
m_lastTsTime = tsTime;
return len;
}
bool DataConsumer::synchronize(DataSource* source)
@ -504,13 +511,13 @@ bool DataConsumer::synchronize(DataSource* source)
}
void DataSource::Forward(const DataBlock& data, unsigned long tStamp)
unsigned long DataSource::Forward(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
Lock mylock(this,100000);
// we DON'T refcount here, we rely on the mutex to keep us safe
if (!(mylock.mutex() && alive())) {
DDebug(DebugInfo,"Forwarding on a dead DataSource! [%p]",this);
return;
return 0;
}
// try to evaluate amount of samples in this packet
@ -526,13 +533,20 @@ void DataSource::Forward(const DataBlock& data, unsigned long tStamp)
m_timestamp,nSamp,this);
tStamp = m_timestamp + nSamp;
}
unsigned long len = invalidStamp();
ObjList *l = m_consumers.skipNull();
for (; l; l=l->skipNext()) {
DataConsumer *c = static_cast<DataConsumer *>(l->get());
c->Consume(data,tStamp,this);
unsigned long ll = c->Consume(data,tStamp,flags,this);
// get the minimum data amount forwarded to all consumers
if (len > ll)
len = ll;
}
if (len == invalidStamp())
len = 0;
m_timestamp = tStamp;
m_nextStamp = nSamp ? (tStamp + nSamp) : invalidStamp();
return len;
}
bool DataSource::attach(DataConsumer* consumer, bool override)

View File

@ -72,7 +72,7 @@ class AmrTrans : public DataTranslator
public:
AmrTrans(const char* sFormat, const char* dFormat, void* amrState, bool octetAlign = false);
virtual ~AmrTrans();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
inline bool valid() const
{ return 0 != m_amrState; }
static inline const char* alignName(bool align)
@ -178,10 +178,12 @@ AmrTrans::~AmrTrans()
}
// Actual transcoding of data
void AmrTrans::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long AmrTrans::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!(m_amrState && getTransSource()))
return;
return 0;
if (data.null() && (flags & DataSilent))
return getTransSource()->Forward(data,tStamp,flags);
ref();
m_data += data;
if (!tStamp)
@ -189,6 +191,7 @@ void AmrTrans::Consume(const DataBlock& data, unsigned long tStamp)
while (pushData(tStamp))
;
deref();
return invalidStamp();
}
// Data error, report error 1st time and clear buffer

View File

@ -110,7 +110,7 @@ class AnalyzerCons : public DataConsumer, public Runnable
public:
AnalyzerCons(const String& type, const char* window = 0);
virtual ~AnalyzerCons();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
virtual void statusParams(String& str);
virtual void run();
protected:
@ -449,13 +449,13 @@ AnalyzerCons::~AnalyzerCons()
s_mutex.unlock();
}
void AnalyzerCons::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long AnalyzerCons::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!m_timeStart) {
// the first data block may be garbled or have bad timestamp - ignore
m_timeStart = Time::now();
m_tsStart = tStamp;
return;
return invalidStamp();
}
unsigned int samples = data.length() / 2;
long delta = tStamp - timeStamp() - samples;
@ -468,11 +468,11 @@ void AnalyzerCons::Consume(const DataBlock& data, unsigned long tStamp)
m_tsGapLength += delta;
}
if (!m_spectrum)
return;
return invalidStamp();
m_data += data;
unsigned int len = 2 * m_spectrum->samples();
if (m_data.length() < len)
return;
return invalidStamp();
// limit the length of the buffer
int toCut = data.length() - (2 * len);
if (toCut > 0) {
@ -481,6 +481,7 @@ void AnalyzerCons::Consume(const DataBlock& data, unsigned long tStamp)
}
if (m_spectrum->prepare((const short*)m_data.data()))
m_data.cut(-(int)len);
return invalidStamp();
}
void AnalyzerCons::run()

View File

@ -78,8 +78,8 @@ private:
class DummyConsumer : public DataConsumer
{
public:
virtual void Consume(const DataBlock& data, unsigned long tStamp)
{ }
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{ return invalidStamp(); }
};
class GenThread : public Thread

View File

@ -68,7 +68,7 @@ public:
AlsaConsumer(AlsaDevice* dev);
~AlsaConsumer();
bool init();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
private:
AlsaDevice* m_device;
unsigned m_total;
@ -248,12 +248,13 @@ AlsaConsumer::~AlsaConsumer()
m_device->deref();
}
void AlsaConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long AlsaConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
if (m_device->closed() || data.null())
return;
return 0;
m_device->write(data.data(),data.length()/2);
m_total += data.length();
return invalidStamp();
}
AlsaChan::AlsaChan(const String& dev)

View File

@ -67,7 +67,7 @@ class DSoundConsumer : public DataConsumer
public:
DSoundConsumer(bool stereo = false);
~DSoundConsumer();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
bool control(NamedList& msg);
private:
DSoundPlay* m_dsound;
@ -627,10 +627,13 @@ DSoundConsumer::~DSoundConsumer()
m_dsound->terminate();
}
void DSoundConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long DSoundConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
if (m_dsound)
if (m_dsound) {
m_dsound->put(data);
return invalidStamp();
}
return 0;
}
bool DSoundConsumer::control(NamedList& msg)

View File

@ -71,7 +71,7 @@ public:
OssConsumer(OssDevice* dev);
~OssConsumer();
bool init();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
private:
OssDevice* m_device;
unsigned m_total;
@ -280,12 +280,13 @@ OssConsumer::~OssConsumer()
m_device->deref();
}
void OssConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long OssConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
if (m_device->closed() || data.null())
return;
return 0;
::write(m_device->fd(),data.data(),data.length());
m_total += data.length();
return invalidStamp();
}
OssChan::OssChan(const String& dev)

View File

@ -157,7 +157,7 @@ public:
{ }
~ConfConsumer()
{ }
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
unsigned int energy() const;
unsigned int noise() const;
inline unsigned int energy2() const
@ -545,10 +545,10 @@ void ConfRoom::mix(ConfConsumer* cons)
// Compute the energy level and noise threshold, store the data and call mixer
void ConfConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long ConfConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_muted || data.null() || !m_room)
return;
return 0;
if (m_smart) {
// we need to compute the average energy and take decay into account
int64_t sum2 = m_energy2;
@ -580,13 +580,14 @@ void ConfConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_room->toString().c_str(),this);
// mute the channel to avoid getting back here
m_muted = true;
return;
return 0;
}
if (m_buffer.length()+data.length() <= MAX_BUFFER)
m_buffer += data;
m_room->unlock();
if (m_buffer.length() >= MIN_BUFFER)
m_room->mix(this);
return invalidStamp();
}
// Take out of the buffer the samples mixed in or skipped

View File

@ -93,7 +93,7 @@ class ExtModConsumer : public DataConsumer
public:
ExtModConsumer(Stream* str);
~ExtModConsumer();
virtual void Consume(const DataBlock& data, unsigned long timestamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long timestamp, unsigned long flags);
private:
Stream* m_str;
unsigned m_total;
@ -437,12 +437,14 @@ ExtModConsumer::~ExtModConsumer()
}
}
void ExtModConsumer::Consume(const DataBlock& data, unsigned long timestamp)
unsigned long ExtModConsumer::Consume(const DataBlock& data, unsigned long timestamp, unsigned long flags)
{
if ((m_str) && !data.null()) {
m_str->writeData(data);
m_total += data.length();
return invalidStamp();
}
return 0;
}

View File

@ -82,7 +82,7 @@ class FaxConsumer : public DataConsumer
public:
FaxConsumer(FaxWrapper* wrapper, const char* format = "slin");
~FaxConsumer();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
private:
RefPointer<FaxWrapper> m_wrap;
};
@ -244,11 +244,12 @@ FaxConsumer::~FaxConsumer()
m_wrap = 0;
}
void FaxConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long FaxConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (data.null() || !m_wrap)
return;
return 0;
m_wrap->rxData(data,tStamp);
return invalidStamp();
}

View File

@ -139,7 +139,7 @@ public:
return (tmp && File::exists(m_tmpFileName)) ||
(file && File::exists(m_fileName));
}
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
// Release memory
virtual void destroyed();
@ -544,7 +544,7 @@ FileConsumer::FileConsumer(const String& file, NamedList* params, const char* ch
m_delTemp = false;
}
void FileConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long FileConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!m_startTime) {
m_startTime = Time::now();
@ -556,7 +556,7 @@ void FileConsumer::Consume(const DataBlock& data, unsigned long tStamp)
Debug(&__plugin,DebugNote,
"FileConsumer(%s) failed to start: temporary file already exists! [%p]",
m_fileName.c_str(),this);
return;
return 0;
}
m_delTemp = true;
if (!m_file.openPath(m_tmpFileName,true,false,true,true,true)) {
@ -566,12 +566,12 @@ void FileConsumer::Consume(const DataBlock& data, unsigned long tStamp)
Debug(&__plugin,DebugNote,
"FileConsumer(%s) failed to create temporary file. %d: '%s' [%p]",
m_fileName.c_str(),m_file.error(),error.c_str(),this);
return;
return 0;
}
}
if (data.null())
return;
return 0;
XDebug(&__plugin,DebugAll,"FileConsumer(%s) consuming %u bytes [%p]",
m_fileName.c_str(),data.length(),this);
@ -594,6 +594,7 @@ void FileConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_transferred += data.length();
if (m_transferred && (m_transferred >= m_fileSize))
terminate();
return data.length();
}
// Release memory

View File

@ -57,7 +57,7 @@ class GsmCodec : public DataTranslator
public:
GsmCodec(const char* sFormat, const char* dFormat, bool encoding);
~GsmCodec();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
private:
bool m_encoding;
gsm m_gsm;
@ -84,10 +84,12 @@ GsmCodec::~GsmCodec()
}
}
void GsmCodec::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long GsmCodec::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!(m_gsm && getTransSource()))
return;
return 0;
if (data.null() && (flags & DataSilent))
return getTransSource()->Forward(data,tStamp,flags);
ref();
m_data += data;
DataBlock outdata;
@ -120,11 +122,13 @@ void GsmCodec::Consume(const DataBlock& data, unsigned long tStamp)
}
XDebug("GsmCodec",DebugAll,"%scoding %d frames of %d input bytes (consumed %d) in %d output bytes",
m_encoding ? "en" : "de",frames,m_data.length(),consumed,outdata.length());
unsigned long len = 0;
if (frames) {
m_data.cut(-consumed);
getTransSource()->Forward(outdata,tStamp);
len = getTransSource()->Forward(outdata,tStamp,flags);
}
deref();
return len;
}
GsmPlugin::GsmPlugin()

View File

@ -339,7 +339,7 @@ public:
virtual BOOL Close();
virtual BOOL IsOpen() const;
virtual BOOL Read(void *buf, PINDEX len);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
private:
PAdaptiveDelay readDelay;
DataBlock m_buffer;
@ -1762,17 +1762,20 @@ BOOL YateH323AudioConsumer::IsOpen() const
return !m_exit;
}
void YateH323AudioConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long YateH323AudioConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
if (m_exit)
return;
return 0;
Lock lock(this);
if ((m_buffer.length() + data.length()) <= (480*5))
if ((m_buffer.length() + data.length()) <= (480*5)) {
m_buffer += data;
return invalidStamp();
}
#ifdef DEBUG
else
Debug(&hplugin,DebugAll,"Consumer skipped %u bytes, buffer is full [%p]",data.length(),this);
#endif
return 0;
}
BOOL YateH323AudioConsumer::Read(void *buf, PINDEX len)

View File

@ -80,7 +80,7 @@ class iLBCCodec : public DataTranslator
public:
iLBCCodec(const char* sFormat, const char* dFormat, bool encoding, int msec);
~iLBCCodec();
virtual void Consume(const DataBlock& data, unsigned long timeDelta);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
private:
bool m_encoding;
DataBlock m_data;
@ -116,8 +116,13 @@ iLBCCodec::~iLBCCodec()
s_cmutex.unlock();
}
void iLBCCodec::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long iLBCCodec::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!getTransSource())
return 0;
if (data.null() && (flags & DataSilent))
return getTransSource()->Forward(data,tStamp,flags);
ref();
// block size in samples per frame, no_bytes frame length in bytes
int block,no_bytes;
if (m_mode == 20)
@ -128,9 +133,6 @@ void iLBCCodec::Consume(const DataBlock& data, unsigned long tStamp)
block = BLOCKL_30MS;
no_bytes=NO_OF_BYTES_30MS;
}
if (!getTransSource())
return;
ref();
m_data += data;
DataBlock outdata;
int frames,consumed;
@ -175,11 +177,13 @@ void iLBCCodec::Consume(const DataBlock& data, unsigned long tStamp)
XDebug("iLBCCodec",DebugAll,"%scoding %d frames of %d input bytes (consumed %d) in %d output bytes",
m_encoding ? "en" : "de",frames,m_data.length(),consumed,outdata.length());
unsigned long len = 0;
if (frames) {
m_data.cut(-consumed);
getTransSource()->Forward(outdata,tStamp);
len = getTransSource()->Forward(outdata,tStamp,flags);
}
deref();
return len;
}
iLBCPlugin::iLBCPlugin()

View File

@ -43,7 +43,7 @@ public:
virtual ~MuxConsumer()
{}
// Send data to the owner, if any
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
virtual void destroyed();
private:
@ -198,10 +198,13 @@ MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format
DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) created [%p]",OWNERNAME,m_channel,this);
}
void MuxConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long MuxConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_owner)
if (m_owner) {
m_owner->consume(*this,data,tStamp);
return invalidStamp();
}
return 0;
}
void MuxConsumer::destroyed()

View File

@ -42,7 +42,7 @@ public:
virtual ~ADConsumer()
{}
// Process received data
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
// Remove from module's consumer list
virtual void destroyed();
protected:
@ -147,16 +147,17 @@ ADConsumer::ADConsumer(const String& id, const char* notify)
}
// Process received data
void ADConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long ADConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_terminated)
return;
return 0;
m_terminated = !process(data);
if (!m_terminated)
return;
return invalidStamp();
DDebug(&plugin,DebugAll,"Terminated %s targetid=%s [%p]",
m_id.c_str(),m_targetid.c_str(),this);
Engine::enqueue(chanNotify("terminate","reason",getTerminateReason()));
return invalidStamp();
}
// Remove from module's consumer list

View File

@ -50,7 +50,7 @@ public:
MrcpConsumer(const String& id, const char* target, const char* format = 0);
virtual ~MrcpConsumer();
virtual bool setFormat(const DataFormat& format);
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
bool init(Message& msg);
private:
void cleanup();
@ -168,10 +168,9 @@ bool MrcpConsumer::setFormat(const DataFormat& format)
return m_source && m_source->setFormat(format);
}
void MrcpConsumer::Consume(const DataBlock& data, unsigned long timeDelta)
unsigned long MrcpConsumer::Consume(const DataBlock& data, unsigned long timeDelta, unsigned long flags)
{
if (m_source)
m_source->Forward(data,timeDelta);
return m_source ? m_source->Forward(data,timeDelta,flags) : 0;
}

View File

@ -386,8 +386,8 @@ public:
virtual ~TdmConsumer();
inline void changeFormat(const char* format)
{ m_format = format; }
virtual void Consume(const DataBlock& data, unsigned long tStamp)
{ if (m_circuit) m_circuit->consume(data); }
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{ if (m_circuit) m_circuit->consume(data); return invalidStamp(); }
private:
TdmCircuit* m_circuit;
String m_address;

View File

@ -303,7 +303,7 @@ public:
virtual ~WpConsumer();
inline void changeFormat(const char* format)
{ m_format = format; }
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
WpCircuit* m_owner; // B-channel owning this consumer
u_int32_t m_errorCount; // The number of times the fifo was full
@ -1123,7 +1123,7 @@ WpConsumer::~WpConsumer()
}
// Put data in fifo buffer
void WpConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long WpConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
unsigned int err = put((const unsigned char*)data.data(),data.length());
if (err) {
@ -1131,6 +1131,7 @@ void WpConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_errorBytes += err;
}
m_total += data.length();
return invalidStamp();
}
/**

View File

@ -213,7 +213,7 @@ public:
virtual ~WpConsumer();
inline void changeFormat(const char* format)
{ m_format = format; }
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
WpCircuit* m_owner; // B-channel owning this consumer
u_int32_t m_errorCount; // The number of times the fifo was full
@ -792,7 +792,7 @@ WpConsumer::~WpConsumer()
}
// Put data in fifo buffer
void WpConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long WpConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
unsigned int err = put((const unsigned char*)data.data(),data.length());
if (err) {
@ -800,6 +800,7 @@ void WpConsumer::Consume(const DataBlock& data, unsigned long tStamp)
m_errorBytes += err;
}
m_total += data.length();
return invalidStamp();
}
/**

View File

@ -415,7 +415,7 @@ class SigConsumerMux : public DataConsumer
public:
virtual ~SigConsumerMux()
{ }
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
protected:
inline SigConsumerMux(SigSourceMux* owner, bool first, const char* format)
: DataConsumer(format), m_owner(owner), m_first(first)
@ -2529,10 +2529,13 @@ void SigIsdnMonitor::release()
/**
* SigConsumerMux
*/
void SigConsumerMux::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long SigConsumerMux::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_owner)
if (m_owner) {
m_owner->consume(m_first,data,tStamp);
return invalidStamp();
}
return 0;
}
/**

View File

@ -551,8 +551,8 @@ public:
virtual ~ZapConsumer();
inline void changeFormat(const char* format)
{ m_format = format; }
virtual void Consume(const DataBlock& data, unsigned long tStamp)
{ if (m_circuit) m_circuit->consume(data); }
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{ if (m_circuit) m_circuit->consume(data); return invalidStamp(); }
private:
ZapCircuit* m_circuit;
String m_address;

View File

@ -146,7 +146,7 @@ class SpeexCodec : public DataTranslator
public:
SpeexCodec(const char* sFormat, const char* dFormat, bool encoding, int type, int mode);
~SpeexCodec();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
private:
bool m_encoding;
DataBlock m_data;
@ -246,12 +246,12 @@ SpeexCodec::~SpeexCodec()
s_cmutex.unlock();
}
void SpeexCodec::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long SpeexCodec::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!(m_state && m_bits && getTransSource()))
return;
return 0;
if (!ref())
return;
return 0;
m_data += data;
DataBlock outdata;
@ -321,12 +321,14 @@ void SpeexCodec::Consume(const DataBlock& data, unsigned long tStamp)
"%scoding %d frames of %d input bytes (consumed %d) in %d output bytes, frame size %d, time %lu, ret %d",
m_encoding ? "en" : "de", frames, m_data.length(), consumed, outdata.length(), frame_size, tStamp, ret);
unsigned long len = 0;
if (frames) {
m_data.cut(-(int)consumed);
getTransSource()->Forward(outdata, tStamp);
len = getTransSource()->Forward(outdata, tStamp, flags);
}
deref();
return len;
}
SpeexPlugin::SpeexPlugin()

View File

@ -96,7 +96,7 @@ public:
};
ToneConsumer(const String& id, const String& name);
virtual ~ToneConsumer();
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
virtual const String& toString() const
{ return m_name; }
inline const String& id() const
@ -390,16 +390,16 @@ void ToneConsumer::checkFax()
}
// Feed samples to the filter(s)
void ToneConsumer::Consume(const DataBlock& data, unsigned long timeDelta)
unsigned long ToneConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
unsigned int samp = data.length() / 2;
if (m_mode != Mono)
samp /= 2;
if (!samp)
return;
return 0;
const int16_t* s = (const int16_t*)data.data();
if (!s)
return;
return 0;
while (samp--) {
m_xv[0] = m_xv[1]; m_xv[1] = m_xv[2];
switch (m_mode) {
@ -450,6 +450,7 @@ void ToneConsumer::Consume(const DataBlock& data, unsigned long timeDelta)
}
XDebug(&plugin,DebugAll,"Fax detector on %s: signal=%0.1f, total=%0.1f",
m_id.c_str(),m_fax.value(),m_pwr);
return invalidStamp();
}
// Copy parameters required for automatic fax call diversion

View File

@ -75,7 +75,7 @@ public:
WaveConsumer(const String& file, CallEndpoint* chan, unsigned maxlen, const char* format = 0, const NamedString* param = 0);
~WaveConsumer();
virtual bool setFormat(const DataFormat& format);
virtual void Consume(const DataBlock& data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags);
inline void setNotify(const String& id)
{ m_id = id; }
private:
@ -633,7 +633,7 @@ bool WaveConsumer::setFormat(const DataFormat& format)
return false;
}
void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (!data.null()) {
if (!m_time)
@ -675,7 +675,9 @@ void WaveConsumer::Consume(const DataBlock& data, unsigned long tStamp)
disc->init();
}
}
return invalidStamp();
}
return 0;
}

View File

@ -401,7 +401,7 @@ class YIAXConsumer : public DataConsumer
public:
YIAXConsumer(YIAXConnection* conn, u_int32_t format, const char* formatText);
~YIAXConsumer();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
private:
YIAXConnection* m_connection;
unsigned m_total;
@ -1353,13 +1353,16 @@ YIAXConsumer::~YIAXConsumer()
{
}
void YIAXConsumer::Consume(const DataBlock& data, unsigned long tStamp)
unsigned long YIAXConsumer::Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags)
{
if (m_connection && !m_connection->mutedOut()) {
m_total += data.length();
if (m_connection->transaction())
if (m_connection->transaction()) {
m_connection->transaction()->sendMedia(data,m_format);
return invalidStamp();
}
}
return 0;
}
/**

View File

@ -212,7 +212,7 @@ class YRTPConsumer : public DataConsumer
public:
YRTPConsumer(YRTPWrapper* wrap);
~YRTPConsumer();
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
inline void setSplitable()
{ m_splitable = (m_format == "alaw") || (m_format == "mulaw"); }
private:
@ -698,7 +698,7 @@ bool YRTPSession::rtpRecvData(bool marker, unsigned int timestamp, const void* d
// the source will not be destroyed until we reset the busy flag
DataBlock block;
block.assign((void*)data, len, false);
source->Forward(block,timestamp);
source->Forward(block,timestamp,(marker ? DataNode::DataMark : 0));
block.clear(false);
source->busy(false);
return true;
@ -818,10 +818,10 @@ YRTPConsumer::~YRTPConsumer()
}
}
void YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
if (!(m_wrap && m_wrap->bufSize() && m_wrap->rtp()))
return;
return 0;
XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, ts=%lu [%p]",
data.length(),tStamp,this);
unsigned int buf = m_wrap->bufSize();
@ -837,12 +837,15 @@ void YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp)
sz = buf;
DDebug(&splugin,DebugAll,"Creating %u bytes fragment of %u bytes buffer",sz,len);
}
m_wrap->rtp()->rtpSendData(false,tStamp,ptr,sz);
bool mark = (flags & DataMark) != 0;
flags &= ~DataMark;
m_wrap->rtp()->rtpSendData(mark,tStamp,ptr,sz);
// if timestamp increment is not provided we have to guess...
tStamp += sz;
len -= sz;
ptr += sz;
}
return invalidStamp();
}

View File

@ -1222,7 +1222,7 @@ class YSocksConsumer : public DataConsumer
friend class YSocksWrapper;
public:
YSocksConsumer(YSocksWrapper* w);
virtual void Consume(const DataBlock &data, unsigned long tStamp);
virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
protected:
// Remove from wrapper. Release memory
virtual void destroyed();
@ -3337,12 +3337,15 @@ YSocksConsumer::YSocksConsumer(YSocksWrapper* w)
m_wrapper ? m_wrapper->toString().c_str() : "",this);
}
void YSocksConsumer::Consume(const DataBlock &data, unsigned long tStamp)
unsigned long YSocksConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
{
XDebug(m_wrapper,DebugAll,"Sending %u bytes [%p]",data.length(),m_wrapper);
unsigned int sent = data.length();
if (m_wrapper && m_wrapper->state() == YSocksWrapper::Running && m_wrapper->m_conn)
if (m_wrapper && m_wrapper->state() == YSocksWrapper::Running && m_wrapper->m_conn) {
m_wrapper->m_conn->send(data.data(),sent);
return invalidStamp();
}
return 0;
}
// Remove from endpoint. Release memory

View File

@ -292,6 +292,19 @@ private:
class YATE_API DataNode : public RefObject
{
public:
/**
* Flags associated with the DataBlocks forwarded between nodes
*/
enum DataFlags {
DataStart = 0x0001,
DataEnd = 0x0002,
DataMark = 0x0004,
DataSilent = 0x0008,
DataMissed = 0x0010,
DataError = 0x0020,
DataPrivate = 0x0100
};
/**
* Construct a DataNode
* @param format Description of the data format, default none
@ -387,10 +400,13 @@ public:
/**
* Consumes the data sent to it from a source
* @param data The raw data block to process; an empty block ends data
* @param data The raw data block to process
* @param tStamp Timestamp of data - typically samples
* @param flags Indicator flags associated with the data block
* @return Number of samples actually consumed,
* use invalidStamp() to indicate that all data was consumed
*/
virtual void Consume(const DataBlock& data, unsigned long tStamp) = 0;
virtual unsigned long Consume(const DataBlock& data, unsigned long tStamp, unsigned long flags) = 0;
/**
* Get the data source of this object if it's connected
@ -422,7 +438,8 @@ protected:
virtual bool synchronize(DataSource* source);
private:
void Consume(const DataBlock& data, unsigned long tStamp, DataSource* source);
unsigned long Consume(const DataBlock& data, unsigned long tStamp,
unsigned long flags, DataSource* source);
DataSource* m_source;
DataSource* m_override;
long m_regularTsDelta;
@ -460,10 +477,13 @@ public:
/**
* Forwards the data to its consumers
* @param data The raw data block to forward; an empty block ends data
* @param data The raw data block to forward
* @param tStamp Timestamp of data - typically samples
* @param flags Indicator flags associated with the data block
* @return Number of samples actually forwarded to all consumers
*/
void Forward(const DataBlock& data, unsigned long tStamp = invalidStamp());
unsigned long Forward(const DataBlock& data, unsigned long tStamp = invalidStamp(),
unsigned long flags = 0);
/**
* Attach a data consumer