Implemented recv. Check for thread cancel when sleeping.

git-svn-id: http://yate.null.ro/svn/yate/trunk@6054 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2015-10-12 07:35:58 +00:00
parent 85eb9cd410
commit a4ce1f1ed5
2 changed files with 210 additions and 12 deletions

View File

@ -74,3 +74,22 @@
; minFilterBandwidth: integer: Minimum settable filter bandwidth (Hz)
; Valid values 100000 - 5000000, default 1500000
;minFilterBandwidth=1500000
; profiling: boolean: True if a profiling tool is running
; Disable some time related capabilities
;profiling=no
; sample_energize: integer: Optional value used to energize sent data
; Interval allowed: [0..10000]. 0: disabled
;sample_energize=0
; rx_file_raw: string: Optional path to file containg raw data (float) to be returned
; when receive is requested
;rx_file_raw=
; rx_buf_chunk: integer: Optional RX data chunk size
; This value will be used to allign upper layer timestamp and return aligned data from
; configured rx_file_raw file
; This parameter will be ignored if non 0 and rx_file_raw file samples count is not a
; a multiple of rx_buf_chunk samples
;rx_buf_chunk=0

View File

@ -22,7 +22,6 @@
#include <yatephone.h>
#include <yateradio.h>
#include <string.h>
#include <unistd.h>
#include <math.h>
using namespace TelEngine;
@ -89,6 +88,22 @@ protected:
uint64_t getRxUsec(uint64_t ts) const;
uint64_t getTxUsec(uint64_t ts) const;
bool control(NamedList& params);
inline unsigned int sleep(unsigned int us) {
while (us) {
if (us > Thread::idleUsec()) {
Thread::usleep(Thread::idleUsec());
us -= Thread::idleUsec();
}
else {
Thread::usleep(us);
us = 0;
}
if (Thread::check(false))
return Cancelled;
}
return 0;
}
void setRxBuffer(uint64_t& when, float* samples, unsigned int size);
private:
RadioCapability m_caps;
@ -108,6 +123,12 @@ private:
unsigned int m_txLatency;
uint64_t m_rxSamp;
uint64_t m_txSamp;
DataBlock m_rxDataBuf; // RX data
unsigned int m_rxDataBufSamples; // Number of samples in buffer
unsigned int m_rxDataChunkSamples; // Number of samples in a buffer chunk (aligned data)
unsigned int m_rxDataOffs; // Current offset in buffer (in samples)
bool m_profiling; // Running a profiling tool
int16_t m_sampleEnergize; // TX data sample energize
};
class DummyModule : public Module
@ -139,13 +160,52 @@ INIT_PLUGIN(DummyModule);
static Configuration s_cfg;
// Energize a number. Refer the input value to the requested energy
static inline int16_t energize(float value, float scale, int16_t refVal, unsigned int& clamped)
{
int16_t v = (int16_t)::round(value * scale);
if (v > refVal) {
clamped++;
return refVal;
}
if (v < -refVal) {
clamped++;
return -refVal;
}
return v;
}
// Simulate float to int16_t data conversion:
// - Sample energize
// - Bounds check
static void sampleEnergize(float* samples, unsigned int size, float scale,
unsigned int refVal, unsigned int& clamped)
{
if (!samples)
return;
int16_t buf[1024];
float scaleI = scale * refVal;
float scaleQ = scale * refVal;
while (size) {
int16_t* b = buf;
unsigned int n = size > 512 ? 512 : size;
size -= n;
while (n--) {
*b++ = energize(*samples++,scaleI,refVal,clamped);
*b++ = energize(*samples++,scaleQ,refVal,clamped);
}
}
}
//
// DummyInterface
//
DummyInterface::DummyInterface(const char* name, const NamedList& config)
: RadioInterface(name),
m_startTime(0), m_sample(0), m_filter(0), m_rxFreq(0), m_txFreq(0)
m_startTime(0), m_sample(0), m_filter(0), m_rxFreq(0), m_txFreq(0),
m_rxDataBufSamples(0), m_rxDataChunkSamples(0), m_rxDataOffs(0),
m_profiling(false), m_sampleEnergize(0)
{
debugChain(&__plugin);
m_address << __plugin.name() << "/" << config;
@ -168,8 +228,51 @@ DummyInterface::DummyInterface(const char* name, const NamedList& config)
m_sampleStep = config.getIntValue("sample_step",1,1,1000);
m_filterStep = config.getIntValue("filter_step",250000,100000,5000000);
m_rxLatency = config.getIntValue("rx_latency",10000,0,50000);
m_txLatency = config.getIntValue("tx_latency",10000,0,50000);
m_txLatency = config.getIntValue("tx_latency",10000,0,100000);
m_radioCaps = &m_caps;
m_profiling = config.getBoolValue("profiling",false);
m_sampleEnergize = config.getIntValue("sample_energize",0,0,10000);
const String& rxFile = config["rx_file_raw"];
if (rxFile) {
File f;
const char* oper = 0;
if (f.openPath(rxFile)) {
int64_t len = f.length();
if (len > 0) {
if ((len % (2 * sizeof(float))) == 0) {
m_rxDataBuf.resize(len);
int rd = f.readData(m_rxDataBuf.data(),m_rxDataBuf.length());
if (rd != (int)m_rxDataBuf.length()) {
oper = "read";
m_rxDataBuf.clear();
}
}
else
Debug(this,DebugConf,"Invalid RX file file '%s' length " FMT64 " [%p]",
rxFile.c_str(),len,this);
}
else
oper = "get length";
}
else
oper = "open";
if (oper) {
String tmp;
Thread::errorString(tmp,f.error());
Debug(this,DebugMild,"RX file '%s' %s failed: %d %s [%p]",
rxFile.c_str(),oper,f.error(),tmp.c_str(),this);
}
}
m_rxDataBufSamples = m_rxDataBuf.length() / (2 * sizeof(float));
if (m_rxDataBufSamples) {
m_rxDataChunkSamples = config.getIntValue("rx_buf_chunk",0,0);
if (m_rxDataChunkSamples && (m_rxDataBufSamples % m_rxDataChunkSamples) != 0) {
Debug(this,DebugConf,
"Ignoring rx_buf_chunk=%u: not a multiple of rx buffer samples %u [%p]",
m_rxDataChunkSamples,m_rxDataBufSamples,this);
m_rxDataChunkSamples = 0;
}
}
Debug(this,DebugAll,"Interface created [%p]",this);
}
@ -261,15 +364,28 @@ unsigned int DummyInterface::send(uint64_t when, float* samples, unsigned size,
if (!(m_startTime && m_sample))
return NotInitialized;
float scale = powerScale ? *powerScale : 1.0f;
unsigned int clamped = 0;
if (m_sampleEnergize)
sampleEnergize(samples,size,scale,m_sampleEnergize,clamped);
else
for (unsigned int i = 0; i < 2 * size; i++)
if (::fabs(scale * samples[i]) > 1.0f)
clamped++;
unsigned int res = 0;
int64_t delta = getTxUsec(when) - Time::now();
if (delta > 0)
::usleep(delta);
else
res |= TooLate;
for (unsigned int i = 0; i < 2 * size; i++) {
if (::fabs(scale * samples[i]) > 1.0f)
res |= Saturation;
if (delta > 0) {
if (m_profiling && delta > (int64_t)Thread::idleUsec())
delta = Thread::idleUsec();
res = sleep(delta);
// Stop if operation was cancelled
if (res)
return res;
}
else if (!m_profiling && delta < 0)
res = TooLate;
if (clamped) {
Debug(this,DebugNote,"Tx data clamped %u/%u [%p]",clamped,size,this);
res |= Saturation;
}
if (when != m_txSamp)
Debug(this,DebugNote,"Tx discontinuity of " FMT64 ": " FMT64U " -> " FMT64U,
@ -282,8 +398,21 @@ unsigned int DummyInterface::recv(uint64_t& when, float* samples, unsigned int&
{
if (!(m_startTime && m_sample))
return NotInitialized;
// TODO
return NotSupported;
int64_t delta = getRxUsec(when) - Time::now();
unsigned int res = 0;
if (delta > 0) {
// Requested timestamp is in the future
unsigned int res = sleep(delta);
// Stop if operation was cancelled
if (res)
return res | status();
}
else if (!m_profiling && delta < 0)
res = TooEarly;
if (!res && m_rxDataBufSamples)
setRxBuffer(when,samples,size);
m_rxSamp = when + size;
return res | status();
}
unsigned int DummyInterface::setFrequency(uint64_t hz, bool tx)
@ -346,6 +475,56 @@ bool DummyInterface::control(NamedList& params)
return true;
}
void DummyInterface::setRxBuffer(uint64_t& when, float* samples, unsigned int size)
{
if (!(m_rxDataBufSamples && samples && size))
return;
// Align upper layer RX timestamp if buffer size is given
// and it requested a multiple of it
if (m_rxDataChunkSamples && (when % m_rxDataChunkSamples) == 0) {
uint64_t ts = getTS();
if (ts > when)
when += m_rxDataChunkSamples * ((ts - when) / m_rxDataChunkSamples);
}
// Skip samples in RX buffer according to requested timestamp
if (m_rxSamp) {
uint64_t skip = 0;
if (m_rxSamp < when)
skip = (when - m_rxSamp) % m_rxDataBufSamples;
else
skip = (m_rxSamp - when) % m_rxDataBufSamples;
if (skip) {
m_rxDataOffs += skip;
if (m_rxDataOffs >= m_rxDataBufSamples)
m_rxDataOffs -= m_rxDataBufSamples;
}
}
// Force data align
// Align data offset at chunk size if timestamp is multiple of chunk size
if (m_rxDataChunkSamples && m_rxDataOffs && (when % m_rxDataChunkSamples) == 0) {
unsigned int delta = m_rxDataOffs % m_rxDataChunkSamples;
if (delta) {
m_rxDataOffs += m_rxDataChunkSamples - delta;
if (m_rxDataOffs >= m_rxDataBufSamples)
m_rxDataOffs -= m_rxDataBufSamples;
}
}
const float* buf = (float*)m_rxDataBuf.data();
while (size) {
unsigned int cpSamples = m_rxDataBufSamples - m_rxDataOffs;
if (!cpSamples) {
m_rxDataOffs = 0;
cpSamples = m_rxDataBufSamples;
}
if (cpSamples > size)
cpSamples = size;
::memcpy(samples,buf + 2 * m_rxDataOffs,cpSamples * 2 * sizeof(float));
size -= cpSamples;
m_rxDataOffs += cpSamples;
samples += 2 * cpSamples;
}
}
//
// DummyModule