# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2017. Benoit Michau. ANSSI.
# * Copyright 2020. Benoit Michau. P1Sec.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/utils.py
# * Created : 2017-06-28
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/

# Python built-ins libraries required
import sys
import socket
import random
import re
#import traceback
from select import select
from threading import Thread, Lock, Event
from random import SystemRandom, randint
from time import time, sleep
from datetime import datetime
from socket import AF_INET, AF_INET6, AF_PACKET, ntohl, htonl, ntohs, htons

# SCTP support for NGAP / S1AP / HNBAP / RUA interfaces
try:
    import sctp
except ImportError as err:
    print('pysctp library required for CorenetServer')
    print('check on github: https://github.com/P1sec/pysctp')
    raise

# conversion function for security context
try:
    from CryptoMobile.conv import *
except ImportError as err:
    print('CryptoMobile library required for CorenetServer')
    print('check on github: https://github.com/P1sec/CryptoMobile')
    raise

# all pycrate stuffs
from pycrate_core.utils import *
from pycrate_core.repr import *
from pycrate_core.elt import Element, Envelope
from pycrate_core.base import Buf, Uint8
Element._SAFE_STAT = True
Element._SAFE_DYN = True

from pycrate_corenet.utils_fmt import *
from pycrate_corenet.ProcProto import SigStack, SigProc

# NOTE: at this point `log` is whatever the star-imports above provide;
# the local `log()` defined further down only replaces it afterwards
log('pycrate_corenet: loading all ASN.1 and NAS modules, be patient...')
# import ASN.1 modules
# to drive gNodeB and ng-eNodeB
from pycrate_asn1dir import NGAP
# to drive eNodeB and Home-eNodeB
from pycrate_asn1dir import S1AP
# to drive Home-NodeB
from pycrate_asn1dir import HNBAP
from pycrate_asn1dir import RUA
from pycrate_asn1dir import RANAP
# to decode UE 3G, LTE and NR radio capability
from pycrate_asn1dir import RRC3G
from pycrate_asn1dir import RRCLTE
from pycrate_asn1dir import RRCNR
# to handle SS messages
from pycrate_asn1dir import SS
#
from pycrate_asn1rt.utils import get_val_at

# to drive 3G UE
from pycrate_mobile import TS24007
from pycrate_mobile import TS24008_IE
# CS domain
from pycrate_mobile import TS24008_MM
from pycrate_mobile import TS24008_CC
from pycrate_mobile import TS24011_PPSMS
from pycrate_mobile import TS24080_SS
# PS domain
from pycrate_mobile import TS24008_GMM
from pycrate_mobile import TS24008_SM
#
# to drive LTE UE
from pycrate_mobile import TS24301_EMM
from pycrate_mobile import TS24301_ESM
#
# to drive 5G UE
from pycrate_mobile import TS24501_FGMM
from pycrate_mobile import TS24501_FGSM
#
from pycrate_mobile import TS24007
from pycrate_mobile import NAS


#------------------------------------------------------------------------------#
# ASN.1 objects
#------------------------------------------------------------------------------#

# actually, all ASN.1 modules are in ASN_GLOBAL
ASN_GLOBAL = S1AP.GLOBAL.MOD

# ASN.1 PDU encoders / decoders
PDU_NGAP = NGAP.NGAP_PDU_Descriptions.NGAP_PDU
PDU_S1AP = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
PDU_HNBAP = HNBAP.HNBAP_PDU_Descriptions.HNBAP_PDU
PDU_RUA = RUA.RUA_PDU_Descriptions.RUA_PDU
PDU_RANAP = RANAP.RANAP_PDU_Descriptions.RANAP_PDU
PDU_SS_Facility = SS.SS_Facility.Facility

# ASN.1 modules are not thread-safe
# objects' value will be mixed in case a thread ctxt switch occurs between
# the fg interpreter and the bg CorenetServer loop, and both accesses the same
# ASN.1 modules / objects
ASN_READY_NGAP = Event()
ASN_READY_S1AP = Event()
ASN_READY_HNBAP = Event()
ASN_READY_RUA = Event()
ASN_READY_RANAP = Event()
ASN_READY_NGAP.set()
ASN_READY_S1AP.set()
ASN_READY_HNBAP.set()
ASN_READY_RUA.set()
ASN_READY_RANAP.set()
ASN_ACQUIRE_TO = 0.01 # in sec


def _asn_acquire(ready_evt):
    """Wait up to ASN_ACQUIRE_TO seconds for `ready_evt` to be set, then clear it.

    Returns True when the event was set (and has now been cleared), False when
    the wait timed out and the module is still locked.

    NOTE(review): waiting for the flag and clearing it are two separate calls,
    so two threads can both see the flag set before either clears it.
    """
    # Event.wait() returns at once when the flag is already set, so no
    # preliminary is_set() check is required
    if not ready_evt.wait(ASN_ACQUIRE_TO):
        # timeout, module is still locked
        return False
    ready_evt.clear()
    return True


