From cbf791469c73a07fc344373965a044170a0cab40 Mon Sep 17 00:00:00 2001 From: Jon Szymaniak Date: Sat, 21 Sep 2013 16:53:16 -0400 Subject: [PATCH] bladeRF: Refactored buffering scheme in sink Removed the use of an intermediate sample FIFO in the sink implementation. Note the the FIFO has not been moved out of bladerf_common --> bladerf_source_c in this commit. work() now handles converting samples from complex to SC16_Q12, and filling "transmit-ready" buffers. The callbacks are now only responsible for marking the provided buffer free, and returning the next buffer. It appears that a small deadlock issues remains in this changest, which can be induced by: 1: Using a small sample rate (160Khz) 2: Switching back and forth between sinusoid <-> GSM burst In this case, it appears that work() is blocked waiting for a buffer to become free. More investigation here is required... --- lib/bladerf/bladerf_sink_c.cc | 212 ++++++++++++++++++++++------------ lib/bladerf/bladerf_sink_c.h | 25 +++- 2 files changed, 163 insertions(+), 74 deletions(-) diff --git a/lib/bladerf/bladerf_sink_c.cc b/lib/bladerf/bladerf_sink_c.cc index 2cd59f7..5d24318 100644 --- a/lib/bladerf/bladerf_sink_c.cc +++ b/lib/bladerf/bladerf_sink_c.cc @@ -39,6 +39,9 @@ #include "arg_helpers.h" #include "bladerf_sink_c.h" +#define NUM_BUFFERS 32 +#define NUM_SAMPLES_PER_BUFFER 4096 + using namespace boost::assign; /* @@ -156,21 +159,72 @@ bladerf_sink_c::bladerf_sink_c (const std::string &args) /* Set the range of VGA2, VGA2GAIN[4:0] */ _vga2_range = osmosdr::gain_range_t( 0, 25, 1 ); - _buf_index = 0; - _num_buffers = 8; /* TODO: make it an argument */ - const size_t samp_per_buf = 1024 * 10; /* TODO: make it an argument */ + _num_buffers = _samples_per_buffer = 0; + + /* Initialize buffer and sample configuration */ + if (dict.count("buffers")) { + _num_buffers = boost::lexical_cast< size_t >( dict["buffers"] ); + } + + if (dict.count("buflen")) { + _samples_per_buffer = boost::lexical_cast< size_t >( dict["buflen"] ); + } + + unsigned int transfers = 0; + if (dict.count("transfers")) { + transfers = boost::lexical_cast< size_t >( dict["transfers"] ); + } + + /* Require value to be >= 2 so we can ensure we have twice as many + * buffers as transfers */ + if (_num_buffers <= 1) { + _num_buffers = NUM_BUFFERS; + } + + if (0 == _samples_per_buffer) { + _samples_per_buffer = NUM_SAMPLES_PER_BUFFER; + } else { + /* For SC16_Q12, 1 sample = 2 int16_t's */ + _samples_per_buffer /= 2 * sizeof(int16_t); + + if (_samples_per_buffer < 1024 || _samples_per_buffer % 1024 != 0) + _samples_per_buffer = NUM_SAMPLES_PER_BUFFER; + } + + + + if (transfers == 0 || transfers > (_num_buffers / 2)) { + transfers = _num_buffers / 2; + + } /* Initialize the stream */ ret = bladerf_init_stream( &_stream, _dev, stream_callback, &_buffers, _num_buffers, BLADERF_FORMAT_SC16_Q12, - samp_per_buf, _num_buffers, this ); + _samples_per_buffer, transfers, this ); if ( ret != 0 ) std::cerr << "bladerf_init_stream has failed with " << ret << std::endl; + /* Initialize buffer management */ + _buf_index = _next_to_tx = 0; + _next_value = static_cast(_buffers[0]); + _samples_left = _samples_per_buffer; + + _filled = new bool[_num_buffers]; + if (!_filled) { + throw std::runtime_error( std::string(__FUNCTION__) + ": " + + "Failed to allocate _filled[]"); + } + + for (size_t i = 0; i < _num_buffers; ++i) { + _filled[i] = false; + } + ret = bladerf_enable_module( _dev, BLADERF_MODULE_TX, true ); if ( ret != 0 ) std::cerr << "bladerf_enable_module has failed with " << ret << std::endl; + set_running( true ); _thread = gr::thread::thread( boost::bind(&bladerf_sink_c::write_task, this) ); } @@ -182,6 +236,13 @@ bladerf_sink_c::~bladerf_sink_c () int ret; set_running(false); + + /* Ensure work() or callbacks return from wait() calls */ + _buf_status_lock.lock(); + _samp_avail.notify_all(); + _buffer_emptied.notify_all(); + _buf_status_lock.unlock(); + _thread.join(); ret = bladerf_enable_module( _dev, BLADERF_MODULE_TX, false ); @@ -193,6 +254,8 @@ bladerf_sink_c::~bladerf_sink_c () /* Close the device */ bladerf_close( _dev ); + + delete[] _filled; } void *bladerf_sink_c::stream_callback( struct bladerf *dev, @@ -203,49 +266,48 @@ void *bladerf_sink_c::stream_callback( struct bladerf *dev, void *user_data ) { bladerf_sink_c *obj = (bladerf_sink_c *) user_data; - - if ( ! obj->is_running() ) - return NULL; - - return obj->stream_task( samples, num_samples ); + return obj->get_next_buffer( samples, num_samples ); } -/* Convert & push samples to the sample fifo */ -void *bladerf_sink_c::stream_task( void *samples, size_t num_samples ) +static size_t buffer2index(void **buffers, void *current, size_t num_buffers) +{ + for (size_t i = 0; i < num_buffers; ++i) { + if (static_cast(current) == static_cast(buffers[i])) + return i; + } + + throw std::runtime_error( std::string(__FUNCTION__) + " " + + "Has hit unexpected condition"); +} + +/* Fetch the next full buffer to pass down to the device */ +void *bladerf_sink_c::get_next_buffer( void *samples, size_t num_samples) { - size_t i, n_avail; void *ret; + bool running; - ret = _buffers[_buf_index]; - _buf_index = (_buf_index + 1) % _num_buffers; - - while ( is_running() ) { - { - /* Lock the circular buffer */ - boost::unique_lock lock(_fifo_lock); + boost::unique_lock lock(_buf_status_lock); - /* Check to make sure we have samples available */ - n_avail = _fifo->size(); - while( n_avail < num_samples ) { - /* Wait until there is at least a block size of samples ready */ - _samp_avail.wait(lock); - n_avail = _fifo->size(); - } + /* Mark the incoming buffer empty and notify work() */ + if (samples) { + size_t buffer_emptied_index = buffer2index(_buffers, samples, _num_buffers); - /* Pop samples from circular buffer, write samples to outgoing buffer */ - int16_t *p = (int16_t *) ret; - for( i = 0; i < num_samples; ++i ) { - gr_complex sample = _fifo->at(0); - _fifo->pop_front(); - *p++ = 0xa000 | (int16_t)(real(sample)*2000); - *p++ = 0x5000 | (int16_t)(imag(sample)*2000); - } - } /* Give up the lock by leaving the scope ... */ + _filled[buffer_emptied_index] = false; + _buffer_emptied.notify_one(); + } - /* Notify that we've just popped some samples */ - //std::cerr << "-" << std::flush; - _samp_avail.notify_one(); + /* Wait for our next buffer to become filled */ + while ((running = is_running()) && !_filled[_next_to_tx]) { + _samp_avail.wait(lock); + } + + if (running) { + ret = _buffers[_next_to_tx]; + _next_to_tx = (_next_to_tx + 1) % _num_buffers; + } else { + ret = NULL; + } } return ret; @@ -255,8 +317,6 @@ void bladerf_sink_c::write_task() { int status; - set_running( true ); - /* Start stream and stay there until we kill the stream */ status = bladerf_stream(_stream, BLADERF_MODULE_TX); @@ -270,49 +330,55 @@ int bladerf_sink_c::work( int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items ) { - int n_space_avail, to_copy, limit, i; const gr_complex *in = (const gr_complex *) input_items[0]; - - if ( ! is_running() ) - return WORK_DONE; + int num_samples, to_copy; + bool running = is_running(); /* Total samples we want to process */ - to_copy = noutput_items; + num_samples = noutput_items; /* While there are still samples to copy out ... */ - while( to_copy > 0 ) { - { - /* Acquire the circular buffer lock */ - boost::unique_lock lock(_fifo_lock); + while (running && num_samples > 0) { - /* Check to see how much space is available */ - n_space_avail = _fifo->capacity() - _fifo->size(); + while (_samples_left && num_samples) { - while (n_space_avail == 0) { - _samp_avail.wait(lock); - n_space_avail = _fifo->capacity() - _fifo->size(); + /* Scale and sign extend I and then Q */ + *_next_value = (int16_t)(real(*in) * 2000); + _next_value++; + + *_next_value = (int16_t)(imag(*in) * 2000); + _next_value++; + + /* Advance to next sample */ + in++; + num_samples--; + _samples_left--; + } + + /* Advance to the next buffer if the current one is filled */ + if (_samples_left == 0) { + { + boost::unique_lock lock(_buf_status_lock); + + _filled[_buf_index] = true; + _buf_index = (_buf_index + 1) % _num_buffers; + _next_value = static_cast(_buffers[_buf_index]); + _samples_left = _samples_per_buffer; + + /* Signal that we have filled a buffer */ + _samp_avail.notify_one(); + + /* Wait here if the next buffer isn't full. The callback will + * signal us when it has freed up a buffer */ + while (_filled[_buf_index] && running) { + _buffer_emptied.wait(lock); + running = is_running(); + } } - - /* Limit ourselves to either the number of output items ... - ... or whatever space is available */ - limit = (n_space_avail < noutput_items ? n_space_avail : noutput_items); - - /* Consume! */ - for( i = 0; i < limit; i++ ) { - _fifo->push_back(*in++); - } - - /* Decrement the amount we need to copy */ - to_copy -= limit; - - } /* Unlock by leaving the scope */ - - /* Notify that we've just added some samples */ - //std::cerr << "+" << std::flush; - _samp_avail.notify_one(); + } } - return noutput_items; + return running ? noutput_items : 0; } std::vector bladerf_sink_c::get_devices() diff --git a/lib/bladerf/bladerf_sink_c.h b/lib/bladerf/bladerf_sink_c.h index 8fa90f4..a0e028b 100644 --- a/lib/bladerf/bladerf_sink_c.h +++ b/lib/bladerf/bladerf_sink_c.h @@ -114,12 +114,35 @@ private: /* functions */ size_t num_samples, void *user_data ); - void *stream_task(void *samples, size_t num_samples); + void *get_next_buffer(void *samples, size_t num_samples); void write_task(); private: /* members */ + size_t _samples_per_buffer; + + /* Array denoting whether each buffer is filled with data and ready to TX */ + bool *_filled; + + /* Acquire while updating _filled, and signalling/waiting on + * _buffer_emptied and _samples_avail */ + boost::mutex _buf_status_lock; + + /* wait() may block waiting for the TX callbacks to make a buffer availble. + * The callback uses this to signal when it has emptied out a buffer. */ + boost::condition_variable _buffer_emptied; + + /* The parent's _samples_avail is used to denote that work() has + * filled a buffer, unblocking a TX callback that's waiting for samples */ + + + /* These values are only to be updated and accessed from within work() */ + int16_t *_next_value; /* I/Q value insertion point in current buffer */ + size_t _samples_left; /* # of samples left to fill in our current buffer */ + + /* This should only be accessed and updated from TX callbacks */ + size_t _next_to_tx; /* Next buffer to transmit */ }; #endif /* INCLUDED_BLADERF_SINK_C_H */