transceiver: get rid of the ctrl threads

There is no need to have n threads handle n ctrl sockets, since they all
will immediately respond to commands, so handle them from the existing
main osmo select loop.

Care must be taken to ensure that calls from within the command handler
do not block, or at least don't block too long, which currently is the
case.

Change-Id: I642a34451e1825eafecf71a902df916ccee7944c
This commit is contained in:
Eric Wild 2020-04-08 01:39:17 +02:00 committed by Hoernchen
parent 4ffdca10d4
commit a7143bf7a1
3 changed files with 124 additions and 75 deletions

View File

@ -45,6 +45,8 @@ extern "C" {
using namespace GSM; using namespace GSM;
Transceiver *transceiver;
#define USB_LATENCY_INTRVL 10,0 #define USB_LATENCY_INTRVL 10,0
/* Number of running values use in noise average */ /* Number of running values use in noise average */
@ -151,20 +153,30 @@ Transceiver::~Transceiver()
close(mClockSocket); close(mClockSocket);
for (size_t i = 0; i < mChans; i++) { for (size_t i = 0; i < mChans; i++) {
if (mControlServiceLoopThreads[i]) {
mControlServiceLoopThreads[i]->cancel();
mControlServiceLoopThreads[i]->join();
delete mControlServiceLoopThreads[i];
}
mTxPriorityQueues[i].clear(); mTxPriorityQueues[i].clear();
if (mCtrlSockets[i] >= 0)
close(mCtrlSockets[i]);
if (mDataSockets[i] >= 0) if (mDataSockets[i] >= 0)
close(mDataSockets[i]); close(mDataSockets[i]);
} }
} }
int Transceiver::ctrl_sock_cb(struct osmo_fd *bfd, unsigned int flags)
{
int rc = 0;
int chan = static_cast<int>(reinterpret_cast<uintptr_t>(bfd->data));
if (flags & OSMO_FD_READ)
rc = transceiver->ctrl_sock_handle_rx(chan);
if (rc < 0)
osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
if (flags & OSMO_FD_WRITE)
rc = transceiver->ctrl_sock_write(chan);
if (rc < 0)
osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
return rc;
}
/* /*
* Initialize transceiver * Initialize transceiver
* *
@ -193,8 +205,7 @@ bool Transceiver::init(FillerType filler, size_t rtsc, unsigned rach_delay,
mEdge = edge; mEdge = edge;
mDataSockets.resize(mChans, -1); mDataSockets.resize(mChans, -1);
mCtrlSockets.resize(mChans, -1); mCtrlSockets.resize(mChans);
mControlServiceLoopThreads.resize(mChans);
mTxPriorityQueueServiceLoopThreads.resize(mChans); mTxPriorityQueueServiceLoopThreads.resize(mChans);
mRxServiceLoopThreads.resize(mChans); mRxServiceLoopThreads.resize(mChans);
@ -216,24 +227,34 @@ bool Transceiver::init(FillerType filler, size_t rtsc, unsigned rach_delay,
return false; return false;
for (size_t i = 0; i < mChans; i++) { for (size_t i = 0; i < mChans; i++) {
int rv;
c_srcport = mBasePort + 2 * i + 1; c_srcport = mBasePort + 2 * i + 1;
c_dstport = mBasePort + 2 * i + 101; c_dstport = mBasePort + 2 * i + 101;
d_srcport = mBasePort + 2 * i + 2; d_srcport = mBasePort + 2 * i + 2;
d_dstport = mBasePort + 2 * i + 102; d_dstport = mBasePort + 2 * i + 102;
mCtrlSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, rv = osmo_sock_init2_ofd(&mCtrlSockets[i].conn_bfd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
mLocalAddr.c_str(), c_srcport, mLocalAddr.c_str(), c_srcport,
mRemoteAddr.c_str(), c_dstport, mRemoteAddr.c_str(), c_dstport,
OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT); OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
if (mCtrlSockets[i] < 0) if (rv < 0)
return false; return false;
mCtrlSockets[i].conn_bfd.cb = ctrl_sock_cb;
mCtrlSockets[i].conn_bfd.data = reinterpret_cast<void*>(i);
mDataSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, mDataSockets[i] = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
mLocalAddr.c_str(), d_srcport, mLocalAddr.c_str(), d_srcport,
mRemoteAddr.c_str(), d_dstport, mRemoteAddr.c_str(), d_dstport,
OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT); OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
if (mDataSockets[i] < 0) if (mDataSockets[i] < 0)
return false; return false;
if (i && filler == FILLER_DUMMY)
filler = FILLER_ZERO;
mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
} }
/* Randomize the central clock */ /* Randomize the central clock */
@ -243,21 +264,6 @@ bool Transceiver::init(FillerType filler, size_t rtsc, unsigned rach_delay,
mLastClockUpdateTime = startTime; mLastClockUpdateTime = startTime;
mLatencyUpdateTime = startTime; mLatencyUpdateTime = startTime;
/* Start control threads */
for (size_t i = 0; i < mChans; i++) {
TrxChanThParams *params = (TrxChanThParams *)malloc(sizeof(struct TrxChanThParams));
params->trx = this;
params->num = i;
mControlServiceLoopThreads[i] = new Thread(stackSize);
mControlServiceLoopThreads[i]->start((void * (*)(void*))
ControlServiceLoopAdapter, (void*) params);
if (i && filler == FILLER_DUMMY)
filler = FILLER_ZERO;
mStates[i].init(filler, mSPSTx, txFullScale, rtsc, rach_delay);
}
return true; return true;
} }
@ -719,8 +725,6 @@ void Transceiver::reset()
} }
#define MAX_PACKET_LENGTH 100
/** /**
* Matches a buffer with a command. * Matches a buffer with a command.
* @param buf a buffer to look command in * @param buf a buffer to look command in
@ -750,27 +754,77 @@ static bool match_cmd(char *buf,
return true; return true;
} }
bool Transceiver::driveControl(size_t chan) void Transceiver::ctrl_sock_send(ctrl_msg& m, int chan)
{ {
char buffer[MAX_PACKET_LENGTH + 1]; ctrl_sock_state& s = mCtrlSockets[chan];
char response[MAX_PACKET_LENGTH + 1]; struct osmo_fd *conn_bfd = &s.conn_bfd;
s.txmsgqueue.push_back(m);
conn_bfd->when |= OSMO_FD_WRITE;
}
int Transceiver::ctrl_sock_write(int chan)
{
int rc;
ctrl_sock_state& s = mCtrlSockets[chan];
if (s.conn_bfd.fd < 0) {
return -EIO;
}
while (s.txmsgqueue.size()) {
const ctrl_msg m = s.txmsgqueue.front();
s.conn_bfd.when &= ~OSMO_FD_WRITE;
/* try to send it over the socket */
rc = write(s.conn_bfd.fd, m.data, strlen(m.data) + 1);
if (rc == 0)
goto close;
if (rc < 0) {
if (errno == EAGAIN) {
s.conn_bfd.when |= OSMO_FD_WRITE;
break;
}
goto close;
}
s.txmsgqueue.pop_front();
}
return 0;
close:
LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << s.conn_bfd.fd << ") failed: " << rc;
return -1;
}
int Transceiver::ctrl_sock_handle_rx(int chan)
{
ctrl_msg cmd_received;
ctrl_msg cmd_to_send;
char *buffer = cmd_received.data;
char *response = cmd_to_send.data;
char *command, *params; char *command, *params;
int msgLen; int msgLen;
ctrl_sock_state& s = mCtrlSockets[chan];
/* Attempt to read from control socket */ /* Attempt to read from control socket */
msgLen = read(mCtrlSockets[chan], buffer, MAX_PACKET_LENGTH); msgLen = read(s.conn_bfd.fd, buffer, sizeof(cmd_received.data)-1);
if (msgLen < 0 && errno == EAGAIN)
return 0; /* Try again later */
if (msgLen <= 0) { if (msgLen <= 0) {
LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << mCtrlSockets[chan] << ") failed: " << msgLen; LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets read(" << s.conn_bfd.fd << ") failed: " << msgLen;
return false; return -EIO;
} }
/* Zero-terminate received string */ /* Zero-terminate received string */
buffer[msgLen] = '\0'; buffer[msgLen] = '\0';
/* Verify a command signature */ /* Verify a command signature */
if (strncmp(buffer, "CMD ", 4)) { if (strncmp(buffer, "CMD ", 4)) {
LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface"; LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface";
return false; return -EIO;
} }
/* Set command pointer */ /* Set command pointer */
@ -889,7 +943,7 @@ bool Transceiver::driveControl(size_t chan)
if ((timeslot < 0) || (timeslot > 7)) { if ((timeslot < 0) || (timeslot > 7)) {
LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface"; LOGCHAN(chan, DTRXCTRL, NOTICE) << "bogus message on control interface";
sprintf(response,"RSP SETSLOT 1 %d %d",timeslot,corrCode); sprintf(response,"RSP SETSLOT 1 %d %d",timeslot,corrCode);
return true; return 0;
} }
mStates[chan].chanType[timeslot] = (ChannelCombination) corrCode; mStates[chan].chanType[timeslot] = (ChannelCombination) corrCode;
setModulus(timeslot, chan); setModulus(timeslot, chan);
@ -921,12 +975,8 @@ bool Transceiver::driveControl(size_t chan)
} }
LOGCHAN(chan, DTRXCTRL, INFO) << "response is '" << response << "'"; LOGCHAN(chan, DTRXCTRL, INFO) << "response is '" << response << "'";
msgLen = write(mCtrlSockets[chan], response, strlen(response) + 1); transceiver->ctrl_sock_send(cmd_to_send, chan);
if (msgLen <= 0) { return 0;
LOGCHAN(chan, DTRXCTRL, NOTICE) << "mCtrlSockets write(" << mCtrlSockets[chan] << ") failed: " << msgLen;
return false;
}
return true;
} }
bool Transceiver::driveTxPriorityQueue(size_t chan) bool Transceiver::driveTxPriorityQueue(size_t chan)
@ -1179,28 +1229,6 @@ void *TxLowerLoopAdapter(Transceiver *transceiver)
return NULL; return NULL;
} }
void *ControlServiceLoopAdapter(TrxChanThParams *params)
{
char thread_name[16];
Transceiver *trx = params->trx;
size_t num = params->num;
free(params);
snprintf(thread_name, 16, "CtrlService%zu", num);
set_selfthread_name(thread_name);
while (1) {
if (!trx->driveControl(num)) {
LOGCHAN(num, DTRXCTRL, FATAL) << "Something went wrong in thread " << thread_name << ", requesting stop";
osmo_signal_dispatch(SS_MAIN, S_MAIN_STOP_REQUIRED, NULL);
break;
}
pthread_testcancel();
}
return NULL;
}
void *TxUpperLoopAdapter(TrxChanThParams *params) void *TxUpperLoopAdapter(TrxChanThParams *params)
{ {
char thread_name[16]; char thread_name[16];

View File

@ -33,11 +33,14 @@
extern "C" { extern "C" {
#include <osmocom/core/signal.h> #include <osmocom/core/signal.h>
#include <osmocom/core/select.h>
#include "config_defs.h" #include "config_defs.h"
} }
class Transceiver; class Transceiver;
extern Transceiver *transceiver;
/** Channel descriptor for transceiver object and channel number pair */ /** Channel descriptor for transceiver object and channel number pair */
struct TrxChanThParams { struct TrxChanThParams {
Transceiver *trx; Transceiver *trx;
@ -142,12 +145,33 @@ public:
} ChannelCombination; } ChannelCombination;
private: private:
struct ctrl_msg {
char data[101];
ctrl_msg() {};
};
struct ctrl_sock_state {
osmo_fd conn_bfd;
std::deque<ctrl_msg> txmsgqueue;
ctrl_sock_state() {
conn_bfd.fd = -1;
}
~ctrl_sock_state() {
if(conn_bfd.fd >= 0) {
close(conn_bfd.fd);
conn_bfd.fd = -1;
osmo_fd_unregister(&conn_bfd);
}
}
};
int mBasePort; int mBasePort;
std::string mLocalAddr; std::string mLocalAddr;
std::string mRemoteAddr; std::string mRemoteAddr;
std::vector<int> mDataSockets; ///< socket for writing to/reading from GSM core std::vector<int> mDataSockets; ///< socket for writing to/reading from GSM core
std::vector<int> mCtrlSockets; ///< socket for writing/reading control commands from GSM core std::vector<ctrl_sock_state> mCtrlSockets; ///< socket for writing/reading control commands from GSM core
int mClockSocket; ///< socket for writing clock updates to GSM core int mClockSocket; ///< socket for writing clock updates to GSM core
std::vector<VectorQueue> mTxPriorityQueues; ///< priority queue of transmit bursts received from GSM core std::vector<VectorQueue> mTxPriorityQueues; ///< priority queue of transmit bursts received from GSM core
@ -156,7 +180,6 @@ private:
std::vector<Thread *> mRxServiceLoopThreads; ///< thread to pull bursts into receive FIFO std::vector<Thread *> mRxServiceLoopThreads; ///< thread to pull bursts into receive FIFO
Thread *mRxLowerLoopThread; ///< thread to pull bursts into receive FIFO Thread *mRxLowerLoopThread; ///< thread to pull bursts into receive FIFO
Thread *mTxLowerLoopThread; ///< thread to push bursts into transmit FIFO Thread *mTxLowerLoopThread; ///< thread to push bursts into transmit FIFO
std::vector<Thread *> mControlServiceLoopThreads; ///< thread to process control messages from GSM core
std::vector<Thread *> mTxPriorityQueueServiceLoopThreads; ///< thread to process transmit bursts from GSM core std::vector<Thread *> mTxPriorityQueueServiceLoopThreads; ///< thread to process transmit bursts from GSM core
GSM::Time mTransmitLatency; ///< latency between basestation clock and transmit deadline clock GSM::Time mTransmitLatency; ///< latency between basestation clock and transmit deadline clock
@ -193,6 +216,12 @@ private:
/** send messages over the clock socket */ /** send messages over the clock socket */
bool writeClockInterface(void); bool writeClockInterface(void);
static int ctrl_sock_cb(struct osmo_fd *bfd, unsigned int flags);
int ctrl_sock_write(int chan);
void ctrl_sock_send(ctrl_msg& m, int chan);
/** drive handling of control messages from GSM core */
int ctrl_sock_handle_rx(int chan);
int mSPSTx; ///< number of samples per Tx symbol int mSPSTx; ///< number of samples per Tx symbol
int mSPSRx; ///< number of samples per Rx symbol int mSPSRx; ///< number of samples per Rx symbol
size_t mChans; size_t mChans;
@ -229,9 +258,6 @@ protected:
/** drive transmission of GSM bursts */ /** drive transmission of GSM bursts */
void driveTxFIFO(); void driveTxFIFO();
/** drive handling of control messages from GSM core */
bool driveControl(size_t chan);
/** /**
drive modulation and sorting of GSM bursts from GSM core drive modulation and sorting of GSM bursts from GSM core
@return true if a burst was transferred successfully @return true if a burst was transferred successfully
@ -242,7 +268,6 @@ protected:
friend void *TxUpperLoopAdapter(TrxChanThParams *params); friend void *TxUpperLoopAdapter(TrxChanThParams *params);
friend void *RxLowerLoopAdapter(Transceiver *transceiver); friend void *RxLowerLoopAdapter(Transceiver *transceiver);
friend void *TxLowerLoopAdapter(Transceiver *transceiver); friend void *TxLowerLoopAdapter(Transceiver *transceiver);
friend void *ControlServiceLoopAdapter(TrxChanThParams *params);
void reset(); void reset();
@ -256,8 +281,5 @@ void *RxUpperLoopAdapter(TrxChanThParams *params);
void *RxLowerLoopAdapter(Transceiver *transceiver); void *RxLowerLoopAdapter(Transceiver *transceiver);
void *TxLowerLoopAdapter(Transceiver *transceiver); void *TxLowerLoopAdapter(Transceiver *transceiver);
/** control message handler thread loop */
void *ControlServiceLoopAdapter(TrxChanThParams *params);
/** transmit queueing thread loop */ /** transmit queueing thread loop */
void *TxUpperLoopAdapter(TrxChanThParams *params); void *TxUpperLoopAdapter(TrxChanThParams *params);

View File

@ -79,7 +79,6 @@ static struct ctrl_handle *g_ctrlh;
static RadioDevice *usrp; static RadioDevice *usrp;
static RadioInterface *radio; static RadioInterface *radio;
static Transceiver *transceiver;
/* Create radio interface /* Create radio interface
* The interface consists of sample rate changes, frequency shifts, * The interface consists of sample rate changes, frequency shifts,