osmo-gsm-tester/src/osmo_gsm_tester/core/schema.py

444 lines
14 KiB
Python
Raw Normal View History

# osmo_gsm_tester: validate dict structures
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import os
from . import log
from . import util
# Pre-compiled patterns for the validators in this module. They are applied
# with fullmatch(), so each pattern describes the complete permitted value.
KEY_RE = re.compile(r'[a-zA-Z0-9][a-zA-Z0-9_]*')
# Dotted quad. The dot is escaped so that it matches a literal '.' only and
# not any character; the 0..255 range of each field is checked in ipv4().
IPV4_RE = re.compile(r'([0-9]{1,3}\.){3}[0-9]{1,3}')
HWADDR_RE = re.compile(r'([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}')
IMSI_RE = re.compile(r'[0-9]{6,15}')
KI_RE = re.compile(r'[0-9a-fA-F]{32}')
OPC_RE = re.compile(r'[0-9a-fA-F]{32}')
MSISDN_RE = re.compile(r'[0-9]{1,15}')
def match_re(name, regex, val):
    '''Check that 'val' is a string which 'regex' matches in full.

    'name' describes the kind of value and is only used in the error
    message. Return True on success, raise ValueError otherwise.
    '''
    if isinstance(val, str) and regex.fullmatch(val):
        return True
    raise ValueError('Invalid %s: %r' % (name, val))
def band(val):
    '''Validate a GSM band name.

    Return True for one of the known band names, raise ValueError otherwise.
    '''
    known_bands = ('GSM-900', 'GSM-1800', 'GSM-1900')
    if val not in known_bands:
        raise ValueError('Unknown GSM band: %r' % val)
    return True
def ipv4(val):
    '''Validate a dotted-quad IPv4 address string.

    'val' has to pass match_re() against IPV4_RE and has to consist of
    exactly four decimal fields, each in the range 0..255.
    Return True on success, raise ValueError otherwise.
    '''
    match_re('IPv4 address', IPV4_RE, val)
    fields = val.split('.')
    # Do not rely on the pattern alone: insist on four purely decimal fields,
    # so that a malformed value is reported as an invalid address instead of
    # surfacing as an int() conversion error or slipping through.
    if len(fields) != 4:
        raise ValueError('Invalid IPv4 address: %r' % val)
    if not all(field.isdecimal() and int(field) <= 255 for field in fields):
        raise ValueError('Invalid IPv4 address: %r' % val)
    return True
def hwaddr(val):
    '''Validate a hardware address: six colon separated pairs of hex digits.

    Return True on success, raise ValueError otherwise.
    '''
    return match_re(name='hardware address', regex=HWADDR_RE, val=val)
def imsi(val):
    '''Validate an IMSI: a string of 6 to 15 digits.

    Return True on success, raise ValueError otherwise.
    '''
    return match_re(name='IMSI', regex=IMSI_RE, val=val)
def ki(val):
    '''Validate a KI: a string of exactly 32 hex digits.

    Return True on success, raise ValueError otherwise.
    '''
    return match_re(name='KI', regex=KI_RE, val=val)
def opc(val):
    '''Validate an OPC: a string of exactly 32 hex digits.

    Return True on success, raise ValueError otherwise.
    '''
    return match_re(name='OPC', regex=OPC_RE, val=val)
def msisdn(val):
    '''Validate an MSISDN: a string of 1 to 15 digits.

    Return True on success, raise ValueError otherwise.
    '''
    return match_re(name='MSISDN', regex=MSISDN_RE, val=val)
def auth_algo(val):
    '''Validate an authentication algorithm name.

    Return True if 'val' is contained in util.ENUM_OSMO_AUTH_ALGO, raise
    ValueError otherwise.
    '''
    if val in util.ENUM_OSMO_AUTH_ALGO:
        return True
    raise ValueError('Unknown Authentication Algorithm: %r' % val)
