Added RWLockPool class.

Added atomic number class(es).
Pass atomic operation availability to everybody in configure script.
Use atomic number in NamedCounter.
Use atomic number for MessageHandler unsafe. Fixes concurrent increment introduced by RLock usage in dispatcher.
This commit is contained in:
marian 2023-05-31 15:54:22 +03:00
parent a30704254c
commit d820fed4b8
7 changed files with 582 additions and 69 deletions

View File

@ -1824,7 +1824,7 @@ AC_SUBST(INSTALL_L)
INSTALL_D="install -D"
CFLAGS=`echo "$CFLAGS" | sed 's/\(^\| \+\)-g[[0-9]]*//' | sed 's/[[[:space:]]]\{2,\}/ /g'`
MODULE_CFLAGS="-fno-exceptions -fPIC $HAVE_GCC_FORMAT_CHECK $HAVE_BLOCK_RETURN"
MODULE_CFLAGS="-fno-exceptions -fPIC $HAVE_GCC_FORMAT_CHECK $HAVE_BLOCK_RETURN $ATOMIC_OPS"
MODULE_CPPFLAGS="$HAVE_NO_OVERLOAD_VIRT_WARN $RTTI_OPT $MODULE_CFLAGS"
MODULE_LDRELAX="-rdynamic -shared"
MODULE_SYMBOLS="-Wl,--retain-symbols-file,/dev/null"

View File

@ -109,13 +109,13 @@ Resolver.o: @srcdir@/Resolver.cpp $(MKDEPS) $(CINC)
$(COMPILE) @RESOLV_INC@ -c $<
Mutex.o: @srcdir@/Mutex.cpp $(MKDEPS) $(CINC)
$(COMPILE) @ATOMIC_OPS@ @MUTEX_HACK@ -c $<
$(COMPILE) @MUTEX_HACK@ -c $<
Thread.o: @srcdir@/Thread.cpp $(MKDEPS) $(CINC)
$(COMPILE) @THREAD_KILL@ @THREAD_AFFINITY@ @HAVE_PRCTL@ -c $<
TelEngine.o: @srcdir@/TelEngine.cpp $(MKDEPS) $(CINC)
$(COMPILE) @ATOMIC_OPS@ @HAVE_GMTOFF@ @HAVE_INT_TZ@ -c $<
$(COMPILE) @HAVE_GMTOFF@ @HAVE_INT_TZ@ -c $<
Client.o: @srcdir@/Client.cpp $(MKDEPS) $(CLINC)
$(COMPILE) -c $<

View File

@ -238,7 +238,7 @@ MessageHandler::MessageHandler(const char* name, unsigned priority,
const char* trackName, bool addPriority)
: String(name),
m_trackName(trackName), m_trackNameOnly(trackName), m_priority(priority),
m_unsafe(0), m_dispatcher(0), m_filter(0), m_counter(0)
m_dispatcher(0), m_filter(0), m_counter(0)
{
DDebug(DebugAll,"MessageHandler::MessageHandler('%s',%u,'%s',%s) [%p]",
name,priority,trackName,String::boolText(addPriority),this);
@ -273,10 +273,10 @@ void MessageHandler::safeNowInternal()
{
WLock lck(m_dispatcher ? &m_dispatcher->handlersLock() : 0);
// when the unsafe counter reaches zero we're again safe to destroy
m_unsafe--;
if (m_unsafe < 0)
Debug(DebugFail,"MessageHandler(%s) unsafe=%d locked=%s dispatcher=(%p) [%p]",
safe(),m_unsafe,String::boolText(lck.locked()),m_dispatcher,this);
int v = --m_unsafe;
if (v < 0)
Debug(DebugFail,"MessageHandler(%s) unsafe=%d dispatcher=(%p) [%p]",
safe(),v,m_dispatcher,this);
}
bool MessageHandler::receivedInternal(Message& msg)
@ -401,7 +401,7 @@ bool MessageDispatcher::uninstall(MessageHandler* handler)
} while (handler->m_unsafe > 0);
}
if (handler->m_unsafe != 0)
Debug(DebugFail,"MessageHandler %p has unsafe=%d",handler,handler->m_unsafe);
Debug(DebugFail,"MessageHandler %p has unsafe=%d",handler,(int)handler->m_unsafe);
handler->m_dispatcher = 0;
}
return (handler != 0);

View File

