Use separate RWLock(s) (for handlers, queue, posthooks) in message dispatcher.

Added command (status dispatcher handlers <name>) used to show installed handlers info.
This commit is contained in:
marian 2023-05-26 14:56:22 +03:00
parent b4af14c418
commit a590b6083f
3 changed files with 174 additions and 74 deletions

View File

@ -544,6 +544,29 @@ bool EngineStatusHandler::received(Message &msg)
objects(msg.retValue(),details);
return true;
}
if (sel.startSkip("dispatcher")) {
bool byMsg = sel.startSkip("handlers");
if ((byMsg || sel.startSkip("handlers-trackname")) && sel) {
String str;
unsigned int count = 0;
unsigned int total = 0;
MessageDispatcher* d = Engine::dispatcher();
if (d) {
if (sel[0] == '^')
count = d->fillHandlersInfo(byMsg,Regexp(sel),details ? &str : 0,&total);
else
count = d->fillHandlersInfo(byMsg,sel,details ? &str : 0,&total);
}
msg.retValue()
<< "name=dispatcher,type=system,format=Priority|TrackName|Filtered;"
<< "handlers=" << total << ",count=" << count;
if (details)
msg.retValue() << ';' << str;
msg.retValue() << "\r\n";
return true;
}
return false;
}
return false;
}
msg.retValue() << "name=engine,type=system";
@ -651,6 +674,8 @@ static const char s_runpOpt[] = " runparam name=value\r\n";
static const char s_runpMsg[] = "Add a new parameter to the Engine's runtime list\r\n";
static const char s_dispatcherOpt[] = " dispatcher {trace_msg_time|trace_msg_handler_time} <on|off>\r\n";
static const char s_dispatcherMsg[] = "Enable or disable dispatcher debugging options\r\n";
static const char s_dispatcherStatusOpt[] = " status dispatcher {handlers|handlers-trackname} <match>\r\n";
static const char s_dispatcherStatusMsg[] = "Show installed handlers by message name or track name. Matching value starting with ^ is handled as basic regular expression\r\n";
// get the base name of a module file
static String moduleBase(const String& fname)
@ -753,27 +778,32 @@ void completeModule(String& ret, const String& part, ObjList& mods, bool reload,
void EngineCommand::doCompletion(Message &msg, const String& partLine, const String& partWord)
{
if (partLine.null() || (partLine == YSTRING("help"))) {
completeOne(msg.retValue(),"module",partWord);
completeOne(msg.retValue(),"events",partWord);
completeOne(msg.retValue(),"logview",partWord);
completeOne(msg.retValue(),"runparam",partWord);
completeOne(msg.retValue(),"dispatcher",partWord);
completeOne(msg.retValue(),YSTRING("module"),partWord);
completeOne(msg.retValue(),YSTRING("events"),partWord);
completeOne(msg.retValue(),YSTRING("logview"),partWord);
completeOne(msg.retValue(),YSTRING("runparam"),partWord);
completeOne(msg.retValue(),YSTRING("dispatcher"),partWord);
}
else if (partLine == YSTRING("status")) {
completeOne(msg.retValue(),"engine",partWord);
completeOne(msg.retValue(),"objects",partWord);
completeOne(msg.retValue(),YSTRING("engine"),partWord);
completeOne(msg.retValue(),YSTRING("objects"),partWord);
completeOne(msg.retValue(),YSTRING("dispatcher"),partWord);
}
else if (partLine == YSTRING("status objects")) {
for (ObjList* l = getObjCounters().skipNull();l;l = l->skipNext())
completeOne(msg.retValue(),l->get()->toString(),partWord);
}
else if (partLine == YSTRING("status dispatcher")) {
completeOne(msg.retValue(),YSTRING("handlers"),partWord);
completeOne(msg.retValue(),YSTRING("handlers-trackname"),partWord);
}
else if (partLine == YSTRING("module")) {
completeOne(msg.retValue(),"load",partWord);
completeOne(msg.retValue(),YSTRING("load"),partWord);
if (!s_nounload) {
completeOne(msg.retValue(),"unload",partWord);
completeOne(msg.retValue(),"reload",partWord);
completeOne(msg.retValue(),YSTRING("unload"),partWord);
completeOne(msg.retValue(),YSTRING("reload"),partWord);
}
completeOne(msg.retValue(),"list",partWord);
completeOne(msg.retValue(),YSTRING("list"),partWord);
}
else if (partLine == YSTRING("module load"))
completeModule(msg.retValue(),partWord,Engine::self()->m_libs,false);
@ -798,28 +828,29 @@ void EngineCommand::doCompletion(Message &msg, const String& partLine, const Str
const EngineEventList* e = static_cast<const EngineEventList*>(l->get());
completeOne(msg.retValue(),e->toString(),partWord);
}
completeOne(msg.retValue(),"log",partWord);
completeOne(msg.retValue(),YSTRING("log"),partWord);
if (partLine == YSTRING("events"))
completeOne(msg.retValue(),"clear",partWord);
completeOne(msg.retValue(),YSTRING("clear"),partWord);
}
else if (partLine == YSTRING("dispatcher")) {
completeOne(msg.retValue(),"trace_msg_time",partWord);
completeOne(msg.retValue(),"trace_msg_handler_time",partWord);
completeOne(msg.retValue(),YSTRING("trace_msg_time"),partWord);
completeOne(msg.retValue(),YSTRING("trace_msg_handler_time"),partWord);
}
else if ((partLine == YSTRING("dispatcher trace_msg_time"))
|| (partLine == YSTRING("dispatcher trace_msg_handler_time"))) {
completeOne(msg.retValue(),"on",partWord);
completeOne(msg.retValue(),"off",partWord);
completeOne(msg.retValue(),YSTRING("on"),partWord);
completeOne(msg.retValue(),YSTRING("off"),partWord);
}
}
bool EngineCommand::received(Message &msg)
{
String line = msg.getValue("line");
if (line.null()) {
const String& l = msg[YSTRING("line")];
if (!l) {
doCompletion(msg,msg.getValue("partline"),msg.getValue("partword"));
return false;
}
String line = l;
if (line.startSkip("control")) {
int pos = line.find(' ');
String id = line.substr(0,pos).trimBlanks();
@ -883,17 +914,19 @@ bool EngineCommand::received(Message &msg)
}
return false;
}
if (line.startSkip("dispatcher trace_msg_time")) {
MessageDispatcher* d = Engine::dispatcher();
if (d)
d->traceTime(line.toBoolean());
return 0 != d;
}
if (line.startSkip("dispatcher trace_msg_handler_time")) {
MessageDispatcher* d = Engine::dispatcher();
if (d)
d->traceHandlerTime(line.toBoolean());
return 0 != d;
if (line.startSkip("dispatcher")) {
bool traceMsgTime = line.startSkip("trace_msg_time");
if (traceMsgTime || line.startSkip("trace_msg_handler_time")) {
MessageDispatcher* d = Engine::dispatcher();
if (d) {
if (traceMsgTime)
d->traceTime(line.toBoolean());
else
d->traceHandlerTime(line.toBoolean());
return true;
}
}
return false;
}
return false;
}
@ -974,7 +1007,8 @@ bool EngineHelp::received(Message &msg)
else if (line == YSTRING("runparam"))
msg.retValue() << s_runpOpt << s_runpMsg;
else if (line == YSTRING("dispatcher"))
msg.retValue() << s_dispatcherOpt << s_dispatcherMsg;
msg.retValue() << s_dispatcherOpt << s_dispatcherMsg
<< s_dispatcherStatusOpt << s_dispatcherStatusMsg;
else
return false;
return true;

View File

@ -237,7 +237,7 @@ int Message::commonDecode(const char* str, int offs)
MessageHandler::MessageHandler(const char* name, unsigned priority,
const char* trackName, bool addPriority)
: String(name),
m_trackName(trackName), m_priority(priority),
m_trackName(trackName), m_trackNameOnly(trackName), m_priority(priority),
m_unsafe(0), m_dispatcher(0), m_filter(0), m_counter(0)
{
DDebug(DebugAll,"MessageHandler::MessageHandler('%s',%u,'%s',%s) [%p]",
@ -271,9 +271,12 @@ void MessageHandler::destruct()
void MessageHandler::safeNowInternal()
{
Lock lock(m_dispatcher);
WLock lck(m_dispatcher ? &m_dispatcher->handlersLock() : 0);
// when the unsafe counter reaches zero we're again safe to destroy
m_unsafe--;
if (m_unsafe < 0)
Debug(DebugFail,"MessageHandler(%s) unsafe=%d locked=%s dispatcher=(%p) [%p]",
safe(),m_unsafe,String::boolText(lck.locked()),m_dispatcher,this);
}
bool MessageHandler::receivedInternal(Message& msg)
@ -315,8 +318,8 @@ bool MessageRelay::receivedInternal(Message& msg)
MessageDispatcher::MessageDispatcher(const char* trackParam)
: Mutex(false,"MessageDispatcher"),
m_hookMutex(false,"PostHooks"),
: m_handlersLock("DispatcherHandlers"), m_messagesLock("DispatcherMsgs"),
m_hooksLock("DispatcherHooks"),
m_msgAppend(&m_messages), m_hookAppend(&m_hooks),
m_trackParam(trackParam), m_changes(0), m_warnTime(0),
m_enqueueCount(0), m_dequeueCount(0), m_dispatchCount(0),
@ -330,9 +333,16 @@ MessageDispatcher::MessageDispatcher(const char* trackParam)
MessageDispatcher::~MessageDispatcher()
{
XDebug(DebugInfo,"MessageDispatcher::~MessageDispatcher() [%p]",this);
lock();
clear();
unlock();
}
void MessageDispatcher::clear()
{
WLock lck(m_handlersLock);
m_handlers.clear();
lck.acquire(m_hooksLock);
m_hookAppend = &m_hooks;
m_hooks.clear();
}
bool MessageDispatcher::install(MessageHandler* handler)
@ -340,7 +350,7 @@ bool MessageDispatcher::install(MessageHandler* handler)
DDebug(DebugAll,"MessageDispatcher::install(%p)",handler);
if (!handler)
return false;
Lock lock(this);
WLock lck(m_handlersLock);
ObjList *l = m_handlers.find(handler);
if (l)
return false;
@ -376,7 +386,7 @@ bool MessageDispatcher::install(MessageHandler* handler)
bool MessageDispatcher::uninstall(MessageHandler* handler)
{
DDebug(DebugAll,"MessageDispatcher::uninstall(%p)",handler);
lock();
WLock lck(m_handlersLock);
handler = static_cast<MessageHandler *>(m_handlers.remove(handler,false));
if (handler) {
m_changes++;
@ -385,16 +395,15 @@ bool MessageDispatcher::uninstall(MessageHandler* handler)
handler,handler->c_str());
// wait until handler is again safe to destroy
do {
unlock();
lck.drop();
Thread::yield();
lock();
lck.acquire(m_handlersLock);
} while (handler->m_unsafe > 0);
}
if (handler->m_unsafe != 0)
Debug(DebugFail,"MessageHandler %p has unsafe=%d",handler,handler->m_unsafe);
handler->m_dispatcher = 0;
}
unlock();
return (handler != 0);
}
@ -420,7 +429,7 @@ bool MessageDispatcher::dispatch(Message& msg)
unsigned int hTrackPos = 0;
bool hTrackTime = m_traceHandlerTime;
ObjList *l = &m_handlers;
Lock mylock(this);
RLock lck(m_handlersLock);
m_dispatchCount++;
for (; l; l=l->next()) {
MessageHandler *h = static_cast<MessageHandler*>(l->get());
@ -445,7 +454,7 @@ bool MessageDispatcher::dispatch(Message& msg)
}
// mark handler as unsafe to destroy / uninstall
h->m_unsafe++;
mylock.drop();
lck.drop();
u_int64_t tm = (m_warnTime || hTrackTime) ? Time::now() : 0;
@ -454,7 +463,7 @@ bool MessageDispatcher::dispatch(Message& msg)
if (tm) {
tm = Time::now() - tm;
if (m_warnTime && tm > m_warnTime) {
mylock.acquire(this);
lck.acquire(m_handlersLock);
const char* name = (c == m_changes) ? h->trackName().c_str() : 0;
Debug(DebugInfo,"Message '%s' [%p] passed through %p%s%s%s in " FMT64U " usec",
msg.c_str(),&msg,h,
@ -480,7 +489,7 @@ bool MessageDispatcher::dispatch(Message& msg)
if (retv && !msg.broadcast())
break;
mylock.acquire(this);
lck.acquire(m_handlersLock);
if (c == m_changes)
continue;
// the handler list has changed - find again
@ -509,7 +518,7 @@ bool MessageDispatcher::dispatch(Message& msg)
break;
}
}
mylock.drop();
lck.drop();
if (counting)
Thread::setCurrentObjCounter(msg.getObjCounter());
msg.dispatched(retv);
@ -532,7 +541,7 @@ bool MessageDispatcher::dispatch(Message& msg)
}
}
m_hookMutex.lock();
lck.acquire(m_hooksLock);
if (m_hookHole && !m_hookCount) {
// compact the list, remove the holes
for (l = &m_hooks; l; l = l->next()) {
@ -550,16 +559,16 @@ bool MessageDispatcher::dispatch(Message& msg)
for (l = m_hooks.skipNull(); l; l = l->skipNext()) {
RefPointer<MessagePostHook> ph = static_cast<MessagePostHook*>(l->get());
if (ph) {
m_hookMutex.unlock();
lck.drop();
if (counting)
Thread::setCurrentObjCounter(ph->getObjCounter());
ph->dispatched(msg,retv);
ph = 0;
m_hookMutex.lock();
lck.acquire(m_hooksLock);
}
}
m_hookCount--;
m_hookMutex.unlock();
lck.drop();
if (counting)
Thread::setCurrentObjCounter(saved);
@ -568,7 +577,7 @@ bool MessageDispatcher::dispatch(Message& msg)
bool MessageDispatcher::enqueue(Message* msg)
{
Lock lock(this);
WLock lck(m_messagesLock);
if (!msg || m_messages.find(msg))
return false;
if (m_traceTime)
@ -582,19 +591,17 @@ bool MessageDispatcher::enqueue(Message* msg)
bool MessageDispatcher::dequeueOne()
{
lock();
WLock lck(m_messagesLock);
if (m_messages.next() == m_msgAppend)
m_msgAppend = &m_messages;
Message* msg = static_cast<Message *>(m_messages.remove(false));
if (msg) {
m_dequeueCount++;
uint64_t age = Time::now() - msg->msgTime();
if (age < 60000000)
m_msgAvgAge = (3 * m_msgAvgAge + age) >> 2;
}
unlock();
if (!msg)
return false;
m_dequeueCount++;
uint64_t age = Time::now() - msg->msgTime();
if (age < 60000000)
m_msgAvgAge = (3 * m_msgAvgAge + age) >> 2;
lck.drop();
dispatch(*msg);
msg->destruct();
return true;
@ -608,35 +615,35 @@ void MessageDispatcher::dequeue()
unsigned int MessageDispatcher::messageCount()
{
Lock lock(this);
RLock lck(m_messagesLock);
return (unsigned int)(m_enqueueCount - m_dequeueCount);
}
unsigned int MessageDispatcher::handlerCount()
{
Lock lock(this);
RLock lck(m_handlersLock);
return m_handlers.count();
}
unsigned int MessageDispatcher::postHookCount()
{
Lock lock(m_hookMutex);
RLock lck(m_hooksLock);
return m_hooks.count();
}
void MessageDispatcher::getStats(u_int64_t& enqueued, u_int64_t& dequeued, u_int64_t& dispatched, u_int64_t& queueMax)
{
lock();
RLock lck(m_messagesLock);
enqueued = m_enqueueCount;
dequeued = m_dequeueCount;
dispatched = m_dispatchCount;
queueMax = m_queuedMax;
unlock();
lck.acquire(m_handlersLock);
dispatched = m_dispatchCount;
}
void MessageDispatcher::setHook(MessagePostHook* hook, bool remove)
{
m_hookMutex.lock();
WLock lck(m_hooksLock);
if (remove) {
// zero the hook, we'll compact it later when safe
ObjList* l = m_hooks.find(hook);
@ -647,7 +654,30 @@ void MessageDispatcher::setHook(MessagePostHook* hook, bool remove)
}
else
m_hookAppend = m_hookAppend->append(hook);
m_hookMutex.unlock();
}
unsigned int MessageDispatcher::fillHandlersInfo(bool byName, const String& match,
String* details, unsigned int* total)
{
unsigned int n = 0;
unsigned int matched = 0;
String tmp;
RLock lck(m_handlersLock);
for (ObjList* o = m_handlers.skipNull(); o; o = o->skipNext()) {
n++;
MessageHandler *h = static_cast<MessageHandler*>(o->get());
if (!match.matches(byName ? (const String&)(*h): h->trackNameOnly()))
continue;
matched++;
if (!details)
continue;
tmp.printf("%s=%u|%s|%s",h->safe(),h->priority(),h->trackNameOnly().safe(),
h->filter() ? "yes" : "no");
details->append(tmp,",");
}
if (total)
*total = n;
return matched;
}

View File

@ -639,13 +639,29 @@ public:
inline const String& trackName() const
{ return m_trackName; }
/**
* Retrieve the tracking name of this handler without added priority
* @return Name that is to be used in tracking operation
*/
inline const String& trackNameOnly() const
{ return m_trackNameOnly; }
/**
* Set a new tracking name for this handler.
* Works only if the handler was not yet inserted into a dispatcher
* @param name Name that is to be used in tracking operation
*/
inline void trackName(const char* name)
{ if (!m_dispatcher) m_trackName = name; }
inline void trackName(const char* name) {
if (m_dispatcher)
return;
m_trackName = name;
String tmp;
tmp << ':' << priority();
if (m_trackName.endsWith(tmp))
m_trackNameOnly = m_trackName.substr(0,m_trackName.length() - tmp.length());
else
m_trackNameOnly = m_trackName;
}
/**
* Retrive the objects counter associated to this handler
@ -714,6 +730,7 @@ protected:
private:
String m_trackName;
String m_trackNameOnly;
unsigned m_priority;
int m_unsafe;
MessageDispatcher* m_dispatcher;
@ -826,7 +843,7 @@ class YATE_API MessagePostHook : public RefObject, public MessageNotifier
* messages that are typically dispatched by a separate thread.
* @short A message dispatching hub
*/
class YATE_API MessageDispatcher : public GenObject, public Mutex
class YATE_API MessageDispatcher : public GenObject
{
friend class Engine;
YNOCOPY(MessageDispatcher); // no automatic copies please
@ -921,8 +938,7 @@ public:
/**
* Clear all the message handlers and post-dispatch hooks
*/
inline void clear()
{ m_handlers.clear(); m_hookAppend = &m_hooks; m_hooks.clear(); }
void clear();
/**
* Check if there is at least one message in the queue
@ -999,6 +1015,13 @@ public:
u_int64_t messageAge(bool usec = false) const
{ return usec ? m_msgAvgAge : ((m_msgAvgAge + 500) / 1000); }
/**
* Retrieve the handlers list lock object
* @return Handlers list lock object reference
*/
inline RWLock& handlersLock()
{ return m_handlersLock; }
/**
* Retrieve all statistics counters
* @param enqueued Returns count of enqueued messages
@ -1015,6 +1038,17 @@ public:
*/
void setHook(MessagePostHook* hook, bool remove = false);
/**
* Fill handlers status info
* @param matchName True to match message name, false to match handler trackname
* @param match Value to match. May be a regular expression
* @param details Optional pointer to string to be filled with details
* @param total Optional pointer to data to be filled with total number of handlers
* @return The number of matched handlers
*/
unsigned int fillHandlersInfo(bool matchName, const String& match, String* details = 0,
unsigned int* total = 0);
protected:
/**
* Set the tracked parameter name
@ -1027,7 +1061,9 @@ private:
ObjList m_handlers;
ObjList m_messages;
ObjList m_hooks;
Mutex m_hookMutex;
RWLock m_handlersLock;
RWLock m_messagesLock;
RWLock m_hooksLock;
ObjList* m_msgAppend;
ObjList* m_hookAppend;
String m_trackParam;