def uint(val):
    '''Validate that int(val) is not negative.

    Return True on success, raise ValueError for a negative number. Whatever
    int() raises for an unconvertible value is passed on.
    '''
    number = int(val)
    if number >= 0:
        return True
    raise ValueError('Positive value expected instead of %d' % number)
def uint8(val):
    '''Validate that int(val) lies within 0..255.

    Return True on success, raise ValueError when out of range. Whatever
    int() raises for an unconvertible value is passed on.
    '''
    upper_limit = 255  # 2^8 - 1
    number = int(val)
    if number < 0:
        raise ValueError('Positive value expected instead of %d' % number)
    elif number > upper_limit:
        raise ValueError('Value %d too big, max value is 255' % number)
    return True
def uint16(val):
    '''Validate that int(val) lies within 0..65535.

    Return True on success, raise ValueError when out of range. Whatever
    int() raises for an unconvertible value is passed on.
    '''
    upper_limit = 65535  # 2^16 - 1
    number = int(val)
    if number < 0:
        raise ValueError('Positive value expected instead of %d' % number)
    elif number > upper_limit:
        raise ValueError('Value %d too big, max value is 65535' % number)
    return True
def bool_str(val):
    '''Validate a boolean given as a string.

    The parsing is left to util.str2bool(), which is expected to raise an
    exception if it is unable to parse 'val'; its result is discarded.
    Return True if no exception occurred.
    '''
    util.str2bool(val)
    return True
def times(val):
    '''Validate a 'times' count: int(val) has to be at least 1.

    Return True on success, raise ValueError for anything below 1. Whatever
    int() raises for an unconvertible value is passed on.
    '''
    count = int(val)
    if count >= 1:
        return True
    raise ValueError('Positive value >0 expected instead of %d' % count)
def cipher(val):
    '''Validate a cipher name, one of 'a5_0' through 'a5_7'.

    Return True on success, raise ValueError otherwise.
    '''
    known_ciphers = ('a5_0', 'a5_1', 'a5_2', 'a5_3',
                     'a5_4', 'a5_5', 'a5_6', 'a5_7')
    if val not in known_ciphers:
        raise ValueError('Unknown Cipher value: %r' % val)
    return True
def modem_feature(val):
    '''Validate a modem feature name.

    Return True for one of the known feature names, raise ValueError
    otherwise.
    '''
    known_features = ('sms', 'gprs', 'voice', 'ussd', 'sim', '2g', '3g', '4g',
                      'dl_qam256', 'ul_qam64', 'qc_diag')
    if val in known_features:
        return True
    raise ValueError('Unknown Modem Feature: %r' % val)
def phy_channel_config(val):
    '''Validate a physical channel configuration name.

    Return True for one of the known configurations, raise ValueError
    otherwise.
    '''
    known_configs = (
        'CCCH', 'CCCH+SDCCH4', 'TCH/F', 'TCH/H', 'SDCCH8', 'PDCH',
        'TCH/F_PDCH', 'CCCH+SDCCH4+CBCH', 'SDCCH8+CBCH', 'TCH/F_TCH/H_PDCH',
    )
    if val not in known_configs:
        raise ValueError('Unknown Physical channel config: %r' % val)
    return True
def channel_allocator(val):
    '''Validate a channel allocator policy: 'ascending' or 'descending'.

    Return True on success, raise ValueError otherwise.
    '''
    known_policies = ('ascending', 'descending')
    if val not in known_policies:
        raise ValueError('Unknown Channel Allocator Policy %r' % val)
    return True
def gprs_mode(val):
    '''Validate a GPRS mode: 'none', 'gprs' or 'egprs'.

    Return True on success, raise ValueError otherwise.
    '''
    known_modes = ('none', 'gprs', 'egprs')
    if val not in known_modes:
        raise ValueError('Unknown GPRS mode %r' % val)
    return True
