Guard hungup channels for a while to reduce problems with chan.hangup arriving before other channel messages.

Added a warning when detecting conditions that can cause billing errors.


git-svn-id: http://voip.null.ro/svn/yate@2626 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2009-05-12 17:24:41 +00:00
parent e7eb83ff62
commit eaba0c7996
2 changed files with 91 additions and 23 deletions

View File

@ -3,6 +3,9 @@
; resolution: keyword: Resolution for time representation: sec, msec or usec
;resolution=msec
; guardtime: int: Time in ms to remember hungup channels to avoid race conditions
;guardtime=5000
[parameters]
; Each line consists of name=bool where name is the name of the parameter being

View File

@ -59,6 +59,7 @@ public:
virtual bool received(Message &msg);
};
// Collects CDR information and emits the call.cdr messages when needed
class CdrBuilder : public NamedList
{
public:
@ -83,6 +84,7 @@ private:
bool m_write;
};
// CDR extra parameter name with an overwrite flag
class Param : public String
{
public:
@ -97,7 +99,28 @@ private:
bool m_overwrite;
};
// Temporarily keep the ID of hungup channels to prevent race issues
class Hungup : public String
{
public:
inline Hungup(const String& id, bool emitHangup)
: String(id),
m_hangup(emitHangup), m_expires(Time::now() + s_exp)
{ DDebug("cdrbuild",DebugInfo,"Hungup '%s'",id.c_str()); }
inline u_int64_t expires() const
{ return m_expires; }
inline bool hangup()
{ return m_hangup && !(m_hangup = false); }
static u_int64_t s_exp;
private:
bool m_hangup;
u_int64_t m_expires;
};
static ObjList s_cdrs;
static ObjList s_hungup;
u_int64_t Hungup::s_exp = 5000000;
// This mutex protects both the CDR list and the params list
static Mutex s_mutex(false,"CdrBuild");
@ -145,6 +168,7 @@ static const char* const s_forbidden[] = {
0
};
// Print time with configured resolution
static const char* printTime(char* buf,u_int64_t usec)
{
switch (s_res) {
@ -165,6 +189,19 @@ static const char* printTime(char* buf,u_int64_t usec)
return buf;
}
// Expire hungup guard records
static void expireHungup()
{
Time t;
while (Hungup* h = static_cast<Hungup*>(s_hungup.get())) {
if (h->expires() > t.usec())
return;
DDebug("cdrbuild",DebugInfo,"Expiring hungup guard for '%s'",h->c_str());
s_hungup.remove(h);
}
}
CdrBuilder::CdrBuilder(const char *name)
: NamedList(name), m_dir("unknown"), m_status("unknown"),
m_first(true), m_write(true)
@ -181,6 +218,8 @@ CdrBuilder::~CdrBuilder()
addParam("reason","CDR shutdown");
}
emit("finalize");
if (Hungup::s_exp && !s_hungup.find(*this))
s_hungup.append(new Hungup(*this,false));
}
void CdrBuilder::emit(const char *operation)
@ -195,6 +234,8 @@ void CdrBuilder::emit(const char *operation)
m_start = t_call;
if (!t_call)
t_call = m_start;
if (!t_call)
t_call = m_start = t_hangup;
if (!t_ringing)
t_ringing = t_call;
if (!t_answer)
@ -209,7 +250,7 @@ void CdrBuilder::emit(const char *operation)
operation = m_first ? "initialize" : "update";
m_first = false;
DDebug("cdrbuild",DebugInfo,"Emit '%s' for '%s' status '%s'",
DDebug("cdrbuild",DebugAll,"Emit '%s' for '%s' status '%s'",
operation,c_str(),m_status.c_str());
char buf[64];
Message *m = new Message("call.cdr");
@ -294,9 +335,7 @@ bool CdrBuilder::update(const Message& msg, int type, u_int64_t val)
if (reason)
setParam("reason",reason);
}
s_mutex.lock();
s_cdrs.remove(this);
s_mutex.unlock();
return true;
}
// cdrwrite must be consistent over all emitted messages so we read it once
@ -318,12 +357,10 @@ bool CdrBuilder::update(const Message& msg, int type, u_int64_t val)
m_dir = *s;
else {
// search the parameter
Lock lock(s_mutex);
Param* p = static_cast<Param*>(s_params[s->name()]);
if (!p)
continue;
bool overwrite = p->overwrite();
lock.drop();
NamedString* str = getParam(s->name());
// parameter is not yet stored - store a copy
if (!str)
@ -337,9 +374,7 @@ bool CdrBuilder::update(const Message& msg, int type, u_int64_t val)
update(type,val);
if (type == CdrHangup) {
s_mutex.lock();
s_cdrs.remove(this);
s_mutex.unlock();
// object is now destroyed, "this" no longer valid
return false;
}
@ -355,14 +390,10 @@ CdrBuilder* CdrBuilder::find(String &id)
bool CdrHandler::received(Message &msg)
{
// this mutex serializes all CDR building
static Mutex mutex(false,"CdrBuild::handler");
Lock lock(mutex);
Lock lock(s_mutex);
if (m_type == EngHalt) {
s_mutex.lock();
unsigned int n = s_cdrs.count();
s_cdrs.clear();
s_mutex.unlock();
if (n)
Debug("cdrbuild",DebugWarn,"Forcibly finalized %u CDR records.",n);
return false;
@ -390,23 +421,50 @@ bool CdrHandler::received(Message &msg)
return false;
}
bool rval = false;
s_mutex.lock();
int type = m_type;
int level = DebugInfo;
CdrBuilder *b = CdrBuilder::find(id);
if (!b && ((m_type == CdrStart) || (m_type == CdrCall))) {
b = new CdrBuilder(id);
s_cdrs.append(b);
if (!b) {
switch (type) {
case CdrStart:
case CdrCall:
case CdrAnswer:
{
expireHungup();
Hungup* h = static_cast<Hungup*>(s_hungup[id]);
if (h) {
if (h->hangup())
// seen hangup but not emitted call.cdr - do it now
type = CdrHangup;
else {
// post-hangup answer cause billing problems so warn
level = (CdrAnswer == type) ? DebugWarn : DebugNote;
break;
}
}
}
b = new CdrBuilder(id);
s_cdrs.append(b);
break;
case CdrHangup:
expireHungup();
if (Hungup::s_exp && !s_hungup.find(id))
// remember to emit a finalize if we ever see a startup
s_hungup.append(new Hungup(id,true));
else
level = DebugMild;
break;
}
}
s_mutex.unlock();
if (b)
rval = b->update(msg,m_type,msg.msgTime().usec());
rval = b->update(msg,type,msg.msgTime().usec());
else
Debug("cdrbuild",((CdrHangup == m_type) ? DebugMild : DebugInfo),
"Got message '%s' for untracked id '%s'",
Debug("cdrbuild",level,"Got message '%s' for untracked id '%s'",
msg.c_str(),id.c_str());
if ((m_type == CdrRinging) || (m_type == CdrAnswer)) {
if ((type == CdrRinging) || (type == CdrAnswer)) {
id = msg.getValue("peerid");
if (id && (b = CdrBuilder::find(id))) {
b->update(m_type,msg.msgTime().usec());
b->update(type,msg.msgTime().usec());
b->emit();
}
}
@ -420,7 +478,8 @@ bool StatusHandler::received(Message &msg)
return false;
String st("name=cdrbuild,type=cdr,format=Status|Caller|Called|Duration");
s_mutex.lock();
st << ";cdrs=" << s_cdrs.count();
expireHungup();
st << ";cdrs=" << s_cdrs.count() << ",hungup=" << s_hungup.count();
if (msg.getBoolValue("details",true)) {
st << ";";
ObjList *l = &s_cdrs;
@ -468,7 +527,13 @@ void CdrBuildPlugin::initialize()
Output("Initializing module CdrBuild");
Configuration cfg(Engine::configFile("cdrbuild"));
s_res = cfg.getIntValue("general","resolution",s_timeRes,1);
int exp = cfg.getIntValue("general","guardtime",5000);
if (exp < 0)
exp = 0;
else if (exp > 600000)
exp = 600000;
s_mutex.lock();
Hungup::s_exp = 1000 * (u_int64_t)exp;
s_params.clear();
const struct _params* params = s_defParams;
for (; params->name; params++)