def asn_ngap_acquire():
    """Try to take the NGAP ASN.1 module; return True on success, False on timeout."""
    return _asn_acquire(ASN_READY_NGAP)


def asn_ngap_release():
    """Mark the NGAP ASN.1 module as available again."""
    ASN_READY_NGAP.set()


def asn_s1ap_acquire():
    """Try to take the S1AP ASN.1 module; return True on success, False on timeout."""
    return _asn_acquire(ASN_READY_S1AP)


def asn_s1ap_release():
    """Mark the S1AP ASN.1 module as available again."""
    ASN_READY_S1AP.set()


def asn_hnbap_acquire():
    """Try to take the HNBAP ASN.1 module; return True on success, False on timeout."""
    return _asn_acquire(ASN_READY_HNBAP)


def asn_hnbap_release():
    """Mark the HNBAP ASN.1 module as available again."""
    ASN_READY_HNBAP.set()


def asn_rua_acquire():
    """Try to take the RUA ASN.1 module; return True on success, False on timeout."""
    return _asn_acquire(ASN_READY_RUA)


def asn_rua_release():
    """Mark the RUA ASN.1 module as available again."""
    ASN_READY_RUA.set()


def asn_ranap_acquire():
    """Try to take the RANAP ASN.1 module; return True on success, False on timeout."""
    return _asn_acquire(ASN_READY_RANAP)


def asn_ranap_release():
    """Mark the RANAP ASN.1 module as available again."""
    ASN_READY_RANAP.set()


def decode_ue_rad_cap(buf):
    """Decode an UPER-encoded UERadioAccessCapabilityInformation buffer.

    Returns None when `buf` cannot be decoded at all. Otherwise returns a
    2-tuple (decoded value, uecapinfo) where uecapinfo maps each reported
    rat-Type to its decoded capability, or to the raw container when that
    container could not be decoded (or is of an unhandled type). uecapinfo is
    empty when the inner UECapabilityInformation cannot be reached.
    """
    UERadCap = RRCLTE.EUTRA_InterNodeDefinitions.UERadioAccessCapabilityInformation
    try:
        UERadCap.from_uper(buf)
    except Exception:
        return None
    uecapinfo = {}
    try:
        # ue-RadioAccessCapabilityInfo (OCTET STRING) contains UECapabilityInformation (SEQUENCE)
        radcapinfo = get_val_at(UERadCap,
            ('criticalExtensions',
             'c1',
             'ueRadioAccessCapabilityInformation-r8',
             'ue-RadioAccessCapabilityInfo',
             'UECapabilityInformation',
             'criticalExtensions',
             'c1',
             'ueCapabilityInformation-r8'))
    except Exception:
        return UERadCap._val, uecapinfo
    # decode each ueCapabilityRAT-Container
    for caprat in radcapinfo['ue-CapabilityRAT-ContainerList']:
        rattype = caprat['rat-Type'] # eutra, utra, geran-cs, geran-ps, cdma2000-1XRTT
        container = caprat['ueCapabilityRAT-Container']
        if rattype == 'eutra':
            UEEUTRACap = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
            try:
                UEEUTRACap.from_uper(container)
            except Exception:
                uecapinfo[rattype] = container
            else:
                uecapinfo[rattype] = UEEUTRACap._val
        elif rattype == 'utra':
            UEUTRACap = RRC3G.PDU_definitions.InterRATHandoverInfo
            try:
                UEUTRACap.from_uper(container)
            except Exception:
                uecapinfo[rattype] = container
            else:
                uecapinfo[rattype] = UEUTRACap._val
        elif rattype == 'geran-cs':
            m2, m3 = NAS.MSCm2(), NAS.classmark_3_value_part.clone()
            # MSCm2 || MSCm3
            # a container holding only the 0x33 tag has no length byte:
            # keep it raw instead of letting ord(b'') raise
            if container[0:1] != b'\x33' or len(container) < 2:
                uecapinfo[rattype] = container
            else:
                m2_len = ord(container[1:2])
                buf_m2 = container[2:2+m2_len]
                buf_m3 = container[2+m2_len:]
                try:
                    m2.from_bytes(buf_m2)
                    m3.from_bytes(buf_m3)
                except Exception:
                    uecapinfo[rattype] = container
                else:
                    uecapinfo[rattype] = (m2, m3)
        elif rattype == 'geran-ps':
            mrc = NAS.ms_ra_capability_value_part.clone()
            try:
                mrc.from_bytes(container)
            except Exception:
                uecapinfo[rattype] = container
            else:
                uecapinfo[rattype] = mrc
        else:
            # TODO: could be cdma2000_1XRTT
            uecapinfo[rattype] = container
    return UERadCap._val, uecapinfo