def codec(val):
    '''Validate a codec name: 'hr1'..'hr3' or 'fr1'..'fr3'.

    Return True on success, raise ValueError otherwise.
    '''
    known_codecs = ('hr1', 'hr2', 'hr3', 'fr1', 'fr2', 'fr3')
    if val not in known_codecs:
        raise ValueError('Unknown Codec value: %r' % val)
    return True
def osmo_trx_clock_ref(val):
    '''Validate an OsmoTRX clock reference: 'internal', 'external' or 'gspdo'.

    Return True on success, raise ValueError otherwise.
    '''
    known_refs = ('internal', 'external', 'gspdo')
    if val not in known_refs:
        raise ValueError('Unknown OsmoTRX clock reference value: %r' % val)
    return True
def lte_transmission_mode(val):
    '''Validate an LTE transmission mode number.

    Accepts the modes 1 through 4. Return True on success, raise ValueError
    when int(val) is outside of that range. Whatever int() raises for an
    unconvertible value is passed on.
    '''
    n = int(val)
    # Transmission modes are numbered starting at 1, hence zero and negative
    # numbers are as much out of range as anything above 4.
    if 1 <= n <= 4:
        return True
    raise ValueError('LTE Transmission Mode %d not in expected range' % n)
def duration(val):
    '''Validate a duration string.

    Accepted is a string that is either entirely decimal digits or that
    ends in 'm' or 'h'. Return True on success, raise ValueError otherwise.
    '''
    # Reject non-strings explicitly (e.g. a number parsed from a config
    # file), so that they are reported as an invalid value by a ValueError
    # instead of failing with an AttributeError on the str methods below.
    if isinstance(val, str) and (val.isdecimal() or val.endswith(('m', 'h'))):
        return True
    raise ValueError('Invalid duration value: %r' % val)
# Names of the schema value types. These strings are what a schema dict
# carries as its values (e.g. 'ip_address[].addr': IPV4) and what
# SCHEMA_TYPES is keyed by.
INT = 'int'
STR = 'str'
UINT = 'uint'
BOOL_STR = 'bool_str'
BAND = 'band'
IPV4 = 'ipv4'
HWADDR = 'hwaddr'
IMSI = 'imsi'
KI = 'ki'
OPC = 'opc'
MSISDN = 'msisdn'
AUTH_ALGO = 'auth_algo'
TIMES='times'
CIPHER = 'cipher'
MODEM_FEATURE = 'modem_feature'
PHY_CHAN = 'chan'
CHAN_ALLOCATOR = 'chan_allocator'
GPRS_MODE = 'gprs_mode'
CODEC = 'codec'
OSMO_TRX_CLOCK_REF = 'osmo_trx_clock_ref'
LTE_TRANSMISSION_MODE = 'lte_transmission_mode'
DURATION = 'duration'
# Maps each schema type name to the callable that validate() invokes with a
# config value of that type. Further entries can be merged in at run time
# through register_schema_types().
SCHEMA_TYPES = {
    INT: int,
    STR: str,
    UINT: uint,
    BOOL_STR: bool_str,
    BAND: band,
    IPV4: ipv4,
    HWADDR: hwaddr,
    IMSI: imsi,
    KI: ki,
    OPC: opc,
    MSISDN: msisdn,
    AUTH_ALGO: auth_algo,
    TIMES: times,
    CIPHER: cipher,
    MODEM_FEATURE: modem_feature,
    PHY_CHAN: phy_channel_config,
    CHAN_ALLOCATOR: channel_allocator,
    GPRS_MODE: gprs_mode,
    CODEC: codec,
    OSMO_TRX_CLOCK_REF: osmo_trx_clock_ref,
    LTE_TRANSMISSION_MODE: lte_transmission_mode,
    DURATION: duration,
    }
