Added queues (ACD) module.
git-svn-id: http://voip.null.ro/svn/yate@1374 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
985b349df0
commit
03f8906ddb
|
@ -27,7 +27,8 @@ PROGS := cdrbuild.yate cdrfile.yate \
|
|||
tonegen.yate tonedetect.yate wavefile.yate \
|
||||
conference.yate moh.yate \
|
||||
callgen.yate analyzer.yate rmanager.yate msgsniff.yate \
|
||||
pbx.yate dbpbx.yate pbxassist.yate dumbchan.yate callfork.yate \
|
||||
pbx.yate dbpbx.yate pbxassist.yate \
|
||||
dumbchan.yate callfork.yate queues.yate \
|
||||
extmodule.yate yradius.yate \
|
||||
yrtpchan.yate ystunchan.yate \
|
||||
ysipchan.yate yiaxchan.yate yjinglechan.yate
|
||||
|
|
|
@ -0,0 +1,479 @@
|
|||
/**
|
||||
* queues.cpp
|
||||
* This file is part of the YATE Project http://YATE.null.ro
|
||||
*
|
||||
* Call distribution and queues with settings from a database.
|
||||
*
|
||||
* Yet Another Telephony Engine - a fully featured software PBX and IVR
|
||||
* Copyright (C) 2004-2006 Null Team
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include <yatephone.h>
|
||||
|
||||
using namespace TelEngine;
|
||||
namespace { // anonymous
|
||||
|
||||
static ObjList s_queues;
|
||||
|
||||
class QueuedCall : public String
|
||||
{
|
||||
public:
|
||||
inline QueuedCall(const String& id, const char* caller = 0)
|
||||
: String(id), m_caller(caller)
|
||||
{ m_last = m_time = Time::now(); }
|
||||
inline int waitingTime(u_int64_t when = Time::now())
|
||||
{ return (int)(when - m_time); }
|
||||
inline int waitingLast(u_int64_t when = Time::now())
|
||||
{ return (int)(when - m_last); }
|
||||
inline const String& getMarked() const
|
||||
{ return m_marked; }
|
||||
inline void setMarked(const char* mark = 0)
|
||||
{ m_marked = mark; m_last = Time::now(); }
|
||||
inline const String& getCaller() const
|
||||
{ return m_caller; }
|
||||
|
||||
protected:
|
||||
String m_caller;
|
||||
String m_marked;
|
||||
u_int64_t m_time;
|
||||
u_int64_t m_last;
|
||||
};
|
||||
|
||||
class CallsQueue : public String
|
||||
{
|
||||
public:
|
||||
CallsQueue(const char* name);
|
||||
~CallsQueue();
|
||||
inline int countCalls() const
|
||||
{ return m_calls.count(); }
|
||||
inline QueuedCall* findCall(const String& id) const
|
||||
{ return static_cast<QueuedCall*>(m_calls[id]); }
|
||||
void addCall(QueuedCall* call)
|
||||
{ m_calls.append(call); }
|
||||
inline void addCall(const String& id, const char* caller)
|
||||
{ addCall(new QueuedCall(id,caller)); }
|
||||
inline bool removeCall(const String& id)
|
||||
{ return removeCall(findCall(id)); }
|
||||
bool removeCall(QueuedCall* call);
|
||||
QueuedCall* markCall(const char* mark);
|
||||
bool unmarkCall(const String& id);
|
||||
bool hasUnmarkedCalls() const;
|
||||
QueuedCall* topCall() const;
|
||||
void listCalls(String& retval);
|
||||
void startACD();
|
||||
protected:
|
||||
ObjList m_calls;
|
||||
};
|
||||
|
||||
class QueuesModule : public Module
|
||||
{
|
||||
public:
|
||||
QueuesModule();
|
||||
~QueuesModule();
|
||||
inline static CallsQueue* findQueue(const String& name)
|
||||
{ return static_cast<CallsQueue*>(s_queues[name]); }
|
||||
static CallsQueue* findCallQueue(const String& id);
|
||||
protected:
|
||||
virtual void initialize();
|
||||
virtual void statusParams(String& str);
|
||||
virtual bool received(Message& msg, int id);
|
||||
virtual void msgTimer(Message& msg);
|
||||
void onExecute(Message& msg, String callto);
|
||||
void onAnswered(Message& msg, String targetid, String reason);
|
||||
void onHangup(Message& msg, String id);
|
||||
void onQueued(Message& msg, String qname);
|
||||
void onPickup(Message& msg, String qname);
|
||||
bool onCommand(String line, String& retval);
|
||||
private:
|
||||
bool m_init;
|
||||
};
|
||||
|
||||
|
||||
static QueuesModule module;
|
||||
static String s_account;
|
||||
static String s_pbxcall;
|
||||
static String s_pbxchan;
|
||||
static u_int32_t s_nextTime = 0;
|
||||
static int s_rescan = 5;
|
||||
|
||||
|
||||
CallsQueue::CallsQueue(const char* name)
|
||||
: String(name)
|
||||
{
|
||||
Debug(&module,DebugInfo,"Creating queue '%s'",name);
|
||||
s_queues.append(this);
|
||||
}
|
||||
|
||||
CallsQueue::~CallsQueue()
|
||||
{
|
||||
Debug(&module,DebugInfo,"Deleting queue '%s'",c_str());
|
||||
s_queues.remove(this,false);
|
||||
}
|
||||
|
||||
bool CallsQueue::removeCall(QueuedCall* call)
|
||||
{
|
||||
if (!call)
|
||||
return false;
|
||||
m_calls.remove(call);
|
||||
if (!m_calls.count())
|
||||
destruct();
|
||||
return true;
|
||||
}
|
||||
|
||||
QueuedCall* CallsQueue::markCall(const char* mark)
|
||||
{
|
||||
ObjList* l = m_calls.skipNull();
|
||||
for (; l; l=l->skipNext()) {
|
||||
QueuedCall* call = static_cast<QueuedCall*>(l->get());
|
||||
if (call->getMarked().null()) {
|
||||
call->setMarked(mark);
|
||||
return call;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool CallsQueue::unmarkCall(const String& id)
|
||||
{
|
||||
QueuedCall* call = findCall(id);
|
||||
if (!call)
|
||||
return false;
|
||||
call->setMarked();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CallsQueue::hasUnmarkedCalls() const
|
||||
{
|
||||
ObjList* l = m_calls.skipNull();
|
||||
for (; l; l=l->skipNext()) {
|
||||
if (static_cast<QueuedCall*>(l->get())->getMarked().null())
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
QueuedCall* CallsQueue::topCall() const
|
||||
{
|
||||
ObjList* l = m_calls.skipNull();
|
||||
return l ? static_cast<QueuedCall*>(l->get()) : 0;
|
||||
}
|
||||
|
||||
void CallsQueue::listCalls(String& retval)
|
||||
{
|
||||
retval.append("Queue ","\r\n") << c_str() << " " << countCalls();
|
||||
u_int64_t when = Time::now();
|
||||
ObjList* l = m_calls.skipNull();
|
||||
for (; l; l=l->skipNext()) {
|
||||
QueuedCall* call = static_cast<QueuedCall*>(l->get());
|
||||
retval << "\r\n " << *call << " " << call->getCaller();
|
||||
retval << " (" << (call->waitingLast(when) / 1000000);
|
||||
retval << "/" << (call->waitingTime(when) / 1000000) << ")";
|
||||
if (call->getMarked())
|
||||
retval << " => " << call->getMarked();
|
||||
}
|
||||
retval << "\r\n";
|
||||
}
|
||||
|
||||
void CallsQueue::startACD()
|
||||
{
|
||||
if (s_account.null() || s_pbxcall.null() || !hasUnmarkedCalls())
|
||||
return;
|
||||
String query("SELECT algorithm,timeout,prompt FROM groups WHERE ugroup=");
|
||||
query << "'" << sqlEscape() << "'";
|
||||
Message m("database");
|
||||
m.addParam("account",s_account);
|
||||
m.addParam("query",query);
|
||||
if (!Engine::dispatch(m)) {
|
||||
Debug(&module,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str());
|
||||
return;
|
||||
}
|
||||
Array* res = static_cast<Array*>(m.userObject("Array"));
|
||||
if (!res || (m.getIntValue("rows") != 1)) {
|
||||
Debug(&module,DebugWarn,"Missing data for group '%s'",c_str());
|
||||
return;
|
||||
}
|
||||
String algorithm = YOBJECT(String,res->get(0,1));
|
||||
String timeout = YOBJECT(String,res->get(1,1));
|
||||
String prompt = YOBJECT(String,res->get(2,1));
|
||||
|
||||
query = "SELECT username,reg_location FROM users WHERE inuse_count=0";
|
||||
query << " AND users.uid IN (SELECT appflags.uid FROM appflags,applications WHERE appflags.appid=applications.appid AND applications.name='Line' AND appflags.name='operator_available' AND appflags.value='true')";
|
||||
query << " AND uid IN (SELECT appflags.uid FROM appflags,applications WHERE appflags.appid=applications.appid AND applications.name='Groups' AND appflags.name='group' AND value=";
|
||||
query << "'" << sqlEscape() << "')";
|
||||
query << " ORDER BY COALESCE(inuse_last,TIMESTAMP 'EPOCH')";
|
||||
Message msg("database");
|
||||
msg.addParam("account",s_account);
|
||||
msg.addParam("query",query);
|
||||
if (!Engine::dispatch(msg)) {
|
||||
Debug(&module,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str());
|
||||
return;
|
||||
}
|
||||
res = static_cast<Array*>(msg.userObject("Array"));
|
||||
if (!res || (msg.getIntValue("rows") < 1))
|
||||
return;
|
||||
for (int i = 1; i < res->getRows(); i++) {
|
||||
String callto = YOBJECT(String,res->get(1,i));
|
||||
String user = YOBJECT(String,res->get(0,i));
|
||||
if (callto.null() || user.null())
|
||||
continue;
|
||||
QueuedCall* call = markCall(user);
|
||||
// if we failed to pick a waiting call we are done
|
||||
if (!call)
|
||||
break;
|
||||
Debug(&module,DebugInfo,"Distributing call '%s' to '%s' in group '%s'",
|
||||
call->c_str(),user.c_str(),c_str());
|
||||
Message* ex = new Message("call.execute");
|
||||
ex->addParam("direct",callto);
|
||||
ex->addParam("target",user);
|
||||
ex->addParam("caller",call->getCaller());
|
||||
ex->addParam("called",user);
|
||||
ex->addParam("callto",s_pbxcall);
|
||||
ex->addParam("notify",*call);
|
||||
ex->addParam("queue",c_str());
|
||||
if (timeout.toInteger() > 0) {
|
||||
timeout += "000";
|
||||
ex->addParam("maxcall",timeout);
|
||||
}
|
||||
if (prompt) {
|
||||
if (prompt.find('/') < 0)
|
||||
prompt = "wave/play/sounds/" + prompt;
|
||||
msg.setParam("prompt",prompt);
|
||||
}
|
||||
Engine::enqueue(ex);
|
||||
}
|
||||
}
|
||||
|
||||
QueuesModule::QueuesModule()
|
||||
: Module("queues","misc"), m_init(false)
|
||||
{
|
||||
Output("Loaded module Queues");
|
||||
}
|
||||
|
||||
QueuesModule::~QueuesModule()
|
||||
{
|
||||
Output("Unloading module Queues");
|
||||
}
|
||||
|
||||
void QueuesModule::statusParams(String& str)
|
||||
{
|
||||
str.append("queues=",",") << s_queues.count();
|
||||
}
|
||||
|
||||
void QueuesModule::onQueued(Message& msg, String qname)
|
||||
{
|
||||
if (s_account.null() || s_pbxchan.null())
|
||||
return;
|
||||
String query("SELECT length,greeting,onhold FROM groups WHERE ugroup=");
|
||||
query << "'" << qname.sqlEscape() << "'";
|
||||
Message m("database");
|
||||
m.addParam("account",s_account);
|
||||
m.addParam("query",query);
|
||||
if (!Engine::dispatch(m)) {
|
||||
Debug(this,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str());
|
||||
return;
|
||||
}
|
||||
Array* res = static_cast<Array*>(m.userObject("Array"));
|
||||
if (!res || (m.getIntValue("rows") != 1)) {
|
||||
Debug(this,DebugWarn,"Missing queue '%s'",qname.c_str());
|
||||
msg.setParam("error","noroute");
|
||||
msg.setParam("reason","Queue does not exist");
|
||||
return;
|
||||
}
|
||||
String tmp = YOBJECT(String,res->get(0,1));
|
||||
int maxlen = tmp.toInteger();
|
||||
CallsQueue* queue = findQueue(qname);
|
||||
if (!queue)
|
||||
queue = new CallsQueue(qname);
|
||||
if (maxlen && (queue->countCalls() >= maxlen)) {
|
||||
Debug(this,DebugWarn,"Queue '%s' is full",qname.c_str());
|
||||
msg.setParam("error","congestion");
|
||||
msg.setParam("reason","Queue is full");
|
||||
return;
|
||||
}
|
||||
tmp = YOBJECT(String,res->get(1,1));
|
||||
if (tmp) {
|
||||
if (tmp.find('/') < 0)
|
||||
tmp = "wave/play/sounds/" + tmp;
|
||||
msg.setParam("greeting",tmp);
|
||||
}
|
||||
tmp = YOBJECT(String,res->get(2,1));
|
||||
if (tmp.find('/') < 0) {
|
||||
if (tmp)
|
||||
tmp = "moh/" + tmp;
|
||||
else
|
||||
tmp = "tone/ring";
|
||||
}
|
||||
msg.setParam("source",tmp);
|
||||
msg.setParam("callto",s_pbxchan);
|
||||
queue->addCall(msg.getValue("id"),msg.getValue("caller"));
|
||||
queue->startACD();
|
||||
}
|
||||
|
||||
void QueuesModule::onPickup(Message& msg, String qname)
|
||||
{
|
||||
CallsQueue* queue = findQueue(qname);
|
||||
if (queue) {
|
||||
QueuedCall* call = queue->topCall();
|
||||
if (call) {
|
||||
String id = *call;
|
||||
String pid = msg.getValue("id");
|
||||
queue->removeCall(call);
|
||||
msg = "chan.connect";
|
||||
msg.setParam("targetid",id);
|
||||
// a little late... but answer to the queued call
|
||||
Message* m = new Message("call.answered");
|
||||
m->setParam("id",pid);
|
||||
m->setParam("targetid",id);
|
||||
Engine::enqueue(m);
|
||||
// also answer the pickup call
|
||||
m = new Message("call.answered");
|
||||
m->setParam("id",id);
|
||||
m->setParam("targetid",pid);
|
||||
Engine::enqueue(m);
|
||||
return;
|
||||
}
|
||||
}
|
||||
msg.setParam("error","nocall");
|
||||
msg.setParam("reason","There are no calls in queue");
|
||||
}
|
||||
|
||||
void QueuesModule::onExecute(Message& msg, String callto)
|
||||
{
|
||||
if (callto.startSkip("queue/",false))
|
||||
onQueued(msg,callto);
|
||||
else if (callto.startSkip("pickup/",false))
|
||||
onPickup(msg,callto);
|
||||
}
|
||||
|
||||
void QueuesModule::onAnswered(Message& msg, String targetid, String reason)
|
||||
{
|
||||
if (reason == "queued")
|
||||
return;
|
||||
CallsQueue* queue = findCallQueue(targetid);
|
||||
if (!queue)
|
||||
return;
|
||||
Debug(this,DebugCall,"Answered call '%s' in queue '%s'",
|
||||
targetid.c_str(),queue->c_str());
|
||||
queue->removeCall(targetid);
|
||||
}
|
||||
|
||||
void QueuesModule::onHangup(Message& msg, String id)
|
||||
{
|
||||
String notify = msg.getValue("notify");
|
||||
String qname = msg.getValue("queue");
|
||||
if (notify && qname) {
|
||||
CallsQueue* queue = findQueue(qname);
|
||||
if (queue) {
|
||||
Debug(this,DebugCall,"Hung up outgoing call '%s' serving '%s' in '%s'",
|
||||
id.c_str(),notify.c_str(),qname.c_str());
|
||||
if (queue->unmarkCall(notify)) {
|
||||
queue->startACD();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
CallsQueue* queue = findCallQueue(id);
|
||||
if (!queue)
|
||||
return;
|
||||
Debug(this,DebugCall,"Hung up call '%s' in '%s'",id.c_str(),queue->c_str());
|
||||
queue->removeCall(id);
|
||||
}
|
||||
|
||||
bool QueuesModule::onCommand(String line, String& retval)
|
||||
{
|
||||
if (line.startSkip("queues")) {
|
||||
if (line == "list") {
|
||||
ObjList* l = s_queues.skipNull();
|
||||
for (; l; l=l->skipNext())
|
||||
static_cast<CallsQueue*>(l->get())->listCalls(retval);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool QueuesModule::received(Message& msg, int id)
|
||||
{
|
||||
Lock lock(this);
|
||||
switch (id) {
|
||||
case Command:
|
||||
return onCommand(msg.getValue("line"),msg.retValue());
|
||||
break;
|
||||
case Execute:
|
||||
onExecute(msg,msg.getValue("callto"));
|
||||
break;
|
||||
case Answered:
|
||||
onAnswered(msg,msg.getValue("targetid"),msg.getValue("reason"));
|
||||
break;
|
||||
case Private:
|
||||
onHangup(msg,msg.getValue("id"));
|
||||
break;
|
||||
default:
|
||||
lock.drop();
|
||||
return Module::received(msg,id);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void QueuesModule::msgTimer(Message& msg)
|
||||
{
|
||||
u_int32_t t = msg.msgTime().sec();
|
||||
if (t >= s_nextTime) {
|
||||
// we rescan queues every 5 seconds
|
||||
s_nextTime = t + s_rescan;
|
||||
ObjList* l = s_queues.skipNull();
|
||||
for (; l; l=l->skipNext())
|
||||
static_cast<CallsQueue*>(l->get())->startACD();
|
||||
}
|
||||
Module::msgTimer(msg);
|
||||
}
|
||||
|
||||
CallsQueue* QueuesModule::findCallQueue(const String& id)
|
||||
{
|
||||
ObjList* l = s_queues.skipNull();
|
||||
for (; l; l=l->skipNext()) {
|
||||
CallsQueue* queue = static_cast<CallsQueue*>(l->get());
|
||||
if (queue->findCall(id))
|
||||
return queue;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void QueuesModule::initialize()
|
||||
{
|
||||
Configuration cfg(Engine::configFile("queues"));
|
||||
lock();
|
||||
s_rescan = cfg.getIntValue("general","rescan",5);
|
||||
s_account = cfg.getValue("general","account");
|
||||
s_pbxcall = cfg.getValue("pbx","call");
|
||||
s_pbxchan = cfg.getValue("pbx","chan");
|
||||
int priority = cfg.getIntValue("general","priority",50);
|
||||
unlock();
|
||||
if (m_init)
|
||||
return;
|
||||
m_init = true;
|
||||
setup();
|
||||
Output("Initializing module Queues for database");
|
||||
installRelay(Command);
|
||||
installRelay(Execute,priority);
|
||||
installRelay(Answered,priority);
|
||||
Engine::install(new MessageRelay("chan.hangup",this,Private,priority));
|
||||
}
|
||||
|
||||
}; // anonymous namespace
|
||||
|
||||
/* vi: set ts=8 sw=4 sts=4 noet: */
|
Loading…
Reference in New Issue