osmo-gsm-tester/src/osmo_gsm_tester/core/schema.py

440 lines
14 KiB
Python
Raw Normal View History

# osmo_gsm_tester: validate dict structures
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import os
from . import log
from . import util
KEY_RE = re.compile('[a-zA-Z0-9][a-zA-Z0-9_]*')
IPV4_RE = re.compile('([0-9]{1,3}.){3}[0-9]{1,3}')
HWADDR_RE = re.compile('([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}')
IMSI_RE = re.compile('[0-9]{6,15}')
KI_RE = re.compile('[0-9a-fA-F]{32}')
MSISDN_RE = re.compile('[0-9]{1,15}')
def match_re(name, regex, val):
while True:
if not isinstance(val, str):
break;
if not regex.fullmatch(val):
break;
return True
raise ValueError('Invalid %s: %r' % (name, val))
def band(val):
if val in ('GSM-900', 'GSM-1800', 'GSM-1900'):
return True
raise ValueError('Unknown GSM band: %r' % val)
def ipv4(val):
match_re('IPv4 address', IPV4_RE, val)
els = [int(el) for el in val.split('.')]
if not all([el >= 0 and el <= 255 for el in els]):
raise ValueError('Invalid IPv4 address: %r' % val)
return True
def hwaddr(val):
return match_re('hardware address', HWADDR_RE, val)
def imsi(val):
return match_re('IMSI', IMSI_RE, val)
def ki(val):
return match_re('KI', KI_RE, val)
def msisdn(val):
return match_re('MSISDN', MSISDN_RE, val)
def auth_algo(val):
if val not in util.ENUM_OSMO_AUTH_ALGO:
raise ValueError('Unknown Authentication Algorithm: %r' % val)
return True
def uint(val):
n = int(val)
if n < 0:
raise ValueError('Positive value expected instead of %d' % n)
return True
def uint8(val):
n = int(val)
if n < 0:
raise ValueError('Positive value expected instead of %d' % n)
if n > 255: # 2^8 - 1
raise ValueError('Value %d too big, max value is 255' % n)
return True
def uint16(val):
n = int(val)
if n < 0:
raise ValueError('Positive value expected instead of %d' % n)
if n > 65535: # 2^16 - 1
raise ValueError('Value %d too big, max value is 65535' % n)
return True
def bool_str(val):
# str2bool will raise an exception if unable to parse it
util.str2bool(val)
return True
def times(val):
n = int(val)
if n < 1:
raise ValueError('Positive value >0 expected instead of %d' % n)
return True
def cipher(val):
if val in ('a5_0', 'a5_1', 'a5_2', 'a5_3', 'a5_4', 'a5_5', 'a5_6', 'a5_7'):
return True
raise ValueError('Unknown Cipher value: %r' % val)
def modem_feature(val):
if val in ('sms', 'gprs', 'voice', 'ussd', 'sim', '2g', '3g', '4g'):
return True
raise ValueError('Unknown Modem Feature: %r' % val)
def phy_channel_config(val):
if val in ('CCCH', 'CCCH+SDCCH4', 'TCH/F', 'TCH/H', 'SDCCH8', 'PDCH',
'TCH/F_PDCH', 'CCCH+SDCCH4+CBCH', 'SDCCH8+CBCH','TCH/F_TCH/H_PDCH'):
return True
raise ValueError('Unknown Physical channel config: %r' % val)
def channel_allocator(val):
if val in ('ascending', 'descending'):
return True
raise ValueError('Unknown Channel Allocator Policy %r' % val)
def gprs_mode(val):
if val in ('none', 'gprs', 'egprs'):
return True
raise ValueError('Unknown GPRS mode %r' % val)
def codec(val):
if val in ('hr1', 'hr2', 'hr3', 'fr1', 'fr2', 'fr3'):
return True
raise ValueError('Unknown Codec value: %r' % val)
def osmo_trx_clock_ref(val):
if val in ('internal', 'external', 'gspdo'):
return True
raise ValueError('Unknown OsmoTRX clock reference value: %r' % val)
def lte_transmission_mode(val):
n = int(val)
if n <= 4:
return True
raise ValueError('LTE Transmission Mode %d not in expected range' % n)
def duration(val):
if val.isdecimal() or val.endswith('m') or val.endswith('h'):
return True
raise ValueError('Invalid duration value: %r' % val)
INT = 'int'
STR = 'str'
UINT = 'uint'
BOOL_STR = 'bool_str'
BAND = 'band'
IPV4 = 'ipv4'
HWADDR = 'hwaddr'
IMSI = 'imsi'
KI = 'ki'
MSISDN = 'msisdn'
AUTH_ALGO = 'auth_algo'
TIMES='times'
CIPHER = 'cipher'
MODEM_FEATURE = 'modem_feature'
PHY_CHAN = 'chan'
CHAN_ALLOCATOR = 'chan_allocator'
GPRS_MODE = 'gprs_mode'
CODEC = 'codec'
OSMO_TRX_CLOCK_REF = 'osmo_trx_clock_ref'
LTE_TRANSMISSION_MODE = 'lte_transmission_mode'
DURATION = 'duration'
SCHEMA_TYPES = {
INT: int,
STR: str,
UINT: uint,
BOOL_STR: bool_str,
BAND: band,
IPV4: ipv4,
HWADDR: hwaddr,
IMSI: imsi,
KI: ki,
MSISDN: msisdn,
AUTH_ALGO: auth_algo,
TIMES: times,
CIPHER: cipher,
MODEM_FEATURE: modem_feature,
PHY_CHAN: phy_channel_config,
CHAN_ALLOCATOR: channel_allocator,
GPRS_MODE: gprs_mode,
CODEC: codec,
OSMO_TRX_CLOCK_REF: osmo_trx_clock_ref,
LTE_TRANSMISSION_MODE: lte_transmission_mode,
DURATION: duration,
}
def add(dest, src):
if util.is_dict(dest):
if not util.is_dict(src):
raise ValueError('cannot add to dict a value of type: %r' % type(src))
for key, val in src.items():
dest_val = dest.get(key)
if dest_val is None:
dest[key] = val
else:
log.ctx(key=key)
add(dest_val, val)
return
if util.is_list(dest):
if not util.is_list(src):
raise ValueError('cannot add to list a value of type: %r' % type(src))
dest.extend(src)
return
if dest == src:
return
raise ValueError('cannot add dicts, conflicting items (values %r and %r)'
% (dest, src))
def combine(dest, src):
if util.is_dict(dest):
if not util.is_dict(src):
raise ValueError('cannot combine dict with a value of type: %r' % type(src))
for key, val in src.items():
log.ctx(key=key)
dest_val = dest.get(key)
if dest_val is None:
dest[key] = val
else:
combine(dest_val, val)
return
if util.is_list(dest):
if not util.is_list(src):
raise ValueError('cannot combine list with a value of type: %r' % type(src))
# Validate that all elements in both lists are of the same type:
t = util.list_validate_same_elem_type(src + dest)
if t is None:
return # both lists are empty, return
# For lists of complex objects, we expect them to be sorted lists:
if t in (dict, list, tuple):
for i in range(len(dest)):
log.ctx(idx=i)
src_it = src[i] if i < len(src) else util.empty_instance_type(t)
combine(dest[i], src_it)
for i in range(len(dest), len(src)):
log.ctx(idx=i)
dest.append(src[i])
else: # for lists of basic elements, we handle them as unsorted sets:
for elem in src:
if elem not in dest:
dest.append(elem)
return
if dest == src:
return
raise ValueError('cannot combine dicts, conflicting items (values %r and %r)'
% (dest, src))
def replicate_times(d):
'''
replicate items that have a "times" > 1
'd' is a dict matching WANT_SCHEMA, which is the same as
the RESOURCES_SCHEMA, except each entity that can be reserved has a 'times'
field added, to indicate how many of those should be reserved.
'''
d = copy.deepcopy(d)
for key, item_list in d.items():
idx = 0
while idx < len(item_list):
item = item_list[idx]
times = int(item.pop('times', 1))
for j in range(1, times):
item_list.insert(idx + j, copy.deepcopy(item))
idx += times
return d
def validate(config, schema):
'''Make sure the given config dict adheres to the schema.
The schema is a dict of 'dict paths' in dot-notation with permitted
value type. All leaf nodes are validated, nesting dicts are implicit.
validate( { 'a': 123, 'b': { 'b1': 'foo', 'b2': [ 1, 2, 3 ] } },
{ 'a': int,
'b.b1': str,
'b.b2[]': int } )
Raise a ValueError in case the schema is violated.
'''
def validate_item(path, value, schema):
want_type = schema.get(path)
if util.is_list(value):
if want_type:
raise ValueError('config item is a list, should be %r: %r' % (want_type, path))
path = path + '[]'
want_type = schema.get(path)
if not want_type:
if util.is_dict(value):
nest(path, value, schema)
return
if util.is_list(value) and value:
for list_v in value:
validate_item(path, list_v, schema)
return
raise ValueError('config item not known: %r' % path)
if want_type not in SCHEMA_TYPES:
raise ValueError('unknown type %r at %r' % (want_type, path))
if util.is_dict(value):
raise ValueError('config item is dict but should be a leaf node of type %r: %r'
% (want_type, path))
if util.is_list(value):
for list_v in value:
validate_item(path, list_v, schema)
return
fix and refactor logging: drop 'with', simplify With the recent fix of the junit report related issues, another issue arose: the 'with log.Origin' was changed to disallow __enter__ing an object twice to fix problems, now still code would fail because it tries to do 'with' on the same object twice. The only reason is to ensure that logging is associated with a given object. Instead of complicating even more, implement differently. Refactor logging to simplify use: drop the 'with Origin' style completely, and instead use the python stack to determine which objects are created by which, and which object to associate a log statement with. The new way: we rely on the convention that each class instance has a local 'self' referencing the object instance. If we need to find an origin as a new object's parent, or to associate a log message with, we traverse each stack frame, fetching the first local 'self' object that is a log.Origin class instance. How to use: Simply call log.log() anywhere, and it finds an Origin object to log for, from the stack. Alternatively call self.log() for any Origin() object to skip the lookup. Create classes as child class of log.Origin and make sure to call super().__init__(category, name). This constructor will magically find a parent Origin on the stack. When an exception happens, we first escalate the exception up through call scopes to where ever it is handled by log.log_exn(). This then finds an Origin object in the traceback's stack frames, no need to nest in 'with' scopes. Hence the 'with log.Origin' now "happens implicitly", we can write pure natural python code, no more hassles with scope ordering. Furthermore, any frame can place additional logging information in a frame by calling log.ctx(). This is automatically inserted in the ancestry associated with a log statement / exception. Change-Id: I5f9b53150f2bb6fa9d63ce27f0806f0ca6a45e90
2017-06-09 23:18:27 +00:00
log.ctx(path)
type_validator = SCHEMA_TYPES.get(want_type)
valid = type_validator(value)
if not valid:
raise ValueError('Invalid value %r for schema type \'%s\' (validator: %s)' % (value, want_type, type_validator.__name__))
def nest(parent_path, config, schema):
if parent_path:
parent_path = parent_path + '.'
else:
parent_path = ''
for k,v in config.items():
if not KEY_RE.fullmatch(k):
raise ValueError('invalid config key: %r' % k)
path = parent_path + k
validate_item(path, v, schema)
nest(None, config, schema)
def config_to_schema_def(src, key_prefix):
'Converts a yaml parsed config into a schema dictionary used by validate()'
if util.is_dict(src):
out_dict = {}
for key, val in src.items():
list_token = ''
dict_token = ''
if util.is_list(val):
list_token = '[]'
assert len(val) == 1
val = val[0]
if util.is_dict(val):
dict_token = '.'
tmp_out = config_to_schema_def(val, "%s%s%s%s" %(key_prefix, key, list_token, dict_token))
out_dict = {**out_dict, **tmp_out}
return out_dict
# base case: string
return {key_prefix: str(src)}
def generate_schemas():
"Generate supported schemas dynamically from objects"
obj_dir = '%s/../obj/' % os.path.dirname(os.path.abspath(__file__))
for filename in os.listdir(obj_dir):
if not filename.endswith(".py"):
continue
module_name = 'osmo_gsm_tester.obj.%s' % filename[:-3]
util.run_python_file_method(module_name, 'on_register_schemas', False)
_RESOURCE_TYPES = ['ip_address', 'arfcn']
_RESOURCES_SCHEMA = {
'ip_address[].addr': IPV4,
'arfcn[].arfcn': INT,
'arfcn[].band': BAND,
}
_CONFIG_SCHEMA = {}
_WANT_SCHEMA = None
_ALL_SCHEMA = None
def register_schema_types(schema_type_attr):
"""Register schema types to be used by schema attributes.
For instance: register_resource_schema_attributes({ 'fruit': lambda val: val in ('banana', 'apple') })
"""
global SCHEMA_TYPES
combine(SCHEMA_TYPES, schema_type_attr)
def register_resource_schema(obj_class_str, obj_attr_dict):
"""Register schema attributes for a resource type.
For instance: register_resource_schema_attributes('modem', {'type': schema.STR, 'ki': schema.KI})
"""
global _RESOURCES_SCHEMA
global _RESOURCE_TYPES
tmpdict = {}
for key, val in obj_attr_dict.items():
new_key = '%s[].%s' % (obj_class_str, key)
tmpdict[new_key] = val
combine(_RESOURCES_SCHEMA, tmpdict)
if obj_class_str not in _RESOURCE_TYPES:
_RESOURCE_TYPES.append(obj_class_str)
def register_config_schema(obj_class_str, obj_attr_dict):
"""Register schema attributes to configure all instances of an object class.
For instance: register_resource_schema_attributes('bsc', {'net.codec_list[]': schema.CODEC})
"""
global _CONFIG_SCHEMA, _ALL_SCHEMA
tmpdict = {}
for key, val in obj_attr_dict.items():
new_key = '%s.%s' % (obj_class_str, key)
tmpdict[new_key] = val
combine(_CONFIG_SCHEMA, tmpdict)
_ALL_SCHEMA = None # reset _ALL_SCHEMA so it is re-generated next time it's requested.
def get_resources_schema():
return _RESOURCES_SCHEMA;
def get_want_schema():
global _WANT_SCHEMA
if _WANT_SCHEMA is None:
_WANT_SCHEMA = util.dict_add(
dict([('%s[].times' % r, TIMES) for r in _RESOURCE_TYPES]),
get_resources_schema())
return _WANT_SCHEMA
def get_all_schema():
global _ALL_SCHEMA
if _ALL_SCHEMA is None:
want_schema = get_want_schema()
_ALL_SCHEMA = util.dict_add(
dict([('config.%s' % key, val) for key, val in _CONFIG_SCHEMA.items()]),
dict([('resources.%s' % key, val) for key, val in want_schema.items()]),
dict([('modifiers.%s' % key, val) for key, val in want_schema.items()]))
return _ALL_SCHEMA
# vim: expandtab tabstop=4 shiftwidth=4