4g: Introduce ZMQ GnuRadio stream broker

srsENB currently creates 1 zmq stream (1 tx, 1 rx) for each cell (2 if
MIMO is enabled). Each cell transceives on a given EARFCN (and several
cells can transmit on same EARFCN).

However, for handover test purposes, we want to join all cells operating
on the same EARFCN to transceive on the same ZMQ conn, so that an srsUE
can interact with them at the same time (same as if the medium was shared).
Furthermore, we want to set different gains on each of those paths
before merging them in order to emulate RF conditions like handover.

In order to do so, a new element called the Broker is introduced, which
is placed in between ENBs and UEs ZMQ conenctions, multiplexing the
connections on the ENB side towards the UE side.

A separate process for the broker is run remotely (ENB run host) which
listens on a ctrl socket for commands. An internal Broker class is used
in osmo-gsm-tester to interact with the remote script, for instance to
configure the ports, start and stop the remote process, send commands to
it, etc.
On each ENB, when the rfemu "gnuradio_zmq" rfemu implementation is selected
in configuration, it will configure its zmq connections and the UE ones to
go over the Broker.

As a result, that means the UE zmq port configuration is expected to be
different than when no broker is in used, since there's the multiplexing
per EARFCN in between.

In this commit, only 1 ENB is supported, but multi-enb support is
planned in the future.

The handover test passes in the docker setup with this config:
"""
OSMO_GSM_TESTER_OPTS="-T -l dbg -s 4g:srsue-rftype@zmq+srsenb-rftype@zmq+" \
	"mod-enb-nprb@6+mod-enb-ncells@2+mod-enb-cells-2ca+suite-4g@10,2+" \
	"mod-enb-meas-enable -t =handover.py"
"""

and in resources.conf (or scenario), added:
"""
enb:
  ...
  cell_list:
    - dl_rfemu:
       type: gnuradio_zmq
    - dl_rfemu:
        type: gnuradio_zmq
"""

Note that since the broker is used, there's not need for mod-srsue-ncarriers@2
since the broker is joining the 2 enb cells into 1 stream on the UE side.

Change-Id: I6282cda400558dcb356276786d91e6388524c5b1
This commit is contained in:
Pau Espin 2020-10-05 19:23:38 +02:00 committed by pespin
parent 4b7c585561
commit 410912333e
8 changed files with 515 additions and 31 deletions

View File

