701 lines
16 KiB
C++
701 lines
16 KiB
C++
/**
|
|
* Message.cpp
|
|
* This file is part of the YATE Project http://YATE.null.ro
|
|
*
|
|
* Yet Another Telephony Engine - a fully featured software PBX and IVR
|
|
* Copyright (C) 2004-2013 Null Team
|
|
*
|
|
* This software is distributed under multiple licenses;
|
|
* see the COPYING file in the main directory for licensing
|
|
* information for this specific distribution.
|
|
*
|
|
* This use of this software may be subject to additional restrictions.
|
|
* See the LEGAL file in the main directory for details.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
*/
|
|
|
|
#include "yatengine.h"
|
|
#include <string.h>
|
|
|
|
using namespace TelEngine;
|
|
|
|
class QueueWorker : public GenObject, public Thread
|
|
{
|
|
public:
|
|
inline QueueWorker(MessageQueue* queue)
|
|
: Thread("MessageQueueWorker"),m_queue(queue)
|
|
{}
|
|
virtual ~QueueWorker();
|
|
protected:
|
|
virtual void run();
|
|
private:
|
|
RefPointer<MessageQueue> m_queue;
|
|
};
|
|
|
|
Message::Message(const char* name, const char* retval, bool broadcast)
|
|
: NamedList(name),
|
|
m_return(retval), m_data(0), m_notify(false), m_broadcast(broadcast)
|
|
{
|
|
XDebug(DebugAll,"Message::Message(\"%s\",\"%s\",%s) [%p]",
|
|
name,retval,String::boolText(broadcast),this);
|
|
}
|
|
|
|
Message::Message(const Message& original)
|
|
: NamedList(original),
|
|
m_return(original.retValue()), m_time(original.msgTime()),
|
|
m_data(0), m_notify(false), m_broadcast(original.broadcast())
|
|
{
|
|
XDebug(DebugAll,"Message::Message(&%p) [%p]",&original,this);
|
|
}
|
|
|
|
Message::Message(const Message& original, bool broadcast)
|
|
: NamedList(original),
|
|
m_return(original.retValue()), m_time(original.msgTime()),
|
|
m_data(0), m_notify(false), m_broadcast(broadcast)
|
|
{
|
|
XDebug(DebugAll,"Message::Message(&%p,%s) [%p]",
|
|
&original,String::boolText(broadcast),this);
|
|
}
|
|
|
|
Message::~Message()
|
|
{
|
|
XDebug(DebugAll,"Message::~Message() '%s' [%p]",c_str(),this);
|
|
userData(0);
|
|
}
|
|
|
|
void* Message::getObject(const String& name) const
|
|
{
|
|
if (name == YATOM("Message"))
|
|
return const_cast<Message*>(this);
|
|
return NamedList::getObject(name);
|
|
}
|
|
|
|
void Message::userData(RefObject* data)
|
|
{
|
|
if (data == m_data)
|
|
return;
|
|
m_notify = false;
|
|
RefObject* tmp = m_data;
|
|
if (data && !data->ref())
|
|
data = 0;
|
|
m_data = data;
|
|
if (tmp)
|
|
tmp->deref();
|
|
}
|
|
|
|
void Message::dispatched(bool accepted)
|
|
{
|
|
if (!m_notify)
|
|
return;
|
|
MessageNotifier* hook = YOBJECT(MessageNotifier,m_data);
|
|
if (hook)
|
|
hook->dispatched(*this,accepted);
|
|
}
|
|
|
|
String Message::encode(const char* id) const
|
|
{
|
|
String s("%%>message:");
|
|
s << String::msgEscape(id) << ":" << (unsigned int)m_time.sec() << ":";
|
|
commonEncode(s);
|
|
return s;
|
|
}
|
|
|
|
String Message::encode(bool received, const char* id) const
|
|
{
|
|
String s("%%<message:");
|
|
s << String::msgEscape(id) << ":" << received << ":";
|
|
commonEncode(s);
|
|
return s;
|
|
}
|
|
|
|
int Message::decode(const char* str, String& id)
|
|
{
|
|
String s("%%>message:");
|
|
if (!str || ::strncmp(str,s.c_str(),s.length()))
|
|
return -1;
|
|
// locate the SEP after id
|
|
const char *sep = ::strchr(str+s.length(),':');
|
|
if (!sep)
|
|
return s.length();
|
|
// locate the SEP after time
|
|
const char *sep2 = ::strchr(sep+1,':');
|
|
if (!sep2)
|
|
return sep-str;
|
|
id.assign(str+s.length(),(sep-str)-s.length());
|
|
int err = -1;
|
|
id = id.msgUnescape(&err);
|
|
if (err >= 0)
|
|
return err+s.length();
|
|
String t(sep+1,sep2-sep-1);
|
|
unsigned int tm = 0;
|
|
t >> tm;
|
|
if (!t.null())
|
|
return sep-str;
|
|
m_time=tm ? ((u_int64_t)1000000)*tm : Time::now();
|
|
return commonDecode(str,sep2-str+1);
|
|
}
|
|
|
|
int Message::decode(const char* str, bool& received, const char* id)
|
|
{
|
|
String s("%%<message:");
|
|
s << id << ":";
|
|
if (!str || ::strncmp(str,s.c_str(),s.length()))
|
|
return -1;
|
|
// locate the SEP after received
|
|
const char *sep = ::strchr(str+s.length(),':');
|
|
if (!sep)
|
|
return s.length();
|
|
String rcvd(str+s.length(),(sep-str)-s.length());
|
|
rcvd >> received;
|
|
if (!rcvd.null())
|
|
return s.length();
|
|
return sep[1] ? commonDecode(str,sep-str+1) : -2;
|
|
}
|
|
|
|
void Message::commonEncode(String& str) const
|
|
{
|
|
str << msgEscape() << ":" << m_return.msgEscape();
|
|
unsigned n = length();
|
|
for (unsigned i = 0; i < n; i++) {
|
|
NamedString *s = getParam(i);
|
|
if (s)
|
|
str << ":" << s->name().msgEscape('=') << "=" << s->msgEscape();
|
|
}
|
|
}
|
|
|
|
int Message::commonDecode(const char* str, int offs)
|
|
{
|
|
str += offs;
|
|
// locate SEP after name
|
|
const char *sep = ::strchr(str,':');
|
|
if (!sep)
|
|
return offs;
|
|
String chunk(str,sep-str);
|
|
int err = -1;
|
|
chunk = chunk.msgUnescape(&err);
|
|
if (err >= 0)
|
|
return offs+err;
|
|
if (!chunk.null())
|
|
*this = chunk;
|
|
offs += (sep-str+1);
|
|
str = sep+1;
|
|
// locate SEP or EOL after retval
|
|
sep = ::strchr(str,':');
|
|
if (sep)
|
|
chunk.assign(str,sep-str);
|
|
else
|
|
chunk.assign(str);
|
|
chunk = chunk.msgUnescape(&err);
|
|
if (err >= 0)
|
|
return offs+err;
|
|
m_return = chunk;
|
|
// find and assign name=value pairs
|
|
while (sep) {
|
|
offs += (sep-str+1);
|
|
str = sep+1;
|
|
sep = ::strchr(str,':');
|
|
if (sep)
|
|
chunk.assign(str,sep-str);
|
|
else
|
|
chunk.assign(str);
|
|
if (chunk.null())
|
|
continue;
|
|
chunk = chunk.msgUnescape(&err);
|
|
if (err >= 0)
|
|
return offs+err;
|
|
int pos = chunk.find('=');
|
|
switch (pos) {
|
|
case -1:
|
|
clearParam(chunk);
|
|
break;
|
|
case 0:
|
|
return offs+err;
|
|
default:
|
|
setParam(chunk.substr(0,pos),chunk.substr(pos+1));
|
|
}
|
|
}
|
|
return -2;
|
|
}
|
|
|
|
|
|
MessageHandler::MessageHandler(const char* name, unsigned priority,
|
|
const char* trackName, bool addPriority)
|
|
: String(name),
|
|
m_trackName(trackName), m_priority(priority),
|
|
m_unsafe(0), m_dispatcher(0), m_filter(0)
|
|
{
|
|
DDebug(DebugAll,"MessageHandler::MessageHandler('%s',%u,'%s',%s) [%p]",
|
|
name,priority,trackName,String::boolText(addPriority),this);
|
|
if (addPriority && m_trackName)
|
|
m_trackName << ":" << priority;
|
|
}
|
|
|
|
MessageHandler::~MessageHandler()
|
|
{
|
|
DDebug(DebugAll,"MessageHandler::~MessageHandler() '%s', %u [%p]",
|
|
safe(),m_priority,this);
|
|
cleanup();
|
|
}
|
|
|
|
void MessageHandler::cleanup()
|
|
{
|
|
if (m_dispatcher) {
|
|
m_dispatcher->uninstall(this);
|
|
m_dispatcher = 0;
|
|
}
|
|
clearFilter();
|
|
}
|
|
|
|
void MessageHandler::destruct()
|
|
{
|
|
cleanup();
|
|
String::destruct();
|
|
}
|
|
|
|
void MessageHandler::safeNow()
|
|
{
|
|
Lock lock(m_dispatcher);
|
|
// when the unsafe counter reaches zero we're again safe to destroy
|
|
m_unsafe--;
|
|
}
|
|
|
|
bool MessageHandler::receivedInternal(Message& msg)
|
|
{
|
|
bool ok = received(msg);
|
|
safeNow();
|
|
return ok;
|
|
}
|
|
|
|
void MessageHandler::setFilter(NamedString* filter)
|
|
{
|
|
clearFilter();
|
|
m_filter = filter;
|
|
}
|
|
|
|
void MessageHandler::clearFilter()
|
|
{
|
|
if (m_filter) {
|
|
NamedString* tmp = m_filter;
|
|
m_filter = 0;
|
|
delete tmp;
|
|
}
|
|
}
|
|
|
|
|
|
bool MessageRelay::receivedInternal(Message& msg)
|
|
{
|
|
MessageReceiver* receiver = m_receiver;
|
|
int id = m_id;
|
|
safeNow();
|
|
return receiver && receiver->received(msg,id);
|
|
}
|
|
|
|
|
|
MessageDispatcher::MessageDispatcher(const char* trackParam)
|
|
: Mutex(false,"MessageDispatcher"),
|
|
m_hookMutex(false,"PostHooks"),
|
|
m_msgAppend(&m_messages), m_hookAppend(&m_hooks),
|
|
m_trackParam(trackParam), m_changes(0), m_warnTime(0),
|
|
m_hookCount(0), m_hookHole(false)
|
|
{
|
|
XDebug(DebugInfo,"MessageDispatcher::MessageDispatcher('%s') [%p]",trackParam,this);
|
|
}
|
|
|
|
MessageDispatcher::~MessageDispatcher()
|
|
{
|
|
XDebug(DebugInfo,"MessageDispatcher::~MessageDispatcher() [%p]",this);
|
|
lock();
|
|
clear();
|
|
unlock();
|
|
}
|
|
|
|
bool MessageDispatcher::install(MessageHandler* handler)
|
|
{
|
|
DDebug(DebugAll,"MessageDispatcher::install(%p)",handler);
|
|
if (!handler)
|
|
return false;
|
|
Lock lock(this);
|
|
ObjList *l = m_handlers.find(handler);
|
|
if (l)
|
|
return false;
|
|
unsigned p = handler->priority();
|
|
int pos = 0;
|
|
for (l=&m_handlers; l; l=l->next(),pos++) {
|
|
MessageHandler *h = static_cast<MessageHandler *>(l->get());
|
|
if (!h)
|
|
continue;
|
|
if (h->priority() < p)
|
|
continue;
|
|
if (h->priority() > p)
|
|
break;
|
|
// at the same priority we sort them in pointer address order
|
|
if (h > handler)
|
|
break;
|
|
}
|
|
m_changes++;
|
|
if (l) {
|
|
XDebug(DebugAll,"Inserting handler [%p] on place #%d",handler,pos);
|
|
l->insert(handler);
|
|
}
|
|
else {
|
|
XDebug(DebugAll,"Appending handler [%p] on place #%d",handler,pos);
|
|
m_handlers.append(handler);
|
|
}
|
|
handler->m_dispatcher = this;
|
|
if (handler->null())
|
|
Debug(DebugInfo,"Registered broadcast message handler %p",handler);
|
|
return true;
|
|
}
|
|
|
|
bool MessageDispatcher::uninstall(MessageHandler* handler)
|
|
{
|
|
DDebug(DebugAll,"MessageDispatcher::uninstall(%p)",handler);
|
|
lock();
|
|
handler = static_cast<MessageHandler *>(m_handlers.remove(handler,false));
|
|
if (handler) {
|
|
m_changes++;
|
|
if (handler->m_unsafe > 0) {
|
|
DDebug(DebugNote,"Waiting for unsafe MessageHandler %p '%s'",
|
|
handler,handler->c_str());
|
|
// wait until handler is again safe to destroy
|
|
do {
|
|
unlock();
|
|
Thread::yield();
|
|
lock();
|
|
} while (handler->m_unsafe > 0);
|
|
}
|
|
if (handler->m_unsafe != 0)
|
|
Debug(DebugFail,"MessageHandler %p has unsafe=%d",handler,handler->m_unsafe);
|
|
handler->m_dispatcher = 0;
|
|
}
|
|
unlock();
|
|
return (handler != 0);
|
|
}
|
|
|
|
bool MessageDispatcher::dispatch(Message& msg)
|
|
{
|
|
#ifdef XDEBUG
|
|
Debugger debug("MessageDispatcher::dispatch","(%p) (\"%s\")",&msg,msg.c_str());
|
|
#endif
|
|
|
|
u_int64_t t = m_warnTime ? Time::now() : 0;
|
|
|
|
bool retv = false;
|
|
ObjList *l = &m_handlers;
|
|
Lock mylock(this);
|
|
for (; l; l=l->next()) {
|
|
MessageHandler *h = static_cast<MessageHandler*>(l->get());
|
|
if (h && (h->null() || *h == msg)) {
|
|
if (h->filter() && (*(h->filter()) != msg.getValue(h->filter()->name())))
|
|
continue;
|
|
unsigned int c = m_changes;
|
|
unsigned int p = h->priority();
|
|
if (trackParam() && h->trackName()) {
|
|
NamedString* tracked = msg.getParam(trackParam());
|
|
if (tracked)
|
|
tracked->append(h->trackName(),",");
|
|
else
|
|
msg.addParam(trackParam(),h->trackName());
|
|
}
|
|
// mark handler as unsafe to destroy / uninstall
|
|
h->m_unsafe++;
|
|
mylock.drop();
|
|
|
|
u_int64_t tm = m_warnTime ? Time::now() : 0;
|
|
|
|
retv = h->receivedInternal(msg) || retv;
|
|
|
|
if (tm) {
|
|
tm = Time::now() - tm;
|
|
if (tm > m_warnTime) {
|
|
mylock.acquire(this);
|
|
const char* name = (c == m_changes) ? h->trackName().c_str() : 0;
|
|
Debug(DebugInfo,"Message '%s' [%p] passed through %p%s%s%s in " FMT64U " usec",
|
|
msg.c_str(),&msg,h,
|
|
(name ? " '" : ""),(name ? name : ""),(name ? "'" : ""),tm);
|
|
}
|
|
}
|
|
|
|
if (retv && !msg.broadcast())
|
|
break;
|
|
mylock.acquire(this);
|
|
if (c == m_changes)
|
|
continue;
|
|
// the handler list has changed - find again
|
|
NDebug(DebugAll,"Rescanning handler list for '%s' [%p] at priority %u",
|
|
msg.c_str(),&msg,p);
|
|
ObjList* l2 = &m_handlers;
|
|
for (l = l2; l; l=l->next()) {
|
|
MessageHandler *mh = static_cast<MessageHandler*>(l->get());
|
|
if (!mh)
|
|
continue;
|
|
if (mh == h)
|
|
// exact match - silently continue where we left
|
|
break;
|
|
|
|
// gone past last handler priority - exit with last handler
|
|
if ((mh->priority() > p) || ((mh->priority() == p) && (mh > h))) {
|
|
Debug(DebugAll,"Handler list for '%s' [%p] changed, skipping from %p (%u) to %p (%u)",
|
|
msg.c_str(),&msg,h,p,mh,mh->priority());
|
|
// l will advance in the outer for loop so use previous
|
|
l = l2;
|
|
break;
|
|
}
|
|
l2 = l;
|
|
}
|
|
if (!l)
|
|
break;
|
|
}
|
|
}
|
|
mylock.drop();
|
|
msg.dispatched(retv);
|
|
|
|
if (t) {
|
|
t = Time::now() - t;
|
|
if (t > m_warnTime) {
|
|
unsigned n = msg.length();
|
|
String p;
|
|
p << "\r\n retval='" << msg.retValue().safe("(null)") << "'";
|
|
for (unsigned i = 0; i < n; i++) {
|
|
NamedString *s = msg.getParam(i);
|
|
if (s)
|
|
p << "\r\n param['" << s->name() << "'] = '" << *s << "'";
|
|
}
|
|
Debug("Performance",DebugMild,"Message %p '%s' returned %s in " FMT64U " usec%s",
|
|
&msg,msg.c_str(),retv ? "true" : "false",t,p.safe());
|
|
}
|
|
}
|
|
|
|
m_hookMutex.lock();
|
|
if (m_hookHole && !m_hookCount) {
|
|
// compact the list, remove the holes
|
|
for (l = &m_hooks; l; l = l->next()) {
|
|
while (!l->get()) {
|
|
if (!l->next())
|
|
break;
|
|
if (l->next() == m_hookAppend)
|
|
m_hookAppend = &m_hooks;
|
|
l->remove();
|
|
}
|
|
}
|
|
m_hookHole = false;
|
|
}
|
|
m_hookCount++;
|
|
for (l = m_hooks.skipNull(); l; l = l->skipNext()) {
|
|
RefPointer<MessagePostHook> ph = static_cast<MessagePostHook*>(l->get());
|
|
if (ph) {
|
|
m_hookMutex.unlock();
|
|
ph->dispatched(msg,retv);
|
|
ph = 0;
|
|
m_hookMutex.lock();
|
|
}
|
|
}
|
|
m_hookCount--;
|
|
m_hookMutex.unlock();
|
|
|
|
return retv;
|
|
}
|
|
|
|
bool MessageDispatcher::enqueue(Message* msg)
|
|
{
|
|
Lock lock(this);
|
|
if (!msg || m_messages.find(msg))
|
|
return false;
|
|
m_msgAppend = m_msgAppend->append(msg);
|
|
return true;
|
|
}
|
|
|
|
bool MessageDispatcher::dequeueOne()
|
|
{
|
|
lock();
|
|
if (m_messages.next() == m_msgAppend)
|
|
m_msgAppend = &m_messages;
|
|
Message* msg = static_cast<Message *>(m_messages.remove(false));
|
|
unlock();
|
|
if (!msg)
|
|
return false;
|
|
dispatch(*msg);
|
|
msg->destruct();
|
|
return true;
|
|
}
|
|
|
|
void MessageDispatcher::dequeue()
|
|
{
|
|
while (dequeueOne())
|
|
;
|
|
}
|
|
|
|
unsigned int MessageDispatcher::messageCount()
|
|
{
|
|
Lock lock(this);
|
|
return m_messages.count();
|
|
}
|
|
|
|
unsigned int MessageDispatcher::handlerCount()
|
|
{
|
|
Lock lock(this);
|
|
return m_handlers.count();
|
|
}
|
|
|
|
unsigned int MessageDispatcher::postHookCount()
|
|
{
|
|
Lock lock(m_hookMutex);
|
|
return m_hooks.count();
|
|
}
|
|
|
|
void MessageDispatcher::setHook(MessagePostHook* hook, bool remove)
|
|
{
|
|
m_hookMutex.lock();
|
|
if (remove) {
|
|
// zero the hook, we'll compact it later when safe
|
|
ObjList* l = m_hooks.find(hook);
|
|
if (l) {
|
|
l->set(0,false);
|
|
m_hookHole = true;
|
|
}
|
|
}
|
|
else
|
|
m_hookAppend = m_hookAppend->append(hook);
|
|
m_hookMutex.unlock();
|
|
}
|
|
|
|
|
|
MessageNotifier::~MessageNotifier()
|
|
{
|
|
}
|
|
|
|
/**
|
|
* class MessageQueue
|
|
*/
|
|
|
|
static const char* s_queueMutexName = "MessageQueue";
|
|
|
|
MessageQueue::MessageQueue(const char* queueName, int numWorkers)
|
|
: Mutex(true,s_queueMutexName), m_filters(queueName), m_count(0)
|
|
{
|
|
XDebug(DebugAll,"Creating MessageQueue for %s",queueName);
|
|
for (int i = 0;i < numWorkers;i ++) {
|
|
QueueWorker* worker = new QueueWorker(this);
|
|
worker->startup();
|
|
m_workers.append(worker);
|
|
}
|
|
m_append = &m_messages;
|
|
}
|
|
|
|
void MessageQueue::received(Message& msg)
|
|
{
|
|
Engine::dispatch(msg);
|
|
}
|
|
|
|
MessageQueue::~MessageQueue()
|
|
{
|
|
XDebug(DebugAll,"Destroying MessageQueue for %s",m_filters.c_str());
|
|
}
|
|
|
|
void MessageQueue::clear()
|
|
{
|
|
Lock myLock(this);
|
|
for (ObjList* o = m_workers.skipNull();o;o = o->skipNext()) {
|
|
QueueWorker* worker = static_cast<QueueWorker*>(o->get());
|
|
worker->cancel();
|
|
o->setDelete(false);
|
|
}
|
|
m_workers.clear();
|
|
m_messages.clear();
|
|
}
|
|
|
|
bool MessageQueue::enqueue(Message* msg)
|
|
{
|
|
if (!msg)
|
|
return false;
|
|
Lock myLock(this);
|
|
m_append = m_append->append(msg);
|
|
m_count++;
|
|
return true;
|
|
}
|
|
|
|
bool MessageQueue::dequeue()
|
|
{
|
|
Lock myLock(this);
|
|
ObjList* o = m_messages.skipNull();
|
|
if (!o)
|
|
return false;
|
|
if (m_messages.next() == m_append)
|
|
m_append = &m_messages;
|
|
Message* msg = static_cast<Message*>(m_messages.remove(false));
|
|
if (!msg)
|
|
return false;
|
|
m_count--;
|
|
myLock.drop();
|
|
received(*msg);
|
|
TelEngine::destruct(msg);
|
|
return true;
|
|
}
|
|
|
|
void MessageQueue::addFilter(const char* name, const char* value)
|
|
{
|
|
Lock myLock(this);
|
|
m_filters.setParam(name,value);
|
|
}
|
|
|
|
void MessageQueue::removeFilter(const String& name)
|
|
{
|
|
Lock myLock(this);
|
|
m_filters.clearParam(name);
|
|
}
|
|
|
|
bool MessageQueue::matchesFilter(const Message& msg)
|
|
{
|
|
Lock myLock(this);
|
|
if (msg != m_filters)
|
|
return false;
|
|
for (unsigned int i = 0;i < m_filters.length();i++) {
|
|
NamedString* param = m_filters.getParam(i);
|
|
if (!param)
|
|
continue;
|
|
NamedString* match = msg.getParam(param->name());
|
|
if (!match || *match != *param)
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void MessageQueue::removeThread(Thread* thread)
|
|
{
|
|
if (!thread)
|
|
return;
|
|
Lock myLock(this);
|
|
m_workers.remove((GenObject*)thread,false);
|
|
}
|
|
|
|
/**
|
|
* class QueueWorker
|
|
*/
|
|
|
|
QueueWorker::~QueueWorker()
|
|
{
|
|
if (m_queue)
|
|
m_queue->removeThread(this);
|
|
m_queue = 0;
|
|
}
|
|
|
|
void QueueWorker::run()
|
|
{
|
|
if (!m_queue)
|
|
return;
|
|
while (true) {
|
|
if (!m_queue->count()) {
|
|
Thread::idle(true);
|
|
continue;
|
|
}
|
|
m_queue->dequeue();
|
|
Thread::check(true);
|
|
}
|
|
}
|
|
|
|
|
|
/* vi: set ts=8 sw=4 sts=4 noet: */
|