From a27e7f1b9e42481dc098ca2c4556b2a5cbabc44e Mon Sep 17 00:00:00 2001 From: marian Date: Mon, 16 Aug 2021 08:02:49 +0000 Subject: [PATCH] Added support to track message enqueue/dispatch time. Added support to track the time spent by a message in handler. git-svn-id: http://yate.null.ro/svn/yate/trunk@6513 acf43c95-373e-0410-b603-e72c3f656dc1 --- conf.d/yate.conf.sample | 9 ++++++ engine/Engine.cpp | 31 +++++++++++++++++++- engine/Message.cpp | 58 +++++++++++++++++++++++++++++++++---- yatengine.h | 64 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 7 deletions(-) diff --git a/conf.d/yate.conf.sample b/conf.d/yate.conf.sample index f5a2d62b..a9aeb44b 100644 --- a/conf.d/yate.conf.sample +++ b/conf.d/yate.conf.sample @@ -128,6 +128,15 @@ ; filtersniffparams=any caller=^123$ called=^123$ ;filtersniffparams= +; trace_msg_time: boolean: Instruct message dispatcher to set message event(s) time (enqueue / dispatch) +;trace_msg_time=no + +; trace_msg_handler_time: boolean: Enable message dispatcher to trace the time spent by a +; message handler to process a message +; Add the time in message handlers parameter +; The time will be added to message handler name as #. +;trace_msg_handler_time=no + [modules] ; This section should hold one line for each module whose loading behaviour diff --git a/engine/Engine.cpp b/engine/Engine.cpp index da817a04..a05e3474 100644 --- a/engine/Engine.cpp +++ b/engine/Engine.cpp @@ -632,6 +632,8 @@ static const char s_logvOpt[] = " logview\r\n"; static const char s_logvMsg[] = "Show log of engine startup and initialization process\r\n"; static const char s_runpOpt[] = " runparam name=value\r\n"; static const char s_runpMsg[] = "Add a new parameter to the Engine's runtime list\r\n"; +static const char s_dispatcherOpt[] = " dispatcher {trace_msg_time|trace_msg_handler_time} \r\n"; +static const char s_dispatcherMsg[] = "Enable or disable dispatcher debugging options\r\n"; // get the base name of a module file static String moduleBase(const String& fname) @@ -738,6 +740,7 @@ void EngineCommand::doCompletion(Message &msg, const String& partLine, const Str completeOne(msg.retValue(),"events",partWord); completeOne(msg.retValue(),"logview",partWord); completeOne(msg.retValue(),"runparam",partWord); + completeOne(msg.retValue(),"dispatcher",partWord); } else if (partLine == YSTRING("status")) { completeOne(msg.retValue(),"engine",partWord); @@ -782,6 +785,15 @@ void EngineCommand::doCompletion(Message &msg, const String& partLine, const Str if (partLine == YSTRING("events")) completeOne(msg.retValue(),"clear",partWord); } + else if (partLine == YSTRING("dispatcher")) { + completeOne(msg.retValue(),"trace_msg_time",partWord); + completeOne(msg.retValue(),"trace_msg_handler_time",partWord); + } + else if ((partLine == YSTRING("dispatcher trace_msg_time")) + || (partLine == YSTRING("dispatcher trace_msg_handler_time"))) { + completeOne(msg.retValue(),"on",partWord); + completeOne(msg.retValue(),"off",partWord); + } } bool EngineCommand::received(Message &msg) @@ -852,6 +864,19 @@ bool EngineCommand::received(Message &msg) } return true; } + return false; + } + if (line.startSkip("dispatcher trace_msg_time")) { + MessageDispatcher* d = Engine::dispatcher(); + if (d) + d->traceTime(line.toBoolean()); + return 0 != d; + } + if (line.startSkip("dispatcher trace_msg_handler_time")) { + MessageDispatcher* d = Engine::dispatcher(); + if (d) + d->traceHandlerTime(line.toBoolean()); + return 0 != d; } return false; } @@ -920,7 +945,7 @@ bool EngineHelp::received(Message &msg) const char* opts = (s_nounload ? s_cmdsOptNoUnload : s_cmdsOpt); String line = msg.getValue("line"); if (line.null()) { - msg.retValue() << opts << s_evtsOpt << s_logvOpt << s_runpOpt; + msg.retValue() << opts << s_evtsOpt << s_logvOpt << s_runpOpt << s_dispatcherOpt; return false; } if (line == YSTRING("module")) @@ -931,6 +956,8 @@ bool EngineHelp::received(Message &msg) msg.retValue() << s_logvOpt << s_logvMsg; else if (line == YSTRING("runparam")) msg.retValue() << s_runpOpt << s_runpMsg; + else if (line == YSTRING("dispatcher")) + msg.retValue() << s_dispatcherOpt << s_dispatcherMsg; else return false; return true; @@ -1555,6 +1582,8 @@ int Engine::engineInit() s_timejump = MIN_TIME_JUMP; s_timejump *= 1000; m_dispatcher.warnTime(1000*(u_int64_t)s_cfg.getIntValue("general","warntime")); + m_dispatcher.traceTime(s_cfg.getBoolValue("general","trace_msg_time")); + m_dispatcher.traceHandlerTime(s_cfg.getBoolValue("general","trace_msg_handler_time")); extraPath(clientMode() ? "client" : "server"); extraPath(s_cfg.getValue("general","extrapath")); diff --git a/engine/Message.cpp b/engine/Message.cpp index 24344865..295c8959 100644 --- a/engine/Message.cpp +++ b/engine/Message.cpp @@ -37,7 +37,8 @@ private: Message::Message(const char* name, const char* retval, bool broadcast) : NamedList(name), - m_return(retval), m_data(0), m_notify(false), m_broadcast(broadcast) + m_return(retval), m_timeEnqueue((uint64_t)0), m_timeDispatch((uint64_t)0), + m_data(0), m_notify(false), m_broadcast(broadcast) { XDebug(DebugAll,"Message::Message(\"%s\",\"%s\",%s) [%p]", name,retval,String::boolText(broadcast),this); @@ -46,7 +47,9 @@ Message::Message(const char* name, const char* retval, bool broadcast) Message::Message(const Message& original) : NamedList(original), m_return(original.retValue()), m_time(original.msgTime()), - m_data(0), m_notify(false), m_broadcast(original.broadcast()) + m_timeEnqueue(original.m_timeEnqueue), m_timeDispatch(original.m_timeDispatch), + m_data(0), + m_notify(false), m_broadcast(original.broadcast()) { XDebug(DebugAll,"Message::Message(&%p) [%p]",&original,this); } @@ -54,7 +57,9 @@ Message::Message(const Message& original) Message::Message(const Message& original, bool broadcast) : NamedList(original), m_return(original.retValue()), m_time(original.msgTime()), - m_data(0), m_notify(false), m_broadcast(broadcast) + m_timeEnqueue(original.m_timeEnqueue), m_timeDispatch(original.m_timeDispatch), + m_data(0), + m_notify(false), m_broadcast(broadcast) { XDebug(DebugAll,"Message::Message(&%p,%s) [%p]", &original,String::boolText(broadcast),this); @@ -95,6 +100,14 @@ void Message::dispatched(bool accepted) hook->dispatched(*this,accepted); } +void Message::resetMsg(Time tm) +{ + m_return.clear(); + m_time = m_timeEnqueue = m_timeDispatch = tm; + if (Engine::trackParam()) + clearParam(Engine::trackParam()); +} + String Message::encode(const char* id) const { String s("%%>message:"); @@ -304,6 +317,7 @@ MessageDispatcher::MessageDispatcher(const char* trackParam) m_trackParam(trackParam), m_changes(0), m_warnTime(0), m_enqueueCount(0), m_dequeueCount(0), m_dispatchCount(0), m_queuedMax(0), m_msgAvgAge(0), + m_traceTime(false), m_traceHandlerTime(false), m_hookCount(0), m_hookHole(false) { XDebug(DebugInfo,"MessageDispatcher::MessageDispatcher('%s') [%p]",trackParam,this); @@ -386,11 +400,21 @@ bool MessageDispatcher::dispatch(Message& msg) Debugger debug("MessageDispatcher::dispatch","(%p) (\"%s\")",&msg,msg.c_str()); #endif - u_int64_t t = m_warnTime ? Time::now() : 0; + u_int64_t t = 0; + if (m_warnTime || m_traceTime) { + Time now; + if (m_warnTime) + t = now; + if (m_traceTime) + msg.m_timeDispatch = now; + } bool retv = false; bool counting = getObjCounting(); NamedCounter* saved = Thread::getCurrentObjCounter(counting); + String hTrackName; + unsigned int hTrackPos = 0; + bool hTrackTime = m_traceHandlerTime; ObjList *l = &m_handlers; Lock mylock(this); m_dispatchCount++; @@ -416,24 +440,44 @@ bool MessageDispatcher::dispatch(Message& msg) tracked->append(h->trackName(),","); else msg.addParam(trackParam(),h->trackName()); + if (hTrackTime) { + hTrackName = h->trackName(); + hTrackPos = tracked ? tracked->length() : hTrackName.length(); + } } // mark handler as unsafe to destroy / uninstall h->m_unsafe++; mylock.drop(); - u_int64_t tm = m_warnTime ? Time::now() : 0; + u_int64_t tm = (m_warnTime || hTrackTime) ? Time::now() : 0; retv = h->receivedInternal(msg) || retv; if (tm) { tm = Time::now() - tm; - if (tm > m_warnTime) { + if (m_warnTime && tm > m_warnTime) { mylock.acquire(this); const char* name = (c == m_changes) ? h->trackName().c_str() : 0; Debug(DebugInfo,"Message '%s' [%p] passed through %p%s%s%s in " FMT64U " usec", msg.c_str(),&msg,h, (name ? " '" : ""),(name ? name : ""),(name ? "'" : ""),tm); } + if (hTrackTime && hTrackName) { + NamedString* tracked = msg.getParam(trackParam()); + unsigned int start = hTrackPos - hTrackName.length(); + if (tracked && start < tracked->length()) { + if (0 == ::strncmp(tracked->c_str() + start,hTrackName.c_str(),hTrackName.length())) { + String buf; + buf.printf("#%u.%03u",(unsigned int)(tm / 1000), + (unsigned int)(tm % 1000)); + char c = (*tracked)[hTrackPos]; + if (!c) + *tracked << buf; + else if (',' == c) // Message re-dispatched. New handler name added + tracked->insert(hTrackPos,buf,buf.length()); + } + } + } } if (retv && !msg.broadcast()) @@ -529,6 +573,8 @@ bool MessageDispatcher::enqueue(Message* msg) Lock lock(this); if (!msg || m_messages.find(msg)) return false; + if (m_traceTime) + msg->m_timeEnqueue = Time::now(); m_msgAppend = m_msgAppend->append(msg); u_int64_t count = (++m_enqueueCount) - m_dequeueCount; if (m_queuedMax < count) diff --git a/yatengine.h b/yatengine.h index c6c874d6..416522b6 100644 --- a/yatengine.h +++ b/yatengine.h @@ -402,6 +402,13 @@ public: inline bool broadcast() const { return m_broadcast; } + /** + * Reset message. This method should be used when message is going to be re-dispatched. + * Reset message time, track param, return value. + * @param tm Time to set, defaults to current time + */ + void resetMsg(Time tm = Time::now()); + /** * Retrieve a reference to the creation time of the message. * @return A reference to the @ref Time when the message was created @@ -416,6 +423,38 @@ public: inline const Time& msgTime() const { return m_time; } + /** + * Retrieve a reference to the time when message was put in dispatcher. + * @return A reference to the @ref Time when the message was put in dispatcher queue. + * May be 0 if the message was not put in queue or time tracking is not enabled in dispatcher + */ + inline Time& msgTimeEnqueue() + { return m_timeEnqueue; } + + /** + * Retrieve a const reference to the time when message was put in dispatcher. + * @return A reference to the @ref Time when the message was put in dispatcher queue. + * May be 0 if the message was not put in queue or time tracking is not enabled in dispatcher + */ + inline const Time& msgTimeEnqueue() const + { return m_timeEnqueue; } + + /** + * Retrieve a reference to the time when message was dispatched. + * @return A reference to the @ref Time when the message was dispatched + * May be 0 if the message was not dispatched yet or time tracking is not enabled in dispatcher + */ + inline Time& msgTimeDispatch() + { return m_timeDispatch; } + + /** + * Retrieve a const reference to the time when message was dispatched. + * @return A reference to the @ref Time when the message was dispatched + * May be 0 if the message was not dispatched yet or time tracking is not enabled in dispatcher + */ + inline const Time& msgTimeDispatch() const + { return m_timeDispatch; } + /** * Name assignment operator */ @@ -472,6 +511,8 @@ private: Message& operator=(const Message& value); // no assignment please String m_return; Time m_time; + Time m_timeEnqueue; + Time m_timeDispatch; RefObject* m_data; bool m_notify; bool m_broadcast; @@ -790,6 +831,20 @@ public: inline void warnTime(u_int64_t usec) { m_warnTime = usec; } + /** + * Enable or disable message events time (queued / dispatch) + * @param on True to enable, false to disable + */ + inline void traceTime(bool on = false) + { m_traceTime = on; } + + /** + * Enable or disable message handler duration + * @param on True to enable, false to disable + */ + inline void traceHandlerTime(bool on = false) + { m_traceHandlerTime = on; } + /** * Clear all the message handlers and post-dispatch hooks */ @@ -910,6 +965,8 @@ private: u_int64_t m_dispatchCount; u_int64_t m_queuedMax; u_int64_t m_msgAvgAge; + bool m_traceTime; + bool m_traceHandlerTime; int m_hookCount; bool m_hookHole; }; @@ -1670,6 +1727,13 @@ public: */ static int cleanupLibrary(); + /** + * Retrieve common Engine dispatcher + * @return MessageDispatcher pointer, NULL if not set + */ + static inline MessageDispatcher* dispatcher() + { return s_self ? &(s_self->m_dispatcher) : 0; } + protected: /** * Destroys the engine and everything. You must not call it directly,