Allow loading a cache using multiple database requests. Added optional extra account used to load a cache.

git-svn-id: http://voip.null.ro/svn/yate@4440 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2011-06-07 07:45:50 +00:00
parent c1569b9b5f
commit eda2587ff0
2 changed files with 171 additions and 40 deletions

View File

@ -20,6 +20,28 @@
; This parameter is applied on reload and can be overridden in cache sections
;limit=
; loadchunk: integer: The number of items to load in a database request
; Minimum allowed value is 500, maximum allowed value is 10000
; Set it to 0 to load the whole cache using a single database request
; This parameter is applied on reload and can be overridden in cache sections
; NOTES for non 0 value:
; - The 'query_loadcache' parameter in cache sections should contain an 'ORDER BY'
; clause to make sure the cache table is parsed in the same order
; - The 'query_loadcache' query MUST be a parameterized query containing
; LIMIT ${chunk} OFFSET ${offset}. The module will replace chunk and offset for each
; sent database request
;loadchunk=0
; maxchunks: integer: Maximum number of chunks to load from cache
; Minimum allowed value is 1, maximum allowed value is 10000
; Defaults to 1000
; This parameter is applied on reload
; WARNING:
; - Set a large value only if you are sure the cache load queries are correct
; - Setting a large value for a query without LIMIT or OFFSET will lead to
; useless extra processing
;maxchunks=1000
; expire_check_interval: integer: The interval (in seconds) to check cache expire
; Defaults to 10, minimum allowed value 1, maximum allowed value 300
; This parameter is applied on reload
@ -29,6 +51,10 @@
; This parameter is applied on reload and can be overridden in cache sections
;account=
; account_loadcache: string: Optional database account used to load an entire cache
; This parameter is applied on reload and can be overridden in cache sections
;account_loadcache=
[lnp]
; This section configures the LNP cache
@ -49,9 +75,9 @@
; store_npdi_before: boolean: Store routing number from incoming calls with LNP
; This parameter can be overidden by a 'cache_lnp_store' parameter when routing
; Defaults to yes
; Defaults to no
; This parameter is applied on reload
;store_npdi_before=yes
;store_npdi_before=no
; copyparams: string: Parameters to handle (store in cache or copy to handled messages)
; This parameter is applied on reload and can be overridden when routing by
@ -72,6 +98,8 @@
; query_loadcache: string: Database query used to load the LNP cache when created
; This parameter is applied on reload
;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM lnp
; For non 0 'loadchunk'
;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM lnp ORDER BY timeout LIMIT ${chunk} OFFSET ${offset}
; query_loaditem: string: Database query used to load an item when created
; This parameter is applied on reload
@ -122,6 +150,8 @@
; query_loadcache: string: Database query used to load the CNAM cache when created
; This parameter is applied on reload
;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM cnam
; For non 0 'loadchunk'
;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM cnam ORDER BY timeout LIMIT ${chunk} OFFSET ${offset}
; query_loaditem: string: Database query used to load an item when created
; This parameter is applied on reload

View File

