bladerf: Updated source/sink to use libbladeRF's sync interface

wip-signat
Brian Padalino 9 years ago committed by Dimitri Stolnikov
parent 1adf936c94
commit 4e0a2c28e3
  1. 76
      lib/bladerf/bladerf_common.cc
  2. 33
      lib/bladerf/bladerf_common.h
  3. 214
      lib/bladerf/bladerf_sink_c.cc
  4. 38
      lib/bladerf/bladerf_sink_c.h
  5. 206
      lib/bladerf/bladerf_source_c.cc
  6. 27
      lib/bladerf/bladerf_source_c.h

@ -48,8 +48,12 @@ using namespace boost::assign;
boost::mutex bladerf_common::_devs_mutex;
std::list<boost::weak_ptr<struct bladerf> > bladerf_common::_devs;
bladerf_common::bladerf_common() : _is_running(false) {}
bladerf_common::~bladerf_common() {}
bladerf_common::bladerf_common() : _conv_buf(NULL), _conv_buf_size(4096) {}
bladerf_common::~bladerf_common()
{
free(_conv_buf);
}
bladerf_sptr bladerf_common:: get_cached_device(struct bladerf_devinfo devinfo)
{
@ -170,13 +174,53 @@ void bladerf_common::set_verbosity(const std::string &verbosity)
bladerf_log_set_verbosity(l);
}
void bladerf_common::init(dict_t &dict, const char *type)
bool bladerf_common::start(bladerf_module module)
{
int ret;
ret = bladerf_sync_config(_dev.get(), module, BLADERF_FORMAT_SC16_Q11,
_num_buffers, _samples_per_buffer,
_num_transfers, _stream_timeout_ms);
if ( ret != 0 ) {
std::cerr << _pfx << "bladerf_sync_config failed: "
<< bladerf_strerror(ret) << std::endl;
return false;
}
ret = bladerf_enable_module(_dev.get(), module, true);
if ( ret != 0 ) {
std::cerr << _pfx << "bladerf_enable_module failed: "
<< bladerf_strerror(ret) << std::endl;
return false;
}
return true;
}
bool bladerf_common::stop(bladerf_module module)
{
int ret;
ret = bladerf_enable_module(_dev.get(), module, false);
if ( ret != 0 ) {
std::cerr << _pfx << "bladerf_enable_modue failed: "
<< bladerf_strerror(ret) << std::endl;
return false;
}
return true;
}
void bladerf_common::init(dict_t &dict, bladerf_module module)
{
int ret;
unsigned int device_number = 0;
std::string device_name;
struct bladerf_version ver;
char serial[BLADERF_SERIAL_LENGTH];
const char *type = (module == BLADERF_MODULE_TX ? "sink" : "source");
_pfx = std::string("[bladeRF ") + std::string(type) + std::string("] ");
@ -280,6 +324,11 @@ void bladerf_common::init(dict_t &dict, const char *type)
_num_transfers = boost::lexical_cast< size_t >( dict["transfers"] );
}
_stream_timeout_ms = 3000;
if (dict.count("stream_timeout_ms")) {
_stream_timeout_ms = boost::lexical_cast< unsigned int >(dict["stream_timout_ms"] );
}
/* Require value to be >= 2 so we can ensure we have twice as many
* buffers as transfers */
if (_num_buffers <= 1) {
@ -307,6 +356,13 @@ void bladerf_common::init(dict_t &dict, const char *type)
if (_num_transfers == 0 || _num_transfers > (_num_buffers / 2)) {
_num_transfers = _num_buffers / 2;
}
_conv_buf = static_cast<int16_t*>(malloc(_conv_buf_size * 2 * sizeof(int16_t)));
if (_conv_buf == NULL) {
throw std::runtime_error( std::string(__FUNCTION__) +
"Failed to allocate _conv_buf" );
}
}
osmosdr::freq_range_t bladerf_common::freq_range()
@ -378,20 +434,6 @@ std::vector< std::string > bladerf_common::devices()
return ret;
}
bool bladerf_common::is_running()
{
boost::shared_lock<boost::shared_mutex> lock(_state_lock);
return _is_running;
}
void bladerf_common::set_running( bool is_running )
{
boost::unique_lock<boost::shared_mutex> lock(_state_lock);
_is_running = is_running;
}
double bladerf_common::set_sample_rate( bladerf_module module, double rate )
{
int status;

@ -40,9 +40,6 @@
#include "osmosdr/ranges.h"
#include "arg_helpers.h"
/* We currently read/write 1024 samples (pairs of 16-bit signed ints) */
#define BLADERF_SAMPLE_BLOCK_SIZE (1024)
typedef boost::shared_ptr<struct bladerf> bladerf_sptr;
class bladerf_common
@ -53,7 +50,10 @@ public:
protected:
/* Handle initialized and parameters common to both source & sink */
void init(dict_t &dict, const char *type);
void init(dict_t &dict, bladerf_module module);
bool start(bladerf_module module);
bool stop(bladerf_module module);
double set_sample_rate(bladerf_module module, double rate);
double get_sample_rate(bladerf_module module);
@ -67,45 +67,36 @@ protected:
static std::vector< std::string > devices();
bool is_running();
void set_running(bool is_running);
bladerf_sptr _dev;
void **_buffers;
struct bladerf_stream *_stream;
size_t _num_buffers;
size_t _buf_index;
size_t _samples_per_buffer;
size_t _num_transfers;
unsigned int _stream_timeout_ms;
gr::thread::thread _thread;
int16_t *_conv_buf;
int _conv_buf_size; /* In units of samples */
osmosdr::gain_range_t _vga1_range;
osmosdr::gain_range_t _vga2_range;
std::string _pfx;
/*
* BladeRF IQ correction parameters
*/
/* BladeRF IQ correction parameters */
static const int16_t DCOFF_SCALE = 2048;
static const int16_t GAIN_SCALE = 4096;
static const int16_t PHASE_SCALE = 4096;
private:
void set_verbosity(const std::string &verbosity);
void set_loopback_mode(const std::string &loopback);
bladerf_sptr open(const std::string &device_name);
static void close(void *dev); /* called by shared_ptr */
static bladerf_sptr get_cached_device(struct bladerf_devinfo devinfo);
bool _is_running;
boost::shared_mutex _state_lock;
void set_verbosity(const std::string &verbosity);
void set_loopback_mode(const std::string &loopback);
static boost::mutex _devs_mutex;
static std::list<boost::weak_ptr<struct bladerf> > _devs;
static bladerf_sptr get_cached_device(struct bladerf_devinfo devinfo);
static void close(void *dev); /* called by shared_ptr */
};
#endif

