diff --git a/conf.d/wiresniff.conf.sample b/conf.d/wiresniff.conf.sample index cd49db33..f40ece3a 100644 --- a/conf.d/wiresniff.conf.sample +++ b/conf.d/wiresniff.conf.sample @@ -21,9 +21,13 @@ ; If empty it will match all messages ; Example for a filter matching all chan.Anything messages and engine.halt: ; filter=^\(chan\.\|engine\.halt$\) -;filter - - +;filter= +; timer: boolean: True to sniff engine.timer messages, false otherwise +;timer=false +; max_buf_size: integer: Maximum admitted length of an encoded message. +; If encoded message length exceeds this length, message will not be sniffed +; Acceptable range is 2048 .. 65507 +;max_buf_size=2048 diff --git a/modules/wiresniff.cpp b/modules/wiresniff.cpp index f4f0f1c0..6c76be94 100644 --- a/modules/wiresniff.cpp +++ b/modules/wiresniff.cpp @@ -5,7 +5,7 @@ * Capture interface for YATE messages. * * Yet Another Telephony Engine - a fully featured software PBX and IVR - * Copyright (C) 2018-2019 Null Team + * Copyright (C) 2018-2023 Null Team * * This software is distributed under multiple licenses; * see the COPYING file in the main directory for licensing @@ -22,22 +22,15 @@ #include #include -#include #include -#define MAX_BUFFER_SIZE 65507 +#define MIN_BUFF_SIZE 2048 +#define MAX_BUFF_SIZE 65507 using namespace TelEngine; + namespace { //anonymous -static Configuration s_cfg; -static Socket s_socket; -static SocketAddr s_remoteAddr; -static SocketAddr s_localAddr; -static Regexp s_filter; -static bool s_timer = false; -static Mutex s_mutex(false,"WireSniff"); - class WireSniffPlugin : public Plugin { public: @@ -46,222 +39,227 @@ public: private: bool m_first; }; - + class WireSniffHandler : public MessageHandler { public: - WireSniffHandler() : MessageHandler(0,0) { } + WireSniffHandler() + : MessageHandler(0,0) + { } virtual bool received(Message &msg); }; - + class WireSniffHook : public MessagePostHook { public: virtual void dispatched(const Message& msg, bool handled); -}; +}; -static void addTag(DataBlock& data, uint8_t tag, uint8_t length = 0, const void* value = 0) +static Socket s_socket; +static SocketAddr s_remAddr; +static SocketAddr s_localAddr; +static Regexp s_filter; +static bool s_timer = false; +static RWLock s_lock("WireSniff"); +// max buffer size, max jumbo frame size - IPv4 header and UDP header +// can be rewritten by configuration +static unsigned int s_maxBuffSize = 65507; + + +INIT_PLUGIN(WireSniffPlugin); + +// append data, value must be given in network order +static void addTag(DataBlock& data, uint8_t tag, const uint8_t* value = 0, uint8_t length = 0) { unsigned char buf[2]; buf[0] = tag; buf[1] = length; data.append(buf,2); if (length) - data.append(const_cast(value),length); + data.append(value,length); } - -static void addTag(DataBlock& data, uint8_t tag, const char* text) + +// append a string +static inline void addTag(DataBlock& data, uint8_t tag, const char* text) { if (text && *text) - addTag(data,tag,::strlen(text),text); + addTag(data,tag,(const uint8_t*)text,::strlen(text)); } - -static void addTag(DataBlock& data, uint8_t tag, void* ptr) + +// append a pointer +static inline void addTag(DataBlock& data, uint8_t tag, void* ptr) { if (!ptr) return; - intptr_t val = reinterpret_cast(ptr); - addTag(data,tag,sizeof(val),ptr); + String id; + id.printf("%p",ptr); + addTag(data,tag,id.c_str()); } - - + +static inline void addTag(DataBlock& data, uint8_t tag, uint8_t val) +{ + addTag(data,tag,&val,1); +} + static bool sendMessage(const Message& msg, bool result, bool handled) { DataBlock buf; - String signature("yate-msg"); + static const String signature("yate-msg"); buf.append(signature); - uint8_t i = result ? 1: 0; - addTag(buf,YSNIFF_RESULT,1,&i); + addTag(buf,YSNIFF_RESULT,result ? 1 : 0); + addTag(buf,YSNIFF_THREAD_ADDR,Thread::current()); + addTag(buf,YSNIFF_THREAD_NAME,Thread::currentName()); + addTag(buf,YSNIFF_DATA,msg.userData()); + addTag(buf,YSNIFF_BROADCAST,msg.broadcast() ? 1 : 0); + addTag(buf,YSNIFF_FINAL_TAG,(uint8_t)0); // cast to uint8_t for resolving ambiguousness - Thread* t = Thread::current(); - addTag(buf,YSNIFF_THREAD_ADDR,t); - - const char* name = Thread::currentName(); - addTag(buf,YSNIFF_THREAD_NAME,name); - - RefObject* data = msg.userData(); - addTag(buf, YSNIFF_DATA, data); - - uint8_t broadcast = msg.broadcast() ? 1 : 0; - addTag(buf,YSNIFF_BROADCAST,1,&broadcast); - - uint8_t finaltag = 0; - addTag(buf,YSNIFF_FINAL_TAG,1,&finaltag); - - String id; id.printf("%p",&msg); if (result) buf.append(msg.encode(handled,id)); else buf.append(msg.encode(id)); - - if (!(buf.length() < MAX_BUFFER_SIZE)) - DDebug(DebugWarn,"Buffer Overrun"); - - s_mutex.lock(); - unsigned int len = s_socket.sendTo(buf.data(),buf.length(),s_remoteAddr); - s_mutex.unlock(); - if (!len) - DDebug(DebugWarn,"Unable to send package"); - - return(len == buf.length()); + + RLock l(s_lock); + if (buf.length() > s_maxBuffSize) { + Debug(&__plugin,DebugWarn,"Encoded '%s'(%p) is too long, encoded length=%u, max allowed length=%u", + msg.toString().c_str(),&msg,buf.length(),s_maxBuffSize); + return false; + } + + if (!s_socket.valid()) + return false; + int len = s_socket.sendTo(buf.data(),buf.length(),s_remAddr); + if (len == (int)buf.length()) + return true; + if (len != Socket::socketError()) { + Debug(&__plugin,DebugMild,"Incomplete write of '%s'(%p) message, written %u of %u octets", + msg.toString().c_str(),&msg,len,buf.length()); + return false; + } + if (!s_socket.canRetry()) + Debug(&__plugin,DebugWarn,"Socket write error: %d: %s", + s_socket.error(),::strerror(s_socket.error())); + else + DDebug(&__plugin,DebugMild,"Socket temporary unavailable: %d: %s", + s_socket.error(),::strerror(s_socket.error())); + return false; } - - + bool WireSniffHandler::received(Message &msg) { if (!s_timer && (msg == YSTRING("engine.timer"))) return false; - if (!s_socket.valid()) { - Debug(DebugWarn,"Socket %s is not valid", strerror(s_socket.error())); - return false; - } - if (!s_localAddr.valid()){ - Debug(DebugWarn,"SocketAddr is not valid"); - return false; - } - + s_lock.readLock(); if (s_filter && !s_filter.matches(msg)) return false; - + s_lock.unlock(); sendMessage(msg,false,false); return false; } - + void WireSniffHook::dispatched(const Message& msg, bool handled) { if ((!s_timer && (msg == YSTRING("engine.timer")))) return; - + s_lock.readLock(); if (s_filter && !s_filter.matches(msg)) return; - + s_lock.unlock(); sendMessage(msg,true,handled); } - - -WireSniffPlugin::WireSniffPlugin() + +WireSniffPlugin::WireSniffPlugin() : Plugin("wiresniff"), m_first(true) { Output("Loaded module WireSniff"); } - + void WireSniffPlugin::initialize() { Output("Initializing module WireSniff"); - s_mutex.lock(); - s_cfg = Engine::configFile("wiresniff"); - s_cfg.load(); - s_mutex.unlock(); + Configuration cfg(Engine::configFile("wiresniff")); + cfg.load(); - String remoteName = s_cfg.getValue("general","remote_host"); - s_remoteAddr.host(remoteName); - s_remoteAddr.port(s_cfg.getIntValue("general","remote_port")); - - if(s_remoteAddr.valid() && s_remoteAddr.port()) { - DDebug(this,DebugNote,"Remote address '%s' and remote port '%u' are set", remoteName.c_str(), s_remoteAddr.port()); - } - - if(!(s_remoteAddr.host() && s_remoteAddr.port())) { - s_socket.terminate(); - if(remoteName) - Debug(this,DebugWarn,"Remote address is invalid"); - return; - } - - if (s_remoteAddr.host() && s_remoteAddr.port() && !(s_remoteAddr.valid())) { - DDebug(this,DebugWarn,"Invalid address '%s'",remoteName.c_str()); + SocketAddr rAddr; + rAddr.host(cfg.getValue("general","remote_host")); + rAddr.port(cfg.getIntValue("general","remote_port")); + if(!(rAddr.host() && rAddr.port() && rAddr.valid())) { + Debug(this,DebugConf,"Failed to initialize: invalid remote address '%s:%u' [%p]", + cfg.getValue("general","remote_host"),cfg.getIntValue("general","remote_port"),this); return; } - if(s_remoteAddr.family() != SocketAddr::IPv6) - s_localAddr.host(s_cfg.getValue("general","local_host","0.0.0.0")); - else - s_localAddr.host(s_cfg.getValue("general","local_host","::")); - - s_localAddr.port(s_cfg.getIntValue("general","local_port",0)); - - if (!(s_localAddr.valid() && s_localAddr.host())) { - DDebug(this,DebugWarn,"Invalid address '%s'", s_localAddr.host().c_str()); + SocketAddr lAddr; + lAddr.host(cfg.getValue("general","local_host")); + lAddr.port(cfg.getIntValue("general","local_port")); + if (!(lAddr.host() && lAddr.port() && lAddr.valid())) { + Debug(this,DebugConf,"Failed to initialize: invalid local address '%s:%u' [%p]", + cfg.getValue("general","local_host"),cfg.getIntValue("general","local_port"),this); return; - } - if (s_remoteAddr.host()) { - if (s_localAddr.family() != s_remoteAddr.family()) { - Debug(this,DebugWarn,"Socket Addresses are not compatible"); + if (lAddr.family() != rAddr.family()) { + Debug(this,DebugConf,"Failed to initialize: mismatched socket families for local (%s) and remote (%s) socket addresses [%p]", + lAddr.familyName(),rAddr.familyName(),this); return; + } + + WLock l(s_lock); + s_remAddr = rAddr; + + if (lAddr != s_localAddr) { + if (s_socket.valid()) { + Debug(this,DebugInfo,"Stopping socket bound on local address '%s' [%p]",s_localAddr.addr().c_str(),this); + s_socket.terminate(); + } + + s_localAddr = lAddr; + if (!s_socket.create(s_localAddr.family(),SOCK_DGRAM)) { + Debug(this,DebugWarn,"Failed to create socket for local address '%s', error=%s(%u) [%p]", + s_localAddr.addr().c_str(),::strerror(s_socket.error()),s_socket.error(),this); + return; + } + + if (!s_socket.bind(s_localAddr)) { + Debug(this,DebugWarn,"Failed to bind socket on local address '%s', error=%s(%u) [%p]", + s_localAddr.addr().c_str(),::strerror(s_socket.error()),s_socket.error(),this); + return; + } + + if (!s_socket.setBlocking(false)) { + s_socket.terminate(); + Debug(DebugWarn,"Failed to set socket bound on local address '%s' in non-blocking mode, error=%s(%u) [%p]", + s_localAddr.addr().c_str(),::strerror(s_socket.error()),s_socket.error(),this); + return; } } - if (!s_socket.create(s_localAddr.family(),SOCK_DGRAM)) { - Debug(this,DebugWarn,"Could not create socket %s", strerror(s_socket.error())); - s_socket.terminate(); - } + Regexp filter(cfg.getValue("general","filter")); + if (filter && !filter.compile()) + Debug(this,DebugConf,"Failed to set message filter '%s', does not compile [%p]", + cfg.getValue("general","filter"),this); + else + s_filter = filter; - if (!s_socket.bind(s_localAddr)) { - Debug(this,DebugWarn,"Unable to bind to %s:%u : %s",s_localAddr.host().c_str(),s_localAddr.port(),strerror(s_socket.error())); - return; - } - -#ifdef IP_MTU_DISCOVER - int val = 1; - if (!s_socket.setOption(IPPROTO_IP, IP_MTU_DISCOVER, &val, sizeof(val))) { - Debug(this,DebugWarn,"Socket %s option is not set!", strerror(s_socket.error())); - return; - } -#endif + Debug(this,DebugAll,"Sending Yate messages from '%s' to '%s' with filter '%s' [%p]", + s_localAddr.addr().c_str(),s_remAddr.addr().c_str(),s_filter.c_str(),this); + + s_timer = cfg.getBoolValue("general","timer",false); + s_maxBuffSize = cfg.getIntValue("general","max_buf_size", + s_maxBuffSize,MIN_BUFF_SIZE,MAX_BUFF_SIZE); + l.drop(); - if (!s_socket.setBlocking(false)) { - s_socket.terminate(); - Debug(DebugWarn,"Unable to set socket %s to nonblocking mode", strerror(s_socket.error())); - return; - } - if (!s_socket.getSockName(s_localAddr)) { - Debug(this,DebugWarn,"Error getting address: %s",strerror(s_socket.error())); - return; - } - - DDebug(this,DebugNote,"Socket bound to: %s:%u",s_localAddr.host().c_str(),s_localAddr.port()); - Debug(this,DebugNote,"Sending from %s to %s:%u",s_localAddr.addr().c_str(),s_remoteAddr.host().c_str(),s_remoteAddr.port()); - if (m_first) { m_first = false; Engine::install(new WireSniffHandler); Engine::self()->setHook(new WireSniffHook); - } - - s_mutex.lock(); - s_filter = s_cfg.getValue("general", "filter"); - s_mutex.unlock(); - - Output("WireSniff was initialized"); + } + } - - INIT_PLUGIN(WireSniffPlugin); + }; // anonymous namespace /* vi: set ts=8 sw=4 sts=4 noet: */