Add support for read-write lock.

git-svn-id: http://voip.null.ro/svn/yate@6570 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
oana 2022-10-27 13:51:32 +00:00
parent 44b4069218
commit 664f85e763
5 changed files with 630 additions and 1 deletions

View File

@ -309,6 +309,25 @@ MUTEX_HACK="$MUTEX_HACK -DHAVE_TIMEDWAIT"
fi
AC_MSG_RESULT([$have_sem_timedwait])
have_rd_timedlock=""
AC_CHECK_LIB([pthread], [pthread_rwlock_timedrdlock], [have_rd_timedlock="yes"])
if [[ "x$have_rd_timedlock" = "x" ]]; then
AC_CHECK_LIB([c], [pthread_rwlock_timedrdlock],[have_rd_timedlock="yes"])
fi
if [[ "x$have_rd_timedlock" = "xyes" ]]; then
MUTEX_HACK="$MUTEX_HACK -DHAVE_TIMEDRDLOCK"
fi
have_wr_timedlock=""
AC_CHECK_LIB([pthread], [pthread_rwlock_timedwrlock], [have_wr_timedlock="yes"])
if [[ "x$have_wr_timedlock" = "x" ]]; then
AC_CHECK_LIB([c], [pthread_rwlock_timedwrlock], [have_wr_timedlock="yes"])
fi
if [[ "x$have_rd_timedlock" = "xyes" ]]; then
MUTEX_HACK="$MUTEX_HACK -DHAVE_TIMEDWRLOCK"
fi
CFLAGS="$SAVE_CFLAGS"
LIBS="$SAVE_LIBS"
AC_LANG_RESTORE

View File

