MySQL connection pool implemented per database account. The "database" message parameter "results" has changed its meaning. For this, please check documentation.
git-svn-id: http://voip.null.ro/svn/yate@2821 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
86b791f46d
commit
e6dbb33d71
|
@ -38,3 +38,8 @@
|
|||
; If not set or empty will use the default for your system
|
||||
; This setting is not available on older MySQL client libraries
|
||||
;encoding=
|
||||
|
||||
; poolsize: int: Number of connections to establish for this account
|
||||
; If not set or empty, it will create only one connection
|
||||
; Minimum number of connections is 1, maximum is 10
|
||||
;poolsize=1
|
||||
|
|
|
@ -40,38 +40,79 @@
|
|||
#define mysql_library_end mysql_server_end
|
||||
#endif
|
||||
|
||||
#define MIN_CONNECTIONS 1
|
||||
#define MAX_CONNECTIONS 10
|
||||
|
||||
using namespace TelEngine;
|
||||
namespace { // anonymous
|
||||
|
||||
class DbThread;
|
||||
class DbQuery;
|
||||
class DbQueryList;
|
||||
class MySqlConn;
|
||||
class MyAcct;
|
||||
|
||||
static ObjList s_conns;
|
||||
Mutex s_conmutex(false,"MySQL::conn");
|
||||
Mutex s_acctMutex(false,"MySQL::accts");
|
||||
|
||||
class MyConn : public GenObject, public Mutex
|
||||
/**
|
||||
* Class MyConn
|
||||
* A MySQL connection
|
||||
*/
|
||||
class MyConn : public String
|
||||
{
|
||||
friend class DbThread;
|
||||
public:
|
||||
MyConn(const NamedList* sect);
|
||||
~MyConn();
|
||||
virtual const String& toString() const
|
||||
{ return m_name; }
|
||||
friend class MyAcct;
|
||||
|
||||
public:
|
||||
|
||||
inline MyConn(const String& name, MyAcct* conn)
|
||||
: String(name),
|
||||
m_conn(0), m_owner(conn),
|
||||
m_thread(0), m_init(false)
|
||||
{}
|
||||
~MyConn();
|
||||
|
||||
void closeConn();
|
||||
|
||||
inline void setInit(bool val = true)
|
||||
{ m_init = val; }
|
||||
|
||||
inline bool isInitialized()
|
||||
{ return m_init; }
|
||||
|
||||
void runQueries();
|
||||
int queryDbInternal(DbQuery* query);
|
||||
|
||||
private:
|
||||
MYSQL* m_conn;
|
||||
MyAcct* m_owner;
|
||||
DbThread* m_thread;
|
||||
bool testDb();
|
||||
bool m_init;
|
||||
};
|
||||
|
||||
/**
|
||||
* Class MyAcct
|
||||
* A MySQL database account
|
||||
*/
|
||||
class MyAcct : public String, public Mutex
|
||||
{
|
||||
friend class MyConn;
|
||||
public:
|
||||
MyAcct(const NamedList* sect);
|
||||
~MyAcct();
|
||||
|
||||
int queryDb(const char* query, Message* dest = 0);
|
||||
bool initDb();
|
||||
void dropDb();
|
||||
inline bool ok() const
|
||||
{ return 0 != m_thread; }
|
||||
void runQueries();
|
||||
{ return 0 != m_connections.skipNull(); }
|
||||
|
||||
void appendQuery(DbQuery* query);
|
||||
|
||||
private:
|
||||
bool testDb();
|
||||
bool startDb();
|
||||
int queryDbInternal();
|
||||
String m_name;
|
||||
unsigned int m_timeout;
|
||||
MYSQL* m_conn;
|
||||
DbThread* m_thread;
|
||||
|
||||
String m_host;
|
||||
String m_user;
|
||||
String m_pass;
|
||||
|
@ -80,12 +121,19 @@ private:
|
|||
unsigned int m_port;
|
||||
bool m_compress;
|
||||
String m_encoding;
|
||||
String m_query;
|
||||
Message* m_msg;
|
||||
int m_res;
|
||||
volatile bool m_go;
|
||||
|
||||
int m_poolSize;
|
||||
ObjList m_connections;
|
||||
ObjList m_queryQueue;
|
||||
|
||||
Semaphore m_queueSem;
|
||||
Mutex m_queueMutex;
|
||||
};
|
||||
|
||||
/**
|
||||
* Class DbThread
|
||||
* Running thread for a MySQL connection
|
||||
*/
|
||||
class DbThread : public Thread
|
||||
{
|
||||
public:
|
||||
|
@ -98,6 +146,10 @@ private:
|
|||
MyConn* m_conn;
|
||||
};
|
||||
|
||||
/**
|
||||
* Class MyHandler
|
||||
* Message handler for "database" message
|
||||
*/
|
||||
class MyHandler : public MessageHandler
|
||||
{
|
||||
public:
|
||||
|
@ -107,6 +159,10 @@ public:
|
|||
virtual bool received(Message& msg);
|
||||
};
|
||||
|
||||
/**
|
||||
* Class MyModule
|
||||
* The MySQL database module
|
||||
*/
|
||||
class MyModule : public Module
|
||||
{
|
||||
public:
|
||||
|
@ -119,108 +175,104 @@ private:
|
|||
bool m_init;
|
||||
};
|
||||
|
||||
/**
|
||||
* Class DbQuery
|
||||
* A MySQL query
|
||||
*/
|
||||
class DbQuery : public String, public Semaphore
|
||||
{
|
||||
friend class MyConn;
|
||||
public:
|
||||
inline DbQuery(const String& query, Message* msg)
|
||||
: String(query),
|
||||
Semaphore(1,"MySQL::query"),
|
||||
m_msg(msg), m_finished(false)
|
||||
{ DDebug( DebugAll, "DbQuery object [%p] created for query '%s'", this, c_str()); }
|
||||
|
||||
inline ~DbQuery()
|
||||
{ m_msg = 0;
|
||||
DDebug( DebugAll, "DbQuery object [%p] with query '%s' was destroyed", this, c_str()); }
|
||||
|
||||
inline bool finished()
|
||||
{ return m_finished; }
|
||||
|
||||
inline void setFinished()
|
||||
{ m_finished = true;
|
||||
if (!m_msg)
|
||||
destruct();
|
||||
}
|
||||
|
||||
private:
|
||||
Message* m_msg;
|
||||
bool m_finished;
|
||||
};
|
||||
|
||||
static MyModule module;
|
||||
static Mutex s_libMutex(false,"MySQL::lib");
|
||||
static int s_libCounter = 0;
|
||||
|
||||
MyConn::MyConn(const NamedList* sect)
|
||||
: Mutex(true,"MyConn"),
|
||||
m_name(*sect),
|
||||
m_conn(0), m_thread(0), m_msg(0), m_go(false)
|
||||
{
|
||||
int tout = sect->getIntValue("timeout",10000);
|
||||
// round to seconds
|
||||
m_timeout = (tout + 500) / 1000;
|
||||
// but make sure it doesn't round to zero unless zero was requested
|
||||
if (tout && !m_timeout)
|
||||
m_timeout = 1;
|
||||
m_host = sect->getValue("host");
|
||||
m_user = sect->getValue("user","mysql");
|
||||
m_pass = sect->getValue("password");
|
||||
m_db = sect->getValue("database","yate");
|
||||
m_port = sect->getIntValue("port");
|
||||
m_unix = sect->getValue("socket");
|
||||
m_compress = sect->getBoolValue("compress");
|
||||
m_encoding = sect->getValue("encoding");
|
||||
}
|
||||
|
||||
/**
|
||||
* MyConn
|
||||
*/
|
||||
MyConn::~MyConn()
|
||||
{
|
||||
s_conns.remove(this,false);
|
||||
// FIXME: should we try to do it from this thread?
|
||||
dropDb();
|
||||
{
|
||||
m_conn = 0;
|
||||
//closeConn();
|
||||
Debug(&module,DebugInfo,"Database connection '%s' destroyed",c_str());
|
||||
}
|
||||
|
||||
// initialize the database connection
|
||||
bool MyConn::initDb()
|
||||
void MyConn::closeConn()
|
||||
{
|
||||
Lock lock(this);
|
||||
// allow specifying the raw connection string
|
||||
Debug(&module,DebugInfo,"Initiating connection for '%s'",m_name.c_str());
|
||||
m_conn = mysql_init(m_conn);
|
||||
if (!m_conn) {
|
||||
Debug(&module,DebugGoOn,"Could not start connection for '%s'",m_name.c_str());
|
||||
return false;
|
||||
}
|
||||
if (m_compress)
|
||||
mysql_options(m_conn,MYSQL_OPT_COMPRESS,0);
|
||||
mysql_options(m_conn,MYSQL_OPT_CONNECT_TIMEOUT,(const char*)&m_timeout);
|
||||
#ifdef MYSQL_OPT_READ_TIMEOUT
|
||||
mysql_options(m_conn,MYSQL_OPT_READ_TIMEOUT,(const char*)&m_timeout);
|
||||
#endif
|
||||
#ifdef MYSQL_OPT_WRITE_TIMEOUT
|
||||
mysql_options(m_conn,MYSQL_OPT_WRITE_TIMEOUT,(const char*)&m_timeout);
|
||||
#endif
|
||||
if (mysql_real_connect(m_conn,m_host,m_user,m_pass,m_db,m_port,m_unix,CLIENT_MULTI_STATEMENTS)) {
|
||||
#ifdef MYSQL_OPT_RECONNECT
|
||||
// this option must be set after connect - bug in mysql client library
|
||||
my_bool reconn = 1;
|
||||
mysql_options(m_conn,MYSQL_OPT_RECONNECT,(const char*)&reconn);
|
||||
#endif
|
||||
#ifdef HAVE_MYSQL_SET_CHARSET
|
||||
if (m_encoding && mysql_set_character_set(m_conn,m_encoding))
|
||||
Debug(&module,DebugWarn,"Failed to set encoding '%s' on connection '%s'",
|
||||
m_encoding.c_str(),m_name.c_str());
|
||||
#else
|
||||
if (m_encoding)
|
||||
Debug(&module,DebugWarn,"Your client library does not support setting the character set");
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
Debug(&module,DebugWarn,"Connection for '%s' failed: %s",m_name.c_str(),mysql_error(m_conn));
|
||||
return false;
|
||||
}
|
||||
|
||||
// drop the connection
|
||||
void MyConn::dropDb()
|
||||
{
|
||||
Lock lock(this);
|
||||
m_res = -1;
|
||||
m_go = false;
|
||||
DDebug(&module,DebugInfo,"Database connection '%s' trying to close %p",c_str(),m_conn);
|
||||
if (!m_conn)
|
||||
return;
|
||||
MYSQL* tmp = m_conn;
|
||||
m_conn = 0;
|
||||
lock.drop();
|
||||
mysql_close(tmp);
|
||||
Debug(&module,DebugInfo,"Database connection '%s' closed",m_name.c_str());
|
||||
if(m_owner)
|
||||
m_owner->m_connections.remove(this);
|
||||
Debug(&module,DebugInfo,"Database connection '%s' closed",c_str());
|
||||
}
|
||||
|
||||
void MyConn::runQueries()
|
||||
{
|
||||
while (m_conn && m_owner) {
|
||||
Thread::check();
|
||||
m_owner->m_queueSem.lock(Thread::idleUsec());
|
||||
|
||||
Lock mylock(m_owner->m_queueMutex);
|
||||
DbQuery* query = static_cast<DbQuery*>(m_owner->m_queryQueue.remove(false));
|
||||
if (!query)
|
||||
continue;
|
||||
mylock.drop();
|
||||
|
||||
DDebug(&module,DebugAll,"Connection '%s' will try to execute '%s'",
|
||||
c_str(),query->c_str());
|
||||
|
||||
int res = queryDbInternal(query);
|
||||
if ((res < 0) && query->m_msg)
|
||||
query->m_msg->setParam("error","failure");
|
||||
|
||||
query->unlock();
|
||||
query->setFinished();
|
||||
DDebug(&module,DebugAll,"Connection '%s' finished executing query",c_str());
|
||||
}
|
||||
}
|
||||
|
||||
// test it the connection is still OK
|
||||
bool MyConn::testDb()
|
||||
{
|
||||
return m_conn && !mysql_ping(m_conn);
|
||||
return m_conn && !mysql_ping(m_conn);
|
||||
}
|
||||
|
||||
// perform the query, fill the message with data
|
||||
// return number of rows, -1 for error
|
||||
int MyConn::queryDbInternal()
|
||||
int MyConn::queryDbInternal(DbQuery* query)
|
||||
{
|
||||
if (!testDb())
|
||||
return -1;
|
||||
|
||||
if (mysql_real_query(m_conn,m_query.safe(),m_query.length())) {
|
||||
Debug(&module,DebugWarn,"Query for '%s' failed: %s",m_name.c_str(),mysql_error(m_conn));
|
||||
if (mysql_real_query(m_conn,query->safe(),query->length())) {
|
||||
Debug(&module,DebugWarn,"Query for '%s' failed: %s",c_str(),mysql_error(m_conn));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -236,102 +288,212 @@ int MyConn::queryDbInternal()
|
|||
unsigned int rows = (unsigned int)mysql_num_rows(res);
|
||||
Debug(&module,DebugAll,"Got result set %p rows=%u cols=%u",res,rows,cols);
|
||||
total += rows;
|
||||
if (m_msg) {
|
||||
if (query->m_msg) {
|
||||
MYSQL_FIELD* fields = mysql_fetch_fields(res);
|
||||
m_msg->setParam("columns",String(cols));
|
||||
m_msg->setParam("rows",String(rows));
|
||||
if (m_msg->getBoolValue("results",true)) {
|
||||
Array *a = new Array(cols,rows+1);
|
||||
unsigned int c;
|
||||
// add field names
|
||||
for (c = 0; c < cols; c++)
|
||||
a->set(new String(fields[c].name),c,0);
|
||||
// and now data row by row
|
||||
for (unsigned int r = 1; r <= rows; r++) {
|
||||
MYSQL_ROW row = mysql_fetch_row(res);
|
||||
if (!row)
|
||||
break;
|
||||
unsigned long* len = mysql_fetch_lengths(res);
|
||||
for (c = 0; c < cols; c++) {
|
||||
if (!row[c])
|
||||
query->m_msg->setParam("columns",String(cols));
|
||||
query->m_msg->setParam("rows",String(rows));
|
||||
Array *a = new Array(cols,rows+1);
|
||||
unsigned int c;
|
||||
// add field names
|
||||
for (c = 0; c < cols; c++)
|
||||
a->set(new String(fields[c].name),c,0);
|
||||
// and now data row by row
|
||||
for (unsigned int r = 1; r <= rows; r++) {
|
||||
MYSQL_ROW row = mysql_fetch_row(res);
|
||||
if (!row)
|
||||
break;
|
||||
unsigned long* len = mysql_fetch_lengths(res);
|
||||
for (c = 0; c < cols; c++) {
|
||||
if (!row[c])
|
||||
continue;
|
||||
if (63 == fields[c].charsetnr) {
|
||||
// field holds binary data
|
||||
if (!len)
|
||||
continue;
|
||||
if (63 == fields[c].charsetnr) {
|
||||
// field holds binary data
|
||||
if (!len)
|
||||
continue;
|
||||
a->set(new DataBlock(row[c],len[c]),c,r);
|
||||
}
|
||||
else
|
||||
a->set(new String(row[c]),c,r);
|
||||
a->set(new DataBlock(row[c],len[c]),c,r);
|
||||
}
|
||||
else
|
||||
a->set(new String(row[c]),c,r);
|
||||
}
|
||||
m_msg->userData(a);
|
||||
a->deref();
|
||||
}
|
||||
query->m_msg->userData(a);
|
||||
a->deref();
|
||||
}
|
||||
mysql_free_result(res);
|
||||
}
|
||||
} while (!mysql_next_result(m_conn));
|
||||
|
||||
if (m_msg) {
|
||||
m_msg->setParam("affected",String(affected));
|
||||
if (query->m_msg) {
|
||||
query->m_msg->setParam("affected",String(affected));
|
||||
if (warns)
|
||||
m_msg->setParam("warnings",String(warns));
|
||||
query->m_msg->setParam("warnings",String(warns));
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
void MyConn::runQueries()
|
||||
/**
|
||||
* MyAcct
|
||||
*/
|
||||
MyAcct::MyAcct(const NamedList* sect)
|
||||
: String(*sect),
|
||||
Mutex(true,"MySQL::acct"),
|
||||
m_queueSem(MAX_CONNECTIONS,"MySQL::queue"),
|
||||
m_queueMutex(false,"MySQL::queue")
|
||||
{
|
||||
while (m_conn) {
|
||||
if (m_go) {
|
||||
DDebug(&module,DebugAll,"Running query \"%s\" for '%s'",
|
||||
m_query.c_str(),m_name.c_str());
|
||||
m_res = queryDbInternal();
|
||||
m_msg = 0;
|
||||
m_query.clear();
|
||||
m_go = false;
|
||||
Thread::yield(true);
|
||||
}
|
||||
else
|
||||
Thread::idle(true);
|
||||
}
|
||||
int tout = sect->getIntValue("timeout",10000);
|
||||
// round to seconds
|
||||
m_timeout = (tout + 500) / 1000;
|
||||
// but make sure it doesn't round to zero unless zero was requested
|
||||
if (tout && !m_timeout)
|
||||
m_timeout = 1;
|
||||
m_host = sect->getValue("host");
|
||||
m_user = sect->getValue("user","mysql");
|
||||
m_pass = sect->getValue("password");
|
||||
m_db = sect->getValue("database","yate");
|
||||
m_port = sect->getIntValue("port");
|
||||
m_unix = sect->getValue("socket");
|
||||
m_compress = sect->getBoolValue("compress");
|
||||
m_encoding = sect->getValue("encoding");
|
||||
|
||||
m_poolSize = sect->getIntValue("poolsize",MIN_CONNECTIONS);
|
||||
if (m_poolSize < MIN_CONNECTIONS)
|
||||
m_poolSize = MIN_CONNECTIONS;
|
||||
else if (m_poolSize > MAX_CONNECTIONS)
|
||||
m_poolSize = MAX_CONNECTIONS;
|
||||
Debug(&module, DebugNote, "For account '%s' connection pool size is %d",
|
||||
c_str(),m_poolSize);
|
||||
}
|
||||
|
||||
static bool failure(Message* m)
|
||||
MyAcct::~MyAcct()
|
||||
{
|
||||
if (m)
|
||||
m->setParam("error","failure");
|
||||
return false;
|
||||
Debug(&module, DebugNote, "~MyAcct()");
|
||||
s_conns.remove(this,false);
|
||||
// FIXME: should we try to do it from this thread?
|
||||
dropDb();
|
||||
}
|
||||
|
||||
int MyConn::queryDb(const char* query, Message* dest)
|
||||
// initialize the database connection
|
||||
bool MyAcct::initDb()
|
||||
{
|
||||
if (TelEngine::null(query))
|
||||
return -1;
|
||||
DDebug(&module,DebugAll,"Proxying query \"%s\" for '%s'",
|
||||
query,m_name.c_str());
|
||||
m_msg = dest;
|
||||
m_query = query;
|
||||
m_go = true;
|
||||
while (m_go)
|
||||
Thread::msleep(1);
|
||||
if (m_res < 0)
|
||||
failure(dest);
|
||||
return m_res;
|
||||
}
|
||||
Lock lock(this);
|
||||
// allow specifying the raw connection string
|
||||
Debug(&module,DebugNote,"Initiating pool of %d connections for '%s'",
|
||||
m_poolSize,c_str());
|
||||
|
||||
m_queueSem.lock();
|
||||
|
||||
int initConns = 0;
|
||||
|
||||
void DbThread::run()
|
||||
{
|
||||
s_libMutex.lock();
|
||||
if (0 == s_libCounter++) {
|
||||
DDebug(&module,DebugAll,"Initializing the MySQL library");
|
||||
mysql_library_init(0,0,0);
|
||||
}
|
||||
s_libMutex.unlock();
|
||||
|
||||
for (int i = 0; i < m_poolSize; i++) {
|
||||
MyConn* mySqlConn = new MyConn(toString() + "." + String(i), this);
|
||||
|
||||
mySqlConn->m_conn = mysql_init(mySqlConn->m_conn);
|
||||
if (!mySqlConn->m_conn) {
|
||||
Debug(&module,DebugGoOn,"Could not start connection %d for '%s'",i,c_str());
|
||||
mysql_thread_end();
|
||||
continue;
|
||||
}
|
||||
DDebug(&module,DebugAll,"Connection '%s' for account '%s' was created",mySqlConn->c_str(),c_str());
|
||||
|
||||
if (m_compress)
|
||||
mysql_options(mySqlConn->m_conn,MYSQL_OPT_COMPRESS,0);
|
||||
|
||||
mysql_options(mySqlConn->m_conn,MYSQL_OPT_CONNECT_TIMEOUT,(const char*)&m_timeout);
|
||||
|
||||
#ifdef MYSQL_OPT_READ_TIMEOUT
|
||||
mysql_options(mySqlConn->m_conn,MYSQL_OPT_READ_TIMEOUT,(const char*)&m_timeout);
|
||||
#endif
|
||||
|
||||
#ifdef MYSQL_OPT_WRITE_TIMEOUT
|
||||
mysql_options(mySqlConn->m_conn,MYSQL_OPT_WRITE_TIMEOUT,(const char*)&m_timeout);
|
||||
#endif
|
||||
|
||||
if (mysql_real_connect(mySqlConn->m_conn,m_host,m_user,m_pass,m_db,m_port,m_unix,CLIENT_MULTI_STATEMENTS)) {
|
||||
|
||||
#ifdef MYSQL_OPT_RECONNECT
|
||||
// this option must be set after connect - bug in mysql client library
|
||||
my_bool reconn = 1;
|
||||
mysql_options(mySqlConn->m_conn,MYSQL_OPT_RECONNECT,(const char*)&reconn);
|
||||
#endif
|
||||
|
||||
#ifdef HAVE_MYSQL_SET_CHARSET
|
||||
if (m_encoding && mysql_set_character_set(mySqlConn->m_conn,m_encoding))
|
||||
Debug(&module,DebugWarn,"Failed to set encoding '%s' on connection '%s'",
|
||||
m_encoding.c_str(),mySqlConn->c_str());
|
||||
#else
|
||||
if (m_encoding)
|
||||
Debug(&module,DebugWarn,"Your client library does not support setting the character set");
|
||||
#endif
|
||||
DbThread* thread = new DbThread(mySqlConn);
|
||||
|
||||
if (thread->startup()) {
|
||||
mySqlConn->setInit();
|
||||
m_connections.append(mySqlConn);
|
||||
initConns++;
|
||||
}
|
||||
else {
|
||||
delete thread;
|
||||
TelEngine::destruct(mySqlConn);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!initConns) {
|
||||
Debug(&module,DebugWarn,"Could not initiate any connections for account '%s'",
|
||||
c_str());
|
||||
return false;
|
||||
}
|
||||
if (initConns != m_poolSize)
|
||||
Debug(&module,DebugMild,"Could initiate only %d of %d connections for account '%s'",
|
||||
initConns,m_poolSize,c_str());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// drop the connection
|
||||
void MyAcct::dropDb()
|
||||
{
|
||||
Lock mylock(this);
|
||||
|
||||
ObjList* o = m_connections.skipNull();
|
||||
for (; o; o = o->skipNext()) {
|
||||
MyConn* c = static_cast<MyConn*>(o->get());
|
||||
if (c)
|
||||
c->closeConn();
|
||||
}
|
||||
m_queryQueue.clear();
|
||||
Debug(&module,DebugNote,"Database account '%s' closed",c_str());
|
||||
|
||||
s_libMutex.lock();
|
||||
if (0 == --s_libCounter) {
|
||||
DDebug(&module,DebugInfo,"Deinitializing the MySQL library");
|
||||
mysql_library_end();
|
||||
}
|
||||
s_libMutex.unlock();
|
||||
}
|
||||
|
||||
void MyAcct::appendQuery(DbQuery* query)
|
||||
{
|
||||
DDebug(&module, DebugAll, "Account '%s' received a new query %p",c_str(),query);
|
||||
m_queueMutex.lock();
|
||||
m_queryQueue.append(query);
|
||||
m_queueMutex.unlock();
|
||||
m_queueSem.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* DbThread
|
||||
*/
|
||||
void DbThread::run()
|
||||
{
|
||||
mysql_thread_init();
|
||||
if (m_conn->initDb()) {
|
||||
if (m_conn->isInitialized()) {
|
||||
m_conn->m_thread = this;
|
||||
m_conn->runQueries();
|
||||
}
|
||||
|
@ -339,48 +501,60 @@ void DbThread::run()
|
|||
|
||||
void DbThread::cleanup()
|
||||
{
|
||||
Debug(&module,DebugNote,"Cleaning up connection %p",m_conn);
|
||||
Debug(&module,DebugInfo,"Cleaning up connection %p thread [%p]",m_conn,this);
|
||||
if (m_conn) {
|
||||
m_conn->m_thread = 0;
|
||||
m_conn->dropDb();
|
||||
m_conn->closeConn();
|
||||
}
|
||||
mysql_thread_end();
|
||||
s_libMutex.lock();
|
||||
if (0 == --s_libCounter) {
|
||||
DDebug(&module,DebugAll,"Deinitializing the MySQL library");
|
||||
mysql_library_end();
|
||||
}
|
||||
s_libMutex.unlock();
|
||||
}
|
||||
|
||||
|
||||
static MyConn* findDb(String& account)
|
||||
static MyAcct* findDb(const String& account)
|
||||
{
|
||||
if (account.null())
|
||||
return 0;
|
||||
ObjList* l = s_conns.find(account);
|
||||
return l ? static_cast<MyConn *>(l->get()): 0;
|
||||
return l ? static_cast<MyAcct*>(l->get()) : 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* MyHandler
|
||||
*/
|
||||
bool MyHandler::received(Message& msg)
|
||||
{
|
||||
String tmp(msg.getValue("account"));
|
||||
if (tmp.null())
|
||||
const String* str = msg.getParam("account");
|
||||
if (TelEngine::null(str))
|
||||
return false;
|
||||
Lock lock(s_conmutex);
|
||||
MyConn* db = findDb(tmp);
|
||||
Lock lock(s_acctMutex);
|
||||
MyAcct* db = findDb(*str);
|
||||
if (!(db && db->ok()))
|
||||
return false;
|
||||
Lock lo(db);
|
||||
lock.drop();
|
||||
String query(msg.getValue("query"));
|
||||
db->queryDb(query,&msg);
|
||||
|
||||
str = msg.getParam("query");
|
||||
if (!TelEngine::null(str)) {
|
||||
if (msg.getBoolValue("results",true)) {
|
||||
DbQuery* q = new DbQuery(*str,&msg);
|
||||
db->appendQuery(q);
|
||||
|
||||
while (!q->finished()) {
|
||||
Thread::check();
|
||||
q->lock(Thread::idleUsec());
|
||||
}
|
||||
TelEngine::destruct(q);
|
||||
}
|
||||
else
|
||||
db->appendQuery(new DbQuery(*str,0));
|
||||
}
|
||||
msg.setParam("dbtype","mysqldb");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* MyModule
|
||||
*/
|
||||
MyModule::MyModule()
|
||||
: Module ("mysqldb","database",true),m_init(true)
|
||||
{
|
||||
|
@ -410,24 +584,19 @@ void MyModule::initialize()
|
|||
NamedList* sec = cfg.getSection(i);
|
||||
if (!sec || (*sec == "general"))
|
||||
continue;
|
||||
MyConn* conn = findDb(*sec);
|
||||
MyAcct* conn = findDb(*sec);
|
||||
if (conn) {
|
||||
if (!conn->ok()) {
|
||||
Debug(this,DebugNote,"Reinitializing connection '%s'",conn->toString().c_str());
|
||||
DbThread* thr = new DbThread(conn);
|
||||
if (!thr->startup())
|
||||
delete thr;
|
||||
conn->initDb();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
conn = new MyConn(sec);
|
||||
DbThread* thr = new DbThread(conn);
|
||||
if (thr->startup())
|
||||
s_conns.insert(conn);
|
||||
else {
|
||||
delete thr;
|
||||
TelEngine::destruct(conn);
|
||||
}
|
||||
|
||||
conn = new MyAcct(sec);
|
||||
s_conns.insert(conn);
|
||||
if (!conn->initDb())
|
||||
s_conns.remove(conn);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -420,7 +420,7 @@ bool AAAHandler::received(Message& msg)
|
|||
if (s_critical)
|
||||
return failure(&msg);
|
||||
Message m("database");
|
||||
prepareQuery(m,account,query,false);
|
||||
prepareQuery(m,account,query,true);
|
||||
if (Engine::dispatch(m))
|
||||
if (m.getIntValue("affected") >= 1 || m.getIntValue("rows") >=1)
|
||||
return true;
|
||||
|
@ -486,7 +486,8 @@ bool AAAHandler::received(Message& msg)
|
|||
{
|
||||
// no error check needed on unregister - we return false
|
||||
Message m("database");
|
||||
prepareQuery(m,account,query,false);
|
||||
prepareQuery(m,account,query,true);
|
||||
// we don't enqueue the message because we must assure ourselves that this message is processed synchronously
|
||||
Engine::dispatch(m);
|
||||
}
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue