Added a network / socket performace test module by Marian Podgoreanu.
git-svn-id: http://voip.null.ro/svn/yate@2058 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
parent
7a39bf3396
commit
d4c4c2b0ae
|
@ -0,0 +1,998 @@
|
|||
/**
|
||||
* nettest.cpp
|
||||
* Network and socket performance test module
|
||||
* This file is part of the YATE Project http://YATE.null.ro
|
||||
*
|
||||
* Yet Another Telephony Engine - a fully featured software PBX and IVR
|
||||
* Copyright (C) 2004-2006 Null Team
|
||||
* Author: Marian Podgoreanu
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
#include <yatephone.h>
|
||||
|
||||
#include <string.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
using namespace TelEngine;
|
||||
namespace { // anonymous
|
||||
|
||||
class Statistics; //
|
||||
class NTTest; //
|
||||
class NTWorkerContainer; // A set of workers
|
||||
class NTWorker; //
|
||||
class NTWriter; //
|
||||
class NTReader; //
|
||||
class NTSelectReader; //
|
||||
class NTPlugin; // The module
|
||||
|
||||
/**
|
||||
* This class holds an fd_set
|
||||
* @short MultiSelect private data
|
||||
*/
|
||||
class PrivateFDSet
|
||||
{
|
||||
public:
|
||||
inline bool isset(int handle)
|
||||
{ return 0 != FD_ISSET(handle,&set); }
|
||||
inline void add(int handle)
|
||||
{ FD_SET(handle,&set); }
|
||||
inline void reset()
|
||||
{ FD_ZERO(&set); }
|
||||
fd_set set;
|
||||
};
|
||||
|
||||
/**
|
||||
* This class encapsulates a select for a set of file descriptors.
|
||||
* File descriptors can be appended to wait for data to be read or write or
|
||||
* wait for an exception to occur
|
||||
* @short A multiple file descriptor select
|
||||
*/
|
||||
class FDSetSelect
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
FDSetSelect();
|
||||
|
||||
/**
|
||||
* Destructor. Release private data
|
||||
*/
|
||||
~FDSetSelect();
|
||||
|
||||
/**
|
||||
* Check if data is available for read
|
||||
* This method should be called after @ref select() returns
|
||||
* @param handle File descriptor to check
|
||||
* @return True if there is any data available for the given file descriptor
|
||||
*/
|
||||
bool canRead(int handle) const;
|
||||
|
||||
/**
|
||||
* Check if a file descriptor can be used to write data
|
||||
* This method should be called after @ref select() returns
|
||||
* @param handle File descriptor to check
|
||||
* @return True if data can be written using the given file descriptor
|
||||
*/
|
||||
bool canWrite(int handle) const;
|
||||
|
||||
/**
|
||||
* Check if there is a pending event for a given file descriptor
|
||||
* This method should be called after @ref select() returns
|
||||
* @param handle File descriptor to check
|
||||
* @return True if there is a pending event for the given file descriptor
|
||||
*/
|
||||
bool hasEvent(int handle) const;
|
||||
|
||||
/**
|
||||
* Append a file descriptor to read, write and/or event set.
|
||||
* This method shouldn't be called while in select
|
||||
* @param handle File descriptor to append
|
||||
* @param read True to append to the read set (wait to receive data)
|
||||
* @param write True to append to the write set (check if the handle can be used to write data)
|
||||
* @param event True to append to the event set (check exceptions)
|
||||
* @return False if handle is invalid or target set is missing (all flags are false)
|
||||
*/
|
||||
bool add(int handle, bool read, bool write, bool event);
|
||||
|
||||
/**
|
||||
* Reset all file descriptor sets.
|
||||
* This method shouldn't be called while in select
|
||||
*/
|
||||
void reset();
|
||||
|
||||
/**
|
||||
* Start waiting for a file descriptor state change
|
||||
* @param uSec The select timeout in microseconds (can be 0 to wait until a file descriptor get set)
|
||||
* @return The number of file descriptors whose state changed or negative on error
|
||||
*/
|
||||
int select(unsigned int uSec);
|
||||
|
||||
private:
|
||||
PrivateFDSet* m_read; // Read set
|
||||
PrivateFDSet* m_write; // Write set
|
||||
PrivateFDSet* m_event; // Event set
|
||||
PrivateFDSet* m_crtR; // Current event set for read/write/events
|
||||
PrivateFDSet* m_crtW; //
|
||||
PrivateFDSet* m_crtE; //
|
||||
int m_maxHandle; // Maximum handle value in current set(s)
|
||||
bool m_selectError; // Flag used to output errors
|
||||
};
|
||||
|
||||
// Statistics class
|
||||
class Statistics
|
||||
{
|
||||
public:
|
||||
inline Statistics()
|
||||
{ reset(); }
|
||||
inline void reset() {
|
||||
msStart = Time::msecNow();
|
||||
msStop = 0;
|
||||
packets = totalBytes = errors = lostBytes = 0;
|
||||
stopped = 0;
|
||||
}
|
||||
inline void success(unsigned int bytes) {
|
||||
packets++;
|
||||
totalBytes += bytes;
|
||||
}
|
||||
inline void failure(unsigned int bytes) {
|
||||
packets++;
|
||||
errors++;
|
||||
lostBytes += bytes;
|
||||
}
|
||||
inline Statistics& operator+=(const Statistics& src) {
|
||||
packets += src.packets;
|
||||
totalBytes += src.totalBytes;
|
||||
errors += src.errors;
|
||||
lostBytes += src.lostBytes;
|
||||
stopped += src.stopped;
|
||||
return *this;
|
||||
}
|
||||
|
||||
void output(String& dest);
|
||||
|
||||
u_int64_t msStart;
|
||||
u_int64_t msStop;
|
||||
u_int64_t packets;
|
||||
u_int64_t totalBytes;
|
||||
u_int64_t errors;
|
||||
u_int64_t lostBytes;
|
||||
unsigned int stopped;
|
||||
};
|
||||
|
||||
class NTTest : public Mutex, public GenObject, public DebugEnabler
|
||||
{
|
||||
public:
|
||||
NTTest(const char* name);
|
||||
virtual ~NTTest()
|
||||
{ stop(); }
|
||||
virtual void destruct()
|
||||
{ stop(); GenObject::destruct(); }
|
||||
inline bool send() const
|
||||
{ return m_send; }
|
||||
inline const String& localip() const
|
||||
{ return m_localip; }
|
||||
inline const String& remoteip() const
|
||||
{ return m_remoteip; }
|
||||
inline unsigned int packetLen() const
|
||||
{ return m_packetLen; }
|
||||
inline unsigned int interval() const
|
||||
{ return m_interval; }
|
||||
inline unsigned int lifetime() const
|
||||
{ return m_lifetime; }
|
||||
inline unsigned int packetCount() const
|
||||
{ return m_packetCount; }
|
||||
inline int selectTimeout() const
|
||||
{ return m_selectTimeout; }
|
||||
bool init(NamedList& params);
|
||||
void start();
|
||||
void stop();
|
||||
void addWorker();
|
||||
void removeWorker(NTWorker* worker);
|
||||
private:
|
||||
Mutex m_mutex;
|
||||
String m_id;
|
||||
String m_localip;
|
||||
String m_remoteip;
|
||||
int m_port;
|
||||
unsigned int m_threads;
|
||||
bool m_send;
|
||||
unsigned int m_packetLen;
|
||||
unsigned int m_interval;
|
||||
unsigned int m_lifetime;
|
||||
unsigned int m_packetCount;
|
||||
ObjList m_containers;
|
||||
unsigned int m_workerCount;
|
||||
int m_selectTimeout;
|
||||
Statistics m_localStats;
|
||||
};
|
||||
|
||||
class NTWorkerContainer : public Mutex, public GenObject, public DebugEnabler
|
||||
{
|
||||
friend class SelectThread;
|
||||
public:
|
||||
NTWorkerContainer(NTTest* test, unsigned int threads, const char* id);
|
||||
inline NTTest* test()
|
||||
{ return m_test; }
|
||||
void start(int& port);
|
||||
void stop();
|
||||
void addWorker(NTWorker* worker);
|
||||
void removeWorker(NTWorker* worker);
|
||||
private:
|
||||
String m_id;
|
||||
NTTest* m_test;
|
||||
unsigned int m_workerCount;
|
||||
unsigned int m_threads;
|
||||
ObjList m_workers;
|
||||
};
|
||||
|
||||
class NTWorker : public Thread, public GenObject
|
||||
{
|
||||
public:
|
||||
NTWorker(NTWorkerContainer* container, int port, const char* name = "NTWorker");
|
||||
~NTWorker();
|
||||
const Statistics& counters() const
|
||||
{ return m_counters; }
|
||||
protected:
|
||||
bool initSocket(Socket* sock = 0, SocketAddr* addr = 0);
|
||||
protected:
|
||||
NTWorkerContainer* m_container;
|
||||
NTTest* m_test;
|
||||
u_int64_t m_timeToDie;
|
||||
Socket m_socket;
|
||||
SocketAddr m_addr;
|
||||
Statistics m_counters;
|
||||
};
|
||||
|
||||
class NTWriter : public NTWorker
|
||||
{
|
||||
public:
|
||||
inline NTWriter(NTWorkerContainer* container, int port)
|
||||
: NTWorker(container,port),
|
||||
m_timeToSend(0)
|
||||
{}
|
||||
virtual void run();
|
||||
private:
|
||||
u_int64_t m_timeToSend;
|
||||
};
|
||||
|
||||
class NTReader : public NTWorker
|
||||
{
|
||||
public:
|
||||
inline NTReader(NTWorkerContainer* container, int port)
|
||||
: NTWorker(container,port)
|
||||
{}
|
||||
virtual void run();
|
||||
};
|
||||
|
||||
class NTSelectReader : public NTWorker
|
||||
{
|
||||
public:
|
||||
NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count);
|
||||
virtual ~NTSelectReader();
|
||||
virtual void run();
|
||||
private:
|
||||
Socket* m_sockets;
|
||||
unsigned int m_count;
|
||||
};
|
||||
|
||||
class NTPlugin : public Module
|
||||
{
|
||||
public:
|
||||
NTPlugin();
|
||||
virtual ~NTPlugin();
|
||||
virtual void initialize();
|
||||
virtual bool received(Message& msg, int id);
|
||||
private:
|
||||
bool m_first;
|
||||
};
|
||||
|
||||
// Static data
|
||||
static DataBlock s_stopPattern;
|
||||
static NTTest* s_test = 0;
|
||||
// Config
|
||||
static String s_localip;
|
||||
static unsigned int s_packetLen = 320;
|
||||
static unsigned int s_interval = 20;
|
||||
static unsigned int s_lifetime = 60;
|
||||
static unsigned long s_sleep = 2;
|
||||
// Plugin
|
||||
static NTPlugin plugin;
|
||||
|
||||
|
||||
/**
|
||||
* FDSetSelect
|
||||
*/
|
||||
// Constructor
|
||||
FDSetSelect::FDSetSelect()
|
||||
: m_read(new PrivateFDSet),
|
||||
m_write(new PrivateFDSet),
|
||||
m_event(new PrivateFDSet),
|
||||
m_crtR(0),
|
||||
m_crtW(0),
|
||||
m_crtE(0),
|
||||
m_maxHandle(Socket::invalidHandle()),
|
||||
m_selectError(false)
|
||||
{
|
||||
}
|
||||
|
||||
// Release private data
|
||||
FDSetSelect::~FDSetSelect()
|
||||
{
|
||||
delete m_read;
|
||||
delete m_write;
|
||||
delete m_event;
|
||||
}
|
||||
|
||||
// Check if data is available for read
|
||||
bool FDSetSelect::canRead(int handle) const
|
||||
{
|
||||
return m_read->isset(handle);
|
||||
}
|
||||
|
||||
// Check if a file descriptor can be used to write data
|
||||
bool FDSetSelect::canWrite(int handle) const
|
||||
{
|
||||
return m_write->isset(handle);
|
||||
}
|
||||
|
||||
|
||||
// Check if there is a pending event for a given file descriptor
|
||||
bool FDSetSelect::hasEvent(int handle) const
|
||||
{
|
||||
return m_event->isset(handle);
|
||||
}
|
||||
|
||||
// Append a file descriptor to read, write and/or event set.
|
||||
// Return false if handle is invalid or target set is missing (all flags are false)
|
||||
bool FDSetSelect::add(int handle, bool read, bool write, bool event)
|
||||
{
|
||||
if (!(read || write || event) || handle == Socket::invalidHandle() ||
|
||||
!Socket::canSelect(handle))
|
||||
return false;
|
||||
if (read) {
|
||||
m_read->add(handle);
|
||||
m_crtR = m_read;
|
||||
}
|
||||
if (write) {
|
||||
m_write->add(handle);
|
||||
m_crtW = m_write;
|
||||
}
|
||||
if (event) {
|
||||
m_event->add(handle);
|
||||
m_crtE = m_event;
|
||||
}
|
||||
if (m_maxHandle < handle)
|
||||
m_maxHandle = handle;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Reset all file descriptor sets
|
||||
void FDSetSelect::reset()
|
||||
{
|
||||
m_read->reset();
|
||||
m_write->reset();
|
||||
m_event->reset();
|
||||
m_crtR = m_crtW = m_crtE = 0;
|
||||
m_maxHandle = Socket::invalidHandle();
|
||||
}
|
||||
|
||||
// Start waiting for a file descriptor state change
|
||||
int FDSetSelect::select(unsigned int uSec)
|
||||
{
|
||||
if (m_maxHandle == Socket::invalidHandle())
|
||||
return 0;
|
||||
struct timeval t;
|
||||
t.tv_sec = 0;
|
||||
t.tv_usec = (uSec > 0) ? uSec : 0;
|
||||
m_selectError = false;
|
||||
int result = ::select(m_maxHandle+1,&m_crtR->set,&m_crtW->set,&m_crtE->set,&t);
|
||||
if (result >= 0) {
|
||||
XDebug(DebugAll,"FDSetSelect got %d handlers [%p]",result,this);
|
||||
return result;
|
||||
}
|
||||
bool canRetry = (errno == EAGAIN || errno == EINTR || errno == EBADF);
|
||||
if (!(canRetry || m_selectError)) {
|
||||
Debug(DebugWarn,"FDSetSelect failed: %d: %s [%p]",errno,::strerror(errno),this);
|
||||
m_selectError = true;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Statistics
|
||||
*/
|
||||
void Statistics::output(String& dest)
|
||||
{
|
||||
dest << "=================================================================";
|
||||
#define stat_set64(text,val) \
|
||||
::sprintf(buf,FMT64,val); \
|
||||
dest << "\r\n" << text << buf;
|
||||
char buf[128];
|
||||
stat_set64("Packets: ",packets);
|
||||
stat_set64("Total (bytes): ",totalBytes);
|
||||
stat_set64("Errors: ",errors);
|
||||
stat_set64("Lost (bytes): ",lostBytes);
|
||||
dest <<"\r\nStopped: " << stopped;
|
||||
u_int64_t stop = msStop ? msStop : Time::msecNow();
|
||||
u_int64_t lenMsec = stop - msStart;
|
||||
u_int64_t lenSec = lenMsec / 1000;
|
||||
if (!lenSec)
|
||||
lenSec = 1;
|
||||
stat_set64("Test length (ms): ",lenMsec);
|
||||
stat_set64("Ratio (Mb/s): ",totalBytes / lenSec * 8 / 1000000);
|
||||
stat_set64("Ratio (packets/s): ",packets/lenSec);
|
||||
#undef stat_set64
|
||||
dest << "\r\n=================================================================";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTTest
|
||||
*/
|
||||
NTTest::NTTest(const char* name)
|
||||
: Mutex(true),
|
||||
m_mutex(true),
|
||||
m_port(0),
|
||||
m_threads(0),
|
||||
m_send(true),
|
||||
m_packetLen(0),
|
||||
m_interval(0),
|
||||
m_lifetime(0),
|
||||
m_packetCount(0),
|
||||
m_workerCount(0),
|
||||
m_selectTimeout(-1)
|
||||
{
|
||||
debugChain(&plugin);
|
||||
m_id << plugin.debugName() << "/" << name;
|
||||
debugName(m_id);
|
||||
}
|
||||
|
||||
bool NTTest::init(NamedList& params)
|
||||
{
|
||||
Lock2 lock(*this,m_mutex);
|
||||
stop();
|
||||
|
||||
m_localip = params.getValue("localip",s_localip);
|
||||
if (m_localip.null()) {
|
||||
Debug(this,DebugNote,"Empty localip in section '%s'",debugName());
|
||||
return false;
|
||||
}
|
||||
m_remoteip = params.getValue("remoteip");
|
||||
if (m_remoteip.null()) {
|
||||
Debug(this,DebugNote,"Empty remoteip in section '%s'",debugName());
|
||||
return false;
|
||||
}
|
||||
|
||||
String tmp = params.getValue("port");
|
||||
m_port = tmp.toInteger(0);
|
||||
if (!m_port) {
|
||||
Debug(this,DebugNote,"Invalid port=%s in section '%s'",
|
||||
tmp.c_str(),debugName());
|
||||
return false;
|
||||
}
|
||||
|
||||
m_threads = params.getIntValue("threads",1);
|
||||
if (m_threads < 1)
|
||||
m_threads = 1;
|
||||
|
||||
m_send = params.getBoolValue("send",true);
|
||||
|
||||
m_packetLen = params.getIntValue("packetlen",s_packetLen);
|
||||
if (m_packetLen < 16)
|
||||
m_packetLen = 16;
|
||||
else if (m_packetLen > 1400)
|
||||
m_packetLen = 1400;
|
||||
m_interval = params.getIntValue("interval",s_interval);
|
||||
if (m_interval < 1)
|
||||
m_interval = 1;
|
||||
else if (m_interval > 120)
|
||||
m_interval = 120;
|
||||
m_lifetime = params.getIntValue("lifetime",s_lifetime);
|
||||
bool sendAllPackets = params.getBoolValue("sendallpackets",true);
|
||||
if (sendAllPackets)
|
||||
m_packetCount = m_lifetime * 1000 / m_interval;
|
||||
else
|
||||
m_packetCount = 0;
|
||||
m_selectTimeout = params.getIntValue("select-timeout",-1);
|
||||
|
||||
m_containers.clear();
|
||||
int workersets = params.getIntValue("workersets",1);
|
||||
if (workersets < 1 || (unsigned int)workersets > m_threads)
|
||||
workersets = 1;
|
||||
unsigned int nFull = 0;
|
||||
unsigned int nRest = 0;
|
||||
if (workersets == 1)
|
||||
nRest = m_threads;
|
||||
else {
|
||||
nFull = m_threads / (workersets - 1);
|
||||
nRest = m_threads - nFull * (workersets - 1);
|
||||
}
|
||||
for (int i = 1; i <= workersets; i++) {
|
||||
String id;
|
||||
id << m_id << "/" << i;
|
||||
if (i < workersets)
|
||||
m_containers.append(new NTWorkerContainer(this,nFull,id));
|
||||
else
|
||||
m_containers.append(new NTWorkerContainer(this,nRest,id));
|
||||
}
|
||||
|
||||
unsigned int sock = m_threads;
|
||||
if (m_selectTimeout >= 0)
|
||||
m_threads = workersets;
|
||||
|
||||
tmp = "";
|
||||
tmp << "\r\nAction: " << (m_send ? "send" : "recv");
|
||||
if (m_selectTimeout >= 0)
|
||||
tmp << "\r\nSockets: " << sock;
|
||||
else
|
||||
tmp << "\r\nThreads: " << m_threads;
|
||||
tmp << "\r\nLocal address: " << m_localip;
|
||||
tmp << "\r\nRemote address: " << m_remoteip;
|
||||
tmp << "\r\nPort: " << m_port;
|
||||
tmp << "\r\nPacket length: " << m_packetLen;
|
||||
tmp << "\r\nPackets: " << m_packetCount;
|
||||
tmp << "\r\nInterval: " << m_interval << "ms";
|
||||
tmp << "\r\nLifetime: " << m_lifetime << "s";
|
||||
tmp << "\r\nWorker sets: " << workersets;
|
||||
tmp << "\r\nSelect timeout: " << m_selectTimeout << (m_selectTimeout < 0 ? " (not used)" : "us");
|
||||
Debug(this,DebugInfo,"Initialized:%s",tmp.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
void NTTest::start()
|
||||
{
|
||||
Lock lock(m_mutex);
|
||||
stop();
|
||||
DDebug(this,DebugAll,"Starting");
|
||||
m_localStats.reset();
|
||||
int port = m_port;
|
||||
for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
|
||||
(static_cast<NTWorkerContainer*>(o->get()))->start(port);
|
||||
}
|
||||
|
||||
void NTTest::stop()
|
||||
{
|
||||
Lock lock(m_mutex);
|
||||
DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
|
||||
for (ObjList* o = m_containers.skipNull(); o; o = o->skipNext())
|
||||
(static_cast<NTWorkerContainer*>(o->get()))->stop();
|
||||
}
|
||||
|
||||
void NTTest::addWorker()
|
||||
{
|
||||
Lock lock(this);
|
||||
m_workerCount++;
|
||||
if (m_workerCount == m_threads)
|
||||
Debug(this,DebugAll,"Created %u workers",m_workerCount);
|
||||
}
|
||||
|
||||
void NTTest::removeWorker(NTWorker* worker)
|
||||
{
|
||||
Lock lock(this);
|
||||
if (!(worker && m_workerCount))
|
||||
return;
|
||||
m_localStats += worker->counters();
|
||||
m_workerCount--;
|
||||
if (m_workerCount)
|
||||
return;
|
||||
lock.drop();
|
||||
m_localStats.msStop = Time::msecNow();
|
||||
String tmp;
|
||||
m_localStats.output(tmp);
|
||||
Debug(this,DebugInfo,"No more workers. Local statistics:\r\n%s",tmp.c_str());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTWorkerContainer
|
||||
*/
|
||||
NTWorkerContainer::NTWorkerContainer(NTTest* test, unsigned int threads, const char* id)
|
||||
: Mutex(true),
|
||||
m_id(id),
|
||||
m_test(test),
|
||||
m_workerCount(0),
|
||||
m_threads(threads)
|
||||
{
|
||||
debugName(m_id);
|
||||
debugChain(m_test);
|
||||
}
|
||||
|
||||
void NTWorkerContainer::start(int& port)
|
||||
{
|
||||
Lock lock(this);
|
||||
stop();
|
||||
if (!m_test)
|
||||
return;
|
||||
lock.drop();
|
||||
DDebug(this,DebugAll,"Starting");
|
||||
if (m_test->selectTimeout() >= 0)
|
||||
(new NTSelectReader(this,port,m_threads))->startup();
|
||||
else
|
||||
for (unsigned int i = 0; i < m_threads; i++, port++)
|
||||
if (m_test->send())
|
||||
(new NTWriter(this,port))->startup();
|
||||
else
|
||||
(new NTReader(this,port))->startup();
|
||||
}
|
||||
|
||||
void NTWorkerContainer::stop()
|
||||
{
|
||||
Lock l(this);
|
||||
DDebug(this,DebugAll,"Stopping %u workers",m_workerCount);
|
||||
if (!m_workerCount)
|
||||
return;
|
||||
if (m_workerCount) {
|
||||
ListIterator iterw(m_workers);
|
||||
for (GenObject* o = 0; 0 != (o = iterw.get());)
|
||||
(static_cast<NTWorker*>(o))->cancel(false);
|
||||
}
|
||||
l.drop();
|
||||
while (m_workerCount)
|
||||
Thread::yield();
|
||||
DDebug(this,DebugAll,"Stopped");
|
||||
}
|
||||
|
||||
void NTWorkerContainer::addWorker(NTWorker* worker)
|
||||
{
|
||||
Lock lock(this);
|
||||
if (!worker)
|
||||
return;
|
||||
ObjList* obj = m_workers.append(worker);
|
||||
if (!obj)
|
||||
return;
|
||||
m_workerCount++;
|
||||
obj->setDelete(false);
|
||||
if (m_workerCount >= m_threads)
|
||||
DDebug(this,DebugAll,"Created %u workers",m_workerCount);
|
||||
lock.drop();
|
||||
if (m_test)
|
||||
m_test->addWorker();
|
||||
}
|
||||
|
||||
void NTWorkerContainer::removeWorker(NTWorker* worker)
|
||||
{
|
||||
Lock lock(this);
|
||||
if (!(worker && m_workerCount))
|
||||
return;
|
||||
m_workers.remove(worker,false);
|
||||
if (m_workerCount)
|
||||
m_workerCount--;
|
||||
if (!m_workerCount)
|
||||
DDebug(this,DebugAll,"No more workers");
|
||||
lock.drop();
|
||||
if (m_test)
|
||||
m_test->removeWorker(worker);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTWorker
|
||||
*/
|
||||
NTWorker::NTWorker(NTWorkerContainer* container, int port, const char* name)
|
||||
: Thread(name),
|
||||
m_container(container),
|
||||
m_test(container ? container->test() : 0),
|
||||
m_timeToDie(0),
|
||||
m_addr(AF_INET)
|
||||
{
|
||||
if (!(m_container && m_test))
|
||||
return;
|
||||
container->addWorker(this);
|
||||
m_addr.host(m_test->send() ? m_test->remoteip() : m_test->localip());
|
||||
m_addr.port(port);
|
||||
if (!m_test->packetCount() && m_test->lifetime())
|
||||
m_timeToDie = Time::msecNow() + m_test->lifetime() * 1000;
|
||||
}
|
||||
|
||||
NTWorker::~NTWorker()
|
||||
{
|
||||
if (m_socket.valid()) {
|
||||
m_socket.setLinger(-1);
|
||||
m_socket.terminate();
|
||||
}
|
||||
if (m_container)
|
||||
m_container->removeWorker(this);
|
||||
}
|
||||
|
||||
bool NTWorker::initSocket(Socket* sock, SocketAddr* addr)
|
||||
{
|
||||
if (!(m_container && m_test))
|
||||
return false;
|
||||
if (!sock) {
|
||||
sock = &m_socket;
|
||||
addr = &m_addr;
|
||||
}
|
||||
if (!sock->create(addr->family(),SOCK_DGRAM)) {
|
||||
Debug(m_container,DebugNote,"Failed to create socket: %d '%s' [%p]",
|
||||
sock->error(),::strerror(sock->error()),this);
|
||||
return false;
|
||||
}
|
||||
if (!m_test->send() && !sock->bind(*addr)) {
|
||||
Debug(m_container,DebugNote,"Failed to bind socket on port %d: %d '%s' [%p]",
|
||||
addr->port(),sock->error(),::strerror(sock->error()),this);
|
||||
return false;
|
||||
}
|
||||
sock->setBlocking(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTWriter
|
||||
*/
|
||||
void NTWriter::run()
|
||||
{
|
||||
if (!initSocket())
|
||||
return;
|
||||
unsigned char buf[m_test->packetLen()];
|
||||
buf[0] = 1;
|
||||
while (true) {
|
||||
u_int64_t now = Time::msecNow();
|
||||
if (now < m_timeToSend) {
|
||||
Thread::msleep(s_sleep,true);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool die = false;
|
||||
if (m_test->packetCount())
|
||||
die = m_counters.packets >= m_test->packetCount();
|
||||
else
|
||||
die = m_timeToDie && now > m_timeToDie;
|
||||
if (die) {
|
||||
Thread::msleep(5,true);
|
||||
if (0 < m_socket.sendTo(s_stopPattern.data(),s_stopPattern.length(),m_addr))
|
||||
m_counters.stopped = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
Thread::check(true);
|
||||
m_timeToSend = now + m_test->interval();
|
||||
int w = m_socket.sendTo(buf,m_test->packetLen(),m_addr);
|
||||
if (w != m_socket.socketError() || m_socket.canRetry()) {
|
||||
if (w == m_socket.socketError())
|
||||
continue;
|
||||
if (w)
|
||||
m_counters.success(w);
|
||||
if ((unsigned int)w < m_test->packetLen())
|
||||
m_counters.failure(m_test->packetLen() - w);
|
||||
continue;
|
||||
}
|
||||
Debug(m_container,DebugNote,"SEND error dest='%s:%d': %d '%s' [%p]",
|
||||
m_addr.host().c_str(),m_addr.port(),
|
||||
m_socket.error(),::strerror(m_socket.error()),this);
|
||||
m_counters.failure(m_test->packetLen());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTReader
|
||||
*/
|
||||
void NTReader::run()
|
||||
{
|
||||
if (!initSocket())
|
||||
return;
|
||||
unsigned char buf[m_test->packetLen()];
|
||||
SocketAddr addr;
|
||||
while (true) {
|
||||
if (m_timeToDie && (Time::msecNow() > m_timeToDie))
|
||||
break;
|
||||
Thread::msleep(s_sleep,true);
|
||||
int r = m_socket.recvFrom(buf,m_test->packetLen(),addr);
|
||||
if (r > 0) {
|
||||
if (buf[0] == 0) {
|
||||
m_counters.stopped = 1;
|
||||
break;
|
||||
}
|
||||
m_counters.success(r);
|
||||
continue;
|
||||
}
|
||||
if (r == 0 || (r == m_socket.socketError() && m_socket.canRetry()))
|
||||
continue;
|
||||
Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
|
||||
addr.host().c_str(),addr.port(),
|
||||
m_socket.error(),::strerror(m_socket.error()),this);
|
||||
m_counters.failure(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* NTSelectReader
|
||||
*/
|
||||
NTSelectReader::NTSelectReader(NTWorkerContainer* container, int& port, unsigned int count)
|
||||
: NTWorker(container,0,"NTSelectReader"),
|
||||
m_sockets(0),
|
||||
m_count(count)
|
||||
{
|
||||
DDebug(container,DebugAll,"NTSelectReader sockets=%u",count);
|
||||
m_sockets = new Socket[m_count];
|
||||
unsigned int ok = 0;
|
||||
for (unsigned int i = 0; i < count; i++, port++) {
|
||||
SocketAddr addr(AF_INET);
|
||||
addr.host(m_test->localip());
|
||||
addr.port(port);
|
||||
if (initSocket(&m_sockets[i],&addr))
|
||||
ok++;
|
||||
}
|
||||
if (!ok) {
|
||||
Debug(container,DebugNote,"NTSelectReader: Bind or create failed for all sockets");
|
||||
delete[] m_sockets;
|
||||
m_sockets = 0;
|
||||
m_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
NTSelectReader::~NTSelectReader()
|
||||
{
|
||||
if (!m_sockets)
|
||||
return;
|
||||
for (unsigned int i = 0; i < m_count; i++)
|
||||
if (m_sockets[i].valid()) {
|
||||
m_sockets[i].setLinger(-1);
|
||||
m_sockets[i].terminate();
|
||||
}
|
||||
delete[] m_sockets;
|
||||
}
|
||||
|
||||
void NTSelectReader::run()
|
||||
{
|
||||
if (!m_count || !m_test || m_test->send())
|
||||
return;
|
||||
DDebug(m_container,DebugAll,"Select reader worker started");
|
||||
unsigned char buf[m_test->packetLen()];
|
||||
SocketAddr addr;
|
||||
int ok = 0;
|
||||
FDSetSelect set;
|
||||
while (true) {
|
||||
if (m_counters.stopped == m_count)
|
||||
break;
|
||||
if (m_timeToDie && (Time::msecNow() > m_timeToDie))
|
||||
break;
|
||||
set.reset();
|
||||
for (unsigned int i = 0; i < m_count; i++)
|
||||
set.add(m_sockets[i].handle(),true,false,false);
|
||||
ok = set.select(m_container->test()->selectTimeout());
|
||||
if (ok <= 0) {
|
||||
if (!m_test->selectTimeout())
|
||||
Thread::msleep(1,true);
|
||||
continue;
|
||||
}
|
||||
for (unsigned int i = 0; i < m_count; i++) {
|
||||
if (!(m_sockets[i].valid() && set.canRead(m_sockets[i].handle())))
|
||||
continue;
|
||||
int r = m_sockets[i].recvFrom(buf,m_test->packetLen(),addr);
|
||||
if (r > 0) {
|
||||
if (buf[0]) {
|
||||
if ((unsigned int)r != m_test->packetLen())
|
||||
Debug(m_container,DebugMild,"RECV %u expected=%u [%p]",
|
||||
r,m_test->packetLen(),this);
|
||||
m_counters.success(r);
|
||||
}
|
||||
else {
|
||||
m_sockets[i].setLinger(-1);
|
||||
m_sockets[i].terminate();
|
||||
m_counters.stopped++;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
m_counters.failure(0);
|
||||
if (r == 0 || (r == m_sockets[i].socketError() && m_sockets[i].canRetry()))
|
||||
continue;
|
||||
Debug(m_container,DebugNote,"RECV error src='%s:%d': %d '%s' [%p]",
|
||||
addr.host().c_str(),addr.port(),
|
||||
m_sockets[i].error(),::strerror(m_sockets[i].error()),this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Plugin
|
||||
*/
|
||||
NTPlugin::NTPlugin()
|
||||
: Module("nettest","misc"), m_first(true)
|
||||
{
|
||||
Output("Loaded module Network Test");
|
||||
}
|
||||
|
||||
NTPlugin::~NTPlugin()
|
||||
{
|
||||
Output("Unloading module Network Test");
|
||||
}
|
||||
|
||||
void NTPlugin::initialize()
|
||||
{
|
||||
Output("Initializing module Network Test");
|
||||
|
||||
debugLevel(10);
|
||||
|
||||
if (m_first) {
|
||||
m_first = false;
|
||||
setup();
|
||||
installRelay(Halt);
|
||||
}
|
||||
|
||||
// Reset statistics
|
||||
lock();
|
||||
TelEngine::destruct(s_test);
|
||||
|
||||
// Get new values from config
|
||||
Configuration cfg(Engine::configFile("nettest"));
|
||||
NamedList* general = cfg.getSection("general");
|
||||
NamedList dummy("");
|
||||
if (!general)
|
||||
general = &dummy;
|
||||
s_localip = general->getValue("localip");
|
||||
s_packetLen = general->getIntValue("packetlen",320);
|
||||
if (s_packetLen < 16)
|
||||
s_packetLen = 16;
|
||||
else if (s_packetLen > 1400)
|
||||
s_packetLen = 1400;
|
||||
s_interval = general->getIntValue("interval",20);
|
||||
if (s_interval < 1)
|
||||
s_interval = 1;
|
||||
else if (s_interval > 120)
|
||||
s_interval = 120;
|
||||
s_lifetime = general->getIntValue("lifetime",60);
|
||||
s_sleep = cfg.getIntValue("general","sleep",2);
|
||||
if (s_sleep < 1)
|
||||
s_sleep = 1;
|
||||
else if (s_sleep > 10)
|
||||
s_sleep = 10;
|
||||
s_stopPattern.assign(0,s_packetLen);
|
||||
|
||||
Debug(this,DebugInfo,
|
||||
"Init: localip=%s packet=%u interval=%ums lifetime=%us",
|
||||
s_localip.c_str(),s_packetLen,s_interval,s_lifetime);
|
||||
|
||||
unsigned int n = cfg.sections();
|
||||
for (unsigned int i = 0; i < n; i++) {
|
||||
NamedList* sect = cfg.getSection(i);
|
||||
if (!sect || sect->null() || *sect == "general")
|
||||
continue;
|
||||
|
||||
s_test = new NTTest(*sect);
|
||||
if (!s_test->init(*sect)) {
|
||||
Debug(this,DebugNote,"Failed to init test from section '%s'",sect->c_str());
|
||||
TelEngine::destruct(s_test);
|
||||
continue;
|
||||
}
|
||||
|
||||
s_test->start();
|
||||
break;
|
||||
}
|
||||
|
||||
unlock();
|
||||
}
|
||||
|
||||
bool NTPlugin::received(Message& msg, int id)
|
||||
{
|
||||
if (id == Halt)
|
||||
TelEngine::destruct(s_test);
|
||||
return Module::received(msg,id);
|
||||
}
|
||||
|
||||
}; // anonymous namespace
|
||||
|
||||
/* vi: set ts=8 sw=4 sts=4 noet: */
|
Loading…
Reference in New Issue