@ -21,6 +21,7 @@ from abc import ABCMeta, abstractmethod
from ..core import log, config
from ..core import schema
from . import run_node
from .rfemu_gnuradio_zmq import GrBroker
def on_register_schemas():
resource_schema = {
@ -85,7 +86,53 @@ class eNodeB(log.Origin, metaclass=ABCMeta):
self._num_prb = 0
self._num_cells = None
self._epc = None
self._zmq_base_bind_port = None
self.gen_conf = None
self.gr_broker = None
def using_grbroker(self, cfg_values):
# whether we are to use Grbroker in between ENB and UE.
# Initial checks:
if cfg_values['enb'].get('rf_dev_type') != 'zmq':
return False
cell_list = cfg_values['enb']['cell_list']
use_match = False
notuse_match = False
for cell in cell_list:
if cell.get('dl_rfemu', False) and cell['dl_rfemu'].get('type', None) == 'gnuradio_zmq':
use_match = True
else:
notuse_match = True
if use_match and notuse_match:
raise log.Error('Some Cells are configured to use gnuradio_zmq and some are not, unsupported')
return use_match
def calc_required_zmq_ports(self, cfg_values):
cell_list = cfg_values['enb']['cell_list']
return len(cell_list) * self.num_ports() # *2 if MIMO
def calc_required_zmq_ports_joined_earfcn(self, cfg_values):
#gr_broker will join the earfcns, so we need to count uniqe earfcns:
cell_list = cfg_values['enb']['cell_list']
earfcn_li = []
[earfcn_li.append(int(cell['dl_earfcn'])) for cell in cell_list if int(cell['dl_earfcn']) not in earfcn_li]
return len(earfcn_li) * self.num_ports() # *2 if MIMO
def assign_enb_zmq_ports(self, cfg_values, port_name, base_port):
port_offset = 0
cell_list = cfg_values['enb']['cell_list']
for cell in cell_list:
cell[port_name] = base_port + port_offset
port_offset += self.num_ports()
# TODO: do we need to assign cell_list back?
def assign_enb_zmq_ports_joined_earfcn(self, cfg_values, port_name, base_port):
# TODO: Set in cell one bind port per unique earfcn, this is where UE will connect to when we use grbroker.
cell_list = cfg_values['enb']['cell_list']
earfcn_li = []
[earfcn_li.append(int(cell['dl_earfcn'])) for cell in cell_list if int(cell['dl_earfcn']) not in earfcn_li]
for cell in cell_list:
cell[port_name] = base_port + earfcn_li.index(int(cell['dl_earfcn'])) * self.num_ports()
def configure(self, config_specifics_li):
values = dict(enb=config.get_defaults('enb'))
@ -127,6 +174,30 @@ class eNodeB(log.Origin, metaclass=ABCMeta):
scell_list_new.append(scell_id)
values['enb']['cell_list'][i]['scell_list'] = scell_list_new
# Assign ZMQ ports to each Cell/EARFCN.
if values['enb'].get('rf_dev_type') == 'zmq':
resourcep = self.testenv.suite().resource_pool()
num_ports = self.calc_required_zmq_ports(values)
num_ports_joined_earfcn = self.calc_required_zmq_ports_joined_earfcn(values)
ue_bind_port = self.ue.zmq_base_bind_port()
enb_bind_port = resourcep.next_zmq_port_range(self, num_ports)
self.assign_enb_zmq_ports(values, 'zmq_enb_bind_port', enb_bind_port)
# If we are to use a GrBroker, then initialize here to have remote zmq ports available:
if self.using_grbroker(values):
zmq_enb_peer_port = resourcep.next_zmq_port_range(self, num_ports)
self.assign_enb_zmq_ports(values, 'zmq_enb_peer_port', zmq_enb_peer_port) # These are actually bound to GrBroker
self.assign_enb_zmq_ports_joined_earfcn(values, 'zmq_ue_bind_port', ue_bind_port) # This is were GrBroker binds on the UE side
zmq_ue_peer_port = resourcep.next_zmq_port_range(self, num_ports_joined_earfcn)
self.assign_enb_zmq_ports_joined_earfcn(values, 'zmq_ue_peer_port', zmq_ue_peer_port) # This is were GrBroker binds on the UE side
# Already set gen_conf here in advance since gr_broker needs the cell list
self.gen_conf = values
self.gr_broker = GrBroker.ref()
self.gr_broker.handle_enb(self)
else:
self.assign_enb_zmq_ports(values, 'zmq_enb_peer_port', ue_bind_port)
self.assign_enb_zmq_ports(values, 'zmq_ue_bind_port', ue_bind_port) #If no broker we need to match amount of ports
self.assign_enb_zmq_ports(values, 'zmq_ue_peer_port', enb_bind_port)
return values
def id(self):
@ -145,14 +216,13 @@ class eNodeB(log.Origin, metaclass=ABCMeta):
########################
def cleanup(self):
'Nothing to do by default. Subclass can override if required.'
pass
if self.gr_broker:
GrBroker.unref()
self.gr_broker = None
def num_prb(self):
return self._num_prb
def zmq_base_bind_port(self):
return self._zmq_base_bind_port
#reference: srsLTE.git srslte_symbol_sz()
def num_prb2symbol_sz(self, num_prb):
if num_prb == 6:
@ -168,24 +238,50 @@ class eNodeB(log.Origin, metaclass=ABCMeta):
def num_prb2base_srate(self, num_prb):
return self.num_prb2symbol_sz(num_prb) * 15 * 1000
def get_zmq_rf_dev_args(self):
def get_zmq_rf_dev_args(self, cfg_values):
base_srate = self.num_prb2base_srate(self.num_prb())
if self._zmq_base_bind_port is None:
self._zmq_base_bind_port = self.testenv.suite().resource_pool().next_zmq_port_range(self, 4)
ue_base_port = self.ue.zmq_base_bind_port()
if self.gr_broker:
ul_rem_addr = self.addr()
else:
ul_rem_addr = self.ue.addr()
rf_dev_args = 'fail_on_disconnect=true'
idx = 0
cell_list = cfg_values['enb']['cell_list']
# Define all 8 possible RF ports (2x CA with 2x2 MIMO)
rf_dev_args = 'fail_on_disconnect=true' \
+ ',tx_port0=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 0) \
+ ',tx_port1=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 1) \
+ ',tx_port2=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 2) \
+ ',tx_port3=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 3) \
+ ',rx_port0=tcp://' + self.ue.addr() + ':' + str(ue_base_port + 0) \
+ ',rx_port1=tcp://' + self.ue.addr() + ':' + str(ue_base_port + 1) \
+ ',rx_port2=tcp://' + self.ue.addr() + ':' + str(ue_base_port + 2) \
+ ',rx_port3=tcp://' + self.ue.addr() + ':' + str(ue_base_port + 3)
for cell in cell_list:
rf_dev_args += ',tx_port%u=tcp://%s:%u' %(idx, self.addr(), cell['zmq_enb_bind_port'] + 0)
if self.num_ports() > 1:
rf_dev_args += ',tx_port%u=tcp://%s:%u' %(idx + 1, self.addr(), cell['zmq_enb_bind_port'] + 1)
rf_dev_args += ',rx_port%u=tcp://%s:%u' %(idx, ul_rem_addr, cell['zmq_enb_peer_port'] + 0)
if self.num_ports() > 1:
rf_dev_args += ',rx_port%u=tcp://%s:%u' %(idx + 1, ul_rem_addr, cell['zmq_enb_peer_port'] + 1)
idx += self.num_ports()
rf_dev_args += ',id=enb,base_srate=' + str(base_srate)
return rf_dev_args
def get_zmq_rf_dev_args_for_ue(self, ue):
cell_list = self.gen_conf['enb']['cell_list']
rf_dev_args = ''
idx = 0
earfcns_done = []
for cell in cell_list:
if self.gr_broker:
if cell['dl_earfcn'] in earfcns_done:
continue
earfcns_done.append(cell['dl_earfcn'])
rf_dev_args += ',tx_port%u=tcp://%s:%u' %(idx, ue.addr(), cell['zmq_ue_bind_port'] + 0)
if self.num_ports() > 1:
rf_dev_args += ',tx_port%u=tcp://%s:%u' %(idx + 1, ue.addr(), cell['zmq_ue_bind_port'] + 1)
rf_dev_args += ',rx_port%u=tcp://%s:%u' %(idx, self.addr(), cell['zmq_ue_peer_port'] + 0)
if self.num_ports() > 1:
rf_dev_args += ',rx_port%u=tcp://%s:%u' %(idx + 1, self.addr(), cell['zmq_ue_peer_port'] + 1)
idx += self.num_ports()
# remove trailing comma:
if rf_dev_args[0] == ',':
return rf_dev_args[1:]
return rf_dev_args
def get_instance_by_type(testenv, conf):

