Added MessageHook and MessageQueue classes.

git-svn-id: http://yate.null.ro/svn/yate/trunk@5455 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
andrei 2013-04-12 13:54:55 +00:00
parent c704ff9ace
commit ef6d1f89ae
3 changed files with 324 additions and 3 deletions

View File

@ -237,6 +237,8 @@ static Mutex s_eventsMutex(false,"EventsList");
static ObjList s_events;
static String s_startMsg;
static SharedVars s_vars;
static Mutex s_hooksMutex(true,"HooksList");
static ObjList s_hooks;
const TokenDict Engine::s_callAccept[] = {
{"accept", Engine::Accept},
@ -1515,6 +1517,12 @@ int Engine::engineCleanup()
CapturedEvent::capturing(false);
setStatus(SERVICE_STOP_PENDING);
::signal(SIGINT,SIG_DFL);
Lock myLock(s_hooksMutex);
for (ObjList* o = s_hooks.skipNull();o;o = o->skipNext()) {
MessageHook* mh = static_cast<MessageHook*>(o->get());
mh->clear();
}
myLock.drop();
dispatch("engine.halt",true);
checkPoint();
Thread::msleep(200);
@ -1839,9 +1847,23 @@ bool Engine::uninstall(MessageHandler* handler)
return s_self ? s_self->m_dispatcher.uninstall(handler) : false;
}
bool Engine::enqueue(Message* msg)
bool Engine::enqueue(Message* msg, bool skipHooks)
{
return (msg && s_self) ? s_self->m_dispatcher.enqueue(msg) : false;
if (!msg)
return false;
if (!skipHooks) {
Lock myLock(s_hooksMutex);
for (ObjList* o = s_hooks.skipNull();o;o = o->skipNext()) {
MessageHook* hook = static_cast<MessageHook*>(o->get());
if (!hook || !hook->matchesFilter(*msg))
continue;
RefPointer<MessageHook> rhook = hook;
myLock.drop();
rhook->enqueue(msg);
return true;
}
}
return s_self ? s_self->m_dispatcher.enqueue(msg) : false;
}
bool Engine::dispatch(Message* msg)
@ -1864,6 +1886,24 @@ bool Engine::dispatch(const char* name, bool broadcast)
return s_self->m_dispatcher.dispatch(msg);
}
bool Engine::installHook(MessageHook* hook)
{
Lock myLock(s_hooksMutex);
if (!hook || s_hooks.find(hook))
return false;
s_hooks.append(hook);
return true;
}
void Engine::uninstallHook(MessageHook* hook)
{
if (!hook)
return;
Lock myLock(s_hooksMutex);
hook->clear();
s_hooks.remove(hook);
}
unsigned int Engine::runId()
{
return s_runid;

View File

@ -25,6 +25,19 @@
using namespace TelEngine;
class QueueWorker : public GenObject, public Thread
{
public:
inline QueueWorker(MessageQueue* queue)
: Thread("MessageQueueWorker"),m_queue(queue)
{}
virtual ~QueueWorker();
protected:
virtual void run();
private:
RefPointer<MessageQueue> m_queue;
};
Message::Message(const char* name, const char* retval, bool broadcast)
: NamedList(name),
m_return(retval), m_data(0), m_notify(false), m_broadcast(broadcast)
@ -519,4 +532,134 @@ MessageNotifier::~MessageNotifier()
{
}
/**
* class MessageQueue
*/
static const char* s_queueMutexName = "MessageQueue";
MessageQueue::MessageQueue(const char* queueName, int numWorkers)
: Mutex(true,s_queueMutexName), m_filters(queueName), m_count(0)
{
XDebug(DebugAll,"Creating MessageQueue for %s",queueName);
for (int i = 0;i < numWorkers;i ++) {
QueueWorker* worker = new QueueWorker(this);
worker->startup();
m_workers.append(worker);
}
m_append = &m_messages;
}
void MessageQueue::received(Message& msg)
{
Engine::dispatch(msg);
}
MessageQueue::~MessageQueue()
{
XDebug(DebugAll,"Destroying MessageQueue for %s",m_filters.c_str());
}
void MessageQueue::clear()
{
Lock myLock(this);
for (ObjList* o = m_workers.skipNull();o;o = o->skipNext()) {
QueueWorker* worker = static_cast<QueueWorker*>(o->get());
worker->cancel();
o->setDelete(false);
}
m_workers.clear();
m_messages.clear();
}
bool MessageQueue::enqueue(Message* msg)
{
if (!msg)
return false;
Lock myLock(this);
m_append = m_append->append(msg);
m_count++;
return true;
}
bool MessageQueue::dequeue()
{
Lock myLock(this);
ObjList* o = m_messages.skipNull();
if (!o)
return false;
if (m_messages.next() == m_append)
m_append = &m_messages;
Message* msg = static_cast<Message*>(m_messages.remove(false));
if (!msg)
return false;
m_count--;
myLock.drop();
received(*msg);
TelEngine::destruct(msg);
return true;
}
void MessageQueue::addFilter(const char* name, const char* value)
{
Lock myLock(this);
m_filters.setParam(name,value);
}
void MessageQueue::removeFilter(const String& name)
{
Lock myLock(this);
m_filters.clearParam(name);
}
bool MessageQueue::matchesFilter(const Message& msg)
{
Lock myLock(this);
if (msg != m_filters)
return false;
for (unsigned int i = 0;i < m_filters.length();i++) {
NamedString* param = m_filters.getParam(i);
if (!param)
continue;
NamedString* match = msg.getParam(param->name());
if (!match || *match != *param)
return false;
}
return true;
}
void MessageQueue::removeThread(Thread* thread)
{
if (!thread)
return;
Lock myLock(this);
m_workers.remove((GenObject*)thread,false);
}
/**
* class QueueWorker
*/
QueueWorker::~QueueWorker()
{
if (m_queue)
m_queue->removeThread(this);
m_queue = 0;
}
void QueueWorker::run()
{
if (!m_queue)
return;
while (true) {
if (!m_queue->count()) {
Thread::idle(true);
continue;
}
m_queue->dequeue();
Thread::check(true);
}
}
/* vi: set ts=8 sw=4 sts=4 noet: */

View File

@ -790,6 +790,130 @@ private:
u_int64_t m_warnTime;
};
/**
* Abstract class for message hook
* @short Abstract message hook
*/
class YATE_API MessageHook : public RefObject
{
public:
/**
* Try to enqueue a message to this hook's queue
* @param msg The message to enqueue
* @return True if the message was enqueued.
*/
virtual bool enqueue(Message* msg) = 0;
/**
* Clear this hook data
*/
virtual void clear() = 0;
/**
* Check if the given message can be inserted in this queue
* @param msg The message to check
* @return True if the message can be inserted in this queue
*/
virtual bool matchesFilter(const Message& msg) = 0;
};
/**
* MessageQueue class allows to create a private queue for a message who matches
* the specified filters.
* @short A message queue
*/
class YATE_API MessageQueue : public MessageHook, public Mutex
{
friend class Engine;
public:
/**
* Creates a new message queue.
* @param hookName Name of the message served by this queue
* @param numWorkers The number of workers who serve this queue
*/
MessageQueue(const char* hookName, int numWorkers = 0);
/**
* Destroys the message queue
*/
~MessageQueue();
/**
* Append a message in the queue
* @param msg The message to enqueue, will be destroyed after the processing is done
* @return True if successfully queued, false otherwise
*/
virtual bool enqueue(Message* msg);
/**
* Process a message from the waiting queue
* @return False if the message queue is empty
*/
bool dequeue();
/**
* Add a new filter to this queue
* @param name The filter name
* @param value The filter value
*/
void addFilter(const char* name, const char* value);
/**
* Remove a filter form this queue
* @param name The filter name
*/
void removeFilter(const String& name);
/**
* Clear private data
*/
virtual void clear();
/**
* Remove a thread from workers list
* @param thread The thread to remove
*/
void removeThread(Thread* thread);
/**
* Helper method to obtain the number of unprocessed messages in the queue
* @return The number of queued messages.
*/
inline unsigned int count() const
{ return m_count; }
/**
* Obtain the filter list for this queue
* @return The filter list
*/
inline const NamedList& getFilters() const
{ return m_filters; }
/**
* Check if the given message can be inserted in this queue
* @param msg The message to check
* @return True if the message can be inserted in this queue
*/
virtual bool matchesFilter(const Message& msg);
protected:
/**
* Callback method for message processing
* Default calls Engine::dispatch
* @param msg The message to process
*/
virtual void received(Message& msg);
private:
NamedList m_filters;
ObjList m_messages;
ObjList m_workers;
ObjList* m_append;
unsigned int m_count;
};
/**
* Initialization and information about plugins.
* Plugins are located in @em shared libraries that are loaded at runtime.
@ -1198,9 +1322,10 @@ public:
/**
* Enqueue a message in the message queue for asynchronous dispatching
* @param msg The message to enqueue, will be destroyed after dispatching
* @param skipHooks True to append the message directly into the main queue
* @return True if enqueued, false on error (already queued)
*/
static bool enqueue(Message* msg);
static bool enqueue(Message* msg, bool skipHooks = false);
/**
* Convenience function.
@ -1250,6 +1375,19 @@ public:
inline static const String& trackParam()
{ return s_self ? s_self->m_dispatcher.trackParam() : String::empty(); }
/**
* Appends a new message hook to the hooks list.
* @param hook The message hook to append.
* @return True if the message hook was successfully appended to the hooks list
*/
static bool installHook(MessageHook* hook);
/**
* Remove a message hook from the hooks list.
* @param hook The hook to remove.
*/
static void uninstallHook(MessageHook* hook);
/**
* Get a count of plugins that are actively in use
* @return Count of plugins in use