Added optional cache periodic reload parameter. Allow a cache to have a 0 ttl (no expire).

git-svn-id: http://yate.null.ro/svn/yate/trunk@4501 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2011-07-13 15:20:08 +00:00
parent 5841ccec16
commit daae56df24
2 changed files with 150 additions and 14 deletions

View File

@ -56,6 +56,16 @@
;account_loadcache=
; The following parameters can be set in cache sections
; reload_interval: integer: Interval (in seconds) to reload the cache
; This parameter is applied on reload
; This parameter will be ignored if the cache don't have a load account and query
; Minimum allowed value is 10. Set it to 0 to disable cache reload
; Defaults to 0 (no reload)
;reload_interval=0
[lnp]
; This section configures the LNP cache
; Database query examples assume a 'lnp' table with the following fields:

View File

@ -37,6 +37,8 @@ class CacheModule;
// Max value for cache expire check interval
#define EXPIRE_CHECK_MAX 300
// Min value for cache reload interval in seconds
#define CACHE_RELOAD_MIN 10
class CacheItem : public NamedList
{
@ -68,6 +70,12 @@ public:
// Retrieve the number of items in cache
inline unsigned int count() const
{ return m_count; }
// Retrieve the cache TTL
inline u_int64_t cacheTtl() const
{ return m_cacheTtl; }
// Check if the cache has reload set
inline bool canReload()
{ return m_loadInterval != 0; }
// Retrieve the mutex protecting a given list
inline unsigned int index(const String& str) const
{ return str.hash() % m_list.length(); }
@ -83,6 +91,14 @@ public:
{ doUpdate(params,false); }
// Expire entries
void expire(const Time& time);
// Reload the cache if not currently loading and set it to reload
// Set force to true to ignore the time to reload value
bool reload(const Time& time, bool force = false);
// Check if the cache can be loaded. Set the loading flag if true is returned
// endLoad() must be called when done
bool startLoad();
// Reset the loading flag. Set the next re-load time if we have an interval
void endLoad();
// Copy params from cache item. Return true if found
bool copyParams(const String& id, NamedList& list, const String* cpParams);
// Add an item to the cache. Remove an existing one
@ -129,6 +145,9 @@ protected:
unsigned int m_limit; // Limit the number of cache items
unsigned int m_limitOverflow; // Allowed limit overflow
unsigned int m_loadChunk; // The number of items to load in each DB load query
bool m_loading; // Cache is loading from database
unsigned int m_loadInterval; // Cache re-load interval (in seconds)
u_int64_t m_nextLoad; // Next time to load the cache
String m_copyParams; // Item parameters to store/copy
String m_account; // Database account
String m_accountLoadCache; // Load cache account
@ -202,6 +221,8 @@ protected:
virtual void statusModule(String& buf);
virtual void statusParams(String& buf);
virtual void statusDetail(String& buf);
// Update cache reload flag
void updateCacheReload();
// Add a cache to detail
void addCacheDetail(String& buf, Cache* cache);
// Handle messages for LNP
@ -209,6 +230,7 @@ protected:
// Handle messages for CNAM
void handleCnam(Message& msg, bool before);
bool m_haveCacheReload; // True if we have caches to reload
String m_account; // Database account
String m_accountLoadCache; // Load cache account
Cache* m_lnpCache; // LNP cache
@ -228,6 +250,9 @@ static unsigned int s_maxChunks = 1000; // Maximum number of chunks to load in
static unsigned int s_cacheTtlSec = 0; // Default cache item time to live (in seconds)
static u_int64_t s_checkToutInterval = 0;// Interval to check cache timeout
// List of known caches
static const String s_caches[] = {"lnp", "cnam", ""};
// Check if application or current thread are terminating
static inline bool exiting()
{
@ -260,7 +285,7 @@ static inline unsigned int adjustedCacheLimit(int val, int size)
// Adjust a cache TTL
static inline unsigned int adjustedCacheTtl(int val)
{
return val > 10 ? val : 10;
return val > 10 ? val : (!val ? 0 : 10);
}
// Adjust a cache load chunk
@ -292,13 +317,54 @@ static inline void dumpItem(Cache& c, CacheItem& item, const char* oper)
Cache::Cache(const String& name, int size, const NamedList& params)
: Mutex(false,"Cache"),
m_name(name), m_list(size), m_cacheTtl(0), m_count(0), m_limit(0),
m_limitOverflow(0), m_loadChunk(0)
m_limitOverflow(0), m_loadChunk(0),
m_loading(false), m_loadInterval(0), m_nextLoad(0)
{
Debug(&__plugin,DebugInfo,"Cache(%s) size=%u [%p]",
m_name.c_str(),m_list.length(),this);
doUpdate(params,true);
}
// Reload the cache if not currently loading and set it to reload
// Set force to true to ignore the time to reload value
bool Cache::reload(const Time& time, bool force)
{
if (!m_loadInterval || m_loading)
return false;
lock();
String tmp;
if (m_loadInterval && !m_loading && (force || !m_nextLoad || m_nextLoad <= time))
tmp = toString();
unlock();
if (!tmp)
return false;
DDebug(&__plugin,DebugInfo,"Cache(%s) re-loading [%p]",m_name.c_str(),this);
__plugin.loadCache(tmp,true);
return true;
}
// Check if the cache can be loaded. Set the loading flag if true is returned
// endLoad() must be called when done
bool Cache::startLoad()
{
Lock lock(this);
DDebug(&__plugin,DebugInfo,"Cache(%s) startLoad() ok=%u [%p]",
m_name.c_str(),!m_loading,this);
if (m_loading)
return false;
m_loading = true;
return true;
}
// Reset the loading flag. Set the next re-load time if we have an interval
void Cache::endLoad()
{
Lock lock(this);
DDebug(&__plugin,DebugInfo,"Cache(%s) endLoad() [%p]",m_name.c_str(),this);
m_loading = false;
m_nextLoad = m_loadInterval ? (Time::now() + m_loadInterval * 1000000) : 0;
}
// Copy params from cache item. Return true if found
bool Cache::copyParams(const String& id, NamedList& list, const String* cpParams)
{
@ -528,6 +594,15 @@ void Cache::doUpdate(const NamedList& params, bool first)
m_loadChunk = 0;
}
}
if ((m_accountLoadCache || m_account) && m_queryLoadCache) {
unsigned int interval = params.getIntValue("reload_interval");
if (interval)
m_loadInterval = (interval >= CACHE_RELOAD_MIN) ? interval : CACHE_RELOAD_MIN;
else
m_loadInterval = 0;
}
else
m_loadInterval = 0;
String all;
#ifdef DEBUG
if (m_account) {
@ -541,8 +616,8 @@ void Cache::doUpdate(const NamedList& params, bool first)
}
#endif
Debug(&__plugin,DebugInfo,
"Cache(%s) updated ttl=%u limit=%u copyparams='%s'%s [%p]",
m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,
"Cache(%s) updated ttl=%u limit=%u reload_interval=%u copyparams='%s'%s [%p]",
m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,m_loadInterval,
m_copyParams.safe(),all.safe(),this);
}
@ -570,7 +645,7 @@ CacheItem* Cache::addUnsafe(const String& id, const NamedList& params, const Str
}
}
}
if (!expires)
if (!expires && m_cacheTtl)
expires = Time::now() + m_cacheTtl;
// Search for insert/add point and existing item
ObjList* insert = 0;
@ -747,7 +822,7 @@ bool EngineStartHandler::received(Message& msg)
*/
CacheModule::CacheModule()
: Module("cache"),
m_lnpCache(0), m_cnamCache(0)
m_haveCacheReload(false), m_lnpCache(0), m_cnamCache(0)
{
Output("Loaded module Cache");
}
@ -789,6 +864,8 @@ void CacheModule::setupCache(const String& name, const NamedList& params)
}
if (s_engineStarted)
loadCache(name);
lck.drop();
updateCacheReload();
return;
}
if (enabled) {
@ -799,6 +876,8 @@ void CacheModule::setupCache(const String& name, const NamedList& params)
}
else
TelEngine::destruct(*c);
lck.drop();
updateCacheReload();
}
// Start cache load thread
@ -813,18 +892,25 @@ void CacheModule::loadCache(const String& name, bool async)
String query;
unsigned int chunk = 0;
cache->getDbLoad(account,query,chunk);
cache = 0;
if (!(account && query))
if (!(account && query)) {
cache = 0;
return;
}
if (async) {
cache = 0;
(new CacheLoadThread(name))->startup();
return;
}
bool load = cache->startLoad();
cache = 0;
if (!load)
return;
Debug(this,DebugInfo,"Loading cache '%s' chunk=%u",name.c_str(),chunk);
unsigned int loaded = 0;
unsigned int failed = 0;
unsigned int offset = 0;
unsigned int max = chunk ? s_maxChunks : 1;
// NOTE: Don't return from the loop: we must notify the cache
for (unsigned int i = 0; i < max; i++) {
Message m("database");
m.addParam("account",account);
@ -837,17 +923,17 @@ void CacheModule::loadCache(const String& name, bool async)
m.addParam("query",query);
bool ok = Engine::dispatch(m);
if (exiting())
return;
break;
const char* error = m.getValue("error");
if (!ok || error) {
Debug(this,DebugNote,"Failed to load cache '%s' reason=%s",
name.c_str(),TelEngine::c_safe(error));
return;
break;
}
getCache(cache,name);
if (!cache) {
Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str());
return;
break;
}
Array* a = static_cast<Array*>(m.userObject("Array"));
int rows = a ? a->getRows() : 0;
@ -865,11 +951,16 @@ void CacheModule::loadCache(const String& name, bool async)
if (added < loadedRows)
failed += loadedRows - added;
if (exiting())
return;
break;
// Stop if got less then requested
if (chunk && loadedRows < chunk)
break;
}
getCache(cache,name);
if (!cache)
return;
cache->endLoad();
cache = 0;
Debug(this,DebugInfo,"Loaded %u items (failed=%u) in cache '%s'",
loaded,failed,name.c_str());
}
@ -878,6 +969,7 @@ void CacheModule::initialize()
{
static bool s_first = true;
static bool s_init = true;
static bool s_createExpire = true;
Output("Initializing module Cache");
Configuration cfg(Engine::configFile("cache"));
// Globals
@ -938,11 +1030,21 @@ void CacheModule::initialize()
if (ok) {
DDebug(this,DebugAll,"Initializing");
setup();
// Expire thread
(new CacheExpireThread)->startup();
s_init = false;
}
}
if (!s_init && s_createExpire) {
// Create expire thread if we have a cache with non 0 TTL
lock();
bool ok = (m_lnpCache && m_lnpCache->cacheTtl()) ||
(m_cnamCache && m_cnamCache->cacheTtl());
unlock();
if (ok) {
DDebug(this,DebugAll,"Creating expire thread");
(new CacheExpireThread)->startup();
s_createExpire = false;
}
}
}
bool CacheModule::received(Message& msg, int id)
@ -955,6 +1057,17 @@ bool CacheModule::received(Message& msg, int id)
handleCnam(msg,id == CnamBefore);
return false;
}
if (id == Timer) {
if (m_haveCacheReload) {
for (int i = 0; s_caches[i]; i++) {
RefPointer<Cache> cache;
getCache(cache,s_caches[i]);
if (cache)
cache->reload(msg.msgTime());
cache = 0;
}
}
}
return Module::received(msg,id);
}
@ -983,6 +1096,19 @@ void CacheModule::statusDetail(String& buf)
addCacheDetail(buf,m_cnamCache);
}
// Update cache reload flag
void CacheModule::updateCacheReload()
{
bool ok = false;
for (int i = 0; !ok && s_caches[i]; i++) {
RefPointer<Cache> cache;
getCache(cache,s_caches[i]);
ok = cache && cache->canReload();
cache = 0;
}
m_haveCacheReload = ok;
}
// Add a cache to detail
void CacheModule::addCacheDetail(String& buf, Cache* cache)
{