Optimized array parsing when processing a cache load result set.

git-svn-id: http://yate.null.ro/svn/yate/trunk@4459 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2011-06-15 14:17:59 +00:00
parent 9e482e0127
commit 48b0a29acb
1 changed files with 88 additions and 24 deletions

View File

@ -92,13 +92,15 @@ public:
Lock lock(this);
addUnsafe(id,params,cpParams,dbSave);
}
// Add items from NamedList list. Return the number of added items
unsigned int add(ObjList& list);
// Add an item from an Array row
inline void add(Array& array, int row, int cols) {
Lock lock(this);
addUnsafe(array,row,cols);
}
// Add items from Array rows. Return the number of added rows
unsigned int addRows(Array& array, int startRow, int numRows);
unsigned int addRows(Array& array);
// Clear the cache
void clear();
// Retrieve cache name
@ -353,6 +355,7 @@ void Cache::expire(const Time& time)
Message* m = new Message("database");
m->addParam("account",m_account);
m->addParam("query",query);
m->addParam("results",String::boolText(false));
Engine::enqueue(m);
}
for (unsigned int i = 0; i < m_list.length(); i++) {
@ -374,20 +377,89 @@ void Cache::expire(const Time& time)
}
}
// Add items from Array rows. Return the number of added rows
unsigned int Cache::addRows(Array& array, int startRow, int numRows)
// Add items from NamedList list
// Return the number of added items
unsigned int Cache::add(ObjList& list)
{
if (numRows <= 0)
unsigned int added = 0;
Lock lck(this);
for (ObjList* o = list.skipNull(); o; o = o->skipNext()) {
NamedList* nl = static_cast<NamedList*>(o->get());
if (addUnsafe(*nl,*nl,0,false))
added++;
}
return added;
}
// Add items from Array rows. Return the number of added rows
unsigned int Cache::addRows(Array& array)
{
int rows = array.getRows();
if (rows < 2)
return 0;
int cols = array.getColumns();
if (cols < 1)
return 0;
int rows = array.getRows();
ObjList** columns = new ObjList*[cols];
String** titles = new String*[cols];
lock();
ObjList* params = m_copyParams.split(',',false);
unlock();
int colId = -1;
for (int i = 0; i < cols; i++) {
columns[i] = array.getColumn(i);
titles[i] = 0;
String* title = columns[i] ? YOBJECT(String,columns[i]->get()) : 0;
if (TelEngine::null(title))
continue;
if (*title == YSTRING("id")) {
colId = i;
titles[i] = title;
}
else if (*title == YSTRING("expires") || params->find(*title))
titles[i] = title;
}
TelEngine::destruct(params);
if (colId < 0) {
// Don't release columns and titles content: they are owned by the array
delete columns;
delete titles;
return 0;
}
unsigned int added = 0;
Lock lock(this);
for (; numRows && startRow < rows; startRow++, numRows--)
if (addUnsafe(array,startRow,cols))
added++;
ObjList pending;
for (int row = 1; row < rows; row++) {
NamedList* p = new NamedList("");
for (int i = 0; i < cols; i++) {
columns[i] = columns[i]->next();
if (!columns[i] || TelEngine::null(titles[i]))
continue;
String* colVal = YOBJECT(String,columns[i]->get());
if (!colVal)
continue;
if (i == colId)
p->assign(*colVal);
else
p->addParam(titles[i]->c_str(),*colVal);
}
if (*p)
pending.append(p);
else
TelEngine::destruct(p);
if (0 != (row % 500))
continue;
// Add pending items, take a breath to let others do their job
added += add(pending);
pending.clear();
Thread::idle();
if (exiting())
break;
}
// Add remaining items
added += add(pending);
// Don't release columns and titles content: they are owned by the array
delete columns;
delete titles;
return added;
}
@ -544,6 +616,7 @@ CacheItem* Cache::addUnsafe(const String& id, const NamedList& params, const Str
Message* m = new Message("database");
m->addParam("account",m_account);
m->addParam("query",query);
m->addParam("results",String::boolText(false));
Engine::enqueue(m);
}
dumpItem(*this,*item,!found ? "added" : "updated");
@ -787,21 +860,12 @@ void CacheModule::loadCache(const String& name, bool async)
}
offset += loadedRows;
loaded += loadedRows;
for (int row = 1; row < rows;) {
unsigned int remaining = rows - row;
unsigned int numRows = 500;
if (remaining < numRows)
numRows = remaining;
unsigned int added = cache->addRows(*a,row,numRows);
row += numRows;
if (added < numRows)
failed += numRows - added;
// Take a breath, let others do their job
Thread::idle();
if (exiting())
return;
}
cache = 0;
unsigned int added = cache->addRows(*a);
cache = 0;
if (added < loadedRows)
failed += loadedRows - added;
if (exiting())
return;
// Stop if got less then requested
if (chunk && loadedRows < chunk)
break;