@ -72,10 +72,11 @@ public:
inline unsigned int index(const String& str) const
{ return str.hash() % m_list.length(); }
// Safely retrieve DB load info
inline void getDbLoad(String& account, String& query) {
inline void getDbLoad(String& account, String& query, unsigned int& loadChunk) {
Lock lock(this);
account = m_account;
account = (m_accountLoadCache ? m_accountLoadCache : m_account);
query = m_queryLoadCache;
loadChunk = m_loadChunk;
}
// Reinit
inline void update(const NamedList& params)
@ -96,10 +97,15 @@ public:
Lock lock(this);
addUnsafe(array,row,cols);
}
// Add items from Array rows. Return the number of added rows
unsigned int addRows(Array& array, int startRow, int numRows);
// Clear the cache
void clear();
// Retrieve cache name
virtual const String& toString() const;
// Set chunk limit and offset to a query
// Return the number of replaced params
static int setLimits(String& query, unsigned int chunk, unsigned int offset);
protected:
virtual void destroyed();
// (Re)init
@ -120,8 +126,10 @@ protected:
unsigned int m_count; // Current number of items
unsigned int m_limit; // Limit the number of cache items
unsigned int m_limitOverflow; // Allowed limit overflow
unsigned int m_loadChunk; // The number of items to load in each DB load query
String m_copyParams; // Item parameters to store/copy
String m_account; // Database account
String m_accountLoadCache; // Load cache account
String m_queryLoadCache; // Database load all cache query
String m_queryLoadItem; // Database load a cache item query
String m_querySave; // Database save query
@ -169,9 +177,9 @@ public:
CacheModule();
~CacheModule();
// Safely retrieve the database account
inline void getAccount(String& buf) {
inline void getAccount(String& buf, bool cacheLoad = false) {
Lock lock(this);
buf = m_account;
buf = !cacheLoad ? m_account : m_accountLoadCache;
}
// Safely retrieve a reference to a cache
inline void getCache(RefPointer<Cache>& c, const String& name) {
@ -200,6 +208,7 @@ protected:
void handleCnam(Message& msg, bool before);
String m_account; // Database account
String m_accountLoadCache; // Load cache account
Cache* m_lnpCache; // LNP cache
Cache* m_cnamCache; // CNAM cache
};
@ -212,6 +221,8 @@ static bool s_lnpStoreNpdiBefore = true; // Store LNP when already done
static bool s_cnamStoreEmpty = false; // Store empty caller name in CNAM cache
static unsigned int s_size = 0; // The number of listst in each cache
static unsigned int s_limit = 0; // Default cache limit
static unsigned int s_loadChunk = 0; // The number of cache items to load in each DB load query
static unsigned int s_maxChunks = 1000; // Maximum number of chunks to load in a cache
static unsigned int s_cacheTtlSec = 0; // Default cache item time to live (in seconds)
static u_int64_t s_checkToutInterval = 0;// Interval to check cache timeout
@ -250,10 +261,20 @@ static inline unsigned int adjustedCacheTtl(int val)
return val > 10 ? val : 10;
}
// Adjust a cache load chunk
static inline unsigned int adjustedCacheLoadChunk(int val)
{
if (val <= 0)
return 0;
if (val >= 500 && val <= 10000)
return val;
return val < 500 ? 500 : 10000;
}
// Show cache item changes to output
static inline void dumpItem(Cache& c, CacheItem& item, const char* oper)
{
#ifdef DEBUG
#ifdef XDEBUG
String tmp;
item.dump(tmp," ");
Debug(&__plugin,DebugAll,"Cache(%s) %s %p %s expires=%u [%p]",
@ -269,7 +290,7 @@ static inline void dumpItem(Cache& c, CacheItem& item, const char* oper)
Cache::Cache(const String& name, int size, const NamedList& params)
: Mutex(false,"Cache"),
m_name(name), m_list(size), m_cacheTtl(0), m_count(0), m_limit(0),
m_limitOverflow(0)
m_limitOverflow(0), m_loadChunk(0)
{
Debug(&__plugin,DebugInfo,"Cache(%s) size=%u [%p]",
m_name.c_str(),m_list.length(),this);
@ -353,6 +374,23 @@ void Cache::expire(const Time& time)
}
}
// Add items from Array rows. Return the number of added rows
unsigned int Cache::addRows(Array& array, int startRow, int numRows)
{
if (numRows <= 0)
return 0;
int cols = array.getColumns();
if (cols < 1)
return 0;
int rows = array.getRows();
unsigned int added = 0;
Lock lock(this);
for (; numRows && startRow < rows; startRow++, numRows--)
if (addUnsafe(array,startRow,cols))
added++;
return added;
}
// Clear the cache
void Cache::clear()
{
@ -367,6 +405,16 @@ const String& Cache::toString() const
return m_name;
}
// Set chunk limit and offset to a query
// Return the number of replaced params
int Cache::setLimits(String& query, unsigned int chunk, unsigned int offset)
{
NamedList params("");
params.addParam("chunk",String(chunk));
params.addParam("offset",String(offset));
return params.replaceParams(query);
}
void Cache::destroyed()
{
Debug(&__plugin,DebugInfo,"Cache(%s) destroyed [%p]",m_name.c_str(),this);
@ -378,7 +426,9 @@ void Cache::destroyed()
void Cache::doUpdate(const NamedList& params, bool first)
{
String account;
String accountLoadCache;
__plugin.getAccount(account);
__plugin.getAccount(accountLoadCache,true);
Lock lck(this);
if (first) {
int ttl = safeValue(params.getIntValue("ttl",s_cacheTtlSec));
@ -389,25 +439,39 @@ void Cache::doUpdate(const NamedList& params, bool first)
m_limitOverflow = m_limit + (m_limit / 100);
else
m_limitOverflow = 0;
m_loadChunk = adjustedCacheLoadChunk(params.getIntValue("loadchunk",s_loadChunk));
m_copyParams = params.getValue("copyparams");
m_account = params.getValue("account",account);
m_accountLoadCache = params.getValue("account_loadcache",accountLoadCache);
m_queryLoadCache = params.getValue("query_loadcache");
m_queryLoadItem = params.getValue("query_loaditem");
m_querySave = params.getValue("query_save");
m_queryExpire = params.getValue("query_expire");
// Minimum sanity check for cache load
if (m_loadChunk && m_queryLoadCache) {
String tmp = m_queryLoadCache;
if (setLimits(tmp,m_loadChunk,0) < 2) {
Debug(&__plugin,DebugNote,"Cache(%s) invalid query_loadcache='%s' for loadchunk=%u [%p]",
m_name.c_str(),m_queryLoadCache.c_str(),m_loadChunk,this);
m_loadChunk = 0;
}
}
String all;
#ifdef DEBUG
if (m_account) {
all << " copyparams=" << m_copyParams;
all << " loadchunk=" << m_loadChunk;
all << " account=" << m_account;
all << " account_loadcache=" << m_accountLoadCache;
all << " query_loadcache=" << m_queryLoadCache;
all << " query_loaditem=" << m_queryLoadItem;
all << " query_save=" << m_querySave;
all << " query_expire=" << m_queryExpire;
}
#endif
Debug(&__plugin,DebugInfo,"Cache(%s) updated ttl=%u limit=%u%s [%p]",
m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,all.safe(),this);
Debug(&__plugin,DebugInfo,
"Cache(%s) updated ttl=%u limit=%u copyparams='%s'%s [%p]",
m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,
m_copyParams.safe(),all.safe(),this);
}
// Add an item to the cache. Remove an existing one
@ -674,46 +738,76 @@ void CacheModule::loadCache(const String& name, bool async)
return;
String account;
String query;
cache->getDbLoad(account,query);
unsigned int chunk = 0;
cache->getDbLoad(account,query,chunk);
cache = 0;
if (!(account && query))
return;
if (async) {
cache = 0;
(new CacheLoadThread(name))->startup();
return;
}
Debug(this,DebugInfo,"Loading cache '%s'",name.c_str());
Message m("database");
m.addParam("account",account);
m.addParam("query",query);
bool ok = Engine::dispatch(m);
if (exiting())
return;
const char* error = m.getValue("error");
if (!ok || error) {
Debug(this,DebugNote,"Failed to load cache '%s' %s",
name.c_str(),TelEngine::c_safe(error));
return;
}
getCache(cache,name);
if (!cache) {
Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str());
return;
}
Array* a = static_cast<Array*>(m.userObject("Array"));
int rows = a ? a->getRows() : 0;
int cols = a ? a->getColumns() : 0;
for (int row = 1; row < rows; row++) {
Debug(this,DebugInfo,"Loading cache '%s' chunk=%u",name.c_str(),chunk);
unsigned int loaded = 0;
unsigned int failed = 0;
unsigned int offset = 0;
unsigned int max = chunk ? s_maxChunks : 1;
for (unsigned int i = 0; i < max; i++) {
Message m("database");
m.addParam("account",account);
if (chunk) {
String tmp = query;
Cache::setLimits(tmp,chunk,offset);
m.addParam("query",tmp);
}
else
m.addParam("query",query);
bool ok = Engine::dispatch(m);
if (exiting())
return;
const char* error = m.getValue("error");
if (!ok || error) {
Debug(this,DebugNote,"Failed to load cache '%s' reason=%s",
name.c_str(),TelEngine::c_safe(error));
return;
}
getCache(cache,name);
if (!cache) {
Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str());
return;
}
Array* a = static_cast<Array*>(m.userObject("Array"));
int rows = a ? a->getRows() : 0;
unsigned int loadedRows = (rows > 0) ? rows - 1 : 0;
Debug(this,DebugAll,"Loaded %u rows current chunk=%u for cache '%s'",
loadedRows,i + 1,name.c_str());
if (!loadedRows) {
cache = 0;
break;
cache->add(*a,row,cols);
// Take a breath, let others do their job
if (0 == (row % 500))
}
offset += loadedRows;
loaded += loadedRows;
for (int row = 1; row < rows;) {
unsigned int remaining = rows - row;
unsigned int numRows = 500;
if (remaining < numRows)
numRows = remaining;
unsigned int added = cache->addRows(*a,row,numRows);
row += numRows;
if (added < numRows)
failed += numRows - added;
// Take a breath, let others do their job
Thread::idle();
if (exiting())
return;
}
cache = 0;
// Stop if got less then requested
if (chunk && loadedRows < chunk)
break;
}
Debug(this,DebugInfo,"Loaded %d items in cache '%s'",rows ? rows - 1 : 0,name.c_str());
cache = 0;
Debug(this,DebugInfo,"Loaded %u items (failed=%u) in cache '%s'",
loaded,failed,name.c_str());
}
void CacheModule::initialize()
@ -725,6 +819,12 @@ void CacheModule::initialize()
// Globals
s_size = adjustedCacheSize(cfg.getIntValue("general","size",17));
s_limit = adjustedCacheLimit(cfg.getIntValue("general","limit",s_limit),s_size);
s_loadChunk = adjustedCacheLoadChunk(cfg.getIntValue("general","loadchunk"));
s_maxChunks = safeValue(cfg.getIntValue("general","maxchunks",1000));
if (!s_maxChunks)
s_maxChunks = 1;
else if (s_maxChunks > 10000)
s_maxChunks = 10000;
s_cacheTtlSec = adjustedCacheTtl(cfg.getIntValue("general","ttl"));
unsigned int tmp = safeValue(cfg.getIntValue("general","expire_check_interval",10));
if (tmp > s_cacheTtlSec)
@ -737,6 +837,7 @@ void CacheModule::initialize()
s_checkToutInterval = 1000000;
lock();
m_account = cfg.getValue("general","account");
m_accountLoadCache = cfg.getValue("general","account_loadcache");
unlock();
// Update cache objects
NamedList* lnp = cfg.getSection("lnp");