Added setting to automatically enter congested mode over a queue size threshold.

Added access to Engine's call accept state from Javascript and regexroute.
Optimized MessageDispatcher::messageCount() using the enqueue / dequeue counters.


git-svn-id: http://voip.null.ro/svn/yate@6163 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2017-01-09 15:04:04 +00:00
parent e043cf9378
commit 2c615905ef
7 changed files with 46 additions and 6 deletions

View File

@ -51,6 +51,7 @@
; $(engine,NAME) = value of Engine's runtime parameter NAME
; $(runid) = the current Engine run identifier
; $(nodename) = the node name the Engine runs as, may be empty
; $(accepting) = call accept status in engine, one of "accept", "partial", "congestion" or "reject"
; $(threadname) = name of the thread that dispatched the message, may be empty
; $(dispatching) = the reentry depth, 0 if the message is not generated locally
; $(message,name) = name of the message handled

View File

@ -39,6 +39,11 @@
; Valid range 0 to 50000, default 0 (disable message rate check)
;maxmsgrate=0
; maxqueued: int: Message queue size threshold to declare engine congestion
; This parameter is reloadable
; Valid range 0 to 10000, default 0 (disable queue size check)
;maxqueued=0
; maxevents: int: Maximum number of events kept per type
; This parameter is reloadable
; Valid range 0 to 1000, default 25, 0 disables limit

View File

@ -236,6 +236,7 @@ static int s_minworkers = 1;
static int s_maxworkers = 10;
static int s_addworkers = 1;
static int s_maxmsgrate = 0;
static int s_maxqueued = 0;
static int s_exit = -1;
unsigned int Engine::s_congestion = 0;
static Mutex s_congMutex(false,"Congestion");
@ -1362,7 +1363,8 @@ unsigned int SharedVars::dec(const String& name, unsigned int wrap)
Engine::Engine()
: m_dispatchedLast(0), m_messageRate(0), m_maxMsgRate(0), m_rateCongested(false)
: m_dispatchedLast(0), m_messageRate(0), m_maxMsgRate(0),
m_rateCongested(false), m_queueCongested(false)
{
DDebug(DebugAll,"Engine::Engine() [%p]",this);
initUsrPath(s_usrpath);
@ -1452,6 +1454,7 @@ int Engine::engineInit()
s_maxworkers = s_cfg.getIntValue("general","maxworkers",s_maxworkers,s_minworkers,500);
s_addworkers = s_cfg.getIntValue("general","addworkers",s_addworkers,1,10);
s_maxmsgrate = s_cfg.getIntValue("general","maxmsgrate",s_maxmsgrate,0,50000);
s_maxqueued = s_cfg.getIntValue("general","maxqueued",s_maxqueued,0,10000);
s_maxevents = s_cfg.getIntValue("general","maxevents",s_maxevents,0,1000);
s_restarts = s_cfg.getIntValue("general","restarts");
m_dispatcher.warnTime(1000*(u_int64_t)s_cfg.getIntValue("general","warntime"));
@ -1482,6 +1485,7 @@ int Engine::engineInit()
s_params.addParam("maxworkers",String(s_maxworkers));
s_params.addParam("addworkers",String(s_addworkers));
s_params.addParam("maxmsgrate",String(s_maxmsgrate));
s_params.addParam("maxqueued",String(s_maxqueued));
s_params.addParam("maxevents",String(s_maxevents));
if (track)
s_params.addParam("trackparam",track);
@ -1584,6 +1588,8 @@ int Engine::run()
= s_cfg.getIntValue("general","addworkers",s_addworkers,1,10))));
s_params.setParam("maxmsgrate",String((s_maxmsgrate
= s_cfg.getIntValue("general","maxmsgrate",s_maxmsgrate,0,50000))));
s_params.setParam("maxqueued",String((s_maxqueued
= s_cfg.getIntValue("general","maxqueued",s_maxqueued,0,10000))));
s_params.setParam("maxevents",String((s_maxevents
= s_cfg.getIntValue("general","maxevents",s_maxevents,0,1000))));
initPlugins();
@ -1653,10 +1659,15 @@ int Engine::run()
m_dispatchedLast = disp;
if (m_maxMsgRate < m_messageRate)
m_maxMsgRate = m_messageRate;
bool rateCongested = s_maxmsgrate && (m_messageRate > (unsigned)s_maxmsgrate);
if (rateCongested != m_rateCongested) {
m_rateCongested = rateCongested;
setCongestion(rateCongested ? "message rate over limit" : 0);
bool cong = s_maxmsgrate && (m_messageRate > (unsigned)s_maxmsgrate);
if (cong != m_rateCongested) {
m_rateCongested = cong;
setCongestion(cong ? "message rate over limit" : 0);
}
cong = s_maxqueued && (m_dispatcher.messageCount() > (unsigned)s_maxqueued);
if (cong != m_queueCongested) {
m_queueCongested = cong;
setCongestion(cong ? "message queue over limit" : 0);
}
// Attempt to sleep until the next full second

View File

@ -552,7 +552,7 @@ void MessageDispatcher::dequeue()
unsigned int MessageDispatcher::messageCount()
{
Lock lock(this);
return m_messages.count();
return m_enqueueCount - m_dequeueCount;
}
unsigned int MessageDispatcher::handlerCount()

View File

@ -251,6 +251,7 @@ public:
params().addParam(new ExpFunction("debugAt"));
params().addParam(new ExpFunction("setDebug"));
params().addParam(new ExpFunction("started"));
params().addParam(new ExpFunction("accepting"));
if (name)
params().addParam(new ExpOperation(name,"name"));
params().addParam(new ExpWrapper(new JsShared(mtx),"shared"));
@ -1400,6 +1401,25 @@ bool JsEngine::runNative(ObjList& stack, const ExpOperation& oper, GenObject* co
return false;
ExpEvaluator::pushOne(stack,new ExpOperation(Engine::started()));
}
else if (oper.name() == YSTRING("accepting")) {
ObjList args;
switch (extractArgs(stack,oper,context,args)) {
case 0:
ExpEvaluator::pushOne(stack,new ExpOperation(
lookup(Engine::accept(),Engine::getCallAcceptStates())));
break;
case 1:
{
int arg = static_cast<ExpOperation*>(args[0])->toInteger(
Engine::getCallAcceptStates(),-1);
if ((Engine::Accept <= arg) && (Engine::Reject >= arg))
Engine::setAccept((Engine::CallAccept)arg);
}
break;
default:
return false;
}
}
else if (oper.name() == YSTRING("atob")) {
// str = Engine.atob(b64_str)
ObjList args;

View File

@ -428,6 +428,8 @@ static void evalFunc(String& str, Message& msg)
str = Engine::nodeName();
else if (str == YSTRING("threadname"))
str = Thread::currentName();
else if (str == YSTRING("accepting"))
str = lookup(Engine::accept(),Engine::getCallAcceptStates());
else if ((sep >= 0) && (str == YSTRING("transcode"))) {
str = par.substr(0,sep);
par = par.substr(sep+1).trimBlanks();

View File

@ -1631,6 +1631,7 @@ private:
unsigned int m_messageRate;
unsigned int m_maxMsgRate;
bool m_rateCongested;
bool m_queueCongested;
static Engine* s_self;
static String s_node;
static String s_shrpath;