View File

@ -100,6 +100,8 @@ class AmarisoftENB(enb.eNodeB):
self.rem_host.scpfrom('scp-back-phy-signal-log', self.remote_phy_signal_file, self.phy_signal_file)
except Exception as e:
self.log(repr(e))
# Clean up for parent class:
super().cleanup()
def start(self, epc):
self.log('Starting AmarisoftENB')
@ -173,7 +175,7 @@ class AmarisoftENB(enb.eNodeB):
# We need to set some specific variables programatically here to match IP addresses:
if self._conf.get('rf_dev_type') == 'zmq':
base_srate = self.num_prb2base_srate(self.num_prb())
rf_dev_args = self.get_zmq_rf_dev_args()
rf_dev_args = self.get_zmq_rf_dev_args(values)
config.overlay(values, dict(enb=dict(sample_rate = base_srate / (1000*1000),
rf_dev_args = rf_dev_args)))

View File

@ -100,6 +100,8 @@ class srsENB(enb.eNodeB, srslte_common):
# Collect KPIs for each TC
self.testenv.test().set_kpis(self.get_kpis())
# Clean up for parent class:
super().cleanup()
def start(self, epc):
self.log('Starting srsENB')
@ -198,7 +200,7 @@ class srsENB(enb.eNodeB, srslte_common):
# We need to set some specific variables programatically here to match IP addresses:
if self._conf.get('rf_dev_type') == 'zmq':
rf_dev_args = self.get_zmq_rf_dev_args()
rf_dev_args = self.get_zmq_rf_dev_args(values)
config.overlay(values, dict(enb=dict(rf_dev_args=rf_dev_args)))
# Set UHD frame size as a function of the cell bandwidth on B2XX
@ -258,7 +260,7 @@ class srsENB(enb.eNodeB, srslte_common):
rfemu_cfg = cell_list[cell].get('dl_rfemu', None)
if rfemu_cfg is None:
raise log.Error('rfemu attribute not found in cell_list item!')
if rfemu_cfg['type'] == 'srsenb_stdin':
if rfemu_cfg['type'] == 'srsenb_stdin' or rfemu_cfg['type'] == 'gnuradio_zmq':
# These fields are required so the rfemu class can interact with us:
config.overlay(rfemu_cfg, dict(enb=self,
cell_id=cell_list[cell]['cell_id']))

