From eda2587ff0d83c97a77f1a8f56086afdc0cf1ce3 Mon Sep 17 00:00:00 2001 From: marian Date: Tue, 7 Jun 2011 07:45:50 +0000 Subject: [PATCH] Allow loading a cache using multiple database requests. Added optional extra account used to load a cache. git-svn-id: http://voip.null.ro/svn/yate@4440 acf43c95-373e-0410-b603-e72c3f656dc1 --- conf.d/cache.conf.sample | 34 +++++++- modules/server/cache.cpp | 177 ++++++++++++++++++++++++++++++--------- 2 files changed, 171 insertions(+), 40 deletions(-) diff --git a/conf.d/cache.conf.sample b/conf.d/cache.conf.sample index e9d51005..11268b88 100644 --- a/conf.d/cache.conf.sample +++ b/conf.d/cache.conf.sample @@ -20,6 +20,28 @@ ; This parameter is applied on reload and can be overridden in cache sections ;limit= +; loadchunk: integer: The number of items to load in a database request +; Minimum allowed value is 500, maximum allowed value is 10000 +; Set it to 0 to load the whole cache using a single database request +; This parameter is applied on reload and can be overridden in cache sections +; NOTES for non 0 value: +; - The 'query_loadcache' parameter in cache sections should contain an 'ORDER BY' +; clause to make sure the cache table is parsed in the same order +; - The 'query_loadcache' query MUST be a parameterized query containing +; LIMIT ${chunk} OFFSET ${offset}. The module will replace chunk and offset for each +; sent database request +;loadchunk=0 + +; maxchunks: integer: Maximum number of chunks to load from cache +; Minimum allowed value is 1, maximum allowed value is 10000 +; Defaults to 1000 +; This parameter is applied on reload +; WARNING: +; - Set a large value only if you are sure the cache load queries are correct +; - Setting a large value for a query without LIMIT or OFFSET will lead to +; useless extra processing +;maxchunks=1000 + ; expire_check_interval: integer: The interval (in seconds) to check cache expire ; Defaults to 10, minimum allowed value 1, maximum allowed value 300 ; This parameter is applied on reload @@ -29,6 +51,10 @@ ; This parameter is applied on reload and can be overridden in cache sections ;account= +; account_loadcache: string: Optional database account used to load an entire cache +; This parameter is applied on reload and can be overridden in cache sections +;account_loadcache= + [lnp] ; This section configures the LNP cache @@ -49,9 +75,9 @@ ; store_npdi_before: boolean: Store routing number from incoming calls with LNP ; This parameter can be overidden by a 'cache_lnp_store' parameter when routing -; Defaults to yes +; Defaults to no ; This parameter is applied on reload -;store_npdi_before=yes +;store_npdi_before=no ; copyparams: string: Parameters to handle (store in cache or copy to handled messages) ; This parameter is applied on reload and can be overridden when routing by @@ -72,6 +98,8 @@ ; query_loadcache: string: Database query used to load the LNP cache when created ; This parameter is applied on reload ;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM lnp +; For non 0 'loadchunk' +;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM lnp ORDER BY timeout LIMIT ${chunk} OFFSET ${offset} ; query_loaditem: string: Database query used to load an item when created ; This parameter is applied on reload @@ -122,6 +150,8 @@ ; query_loadcache: string: Database query used to load the CNAM cache when created ; This parameter is applied on reload ;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM cnam +; For non 0 'loadchunk' +;query_loadcache=SELECT FLOOR(EXTRACT('EPOCH' FROM (timeout - CURRENT_TIMESTAMP))) AS expires,* FROM cnam ORDER BY timeout LIMIT ${chunk} OFFSET ${offset} ; query_loaditem: string: Database query used to load an item when created ; This parameter is applied on reload diff --git a/modules/server/cache.cpp b/modules/server/cache.cpp index a168507c..5f65e1c7 100644 --- a/modules/server/cache.cpp +++ b/modules/server/cache.cpp @@ -72,10 +72,11 @@ public: inline unsigned int index(const String& str) const { return str.hash() % m_list.length(); } // Safely retrieve DB load info - inline void getDbLoad(String& account, String& query) { + inline void getDbLoad(String& account, String& query, unsigned int& loadChunk) { Lock lock(this); - account = m_account; + account = (m_accountLoadCache ? m_accountLoadCache : m_account); query = m_queryLoadCache; + loadChunk = m_loadChunk; } // Reinit inline void update(const NamedList& params) @@ -96,10 +97,15 @@ public: Lock lock(this); addUnsafe(array,row,cols); } + // Add items from Array rows. Return the number of added rows + unsigned int addRows(Array& array, int startRow, int numRows); // Clear the cache void clear(); // Retrieve cache name virtual const String& toString() const; + // Set chunk limit and offset to a query + // Return the number of replaced params + static int setLimits(String& query, unsigned int chunk, unsigned int offset); protected: virtual void destroyed(); // (Re)init @@ -120,8 +126,10 @@ protected: unsigned int m_count; // Current number of items unsigned int m_limit; // Limit the number of cache items unsigned int m_limitOverflow; // Allowed limit overflow + unsigned int m_loadChunk; // The number of items to load in each DB load query String m_copyParams; // Item parameters to store/copy String m_account; // Database account + String m_accountLoadCache; // Load cache account String m_queryLoadCache; // Database load all cache query String m_queryLoadItem; // Database load a cache item query String m_querySave; // Database save query @@ -169,9 +177,9 @@ public: CacheModule(); ~CacheModule(); // Safely retrieve the database account - inline void getAccount(String& buf) { + inline void getAccount(String& buf, bool cacheLoad = false) { Lock lock(this); - buf = m_account; + buf = !cacheLoad ? m_account : m_accountLoadCache; } // Safely retrieve a reference to a cache inline void getCache(RefPointer& c, const String& name) { @@ -200,6 +208,7 @@ protected: void handleCnam(Message& msg, bool before); String m_account; // Database account + String m_accountLoadCache; // Load cache account Cache* m_lnpCache; // LNP cache Cache* m_cnamCache; // CNAM cache }; @@ -212,6 +221,8 @@ static bool s_lnpStoreNpdiBefore = true; // Store LNP when already done static bool s_cnamStoreEmpty = false; // Store empty caller name in CNAM cache static unsigned int s_size = 0; // The number of listst in each cache static unsigned int s_limit = 0; // Default cache limit +static unsigned int s_loadChunk = 0; // The number of cache items to load in each DB load query +static unsigned int s_maxChunks = 1000; // Maximum number of chunks to load in a cache static unsigned int s_cacheTtlSec = 0; // Default cache item time to live (in seconds) static u_int64_t s_checkToutInterval = 0;// Interval to check cache timeout @@ -250,10 +261,20 @@ static inline unsigned int adjustedCacheTtl(int val) return val > 10 ? val : 10; } +// Adjust a cache load chunk +static inline unsigned int adjustedCacheLoadChunk(int val) +{ + if (val <= 0) + return 0; + if (val >= 500 && val <= 10000) + return val; + return val < 500 ? 500 : 10000; +} + // Show cache item changes to output static inline void dumpItem(Cache& c, CacheItem& item, const char* oper) { -#ifdef DEBUG +#ifdef XDEBUG String tmp; item.dump(tmp," "); Debug(&__plugin,DebugAll,"Cache(%s) %s %p %s expires=%u [%p]", @@ -269,7 +290,7 @@ static inline void dumpItem(Cache& c, CacheItem& item, const char* oper) Cache::Cache(const String& name, int size, const NamedList& params) : Mutex(false,"Cache"), m_name(name), m_list(size), m_cacheTtl(0), m_count(0), m_limit(0), - m_limitOverflow(0) + m_limitOverflow(0), m_loadChunk(0) { Debug(&__plugin,DebugInfo,"Cache(%s) size=%u [%p]", m_name.c_str(),m_list.length(),this); @@ -353,6 +374,23 @@ void Cache::expire(const Time& time) } } +// Add items from Array rows. Return the number of added rows +unsigned int Cache::addRows(Array& array, int startRow, int numRows) +{ + if (numRows <= 0) + return 0; + int cols = array.getColumns(); + if (cols < 1) + return 0; + int rows = array.getRows(); + unsigned int added = 0; + Lock lock(this); + for (; numRows && startRow < rows; startRow++, numRows--) + if (addUnsafe(array,startRow,cols)) + added++; + return added; +} + // Clear the cache void Cache::clear() { @@ -367,6 +405,16 @@ const String& Cache::toString() const return m_name; } +// Set chunk limit and offset to a query +// Return the number of replaced params +int Cache::setLimits(String& query, unsigned int chunk, unsigned int offset) +{ + NamedList params(""); + params.addParam("chunk",String(chunk)); + params.addParam("offset",String(offset)); + return params.replaceParams(query); +} + void Cache::destroyed() { Debug(&__plugin,DebugInfo,"Cache(%s) destroyed [%p]",m_name.c_str(),this); @@ -378,7 +426,9 @@ void Cache::destroyed() void Cache::doUpdate(const NamedList& params, bool first) { String account; + String accountLoadCache; __plugin.getAccount(account); + __plugin.getAccount(accountLoadCache,true); Lock lck(this); if (first) { int ttl = safeValue(params.getIntValue("ttl",s_cacheTtlSec)); @@ -389,25 +439,39 @@ void Cache::doUpdate(const NamedList& params, bool first) m_limitOverflow = m_limit + (m_limit / 100); else m_limitOverflow = 0; + m_loadChunk = adjustedCacheLoadChunk(params.getIntValue("loadchunk",s_loadChunk)); m_copyParams = params.getValue("copyparams"); m_account = params.getValue("account",account); + m_accountLoadCache = params.getValue("account_loadcache",accountLoadCache); m_queryLoadCache = params.getValue("query_loadcache"); m_queryLoadItem = params.getValue("query_loaditem"); m_querySave = params.getValue("query_save"); m_queryExpire = params.getValue("query_expire"); + // Minimum sanity check for cache load + if (m_loadChunk && m_queryLoadCache) { + String tmp = m_queryLoadCache; + if (setLimits(tmp,m_loadChunk,0) < 2) { + Debug(&__plugin,DebugNote,"Cache(%s) invalid query_loadcache='%s' for loadchunk=%u [%p]", + m_name.c_str(),m_queryLoadCache.c_str(),m_loadChunk,this); + m_loadChunk = 0; + } + } String all; #ifdef DEBUG if (m_account) { - all << " copyparams=" << m_copyParams; + all << " loadchunk=" << m_loadChunk; all << " account=" << m_account; + all << " account_loadcache=" << m_accountLoadCache; all << " query_loadcache=" << m_queryLoadCache; all << " query_loaditem=" << m_queryLoadItem; all << " query_save=" << m_querySave; all << " query_expire=" << m_queryExpire; } #endif - Debug(&__plugin,DebugInfo,"Cache(%s) updated ttl=%u limit=%u%s [%p]", - m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit,all.safe(),this); + Debug(&__plugin,DebugInfo, + "Cache(%s) updated ttl=%u limit=%u copyparams='%s'%s [%p]", + m_name.c_str(),(unsigned int)(m_cacheTtl / 1000000),m_limit, + m_copyParams.safe(),all.safe(),this); } // Add an item to the cache. Remove an existing one @@ -674,46 +738,76 @@ void CacheModule::loadCache(const String& name, bool async) return; String account; String query; - cache->getDbLoad(account,query); + unsigned int chunk = 0; + cache->getDbLoad(account,query,chunk); cache = 0; if (!(account && query)) return; if (async) { - cache = 0; (new CacheLoadThread(name))->startup(); return; } - Debug(this,DebugInfo,"Loading cache '%s'",name.c_str()); - Message m("database"); - m.addParam("account",account); - m.addParam("query",query); - bool ok = Engine::dispatch(m); - if (exiting()) - return; - const char* error = m.getValue("error"); - if (!ok || error) { - Debug(this,DebugNote,"Failed to load cache '%s' %s", - name.c_str(),TelEngine::c_safe(error)); - return; - } - getCache(cache,name); - if (!cache) { - Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str()); - return; - } - Array* a = static_cast(m.userObject("Array")); - int rows = a ? a->getRows() : 0; - int cols = a ? a->getColumns() : 0; - for (int row = 1; row < rows; row++) { + Debug(this,DebugInfo,"Loading cache '%s' chunk=%u",name.c_str(),chunk); + unsigned int loaded = 0; + unsigned int failed = 0; + unsigned int offset = 0; + unsigned int max = chunk ? s_maxChunks : 1; + for (unsigned int i = 0; i < max; i++) { + Message m("database"); + m.addParam("account",account); + if (chunk) { + String tmp = query; + Cache::setLimits(tmp,chunk,offset); + m.addParam("query",tmp); + } + else + m.addParam("query",query); + bool ok = Engine::dispatch(m); if (exiting()) + return; + const char* error = m.getValue("error"); + if (!ok || error) { + Debug(this,DebugNote,"Failed to load cache '%s' reason=%s", + name.c_str(),TelEngine::c_safe(error)); + return; + } + getCache(cache,name); + if (!cache) { + Debug(this,DebugInfo,"Cache '%s' vanished while loading",name.c_str()); + return; + } + Array* a = static_cast(m.userObject("Array")); + int rows = a ? a->getRows() : 0; + unsigned int loadedRows = (rows > 0) ? rows - 1 : 0; + Debug(this,DebugAll,"Loaded %u rows current chunk=%u for cache '%s'", + loadedRows,i + 1,name.c_str()); + if (!loadedRows) { + cache = 0; break; - cache->add(*a,row,cols); - // Take a breath, let others do their job - if (0 == (row % 500)) + } + offset += loadedRows; + loaded += loadedRows; + for (int row = 1; row < rows;) { + unsigned int remaining = rows - row; + unsigned int numRows = 500; + if (remaining < numRows) + numRows = remaining; + unsigned int added = cache->addRows(*a,row,numRows); + row += numRows; + if (added < numRows) + failed += numRows - added; + // Take a breath, let others do their job Thread::idle(); + if (exiting()) + return; + } + cache = 0; + // Stop if got less then requested + if (chunk && loadedRows < chunk) + break; } - Debug(this,DebugInfo,"Loaded %d items in cache '%s'",rows ? rows - 1 : 0,name.c_str()); - cache = 0; + Debug(this,DebugInfo,"Loaded %u items (failed=%u) in cache '%s'", + loaded,failed,name.c_str()); } void CacheModule::initialize() @@ -725,6 +819,12 @@ void CacheModule::initialize() // Globals s_size = adjustedCacheSize(cfg.getIntValue("general","size",17)); s_limit = adjustedCacheLimit(cfg.getIntValue("general","limit",s_limit),s_size); + s_loadChunk = adjustedCacheLoadChunk(cfg.getIntValue("general","loadchunk")); + s_maxChunks = safeValue(cfg.getIntValue("general","maxchunks",1000)); + if (!s_maxChunks) + s_maxChunks = 1; + else if (s_maxChunks > 10000) + s_maxChunks = 10000; s_cacheTtlSec = adjustedCacheTtl(cfg.getIntValue("general","ttl")); unsigned int tmp = safeValue(cfg.getIntValue("general","expire_check_interval",10)); if (tmp > s_cacheTtlSec) @@ -737,6 +837,7 @@ void CacheModule::initialize() s_checkToutInterval = 1000000; lock(); m_account = cfg.getValue("general","account"); + m_accountLoadCache = cfg.getValue("general","account_loadcache"); unlock(); // Update cache objects NamedList* lnp = cfg.getSection("lnp");