Added module that creates a combined CDR of all legs of a call.

Modified cdrfile and register to be able to write combined CDR.


git-svn-id: http://yate.null.ro/svn/yate/trunk@5382 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
paulc 2013-01-25 11:45:58 +00:00
parent 164a1df870
commit 598981ef54
8 changed files with 413 additions and 19 deletions

View File

@ -7,11 +7,22 @@
; tabs: bool: Use tab-separated instead of comma-separated if format is missing
;tabs=true
; combined: bool: Use combined CDR for all legs of a call
;combined=false
; format: string: Custom format to use, overrides default. Each ${parameter}
; is replaced with the value of that parameter in the call.cdr message
; tab-separated (.tsv)
; Below are listed the format defaults for each mode combination
; tab-separated (.tsv), combined=false
;format=${time} ${billid} ${chan} ${address} ${caller} ${called} ${billtime} ${ringtime} ${duration} ${direction} ${status} ${reason}
; comma-separated (.csv)
; tab-separated (.tsv), combined=true
;format=${time} ${billid} ${chan} ${address} ${caller} ${called} ${billtime} ${ringtime} ${duration} ${status} ${reason} ${out_leg.chan} ${out_leg.address} ${out_leg.billtime} ${out_leg.ringtime} ${out_leg.duration} ${out_leg.reason}
; comma-separated (.csv), combined=false
;format=${time},"${billid}","${chan}","${address}","${caller}","${called}",${billtime},${ringtime},${duration},"${direction}","${status}","${reason}"
; comma-separated (.csv), combined=true
;format=${time},"${billid}","${chan}","${address}","${caller}","${called}",${billtime},${ringtime},${duration},"${status}","${reason}","${out_leg.chan}","${out_leg.address}",${out_leg.billtime},${out_leg.ringtime},${out_leg.duration},"${out_leg.reason}"

View File