View File

@ -270,16 +270,9 @@ class srsUE(MS, srslte_common):
# We need to set some specific variables programatically here to match IP addresses:
if self._conf.get('rf_dev_type') == 'zmq':
base_srate = num_prb2base_srate(self.enb.num_prb())
# Define all 8 possible RF ports (2x CA with 2x2 MIMO)
enb_base_port = self.enb.zmq_base_bind_port()
rf_dev_args = 'tx_port0=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 0) \
+ ',tx_port1=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 1) \
+ ',tx_port2=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 2) \
+ ',tx_port3=tcp://' + self.addr() + ':' + str(self._zmq_base_bind_port + 3) \
+ ',rx_port0=tcp://' + self.enb.addr() + ':' + str(enb_base_port + 0) \
+ ',rx_port1=tcp://' + self.enb.addr() + ':' + str(enb_base_port + 1) \
+ ',rx_port2=tcp://' + self.enb.addr() + ':' + str(enb_base_port + 2) \
+ ',rx_port3=tcp://' + self.enb.addr() + ':' + str(enb_base_port + 3)
rf_dev_args = self.enb.get_zmq_rf_dev_args_for_ue(self)
if self.num_carriers == 1:
# Single carrier

View File

@ -55,6 +55,9 @@ def get_instance_by_type(rfemu_type, rfemu_opt):
elif rfemu_type == 'srsenb_stdin':
from .rfemu_srsenb_stdin import RFemulationSrsStdin
obj = RFemulationSrsStdin
elif rfemu_type == 'gnuradio_zmq':
from .rfemu_gnuradio_zmq import RFemulationGnuradioZmq
obj = RFemulationGnuradioZmq
else:
raise log.Error('RFemulation type not supported:', rfemu_type)

View File