# special handling for hiding MeasParameters reported in UERadioCapability
def _seq_to_asn1_bypass():
    """Stand-in for a MeasParameters `_to_asn1` method: return a fixed placeholder."""
    return '{ -- removed for brevity -- }'


def _get_mp_from_eutra_cap():
    """Collect the measParameters* components nested in UE-EUTRA-Capability.

    Returns the list of component objects, in the order they are found while
    descending the chain of nonCriticalExtension components.
    """
    # (number of nonCriticalExtension levels to descend, component to pick)
    ext_path = (
        (3, 'measParameters-v1020'),
        (3, 'measParameters-v1130'),
        (3, 'measParameters-v11a0'),
        (1, 'measParameters-v1250'),
        (4, 'measParameters-v1310'),
        )
    par = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
    mp_list = [par._cont['measParameters']]
    for depth, name in ext_path:
        for _ in range(depth):
            par = par._cont['nonCriticalExtension']
        mp_list.append(par._cont[name])
    return mp_list


_RRCLTE_MPList = _get_mp_from_eutra_cap()
# original _to_asn1 methods, kept so they can be restored after patching
_RRCLTE_MPList_to_asn1 = [mp._to_asn1 for mp in _RRCLTE_MPList]


def meas_params_to_asn1_patch():
    """Replace `_to_asn1` of every collected MeasParameters object by the bypass."""
    for mp in _RRCLTE_MPList:
        mp._to_asn1 = _seq_to_asn1_bypass


def meas_params_to_asn1_restore():
    """Put back the `_to_asn1` methods saved at module load time."""
    for mp, to_asn1 in zip(_RRCLTE_MPList, _RRCLTE_MPList_to_asn1):
        mp._to_asn1 = to_asn1


#------------------------------------------------------------------------------#
# logging facilities
#------------------------------------------------------------------------------#

class CorenetErr(PycrateErr):
    pass


# coloured logs
TRACE_COLOR_START = '\x1b[94m'
TRACE_COLOR_END = '\x1b[0m'


# logging facility
def log(msg='', withdate=True, tostdio=False, tofile='/tmp/corenet.log'):
    """Write `msg` as one log line to stdout and / or to a file.

    msg      : text to log
    withdate : when True, prefix the line with the current local date and
               time (millisecond precision) between square brackets
    tostdio  : when True, print the line
    tofile   : path of a file to append the line to; any false value disables
               file logging
    """
    if withdate:
        msg = '[%s] %s\n' % (datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], msg)
    else:
        msg = msg + '\n'
    if tostdio:
        # print() appends its own newline
        print(msg[:-1])
    if tofile:
        # context manager: the descriptor is closed even when write() fails
        with open(tofile, 'a') as fd:
            fd.write(msg)

#------------------------------------------------------------------------------#
# threading facilities
#------------------------------------------------------------------------------#

# thread launcher
def threadit(f, *args, **kwargs):
    """Run f(*args, **kwargs) in a new thread; return the started Thread."""
    t = Thread(target=f, args=args, kwargs=kwargs)
    t.start()
    return t


#------------------------------------------------------------------------------#
# global constants
#------------------------------------------------------------------------------#

# radio access techno identifiers
RAT_GERA = 'GERAN'
RAT_UTRA = 'UTRAN'
RAT_EUTRA = 'E-UTRAN'
RAT_NR = 'NR'

# SCTP payload protocol identifiers
SCTP_PPID_HNBAP = 20
SCTP_PPID_RUA = 19
SCTP_PPID_S1AP = 18
SCTP_PPID_NGAP = 60

# HNB / ENB protocol identifiers
PROTO_HNBAP = 'HNBAP'
PROTO_RUA = 'RUA'
PROTO_RANAP = 'RANAP'
PROTO_S1AP = 'S1AP'
PROTO_NGAP = 'NGAP'


#------------------------------------------------------------------------------#
# built-ins object copy routines
#------------------------------------------------------------------------------#

def cpdict(d):
    """Return a copy of dict `d`, recursing into nested dict and list values.

    Values of any other type are shared with the original, not copied.
    """
    ret = {}
    for k in d:
        if isinstance(d[k], dict):
            ret[k] = cpdict(d[k])
        elif isinstance(d[k], list):
            ret[k] = cplist(d[k])
        else:
            ret[k] = d[k]
    return ret


def cplist(l):
    """Return a copy of list `l`, recursing into nested dict and list items.

    Items of any other type are shared with the original, not copied.
    """
    ret = []
    for e in l:
        if isinstance(e, dict):
            ret.append(cpdict(e))
        elif isinstance(e, list):
            ret.append(cplist(e))
        else:
            ret.append(e)
    return ret


