/** * queues.cpp * This file is part of the YATE Project http://YATE.null.ro * * Call distribution and queues with settings from a database. * * Yet Another Telephony Engine - a fully featured software PBX and IVR * Copyright (C) 2004-2006 Null Team * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. */ #include using namespace TelEngine; namespace { // anonymous static ObjList s_queues; class QueuedCall : public String { public: inline QueuedCall(const String& id, const char* caller = 0) : String(id), m_caller(caller) { m_last = m_time = Time::now(); } inline int waitingTime(u_int64_t when = Time::now()) { return (int)(when - m_time); } inline int waitingLast(u_int64_t when = Time::now()) { return (int)(when - m_last); } inline const String& getMarked() const { return m_marked; } inline void setMarked(const char* mark = 0) { m_marked = mark; m_last = Time::now(); } inline const String& getCaller() const { return m_caller; } protected: String m_caller; String m_marked; u_int64_t m_time; u_int64_t m_last; }; class CallsQueue : public String { public: CallsQueue(const char* name); ~CallsQueue(); inline int countCalls() const { return m_calls.count(); } inline QueuedCall* findCall(const String& id) const { return static_cast(m_calls[id]); } void addCall(QueuedCall* call) { m_calls.append(call); } inline void addCall(const String& id, const char* caller) { addCall(new QueuedCall(id,caller)); } inline bool removeCall(const String& id) { return removeCall(findCall(id)); } bool removeCall(QueuedCall* call); QueuedCall* markCall(const char* mark); bool unmarkCall(const String& id); bool hasUnmarkedCalls() const; QueuedCall* topCall() const; void listCalls(String& retval); void startACD(); protected: ObjList m_calls; }; class QueuesModule : public Module { public: QueuesModule(); ~QueuesModule(); inline static CallsQueue* findQueue(const String& name) { return static_cast(s_queues[name]); } static CallsQueue* findCallQueue(const String& id); protected: virtual void initialize(); virtual void statusParams(String& str); virtual bool received(Message& msg, int id); virtual void msgTimer(Message& msg); void onExecute(Message& msg, String callto); void onAnswered(Message& msg, String targetid, String reason); void onHangup(Message& msg, String id); void onQueued(Message& msg, String qname); void onPickup(Message& msg, String qname); bool onCommand(String line, String& retval); private: bool m_init; }; static QueuesModule module; static String s_account; static String s_pbxcall; static String s_pbxchan; static u_int32_t s_nextTime = 0; static int s_rescan = 5; CallsQueue::CallsQueue(const char* name) : String(name) { Debug(&module,DebugInfo,"Creating queue '%s'",name); s_queues.append(this); } CallsQueue::~CallsQueue() { Debug(&module,DebugInfo,"Deleting queue '%s'",c_str()); s_queues.remove(this,false); } bool CallsQueue::removeCall(QueuedCall* call) { if (!call) return false; m_calls.remove(call); if (!m_calls.count()) destruct(); return true; } QueuedCall* CallsQueue::markCall(const char* mark) { ObjList* l = m_calls.skipNull(); for (; l; l=l->skipNext()) { QueuedCall* call = static_cast(l->get()); if (call->getMarked().null()) { call->setMarked(mark); return call; } } return 0; } bool CallsQueue::unmarkCall(const String& id) { QueuedCall* call = findCall(id); if (!call) return false; call->setMarked(); return true; } bool CallsQueue::hasUnmarkedCalls() const { ObjList* l = m_calls.skipNull(); for (; l; l=l->skipNext()) { if (static_cast(l->get())->getMarked().null()) return true; } return false; } QueuedCall* CallsQueue::topCall() const { ObjList* l = m_calls.skipNull(); return l ? static_cast(l->get()) : 0; } void CallsQueue::listCalls(String& retval) { retval.append("Queue ","\r\n") << c_str() << " " << countCalls(); u_int64_t when = Time::now(); ObjList* l = m_calls.skipNull(); for (; l; l=l->skipNext()) { QueuedCall* call = static_cast(l->get()); retval << "\r\n " << *call << " " << call->getCaller(); retval << " (" << (call->waitingLast(when) / 1000000); retval << "/" << (call->waitingTime(when) / 1000000) << ")"; if (call->getMarked()) retval << " => " << call->getMarked(); } retval << "\r\n"; } void CallsQueue::startACD() { if (s_account.null() || s_pbxcall.null() || !hasUnmarkedCalls()) return; String query("SELECT algorithm,timeout,prompt FROM groups WHERE ugroup="); query << "'" << sqlEscape() << "'"; Message m("database"); m.addParam("account",s_account); m.addParam("query",query); if (!Engine::dispatch(m)) { Debug(&module,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str()); return; } Array* res = static_cast(m.userObject("Array")); if (!res || (m.getIntValue("rows") != 1)) { Debug(&module,DebugWarn,"Missing data for group '%s'",c_str()); return; } String algorithm = YOBJECT(String,res->get(0,1)); String timeout = YOBJECT(String,res->get(1,1)); String prompt = YOBJECT(String,res->get(2,1)); query = "SELECT username,reg_location FROM users WHERE inuse_count=0"; query << " AND users.uid IN (SELECT appflags.uid FROM appflags,applications WHERE appflags.appid=applications.appid AND applications.name='Line' AND appflags.name='operator_available' AND appflags.value='true')"; query << " AND uid IN (SELECT appflags.uid FROM appflags,applications WHERE appflags.appid=applications.appid AND applications.name='Groups' AND appflags.name='group' AND value="; query << "'" << sqlEscape() << "')"; query << " ORDER BY COALESCE(inuse_last,TIMESTAMP 'EPOCH')"; Message msg("database"); msg.addParam("account",s_account); msg.addParam("query",query); if (!Engine::dispatch(msg)) { Debug(&module,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str()); return; } res = static_cast(msg.userObject("Array")); if (!res || (msg.getIntValue("rows") < 1)) return; for (int i = 1; i < res->getRows(); i++) { String callto = YOBJECT(String,res->get(1,i)); String user = YOBJECT(String,res->get(0,i)); if (callto.null() || user.null()) continue; QueuedCall* call = markCall(user); // if we failed to pick a waiting call we are done if (!call) break; Debug(&module,DebugInfo,"Distributing call '%s' to '%s' in group '%s'", call->c_str(),user.c_str(),c_str()); Message* ex = new Message("call.execute"); ex->addParam("direct",callto); ex->addParam("target",user); ex->addParam("caller",call->getCaller()); ex->addParam("called",user); ex->addParam("callto",s_pbxcall); ex->addParam("notify",*call); ex->addParam("queue",c_str()); if (timeout.toInteger() > 0) { timeout += "000"; ex->addParam("maxcall",timeout); } if (prompt) { if (prompt.find('/') < 0) prompt = "wave/play/sounds/" + prompt; msg.setParam("prompt",prompt); } Engine::enqueue(ex); } } QueuesModule::QueuesModule() : Module("queues","misc"), m_init(false) { Output("Loaded module Queues"); } QueuesModule::~QueuesModule() { Output("Unloading module Queues"); } void QueuesModule::statusParams(String& str) { str.append("queues=",",") << s_queues.count(); } void QueuesModule::onQueued(Message& msg, String qname) { if (qname.null() || (qname.find('/') >= 0)) return; if (s_account.null() || s_pbxchan.null()) return; String query("SELECT length,greeting,onhold FROM groups WHERE ugroup="); query << "'" << qname.sqlEscape() << "'"; Message m("database"); m.addParam("account",s_account); m.addParam("query",query); if (!Engine::dispatch(m)) { Debug(this,DebugWarn,"Query on '%s' failed: '%s'",s_account.c_str(),query.c_str()); return; } Array* res = static_cast(m.userObject("Array")); if (!res || (m.getIntValue("rows") != 1)) { Debug(this,DebugWarn,"Missing queue '%s'",qname.c_str()); msg.setParam("error","noroute"); msg.setParam("reason","Queue does not exist"); return; } String tmp = YOBJECT(String,res->get(0,1)); int maxlen = tmp.toInteger(); CallsQueue* queue = findQueue(qname); if (!queue) queue = new CallsQueue(qname); if (maxlen && (queue->countCalls() >= maxlen)) { Debug(this,DebugWarn,"Queue '%s' is full",qname.c_str()); msg.setParam("error","congestion"); msg.setParam("reason","Queue is full"); return; } tmp = YOBJECT(String,res->get(1,1)); if (tmp) { if (tmp.find('/') < 0) tmp = "wave/play/sounds/" + tmp; msg.setParam("greeting",tmp); } tmp = YOBJECT(String,res->get(2,1)); if (tmp.find('/') < 0) { if (tmp) tmp = "moh/" + tmp; else tmp = "tone/ring"; } msg.setParam("source",tmp); msg.setParam("callto",s_pbxchan); queue->addCall(msg.getValue("id"),msg.getValue("caller")); queue->startACD(); } void QueuesModule::onPickup(Message& msg, String qname) { if (qname.null() || (qname.find('/') >= 0)) return; CallsQueue* queue = findQueue(qname); if (queue) { QueuedCall* call = queue->topCall(); if (call) { String id = *call; String pid = msg.getValue("id"); queue->removeCall(call); msg = "chan.connect"; msg.setParam("targetid",id); // a little late... but answer to the queued call Message* m = new Message("call.answered"); m->setParam("id",pid); m->setParam("targetid",id); Engine::enqueue(m); // also answer the pickup call m = new Message("call.answered"); m->setParam("id",id); m->setParam("targetid",pid); Engine::enqueue(m); return; } } msg.setParam("error","nocall"); msg.setParam("reason","There are no calls in queue"); } void QueuesModule::onExecute(Message& msg, String callto) { if (callto.startSkip("queue/",false)) onQueued(msg,callto); else if (callto.startSkip("pickup/",false)) onPickup(msg,callto); } void QueuesModule::onAnswered(Message& msg, String targetid, String reason) { if (reason == "queued") return; CallsQueue* queue = findCallQueue(targetid); if (!queue) return; Debug(this,DebugCall,"Answered call '%s' in queue '%s'", targetid.c_str(),queue->c_str()); queue->removeCall(targetid); } void QueuesModule::onHangup(Message& msg, String id) { String notify = msg.getValue("notify"); String qname = msg.getValue("queue"); if (notify && qname) { CallsQueue* queue = findQueue(qname); if (queue) { Debug(this,DebugCall,"Hung up outgoing call '%s' serving '%s' in '%s'", id.c_str(),notify.c_str(),qname.c_str()); if (queue->unmarkCall(notify)) { queue->startACD(); return; } } } CallsQueue* queue = findCallQueue(id); if (!queue) return; Debug(this,DebugCall,"Hung up call '%s' in '%s'",id.c_str(),queue->c_str()); queue->removeCall(id); } bool QueuesModule::onCommand(String line, String& retval) { if (line.startSkip("queues")) { if (line == "list") { ObjList* l = s_queues.skipNull(); for (; l; l=l->skipNext()) static_cast(l->get())->listCalls(retval); return true; } } return false; } bool QueuesModule::received(Message& msg, int id) { Lock lock(this); switch (id) { case Command: return onCommand(msg.getValue("line"),msg.retValue()); break; case Execute: onExecute(msg,msg.getValue("callto")); break; case Answered: onAnswered(msg,msg.getValue("targetid"),msg.getValue("reason")); break; case Private: onHangup(msg,msg.getValue("id")); break; default: lock.drop(); return Module::received(msg,id); } return false; } void QueuesModule::msgTimer(Message& msg) { u_int32_t t = msg.msgTime().sec(); if (t >= s_nextTime) { // we rescan queues every 5 seconds s_nextTime = t + s_rescan; ObjList* l = s_queues.skipNull(); for (; l; l=l->skipNext()) static_cast(l->get())->startACD(); } Module::msgTimer(msg); } CallsQueue* QueuesModule::findCallQueue(const String& id) { ObjList* l = s_queues.skipNull(); for (; l; l=l->skipNext()) { CallsQueue* queue = static_cast(l->get()); if (queue->findCall(id)) return queue; } return 0; } void QueuesModule::initialize() { Configuration cfg(Engine::configFile("queues")); lock(); s_rescan = cfg.getIntValue("general","rescan",5); s_account = cfg.getValue("general","account"); s_pbxcall = cfg.getValue("pbx","call"); s_pbxchan = cfg.getValue("pbx","chan"); int priority = cfg.getIntValue("general","priority",50); unlock(); if (m_init) return; m_init = true; setup(); Output("Initializing module Queues for database"); installRelay(Command); installRelay(Execute,priority); installRelay(Answered,priority); Engine::install(new MessageRelay("chan.hangup",this,Private,priority)); } }; // anonymous namespace /* vi: set ts=8 sw=4 sts=4 noet: */