Add YATE library support for setting and getting the affinity of a thread.

git-svn-id: http://voip.null.ro/svn/yate@6429 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
oana 2020-09-23 15:26:34 +00:00
parent 1214822e83
commit 8bb8e83faf
4 changed files with 449 additions and 3 deletions

View File

@ -345,6 +345,46 @@ fi
AC_MSG_RESULT([$have_pthread_kill])
AC_SUBST(THREAD_KILL)
THREAD_AFFINITY=""
AC_MSG_CHECKING([for pthread_setaffinity_np declaration])
AC_LANG_SAVE
AC_LANG_C
SAVE_CFLAGS="$CFLAGS"
CFLAGS="$CFLAGS -Wall -Werror"
AC_TRY_COMPILE([
#define _GNU_SOURCE
#include <pthread.h>
],[
cpu_set_t cpuset;
pthread_setaffinity_np(pthread_self(),sizeof(cpuset),&cpuset);
],
have_pthread_affinity="yes",
have_pthread_affinity="no"
)
AC_MSG_RESULT([$have_pthread_affinity])
if [[ "x$have_pthread_affinity" = "xyes" ]]; then
THREAD_AFFINITY="-DTHREAD_AFFINITY"
else
AC_MSG_CHECKING([for sched_setaffinity declaration])
AC_TRY_COMPILE([
#define _GNU_SOURCE
#include <sched.h>
],[
cpu_set_t cpuset;
sched_setaffinity(0,sizeof(cpuset),&cpuset);
],
have_sched_affinity="yes",
have_sched_affinity="no"
)
if [[ "x$have_sched_affinity" = "xyes" ]]; then
THREAD_AFFINITY="-DSCHED_AFFINITY"
fi
AC_MSG_RESULT([$have_sched_affinity])
fi
CFLAGS="$SAVE_CFLAGS"
AC_LANG_RESTORE
AC_SUBST(THREAD_AFFINITY)
# Check for compile options
INLINE_FLAGS=""
AC_ARG_ENABLE(inline,AC_HELP_STRING([--enable-inline],[Enable inlining of functions]),want_inline=$enableval,want_inline=auto)

View File

@ -112,7 +112,7 @@ Mutex.o: @srcdir@/Mutex.cpp $(MKDEPS) $(CINC)
$(COMPILE) @MUTEX_HACK@ -c $<
Thread.o: @srcdir@/Thread.cpp $(MKDEPS) $(CINC)
$(COMPILE) @THREAD_KILL@ @HAVE_PRCTL@ -c $<
$(COMPILE) @THREAD_KILL@ @THREAD_AFFINITY@ @HAVE_PRCTL@ -c $<
TelEngine.o: @srcdir@/TelEngine.cpp $(MKDEPS) $(CINC)
$(COMPILE) @ATOMIC_OPS@ @HAVE_GMTOFF@ @HAVE_INT_TZ@ -c $<

View File