@ -0,0 +1,198 @@
# osmo_gsm_tester: class defining a RF emulation object implemented using SRS ENB stdin interface
#
# Copyright (C) 2020 by sysmocom - s.f.m.c. GmbH
#
# Author: Pau Espin Pedrol <pespin@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import socket
from ..core import log
from ..core import util
from ..core import process
from ..core import remote
from ..core.event_loop import MainLoop
from .rfemu import RFemulation
class GrBroker(log.Origin):
# static fields:
refcount = 0
instance = None
def __init__(self):
super().__init__(log.C_RUN, 'zmq_gr_broker')
self.process = None
self.ctrl_port = 5005
self.run_dir = None
self.rem_host = None
self.cfg = None
self.enb = None
self.addr = None
self.ctrl_sk = None
@staticmethod
def ref():
if GrBroker.refcount == 0:
GrBroker.instance = GrBroker()
GrBroker.refcount = GrBroker.refcount + 1
return GrBroker.instance
@staticmethod
def unref():
GrBroker.refcount = GrBroker.refcount - 1
if GrBroker.refcount == 0:
GrBroker.instance.cleanup()
GrBroker.instance = None
def cleanup(self):
if self.ctrl_sk is not None:
self.cmd_exit()
self.ctrl_sk.close()
self.ctrl_sk = None
self.enb = None
self.testenv = None
def handle_enb(self, enb):
self.enb = enb
self.addr = self.enb.addr()
self.testenv = self.enb.testenv
self.cfg = self.gen_json(enb)
# FIXME: we may need to delay this somehow if we want to support several ENBs
self.start()
self.setup()
def gen_json_enb(self, enb):
res = []
cell_list = enb.gen_conf['enb']['cell_list']
for cell in cell_list:
# TODO: probably add enb_id, cell_id to support several ENB
data = {'earfcn': int(cell['dl_earfcn']),
'bind_port': int(cell['zmq_enb_peer_port']),
'peer_addr': enb.addr(),
'peer_port': int(cell['zmq_enb_bind_port']),
'use_mimo': True if enb.num_ports() > 1 else False
}
res.append(data)
return res
def gen_json_ue(self, enb):
res = {}
res = []
earfcns_done = []
cell_list = enb.gen_conf['enb']['cell_list']
for cell in cell_list:
data = {}
if int(cell['dl_earfcn']) in earfcns_done:
continue
earfcns_done.append(int(cell['dl_earfcn']))
data = {'earfcn': int(cell['dl_earfcn']),
'bind_port': int(cell['zmq_ue_peer_port']),
'peer_addr': enb.ue.addr(),
'peer_port': int(cell['zmq_ue_bind_port']),
'use_mimo': True if enb.num_ports() > 1 else False
}
res.append(data)
return res
def gen_json(self, enb):
res = {'enb': [self.gen_json_enb(enb)],
'ue': [self.gen_json_ue(enb)]}
return res
def start(self):
self.run_dir = util.Dir(self.testenv.test().get_run_dir().new_dir(self.name()))
args = ('osmo-gsm-tester_zmq_broker.py',
'-c', str(self.ctrl_port),
'-b', self.enb.addr())
if self.enb._run_node.is_local():
self.process = process.Process(self.name(), self.run_dir, args)
else:
self.rem_host = remote.RemoteHost(self.run_dir, self.enb._run_node.ssh_user(), self.enb._run_node.ssh_addr())
self.process = self.rem_host.RemoteProcessSafeExit('zmq_gr_broker', util.Dir('/tmp/ogt_%s' % self.name()), args, wait_time_sec=7)
self.testenv.remember_to_stop(self.process)
self.process.launch()
def setup(self):
self.dbg('waiting for gr script to be available...')
MainLoop.sleep(5)
self.ctrl_sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
buf = json.dumps(self.cfg)
self.send_cmd(buf)
def send_cmd(self, str_buf):
self.dbg('sending cmd: "%s"' % str_buf)
self.ctrl_sk.sendto(str_buf.encode('utf-8'), (self.addr, self.ctrl_port))
def cmd_set_relative_gain_on_local_port(self, port, rel_gain):
d = { 'action': 'set_relative_gain',
'port': port,
'rel_gain': rel_gain
}
buf = json.dumps(d)
self.send_cmd(buf)
def cmd_exit(self):
d = { 'action': 'exit' }
buf = json.dumps(d)
self.send_cmd(buf)
class RFemulationGnuradioZmq(RFemulation):
##############
# PROTECTED
##############
def __init__(self, conf):
super().__init__(conf, 'gnuradio_zmq')
self.broker = None
self.ctrl_port = 5005
self.cell_id = int(conf.get('cell_id'))
if self.cell_id is None:
raise log.Error('No "cell_id" attribute provided in rfemu conf!')
self.enb = conf.get('enb')
if self.enb is None:
raise log.Error('No "srsenb" attribute provided in rfemu conf!')
self.set_name('%s_%s_%d' % (self.name(), self.enb.name(), self.cell_id))
self.testenv = self.enb.testenv
self.configure()
def __del__(self):
if self.broker:
self.broker.unref()
self.broker = None
self.enb = None
self.testenv = None
def configure(self):
self.broker = GrBroker.ref()
#############################
# PUBLIC (test API included)
#############################
def set_attenuation(self, db):
for cell in self.enb.gen_conf['enb']['cell_list']:
if int(cell['cell_id']) == self.cell_id:
max_att_db = self.get_max_attenuation()
self.broker.cmd_set_relative_gain_on_local_port(cell['zmq_enb_peer_port'], (max_att_db - db)/max_att_db)
break
def get_max_attenuation(self):
return 12 # maximum cell_gain value in srs. Is this correct value?
# vim: expandtab tabstop=4 shiftwidth=4

View File

@ -10,7 +10,7 @@ modifiers:
ncell_list: [0x01]
- cell_id: 0x01
pci: 0x02
dl_earfcn: 3050
dl_earfcn: 2850
rf_port: 1
scell_list: [0x00]
ncell_list: [0x00]

