schema: Allow objects registering their own schema types

Change-Id: I998c8674a55531909bfeac420064c3f238cea126
This commit is contained in:
Pau Espin 2020-05-21 15:40:57 +02:00
parent 27b609f4d3
commit d79e719804
4 changed files with 90 additions and 17 deletions

View File

@ -0,0 +1,29 @@
schema:
handover:
threshold: 'uint'
myvar: 'test_type'
anothervar: 'another_type'
tests:
- foobar:
prefix:
handover:
myvar: 'valid_value1'
anothervar: 'unique_val_ok'
threshold: 2
- foobar:
prefix:
handover:
myvar: 'valid_value2'
- foobar:
prefix:
handover:
threshold: 0
- foobar:
prefix:
handover:
myvar: 'invalid_val'
- foobar:
prefix:
handover:
anothervar: 'another_invalid_val'

View File

@ -61,3 +61,20 @@ validating tests[6]
--- -: ERR: ValueError: config item is a list, should be 'str': 'foobar.prefix.hey.ho.letsgo[]'
Validation: Error
----------------------
schema_case_06.conf:
{'foobar.prefix.handover.anothervar': 'another_type',
'foobar.prefix.handover.myvar': 'test_type',
'foobar.prefix.handover.threshold': 'uint'}
validating tests[0]
Validation: OK
validating tests[1]
Validation: OK
validating tests[2]
Validation: OK
validating tests[3]
--- foobar.prefix.handover.myvar: ERR: ValueError: Invalid value 'invalid_val' for schema type 'test_type' (validator: test_validator)
Validation: Error
validating tests[4]
--- foobar.prefix.handover.anothervar: ERR: ValueError: Invalid value 'another_invalid_val' for schema type 'another_type' (validator: <lambda>)
Validation: Error
----------------------

View File

@ -25,6 +25,13 @@ def get_case_list(dir):
li.append(f)
return sorted(li)
def test_validator(val):
return val in ('valid_value1', 'valid_value2')
schema.register_schema_types({'test_type': test_validator,
'another_type': lambda val: val == 'unique_val_ok'})
print('==== Testing dynamically generated schemas ====')
for f in get_case_list(_prep.script_dir):
print('%s:' % f)

View File

@ -36,12 +36,12 @@ def match_re(name, regex, val):
break;
if not regex.fullmatch(val):
break;
return
return True
raise ValueError('Invalid %s: %r' % (name, val))
def band(val):
if val in ('GSM-900', 'GSM-1800', 'GSM-1900'):
return
return True
raise ValueError('Unknown GSM band: %r' % val)
def ipv4(val):
@ -49,27 +49,30 @@ def ipv4(val):
els = [int(el) for el in val.split('.')]
if not all([el >= 0 and el <= 255 for el in els]):
raise ValueError('Invalid IPv4 address: %r' % val)
return True
def hwaddr(val):
match_re('hardware address', HWADDR_RE, val)
return match_re('hardware address', HWADDR_RE, val)
def imsi(val):
match_re('IMSI', IMSI_RE, val)
return match_re('IMSI', IMSI_RE, val)
def ki(val):
match_re('KI', KI_RE, val)
return match_re('KI', KI_RE, val)
def msisdn(val):
match_re('MSISDN', MSISDN_RE, val)
return match_re('MSISDN', MSISDN_RE, val)
def auth_algo(val):
if val not in util.ENUM_OSMO_AUTH_ALGO:
raise ValueError('Unknown Authentication Algorithm: %r' % val)
return True
def uint(val):
n = int(val)
if n < 0:
raise ValueError('Positive value expected instead of %d' % n)
return True
def uint8(val):
n = int(val)
@ -77,6 +80,7 @@ def uint8(val):
raise ValueError('Positive value expected instead of %d' % n)
if n > 255: # 2^8 - 1
raise ValueError('Value %d too big, max value is 255' % n)
return True
def uint16(val):
n = int(val)
@ -84,57 +88,64 @@ def uint16(val):
raise ValueError('Positive value expected instead of %d' % n)
if n > 65535: # 2^16 - 1
raise ValueError('Value %d too big, max value is 65535' % n)
return True
def bool_str(val):
# str2bool will raise an exception if unable to parse it
util.str2bool(val)
return True
def times(val):
n = int(val)
if n < 1:
raise ValueError('Positive value >0 expected instead of %d' % n)
return True
def cipher(val):
if val in ('a5_0', 'a5_1', 'a5_2', 'a5_3', 'a5_4', 'a5_5', 'a5_6', 'a5_7'):
return
return True
raise ValueError('Unknown Cipher value: %r' % val)
def modem_feature(val):
if val in ('sms', 'gprs', 'voice', 'ussd', 'sim', '2g', '3g', '4g'):
return
return True
raise ValueError('Unknown Modem Feature: %r' % val)
def phy_channel_config(val):
if val in ('CCCH', 'CCCH+SDCCH4', 'TCH/F', 'TCH/H', 'SDCCH8', 'PDCH',
'TCH/F_PDCH', 'CCCH+SDCCH4+CBCH', 'SDCCH8+CBCH','TCH/F_TCH/H_PDCH'):
return
return True
raise ValueError('Unknown Physical channel config: %r' % val)
def channel_allocator(val):
if val in ('ascending', 'descending'):
return
return True
raise ValueError('Unknown Channel Allocator Policy %r' % val)
def gprs_mode(val):
if val in ('none', 'gprs', 'egprs'):
return
return True
raise ValueError('Unknown GPRS mode %r' % val)
def codec(val):
if val in ('hr1', 'hr2', 'hr3', 'fr1', 'fr2', 'fr3'):
return
return True
raise ValueError('Unknown Codec value: %r' % val)
def osmo_trx_clock_ref(val):
if val in ('internal', 'external', 'gspdo'):
return
return True
raise ValueError('Unknown OsmoTRX clock reference value: %r' % val)
def lte_transmission_mode(val):
n = int(val)
if n <= 4:
return
return True
raise ValueError('LTE Transmission Mode %d not in expected range' % n)
def duration(val):
if val.isdecimal() or val.endswith('m') or val.endswith('h'):
return
return True
raise ValueError('Invalid duration value: %r' % val)
INT = 'int'
@ -163,7 +174,7 @@ SCHEMA_TYPES = {
INT: int,
STR: str,
UINT: uint,
BOOL_STR: util.str2bool,
BOOL_STR: bool_str,
BAND: band,
IPV4: ipv4,
HWADDR: hwaddr,
@ -310,7 +321,9 @@ def validate(config, schema):
log.ctx(path)
type_validator = SCHEMA_TYPES.get(want_type)
type_validator(value)
valid = type_validator(value)
if not valid:
raise ValueError('Invalid value %r for schema type \'%s\' (validator: %s)' % (value, want_type, type_validator.__name__))
def nest(parent_path, config, schema):
if parent_path:
@ -369,6 +382,13 @@ _CONFIG_SCHEMA = {}
_WANT_SCHEMA = None
_ALL_SCHEMA = None
def register_schema_types(schema_type_attr):
"""Register schema types to be used by schema attributes.
For instance: register_resource_schema_attributes({ 'fruit': lambda val: val in ('banana', 'apple') })
"""
global SCHEMA_TYPES
combine(SCHEMA_TYPES, schema_type_attr)
def register_resource_schema(obj_class_str, obj_attr_dict):
"""Register schema attributes for a resource type.
For instance: register_resource_schema_attributes('modem', {'type': schema.STR, 'ki': schema.KI})