/** * mux.cpp * This file is part of the YATE Project http://YATE.null.ro * * Data multiplex * * Yet Another Telephony Engine - a fully featured software PBX and IVR * Copyright (C) 2004-2006 Null Team * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include using namespace TelEngine; namespace { // anonymous class MuxConsumer; // Consumer used to push data a source multiplexer class MuxSource; // A data source multiplexer class MuxModule; // The module class ChanAttachHandler; // chan.attach handler // Consumer used to push data to a MuxSource class MuxConsumer : public DataConsumer { friend class MuxSource; public: MuxConsumer(MuxSource* owner, unsigned int chan, const char* format); virtual ~MuxConsumer() {} // Send data to the owner, if any virtual void Consume(const DataBlock& data, unsigned long tStamp); protected: virtual void destroyed(); private: MuxSource* m_owner; // The owner of this consumer unsigned int m_channel; // The channel allocated by the owner unsigned int m_bufferFilled; // The number of samples in the buffer kept by the owner unsigned int m_overErrors; // Buffer overrun errors }; // A data source multiplexer class MuxSource : public DataSource, public DebugEnabler { friend class MuxConsumer; public: MuxSource(const String& id, const char* targetid, const char* format, const NamedList& params, String& error); virtual ~MuxSource() {} inline const String& id() const { return m_id; } inline const String& targetid() const { return m_targetid; } inline unsigned int channels() { return m_channels; } // Check if a channel's consumer exists and has a source attached inline bool hasSource(unsigned int channel) { return channel < m_channels && m_consumers[channel] && m_consumers[channel]->getConnSource(); } // Set/remove the source for a channel's consumer // channel: The channel whose source will be replaced bool setSource(unsigned int channel, DataSource* source = 0); // Multiplex received data from consumers and forward it void consume(MuxConsumer& cons, const DataBlock& data, unsigned long tStamp); virtual const String& toString() const { return m_id; } protected: // Set/remove a consumer. Fill it's buffer with idle values if removed void setConsumer(unsigned int channel, MuxConsumer* pCons = 0); // Clear consumers list (remove their owner before) virtual void destroyed(); private: // Forward the buffer if at least one channel is filled. Reset data // Fill incomplete channel buffers with idle value before forwarding the data void forwardBuffer(); // Fill (interlaced samples) buffer with samples of received data // If no data, fill the free space with idle value void fillBuffer(unsigned int channel, unsigned int& filled, unsigned char* data = 0, unsigned int samples = 0); Mutex m_lock; // Lock consumers changes and data processing String m_id; // The id wthin this module String m_targetid; // The id of the target (user) MuxConsumer** m_consumers; // The consumers unsigned int m_channels; // The number of consumers unsigned int m_full; // The number of consumers with full buffers unsigned char m_idleValue; // Filling value for missing data unsigned int m_sampleLen; // The format sample length unsigned int m_maxSamples; // Maximum samples in a channel buffer unsigned int m_delta; // Offset to write samples for each channel (m_channels * m_sampleLen) DataBlock m_buffer; // Multiplex buffer unsigned int m_error; // The number of data length violation error }; // The module class MuxModule : public Module { public: MuxModule(); ~MuxModule(); virtual void initialize(); // Respond to a request to attach/change a multiplexer bool chanAttach(Message& msg); // Append source inline void append(MuxSource* src) { if (!src) return; Lock lock(this); m_sources.append(src)->setDelete(false); } // Remove source inline void remove(MuxSource* src) { Lock lock(this); m_sources.remove(src,false); } protected: virtual void statusParams(String& str); virtual void statusDetail(String& str); private: bool m_first; // First init flag String m_prefix; // Module's prefix (name/) unsigned int m_id; // Next sources's id ObjList m_sources; // Source list }; // chan.attach handler class ChanAttachHandler : public MessageHandler { public: inline ChanAttachHandler() : MessageHandler("chan.attach",100) {} virtual bool received(Message& msg); }; /** * Module data and function */ static Configuration s_cfg; // Configuration file static unsigned int s_chanBuffer; // The buffer length of one channel of a data source multiplexer static unsigned char s_idleValue; // Idle value for source multiplexer to fill when no data static String s_defFormat; // Default format for MuxSource static MuxModule plugin; // Dictionary containig the supported formats and sample lengths static TokenDict s_dictSampleLen[] = { {"mulaw", 1}, {"alaw", 1}, {"slin", 2}, {0,0}, }; // Request a data source for a channel inline DataSource* getChannelSource(GenObject* target, unsigned int channel) { if (!target) return 0; String chNo; chNo << "DataSource" << channel; GenObject* ret = (GenObject*)target->getObject(chNo); chNo.clear(); return ret ? static_cast(ret->getObject("DataSource")) : 0; } /** * MuxConsumer */ #define ENABLER (m_owner?(DebugEnabler*)m_owner:(DebugEnabler*)&plugin) #define OWNERNAME (m_owner?m_owner->debugName():"(null)") MuxConsumer::MuxConsumer(MuxSource* owner, unsigned int chan, const char* format) : DataConsumer(format), m_owner(owner), m_channel(chan), m_bufferFilled(0), m_overErrors(0) { if (m_owner) m_owner->setConsumer(m_channel,this); DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) created [%p]",OWNERNAME,m_channel,this); } void MuxConsumer::Consume(const DataBlock& data, unsigned long tStamp) { if (m_owner) m_owner->consume(*this,data,tStamp); } void MuxConsumer::destroyed() { plugin.lock(); DDebug(ENABLER,DebugAll,"MuxConsumer(%s,%u) destroyed [%p]", OWNERNAME,m_channel,this); if (m_owner) m_owner->setConsumer(m_channel); m_owner = 0; plugin.unlock(); DataConsumer::destroyed(); } #undef ENABLER #undef OWNERNAME /** * MuxSource */ MuxSource::MuxSource(const String& id, const char* targetid, const char* format, const NamedList& params, String& error) : DataSource(format), m_lock(true,"MuxSource::lock"), m_id(id), m_targetid(targetid), m_consumers(0), m_channels(0), m_full(0), m_idleValue(0), m_sampleLen(0), m_maxSamples(0), m_delta(0), m_error(0) { debugName(m_id); debugChain(&plugin); const char* channelFormat = 0; // Set the number of channels and the format while (true) { int pos = getFormat().find("*"); if (pos < 1) break; int channels = getFormat().substr(0,pos).toInteger(); if (channels < 2) break; m_channels = (unsigned int)channels; // Get the format of each channel and the sample length. Skip channel count and '*' from format pos++; if ((unsigned int)pos < getFormat().length()) { channelFormat = getFormat().c_str() + pos; m_sampleLen = lookup(channelFormat,s_dictSampleLen,0); } break; } if (!m_sampleLen) { error << "Unsupported format '" << getFormat() << "'"; return; } m_delta = m_channels * m_sampleLen; m_idleValue = params.getIntValue("idlevalue",s_idleValue); unsigned int chanBuffer = params.getIntValue("chanbuffer",s_chanBuffer); // Adjust channel buffer to be multiple of sample length and not lesser then it if (chanBuffer < m_sampleLen) chanBuffer = m_sampleLen; m_maxSamples = chanBuffer / m_sampleLen; chanBuffer = m_maxSamples * m_sampleLen; m_buffer.assign(0,m_channels * chanBuffer); // Create consumers m_consumers = new MuxConsumer*[m_channels]; ::memset(m_consumers,0,m_channels * sizeof(MuxConsumer*)); for (unsigned int i = 0; i < m_channels; i++) new MuxConsumer(this,i,channelFormat); Debug(this,DebugAll, "Created channels=%u format=%s sample=%u buffer=%u targetid=%s [%p]", m_channels,getFormat().c_str(),m_sampleLen,m_buffer.length(),targetid,this); plugin.append(this); } // Set/remove a consumer. Fill it's buffer with idle value if removed void MuxSource::setConsumer(unsigned int channel, MuxConsumer* pCons) { Lock lock(m_lock); if (!m_consumers || channel >= m_channels || (pCons && pCons->m_owner != this)) return; MuxConsumer* old = m_consumers[channel]; if (old == pCons) return; plugin.lock(); m_consumers[channel] = 0; if (old) { old->m_overErrors = 0; old->m_owner = 0; } plugin.unlock(); m_consumers[channel] = pCons; if (pCons) Debug(this,DebugAll,"Consumer for channel %u set to (%p) [%p]", channel,pCons,this); else { unsigned int tmp = 0; fillBuffer(channel,tmp); Debug(this,DebugAll,"Removed consumer (%p) for channel %u [%p]", old,channel,this); } } // Set/remove the source for a channel's consumer bool MuxSource::setSource(unsigned int channel, DataSource* source) { Lock lock(m_lock); if (channel >= m_channels || !m_consumers[channel]) return false; DataSource* old = m_consumers[channel]->getConnSource(); if (old == source) return true; if (old) { old->detach(m_consumers[channel]); if (!m_consumers[channel]) { Debug(this,DebugNote, "Channel %u consumer vanished after detaching from source (%p) [%p]", channel,old,this); return false; } else Debug(this,DebugAll,"Channel %u detached from source (%p) [%p]",channel,old,this); } if (!source) return true; source->attach(m_consumers[channel]); Debug(this,DebugAll,"Channel %u attached to source (%p) [%p]",channel,source,this); return true; } // Multiplex received data from consumers and forward it // Forward multiplexed buffer if chan already filled // If received data is not greater then free space: // Fill chan buffer with data // If all channels are filled, forward the multiplexed buffer // Otherwise: // Fill free chan buffer // Forward buffer and consume the rest void MuxSource::consume(MuxConsumer& consumer, const DataBlock& data, unsigned long tStamp) { if (!data.length() || consumer.m_owner != this) return; Lock lock(m_lock,100000); if (!(lock.mutex() && alive())) { Debug(this,DebugMild,"Locking failed, dropping %u bytes [%p]",data.length(),this); return; } XDebug(this,DebugAll,"Consuming %u bytes on channel %u [%p]", data.length(),consumer.m_channel,this); if ((data.length() % m_sampleLen) && !m_error) { Debug(this,DebugWarn,"Wrong sample (received %u bytes) on channel %u [%p]", data.length(),consumer.m_channel,this); m_error++; } unsigned int samples = data.length() / m_sampleLen; // Forward buffer if already filled for this channel if (consumer.m_bufferFilled == m_maxSamples) { consumer.m_overErrors++; if (0 == consumer.m_overErrors % 5) DDebug(this,DebugMild,"Buffer overrun on channel %u [%p]", consumer.m_channel,this); forwardBuffer(); } unsigned int freeSamples = m_maxSamples - consumer.m_bufferFilled; unsigned char* buf = (unsigned char*)data.data(); if (samples <= freeSamples) { fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,samples); if (m_full == m_channels) forwardBuffer(); XDebug(this,DebugAll,"Consumed %u bytes on channel %u [%p]", samples * m_sampleLen,consumer.m_channel,this); return; } // Received more samples that free space in buffer fillBuffer(consumer.m_channel,consumer.m_bufferFilled,buf,freeSamples); forwardBuffer(); unsigned int consumed = freeSamples * m_sampleLen; Debug(this,DebugAll,"Consumed %u/%u bytes on channel %u [%p]", consumed,data.length(),consumer.m_channel,this); DataBlock rest(buf + consumed,data.length() - consumed,false); consume(consumer,rest,tStamp); rest.clear(false); } void MuxSource::destroyed() { Lock2 lock(plugin,m_lock); plugin.remove(this); for (unsigned int i = 0; i < m_channels; i++) { if (!m_consumers[i]) continue; setSource(i); if (m_consumers[i]->m_overErrors > 10) Debug(this,DebugMild, "Removing consumer on channel %u with %u overrun errors [%p]", i,m_consumers[i]->m_overErrors,this); m_consumers[i]->m_overErrors = 0; m_consumers[i]->m_owner = 0; TelEngine::destruct(m_consumers[i]); } delete[] m_consumers; m_consumers = 0; lock.drop(); if (!m_error) Debug(this,DebugAll,"Destroyed targetid=%s [%p]",m_targetid.c_str(),this); else Debug(this,DebugMild,"Destroyed targetid=%s data length errors=%u [%p]", m_targetid.c_str(),m_error,this); DataSource::destroyed(); } // Forward the buffer if at least one channel is filled. Reset data // Fill incomplete channel buffers with idle value before forwarding the data void MuxSource::forwardBuffer() { if (!m_full) return; // Fill incomplete buffers. Reset data for (unsigned int i = 0; i < m_channels; i++) { if (!m_consumers[i]) continue; if (m_consumers[i]->m_bufferFilled < m_maxSamples) { XDebug(this,DebugAll,"Filling %u idle values on channel %u [%p]", m_sampleLen * (m_maxSamples - m_consumers[i]->m_bufferFilled),i,this); fillBuffer(m_consumers[i]->m_channel,m_consumers[i]->m_bufferFilled); } m_consumers[i]->m_bufferFilled = 0; } m_full = 0; XDebug(this,DebugAll,"Forwarding buffer [%p]",this); Forward(m_buffer); } // Fill interlaced samples buffer with samples of received data // If no data, fill the free space with idle value void MuxSource::fillBuffer(unsigned int channel, unsigned int& filled, unsigned char* data, unsigned int samples) { unsigned char* buf = (unsigned char*)m_buffer.data(); buf += m_sampleLen * (channel + filled * m_channels); // Fill received data if (data) { if (samples > m_maxSamples - filled) samples = m_maxSamples - filled; filled += samples; if (filled == m_maxSamples) m_full++; switch (m_sampleLen) { case 1: for (; samples; samples--, buf += m_delta) *buf = *data++; break; case 2: for (; samples; samples--, buf += m_delta) { buf[0] = *data++; buf[1] = *data++; } break; default: for (; samples; samples--, buf += m_delta, data += m_sampleLen) ::memcpy(buf,data,m_sampleLen); } return; } // Fill with idle value samples = m_maxSamples - filled; filled = m_maxSamples; m_full++; switch (m_sampleLen) { case 1: for (; samples; samples--, buf += m_delta) *buf = m_idleValue; break; case 2: for (; samples; samples--, buf += m_delta) buf[0] = buf[1] = m_idleValue; break; default: for (; samples; samples--, buf += m_delta, data += m_sampleLen) ::memset(buf,m_idleValue,m_sampleLen); } } /** * MuxModule * Early init, late cleanup since we provide services to other modules */ MuxModule::MuxModule() : Module("mux","misc",true), m_first(true), m_id(1) { Output("Loaded module MUX"); m_prefix << debugName() << "/"; } MuxModule::~MuxModule() { Output("Unloading module MUX"); } void MuxModule::initialize() { Output("Initializing module MUX"); s_cfg = Engine::configFile("mux"); s_cfg.load(); // Startup if (m_first) { setup(); Engine::install(new ChanAttachHandler); } s_chanBuffer = s_cfg.getIntValue("general","chanbuffer",160); if (s_chanBuffer < 1) s_chanBuffer = 1; unsigned int ui = s_cfg.getIntValue("general","idlevalue",255); s_idleValue = (ui <= 255 ? ui : 255); const char* format = s_cfg.getValue("general","format"); if (!lookup(format,s_dictSampleLen)) format = "alaw"; s_defFormat.clear(); s_defFormat << "2*" << format; m_first = false; } // Respond to a request to attach/change a multiplexer bool MuxModule::chanAttach(Message& msg) { String id = msg.getValue("source"); if (Engine::exiting() || !id.startSkip(m_prefix,false)) return false; GenObject* sender = msg.userData(); if (!sender) { msg.setParam("error","No userdata"); return false; } MuxSource* src = 0; String error; const char* targetid = msg.getValue("notify"); // Check if should fail on channel source attach failure bool failOne = msg.getBoolValue("fail",false); if (!id) { plugin.lock(); id << m_prefix << m_id++; plugin.unlock(); src = new MuxSource(id,targetid,msg.getValue("format",s_defFormat),msg,error); // Set channel sources if (!error) { unsigned int count = 0; for (unsigned int channel = 0; channel < src->channels(); channel++) { if (src->setSource(channel,getChannelSource(sender,channel))) count++; else if (failOne) { error << "Attach failure on channel " << channel; break; } } if (!error && !count && msg.getBoolValue("failempty",false)) error = "Attach failure on all channels"; } if (!error) { msg.userData(src); msg.setParam("id",src->debugName()); } } else { Lock lock(plugin); ObjList* o = m_sources.find(id); src = o ? static_cast(o->get()) : 0; if (!(src && src->ref())) return false; lock.drop(); id = src->debugName(); // Modify sources unsigned int n = msg.count(); for (unsigned int i = 0; i < n; i++) { NamedString* ns = msg.getParam(i); if (!(ns && ns->name() == "channel")) continue; unsigned int channel = ns->toInteger(src->channels()); if (channel < src->channels() && src->setSource(channel,getChannelSource(sender,channel))) continue; if (failOne) { if (channel >= src->channels()) error << "Invalid channel=" << *ns; else error << "Attach failure on channel " << channel; break; } } } if (error) { Debug(this,DebugNote,"MuxSource failure id=%s targetid=%s error='%s'", id.c_str(),targetid,error.c_str()); msg.setParam("error",error); } TelEngine::destruct(src); return !error; } void MuxModule::statusParams(String& str) { Module::statusParams(str); str.append("count=",",") << m_sources.count() << ",format=channels|targetid"; } void MuxModule::statusDetail(String& str) { Module::statusDetail(str); for (ObjList* o = m_sources.skipNull(); o; o = o->skipNext()) { MuxSource* s = static_cast(o->get()); str.append(s->id(),",") << "=" << s->channels() << "|" << s->targetid(); } } /** * ChanAttachHandler */ bool ChanAttachHandler::received(Message& msg) { return plugin.chanAttach(msg); } }; // anonymous namespace /* vi: set ts=8 sw=4 sts=4 noet: */