@ -1175,4 +1175,28 @@ RWLockPrivate* RWLock::privDataCopy() const
}
RWLockPool::RWLockPool(unsigned int len, const char* name)
: m_name(0), m_data(0), m_length(len ? len : 1)
{
if (TelEngine::null(name))
name = "Pool";
m_name = new String[m_length];
m_data = new RWLock*[m_length];
for (unsigned int i = 0; i < m_length; i++) {
m_name[i] << name << "::" << (i + 1);
m_data[i] = new RWLock(m_name[i]);
}
}
RWLockPool::~RWLockPool()
{
if (m_data) {
for (unsigned int i = 0; i < m_length; i++)
delete m_data[i];
delete[] m_data;
}
if (m_name)
delete[] m_name;
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -1287,7 +1287,6 @@ ObjList& GenObject::getObjCounters()
return s_counters;
}
#ifndef ATOMIC_OPS
static MutexPool s_refMutex(REFOBJECT_MUTEX_COUNT,false,"RefObject");
#endif
@ -1425,58 +1424,6 @@ void RefPointerBase::assign(RefObject* oldptr, RefObject* newptr, void* pointer)
}
NamedCounter::NamedCounter(const String& name)
: String(name), m_count(0), m_enabled(getObjCounting()), m_mutex(0)
{
#ifndef ATOMIC_OPS
m_mutex = s_refMutex.mutex(this);
#endif
}
int NamedCounter::inc()
{
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
return InterlockedIncrement((LONG*)&m_count);
#else
return __sync_add_and_fetch(&m_count,1);
#endif
#else
Lock lock(m_mutex);
return ++m_count;
#endif
}
int NamedCounter::dec()
{
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
return InterlockedDecrement((LONG*)&m_count);
#else
return __sync_fetch_and_sub(&m_count,1);
#endif
#else
Lock lock(m_mutex);
return --m_count;
#endif
}
int NamedCounter::add(int val)
{
#ifdef ATOMIC_OPS
#ifdef _WINDOWS
return InterlockedExchangeAdd((LONG*)&m_count,(LONG)val);
#else
return __sync_add_and_fetch(&m_count,val);
#endif
#else
Lock lock(m_mutex);
m_count += val;
return m_count;
#endif
}
void SysUsage::init()
{
if (!s_startTime)
@ -1550,6 +1497,35 @@ double SysUsage::runTime(Type type)
#endif
}
//
// AtomicOp
//
#ifndef ATOMIC_OP_LOCK_COUNT
#define ATOMIC_OP_LOCK_COUNT 101
#endif
#ifdef YATOMIC_LOCK
static RWLockPool s_atomicLock(ATOMIC_OP_LOCK_COUNT,"AtomicOp");
#endif
AtomicOp::AtomicOp()
: m_lock(0)
{
#ifdef YATOMIC_LOCK
m_lock = s_atomicLock.lock(this);
#endif
}
bool AtomicOp::efficient()
{
#ifdef YATOMIC_LOCK
return false;
#else
return true;
#endif
}
};
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -882,6 +882,9 @@ struct TokenDict64 {
class String;
class DataBlock;
class Mutex;
class RWLock;
class WLock;
class RLock;
class ObjList;
class NamedCounter;
@ -1103,6 +1106,454 @@ YATE_API inline uint32_t hashPtr(const void* ptr)
}
#undef YATOMIC_BUILTIN
#define YATOMIC_LOCK
#ifdef ATOMIC_OPS
#ifndef _WINDOWS
#define YATOMIC_BUILTIN
#undef YATOMIC_LOCK
#endif
#endif
#ifdef YATOMIC_LOCK
#define YATOMIC_OP_LOCK_WRITE WLock lck(m_lock)
#define YATOMIC_OP_LOCK_READ RLock lck(m_lock)
#else
#define YATOMIC_OP_LOCK_WRITE {}
#define YATOMIC_OP_LOCK_READ {}
#endif
/**
* Holds an optional lock protecting an atomic operation
* @short Base class for atomic operations
*/
class YATE_API AtomicOp
{
public:
/**
* Constructor
*/
AtomicOp();
/**
* Retrieve the lock used to protect the atomic operation
* @return RWLock pointer, NULL if atomic operation is available
*/
inline RWLock* lock() const
{ return m_lock; }
/**
* Check if atomic operations are used efficiently
* @return True if efficient, false otherwise (lock is used)
*/
static bool efficient();
protected:
/**
* Mutex used to protect changes if atomic operation is not available
*/
mutable RWLock* m_lock;
};
/**
* This class holds an atomic number along with operations on it
* @short An atomic number
*/
template <class Type> class YAtomicNumber : public AtomicOp
{
public:
/**
* Default constructor
*/
inline YAtomicNumber()
: m_value(0)
{}
/**
* Constructor
* @param val Initial value
*/
explicit inline YAtomicNumber(Type val)
: m_value(val)
{}
/**
* Copy constructor
* @param val Initial value
*/
explicit inline YAtomicNumber(const YAtomicNumber& val)
: m_value(val.valueAtomic())
{}
/**
* Retrieve held number
* @return Held number
*/
inline Type value() const
{ return m_value; }
/**
* Retrieve held number
* @return Held number
*/
inline Type value()
{ return m_value; }
/**
* Retrieve held number reference
* @return Held number reference
*/
inline Type& valueRef()
{ return m_value; }
/**
* Atomically retrieve held number
* @return Held number
*/
inline Type valueAtomic() const {
#ifdef YATOMIC_BUILTIN
return __sync_add_and_fetch(&m_value,0);
#else
YATOMIC_OP_LOCK_READ;
return m_value;
#endif
}
/**
* Atomically retrieve held number
* @return Held number
*/
inline Type valueAtomic() {
#ifdef YATOMIC_BUILTIN
return __sync_add_and_fetch(&m_value,0);
#else
YATOMIC_OP_LOCK_READ;
return m_value;
#endif
}
/**
* Replace (set a new) value
* @param val Value to set
* @return Old number value
*/
inline Type set(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_val_compare_and_swap(&m_value,valueAtomic(),val);
#else
YATOMIC_OP_LOCK_WRITE;
int old = m_value;
m_value = val;
return old;
#endif
}
/**
* Increment this number
* @return Number after increment
*/
inline Type inc() {
#ifdef YATOMIC_BUILTIN
return __sync_add_and_fetch(&m_value,1);
#else
YATOMIC_OP_LOCK_WRITE;
return ++m_value;
#endif
}
/**
* Decrement this number
* @return Number after decrement
*/
inline Type dec() {
#ifdef YATOMIC_BUILTIN
return __sync_sub_and_fetch(&m_value,1);
#else
YATOMIC_OP_LOCK_WRITE;
return --m_value;
#endif
}
/**
* Add a number to this one
* @param val Number to add
* @return Number after addition
*/
inline Type add(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_add_and_fetch(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
m_value += val;
return m_value;
#endif
}
/**
* Substract a number from this one
* @param val Number to substract
* @return Number after substraction
*/
inline Type sub(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_sub_and_fetch(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
m_value -= val;
return m_value;
#endif
}
/**
* Bitwise AND
* @param val Value to apply
* @return Number after operation
*/
inline Type bitAnd(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_and_and_fetch(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
m_value &= val;
return m_value;
#endif
}
/**
* Bitwise OR
* @param val Value to apply
* @return Number after operation
*/
inline Type bitOr(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_or_and_fetch(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
m_value |= val;
return m_value;
#endif
}
/**
* Bitwise XOR
* @param val Value to apply
* @return Number after operation
*/
inline Type bitXor(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_xor_and_fetch(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
m_value ^= val;
return m_value;
#endif
}
/**
* Increment this number
* @return Number before increment
*/
inline Type preInc() {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_add(&m_value,1);
#else
YATOMIC_OP_LOCK_WRITE;
int old = m_value++;
return old;
#endif
}
/**
* Decrement this number
* @return Number before decrement
*/
inline Type preDec() {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_sub(&m_value,1);
#else
YATOMIC_OP_LOCK_WRITE;
int old = m_value--;
return old;
#endif
}
/**
* Add a number to this one
* @param val Number to add
* @return Number before addition
*/
inline Type preAdd(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_add(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
int old = m_value;
m_value += val;
return old;
#endif
}
/**
* Substract a number from this one
* @param val Number to substract
* @return Number before substraction
*/
inline Type preSub(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_sub(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
int old = m_value;
m_value -= val;
return old;
#endif
}
/**
* Bitwise AND
* @param val Value to apply
* @return Number before operation
*/
inline Type preBitAnd(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_and(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
Type old = m_value;
m_value &= val;
return old;
#endif
}
/**
* Bitwise OR
* @param val Value to apply
* @return Number before operation
*/
inline Type preBitOr(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_or(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
Type old = m_value;
m_value |= val;
return old;
#endif
}
/**
* Bitwise XOR
* @param val Value to apply
* @return Number before operation
*/
inline Type preBitXor(Type val) {
#ifdef YATOMIC_BUILTIN
return __sync_fetch_and_xor(&m_value,val);
#else
YATOMIC_OP_LOCK_WRITE;
Type old = m_value;
m_value ^= val;
return old;
#endif
}
/**
* Cast operator
* Return atomic value
*/
inline operator Type()
{ return valueAtomic(); }
/**
* Cast operator
* Return atomic value
*/
inline operator Type() const
{ return valueAtomic(); }
/**
* Assignment operator
* @param val Number to set
*/
inline YAtomicNumber& operator=(Type val)
{ set(val); return *this; }
/**
* Assignment operator
* @param val Number to set
*/
inline YAtomicNumber& operator=(const YAtomicNumber& val)
{ set((Type)val); return *this; }
/**
* Addition operator
* @param val Number to add
*/
inline Type operator+=(Type val)
{ return add(val); }
/**
* Prefix increment operator
*/
inline Type operator++()
{ return inc(); }
/**
* Prefix decrement operator
*/
inline Type operator--()
{ return dec(); }
/**
* Postfix increment operator
*/
inline Type operator++(int)
{ return preInc(); }
/**
* Postfix decrement operator
*/
inline Type operator--(int)
{ return preDec(); }
/**
* Substraction operator
* @param val Number to substract
*/
inline Type operator-=(Type val)
{ return sub(val); }
/**
* Bitwise AND operator
* @param val Number to apply
*/
inline Type operator&=(Type val)
{ return bitAnd(val); }
/**
* Bitwise OR operator
* @param val Number to apply
*/
inline Type operator|=(Type val)
{ return bitOr(val); }
/**
* Bitwise XOR operator
* @param val Number to apply
*/
inline Type operator^=(Type val)
{ return bitXor(val); }
protected:
mutable Type m_value;
};
typedef YAtomicNumber<int> AtomicInt;
typedef YAtomicNumber<unsigned int> AtomicUInt;
typedef YAtomicNumber<int64_t> AtomicInt64;
typedef YAtomicNumber<uint64_t> AtomicUInt64;
typedef YAtomicNumber<int32_t> AtomicInt32;
typedef YAtomicNumber<uint32_t> AtomicUInt32;
/**
* An object with just a public virtual destructor
*/
@ -4370,7 +4821,9 @@ public:
* Constructor
* @param name Name of the counter
*/
explicit NamedCounter(const String& name);
explicit NamedCounter(const String& name)
: String(name), m_enabled(GenObject::getObjCounting())
{}
/**
* Check if the counter is enabled
@ -4390,19 +4843,23 @@ public:
* Increment the counter
* @return Post-increment value of the counter
*/
int inc();
inline int inc()
{ return m_count.inc(); }
/**
* Decrement the counter
* @return Post-decrement value of the counter
*/
int dec();
inline int dec()
{ return m_count.dec(); }
/**
* Add a specific value to the counter
* @param val Value to add
* @return Value after addition
*/
int add(int val);
inline int add(int val)
{ return m_count.add(val); }
/**
* Get the current value of the counter
@ -4412,9 +4869,8 @@ public:
{ return m_count; }
private:
int m_count;
AtomicInt m_count;
bool m_enabled;
Mutex* m_mutex;
};
/**
@ -8070,6 +8526,63 @@ private:
inline void* operator new[](size_t);
};
/**
* This class holds a RWLock array. Locks can be retrieved based on object pointers.
* A lock pool can be used to associate a smaller set of RWLock objects with a much
* larger set of objects needing lock.
* @short A RWLock pool
*/
class YATE_API RWLockPool
{
public:
/**
* Build the lock pool
* @param len The number of lock objects to build. The length should be an
* odd number to obtain an optimal distribution of pointer based locks
* (usually pointers are aligned at even addresses): some locks might never
* get used if the length is an even number
* @param name Static name of the lock (for debugging purpose only)
*/
RWLockPool(unsigned int len = 13, const char* name = 0);
/**
* Destructor. Release data
*/
~RWLockPool();
/**
* Build an index from object pointer (pointer value modulo array length).
* Always cast the pointer to the same type when calling this method to
* make sure the same index is returned for a given object
* @param ptr The pointer to object
* @return Valid array index
*/
inline unsigned int index(void* ptr) const
{ return ((unsigned int)(unsigned long)ptr) % m_length; }
/**
* Retrieve the lock associated with a given pointer.
* Always cast the pointer to the same type when calling this method to
* make sure the same lock is returned for a given object
* @param ptr The pointer to object
* @return Valid RWLock pointer
*/
inline RWLock* lock(void* ptr) const
{ return m_data[index(ptr)]; }
/**
* Retrieve the lock at a given index modulo array length
* @param idx The index
* @return Valid RWLock pointer
*/
inline RWLock* lock(unsigned int idx) const
{ return m_data[idx % m_length]; }
private:
String* m_name; // RWLock names
RWLock** m_data; // The array
unsigned int m_length; // Array length
};
/**
* This class holds the action to execute a certain task, usually in a

View File

@ -732,7 +732,7 @@ private:
String m_trackName;
String m_trackNameOnly;
unsigned m_priority;
int m_unsafe;
AtomicInt m_unsafe;
MessageDispatcher* m_dispatcher;
MatchingItemBase* m_filter;
NamedCounter* m_counter;