@ -127,9 +127,26 @@
;critical=yes
;initquery=UPDATE cdr SET ended=true WHERE ended IS NULL OR NOT ended
;cdr_initialize=INSERT INTO cdr VALUES(TIMESTAMP 'EPOCH' + INTERVAL '${time} s','${chan}','${address}','${direction}','${billid}','${caller}','${called}',INTERVAL '${duration} s',INTERVAL '${billtime} s',INTERVAL '${ringtime} s','${status}','${reason}',false)
;cdr_update=UPDATE cdr SET address='${address}',direction='${direction}',billid='${billid}',caller='${caller}',called='${called}',duration=INTERVAL '${duration} s',billtime=INTERVAL '${billtime} s',ringtime=INTERVAL '${ringtime} s',status='${status}',reason='${reason}' WHERE chan='${chan}' AND time=TIMESTAMP 'EPOCH' + INTERVAL '${time} s'
;cdr_finalize=UPDATE cdr SET address='${address}',direction='${direction}',billid='${billid}',caller='${caller}',called='${called}',duration=INTERVAL '${duration} s',billtime=INTERVAL '${billtime} s',ringtime=INTERVAL '${ringtime} s',status='${status}',reason='${reason}',ended=true WHERE chan='${chan}' AND time=TIMESTAMP 'EPOCH' + INTERVAL '${time} s'
;cdr_initialize=INSERT INTO cdr VALUES(TIMESTAMP 'EPOCH' + INTERVAL '${time} s','${chan}',\
; '${address}','${direction}','${billid}','${caller}','${called}',INTERVAL '${duration} s',\
; INTERVAL '${billtime} s',INTERVAL '${ringtime} s','${status}','${reason}',false)
;cdr_update=UPDATE cdr SET address='${address}',direction='${direction}',billid='${billid}',\
; caller='${caller}',called='${called}',duration=INTERVAL '${duration} s',\
; billtime=INTERVAL '${billtime} s',ringtime=INTERVAL '${ringtime} s',status='${status}',\
; reason='${reason}' WHERE chan='${chan}' AND time=TIMESTAMP 'EPOCH' + INTERVAL '${time} s'
;cdr_finalize=UPDATE cdr SET address='${address}',direction='${direction}',\
; billid='${billid}',caller='${caller}',called='${called}',duration=INTERVAL '${duration} s',\
; billtime=INTERVAL '${billtime} s',ringtime=INTERVAL '${ringtime} s',status='${status}',\
; reason='${reason}',ended=true WHERE chan='${chan}' AND time=TIMESTAMP 'EPOCH' + INTERVAL '${time} s'
;cdr_combined=INSERT INTO cdr VALUES(TIMESTAMP 'EPOCH' + INTERVAL '${time} s','${chan}',\
; '${address}','${caller}','${called}',INTERVAL '${duration} s',INTERVAL '${billtime} s',\
; INTERVAL '${ringtime} s','${status}','${reason}','${out_leg.chan}','${out_leg.address}',\
; INTERVAL '${out_leg.billtime} s',INTERVAL '${out_leg.ringtime} s',\
; INTERVAL '${out_leg.duration} s','${out_leg.reason}'
[linetracker]

View File

@ -49,7 +49,7 @@ JUSTSIG := server/ysigchan.yate server/analog.yate \
SUBDIRS :=
MKDEPS := ../config.status
PROGS := cdrbuild.yate cdrfile.yate regexroute.yate \
PROGS := cdrbuild.yate cdrcombine.yate cdrfile.yate regexroute.yate \
tonegen.yate tonedetect.yate wavefile.yate \
extmodule.yate conference.yate moh.yate pbx.yate \
dumbchan.yate callfork.yate mux.yate \

334
modules/cdrcombine.cpp Normal file
View File

@ -0,0 +1,334 @@
/**
* cdrcombine.cpp
* This file is part of the YATE Project http://YATE.null.ro
*
* Combined CDR builder
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2013 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include <yatengine.h>
using namespace TelEngine;
namespace { // anonymous
#define PLUGIN "cdrcombine"
class CdrCombinePlugin : public Plugin
{
public:
CdrCombinePlugin();
virtual ~CdrCombinePlugin();
virtual void initialize();
private:
bool m_first;
};
INIT_PLUGIN(CdrCombinePlugin);
class CdrHandler : public MessageHandler
{
public:
CdrHandler()
: MessageHandler("call.cdr",100,__plugin.name())
{ }
virtual bool received(Message &msg);
};
class StatusHandler : public MessageHandler
{
public:
StatusHandler()
: MessageHandler("engine.status",100,__plugin.name())
{ }
virtual bool received(Message &msg);
};
class CommandHandler : public MessageHandler
{
public:
CommandHandler()
: MessageHandler("engine.command",100,__plugin.name())
{ }
virtual bool received(Message &msg);
};
class CdrParams : public NamedList
{
public:
explicit inline CdrParams(const char* name)
: NamedList(name), m_inuse(true)
{ }
inline bool inUse() const
{ return m_inuse; }
inline void finalize()
{ m_inuse = false; }
void setParams(const NamedList& src, bool outgoing);
private:
static bool isForbidden(const String& name, const String* list);
bool m_inuse;
};
class CdrCombiner : public CdrParams
{
public:
explicit inline CdrCombiner(const char* billid)
: CdrParams(billid)
{ }
void updateInit(const NamedList& params, const String& chan);
bool updateFini(const NamedList& params, const String& chan);
void getStatus(String& str) const;
private:
CdrParams* updateParams(const NamedList& params, const String& chan);
void emitMessage() const;
ObjList m_out;
};
// List of CDRs in progress
static ObjList s_cdrs;
// This mutex protects the CDR list
static Mutex s_mutex(false,"CdrCombine");
// Non-copiable parameters for all call legs
static const String s_forbidden[] = {
"operation",
"direction",
"billid",
"cdrtrack",
"cdrcreate",
"cdrid",
""
};
// Extra non-copiable parameters for outgoing call legs
static const String s_forbidden2[] = {
"nodename",
"nodeprefix",
"cdrwrite",
"runid",
""
};
void CdrParams::setParams(const NamedList& src, bool outgoing)
{
DDebug(PLUGIN,DebugAll,"Setting params of %s leg '%s'",(outgoing ? "outgoing" : "incoming"),c_str());
NamedIterator iter(src);
while (const NamedString* p = iter.get()) {
if (p->name() == Engine::trackParam())
continue;
if (isForbidden(p->name(),s_forbidden))
continue;
if (outgoing && isForbidden(p->name(),s_forbidden2))
continue;
setParam(p->name(),*p);
}
}
bool CdrParams::isForbidden(const String& name, const String* list)
{
for (; !TelEngine::null(list); list++) {
if (name == *list)
return true;
}
return false;
}
CdrParams* CdrCombiner::updateParams(const NamedList& params, const String& chan)
{
const String* ch = getParam(YSTRING("chan"));
if (ch) {
if (chan == *ch) {
setParams(params,false);
return this;
}
}
else if (params[YSTRING("direction")] == YSTRING("incoming")) {
setParams(params,false);
return this;
}
CdrParams* c = static_cast<CdrParams*>(m_out[chan]);
if (!c) {
DDebug(PLUGIN,DebugAll,"Creating CdrParams for '%s' in '%s'",chan.c_str(),c_str());
c = new CdrParams(chan);
m_out.insert(c);
}
c->setParams(params,true);
return c;
}
void CdrCombiner::updateInit(const NamedList& params, const String& chan)
{
updateParams(params,chan);
}
bool CdrCombiner::updateFini(const NamedList& params, const String& chan)
{
updateParams(params,chan)->finalize();
// check if this CDR is finalized or not
if (inUse())
return false;
for (ObjList* l = m_out.skipNull(); l; l = l->skipNext()) {
if (static_cast<const CdrParams*>(l->get())->inUse())
return false;
}
// all legs are no longer in use - emit message and be destroyed
emitMessage();
return true;
}
void CdrCombiner::emitMessage() const
{
Message* m = new Message("call.cdr",0,true);
m->addParam("operation","combined");
m->addParam("billid",c_str());
m->copyParams(*this);
int count = 0;
for (ObjList* l = m_out.skipNull(); l; l = l->skipNext(), count++) {
String prefix("out_leg");
if (count)
prefix << "." << count;
prefix << ".";
NamedIterator iter(*static_cast<const CdrParams*>(l->get()));
while (const NamedString* p = iter.get())
m->addParam(prefix + p->name(),*p);
}
Engine::enqueue(m);
}
void CdrCombiner::getStatus(String& str) const
{
str << c_str()
<< "=" << getValue(YSTRING("chan")) << "|" << getValue(YSTRING("caller"))
<< "|" << getValue(YSTRING("called")) << "|" << getValue(YSTRING("address"))
<< "|" << m_out.count();
}
bool CdrHandler::received(Message &msg)
{
const String* op = msg.getParam(YSTRING("operation"));
if (TelEngine::null(op))
return false;
bool init = false;
if (*op == YSTRING("initialize"))
init = true;
else if (*op != YSTRING("finalize"))
return false;
const String& billid = msg[YSTRING("billid")];
if (billid.null())
return false;
const String& chan = msg[YSTRING("chan")];
if (chan.null())
return false;
s_mutex.lock();
if (init) {
CdrCombiner* c = static_cast<CdrCombiner*>(s_cdrs[billid]);
if (!c) {
DDebug(PLUGIN,DebugInfo,"Creating CdrCombiner for '%s'",billid.c_str());
c = new CdrCombiner(billid);
s_cdrs.append(c);
}
c->updateInit(msg,chan);
}
else {
CdrCombiner* c = static_cast<CdrCombiner*>(s_cdrs[billid]);
if (c) {
if (c->updateFini(msg,chan)) {
DDebug(PLUGIN,DebugInfo,"Removing CdrCombiner for '%s'",billid.c_str());
s_cdrs.remove(c);
}
}
else
Debug(chan.c_str(),DebugWarn,"CDR finalize without combiner for '%s'",billid.c_str());
}
s_mutex.unlock();
return false;
}
bool StatusHandler::received(Message &msg)
{
const String* sel = msg.getParam(YSTRING("module"));
if (!(TelEngine::null(sel) || (*sel == YSTRING("cdrcombine"))))
return false;
String st("name=cdrcombine,type=cdr,format=ChanId|Caller|Called|Address|OutLegs");
s_mutex.lock();
st << ";cdrs=" << s_cdrs.count();
if (msg.getBoolValue(YSTRING("details"),true)) {
st << ";";
bool first = true;
for (ObjList* l = s_cdrs.skipNull(); l; l = l->skipNext()) {
const CdrCombiner* c = static_cast<const CdrCombiner*>(l->get());
if (first)
first = false;
else
st << ",";
c->getStatus(st);
}
}
s_mutex.unlock();
msg.retValue() << st << "\r\n";
return false;
}
bool CommandHandler::received(Message &msg)
{
static const String name("cdrcombine");
const String* partial = msg.getParam(YSTRING("partline"));
if (!partial || *partial != YSTRING("status"))
return false;
partial = msg.getParam(YSTRING("partword"));
if (TelEngine::null(partial) || name.startsWith(*partial))
msg.retValue().append(name,"\t");
return false;
}
CdrCombinePlugin::CdrCombinePlugin()
: Plugin("cdrcombine"),
m_first(true)
{
Output("Loaded module CdrCombine");
}
CdrCombinePlugin::~CdrCombinePlugin()
{
Output("Unloading module CdrCombine");
}
void CdrCombinePlugin::initialize()
{
Output("Initializing module CdrCombine");
if (m_first) {
m_first = false;
Engine::install(new CdrHandler);
Engine::install(new StatusHandler);
Engine::install(new CommandHandler);
}
}
}; // anonymous namespace
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -5,7 +5,7 @@
* Write the CDR to a text file
*
* Yet Another Telephony Engine - a fully featured software PBX and IVR
* Copyright (C) 2004-2006 Null Team
* Copyright (C) 2004-2013 Null Team
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -59,13 +59,14 @@ public:
CdrFileHandler(const char *name)
: MessageHandler(name,100,__plugin.name()),
Mutex(false,"CdrFileHandler"),
m_file(-1)
m_file(-1), m_combined(false)
{ }
virtual ~CdrFileHandler();
virtual bool received(Message &msg);
void init(const char *fname, bool tabsep, const char* format);
void init(const char *fname, bool tabsep, bool combined, const char* format);
private:
int m_file;
bool m_combined;
String m_format;
};
@ -78,7 +79,7 @@ CdrFileHandler::~CdrFileHandler()
}
}
void CdrFileHandler::init(const char *fname, bool tabsep, const char* format)
void CdrFileHandler::init(const char *fname, bool tabsep, bool combined, const char* format)
{
Lock lock(this);
if (m_file >= 0) {
@ -86,10 +87,26 @@ void CdrFileHandler::init(const char *fname, bool tabsep, const char* format)
m_file = -1;
}
m_format = format;
if (m_format.null())
m_combined = combined;
if (m_format.null()) {
m_format = tabsep
? "${time}\t${billid}\t${chan}\t${address}\t${caller}\t${called}\t${billtime}\t${ringtime}\t${duration}\t${direction}\t${status}\t${reason}"
: "${time},\"${billid}\",\"${chan}\",\"${address}\",\"${caller}\",\"${called}\",${billtime},${ringtime},${duration},\"${direction}\",\"${status}\",\"${reason}\"";
? (combined
? "${time}\t${billid}\t${chan}\t${address}\t${caller}\t${called}"
"\t${billtime}\t${ringtime}\t${duration}\t${status}\t${reason}"
"\t${out_leg.chan}\t${out_leg.address}\t${out_leg.billtime}"
"\t${out_leg.ringtime}\t${out_leg.duration}\t${out_leg.reason}"
: "${time}\t${billid}\t${chan}\t${address}\t${caller}\t${called}"
"\t${billtime}\t${ringtime}\t${duration}\t${direction}\t${status}\t${reason}"
)
: (combined
? "${time},\"${billid}\",\"${chan}\",\"${address}\",\"${caller}\",\"${called}\""
",${billtime},${ringtime},${duration},\"${status}\",\"${reason}\""
",\"${out_leg.chan}\",\"${out_leg.address}\",${out_leg.billtime}"
",${out_leg.ringtime},${out_leg.duration},\"${out_leg.reason}\""
: "${time},\"${billid}\",\"${chan}\",\"${address}\",\"${caller}\",\"${called}\""
",${billtime},${ringtime},${duration},\"${direction}\",\"${status}\",\"${reason}\""
);
}
if (fname) {
m_file = ::open(fname,O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE,0640);
if (m_file < 0)
@ -103,7 +120,7 @@ bool CdrFileHandler::received(Message &msg)
if (!msg.getBoolValue("cdrwrite_cdrfile",true))
return false;
String op(msg.getValue("operation"));
if (op != "finalize")
if (op != (m_combined ? YSTRING("combined") : YSTRING("finalize")))
return false;
if (!msg.getBoolValue("cdrwrite",true))
return false;
@ -141,7 +158,8 @@ void CdrFilePlugin::initialize()
Engine::install(m_handler);
}
if (m_handler)
m_handler->init(file,cfg.getBoolValue("general","tabs",true),cfg.getValue("general","format"));
m_handler->init(file,cfg.getBoolValue("general","tabs",true),
cfg.getBoolValue("general","combined",false),cfg.getValue("general","format"));
}
}; // anonymous namespace

View File

@ -86,6 +86,7 @@ protected:
String m_name;
String m_queryInitialize;
String m_queryUpdate;
String m_queryCombined;
bool m_critical;
};
@ -552,13 +553,15 @@ bool CDRHandler::loadQuery()
{
m_queryInitialize = s_cfg.getValue(name(),"cdr_initialize");
m_queryUpdate = s_cfg.getValue(name(),"cdr_update");
m_queryCombined = s_cfg.getValue(name(),"cdr_combined");
m_query = s_cfg.getValue(name(),"cdr_finalize");
if (m_query.null())
m_query = s_cfg.getValue(name(),"query");
indirectQuery(m_queryInitialize);
indirectQuery(m_queryUpdate);
indirectQuery(m_queryCombined);
indirectQuery(m_query);
return m_queryInitialize || m_queryUpdate || m_query;
return m_queryInitialize || m_queryUpdate || m_queryCombined || m_query;
}
bool CDRHandler::received(Message& msg)
@ -571,11 +574,13 @@ bool CDRHandler::received(Message& msg)
if (!msg.getBoolValue("cdrwrite",true))
return false;
String query(msg.getValue("operation"));
if (query == "initialize")
if (query == YSTRING("initialize"))
query = m_queryInitialize;
else if (query == "update")
else if (query == YSTRING("update"))
query = m_queryUpdate;
else if (query == "finalize")
else if (query == YSTRING("combined"))
query = m_queryCombined;
else if (query == YSTRING("finalize"))
query = m_query;
else
return false;

View File

@ -94,6 +94,7 @@ for small to large scale projects.
%{_datadir}/yate/data/*
%dir %{_libdir}/yate
%{_libdir}/yate/cdrbuild.yate
%{_libdir}/yate/cdrcombine.yate
%{_libdir}/yate/cdrfile.yate
%{_libdir}/yate/regexroute.yate
%{_libdir}/yate/javascript.yate

View File

@ -681,6 +681,7 @@ public:
/**
* Retrieve the tracker parameter name
* @return Name of the parameter used to track message dispatching
*/
inline const String& trackParam() const
{ return m_trackParam; }
@ -1235,6 +1236,13 @@ public:
inline void setHook(MessagePostHook* hook, bool remove = false)
{ m_dispatcher.setHook(hook,remove); }
/**
* Retrieve the tracker parameter name
* @return Name of the parameter used to track message dispatching
*/
inline static const String& trackParam()
{ return s_self ? s_self->m_dispatcher.trackParam() : String::empty(); }
/**
* Get a count of plugins that are actively in use
* @return Count of plugins in use