/** * nettest.cpp * This file is part of the YATE Project http://YATE.null.ro * * Network and socket performance test module * * Yet Another Telephony Engine - a fully featured software PBX and IVR * Copyright (C) 2004-2023 Null Team * Author: Marian Podgoreanu * * This software is distributed under multiple licenses; * see the COPYING file in the main directory for licensing * information for this specific distribution. * * This use of this software may be subject to additional restrictions. * See the LEGAL file in the main directory for details. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ #include #include #include #include #include #include using namespace TelEngine; namespace { // anonymous class Statistics; // class NTTest; // class NTWorkerContainer; // A set of workers class NTWorker; // class NTWriter; // class NTReader; // class NTSelectReader; // class NTPlugin; // The module /** * This class holds an fd_set * @short MultiSelect private data */ class PrivateFDSet { public: inline bool isset(int handle) { return 0 != FD_ISSET(handle,&set); } inline void add(int handle) { FD_SET(handle,&set); } inline void reset() { FD_ZERO(&set); } fd_set set; }; /** * This class encapsulates a select for a set of file descriptors. * File descriptors can be appended to wait for data to be read or write or * wait for an exception to occur * @short A multiple file descriptor select */ class FDSetSelect { public: /** * Constructor */ FDSetSelect(); /** * Destructor. Release private data */ ~FDSetSelect(); /** * Check if data is available for read * This method should be called after @ref select() returns * @param handle File descriptor to check * @return True if there is any data available for the given file descriptor */ bool canRead(int handle) const; /** * Check if a file descriptor can be used to write data * This method should be called after @ref select() returns * @param handle File descriptor to check * @return True if data can be written using the given file descriptor */ bool canWrite(int handle) const; /** * Check if there is a pending event for a given file descriptor * This method should be called after @ref select() returns * @param handle File descriptor to check * @return True if there is a pending event for the given file descriptor */ bool hasEvent(int handle) const; /** * Append a file descriptor to read, write and/or event set. * This method shouldn't be called while in select * @param handle File descriptor to append * @param read True to append to the read set (wait to receive data) * @param write True to append to the write set (check if the handle can be used to write data) * @param event True to append to the event set (check exceptions) * @return False if handle is invalid or target set is missing (all flags are false) */ bool add(int handle, bool read, bool write, bool event); /** * Reset all file descriptor sets. * This method shouldn't be called while in select */ void reset(); /** * Start waiting for a file descriptor state change * @param uSec The select timeout in microseconds (can be 0 to wait until a file descriptor get set) * @return The number of file descriptors whose state changed or negative on error */ int select(unsigned int uSec); private: PrivateFDSet* m_read; // Read set PrivateFDSet* m_write; // Write set PrivateFDSet* m_event; // Event set PrivateFDSet* m_crtR; // Current event set for read/write/events PrivateFDSet* m_crtW; // PrivateFDSet* m_crtE; // int m_maxHandle; // Maximum handle value in current set(s) bool m_selectError; // Flag used to output errors }; // Statistics class class Statistics { public: inline Statistics() { reset(); } inline void reset() { msStart = Time::msecNow(); msStop = 0; packets = totalBytes = errors = lostBytes = 0; stopped = 0; } inline void success(unsigned int bytes) { packets++; totalBytes += bytes; } inline void failure(unsigned int bytes) { packets++; errors++; lostBytes += bytes; } inline Statistics& operator+=(const Statistics& src) { packets += src.packets; totalBytes += src.totalBytes; errors += src.errors; lostBytes += src.lostBytes; stopped += src.stopped; return *this; } void output(String& dest); u_int64_t msStart; u_int64_t msStop; u_int64_t packets; u_int64_t totalBytes; u_int64_t errors; u_int64_t lostBytes; unsigned int stopped; }; class NTTest : public Mutex, public GenObject, public DebugEnabler { public: NTTest(const char* name); virtual ~NTTest() { stop(); } virtual void destruct() { stop(); GenObject::destruct(); } inline bool send() const { return m_send; } inline const String& localip() const { return m_localip; } inline const String& remoteip() const { return m_remoteip; } inline unsigned int packetLen() const { return m_packetLen; } inline unsigned int interval() const { return m_interval; } inline unsigned int lifetime() const { return m_lifetime; } inline unsigned int packetCount() const { return m_packetCount; } inline int selectTimeout() const { return m_selectTimeout; } bool init(NamedList& params); void start(); void stop(); void addWorker(); void removeWorker(NTWorker* worker); private: Mutex m_mutex; String m_id; String m_localip; String m_remoteip; int m_port; unsigned int m_threads; bool m_send; unsigned int m_packetLen; unsigned int m_interval; unsigned int m_lifetime; unsigned int m_packetCount; ObjList m_containers; unsigned int m_workerCount; int m_selectTimeout; Statistics m_localStats; }; class NTWorkerContainer : public Mutex, public GenObject, public DebugEnabler { friend class SelectThread; public: NTWorkerContainer(NTTest* test, unsigned int threads, const char* id); inline NTTest* test() { return m_test; } void start(int& port); void stop(); void addWorker(NTWorker* worker); void removeWorker(NTWorker* worker); private: String m_id; NTTest* m_test; unsigned int m_workerCount; unsigned int m_threads; ObjList m_workers; }; class NTWorker : public Thread, public GenObject { public: NTWorker(NTWorkerContainer* container, int port, const char* name = "NTWorker"); ~NTWorker(); const Statistics& counters() const { return m_counters; } protected: bool initSocket(Socket* sock = 0, SocketAddr* addr = 0); protected: NTWorkerContainer* m_container; NTTest* m_test; u_int64_t m_timeToDie; Socket m_socket; SocketAddr m_addr; Statistics m_counters; }; class NTWriter : public NTWorker { public: inline NTWriter(NTWorkerContainer* container, int port) : NTWorker(container,port), m_timeToSend(0) {} virtual void run(); private: u_int64_t m_timeToSend; }; class NTReader : public NTWorker { public: inline NTReader(NTWorkerContainer* container, int port) : NTWorker(container,port) {} virtual void run(); }; class NTSelectReader : public NTWorker { public: NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count); virtual ~NTSelectReader(); virtual void run(); private: Socket* m_sockets; unsigned int m_count; }; class NTPlugin : public Module { public: NTPlugin(); virtual ~NTPlugin(); virtual void initialize(); virtual bool received(Message& msg, int id); private: bool m_first; }; // Static data static DataBlock s_stopPattern; static NTTest* s_test = 0; // Config static String s_localip; static unsigned int s_packetLen = 320; static unsigned int s_interval = 20; static unsigned int s_lifetime = 60; static unsigned long s_sleep = 2; // Plugin static NTPlugin plugin; /** * FDSetSelect */ // Constructor FDSetSelect::FDSetSelect() : m_read(new PrivateFDSet), m_write(new PrivateFDSet), m_event(new PrivateFDSet), m_crtR(0), m_crtW(0), m_crtE(0), m_maxHandle(Socket::invalidHandle()), m_selectError(false) { } // Release private data FDSetSelect::~FDSetSelect() { delete m_read; delete m_write; delete m_event; } // Check if data is available for read bool FDSetSelect::canRead(int handle) const { return m_read->isset(handle); } // Check if a file descriptor can be used to write data bool FDSetSelect::canWrite(int handle) const { return m_write->isset(handle); } // Check if there is a pending event for a given file descriptor bool FDSetSelect::hasEvent(int handle) const { return m_event->isset(handle); } // Append a file descriptor to read, write and/or event set. // Return false if handle is invalid or target set is missing (all flags are false) bool FDSetSelect::add(int handle, bool read, bool write, bool event) { if (!(read || write || event) || handle == Socket::invalidHandle() || !Socket::canSelect(handle)) return false; if (read) { m_read->add(handle); m_crtR = m_read; } if (write) { m_write->add(handle); m_crtW = m_write; } if (event) { m_event->add(handle); m_crtE = m_event; } if (m_maxHandle < handle) m_maxHandle = handle; return true; } // Reset all file descriptor sets void FDSetSelect::reset() { m_read->reset(); m_write->reset(); m_event->reset(); m_crtR = m_crtW = m_crtE = 0; m_maxHandle = Socket::invalidHandle(); } // Start waiting for a file descriptor state change int FDSetSelect::select(unsigned int uSec) { if (m_maxHandle == Socket::invalidHandle()) return 0; struct timeval t; t.tv_sec = 0; t.tv_usec = (uSec > 0) ? uSec : 0; m_selectError = false; int result = ::select(m_maxHandle+1,&m_crtR->set,&m_crtW->set,&m_crtE->set,&t); if (result >= 0) { XDebug(DebugAll,"FDSetSelect got %d handlers [%p]",result,this); return result; } bool canRetry = (errno == EAGAIN || errno == EINTR || errno == EBADF); if (!(canRetry || m_selectError)) { Debug(DebugWarn,"FDSetSelect failed: %d: %s [%p]",errno,::strerror(errno),this); m_selectError = true; } return -1; } /** * Statistics */ void Statistics::output(String& dest) { dest << "================================================================="; #define stat_set64(text,val) \ ::sprintf(buf,FMT64,val); \ dest << "\r\n" << text << buf; char buf[128]; stat_set64("Packets: ",packets); stat_set64("Total (bytes): ",totalBytes); stat_set64("Errors: ",errors); stat_set64("Lost (bytes): ",lostBytes); dest <<"\r\nStopped: " << stopped; u_int64_t stop = msStop ? msStop : Time::msecNow(); u_int64_t lenMsec = stop - msStart; u_int64_t lenSec = lenMsec / 1000; if (!lenSec) lenSec = 1; stat_set64("Test length (ms): ",lenMsec); stat_set64("Ratio (Mb/s): ",totalBytes / lenSec * 8 / 1000000); stat_set64("Ratio (packets/s): ",packets/lenSec); #undef stat_set64 dest << "\r\n================================================================="; } /** * NTTest */ NTTest::NTTest(const char* name) : Mutex(true), m_mutex(true), m_port(0), m_threads(0), m_send(true), m_packetLen(0), m_interval(0), m_lifetime(0), m_packetCount(0), m_workerCount(0), m_selectTimeout(-1) { debugChain(&plugin); m_id << plugin.debugName() << "/" << name; debugName(m_id); } bool NTTest::init(NamedList& params) { Lock2 lock(*this,m_mutex); stop(); m_localip = params.getValue("localip",s_localip); if (m_localip.null()) { Debug(this,DebugNote,"Empty localip in section '%s'",debugName()); return false; } m_remoteip = params.getValue("remoteip"); if (m_remoteip.null()) { Debug(this,DebugNote,"Empty remoteip in section '%s'",debugName()); return false; } String tmp = params.getValue("port"); m_port = tmp.toInteger(0); if (!m_port) { Debug(this,DebugNote,"Invalid port=%s in section '%s'", tmp.c_str(),debugName()); return false; } m_threads = params.getIntValue("threads",1); if (m_threads < 1) m_threads = 1; m_send = params.getBoolValue("send",true); m_packetLen = params.getIntValue("packetlen",s_packetLen); if (m_packetLen < 16) m_packetLen = 16; else if (m_packetLen > 1400) m_packetLen = 1400; m_interval = params.getIntValue("interval",s_interval); if (m_interval < 1) m_interval = 1; else if (m_interval > 120) m_interval = 120; m_lifetime = params.getIntValue("lifetime",s_lifetime); bool sendAllPackets = params.getBoolValue("sendallpackets",true); if (sendAllPackets) m_packetCount = m_lifetime * 1000 / m_interval; else m_packetCount = 0; m_selectTimeout = params.getIntValue("select-timeout",-1); m_containers.clear(); int workersets = params.getIntValue("workersets",1); if (workersets < 1 || (unsigned int)workersets > m_threads) workersets = 1; unsigned int nFull = 0; unsigned int nRest = 0; if (workersets == 1) nRest = m_threads; else { nFull = m_threads / (workersets - 1); nRest = m_threads - nFull * (workersets - 1); } for (int i = 1; i <= workersets; i++) { String id; id << m_id << "/" << i; if (i < workersets) m_containers.append(new NTWorkerContainer(this,nFull,id)); else m_containers.append(new NTWorkerContainer(this,nRest,id)); } unsigned int sock = m_threads; if (m_selectTimeout >= 0) m_threads = workersets; tmp = ""; tmp << "\r\nAction: " << (m_send ? "send" : "recv"); if (m_selectTimeout >= 0) tmp << "\r\nSockets: " << sock; else tmp << "\r\nThreads: " << m_threads; tmp << "\r\nLocal address: " << m_localip; tmp << "\r\nRemote address: " << m_remoteip; tmp << "\r\nPort: " << m_port; tmp << "\r\nPacket length: " << m_packetLen; tmp << "\r\nPackets: " << m_packetCount; tmp << "\r\nInterval: " << m_interval << "ms"; tmp << "\r\nLifetime: " << m_lifetime << "s"; tmp << "\r\nWorker sets: " << workersets; tmp << "\r\nSelect timeout: " << m_selectTimeout << (m_selectTimeout < 0 ? " (not used)" : "us"); Debug(this,DebugInfo,"Initialized:%s",tmp.c_str()); return true; } void NTTest::start() { Lock lock(m_mutex); stop(); DDebug(this,DebugAll,"Starting"); m_localStats.reset(); int port = m_port; for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext()) (static_cast(o->get()))->start(port); } void NTTest::stop() { Lock lock(m_mutex); DDebug(this,DebugAll,"Stopping %u workers",m_workerCount); for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext()) (static_cast(o->get()))->stop(); } void NTTest::addWorker() { Lock lock(this); m_workerCount++; if (m_workerCount == m_threads) Debug(this,DebugAll,"Created %u workers",m_workerCount); } void NTTest::removeWorker(NTWorker* worker) { Lock lock(this); if (!(worker && m_workerCount)) return; m_localStats += worker->counters(); m_workerCount--; if (m_workerCount) return; lock.drop(); m_localStats.msStop = Time::msecNow(); String tmp; m_localStats.output(tmp); Debug(this,DebugInfo,"No more workers. Local statistics:\r\n%s",tmp.c_str()); } /** * NTWorkerContainer */ NTWorkerContainer::NTWorkerContainer(NTTest* test, unsigned int threads, const char* id) : Mutex(true), m_id(id), m_test(test), m_workerCount(0), m_threads(threads) { debugName(m_id); debugChain(m_test); } void NTWorkerContainer::start(int& port) { Lock lock(this); stop(); if (!m_test) return; lock.drop(); DDebug(this,DebugAll,"Starting"); if (m_test->selectTimeout() >= 0) (new NTSelectReader(this,port,m_threads))->startup(); else for (unsigned int i = 0; i < m_threads; i++, port++) if (m_test->send()) (new NTWriter(this,port))->startup(); else (new NTReader(this,port))->startup(); } void NTWorkerContainer::stop() { Lock l(this); DDebug(this,DebugAll,"Stopping %u workers",m_workerCount); if (!m_workerCount) return; if (m_workerCount) { ListIterator iterw(m_workers); for (GenObject* o = 0; 0 != (o = iterw.get());) (static_cast(o))->cancel(false); } l.drop(); while (m_workerCount) Thread::yield(); DDebug(this,DebugAll,"Stopped"); } void NTWorkerContainer::addWorker(NTWorker* worker) { Lock lock(this); if (!worker) return; ObjList* obj = m_workers.append(worker); if (!obj) return; m_workerCount++; obj->setDelete(false); if (m_workerCount >= m_threads) DDebug(this,DebugAll,"Created %u workers",m_workerCount); lock.drop(); if (m_test) m_test->addWorker(); } void NTWorkerContainer::removeWorker(NTWorker* worker) { Lock lock(this); if (!(worker && m_workerCount)) return; m_workers.remove(worker,false); if (m_workerCount) m_workerCount--; if (!m_workerCount) DDebug(this,DebugAll,"No more workers"); lock.drop(); if (m_test) m_test->removeWorker(worker); } /** * NTWorker */ NTWorker::NTWorker(NTWorkerContainer* container, int port, const char* name) : Thread(name), m_container(container), m_test(container ? container->test() : 0), m_timeToDie(0), m_addr(AF_INET) { if (!(m_container && m_test)) return; container->addWorker(this); m_addr.host(m_test->send() ? m_test->remoteip() : m_test->localip()); m_addr.port(port); if (!m_test->packetCount() && m_test->lifetime()) m_timeToDie = Time::msecNow() + m_test->lifetime() * 1000; } NTWorker::~NTWorker() { if (m_socket.valid()) { m_socket.setLinger(-1); m_socket.terminate(); } if (m_container) m_container->removeWorker(this); } bool NTWorker::initSocket(Socket* sock, SocketAddr* addr) { if (!(m_container && m_test)) return false; if (!sock) { sock = &m_socket; addr = &m_addr; } if (!sock->create(addr->family(),SOCK_DGRAM)) { Debug(m_container,DebugNote,"Failed to create socket: %d '%s' [%p]", sock->error(),::strerror(sock->error()),this); return false; } if (!m_test->send() && !sock->bind(*addr)) { Debug(m_container,DebugNote,"Failed to bind socket on port %d: %d '%s' [%p]", addr->port(),sock->error(),::strerror(sock->error()),this); return false; } sock->setBlocking(false); return true; } /** * NTWriter */ void NTWriter::run() { if (!initSocket()) return; unsigned char buf[m_test->packetLen()]; buf[0] = 1; while (true) { u_int64_t now = Time::msecNow(); if (now < m_timeToSend) { Thread::msleep(s_sleep,true); continue; } bool die = false; if (m_test->packetCount()) die = m_counters.packets >= m_test->packetCount(); else die = m_timeToDie && now > m_timeToDie; if (die) { Thread::msleep(5,true); if (0 < m_socket.sendTo(s_stopPattern.data(),s_stopPattern.length(),m_addr)) m_counters.stopped = 1; break; } Thread::check(true); m_timeToSend = now + m_test->interval(); int w = m_socket.sendTo(buf,m_test->packetLen(),m_addr); if (w != m_socket.socketError() || m_socket.canRetry()) { if (w == m_socket.socketError()) continue; if (w) m_counters.success(w); if ((unsigned int)w < m_test->packetLen()) m_counters.failure(m_test->packetLen() - w); continue; } Debug(m_container,DebugNote,"SEND error dest='%s:%d': %d '%s' [%p]", m_addr.host().c_str(),m_addr.port(), m_socket.error(),::strerror(m_socket.error()),this); m_counters.failure(m_test->packetLen()); } } /** * NTReader */ void NTReader::run() { if (!initSocket()) return; unsigned char buf[m_test->packetLen()]; SocketAddr addr; while (true) { if (m_timeToDie && (Time::msecNow() > m_timeToDie)) break; Thread::msleep(s_sleep,true); int r = m_socket.recvFrom(buf,m_test->packetLen(),addr); if (r > 0) { if (buf[0] == 0) { m_counters.stopped = 1; break; } m_counters.success(r); continue; } if (r == 0 || (r == m_socket.socketError() && m_socket.canRetry())) continue; Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]", addr.host().c_str(),addr.port(), m_socket.error(),::strerror(m_socket.error()),this); m_counters.failure(0); } } /** * NTSelectReader */ NTSelectReader::NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count) : NTWorker(container,0,"NTSelectReader"), m_sockets(0), m_count(count) { DDebug(container,DebugAll,"NTSelectReader sockets=%u",count); m_sockets = new Socket[m_count]; unsigned int ok = 0; for (unsigned int i = 0; i < count; i++, port++) { SocketAddr addr(AF_INET); addr.host(m_test->localip()); addr.port(port); if (initSocket(&m_sockets[i],&addr)) ok++; } if (!ok) { Debug(container,DebugNote,"NTSelectReader: Bind or create failed for all sockets"); delete[] m_sockets; m_sockets = 0; m_count = 0; } } NTSelectReader::~NTSelectReader() { if (!m_sockets) return; for (unsigned int i = 0; i < m_count; i++) if (m_sockets[i].valid()) { m_sockets[i].setLinger(-1); m_sockets[i].terminate(); } delete[] m_sockets; } void NTSelectReader::run() { if (!m_count || !m_test || m_test->send()) return; DDebug(m_container,DebugAll,"Select reader worker started"); unsigned char buf[m_test->packetLen()]; SocketAddr addr; int ok = 0; FDSetSelect set; while (true) { if (m_counters.stopped == m_count) break; if (m_timeToDie && (Time::msecNow() > m_timeToDie)) break; set.reset(); for (unsigned int i = 0; i < m_count; i++) set.add(m_sockets[i].handle(),true,false,false); ok = set.select(m_container->test()->selectTimeout()); if (ok <= 0) { if (!m_test->selectTimeout()) Thread::msleep(1,true); continue; } for (unsigned int i = 0; i < m_count; i++) { if (!(m_sockets[i].valid() && set.canRead(m_sockets[i].handle()))) continue; int r = m_sockets[i].recvFrom(buf,m_test->packetLen(),addr); if (r > 0) { if (buf[0]) { if ((unsigned int)r != m_test->packetLen()) Debug(m_container,DebugMild,"RECV %u expected=%u [%p]", r,m_test->packetLen(),this); m_counters.success(r); } else { m_sockets[i].setLinger(-1); m_sockets[i].terminate(); m_counters.stopped++; } continue; } m_counters.failure(0); if (r == 0 || (r == m_sockets[i].socketError() && m_sockets[i].canRetry())) continue; Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]", addr.host().c_str(),addr.port(), m_sockets[i].error(),::strerror(m_sockets[i].error()),this); } } } /** * Plugin */ NTPlugin::NTPlugin() : Module("nettest","misc"), m_first(true) { Output("Loaded module Network Test"); } NTPlugin::~NTPlugin() { Output("Unloading module Network Test"); } void NTPlugin::initialize() { Output("Initializing module Network Test"); debugLevel(10); if (m_first) { m_first = false; setup(); installRelay(Halt); } // Reset statistics lock(); TelEngine::destruct(s_test); // Get new values from config Configuration cfg(Engine::configFile("nettest")); NamedList* general = cfg.getSection("general"); NamedList dummy(""); if (!general) general = &dummy; s_localip = general->getValue("localip"); s_packetLen = general->getIntValue("packetlen",320); if (s_packetLen < 16) s_packetLen = 16; else if (s_packetLen > 1400) s_packetLen = 1400; s_interval = general->getIntValue("interval",20); if (s_interval < 1) s_interval = 1; else if (s_interval > 120) s_interval = 120; s_lifetime = general->getIntValue("lifetime",60); s_sleep = cfg.getIntValue("general","sleep",2); if (s_sleep < 1) s_sleep = 1; else if (s_sleep > 10) s_sleep = 10; s_stopPattern.assign(0,s_packetLen); Debug(this,DebugInfo, "Init: localip=%s packet=%u interval=%ums lifetime=%us", s_localip.c_str(),s_packetLen,s_interval,s_lifetime); unsigned int n = cfg.sections(); for (unsigned int i = 0; i < n; i++) { NamedList* sect = cfg.getSection(i); if (!sect || sect->null() || *sect == "general") continue; s_test = new NTTest(*sect); if (!s_test->init(*sect)) { Debug(this,DebugNote,"Failed to init test from section '%s'",sect->c_str()); TelEngine::destruct(s_test); continue; } s_test->start(); break; } unlock(); } bool NTPlugin::received(Message& msg, int id) { if (id == Halt) TelEngine::destruct(s_test); return Module::received(msg,id); } }; // anonymous namespace /* vi: set ts=8 sw=4 sts=4 noet: */