From e1b699fda04b114c8973e4cbef929781d2312ba5 Mon Sep 17 00:00:00 2001 From: Dimitri Stolnikov Date: Fri, 1 Nov 2013 15:54:14 +0100 Subject: [PATCH] netsdr: replace boost asio sockets with native bsd sockets until we find out what's exactly interfering with fosphor when using python based flowgraphs (osmocom_fft). c++ flowgraphs are not affected. --- lib/netsdr/netsdr_source_c.cc | 221 ++++++++++++++++++++++++++++++++-- lib/netsdr/netsdr_source_c.h | 19 ++- 2 files changed, 223 insertions(+), 17 deletions(-) diff --git a/lib/netsdr/netsdr_source_c.cc b/lib/netsdr/netsdr_source_c.cc index cb54248..4539f1f 100644 --- a/lib/netsdr/netsdr_source_c.cc +++ b/lib/netsdr/netsdr_source_c.cc @@ -27,13 +27,25 @@ #include "config.h" #endif +#ifndef USE_ASIO +#include +#include +#include +#include +#include +#include +#include +#endif + #include #include #include #include #include +#ifdef USE_ASIO #include +#endif #include @@ -41,7 +53,9 @@ #include "netsdr_source_c.h" using namespace boost::assign; +#ifdef USE_ASIO using boost::asio::deadline_timer; +#endif #define DEFAULT_HOST "127.0.0.1" /* We assume a running moetronix server */ #define DEFAULT_PORT 50000 @@ -76,10 +90,15 @@ netsdr_source_c::netsdr_source_c (const std::string &args) : gr::sync_block ("netsdr_source_c", gr::io_signature::make (MIN_IN, MAX_IN, sizeof (gr_complex)), gr::io_signature::make (MIN_OUT, MAX_OUT, sizeof (gr_complex))), +#ifdef USE_ASIO _io_service(), _resolver(_io_service), _t(_io_service), _u(_io_service), +#else + _tcp(-1), + _udp(-1), +#endif _running(false), _sequence(0), _nchan(1) @@ -133,7 +152,7 @@ netsdr_source_c::netsdr_source_c (const std::string &args) if ( label.length() ) std::cerr << "Using " + label << " "; - +#ifdef USE_ASIO tcp::resolver::query query(tcp::v4(), host.c_str(), port_str.c_str()); tcp::resolver::iterator iterator = _resolver.resolve(query); @@ -155,6 +174,84 @@ netsdr_source_c::netsdr_source_c (const std::string &args) _u.set_option(udp::socket::reuse_address(true)); _t.set_option(udp::socket::reuse_address(true)); +#else + + if ( (_tcp = socket(AF_INET, SOCK_STREAM, 0) ) < 0) + { + throw std::runtime_error("Could not create TCP socket"); + } + + int sockoptval = 1; + setsockopt(_tcp, SOL_SOCKET, SO_REUSEADDR, &sockoptval, sizeof(int)); + sockoptval = 1; + setsockopt(_tcp, IPPROTO_TCP, TCP_NODELAY, &sockoptval, sizeof(int)); + + /* don't wait when shutting down */ + linger lngr; + lngr.l_onoff = 1; + lngr.l_linger = 0; + setsockopt(_tcp, SOL_SOCKET, SO_LINGER, &lngr, sizeof(linger)); + + struct hostent *hp; /* host information */ + struct sockaddr_in host_sa; /* local address */ + struct sockaddr_in peer_sa; /* remote address */ + + /* look up the address of the server given its name */ + hp = gethostbyname( host.c_str() ); + if (!hp) { + close(_tcp); + throw std::runtime_error(std::string(hstrerror(h_errno)) + " (" + host + ")"); + } + + /* fill in the hosts's address and data */ + memset(&host_sa, 0, sizeof(host_sa)); + host_sa.sin_family = AF_INET; + host_sa.sin_addr.s_addr = htonl(INADDR_ANY); + host_sa.sin_port = htons(0); + + if ( bind(_tcp, (struct sockaddr *)&host_sa, sizeof(host_sa)) < 0 ) + { + close(_tcp); + throw std::runtime_error("Bind of TCP socket failed: " + std::string(strerror(errno))); + } + + /* fill in the server's address and data */ + memset(&peer_sa, 0, sizeof(peer_sa)); + peer_sa.sin_family = AF_INET; + peer_sa.sin_port = htons(port); + + /* put the host's address into the server address structure */ + memcpy((void *)&peer_sa.sin_addr, hp->h_addr_list[0], hp->h_length); + + /* connect to server */ + if ( connect(_tcp, (struct sockaddr *)&peer_sa, sizeof(peer_sa)) < 0 ) + { + close(_tcp); + throw std::runtime_error(std::string(strerror(errno)) + " (" + host + ":" + port_str + ")"); + } + + if ( (_udp = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) + { + close(_tcp); + throw std::runtime_error("Could not create UDP socket"); + } + + sockoptval = 1; + setsockopt(_udp, SOL_SOCKET, SO_REUSEADDR, &sockoptval, sizeof(int)); + + /* fill in the hosts's address and data */ + memset(&host_sa, 0, sizeof(host_sa)); + host_sa.sin_family = AF_INET; + host_sa.sin_addr.s_addr = htonl(INADDR_ANY); + host_sa.sin_port = htons(DEFAULT_PORT); + + if ( bind(_udp, (struct sockaddr *)&host_sa, sizeof(host_sa)) < 0 ) + { + close(_tcp); + close(_udp); + throw std::runtime_error("Bind of UDP socket failed: " + std::string(strerror(errno))); + } +#endif // request & print device information std::vector< unsigned char > response; @@ -240,7 +337,10 @@ netsdr_source_c::netsdr_source_c (const std::string &args) */ netsdr_source_c::~netsdr_source_c () { - +#ifndef USE_ASIO + close(_tcp); + close(_udp); +#endif } void netsdr_source_c::apply_channel( unsigned char *cmd, size_t chan_pos, size_t chan ) @@ -279,6 +379,7 @@ bool netsdr_source_c::transaction( const unsigned char *cmd, size_t size ) bool netsdr_source_c::transaction( const unsigned char *cmd, size_t size, std::vector< unsigned char > &response ) { + size_t rx_bytes = 0; unsigned char data[1024*2]; response.clear(); @@ -289,9 +390,30 @@ bool netsdr_source_c::transaction( const unsigned char *cmd, size_t size, printf("\n"); #endif +#ifdef USE_ASIO _t.write_some( boost::asio::buffer(cmd, size) ); - size_t rx_bytes = _t.read_some( boost::asio::buffer(data, sizeof(data)) ); + rx_bytes = _t.read_some( boost::asio::buffer(data, sizeof(data)) ); +#else + write(_tcp, cmd, size); + + int nbytes = read(_tcp, data, 2); /* read header */ + if ( nbytes != 2 ) + return false; + + int length = (data[1] & 0x1f) | data[0]; + + if ( (length < 2) || (length > (int)sizeof(data)) ) + return false; + + length -= 2; /* subtract header size */ + + nbytes = read(_tcp, &data[2], length); /* read payload */ + if ( nbytes != length ) + return false; + + rx_bytes = 2 + length; /* header + payload */ +#endif response.resize( rx_bytes ); memcpy( response.data(), data, rx_bytes ); @@ -332,14 +454,23 @@ int netsdr_source_c::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items ) { - udp::endpoint ep; unsigned char data[1024*2]; if ( ! _running ) return WORK_DONE; - +#ifdef USE_ASIO + udp::endpoint ep; size_t rx_bytes = _u.receive_from( boost::asio::buffer(data, sizeof(data)), ep ); - +#else + struct sockaddr_in sa_in; /* remote address */ + socklen_t addrlen = sizeof(sa_in); /* length of addresses */ + ssize_t rx_bytes = recvfrom(_udp, data, sizeof(data), 0, (struct sockaddr *)&sa_in, &addrlen); + if ( rx_bytes <= 0 ) + { + std::cerr << "recvfrom returned " << rx_bytes << std::endl; + return WORK_DONE; + } +#endif #define HEADER_SIZE 2 #define SEQNUM_SIZE 2 @@ -365,7 +496,13 @@ int netsdr_source_c::work(int noutput_items, if ( diff > 1 ) { - std::cerr << "Lost " << diff << " packets from NetSDR at " << ep << std::endl; + std::cerr << "Lost " << diff << " packets from " +#ifdef USE_ASIO + << ep +#else + << inet_ntoa(sa_in.sin_addr) << ":" << ntohs(sa_in.sin_port) +#endif + << std::endl; } _sequence = (0xffff == sequence) ? 0 : sequence; @@ -443,6 +580,7 @@ typedef struct uint16_t port; } unit_t; +#ifdef USE_ASIO static void handle_receive( const boost::system::error_code& ec, std::size_t length, boost::system::error_code* out_ec, @@ -457,16 +595,17 @@ static void handle_timer( const boost::system::error_code& ec, { *out_ec = boost::asio::error::timed_out; } +#endif static std::vector < unit_t > discover_netsdr() { std::vector < unit_t > units; +#ifdef USE_ASIO boost::system::error_code ec; boost::asio::io_service ios; udp::socket socket(ios); deadline_timer timer(ios); - unsigned char data[1024*2]; timer.expires_at(boost::posix_time::pos_infin); @@ -482,7 +621,47 @@ static std::vector < unit_t > discover_netsdr() socket.set_option(udp::socket::reuse_address(true)); socket.set_option(boost::asio::socket_base::broadcast(true)); +#else + int sock; + if ( (sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) + return units; + + int sockoptval = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sockoptval, sizeof(int)); + sockoptval = 1; + setsockopt(sock, SOL_SOCKET, SO_BROADCAST, &sockoptval, sizeof(int)); + + struct sockaddr_in host_sa; /* local address */ + struct sockaddr_in peer_sa; /* remote address */ + + /* fill in the server's address and data */ + memset((char*)&peer_sa, 0, sizeof(peer_sa)); + peer_sa.sin_family = AF_INET; + peer_sa.sin_addr.s_addr = htonl(INADDR_BROADCAST); + peer_sa.sin_port = htons(DISCOVER_SERVER_PORT); + + /* fill in the hosts's address and data */ + memset(&host_sa, 0, sizeof(host_sa)); + host_sa.sin_family = AF_INET; + host_sa.sin_addr.s_addr = htonl(INADDR_ANY); + host_sa.sin_port = htons(DISCOVER_CLIENT_PORT); + + if ( bind(sock, (struct sockaddr *)&host_sa, sizeof(host_sa)) < 0 ) + { + close(sock); + return units; + } + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; + if ( setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0 ) + { + close(sock); + return units; + } +#endif discover_common_msg_t tx_msg; memset( (void *)&tx_msg, 0, sizeof(discover_common_msg_t) ); @@ -491,19 +670,24 @@ static std::vector < unit_t > discover_netsdr() tx_msg.key[0] = KEY0; tx_msg.key[1] = KEY1; tx_msg.op = MSG_REQ; - +#ifdef USE_ASIO udp::endpoint ep(boost::asio::ip::address_v4::broadcast(), DISCOVER_SERVER_PORT); socket.send_to(boost::asio::buffer(&tx_msg, sizeof(tx_msg)), ep); - +#else + sendto(sock, &tx_msg, sizeof(tx_msg), 0, (struct sockaddr *)&peer_sa, sizeof(peer_sa)); +#endif while ( true ) { + std::size_t rx_bytes = 0; + unsigned char data[1024*2]; + +#ifdef USE_ASIO // Set up the variables that receive the result of the asynchronous // operation. The error code is set to would_block to signal that the // operation is incomplete. Asio guarantees that its asynchronous // operations will never fail with would_block, so any other value in // ec indicates completion. ec = boost::asio::error::would_block; - std::size_t rx_bytes = 0; // Start the asynchronous receive operation. The handle_receive function // used as a callback will update the ec and rx_bytes variables. @@ -536,6 +720,14 @@ static std::vector < unit_t > discover_netsdr() { timer.cancel(); } +#else + socklen_t addrlen = sizeof(peer_sa); /* length of addresses */ + int nbytes = recvfrom(sock, data, sizeof(data), 0, (struct sockaddr *)&peer_sa, &addrlen); + if ( nbytes <= 0 ) + break; + + rx_bytes = nbytes; +#endif if ( rx_bytes >= sizeof(discover_common_msg_t) ) { @@ -562,8 +754,11 @@ static std::vector < unit_t > discover_netsdr() } } } - +#ifdef USE_ASIO socket.close(ec); +#else + close(sock); +#endif return units; } @@ -584,7 +779,7 @@ std::vector netsdr_source_c::get_devices( bool fake ) } if ( devices.empty() && fake ) - devices += str(boost::format("netsdr=%s:%d,label='RFSPACE NetSDR'") + devices += str(boost::format("netsdr=%s:%d,label='RFSPACE NetSDR Server'") % DEFAULT_HOST % DEFAULT_PORT); return devices; diff --git a/lib/netsdr/netsdr_source_c.h b/lib/netsdr/netsdr_source_c.h index d643ba6..6f4b634 100644 --- a/lib/netsdr/netsdr_source_c.h +++ b/lib/netsdr/netsdr_source_c.h @@ -20,20 +20,27 @@ #ifndef INCLUDED_NETSDR_SOURCE_C_H #define INCLUDED_NETSDR_SOURCE_C_H -#include +//#define USE_ASIO +#ifdef USE_ASIO +#include +#endif #include #include #include #include "osmosdr/ranges.h" #include "source_iface.h" - +#ifdef USE_ASIO using boost::asio::ip::tcp; using boost::asio::ip::udp; - +#endif class netsdr_source_c; +#ifndef SOCKET +#define SOCKET int +#endif + /* * We use boost::shared_ptr's instead of raw pointers for all access * to gr_blocks (and many other data structures). The shared_ptr gets @@ -118,11 +125,15 @@ private: /* functions */ std::vector< unsigned char > &response ); private: /* members */ +#ifdef USE_ASIO boost::asio::io_service _io_service; tcp::resolver _resolver; tcp::socket _t; udp::socket _u; - +#else + SOCKET _tcp; + SOCKET _udp; +#endif bool _running; uint16_t _sequence;