If connections fail to be initialized, try again periodically until whole pool is initialized.

git-svn-id: http://voip.null.ro/svn/yate@4803 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
oana 2012-01-13 11:49:16 +00:00
parent 703e3314ab
commit ae376bbafc
2 changed files with 158 additions and 47 deletions

View File

@ -13,6 +13,9 @@
; timeout: int: Query timeout in milliseconds - will be rounded to seconds
;timeout=10000
; initretry: int: Interval (in seconds) to retry creating failed connections. Setting it to 0 will disable retrying.
;initretry=10
; host: string: MySQL server to connect to, defaults to local
;host=

View File

@ -51,6 +51,7 @@ class DbQuery;
class DbQueryList;
class MySqlConn;
class MyAcct;
class InitThread;
static ObjList s_conns;
static unsigned int s_failedConns;
@ -75,13 +76,6 @@ public:
~MyConn();
void closeConn();
inline void setInit(bool val = true)
{ m_init = val; }
inline bool isInitialized()
{ return m_init; }
void runQueries();
int queryDbInternal(DbQuery* query);
@ -105,6 +99,7 @@ public:
~MyAcct();
bool initDb();
int initConns();
void dropDb();
inline bool ok() const
{ return 0 != m_connections.skipNull(); }
@ -127,9 +122,20 @@ public:
{ return ((int)(m_poolSize - m_failedConns) > 0 ? true : false); }
inline unsigned int queryTime()
{ return (unsigned int) m_queryTime; } //microseconds
inline void setRetryWhen()
{ m_retryWhen = Time::msecNow() + m_retryTime * 1000; }
inline u_int64_t retryWhen()
{ return m_retryWhen; }
inline bool shouldRetryInit()
{ return m_retryTime && m_connections.count() < (unsigned int)m_poolSize; }
inline int poolSize()
{ return m_poolSize; }
private:
unsigned int m_timeout;
// interval at which connection initialization should be tried
unsigned int m_retryTime;
// when should we try initialization again?
u_int64_t m_retryWhen;
String m_host;
String m_user;
@ -185,6 +191,19 @@ public:
virtual bool received(Message& msg);
};
/**
* Class InitThread
* Running thread for initializing MySQL connections
*/
class InitThread : public Thread
{
public:
InitThread();
~InitThread();
virtual void run();
virtual void cleanup();
};
/**
* Class MyModule
* The MySQL database module
@ -195,6 +214,15 @@ public:
MyModule();
~MyModule();
void statusModule(String& str);
virtual bool received(Message& msg, int id);
InitThread* m_initThread;
inline void startInitThread()
{
lock();
if (!m_initThread)
(m_initThread = new InitThread())->startup();
unlock();
}
protected:
virtual void initialize();
virtual void statusParams(String& str);
@ -247,7 +275,7 @@ MyConn::~MyConn()
{
m_conn = 0;
//closeConn();
Debug(&module,DebugInfo,"Database connection '%s' destroyed",c_str());
Debug(&module,DebugAll,"Database connection '%s' destroyed",c_str());
}
void MyConn::closeConn()
@ -433,6 +461,9 @@ MyAcct::MyAcct(const NamedList* sect)
m_poolSize = MAX_CONNECTIONS;
Debug(&module, DebugNote, "For account '%s' connection pool size is %d",
c_str(),m_poolSize);
m_retryTime = sect->getIntValue("initretry",10); // default value is 10 seconds
setRetryWhen(); // set retry interval
}
MyAcct::~MyAcct()
@ -443,33 +474,22 @@ MyAcct::~MyAcct()
dropDb();
}
// initialize the database connection
bool MyAcct::initDb()
int MyAcct::initConns()
{
Lock lock(this);
// allow specifying the raw connection string
Debug(&module,DebugNote,"Initiating pool of %d connections for '%s'",
m_poolSize,c_str());
m_queueSem.lock();
int initConns = 0;
s_libMutex.lock();
if (0 == s_libCounter++) {
DDebug(&module,DebugAll,"Initializing the MySQL library");
mysql_library_init(0,0,0);
}
s_libMutex.unlock();
for (int i = 0; i < m_poolSize; i++) {
int count = m_connections.count();
DDebug(&module,DebugInfo,"MyAcct::initConns() - %d connections initialized already, pool required is of %d connections for '%s'",
count,m_poolSize,c_str());
// set new retry interval
setRetryWhen();
for (int i = count; i < m_poolSize; i++) {
MyConn* mySqlConn = new MyConn(toString() + "." + String(i), this);
mySqlConn->m_conn = mysql_init(mySqlConn->m_conn);
if (!mySqlConn->m_conn) {
Debug(&module,DebugGoOn,"Could not start connection %d for '%s'",i,c_str());
mysql_thread_end();
continue;
return i;
}
DDebug(&module,DebugAll,"Connection '%s' for account '%s' was created",mySqlConn->c_str(),c_str());
@ -504,26 +524,51 @@ bool MyAcct::initDb()
#endif
DbThread* thread = new DbThread(mySqlConn);
if (thread->startup()) {
mySqlConn->setInit();
if (thread->startup())
m_connections.append(mySqlConn);
initConns++;
}
else {
delete thread;
TelEngine::destruct(mySqlConn);
}
}
else {
TelEngine::destruct(mySqlConn);
return i;
}
}
if (!initConns) {
Debug(&module,DebugWarn,"Could not initiate any connections for account '%s'",
c_str());
return false;
}
if (initConns != m_poolSize)
Debug(&module,DebugMild,"Could initiate only %d of %d connections for account '%s'",
initConns,m_poolSize,c_str());
return m_poolSize;
}
// initialize the database connection
bool MyAcct::initDb()
{
Lock lock(this);
// allow specifying the raw connection string
Debug(&module,DebugNote,"Initiating pool of %d connections for '%s'",
m_poolSize,c_str());
m_queueSem.lock();
s_libMutex.lock();
if (0 == s_libCounter++) {
DDebug(&module,DebugAll,"Initializing the MySQL library");
mysql_library_init(0,0,0);
}
s_libMutex.unlock();
int initCons = initConns();
if (!initCons) {
Debug(&module,DebugWarn,"Could not initiate any connections for account '%s', trying again in %d seconds",
c_str(),m_retryTime);
module.startInitThread();
return true;
}
if (initCons != m_poolSize) {
Debug(&module,DebugMild,"Could initiate only %d of %d connections for account '%s', trying again in %d seconds",
initCons,m_poolSize,c_str(),m_retryTime);
module.startInitThread();
}
return true;
}
@ -618,10 +663,8 @@ void MyAcct::appendQuery(DbQuery* query)
void DbThread::run()
{
mysql_thread_init();
if (m_conn->isInitialized()) {
m_conn->m_thread = this;
m_conn->runQueries();
}
m_conn->m_thread = this;
m_conn->runQueries();
}
void DbThread::cleanup()
@ -677,11 +720,63 @@ bool MyHandler::received(Message& msg)
return true;
}
/**
* InitThread
*/
InitThread::InitThread()
: Thread("Mysql Init")
{
Debug(&module,DebugAll,"InitThread created [%p]",this);
}
InitThread::~InitThread()
{
Debug(&module,DebugAll,"InitThread thread terminated [%p]",this);
module.lock();
module.m_initThread = 0;
module.unlock();
}
void InitThread::run()
{
mysql_thread_init();
while(!Engine::exiting()) {
Thread::sleep(1,true);
bool retryAgain = false;
s_acctMutex.lock();
for (ObjList* o = s_conns.skipNull(); o; o = o->next()) {
MyAcct* acc = static_cast<MyAcct*>(o->get());
if (acc->shouldRetryInit() && acc->retryWhen() <= Time::msecNow()) {
int count = acc->initConns();
if (count < acc->poolSize())
Debug(&module,(count ? DebugMild : DebugWarn),"Account '%s' has %d initialized connections out of "
" a pool of %d",acc->c_str(),count,acc->poolSize());
else
Debug(&module,DebugInfo,"All connections for account '%s' have been initialized, pool size is %d",
acc->c_str(),acc->poolSize());
}
if (acc->shouldRetryInit())
retryAgain = true;
}
s_acctMutex.unlock();
if (!retryAgain)
break;
}
}
void InitThread::cleanup()
{
Debug(&module,DebugInfo,"InitThread::cleanup() [%p]",this);
mysql_thread_end();
}
/**
* MyModule
*/
MyModule::MyModule()
: Module ("mysqldb","database",true),m_init(true)
: Module ("mysqldb","database",true),
m_initThread(0),
m_init(true)
{
Output("Loaded module MySQL based on %s",mysql_get_client_info());
}
@ -691,6 +786,9 @@ MyModule::~MyModule()
Output("Unloading module MySQL");
s_conns.clear();
s_failedConns = 0;
// Wait for expire thread termination
while (m_initThread)
Thread::idle();
}
void MyModule::statusModule(String& str)
@ -724,6 +822,7 @@ void MyModule::initialize()
Configuration cfg(Engine::configFile("mysqldb"));
if (m_init)
Engine::install(new MyHandler(cfg.getIntValue("general","priority",100)));
installRelay(Halt);
m_init = false;
s_failedConns = 0;
for (unsigned int i = 0; i < cfg.sections(); i++) {
@ -748,6 +847,15 @@ void MyModule::initialize()
}
}
bool MyModule::received(Message& msg, int id)
{
if (id == Halt) {
if (m_initThread)
m_initThread->cancel(true);
}
return Module::received(msg,id);
}
void MyModule::genUpdate(Message& msg)
{
Lock lock(this);