diff --git a/modules/server/zapcard.cpp b/modules/server/zapcard.cpp index 83c49b16..3273b8dc 100644 --- a/modules/server/zapcard.cpp +++ b/modules/server/zapcard.cpp @@ -408,6 +408,8 @@ public: { return m_alarmsText; } inline bool canRead() const { return m_canRead; } + inline bool canWrite() const + { return m_canWrite; } inline bool event() const { return m_event || m_savedEvent; } inline const char* zapDevName() const @@ -460,7 +462,7 @@ public: // Flush read and write buffers bool flushBuffers(FlushTarget target = FlushAll); // Check if received data. Wait usec microseconds before returning - bool select(unsigned int usec); + bool select(unsigned int usec, bool read, bool write); // Receive data. Return -1 on error or the number of bytes read // If -1 is returned, the caller should check if m_event is set int recv(void* buffer, int len); @@ -501,11 +503,13 @@ private: int m_savedEvent; // Event saved asynchronously for later String m_alarmsText; // Alarms text bool m_canRead; // True if there is data to read + bool m_canWrite; // True if we can write data to the device bool m_event; // True if an event occurred when recv/select bool m_readError; // Flag used to print read errors bool m_writeError; // Flag used to print write errors bool m_selectError; // Flag used to print select errors fd_set m_rdfds; + fd_set m_wrfds; fd_set m_errfds; struct timeval m_tv; }; @@ -599,7 +603,7 @@ public: virtual bool getParam(const String& param, String& value) const; // Get this circuit or source/consumer virtual void* getObject(const String& name) const; - // Process incoming data + // Process incoming and outgoing data virtual bool process(); // Send an event virtual bool sendEvent(SignallingCircuitEvent::Type type, NamedList* params = 0); @@ -623,6 +627,15 @@ protected: bool enqueueEvent(int event, SignallingCircuitEvent::Type type); // Enqueue received digits bool enqueueDigit(bool tone, char digit); + // Process incoming and outgoing data + bool processData(); + // Receive incoming data into the source buffer + bool recvSourceBuffer(); + // Send consumer buffer. Stop on error + void sendConsBuffer(); + // Check whether data consumer buffer are ready for device + inline const bool consReady() const + { return m_consBuffer.length() >= m_buflen; } ZapDevice m_device; // The device ZapDevice::Type m_type; // Circuit type @@ -641,6 +654,8 @@ protected: ZapConsumer* m_consumer; // The data consumer DataBlock m_sourceBuffer; // Data source buffer DataBlock m_consBuffer; // Data consumer buffer + Mutex m_consMutex; // Data consumer buffer mutex + bool m_consReady; // Data in consumer buffer are ready for device (avoiding mutex lock) unsigned int m_buflen; // Data block length unsigned int m_consBufMax; // Max consumer buffer length unsigned int m_consErrors; // Consumer. Total number of send failures @@ -672,7 +687,7 @@ public: virtual bool setParam(const String& param, const String& value); // Send an event virtual bool sendEvent(SignallingCircuitEvent::Type type, NamedList* params = 0); - // Process incoming data + // Process incoming and outgoing data, poll hook virtual bool process(); protected: // Process additional events. Return false if not processed @@ -1008,6 +1023,7 @@ ZapDevice::ZapDevice(Type t, SignallingComponent* dbg, unsigned int chan, m_rxHookSig(-1), m_savedEvent(0), m_canRead(false), + m_canWrite(false), m_event(false), m_readError(false), m_writeError(false), @@ -1037,6 +1053,7 @@ ZapDevice::ZapDevice(unsigned int chan, bool disableDbg, bool open) m_rxHookSig(-1), m_savedEvent(0), m_canRead(false), + m_canWrite(false), m_event(false), m_readError(false), m_writeError(false), @@ -1451,18 +1468,23 @@ bool ZapDevice::flushBuffers(FlushTarget target) } // Check if received data. Wait usec microseconds before returning -bool ZapDevice::select(unsigned int usec) +bool ZapDevice::select(unsigned int usec, bool read, bool write) { FD_ZERO(&m_rdfds); - FD_SET(m_handle, &m_rdfds); + if (read) + FD_SET(m_handle, &m_rdfds); + FD_ZERO(&m_wrfds); + if (write) + FD_SET(m_handle, &m_wrfds); FD_ZERO(&m_errfds); FD_SET(m_handle, &m_errfds); m_tv.tv_sec = 0; m_tv.tv_usec = usec; - int sel = ::select(m_handle+1,&m_rdfds,NULL,&m_errfds,&m_tv); + int sel = ::select(m_handle+1,&m_rdfds,&m_wrfds,&m_errfds,&m_tv); if (sel >= 0) { m_event = FD_ISSET(m_handle,&m_errfds); m_canRead = FD_ISSET(m_handle,&m_rdfds); + m_canWrite = FD_ISSET(m_handle,&m_wrfds); m_selectError = false; return true; } @@ -1921,7 +1943,7 @@ bool ZapInterface::init(ZapDevice::Type type, unsigned int code, unsigned int ch // Process incoming data bool ZapInterface::process() { - if (!m_device.select(100)) + if (!m_device.select(100,true,false)) return false; if (!m_device.canRead()) { if (m_device.event()) @@ -2226,6 +2248,7 @@ ZapCircuit::ZapCircuit(ZapDevice::Type type, unsigned int code, unsigned int cha m_priority(Thread::Normal), m_source(0), m_consumer(0), + m_consReady(false), m_buflen(0), m_consBufMax(0), m_consErrors(0), @@ -2494,36 +2517,77 @@ void* ZapCircuit::getObject(const String& name) const return SignallingCircuit::getObject(name); } -// Process incoming data +// Process incoming and outgoing data bool ZapCircuit::process() +{ + if (!(m_device.valid() && SignallingCircuit::status() == Connected)) + return false; + + return processData(); +} + +// Process incoming and outgoing data +bool ZapCircuit::processData() { s_sourceAccessMutex.lock(); RefPointer src = m_source; s_sourceAccessMutex.unlock(); - if (!(m_device.valid() && SignallingCircuit::status() == Connected && src)) + bool sourceReady = src; + bool res = false; + + if (!m_device.select(10, sourceReady, m_consReady)) return false; - if (!m_device.select(10)) - return false; - if (!m_device.canRead()) { - if (m_device.event()) - checkEvents(); - return false; + if (m_device.canRead()) { + res = recvSourceBuffer(); + if (res) + src->Forward(m_sourceBuffer); } - - int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length()); + if (m_device.canWrite()) + sendConsBuffer(); if (m_device.event()) checkEvents(); + + // Do not yield, if some data was read. Call again processData + return res; +} + +// Receive incoming data into the source buffer +bool ZapCircuit::recvSourceBuffer() +{ + int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length()); if (r > 0) { if ((unsigned int)r != m_sourceBuffer.length()) ::memset((unsigned char*)m_sourceBuffer.data() + r,m_idleValue,m_sourceBuffer.length() - r); - src->Forward(m_sourceBuffer); return true; } return false; } +// Send consumer buffer. Stop on error +void ZapCircuit::sendConsBuffer() +{ + Lock lock(m_consMutex); + + while (consReady()) { + int w = m_device.write(m_consBuffer.data(),m_buflen); + // m_device is opened in nonblocking mode. When driver buffer + // will be full the errno will be EAGAIN. + if (w <= 0) { + m_errno = errno; + break; + } + m_errno = 0; + m_consBuffer.cut(-w); + XDebug(group(),DebugAll,"ZapCircuit(%u). Sent %d bytes. Remaining: %u [%p]", + code(),w,m_consBuffer.length(),this); + } + + // Avoiding select call under mutex lock + m_consReady = consReady(); +} + // Send an event through the circuit bool ZapCircuit::sendEvent(SignallingCircuitEvent::Type type, NamedList* params) { @@ -2557,6 +2621,13 @@ void ZapCircuit::consume(const DataBlock& data) if (!(SignallingCircuit::status() >= Special && m_canSend && data.length())) return; + Lock lock(m_consMutex,100000); + if (!lock.locked()) { + XDebug(group(),DebugInfo,"ZapCircuit(%u). Failed lock mutex at consume! [%p]", + code(),this); + return; + } + // Copy data in buffer // Throw old data on buffer overrun m_consTotal += data.length(); @@ -2572,18 +2643,8 @@ void ZapCircuit::consume(const DataBlock& data) m_consBuffer = data; } - // Send buffer. Stop on error - while (m_consBuffer.length() >= m_buflen) { - int w = m_device.write(m_consBuffer.data(),m_buflen); - if (w <= 0) { - m_errno = errno; - break; - } - m_errno = 0; - m_consBuffer.cut(-w); - XDebug(group(),DebugAll,"ZapCircuit(%u). Sent %d bytes. Remaining: %u [%p]", - code(),w,m_consBuffer.length(),this); - } + // Avoiding select call under mutex lock + m_consReady = consReady(); } // Close device. Stop worker. Remove source consumer. Change status. Release memory if requested @@ -2934,35 +2995,15 @@ bool ZapAnalogCircuit::processEvent(int event, char c) return false; } -// Process incoming data +// Process incoming and outgoing data, poll hook bool ZapAnalogCircuit::process() { if (!(m_device.valid() && SignallingCircuit::status() != SignallingCircuit::Disabled)) return false; m_device.pollHook(); - checkEvents(); - s_sourceAccessMutex.lock(); - RefPointer src = m_source; - s_sourceAccessMutex.unlock(); - - if (!(src && m_device.select(10) && m_device.canRead())) - return false; - - int r = m_device.recv(m_sourceBuffer.data(),m_sourceBuffer.length()); - if (m_device.event()) - checkEvents(); - if (r > 0) { - if ((unsigned int)r != m_sourceBuffer.length()) - ::memset((unsigned char*)m_sourceBuffer.data() + r,m_idleValue,m_sourceBuffer.length() - r); - XDebug(group(),DebugAll,"ZapCircuit(%u). Forwarding %u bytes [%p]", - code(),m_sourceBuffer.length(),this); - src->Forward(m_sourceBuffer); - return true; - } - - return false; + return processData(); } // Change hook state if different @@ -3169,8 +3210,12 @@ bool ZapModule::received(Message& msg, int id) msg.retValue() << ",count=" << total; if (!ok) break; + if (span == 1) + msg.retValue() << ";"; + else + msg.retValue() << ","; // format=Channels|Total|Alarms|Name|Description - msg.retValue() << ";" << span << "=" << p.getValue("configured-chans"); + msg.retValue() << span << "=" << p.getValue("configured-chans"); msg.retValue() << "|" << p.getValue("total-chans"); msg.retValue() << "|" << p.getValue("alarmstext"); msg.retValue() << "|" << p.getValue("name"); @@ -3211,7 +3256,7 @@ bool ZapModule::received(Message& msg, int id) bool show = (dev->span() == span) || (cmd == ZapChannelsAll); if (show) { // format=Type|ZaptelType|Span|SpanPos|Alarms|Address - s << ";" << dev->channel() << "=" << lookup(dev->type(),s_types); + s << (i ? "," : ";") << dev->channel() << "=" << lookup(dev->type(),s_types); if (dev->span() == span) { s << "|" << lookup(dev->zapsig(),s_zaptelSig); s << "|" << dev->span();