Status report in callgen and ysipchan.

Extended thread termination wait time.


git-svn-id: http://yate.null.ro/svn/yate/trunk@226 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2005-01-21 22:24:54 +00:00
parent 8e0bb2402f
commit 83f95d86bc
3 changed files with 122 additions and 47 deletions

View File

@ -203,7 +203,7 @@ void ThreadPrivate::killall()
bool ok = t->cancel(); bool ok = t->cancel();
if (ok) { if (ok) {
// delay a little so threads have a chance to clean up // delay a little so threads have a chance to clean up
for (int i=0; i<5; i++) { for (int i=0; i<1000; i++) {
tmutex.lock(); tmutex.lock();
bool done = (t != l->get()); bool done = (t != l->get());
tmutex.unlock(); tmutex.unlock();

View File

@ -46,10 +46,10 @@ static const char s_help[] = "callgen {start|stop|drop|pause|resume|single|info|
class GenConnection : public DataEndpoint class GenConnection : public DataEndpoint
{ {
public: public:
GenConnection(); GenConnection(const String& callto);
~GenConnection(); ~GenConnection();
virtual const String& toString() const virtual const String& toString() const
{ return m_id; } { return m_id; }
virtual void disconnected(bool final, const char *reason); virtual void disconnected(bool final, const char *reason);
void ringing(); void ringing();
void answered(); void answered();
@ -59,10 +59,12 @@ public:
{ return m_id; } { return m_id; }
inline const String& status() const inline const String& status() const
{ return m_status; } { return m_status; }
inline const String& party() const
{ return m_callto; }
inline void setTarget(const char *target = 0) inline void setTarget(const char *target = 0)
{ m_target = target; } { m_target = target; }
inline const String& getTarget() const inline const String& getTarget() const
{ return m_target; } { return m_target; }
inline unsigned long long age() const inline unsigned long long age() const
{ return Time::now() - m_start; } { return Time::now() - m_start; }
static GenConnection* find(const String& id); static GenConnection* find(const String& id);
@ -70,6 +72,7 @@ public:
private: private:
String m_id; String m_id;
String m_status; String m_status;
String m_callto;
String m_target; String m_target;
unsigned long long m_start; unsigned long long m_start;
}; };
@ -87,9 +90,9 @@ class ConnHandler : public MessageReceiver
{ {
public: public:
enum { enum {
Ringing, Ringing,
Answered, Answered,
Execute, Execute,
Drop, Drop,
}; };
virtual bool received(Message &msg, int id); virtual bool received(Message &msg, int id);
@ -118,9 +121,11 @@ private:
bool m_first; bool m_first;
}; };
GenConnection::GenConnection() GenConnection::GenConnection(const String& callto)
: m_callto(callto)
{ {
m_start = Time::now(); m_start = Time::now();
m_status = "calling";
s_mutex.lock(); s_mutex.lock();
s_calls.append(this); s_calls.append(this);
m_id << "callgen/" << ++s_total; m_id << "callgen/" << ++s_total;
@ -130,6 +135,7 @@ GenConnection::GenConnection()
GenConnection::~GenConnection() GenConnection::~GenConnection()
{ {
m_status = "destroyed";
s_mutex.lock(); s_mutex.lock();
s_calls.remove(this,false); s_calls.remove(this,false);
--s_current; --s_current;
@ -176,7 +182,7 @@ bool GenConnection::oneCall(String* target)
} }
m = "call.execute"; m = "call.execute";
m.addParam("callto",callto); m.addParam("callto",callto);
GenConnection* conn = new GenConnection; GenConnection* conn = new GenConnection(callto);
m.addParam("id",conn->id()); m.addParam("id",conn->id());
m.userData(conn); m.userData(conn);
if (Engine::dispatch(m)) { if (Engine::dispatch(m)) {
@ -198,11 +204,13 @@ bool GenConnection::oneCall(String* target)
void GenConnection::disconnected(bool final, const char *reason) void GenConnection::disconnected(bool final, const char *reason)
{ {
Debug("CallGen",DebugInfo,"Disconnected '%s' reason '%s' [%p]",m_id.c_str(),reason,this); Debug("CallGen",DebugInfo,"Disconnected '%s' reason '%s' [%p]",m_id.c_str(),reason,this);
m_status = "disconnected";
} }
void GenConnection::ringing() void GenConnection::ringing()
{ {
Debug("CallGen",DebugInfo,"Ringing '%s' [%p]",m_id.c_str(),this); Debug("CallGen",DebugInfo,"Ringing '%s' [%p]",m_id.c_str(),this);
m_status = "ringing";
s_mutex.lock(); s_mutex.lock();
++s_ringing; ++s_ringing;
bool media =s_cfg.getBoolValue("parameters","earlymedia",true); bool media =s_cfg.getBoolValue("parameters","earlymedia",true);
@ -214,6 +222,7 @@ void GenConnection::ringing()
void GenConnection::answered() void GenConnection::answered()
{ {
Debug("CallGen",DebugInfo,"Answered '%s' [%p]",m_id.c_str(),this); Debug("CallGen",DebugInfo,"Answered '%s' [%p]",m_id.c_str(),this);
m_status = "answered";
s_mutex.lock(); s_mutex.lock();
++s_answers; ++s_answers;
s_mutex.unlock(); s_mutex.unlock();
@ -252,10 +261,10 @@ bool ConnHandler::received(Message &msg, int id)
} }
String text(msg.getValue("text")); String text(msg.getValue("text"));
switch (id) { switch (id) {
case Answered: case Answered:
conn->answered(); conn->answered();
break; break;
case Ringing: case Ringing:
conn->ringing(); conn->ringing();
break; break;
case Execute: case Execute:
@ -379,18 +388,34 @@ bool CmdHandler::received(Message &msg, int id)
{ {
String tmp; String tmp;
switch (id) { switch (id) {
case Status: case Status:
tmp = msg.getValue("module"); tmp = msg.getValue("module");
if (tmp.null() || (tmp == "callgen")) { if (tmp.null() || (tmp == "callgen")) {
msg.retValue() << "name=callgen,type=misc;total=" << s_total s_mutex.lock();
<< ",current=" << s_current msg.retValue() << "name=callgen,type=varchans,format=Status|Callto"
<< ";total=" << s_total
<< ",ring=" << s_ringing << ",ring=" << s_ringing
<< ",answered=" << s_answers << "\n"; << ",answered=" << s_answers
<< ",chans=" << s_current << ";";
ObjList *l = &s_calls;
bool first = true;
for (; l; l=l->next()) {
GenConnection *c = static_cast<GenConnection *>(l->get());
if (c) {
if (first)
first = false;
else
msg.retValue() << ",";
msg.retValue() << c->id() << "=" << c->status() << "|" << c->party();
}
}
msg.retValue() << "\n";
s_mutex.unlock();
if (tmp) if (tmp)
return true; return true;
} }
break; break;
case Command: case Command:
tmp = msg.getValue("line"); tmp = msg.getValue("line");
if (tmp.startSkip("callgen")) if (tmp.startSkip("callgen"))
return doCommand(tmp,msg.retValue()); return doCommand(tmp,msg.retValue());

View File

@ -81,6 +81,20 @@ public:
virtual bool received(Message &msg); virtual bool received(Message &msg);
}; };
class HaltHandler : public MessageHandler
{
public:
HaltHandler(const char *name) : MessageHandler(name) { }
virtual bool received(Message &msg);
};
class StatusHandler : public MessageHandler
{
public:
StatusHandler(const char *name) : MessageHandler(name) { }
virtual bool received(Message &msg);
};
class SIPConnHandler : public MessageReceiver class SIPConnHandler : public MessageReceiver
{ {
public: public:
@ -165,21 +179,25 @@ public:
void reInvite(SIPTransaction* t); void reInvite(SIPTransaction* t);
void hangup(); void hangup();
inline String id() const inline String id() const
{ return "sip/" + m_id; } { return "sip/" + m_id; }
inline const SIPDialog& dialog() const inline const SIPDialog& dialog() const
{ return m_id; } { return m_id; }
inline const String& status() const inline const String& status() const
{ return m_status; } { return m_status; }
inline void setStatus(const char *status, int state = -1) inline void setStatus(const char *status, int state = -1)
{ m_status = status; if (state >= 0) m_state = state; } { m_status = status; if (state >= 0) m_state = state; }
inline void setReason(const char* str = "Request Terminated", int code = 487) inline void setReason(const char* str = "Request Terminated", int code = 487)
{ m_reason = str; m_reasonCode = code; } { m_reason = str; m_reasonCode = code; }
inline void setTarget(const char *target = 0) inline void setTarget(const char *target = 0)
{ m_target = target; } { m_target = target; }
inline const String& getTarget() const inline const String& getTarget() const
{ return m_target; } { return m_target; }
inline SIPTransaction* getTransaction() const inline SIPTransaction* getTransaction() const
{ return m_tr; } { return m_tr; }
inline const String& getHost() const
{ return m_host; }
inline int getPort() const
{ return m_port; }
static YateSIPConnection* find(const String& id); static YateSIPConnection* find(const String& id);
static YateSIPConnection* find(const SIPDialog& id); static YateSIPConnection* find(const SIPDialog& id);
private: private:
@ -220,9 +238,9 @@ public:
virtual void cleanup(); virtual void cleanup();
bool route(); bool route();
inline static int count() inline static int count()
{ return s_count; } { return s_count; }
inline static int routed() inline static int routed()
{ return s_routed; } { return s_routed; }
private: private:
SIPTransaction* m_tr; SIPTransaction* m_tr;
Message* m_msg; Message* m_msg;
@ -489,7 +507,7 @@ void YateSIPEndPoint::run ()
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(m_netfd, &fds); FD_SET(m_netfd, &fds);
/* Wait up to 20000 microseconds. */ /* Wait up to 20000 microseconds. */
tv.tv_sec = 0; tv.tv_sec = 0;
tv.tv_usec = 20000; tv.tv_usec = 20000;
retval = select(m_netfd+1, &fds, NULL, NULL, &tv); retval = select(m_netfd+1, &fds, NULL, NULL, &tv);
@ -562,9 +580,9 @@ static int s_maxqueue = 5;
void YateSIPEndPoint::invite(SIPEvent* e, SIPTransaction* t) void YateSIPEndPoint::invite(SIPEvent* e, SIPTransaction* t)
{ {
if (Engine::exiting()) { if (Engine::exiting()) {
Debug(DebugWarn,"Dropping call, engine is exiting"); Debug(DebugWarn,"Dropping call, engine is exiting");
e->getTransaction()->setResponse(500, "Server Shutting Down"); e->getTransaction()->setResponse(500, "Server Shutting Down");
return; return;
} }
if (e->getMessage()->getParam("To","tag")) { if (e->getMessage()->getParam("To","tag")) {
@ -576,14 +594,14 @@ void YateSIPEndPoint::invite(SIPEvent* e, SIPTransaction* t)
Debug(DebugWarn,"Got re-INVITE for missing dialog"); Debug(DebugWarn,"Got re-INVITE for missing dialog");
e->getTransaction()->setResponse(481, "Call/Transaction Does Not Exist"); e->getTransaction()->setResponse(481, "Call/Transaction Does Not Exist");
} }
return; return;
} }
int cnt = SipMsgThread::count(); int cnt = SipMsgThread::count();
if (cnt > s_maxqueue) { if (cnt > s_maxqueue) {
Debug(DebugWarn,"Dropping call, there are already %d waiting",cnt); Debug(DebugWarn,"Dropping call, there are already %d waiting",cnt);
e->getTransaction()->setResponse(503, "Service Unavailable"); e->getTransaction()->setResponse(503, "Service Unavailable");
return; return;
} }
String callid(t->getCallID()); String callid(t->getCallID());
@ -615,8 +633,8 @@ void YateSIPEndPoint::invite(SIPEvent* e, SIPTransaction* t)
} }
SipMsgThread *thr = new SipMsgThread(t,m); SipMsgThread *thr = new SipMsgThread(t,m);
if (!thr->startup()) { if (!thr->startup()) {
Debug(DebugWarn,"Error starting routing thread %p ! [%p]",thr,this); Debug(DebugWarn,"Error starting routing thread %p ! [%p]",thr,this);
delete thr; delete thr;
t->setResponse(500, "Server Internal Error"); t->setResponse(500, "Server Internal Error");
} }
} }
@ -742,7 +760,7 @@ void YateSIPConnection::hangup()
msg->addParam("driver","sip"); msg->addParam("driver","sip");
msg->addParam("id",id()); msg->addParam("id",id());
if (m_target) if (m_target)
msg->addParam("targetid",m_target); msg->addParam("targetid",m_target);
Engine::enqueue(msg); Engine::enqueue(msg);
msg = 0; msg = 0;
switch (m_state) { switch (m_state) {
@ -1075,13 +1093,13 @@ bool SipMsgThread::route()
return false; return false;
} }
if (ok) { if (ok) {
*m_msg = "call.execute"; *m_msg = "call.execute";
m_msg->addParam("callto",m_msg->retValue()); m_msg->addParam("callto",m_msg->retValue());
m_msg->retValue().clear(); m_msg->retValue().clear();
YateSIPConnection* conn = new YateSIPConnection(*m_msg,m_tr); YateSIPConnection* conn = new YateSIPConnection(*m_msg,m_tr);
m_msg->userData(conn); m_msg->userData(conn);
if (Engine::dispatch(m_msg)) { if (Engine::dispatch(m_msg)) {
Debug(DebugInfo,"Routing SIP call %s (%p) to '%s' [%p]", Debug(DebugInfo,"Routing SIP call %s (%p) to '%s' [%p]",
m_id.c_str(),m_tr,m_msg->getValue("callto"),this); m_id.c_str(),m_tr,m_msg->getValue("callto"),this);
conn->setStatus("routed"); conn->setStatus("routed");
conn->setTarget(m_msg->getValue("targetid")); conn->setTarget(m_msg->getValue("targetid"));
@ -1123,7 +1141,7 @@ void SipMsgThread::run()
s_route.lock(); s_route.lock();
s_count--; s_count--;
if (ok) if (ok)
s_routed++; s_routed++;
s_route.unlock(); s_route.unlock();
} }
@ -1141,8 +1159,8 @@ bool SIPHandler::received(Message &msg)
if (!dest.startSkip("sip/",false)) if (!dest.startSkip("sip/",false))
return false; return false;
if (!msg.userData()) { if (!msg.userData()) {
Debug(DebugWarn,"SIP call found but no data channel!"); Debug(DebugWarn,"SIP call found but no data channel!");
return false; return false;
} }
YateSIPConnection* conn = new YateSIPConnection(msg,dest); YateSIPConnection* conn = new YateSIPConnection(msg,dest);
if (conn->getTransaction()) { if (conn->getTransaction()) {
@ -1202,6 +1220,38 @@ bool SIPConnHandler::received(Message &msg, int id)
return true; return true;
} }
bool StatusHandler::received(Message &msg)
{
const char *sel = msg.getValue("module");
if (sel && ::strcmp(sel,"ysipchan") && ::strcmp(sel,"varchans"))
return false;
Lock lock(s_mutex);
String st("name=ysipchan,type=varchans,format=Status|Caller");
st << ";chans=" << s_calls.count() << ";";
ObjList *l = &s_calls;
bool first = true;
for (; l; l=l->next()) {
YateSIPConnection *c = static_cast<YateSIPConnection *>(l->get());
if (c) {
if (first)
first = false;
else
st << ",";
st << c->id() << "=" << c->status() << "|" << c->getHost() << ":" << c->getPort();
}
}
msg.retValue() << st << "\n";
return false;
}
bool HaltHandler::received(Message &msg)
{
// Clear calls early - give the endpoint a chance to do only minimal
// processing later in the destructor
s_calls.clear();
return false;
}
SIPPlugin::SIPPlugin() SIPPlugin::SIPPlugin()
: m_handler(0), m_endpoint(0) : m_handler(0), m_endpoint(0)
{ {
@ -1220,12 +1270,11 @@ void SIPPlugin::initialize()
s_cfg.load(); s_cfg.load();
if (!m_endpoint) { if (!m_endpoint) {
m_endpoint = new YateSIPEndPoint(); m_endpoint = new YateSIPEndPoint();
if(!(m_endpoint->Init())) if (!(m_endpoint->Init())) {
{ delete m_endpoint;
delete m_endpoint; m_endpoint = 0;
m_endpoint = 0; return;
return; }
}
else else
m_endpoint->startup(); m_endpoint->startup();
} }
@ -1235,7 +1284,8 @@ void SIPPlugin::initialize()
Engine::install(new MessageRelay("call.answered",m_handler,SIPConnHandler::Answered)); Engine::install(new MessageRelay("call.answered",m_handler,SIPConnHandler::Answered));
Engine::install(new MessageRelay("call.drop",m_handler,SIPConnHandler::Drop)); Engine::install(new MessageRelay("call.drop",m_handler,SIPConnHandler::Drop));
Engine::install(new SIPHandler("call.execute")); Engine::install(new SIPHandler("call.execute"));
// Engine::install(new StatusHandler("engine.status")); Engine::install(new HaltHandler("engine.halt"));
Engine::install(new StatusHandler("engine.status"));
} }
} }