View File

@ -0,0 +1,190 @@
#!/usr/bin/env python2
from distutils.version import StrictVersion
from gnuradio.fft import window
from gnuradio import blocks
from gnuradio import gr
from gnuradio.filter import firdes
import sys
import json
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio import zeromq
import socket
import argparse
from signal import *
class GrBroker(gr.top_block):
def __init__(self, args, cfg):
gr.top_block.__init__(self, "Intra Handover Flowgraph")
##################################################
# Variables
##################################################
self.args = args
self.cfg = cfg
self.samp_rate = samp_rate = 23040000
self.relative_gain = relative_gain = 1.0
self.blocks_add = {}
##################################################
# Blocks
##################################################
# Build ENB side + connect to per stream multilier:
for enb in self.cfg['enb']:
for it in enb:
source_addr = 'tcp://%s:%u' % (it['peer_addr'].encode('utf-8'), it['peer_port'])
sink_addr = 'tcp://%s:%u' % (args.bind_addr, it['bind_port'])
print('enb: earfcn=%u source=%r sink=%r' % (it['earfcn'], source_addr, sink_addr))
it['gr_block_zmq_source'] = zeromq.req_source(gr.sizeof_gr_complex, 1, source_addr, 100, False, -1)
it['gr_block_zmq_sink'] = zeromq.rep_sink(gr.sizeof_gr_complex, 1, sink_addr, 100, False, -1)
it['gr_block_multiply'] = blocks.multiply_const_cc(relative_gain)
it['gr_block_multiply'].set_block_alias('relative_gain %s' % source_addr)
self.connect((it['gr_block_zmq_source'], 0), (it['gr_block_multiply'], 0))
if it['use_mimo']:
source_addr = 'tcp://%s:%u' % (it['peer_addr'].encode('utf-8'), it['peer_port'] + 1)
sink_addr = 'tcp://%s:%u' % (args.bind_addr, it['bind_port'] + 1)
print('enb: earfcn=%u source=%r sink=%r (MIMO)' % (it['earfcn'], source_addr, sink_addr))
it['gr_block_zmq_source2'] = zeromq.req_source(gr.sizeof_gr_complex, 1, source_addr, 100, False, -1)
it['gr_block_zmq_sink2'] = zeromq.rep_sink(gr.sizeof_gr_complex, 1, sink_addr, 100, False, -1)
it['gr_block_multiply2'] = blocks.multiply_const_cc(relative_gain)
it['gr_block_multiply2'].set_block_alias('relative_gain %s' % source_addr)
self.connect((it['gr_block_zmq_source2'], 0), (it['gr_block_multiply2'], 0))
# Build UE side:
for ue in self.cfg['ue']:
for it in ue:
source_addr = 'tcp://%s:%u' % (it['peer_addr'].encode('utf-8'), it['peer_port'])
sink_addr = 'tcp://%s:%u' % (args.bind_addr, it['bind_port'])
print('ue: earfcn=%u source=%r sink=%r' % (it['earfcn'], source_addr, sink_addr))
it['gr_block_zmq_source'] = zeromq.req_source(gr.sizeof_gr_complex, 1, source_addr, 100, False, -1)
it['gr_block_zmq_sink'] = zeromq.rep_sink(gr.sizeof_gr_complex, 1, sink_addr, 100, False, -1)
if it['use_mimo']:
source_addr = 'tcp://%s:%u' % (it['peer_addr'].encode('utf-8'), it['peer_port'] + 1)
sink_addr = 'tcp://%s:%u' % (args.bind_addr, it['bind_port'] + 1)
print('ue: earfcn=%u source=%r sink=%r (MIMO)' % (it['earfcn'], source_addr, sink_addr))
it['gr_block_zmq_source2'] = zeromq.req_source(gr.sizeof_gr_complex, 1, source_addr, 100, False, -1)
it['gr_block_zmq_sink2'] = zeromq.rep_sink(gr.sizeof_gr_complex, 1, sink_addr, 100, False, -1)
# Create per EARFCN adder (only 2->1 supported so far)
earfcn_li = self.calc_earfcn_list()
blocks_add_next_avail_port = {}
for earfcn in earfcn_li:
self.blocks_add[earfcn] = blocks.add_vcc(1)
blocks_add_next_avail_port[earfcn] = 0
# Connect the ENB-side multipliers to the Adder input ports:
idx = 0
for enb in self.cfg['enb']:
for it in enb:
print('Connecting ENB port %u to Adder[%u] for earfcn %u' % (it['bind_port'], blocks_add_next_avail_port[earfcn], it['earfcn']))
self.connect((it['gr_block_multiply'], 0), (self.blocks_add[it['earfcn']], blocks_add_next_avail_port[earfcn]))
# TODO: if it['use_mimo'], connect it['gr_block_multiply2'] to some adder...
blocks_add_next_avail_port[earfcn] += 1
# Connect the Adder to the UE-side (Dl):
for earfcn, bl_add in self.blocks_add.items():
for ue in self.cfg['ue']:
for it in ue:
if it['earfcn'] != earfcn:
continue
print('Connecting Adder for earfcn %u to UE port %u' % (earfcn, it['bind_port']))
self.connect((bl_add, 0), (it['gr_block_zmq_sink'], 0))
# TODO: if it['use_mimo'], connect some adder to it['gr_block_zmq_sink2']...
# UL: Connect 1 UE port splitting it into N ENB ports:
for ue in self.cfg['ue']:
for it_ue in ue:
for enb in self.cfg['enb']:
for it_enb in enb:
if it_ue['earfcn'] != it_enb['earfcn']:
continue
print('connecting UE port %u to ENB port %u, earfcn=%u' % (it_ue['bind_port'], it_enb['bind_port'], it_enb['earfcn']))
self.connect((it_ue['gr_block_zmq_source'], 0), (it_enb['gr_block_zmq_sink'], 0))
if it_ue['use_mimo'] and it_enb['use_mimo']:
self.connect((it_ue['gr_block_zmq_source2'], 0), (it_enb['gr_block_zmq_sink2'], 0))
def calc_earfcn_list(self):
earfcn_li = []
for enb in self.cfg['enb']:
for it in enb:
if it['earfcn'] not in earfcn_li:
earfcn_li.append(it['earfcn'])
return earfcn_li
def set_relative_gain(self, port, relative_gain):
for enb in self.cfg['enb']:
for it in enb:
if it['bind_port'] == port:
print('setting port %u rel_gain to %f' % (port, relative_gain))
it['gr_block_multiply'].set_k(relative_gain)
return
def mainloop(sock, broker):
while True:
chunk = sock.recv(4096)
stringdata = chunk.decode('utf-8')
msg = json.loads(stringdata)
print('Received msg: %s' % msg)
if msg['action'] == 'exit':
print('Received exit command. Stopping radio...')
return
elif msg['action'] == 'set_relative_gain':
broker.set_relative_gain(msg['port'], msg['rel_gain'])
else:
print('Unknwon action for message: %s' % msg)
def sig_handler_cleanup(signum, frame):
print("killed by signal %d" % signum)
# This sys.exit() will raise a SystemExit base exception at the current
# point of execution. Code must be prepared to clean system-wide resources
# by using the "finally" section. This allows at the end 'atexit' hooks to
# be called before exiting.
sys.exit(1)
def main():
for sig in (SIGINT, SIGTERM, SIGQUIT, SIGPIPE, SIGHUP):
signal(sig, sig_handler_cleanup)
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--bind-addr', dest='bind_addr', help="Address where local sockets are bound to")
parser.add_argument('-c', '--ctrl-port', dest='ctrl_port', type=int, default=5005, help="Port where CTRL interface is bound to")
args = parser.parse_args()
print('bind_addr:', repr(args.bind_addr))
print('ctrl_port:', repr(args.ctrl_port))
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((args.bind_addr, args.ctrl_port))
broker = None
try:
print('waiting for configuration on UDP socket...')
chunk = sock.recv(4096)
print('Received udp packet')
stringdata = chunk.decode('utf-8')
cfg = json.loads(stringdata)
print('Got config:', stringdata)
broker = GrBroker(args, cfg)
print('Starting...')
broker.start()
print('in mainloop')
mainloop(sock, broker)
except KeyboardInterrupt:
pass
print('main loop ended, exiting...')
# closing flowgraph and socket
sock.close()
if broker:
broker.stop()
broker.wait()
if __name__ == '__main__':
main()
print("exit")