#------------------------------------------------------------------------------#
# ASN.1 object handling facilities
#------------------------------------------------------------------------------#

def print_pduies(desc):
    """Print the IEs and extensions listed for each PDU type found in desc().

    `desc` is called without argument and must return a mapping from PDU type
    name ('InitiatingMessage', 'Outcome', 'SuccessfulOutcome',
    'UnsuccessfulOutcome') to the corresponding ASN.1 object.
    """
    for ptype in ('InitiatingMessage', 'Outcome', 'SuccessfulOutcome', 'UnsuccessfulOutcome'):
        if ptype in desc():
            pdu = desc()[ptype]
            print(ptype + ':')
            done = False
            if 'protocolIEs' in pdu._cont:
                ies = pdu._cont['protocolIEs']._cont._cont['value']._const_tab
                print(' IEs:')
                IEs = []
                for ident in ies('id'):
                    try:
                        info = ' - %i: %s (%s)'\
                               % (ident,
                                  pythonize_name(ies('id', ident)['Value']._tr._name),
                                  ies('id', ident)['presence'][0].upper())
                    except Exception:
                        # fall back on the ASN.1 basic type
                        info = ' - %i: [%s] (%s)'\
                               % (ident,
                                  ies('id', ident)['Value'].TYPE,
                                  ies('id', ident)['presence'][0].upper())
                    IEs.append((ident, info))
                if not IEs:
                    print(' None')
                else:
                    IEs.sort(key=lambda x:x[0])
                    print('\n'.join([x[1] for x in IEs]))
                done = True
            if 'protocolExtensions' in pdu._cont:
                ies = pdu._cont['protocolExtensions']._cont._cont['extensionValue']._const_tab
                print(' Extensions:')
                Exts = []
                for ident in ies('id'):
                    try:
                        info = ' - %i: %s (%s)'\
                               % (ident,
                                  pythonize_name(ies('id', ident)['Extension']._tr._name),
                                  ies('id', ident)['presence'][0].upper())
                    except Exception:
                        # fall back on the ASN.1 basic type
                        info = ' - %i: [%s] (%s)'\
                               % (ident,
                                  pythonize_name(ies('id', ident)['Extension'].TYPE),
                                  ies('id', ident)['presence'][0].upper())
                    Exts.append((ident, info))
                if not Exts:
                    print(' None')
                else:
                    Exts.sort(key=lambda x:x[0])
                    print('\n'.join([x[1] for x in Exts]))
                done = True
            if not done:
                print(' None')


def print_nasies(nasmsg, indent=''):
    """Print the header summary and the list of IEs of a NAS message.

    nasmsg : NAS message object, its first element being the header
    indent : string prefixed to every printed line

    Raises AssertionError when an IE matches none of the expected layouts.
    """
    # go after the header (last field: Type), and print IE type, tag if defined,
    # and name
    # WNG: Type1V (Uint4), Type2 (Uint8), Type3V(Buf) are not wrapped
    hdr = nasmsg[0]
    if 'ProtDisc' in hdr._by_name:
        pd = hdr['ProtDisc'].get_val()
    elif 'EPD' in hdr._by_name:
        pd = hdr['EPD'].get_val()
    else:
        pd = hdr[0]['ProtDisc'].get_val()
    typ = hdr['Type'].get_val()
    print('%s%s (PD %i, Type %i), IEs:' % (indent, nasmsg._name, pd, typ))
    #
    if len(nasmsg._content) == 1:
        print('%s None' % indent)
    else:
        for ie in nasmsg._content[1:]:
            if ie.get_trans():
                # optional IE
                print('%s- %-9s : %s (T: %i)'\
                      % (indent, ie.__class__.__name__, ie._name, ie[0].get_val()))
            elif isinstance(ie, TS24007.IE):
                # mandatory IE
                print('%s- %-9s : %s' % (indent, ie.__class__.__name__, ie._name))
            elif ie.get_bl() == 4:
                # uint / spare bits
                print('%s- %-9s : %s' % (indent, 'Type1V', 'spare'))
            else:
                # explicit raise: a bare assert is stripped under python -O
                raise AssertionError('unexpected NAS IE: %r' % (ie, ))


def print_nasproc_docs(nasproc):
    """Print the IEs of the CN and UE messages of a NAS procedure.

    nasproc.Cont must be a 2-tuple (CN messages, UE messages); each item is
    either None or an iterable of callables returning a NAS message.
    """
    msgcn, msgue = nasproc.Cont
    print('CN message:')
    if msgcn is None:
        print(' None')
    else:
        for m in msgcn:
            print_nasies(m(), indent=' ')
    print(' ')
    print('UE message:')
    if msgue is None:
        print(' None')
    else:
        for m in msgue:
            print_nasies(m(), indent=' ')
    print(' ')