Fixed major memory leak in threads.

Added automatic restart when no module is busy.


git-svn-id: http://voip.null.ro/svn/yate@101 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2004-11-01 00:07:00 +00:00
parent 6d49aa01b6
commit 92064e1651
19 changed files with 205 additions and 55 deletions

View File

@ -13,7 +13,7 @@ DEFS :=
LIBAUX:= -ldl
LIBTHR:= -lpthread
INCLUDES := -I@srcdir@
CFLAGS := -O2 @MODULE_CFLAGS@ -finline -Winline
CFLAGS := -O2 @MODULE_CFLAGS@ @INLINE_FLAGS@
LDFLAGS:=
PROGS:= yate
@ -84,18 +84,6 @@ love:
war:
@echo 'Please make love instead!'
.PHONY: run
run: all
-./run
.PHONY: gdb-run
gdb-run: all
-./run --gdb
.PHONY: gdb-core
gdb-core: all
-./run --core
.PHONY: test
test: engine
$(MAKE) -C ./test all

View File

@ -569,6 +569,8 @@ public:
GtkClientPlugin();
~GtkClientPlugin();
virtual void initialize();
virtual bool isBusy() const
{ return true; }
private:
MessageHandler *m_route;
bool m_init;
@ -602,7 +604,8 @@ void GtkClientPlugin::initialize()
{
m_route = new GtkClientHandler(priority);
Engine::install(m_route);
new GtkClient;
GtkClient *gtk = new GtkClient;
gtk->startup();
}
}
// int argc = 0;

View File

@ -41,6 +41,26 @@ AC_CACHE_SAVE
AC_CHECK_LIB([dl], [dlopen], , [AC_MSG_ERROR([This library is required.])])
AC_CHECK_LIB([pthread], [pthread_create], , [AC_MSG_ERROR([This library is required.])])
# Check for compile options
INLINE_FLAGS=""
AC_ARG_ENABLE(inline,AC_HELP_STRING([--enable-inline],[Enable inlining of functions]),want_inline=$enableval,want_inline=auto)
AC_MSG_CHECKING([whether to use inline functions])
case "x$want_inline" in
xyes)
INLINE_FLAGS="-finline -Winline"
;;
xno)
INLINE_FLAGS="-fno-inline"
;;
xauto)
;;
*)
AC_ERROR([Invalid argument passed to --enable-inline])
;;
esac
AC_MSG_RESULT([$want_inline])
AC_SUBST(INLINE_FLAGS)
# Checks for optional libraries.
HAVE_PGSQL=no

View File

@ -427,10 +427,13 @@ ThreadedSource::~ThreadedSource()
delete m_thread;
}
void ThreadedSource::start(const char *name)
bool ThreadedSource::start(const char *name)
{
if (!m_thread)
if (!m_thread) {
m_thread = new ThreadedSourcePrivate(this,name);
m_thread->startup();
}
return m_thread->running();
}
void ThreadedSource::cleanup()

View File