def add(dest, src):
    '''Add the contents of 'src' to 'dest', modifying 'dest' in place.

    Dicts are merged key by key, recursing into keys that already have a
    value in 'dest'. Lists get the items of 'src' appended. Any other pair
    of values has to be equal.
    Raise ValueError if the structures do not match or plain values differ.
    '''
    if util.is_dict(dest):
        if not util.is_dict(src):
            raise ValueError('cannot add to dict a value of type: %r' % type(src))

        for name, incoming in src.items():
            existing = dest.get(name)
            if existing is not None:
                # log.ctx() has to be called from this function's own frame.
                log.ctx(key=name)
                add(existing, incoming)
                continue
            dest[name] = incoming
        return

    if util.is_list(dest):
        if not util.is_list(src):
            raise ValueError('cannot add to list a value of type: %r' % type(src))
        dest.extend(src)
        return

    if dest == src:
        return
    raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
                     % (dest, src))
def combine(dest, src):
    '''Combine 'src' into 'dest', modifying 'dest' in place.

    Dicts are combined key by key, recursing into keys that already have a
    value in 'dest'. Lists of dicts, lists or tuples are combined item by
    item according to their position; lists of other elements are treated
    as unsorted sets, i.e. only missing elements are appended. Any other
    pair of values has to be equal.
    Raise ValueError if the structures do not match or plain values differ.
    '''
    if util.is_dict(dest):
        if not util.is_dict(src):
            raise ValueError('cannot combine dict with a value of type: %r' % type(src))

        for name, incoming in src.items():
            # log.ctx() has to be called from this function's own frame.
            log.ctx(key=name)
            existing = dest.get(name)
            if existing is not None:
                combine(existing, incoming)
            else:
                dest[name] = incoming
        return

    if util.is_list(dest):
        if not util.is_list(src):
            raise ValueError('cannot combine list with a value of type: %r' % type(src))

        # All elements of both lists have to be of one and the same type.
        elem_type = util.list_validate_same_elem_type(src + dest)
        if elem_type is None:
            # Both lists are empty, nothing to do.
            return

        if elem_type not in (dict, list, tuple):
            # Lists of basic elements are handled as unsorted sets.
            for elem in src:
                if elem not in dest:
                    dest.append(elem)
            return

        # Lists of complex objects are expected to be sorted: combine the
        # items found at the same position. Items of 'dest' that have no
        # counterpart in 'src' are combined with an empty instance.
        dest_len = len(dest)
        for idx, dest_item in enumerate(dest):
            log.ctx(idx=idx)
            if idx < len(src):
                counterpart = src[idx]
            else:
                counterpart = util.empty_instance_type(elem_type)
            combine(dest_item, counterpart)
        # Whatever 'src' has beyond the length of 'dest' is appended as is.
        for idx in range(dest_len, len(src)):
            log.ctx(idx=idx)
            dest.append(src[idx])
        return

    if dest == src:
        return
    raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
                     % (dest, src))
def replicate_times(d):
    '''
    replicate items that have a "times" > 1

    'd' is a dict matching WANT_SCHEMA, which is the same as
    the RESOURCES_SCHEMA, except each entity that can be reserved has a 'times'
    field added, to indicate how many of those should be reserved.

    Return a deep copy of 'd' in which every item appears 'times' times in
    its list and the 'times' keys are removed; 'd' itself is left untouched.
    '''
    # Imported locally: 'copy' is not among this module's top-level imports.
    import copy

    d = copy.deepcopy(d)
    for item_list in d.values():
        idx = 0
        while idx < len(item_list):
            item = item_list[idx]
            count = int(item.pop('times', 1))
            # Place the additional copies directly behind the original item.
            for j in range(1, count):
                item_list.insert(idx + j, copy.deepcopy(item))
            # Step over the item and its copies. A count below 1 keeps the
            # item once and must never move the index backwards.
            idx += max(count, 1)
    return d
