Work in progress

git-svn-id: http://yate.null.ro/svn/yate/trunk@1247 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2007-04-05 14:04:30 +00:00
parent f356e3a4f1
commit 688cda58a8
2 changed files with 154 additions and 97 deletions

View File

@ -187,7 +187,7 @@ private:
WpSocket m_socket;
WpSigThread* m_thread; // Thread used to read data from socket
bool m_received; // Received data flag
int m_overRead; //
int m_overRead; // Header extension
};
// Read signalling data for WpInterface
@ -424,9 +424,16 @@ int WpSocket::recv(void* buffer, int len, int flags)
m_readError = false;
return r;
}
if (!(m_socket.canRetry() && m_readError))
showError("Read");
m_readError = true;
if (!(m_socket.canRetry() || m_readError)) {
const char* info = 0;
#ifdef SIOC_WANPIPE_SOCK_STATE
r == ::ioctl(m_socket.handle(),SIOC_WANPIPE_SOCK_STATE,0);
if (r == -1)
info = " (IOCTL failed: data link may be disconnected)";
#endif
showError("Read",info);
m_readError = true;
}
return -1;
}
@ -449,10 +456,10 @@ int WpSocket::send(const void* buffer, int len, int flags)
return -1;
}
// Check socket state
// Check events and socket availability
bool WpSocket::select(unsigned int multiplier)
{
m_canRead = m_event = false;
m_canRead = m_event = false;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = multiplier * WPSOCKET_SELECT_TIMEOUT;
@ -813,9 +820,10 @@ bool WpCircuit::status(Status newStat, bool sync)
if (SignallingCircuit::status() == Connected)
enableData = true;
// Don't put this message for final states
DDebug(group(),DebugAll,
"WpCircuit %u. Changed status to '%u'. %s data transfer [%p]",
code(),newStat,enableData ? "Enable" : "Disable",this);
if (!Engine::exiting())
DDebug(group(),DebugAll,
"WpCircuit %u. Changed status to '%u'. %s data transfer [%p]",
code(),newStat,enableData ? "Enable" : "Disable",this);
if (enableData) {
m_sourceValid = m_source;
m_consumerValid = m_consumer;
@ -985,7 +993,7 @@ bool WpData::init(NamedList& params)
m_samples = 50;
}
else if (type == "T1") {
m_chans = 23;
m_chans = 24;
if (cics.null())
cics = "1-23";
if (!m_samples)
@ -1137,6 +1145,9 @@ void WpData::run()
m_bufferLen = WP_HEADER + m_samples * m_count;
m_buffer = new unsigned char[m_bufferLen];
}
XDebug(m_group,DebugInfo,
"WpData('%s'). Running: circuits=%u, buffer=%u, samples=%u [%p]",
id().safe(),m_count,m_bufferLen,m_samples,this);
while (true) {
if (Thread::check(true))
break;
@ -1184,7 +1195,7 @@ void WpData::run()
// Check for received event (including in-band events)
bool WpData::readEvent()
{
DDebug(m_group,DebugInfo,"WpData('%s'). Got event. Checking OOB [%p]",
XDebug(m_group,DebugInfo,"WpData('%s'). Got event. Checking OOB [%p]",
id().safe(),this);
int r = m_socket.recv(m_buffer,m_bufferLen,MSG_OOB);
if (r >= WP_HEADER)

View File

@ -43,6 +43,7 @@ class SigSourceMux; // A data source multiplexer with 2 cha
class SigIsdnCallRecord; // Record an ISDN call monitor
class SigLinkThread; // Get events and check timeout for links that have a call controller
// The signalling channel
class SigChannel : public Channel
{
public:
@ -131,6 +132,8 @@ private:
Mutex m_linksMutex; // Link list operations
};
// Named list containing creator data (pointers)
// Used to pass parameters to objects that need to obtain some pointers
class SigParams : public NamedList
{
public:
@ -142,6 +145,7 @@ private:
SignallingCircuitGroup* m_cicGroup;
};
// Used to create a signalling circuit group descendant to set the debug name
class SigCircuitGroup : public SignallingCircuitGroup
{
public:
@ -163,7 +167,7 @@ public:
enum Type {
IsdnPriNet,
IsdnPriCpe,
IsdnMonitor,
IsdnPriMon,
Unknown
};
// Set link name and type. Append to plugin list
@ -211,8 +215,8 @@ protected:
bool m_init; // True if already initialized
bool m_inband; // True to send in-band tones through this link
private:
String m_name; // Link name
int m_type; // Link type
String m_name; // Link name
SigLinkThread* m_thread; // Event thread for call controller
};
@ -248,8 +252,8 @@ public:
SigIsdnMonitor(const char* name);
virtual ~SigIsdnMonitor();
virtual void handleEvent(SignallingEvent* event);
unsigned int sample() const
{ return m_sample; }
unsigned int chanBuffer() const
{ return m_chanBuffer; }
unsigned char idleValue() const
{ return m_idleValue; }
// Remove a call and it's call monitor
@ -267,9 +271,9 @@ private:
Mutex m_monitorMutex; // Lock monitor list operations
ObjList m_monitors; // Monitor list
unsigned int m_id; // ID generator
unsigned int m_sample; // The sample length of one channel of a data source multiplexer
unsigned int m_chanBuffer; // The buffer length of one channel of a data source multiplexer
unsigned char m_idleValue; // Idle value for source multiplexer to fill when no data
// Components
ISDNQ921Pasive* m_q921Net;
ISDNQ921Pasive* m_q921Cpe;
SignallingInterface* m_ifaceNet;
@ -300,9 +304,9 @@ class SigSourceMux : public DataSource
{
public:
// Create consumers
// @param sample The number of bytes at which the buffer of a consumer is assumed to be full
// @param idleValue Value to fill missing data when forwarded
SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue);
// @param chanBuffer The length of a channel buffer (will be rounded up to a multiple of sample length)
SigSourceMux(const char* format, unsigned char idleValue, unsigned int chanBuffer);
virtual ~SigSourceMux();
bool hasSource(bool first)
{ return first ? (m_firstSrc != 0) : (m_secondSrc != 0); }
@ -312,22 +316,38 @@ public:
bool attach(bool first, DataSource* source);
// Multiplex received data from consumers and forward it
void consume(bool first, const DataBlock& data, unsigned long tStamp);
// Forward the buffer if at least one channel is filled. Reset data
// If one channel is empty, fill it with idle value
void forwardBuffer();
// Remove the source for the appropriate consumer
void removeSource(bool first);
protected:
// Forward the buffer if at least one channel is filled. Reset data
// If one channel is empty or incomplete, fill it with idle value
inline void forwardBuffer() {
if (!(firstFull() || secondFull()))
return;
fillBuffer(!firstFull());
m_samplesFirst = m_samplesSecond = 0;
Forward(m_buffer);
}
// Fill (interlaced samples) buffer with samples of received data
// If no data, fill the free space with idle value
void fillBuffer(bool first, unsigned char* data = 0, unsigned int samples = 0);
inline bool firstFull() const
{ return m_samplesFirst == m_maxSamples; }
inline bool secondFull() const
{ return m_samplesSecond == m_maxSamples; }
private:
Mutex m_lock; // Lock consumers changes and data processing
unsigned char m_idleValue; // Filling value for missing data
bool m_sampleError; // Flag to show sample length violation error
unsigned int m_sample; // Maximum expected block length for data pushed by consumers
DataSource* m_firstSrc; // First consumer's source
DataSource* m_secondSrc; // Second consumer's source
SigConsumerMux* m_firstChan; // First channel
SigConsumerMux* m_secondChan; // Second channel
unsigned char m_filled; // Filled buffer flags (0: none, 1: first , 2: second, 3: both)
unsigned char m_idleValue; // Filling value for missing data
unsigned int m_sampleLen; // The format sample length
unsigned int m_maxSamples; // Maximum samples in a channel buffer
unsigned int m_samplesFirst; // The number of samples in first channel's buffer
unsigned int m_samplesSecond; // The number of samples in second channel's buffer
DataBlock m_buffer; // Multiplex buffer
unsigned int m_error; // Flag to show data length violation error
};
// Record an ISDN call monitor
@ -656,7 +676,6 @@ void SigChannel::callRejected(const char* error, const char* reason, const Messa
{
DDebug(this,DebugCall,"callRejected. Error: '%s'. Reason: '%s' [%p]",
error,reason,this);
Channel::callRejected(error,reason,msg);
m_reason = error ? error : (reason ? reason : "unknown");
hangup();
}
@ -670,7 +689,7 @@ void SigChannel::disconnected(bool final, const char* reason)
void SigChannel::hangup(const char* reason, bool reject)
{
m_callMutex.lock();
Lock lock(m_callMutex);
if (m_hungup)
return;
setSource();
@ -691,7 +710,7 @@ void SigChannel::hangup(const char* reason, bool reject)
m_call->deref();
m_call = 0;
}
m_callMutex.unlock();
lock.drop();
Message* m = message("chan.hangup",true);
m->setParam("status","hangup");
m->setParam("reason",m_reason);
@ -955,9 +974,8 @@ SigLink* SigDriver::findLink(const char* name, bool callCtrl)
if (!name)
return 0;
Lock lock(m_linksMutex);
ObjList* obj = m_links.skipNull();
for (; obj; obj = obj->skipNext()) {
SigLink* link = static_cast<SigLink*>(obj->get());
for (ObjList* o = m_links.skipNull(); o; o = o->skipNext()) {
SigLink* link = static_cast<SigLink*>(o->get());
if (link->name() == name) {
if (callCtrl && !link->controller())
return 0;
@ -973,9 +991,8 @@ SigLink* SigDriver::findLink(const SignallingCallControl* ctrl)
if (!ctrl)
return 0;
Lock lock(m_linksMutex);
ObjList* obj = m_links.skipNull();
for (; obj; obj = obj->skipNext()) {
SigLink* link = static_cast<SigLink*>(obj->get());
for (ObjList* o = m_links.skipNull(); o; o = o->skipNext()) {
SigLink* link = static_cast<SigLink*>(o->get());
if (link->controller() == ctrl)
return link;
}
@ -1048,6 +1065,7 @@ void SigDriver::initialize()
installRelay(Update);
installRelay(Route);
m_engine = new SignallingEngine;
m_engine->debugChain(this);
m_engine->start();
}
// Get stacks
@ -1066,7 +1084,7 @@ void SigDriver::initialize()
switch (type) {
case SigLink::IsdnPriNet:
case SigLink::IsdnPriCpe:
case SigLink::IsdnMonitor:
case SigLink::IsdnPriMon:
break;
default:
if (stype)
@ -1087,7 +1105,7 @@ void SigDriver::initialize()
case SigLink::IsdnPriCpe:
if (!link)
link = new SigIsdn(*sect,type == SigLink::IsdnPriNet);
case SigLink::IsdnMonitor:
case SigLink::IsdnPriMon:
if (!link)
link = new SigIsdnMonitor(*sect);
break;
@ -1122,7 +1140,7 @@ void* SigParams::getObject(const String& name) const
TokenDict SigLink::s_type[] = {
{"isdn-pri-net", IsdnPriNet},
{"isdn-pri-cpe", IsdnPriCpe},
{"isdn-monitor", IsdnMonitor},
{"isdn-pri-mon", IsdnPriMon},
{0,0}
};
@ -1130,8 +1148,8 @@ SigLink::SigLink(const char* name, Type type)
: m_controller(0),
m_init(false),
m_inband(false),
m_name(name),
m_type(type),
m_name(name),
m_thread(0)
{
plugin.appendLink(this);
@ -1181,6 +1199,7 @@ bool SigLink::startThread()
return ok;
}
// Build a signalling interface for this link
SignallingInterface* SigLink::buildInterface(const String& device,
const String& debugName, String& error)
{
@ -1198,6 +1217,7 @@ SignallingInterface* SigLink::buildInterface(const String& device,
return 0;
}
// Build a signalling circuit for this link
SigCircuitGroup* SigLink::buildCircuits(const String& device,
const String& debugName, String& error)
{
@ -1305,8 +1325,7 @@ bool SigIsdn::reload(NamedList& params)
{
if (!m_init)
return false;
DDebug(&plugin,DebugAll,"SigIsdn('%s'). Reloading [%p]",
name().c_str(),this);
DDebug(&plugin,DebugAll,"SigIsdn('%s'). Reloading [%p]",name().c_str(),this);
if (q931())
q931()->setDebug(params.getBoolValue("print-layer3PDU",false),
params.getBoolValue("extended-debug",false));
@ -1351,10 +1370,10 @@ void SigIsdn::release()
* SigIsdnMonitor
*/
SigIsdnMonitor::SigIsdnMonitor(const char* name)
: SigLink(name,IsdnMonitor),
: SigLink(name,IsdnPriMon),
m_monitorMutex(true),
m_id(0),
m_sample(160),
m_chanBuffer(160),
m_idleValue(255),
m_q921Net(0), m_q921Cpe(0), m_ifaceNet(0), m_ifaceCpe(0), m_groupNet(0), m_groupCpe(0)
{
@ -1447,9 +1466,9 @@ bool SigIsdnMonitor::create(NamedList& params)
break;
}
m_sample = params.getIntValue("sample",160);
if (!m_sample)
m_sample = 160;
m_chanBuffer = params.getIntValue("muxchanbuffer",160);
if (!m_chanBuffer)
m_chanBuffer = 160;
m_idleValue = params.getIntValue("idlevalue");
// Signalling interfaces
@ -1515,8 +1534,8 @@ bool SigIsdnMonitor::create(NamedList& params)
if (error.null()) {
if (debugAt(DebugInfo)) {
String tmp;
tmp << "\r\nSample size: " << m_sample;
tmp << "\r\nIdle fill value: " << (int)m_idleValue;
tmp << "\r\nChannel buffer: " << m_chanBuffer;
tmp << "\r\nIdle value: " << (int)m_idleValue;
Debug(&plugin,DebugInfo,"SigIsdnMonitor('%s'). Initialized: [%p]%s",
name().c_str(),this,tmp.c_str());
}
@ -1531,8 +1550,7 @@ bool SigIsdnMonitor::reload(NamedList& params)
{
if (!m_init)
return false;
DDebug(&plugin,DebugAll,"SigIsdnMonitor('%s'). Reloading [%p]",
name().c_str(),this);
DDebug(&plugin,DebugAll,"SigIsdnMonitor('%s'). Reloading [%p]",name().c_str(),this);
if (q931())
q931()->setDebug(params.getBoolValue("print-layer3PDU",false),
params.getBoolValue("extended-debug",false));
@ -1604,24 +1622,34 @@ void SigConsumerMux::Consume(const DataBlock& data, unsigned long tStamp)
/**
* SigSourceMux
*/
SigSourceMux::SigSourceMux(const char* format, unsigned int sample, unsigned char idleValue)
SigSourceMux::SigSourceMux(const char* format, unsigned char idleValue, unsigned int chanBuffer)
: DataSource(format),
m_lock(true),
m_idleValue(idleValue),
m_sampleError(false),
m_sample(sample),
m_firstSrc(0),
m_secondSrc(0),
m_firstChan(0),
m_secondChan(0),
m_filled(0)
m_idleValue(idleValue),
m_sampleLen(1),
m_maxSamples(0),
m_samplesFirst(0),
m_samplesSecond(0),
m_error(0)
{
XDebug(&plugin,DebugAll,"SigSourceMux::SigSourceMux() [%p]",this);
if (!m_sample)
m_sample = 1;
m_buffer.assign(0,2 * m_sample,false);
if (getFormat() == "mulaw")
m_sampleLen = 2;
// Adjust channel buffer to be multiple of sample length and not lesser then it
if (chanBuffer < m_sampleLen)
chanBuffer = m_sampleLen;
if (0 != (chanBuffer % m_sampleLen))
chanBuffer = chanBuffer / m_sampleLen + m_sampleLen;
m_maxSamples = chanBuffer / m_sampleLen;
m_buffer.assign(0,2 * chanBuffer,false);
m_firstChan = new SigConsumerMux(this,true,format);
m_secondChan = new SigConsumerMux(this,false,format);
XDebug(&plugin,DebugAll,
"SigSourceMux::SigSourceMux(). Max samples=%u, sample=%u [%p]",
m_maxSamples,m_sampleLen,this);
}
SigSourceMux::~SigSourceMux()
@ -1658,59 +1686,77 @@ bool SigSourceMux::attach(bool first, DataSource* source)
// Multiplex received data from consumers and forward it
// Forward multiplexed buffer if chan already filled
// If received data is not greater then expected channel sample:
// If received data is not greater then free space:
// Fill chan buffer with data
// If received data length is lesser then expected channel sample,
// fill the rest of the buffer with idle value
// If all channels are filled, forward the multiplexed buffer
// If both channels are filled, forward the multiplexed buffer
// Otherwise:
// Fill chan buffer with m_sample data
// Forward data and consume the rest
// Fill free chan buffer
// Forward buffer and consume the rest
void SigSourceMux::consume(bool first, const DataBlock& data, unsigned long tStamp)
{
Lock lock(m_lock);
if (data.length() > m_sample && !m_sampleError) {
unsigned int samples = data.length() / m_sampleLen;
if (!m_error && (data.length() % m_sampleLen)) {
Debug(&plugin,DebugWarn,
"SigSourceMux::consume(). Received data length %u > %u from channel %c [%p]",
data.length(),m_sample,first ? '1' : '2',this);
m_sampleError = true;
"SigSourceMux. Wrong sample (data length %u) from %s channel [%p]",
data.length(),first ? "first" : "second",this);
m_error++;
}
// Check if already filled. Set filled flag
if ((first && (m_filled & 1)) || (!first && (m_filled & 2)))
if (!samples)
return;
// Forward buffer if already filled for this channel
if ((first && firstFull()) || (!first && secondFull()))
forwardBuffer();
unsigned char* buffer = (unsigned char*)m_buffer.data();
if (!first)
buffer += m_sample;
m_filled &= first ? 1 : 2;
// Received data length is lesser then or equal to a sample
if (data.length() <= m_sample) {
::memcpy(buffer,data.data(),data.length());
if (data.length() != m_sample)
::memset(buffer + data.length(),m_idleValue,m_sample - data.length());
if (m_filled == 3)
unsigned int freeSamples = m_maxSamples - (first ? m_samplesFirst: m_samplesSecond);
unsigned char* buf = (unsigned char*)data.data();
if (samples <= freeSamples) {
fillBuffer(first,buf,samples);
if (firstFull() && secondFull())
forwardBuffer();
return;
}
// Received data length is greater then a sample
::memcpy(buffer,data.data(),m_sample);
DataBlock rest((unsigned char*)data.data() + m_sample,data.length() - m_sample);
// Received more samples that free space in buffer
fillBuffer(first,buf,freeSamples);
forwardBuffer();
unsigned int consumed = freeSamples * m_sampleLen;
DataBlock rest(buf + consumed,data.length() - consumed);
consume(first,rest,tStamp);
}
// Forward the buffer if at least one channel is filled. Reset data
// If one channel is empty, fill it with idle value
void SigSourceMux::forwardBuffer()
// Fill interlaced samples buffer with samples of received data
// If no data, fill the free space with idle value
void SigSourceMux::fillBuffer(bool first, unsigned char* data, unsigned int samples)
{
if (!m_filled)
unsigned int* count = (first ? &m_samplesFirst : &m_samplesSecond);
unsigned char* buf = (unsigned char*)m_buffer.data() + *count * m_sampleLen * 2;
// Fill received data
if (data) {
if (samples > m_maxSamples - *count)
samples = m_maxSamples - *count;
*count += samples;
if (m_sampleLen == 1)
for (; samples; samples--, buf += 2, data++)
*buf = *data;
else if (m_sampleLen == 2)
for (; samples; samples--, buf += 4, data += 2) {
*buf = *data;
*(buf + 1) = *(data + 1);
}
return;
if (m_filled < 3) {
unsigned char* data = (unsigned char*)m_buffer.data();
if (!(m_filled & 1))
data += m_sample;
::memset(data,m_idleValue,m_sample);
}
m_filled = 0;
Forward(m_buffer);
// Fill with idle value
samples = m_maxSamples - *count;
*count = m_maxSamples;
if (m_sampleLen == 1)
for (; samples; samples--, buf += 2)
*buf = m_idleValue;
else if (m_sampleLen == 2)
for (; samples; samples--, buf += 4)
*buf = *(buf + 1) = m_idleValue;
}
// Remove the source for the appropriate consumer
@ -1774,8 +1820,8 @@ bool SigIsdnCallRecord::update(SignallingEvent* event)
if (!format)
return true;
source = new SigSourceMux(format,
m_monitor ? m_monitor->sample() : 160,
m_monitor ? m_monitor->idleValue() : 255);
m_monitor ? m_monitor->idleValue() : 255,
m_monitor ? m_monitor->chanBuffer() : 160);
setSource(source);
source->deref();
if (!getSource()) {