From 43d542db69e9f67cdbefb37fb753581ece3080a9 Mon Sep 17 00:00:00 2001 From: paulc Date: Mon, 26 Jan 2009 23:27:33 +0000 Subject: [PATCH] Fixed the MySQL library and thread initialization. Deal with failed connection attempts, try to reconnect on initialize. git-svn-id: http://yate.null.ro/svn/yate/trunk@2455 acf43c95-373e-0410-b603-e72c3f656dc1 --- modules/server/mysqldb.cpp | 77 ++++++++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 20 deletions(-) diff --git a/modules/server/mysqldb.cpp b/modules/server/mysqldb.cpp index 51005385..5252618f 100644 --- a/modules/server/mysqldb.cpp +++ b/modules/server/mysqldb.cpp @@ -43,21 +43,25 @@ using namespace TelEngine; namespace { // anonymous +class DbThread; + static ObjList s_conns; Mutex s_conmutex; class DbConn : public GenObject { + friend class DbThread; public: DbConn(const NamedList* sect); ~DbConn(); virtual const String& toString() const { return m_name; } - bool ok(); int queryDb(const char* query, Message* dest = 0); bool initDb(); void dropDb(); + inline bool ok() const + { return 0 != m_thread; } inline Mutex& mutex() { return m_dbmutex; } void runQueries(); @@ -71,7 +75,8 @@ private: int queryDbInternal(); String m_name; unsigned int m_timeout; - MYSQL *m_conn; + MYSQL* m_conn; + DbThread* m_thread; String m_host; String m_user; String m_pass; @@ -119,9 +124,12 @@ private: }; static MyModule module; +static Mutex s_libMutex(false); +static int s_libCounter = 0; DbConn::DbConn(const NamedList* sect) - : m_dbmutex(true), m_name(*sect), m_conn(0), m_msg(0), m_go(false) + : m_dbmutex(true), m_name(*sect), + m_conn(0), m_thread(0), m_msg(0), m_go(false) { int tout = sect->getIntValue("timeout",10000); // round to seconds @@ -268,8 +276,10 @@ void DbConn::runQueries() m_msg = 0; m_query.clear(); m_go = false; + Thread::yield(true); } - Thread::yield(true); + else + Thread::msleep(10,true); } } @@ -290,7 +300,7 @@ int DbConn::queryDb(const char* query, Message* dest) m_query = query; m_go = true; while (m_go) - Thread::yield(); + Thread::msleep(1); if (m_res < 0) failure(dest); return m_res; @@ -299,16 +309,33 @@ int DbConn::queryDb(const char* query, Message* dest) void DbThread::run() { - mysql_library_init(0,0,0); - if (m_conn->initDb()) + s_libMutex.lock(); + if (0 == s_libCounter++) { + DDebug(&module,DebugAll,"Initializing the MySQL library"); + mysql_library_init(0,0,0); + } + s_libMutex.unlock(); + mysql_thread_init(); + if (m_conn->initDb()) { + m_conn->m_thread = this; m_conn->runQueries(); + } } void DbThread::cleanup() { - m_conn->dropDb(); - mysql_library_end(); + Debug(&module,DebugNote,"Cleaning up connection %p",m_conn); + if (m_conn) { + m_conn->m_thread = 0; + m_conn->dropDb(); + } mysql_thread_end(); + s_libMutex.lock(); + if (0 == --s_libCounter) { + DDebug(&module,DebugAll,"Deinitializing the MySQL library"); + mysql_library_end(); + } + s_libMutex.unlock(); } @@ -320,6 +347,7 @@ static DbConn* findDb(String& account) return l ? static_cast(l->get()): 0; } + bool MyHandler::received(Message& msg) { String tmp(msg.getValue("account")); @@ -327,7 +355,7 @@ bool MyHandler::received(Message& msg) return false; Lock lock(s_conmutex); DbConn* db = findDb(tmp); - if (!db) + if (!(db && db->ok())) return false; Lock lo(db->mutex()); lock.drop(); @@ -337,8 +365,9 @@ bool MyHandler::received(Message& msg) return true; } + MyModule::MyModule() - : Module ("mysqldb","database",true),m_init(false) + : Module ("mysqldb","database",true),m_init(true) { Output("Loaded module MySQL based on %s",mysql_get_client_info()); } @@ -356,25 +385,33 @@ void MyModule::statusParams(String& str) void MyModule::initialize() { - Module::initialize(); - if (m_init) - return; - m_init = true; Output("Initializing module MySQL"); + Module::initialize(); Configuration cfg(Engine::configFile("mysqldb")); - Engine::install(new MyHandler(cfg.getIntValue("general","priority",100))); - unsigned int i; - for (i = 0; i < cfg.sections(); i++) { + if (m_init) + Engine::install(new MyHandler(cfg.getIntValue("general","priority",100))); + m_init = false; + for (unsigned int i = 0; i < cfg.sections(); i++) { NamedList* sec = cfg.getSection(i); if (!sec || (*sec == "general")) continue; - DbConn* conn = new DbConn(sec); + DbConn* conn = findDb(*sec); + if (conn) { + if (!conn->ok()) { + Debug(this,DebugNote,"Reinitializing connection '%s'",conn->toString().c_str()); + DbThread* thr = new DbThread(conn); + if (!thr->startup()) + delete thr; + } + continue; + } + conn = new DbConn(sec); DbThread* thr = new DbThread(conn); if (thr->startup()) s_conns.insert(conn); else { delete thr; - conn->destruct(); + TelEngine::destruct(conn); } } }