def validate(config, schema):
    '''Make sure the given config dict adheres to the schema.

    The schema is a dict of 'dict paths' in dot-notation with permitted
    value type. All leaf nodes are validated, nesting dicts are implicit.
    A value type is given by its name, i.e. by one of the keys of
    SCHEMA_TYPES.

    validate( { 'a': 123, 'b': { 'b1': 'foo', 'b2': [ 1, 2, 3 ] } },
              { 'a': INT,
                'b.b1': STR,
                'b.b2[]': INT } )

    Raise a ValueError in case the schema is violated.
    '''

    def validate_item(path, value, schema):
        want_type = schema.get(path)

        if util.is_list(value):
            if want_type:
                raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
            # List items are looked up in the schema by the path plus '[]'.
            path = path + '[]'
            want_type = schema.get(path)

        if not want_type:
            # Not a leaf node: descend into nested dicts and lists.
            if util.is_dict(value):
                nest(path, value, schema)
                return
            if util.is_list(value):
                for list_v in value:
                    validate_item(path, list_v, schema)
                return
            raise ValueError('config item not known: %r' % path)

        if want_type not in SCHEMA_TYPES:
            raise ValueError('unknown type %r at %r' % (want_type, path))

        if util.is_dict(value):
            raise ValueError('config item is dict but should be a leaf node of type %r: %r'
                             % (want_type, path))

        if util.is_list(value):
            for list_v in value:
                validate_item(path, list_v, schema)
            return

        # Called from this frame so that the path gets associated with it.
        log.ctx(path)
        type_validator = SCHEMA_TYPES.get(want_type)
        valid = type_validator(value)
        # The builtin int is a converter, not a predicate: it rejects a bad
        # value by raising, and the result of successfully converting 0 is
        # falsy. Do not mistake that result for a failed validation.
        # NOTE(review): the same applies to the builtin str and the empty
        # string, which is left being rejected here -- confirm whether that
        # is intended.
        if type_validator is int:
            return
        if not valid:
            raise ValueError('Invalid value %r for schema type \'%s\' (validator: %s)' % (value, want_type, type_validator.__name__))

    def nest(parent_path, config, schema):
        if parent_path:
            parent_path = parent_path + '.'
        else:
            parent_path = ''
        for k, v in config.items():
            if not KEY_RE.fullmatch(k):
                raise ValueError('invalid config key: %r' % k)
            path = parent_path + k
            validate_item(path, v, schema)

    nest(None, config, schema)
def config_to_schema_def(src, key_prefix):
    '''Convert a yaml parsed config into a schema dictionary used by validate().

    Dicts are flattened recursively into 'key_prefix' based dot-notation
    paths. A list has to hold exactly one item, which describes all of its
    elements; it adds '[]' to the path. Any other value ends the recursion
    and is stored, converted to str, under the path collected so far.
    '''
    if not util.is_dict(src):
        # base case: string
        return {key_prefix: str(src)}

    out_dict = {}
    for key, val in src.items():
        val_is_list = util.is_list(val)
        if val_is_list:
            assert len(val) == 1
            val = val[0]
        list_token = '[]' if val_is_list else ''
        dict_token = '.' if util.is_dict(val) else ''
        sub_prefix = "%s%s%s%s" % (key_prefix, key, list_token, dict_token)
        out_dict.update(config_to_schema_def(val, sub_prefix))
    return out_dict
def generate_schemas():
    '''Generate supported schemas dynamically from objects.

    For each '.py' file found in the '../obj/' directory next to this file,
    util.run_python_file_method() is called for the 'on_register_schemas'
    method of the corresponding 'osmo_gsm_tester.obj.<name>' module.
    '''
    this_dir = os.path.dirname(os.path.abspath(__file__))
    obj_dir = '%s/../obj/' % this_dir
    for filename in os.listdir(obj_dir):
        if filename.endswith(".py"):
            # Strip the '.py' suffix to obtain the module name.
            module_name = 'osmo_gsm_tester.obj.%s' % filename[:-3]
            util.run_python_file_method(module_name, 'on_register_schemas', False)
# Names of the resource classes; get_want_schema() adds a '<name>[].times'
# entry for each of them. Extended by register_resource_schema().
_RESOURCE_TYPES = ['ip_address',]
# Schema of the resource attributes, keyed by '<resource>[].<attribute>'
# paths. Extended by register_resource_schema().
_RESOURCES_SCHEMA = {
    'ip_address[].addr': IPV4,
    }