@ -52,6 +52,7 @@ using namespace TelEngine;
#define MAX_SANITY 5
static unsigned long long s_nextinit = 0;
static unsigned long long s_restarts = 0;
static bool s_makeworker = true;
static bool s_keepclosing = false;
static int s_super_handle = -1;
@ -158,6 +159,7 @@ bool EngineStatusHandler::received(Message &msg)
return false;
msg.retValue() << "engine";
msg.retValue() << ",plugins=" << plugins.count();
msg.retValue() << ",inuse=" << Engine::self()->usedPlugins();
msg.retValue() << ",threads=" << Thread::count();
msg.retValue() << ",workers=" << EnginePrivate::count;
msg.retValue() << ",mutexes=" << Mutex::count();
@ -198,10 +200,17 @@ int Engine::run()
{
Debug(DebugAll,"Engine::run()");
install(new EngineStatusHandler);
if (s_super_handle >= 0)
install(new EngineSuperHandler);
loadPlugins();
Debug(DebugInfo,"plugins.count() = %d",plugins.count());
if (s_super_handle >= 0) {
install(new EngineSuperHandler);
if (s_restarts)
s_restarts = 1000000 * s_restarts + Time::now();
}
else if (s_restarts) {
Debug(DebugWarn,"No supervisor - disabling automatic restarts");
s_restarts = 0;
}
initPlugins();
::signal(SIGINT,sighandler);
::signal(SIGTERM,sighandler);
@ -219,11 +228,21 @@ int Engine::run()
// Create worker thread if we didn't hear about any of them in a while
if (s_makeworker && (EnginePrivate::count < s_maxworkers)) {
Debug(DebugInfo,"Creating new message dispatching thread");
new EnginePrivate;
EnginePrivate *prv = new EnginePrivate;
prv->startup();
}
else
s_makeworker = true;
if (s_restarts && (Time::now() >= s_restarts)) {
if (!(usedPlugins() || dispatch("engine.busy"))) {
s_haltcode = 128;
break;
}
// If we cannot restart now try again in 10s
s_restarts = Time::now() + 10000000;
}
// Attempt to sleep until the next full second
unsigned long t = (Time::now() + corr) % 1000000;
::usleep(1000000 - t);
@ -298,6 +317,8 @@ void Engine::loadPlugins()
const char *name = cfg.getValue("general","modpath");
if (name)
s_modpath = name;
s_maxworkers = cfg.getIntValue("general","maxworkers",s_maxworkers);
s_restarts = cfg.getIntValue("general","restarts");
NamedList *l = cfg.getSection("preload");
if (l) {
unsigned int len = l->length();
@ -351,6 +372,18 @@ void Engine::initPlugins()
}
}
int Engine::usedPlugins()
{
int used = 0;
ObjList *l = &plugins;
for (; l; l = l->next()) {
Plugin *p = static_cast<Plugin *>(l->get());
if (p && p->isBusy())
used++;
}
return used;
}
void Engine::halt(unsigned int code)
{
s_haltcode = code;
@ -453,8 +486,11 @@ static int supervise(void)
if (tmp > 0) {
// Child exited for some reason
if (WIFEXITED(status)) {
s_runagain = false;
retcode = WEXITSTATUS(status);
if (retcode <= 127)
s_runagain = false;
else
retcode &= 127;
}
else if (WIFSIGNALED(status)) {
retcode = WTERMSIG(status);
@ -484,7 +520,7 @@ static int supervise(void)
// If -Da was specified try to get a corefile
if (s_sigabrt) {
::kill(s_childpid,SIGABRT);
::usleep(10000);
::usleep(250000);
}
::kill(s_childpid,SIGKILL);
::usleep(10000);
@ -616,7 +652,6 @@ int Engine::main(int argc, const char **argv, const char **environ)
while (*++pc) {
switch (*pc) {
case 'a':
abortOnBug(true);
s_sigabrt = true;
break;
case 'c':
@ -681,6 +716,7 @@ int Engine::main(int argc, const char **argv, const char **environ)
}
}
debugLevel(debug_level);
abortOnBug(s_sigabrt);
int retcode = supervised ? supervise() : -1;
if (retcode >= 0)

View File

@ -27,6 +27,7 @@ public:
Thread *m_thread;
pthread_t thread;
bool m_running;
bool m_started;
bool m_updest;
const char *m_name;
private:
@ -66,7 +67,7 @@ ThreadPrivate *ThreadPrivate::create(Thread *t,const char *name)
}
ThreadPrivate::ThreadPrivate(Thread *t,const char *name)
: m_thread(t), m_running(false), m_updest(true), m_name(name)
: m_thread(t), m_running(false), m_started(false), m_updest(true), m_name(name)
{
#ifdef DEBUG
Debugger debug("ThreadPrivate::ThreadPrivate","(%p,\"%s\") [%p]",t,name,this);
@ -121,8 +122,9 @@ void ThreadPrivate::run()
::pthread_setspecific(current_key,this);
pthread_cleanup_push(cleanupFunc,this);
::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
::pthread_detach(::pthread_self());
while (!m_running)
while (!m_started)
::usleep(10);
m_thread->run();
pthread_cleanup_pop(1);
@ -289,7 +291,15 @@ bool Thread::error() const
bool Thread::running() const
{
return m_private ? m_private->m_running : false;
return m_private ? m_private->m_started : false;
}
bool Thread::startup()
{
if (!m_private)
return false;
m_private->m_started = true;
return true;
}
Thread *Thread::current()

View File

@ -11,7 +11,7 @@ CC := g++ -Wall
SED := sed
DEFS :=
INCLUDES := -I@top_srcdir@
CFLAGS := -O2 @MODULE_CFLAGS@
CFLAGS := -O2 @MODULE_CFLAGS@ @INLINE_FLAGS@
LDFLAGS:= -L.. -lyate
MODFLAGS:= @MODULE_LDFLAGS@
MODSTRIP:= @MODULE_SYMBOLS@

View File

@ -164,6 +164,7 @@ public:
ExtModulePlugin();
~ExtModulePlugin();
virtual void initialize();
virtual bool isBusy() const;
private:
ExtModHandler *m_handler;
};
@ -360,7 +361,8 @@ ExtModReceiver::~ExtModReceiver()
bool ExtModReceiver::start()
{
if (m_pid < 0) {
new ExtThread(this);
ExtThread *ext = new ExtThread(this);
ext->startup();
while (m_pid < 0)
Thread::yield();
}
@ -827,6 +829,11 @@ ExtModulePlugin::~ExtModulePlugin()
s_chans.clear();
}
bool ExtModulePlugin::isBusy() const
{
return (s_chans.count() != 0);
}
void ExtModulePlugin::initialize()
{
Output("Initializing module ExtModule");

View File

@ -30,6 +30,7 @@ public:
GsmPlugin();
~GsmPlugin();
virtual void initialize() { }
virtual bool isBusy() const;
virtual DataTranslator *create(const String &sFormat, const String &dFormat);
virtual const TranslatorCaps *getCapabilities() const;
};
@ -119,6 +120,11 @@ GsmPlugin::~GsmPlugin()
Output("Unloading module GSM with %d codecs still in use",count);
}
bool GsmPlugin::isBusy() const
{
return (count != 0);
}
DataTranslator *GsmPlugin::create(const String &sFormat, const String &dFormat)
{
if (sFormat == "slin" && dFormat == "gsm")

View File

@ -283,12 +283,10 @@ class H323MsgThread : public Thread
{
public:
H323MsgThread(Message *msg, const char *id)
: Thread("H323MsgThread"), m_msg(msg), m_id(id) { m_mutex.lock(); }
: Thread("H323MsgThread"), m_msg(msg), m_id(id) { }
virtual void run();
virtual void cleanup();
bool route();
inline void resume()
{ m_mutex.unlock(); }
inline static int count()
{ return s_count; }
inline static int routed()
@ -296,7 +294,6 @@ public:
private:
Message *m_msg;
String m_id;
Mutex m_mutex;
static int s_count;
static int s_routed;
};
@ -314,6 +311,7 @@ public:
H323Plugin();
virtual ~H323Plugin();
virtual void initialize();
virtual bool isBusy() const;
void cleanup();
YateH323Connection *findConnectionLock(const char *id);
inline YateH323EndPoint *ep()
@ -335,6 +333,7 @@ int YateH323Connection::s_total = 0;
int H323MsgThread::s_count = 0;
int H323MsgThread::s_routed = 0;
bool H323MsgThread::route()
{
Debug(DebugAll,"Routing thread for %s [%p]",m_id.c_str(),this);
@ -345,7 +344,7 @@ bool H323MsgThread::route()
bool ok = Engine::dispatch(m_msg) && !m_msg->retValue().null();
YateH323Connection *conn = hplugin.findConnectionLock(m_id);
if (!conn) {
Debug(DebugMild,"YateH323Connection '%s' wanished while routing!",m_id.c_str());
Debug(DebugMild,"YateH323Connection '%s' vanished while routing!",m_id.c_str());
return false;
}
if (ok) {
@ -381,8 +380,6 @@ void H323MsgThread::run()
s_count++;
s_route.unlock();
Debug(DebugAll,"Started routing thread for %s [%p]",m_id.c_str(),this);
m_mutex.lock();
m_mutex.unlock();
bool ok = route();
s_route.lock();
s_count--;
@ -438,6 +435,10 @@ YateH323EndPoint::~YateH323EndPoint()
H323Connection *YateH323EndPoint::CreateConnection(unsigned callReference,
void *userData, H323Transport *transport, H323SignalPDU *setupPDU)
{
if (Engine::exiting()) {
Debug(DebugWarn,"Refusing new connection, engine is exiting");
return 0;
}
if (s_maxconns > 0) {
s_calls.lock();
int cnt = hplugin.calls().count();
@ -579,7 +580,8 @@ YateH323Connection::YateH323Connection(YateH323EndPoint &endpoint,
YateH323Connection::~YateH323Connection()
{
Debug(DebugAll,"YateH323Connection::~YateH323Connection() [%p]",this);
Debug(DebugAll,"YateH323Connection::~YateH323Connection() %s %s [%p]",
m_status.c_str(),m_id.c_str(),this);
setStatus("destroyed");
s_calls.lock();
hplugin.calls().remove(this,false);
@ -591,7 +593,8 @@ YateH323Connection::~YateH323Connection()
H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PString &caller,
const H323SignalPDU &setupPDU, H323SignalPDU &connectPDU)
{
Debug(DebugInfo,"YateH323Connection::OnAnswerCall caller='%s' [%p]",(const char *)caller,this);
Debug(DebugInfo,"YateH323Connection::OnAnswerCall caller='%s' in %s [%p]",
(const char *)caller,m_id.c_str(),this);
setStatus("incoming");
if (Engine::exiting()) {
@ -640,14 +643,12 @@ H323Connection::AnswerCallResponse YateH323Connection::OnAnswerCall(const PStrin
m->addParam("calledname",s);
#endif
H323MsgThread *t = new H323MsgThread(m,id());
if (t->error()) {
if (!t->startup()) {
Debug(DebugWarn,"Error starting routing thread! [%p]",this);
t->resume();
delete t;
setStatus("dropped");
return H323Connection::AnswerCallDenied;
}
t->resume();
return H323Connection::AnswerCallDeferred;
}
@ -1305,19 +1306,26 @@ YateH323Connection *H323Plugin::findConnectionLock(const char *id)
{
s_calls.lock();
ObjList *l = &m_calls;
for (; l; l=l->next()) {
while (l) {
YateH323Connection *c = static_cast<YateH323Connection *>(l->get());
l=l->next();
if (c && (c->id() == id)) {
if (c->TryLock() > 0) {
int res = c->TryLock();
if (res > 0) {
s_calls.unlock();
return c;
}
else {
// Yield and try scanning the list again
l = &m_calls;
else if (res < 0) {
// Connection locked - yield and try scanning the list again
s_calls.unlock();
Thread::yield();
s_calls.lock();
l = &m_calls;
}
else {
// Connection shutting down - we can't lock it anymore
s_calls.unlock();
return 0;
}
}
}
@ -1325,13 +1333,19 @@ YateH323Connection *H323Plugin::findConnectionLock(const char *id)
return 0;
}
bool H323Plugin::isBusy() const
{
return (m_calls.count() != 0);
}
void H323Plugin::initialize()
{
Output("Initializing module H.323 - based on OpenH323-" OPENH323_VERSION);
s_cfg = Engine::configFile("h323chan");
s_cfg.load();
if (!m_process)
if (!m_process){
m_process = new H323Process;
}
s_maxqueue = s_cfg.getIntValue("incoming","maxqueue",5);
s_maxconns = s_cfg.getIntValue("ep","maxconns",0);
int dbg=s_cfg.getIntValue("general","debug");

View File

@ -99,7 +99,7 @@ public:
YateIAXConnection *findconn(iax_session *session);
YateIAXConnection *findconn(const String& ourcallid);
void handleEvent(iax_event *event);
inline ObjList &calls()
{ return m_calls; }
private:
@ -138,6 +138,7 @@ public:
IAXPlugin();
virtual ~IAXPlugin();
virtual void initialize();
virtual bool isBusy() const;
void cleanup();
YateIAXEndPoint *m_endpoint;
private:
@ -995,6 +996,10 @@ IAXPlugin::~IAXPlugin()
cleanup();
}
bool IAXPlugin::isBusy() const
{
return m_endpoint && (m_endpoint->calls().count() != 0);
}
void IAXPlugin::initialize()
{
@ -1005,6 +1010,7 @@ void IAXPlugin::initialize()
if (!YateIAXEndPoint::Init())
return;
m_endpoint = new YateIAXEndPoint;
m_endpoint->startup();
}
YateIAXEndPoint::Setup();
if (m_first) {

View File

@ -115,6 +115,7 @@ class OssPlugin : public Plugin
public:
OssPlugin();
virtual void initialize();
virtual bool isBusy() const;
private:
OssHandler *m_handler;
};
@ -477,6 +478,11 @@ void OssPlugin::initialize()
}
}
bool OssPlugin::isBusy() const
{
return (s_chan != 0);
}
INIT_PLUGIN(OssPlugin);
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -77,6 +77,7 @@ public:
RManager();
~RManager();
virtual void initialize();
virtual bool isBusy() const;
private:
bool m_first;
};
@ -115,7 +116,9 @@ void RManagerThread::run()
Connection *Connection::checkCreate(int sock)
{
// should check IP address here
return new Connection(sock);
Connection *conn = new Connection(sock);
conn->startup();
return conn;
}
Connection::Connection(int sock)
@ -404,6 +407,11 @@ RManager::~RManager()
Debugger::setIntOut(0);
}
bool RManager::isBusy() const
{
return (connectionlist.count() != 0);
}
void RManager::initialize()
{
Output("Initializing module RManager");
@ -448,7 +456,8 @@ void RManager::initialize()
if (m_first) {
m_first = false;
Engine::self()->setHook(postHook);
new RManagerThread;
RManagerThread *mt = new RManagerThread;
mt->startup();
}
}

View File

@ -475,6 +475,7 @@ public:
ZaptelPlugin();
virtual ~ZaptelPlugin();
virtual void initialize();
virtual bool isBusy() const;
PriSpan *findSpan(int chan);
ZapChan *findChan(const char *id);
ZapChan *findChan(int first = -1, int last = -1);
@ -501,7 +502,9 @@ PriSpan *PriSpan::create(int span, int chan1, int nChans, int dChan, bool isNet,
::close(fd);
return 0;
}
return new PriSpan(p,span,chan1,nChans,dChan,fd,dialPlan,presentation);
PriSpan *ps = new PriSpan(p,span,chan1,nChans,dChan,fd,dialPlan,presentation);
ps->startup();
return ps;
}
struct pri *PriSpan::makePri(int fd, int dchan, int nettype, int swtype,
@ -1339,6 +1342,22 @@ ZapChan *ZaptelPlugin::findChan(int first, int last)
return 0;
}
bool ZaptelPlugin::isBusy() const
{
const ObjList *l = &m_spans;
for (; l; l=l->next()) {
PriSpan *s = static_cast<PriSpan *>(l->get());
if (s) {
for (int n=1; n<=s->chans(); n++) {
ZapChan *c = s->getChan(n);
if (c && c->inUse())
return true;
}
}
}
return false;
}
void ZaptelPlugin::initialize()
{
Output("Initializing module Zaptel");

View File

@ -5,7 +5,7 @@ CC := g++ -Wall
SED := sed
DEFS :=
INCLUDES := -I@top_srcdir@
CFLAGS := -O0 @MODULE_CFLAGS@
CFLAGS := -O0 @MODULE_CFLAGS@ @INLINE_FLAGS@
LDFLAGS:= -L.. -lyate
MODFLAGS:= @MODULE_LDFLAGS@
MODSTRIP:= @MODULE_SYMBOLS@

View File

@ -86,8 +86,10 @@ RandPlugin::RandPlugin()
void RandPlugin::initialize()
{
Output("Initializing module RandPlugin");
if (!m_thread)
if (!m_thread) {
m_thread = new RandThread;
m_thread->startup();
}
}
INIT_PLUGIN(RandPlugin);

View File

@ -87,7 +87,8 @@ void TestPlugin1::initialize()
m_first = false;
for (int i=0; i<n; i++) {
::usleep(10000);
new TestThread;
TestThread *t = new TestThread;
t->startup();
}
}
}

View File

@ -1590,6 +1590,12 @@ public:
*/
virtual void cleanup();
/**
* Actually starts running the new thread which lingers after creation
* @return True if an error occured, false if started ok
*/
bool startup();
/**
* Check if the thread creation failed
* @return True if an error occured, false if created ok
@ -1598,7 +1604,7 @@ public:
/**
* Check if the thread is running or not
* @return True if running, false if it has terminated
* @return True if running, false if it has terminated or no startup called
*/
bool running() const;
@ -1774,6 +1780,13 @@ public:
* Initialize the plugin after it was loaded and registered.
*/
virtual void initialize() = 0;
/**
* Check if the module is actively used.
* @return True if the plugin is in use, false if should be ok to restart
*/
virtual bool isBusy() const
{ return false; }
};
/**
@ -1922,6 +1935,12 @@ public:
inline void setHook(void (*hookFunc)(Message &, bool) = 0)
{ m_dispatcher.setHook(hookFunc); }
/**
* Get a count of plugins that are actively in use
* @return Count of plugins in use
*/
int usedPlugins();
protected:
/**
* Destroys the engine and everything. You must not call it directly,

View File

@ -324,9 +324,10 @@ public:
virtual ~ThreadedSource();
/**
* Start the worker thread
* Starts the worker thread
* @return True if started, false if an error occured
*/
void start(const char *name = "ThreadedSource");
bool start(const char *name = "ThreadedSource");
protected:
/**