Implemented timestamp resync. Added configurable option for allowed interval with timestamps in the past to make sure we are returning from read. Implemented capability to alter rx timestamps (for debug purposes).

git-svn-id: http://yate.null.ro/svn/yate/trunk@6045 acf43c95-373e-0410-b603-e72c3f656dc1
This commit is contained in:
marian 2015-09-22 13:36:26 +00:00
parent 085ca47e11
commit 9575abca85
2 changed files with 234 additions and 72 deletions

View File

@ -39,6 +39,14 @@
; positive: the number of times to check
;rxcheckts=-1
; rx_ts_past_error_interval: integer: Interval (in milliseconds) for allowed late timestamps
; when reading from device. I.e. how many old samples in a row we accept when reading
; This value is used to make sure we are returning from device read (with error): avoid
; reading forever if we have a faulty device
; Allowed interval [50..10000]
; Defaults to 200
;rx_ts_past_error_interval=200
[libusb]
; This section configures libusb backend related data

View File

@ -682,7 +682,7 @@ public:
showDcOffsChange(0), showFpgaPhaseChange(0), showPowerBalanceChange(0),
startTime(0), transferred(0),
timestamp(0), lastTs(0), buffers(0), hdrLen(0), bufSamples(0),
bufSamplesLen(0), crtBuf(0), crtBufSampOffs(0),
bufSamplesLen(0), crtBuf(0), crtBufSampOffs(0), newBuffer(true),
syncFlags(0), syncTs(0), syncStatus(tx),
dataDumpParams(""), dataDump(0), dataDumpFile(brfDir(tx)),
upDumpParams(""), upDump(0), upDumpFile(String(brfDir(tx)) + "-APP"),
@ -717,6 +717,7 @@ public:
inline bool advanceBuffer() {
if (crtBuf < buffers)
setCrtBuf(crtBuf + 1);
newBuffer = true;
return crtBuf < buffers;
}
inline uint8_t* bufStart(unsigned int index)
@ -750,6 +751,7 @@ public:
m_bufEndianOk = false;
#endif
setCrtBuf(start ? 0 : buffers);
newBuffer = true;
}
#ifdef LITTLE_ENDIAN
inline void fixEndian()
@ -802,6 +804,7 @@ public:
unsigned int bufLen; // Length of a single buffer (in bytes)
unsigned int crtBuf; // Current buffer
unsigned int crtBufSampOffs; // Current buffer samples offset
bool newBuffer; // New buffer to process
DataBlock buffer; // I/O buffer
// Sync timestamp
unsigned int syncFlags;
@ -1450,7 +1453,7 @@ private:
void ioBufCheckTs(bool tx, unsigned int nBufs = 0);
// Alter data
void updateAlterData(const NamedList& params);
void rxAlterData();
void rxAlterData(bool first);
// Calibration utilities
unsigned int calibrateAuto(String* error);
unsigned int calBackupRestore(BrfCalData& bak, bool backup, String* error);
@ -1580,7 +1583,10 @@ private:
BrfDevIO m_rxIO;
LusbTransfer m_usbTransfer[EpCount]; // List of USB transfers
int m_loopback; // Current loopback mode
String m_name;
uint64_t m_rxTimestamp; // RX timestamp
uint64_t m_rxResyncCandidate; // RX: timestamp resync value
unsigned int m_rxTsPastIntervalMs; // RX: allowed timestamp in the past interval in 1 read operation
unsigned int m_rxTsPastSamples; // RX: How many samples in the past to allow in 1 read operation
// TX power scale
bool m_calibrated;
float m_txPowerBalance;
@ -1592,8 +1598,13 @@ private:
int16_t m_wrMaxI;
int16_t m_wrMaxQ;
// Alter data
NamedList m_rxAlterDataParams;
bool m_rxAlterData;
int16_t m_rxAlterIncrement;
String m_rxAlterTsJumpPatern; // Pattern used to alter rx timestamps (kept to check changes)
bool m_rxAlterTsJumpSingle; // Stop altering ts after first pass
DataBlock m_rxAlterTsJump; // Values to alter rx timestamps
unsigned int m_rxAlterTsJumpPos; // Current position in rx alter timestamps
bool m_txPatternChanged;
String m_txPatternStr;
DataBlock m_txPattern;
@ -2287,6 +2298,10 @@ BrfLibUsbDevice::BrfLibUsbDevice(BrfInterface* owner)
m_txIO(true),
m_rxIO(false),
m_loopback(0),
m_rxTimestamp(0),
m_rxResyncCandidate(0),
m_rxTsPastIntervalMs(200),
m_rxTsPastSamples(0),
m_calibrated(false),
m_txPowerBalance(1),
m_txPowerBalanceChanged(false),
@ -2296,8 +2311,11 @@ BrfLibUsbDevice::BrfLibUsbDevice(BrfInterface* owner)
m_wrPowerScaleQ(s_sampleEnergize),
m_wrMaxI(s_sampleEnergize),
m_wrMaxQ(s_sampleEnergize),
m_rxAlterDataParams(""),
m_rxAlterData(false),
m_rxAlterIncrement(0),
m_rxAlterTsJumpSingle(true),
m_rxAlterTsJumpPos(0),
m_txPatternChanged(false),
m_txPatternBufPos(0),
m_txPatternBufSamples(0),
@ -2412,12 +2430,13 @@ void BrfLibUsbDevice::dumpStats(String& buf, const char* sep)
}
static inline void buildTimestampReport(String& buf, bool tx, uint64_t our, uint64_t board,
unsigned int code)
unsigned int code, bool app = true)
{
if (!code) {
const char* what = app ? "app" : "crt";
int64_t delta = (int64_t)(our - board);
buf.printf("%s: app=" FMT64U "\tboard=" FMT64U "\tdelta=" FMT64 "\tapp_position: %s",
brfDir(tx),our,board,delta,(delta < 0 ? "past" : "future"));
buf.printf("%s: %s=" FMT64U "\tboard=" FMT64U "\tdelta=" FMT64 "\t%s_position: %s",
brfDir(tx),what,our,board,delta,what,(delta < 0 ? "past" : "future"));
}
else
buf << brfDir(tx) << ": failure - " << RadioInterface::errorName(code);
@ -2434,13 +2453,18 @@ void BrfLibUsbDevice::dumpTimestamps(String& buf, const char* sep)
uint64_t tsRx = 0;
uint64_t ourRx = m_rxIO.timestamp;
unsigned int codeRx = internalGetTimestamp(false,tsRx);
uint64_t rx = m_rxTimestamp;
rxSerialize.drop();
String s;
String sTx;
String sRx;
String sRxTs;
buildTimestampReport(sTx,true,ourTx,tsTx,codeTx);
buildTimestampReport(sRx,false,ourRx,tsRx,codeRx);
if (!codeRx)
buildTimestampReport(sRxTs,false,rx,tsRx,codeRx,false);
buf.append(sTx,sep) << sep << sRx;
buf.append(sRxTs,sep);
}
void BrfLibUsbDevice::dumpDev(String& buf, bool info, bool state, const char* sep,
@ -2546,6 +2570,7 @@ void BrfLibUsbDevice::reLoad()
setDataDump();
checkTs(true,gen->getIntValue("txcheckts",0));
checkTs(false,gen->getIntValue("rxcheckts",-1));
updateAlterData(*gen);
}
// dir: 0=both negative=rx positive=tx
@ -2645,6 +2670,8 @@ bool BrfLibUsbDevice::open(const NamedList& params)
int tmpInt = 0;
int i = 0;
int q = 0;
m_rxResyncCandidate = 0;
m_rxTsPastIntervalMs = clampIntParam(params,"rx_ts_past_error_interval",200,50,10000);
#if 1
i = clampIntParam(params,"RX.OffsetI",0,-BRF_RX_DC_OFFSET_MAX,BRF_RX_DC_OFFSET_MAX);
q = clampIntParam(params,"RX.OffsetQ",0,-BRF_RX_DC_OFFSET_MAX,BRF_RX_DC_OFFSET_MAX);
@ -3615,72 +3642,105 @@ unsigned int BrfLibUsbDevice::recv(uint64_t& ts, float* data, unsigned int& samp
unsigned int samplesCopied = 0;
unsigned int samplesLeft = samples;
float* cpDest = data;
// Don't warn if requested timestamp is in the future
bool warnTsPast = ts && (ts <= io.timestamp);
uint64_t crtTs = ts;
String e;
unsigned int status = lusbSetAltInterface(BRF_ALTSET_RF_LINK,&e);
unsigned int nSamplesInPast = 0;
while (!status) {
bool done = false;
while (samplesLeft && io.crtBuf < io.buffers) {
// Retrieve buffer timestamp
uint64_t bufTs = io.bufTs(io.crtBuf);
if (io.crtBufSampOffs)
bufTs += io.crtBufSampOffs;
int64_t resync = (io.newBuffer && bufTs != m_rxTimestamp) ?
(int64_t)(bufTs - m_rxTimestamp) : 0;
#ifdef DEBUG_DEVICE_RX
String deltaStr;
if (resync)
deltaStr << " " << (resync > 0 ? "future=" : "past=") <<
(resync > 0 ? resync : -resync);
Debugger loopDbg(DebugAll,"RX: processing buffer",
" %u/%u ts=" FMT64U " crt_ts=" FMT64U " [%p]",
io.crtBuf + 1,io.buffers,bufTs,crtTs,m_owner);
" %u/%u rx_ts=" FMT64U " (resync_ts=" FMT64U ") "
"ts=" FMT64U " crt_ts=" FMT64U "%s [%p]",
io.crtBuf + 1,io.buffers,m_rxTimestamp,m_rxResyncCandidate,
bufTs,crtTs,deltaStr.safe(),m_owner);
#endif
// Check timestamp
if (bufTs > crtTs) {
// Buffer timestamp is in the future
// Already set valid data: stop and return it, don't advance the buffer
if (samplesCopied) {
done = true;
break;
if (resync) {
if ((resync > -1000 && resync < 1000) || bufTs == m_rxResyncCandidate) {
Debug(m_owner,DebugNote,
"RX: timestamp adjusted by " FMT64 " to " FMT64U " [%p]",
resync,bufTs,m_owner);
m_rxTimestamp = bufTs;
m_rxResyncCandidate = 0;
}
// No data set: continue from this timestamp
else {
Debug(m_owner,DebugWarn,
"RX: timestamp jumped by " FMT64 " to " FMT64U " in buffer %u/%u [%p]",
resync,m_rxTimestamp,io.crtBuf + 1,io.buffers,m_owner);
m_rxResyncCandidate = bufTs;
}
}
io.newBuffer = false;
unsigned int avail = 0;
int16_t* start = io.crtBufSamples(avail);
if (avail > samplesLeft)
avail = samplesLeft;
// Check timestamp
if (m_rxTimestamp > crtTs) {
// Buffer timestamp is in the future
#ifdef DEBUG_DEVICE_RX
if (crtTs)
Debug(m_owner,DebugNote,
"RX: reset timestamp in future in buffer %u/%u requested="
"RX: timestamp in future in buffer %u/%u requested="
FMT64U " found=" FMT64U " [%p]",
io.crtBuf + 1,io.buffers,crtTs,bufTs,m_owner);
ts = crtTs = bufTs;
io.crtBuf + 1,io.buffers,crtTs,m_rxTimestamp,m_owner);
#endif
// Pad with 0
uint64_t delta = m_rxTimestamp - crtTs;
if (delta > samplesLeft)
delta = samplesLeft;
crtTs += delta;
samplesLeft -= delta;
samplesCopied += delta;
::memset(cpDest,0,2 * delta * sizeof(float));
cpDest += 2 * delta;
#ifdef DEBUG_DEVICE_RX
Debug(m_owner,DebugAll,
"RX: zeroed %u samples status=%u/%u remains=%u [%p]",
(unsigned int)delta,samplesCopied,samples,samplesLeft,m_owner);
#endif
if (!samplesLeft)
break;
if (avail > samplesLeft)
avail = samplesLeft;
}
else if (bufTs < crtTs) {
else if (m_rxTimestamp < crtTs) {
// Timestamp in the past: check if can use some data, skip buffer
unsigned int avail = 0;
io.crtBufSamples(avail);
unsigned int skipSamples = avail;
uint64_t diff = crtTs - bufTs;
if (diff < skipSamples)
skipSamples = diff;
if (warnTsPast)
Debug(m_owner,DebugNote,
"RX: skipping %u/%u samples in buffer %u/%u:"
" timestamp in the past by " FMT64U " [%p]",
skipSamples,avail,io.crtBuf + 1,io.buffers,crtTs - bufTs,m_owner);
uint64_t delta = crtTs - m_rxTimestamp;
if (delta < skipSamples)
skipSamples = delta;
#ifdef DEBUG_DEVICE_RX
Debug(m_owner,DebugNote,
"RX: skipping %u/%u samples in buffer %u/%u:"
" timestamp in the past by " FMT64U " [%p]",
skipSamples,avail,io.crtBuf + 1,io.buffers,delta,m_owner);
#endif
avail -= skipSamples;
nSamplesInPast += skipSamples;
io.crtBufSampOffs += skipSamples;
m_rxTimestamp += skipSamples;
if (m_rxResyncCandidate)
m_rxResyncCandidate += skipSamples;
if (io.crtBufSampOffs >= io.bufSamples) {
io.advanceBuffer();
continue;
}
}
warnTsPast = true;
unsigned int avail = 0;
int16_t* start = io.crtBufSamples(avail);
if (avail > samplesLeft)
avail = samplesLeft;
// We have some valid data: reset samples in the past counter
if (avail)
nSamplesInPast = 0;
int16_t* last = start + avail * 2;
#ifdef DEBUG_DEVICE_RX
String s;
//io.dumpInt16Samples(s,io.crtBuf,io.crtBufSampOffs,avail);
//s = "\r\n-----\r\n" + s + "\r\n-----";
Debug(m_owner,DebugAll,"RX: buf %u/%u preparing to copy %u samples data=(%p) (end=(%p)) "
"start=(%p) last=(%p) [%p]%s",io.crtBuf + 1,io.buffers,avail,
cpDest,cpDest + avail,start,last,m_owner,s.safe());
#endif
// Copy data
while (start != last) {
*cpDest++ = ((float)*start++) / 2048;
@ -3688,12 +3748,14 @@ unsigned int BrfLibUsbDevice::recv(uint64_t& ts, float* data, unsigned int& samp
}
samplesCopied += avail;
samplesLeft -= avail;
m_rxTimestamp += avail;
if (m_rxResyncCandidate)
m_rxResyncCandidate += avail;
#ifdef DEBUG_DEVICE_RX
Debug(m_owner,DebugAll,
"RX: copied %u samples from buffer %u/%u status=%u/%u"
" remains=%u data=(%p) start=(%p) [%p]",
avail,io.crtBuf + 1,io.buffers,samplesCopied,samples,samplesLeft,
cpDest,start,m_owner);
"RX: copied %u samples from buffer %u/%u status=%u/%u remains=%u [%p]",
avail,io.crtBuf + 1,io.buffers,samplesCopied,
samples,samplesLeft,m_owner);
#endif
// Advance buffer offset, advance the buffer if we used all data
io.crtBufSampOffs += avail;
@ -3702,8 +3764,18 @@ unsigned int BrfLibUsbDevice::recv(uint64_t& ts, float* data, unsigned int& samp
crtTs += avail;
}
}
if (done || !samplesLeft)
if (!samplesLeft)
break;
if (nSamplesInPast > m_rxTsPastSamples) {
// Don't signal error if we have some valid data
// This will allow the upper layer to update timestamps
// Read operation may fail on subsequent reads
if (!samplesCopied) {
e = "Too much data in the past";
status = RadioInterface::Failure;
}
break;
}
status = syncTransfer(EpReadSamples,io.bufStart(0),io.buffer.length(),&e);
if (status)
break;
@ -3715,12 +3787,14 @@ unsigned int BrfLibUsbDevice::recv(uint64_t& ts, float* data, unsigned int& samp
unsigned int nPrint = checkDbgInt(io.showBuf,io.buffers);
if (nPrint)
printIOBuffer(false,"RECV",-1,nPrint);
if (m_rxAlterData)
rxAlterData(true);
if (checkDbgInt(io.checkTs))
ioBufCheckTs(false);
if (m_rxDcAuto || m_rxShowDcInfo)
computeRx(crtTs);
if (m_rxAlterData)
rxAlterData();
rxAlterData(false);
}
samples = samplesCopied;
#ifdef DEBUG_DEVICE_RX
@ -3796,6 +3870,9 @@ unsigned int BrfLibUsbDevice::internalSetSampleRate(bool tx, uint32_t value,
Debug(m_owner,DebugInfo,"%s samplerate set to %u [%p]",
brfDir(tx),value,m_owner);
}
// Calculate RX samples allowed to be in the past
if (!tx)
m_rxTsPastSamples = m_rxTsPastIntervalMs * ((value + 999) / 1000);
return 0;
}
e.printf(1024,"Failed to set %s samplerate %u: %s",brfDir(tx),value,e.c_str());
@ -5537,34 +5614,111 @@ void BrfLibUsbDevice::ioBufCheckTs(bool tx, unsigned int nBufs)
void BrfLibUsbDevice::updateAlterData(const NamedList& params)
{
if (params.getBoolValue(YSTRING("rx_alter_increment")))
m_rxAlterIncrement = 1;
else
m_rxAlterIncrement = 0;
m_rxAlterData = (m_rxAlterIncrement != 0);
Lock lck(m_dbgMutex);
m_rxAlterDataParams = params;
m_rxAlterDataParams.assign("-");
m_rxAlterData = true;
}
void BrfLibUsbDevice::rxAlterData()
void BrfLibUsbDevice::rxAlterData(bool first)
{
BrfDevIO& io = m_rxIO;
if (m_rxAlterIncrement) {
int maxVal = 9999 - io.bufSamples * 4;
for (unsigned int i = 0; i < io.buffers; i++) {
if (m_rxAlterIncrement >= maxVal)
while (m_rxAlterDataParams.c_str()) {
Lock lck(m_dbgMutex);
if (!m_rxAlterDataParams.c_str())
break;
if (m_rxAlterDataParams.getBoolValue(YSTRING("rx_alter_increment"))) {
if (!m_rxAlterIncrement)
m_rxAlterIncrement = 1;
int16_t* p = io.samples(i);
int16_t* last = io.samplesEOF(i);
while (p != last) {
*p++ = m_rxAlterIncrement;
*p++ = -m_rxAlterIncrement;
m_rxAlterIncrement++;
}
else
m_rxAlterIncrement = 0;
m_rxAlterData = (m_rxAlterIncrement != 0);
const String& tsJumpPattern = m_rxAlterDataParams[YSTRING("rx_alter_ts_jump_pattern")];
if (tsJumpPattern != m_rxAlterTsJumpPatern) {
m_rxAlterTsJumpPatern = tsJumpPattern;
ObjList* list = m_rxAlterTsJumpPatern.split(',');
m_rxAlterTsJump.overAlloc(10 * sizeof(int64_t));
m_rxAlterTsJump.resize(list->count() * sizeof(int64_t));
int64_t* d = (int64_t*)m_rxAlterTsJump.data();
bool ok = false;
unsigned int index = 0;
for (ObjList* o = list->skipNull(); o; o = o->skipNext()) {
const String* s = static_cast<String*>(o->get());
if (!s->startsWith("rep_")) {
d[index] = s->toInt64();
if (d[index])
ok = true;
index++;
continue;
}
int64_t lastVal = index ? d[index - 1] : 0;
unsigned int repeat = s->substr(4).toInteger(0,0,0);
if (repeat < 2) {
d[index++] = lastVal;
continue;
}
DataBlock tmp = m_rxAlterTsJump;
m_rxAlterTsJump.resize(tmp.length() + (sizeof(int64_t) * (repeat - 1)));
d = (int64_t*)m_rxAlterTsJump.data();
::memcpy(d,tmp.data(),index * sizeof(int64_t));
while (repeat--)
d[index++] = lastVal;
}
TelEngine::destruct(list);
if (!ok)
m_rxAlterTsJump.clear();
m_rxAlterTsJumpPos = 0;
}
m_rxAlterTsJumpSingle = m_rxAlterDataParams.getBoolValue(
YSTRING("rx_alter_ts_jump_single"),true);
if (m_rxAlterTsJump.length())
m_rxAlterData = true;
m_rxAlterDataParams.assign("");
m_rxAlterDataParams.clear();
if (!m_rxAlterData)
return;
}
BrfDevIO& io = m_rxIO;
if (first) {
// Change timestamps
if (m_rxAlterTsJump.length()) {
int64_t* d = (int64_t*)m_rxAlterTsJump.data();
unsigned int len = m_rxAlterTsJump.length() / sizeof(int64_t);
for (unsigned int i = 0; i < io.buffers; i++) {
if (d[m_rxAlterTsJumpPos])
io.setBufTs(i,io.bufTs(i) + d[m_rxAlterTsJumpPos]);
m_rxAlterTsJumpPos++;
if (m_rxAlterTsJumpPos >= len) {
m_rxAlterTsJumpPos = 0;
if (m_rxAlterTsJumpSingle) {
m_rxAlterTsJump.clear();
// Signal update on next call
m_rxAlterData = true;
break;
}
}
}
}
}
else {
// Change radio data
if (m_rxAlterIncrement && !first) {
for (unsigned int i = 0; i < io.buffers; i++) {
int16_t* p = io.samples(i);
int16_t* last = io.samplesEOF(i);
while (p != last) {
*p++ = m_rxAlterIncrement;
*p++ = -m_rxAlterIncrement;
m_rxAlterIncrement++;
if (m_rxAlterIncrement >= 2048)
m_rxAlterIncrement = 1;
}
}
}
}
#if 0
else
return;
printIOBuffer(false,"alter");
if (!first)
printIOBuffer(false,"alter");
#endif
}