diff --git a/CommonLibs/Interthread.h b/CommonLibs/Interthread.h index 881e1a87..207ada21 100644 --- a/CommonLibs/Interthread.h +++ b/CommonLibs/Interthread.h @@ -517,7 +517,7 @@ public: @param timeout The blocking timeout in ms. @return Pointer at key or NULL on timeout. */ - D* read(const K &key, unsigned timeout) const + D* read(const K &key, unsigned timeout) { if (timeout==0) return readNoBlock(key); ScopedLock lock(mLock); @@ -537,7 +537,7 @@ public: @param key The key to read from. @return Pointer at key. */ - D* read(const K &key) const + D* read(const K &key) { ScopedLock lock(mLock); typename Map::const_iterator iter = mMap.find(key); diff --git a/CommonLibs/Threads.cpp b/CommonLibs/Threads.cpp index b6750ab5..377a1b07 100644 --- a/CommonLibs/Threads.cpp +++ b/CommonLibs/Threads.cpp @@ -43,71 +43,6 @@ using namespace std; #endif -Mutex gStreamLock; ///< Global lock to control access to cout and cerr. - -void lockCout() -{ - gStreamLock.lock(); - Timeval entryTime; - cout << entryTime << " " << osmo_gettid() << ": "; -} - - -void unlockCout() -{ - cout << dec << endl << flush; - gStreamLock.unlock(); -} - - -void lockCerr() -{ - gStreamLock.lock(); - Timeval entryTime; - cerr << entryTime << " " << osmo_gettid() << ": "; -} - -void unlockCerr() -{ - cerr << dec << endl << flush; - gStreamLock.unlock(); -} - - - - - - - -Mutex::Mutex() -{ - bool res; - res = pthread_mutexattr_init(&mAttribs); - assert(!res); - res = pthread_mutexattr_settype(&mAttribs,PTHREAD_MUTEX_RECURSIVE); - assert(!res); - res = pthread_mutex_init(&mMutex,&mAttribs); - assert(!res); -} - - -Mutex::~Mutex() -{ - pthread_mutex_destroy(&mMutex); - bool res = pthread_mutexattr_destroy(&mAttribs); - assert(!res); -} - - - - -/** Block for the signal up to the cancellation timeout. */ -void Signal::wait(Mutex& wMutex, unsigned timeout) const -{ - Timeval then(timeout); - struct timespec waitTime = then.timespec(); - pthread_cond_timedwait(&mSignal,&wMutex.mMutex,&waitTime); -} void set_selfthread_name(const char *name) { diff --git a/CommonLibs/Threads.h b/CommonLibs/Threads.h index 5ff137bc..24fec002 100644 --- a/CommonLibs/Threads.h +++ b/CommonLibs/Threads.h @@ -28,143 +28,96 @@ #ifndef THREADS_H #define THREADS_H -#include "config.h" - +#include +#include +#include #include #include -#include +#include #include +#include "config.h" +#include "Timeval.h" + class Mutex; - -/**@name Multithreaded access for standard streams. */ -//@{ - -/**@name Functions for gStreamLock. */ -//@{ -extern Mutex gStreamLock; ///< global lock for cout and cerr -void lockCerr(); ///< call prior to writing cerr -void unlockCerr(); ///< call after writing cerr -void lockCout(); ///< call prior to writing cout -void unlockCout(); ///< call after writing cout -//@} - -/**@name Macros for standard messages. */ -//@{ -#define COUT(text) { lockCout(); std::cout << text; unlockCout(); } -#define CERR(text) { lockCerr(); std::cerr << __FILE__ << ":" << __LINE__ << ": " << text; unlockCerr(); } -#ifdef NDEBUG -#define DCOUT(text) {} -#define OBJDCOUT(text) {} -#else -#define DCOUT(text) { COUT(__FILE__ << ":" << __LINE__ << " " << text); } -#define OBJDCOUT(text) { DCOUT(this << " " << text); } -#endif -//@} -//@} - - - /**@defgroup C++ wrappers for pthread mechanisms. */ //@{ -/** A class for recursive mutexes based on pthread_mutex. */ +/** A class for recursive mutexes. */ class Mutex { + std::recursive_mutex m; - private: + public: - pthread_mutex_t mMutex; - pthread_mutexattr_t mAttribs; + void lock() { + m.lock(); + } - public: + bool trylock() { + return m.try_lock(); + } - Mutex(); - - ~Mutex(); - - void lock() { pthread_mutex_lock(&mMutex); } - - bool trylock() { return pthread_mutex_trylock(&mMutex)==0; } - - void unlock() { pthread_mutex_unlock(&mMutex); } + void unlock() { + m.unlock(); + } friend class Signal; - }; - class ScopedLock { + Mutex &mMutex; - private: - Mutex& mMutex; - - public: - ScopedLock(Mutex& wMutex) :mMutex(wMutex) { mMutex.lock(); } - ~ScopedLock() { mMutex.unlock(); } - + public: + ScopedLock(Mutex &wMutex) : mMutex(wMutex) { + mMutex.lock(); + } + ~ScopedLock() { + mMutex.unlock(); + } }; - - - -/** A C++ interthread signal based on pthread condition variables. */ +/** A C++ interthread signal. */ class Signal { + /* any, because for some reason our mutex is recursive... */ + std::condition_variable_any mSignal; - private: + public: - mutable pthread_cond_t mSignal; + void wait(Mutex &wMutex, unsigned timeout) { + mSignal.wait_for(wMutex.m, std::chrono::milliseconds(timeout)); + } - public: + void wait(Mutex &wMutex) { + mSignal.wait(wMutex.m); + } - Signal() { int s = pthread_cond_init(&mSignal,NULL); assert(!s); } - - ~Signal() { pthread_cond_destroy(&mSignal); } - - /** - Block for the signal up to the cancellation timeout. - Under Linux, spurious returns are possible. - */ - void wait(Mutex& wMutex, unsigned timeout) const; - - /** - Block for the signal. - Under Linux, spurious returns are possible. - */ - void wait(Mutex& wMutex) const - { pthread_cond_wait(&mSignal,&wMutex.mMutex); } - - void signal() { pthread_cond_signal(&mSignal); } - - void broadcast() { pthread_cond_broadcast(&mSignal); } + void signal() { + mSignal.notify_one(); + } + void broadcast() { + mSignal.notify_all(); + } }; - - -#define START_THREAD(thread,function,argument) \ - thread.start((void *(*)(void*))function, (void*)argument); - void set_selfthread_name(const char *name); void thread_enable_cancel(bool cancel); /** A C++ wrapper for pthread threads. */ class Thread { - - private: - + private: pthread_t mThread; pthread_attr_t mAttrib; // FIXME -- Can this be reduced now? size_t mStackSize; - - public: - + public: /** Create a thread in a non-running state. */ - Thread(size_t wStackSize = 0):mThread((pthread_t)0) { - pthread_attr_init(&mAttrib); // (pat) moved this here. - mStackSize=wStackSize; + Thread(size_t wStackSize = 0) : mThread((pthread_t)0) + { + pthread_attr_init(&mAttrib); // (pat) moved this here. + mStackSize = wStackSize; } /** @@ -172,14 +125,17 @@ class Thread { It should be stopped and joined. */ // (pat) If the Thread is destroyed without being started, then mAttrib is undefined. Oops. - ~Thread() { pthread_attr_destroy(&mAttrib); } - + ~Thread() + { + pthread_attr_destroy(&mAttrib); + } /** Start the thread on a task. */ - void start(void *(*task)(void*), void *arg); + void start(void *(*task)(void *), void *arg); /** Join a thread that will stop on its own. */ - void join() { + void join() + { if (mThread) { int s = pthread_join(mThread, NULL); assert(!s); @@ -187,7 +143,10 @@ class Thread { } /** Send cancellation to thread */ - void cancel() { pthread_cancel(mThread); } + void cancel() + { + pthread_cancel(mThread); + } }; #ifdef HAVE_ATOMIC_OPS diff --git a/Transceiver52M/Transceiver.cpp b/Transceiver52M/Transceiver.cpp index 42aa93e3..90a3a6ca 100644 --- a/Transceiver52M/Transceiver.cpp +++ b/Transceiver52M/Transceiver.cpp @@ -135,12 +135,13 @@ bool TransceiverState::init(FillerType filler, size_t sps, float scale, size_t r Transceiver::Transceiver(const struct trx_cfg *cfg, GSM::Time wTransmitLatency, RadioInterface *wRadioInterface) - : cfg(cfg), mClockSocket(-1), - mRxLowerLoopThread(nullptr), mTxLowerLoopThread(nullptr), - mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface), - mChans(cfg->num_chans), mOn(false), mForceClockInterface(false), - mTxFreq(0.0), mRxFreq(0.0), mTSC(0), mMaxExpectedDelayAB(0), - mMaxExpectedDelayNB(0), mWriteBurstToDiskMask(0) + : mChans(cfg->num_chans), cfg(cfg), + mCtrlSockets(mChans), mClockSocket(-1), + mTxPriorityQueues(mChans), mReceiveFIFO(mChans), + mRxServiceLoopThreads(mChans), mRxLowerLoopThread(nullptr), mTxLowerLoopThread(nullptr), + mTxPriorityQueueServiceLoopThreads(mChans), mTransmitLatency(wTransmitLatency), mRadioInterface(wRadioInterface), + mOn(false),mForceClockInterface(false), mTxFreq(0.0), mRxFreq(0.0), mTSC(0), mMaxExpectedDelayAB(0), + mMaxExpectedDelayNB(0), mWriteBurstToDiskMask(0), mVersionTRXD(mChans), mStates(mChans) { txFullScale = mRadioInterface->fullScaleInputValue(); rxFullScale = mRadioInterface->fullScaleOutputValue(); @@ -208,14 +209,7 @@ bool Transceiver::init() } mDataSockets.resize(mChans, -1); - mCtrlSockets.resize(mChans); - mTxPriorityQueueServiceLoopThreads.resize(mChans); - mRxServiceLoopThreads.resize(mChans); - - mTxPriorityQueues.resize(mChans); - mReceiveFIFO.resize(mChans); - mStates.resize(mChans); - mVersionTRXD.resize(mChans); + /* Filler table retransmissions - support only on channel 0 */ if (cfg->filler == FILLER_DUMMY) diff --git a/Transceiver52M/Transceiver.h b/Transceiver52M/Transceiver.h index 0389e603..18608845 100644 --- a/Transceiver52M/Transceiver.h +++ b/Transceiver52M/Transceiver.h @@ -148,6 +148,7 @@ public: } ChannelCombination; private: + size_t mChans; struct ctrl_msg { char data[101]; ctrl_msg() {}; @@ -218,7 +219,7 @@ struct ctrl_sock_state { /** drive handling of control messages from GSM core */ int ctrl_sock_handle_rx(int chan); - size_t mChans; + bool mOn; ///< flag to indicate that transceiver is powered on bool mForceClockInterface; ///< flag to indicate whether IND CLOCK shall be sent unconditionally after transceiver is started bool mHandover[8][8]; ///< expect handover to the timeslot/subslot diff --git a/Transceiver52M/Transceiver2.cpp b/Transceiver52M/Transceiver2.cpp index 41189dff..4430b83b 100644 --- a/Transceiver52M/Transceiver2.cpp +++ b/Transceiver52M/Transceiver2.cpp @@ -94,11 +94,10 @@ Transceiver2::Transceiver2(int wBasePort, size_t wSPS, size_t wChans, GSM::Time wTransmitLatency, RadioInterface *wRadioInterface) - : rx_sps(4), tx_sps(4), mAddr(TRXAddress), - mTransmitLatency(wTransmitLatency), - mRadioInterface(wRadioInterface), mChans(wChans), - mOn(false), mTxFreq(0.0), mRxFreq(0.0), mPower(-10), mMaxExpectedDelay(0), - mBSIC(-1) + : mChans(wChans), rx_sps(4), tx_sps(4), mAddr(TRXAddress), mTransmitLatency(wTransmitLatency), + mTxPriorityQueues(mChans), mReceiveFIFO(mChans), mRxServiceLoopThreads(mChans), + mControlServiceLoopThreads(mChans), mTxPriorityQueueServiceLoopThreads(mChans), mRadioInterface(wRadioInterface), + mOn(false), mTxFreq(0.0), mRxFreq(0.0), mPower(-10), mMaxExpectedDelay(0), mBSIC(-1), mStates(mChans) { GSM::Time startTime(random() % gHyperframe,0); @@ -142,13 +141,7 @@ bool Transceiver2::init(bool filler) return false; } - mControlServiceLoopThreads.resize(mChans); - mTxPriorityQueueServiceLoopThreads.resize(mChans); - mRxServiceLoopThreads.resize(mChans); - mTxPriorityQueues.resize(mChans); - mReceiveFIFO.resize(mChans); - mStates.resize(mChans); /* Filler table retransmissions - support only on channel 0 */ if (filler) diff --git a/Transceiver52M/Transceiver2.h b/Transceiver52M/Transceiver2.h index 913ee1ea..2826b128 100644 --- a/Transceiver52M/Transceiver2.h +++ b/Transceiver52M/Transceiver2.h @@ -81,7 +81,7 @@ struct TransceiverState { /** The Transceiver class, responsible for physical layer of basestation */ class Transceiver2 { private: - + size_t mChans; int rx_sps, tx_sps; std::string mAddr; GSM::Time mTransmitLatency; ///< latency between basestation clock and transmit deadline clock @@ -133,7 +133,7 @@ private: bool decodeSCH(SoftVector *burst, GSM::Time *time); bool correctFCCH(TransceiverState *state, signalVector *burst); - size_t mChans; + bool mOn; ///< flag to indicate that transceiver is powered on double mTxFreq; ///< the transmit frequency diff --git a/Transceiver52M/device/common/radioDevice.h b/Transceiver52M/device/common/radioDevice.h index 1e19bdc1..a2f848cf 100644 --- a/Transceiver52M/device/common/radioDevice.h +++ b/Transceiver52M/device/common/radioDevice.h @@ -172,14 +172,13 @@ class RadioDevice { const std::vector& tx_paths, const std::vector& rx_paths): tx_sps(tx_sps), rx_sps(rx_sps), iface(type), chans(chan_num), lo_offset(offset), - tx_paths(tx_paths), rx_paths(rx_paths) + tx_paths(tx_paths), rx_paths(rx_paths), m_ctr(chans) { if (iface == MULTI_ARFCN) { LOGC(DDEV, INFO) << "Multi-ARFCN: "<< chan_num << " logical chans -> 1 physical chans"; chans = 1; } - m_ctr.resize(chans); for (size_t i = 0; i < chans; i++) { memset(&m_ctr[i], 0, sizeof(m_ctr[i])); m_ctr[i].chan = i; diff --git a/Transceiver52M/device/ipc/IPCDevice.cpp b/Transceiver52M/device/ipc/IPCDevice.cpp index 2e40aa35..8bf48364 100644 --- a/Transceiver52M/device/ipc/IPCDevice.cpp +++ b/Transceiver52M/device/ipc/IPCDevice.cpp @@ -58,18 +58,12 @@ static int ipc_chan_sock_cb(struct osmo_fd *bfd, unsigned int flags); IPCDevice::IPCDevice(size_t tx_sps, size_t rx_sps, InterfaceType iface, size_t chan_num, double lo_offset, const std::vector &tx_paths, const std::vector &rx_paths) - : RadioDevice(tx_sps, rx_sps, iface, chan_num, lo_offset, tx_paths, rx_paths), tx_attenuation(), - tmp_state(IPC_IF_MSG_GREETING_REQ), shm(NULL), shm_dec(0), started(false) + : RadioDevice(tx_sps, rx_sps, iface, chan_num, lo_offset, tx_paths, rx_paths), sk_chan_state(chans, ipc_per_trx_sock_state()), + tx_attenuation(), tmp_state(IPC_IF_MSG_GREETING_REQ), shm(NULL), shm_dec(0), + rx_buffers(chans), started(false), tx_gains(chans), rx_gains(chans) { LOGC(DDEV, INFO) << "creating IPC device..."; - rx_gains.resize(chans); - tx_gains.resize(chans); - - rx_buffers.resize(chans); - - sk_chan_state.resize(chans, ipc_per_trx_sock_state()); - /* Set up per-channel Rx timestamp based Ring buffers */ for (size_t i = 0; i < rx_buffers.size(); i++) rx_buffers[i] = new smpl_buf(SAMPLE_BUF_SZ / sizeof(uint32_t)); diff --git a/Transceiver52M/radioBuffer.cpp b/Transceiver52M/radioBuffer.cpp index 62f6553f..ec868e5c 100644 --- a/Transceiver52M/radioBuffer.cpp +++ b/Transceiver52M/radioBuffer.cpp @@ -28,7 +28,7 @@ RadioBuffer::RadioBuffer(size_t numSegments, size_t segmentLen, size_t hLen, bool outDirection) - : writeIndex(0), readIndex(0), availSamples(0) + : writeIndex(0), readIndex(0), availSamples(0), segments(numSegments) { if (!outDirection) hLen = 0; @@ -36,7 +36,6 @@ RadioBuffer::RadioBuffer(size_t numSegments, size_t segmentLen, buffer = new float[2 * (hLen + numSegments * segmentLen)]; bufferLen = numSegments * segmentLen; - segments.resize(numSegments); for (size_t i = 0; i < numSegments; i++) segments[i] = &buffer[2 * (hLen + i * segmentLen)]; diff --git a/Transceiver52M/radioInterface.cpp b/Transceiver52M/radioInterface.cpp index 70f098ec..1e82eabd 100644 --- a/Transceiver52M/radioInterface.cpp +++ b/Transceiver52M/radioInterface.cpp @@ -39,9 +39,10 @@ extern "C" { RadioInterface::RadioInterface(RadioDevice *wDevice, size_t tx_sps, size_t rx_sps, size_t chans, int wReceiveOffset, GSM::Time wStartTime) - : mDevice(wDevice), mSPSTx(tx_sps), mSPSRx(rx_sps), mChans(chans), - underrun(false), overrun(false), writeTimestamp(0), readTimestamp(0), - receiveOffset(wReceiveOffset), shiftOffset(0), shiftUpdate(false), + : mSPSTx(tx_sps), mSPSRx(rx_sps), mChans(chans), mReceiveFIFO(mChans), mDevice(wDevice), + sendBuffer(mChans), recvBuffer(mChans), convertRecvBuffer(mChans), + convertSendBuffer(mChans), powerScaling(mChans), underrun(false), overrun(false), + writeTimestamp(0), readTimestamp(0), receiveOffset(wReceiveOffset), shiftOffset(0), shiftUpdate(false), mOn(false) { mClock.set(wStartTime); @@ -59,15 +60,6 @@ bool RadioInterface::init(int type) return false; } - close(); - - sendBuffer.resize(mChans); - recvBuffer.resize(mChans); - convertSendBuffer.resize(mChans); - convertRecvBuffer.resize(mChans); - mReceiveFIFO.resize(mChans); - powerScaling.resize(mChans); - for (size_t i = 0; i < mChans; i++) { sendBuffer[i] = new RadioBuffer(NUMCHUNKS, CHUNK * mSPSTx, 0, true); recvBuffer[i] = new RadioBuffer(NUMCHUNKS, CHUNK * mSPSRx, 0, false); diff --git a/Transceiver52M/radioInterface.h b/Transceiver52M/radioInterface.h index f05a3764..232bd992 100644 --- a/Transceiver52M/radioInterface.h +++ b/Transceiver52M/radioInterface.h @@ -31,6 +31,9 @@ static const unsigned gSlotLen = 148; ///< number of symbols per slot, not class RadioInterface { protected: + size_t mSPSTx; + size_t mSPSRx; + size_t mChans; Thread mAlignRadioServiceLoopThread; ///< thread that synchronizes transmit and receive sections @@ -38,10 +41,6 @@ protected: RadioDevice *mDevice; ///< the USRP object - size_t mSPSTx; - size_t mSPSRx; - size_t mChans; - std::vector sendBuffer; std::vector recvBuffer; diff --git a/Transceiver52M/radioInterfaceMulti.cpp b/Transceiver52M/radioInterfaceMulti.cpp index a4190866..7ec47a6c 100644 --- a/Transceiver52M/radioInterfaceMulti.cpp +++ b/Transceiver52M/radioInterfaceMulti.cpp @@ -44,8 +44,9 @@ extern "C" { RadioInterfaceMulti::RadioInterfaceMulti(RadioDevice *radio, size_t tx_sps, size_t rx_sps, size_t chans) : RadioInterface(radio, tx_sps, rx_sps, chans), - outerSendBuffer(NULL), outerRecvBuffer(NULL), - dnsampler(NULL), upsampler(NULL), channelizer(NULL), synthesis(NULL) + outerSendBuffer(NULL), outerRecvBuffer(NULL), history(mChans), active(MCHANS, false), + rx_freq_state(mChans), tx_freq_state(mChans), dnsampler(NULL), upsampler(NULL), channelizer(NULL), + synthesis(NULL) { } @@ -74,12 +75,12 @@ void RadioInterfaceMulti::close() for (std::vector::iterator it = history.begin(); it != history.end(); ++it) delete *it; - mReceiveFIFO.resize(0); - powerScaling.resize(0); - history.resize(0); - active.resize(0); - rx_freq_state.resize(0); - tx_freq_state.resize(0); + mReceiveFIFO.clear(); + powerScaling.clear(); + history.clear(); + active.clear(); + rx_freq_state.clear(); + tx_freq_state.clear(); RadioInterface::close(); } @@ -154,18 +155,9 @@ bool RadioInterfaceMulti::init(int type) close(); - sendBuffer.resize(mChans); - recvBuffer.resize(mChans); convertSendBuffer.resize(1); convertRecvBuffer.resize(1); - mReceiveFIFO.resize(mChans); - powerScaling.resize(mChans); - history.resize(mChans); - rx_freq_state.resize(mChans); - tx_freq_state.resize(mChans); - active.resize(MCHANS, false); - /* 4 == sps */ inchunk = RESAMP_INRATE * 4; outchunk = RESAMP_OUTRATE * 4; diff --git a/Transceiver52M/radioInterfaceResamp.cpp b/Transceiver52M/radioInterfaceResamp.cpp index d377022b..1fccaeb7 100644 --- a/Transceiver52M/radioInterfaceResamp.cpp +++ b/Transceiver52M/radioInterfaceResamp.cpp @@ -100,13 +100,6 @@ bool RadioInterfaceResamp::init(int type) close(); - sendBuffer.resize(1); - recvBuffer.resize(1); - convertSendBuffer.resize(1); - convertRecvBuffer.resize(1); - mReceiveFIFO.resize(1); - powerScaling.resize(1); - switch (type) { case RadioDevice::RESAMP_64M: resamp_inrate = RESAMP_64M_INRATE;