@ -2563,6 +2563,7 @@ void Engine::initLibrary(const String& line, String* output)
ENGINE_SET_VAL_BREAK('s',s_lateabrt,true);
ENGINE_INSTR_BREAK('m',setLockableWait());
ENGINE_INSTR_BREAK('d',Lockable::enableSafety());
ENGINE_INSTR_BREAK('r',RWLock::disableRWLock(true));
default:
unkArgs.append("-D" + String(*pc)," ");
}
@ -2901,6 +2902,9 @@ int Engine::main(int argc, const char** argv, const char** env, RunMode mode, En
case 'd':
Lockable::enableSafety();
break;
case 'r':
RWLock::disableRWLock(true);
break;
#ifdef RTLD_GLOBAL
case 'l':
s_localsymbol = true;

View File

@ -109,7 +109,7 @@ Resolver.o: @srcdir@/Resolver.cpp $(MKDEPS) $(CINC)
$(COMPILE) @RESOLV_INC@ -c $<
Mutex.o: @srcdir@/Mutex.cpp $(MKDEPS) $(CINC)
$(COMPILE) @MUTEX_HACK@ -c $<
$(COMPILE) @ATOMIC_OPS@ @MUTEX_HACK@ -c $<
Thread.o: @srcdir@/Thread.cpp $(MKDEPS) $(CINC)
$(COMPILE) @THREAD_KILL@ @THREAD_AFFINITY@ @HAVE_PRCTL@ -c $<

View File

@ -43,6 +43,7 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *__attr, int __kind) __
typedef pthread_mutex_t HMUTEX;
typedef sem_t HSEMAPHORE;
typedef pthread_rwlock_t rwlock_t;
#endif /* ! _WINDOWS */
@ -109,6 +110,42 @@ private:
const char* m_name;
};
class RWLockPrivate
{
public:
RWLockPrivate(const char* name);
~RWLockPrivate();
inline void ref()
{ ++m_refcount; }
inline void deref()
{ if (!--m_refcount) delete this; }
inline const char* name() const
{ return m_name; }
inline const char* owner() const
{ return m_nonRWLck ? m_nonRWLck->owner() : m_wrOwner; }
bool locked() const
{ return m_nonRWLck ? m_nonRWLck->locked() : (m_locked > 0); }
bool readLock(long maxWait = -1);
bool writeLock(long maxWwait = -1);
bool unlock();
static volatile int s_count;
static volatile int s_locks;
private:
#ifdef _WINDOWS
// we use m_nonRWLck
#else
rwlock_t m_lock;
#endif
MutexPrivate* m_nonRWLck;
int m_refcount;
const char* m_name;
unsigned int m_locked;
const char* m_wrOwner;
#ifndef ATOMIC_OPS
Mutex m_mutex;
#endif
};
class GlobalMutex {
public:
GlobalMutex();
@ -129,11 +166,18 @@ static GlobalMutex s_global;
static unsigned long s_maxwait = 0;
static bool s_unsafe = MUTEX_STATIC_UNSAFE;
static bool s_safety = false;
#ifdef _WINDOWS
static bool s_rwLockDisabled = true;
#else
static bool s_rwLockDisabled = false;
#endif
volatile int MutexPrivate::s_count = 0;
volatile int MutexPrivate::s_locks = 0;
volatile int SemaphorePrivate::s_count = 0;
volatile int SemaphorePrivate::s_locks = 0;
volatile int RWLockPrivate::s_count = 0;
volatile int RWLockPrivate::s_locks = 0;
bool GlobalMutex::s_init = true;
// WARNING!!!
@ -769,4 +813,349 @@ void Lock2::drop()
mx1->unlock();
}
RWLockPrivate::RWLockPrivate(const char* name)
: m_nonRWLck(0), m_refcount(1), m_name(name), m_locked(0), m_wrOwner(0)
#ifndef ATOMIC_OPS
, m_mutex(true,"RWLockPrivate")
#endif
{
if (s_rwLockDisabled) {
m_nonRWLck = new MutexPrivate(true,name);
return;
}
GlobalMutex::lock();
s_count++;
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
::pthread_rwlock_init(&m_lock,0);
#endif
GlobalMutex::unlock();
}
RWLockPrivate::~RWLockPrivate()
{
if (m_nonRWLck) {
delete m_nonRWLck;
m_nonRWLck = 0;
return;
}
bool warn = false;
GlobalMutex::lock();
if (m_locked) {
warn = true;
--m_locked;
if (s_safety)
--s_locks;
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
::pthread_rwlock_unlock(&m_lock);
#endif
}
s_count--;
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
::pthread_rwlock_destroy(&m_lock);
#endif
GlobalMutex::unlock();
if (m_locked)
Debug(DebugFail,"RWLockPrivate '%s' owned by '%s' destroyed with %u locks [%p]",
m_name,m_wrOwner,m_locked,this);
else if (warn)
Debug(DebugCrit,"RWLockPrivate '%s' owned by '%s' unlocked in destructor [%p]",
m_name,m_wrOwner,this);
}
bool RWLockPrivate::readLock(long maxwait)
{
if (m_nonRWLck)
return m_nonRWLck->lock(maxwait);
int ret = -1;
bool warn = false;
if (s_maxwait && (maxwait < 0)) {
maxwait = (long)s_maxwait;
warn = true;
}
bool safety = s_safety;
if (safety)
GlobalMutex::lock();
Thread* thr = Thread::current();
if (thr)
thr->m_locking = true;
if (safety)
GlobalMutex::unlock();
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
if (s_unsafe)
ret = 0;
if (maxwait < 0)
ret = ::pthread_rwlock_rdlock(&m_lock);
else if (!maxwait)
ret = ::pthread_rwlock_tryrdlock(&m_lock);
else {
u_int64_t t = Time::now() + maxwait;
#ifdef HAVE_TIMEDRDLOCK
struct timeval tv;
struct timespec ts;
Time::toTimeval(&tv,t);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = 1000 * tv.tv_usec;
ret = ::pthread_rwlock_timedrdlock(&m_lock,&ts);
#else
bool dead = false;
do {
if (!dead) {
dead = Thread::check(false);
// give up only if caller asked for a limited wait
if (dead && !warn)
break;
}
ret = ::pthread_rwlock_tryrdlock(&m_lock);
if (!ret)
break;
Thread::yield();
} while (t > Time::now());
#endif
}
#endif // _WINDOWS
if (safety)
GlobalMutex::lock();
if (thr)
thr->m_locking = false;
if (!ret) {
if (safety)
++s_locks;
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
InterlockedIncrement((LONG*)&m_locked);
#else
__sync_add_and_fetch(&m_locked,1);
#endif
#else
m_mutex.lock();
++m_locked;
m_mutex.unlock();
#endif
if (thr)
++thr->m_locks;
}
if (safety)
GlobalMutex::unlock();
if (warn && ret)
Debug(DebugFail,"Thread '%s' could not lock for read RW lock '%s' writing-owned by '%s' after waiting for %ld usec! [%p]",
Thread::currentName(),TelEngine::c_safe (m_name),TelEngine::c_safe(m_wrOwner),maxwait,this);
return ret == 0;
}
bool RWLockPrivate::writeLock(long maxwait)
{
if (m_nonRWLck)
return m_nonRWLck->lock(maxwait);
int ret = -1;
bool warn = false;
if (s_maxwait && (maxwait < 0)) {
maxwait = (long)s_maxwait;
warn = true;
}
bool safety = s_safety;
if (safety)
GlobalMutex::lock();
Thread* thr = Thread::current();
if (thr)
thr->m_locking = true;
if (safety)
GlobalMutex::unlock();
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
if (s_unsafe)
ret = 0;
if (maxwait < 0)
ret = ::pthread_rwlock_wrlock(&m_lock);
else if (!maxwait)
ret = ::pthread_rwlock_trywrlock(&m_lock);
else {
u_int64_t t = Time::now() + maxwait;
#ifdef HAVE_TIMEDWRLOCK
struct timeval tv;
struct timespec ts;
Time::toTimeval(&tv,t);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = 1000 * tv.tv_usec;
ret = ::pthread_rwlock_timedwrlock(&m_lock,&ts);
#else
bool dead = false;
do {
if (!dead) {
dead = Thread::check(false);
// give up only if caller asked for a limited wait
if (dead && !warn)
break;
}
ret = ::pthread_rwlock_trywrlock(&m_lock);
if (!ret)
break;
Thread::yield();
} while (t > Time::now());
#endif
}
#endif
if (safety)
GlobalMutex::lock();
if (thr)
thr->m_locking = false;
if (!ret) {
if (safety)
++s_locks;
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
InterlockedIncrement((LONG*)&m_locked);
#else
__sync_add_and_fetch(&m_locked,1);
#endif
#else
m_mutex.lock();
++m_locked;
m_mutex.unlock();
#endif
m_wrOwner = Thread::currentName();
if (thr)
++thr->m_locks;
}
if (safety)
GlobalMutex::unlock();
if (warn && ret)
Debug(DebugFail,"Thread '%s' could not lock for write RW lock '%s' writing-owned by '%s' after waiting for %ld usec! [%p]",
Thread::currentName(),TelEngine::c_safe(m_name),TelEngine::c_safe(m_wrOwner),maxwait,this);
return ret == 0;
}
bool RWLockPrivate::unlock()
{
if (m_nonRWLck)
return m_nonRWLck->unlock();
int ok = -1;
bool safety = s_safety;
if (safety)
GlobalMutex::lock();
if (m_locked) {
Thread* thr = Thread::current();
if (thr)
--thr->m_locks;
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
int l = InterlockedDecrement((LONG*)&m_locked);
#else
int l = __sync_sub_and_fetch(&m_locked,1);
#endif
#else
m_mutex.lock();
int l = --m_locked;
m_mutex.unlock();
#endif
if (!l) {
const char* tname = thr ? thr->name() : 0;
if (m_wrOwner && tname != m_wrOwner)
Debug(DebugFail,"RWLockPrivate '%s' unlocked by '%s' but owned by '%s' [%p]",
TelEngine::c_safe(m_name),TelEngine::c_safe(tname),TelEngine::c_safe(m_wrOwner),this);
m_wrOwner = 0;
}
if (safety) {
int locks = --s_locks;
if (locks < 0) {
// this is very very bad - abort right now
abortOnBug(true);
s_locks = 0;
Debug(DebugFail,"RWLockPrivate::locks() is %d [%p]",locks,this);
}
}
#ifdef _WINDOWS
// not implemented, uses m_nonRWLck
#else
ok = s_unsafe ? 0 : ::pthread_rwlock_unlock(&m_lock);
#endif
if (ok)
Debug(DebugFail,"Thread '%s' failed to unlock RW lock '%s' owned by '%s' [%p]",Thread::currentName(),
TelEngine::c_safe(m_name),TelEngine::c_safe(m_wrOwner),this);
}
else {
Debug(DebugFail,"Thread '%s' could not unlock already unlocked RW lock '%s' writing-owned by '%s' [%p]",Thread::currentName(),
TelEngine::c_safe(m_name),TelEngine::c_safe(m_wrOwner),this);
}
if (safety)
GlobalMutex::unlock();
return ok == 0;
}
/**
* class RWLock
*/
RWLock::RWLock(const char* name)
{
m_private = new RWLockPrivate(name ? name : "?");
}
RWLock::RWLock(const RWLock& original)
: Lockable(),
m_private(original.privDataCopy())
{
}
RWLock::~RWLock()
{
RWLockPrivate* priv = m_private;
m_private = 0;
if (priv)
priv->deref();
}
bool RWLock::readLock(long maxwait)
{
return m_private && m_private->readLock(maxwait);
}
bool RWLock::writeLock(long maxwait)
{
return m_private && m_private->writeLock(maxwait);
}
bool RWLock::unlock()
{
return m_private && m_private->unlock();
}
bool RWLock::locked() const
{
return m_private && m_private->locked();
}
void RWLock::disableRWLock(bool disable)
{
#ifdef _WINDOWS
// we disable RWLock usage as it is not implemented
s_rwLockDisabled = true;
#else
s_rwLockDisabled = disable;
#endif
}
RWLockPrivate* RWLock::privDataCopy() const
{
if (m_private)
m_private->ref();
return m_private;
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -6062,6 +6062,7 @@ protected:
class MutexPrivate;
class SemaphorePrivate;
class ThreadPrivate;
class RWLockPrivate;
/**
* An abstract base class for implementing lockable objects
@ -6522,6 +6523,221 @@ private:
inline void* operator new[](size_t);
};
/**
* A read/write lock
* @short Read/write lock support
*/
class YATE_API RWLock : public Lockable
{
friend class RWLockPrivate;
public:
/**
* Build a read/write lock
* @param name Name of this lock
*/
RWLock(const char* name = 0);
/**
* Copy constructor, creates a RWLock semaphore
* @param original Reference of the semaphore to share
*/
RWLock(const RWLock& original);
/**
* Destructor
*/
~RWLock();
/**
* Unlock either the read or write lock held by the calling thread
* @return True if anything was unlocked
*/
bool unlock();
/**
* Lock the read lock.
* @param maxWait Time to wait for locking to succeed, -1 to wait forever, 0 return immediately
* @return True if locking succeed
*/
bool readLock(long maxWait = -1);
/**
* Lock the write lock.
* @param maxWait Time to wait for locking to succeed, -1 to wait forever, 0 return immediately
* @return True if locking succeed
*/
bool writeLock(long maxWait = -1);
/**
* Lock the write lock. This behaves like a mutex
* @param maxWait Time to wait for locking to succeed, -1 to wait forever, 0 return immediately
* @return True if locking succeed
*/
virtual bool lock(long maxWait = -1)
{ return writeLock(maxWait); }
/**
* Check if the object is currently locked - as it's asynchronous it
* guarantees nothing if other thread changes the status
* @return True if the object was locked when the function was called
*/
virtual bool locked() const;
/**
* Debugging method for disabling RW locks usage
* and replacing it with a non-recursive mutex
* @param disable True to disable RW locks usage
*/
static void disableRWLock(bool disable);
private:
RWLockPrivate* privDataCopy() const;
RWLockPrivate* m_private;
};
/**
* Ephemeral read lock on a read-write lock (stack allocated lock that is locked on
* creation and unlocked in destructor
*/
class RLock
{
public:
/**
* Create the lock, try to lock the object
* @param lck Reference to the object to lock
* @param maxWait Time in microseconds to wait, -1 wait forever
*/
inline RLock(RWLock& lck, long maxWait = -1)
{ m_lock = lck.readLock(maxWait) ? &lck : 0; }
/**
* Create the lock, try to lock the object
* @param lck Pointer to the object to lock
* @param maxwait Time in microseconds to wait, -1 wait forever
*/
inline RLock(RWLock* lck, long maxwait = -1)
{ m_lock = (lck && lck->readLock(maxwait)) ? lck : 0; }
/**
* Destroy the lock, unlock the mutex if it was locked
*/
~RLock()
{ if (m_lock) m_lock->unlock(); }
/**
* Return a pointer to the lockable object this lock holds
* @return A pointer to a Lockable or NULL if locking failed
*/
inline RWLock* locked() const
{ return m_lock; }
/**
* Unlock the object if it was locked and drop the reference to it
*/
inline void drop()
{ if (m_lock) m_lock->unlock(); m_lock = 0; }
/**
* Attempt to acquire a new lock on another object
* @param lck Pointer to the object to lock
* @param maxwait Time in microseconds to wait, -1 wait forever
* @return True if locking succeeded or same object was locked
*/
inline bool acquire(RWLock* lck, long maxwait = -1)
{ return (lck && (lck == m_lock)) ||
(drop(),(lck && (m_lock = lck->readLock(maxwait) ? lck : 0))); }
/**
* Attempt to acquire a new lock on another object
* @param lck Reference to the object to lock
* @param maxwait Time in microseconds to wait, -1 wait forever
* @return True if locking succeeded or same object was locked
*/
inline bool acquire(RWLock& lck, long maxwait = -1)
{ return acquire(&lck,maxwait); }
private:
RWLock* m_lock;
/** Make sure no Lock is ever created on heap */
inline void* operator new(size_t);
/** Never allocate an array of this class */
inline void* operator new[](size_t);
};
/**
* Ephemeral read lock on a read-write lock (stack allocated lock that is locked on
* creation and unlocked in destructor
*/
class WLock
{
public:
/**
* Create the lock, try to lock the object
* @param lck Reference to the object to lock
* @param maxWait Time in microseconds to wait, -1 wait forever
*/
inline WLock(RWLock& lck, long maxWait = -1)
{ m_lock = lck.writeLock(maxWait) ? &lck : 0; }
/**
* Create the lock, try to lock the object
* @param lck Pointer to the object to lock
* @param maxWait Time in microseconds to wait, -1 wait forever
*/
inline WLock(RWLock* lck, long maxWait = -1)
{ m_lock = (lck && lck->writeLock(maxWait)) ? lck : 0; }
/**
* Destroy the lock, unlock the mutex if it was locked
*/
~WLock()
{ if (m_lock) m_lock->unlock(); }
/**
* Return a pointer to the lockable object this lock holds
* @return A pointer to a Lockable or NULL if locking failed
*/
inline RWLock* locked() const
{ return m_lock; }
/**
* Unlock the object if it was locked and drop the reference to it
*/
inline void drop()
{ if (m_lock) m_lock->unlock(); m_lock = 0; }
/**
* Attempt to acquire a new lock on another object
* @param lck Pointer to the object to lock
* @param maxWait Time in microseconds to wait, -1 wait forever
* @return True if locking succeeded or same object was locked
*/
inline bool acquire(RWLock* lck, long maxWait = -1)
{ return (lck && (lck == m_lock)) ||
(drop(),(lck && (m_lock = lck->writeLock(maxWait) ? lck : 0))); }
/**
* Attempt to acquire a new lock on another object
* @param lck Reference to the object to lock
* @param maxWait Time in microseconds to wait, -1 wait forever
* @return True if locking succeeded or same object was locked
*/
inline bool acquire(RWLock& lck, long maxWait = -1)
{ return acquire(&lck,maxWait); }
private:
RWLock* m_lock;
/** Make sure no Lock is ever created on heap */
inline void* operator new(size_t);
/** Never allocate an array of this class */
inline void* operator new[](size_t);
};
/**
* This class holds the action to execute a certain task, usually in a
* different execution thread.
@ -6553,6 +6769,7 @@ class YATE_API Thread : public Runnable
friend class ThreadPrivate;
friend class MutexPrivate;
friend class SemaphorePrivate;
friend class RWLockPrivate;
YNOCOPY(Thread); // no automatic copies please
public:
/**