Add command line argument for running YATE with a required affinity.

Add configuration for affinity of RTPGroup threads.



git-svn-id: http://yate.null.ro/svn/yate/trunk@6430 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
oana 2020-09-23 13:12:56 +00:00
parent 841314da5f
commit 664e1391a6
6 changed files with 40 additions and 8 deletions

View File

@ -68,6 +68,11 @@
; It is a bad idea to set a low priority for anything but testing
;thread=normal
; affinity: list: Set the CPUs on which this thread is allowed to run on.
; It is a comma-separated lists of CPU numbers and/or CPU ranges (e.g: 1,3,12-15)
; If not set, it will use the process affinity
;affinity=
; defsleep: int: Default in-loop sleep time for new RTP sessions in milliseconds
;defsleep=5

View File

@ -275,6 +275,7 @@ int Engine::s_haltcode = -1;
int EnginePrivate::count = 0;
static String s_cfgpath(CFG_PATH);
static String s_usrpath;
static String s_affinity;
static bool s_createusr = true;
static bool s_init = false;
static bool s_dynplugin = false;
@ -1571,6 +1572,8 @@ int Engine::engineInit()
s_params.addParam("maxmsgage",String(s_maxmsgage));
s_params.addParam("maxqueued",String(s_maxqueued));
s_params.addParam("maxevents",String(s_maxevents));
if (s_affinity)
s_params.addParam("affinity",s_affinity);
int nodeBits = s_cfg.getIntValue("general","nodebits",0,0,10);
if (nodeBits) {
@ -2527,6 +2530,7 @@ static void usage(bool client, FILE* f)
" -x dirpath Absolute or relative path to extra modules directory (can be repeated)\n"
" -w directory Change working directory\n"
" -N nodename Set the name of this node in a cluster\n"
" -A cpus Set affinity from comma separated list of CPUs (e.g 1-4,7,8)\n"
#ifdef RLIMIT_CORE
" -C Enable core dumps if possible\n"
#endif
@ -2757,6 +2761,10 @@ int Engine::main(int argc, const char** argv, const char** env, RunMode mode, En
GET_PARAM;
s_node = param;
break;
case 'A':
GET_PARAM;
s_affinity = param;
break;
#ifdef RLIMIT_CORE
case 'C':
s_coredump = true;
@ -3021,6 +3029,13 @@ int Engine::main(int argc, const char** argv, const char** env, RunMode mode, En
}
#endif
if (s_affinity) {
int err = Thread::setCurrentAffinity(s_affinity);
if (err) {
Debug(DebugWarn,"Failed to set affinity to '%s', error=%s(%d)",s_affinity.c_str(),strerror(err),err);
s_affinity.clear();
}
}
int retcode = -1;
#ifndef _WINDOWS
if (supervised)

View File

@ -728,15 +728,15 @@ RTPTransport* UDPSession::createTransport()
return trans;
}
bool UDPSession::initGroup(int msec, Thread::Priority prio)
bool UDPSession::initGroup(int msec, Thread::Priority prio, const String& affinity)
{
if (m_group)
return true;
// try to pick the grop from the transport if it has one
// try to pick the group from the transport if it has one
if (m_transport)
group(m_transport->group());
if (!m_group)
group(new RTPGroup(msec,prio));
group(new RTPGroup(msec,prio,affinity));
if (!m_group)
return false;
if (m_transport)

View File

@ -19,6 +19,7 @@
*/
#include <yatertp.h>
#include <string.h>
#define BUF_SIZE 1500
@ -42,7 +43,7 @@ static inline void setScopeId(const SocketAddr& local, SocketAddr& sa1,
}
RTPGroup::RTPGroup(int msec, Priority prio)
RTPGroup::RTPGroup(int msec, Priority prio, const String& affinity)
: Mutex(true,"RTPGroup"),
Thread("RTP Group",prio), m_listChanged(false)
{
@ -52,6 +53,12 @@ RTPGroup::RTPGroup(int msec, Priority prio)
if (msec > 50)
msec = 50;
m_sleep = msec;
if (affinity) {
int err = setAffinity(affinity);
if (err)
Debug(DebugWarn,"Failed to set affinity to '%s', error=%s(%d) [%p]",
affinity.c_str(),::strerror(err),err,this);
}
}
RTPGroup::~RTPGroup()

View File

@ -202,8 +202,9 @@ public:
* Constructor
* @param msec Minimum time to sleep in loop in milliseconds
* @param prio Thread priority to run this group
* @param affinity Comma-separated list of CPUs and/or CPU range on which the thread should run on
*/
RTPGroup(int msec = 0, Priority prio = Normal);
RTPGroup(int msec = 0, Priority prio = Normal, const String& affinity = String::empty());
/**
* Group destructor, removes itself from all remaining processors
@ -973,9 +974,10 @@ public:
* Initialize the RTP session, attach a group if none is present
* @param msec Minimum time to sleep in group loop in milliseconds
* @param prio Thread priority to run the new group
* @param affinity Comma-separated list of CPUs and/or CPU range on which the thread should run on
* @return True if initialized, false on some failure
*/
bool initGroup(int msec = 0, Thread::Priority prio = Thread::Normal);
bool initGroup(int msec = 0, Thread::Priority prio = Thread::Normal, const String& affinity = String::empty());
/**
* Set the remote network address of the RTP transport of this session

View File

@ -97,6 +97,7 @@ static bool s_rtcp = true;
static bool s_drill = false;
static Thread::Priority s_priority = Thread::Normal;
static String s_affinity;
static int s_tos = Socket::Normal;
static int s_udpbuf = 0;
static int s_sleep = 5;
@ -735,7 +736,7 @@ bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg)
m_consumer->deref();
}
}
if (!(m_rtp->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority)) &&
if (!(m_rtp->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority),msg.getValue(YSTRING("affinity"),s_affinity)) &&
m_rtp->direction(m_dir)))
return false;
@ -796,7 +797,8 @@ bool YRTPWrapper::startUDPTL(const char* raddr, unsigned int rport, Message& msg
int msec = msg.getIntValue(YSTRING("msleep"),s_sleep);
if (!setRemote(raddr,rport,msg))
return false;
if (!m_udptl->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority)))
if (!m_udptl->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority),
msg.getValue(YSTRING("affinity"),s_affinity)))
return false;
m_udptl->setTOS(tos);
@ -1914,6 +1916,7 @@ void YRTPPlugin::initialize()
s_sleep = cfg.getIntValue("general","defsleep",5);
RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
s_priority = Thread::priority(cfg.getValue("general","thread"));
s_affinity = cfg.getValue("general","affinity");
s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
s_timeout = cfg.getIntValue("timeouts","timeout",3000);
s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);