Added support for CPU loading notification, and engine call congestion level update, based on CPU loading.

git-svn-id: http://yate.null.ro/svn/yate/trunk@3905 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2010-12-03 16:02:11 +00:00
parent ada2affb79
commit bef3d8f44d
7 changed files with 1246 additions and 1 deletions

View File

@ -0,0 +1,21 @@
; This file is used by the ccongestion module
[cpu]
; This section keeps the settings for CPU load
; Each parameter needs to match this type:
; monitor=interval_name,threshold_value,hysteresis_value;.....
; Where :
; monitor - String: Represents the CPU monitor for the given interval. Can be one of the following :
; -systemLoad: The system CPU load
; -totalLoad: Overall Yate CPU load
; -userLoad: Yate user-space CPU load
; -kernelLoad: Yate kernel-space CPU load
; interval_name : string: The name of the interval. Four values: accept, partial, congestion, reject.
; threshold_value: int: The upper value of the interval.
; hysteresis_value: int.
; Examples:
; systemLoad=accept,50,4;partial,65,2;congestion,80,2;reject
; totalLoad=accept,40,4;partial,50,2;congestion,70,2;reject
; kernelLoad=accept,25,4;partial,35,2;congestion,60,2;reject
; userLoad=accept,25,4;partial,35,2;congestion,60,2;reject

View File

@ -0,0 +1,84 @@
; NOTE this file is used to setup CPU load information's
; The cpuload module inform YATE about system total CPU load and YATE CPU load
;[general]
; This section keeps the settings for system and Yate.
; interval: int: The interval in milliseconds for CPU load check.
; Note: Minimum value is 1s.
; Default value 1s
; interval=1000
; oscillate_interval : int: Time in milliseconds that stop this module from notifying about CPU load,
; if the CPU load is oscillating between consecutive intervals
; NOTE! oscillate_interval value should be at least 2 * interval
; oscillate_interval=5000
; core_number: integer: The number of CPU cores
; NOTE! Yate will detect the number of cores from "/proc/stat". If the file does not exists you should
; set the core number to avoid that yate CPU load to be bigger than 100%
; core_number=1
; smooth : integer between 5 and 50 : Value used to smooth the CPU loading.
; smooth = 33
;The following sections represent the CPU load managers.
; NOTE!!! Each parameter form the following sections need to match this type:
; target_name=interval_name,threshold_value,hysteresis_value;......
; target_name: string: The name if the target.
; interval_name : string: The name of the interval.
; threshold_value: int: The upper value of the interval.
; hysteresis_value: int.
; Examples: engine=accept,50,4;partial,65,2;congestion,80,2;reject
; The target is "engine";
; Interval names: all, partial,congestion,none;
; Threshold value: 50, 65, 80
; from this results two sets of intervals:
; one when the CPU load is growing :
; accept = 0-54
; partial = 54 - 67
; congestion = 67 - 82
; reject = 82 - 100
; two when CPU load is lowering
; accept = 0-46
; partial = 46 - 63
; congestion = 63 - 78
; reject = 78 - 100
; NOTE! : Parameters can be appended with chan.control message and with rmanager command
; Example: rmanager
; "control cpuload section target_name=interval_name,threshold_value,hysteresis_value;......"
; where "section" represents one from the sections above
; Example: chan.control
; Message* m = new Message("chan.control");
; m->addParam("targetid","cpuload");
; m->addParam("component","cpuload");
; m->addParam("operation",monitor); // One of the following : kernelLoad,userLoad,totalLoad,systemLoad
; m->addParam("engine","interval_name,threshold_value,hysteresis_value;...... ");
; Engine::enqueue(m); // Engine::dispatch(m);
; NOTE! : Target "engine" is reserved for engine call accept. If you want to modify engine monitors
; or intervals please do it form manager.conf
;[kernelLoad]
; YATE CPU kernel-space load settings
;[userLoad]
; YATE CPU user-space load settings
;[totalLoad]
; YATE CPU load settings
;[systemLoad]
; System CPU load settings

View File

@ -198,6 +198,7 @@ ObjList Engine::s_extramod;
NamedList Engine::s_params("");
Engine::RunMode Engine::s_mode = Engine::Stopped;
Engine::CallAccept Engine::s_accept = Engine::Accept;
Engine* Engine::s_self = 0;
int Engine::s_haltcode = -1;
int EnginePrivate::count = 0;
@ -210,6 +211,14 @@ static Engine::PluginMode s_loadMode = Engine::LoadFail;
static int s_maxworkers = 10;
static bool s_debug = true;
const TokenDict Engine::s_callAccept[] = {
{"accept", Engine::Accept},
{"partial", Engine::Partial},
{"congestion", Engine::Congestion},
{"reject", Engine::Reject},
{0,0}
};
#ifdef RLIMIT_CORE
static bool s_coredump = false;
#endif
@ -324,6 +333,7 @@ bool EngineStatusHandler::received(Message &msg)
msg.retValue() << ",locks=" << Mutex::locks();
msg.retValue() << ",semaphores=" << Semaphore::count();
msg.retValue() << ",waiting=" << Semaphore::locks();
msg.retValue() << ",acceptcalls=" << lookup(Engine::accept(),Engine::getCallAcceptStates());
msg.retValue() << "\r\n";
return false;
}

