# -*- coding: UTF-8 -*- #/** # * Software Name : pycrate # * Version : 0.4 # * # * Copyright 2017. Benoit Michau. ANSSI. # * # * This library is free software; you can redistribute it and/or # * modify it under the terms of the GNU Lesser General Public # * License as published by the Free Software Foundation; either # * version 2.1 of the License, or (at your option) any later version. # * # * This library is distributed in the hope that it will be useful, # * but WITHOUT ANY WARRANTY; without even the implied warranty of # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # * Lesser General Public License for more details. # * # * You should have received a copy of the GNU Lesser General Public # * License along with this library; if not, write to the Free Software # * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # * MA 02110-1301 USA # * # *-------------------------------------------------------- # * File Name : pycrate_corenet/ProcProto.py # * Created : 2017-07-13 # * Authors : Benoit Michau # *-------------------------------------------------------- #*/ from pycrate_mobile.TS24007 import * from pycrate_corenet.utils_fmt import * #------------------------------------------------------------------------------# # wrapping classes #------------------------------------------------------------------------------# # Signaling stack handler (e.g. for HNBd, ENBd, GNBd, UEd) class SigStack(object): pass # Signaling procedure handler class SigProc(object): pass #------------------------------------------------------------------------------# # RAN-supported procedures (HNBAP, RUA, RANAP, S1AP) #------------------------------------------------------------------------------# class LinkSigProc(SigProc): """wrapping class that defines common methods for Iu-based and S1-based signalling procedures; relies heavily on the ASN.1 definitions """ # to keep track of the PDU(s) exchanged within this procedure TRACK_PDU = True # PDU type look-up _ptype_lut = { 'ini': 'initiatingMessage', 'suc': 'successfulOutcome', 'uns': 'unsuccessfulOutcome' } # default criticality for encoding ASN.1 undefined IE / Ext _criticality_undef = 'ignore' # ASN.1 procedure description (e.g. HNBAP.HNBAP_PDU_Descriptions.*) Desc = None # Custom decoders: # for each type of PDU (ini / suc / uns), provides specific functions (or None) # for given IEs and Exts name # this allows to collect IEs / Exts in their original ASN.1 value, or after # a specific transformation Decod = { 'ini': ({}, {}), 'suc': None, 'uns': None, } # Custom encoders: # for each type of PDU (ini / suc / uns), provides specific static values # for given IEs and Exts name # this allows to override values passed at runtime or set static values Encod = { 'ini': ({}, {}), 'suc': None, 'uns': None, } #--------------------------------------------------------------------------# @classmethod def init(cls): """class initialization required to build .Cont attribute with PDU(s) content and extend .Decod and .Encod from .Desc attribute (ASN.1 procedure description) """ # 1) get procedure code and criticality from description desc = cls.Desc() cls.Code = desc['procedureCode'] cls.Crit = desc['criticality'] # class 1: request-response, class 2: request only cls.Class = 2 # 2) retrieve PDU(s) content from description # -> get dict of protocolIEs (ident: value's type) # -> get dict of protocolExtensions (ident: extvalue's type) # -> set list of mandatory IEs # -> add in the Encod and Decod dicts the numerical `ident` index # for given encoders / decoders cls.Cont = {'ini': None, 'suc': None, 'uns': None} for ptype in (('InitiatingMessage', 'ini'), ('SuccessfulOutcome', 'suc'), ('Outcome', 'suc'), # this is used in RANAP ('UnsuccessfulOutcome', 'uns')): if ptype[0] in desc: if ptype[1] != 'ini': # request-response procedure cls.Class = 1 encod, decod = cls.Encod[ptype[1]], cls.Decod[ptype[1]] content, cont_ies, cont_exts, mand = desc[ptype[0]], {}, {}, [] if 'protocolIEs' in content._cont: # get the ASN.1 set of defined {ident : value's type} set_ies = content._cont['protocolIEs']._cont._cont['value']._const_tab ord_ies = set_ies('id') for ident in ord_ies: cont_ies[ident] = set_ies('id', ident) if cont_ies[ident]['presence'] == 'mandatory': mand.append( ident ) try: pyname = pythonize_name(cont_ies[ident]['Value']._tr._name) except Exception: pass else: if pyname in encod[0]: encod[0][ident] = encod[0][pyname] if pyname in decod[0]: decod[0][ident] = decod[0][pyname] else: ord_ies = [] if 'protocolExtensions' in content._cont: # get the ASN.1 set of defined {ident : extvalue's type} set_exts = content._cont['protocolExtensions']._cont._cont['extensionValue']._const_tab ord_exts = set_exts('id') for ident in ord_exts: cont_exts[ident] = set_exts('id', ident) if cont_exts[ident]['presence'] == 'mandatory': mand.append( ident ) try: pyname = pythonize_name(cont_exts[ident]['Extension']._tr._name) except Exception: pass else: if pyname in encod[1]: encod[1][ident] = encod[1][pyname] if pyname in decod[1]: decod[1][ident] = decod[1][pyname] else: ord_exts = [] if not cont_ies: cont_ies = None if not cont_exts: cont_exts = None cls.Cont[ptype[1]] = (content, cont_ies, cont_exts, mand, ord_ies, ord_exts) #--------------------------------------------------------------------------# def decode_pdu(self, pdu, ret): """decode the pdu and populate ret (dict) with the collected values select the expected content in self.Cont, according to the pdu type select the potential decoders in self.Decod raise Exception if an error requiring procedure rejection is found when unknown identifiers are encountered: IE buffer value is set with key 'id_%id', Extension buffer value is wet with key 'idext_%id' this enables to collect also IE and Extension that are not part of the original ASN.1 specification """ # 1) select the correct PDU and content ptype = pdu[0][:3] if ptype == 'out': # some 3G RAN procedure have only 'outcome' pdu # which are changed to 'suc' ptype = 'suc' Cont, IEs, Extensions, mand, ord_ies, ord_exts = self.Cont[ptype] Decod = self.Decod[ptype] # val = pdu[1] # 2) check the PDU criticality # actually, the sender can modify the criticality of an IE, # the criticality from the ASN.1 has to be used only in case the IE # is not present in the PDU #if val['criticality'] != self.Crit: # # this actually happens in real life: must not raise()... # #raise(CorenetErr('invalid PDU criticality')) # self._log('WNG', 'decode_pdu: incorrect PDU criticality, %s' % val['criticality']) # # 3) ensure the PDU content has been properly decoded if not isinstance(val, dict) or 'value' not in val \ or not isinstance(val['value'], tuple) or val['value'][0] != Cont._tr._name: raise(CorenetErr('invalid PDU content')) # # 4) get the value part of the PDU with IEs and Extensions, # and copy the list of mandatory IEs val, mand = val['value'][1], mand[:] # # 5) collect the list of IEs' values if 'protocolIEs' in val: for ie in val['protocolIEs']: # get the value identifier and corresponding ASN.1 object ident = ie['id'] try: IE = IEs[ident] except Exception: # unknown IE, c'est pas grave... self._log('INF', 'decode_pdu: unknown IE ident in PDU, %r' % ie) ret['id_%i' % ident] = ie['value'] else: name = IE['Value']._tr._name # check the ie criticality #if ie['criticality'] != IE['criticality']: # #raise(CorenetErr('invalid IE criticality in PDU, id %i' % ident)) # self._log('WNG', 'decode_pdu: incorrect IE %s criticality, %s'\ # % (ident, ie['criticality'])) # ensure the value content has been properly decoded if ie['value'][0] != name: raise(CorenetErr('invalid IE value in PDU, id %i' % ident)) # collect and eventually transform the ie value if ident in Decod[0]: ret[pythonize_name(name)] = Decod[0][ident](ie['value'][1]) else: ret[pythonize_name(name)] = ie['value'][1] # remove the value identifier from the list of mandatory values if ident in mand: mand.remove(ident) # # 6) collect the list of Extensions' values if 'protocolExtensions' in val: for ie in val['protocolExtensions']: # get the value identifier and corresponding ASN.1 object ident = ie['id'] try: IE = Extensions[ident] except Exception: # unknown Extension, c'est pas grave non plus... self._log('INF', 'decode_pdu: unknown Ext ident in PDU, %r' % ie) ret['idext_%i' % ident] = ie['extensionValue'] else: name = IE['Extension']._tr._name # check the ie criticality #if ie['criticality'] != IE['criticality']: # #raise(CorenetErr('invalid Extension criticality in PDU, id %i' % ident)) # self._log('WNG', 'decode_pdu: incorrect Extension %s criticality, %s'\ # % (ident, ie['criticality'])) # ensure the value content has been properly decoded if ie['extensionValue'][0] != name: raise(CorenetErr('invalid Extension value in PDU, id %i' % ident)) # collect and eventually transform the ie value if ident in Decod[1]: ret[pythonize_name(name)] = Decod[1][ident](ie['extensionValue'][1]) else: ret[pythonize_name(name)] = ie['extensionValue'][1] # remove the value identifier from the list of mandatory values if ident in mand: mand.remove(ident) # # 7) if not all mandatory IEs have not been decoded, raise if mand: raise(CorenetErr('missing mandatory IEs in PDU, %r' % mand)) def encode_pdu(self, ptype, **kw): """encode the provided IEs' values from **kw into the PDU of type ptype ('ini', 'suc' or 'uns') and stack it in self._pdu_tx values provided in self.Encod will be set in priority, potentially overriding those in **kw values can be passed by their structure name and value: e.g. 'UE_Usage_Type': 1, or by their identifier and buffer value e.g. 'id_290': b'\x01' (tip: id_$ident will never clash with an IE name which starts with an upper case) when passing values by their identifier: 'id_%id' must be used when part of the IEs, 'idext_%id' must be used when part of the Exts this enables also to add IE and Extension that are not part of the original ASN.1 specification """ # 1) select the correct PDU and content Cont, IEs, Extensions, mand, ord_ies, ord_exts = self.Cont[ptype] self._NetInfo, Encod, pdu_ies, pdu_exts = kw.copy(), self.Encod[ptype], [], [] # # 2) encode the list of IEs' values if IEs is not None: #for (ident, IE) in IEs.items(): for ident in ord_ies: IE = IEs[ident] idname = 'id_%i' % ident val = self._get_ie_val(idname, IE, Encod[0], kw) if val is not None: pdu_ies.append({'id': ident, 'criticality': IE['criticality'], 'value': val}) elif ident in mand: self._log('WNG', 'encode_pdu: missing mandatory IE, ident %i' % ident) # sort pdu_ies in order according to 'id' #pdu_ies.sort(key=lambda x:x['id']) # # 3) encode the list of Extensions' values if Extensions is not None: #for (ident, IE) in Extensions.items(): for ident in ord_exts: IE = Extensions[ident] idname = 'idext_%i' % ident val = self._get_ie_val(idname, IE, Encod[1], kw) if val is not None: pdu_exts.append({'id': ident, 'criticality': IE['criticality'], 'extensionValue': val}) elif ident in mand: self._log('WNG', 'encode_pdu: missing mandatory Ext, ident %i' % ident) # sort pdu_exts in order according to 'id' #pdu_exts.sort(key=lambda x:x['id']) # # 4) enable also undefined buffer values passed at runtime to be encoded for name in kw: if name[:3] == 'id_': ident = int(name[3:]) if isinstance(kw[name], tuple): crit = kw[name][0] val = kw[name][1] else: crit = self._criticality_undef val = kw[name] pdu_ies.append({'id': ident, 'criticality': crit, 'value': val}) elif name[:6] == 'idext_': ident = int(name[6:]) if isinstance(kw[name], tuple): crit = kw[name][0] val = kw[name][1] else: crit = self._criticality_undef val = kw[name] pdu_exts.append({'id': ident, 'criticality': crit, 'value': val}) # # 5) build the whole PDU val = {'protocolIEs': pdu_ies} if pdu_exts: val['protocolExtensions'] = pdu_exts self._pdu_tx.append( (self._ptype_lut[ptype], {'procedureCode': self.Code, 'criticality': self.Crit, 'value': (Cont._tr._name, val)}) ) def _get_ie_val(self, idname, IE, Encod, kw): val = None if IE['Value']._tr is not None: # IE refers to a sub object, that can be assigned by name iename = IE['Value']._tr._name pyname = pythonize_name(iename) if pyname in Encod: # static object value provided at the procedure class level # referred by its name val = (iename, Encod[pyname]) if pyname in kw: del kw[pyname] elif pyname in kw: # object value provided at runtime # referred by its name val = (iename, kw[pyname]) del kw[pyname] if val is None: if idname in Encod: # static object value provided at the procedure class level # referred by its ident val = Encod[idname] if idname in kw: del kw[idname] elif idname in kw: # object value provided at runtime # referred by its ident val = kw[idname] del kw[idname] return val #--------------------------------------------------------------------------# def recv(self, pdu): """process the PDU received by the signalling stack """ self._log('ERR', 'recv() not implemented') def send(self): """return a list of PDU(s) to be sent by the signalling stack """ self._log('ERR', 'send() not implemented') return self._pdu_tx def trigger(self): """return a list of new procedure(s) which were created during previous processing """ self._log('ERR', 'trigger() not implemented') return [] def abort(self): """abort the procedure, e.g. due to a timeout or an error indication """ pass #------------------------------------------------------------------------------# # NAS signalling procedures #------------------------------------------------------------------------------# tlv_get_first = lambda x: x[0] tlv_get_second = lambda x: x[1] tlv_get_third = lambda x: x[2] tlv_get_cap = lambda x: (x._V.get_val(), x._IE) class NASSigProc(SigProc): """wrapping class that defines common methods for NAS signalling procedures """ # to keep track of the NAS message(s) exchanged within this procedure TRACK_MSG = True # procedure NAS message content: # CN-initiated msg class (None or tuple), UE-initiated msg class (None or tuple) Cont = (None, None) # Custom decoders: # for each type of NAS msg defined in Cont, provides specific functions (or None) # for given IE name # this allows to collect IEs in their original NAS value, or after a specific transformation Decod = {} # Custom encoders: # for each type of NAS msg defined in Cont, provides specific static values # for given IE name # this allows to override values passed at runtime or set static values Encod = {} # NAS message processing filter, by (ProtDisc, Type), built at class init Filter = set() # NAS message processing filter, by message name, built at class init FilterStr = set() # list of IE (essentially capabilities), for which we want to get the raw # buffer and the decoded IE value Cap = () #--------------------------------------------------------------------------# @classmethod def init(cls, filter_init=1): """class initialization required to build .Filter attribute describing NAS message type accepted by the procedure handler, and default .Decod attribute to extract only V part of LV / TV / TLV IEs. filter_init = 0, builds Filter / FilterStr with CN-initiated message filter_init = 1, builds Filter / FilterStr with UE-initiated message """ ContLUT, Encod, Decod = {}, {}, {} # # CN-initiated NAS msg if cls.Cont[0] is not None: for i, msgclass in enumerate(cls.Cont[0]): msg = msgclass() mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) else: mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) mies = msg[1:] ContLUT[mid] = (0, i) if mid not in cls.Encod: Encod[mid] = {} else: Encod[mid] = cls.Encod[mid] if mid not in cls.Decod: Decod[mid] = {} else: Decod[mid] = cls.Decod[mid] # build default decoders when not user-defined for ie in mies: if ie._name not in Decod[mid]: if ie._name in cls.Cap: Decod[mid][ie._name] = tlv_get_cap elif isinstance(ie, (Type1V, Type3V)): Decod[mid][ie._name] = tlv_get_first elif isinstance(ie, (Type1TV, Type3TV, Type4LV, Type6LVE)): Decod[mid][ie._name] = tlv_get_second elif isinstance(ie, (Type4TLV, Type6TLVE)): Decod[mid][ie._name] = tlv_get_third # # UE-initiated NAS msg if cls.Cont[1] is not None: for i, msgclass in enumerate(cls.Cont[1]): msg = msgclass() mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) else: mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) mies = msg[1:] ContLUT[mid] = (1, i) if mid not in cls.Encod: Encod[mid] = {} else: Encod[mid] = cls.Encod[mid] if mid not in cls.Decod: Decod[mid] = {} else: Decod[mid] = cls.Decod[mid] # build default decoders when not user-defined for ie in mies: if ie._name not in Decod[mid]: if ie._name in cls.Cap: Decod[mid][ie._name] = tlv_get_cap elif isinstance(ie, (Type1V, Type3V)): Decod[mid][ie._name] = tlv_get_first elif isinstance(ie, (Type1TV, Type3TV, Type4LV, Type6LVE)): Decod[mid][ie._name] = tlv_get_second elif isinstance(ie, (Type4TLV, Type6TLVE)): Decod[mid][ie._name] = tlv_get_third # cls.ContLUT, cls.Encod, cls.Decod = ContLUT, Encod, Decod # Filter, FilterStr = set(), set() if cls.Cont[filter_init] is not None: for msgclass in cls.Cont[filter_init]: msg = msgclass() mhdr = msg[0] if mhdr[0]._name == 'TIPD': mid = (mhdr[0]['ProtDisc'].get_val(), mhdr['Type'].get_val()) elif 'ProtDisc' in mhdr._by_name: mid = (mhdr['ProtDisc'].get_val(), mhdr['Type'].get_val()) else: mid = (mhdr['EPD'].get_val(), mhdr['Type'].get_val()) Filter.add(mid) FilterStr.add(msg._name) if Filter: cls.Filter, cls.FilterStr = Filter, FilterStr #--------------------------------------------------------------------------# def _prepare(self, encod=None): # _prepare() must be called by each NASSigProc.__init__() method # self.Name = self.__class__.__name__ # # set empty dicts for the NAS messages of the instance self.Encod = {mid: {} for mid in self.__class__.Encod} # if encod: for k, v in encod.items(): self.set_msg(*k, **v) # # to store PDU traces self._pdu = [] # NAS message received from / to be sent to the UE self._nas_rx = None self._nas_tx = None #--------------------------------------------------------------------------# def decode_msg(self, msg, ret): """decode the NAS msg and populate ret (dict) with the collected IE values (all fields after the `Type' field) select specific IE decoders in self.Decod for the given message to potentially transform IE values collected """ self._nas_rx, pd, typ = msg, None, None for ie in msg[0]._content: # header if ie._name == 'TIPD': pd = ie['ProtDisc'].get_val() elif ie._name == 'ProtDisc': pd = ie.get_val() elif ie._name == 'Type': typ = ie.get_val() if pd is None or typ is None: self._log('WNG', 'decode_msg: no ProtDisc / Type found') return try: Decod = self.Decod[(pd, typ)] except Exception: self._log('WNG', 'decode_msg: no decoder dict found') Decod = {} # for ie in msg._content[1:]: if not ie.get_trans(): if ie._name in Decod: ret[ie._name] = Decod[ie._name](ie) else: # this will include potential unknown IE, which name will be _T_$tag ret[ie._name] = ie def encode_msg(self, pd, typ): """encode the NAS msg from protocol discriminator `pd' and type `typ' with IE values provided in self.Encod, and set the encoded message in self._nas_tx. if IE values are set for the class (self.__class__.Encod), they will overwrite those from self.Encod. values can be passed by their IE name and value: e.g. 'UE_Usage_Type': 1, or by their tag and buffer or integral value for unspecified IE: e.g. '_T_29': b'\x01', this enables also to add IE that are not part of the original specification """ mid = (pd, typ) # get the instance encoder try: Encod = self.Encod[mid] except Exception: self._log('WNG', 'encode_msg: no encoder dict found') Encod = {} # get the class encoder and update the instance's one ClaEncod = self.__class__.Encod[mid] if ClaEncod: Encod.update(ClaEncod) # # instantiate the msg with those values i, j = self.ContLUT[mid] self._nas_tx = self.Cont[i][j](val=Encod) # # add potential unspecified extension ext = [k for k in Encod.keys() if k[:3] == '_T_'] if ext: if isinstance(self._nas_tx, Layer3E): for k in ext: tag = int(k[3:]) if tag & 0x80: self._nas_tx.append( Type1TV(k, val={'T':tag>>4, 'V':tag&0xf}) ) elif tag & 0x70 == 0x70: self._nas_tx.append( Type6TLVE(k, val={'T':tag, 'V':Encod[k]}) ) else: self._nas_tx.append( Type4TLV(k, val={'T':tag, 'V':Encod[k]}) ) else: #isinstance(self._nas_tx, Layer3) for k in ext: tag = int(k[3:]) if tag & 0x80: self._nas_tx.append( Type1TV(k, val={'T':tag>>4, 'V':tag&0xf}) ) else: self._nas_tx.append( Type4TLV(k, val={'T':tag, 'V':Encod[k]}) ) # in many NAS procedures, encod_msg() will be called automatically by output() # instead, set_msg() must be used for preparing a payload from an external procedure def set_msg(self, pd, typ, **kw): """prepare a specific encoder dict for a given NAS message """ # select the encoder and duplicate it try: Encod = self.Encod[(pd, typ)] except Exception: return Encod.update(kw) #--------------------------------------------------------------------------# def output(self): """return a NAS msg to be sent by the signalling stack """ self._log('ERR', 'output() not implemented') return None def process(self, msg): """process the NAS msg received by the signalling stack """ self._log('ERR', 'process() not implemented') return None def postprocess(self, proc=None): """post-processing after a nested procedure `proc' has ended """ self._log('ERR', 'postprocess() not implemented') return None def abort(self): """abort the procedure, e.g. due to a timeout or an error indication """ pass