Added ability to limit how much to try to acquire a non-critical mutex.

This should reduce contention during high load and the chances to deadlock.


git-svn-id: http://voip.null.ro/svn/yate@4950 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2012-03-02 16:01:45 +00:00
parent 3d1f0fcf05
commit ee9229a986
16 changed files with 83 additions and 24 deletions

View File

@ -30,12 +30,18 @@
; Can be overridden in each ss7-isup, isdn-pri-... or isdn-bri-... section
;floodevents=20
; maxlock: int: How many microseconds to try to take a non-critical lock before giving up
; Valid values 500 or higher, -1 to wait forever
; This parameter is applied on reload
;maxlock=10000 normally, -1 if Yate is started with -Dm
; datafile: string: File to save/restore trunks data (circuits lock status)
; Defaults to ysigdata.conf located in current config directory
; If set the file must contain the path (relative or absolute)
; This parameter is applied on reload
;datafile=
; Each section except [general] can have some common parameters
; To disable the component without deleting the entire section.

View File

@ -27,6 +27,11 @@
#include <string.h>
// Maximum wait for a non-critical mutex acquisition
#ifndef MAX_LOCK_WAIT
#define MAX_LOCK_WAIT 10000
#endif
#define MIN_TICK_SLEEP 500
#define DEF_TICK_SLEEP 5000
#define MAX_TICK_SLEEP 50000
@ -260,7 +265,9 @@ void SignallingNotifier::cleanup()
DDebug(DebugInfo,"SignallingNotifier::cleanup() [%p] stub",this);
}
static SignallingEngine* s_self = 0;
long SignallingEngine::s_maxLockWait = MAX_LOCK_WAIT;
SignallingEngine::SignallingEngine(const char* name)
: Mutex(true,"SignallingEngine"),
@ -289,8 +296,12 @@ SignallingEngine::~SignallingEngine()
SignallingEngine* SignallingEngine::self(bool create)
{
if (create && !s_self)
if (create && !s_self) {
// if mutex debugging is in force don't limit the lock time
if (Lockable::wait())
s_maxLockWait = -1;
s_self = new SignallingEngine;
}
return s_self;
}
@ -491,6 +502,14 @@ unsigned long SignallingEngine::timerTick(const Time& when)
return rval;
}
void SignallingEngine::maxLockWait(long maxWait)
{
if (maxWait < 0)
maxWait = -1;
else if (maxWait < MIN_TICK_SLEEP)
maxWait = MIN_TICK_SLEEP;
s_maxLockWait = maxWait;
}
SignallingThreadPrivate::~SignallingThreadPrivate()
{

View File

@ -2188,8 +2188,8 @@ inline static bool timeout(SS7ISUP* isup, SS7ISUPCall* call, SignallingTimer& ti
// Get an event from this call
SignallingEvent* SS7ISUPCall::getEvent(const Time& when)
{
Lock mylock(this);
if (m_lastEvent || m_state == Released)
Lock mylock(this,SignallingEngine::maxLockWait());
if (m_lastEvent || m_state == Released || !mylock.locked())
return 0;
SS7MsgISUP* msg = 0;
while (true) {
@ -3631,8 +3631,8 @@ void SS7ISUP::destroyed()
void SS7ISUP::timerTick(const Time& when)
{
Lock mylock(this);
if (!(m_l3LinkUp && circuits()))
Lock mylock(this,SignallingEngine::maxLockWait());
if (!(mylock.locked() && m_l3LinkUp && circuits()))
return;
// Test remote user part

View File

@ -163,7 +163,8 @@ void SS7Layer2::attach(SS7L2User* l2user)
void SS7Layer2::timerTick(const Time& when)
{
SignallingComponent::timerTick(when);
m_l2userMutex.lock();
if (!m_l2userMutex.lock(SignallingEngine::maxLockWait()))
return;
RefPointer<SS7L2User> tmp = m_notify ? m_l2user : 0;
m_notify = false;
m_l2userMutex.unlock();
@ -469,7 +470,8 @@ bool SS7MTP2::notify(SignallingInterface::Notification event)
void SS7MTP2::timerTick(const Time& when)
{
SS7Layer2::timerTick(when);
lock();
if (!lock(SignallingEngine::maxLockWait()))
return;
bool tout = m_interval && (when >= m_interval);
if (tout)
m_interval = 0;

View File

@ -1262,7 +1262,9 @@ void SS7MTP3::notify(SS7Layer2* link)
void SS7MTP3::timerTick(const Time& when)
{
Lock lock(this);
Lock mylock(this,SignallingEngine::maxLockWait());
if (!mylock.locked())
return;
for (ObjList* o = m_links.skipNull(); o; o = o->skipNext()) {
L2Pointer* p = static_cast<L2Pointer*>(o->get());
if (!p)

View File

@ -1171,7 +1171,8 @@ bool SS7Management::timeout(SignallingMessageTimer& timer, bool final)
void SS7Management::timerTick(const Time& when)
{
for (;;) {
lock();
if (!lock(SignallingEngine::maxLockWait()))
break;
SnmPending* msg = static_cast<SnmPending*>(m_pending.timeout(when));
unlock();
if (!msg)

View File

@ -320,9 +320,9 @@ void ISDNQ921::timerTick(const Time& when)
// If possible return early without locking
if (state() == Released)
return;
Lock lock(l2Mutex());
Lock lock(l2Mutex(),SignallingEngine::maxLockWait());
// Check state again after locking, to be sure it didn't change
if (state() == Released)
if (!lock.locked() || (state() == Released))
return;
// T200 not started
if (!m_retransTimer.started()) {
@ -1589,8 +1589,8 @@ void ISDNQ921Passive::cleanup()
// Check idle timer. Notify upper layer on timeout
void ISDNQ921Passive::timerTick(const Time& when)
{
Lock lock(l2Mutex());
if (!m_idleTimer.timeout(when.msec()))
Lock lock(l2Mutex(),SignallingEngine::maxLockWait());
if (!(lock.locked() && m_idleTimer.timeout(when.msec())))
return;
// Timeout. Notify layer 3. Restart timer
XDebug(this,DebugNote,"Timeout. Channel was idle for " FMT64 " ms",m_idleTimer.interval());

View File

@ -2903,7 +2903,9 @@ void ISDNQ931::setInterval(SignallingTimer& timer, int id)
// Check timeouts for segmented messages, layer 2 down state, restart circuits
void ISDNQ931::timerTick(const Time& when)
{
Lock lock(l3Mutex());
Lock mylock(l3Mutex(),SignallingEngine::maxLockWait());
if (!mylock.locked())
return;
// Check segmented message
if (m_recvSgmTimer.timeout(when.msec()))
endReceiveSegment("timeout");

View File

@ -693,7 +693,9 @@ void SS7Router::buildView(SS7PointCode::Type type, ObjList& view, SS7Layer3* net
void SS7Router::timerTick(const Time& when)
{
Lock mylock(this);
Lock mylock(this,SignallingEngine::maxLockWait());
if (!mylock.locked())
return;
if (m_isolate.timeout(when.msec())) {
Debug(this,DebugWarn,"Node is isolated and down! [%p]",this);
m_phase2 = false;

View File

@ -2307,7 +2307,8 @@ void SCCPManagement::routeStatus(String& dest,bool extended)
void SCCPManagement::timerTick(const Time& when)
{
lock();
if (!lock(SignallingEngine::maxLockWait()))
return;
ObjList coordt;
for (ObjList* o = m_localSubsystems.skipNull();o;o = o->skipNext()) {
SccpLocalSubsystem* ss = static_cast<SccpLocalSubsystem*>(o->get());
@ -2896,7 +2897,8 @@ bool SS7SCCP::managementStatus(Type type, NamedList& params)
void SS7SCCP::timerTick(const Time& when)
{
Lock lock(this);
if (!lock(SignallingEngine::maxLockWait()))
return;
for (ObjList* o = m_reassembleList.skipNull();o;) {
SS7MsgSccpReassemble* usr = YOBJECT(SS7MsgSccpReassemble,o->get());
if (usr->timeout()) {
@ -2906,6 +2908,7 @@ void SS7SCCP::timerTick(const Time& when)
else
o = o->skipNext();
}
unlock();
}
void SS7SCCP::ajustMessageParams(NamedList& params, SS7MsgSCCP::Type type)

View File

@ -1080,7 +1080,9 @@ bool SS7M2PA::decodeSeq(const DataBlock& data,u_int8_t msgType)
void SS7M2PA::timerTick(const Time& when)
{
SS7Layer2::timerTick(when);
Lock lock(m_mutex);
Lock lock(m_mutex,SignallingEngine::maxLockWait());
if (!lock.locked())
return;
if (m_confTimer.timeout(when.msec())) {
sendAck(); // Acknowledge last received message before endpoint drops down the link
m_confTimer.stop();

View File

@ -443,7 +443,9 @@ void SS7TCAP::enqueue(SS7TCAPMessage* msg)
SS7TCAPMessage* SS7TCAP::dequeue()
{
Lock lock(m_inQueueMtx);
Lock lock(m_inQueueMtx,SignallingEngine::maxLockWait());
if (!lock.locked())
return 0;
ObjList* obj = m_inQueue.skipNull();
if (!obj)
return 0;

View File

@ -114,8 +114,8 @@ void SS7Testing::notify(SS7Layer3* network, int sls)
void SS7Testing::timerTick(const Time& when)
{
Lock mylock(this);
if (!m_timer.timeout(when.msec()))
Lock mylock(this,SignallingEngine::maxLockWait());
if (!(mylock.locked() && m_timer.timeout(when.msec())))
return;
m_timer.start(when.msec());
sendTraffic();

View File

@ -915,6 +915,19 @@ public:
inline unsigned long tickDefault() const
{ return m_usecSleep; }
/**
* Get the maximum time we should spend acquiring a non-critical Mutex
* @return Maximum non-critical lock wait in usec, -1 to wait forever
*/
inline static long maxLockWait()
{ return s_maxLockWait; }
/**
* Set the maximum time we should spend acquiring a non-critical Mutex
* @param maxWait New maximum non-critical lock wait in usec, negative to wait forever
*/
static void maxLockWait(long maxWait);
/**
* Helper template used to remove a component descendant from its engine,
* destroy it and set the received pointer to 0
@ -947,6 +960,7 @@ private:
SignallingNotifier* m_notifier;
unsigned long m_usecSleep;
unsigned long m_tickSleep;
static long s_maxLockWait;
};
/**

View File

@ -1056,9 +1056,8 @@ bool StreamReader::connectSocket()
bool StreamReader::readData()
{
if (!m_socket)
if (!(m_socket && m_sending.lock(SignallingEngine::maxLockWait())))
return false;
m_sending.lock();
if (m_reconnect) {
connectionDown();
m_sending.unlock();
@ -1299,9 +1298,11 @@ bool MessageReader::sendMSG(const DataBlock& header, const DataBlock& msg, int s
bool MessageReader::readData()
{
Lock reconLock(m_sending);
Lock reconLock(m_sending,SignallingEngine::maxLockWait());
// If m_socket is null We are already reconnecting
if (m_socket && m_reconnect) {
if (!reconLock.locked())
return false;
if (m_transport->status() != Transport::Up)
return false; // We are already in reconnecting state
m_transport->setStatus(Transport::Initiating);

View File

@ -3097,6 +3097,9 @@ void SigDriver::initialize()
m_dataFile = s_cfg.getValue("general","datafile",Engine::configFile("ysigdata"));
Engine::self()->runParams().replaceParams(m_dataFile);
s_floodEvents = s_cfg.getIntValue("general","floodevents",20);
int maxLock = s_cfg.getIntValue("general","maxlock",-2);
if (maxLock > -2)
SignallingEngine::maxLockWait(maxLock);
// Startup
setup();
if (!m_engine) {