bladeRF: Refactored buffering scheme in sink

Removed the use of an intermediate sample FIFO in the sink
implementation. Note the the FIFO has not been moved out of
bladerf_common --> bladerf_source_c in this commit.

work() now handles converting samples from complex to SC16_Q12, and filling
"transmit-ready" buffers. The callbacks are now only responsible for
marking the provided buffer free, and returning the next buffer.

It appears that a small deadlock issues remains in this changest, which
can be induced by:
 1: Using a small sample rate (160Khz)
 2: Switching back and forth between sinusoid <-> GSM burst

 In this case, it appears that work() is blocked waiting for a buffer to
 become free. More investigation here is required...
This commit is contained in:
Jon Szymaniak 2013-09-21 16:53:16 -04:00 committed by Dimitri Stolnikov
parent ba7188727c
commit 9b41c6aa20
2 changed files with 163 additions and 74 deletions

View File

@ -39,6 +39,9 @@
#include "arg_helpers.h"
#include "bladerf_sink_c.h"
#define NUM_BUFFERS 32
#define NUM_SAMPLES_PER_BUFFER 4096
using namespace boost::assign;
/*
@ -156,21 +159,72 @@ bladerf_sink_c::bladerf_sink_c (const std::string &args)
/* Set the range of VGA2, VGA2GAIN[4:0] */
_vga2_range = osmosdr::gain_range_t( 0, 25, 1 );
_buf_index = 0;
_num_buffers = 8; /* TODO: make it an argument */
const size_t samp_per_buf = 1024 * 10; /* TODO: make it an argument */
_num_buffers = _samples_per_buffer = 0;
/* Initialize buffer and sample configuration */
if (dict.count("buffers")) {
_num_buffers = boost::lexical_cast< size_t >( dict["buffers"] );
}
if (dict.count("buflen")) {
_samples_per_buffer = boost::lexical_cast< size_t >( dict["buflen"] );
}
unsigned int transfers = 0;
if (dict.count("transfers")) {
transfers = boost::lexical_cast< size_t >( dict["transfers"] );
}
/* Require value to be >= 2 so we can ensure we have twice as many
* buffers as transfers */
if (_num_buffers <= 1) {
_num_buffers = NUM_BUFFERS;
}
if (0 == _samples_per_buffer) {
_samples_per_buffer = NUM_SAMPLES_PER_BUFFER;
} else {
/* For SC16_Q12, 1 sample = 2 int16_t's */
_samples_per_buffer /= 2 * sizeof(int16_t);
if (_samples_per_buffer < 1024 || _samples_per_buffer % 1024 != 0)
_samples_per_buffer = NUM_SAMPLES_PER_BUFFER;
}
if (transfers == 0 || transfers > (_num_buffers / 2)) {
transfers = _num_buffers / 2;
}
/* Initialize the stream */
ret = bladerf_init_stream( &_stream, _dev, stream_callback,
&_buffers, _num_buffers, BLADERF_FORMAT_SC16_Q12,
samp_per_buf, _num_buffers, this );
_samples_per_buffer, transfers, this );
if ( ret != 0 )
std::cerr << "bladerf_init_stream has failed with " << ret << std::endl;
/* Initialize buffer management */
_buf_index = _next_to_tx = 0;
_next_value = static_cast<int16_t*>(_buffers[0]);
_samples_left = _samples_per_buffer;
_filled = new bool[_num_buffers];
if (!_filled) {
throw std::runtime_error( std::string(__FUNCTION__) + ": " +
"Failed to allocate _filled[]");
}
for (size_t i = 0; i < _num_buffers; ++i) {
_filled[i] = false;
}
ret = bladerf_enable_module( _dev, BLADERF_MODULE_TX, true );
if ( ret != 0 )
std::cerr << "bladerf_enable_module has failed with " << ret << std::endl;
set_running( true );
_thread = gr::thread::thread( boost::bind(&bladerf_sink_c::write_task, this) );
}
@ -182,6 +236,13 @@ bladerf_sink_c::~bladerf_sink_c ()
int ret;
set_running(false);
/* Ensure work() or callbacks return from wait() calls */
_buf_status_lock.lock();
_samp_avail.notify_all();
_buffer_emptied.notify_all();
_buf_status_lock.unlock();
_thread.join();
ret = bladerf_enable_module( _dev, BLADERF_MODULE_TX, false );
@ -193,6 +254,8 @@ bladerf_sink_c::~bladerf_sink_c ()
/* Close the device */
bladerf_close( _dev );
delete[] _filled;
}
void *bladerf_sink_c::stream_callback( struct bladerf *dev,
@ -203,49 +266,48 @@ void *bladerf_sink_c::stream_callback( struct bladerf *dev,
void *user_data )
{
bladerf_sink_c *obj = (bladerf_sink_c *) user_data;
if ( ! obj->is_running() )
return NULL;
return obj->stream_task( samples, num_samples );
return obj->get_next_buffer( samples, num_samples );
}
/* Convert & push samples to the sample fifo */
void *bladerf_sink_c::stream_task( void *samples, size_t num_samples )
static size_t buffer2index(void **buffers, void *current, size_t num_buffers)
{
for (size_t i = 0; i < num_buffers; ++i) {
if (static_cast<char*>(current) == static_cast<char*>(buffers[i]))
return i;
}
throw std::runtime_error( std::string(__FUNCTION__) + " " +
"Has hit unexpected condition");
}
/* Fetch the next full buffer to pass down to the device */
void *bladerf_sink_c::get_next_buffer( void *samples, size_t num_samples)
{
size_t i, n_avail;
void *ret;
bool running;
ret = _buffers[_buf_index];
_buf_index = (_buf_index + 1) % _num_buffers;
while ( is_running() )
{
{
/* Lock the circular buffer */
boost::unique_lock<boost::mutex> lock(_fifo_lock);
boost::unique_lock<boost::mutex> lock(_buf_status_lock);
/* Check to make sure we have samples available */
n_avail = _fifo->size();
while( n_avail < num_samples ) {
/* Wait until there is at least a block size of samples ready */
_samp_avail.wait(lock);
n_avail = _fifo->size();
}
/* Mark the incoming buffer empty and notify work() */
if (samples) {
size_t buffer_emptied_index = buffer2index(_buffers, samples, _num_buffers);
/* Pop samples from circular buffer, write samples to outgoing buffer */
int16_t *p = (int16_t *) ret;
for( i = 0; i < num_samples; ++i ) {
gr_complex sample = _fifo->at(0);
_fifo->pop_front();
*p++ = 0xa000 | (int16_t)(real(sample)*2000);
*p++ = 0x5000 | (int16_t)(imag(sample)*2000);
}
} /* Give up the lock by leaving the scope ... */
_filled[buffer_emptied_index] = false;
_buffer_emptied.notify_one();
}
/* Notify that we've just popped some samples */
//std::cerr << "-" << std::flush;
_samp_avail.notify_one();
/* Wait for our next buffer to become filled */
while ((running = is_running()) && !_filled[_next_to_tx]) {
_samp_avail.wait(lock);
}
if (running) {
ret = _buffers[_next_to_tx];
_next_to_tx = (_next_to_tx + 1) % _num_buffers;
} else {
ret = NULL;
}
}
return ret;
@ -255,8 +317,6 @@ void bladerf_sink_c::write_task()
{
int status;
set_running( true );
/* Start stream and stay there until we kill the stream */
status = bladerf_stream(_stream, BLADERF_MODULE_TX);
@ -270,49 +330,55 @@ int bladerf_sink_c::work( int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items )
{
int n_space_avail, to_copy, limit, i;
const gr_complex *in = (const gr_complex *) input_items[0];
if ( ! is_running() )
return WORK_DONE;
int num_samples, to_copy;
bool running = is_running();
/* Total samples we want to process */
to_copy = noutput_items;
num_samples = noutput_items;
/* While there are still samples to copy out ... */
while( to_copy > 0 ) {
{
/* Acquire the circular buffer lock */
boost::unique_lock<boost::mutex> lock(_fifo_lock);
while (running && num_samples > 0) {
/* Check to see how much space is available */
n_space_avail = _fifo->capacity() - _fifo->size();
while (_samples_left && num_samples) {
while (n_space_avail == 0) {
_samp_avail.wait(lock);
n_space_avail = _fifo->capacity() - _fifo->size();
/* Scale and sign extend I and then Q */
*_next_value = (int16_t)(real(*in) * 2000);
_next_value++;
*_next_value = (int16_t)(imag(*in) * 2000);
_next_value++;
/* Advance to next sample */
in++;
num_samples--;
_samples_left--;
}
/* Advance to the next buffer if the current one is filled */
if (_samples_left == 0) {
{
boost::unique_lock<boost::mutex> lock(_buf_status_lock);
_filled[_buf_index] = true;
_buf_index = (_buf_index + 1) % _num_buffers;
_next_value = static_cast<int16_t*>(_buffers[_buf_index]);
_samples_left = _samples_per_buffer;
/* Signal that we have filled a buffer */
_samp_avail.notify_one();
/* Wait here if the next buffer isn't full. The callback will
* signal us when it has freed up a buffer */
while (_filled[_buf_index] && running) {
_buffer_emptied.wait(lock);
running = is_running();
}
}
/* Limit ourselves to either the number of output items ...
... or whatever space is available */
limit = (n_space_avail < noutput_items ? n_space_avail : noutput_items);
/* Consume! */
for( i = 0; i < limit; i++ ) {
_fifo->push_back(*in++);
}
/* Decrement the amount we need to copy */
to_copy -= limit;
} /* Unlock by leaving the scope */
/* Notify that we've just added some samples */
//std::cerr << "+" << std::flush;
_samp_avail.notify_one();
}
}
return noutput_items;
return running ? noutput_items : 0;
}
std::vector<std::string> bladerf_sink_c::get_devices()

View File

@ -114,12 +114,35 @@ private: /* functions */
size_t num_samples,
void *user_data );
void *stream_task(void *samples, size_t num_samples);
void *get_next_buffer(void *samples, size_t num_samples);
void write_task();
private: /* members */
size_t _samples_per_buffer;
/* Array denoting whether each buffer is filled with data and ready to TX */
bool *_filled;
/* Acquire while updating _filled, and signalling/waiting on
* _buffer_emptied and _samples_avail */
boost::mutex _buf_status_lock;
/* wait() may block waiting for the TX callbacks to make a buffer availble.
* The callback uses this to signal when it has emptied out a buffer. */
boost::condition_variable _buffer_emptied;
/* The parent's _samples_avail is used to denote that work() has
* filled a buffer, unblocking a TX callback that's waiting for samples */
/* These values are only to be updated and accessed from within work() */
int16_t *_next_value; /* I/Q value insertion point in current buffer */
size_t _samples_left; /* # of samples left to fill in our current buffer */
/* This should only be accessed and updated from TX callbacks */
size_t _next_to_tx; /* Next buffer to transmit */
};
#endif /* INCLUDED_BLADERF_SINK_C_H */