zapcard: Fix consume erros in ZapCircuit

Patch written by mtelegin, rebased onto modern yate

Bug report:
https://web.archive.org/web/20220819114119/http://yate.null.ro/mantis/view.php?id=143

During the call between SIP phone and phone connected to FXS port on Digium TDM400P has been occured a lot of clicks on FXS port. Log files contains messages like "Buffer overrun ... Resource temporarily unavailable" and "Consumer errors: 29. Lost: 13920/262320". The fix is applied in patch. Tested only on Digium analog card TDM400P
This commit is contained in:
Harald Welte 2023-12-03 13:26:47 +00:00
parent 099b17459c
commit a66e3dd9de
1 changed files with 99 additions and 54 deletions

View File

@ -408,6 +408,8 @@ public:
{ return m_alarmsText; }
inline bool canRead() const
{ return m_canRead; }
inline bool canWrite() const
{ return m_canWrite; }
inline bool event() const
{ return m_event || m_savedEvent; }
inline const char* zapDevName() const
@ -460,7 +462,7 @@ public:
// Flush read and write buffers
bool flushBuffers(FlushTarget target = FlushAll);
// Check if received data. Wait usec microseconds before returning
bool select(unsigned int usec);
bool select(unsigned int usec, bool read, bool write);
// Receive data. Return -1 on error or the number of bytes read
// If -1 is returned, the caller should check if m_event is set
int recv(void* buffer, int len);
@ -501,11 +503,13 @@ private:
int m_savedEvent; // Event saved asynchronously for later
String m_alarmsText; // Alarms text
bool m_canRead; // True if there is data to read
bool m_canWrite; // True if we can write data to the device
bool m_event; // True if an event occurred when recv/select
bool m_readError; // Flag used to print read errors
bool m_writeError; // Flag used to print write errors
bool m_selectError; // Flag used to print select errors
fd_set m_rdfds;
fd_set m_wrfds;
fd_set m_errfds;
struct timeval m_tv;
};
@ -599,7 +603,7 @@ public:
virtual bool getParam(const String& param, String& value) const;
// Get this circuit or source/consumer
virtual void* getObject(const String& name) const;
// Process incoming data
// Process incoming and outgoing data
virtual bool process();
// Send an event
virtual bool sendEvent(SignallingCircuitEvent::Type type, NamedList* params = 0);
@ -623,6 +627,15 @@ protected:
bool enqueueEvent(int event, SignallingCircuitEvent::Type type);
// Enqueue received digits
bool enqueueDigit(bool tone, char digit);
// Process incoming and outgoing data
bool processData();
// Receive incoming data into the source buffer
bool recvSourceBuffer();
// Send consumer buffer. Stop on error
void sendConsBuffer();
// Check whether data consumer buffer are ready for device
inline const bool consReady() const
{ return m_consBuffer.length() >= m_buflen; }
ZapDevice m_device; // The device
ZapDevice::Type m_type; // Circuit type
@ -641,6 +654,8 @@ protected:
ZapConsumer* m_consumer; // The data consumer
DataBlock m_sourceBuffer; // Data source buffer
DataBlock m_consBuffer; // Data consumer buffer
Mutex m_consMutex; // Data consumer buffer mutex
bool m_consReady; // Data in consumer buffer are ready for device (avoiding mutex lock)
unsigned int m_buflen; // Data block length
unsigned int m_consBufMax; // Max consumer buffer length
unsigned int m_consErrors; // Consumer. Total number of send failures
@ -672,7 +687,7 @@ public:
virtual bool setParam(const String& param, const String& value);
// Send an event
virtual bool sendEvent(SignallingCircuitEvent::Type type, NamedList* params = 0);
// Process incoming data
// Process incoming and outgoing data, poll hook
virtual bool process();
protected:
// Process additional events. Return false if not processed
@ -1008,6 +1023,7 @@ ZapDevice::ZapDevice(Type t, SignallingComponent* dbg, unsigned int chan,
m_rxHookSig(-1),
m_savedEvent(0),
m_canRead(false),
m_canWrite(false),
m_event(false),
m_readError(false),
m_writeError(false),
@ -1037,6 +1053,7 @@ ZapDevice::ZapDevice(unsigned int chan, bool disableDbg, bool open)
m_rxHookSig(-1),
m_savedEvent(0),
m_canRead(false),
m_canWrite(false),
m_event(false),
m_readError(false),
m_writeError(false),
@ -1451,18 +1468,23 @@ bool ZapDevice::flushBuffers(FlushTarget target)
}
// Check if received data. Wait usec microseconds before returning
bool ZapDevice::select(unsigned int usec)
bool ZapDevice::select(unsigned int usec, bool read, bool write)
{
FD_ZERO(&m_rdfds);
FD_SET(m_handle, &m_rdfds);
if (read)
FD_SET(m_handle, &m_rdfds);
FD_ZERO(&m_wrfds);
if (write)
FD_SET(m_handle, &m_wrfds);
FD_ZERO(&m_errfds);
FD_SET(m_handle, &m_errfds);
m_tv.tv_sec = 0;
m_tv.tv_usec = usec;
int sel = ::select(m_handle+1,&m_rdfds,NULL,&m_errfds,&m_tv);
int sel = ::select(m_handle+1,&m_rdfds,&m_wrfds,&m_errfds,&m_tv);
if (sel >= 0) {
m_event = FD_ISSET(m_handle,&m_errfds);
m_canRead = FD_ISSET(m_handle,&m_rdfds);
m_canWrite = FD_ISSET(m_handle,&m_wrfds);
m_selectError = false;
return true;
}
@ -1921,7 +1943,7 @@ bool ZapInterface::init(ZapDevice::Type type, unsigned int code, unsigned int ch
// Process incoming data
bool ZapInterface::process()
{
if (!m_device.select(100))
if (!m_device.select(100,true,false))
return false;
if (!m_device.canRead()) {
if (m_device.event())
@ -2226,6 +2248,7 @@ ZapCircuit::ZapCircuit(ZapDevice::Type type, unsigned int code, unsigned int cha
m_priority(Thread::Normal),
m_source(0),
m_consumer(0),
m_consReady(false),
m_buflen(0),
m_consBufMax(0),
m_consErrors(0),
@ -2494,36 +2517,77 @@ void* ZapCircuit::getObject(const String& name) const
return SignallingCircuit::getObject(name);
}
// Process incoming data
// Process incoming and outgoing data
bool ZapCircuit::process()
{
if (!(m_device.valid() && SignallingCircuit::status() == Connected))
return false;
return processData();
}
// Process incoming and outgoing data
bool ZapCircuit::processData()
{
s_sourceAccessMutex.lock();
RefPointer<ZapSource> src = m_source;
s_sourceAccessMutex.unlock();
if (!(m_device.valid() && SignallingCircuit::status() == Connected && src))
bool sourceReady = src;
bool res = false;
if (!m_device.select(10, sourceReady, m_consReady))
return false;
if (!m_device.select(10))
return false;
if (!m_device.canRead()) {
if (m_device.event())
checkEvents();
return false;
if (m_device.canRead()) {
res = recvSourceBuffer();
if (res)
src->Forward(m_sourceBuffer);
}
int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length());
if (m_device.canWrite())
sendConsBuffer();
if (m_device.event())
checkEvents();
// Do not yield, if some data was read. Call again processData
return res;
}
// Receive incoming data into the source buffer
bool ZapCircuit::recvSourceBuffer()
{
int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length());
if (r > 0) {
if ((unsigned int)r != m_sourceBuffer.length())
::memset((unsigned char*)m_sourceBuffer.data() + r,m_idleValue,m_sourceBuffer.length() - r);
src->Forward(m_sourceBuffer);
return true;
}
return false;
}
// Send consumer buffer. Stop on error
void ZapCircuit::sendConsBuffer()
{
Lock lock(m_consMutex);
while (consReady()) {
int w = m_device.write(m_consBuffer.data(),m_buflen);
// m_device is opened in nonblocking mode. When driver buffer
// will be full the errno will be EAGAIN.
if (w <= 0) {
m_errno = errno;
break;
}
m_errno = 0;
m_consBuffer.cut(-w);
XDebug(group(),DebugAll,"ZapCircuit(%u). Sent %d bytes. Remaining: %u [%p]",
code(),w,m_consBuffer.length(),this);
}
// Avoiding select call under mutex lock
m_consReady = consReady();
}
// Send an event through the circuit
bool ZapCircuit::sendEvent(SignallingCircuitEvent::Type type, NamedList* params)
{
@ -2557,6 +2621,13 @@ void ZapCircuit::consume(const DataBlock& data)
if (!(SignallingCircuit::status() >= Special && m_canSend && data.length()))
return;
Lock lock(m_consMutex,100000);
if (!lock.locked()) {
XDebug(group(),DebugInfo,"ZapCircuit(%u). Failed lock mutex at consume! [%p]",
code(),this);
return;
}
// Copy data in buffer
// Throw old data on buffer overrun
m_consTotal += data.length();
@ -2572,18 +2643,8 @@ void ZapCircuit::consume(const DataBlock& data)
m_consBuffer = data;
}
// Send buffer. Stop on error
while (m_consBuffer.length() >= m_buflen) {
int w = m_device.write(m_consBuffer.data(),m_buflen);
if (w <= 0) {
m_errno = errno;
break;
}
m_errno = 0;
m_consBuffer.cut(-w);
XDebug(group(),DebugAll,"ZapCircuit(%u). Sent %d bytes. Remaining: %u [%p]",
code(),w,m_consBuffer.length(),this);
}
// Avoiding select call under mutex lock
m_consReady = consReady();
}
// Close device. Stop worker. Remove source consumer. Change status. Release memory if requested
@ -2934,35 +2995,15 @@ bool ZapAnalogCircuit::processEvent(int event, char c)
return false;
}
// Process incoming data
// Process incoming and outgoing data, poll hook
bool ZapAnalogCircuit::process()
{
if (!(m_device.valid() && SignallingCircuit::status() != SignallingCircuit::Disabled))
return false;
m_device.pollHook();
checkEvents();
s_sourceAccessMutex.lock();
RefPointer<ZapSource> src = m_source;
s_sourceAccessMutex.unlock();
if (!(src && m_device.select(10) && m_device.canRead()))
return false;
int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length());
if (m_device.event())
checkEvents();
if (r > 0) {
if ((unsigned int)r != m_sourceBuffer.length())
::memset((unsigned char*)m_sourceBuffer.data() + r,m_idleValue,m_sourceBuffer.length() - r);
XDebug(group(),DebugAll,"ZapCircuit(%u). Forwarding %u bytes [%p]",
code(),m_sourceBuffer.length(),this);
src->Forward(m_sourceBuffer);
return true;
}
return false;
return processData();
}
// Change hook state if different
@ -3169,8 +3210,12 @@ bool ZapModule::received(Message& msg, int id)
msg.retValue() << ",count=" << total;
if (!ok)
break;
if (span == 1)
msg.retValue() << ";";
else
msg.retValue() << ",";
// format=Channels|Total|Alarms|Name|Description
msg.retValue() << ";" << span << "=" << p.getValue("configured-chans");
msg.retValue() << span << "=" << p.getValue("configured-chans");
msg.retValue() << "|" << p.getValue("total-chans");
msg.retValue() << "|" << p.getValue("alarmstext");
msg.retValue() << "|" << p.getValue("name");
@ -3211,7 +3256,7 @@ bool ZapModule::received(Message& msg, int id)
bool show = (dev->span() == span) || (cmd == ZapChannelsAll);
if (show) {
// format=Type|ZaptelType|Span|SpanPos|Alarms|Address
s << ";" << dev->channel() << "=" << lookup(dev->type(),s_types);
s << (i ? "," : ";") << dev->channel() << "=" << lookup(dev->type(),s_types);
if (dev->span() == span) {
s << "|" << lookup(dev->zapsig(),s_zaptelSig);
s << "|" << dev->span();