diff --git a/conf.d/cache.conf.sample b/conf.d/cache.conf.sample index 11268b88..68b07bda 100644 --- a/conf.d/cache.conf.sample +++ b/conf.d/cache.conf.sample @@ -56,6 +56,16 @@ ;account_loadcache= +; The following parameters can be set in cache sections + +; reload_interval: integer: Interval (in seconds) to reload the cache +; This parameter is applied on reload +; This parameter will be ignored if the cache don't have a load account and query +; Minimum allowed value is 10. Set it to 0 to disable cache reload +; Defaults to 0 (no reload) +;reload_interval=0 + + [lnp] ; This section configures the LNP cache ; Database query examples assume a 'lnp' table with the following fields: diff --git a/modules/server/cache.cpp b/modules/server/cache.cpp index eb0f71ca..0d4b891c 100644 --- a/modules/server/cache.cpp +++ b/modules/server/cache.cpp @@ -37,6 +37,8 @@ class CacheModule; // Max value for cache expire check interval #define EXPIRE_CHECK_MAX 300 +// Min value for cache reload interval in seconds +#define CACHE_RELOAD_MIN 10 class CacheItem : public NamedList { @@ -68,6 +70,12 @@ public: // Retrieve the number of items in cache inline unsigned int count() const { return m_count; } + // Retrieve the cache TTL + inline u_int64_t cacheTtl() const + { return m_cacheTtl; } + // Check if the cache has reload set + inline bool canReload() + { return m_loadInterval != 0; } // Retrieve the mutex protecting a given list inline unsigned int index(const String& str) const { return str.hash() % m_list.length(); } @@ -83,6 +91,14 @@ public: { doUpdate(params,false); } // Expire entries void expire(const Time& time); + // Reload the cache if not currently loading and set it to reload + // Set force to true to ignore the time to reload value + bool reload(const Time& time, bool force = false); + // Check if the cache can be loaded. Set the loading flag if true is returned + // endLoad() must be called when done + bool startLoad(); + // Reset the loading flag. Set the next re-load time if we have an interval + void endLoad(); // Copy params from cache item. Return true if found bool copyParams(const String& id, NamedList& list, const String* cpParams); // Add an item to the cache. Remove an existing one @@ -129,6 +145,9 @@ protected: unsigned int m_limit; // Limit the number of cache items unsigned int m_limitOverflow; // Allowed limit overflow unsigned int m_loadChunk; // The number of items to load in each DB load query + bool m_loading; // Cache is loading from database + unsigned int m_loadInterval; // Cache re-load interval (in seconds) + u_int64_t m_nextLoad; // Next time to load the cache String m_copyParams; // Item parameters to store/copy String m_account; // Database account String m_accountLoadCache; // Load cache account @@ -202,6 +221,8 @@ protected: virtual void statusModule(String& buf); virtual void statusParams(String& buf); virtual void statusDetail(String& buf); + // Update cache reload flag + void updateCacheReload(); // Add a cache to detail void addCacheDetail(String& buf, Cache* cache); // Handle messages for LNP @@ -209,6 +230,7 @@ protected: // Handle messages for CNAM void handleCnam(Message& msg, bool before); + bool m_haveCacheReload; // True if we have caches to reload String m_account; // Database account String m_accountLoadCache; // Load cache account Cache* m_lnpCache; // LNP cache @@ -228,6 +250,9 @@ static unsigned int s_maxChunks = 1000; // Maximum number of chunks to load in static unsigned int s_cacheTtlSec = 0; // Default cache item time to live (in seconds) static u_int64_t s_checkToutInterval = 0;// Interval to check cache timeout +// List of known caches +static const String s_caches[] = {"lnp", "cnam", ""}; + // Check if application or current thread are terminating static inline bool exiting() { @@ -260,7 +285,7 @@ static inline unsigned int adjustedCacheLimit(int val, int size) // Adjust a cache TTL static inline unsigned int adjustedCacheTtl(int val) { - return val > 10 ? val : 10; + return val > 10 ? val : (!val ? 0 : 10); } // Adjust a cache load chunk @@ -292,13 +317,54 @@ static inline void dumpItem(Cache& c, CacheItem& item, const char* oper) Cache::Cache(const String& name, int size, const NamedList& params) : Mutex(false,"Cache"), m_name(name), m_list(size), m_cacheTtl(0), m_count(0), m_limit(0), - m_limitOverflow(0), m_loadChunk(0) + m_limitOverflow(0), m_loadChunk(0), + m_loading(false), m_loadInterval(0), m_nextLoad(0) { Debug(&__plugin,DebugInfo,"Cache(%s) size=%u [%p]", m_name.c_str(),m_list.length(),this); doUpdate(params,true); } +// Reload the cache if not currently loading and set it to reload +// Set force to true to ignore the time to reload value +bool Cache::reload(const Time& time, bool force) +{ + if (!m_loadInterval || m_loading) + return false; + lock(); + String tmp; + if (m_loadInterval && !m_loading && (force || !m_nextLoad || m_nextLoad <= time)) + tmp = toString(); + unlock(); + if (!tmp) + return false; + DDebug(&__plugin,DebugInfo,"Cache(%s) re-loading [%p]",m_name.c_str(),this); + __plugin.loadCache(tmp,true); + return true; +} + +// Check if the cache can be loaded. Set the loading flag if true is returned +// endLoad() must be called when done +bool Cache::startLoad() +{ + Lock lock(this); + DDebug(&__plugin,DebugInfo,"Cache(%s) startLoad() ok=%u [%p]", + m_name.c_str(),!m_loading,this); + if (m_loading) + return false; + m_loading = true; + return true; +} + +// Reset the loading flag. Set the next re-load time if we have an interval +void Cache::endLoad() +{ + Lock lock(this); + DDebug(&__plugin,DebugInfo,"Cache(%s) endLoad() [%p]",m_name.c_str(),this); + m_loading = false; + m_nextLoad = m_loadInterval ? (Time::now() + m_loadInterval * 1000000) : 0; +} + // Copy params from cache item. Return true if found bool Cache::copyParams(const String& id, NamedList& list, const String* cpParams) { @@ -528,6 +594,15 @@ void Cache::doUpdate(const NamedList& params, bool first) m_loadChunk = 0; } } + if ((m_accountLoadCache || m_account) && m_queryLoadCache) { + unsigned int interval = params.getIntValue("reload_interval"); + if (interval) + m_loadInterval = (interval >= CACHE_RELOAD_MIN) ? interval : CACHE_RELOAD_MIN; + else + m_loadInterval = 0; + } + else + m_loadInterval = 0; String all; #ifdef DEBUG if (m_account) { @@ -541,8 +616,8 @@ void Cache::doUpdate(const NamedList& params, bool first) } #endif Debug(&__plugin,DebugInfo, - "Cache(%s) updated ttl=%u limit=%u copyparams='%s'%s [%p]", - m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit, + "Cache(%s) updated ttl=%u limit=%u reload_interval=%u copyparams='%s'%s [%p]", + m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,m_loadInterval, m_copyParams.safe(),all.safe(),this); } @@ -570,7 +645,7 @@ CacheItem* Cache::addUnsafe(const String& id, const NamedList& params, const Str } } } - if (!expires) + if (!expires && m_cacheTtl) expires = Time::now() + m_cacheTtl; // Search for insert/add point and existing item ObjList* insert = 0; @@ -747,7 +822,7 @@ bool EngineStartHandler::received(Message& msg) */ CacheModule::CacheModule() : Module("cache"), - m_lnpCache(0), m_cnamCache(0) + m_haveCacheReload(false), m_lnpCache(0), m_cnamCache(0) { Output("Loaded module Cache"); } @@ -789,6 +864,8 @@ void CacheModule::setupCache(const String& name, const NamedList& params) } if (s_engineStarted) loadCache(name); + lck.drop(); + updateCacheReload(); return; } if (enabled) { @@ -799,6 +876,8 @@ void CacheModule::setupCache(const String& name, const NamedList& params) } else TelEngine::destruct(*c); + lck.drop(); + updateCacheReload(); } // Start cache load thread @@ -813,18 +892,25 @@ void CacheModule::loadCache(const String& name, bool async) String query; unsigned int chunk = 0; cache->getDbLoad(account,query,chunk); - cache = 0; - if (!(account && query)) + if (!(account && query)) { + cache = 0; return; + } if (async) { + cache = 0; (new CacheLoadThread(name))->startup(); return; } + bool load = cache->startLoad(); + cache = 0; + if (!load) + return; Debug(this,DebugInfo,"Loading cache '%s' chunk=%u",name.c_str(),chunk); unsigned int loaded = 0; unsigned int failed = 0; unsigned int offset = 0; unsigned int max = chunk ? s_maxChunks : 1; + // NOTE: Don't return from the loop: we must notify the cache for (unsigned int i = 0; i < max; i++) { Message m("database"); m.addParam("account",account); @@ -837,17 +923,17 @@ void CacheModule::loadCache(const String& name, bool async) m.addParam("query",query); bool ok = Engine::dispatch(m); if (exiting()) - return; + break; const char* error = m.getValue("error"); if (!ok || error) { Debug(this,DebugNote,"Failed to load cache '%s' reason=%s", name.c_str(),TelEngine::c_safe(error)); - return; + break; } getCache(cache,name); if (!cache) { Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str()); - return; + break; } Array* a = static_cast(m.userObject("Array")); int rows = a ? a->getRows() : 0; @@ -865,11 +951,16 @@ void CacheModule::loadCache(const String& name, bool async) if (added < loadedRows) failed += loadedRows - added; if (exiting()) - return; + break; // Stop if got less then requested if (chunk && loadedRows < chunk) break; } + getCache(cache,name); + if (!cache) + return; + cache->endLoad(); + cache = 0; Debug(this,DebugInfo,"Loaded %u items (failed=%u) in cache '%s'", loaded,failed,name.c_str()); } @@ -878,6 +969,7 @@ void CacheModule::initialize() { static bool s_first = true; static bool s_init = true; + static bool s_createExpire = true; Output("Initializing module Cache"); Configuration cfg(Engine::configFile("cache")); // Globals @@ -938,11 +1030,21 @@ void CacheModule::initialize() if (ok) { DDebug(this,DebugAll,"Initializing"); setup(); - // Expire thread - (new CacheExpireThread)->startup(); s_init = false; } } + if (!s_init && s_createExpire) { + // Create expire thread if we have a cache with non 0 TTL + lock(); + bool ok = (m_lnpCache && m_lnpCache->cacheTtl()) || + (m_cnamCache && m_cnamCache->cacheTtl()); + unlock(); + if (ok) { + DDebug(this,DebugAll,"Creating expire thread"); + (new CacheExpireThread)->startup(); + s_createExpire = false; + } + } } bool CacheModule::received(Message& msg, int id) @@ -955,6 +1057,17 @@ bool CacheModule::received(Message& msg, int id) handleCnam(msg,id == CnamBefore); return false; } + if (id == Timer) { + if (m_haveCacheReload) { + for (int i = 0; s_caches[i]; i++) { + RefPointer cache; + getCache(cache,s_caches[i]); + if (cache) + cache->reload(msg.msgTime()); + cache = 0; + } + } + } return Module::received(msg,id); } @@ -983,6 +1096,19 @@ void CacheModule::statusDetail(String& buf) addCacheDetail(buf,m_cnamCache); } +// Update cache reload flag +void CacheModule::updateCacheReload() +{ + bool ok = false; + for (int i = 0; !ok && s_caches[i]; i++) { + RefPointer cache; + getCache(cache,s_caches[i]); + ok = cache && cache->canReload(); + cache = 0; + } + m_haveCacheReload = ok; +} + // Add a cache to detail void CacheModule::addCacheDetail(String& buf, Cache* cache) {