527 lines
17 KiB
Python
527 lines
17 KiB
Python
# -*- coding: UTF-8 -*-
|
|
#/**
|
|
# * Software Name : pycrate
|
|
# * Version : 0.4
|
|
# *
|
|
# * Copyright 2017. Benoit Michau. ANSSI.
|
|
# * Copyright 2020. Benoit Michau. P1Sec.
|
|
# *
|
|
# * This library is free software; you can redistribute it and/or
|
|
# * modify it under the terms of the GNU Lesser General Public
|
|
# * License as published by the Free Software Foundation; either
|
|
# * version 2.1 of the License, or (at your option) any later version.
|
|
# *
|
|
# * This library is distributed in the hope that it will be useful,
|
|
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# * Lesser General Public License for more details.
|
|
# *
|
|
# * You should have received a copy of the GNU Lesser General Public
|
|
# * License along with this library; if not, write to the Free Software
|
|
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
# * MA 02110-1301 USA
|
|
# *
|
|
# *--------------------------------------------------------
|
|
# * File Name : pycrate_corenet/utils.py
|
|
# * Created : 2017-06-28
|
|
# * Authors : Benoit Michau
|
|
# *--------------------------------------------------------
|
|
#*/
|
|
|
|
# Python built-ins libraries required
|
|
import sys
|
|
import socket
|
|
import random
|
|
import re
|
|
#import traceback
|
|
from select import select
|
|
from threading import Thread, Lock, Event
|
|
from random import SystemRandom, randint
|
|
from time import time, sleep
|
|
from datetime import datetime
|
|
from socket import AF_INET, AF_INET6, AF_PACKET, ntohl, htonl, ntohs, htons
|
|
|
|
# SCTP support for NGAP / S1AP / HNBAP / RUA interfaces
|
|
try:
|
|
import sctp
|
|
except ImportError as err:
|
|
print('pysctp library required for CorenetServer')
|
|
print('check on github: https://github.com/P1sec/pysctp')
|
|
raise(err)
|
|
|
|
# conversion function for security context
|
|
try:
|
|
from CryptoMobile.conv import *
|
|
except ImportError as err:
|
|
print('CryptoMobile library required for CorenetServer')
|
|
print('check on github: https://github.com/P1sec/CryptoMobile')
|
|
raise(err)
|
|
|
|
# all pycrate stuffs
|
|
from pycrate_core.utils import *
|
|
from pycrate_core.repr import *
|
|
from pycrate_core.elt import Element, Envelope
|
|
from pycrate_core.base import Buf, Uint8
|
|
Element._SAFE_STAT = True
|
|
Element._SAFE_DYN = True
|
|
|
|
from pycrate_corenet.utils_fmt import *
|
|
from pycrate_corenet.ProcProto import SigStack, SigProc
|
|
|
|
log('pycrate_corenet: loading all ASN.1 and NAS modules, be patient...')
|
|
# import ASN.1 modules
|
|
# to drive gNodeB and ng-eNodeB
|
|
from pycrate_asn1dir import NGAP
|
|
# to drive eNodeB and Home-eNodeB
|
|
from pycrate_asn1dir import S1AP
|
|
# to drive Home-NodeB
|
|
from pycrate_asn1dir import HNBAP
|
|
from pycrate_asn1dir import RUA
|
|
from pycrate_asn1dir import RANAP
|
|
# to decode UE 3G, LTE and NR radio capability
|
|
from pycrate_asn1dir import RRC3G
|
|
from pycrate_asn1dir import RRCLTE
|
|
from pycrate_asn1dir import RRCNR
|
|
# to handle SS messages
|
|
from pycrate_asn1dir import SS
|
|
#
|
|
from pycrate_asn1rt.utils import get_val_at
|
|
|
|
# to drive 3G UE
|
|
from pycrate_mobile import TS24007
|
|
from pycrate_mobile import TS24008_IE
|
|
# CS domain
|
|
from pycrate_mobile import TS24008_MM
|
|
from pycrate_mobile import TS24008_CC
|
|
from pycrate_mobile import TS24011_PPSMS
|
|
from pycrate_mobile import TS24080_SS
|
|
# PS domain
|
|
from pycrate_mobile import TS24008_GMM
|
|
from pycrate_mobile import TS24008_SM
|
|
#
|
|
# to drive LTE UE
|
|
from pycrate_mobile import TS24301_EMM
|
|
from pycrate_mobile import TS24301_ESM
|
|
#
|
|
# to drive 5G UE
|
|
from pycrate_mobile import TS24501_FGMM
|
|
from pycrate_mobile import TS24501_FGSM
|
|
#
|
|
from pycrate_mobile import TS24007
|
|
from pycrate_mobile import NAS
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# ASN.1 objects
|
|
#------------------------------------------------------------------------------#
|
|
|
|
# actually, all ASN.1 modules are in ASN_GLOBAL
|
|
ASN_GLOBAL = S1AP.GLOBAL.MOD
|
|
|
|
# ASN.1 PDU encoders / decoders
|
|
PDU_NGAP = NGAP.NGAP_PDU_Descriptions.NGAP_PDU
|
|
PDU_S1AP = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
|
|
PDU_HNBAP = HNBAP.HNBAP_PDU_Descriptions.HNBAP_PDU
|
|
PDU_RUA = RUA.RUA_PDU_Descriptions.RUA_PDU
|
|
PDU_RANAP = RANAP.RANAP_PDU_Descriptions.RANAP_PDU
|
|
PDU_SS_Facility = SS.SS_Facility.Facility
|
|
|
|
# ASN.1 modules are not thread-safe
|
|
# objects' value will be mixed in case a thread ctxt switch occurs between
|
|
# the fg interpreter and the bg CorenetServer loop, and both accesses the same
|
|
# ASN.1 modules / objects
|
|
ASN_READY_NGAP = Event()
|
|
ASN_READY_S1AP = Event()
|
|
ASN_READY_HNBAP = Event()
|
|
ASN_READY_RUA = Event()
|
|
ASN_READY_RANAP = Event()
|
|
ASN_READY_NGAP.set()
|
|
ASN_READY_S1AP.set()
|
|
ASN_READY_HNBAP.set()
|
|
ASN_READY_RUA.set()
|
|
ASN_READY_RANAP.set()
|
|
|
|
ASN_ACQUIRE_TO = 0.01 # in sec
|
|
|
|
def asn_ngap_acquire():
|
|
if ASN_READY_NGAP.is_set():
|
|
ASN_READY_NGAP.clear()
|
|
return True
|
|
else:
|
|
ready = ASN_READY_NGAP.wait(ASN_ACQUIRE_TO)
|
|
if not ready:
|
|
# timeout, module is still locked
|
|
return False
|
|
else:
|
|
ASN_READY_NGAP.clear()
|
|
return True
|
|
|
|
def asn_ngap_release():
|
|
ASN_READY_NGAP.set()
|
|
|
|
def asn_s1ap_acquire():
|
|
if ASN_READY_S1AP.is_set():
|
|
ASN_READY_S1AP.clear()
|
|
return True
|
|
else:
|
|
ready = ASN_READY_S1AP.wait(ASN_ACQUIRE_TO)
|
|
if not ready:
|
|
# timeout, module is still locked
|
|
return False
|
|
else:
|
|
ASN_READY_S1AP.clear()
|
|
return True
|
|
|
|
def asn_s1ap_release():
|
|
ASN_READY_S1AP.set()
|
|
|
|
def asn_hnbap_acquire():
|
|
if ASN_READY_HNBAP.is_set():
|
|
ASN_READY_HNBAP.clear()
|
|
return True
|
|
else:
|
|
ready = ASN_READY_HNBAP.wait(ASN_ACQUIRE_TO)
|
|
if not ready:
|
|
# timeout, module is still locked
|
|
return False
|
|
else:
|
|
ASN_READY_HNBAP.clear()
|
|
return True
|
|
|
|
def asn_hnbap_release():
|
|
ASN_READY_HNBAP.set()
|
|
|
|
def asn_rua_acquire():
|
|
if ASN_READY_RUA.is_set():
|
|
ASN_READY_RUA.clear()
|
|
return True
|
|
else:
|
|
ready = ASN_READY_RUA.wait(ASN_ACQUIRE_TO)
|
|
if not ready:
|
|
# timeout, module is still locked
|
|
return False
|
|
else:
|
|
ASN_READY_RUA.clear()
|
|
return True
|
|
|
|
def asn_rua_release():
|
|
ASN_READY_RUA.set()
|
|
|
|
def asn_ranap_acquire():
|
|
if ASN_READY_RANAP.is_set():
|
|
ASN_READY_RANAP.clear()
|
|
return True
|
|
else:
|
|
ready = ASN_READY_RANAP.wait(ASN_ACQUIRE_TO)
|
|
if not ready:
|
|
# timeout, module is still locked
|
|
return False
|
|
else:
|
|
ASN_READY_RANAP.clear()
|
|
return True
|
|
|
|
def asn_ranap_release():
|
|
ASN_READY_RANAP.set()
|
|
|
|
|
|
def decode_ue_rad_cap(buf):
|
|
UERadCap = RRCLTE.EUTRA_InterNodeDefinitions.UERadioAccessCapabilityInformation
|
|
try:
|
|
UERadCap.from_uper(buf)
|
|
except Exception:
|
|
return None
|
|
uecapinfo = {}
|
|
try:
|
|
# ue-RadioAccessCapabilityInfo (OCTET STRING) contains UECapabilityInformation (SEQUENCE)
|
|
radcapinfo = get_val_at(UERadCap, ('criticalExtensions',
|
|
'c1',
|
|
'ueRadioAccessCapabilityInformation-r8',
|
|
'ue-RadioAccessCapabilityInfo',
|
|
'UECapabilityInformation',
|
|
'criticalExtensions',
|
|
'c1',
|
|
'ueCapabilityInformation-r8'))
|
|
except Exception:
|
|
return UERadCap._val, uecapinfo
|
|
# decode each ueCapabilityRAT-Container
|
|
for caprat in radcapinfo['ue-CapabilityRAT-ContainerList']:
|
|
rattype = caprat['rat-Type'] # eutra, utra, geran-cs, geran-ps, cdma2000-1XRTT
|
|
if rattype == 'eutra':
|
|
UEEUTRACap = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
|
|
try:
|
|
UEEUTRACap.from_uper(caprat['ueCapabilityRAT-Container'])
|
|
except Exception:
|
|
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
|
|
else:
|
|
uecapinfo[rattype] = UEEUTRACap._val
|
|
elif rattype == 'utra':
|
|
UEUTRACap = RRC3G.PDU_definitions.InterRATHandoverInfo
|
|
try:
|
|
UEUTRACap.from_uper(caprat['ueCapabilityRAT-Container'])
|
|
except Exception:
|
|
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
|
|
else:
|
|
uecapinfo[rattype] = UEUTRACap._val
|
|
elif rattype == 'geran-cs':
|
|
m2, m3 = NAS.MSCm2(), NAS.classmark_3_value_part.clone()
|
|
# MSCm2 || MSCm3
|
|
buf = caprat['ueCapabilityRAT-Container']
|
|
if buf[0:1] != b'\x33':
|
|
uecapinfo[rattype] = buf
|
|
else:
|
|
m2_len = ord(buf[1:2])
|
|
buf_m2 = buf[2:2+m2_len]
|
|
buf_m3 = buf[2+m2_len:]
|
|
try:
|
|
m2.from_bytes(buf_m2)
|
|
m3.from_bytes(buf_m3)
|
|
except Exception:
|
|
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
|
|
else:
|
|
uecapinfo[rattype] = (m2, m3)
|
|
elif rattype == 'geran-ps':
|
|
mrc = NAS.ms_ra_capability_value_part.clone()
|
|
try:
|
|
mrc.from_bytes(caprat['ueCapabilityRAT-Container'])
|
|
except Exception:
|
|
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
|
|
else:
|
|
uecapinfo[rattype] = mrc
|
|
else:
|
|
# TODO: could be cdma2000_1XRTT
|
|
uecapinfo[rattype] = caprat['ueCapabilityRAT-Container']
|
|
return UERadCap._val, uecapinfo
|
|
|
|
|
|
# special handling for hiding MeasParameters reported in UERadioCapability
|
|
|
|
def _seq_to_asn1_bypass():
|
|
return '{ -- removed for brevity -- }'
|
|
|
|
|
|
def _get_mp_from_eutra_cap():
|
|
mp_list = []
|
|
par = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
|
|
mp_list.append(par._cont['measParameters'])
|
|
par = par._cont['nonCriticalExtension']._cont['nonCriticalExtension']._cont['nonCriticalExtension']
|
|
mp_list.append(par._cont['measParameters-v1020'])
|
|
par = par._cont['nonCriticalExtension']._cont['nonCriticalExtension']._cont['nonCriticalExtension']
|
|
mp_list.append(par._cont['measParameters-v1130'])
|
|
par = par._cont['nonCriticalExtension']._cont['nonCriticalExtension']._cont['nonCriticalExtension']
|
|
mp_list.append(par._cont['measParameters-v11a0'])
|
|
par = par._cont['nonCriticalExtension']
|
|
mp_list.append(par._cont['measParameters-v1250'])
|
|
par = par._cont['nonCriticalExtension']._cont['nonCriticalExtension']._cont['nonCriticalExtension']._cont['nonCriticalExtension']
|
|
mp_list.append(par._cont['measParameters-v1310'])
|
|
return mp_list
|
|
|
|
|
|
_RRCLTE_MPList = _get_mp_from_eutra_cap()
|
|
_RRCLTE_MPList_to_asn1 = [mp._to_asn1 for mp in _RRCLTE_MPList]
|
|
|
|
|
|
def meas_params_to_asn1_patch():
|
|
for mp in _RRCLTE_MPList:
|
|
mp._to_asn1 = _seq_to_asn1_bypass
|
|
|
|
|
|
def meas_params_to_asn1_restore():
|
|
for i, mp in enumerate(_RRCLTE_MPList):
|
|
mp._to_asn1 = _RRCLTE_MPList_to_asn1[i]
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# logging facilities
|
|
#------------------------------------------------------------------------------#
|
|
|
|
class CorenetErr(PycrateErr):
|
|
pass
|
|
|
|
# coloured logs
|
|
TRACE_COLOR_START = '\x1b[94m'
|
|
TRACE_COLOR_END = '\x1b[0m'
|
|
|
|
# logging facility
|
|
def log(msg='', withdate=True, tostdio=False, tofile='/tmp/corenet.log'):
|
|
if withdate:
|
|
msg = '[%s] %s\n' % (datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], msg)
|
|
else:
|
|
msg = msg + '\n'
|
|
if tostdio:
|
|
print(msg[:-1])
|
|
if tofile:
|
|
fd = open(tofile, 'a')
|
|
fd.write(msg)
|
|
fd.close()
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# threading facilities
|
|
#------------------------------------------------------------------------------#
|
|
|
|
# thread launcher
|
|
def threadit(f, *args, **kwargs):
|
|
t = Thread(target=f, args=args, kwargs=kwargs)
|
|
t.start()
|
|
return t
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# global constants
|
|
#------------------------------------------------------------------------------#
|
|
|
|
# radio access techno identifiers
|
|
RAT_GERA = 'GERAN'
|
|
RAT_UTRA = 'UTRAN'
|
|
RAT_EUTRA = 'E-UTRAN'
|
|
RAT_NR = 'NR'
|
|
|
|
# SCTP payload protocol identifiers
|
|
SCTP_PPID_HNBAP = 20
|
|
SCTP_PPID_RUA = 19
|
|
SCTP_PPID_S1AP = 18
|
|
SCTP_PPID_NGAP = 60
|
|
|
|
# HNB / ENB protocol identifiers
|
|
PROTO_HNBAP = 'HNBAP'
|
|
PROTO_RUA = 'RUA'
|
|
PROTO_RANAP = 'RANAP'
|
|
PROTO_S1AP = 'S1AP'
|
|
PROTO_NGAP = 'NGAP'
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# built-ins object copy routines
|
|
#------------------------------------------------------------------------------#
|
|
|
|
def cpdict(d):
|
|
ret = {}
|
|
for k in d:
|
|
if isinstance(d[k], dict):
|
|
ret[k] = cpdict(d[k])
|
|
elif isinstance(d[k], list):
|
|
ret[k] = cplist(d[k])
|
|
else:
|
|
ret[k] = d[k]
|
|
return ret
|
|
|
|
def cplist(l):
|
|
ret = []
|
|
for e in l:
|
|
if isinstance(e, dict):
|
|
ret.append(cpdict(e))
|
|
elif isinstance(e, list):
|
|
ret.append(cplist(e))
|
|
else:
|
|
ret.append(e)
|
|
return ret
|
|
|
|
|
|
#------------------------------------------------------------------------------#
|
|
# ASN.1 object handling facilities
|
|
#------------------------------------------------------------------------------#
|
|
|
|
def print_pduies(desc):
|
|
for ptype in ('InitiatingMessage', 'Outcome', 'SuccessfulOutcome', 'UnsuccessfulOutcome'):
|
|
if ptype in desc():
|
|
pdu = desc()[ptype]
|
|
print(ptype + ':')
|
|
done = False
|
|
if 'protocolIEs' in pdu._cont:
|
|
ies = pdu._cont['protocolIEs']._cont._cont['value']._const_tab
|
|
print(' IEs:')
|
|
IEs = []
|
|
for ident in ies('id'):
|
|
try:
|
|
info = ' - %i: %s (%s)'\
|
|
% (ident,
|
|
pythonize_name(ies('id', ident)['Value']._tr._name),
|
|
ies('id', ident)['presence'][0].upper())
|
|
except Exception:
|
|
info = ' - %i: [%s] (%s)'\
|
|
% (ident,
|
|
ies('id', ident)['Value'].TYPE,
|
|
ies('id', ident)['presence'][0].upper())
|
|
IEs.append((ident, info))
|
|
if not IEs:
|
|
print(' None')
|
|
else:
|
|
IEs.sort(key=lambda x:x[0])
|
|
print('\n'.join([x[1] for x in IEs]))
|
|
done = True
|
|
if 'protocolExtensions' in pdu._cont:
|
|
ies = pdu._cont['protocolExtensions']._cont._cont['extensionValue']._const_tab
|
|
print(' Extensions:')
|
|
Exts = []
|
|
for ident in ies('id'):
|
|
try:
|
|
info = ' - %i: %s (%s)'\
|
|
% (ident,
|
|
pythonize_name(ies('id', ident)['Extension']._tr._name),
|
|
ies('id', ident)['presence'][0].upper())
|
|
except Exception:
|
|
info = ' - %i: [%s] (%s)'\
|
|
% (ident,
|
|
pythonize_name(ies('id', ident)['Extension'].TYPE),
|
|
ies('id', ident)['presence'][0].upper())
|
|
Exts.append((ident, info))
|
|
if not Exts:
|
|
print(' None')
|
|
else:
|
|
Exts.sort(key=lambda x:x[0])
|
|
print('\n'.join([x[1] for x in Exts]))
|
|
done = True
|
|
if not done:
|
|
print(' None')
|
|
|
|
|
|
def print_nasies(nasmsg, indent=''):
|
|
# go after the header (last field: Type), and print IE type, tag if defined,
|
|
# and name
|
|
# WNG: Type1V (Uint4), Type2 (Uint8), Type3V(Buf) are not wrapped
|
|
hdr = nasmsg[0]
|
|
if 'ProtDisc' in hdr._by_name:
|
|
pd = hdr['ProtDisc'].get_val()
|
|
elif 'EPD' in hdr._by_name:
|
|
pd = hdr['EPD'].get_val()
|
|
else:
|
|
pd = hdr[0]['ProtDisc'].get_val()
|
|
typ = hdr['Type'].get_val()
|
|
print('%s%s (PD %i, Type %i), IEs:' % (indent, nasmsg._name, pd, typ))
|
|
#
|
|
if len(nasmsg._content) == 1:
|
|
print('%s None' % indent)
|
|
else:
|
|
for ie in nasmsg._content[1:]:
|
|
if ie.get_trans():
|
|
# optional IE
|
|
print('%s- %-9s : %s (T: %i)'\
|
|
% (indent, ie.__class__.__name__, ie._name, ie[0].get_val()))
|
|
elif isinstance(ie, TS24007.IE):
|
|
# mandatory IE
|
|
print('%s- %-9s : %s' % (indent, ie.__class__.__name__, ie._name))
|
|
elif ie.get_bl() == 4:
|
|
# uint / spare bits
|
|
print('%s- %-9s : %s' % (indent, 'Type1V', 'spare'))
|
|
else:
|
|
assert()
|
|
|
|
|
|
def print_nasproc_docs(nasproc):
|
|
msgcn, msgue = nasproc.Cont
|
|
print('CN message:')
|
|
if msgcn is None:
|
|
print(' None')
|
|
else:
|
|
for m in msgcn:
|
|
print_nasies(m(), indent=' ')
|
|
print(' ')
|
|
print('UE message:')
|
|
if msgue is None:
|
|
print(' None')
|
|
else:
|
|
for m in msgue:
|
|
print_nasies(m(), indent=' ')
|
|
print(' ')
|
|
|