@ -42,7 +42,7 @@
using namespace boost::assign;
/*
* Create a new instance of bladerf_source_c and return
* Create a new instance of bladerf_sink_c and return
* a boost shared_ptr. This is effectively the public constructor.
*/
bladerf_sink_c_sptr make_bladerf_sink_c (const std::string &args)
@ -75,162 +75,23 @@ bladerf_sink_c::bladerf_sink_c (const std::string &args)
dict_t dict = params_to_dict(args);
/* Perform src/sink agnostic initializations */
init(dict, "source");
init(dict, BLADERF_MODULE_TX);
/* Set the range of VGA1, VGA1GAINT[7:0] */
_vga1_range = osmosdr::gain_range_t( -35, -4, 1 );
/* Set the range of VGA2, VGA2GAIN[4:0] */
_vga2_range = osmosdr::gain_range_t( 0, 25, 1 );
_filled = new bool[_num_buffers];
if (!_filled) {
throw std::runtime_error( std::string(__FUNCTION__) + " " +
"Failed to allocate _filled[]" );
}
}
/*
* Our virtual destructor.
*/
bladerf_sink_c::~bladerf_sink_c ()
{
int ret;
if( is_running() == true ) {
std::cerr << _pfx << "Still running when destructor called!"
<< std::endl;
stop();
}
ret = bladerf_enable_module( _dev.get(), BLADERF_MODULE_TX, false );
if ( ret != 0 )
std::cerr << _pfx << "bladerf_enable_module failed:"
<< bladerf_strerror(ret) << std::endl;
/* Release stream resources */
bladerf_deinit_stream(_stream);
delete[] _filled;
}
void *bladerf_sink_c::stream_callback( struct bladerf *dev,
struct bladerf_stream *stream,
struct bladerf_metadata *metadata,
void *samples,
size_t num_samples,
void *user_data )
{
bladerf_sink_c *obj = (bladerf_sink_c *) user_data;
return obj->get_next_buffer( samples, num_samples );
}
static size_t buffer2index(void **buffers, void *current, size_t num_buffers)
{
for (size_t i = 0; i < num_buffers; ++i) {
if (static_cast<char*>(current) == static_cast<char*>(buffers[i]))
return i;
}
throw std::runtime_error( std::string(__FUNCTION__) + " " +
"Has hit unexpected condition");
}
/* Fetch the next full buffer to pass down to the device */
void *bladerf_sink_c::get_next_buffer( void *samples, size_t num_samples)
{
void *ret;
bool running;
{
boost::unique_lock<boost::mutex> lock(_buf_status_lock);
/* Mark the incoming buffer empty and notify work() */
if (samples) {
size_t buffer_emptied_index = buffer2index(_buffers, samples, _num_buffers);
_filled[buffer_emptied_index] = false;
_buffer_emptied.notify_one();
}
/* Wait for our next buffer to become filled */
while ((running = is_running()) && !_filled[_next_to_tx]) {
_buffer_filled.wait(lock);
}
if (running) {
ret = _buffers[_next_to_tx];
_next_to_tx = (_next_to_tx + 1) % _num_buffers;
} else {
ret = NULL;
}
}
return ret;
}
void bladerf_sink_c::write_task()
{
int status;
/* Start stream and stay there until we kill the stream */
set_running(true);
status = bladerf_stream(_stream, BLADERF_MODULE_TX);
if ( status < 0 ) {
set_running(false);
std::cerr << _pfx << "Sink stream error: "
<< bladerf_strerror(status) << std::endl;
if ( status == BLADERF_ERR_TIMEOUT ) {
std::cerr << _pfx << "Try adjusting your sample rate or the "
<< "\"buffers\", \"buflen\", and \"transfers\" parameters. "
<< std::endl;
}
}
}
bool bladerf_sink_c::start()
{
int ret;
/* Initialize the stream */
ret = bladerf_init_stream( &_stream, _dev.get(), stream_callback,
&_buffers, _num_buffers, BLADERF_FORMAT_SC16_Q11,
_samples_per_buffer, _num_transfers, this );
if ( ret != 0 ) {
throw std::runtime_error( std::string(__FUNCTION__) + " " +
"bladerf_init_stream failed" ) ;
}
/* Initialize buffer management */
_buf_index = _next_to_tx = 0;
_next_value = static_cast<int16_t*>(_buffers[0]);
_samples_left = _samples_per_buffer;
for (size_t i = 0; i < _num_buffers; ++i) {
_filled[i] = false;
}
ret = bladerf_enable_module( _dev.get(), BLADERF_MODULE_TX, true );
if ( ret != 0 ) {
throw std::runtime_error(std::string(__FUNCTION__) + " " +
"bladerf_enable_module has failed:" + bladerf_strerror(ret) );
}
_thread = gr::thread::thread( boost::bind(&bladerf_sink_c::write_task, this) );
while(is_running() == false) {
/* Not quite started up just yet, so wait for a short period of time */
boost::this_thread::sleep( boost::posix_time::milliseconds(1) );
}
return true;
return bladerf_common::start(BLADERF_MODULE_TX);
}
bool bladerf_sink_c::stop()
{
set_running(false);
_thread.join();
return true;
return bladerf_common::stop(BLADERF_MODULE_TX);
}
int bladerf_sink_c::work( int noutput_items,
@ -238,56 +99,43 @@ int bladerf_sink_c::work( int noutput_items,
gr_vector_void_star &output_items )
{
const gr_complex *in = (const gr_complex *) input_items[0];
int num_samples;
bool running = is_running();
/* Total samples we want to process */
num_samples = noutput_items;
struct bladerf_metadata meta;
const float scaling = 2000.0f;
int ret;
/* While there are still samples to copy out ... */
while (running && num_samples > 0) {
if (noutput_items > _conv_buf_size) {
void *tmp;
while (_samples_left && num_samples) {
_conv_buf_size = noutput_items;
tmp = realloc(_conv_buf, _conv_buf_size * 2 * sizeof(int16_t));
if (tmp == NULL) {
throw std::runtime_error( std::string(__FUNCTION__) +
"Failed to realloc _conv_buf" );
}
/* Scale and sign extend I and then Q */
*_next_value = (int16_t)(real(*in) * 2000);
_next_value++;
_conv_buf = static_cast<int16_t*>(tmp);
}
*_next_value = (int16_t)(imag(*in) * 2000);
_next_value++;
/* Convert floating point samples into fixed point */
for (int i = 0; i < 2 * noutput_items;) {
_conv_buf[i++] = (int16_t)(scaling * real(*in));
_conv_buf[i++] = (int16_t)(scaling * imag(*in++));
}
/* Advance to next sample */
in++;
num_samples--;
_samples_left--;
}
/* Submit them to the device */
ret = bladerf_sync_tx(_dev.get(), static_cast<void *>(_conv_buf),
noutput_items, &meta, _stream_timeout_ms);
/* Advance to the next buffer if the current one is filled */
if (_samples_left == 0) {
{
boost::unique_lock<boost::mutex> lock(_buf_status_lock);
_filled[_buf_index] = true;
_buf_index = (_buf_index + 1) % _num_buffers;
_next_value = static_cast<int16_t*>(_buffers[_buf_index]);
_samples_left = _samples_per_buffer;
/* Signal that we have filled a buffer */
_buffer_filled.notify_one();
/* Wait here if the next buffer isn't full. The callback will
* signal us when it has freed up a buffer */
while (_filled[_buf_index] && running) {
_buffer_emptied.wait(lock);
running = is_running();
}
}
}
if ( ret != 0 ) {
std::cerr << _pfx << "bladerf_sync_tx error: "
<< bladerf_strerror(ret) << std::endl;
return WORK_DONE;
}
return running ? noutput_items : 0;
return noutput_items;
}
std::vector<std::string> bladerf_sink_c::get_devices()
{
return bladerf_common::devices();

@ -66,8 +66,6 @@ private:
bladerf_sink_c (const std::string & args); // private constructor
public:
~bladerf_sink_c (); // public destructor
bool start();
bool stop();
@ -111,42 +109,6 @@ public:
double set_bandwidth( double bandwidth, size_t chan = 0 );
double get_bandwidth( size_t chan = 0 );
osmosdr::freq_range_t get_bandwidth_range( size_t chan = 0 );
private: /* functions */
static void *stream_callback( struct bladerf *_dev,
struct bladerf_stream *stream,
struct bladerf_metadata *metadata,
void *samples,
size_t num_samples,
void *user_data );
void *get_next_buffer(void *samples, size_t num_samples);
void write_task();
private: /* members */
/* Array denoting whether each buffer is filled with data and ready to TX */
bool *_filled;
/* Acquire while updating _filled, and signalling/waiting on
* _buffer_emptied and _buffer_filled */
boost::mutex _buf_status_lock;
/* work() may block waiting for the stream callback to empty (consume) a
* buffer. The callback uses this to signal when it has emptied a buffer. */
boost::condition_variable _buffer_emptied;
/* The stream callback may block waiting for work() to fill (produce) a
* buffer. work() uses this to signal that it has filled a buffer. */
boost::condition_variable _buffer_filled;
/* These values are only to be updated and accessed from within work() */
int16_t *_next_value; /* I/Q value insertion point in current buffer */
size_t _samples_left; /* # of samples left to fill in our current buffer */
/* This should only be accessed and updated from TX callbacks */
size_t _next_to_tx; /* Next buffer to transmit */
};
#endif /* INCLUDED_BLADERF_SINK_C_H */

@ -40,13 +40,6 @@
#include "bladerf_source_c.h"
#include "osmosdr/source.h"
/*
* Default size of sample FIFO, in entries.
*/
#define BLADERF_SAMPLE_FIFO_SIZE (2 * 1024 * 1024)
#define BLADERF_SAMPLE_FIFO_MIN_SIZE (3 * BLADERF_SAMPLE_BLOCK_SIZE)
using namespace boost::assign;
/*
@ -81,35 +74,12 @@ bladerf_source_c::bladerf_source_c (const std::string &args)
gr::io_signature::make (MIN_OUT, MAX_OUT, sizeof (gr_complex)))
{
int ret;
size_t fifo_size;
std::string device_name;
struct bladerf_version fpga_version;
dict_t dict = params_to_dict(args);
init(dict, "source");
fifo_size = BLADERF_SAMPLE_FIFO_SIZE;
if (dict.count("fifo")) {
try {
fifo_size = boost::lexical_cast<size_t>(dict["fifo"]);
} catch (const boost::bad_lexical_cast &e) {
std::cerr << _pfx << "Warning: \"fifo\" value is invalid. Defaulting to "
<< fifo_size;
}
if (fifo_size < BLADERF_SAMPLE_FIFO_MIN_SIZE) {
fifo_size = BLADERF_SAMPLE_FIFO_MIN_SIZE;
std::cerr << _pfx << "Warning: \"fifo\" value is too small. Defaulting to "
<< BLADERF_SAMPLE_FIFO_MIN_SIZE;
}
}
_fifo = new boost::circular_buffer<gr_complex>(fifo_size);
if (!_fifo) {
throw std::runtime_error( std::string(__FUNCTION__) + " " +
"Failed to allocate a sample FIFO!" );
}
init(dict, BLADERF_MODULE_RX);
if (dict.count("sampling"))
{
@ -155,165 +125,61 @@ bladerf_source_c::bladerf_source_c (const std::string &args)
}
}
/*
* Our virtual destructor.
*/
bladerf_source_c::~bladerf_source_c ()
bool bladerf_source_c::start()
{
int ret;
if (is_running()) {
std::cerr << _pfx << "Still running when destructor called!"
<< std::endl;
stop();
}
ret = bladerf_enable_module( _dev.get(), BLADERF_MODULE_RX, false );
if ( ret != 0 )
std::cerr << _pfx << "bladerf_enable_module failed: "
<< bladerf_strerror(ret) << std::endl;
/* Release stream resources */
bladerf_deinit_stream(_stream);
delete _fifo;
return bladerf_common::start(BLADERF_MODULE_RX);
}
void *bladerf_source_c::stream_callback( struct bladerf *dev,
struct bladerf_stream *stream,
struct bladerf_metadata *metadata,
void *samples,
size_t num_samples,
void *user_data )
bool bladerf_source_c::stop()
{
bladerf_source_c *obj = (bladerf_source_c *) user_data;
if ( ! obj->is_running() )
return NULL;
return obj->stream_task( samples, num_samples );
return bladerf_common::stop(BLADERF_MODULE_RX);
}
/* Convert & push samples to the sample fifo */
void *bladerf_source_c::stream_task( void *samples, size_t num_samples )
int bladerf_source_c::work( int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items )
{
size_t i, n_avail, to_copy;
int16_t *sample = (int16_t *)samples;
void *ret;
int ret;
struct bladerf_metadata meta;
int16_t *current;
const float scaling = 1.0f / 2048.0f;
gr_complex *out = static_cast<gr_complex *>(output_items[0]);
ret = _buffers[_buf_index];
_buf_index = (_buf_index + 1) % _num_buffers;
_fifo_lock.lock();
n_avail = _fifo->capacity() - _fifo->size();
to_copy = (n_avail < num_samples ? n_avail : num_samples);
for(i = 0; i < to_copy; i++ ) {
/* Push sample to the fifo */
_fifo->push_back( gr_complex( *sample * scaling,
*(sample+1) * scaling) );
/* offset to the next I+Q sample */
sample += 2;
}
_fifo_lock.unlock();
if (noutput_items > _conv_buf_size) {
void *tmp;
/* We have made some new samples available to the consumer in work() */
if (to_copy) {
//std::cerr << "+" << std::flush;
_samp_avail.notify_one();
}
/* Indicate overrun, if neccesary */
if (to_copy < num_samples)
std::cerr << "O" << std::flush;
return ret;
}
void bladerf_source_c::read_task()
{
int status;
set_running( true );
/* Start stream and stay there until we kill the stream */
status = bladerf_stream(_stream, BLADERF_MODULE_RX);
if ( status < 0 ) {
set_running( false );
std::cerr << "Source stream error: " << bladerf_strerror(status) << std::endl;
if ( status == BLADERF_ERR_TIMEOUT ) {
std::cerr << _pfx << "Try adjusting your sample rate or the "
<< "\"buffers\", \"buflen\", and \"transfers\" parameters. "
<< std::endl;
_conv_buf_size = noutput_items;
tmp = realloc(_conv_buf, _conv_buf_size * 2 * sizeof(int16_t));
if (tmp == NULL) {
throw std::runtime_error( std::string(__FUNCTION__) +
"Failed to realloc _conv_buf" );
}
}
}
bool bladerf_source_c::start()
{
int ret;
/* Initialize the stream */
_buf_index = 0;
ret = bladerf_init_stream( &_stream, _dev.get(), stream_callback,
&_buffers, _num_buffers, BLADERF_FORMAT_SC16_Q11,
_samples_per_buffer, _num_buffers, this );
if ( ret != 0 )
std::cerr << _pfx << "bladerf_init_stream failed: "
<< bladerf_strerror(ret) << std::endl;
ret = bladerf_enable_module( _dev.get(), BLADERF_MODULE_RX, true );
if ( ret != 0 )
std::cerr << _pfx << "bladerf_enable_module failed:"
<< bladerf_strerror(ret) << std::endl;
_thread = gr::thread::thread( boost::bind(&bladerf_source_c::read_task, this) );
while( is_running() == false ) {
boost::this_thread::sleep( boost::posix_time::milliseconds(1) );
_conv_buf = static_cast<int16_t*>(tmp);
}
return true;
}
bool bladerf_source_c::stop()
{
set_running(false);
_thread.join();
return true;
}
/* Main work function, pull samples from the sample fifo */
int bladerf_source_c::work( int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items )
{
if ( ! is_running() )
/* Grab all the samples into the temporary buffer */
ret = bladerf_sync_rx(_dev.get(), static_cast<void *>(_conv_buf),
noutput_items, &meta, _stream_timeout_ms);
if ( ret != 0 ) {
std::cerr << _pfx << "bladerf_sync_rx error: "
<< bladerf_strerror(ret) << std::endl;
return WORK_DONE;
}
if( noutput_items > 0 ) {
gr_complex *out = (gr_complex *)output_items[0];
current = _conv_buf;
boost::unique_lock<boost::mutex> lock(_fifo_lock);
/* Convert them from fixed to floating point */
for (int i = 0; i < noutput_items; ++i) {
float x, y;
/* Wait until we have the requested number of samples */
int n_samples_avail = _fifo->size();
x = scaling * *current;
current++;
while (n_samples_avail < noutput_items) {
_samp_avail.wait(lock);
n_samples_avail = _fifo->size();
}
y = scaling * *current;
current++;
for(int i = 0; i < noutput_items; ++i) {
out[i] = _fifo->at(0);
_fifo->pop_front();
}
out[i] = gr_complex(x, y) ;
}
return noutput_items;

@ -66,8 +66,6 @@ private:
bladerf_source_c (const std::string & args); // private constructor
public:
~bladerf_source_c (); // public destructor
bool start();
bool stop();
@ -115,31 +113,8 @@ public:
double get_bandwidth( size_t chan = 0 );
osmosdr::freq_range_t get_bandwidth_range( size_t chan = 0 );
private: /* functions */
static void *stream_callback( struct bladerf *_dev,
struct bladerf_stream *stream,
struct bladerf_metadata *metadata,
void *samples,
size_t num_samples,
void *user_data );
void *stream_task(void *samples, size_t num_samples);
void read_task();
private: /* members */
private:
osmosdr::gain_range_t _lna_range;
/* The stream callback converts SC16Q11 samples from the bladeRF to gr_complex
* values, and adds them to this FIFO. work() fetches the gr_complex values
* from this queue */
boost::circular_buffer<gr_complex> *_fifo;
boost::mutex _fifo_lock;
/* work() will block if the stream callback hasn't produced samples. The
* callback uses this to notify work of the availability of samples */
boost::condition_variable _samp_avail;
};
#endif /* INCLUDED_BLADERF_SOURCE_C_H */

Loading…
Cancel
Save