# Schema of the per object class configuration, keyed by '<class>.<attribute>'
# paths. Filled by register_config_schema().
_CONFIG_SCHEMA = {}
# Caches for the results of get_want_schema() and get_all_schema(); None
# means that the schema has to be (re-)generated on the next request.
_WANT_SCHEMA = None
_ALL_SCHEMA = None
def register_schema_types(schema_type_attr):
    """Register schema types to be used by schema attributes.

    'schema_type_attr' maps type names to validator callables and is
    combined into SCHEMA_TYPES in place.
    For instance: register_schema_types({ 'fruit': lambda val: val in ('banana', 'apple') })
    """
    combine(SCHEMA_TYPES, schema_type_attr)
def register_resource_schema(obj_class_str, obj_attr_dict):
    """Register schema attributes for a resource type.

    Each key of 'obj_attr_dict' is registered in the resources schema as
    '<obj_class_str>[].<key>', and 'obj_class_str' is added to the known
    resource types if not present yet.
    For instance: register_resource_schema('modem', {'type': schema.STR, 'ki': schema.KI})
    """
    global _WANT_SCHEMA, _ALL_SCHEMA
    tmpdict = {'%s[].%s' % (obj_class_str, key): val
               for key, val in obj_attr_dict.items()}
    combine(_RESOURCES_SCHEMA, tmpdict)
    if obj_class_str not in _RESOURCE_TYPES:
        _RESOURCE_TYPES.append(obj_class_str)
    # Both cached schemas are derived from the resources schema and the
    # resource types: reset them so that they are re-generated next time they
    # are requested, instead of lacking what was just registered.
    _WANT_SCHEMA = None
    _ALL_SCHEMA = None
def register_config_schema(obj_class_str, obj_attr_dict):
    """Register schema attributes to configure all instances of an object class.

    Each key of 'obj_attr_dict' is registered in the config schema as
    '<obj_class_str>.<key>'.
    For instance: register_config_schema('bsc', {'net.codec_list[]': schema.CODEC})
    """
    global _ALL_SCHEMA
    prefixed = {'%s.%s' % (obj_class_str, key): val
                for key, val in obj_attr_dict.items()}
    combine(_CONFIG_SCHEMA, prefixed)
    # reset _ALL_SCHEMA so it is re-generated next time it's requested.
    _ALL_SCHEMA = None
def get_resources_schema():
    '''Return the resources schema dict.

    This is the module's own dict, not a copy.
    '''
    return _RESOURCES_SCHEMA
def get_want_schema():
    '''Return the want schema, generating and caching it on first use.

    It is made up of the resources schema plus a '<resource>[].times' entry
    of type TIMES for each known resource type.
    '''
    global _WANT_SCHEMA
    if _WANT_SCHEMA is not None:
        return _WANT_SCHEMA
    times_schema = {'%s[].times' % r: TIMES for r in _RESOURCE_TYPES}
    _WANT_SCHEMA = util.dict_add(times_schema, get_resources_schema())
    return _WANT_SCHEMA
def get_all_schema():
    '''Return the overall schema, generating and caching it on first use.

    It holds the config schema below 'config.' and the want schema below
    both 'resources.' and 'modifiers.'.
    '''
    global _ALL_SCHEMA
    if _ALL_SCHEMA is not None:
        return _ALL_SCHEMA
    want_schema = get_want_schema()
    config_part = {'config.%s' % key: val for key, val in _CONFIG_SCHEMA.items()}
    resources_part = {'resources.%s' % key: val for key, val in want_schema.items()}
    modifiers_part = {'modifiers.%s' % key: val for key, val in want_schema.items()}
    _ALL_SCHEMA = util.dict_add(config_part, resources_part, modifiers_part)
    return _ALL_SCHEMA
# vim: expandtab tabstop=4 shiftwidth=4