Added recv thread priority config. Improved stats report.

git-svn-id: http://yate.null.ro/svn/yate/trunk@6237 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2017-06-22 13:19:36 +00:00
parent eed4b72288
commit 74af6b10ac
2 changed files with 36 additions and 17 deletions

View File

@ -52,6 +52,12 @@
; cmd:loopback: Set radio interface loopback
; cmd:calibrate: Execute the calibrate operation on radio interface
; priority/recv_priority: Send and receive threads prioriy
; Allowed values: lowest, low, normal, high, highest
; Defaults to 'normal'
;priority=normal
;recv_priority=normal
[radiodatafile]
; This section configures radio data file processing

View File

@ -79,7 +79,7 @@ protected:
}
}
bool wait(const String& param);
void notify(const char* state, const char* error = 0) const;
void notify(const char* state, const char* error = 0, const NamedList* params = 0) const;
RadioInterface* m_radio;
RadioTestRecv* m_recv;
@ -101,7 +101,7 @@ protected:
// RX
RadioTestIO m_rx;
RadioReadBufs m_bufs;
unsigned int m_skippedBuffs;
uint64_t m_rxSkippedSamples;
DataBlock m_crt;
DataBlock m_aux;
DataBlock m_extra;
@ -110,15 +110,15 @@ protected:
class RadioTestRecv : public Thread
{
public:
inline RadioTestRecv(RadioTest* test)
: Thread("RadioTestRecv"), m_test(test)
inline RadioTestRecv(RadioTest* test, Thread::Priority prio)
: Thread("RadioTestRecv",prio), m_test(test)
{}
~RadioTestRecv()
{ notify(); }
void cleanup()
{ notify(); }
static inline RadioTestRecv* start(RadioTest* test) {
RadioTestRecv* tmp = new RadioTestRecv(test);
static inline RadioTestRecv* start(RadioTest* test, Thread::Priority prio) {
RadioTestRecv* tmp = new RadioTestRecv(test,prio);
if (tmp->startup())
return tmp;
delete tmp;
@ -404,7 +404,7 @@ RadioTest::RadioTest(const NamedList& params, const NamedList& radioParams)
m_sendBufCount(0),
m_pulse(0),
m_rx(false),
m_skippedBuffs(0)
m_rxSkippedSamples(0)
{
m_params.setParam("orig_test_name",params.c_str());
m_params.assign(__plugin.name() + "/" + params.c_str());
@ -537,7 +537,9 @@ void RadioTest::run()
if (!(m_tx.enabled || m_rx.enabled))
break;
if (m_rx.enabled) {
m_recv = RadioTestRecv::start(this);
updateTs(false);
m_recv = RadioTestRecv::start(this,
Thread::priority(m_params.getValue("recv_priority")));
if (!m_recv)
TEST_FAIL_BREAK("Failed to start read data thread",true);
}
@ -551,7 +553,7 @@ void RadioTest::run()
if (m_rx.enabled && !m_recv && !Thread::check(false))
TEST_FAIL_BREAK("Read data thread abnormally terminated",true);
}
if (!error && Thread::check(false))
if (Thread::check(false))
error = "stopped";
readStop();
#undef TEST_FAIL_BREAK
@ -573,24 +575,28 @@ void RadioTest::terminated(const char* error)
if (!m_started)
return;
m_started = false;
String s;
RadioTestIO* txrx[2] = {&m_tx,&m_rx};
uint64_t now = Time::now();
NamedList report("");
for (uint8_t i = 0; i < 2; i++) {
RadioTestIO& io = *(txrx[i]);
if (!io.enabled)
continue;
String prefix = io.tx ? "tx_" : "rx_";
s << "\r\n" << prefix << "transferred=" << io.transferred;
report.addParam(prefix + "transferred",String(io.transferred));
if (io.transferred) {
unsigned int sec = (unsigned int)((now - io.startTime) / 1000000);
if (sec)
s << " (avg: " << (io.transferred / sec) << " samples/sec)";
report.addParam(prefix + "samplerate",String(io.transferred / sec));
}
s << "\r\n" << prefix << "timestamp=" << io.ts;
report.addParam(prefix + "ts",String(io.ts));
}
if (m_rxSkippedSamples)
report.addParam("rx_skipped_samples",String(m_rxSkippedSamples));
String s;
report.dump(s,"\r\n");
Debug(this,DebugInfo,"Terminated [%p]%s",this,encloseDashes(s));
notify("stop",error);
notify("stop",error,&report);
if (!m_repeat)
return;
Debug(this,DebugNote,"Restarting repeat=%u [%p]",m_repeat,this);
@ -769,8 +775,13 @@ bool RadioTest::read()
m_rx.startTime = Time::now();
if (!m_rx.ts)
updateTs(false);
m_skippedBuffs = 0;
unsigned int code = m_radio->read(m_rx.ts,m_bufs,m_skippedBuffs);
unsigned int skippedBuffs = 0;
unsigned int code = m_radio->read(m_rx.ts,m_bufs,skippedBuffs);
if (skippedBuffs) {
uint64_t skipped = skippedBuffs * m_bufs.bufSamples();
m_rxSkippedSamples += skipped;
m_rx.transferred += skipped;
}
if (!code) {
if (m_bufs.full(m_bufs.crt))
m_rx.transferred += m_bufs.bufSamples();
@ -830,7 +841,7 @@ bool RadioTest::wait(const String& param)
return n == 0;
}
void RadioTest::notify(const char* state, const char* error) const
void RadioTest::notify(const char* state, const char* error, const NamedList* params) const
{
const String& id = m_params[YSTRING("notify")];
if (!id)
@ -841,6 +852,8 @@ void RadioTest::notify(const char* state, const char* error) const
m->addParam("id",id);
m->addParam("state",state);
m->addParam("error",error,false);
if (params)
m->copyParams(*params);
Engine::enqueue(m);
}