diff --git a/conf.d/regexroute.conf.sample b/conf.d/regexroute.conf.sample index 2704247b..21a1f0fd 100644 --- a/conf.d/regexroute.conf.sample +++ b/conf.d/regexroute.conf.sample @@ -51,6 +51,7 @@ ; $(engine,NAME) = value of Engine's runtime parameter NAME ; $(runid) = the current Engine run identifier ; $(nodename) = the node name the Engine runs as, may be empty +; $(accepting) = call accept status in engine, one of "accept", "partial", "congestion" or "reject" ; $(threadname) = name of the thread that dispatched the message, may be empty ; $(dispatching) = the reentry depth, 0 if the message is not generated locally ; $(message,name) = name of the message handled diff --git a/conf.d/yate.conf.sample b/conf.d/yate.conf.sample index aabde635..83d5d44b 100644 --- a/conf.d/yate.conf.sample +++ b/conf.d/yate.conf.sample @@ -39,6 +39,11 @@ ; Valid range 0 to 50000, default 0 (disable message rate check) ;maxmsgrate=0 +; maxqueued: int: Message queue size threshold to declare engine congestion +; This parameter is reloadable +; Valid range 0 to 10000, default 0 (disable queue size check) +;maxqueued=0 + ; maxevents: int: Maximum number of events kept per type ; This parameter is reloadable ; Valid range 0 to 1000, default 25, 0 disables limit diff --git a/engine/Engine.cpp b/engine/Engine.cpp index 080cfc35..80f23511 100644 --- a/engine/Engine.cpp +++ b/engine/Engine.cpp @@ -236,6 +236,7 @@ static int s_minworkers = 1; static int s_maxworkers = 10; static int s_addworkers = 1; static int s_maxmsgrate = 0; +static int s_maxqueued = 0; static int s_exit = -1; unsigned int Engine::s_congestion = 0; static Mutex s_congMutex(false,"Congestion"); @@ -1362,7 +1363,8 @@ unsigned int SharedVars::dec(const String& name, unsigned int wrap) Engine::Engine() - : m_dispatchedLast(0), m_messageRate(0), m_maxMsgRate(0), m_rateCongested(false) + : m_dispatchedLast(0), m_messageRate(0), m_maxMsgRate(0), + m_rateCongested(false), m_queueCongested(false) { DDebug(DebugAll,"Engine::Engine() [%p]",this); initUsrPath(s_usrpath); @@ -1452,6 +1454,7 @@ int Engine::engineInit() s_maxworkers = s_cfg.getIntValue("general","maxworkers",s_maxworkers,s_minworkers,500); s_addworkers = s_cfg.getIntValue("general","addworkers",s_addworkers,1,10); s_maxmsgrate = s_cfg.getIntValue("general","maxmsgrate",s_maxmsgrate,0,50000); + s_maxqueued = s_cfg.getIntValue("general","maxqueued",s_maxqueued,0,10000); s_maxevents = s_cfg.getIntValue("general","maxevents",s_maxevents,0,1000); s_restarts = s_cfg.getIntValue("general","restarts"); m_dispatcher.warnTime(1000*(u_int64_t)s_cfg.getIntValue("general","warntime")); @@ -1482,6 +1485,7 @@ int Engine::engineInit() s_params.addParam("maxworkers",String(s_maxworkers)); s_params.addParam("addworkers",String(s_addworkers)); s_params.addParam("maxmsgrate",String(s_maxmsgrate)); + s_params.addParam("maxqueued",String(s_maxqueued)); s_params.addParam("maxevents",String(s_maxevents)); if (track) s_params.addParam("trackparam",track); @@ -1584,6 +1588,8 @@ int Engine::run() = s_cfg.getIntValue("general","addworkers",s_addworkers,1,10)))); s_params.setParam("maxmsgrate",String((s_maxmsgrate = s_cfg.getIntValue("general","maxmsgrate",s_maxmsgrate,0,50000)))); + s_params.setParam("maxqueued",String((s_maxqueued + = s_cfg.getIntValue("general","maxqueued",s_maxqueued,0,10000)))); s_params.setParam("maxevents",String((s_maxevents = s_cfg.getIntValue("general","maxevents",s_maxevents,0,1000)))); initPlugins(); @@ -1653,10 +1659,15 @@ int Engine::run() m_dispatchedLast = disp; if (m_maxMsgRate < m_messageRate) m_maxMsgRate = m_messageRate; - bool rateCongested = s_maxmsgrate && (m_messageRate > (unsigned)s_maxmsgrate); - if (rateCongested != m_rateCongested) { - m_rateCongested = rateCongested; - setCongestion(rateCongested ? "message rate over limit" : 0); + bool cong = s_maxmsgrate && (m_messageRate > (unsigned)s_maxmsgrate); + if (cong != m_rateCongested) { + m_rateCongested = cong; + setCongestion(cong ? "message rate over limit" : 0); + } + cong = s_maxqueued && (m_dispatcher.messageCount() > (unsigned)s_maxqueued); + if (cong != m_queueCongested) { + m_queueCongested = cong; + setCongestion(cong ? "message queue over limit" : 0); } // Attempt to sleep until the next full second diff --git a/engine/Message.cpp b/engine/Message.cpp index 1491fb48..5327fd64 100644 --- a/engine/Message.cpp +++ b/engine/Message.cpp @@ -552,7 +552,7 @@ void MessageDispatcher::dequeue() unsigned int MessageDispatcher::messageCount() { Lock lock(this); - return m_messages.count(); + return m_enqueueCount - m_dequeueCount; } unsigned int MessageDispatcher::handlerCount() diff --git a/modules/javascript.cpp b/modules/javascript.cpp index 180f0552..d5994417 100644 --- a/modules/javascript.cpp +++ b/modules/javascript.cpp @@ -251,6 +251,7 @@ public: params().addParam(new ExpFunction("debugAt")); params().addParam(new ExpFunction("setDebug")); params().addParam(new ExpFunction("started")); + params().addParam(new ExpFunction("accepting")); if (name) params().addParam(new ExpOperation(name,"name")); params().addParam(new ExpWrapper(new JsShared(mtx),"shared")); @@ -1400,6 +1401,25 @@ bool JsEngine::runNative(ObjList& stack, const ExpOperation& oper, GenObject* co return false; ExpEvaluator::pushOne(stack,new ExpOperation(Engine::started())); } + else if (oper.name() == YSTRING("accepting")) { + ObjList args; + switch (extractArgs(stack,oper,context,args)) { + case 0: + ExpEvaluator::pushOne(stack,new ExpOperation( + lookup(Engine::accept(),Engine::getCallAcceptStates()))); + break; + case 1: + { + int arg = static_cast(args[0])->toInteger( + Engine::getCallAcceptStates(),-1); + if ((Engine::Accept <= arg) && (Engine::Reject >= arg)) + Engine::setAccept((Engine::CallAccept)arg); + } + break; + default: + return false; + } + } else if (oper.name() == YSTRING("atob")) { // str = Engine.atob(b64_str) ObjList args; diff --git a/modules/regexroute.cpp b/modules/regexroute.cpp index a7e5f633..dd20fea7 100644 --- a/modules/regexroute.cpp +++ b/modules/regexroute.cpp @@ -428,6 +428,8 @@ static void evalFunc(String& str, Message& msg) str = Engine::nodeName(); else if (str == YSTRING("threadname")) str = Thread::currentName(); + else if (str == YSTRING("accepting")) + str = lookup(Engine::accept(),Engine::getCallAcceptStates()); else if ((sep >= 0) && (str == YSTRING("transcode"))) { str = par.substr(0,sep); par = par.substr(sep+1).trimBlanks(); diff --git a/yatengine.h b/yatengine.h index 2fdeb412..d11e0217 100644 --- a/yatengine.h +++ b/yatengine.h @@ -1631,6 +1631,7 @@ private: unsigned int m_messageRate; unsigned int m_maxMsgRate; bool m_rateCongested; + bool m_queueCongested; static Engine* s_self; static String s_node; static String s_shrpath;