View File

@ -59,7 +59,7 @@ PROGS := cdrbuild.yate cdrfile.yate regexroute.yate \
server/pbxassist.yate server/dbpbx.yate server/lateroute.yate \
server/park.yate server/queues.yate server/queuesnotify.yate \
server/regfile.yate server/accfile.yate server/register.yate \
server/callcounters.yate \
server/callcounters.yate server/cpuload.yate server/ccongestion.yate \
server/dbwave.yate \
server/yradius.yate \
server/ysnmpagent.yate \

View File

@ -0,0 +1,177 @@
/**
* ccongestion.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Update call accept engine status from installed engine's monitors
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004-2010 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <yatephone.h>
using namespace TelEngine;
namespace { // anonymous
class CpuNotify : public MessageHandler
{
public:
inline CpuNotify()
: MessageHandler("monitor.notify") {}
virtual bool received(Message& m);
};
class Monitor : public String
{
public:
inline Monitor (const String& name)
: String(name),m_value(0) {}
inline void update(int value)
{ m_value = value; }
inline int getValue()
{ return m_value; }
private:
int m_value;
};
/*
NOTE!!! This module use YATE engine behavior. It expects that first engine worker thread
is created after all modules has been initialized
NOTE: when this module is initialized, from the conf file are created
chan.control messages that are enqueued and delivered to cpuload module when
the first engine worker thread is created.
*/
class CongestionModule : public Module
{
public:
CongestionModule();
~CongestionModule();
virtual void initialize();
// Update a monitor state. If monitor does not exists, it will be appended
void updateMonitor(const String& name, const String& step);
// Find worst state and update engine's state
void updateEngine();
private:
bool m_init;
ObjList m_monitors;
Mutex m_monitorsBlocker;
};
static CongestionModule s_module;
static const String s_target = "engine";
static const char* s_mutexName = "CCongestion";
/**
* class CpuNotify
*/
bool CpuNotify::received(Message& msg)
{
int count = msg.getIntValue("count",0);
NamedList params("");
String param = "notify.";
String paramValue = "value.";
for (int i = 0; i < count; i++) {
String notif = msg.getValue(param + String(i),"");
String value = msg.getValue(paramValue + String(i),"");
if (notif == "target" && value != s_target)
return false;
params.addParam(notif,value);
}
s_module.updateMonitor(params.getValue("monitor"),params.getValue("new"));
s_module.updateEngine();
return true;
}
/**
* Class CongestionModule
*/
CongestionModule::CongestionModule()
: Module("CongestionModule","misc"), m_init(false), m_monitorsBlocker(false,s_mutexName)
{
Output("Loaded module CCongestion");
}
CongestionModule::~CongestionModule()
{
Output("Unloading module CCongestion");
}
void CongestionModule::initialize()
{
Output("Initializing module CCongestion");
Configuration cfg(Engine::configFile("ccongestion"));
if (!m_init) {
m_init = true;
Engine::install(new CpuNotify());
}
m_monitorsBlocker.lock();
m_monitors.clear();
m_monitorsBlocker.unlock();
NamedList* cpu = cfg.getSection("cpu");
if (cpu) {
for (unsigned int i = 0;i < cpu->count();i++) {
NamedString* ns = cpu->getParam(i);
if (!ns)
continue;
Message* m = new Message("chan.control");
m->addParam("targetid","cpuload");
m->addParam("component","cpuload");
m->addParam("operation",ns->name());
m->addParam("engine",*ns);
Engine::enqueue(m);
}
}
}
void CongestionModule::updateMonitor(const String& name, const String& value)
{
int val = lookup(value,Engine::getCallAcceptStates(),Engine::Accept);
Lock lock(m_monitorsBlocker);
ObjList* o = m_monitors.find(name);
if (o) {
Monitor* mon = static_cast<Monitor*>(o->get());
if (mon)
mon->update(val);
return;
}
Monitor* mon = new Monitor(name);
mon->update(val);
m_monitors.append(mon);
}
void CongestionModule::updateEngine()
{
Lock lock(m_monitorsBlocker);
int val = 0;
for (ObjList* o = m_monitors.skipNull();o;o = o->skipNext()) {
Monitor* mon = static_cast<Monitor*>(o->get());
if (!mon)
continue;
if (mon->getValue() > val)
val = mon->getValue();
}
if (Engine::accept() == val)
return;
Engine::setAccept((Engine::CallAccept)val);
DDebug(this,DebugInfo,"Updating cpu state to %s",lookup(val,Engine::getCallAcceptStates()));
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */

920
modules/server/cpuload.cpp Normal file
View File

@ -0,0 +1,920 @@
/**
* cpuload.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Monitor CPU load and inform YATE about it
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004-2010 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <yatengine.h>
#include <yatephone.h>
#include <string.h>
#include <unistd.h>
using namespace TelEngine;
namespace { // anonymous
class Cpu
{
public:
Cpu();
// Default implementation updates system load from "/proc/stat"
inline virtual int getSystemLoad()
{
Debug("CpuLoad",DebugStub,"System CPU load is not implemented"
" for this OS");
return -1;
}
// Updates yate CPU load information
void updateYateLoad();
// Set the number of cores.
inline void setCore(int core)
{ m_coreNumber = core; m_cpuDiscovered = false; }
inline int getYateLoad()
{ return (m_loadY + 50)/ 100; }
inline int getYateUserLoad()
{ return (m_loadYU + 50) / 100; }
inline int getYateKernelLoad()
{ return (m_loadYS + 50) / 100; }
inline int getSLoad()
{ return (m_loadSystem + 50) / 100; }
protected:
u_int64_t m_yateUser; // Yate last time counter spend in user mode
u_int64_t m_yateSystem; // Yate last time counter spent in kernel mode
u_int64_t m_sysUser; // Last time value of system spent in user mode
u_int64_t m_sysKer; // Last time value of system spent in kernel mode
u_int64_t m_sysNice; // Last time value of system spent in nice mode
int m_loadYU; // Current Yate user CPU loading
int m_loadYS; // Current Yate kernel CPU loading
int m_loadY; // Current Yate CPU loading
int m_loadSystem; // Current System CPU loading
int m_coreNumber; // The number of CPU cores
u_int64_t m_lastYateCheck; // Last time when we checked for Yate CPU load
u_int64_t m_lastSystemCheck; // Last time when we checked for System CPU load
bool m_cpuDiscovered; // Flag used in CPU core discovery
};
#ifdef _SC_CLK_TCK
class CpuStat : public Cpu
{
public:
inline CpuStat() {}
virtual int getSystemLoad();
void close(File* f);
};
#endif
// Platform depended System CPU loading
class CpuPlatform : public Cpu
{
public:
inline CpuPlatform() {}
virtual int getSystemLoad();
};
class Interval : public String
{
public:
Interval(const String& name, int up, int threshold, int down);
bool hasValue(int value);
inline int getThreshold()
{ return m_threshold; }
inline int getUp()
{ return m_up; }
inline int getDown()
{ return m_down; }
private:
int m_up;
int m_threshold;
int m_down;
};
class Target : public String
{
public:
Target(const String& name, int osTimer, const String& monitor);
virtual ~Target();
void updateIntervals(const String& curent);
void handleOscillation(const String& interval,int load);
Interval* getInterval(int load);
void sendNotify(int load);
bool neighbors();
inline void addInterval(Interval* i, bool ascendent)
{ ascendent ? m_ascendent.append(i) : m_descendent.append(i); }
inline void startTimer()
{ m_oscillationEnd = m_oscillationTimeout == 0 ?
0 : Time::msecNow() + m_oscillationTimeout; }
inline bool needInform()
{ return Time::msecNow() >= m_oscillationEnd; }
void manageLoad(int load);
inline unsigned int getIntervalsCount()
{ return m_ascendent.count(); }
private:
String m_currentInterval;
String m_previewsInterval;
String m_lastNotified;
String m_monitor;
ObjList m_ascendent;
ObjList m_descendent;
u_int64_t m_oscillationEnd;
int m_oscillationTimeout;
int m_counter;
};
class CpuMonitor : public String
{
public:
CpuMonitor(const String& name);
virtual ~CpuMonitor();
void initialize(const NamedList& params, int osTimer);
void manageLoad(int load);
bool addTarget(const NamedString& inc, int osTimer);
inline void clearTargets()
{ m_targets.clear(); }
private:
ObjList m_targets;
bool m_informed;
};
class CpuUpdater : public Thread, public Mutex
{
public:
enum Monitors {
YateUser,
YateKernel,
YateTotal,
System,
Unknown
};
CpuUpdater();
virtual void run();
void setCpu(Cpu* cpu);
void initialize(const Configuration& params);
inline void requestExit()
{ m_exit = true; }
bool update(const Message& msg);
inline Cpu* getCpu()
{ return m_cpu; }
private:
int m_updateInterval; // The interval to update CPU load
int m_oscillationTimer; // Default value 5000
int m_coreNumber;
Cpu* m_cpu;
bool m_exit;
bool m_systemCpuSupport;
CpuMonitor m_yateUser;
CpuMonitor m_yateSys;
CpuMonitor m_yateTotal;
CpuMonitor m_system;
};
class QueryHandler : public MessageHandler
{
public:
inline QueryHandler(unsigned int priority = 100)
: MessageHandler("monitor.query",priority)
{ }
virtual ~QueryHandler()
{ }
virtual bool received(Message& msg);
};
class CpuModule : public Module
{
public:
CpuModule();
~CpuModule();
virtual void initialize();
virtual bool received(Message& m, int id);
inline CpuUpdater* getUpdater()
{ return m_updater; }
private:
CpuUpdater* m_updater;
bool m_init;
};
static CpuModule s_module;
static String s_address = "/proc/stat";
static int s_defaultHysteresis = 2;
static int s_bufLen = 4096;
static int s_smooth = 33;
static TokenDict s_monitors[] = {
{"userLoad", CpuUpdater::YateUser},
{"kernelLoad", CpuUpdater::YateKernel},
{"totalLoad", CpuUpdater::YateTotal},
{"systemLoad", CpuUpdater::System},
{0,0}
};
/**
* Class CpuUpdater
*/
static String s_mutexName = "CpuMutex";
CpuUpdater::CpuUpdater()
: Thread("CpuThread",Thread::Normal), Mutex(false,s_mutexName), m_updateInterval(1000),
m_oscillationTimer(5000), m_coreNumber(1), m_cpu(0), m_exit(false), m_systemCpuSupport(true),
m_yateUser(s_monitors[0].token), m_yateSys(s_monitors[1].token),
m_yateTotal(s_monitors[2].token), m_system(s_monitors[3].token)
{
DDebug(&s_module,DebugAll,"Creating CpuUpdater thread");
}
void CpuUpdater::run()
{
int time = 0;//m_updateInterval;
while (!m_exit) {
if (time < m_updateInterval) {
Thread::msleep(50);
time += 50;
continue;
}
time = 0;
Lock lock(this);
m_cpu->updateYateLoad();
m_yateUser.manageLoad(m_cpu->getYateUserLoad());
m_yateSys.manageLoad(m_cpu->getYateKernelLoad());
m_yateTotal.manageLoad(m_cpu->getYateLoad());
if (!m_systemCpuSupport)
continue;
int sys = m_cpu->getSystemLoad();
if (sys < 0) {
Debug(&s_module,DebugNote,"System Cpu load not supported!");
m_systemCpuSupport = false;
continue;
}
m_system.manageLoad(sys);
XDebug(&s_module,DebugAll,"YateLoading is : yu %d ; ys %d ; y %d ; s %d",
m_cpu->getYateUserLoad(),m_cpu->getYateKernelLoad(),m_cpu->getYateLoad(),sys);
}
delete m_cpu;
}
void CpuUpdater::setCpu(Cpu* cpu)
{
if (!cpu)
return;
m_cpu = cpu;
m_cpu->setCore(m_coreNumber);
}
// This method appends a target from chan.control message
bool CpuUpdater::update(const Message& msg)
{
String mon = msg.getValue("operation","");
NamedString* inc = 0;
for (unsigned int i = 0;i < msg.count();i++) {
NamedString* ns = msg.getParam(i);
if (ns->name() == "operation" || ns->name() == "component" ||
ns->name() == "targetid")
continue;
inc = ns;
}
if (!inc) {
DDebug(&s_module,DebugNote,"No target parameter for monitor %s",mon.c_str());
return false;
}
Lock lock(this);
switch (lookup(mon,s_monitors,Unknown)) {
case YateUser:
return m_yateUser.addTarget(*inc,m_oscillationTimer);
case YateKernel:
return m_yateSys.addTarget(*inc,m_oscillationTimer);
case YateTotal:
return m_yateTotal.addTarget(*inc,m_oscillationTimer);
case System:
return m_system.addTarget(*inc,m_oscillationTimer);
default:
Debug(&s_module,DebugNote,"Unknown cpu monitor %s",mon.c_str());
}
return false;
}
void CpuUpdater::initialize(const Configuration& params)
{
NamedList* general = params.getSection("general");
int osTimer = 0;
if (general) {
m_updateInterval = general->getIntValue("interval",1000);
if (m_updateInterval < 1000) {
Debug(&s_module,DebugConf,"Minimum value for interval is 1000!");
m_updateInterval = 1000;
}
osTimer = general->getIntValue("oscillation_interval",5000);
if (osTimer < 2 * m_updateInterval) {
Debug(&s_module,DebugConf,"Oscillation interval is to small!");
osTimer = 3 * m_updateInterval;
}
m_coreNumber = general->getIntValue("core_number",1);
if (m_coreNumber < 1) {
Debug(&s_module,DebugConf,"Core number must be at least 1!");
m_coreNumber = 1;
}
if (m_cpu)
m_cpu->setCore(m_coreNumber);
}
Lock lock(this);
NamedList* yateu = params.getSection(s_monitors[0].token);
m_yateUser.clearTargets();
if (yateu)
m_yateUser.initialize(*yateu,osTimer);
NamedList* yates = params.getSection(s_monitors[1].token);
m_yateSys.clearTargets();
if (yates)
m_yateSys.initialize(*yates,osTimer);
NamedList* yatet = params.getSection(s_monitors[2].token);
m_yateTotal.clearTargets();
if (yatet)
m_yateTotal.initialize(*yatet,osTimer);
NamedList* sys = params.getSection(s_monitors[3].token);
m_system.clearTargets();
if (sys)
m_system.initialize(*sys,osTimer);
}
/**
* Class Cpu
*/
Cpu::Cpu()
: m_yateUser(0), m_yateSystem(0), m_sysUser(0), m_sysKer(0), m_sysNice(0),
m_loadYU(0), m_loadYS(0), m_loadY(0), m_coreNumber(1), m_lastYateCheck (0),
m_lastSystemCheck(0), m_cpuDiscovered(false)
{
SysUsage::init();
}
void Cpu::updateYateLoad()
{
u_int64_t user = SysUsage::msecRunTime(SysUsage::UserTime);
u_int64_t ker = SysUsage::msecRunTime(SysUsage::KernelTime);
u_int64_t time = SysUsage::msecRunTime(SysUsage::WallTime);
bool updateLoad = true;
if (user < m_yateUser || ker < m_yateSystem || time < m_lastYateCheck) {
Debug(&s_module,DebugInfo,"Negative values for yate CPU update "
"cu = "FMT64" lu="FMT64" ck="FMT64" lk="FMT64" ct="FMT64" lt="FMT64" ",
user,m_yateUser,ker,m_yateSystem,time,m_lastYateCheck);
updateLoad = false;
}
if (updateLoad && (m_yateUser != 0 || m_yateSystem != 0)) {
int inter = time - m_lastYateCheck;
int usr = user - m_yateUser;
int iload = (usr * 100) / inter;
iload /= m_coreNumber;
m_loadYU = (100 - s_smooth) * m_loadYU/100 + s_smooth*iload;
int ke = ker - m_yateSystem;
iload = (ke * 100) / inter;
iload /= m_coreNumber;
m_loadYS = (100 - s_smooth) * m_loadYS/100 + s_smooth*iload;
iload = ((usr + ke) * 100)/ inter;
iload /= m_coreNumber;
m_loadY = (100 - s_smooth) * m_loadY/100 + s_smooth*iload;
}
m_yateUser = user;
m_yateSystem = ker;
m_lastYateCheck = time;
}
/**
* Class Interval
*
*/
Interval::Interval(const String& name, int up, int threshold, int down)
: String(name), m_up(up), m_threshold(threshold), m_down(down)
{
DDebug(&s_module,DebugAll,"Creating interval %s with low = %d and hight = %d",
name.c_str(),down,up);
}
bool Interval::hasValue(int value)
{
return value >= m_down && value <= m_up;
}
/**
* Class Target
*/
Target::Target(const String& name, int osTimer, const String& monitor)
: String(name), m_monitor(monitor), m_oscillationTimeout(osTimer), m_counter(0)
{
DDebug(&s_module,DebugAll,"Creating target '%s' for monitor '%s' [%p]",
name.c_str(),monitor.c_str(),this);
}
Target::~Target()
{
DDebug(&s_module,DebugAll,"Destroing target '%s' from monitor '%s' [%p]",
c_str(),m_monitor.c_str(),this);
m_ascendent.clear();
m_descendent.clear();
}
void Target::sendNotify(int load)
{
if (m_lastNotified == m_currentInterval)
return;
Message* m = new Message("monitor.notify");
int index = 0;
String n("notify.");
String v("value.");
m->addParam(n + String(index),"monitor");
m->addParam(v + String(index++),m_monitor);
m->addParam(n + String(index),"target");
m->addParam(v + String(index++),*this);
m->addParam(n + String(index),"old");
m->addParam(v + String(index++),m_previewsInterval);
m->addParam(n + String(index),"new");
m->addParam(v + String(index++),m_currentInterval);
m->addParam(n + String(index),"cpu_load");
m->addParam(v + String(index++),String(load));
m->addParam(n + String(index),"counter");
m->addParam(v + String(index++),String(m_counter));
m->addParam("count",String(index));
Engine::enqueue(m);
m_lastNotified = m_currentInterval;
startTimer();
m_counter = 0;
}
void Target::handleOscillation(const String& interval,int load)
{
updateIntervals(interval);
if (!needInform())
return;
sendNotify(load);
}
Interval* Target::getInterval(int load)
{
m_counter++;
Interval* i1 = 0;
Interval* i2 = 0;
for (ObjList* o = m_ascendent.skipNull();o;o = o->skipNext()) {
Interval* i = static_cast<Interval*>(o->get());
if (!i)
continue;
if (i->hasValue(load))
i1 = i;
}
for (ObjList* o = m_descendent.skipNull();o;o = o->skipNext()) {
Interval* i = static_cast<Interval*>(o->get());
if (!i)
continue;
if (i->hasValue(load)) {
i2 = i;
break;
}
}
if (!i1 || !i2)
return 0;
if (*i1 == *i2)
return i1;
if (*i1 == m_currentInterval || *i2 == m_currentInterval)
return 0;
ObjList* o = m_ascendent.find(m_currentInterval);
if (!o)
return 0;
Interval* i = static_cast<Interval*>(o->get());
if (load < i->getUp() && load < i->getDown())
return i1->getUp() > i2->getUp() ? i1 : i2;
else
return i1->getDown() < i2->getDown() ? i1 : i2;
}
void Target::manageLoad(int load)
{
Interval* i = getInterval(load);
if (!i || *i == m_currentInterval) { // The interval has not been changed
updateIntervals(m_currentInterval);
sendNotify(load);
return;
}
if (m_previewsInterval == *i && neighbors()) {
// Oscillateing
handleOscillation(*i,load);
return;
}
updateIntervals(*i);
sendNotify(load);
}
// Check if m_previewsInterval and m_currentInterval are neighbors
bool Target::neighbors()
{
ObjList* o = m_ascendent.find(m_previewsInterval);
if (!o)
return false;
Interval* i = static_cast<Interval*>(o->get());
ObjList* o1 = m_ascendent.find(m_currentInterval);
if (!o1)
return false;
Interval* i1 = static_cast<Interval*>(o1->get());
return i->getDown() == i1->getUp() || i->getUp() == i1->getDown();
}
void Target::updateIntervals(const String& current)
{
m_previewsInterval = m_currentInterval;
m_currentInterval = current;
}
/**
* Class CpuMonitor
*/
CpuMonitor::CpuMonitor(const String& name)
: String(name), m_informed(false)
{
DDebug(&s_module,DebugAll,"Creating CpuMonitor '%s' [%p]", name.c_str(), this);
}
CpuMonitor::~CpuMonitor()
{
DDebug(&s_module,DebugAll,"Destroing CpuMonitor %s [%p]", c_str(),this);
m_targets.clear();
}
void CpuMonitor::initialize(const NamedList& params,int osTimer)
{
for (unsigned int i = 0; i < params.count(); i++) {
NamedString* ns = params.getParam(i);
if (ns)
addTarget(*ns,osTimer);
}
}
void CpuMonitor::manageLoad(int load)
{
if (load > 100 && !m_informed) {
Debug(&s_module,DebugConf,"Please configure cpu core number");
m_informed = true;
return;
}
for (ObjList*o = m_targets.skipNull();o;o = o->skipNext()) {
Target* i = static_cast<Target*>(o->get());
if (!i)
continue;
i->manageLoad(load);
}
}
bool CpuMonitor::addTarget(const NamedString& incumbent, int osTimer)
{
ObjList* intervals = incumbent.split(';'); // Obtain intervals
if (!intervals)
return false;
ObjList* exists = m_targets.find(incumbent.name());
if (exists) {
Debug(&s_module,DebugConf,"Target '%s' already exists for monitor '%s'",
incumbent.name().c_str(),c_str());
TelEngine::destruct(intervals);
return false;
}
Target* target = new Target(incumbent.name(),osTimer,*this);
Interval* prevUp = 0;
Interval* prevDown = 0;
for (ObjList* i = intervals->skipNull();i;i = i->skipNext()) {
String* s = static_cast<String*>(i->get());
if (!s || s->null())
continue;
ObjList* o = s->split(',');
if (!o)
continue;
int count = o->count();
String* n = static_cast<String*>(o->at(0)); // Interval name
if (!n) {
TelEngine::destruct(o);
continue;
}
String name = *n;
n = 0;
int iUp = 100;
if (count > 1) {
// Threshold value,if not exists we suppose it full CPU load (100%)
String* iu = static_cast<String*>(o->at(1));
iUp = iu ? iu->toInteger() : 100;
iu = 0;
}
int iHR = s_defaultHysteresis;
if (count > 2) {
// Hysteresis value. If not exists then init to default
String* hr = static_cast<String*>(o->at(2));
if (hr) {
iHR = hr->toInteger();
}
hr = 0;
}
TelEngine::destruct(o); // s->split()
bool error = false;
while (true) {
// If previews interval upper value is greater than current threshold
// then something may be wrong in configuration file
if (prevUp && prevUp->getUp() >= iUp) {
error = true;
break;
}
// If previews interval threshold is greater than current interval lower value or
// if this is first interval check if his boundaries are between 0-100
if ((prevDown && prevDown->getThreshold() >= (iUp - iHR)) ||
(!prevDown && (iUp - iHR) <= 0)) {
error = true;
break;
}
Interval* up = new Interval(name,iUp == 100 ? 100 : iUp + iHR,iUp,
(prevUp) ? prevUp->getUp() : 0);
Interval* down = new Interval(name,iUp == 100? 100 : iUp - iHR,iUp,
(prevDown) ? prevDown->getUp() : 0);
target->addInterval(up,true);
target->addInterval(down,false);
prevUp = up;
prevDown = down;
break;
}
if (error) {
Debug(&s_module,DebugConf,"Invalid intervals threshold for target %s",target->c_str());
TelEngine::destruct(target);
TelEngine::destruct(intervals);
return false;
}
}
if (prevUp->getUp() != 100) {
Debug(&s_module,DebugConf,"Invalid intervals! No interval reach 100");
TelEngine::destruct(target);
TelEngine::destruct(intervals);
return false;
}
// Check if we have an valid target! If not remove it
if (target->getIntervalsCount() < 2) {
Debug(&s_module,DebugConf,"To few intervals for target '%s' from manager '%s'",
target->c_str(),c_str());
TelEngine::destruct(target);
} else
m_targets.append(target);
TelEngine::destruct(intervals);
return true;
}
/**
* Class QueryHandler
*/
bool QueryHandler::received(Message& msg)
{
String target = msg.getValue("name");
int manager = lookup(target,s_monitors,CpuUpdater::Unknown);
CpuUpdater* cu = s_module.getUpdater();
if (!cu)
return false;
Cpu* cpu = cu->getCpu();
if (!cpu)
return false;
switch (manager) {
case CpuUpdater::YateKernel:
msg.setParam("value",String(cpu->getYateKernelLoad()));
return true;
case CpuUpdater::YateUser:
msg.setParam("value",String(cpu->getYateUserLoad()));
return true;
case CpuUpdater::YateTotal:
msg.setParam("value",String(cpu->getYateLoad()));
return true;
case CpuUpdater::System:
int sload = cpu->getSLoad();
if (sload < 0)
return false;
msg.setParam("value",String(sload));
return true;
}
return false;
}
/**
* Class CpuModule
*/
CpuModule::CpuModule()
: Module("CpuModule","misc",true), m_init(false)
{
Output("Loaded module Cpu");
m_updater = new CpuUpdater();
}
CpuModule::~CpuModule()
{
Output("Unloading module Cpu");
}
bool CpuModule::received (Message &msg, int id)
{
switch (id) {
case Halt:
// Stop the thread!
if (m_updater)
m_updater->requestExit();
break;
case Control:
// Process chan.control message
const String* dest = msg.getParam("component");
if (dest && (*dest == "cpuload"))
return m_updater->update(msg);
break;
}
return false;
}
void CpuModule::initialize()
{
Output("Initializing module Cpu");
Configuration cfg(Engine::configFile("cpuload"));
m_updater->initialize(cfg);
s_smooth = cfg.getIntValue("general","smooth",33);
if (s_smooth < 5)
s_smooth = 5;
if (s_smooth > 50)
s_smooth = 50;
if (m_init)
return;
Cpu* c = new CpuPlatform();
if (c->getSystemLoad() == -1) {
delete c;
c = 0;
}
#ifdef _SC_CLK_TCK
if (!c) {
c = new CpuStat();
if (c->getSystemLoad() == -1) {
delete c;
c = 0;
}
}
#endif
if (!c)
c = new Cpu();
m_updater->setCpu(c);
m_init = true;
m_updater->startup();
installRelay(Control);
Engine::install(new QueryHandler());
installRelay(Halt);
}
#ifdef _SC_CLK_TCK
/**
* class CpuStat
* Obtain system cpu load from "/proc/stat"
*/
void CpuStat::close(File* f)
{
if (!f)
return;
f->terminate();
delete f;
}
int CpuStat::getSystemLoad()
{
File* f = new File();
if (!f->openPath(s_address,false,true)) {
DDebug(&s_module,DebugNote,"Failed to open %s",s_address.c_str());
return -1;
}
u_int64_t time = Time::msecNow();
char buf[s_bufLen + 1];
int read = f->readData(buf,s_bufLen);
if (read < 0) {
Debug(&s_module,DebugNote,"Read data error %s",strerror(errno));
close(f);
return -1;
}
buf[read] = '\0';
String s(buf,read);
ObjList* ob = s.split('\n',false);
if (!ob) {
Debug(&s_module,DebugNote,"Invalid data read from %s", s_address.c_str());
close(f);
return -1;
}
int counter = 0;
for (ObjList* o = ob->skipNull();o;o = o->skipNext()) {
String* cpu = static_cast<String*>(o->get());
if (!cpu) {
TelEngine::destruct(ob);
close(f);
return -1;
}
ObjList* val = cpu->split(' ',false);
if (!val || val->count() < 4) {
if (val)
TelEngine::destruct(val);
TelEngine::destruct(ob);
close(f);
return -1;
}
String* name = static_cast<String*>(val->at(0));
if (!name) {
TelEngine::destruct(ob);
TelEngine::destruct(val);
close(f);
return -1;
}
if (*name == "cpu") {
String* u = static_cast<String*>(val->at(1));
int user = u->toInteger();
String* n = static_cast<String*>(val->at(2));
int nice = n->toInteger();
String* k = static_cast<String*>(val->at(3));
int kernel = k->toInteger();
int loading = 0;
long user_hz = ::sysconf(_SC_CLK_TCK);
if (user_hz == 0) {
Debug(&s_module,DebugWarn,"UserHZ value is 0 !! Can not calculate "
"system CPU loading");
return -1;
}
if (m_cpuDiscovered) {
// If we had discovered the CPU's number, calculate the loading
loading = (user - m_sysUser) + (nice - m_sysNice) + (kernel - m_sysKer);
int t = time - m_lastSystemCheck;
if (t == 0)
return (m_loadSystem + 50) / 100;
loading = ((loading *100) * (1000 / user_hz)) / t;
loading /= m_coreNumber;
m_loadSystem = (100 - s_smooth) * m_loadSystem/100 + s_smooth*loading;
}
m_sysUser = user;
m_sysNice = nice;
m_sysKer = kernel;
m_lastSystemCheck = time;
TelEngine::destruct(val);
if (m_cpuDiscovered) {
TelEngine::destruct(ob);
close(f);
return (m_loadSystem + 50) / 100;
}
continue;
}
if (name->startsWith("cpu")) { // count cpu0, cpu1,...
counter ++;
TelEngine::destruct(val);
continue;
}
m_cpuDiscovered = true;
if (m_coreNumber != counter && counter > 0) {
Debug(&s_module,DebugMild,"Updating CPU core number from %d to %d",
m_coreNumber,counter);
m_coreNumber = counter;
}
TelEngine::destruct(val);
TelEngine::destruct(ob);
close(f);
return 0;
}
TelEngine::destruct(ob);
close(f);
return -1;
}
#endif
/**
* Class CpuPlatform
* Platform depended system cpu load implementation
*/
int CpuPlatform::getSystemLoad()
{
#if defined(_WINDOWS)
// Windows implementation
#elseif defined(__FreeBSD__)
// FreeBSD implementation
#elseif defined(_MAC)
// MAC OS implementation
#endif
return -1;
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -807,6 +807,13 @@ public:
Client = 3,
ClientProxy = 4,
};
enum CallAccept {
Accept = 0,
Partial = 1,
Congestion = 2,
Reject = 3,
};
/**
* Plugin load and initialization modes.
@ -858,6 +865,30 @@ public:
static RunMode mode()
{ return s_mode; }
/**
* Get call accept status
* @return Engine's call accept status as enumerated value
*/
inline static CallAccept accept() {
return s_accept;
}
/**
* Set call accept status
* @param ca New call accept status as enumerated value
*/
inline static void setAccept(CallAccept ca) {
s_accept = ca;
}
/**
* Get call accept states
* @return states Pointer to the call accept states TokenDict
*/
inline static const TokenDict* getCallAcceptStates() {
return s_callAccept;
}
/**
* Check if the engine is running as telephony client
* @return True if the engine is running in client mode
@ -1126,6 +1157,8 @@ private:
static NamedList s_params;
static int s_haltcode;
static RunMode s_mode;
static CallAccept s_accept;
static const TokenDict s_callAccept[];
};
}; // namespace TelEngine