Added cache module handling LNP and CNAM.
git-svn-id: http://voip.null.ro/svn/yate@4409 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
b15b64405e
commit
a63accff68
|
@ -0,0 +1,136 @@
|
|||
; This file configures the cache module
|
||||
|
||||
|
||||
[general]
|
||||
; This section sets global variables of the implementation
|
||||
|
||||
; size: integer: The number of hash lists to use in each cache
|
||||
; Defaults to 17, can't be less then 3 or greater then 1024
|
||||
; This parameter can be overridden in cache sections
|
||||
;size=17
|
||||
|
||||
; ttl: integer: Cache item time to live in seconds
|
||||
; Minimum allowed value is 10
|
||||
; This parameter is not applied on reload for already created cache objects
|
||||
;ttl=
|
||||
|
||||
; limit: integer: Maximum number of stored cache items
|
||||
; This value must be at least the power of 2 of cache size, e.g. for
|
||||
; cache size 5 limit must be at least 25
|
||||
; This parameter is applied on reload and can be overridden in cache sections
|
||||
;limit=
|
||||
|
||||
; expire_check_interval: integer: The interval (in seconds) to check cache expire
|
||||
; Defaults to 10, minimum allowed value 1, maximum allowed value 300
|
||||
; This parameter is applied on reload
|
||||
;expire_check_interval=10
|
||||
|
||||
; account: string: Database name
|
||||
; This parameter is applied on reload and can be overridden in cache sections
|
||||
;account=
|
||||
|
||||
|
||||
[lnp]
|
||||
; This section configures the LNP cache
|
||||
; Database query examples assume a 'lnp' table with the following fields:
|
||||
; - id TEXT The called number
|
||||
; - routing TEXT Routing number (may be empty)
|
||||
; - expires NUMERIC Expire time (EPOCH) in seconds
|
||||
|
||||
; enable: boolean: Enable LNP cache
|
||||
; Defaults to no
|
||||
; This parameter is applied on reload
|
||||
;enable=no
|
||||
|
||||
; store_failed_requests: boolean: Store failed LNP requests in cache
|
||||
; Defaults to no
|
||||
; This parameter is applied on reload
|
||||
;store_failed_requests=no
|
||||
|
||||
; store_npdi_before: boolean: Store routing number from incoming calls with LNP
|
||||
; This parameter can be overidden by a 'cache_lnp_store' parameter when routing
|
||||
; Defaults to yes
|
||||
; This parameter is applied on reload
|
||||
;store_npdi_before=yes
|
||||
|
||||
; copyparams: string: Parameters to handle (store in cache or copy to handled messages)
|
||||
; This parameter is applied on reload and can be overridden when routing by
|
||||
; a 'cache_lnp_parameters' parameter
|
||||
;copyparams=routing
|
||||
|
||||
; routebefore: integer: The priority of the call.route message handler used to
|
||||
; intercept incoming calls and set the routing number if found in cache or
|
||||
; update the cache if LNP was already done
|
||||
; Defaults to 25
|
||||
;routebefore=25
|
||||
|
||||
; routeafter: integer: The priority of the call.route message handler used to
|
||||
; intercept the message after LNP was done locally and update the cache
|
||||
; Defaults to 75
|
||||
;routeafter=75
|
||||
|
||||
; query_loadcache: string: Database query used to load the LNP cache when created
|
||||
; This parameter is applied on reload
|
||||
;query_loadcache=SELECT * FROM lnp
|
||||
|
||||
; query_loaditem: string: Database query used to load an item when created
|
||||
; This parameter is applied on reload
|
||||
;query_loaditem=SELECT * FROM lnp WHERE id='${id}'
|
||||
|
||||
; query_save: string: Database query used to save an item
|
||||
; This parameter is applied on reload
|
||||
;query_save=INSERT INTO lnp(id,routing,expires) VALUES('${id}','${routing}',${expires})
|
||||
|
||||
; query_expire: string: Database query used to expire items
|
||||
; This parameter is applied on reload
|
||||
;query_expire=DELETE FROM lnp WHERE expires <= ${time}
|
||||
|
||||
|
||||
[cnam]
|
||||
; This section configures the CNAM cache
|
||||
; Database query examples assume a 'cnam' table with the following fields:
|
||||
; - id TEXT The caller number
|
||||
; - callername TEXT Caller's name (may be empty)
|
||||
; - expires NUMERIC Expire time (EPOCH) in seconds
|
||||
|
||||
; enable: boolean: Enable CNAM cache
|
||||
; Defaults to no
|
||||
; This parameter is applied on reload
|
||||
;enable=no
|
||||
|
||||
; store_empty: boolean: Store empty caller name returned by local CNAM query in cache
|
||||
; Defaults to no
|
||||
; This parameter is applied on reload
|
||||
;store_empty=no
|
||||
|
||||
; copyparams: string: Parameters to handle (store in cache or copy to handled messages)
|
||||
; This parameter is applied on reload and can be overridden when routing by
|
||||
; a 'cache_cnam_parameters' parameter
|
||||
;copyparams=callername
|
||||
|
||||
; routebefore: integer: The priority of the call.preroute message handler used to
|
||||
; intercept incoming calls and set the callername if found in cache or
|
||||
; update the cache from caller name
|
||||
; Defaults to 25
|
||||
;routebefore=25
|
||||
|
||||
; routeafter: integer: The priority of the call.preroute message handler used to
|
||||
; intercept the message after CNAM was done locally and update the cache
|
||||
; Defaults to 75
|
||||
;routeafter=75
|
||||
|
||||
; query_loadcache: string: Database query used to load the CNAM cache when created
|
||||
; This parameter is applied on reload
|
||||
;query_loadcache=SELECT * FROM cnam
|
||||
|
||||
; query_loaditem: string: Database query used to load an item when created
|
||||
; This parameter is applied on reload
|
||||
;query_loaditem=SELECT * FROM cnam WHERE id='${id}'
|
||||
|
||||
; query_save: string: Database query used to save an item
|
||||
; This parameter is applied on reload
|
||||
;query_save=INSERT INTO cnam(id,callername,expires) VALUES('${id}','${callername}',${expires})
|
||||
|
||||
; query_expire: string: Database query used to expire items
|
||||
; This parameter is applied on reload
|
||||
;query_expire=DELETE FROM cnam WHERE expires <= ${time}
|
|
@ -74,6 +74,7 @@ PROGS := cdrbuild.yate cdrfile.yate regexroute.yate \
|
|||
server/users.yate \
|
||||
$(JUSTSIG) \
|
||||
server/analogdetect.yate \
|
||||
server/cache.yate \
|
||||
client/jabberclient.yate \
|
||||
callgen.yate analyzer.yate rmanager.yate msgsniff.yate
|
||||
|
||||
|
|
|
@ -0,0 +1,849 @@
|
|||
/**
|
||||
* cache.cpp
|
||||
* This file is part of the YATE Project http://YATE.null.ro
|
||||
*
|
||||
* Cache implementation
|
||||
*
|
||||
* Yet Another Telephony Engine - a fully featured software PBX and IVR
|
||||
* Copyright (C) 2004-2011 Null Team
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include <yatephone.h>
|
||||
|
||||
|
||||
using namespace TelEngine;
|
||||
namespace { // anonymous
|
||||
|
||||
class CacheItem; // A cache item
|
||||
class Cache; // A cache hash list
|
||||
class CacheExpireThread; // Cache expire thread
|
||||
class CacheLoadThread; // Cache load thread
|
||||
class EngineStartHandler; // engine.start handler
|
||||
class CacheModule;
|
||||
|
||||
// Max value for cache expire check interval
|
||||
#define EXPIRE_CHECK_MAX 300
|
||||
|
||||
class CacheItem : public NamedList
|
||||
{
|
||||
friend class Cache;
|
||||
public:
|
||||
inline CacheItem(const String& id, const NamedList& p, const String& copy,
|
||||
u_int64_t expires)
|
||||
: NamedList(id), m_expires(0)
|
||||
{ update(p,copy,expires); }
|
||||
inline void update(const NamedList& p, const String& copy, u_int64_t expires) {
|
||||
m_expires = expires;
|
||||
if (copy)
|
||||
copyParams(p,copy);
|
||||
else
|
||||
copyParams(p);
|
||||
}
|
||||
inline u_int64_t expires() const
|
||||
{ return m_expires; }
|
||||
inline bool timeout(const Time& time) const
|
||||
{ return m_expires && m_expires < time; }
|
||||
protected:
|
||||
u_int64_t m_expires;
|
||||
};
|
||||
|
||||
class Cache : public RefObject, public Mutex
|
||||
{
|
||||
public:
|
||||
Cache(const String& name, int size, const NamedList& params);
|
||||
// Retrieve the number of items in cache
|
||||
inline unsigned int count() const
|
||||
{ return m_count; }
|
||||
// Retrieve the mutex protecting a given list
|
||||
inline unsigned int index(const String& str) const
|
||||
{ return str.hash() % m_list.length(); }
|
||||
// Safely retrieve DB load info
|
||||
inline void getDbLoad(String& account, String& query) {
|
||||
Lock lock(this);
|
||||
account = m_account;
|
||||
query = m_queryLoadCache;
|
||||
}
|
||||
// Reinit
|
||||
inline void update(const NamedList& params)
|
||||
{ doUpdate(params,false); }
|
||||
// Expire entries
|
||||
void expire(const Time& time);
|
||||
// Copy params from cache item. Return true if found
|
||||
bool copyParams(const String& id, NamedList& list, const String& copy);
|
||||
// Add an item to the cache. Remove an existing one
|
||||
// Set dbSave=false when loading from database to avoid saving it again
|
||||
void add(const String& id, const NamedList& params, const String* cpParams,
|
||||
bool dbSave = true) {
|
||||
Lock lock(this);
|
||||
addUnsafe(id,params,cpParams,dbSave);
|
||||
}
|
||||
// Add an item from an Array row
|
||||
inline void add(Array& array, int row, int cols) {
|
||||
Lock lock(this);
|
||||
addUnsafe(array,row,cols);
|
||||
}
|
||||
// Clear the cache
|
||||
void clear();
|
||||
// Retrieve cache name
|
||||
virtual const String& toString() const;
|
||||
protected:
|
||||
virtual void destroyed();
|
||||
// (Re)init
|
||||
void doUpdate(const NamedList& params, bool first);
|
||||
// Add an item to the cache. Remove an existing one
|
||||
CacheItem* addUnsafe(const String& id, const NamedList& params, const String* cpParams,
|
||||
bool dbSave = true);
|
||||
// Add an item from an Array row
|
||||
CacheItem* addUnsafe(Array& array, int row, int cols);
|
||||
// Find a cache item. This method is not thread safe
|
||||
CacheItem* find(const String& id);
|
||||
// Adjust cache length to limit
|
||||
void adjustToLimit(CacheItem* skipAdded);
|
||||
|
||||
String m_name; // Cache name
|
||||
HashList m_list; // The list holding the cache
|
||||
u_int64_t m_cacheTtl; // Cache item TTL (in us)
|
||||
unsigned int m_count; // Current number of items
|
||||
unsigned int m_limit; // Limit the number of cache items
|
||||
unsigned int m_limitOverflow; // Allowed limit overflow
|
||||
String m_copyParams; // Item parameters to store/copy
|
||||
String m_account; // Database account
|
||||
String m_queryLoadCache; // Database load all cache query
|
||||
String m_queryLoadItem; // Database load a cache item query
|
||||
String m_querySave; // Database save query
|
||||
String m_queryExpire; // Database expire query
|
||||
};
|
||||
|
||||
class CacheExpireThread : public Thread
|
||||
{
|
||||
public:
|
||||
inline CacheExpireThread()
|
||||
: Thread("CacheExpireThread")
|
||||
{}
|
||||
virtual void run();
|
||||
};
|
||||
|
||||
class CacheLoadThread : public Thread
|
||||
{
|
||||
public:
|
||||
inline CacheLoadThread(const String name)
|
||||
: Thread("CacheLoadThread"), m_cache(name)
|
||||
{}
|
||||
virtual void run();
|
||||
private:
|
||||
String m_cache;
|
||||
};
|
||||
|
||||
class EngineStartHandler : public MessageHandler
|
||||
{
|
||||
public:
|
||||
inline EngineStartHandler()
|
||||
: MessageHandler("engine.start")
|
||||
{}
|
||||
virtual bool received(Message& msg);
|
||||
};
|
||||
|
||||
class CacheModule : public Module
|
||||
{
|
||||
public:
|
||||
enum Relays {
|
||||
LnpBefore = Route,
|
||||
LnpAfter = Private,
|
||||
CnamBefore = Private << 1,
|
||||
CnamAfter = Private << 2,
|
||||
};
|
||||
CacheModule();
|
||||
~CacheModule();
|
||||
// Safely retrieve the database account
|
||||
inline void getAccount(String& buf) {
|
||||
Lock lock(this);
|
||||
buf = m_account;
|
||||
}
|
||||
// Safely retrieve a reference to a cache
|
||||
inline void getCache(RefPointer<Cache>& c, const String& name) {
|
||||
Lock lock(this);
|
||||
if (name == "lnp")
|
||||
c = m_lnpCache;
|
||||
else if (name == "cnam")
|
||||
c = m_cnamCache;
|
||||
}
|
||||
// Build/update a cache
|
||||
void setupCache(const String& name, const NamedList& params);
|
||||
// Load a cache from database
|
||||
// Set async=false from loading thread
|
||||
void loadCache(const String& name, bool async = true);
|
||||
protected:
|
||||
virtual void initialize();
|
||||
virtual bool received(Message& msg, int id);
|
||||
// Handle messages for LNP
|
||||
void handleLnp(Message& msg, bool before);
|
||||
// Handle messages for CNAM
|
||||
void handleCnam(Message& msg, bool before);
|
||||
|
||||
String m_account; // Database account
|
||||
Cache* m_lnpCache; // LNP cache
|
||||
Cache* m_cnamCache; // CNAM cache
|
||||
};
|
||||
|
||||
|
||||
INIT_PLUGIN(CacheModule); // The module
|
||||
static bool s_engineStarted = false; // Engine started flag
|
||||
static bool s_lnpStoreFailed = false; // Store failed LNP requests
|
||||
static bool s_lnpStoreNpdiBefore = true; // Store LNP when already done
|
||||
static bool s_cnamStoreEmpty = false; // Store empty caller name in CNAM cache
|
||||
static unsigned int s_size = 0; // The number of listst in each cache
|
||||
static unsigned int s_limit = 0; // Default cache limit
|
||||
static unsigned int s_cacheTtlSec = 0; // Default cache item time to live (in seconds)
|
||||
static u_int64_t s_checkToutInterval = 0;// Interval to check cache timeout
|
||||
|
||||
// Check if application or current thread are terminating
|
||||
static inline bool exiting()
|
||||
{
|
||||
return Engine::exiting() || Thread::check(false);
|
||||
}
|
||||
|
||||
// Return a valid unsigned integer
|
||||
static inline unsigned int safeValue(int val)
|
||||
{
|
||||
return val >= 0 ? val : 0;
|
||||
}
|
||||
|
||||
// Adjust a cache size
|
||||
static inline unsigned int adjustedCacheSize(int val)
|
||||
{
|
||||
if (val >= 3 && val <= 1024)
|
||||
return val;
|
||||
if (val > 1024)
|
||||
return 1024;
|
||||
return 3;
|
||||
}
|
||||
|
||||
// Adjust a cache limit
|
||||
static inline unsigned int adjustedCacheLimit(int val, int size)
|
||||
{
|
||||
int sq = size * size;
|
||||
return (val > sq) ? val : sq;
|
||||
}
|
||||
|
||||
// Adjust a cache TTL
|
||||
static inline unsigned int adjustedCacheTtl(int val)
|
||||
{
|
||||
return val > 10 ? val : 10;
|
||||
}
|
||||
|
||||
// Show cache item changes to output
|
||||
static inline void dumpItem(Cache& c, CacheItem& item, const char* oper)
|
||||
{
|
||||
#ifdef DEBUG
|
||||
String tmp;
|
||||
item.dump(tmp," ");
|
||||
Debug(&__plugin,DebugAll,"Cache(%s) %s %p %s expires=%u [%p]",
|
||||
c.toString().c_str(),oper,&item,tmp.c_str(),
|
||||
(unsigned int)(item.expires()/1000000),&c);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Cache
|
||||
*/
|
||||
Cache::Cache(const String& name, int size, const NamedList& params)
|
||||
: Mutex(false,"Cache"),
|
||||
m_name(name), m_list(size), m_cacheTtl(0), m_count(0), m_limit(0),
|
||||
m_limitOverflow(0)
|
||||
{
|
||||
Debug(&__plugin,DebugInfo,"Cache(%s) size=%u [%p]",
|
||||
m_name.c_str(),m_list.length(),this);
|
||||
doUpdate(params,true);
|
||||
}
|
||||
|
||||
// Copy params from cache item. Return true if found
|
||||
bool Cache::copyParams(const String& id, NamedList& list, const String& copy)
|
||||
{
|
||||
lock();
|
||||
CacheItem* item = find(id);
|
||||
if (!item && m_account && m_queryLoadItem) {
|
||||
// Load from database
|
||||
String query = m_queryLoadItem;
|
||||
NamedList p("");
|
||||
p.addParam("id",id);
|
||||
p.replaceParams(query);
|
||||
Message m("database");
|
||||
m.addParam("account",m_account);
|
||||
m.addParam("query",query);
|
||||
unlock();
|
||||
bool ok = Engine::dispatch(m);
|
||||
lock();
|
||||
const char* error = m.getValue("error");
|
||||
if (ok && !error) {
|
||||
Array* a = static_cast<Array*>(m.userObject("Array"));
|
||||
int rows = a ? a->getRows() : 0;
|
||||
if (rows > 0)
|
||||
item = addUnsafe(*a,1,a->getColumns());
|
||||
else
|
||||
DDebug(&__plugin,DebugAll,"Cache(%s) item '%s' not found in database [%p]",
|
||||
m_name.c_str(),id.c_str(),this);
|
||||
}
|
||||
else
|
||||
Debug(&__plugin,DebugNote,"Cache(%s) failed to load item '%s' %s [%p]",
|
||||
m_name.c_str(),id.c_str(),TelEngine::c_safe(error),this);
|
||||
}
|
||||
if (item) {
|
||||
list.copyParams(*item,copy);
|
||||
dumpItem(*this,*item,"found in cache");
|
||||
}
|
||||
unlock();
|
||||
return item != 0;
|
||||
}
|
||||
|
||||
// Expire entries
|
||||
void Cache::expire(const Time& time)
|
||||
{
|
||||
if (!m_cacheTtl)
|
||||
return;
|
||||
Lock lck(this);
|
||||
if (!m_cacheTtl)
|
||||
return;
|
||||
XDebug(&__plugin,DebugAll,"Cache(%s) expiring items [%p]",m_name.c_str(),this);
|
||||
if (m_account && m_queryExpire) {
|
||||
String query = m_queryExpire;
|
||||
NamedList p("");
|
||||
p.setParam("time",String(time.sec()));
|
||||
p.replaceParams(query);
|
||||
Message* m = new Message("database");
|
||||
m->addParam("account",m_account);
|
||||
m->addParam("query",query);
|
||||
Engine::enqueue(m);
|
||||
}
|
||||
for (unsigned int i = 0; i < m_list.length(); i++) {
|
||||
if (exiting())
|
||||
break;
|
||||
ObjList* list = m_list.getHashList(i);
|
||||
if (list)
|
||||
list = list->skipNull();
|
||||
// Stop when found a non timed out item:
|
||||
// we put them in the list in ascending order of timeout
|
||||
for (; list; list = list->skipNull()) {
|
||||
CacheItem* item = static_cast<CacheItem*>(list->get());
|
||||
if (!item->timeout(time))
|
||||
break;
|
||||
dumpItem(*this,*item,"removing timed out");
|
||||
list->remove();
|
||||
m_count--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the cache
|
||||
void Cache::clear()
|
||||
{
|
||||
Lock lck(this);
|
||||
m_list.clear();
|
||||
m_count = 0;
|
||||
}
|
||||
|
||||
// Retrieve cache name
|
||||
const String& Cache::toString() const
|
||||
{
|
||||
return m_name;
|
||||
}
|
||||
|
||||
void Cache::destroyed()
|
||||
{
|
||||
Debug(&__plugin,DebugInfo,"Cache(%s) destroyed [%p]",m_name.c_str(),this);
|
||||
clear();
|
||||
RefObject::destroyed();
|
||||
}
|
||||
|
||||
// (Re)init
|
||||
void Cache::doUpdate(const NamedList& params, bool first)
|
||||
{
|
||||
String account;
|
||||
__plugin.getAccount(account);
|
||||
Lock lck(this);
|
||||
if (first) {
|
||||
int ttl = safeValue(params.getIntValue("ttl",s_cacheTtlSec));
|
||||
m_cacheTtl = (u_int64_t)adjustedCacheTtl(ttl) * 1000000;
|
||||
}
|
||||
m_limit = adjustedCacheLimit(params.getIntValue("limit",s_limit),m_list.length());
|
||||
if (m_limit)
|
||||
m_limitOverflow = m_limit + (m_limit / 100);
|
||||
else
|
||||
m_limitOverflow = 0;
|
||||
m_copyParams = params.getValue("copyparams");
|
||||
m_account = params.getValue("account",account);
|
||||
m_queryLoadCache = params.getValue("query_loadcache");
|
||||
m_queryLoadItem = params.getValue("query_loaditem");
|
||||
m_querySave = params.getValue("query_save");
|
||||
m_queryExpire = params.getValue("query_expire");
|
||||
String all;
|
||||
#ifdef DEBUG
|
||||
if (m_account) {
|
||||
all << " copyparams=" << m_copyParams;
|
||||
all << " account=" << m_account;
|
||||
all << " query_loadcache=" << m_queryLoadCache;
|
||||
all << " query_loaditem=" << m_queryLoadItem;
|
||||
all << " query_save=" << m_querySave;
|
||||
all << " query_expire=" << m_queryExpire;
|
||||
}
|
||||
#endif
|
||||
Debug(&__plugin,DebugInfo,"Cache(%s) updated ttl=%u limit=%u%s [%p]",
|
||||
m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,all.safe(),this);
|
||||
}
|
||||
|
||||
// Add an item to the cache. Remove an existing one
|
||||
CacheItem* Cache::addUnsafe(const String& id, const NamedList& params, const String* cpParams,
|
||||
bool dbSave)
|
||||
{
|
||||
XDebug(&__plugin,DebugAll,"Cache::add(%s,%p,'%s',%u) [%p]",
|
||||
id.c_str(),¶ms,TelEngine::c_safe(cpParams),dbSave,this);
|
||||
unsigned int idx = index(id);
|
||||
ObjList* list = m_list.getHashList(idx);
|
||||
if (list)
|
||||
list = list->skipNull();
|
||||
u_int64_t expires = 0;
|
||||
if (!dbSave)
|
||||
expires = (u_int64_t)safeValue(params.getIntValue("expires")) * 1000000;
|
||||
if (!expires)
|
||||
expires = Time::now() + m_cacheTtl;
|
||||
// Search for insert/add point and existing item
|
||||
ObjList* insert = 0;
|
||||
bool found = false;
|
||||
while (list) {
|
||||
CacheItem* crt = static_cast<CacheItem*>(list->get());
|
||||
if (!insert && crt->expires() > expires) {
|
||||
insert = list;
|
||||
if (found)
|
||||
break;
|
||||
}
|
||||
if (!found && id == crt->toString()) {
|
||||
if (crt->expires() > expires) {
|
||||
// Deny update for oldest item
|
||||
return crt;
|
||||
}
|
||||
if (insert == list)
|
||||
insert = 0;
|
||||
list->remove();
|
||||
found = true;
|
||||
if (insert)
|
||||
break;
|
||||
}
|
||||
ObjList* next = list->skipNext();
|
||||
if (next)
|
||||
list = next;
|
||||
else
|
||||
break;
|
||||
}
|
||||
CacheItem* item = new CacheItem(id,params,cpParams ? *cpParams : m_copyParams,expires);
|
||||
if (insert)
|
||||
insert->insert(item);
|
||||
else if (list)
|
||||
list->append(item);
|
||||
else
|
||||
m_list.append(item);
|
||||
if (dbSave && m_account && m_querySave) {
|
||||
String query = m_querySave;
|
||||
NamedList p(*item);
|
||||
p.setParam("id",item->toString());
|
||||
p.setParam("expires",String((unsigned int)(expires / 1000000)));
|
||||
p.replaceParams(query);
|
||||
Message* m = new Message("database");
|
||||
m->addParam("account",m_account);
|
||||
m->addParam("query",query);
|
||||
Engine::enqueue(m);
|
||||
}
|
||||
dumpItem(*this,*item,!found ? "added" : "updated");
|
||||
if (found)
|
||||
return item;
|
||||
m_count++;
|
||||
if (m_limitOverflow && m_count > m_limitOverflow)
|
||||
adjustToLimit(item);
|
||||
return item;
|
||||
}
|
||||
|
||||
// Add an item from an Array row
|
||||
CacheItem* Cache::addUnsafe(Array& array, int row, int cols)
|
||||
{
|
||||
XDebug(&__plugin,DebugAll,"Cache::add(%p,%d,%d) [%p]",&array,row,cols,this);
|
||||
NamedList p("");
|
||||
for (int col = 0; col < cols; col++) {
|
||||
String* colName = YOBJECT(String,array.get(col,0));
|
||||
if (TelEngine::null(colName))
|
||||
continue;
|
||||
String* colVal = YOBJECT(String,array.get(col,row));
|
||||
if (!colVal)
|
||||
continue;
|
||||
if (*colName == "id")
|
||||
p.assign(*colVal);
|
||||
else
|
||||
p.addParam(*colName,*colVal);
|
||||
}
|
||||
return p ? addUnsafe(p,p,0,false) : 0;
|
||||
}
|
||||
|
||||
// Find a cache item. This method is not thread safe
|
||||
CacheItem* Cache::find(const String& id)
|
||||
{
|
||||
ObjList* o = m_list.find(id);
|
||||
return o ? static_cast<CacheItem*>(o->get()) : 0;
|
||||
}
|
||||
|
||||
// Adjust cache length to limit
|
||||
void Cache::adjustToLimit(CacheItem* skipAdded)
|
||||
{
|
||||
if (!m_limit || m_count <= m_limit)
|
||||
return;
|
||||
Debug(&__plugin,DebugAll,"Cache(%s) adjusting to limit %u count=%u [%p]",
|
||||
m_name.c_str(),m_limit,m_count,this);
|
||||
while (m_count > m_limit) {
|
||||
CacheItem* found = 0;
|
||||
for (unsigned int i = 0; i < m_list.length(); i++) {
|
||||
ObjList* list = m_list.getHashList(i);
|
||||
if (list)
|
||||
list = list->skipNull();
|
||||
CacheItem* item = list ? static_cast<CacheItem*>(list->get()) : 0;
|
||||
if (!item || item == skipAdded)
|
||||
continue;
|
||||
if (!found || found->m_expires > item->m_expires)
|
||||
found = item;
|
||||
}
|
||||
if (found) {
|
||||
dumpItem(*this,*found,"removing oldest");
|
||||
m_list.remove(found);
|
||||
m_count--;
|
||||
continue;
|
||||
}
|
||||
Debug(&__plugin,DebugGoOn,
|
||||
"Cache(%s) can't find the oldest item count=%u limit=%u [%p]",
|
||||
m_name.c_str(),m_count,m_limit,this);
|
||||
m_count = m_list.count();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CacheExpireThread
|
||||
*/
|
||||
void CacheExpireThread::run()
|
||||
{
|
||||
static const String s_caches[] = {"lnp", "cnam", ""};
|
||||
Debug(&__plugin,DebugAll,"%s start running [%p]",currentName(),this);
|
||||
u_int64_t nextCheck = Time::now() + s_checkToutInterval;
|
||||
while (true) {
|
||||
Thread::idle();
|
||||
if (exiting())
|
||||
break;
|
||||
Time time;
|
||||
if (nextCheck > time)
|
||||
continue;
|
||||
for (int i = 0; s_caches[i]; i++) {
|
||||
RefPointer<Cache> cache;
|
||||
__plugin.getCache(cache,s_caches[i]);
|
||||
if (cache)
|
||||
cache->expire(time);
|
||||
cache = 0;
|
||||
}
|
||||
nextCheck = time + s_checkToutInterval;
|
||||
}
|
||||
Debug(&__plugin,DebugAll,"%s stopped [%p]",currentName(),this);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CacheLoadThread
|
||||
*/
|
||||
void CacheLoadThread::run()
|
||||
{
|
||||
Debug(&__plugin,DebugAll,"%s start running cache=%s [%p]",
|
||||
currentName(),m_cache.c_str(),this);
|
||||
__plugin.loadCache(m_cache,false);
|
||||
Debug(&__plugin,DebugAll,"%s stopped cache=%s [%p]",
|
||||
currentName(),m_cache.c_str(),this);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EngineStartHandler
|
||||
*/
|
||||
bool EngineStartHandler::received(Message& msg)
|
||||
{
|
||||
s_engineStarted = true;
|
||||
__plugin.loadCache("lnp");
|
||||
__plugin.loadCache("cnam");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CacheModule
|
||||
*/
|
||||
CacheModule::CacheModule()
|
||||
: Module("cache"),
|
||||
m_lnpCache(0), m_cnamCache(0)
|
||||
{
|
||||
Output("Loaded module Cache");
|
||||
}
|
||||
|
||||
CacheModule::~CacheModule()
|
||||
{
|
||||
Output("Unloading module Cache");
|
||||
TelEngine::destruct(m_lnpCache);
|
||||
TelEngine::destruct(m_cnamCache);
|
||||
}
|
||||
|
||||
// Build/update a cache
|
||||
void CacheModule::setupCache(const String& name, const NamedList& params)
|
||||
{
|
||||
Lock lck(this);
|
||||
Cache** c = 0;
|
||||
bool lnp = (name == "lnp");
|
||||
if (lnp)
|
||||
c = &m_lnpCache;
|
||||
else if (name == "cnam")
|
||||
c = &m_cnamCache;
|
||||
else
|
||||
return;
|
||||
bool enabled = params.getBoolValue("enable");
|
||||
if (!*c) {
|
||||
if (!enabled)
|
||||
return;
|
||||
unsigned int size = adjustedCacheSize(params.getIntValue("size",s_size));
|
||||
*c = new Cache(name,size,params);
|
||||
// Install relays
|
||||
if (lnp) {
|
||||
// LnpBefore is an alias for Route
|
||||
installRelay(LnpBefore,params.getIntValue("routebefore",25));
|
||||
installRelay(LnpAfter,"call.route",params.getIntValue("routeafter",75));
|
||||
}
|
||||
else {
|
||||
installRelay(CnamBefore,"call.preroute",params.getIntValue("routebefore",25));
|
||||
installRelay(CnamAfter,"call.preroute",params.getIntValue("routeafter",75));
|
||||
}
|
||||
if (s_engineStarted)
|
||||
loadCache(name);
|
||||
return;
|
||||
}
|
||||
if (enabled) {
|
||||
RefPointer<Cache> cache = *c;
|
||||
lck.drop();
|
||||
cache->update(params);
|
||||
cache = 0;
|
||||
}
|
||||
else
|
||||
TelEngine::destruct(*c);
|
||||
}
|
||||
|
||||
// Start cache load thread
|
||||
void CacheModule::loadCache(const String& name, bool async)
|
||||
{
|
||||
XDebug(this,DebugAll,"loadCache(%s,%u)",name.c_str(),async);
|
||||
RefPointer<Cache> cache;
|
||||
getCache(cache,name);
|
||||
if (!cache)
|
||||
return;
|
||||
String account;
|
||||
String query;
|
||||
cache->getDbLoad(account,query);
|
||||
cache = 0;
|
||||
if (!(account && query))
|
||||
return;
|
||||
if (async) {
|
||||
cache = 0;
|
||||
(new CacheLoadThread(name))->startup();
|
||||
return;
|
||||
}
|
||||
Debug(this,DebugInfo,"Loading cache '%s'",name.c_str());
|
||||
Message m("database");
|
||||
m.addParam("account",account);
|
||||
m.addParam("query",query);
|
||||
bool ok = Engine::dispatch(m);
|
||||
if (exiting())
|
||||
return;
|
||||
const char* error = m.getValue("error");
|
||||
if (!ok || error) {
|
||||
Debug(this,DebugNote,"Failed to load cache '%s' %s",
|
||||
name.c_str(),TelEngine::c_safe(error));
|
||||
return;
|
||||
}
|
||||
getCache(cache,name);
|
||||
if (!cache) {
|
||||
Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str());
|
||||
return;
|
||||
}
|
||||
Array* a = static_cast<Array*>(m.userObject("Array"));
|
||||
int rows = a ? a->getRows() : 0;
|
||||
int cols = a ? a->getColumns() : 0;
|
||||
for (int row = 1; row < rows; row++) {
|
||||
if (exiting())
|
||||
break;
|
||||
cache->add(*a,row,cols);
|
||||
// Take a breath, let others do their job
|
||||
if (0 == (row % 500))
|
||||
Thread::idle();
|
||||
}
|
||||
Debug(this,DebugInfo,"Loaded %d items in cache '%s'",rows ? rows - 1 : 0,name.c_str());
|
||||
cache = 0;
|
||||
}
|
||||
|
||||
void CacheModule::initialize()
|
||||
{
|
||||
static bool s_first = true;
|
||||
static bool s_init = true;
|
||||
Output("Initializing module Cache");
|
||||
Configuration cfg(Engine::configFile("cache"));
|
||||
// Globals
|
||||
s_size = adjustedCacheSize(cfg.getIntValue("general","size",17));
|
||||
s_limit = adjustedCacheLimit(cfg.getIntValue("general","limit",s_limit),s_size);
|
||||
s_cacheTtlSec = adjustedCacheTtl(cfg.getIntValue("general","ttl"));
|
||||
unsigned int tmp = safeValue(cfg.getIntValue("general","expire_check_interval",10));
|
||||
if (tmp > s_cacheTtlSec)
|
||||
tmp = s_cacheTtlSec;
|
||||
if (tmp >= 1 && tmp <= EXPIRE_CHECK_MAX)
|
||||
s_checkToutInterval = tmp * 1000000;
|
||||
else if (tmp)
|
||||
s_checkToutInterval = EXPIRE_CHECK_MAX * 1000000;
|
||||
else
|
||||
s_checkToutInterval = 1000000;
|
||||
lock();
|
||||
m_account = cfg.getValue("general","account");
|
||||
unlock();
|
||||
// Update cache objects
|
||||
NamedList* lnp = cfg.getSection("lnp");
|
||||
if (lnp) {
|
||||
// Set default copyparams
|
||||
if (!lnp->getValue("copyparams"))
|
||||
lnp->setParam("copyparams","routing");
|
||||
setupCache(*lnp,*lnp);
|
||||
s_lnpStoreFailed = lnp->getBoolValue("store_failed_requests");
|
||||
s_lnpStoreNpdiBefore = lnp->getBoolValue("store_npdi_before");
|
||||
}
|
||||
NamedList* cnam = cfg.getSection("cnam");
|
||||
if (cnam) {
|
||||
// Set default copyparams
|
||||
if (!cnam->getValue("copyparams"))
|
||||
cnam->setParam("copyparams","callername");
|
||||
setupCache(*cnam,*cnam);
|
||||
s_cnamStoreEmpty = cnam->getBoolValue("store_empty");
|
||||
}
|
||||
// Init module
|
||||
if (s_first) {
|
||||
Engine::install(new EngineStartHandler);
|
||||
s_first = false;
|
||||
}
|
||||
if (s_init) {
|
||||
// Setup if we have a cache
|
||||
lock();
|
||||
bool ok = m_lnpCache || m_cnamCache;
|
||||
unlock();
|
||||
if (ok) {
|
||||
DDebug(this,DebugAll,"Initializing");
|
||||
setup();
|
||||
// Expire thread
|
||||
(new CacheExpireThread)->startup();
|
||||
s_init = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool CacheModule::received(Message& msg, int id)
|
||||
{
|
||||
if (id == LnpBefore || id == LnpAfter) {
|
||||
handleLnp(msg,id == LnpBefore);
|
||||
return false;
|
||||
}
|
||||
if (id == CnamBefore || id == CnamAfter) {
|
||||
handleCnam(msg,id == CnamBefore);
|
||||
return false;
|
||||
}
|
||||
return Module::received(msg,id);
|
||||
}
|
||||
|
||||
// Handle messages for LNP
|
||||
void CacheModule::handleLnp(Message& msg, bool before)
|
||||
{
|
||||
if (!(before || msg.getBoolValue("cache_lnp_posthook")))
|
||||
return;
|
||||
const String& called = msg["called"];
|
||||
if (!called)
|
||||
return;
|
||||
RefPointer<Cache> lnp;
|
||||
getCache(lnp,"lnp");
|
||||
if (!lnp)
|
||||
return;
|
||||
Debug(this,DebugAll,"handleLnp(%s) called=%s routing=%s querylnp=%s npdi=%s",
|
||||
(before ? "before" : "after"),msg.getValue("called"),
|
||||
msg.getValue("routing"),msg.getValue("querylnp"),msg.getValue("npdi"));
|
||||
bool querylnp = msg.getBoolValue("querylnp");
|
||||
if (before) {
|
||||
if (querylnp) {
|
||||
// LNP requested: check the cache
|
||||
if (lnp->copyParams(called,msg,msg.getParam("cache_lnp_parameters")))
|
||||
msg.setParam("querylnp",String::boolText(false));
|
||||
else
|
||||
msg.setParam("cache_lnp_posthook",String::boolText(true));
|
||||
}
|
||||
else if (msg.getBoolValue("npdi") &&
|
||||
msg.getBoolValue("cache_lnp_store",s_lnpStoreNpdiBefore)) {
|
||||
// LNP already done: update cache
|
||||
lnp->add(called,msg,msg.getParam("cache_lnp_parameters"));
|
||||
}
|
||||
}
|
||||
else if (!querylnp || s_lnpStoreFailed || msg.getBoolValue("npdi")) {
|
||||
// querylnp=true: request failed
|
||||
// LNP query made locally: update cache
|
||||
lnp->add(called,msg,msg.getParam("cache_lnp_parameters"));
|
||||
}
|
||||
lnp = 0;
|
||||
}
|
||||
|
||||
// Handle messages for CNAM
|
||||
void CacheModule::handleCnam(Message& msg, bool before)
|
||||
{
|
||||
if (!(before || msg.getBoolValue("cache_cnam_posthook")))
|
||||
return;
|
||||
const String& caller = msg["caller"];
|
||||
if (!caller)
|
||||
return;
|
||||
RefPointer<Cache> cnam;
|
||||
getCache(cnam,"cnam");
|
||||
if (!cnam)
|
||||
return;
|
||||
Debug(this,DebugAll,"handleCnam(%s) caller=%s callername=%s querycnam=%s",
|
||||
(before ? "before" : "after"),msg.getValue("caller"),
|
||||
msg.getValue("callername"),msg.getValue("querycnam"));
|
||||
bool querycnam = msg.getBoolValue("querycnam");
|
||||
if (before) {
|
||||
if (querycnam) {
|
||||
// CNAM requested: check the cache
|
||||
if (cnam->copyParams(caller,msg,msg.getParam("cache_cnam_parameters")))
|
||||
msg.setParam("querycnam",String::boolText(false));
|
||||
else
|
||||
msg.setParam("cache_cnam_posthook",String::boolText(true));
|
||||
}
|
||||
}
|
||||
else if (!querycnam && (s_cnamStoreEmpty || msg.getValue("callername"))) {
|
||||
// querycnam=true: request failed
|
||||
// CNAM query made locally: update cache
|
||||
cnam->add(caller,msg,msg.getParam("cache_cnam_parameters"));
|
||||
}
|
||||
cnam = 0;
|
||||
}
|
||||
|
||||
}; // anonymous namespace
|
||||
|
||||
/* vi: set ts=8 sw=4 sts=4 noet: */
|
Loading…
Reference in New Issue