Fixed the MySQL library and thread initialization.

Deal with failed connection attempts, try to reconnect on initialize.


git-svn-id: http://yate.null.ro/svn/yate/trunk@2455 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2009-01-26 23:27:33 +00:00
parent 45f1b5b75a
commit 43d542db69
1 changed files with 57 additions and 20 deletions

View File

@ -43,21 +43,25 @@
using namespace TelEngine; using namespace TelEngine;
namespace { // anonymous namespace { // anonymous
class DbThread;
static ObjList s_conns; static ObjList s_conns;
Mutex s_conmutex; Mutex s_conmutex;
class DbConn : public GenObject class DbConn : public GenObject
{ {
friend class DbThread;
public: public:
DbConn(const NamedList* sect); DbConn(const NamedList* sect);
~DbConn(); ~DbConn();
virtual const String& toString() const virtual const String& toString() const
{ return m_name; } { return m_name; }
bool ok();
int queryDb(const char* query, Message* dest = 0); int queryDb(const char* query, Message* dest = 0);
bool initDb(); bool initDb();
void dropDb(); void dropDb();
inline bool ok() const
{ return 0 != m_thread; }
inline Mutex& mutex() inline Mutex& mutex()
{ return m_dbmutex; } { return m_dbmutex; }
void runQueries(); void runQueries();
@ -71,7 +75,8 @@ private:
int queryDbInternal(); int queryDbInternal();
String m_name; String m_name;
unsigned int m_timeout; unsigned int m_timeout;
MYSQL *m_conn; MYSQL* m_conn;
DbThread* m_thread;
String m_host; String m_host;
String m_user; String m_user;
String m_pass; String m_pass;
@ -119,9 +124,12 @@ private:
}; };
static MyModule module; static MyModule module;
static Mutex s_libMutex(false);
static int s_libCounter = 0;
DbConn::DbConn(const NamedList* sect) DbConn::DbConn(const NamedList* sect)
: m_dbmutex(true), m_name(*sect), m_conn(0), m_msg(0), m_go(false) : m_dbmutex(true), m_name(*sect),
m_conn(0), m_thread(0), m_msg(0), m_go(false)
{ {
int tout = sect->getIntValue("timeout",10000); int tout = sect->getIntValue("timeout",10000);
// round to seconds // round to seconds
@ -268,8 +276,10 @@ void DbConn::runQueries()
m_msg = 0; m_msg = 0;
m_query.clear(); m_query.clear();
m_go = false; m_go = false;
Thread::yield(true);
} }
Thread::yield(true); else
Thread::msleep(10,true);
} }
} }
@ -290,7 +300,7 @@ int DbConn::queryDb(const char* query, Message* dest)
m_query = query; m_query = query;
m_go = true; m_go = true;
while (m_go) while (m_go)
Thread::yield(); Thread::msleep(1);
if (m_res < 0) if (m_res < 0)
failure(dest); failure(dest);
return m_res; return m_res;
@ -299,16 +309,33 @@ int DbConn::queryDb(const char* query, Message* dest)
void DbThread::run() void DbThread::run()
{ {
mysql_library_init(0,0,0); s_libMutex.lock();
if (m_conn->initDb()) if (0 == s_libCounter++) {
DDebug(&module,DebugAll,"Initializing the MySQL library");
mysql_library_init(0,0,0);
}
s_libMutex.unlock();
mysql_thread_init();
if (m_conn->initDb()) {
m_conn->m_thread = this;
m_conn->runQueries(); m_conn->runQueries();
}
} }
void DbThread::cleanup() void DbThread::cleanup()
{ {
m_conn->dropDb(); Debug(&module,DebugNote,"Cleaning up connection %p",m_conn);
mysql_library_end(); if (m_conn) {
m_conn->m_thread = 0;
m_conn->dropDb();
}
mysql_thread_end(); mysql_thread_end();
s_libMutex.lock();
if (0 == --s_libCounter) {
DDebug(&module,DebugAll,"Deinitializing the MySQL library");
mysql_library_end();
}
s_libMutex.unlock();
} }
@ -320,6 +347,7 @@ static DbConn* findDb(String& account)
return l ? static_cast<DbConn *>(l->get()): 0; return l ? static_cast<DbConn *>(l->get()): 0;
} }
bool MyHandler::received(Message& msg) bool MyHandler::received(Message& msg)
{ {
String tmp(msg.getValue("account")); String tmp(msg.getValue("account"));
@ -327,7 +355,7 @@ bool MyHandler::received(Message& msg)
return false; return false;
Lock lock(s_conmutex); Lock lock(s_conmutex);
DbConn* db = findDb(tmp); DbConn* db = findDb(tmp);
if (!db) if (!(db && db->ok()))
return false; return false;
Lock lo(db->mutex()); Lock lo(db->mutex());
lock.drop(); lock.drop();
@ -337,8 +365,9 @@ bool MyHandler::received(Message& msg)
return true; return true;
} }
MyModule::MyModule() MyModule::MyModule()
: Module ("mysqldb","database",true),m_init(false) : Module ("mysqldb","database",true),m_init(true)
{ {
Output("Loaded module MySQL based on %s",mysql_get_client_info()); Output("Loaded module MySQL based on %s",mysql_get_client_info());
} }
@ -356,25 +385,33 @@ void MyModule::statusParams(String& str)
void MyModule::initialize() void MyModule::initialize()
{ {
Module::initialize();
if (m_init)
return;
m_init = true;
Output("Initializing module MySQL"); Output("Initializing module MySQL");
Module::initialize();
Configuration cfg(Engine::configFile("mysqldb")); Configuration cfg(Engine::configFile("mysqldb"));
Engine::install(new MyHandler(cfg.getIntValue("general","priority",100))); if (m_init)
unsigned int i; Engine::install(new MyHandler(cfg.getIntValue("general","priority",100)));
for (i = 0; i < cfg.sections(); i++) { m_init = false;
for (unsigned int i = 0; i < cfg.sections(); i++) {
NamedList* sec = cfg.getSection(i); NamedList* sec = cfg.getSection(i);
if (!sec || (*sec == "general")) if (!sec || (*sec == "general"))
continue; continue;
DbConn* conn = new DbConn(sec); DbConn* conn = findDb(*sec);
if (conn) {
if (!conn->ok()) {
Debug(this,DebugNote,"Reinitializing connection '%s'",conn->toString().c_str());
DbThread* thr = new DbThread(conn);
if (!thr->startup())
delete thr;
}
continue;
}
conn = new DbConn(sec);
DbThread* thr = new DbThread(conn); DbThread* thr = new DbThread(conn);
if (thr->startup()) if (thr->startup())
s_conns.insert(conn); s_conns.insert(conn);
else { else {
delete thr; delete thr;
conn->destruct(); TelEngine::destruct(conn);
} }
} }
} }