sdrangelove/sdrbase/dsp/threadedsamplesink.cpp

115 lines
2.6 KiB
C++

#include <QThread>
#include "dsp/threadedsamplesink.h"
#include "util/message.h"
ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink) :
m_thread(new QThread),
m_sampleSink(sampleSink)
{
moveToThread(m_thread);
connect(m_thread, SIGNAL(started()), this, SLOT(threadStarted()));
connect(m_thread, SIGNAL(finished()), this, SLOT(threadFinished()));
m_messageQueue.moveToThread(m_thread);
connect(&m_messageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleMessages()));
m_sampleFifo.moveToThread(m_thread);
connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData()));
m_sampleFifo.setSize(128 * 1024);
sampleSink->moveToThread(m_thread);
}
ThreadedSampleSink::~ThreadedSampleSink()
{
m_thread->exit();
m_thread->wait();
delete m_thread;
}
void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool firstOfBurst)
{
Q_UNUSED(firstOfBurst);
m_sampleFifo.write(begin, end);
}
void ThreadedSampleSink::start()
{
m_thread->start();
}
void ThreadedSampleSink::stop()
{
m_thread->exit();
m_thread->wait();
m_sampleFifo.readCommit(m_sampleFifo.fill());
}
bool ThreadedSampleSink::handleMessage(Message* cmd)
{
// called from other thread
m_messageQueue.submit(cmd);
return true;
}
void ThreadedSampleSink::handleData()
{
bool firstOfBurst = true;
QTime time;
time.start();
while((m_sampleFifo.fill() > 0) && (m_messageQueue.countPending() == 0) && (time.elapsed() < 250)) {
SampleVector::iterator part1begin;
SampleVector::iterator part1end;
SampleVector::iterator part2begin;
SampleVector::iterator part2end;
size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end);
if(m_sampleSink != NULL) {
// first part of FIFO data
if(part1begin != part1end) {
// handle data
m_sampleSink->feed(part1begin, part1end, firstOfBurst);
firstOfBurst = false;
}
// second part of FIFO data (used when block wraps around)
if(part2begin != part2end) {
// handle data
m_sampleSink->feed(part2begin, part2end, firstOfBurst);
firstOfBurst = false;
}
}
// adjust FIFO pointers
m_sampleFifo.readCommit(count);
}
}
void ThreadedSampleSink::handleMessages()
{
Message* message;
while((message = m_messageQueue.accept()) != NULL) {
//qDebug("CMD: %s", message->getIdentifier());
if(m_sampleSink != NULL) {
if(!m_sampleSink->handleMessage(message))
message->completed();
} else {
message->completed();
}
}
}
void ThreadedSampleSink::threadStarted()
{
if(m_sampleSink != NULL)
m_sampleSink->start();
}
void ThreadedSampleSink::threadFinished()
{
if(m_sampleSink != NULL)
m_sampleSink->stop();
}