@ -24,7 +24,26 @@
#ifdef _WINDOWS
#include <process.h>
typedef unsigned long HTHREAD;
#define EINVAL_ERR ERROR_INVALID_PARAMETER
#define ENOTSUP_ERR ERROR_NOT_SUPPORTED
#define EINVALTHR_ERR ERROR_INVALID_HANDLE
#else
#if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#ifdef SCHED_AFFINITY
#include <sys/syscall.h>
#endif
#endif
#define EINVAL_ERR EINVAL
#define ENOTSUP_ERR ENOTSUP
#define EINVALTHR_ERR ESRCH
#include <pthread.h>
typedef pthread_t HTHREAD;
#ifndef PTHREAD_EXPLICIT_SCHED
@ -66,8 +85,13 @@ public:
static ThreadPrivate* create(Thread* t,const char* name,Thread::Priority prio);
static void killall();
static ThreadPrivate* current();
static int setAffinity(ThreadPrivate* t, const DataBlock& bits);
static int getAffinity(ThreadPrivate* t, DataBlock& outMask);
Thread* m_thread;
HTHREAD thread;
#if !defined(_WINDOWS) && defined(SCHED_AFFINITY)
pid_t m_tid;
#endif
NamedCounter* m_counter;
bool m_running;
bool m_started;
@ -76,6 +100,7 @@ public:
const char* m_name;
#ifdef _WINDOWS
static void startFunc(void* arg);
DWORD_PTR m_affinityMask;
#else
static void* startFunc(void* arg);
#endif
@ -105,8 +130,14 @@ static DWORD getTls()
abort();
return tls_index;
}
static inline bool haveAffinity()
{
// TODO handle detection of support on Windows
return true;
}
#else /* _WINDOWS */
static pthread_key_t current_key;
static pthread_key_t current_key = PTHREAD_KEYS_MAX; // this signifies invalid key
class ThreadPrivateKeyAlloc
{
@ -120,6 +151,14 @@ public:
}
};
static inline bool haveAffinity()
{
#if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
return true;
#endif
return false;
}
static ThreadPrivateKeyAlloc keyAllocator;
#endif /* _WINDOWS */
@ -242,6 +281,22 @@ ThreadPrivate::ThreadPrivate(Thread* t,const char* name)
{
#ifdef DEBUG
Debugger debug("ThreadPrivate::ThreadPrivate","(%p,\"%s\") [%p]",t,name,this);
#endif
#if defined(_WINDOWS)
m_affinityMask = 0;
DWORD_PTR sysAffin = 0;
if (!::GetProcessAffinityMask(::GetCurrentProcess(),&m_affinityMask,&sysAffin)) {
int err = ::GetLastError();
Debug(DebugNote,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - "
"Failed to get process affinity, err=%d [%p]",t,name,err,this);
}
#ifdef DEBUG
else
Debug(DebugAll,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - Process affinity = %lx,"
" system affinity = %lx [%p]",t,name,m_affinityMask,sysAffin,this);
#endif
#elif defined(SCHED_AFFINITY)
m_tid = -1;
#endif
// Inherit object counter of creating thread
m_counter = Thread::getCurrentObjCounter(true);
@ -425,6 +480,155 @@ ThreadPrivate* ThreadPrivate::current()
#endif
}
int ThreadPrivate::setAffinity(ThreadPrivate* t, const DataBlock& cpuMask)
{
#ifdef DEBUG
String str;
str.hexify(cpuMask.data(),cpuMask.length());
Debug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) to affinity mask:'%s'",
t ? t->m_name : "self",t,str.c_str());
#endif
if (!haveAffinity())
return ENOTSUP_ERR;
if (!cpuMask.length())
return EINVAL_ERR;
#ifdef _WINDOWS
DWORD_PTR mask = 0;
if (sizeof(mask) < cpuMask.length())
Debug(DebugNote,"CPU affinity mask is '%u' long, permitted length is '%u', ignoring exceeding bits",
cpuMask.length() * 8, sizeof(DWORD_PTR) * 8);
for (unsigned int i = 0; i < sizeof(mask) && i < cpuMask.length(); i++)
mask |= (cpuMask.at(i) << (i << 3));
if (!(mask = ::SetThreadAffinityMask(t ?reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread(),mask)))
return ::GetLastError();
if (t)
t->m_affinityMask = mask;
return 0;
#elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
uint8_t* bytes = (uint8_t*)cpuMask.data();
unsigned int len = cpuMask.length();
for (unsigned int i = 0; i < len; i++) {
uint8_t b = bytes[i];
for (uint8_t j = 0; j < 8; j++)
if ((b & (1 << j)))
CPU_SET(((i << 3) + j),&cpuSet);
}
#ifdef THREAD_AFFINITY
return pthread_setaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
#else
pid_t tid = -1;
if (!t)
tid = (pid_t)syscall(SYS_gettid);
else {
// there is a race between getting the TID and this call
// try to hold off for a while, maybe it will get set
// otherwise signal that the user should try again
unsigned int i = 0;
while (t->m_tid < 0 && i++ < 5)
::usleep(0);
tid = t->m_tid;
}
DDebug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) for TID:'%d'",t ? t->m_name : "self",t,tid);
if (-1 == tid)
return EAGAIN;
return sched_setaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
#endif
#endif /* _WINDOWS */
return ENOTSUP_ERR;
}
int ThreadPrivate::getAffinity(ThreadPrivate* t, DataBlock& outMask)
{
if (!haveAffinity())
return ENOTSUP_ERR;
#ifdef _WINDOWS
HANDLE thr = t ? reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread();
DWORD_PTR setMask = 0;
if (t) {
setMask = t->m_affinityMask;
}
else {
DWORD_PTR sysAffin = 0;
if (!::GetProcessAffinityMask(::GetCurrentProcess(),&setMask,&sysAffin)) {
int err = ::GetLastError();
Debug(DebugNote,"ThreadPrivate::getAffinity(t=%p) - "
"Failed to get process affinity, err=%d [%p]",t,err);
}
}
DWORD_PTR mask = ::SetThreadAffinityMask(thr,setMask);
if (!mask)
return ::GetLastError();
if (mask != setMask) {// maybe mask was changed by something external, restore it
if (!(::SetThreadAffinityMask(thr,mask))) {
Debug(DebugNote,"Failed to restore thread CPU affinity to '%lx', "
"now set to '%lx' [%p] for thread '%s' [%p]",mask,setMask,t ? t->m_name : "self",t);
return ::GetLastError();
}
}
outMask.resize(sizeof(mask));
uint8_t* bytes = (uint8_t*)outMask.data();
for (unsigned int i = 0; i < sizeof(mask) ; i++) {
*bytes++ = (uint8_t)(mask >> (i << 3)) & 0xff;
}
return 0;
#elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY)
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
int ret = 0;
#ifdef THREAD_AFFINITY
ret = pthread_getaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet);
#else
pid_t tid = -1;
if (!t)
tid = (pid_t)syscall(SYS_gettid);
else {
// there is a race between getting the TID and this call
// try to hold off for a while, maybe it will get set
// otherwise signal that the user should try again
unsigned int i = 0;
while (t->m_tid < 0 && i++ < 5)
::usleep(0);
tid = t->m_tid;
}
DDebug(DebugAll,"ThreadPrivate::getAffinity() %s(%p) for TID:'%d'",t ? t->name : "self",t,tid);
if (-1 == tid)
return EAGAIN;
ret = sched_getaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0;
#endif
if (!ret) {
outMask.resize(sizeof(cpuSet));
uint8_t* bytes = (uint8_t*) outMask.data();
unsigned int lastSet = 0;
for (unsigned int i = 0; i < (sizeof(cpuSet) << 3); i++) {
if (!CPU_ISSET(i,&cpuSet))
continue;
bytes[i >> 3] |= (1 << (i & 7));
lastSet = i >> 3;
}
// cpu_set has CPU_SETSIZE(1024 bits)
// remove octets from the end that have no bit set
outMask.cut(outMask.length() - lastSet - 1);
}
return ret;
#endif /* _WINDOWS */
return ENOTSUP_ERR;
}
void ThreadPrivate::killall()
{
Debugger debug("ThreadPrivate::killall()");
@ -549,6 +753,11 @@ void* ThreadPrivate::startFunc(void* arg)
{
DDebug(DebugAll,"ThreadPrivate::startFunc(%p)",arg);
ThreadPrivate *t = reinterpret_cast<ThreadPrivate *>(arg);
#if !defined(_WINDOWS) && defined(SCHED_AFFINITY) && defined(SYS_gettid)
// get TID as early as possible if needed
t->m_tid = (pid_t)syscall(SYS_gettid);
DDebug(DebugAll,"Thread '%s' (%p) has TID:'%d'",t->m_name,t,t->m_tid);
#endif
t->run();
#ifdef _WINDOWS
t->m_running = false;
@ -624,6 +833,127 @@ const char* Thread::currentName()
return t ? t->m_name : 0;
}
bool Thread::parseCPUMask(const String& cpus, DataBlock& mask)
{
if (!cpus)
return false;
ObjList* cpuList = cpus.split(',',false);
bool err = false;
for (ObjList* o = cpuList->skipNull(); o; o = o->skipNext()) {
String* str = static_cast<String*>(o->get());
int pos = str->find('-');
int16_t cStart = 0;
int16_t cEnd = 0;
switch (pos) {
case -1:
cStart = cEnd = str->toInteger(-1);
if (cStart < 0)
err = true;
break;
case 0:
err = true;
break;
default:
cStart = str->substr(0,pos).toInteger(-1);
cEnd = str->substr(pos + 1).toInteger(-1);
if (cStart < 0 || cEnd < 0 || cEnd < cStart)
err = true;
break;
}
if (err)
break;
unsigned int needLen = (cEnd >> 3) + 1;
// adjust bitmask length to be able to set the highest bit
while (mask.length() < needLen) {
uint8_t b = 0;
mask.append(&b,1);
}
uint8_t* bytes = (uint8_t*)mask.data();
for (int16_t i = cStart; i <= cEnd; i++) {
uint8_t* byte = bytes + (i >> 3);
*byte |= 1 << (i & 7);
}
}
TelEngine::destruct(cpuList);
#ifdef DEBUG
String str;
str.hexify(mask.data(),mask.length());
Debug(DebugAll,"Thread::parseCPUMask() Parsed '%s' into bitmask: '%s'",cpus.c_str(),str.c_str());
#endif
return !err && mask.length();
}
void Thread::printCPUMask(const DataBlock& mask, String& str, bool hexa)
{
if (hexa) {
String c;
for (int i = mask.length() - 1; i >= 0; i--) {
c.hexify(mask.data(i),1);
str << " " << c;
}
str.trimBlanks();
}
else {
for (unsigned int i = 0; i < mask.length(); i++) {
uint8_t b = mask[i];
for (uint8_t j = 0; j < 8; j++)
if ((b & (1 << j))) {
if (str)
str << ",";
str << (uint32_t)((i << 3) + j);
}
}
}
}
int Thread::setAffinity(const String& cpus)
{
DataBlock bits;
if (!parseCPUMask(cpus,bits))
return EINVAL_ERR;
Lock lock(s_tmutex);
return ThreadPrivate::setAffinity(m_private,bits);
}
int Thread::setAffinity(const DataBlock& cpus)
{
Lock lock(s_tmutex);
return ThreadPrivate::setAffinity(m_private,cpus);
}
int Thread::getAffinity(DataBlock& outCpuMask)
{
Lock lock(s_tmutex);
return ThreadPrivate::getAffinity(m_private,outCpuMask);
}
int Thread::setCurrentAffinity(const String& cpus)
{
DataBlock bits;
if (!parseCPUMask(cpus,bits))
return EINVAL_ERR;
return ThreadPrivate::setAffinity(ThreadPrivate::current(),bits);
}
int Thread::setCurrentAffinity(const DataBlock& cpuMask)
{
return ThreadPrivate::setAffinity(ThreadPrivate::current(),cpuMask);
}
int Thread::getCurrentAffinity(DataBlock& outCpuMask)
{
return ThreadPrivate::getAffinity(ThreadPrivate::current(),outCpuMask);
}
int Thread::getCurrentAffinity(String& outCpus, bool hex)
{
DataBlock d;
if (int err = ThreadPrivate::getAffinity(ThreadPrivate::current(),d))
return err;
Thread::printCPUMask(d,outCpus,hex);
return 0;
}
NamedCounter* Thread::getObjCounter() const
{
return m_private ? m_private->m_counter : 0;

View File

@ -5973,6 +5973,31 @@ public:
*/
bool running() const;
/**
* Get the affinity mask of this thread
* @param outCpuMask Bit mask specifying CPUs on which the thread is running on. Bit 0 of octet 0 in DataBlock is CPU 0,
* bit 1 in octet 0 is CPU 1,..., bit 0 in octet 2 is CPU 8, etc.
* @return 0 on success, error otherwise
*/
int getAffinity(DataBlock& outCpuMask);
/**
* Set the affinity of this thread by using a string that specifies the
* allowed CPUs by listing them separated with commas or as ranges.
* Mixing ranges with list is allowed (e.g. 0,2,5-6,10)
* @param cpus String specifying CPUs on which this thread should run.
* @return 0 on success, error otherwise
*/
int setAffinity(const String& cpus);
/**
* Set the affinity of this thread
* @param mask Bit mask specifying allowed CPUs kept in a DataBlock. Bit 0 of octet 0 in DataBlock is CPU 0,
* bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc.
* @return 0 on success, error otherwise
*/
int setAffinity(const DataBlock& mask);
/**
* Count how many Yate mutexes are kept locked by this thread
* @return Number of Mutex locks held by this thread
@ -5999,6 +6024,57 @@ public:
*/
static const char* currentName();
/**
* Get the affinity mask of current thread
* @param outCpuMask Bit mask specifying CPUs on which the current thread is running on. Bit 0 of octet 0 in DataBlock is CPU 0,
* bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc.
* @return 0 on success, error otherwise
*/
static int getCurrentAffinity(DataBlock& outCpuMask);
/**
* Get the affinity mask of current thread
* @param outCpus String into which to put the affinity
* @param hex True to put it as octet string, false as comma-separated list of CPUs
* @return 0 on success, error otherwise
*/
static int getCurrentAffinity(String& outCpus, bool hex = false);
/**
* Set the affinity of the current thread by using a string that specifies the
* allowed CPUs by listing them separated with commas or as ranges.
* Mixing ranges with list is allowed (e.g. 0,2,5-6,10)
* @param cpus String specifying CPUs on which this thread should run.
* @return 0 on success, error otherwise
*/
static int setCurrentAffinity(const String& cpus);
/**
* Set the affinity of the current thread
* @param mask Bit mask specifying allowed CPUs kept in a DataBlock. Bit 0 of octet 0 in DataBlock is CPU 0,
* bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc.
* @return 0 on success, error otherwise
*/
static int setCurrentAffinity(const DataBlock& mask);
/**
* Parse a CPU list into a bitmask held in a DataBlock.
* String is formated as a list of integers or integer ranges separated by commas..
* Mixing ranges with list is allowed (e.g. 0,2,5-6,10)
* @param cpus String specifying CPUs
* @param mask Output bitmask resulted from parsing.
* @return True if parsing succeeded, false otherwise
*/
static bool parseCPUMask(const String& cpus, DataBlock& mask);
/**
* Stringify the CPU mask
* @param mask Mask to stringify
* @param str Output string
* @param hexa Output as hexadecimal string if set, otherwise build a list of comma separated CPUs
*/
static void printCPUMask(const DataBlock& mask, String& str, bool hexa = true);
/**
* Give up the currently running timeslice. Note that on some platforms
* it also sleeps for the operating system's scheduler resolution
@ -6390,7 +6466,7 @@ public:
* Retrieve address family name
* @return Address family name
*/
inline const char* familyName()
inline const char* familyName() const
{ return lookupFamily(family()); }
/**