From 8bb8e83faf0e5189458e55494c8f6b5bf813f92d Mon Sep 17 00:00:00 2001 From: oana Date: Wed, 23 Sep 2020 15:26:34 +0000 Subject: [PATCH] Add YATE library support for setting and getting the affinity of a thread. git-svn-id: http://voip.null.ro/svn/yate@6429 acf43c95-373e-0410-b603-e72c3f656dc1 --- configure.ac | 40 ++++++ engine/Makefile.in | 2 +- engine/Thread.cpp | 332 ++++++++++++++++++++++++++++++++++++++++++++- yateclass.h | 78 ++++++++++- 4 files changed, 449 insertions(+), 3 deletions(-) diff --git a/configure.ac b/configure.ac index 98215e02..0e71c814 100644 --- a/configure.ac +++ b/configure.ac @@ -345,6 +345,46 @@ fi AC_MSG_RESULT([$have_pthread_kill]) AC_SUBST(THREAD_KILL) +THREAD_AFFINITY="" +AC_MSG_CHECKING([for pthread_setaffinity_np declaration]) +AC_LANG_SAVE +AC_LANG_C +SAVE_CFLAGS="$CFLAGS" +CFLAGS="$CFLAGS -Wall -Werror" +AC_TRY_COMPILE([ +#define _GNU_SOURCE +#include +],[ + cpu_set_t cpuset; + pthread_setaffinity_np(pthread_self(),sizeof(cpuset),&cpuset); +], + have_pthread_affinity="yes", + have_pthread_affinity="no" +) +AC_MSG_RESULT([$have_pthread_affinity]) +if [[ "x$have_pthread_affinity" = "xyes" ]]; then + THREAD_AFFINITY="-DTHREAD_AFFINITY" +else + AC_MSG_CHECKING([for sched_setaffinity declaration]) + AC_TRY_COMPILE([ + #define _GNU_SOURCE + #include + ],[ + cpu_set_t cpuset; + sched_setaffinity(0,sizeof(cpuset),&cpuset); + ], + have_sched_affinity="yes", + have_sched_affinity="no" + ) + if [[ "x$have_sched_affinity" = "xyes" ]]; then + THREAD_AFFINITY="-DSCHED_AFFINITY" + fi + AC_MSG_RESULT([$have_sched_affinity]) +fi +CFLAGS="$SAVE_CFLAGS" +AC_LANG_RESTORE +AC_SUBST(THREAD_AFFINITY) + # Check for compile options INLINE_FLAGS="" AC_ARG_ENABLE(inline,AC_HELP_STRING([--enable-inline],[Enable inlining of functions]),want_inline=$enableval,want_inline=auto) diff --git a/engine/Makefile.in b/engine/Makefile.in index ffb90922..c9a45164 100644 --- a/engine/Makefile.in +++ b/engine/Makefile.in @@ -112,7 +112,7 @@ Mutex.o: @srcdir@/Mutex.cpp 
$(MKDEPS) $(CINC) $(COMPILE) @MUTEX_HACK@ -c $< Thread.o: @srcdir@/Thread.cpp $(MKDEPS) $(CINC) - $(COMPILE) @THREAD_KILL@ @HAVE_PRCTL@ -c $< + $(COMPILE) @THREAD_KILL@ @THREAD_AFFINITY@ @HAVE_PRCTL@ -c $< TelEngine.o: @srcdir@/TelEngine.cpp $(MKDEPS) $(CINC) $(COMPILE) @ATOMIC_OPS@ @HAVE_GMTOFF@ @HAVE_INT_TZ@ -c $< diff --git a/engine/Thread.cpp b/engine/Thread.cpp index 4cf7120f..b62626cb 100644 --- a/engine/Thread.cpp +++ b/engine/Thread.cpp @@ -24,7 +24,26 @@ #ifdef _WINDOWS #include typedef unsigned long HTHREAD; + +#define EINVAL_ERR ERROR_INVALID_PARAMETER +#define ENOTSUP_ERR ERROR_NOT_SUPPORTED +#define EINVALTHR_ERR ERROR_INVALID_HANDLE + #else + +#if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY) +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#ifdef SCHED_AFFINITY +#include +#endif +#endif + +#define EINVAL_ERR EINVAL +#define ENOTSUP_ERR ENOTSUP +#define EINVALTHR_ERR ESRCH + #include typedef pthread_t HTHREAD; #ifndef PTHREAD_EXPLICIT_SCHED @@ -66,8 +85,13 @@ public: static ThreadPrivate* create(Thread* t,const char* name,Thread::Priority prio); static void killall(); static ThreadPrivate* current(); + static int setAffinity(ThreadPrivate* t, const DataBlock& bits); + static int getAffinity(ThreadPrivate* t, DataBlock& outMask); Thread* m_thread; HTHREAD thread; +#if !defined(_WINDOWS) && defined(SCHED_AFFINITY) + pid_t m_tid; +#endif NamedCounter* m_counter; bool m_running; bool m_started; @@ -76,6 +100,7 @@ public: const char* m_name; #ifdef _WINDOWS static void startFunc(void* arg); + DWORD_PTR m_affinityMask; #else static void* startFunc(void* arg); #endif @@ -105,8 +130,14 @@ static DWORD getTls() abort(); return tls_index; } + +static inline bool haveAffinity() +{ + // TODO handle detection of support on Windows + return true; +} #else /* _WINDOWS */ -static pthread_key_t current_key; +static pthread_key_t current_key = PTHREAD_KEYS_MAX; // this signifies invalid key class ThreadPrivateKeyAlloc { @@ -120,6 +151,14 @@ public: } }; +static 
inline bool haveAffinity() +{ +#if defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY) + return true; +#endif + return false; +} + static ThreadPrivateKeyAlloc keyAllocator; #endif /* _WINDOWS */ @@ -242,6 +281,22 @@ ThreadPrivate::ThreadPrivate(Thread* t,const char* name) { #ifdef DEBUG Debugger debug("ThreadPrivate::ThreadPrivate","(%p,\"%s\") [%p]",t,name,this); +#endif +#if defined(_WINDOWS) + m_affinityMask = 0; + DWORD_PTR sysAffin = 0; + if (!::GetProcessAffinityMask(::GetCurrentProcess(),&m_affinityMask,&sysAffin)) { + int err = ::GetLastError(); + Debug(DebugNote,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - " + "Failed to get process affinity, err=%d [%p]",t,name,err,this); + } +#ifdef DEBUG + else + Debug(DebugAll,"ThreadPrivate::ThreadPrivate(%p,\"%s\") - Process affinity = %lx," + " system affinity = %lx [%p]",t,name,m_affinityMask,sysAffin,this); +#endif +#elif defined(SCHED_AFFINITY) + m_tid = -1; #endif // Inherit object counter of creating thread m_counter = Thread::getCurrentObjCounter(true); @@ -425,6 +480,155 @@ ThreadPrivate* ThreadPrivate::current() #endif } +int ThreadPrivate::setAffinity(ThreadPrivate* t, const DataBlock& cpuMask) +{ +#ifdef DEBUG + String str; + str.hexify(cpuMask.data(),cpuMask.length()); + Debug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) to affinity mask:'%s'", + t ? 
t->m_name : "self",t,str.c_str()); +#endif + if (!haveAffinity()) + return ENOTSUP_ERR; + if (!cpuMask.length()) + return EINVAL_ERR; + +#ifdef _WINDOWS + + DWORD_PTR mask = 0; /* requested affinity, built from the little-endian octets of cpuMask */ + if (sizeof(mask) < cpuMask.length()) + Debug(DebugNote,"CPU affinity mask is '%u' long, permitted length is '%u', ignoring exceeding bits", + cpuMask.length() * 8, (unsigned int)(sizeof(DWORD_PTR) * 8)); + for (unsigned int i = 0; i < sizeof(mask) && i < cpuMask.length(); i++) + mask |= ((DWORD_PTR)(uint8_t)cpuMask.at(i)) << (i << 3); /* widen before shifting, an int shift cannot reach CPUs 32..63 */ + if (!::SetThreadAffinityMask(t ? reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread(),mask)) /* returns the PREVIOUS mask, 0 on failure */ + return ::GetLastError(); + if (t) + t->m_affinityMask = mask; /* remember the mask that was requested, not the one it replaced */ + return 0; + +#elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY) + + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + uint8_t* bytes = (uint8_t*)cpuMask.data(); + unsigned int len = cpuMask.length(); + for (unsigned int i = 0; i < len; i++) { + uint8_t b = bytes[i]; + for (uint8_t j = 0; j < 8; j++) + if ((b & (1 << j))) + CPU_SET(((i << 3) + j),&cpuSet); + } + +#ifdef THREAD_AFFINITY + return pthread_setaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet); +#else + pid_t tid = -1; + if (!t) + tid = (pid_t)syscall(SYS_gettid); + else { + // there is a race between getting the TID and this call + // try to hold off for a while, maybe it will get set + // otherwise signal that the user should try again + unsigned int i = 0; + while (t->m_tid < 0 && i++ < 5) + ::usleep(0); + tid = t->m_tid; + } + DDebug(DebugAll,"ThreadPrivate::setAffinity() %s(%p) for TID:'%d'",t ? t->m_name : "self",t,tid); + if (-1 == tid) + return EAGAIN; + return sched_setaffinity(tid,sizeof(cpuSet),&cpuSet) ? errno : 0; +#endif + +#endif /* _WINDOWS */ + return ENOTSUP_ERR; +} + +int ThreadPrivate::getAffinity(ThreadPrivate* t, DataBlock& outMask) +{ + if (!haveAffinity()) + return ENOTSUP_ERR; + +#ifdef _WINDOWS + + HANDLE thr = t ? 
reinterpret_cast<HANDLE>(t->thread) : ::GetCurrentThread(); + DWORD_PTR setMask = 0; + if (t) { + setMask = t->m_affinityMask; + } + else { + DWORD_PTR sysAffin = 0; + if (!::GetProcessAffinityMask(::GetCurrentProcess(),&setMask,&sysAffin)) { + int err = ::GetLastError(); + Debug(DebugNote,"ThreadPrivate::getAffinity(t=%p) - " + "Failed to get process affinity, err=%d",t,err); /* one conversion per argument: the trailing [%p] had none */ + } + } + DWORD_PTR mask = ::SetThreadAffinityMask(thr,setMask); /* Windows has no getter: set a mask to read back the previous one */ + if (!mask) + return ::GetLastError(); + if (mask != setMask) {// maybe mask was changed by something external, restore it + if (!(::SetThreadAffinityMask(thr,mask))) { + Debug(DebugNote,"Failed to restore thread CPU affinity to '%lx', " + "now set to '%lx' for thread '%s' [%p]",mask,setMask,t ? t->m_name : "self",t); /* the extra [%p] made %s consume the ThreadPrivate pointer */ + return ::GetLastError(); + } + } + outMask.resize(sizeof(mask)); + uint8_t* bytes = (uint8_t*)outMask.data(); + for (unsigned int i = 0; i < sizeof(mask) ; i++) { + *bytes++ = (uint8_t)(mask >> (i << 3)) & 0xff; + } + return 0; + +#elif defined(THREAD_AFFINITY) || defined(SCHED_AFFINITY) + + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + int ret = 0; + +#ifdef THREAD_AFFINITY + ret = pthread_getaffinity_np(t ? t->thread : pthread_self(),sizeof(cpuSet),&cpuSet); +#else + pid_t tid = -1; + if (!t) + tid = (pid_t)syscall(SYS_gettid); + else { + // there is a race between getting the TID and this call + // try to hold off for a while, maybe it will get set + // otherwise signal that the user should try again + unsigned int i = 0; + while (t->m_tid < 0 && i++ < 5) + ::usleep(0); + tid = t->m_tid; + } + DDebug(DebugAll,"ThreadPrivate::getAffinity() %s(%p) for TID:'%d'",t ? t->m_name : "self",t,tid); /* member is m_name, as in setAffinity() */ + if (-1 == tid) + return EAGAIN; + ret = sched_getaffinity(tid,sizeof(cpuSet),&cpuSet) ? 
errno : 0; +#endif + + if (!ret) { + outMask.resize(sizeof(cpuSet)); + uint8_t* bytes = (uint8_t*) outMask.data(); + unsigned int lastSet = 0; + for (unsigned int i = 0; i < (sizeof(cpuSet) << 3); i++) { + if (!CPU_ISSET(i,&cpuSet)) + continue; + bytes[i >> 3] |= (1 << (i & 7)); + lastSet = i >> 3; + } + // cpu_set has CPU_SETSIZE(1024 bits) + // remove octets from the end that have no bit set + outMask.cut(outMask.length() - lastSet - 1); + } + + return ret; +#endif /* _WINDOWS */ + return ENOTSUP_ERR; +} + void ThreadPrivate::killall() { Debugger debug("ThreadPrivate::killall()"); @@ -549,6 +753,11 @@ void* ThreadPrivate::startFunc(void* arg) { DDebug(DebugAll,"ThreadPrivate::startFunc(%p)",arg); ThreadPrivate *t = reinterpret_cast(arg); +#if !defined(_WINDOWS) && defined(SCHED_AFFINITY) && defined(SYS_gettid) + // get TID as early as possible if needed + t->m_tid = (pid_t)syscall(SYS_gettid); + DDebug(DebugAll,"Thread '%s' (%p) has TID:'%d'",t->m_name,t,t->m_tid); +#endif t->run(); #ifdef _WINDOWS t->m_running = false; @@ -624,6 +833,127 @@ const char* Thread::currentName() return t ? 
t->m_name : 0; } +bool Thread::parseCPUMask(const String& cpus, DataBlock& mask) +{ + if (!cpus) + return false; + ObjList* cpuList = cpus.split(',',false); + bool err = false; + for (ObjList* o = cpuList->skipNull(); o; o = o->skipNext()) { + String* str = static_cast<String*>(o->get()); + int pos = str->find('-'); + int cStart = 0; /* int, not int16_t: toInteger() results must not be silently truncated */ + int cEnd = 0; /* indexes above 0x7fff are rejected below, keeping the previously accepted range */ + switch (pos) { + case -1: + cStart = cEnd = str->toInteger(-1); + if (cStart < 0 || cStart > 0x7fff) + err = true; + break; + case 0: + err = true; + break; + default: + cStart = str->substr(0,pos).toInteger(-1); + cEnd = str->substr(pos + 1).toInteger(-1); + if (cStart < 0 || cEnd < 0 || cEnd < cStart || cEnd > 0x7fff) + err = true; + break; + } + if (err) + break; + unsigned int needLen = (cEnd >> 3) + 1; + // adjust bitmask length to be able to set the highest bit + while (mask.length() < needLen) { + uint8_t b = 0; + mask.append(&b,1); + } + uint8_t* bytes = (uint8_t*)mask.data(); + for (int i = cStart; i <= cEnd; i++) { /* an int counter cannot wrap at 32767 and index before the buffer */ + uint8_t* byte = bytes + (i >> 3); + *byte |= 1 << (i & 7); + } + } + TelEngine::destruct(cpuList); +#ifdef DEBUG + String str; + str.hexify(mask.data(),mask.length()); + Debug(DebugAll,"Thread::parseCPUMask() Parsed '%s' into bitmask: '%s'",cpus.c_str(),str.c_str()); +#endif + return !err && mask.length(); +} + +void Thread::printCPUMask(const DataBlock& mask, String& str, bool hexa) +{ + if (hexa) { + String c; + for (int i = mask.length() - 1; i >= 0; i--) { + c.hexify(mask.data(i),1); + str << " " << c; + } + str.trimBlanks(); + } + else { + for (unsigned int i = 0; i < mask.length(); i++) { + uint8_t b = mask[i]; + for (uint8_t j = 0; j < 8; j++) + if ((b & (1 << j))) { + if (str) + str << ","; + str << (uint32_t)((i << 3) + j); + } + } + } +} + +int Thread::setAffinity(const String& cpus) +{ + DataBlock bits; + if (!parseCPUMask(cpus,bits)) + return EINVAL_ERR; + Lock lock(s_tmutex); + return ThreadPrivate::setAffinity(m_private,bits); +} + +int Thread::setAffinity(const DataBlock& cpus) +{ + Lock lock(s_tmutex); + return 
ThreadPrivate::setAffinity(m_private,cpus); +} + +int Thread::getAffinity(DataBlock& outCpuMask) +{ + Lock lock(s_tmutex); + return ThreadPrivate::getAffinity(m_private,outCpuMask); +} + +int Thread::setCurrentAffinity(const String& cpus) +{ + DataBlock bits; + if (!parseCPUMask(cpus,bits)) + return EINVAL_ERR; + return ThreadPrivate::setAffinity(ThreadPrivate::current(),bits); +} + +int Thread::setCurrentAffinity(const DataBlock& cpuMask) +{ + return ThreadPrivate::setAffinity(ThreadPrivate::current(),cpuMask); +} + +int Thread::getCurrentAffinity(DataBlock& outCpuMask) +{ + return ThreadPrivate::getAffinity(ThreadPrivate::current(),outCpuMask); +} + +int Thread::getCurrentAffinity(String& outCpus, bool hex) +{ + DataBlock d; + if (int err = ThreadPrivate::getAffinity(ThreadPrivate::current(),d)) + return err; + Thread::printCPUMask(d,outCpus,hex); + return 0; +} + NamedCounter* Thread::getObjCounter() const { return m_private ? m_private->m_counter : 0; diff --git a/yateclass.h b/yateclass.h index 7ca62c6f..ea51f560 100644 --- a/yateclass.h +++ b/yateclass.h @@ -5973,6 +5973,31 @@ public: */ bool running() const; + /** + * Get the affinity mask of this thread + * @param outCpuMask Bit mask specifying CPUs on which the thread is running on. Bit 0 of octet 0 in DataBlock is CPU 0, + * bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc. + * @return 0 on success, error otherwise + */ + int getAffinity(DataBlock& outCpuMask); + + /** + * Set the affinity of this thread by using a string that specifies the + * allowed CPUs by listing them separated with commas or as ranges. + * Mixing ranges with list is allowed (e.g. 0,2,5-6,10) + * @param cpus String specifying CPUs on which this thread should run. + * @return 0 on success, error otherwise + */ + int setAffinity(const String& cpus); + + /** + * Set the affinity of this thread + * @param mask Bit mask specifying allowed CPUs kept in a DataBlock. 
Bit 0 of octet 0 in DataBlock is CPU 0, + * bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc. + * @return 0 on success, error otherwise + */ + int setAffinity(const DataBlock& mask); + /** * Count how many Yate mutexes are kept locked by this thread * @return Number of Mutex locks held by this thread @@ -5999,6 +6024,57 @@ public: */ static const char* currentName(); + /** + * Get the affinity mask of current thread + * @param outCpuMask Bit mask specifying CPUs on which the current thread is running on. Bit 0 of octet 0 in DataBlock is CPU 0, + * bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc. + * @return 0 on success, error otherwise + */ + static int getCurrentAffinity(DataBlock& outCpuMask); + + /** + * Get the affinity mask of current thread + * @param outCpus String into which to put the affinity + * @param hex True to put it as octet string, false as comma-separated list of CPUs + * @return 0 on success, error otherwise + */ + static int getCurrentAffinity(String& outCpus, bool hex = false); + + /** + * Set the affinity of the current thread by using a string that specifies the + * allowed CPUs by listing them separated with commas or as ranges. + * Mixing ranges with list is allowed (e.g. 0,2,5-6,10) + * @param cpus String specifying CPUs on which this thread should run. + * @return 0 on success, error otherwise + */ + static int setCurrentAffinity(const String& cpus); + + /** + * Set the affinity of the current thread + * @param mask Bit mask specifying allowed CPUs kept in a DataBlock. Bit 0 of octet 0 in DataBlock is CPU 0, + * bit 1 in octet 0 is CPU 1,..., bit 0 in octet 1 is CPU 8, etc. + * @return 0 on success, error otherwise + */ + static int setCurrentAffinity(const DataBlock& mask); + + /** + * Parse a CPU list into a bitmask held in a DataBlock. + * String is formatted as a list of integers or integer ranges separated by commas. + * Mixing ranges with list is allowed (e.g. 
0,2,5-6,10) + * @param cpus String specifying CPUs + * @param mask Output bitmask resulted from parsing. + * @return True if parsing succeeded, false otherwise + */ + static bool parseCPUMask(const String& cpus, DataBlock& mask); + + /** + * Stringify the CPU mask + * @param mask Mask to stringify + * @param str Output string + * @param hexa Output as hexadecimal string if set, otherwise build a list of comma separated CPUs + */ + static void printCPUMask(const DataBlock& mask, String& str, bool hexa = true); + /** * Give up the currently running timeslice. Note that on some platforms * it also sleeps for the operating system's scheduler resolution @@ -6390,7 +6466,7 @@ public: * Retrieve address family name * @return Address family name */ - inline const char* familyName() + inline const char* familyName() const { return lookupFamily(family()); } /**