981 lines
34 KiB
Python
981 lines
34 KiB
Python
# -*- coding: UTF-8 -*-
|
|
#/**
|
|
# * Software Name : pycrate
|
|
# * Version : 0.4
|
|
# *
|
|
# * Copyright 2020. Benoit Michau. P1Sec.
|
|
# *
|
|
# * This library is free software; you can redistribute it and/or
|
|
# * modify it under the terms of the GNU Lesser General Public
|
|
# * License as published by the Free Software Foundation; either
|
|
# * version 2.1 of the License, or (at your option) any later version.
|
|
# *
|
|
# * This library is distributed in the hope that it will be useful,
|
|
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# * Lesser General Public License for more details.
|
|
# *
|
|
# * You should have received a copy of the GNU Lesser General Public
|
|
# * License along with this library; if not, write to the Free Software
|
|
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
# * MA 02110-1301 USA
|
|
# *
|
|
# *--------------------------------------------------------
|
|
# * File Name : pycrate_corenet/HdlrUENG.py
|
|
# * Created : 2020-04-29
|
|
# * Authors : Benoit Michau
|
|
# *--------------------------------------------------------
|
|
#*/
|
|
|
|
from .utils import *
|
|
from .ProcCNNgap import *
|
|
from .ProcCNFGMM import *
|
|
from .ProcCNFGSM import *
|
|
from .HdlrUESMS import *
|
|
|
|
from pycrate_mobile.TS24501_IE import (
|
|
FGSIDTYPE_NO, # 0
|
|
FGSIDTYPE_SUPI,
|
|
FGSIDTYPE_GUTI,
|
|
FGSIDTYPE_IMEI,
|
|
FGSIDTYPE_STMSI,
|
|
FGSIDTYPE_IMEISV,
|
|
FGSIDTYPE_MAC,
|
|
FGSIDTYPE_EUI64, # 7
|
|
FGSIDFMT_IMSI, # 0
|
|
FGSIDFMT_NSI,
|
|
FGSIDFMT_GCI,
|
|
FGSIDFMT_GLI, # 3
|
|
UESecCap as FGSUESecCap
|
|
)
|
|
|
|
|
|
class UEFGMMd(SigStack):
|
|
"""UE 5GMM handler within a UENGd instance
|
|
responsible for 5G Mobility Management signalling procedures
|
|
"""
|
|
|
|
TRACK_PROC = True
|
|
|
|
# reference to the UEd
|
|
UE = None
|
|
# reference to the UENGd
|
|
NG = None
|
|
|
|
# state: DEREGISTERED (cannot be paged) <-> CONNECTED <-> IDLE
|
|
state = 'DEREGISTERED'
|
|
|
|
# to bypass the process() server loop with a custom NAS PDU handler
|
|
RX_HOOK = None
|
|
|
|
# additional time for letting background task happen in priority
|
|
_WAIT_ADD = 0.005
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# FGMMAuthentication policy
|
|
#--------------------------------------------------------------------------#
|
|
# this will systematically bypass all authentication procedures
|
|
AUTH_DISABLED = False
|
|
# 5GMM procedure timer for auth and smc
|
|
T3560 = 2
|
|
# Authentication Management Field
|
|
AUTH_AMF = b'\x80\x00'
|
|
# Authentication ABBA
|
|
AUTH_ABBA = b'\x00\x00'
|
|
# if AUTH_PLMN is not None, it will be used for building the 5G auth vector
|
|
# otherwise the main Corenet PLMN will be used
|
|
AUTH_PLMN = None
|
|
# this is to force a 2G or 3G authentication instead of a 5G one
|
|
AUTH_2G = False
|
|
AUTH_3G = False
|
|
# this is to extend AUTN with arbitrary data
|
|
AUTH_AUTN_EXT = None
|
|
#
|
|
# re-authentication policy:
|
|
# this forces an auth procedure every X 5GMM Reg / Service / Detach procedures
|
|
# even if a valid KSI is provided by the UE
|
|
AUTH_REG = 1
|
|
AUTH_SER = 3
|
|
AUTH_DET = 1 # only applied to Detach without UE power off
|
|
|
|
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# FGMM timers
|
|
#--------------------------------------------------------------------------#
|
|
|
|
# MT Deregistration
|
|
T3522 = 1
|
|
# Registration
|
|
T3550 = 1
|
|
# UE Config Update
|
|
T3555 = 2
|
|
# AKA, SMC
|
|
T3560 = 2
|
|
# Identification
|
|
T3570 = 1
|
|
# NSSAI Auth
|
|
T3575 = 2
|
|
|
|
|
|
|
|
|
|
def _log(self, logtype, msg):
|
|
self.NG._log(logtype, '[5GMM] %s' % msg)
|
|
|
|
def __init__(self, ued, uengd):
|
|
self.UE = ued
|
|
self.set_ng(uengd)
|
|
#
|
|
# ready event, used by foreground tasks (network / interpreter initiated)
|
|
self.ready = Event()
|
|
self.ready.set()
|
|
# stack of ongoing 5GMM procedures (i.e. common procedures can be run
|
|
# within specific procedure)
|
|
self.Proc = []
|
|
# list of tracked procedures (requires TRACK_PROC = True)
|
|
self._proc = []
|
|
|
|
def set_ng(self, uengd):
|
|
self.NG = uengd
|
|
|
|
def process(self, NasRx):
|
|
"""process a NAS 5GMM message (NasRx) sent by the UE,
|
|
and return a list (possibly empty) of NGAP procedure(s) to be sent back
|
|
to the gNB
|
|
"""
|
|
# TODO
|
|
return []
|
|
|
|
def init_proc(self, ProcClass, encod=None, fgmm_preempt=False):
|
|
"""initialize a CN-initiated 5GMM procedure of class `ProcClass' and
|
|
given encoder(s), and return the procedure
|
|
"""
|
|
# TODO
|
|
pass
|
|
|
|
def clear(self):
|
|
"""abort all running procedures
|
|
"""
|
|
for Proc in self.Proc[::-1]:
|
|
Proc.abort()
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# SMC and security-related methods
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def require_auth(self, Proc, ksi=None):
|
|
return True
|
|
|
|
def require_smc(self, Proc):
|
|
return True
|
|
|
|
def get_new_ksi(self):
|
|
for i in range(0, 7):
|
|
if i not in self.NG.SEC:
|
|
return i
|
|
# all native KSI have been used, clear all of them except the current one
|
|
# if defined
|
|
cur = self.NG.SEC['KSI']
|
|
for i in range(0, 7):
|
|
if i != cur:
|
|
del self.NG.SEC[i]
|
|
if cur == 0:
|
|
return 1
|
|
else:
|
|
return 0
|
|
|
|
def set_sec_ctx(self, ksi, ctx, vect, snid):
|
|
ksi = (ksi[0]<<3) + ksi[1]
|
|
if ctx == 2:
|
|
# WNG: this is undefined / illegal and won't work (hopefully)
|
|
CK, IK = conv_102_C4(vect[2]), conv_102_C5(vect[2])
|
|
if self.AUTH_PLMN:
|
|
snid = make_5g_snn(self.AUTH_PLMN)
|
|
else:
|
|
snid = make_5g_snn(self.UE.Server.PLMN)
|
|
Kausf = conv_501_A2(CK, IK, sn_name, sqnak)
|
|
secctx = {'VEC' : vect,
|
|
'CTX' : ctx,
|
|
'Kc' : vect[2],
|
|
'CK' : CK,
|
|
'IK' : IK,
|
|
'Kausf': Kausf}
|
|
elif ctx == 3:
|
|
# WNG: this is also undefined and shouldn't work
|
|
if self.AUTH_PLMN:
|
|
snid = make_5g_snn(self.FGMM.AUTH_PLMN)
|
|
else:
|
|
snid = make_5g_snn(self.UE.Server.PLMN)
|
|
Kausf = conv_(vect[3], vect[4], snid, vect[2][:6])
|
|
secctx = {'VEC' : vect,
|
|
'CTX' : ctx,
|
|
'CK' : vect[3],
|
|
'IK' : vect[4],
|
|
'Kausf': Kausf}
|
|
else:
|
|
# ctx == 5
|
|
secctx = {'VEC' : vect,
|
|
'CTX' : ctx,
|
|
'Kausf': vect[3]}
|
|
#
|
|
secctx['Kseaf'] = conv_501_A6(kausf, snid)
|
|
secctx['Kamf'] = conv_501_A7(kseaf, self.UE.IMSI, self.AUTH_ABBA)
|
|
secctx['UL'], secctx['DL'] = 0, 0
|
|
# TODO: check if a custom UL counter is still required for gNB key derivation
|
|
#secctx['UL_gnb'] = 0
|
|
self.NG.SEC[ksi] = secctx
|
|
self.NG.SEC['KSI'] = ksi
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# network-initiated method (fg task, to be used from the interpreter)
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def _net_init_con(self):
|
|
if not self.NG.page_block():
|
|
return False
|
|
# need to wait for potential 5GMM serving / common procedures to happen and end
|
|
sleep(self._WAIT_ADD)
|
|
if not self.ready.wait(5):
|
|
# something is blocking in the serving / common procedures
|
|
return False
|
|
elif not self.NG.connected.is_set():
|
|
# something went wrong during the serving / common procedures
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def run_proc(self, ProcClass, **IEs):
|
|
"""run a network-initiated procedure ProcClass in the context of the 5GMM stack,
|
|
after setting the given IEs in the NAS message to be sent to the UE
|
|
|
|
returns a 2-tuple (success, proc)
|
|
success is a bool
|
|
proc is the instance of ProcClass or None
|
|
"""
|
|
if ProcClass.Init is None:
|
|
self._log('ERR', 'invalid network-initiated procedure %s' % ProcClass.Name)
|
|
return False, None
|
|
if not self._net_init_con():
|
|
return False, None
|
|
#
|
|
Proc = self.init_proc(ProcClass, encod={ProcClass.Init: IEs}, fgmm_preempt=True)
|
|
try:
|
|
NgapTxProc = Proc.output()
|
|
except Exception:
|
|
self._log('ERR', 'invalid IEs for network-initiated procedure %s' % Proc.Name)
|
|
Proc.abort()
|
|
return False, Proc
|
|
if not self.NG.transmit_ngap_proc(NgapTxProc):
|
|
return False, Proc
|
|
#
|
|
# check if a response is expected
|
|
if not hasattr(Proc, 'TimerValue'):
|
|
return True, Proc
|
|
elif not self.ready.wait(Proc.TimerValue + self._WAIT_ADD):
|
|
# procedure is stuck, will be aborted in the server loop
|
|
# WNG: this means the routine for cleaning NAS procedures in timeout
|
|
# should be enabled in CorenetServer
|
|
return False, Proc
|
|
#
|
|
# check if a response was received
|
|
if hasattr(Proc, 'UEInfo'):
|
|
return True, Proc
|
|
else:
|
|
return False, Proc
|
|
|
|
|
|
|
|
|
|
class UEFGSMd(SigStack):
|
|
"""UE 5GSM handler within a UENGd instance
|
|
responsible for 5G Session Management signalling procedures
|
|
"""
|
|
|
|
TRACK_PROC = True
|
|
|
|
# reference to the UEd
|
|
UE = None
|
|
# reference to the UENGd
|
|
NG = None
|
|
|
|
# to bypass the process() server loop with a custom NAS PDU handler
|
|
RX_HOOK = None
|
|
|
|
|
|
def _log(self, logtype, msg):
|
|
self.NG._log(logtype, '[5GSM] %s' % msg)
|
|
|
|
def __init__(self, ued, uengd):
|
|
self.UE = ued
|
|
self.set_ng(uengd)
|
|
#
|
|
# dict of ongoing 5GSM procedures, indexed by 5GS bearer ID
|
|
self.Proc = {i: [] for i in range(16)}
|
|
# dict of configured PDU, indexed by 5GS bearer ID
|
|
self.PDU = {}
|
|
# dict of ongoing 5GSM transactions IEs
|
|
self.Trans = {}
|
|
# list of tracked procedures (requires TRACK_PROC = True)
|
|
self._proc = []
|
|
|
|
def set_ng(self, uengd):
|
|
self.NG = uengd
|
|
|
|
def process(self, NasRx, FGMMProc=None):
|
|
"""process a NAS 5GSM message (NasRx) sent by the UE,
|
|
and return a list (possibly empty) of NGAP procedure(s) to be sent back
|
|
to the gNB
|
|
|
|
FGMMProc [FMMSigProc or None], indicates if the NAS FGSM message is handled in
|
|
the context of an FGMM procedure
|
|
"""
|
|
# TODO
|
|
return []
|
|
|
|
|
|
def init_proc(self, ProcClass, **kw):
|
|
"""initialize a CN-initiated 5GSM procedure of class `ProcClass' and
|
|
given encoder(s), and return the procedure
|
|
"""
|
|
# TODO
|
|
pass
|
|
|
|
def clear(self, ebi=None):
|
|
"""abort all running procedures, eventually for a single 5GS Bearer ID
|
|
"""
|
|
pass
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# transaction processing
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def process_trans(self, trans_id):
|
|
"""process a 5GSM transaction initiated by the UE, and return a network-initiated
|
|
procedure with IEs configured and None, or None and the 5GSM error code
|
|
"""
|
|
pass
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# protocol configuration processing
|
|
#--------------------------------------------------------------------------#
|
|
|
|
|
|
|
|
|
|
class UENGd(SigStack):
|
|
"""UE NG handler within a CorenetServer instance
|
|
responsible for UE-associated NGAP signalling
|
|
"""
|
|
|
|
# to keep track of all NGAP procedures
|
|
TRACK_PROC = True
|
|
|
|
# domain
|
|
DOM = '5GS'
|
|
|
|
# reference to the UEd
|
|
UE = None
|
|
# reference to the GNBd, SCTP stream id
|
|
GNB = None
|
|
SID = None
|
|
|
|
# to bypass the process_nas() server loop with a custom NAS PDU handler
|
|
RX_HOOK = None
|
|
|
|
# for pure NGAP procedure (no NAS trafic, neither PDU-oriented stuff)
|
|
# should we page the UE to run the procedure successfully when UE is idle
|
|
NGAP_FORCE_PAGE = False
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# global security policy
|
|
#--------------------------------------------------------------------------#
|
|
#
|
|
# 1) NAS Rx path
|
|
#
|
|
# IEs allowed in clear-text initial NAS messages
|
|
SECNAS_RX_CT_IES = {
|
|
# Registration Req
|
|
65 : {
|
|
'NAS_KSI',
|
|
'5GSRegType',
|
|
'5GSID', # needs to be a temp id or SUCI
|
|
'UESecCap',
|
|
'UEStatus',
|
|
'AddGUTI',
|
|
'EPSNASContainer',
|
|
'NASContainer',
|
|
},
|
|
# Service Req
|
|
76 : {
|
|
'ServiceType',
|
|
'NAS_KSI',
|
|
'5GSID', # needs to be a temp id
|
|
'NASContainer',
|
|
},
|
|
# Ctrl Plane Service Req
|
|
79 : {
|
|
'NAS_KSI',
|
|
'CtrlPlaneServiceType',
|
|
'NASContainer',
|
|
}
|
|
}
|
|
#
|
|
# Identity type allowed in clear-text IdentityResponse
|
|
SECNAS_RX_CT_IDTYPE = {
|
|
FGSIDTYPE_NO,
|
|
FGSIDTYPE_SUPI,
|
|
#FGSIDTYPE_GUTI,
|
|
#FGSIDTYPE_STMSI
|
|
}
|
|
#
|
|
# dropping invalid Rx message is the default behaviour
|
|
SECNAS_RX_DROP_INVAL = True
|
|
|
|
'''
|
|
# this will systematically bypass all auth and smc procedures,
|
|
# NAS MAC and UL count verification in the uplink
|
|
# and setting of the 5GMM security header (and encryption) in the downlink
|
|
SECNAS_DISABLED = False
|
|
#
|
|
# finer grained NAS security checks:
|
|
# True to drop NAS PDU when NAS MAC verification fails
|
|
SECNAS_UL_MAC = False
|
|
# True to drop NAS PDU when NAS UL count verification fails
|
|
SECNAS_UL_CNT = False
|
|
# WNG: 5GMM and 5GSM stacks have further control on accepting or not certain
|
|
# NAS message even if security control have failed
|
|
#
|
|
# this will disable the setting of the 5GMM security header (and encryption)
|
|
# in the downlink for given NAS message (by name)
|
|
SECNAS_PDU_NOSEC = set()
|
|
'''
|
|
#
|
|
# format of the security context dict self.SEC:
|
|
# self.SEC is a dict of available 5G security contexts indexed by KSI,
|
|
# and current KSI in use
|
|
#
|
|
# when self.SEC['KSI'] is not None, the context is enabled at the NAS level, e.g.
|
|
# self.SEC = {'KSI': 0,
|
|
# 0: {'RAND': b'...', 'RES': b'...', 'AUTN': b'...', 'CK': b'...', 'IK': b'...',
|
|
# 'SNName': b'...', 'ABBA': b'...', 'RESstar': b'...',
|
|
# 'Kausf': b'...', 'Kseaf': b'...', 'Kamf': b'...',
|
|
# 'Knasenc': b'...', 'Knasint': b'...',
|
|
# 'UL': 0, 'DL': 0, 'NASEA': 0, 'NASIA': 0,
|
|
# 'Kgnb': b'...'},
|
|
# ...,
|
|
# 'POL': {'REG': 0, 'SER': 0, 'DER': 0}}
|
|
#
|
|
# The POL dict indicates the authentication policy for each procedure
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# NGAPPaging policy
|
|
#--------------------------------------------------------------------------#
|
|
# page_block() parameters:
|
|
# number of retries when not successful
|
|
PAG_RETR = 2
|
|
# timer in sec between retries
|
|
PAG_WAIT = 2
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# NGAPInitialContextSetup policy
|
|
#--------------------------------------------------------------------------#
|
|
#
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# NGAPTraceStart policy
|
|
#--------------------------------------------------------------------------#
|
|
#
|
|
|
|
|
|
def _log(self, logtype, msg):
|
|
self.UE._log(logtype, '[UENGd: %3i] %s' % (self.CtxId, msg))
|
|
|
|
def __init__(self, ued, gnbd=None, ctx_id=-1, sid=None):
|
|
self.UE = ued
|
|
self.Server = ued.Server
|
|
self.Config = self.Server.ConfigNG
|
|
#
|
|
# dict of ongoing NGAP procedures (indexed by their procedure code)
|
|
self.Proc = {}
|
|
# list of tracked procedures (requires TRACK_PROC = True)
|
|
self._proc = []
|
|
#
|
|
# dict of available 5G security contexts, indexed by KSI
|
|
# and current KSI in use
|
|
self.SEC = {}
|
|
self.reset_sec_ctx()
|
|
#
|
|
# state for NG / radio connection: set with InitialUEMessage, unset with UEContextRelease
|
|
self.connected = Event()
|
|
# state for processing the initial NAS message: unset after InitialUEMessage processed
|
|
self.nasinit = Event()
|
|
if gnbd is not None:
|
|
self.set_ran(gnbd)
|
|
else:
|
|
self.CtxId = -1
|
|
#
|
|
# init 5GMM and 5GSM sig stacks
|
|
self.FGMM = UEFGMMd(ued, self)
|
|
self.FGSM = UEFGSMd(ued, self)
|
|
self.SMS = UESMSd(ued, self)
|
|
|
|
def set_ran(self, gnbd):
|
|
self.SEC['KSI'] = None
|
|
self.GNB = gnbd
|
|
self.connected.set()
|
|
self.nasinit.set()
|
|
|
|
def unset_ran(self):
|
|
self.GNB.unset_ue_ng(self.CtxId)
|
|
del self.GNB
|
|
self.SEC['KSI'] = None
|
|
self.clear()
|
|
self.connected.clear()
|
|
|
|
def set_ran_unconnected(self, gnbd):
|
|
# required for paging
|
|
self.SEC['KSI'] = None
|
|
self.GNB = gnbd
|
|
|
|
def unset_ran_unconnected(self):
|
|
# required for paging
|
|
del self.GNB
|
|
self.SEC['KSI'] = None
|
|
|
|
def is_connected(self):
|
|
return self.connected.is_set()
|
|
|
|
def set_ctx(self, ctx_id, sid):
|
|
self.CtxId = ctx_id
|
|
self.SID = sid
|
|
|
|
def unset_ctx(self):
|
|
self.CtxId = -1
|
|
del self.SID
|
|
|
|
def reset_sec_ctx(self):
|
|
self.SEC.clear()
|
|
self.SEC['KSI'] = None
|
|
self.SEC['POL'] = {'REG': 0, 'DET': 0, 'SER': 0}
|
|
if 'UESecCap' in self.UE.Cap:
|
|
del self.UE.Cap['UESecCap']
|
|
|
|
def get_sec_ctx(self):
|
|
return self.SEC.get(self.SEC['KSI'], None)
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# handling of NGAP procedures
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def process_ngap_pdu(self, pdu_rx):
|
|
"""process a NGAP PDU sent by the gNB for UE-associated signalling
|
|
and return a list of NGAP PDU(s) to be sent back to it
|
|
"""
|
|
errcause = None
|
|
if pdu_rx[0] == 'initiatingMessage':
|
|
# gNB-initiated procedure, instantiate it
|
|
try:
|
|
Proc = NGAPProcRANDispatcher[pdu_rx[1]['procedureCode']](self)
|
|
except Exception:
|
|
self._log('ERR', 'invalid NGAP PDU, initiatingMessage, code %i'\
|
|
% pdu_rx[1]['procedureCode'])
|
|
errcause = ('protocol', 'abstract-syntax-error-reject')
|
|
Proc = self.init_ngap_proc(NGAPErrorIndCN, Cause=errcause)
|
|
if not Proc:
|
|
return []
|
|
else:
|
|
if self.TRACK_PROC:
|
|
self._proc.append( Proc )
|
|
# process the PDU within the procedure
|
|
Proc.recv( pdu_rx )
|
|
if Proc.Class == 2 and Proc.errcause:
|
|
Err = self.init_ngap_proc(NGAPErrorIndCN, Cause=Proc.errcause)
|
|
if not Err:
|
|
return []
|
|
self.ProcLast = Err.Code
|
|
return Err.send()
|
|
elif Proc.Class == 1 or errcause:
|
|
self.ProcLast = Proc.Code
|
|
return Proc.send()
|
|
else:
|
|
pdu_tx = []
|
|
for ProcRet in Proc.trigger():
|
|
pdu_tx.extend( ProcRet.send() )
|
|
self.ProcLast = ProcRet.Code
|
|
return pdu_tx
|
|
#
|
|
else:
|
|
# CN-initiated procedure, transfer the PDU to it
|
|
try:
|
|
Proc = self.Proc[pdu_rx[1]['procedureCode']]
|
|
except Exception:
|
|
self._log('ERR', 'invalid NGAP PDU, %s, code %i'\
|
|
% (pdu_rx[0], pdu_rx[1]['procedureCode']))
|
|
errcause = ('protocol', 'message-not-compatible-with-receiver-state')
|
|
Proc = self.init_ngap_proc(NGAPErrorIndCN, Cause=errcause)
|
|
if not Proc:
|
|
return []
|
|
# process the PDU within the procedure
|
|
Proc.recv( pdu_rx )
|
|
if Proc.errcause:
|
|
Err = self.init_ngap_proc(NGAPErrorIndCN, Cause=Proc.errcause)
|
|
if not Err:
|
|
return []
|
|
self.ProcLast = Err.Code
|
|
return Err.send()
|
|
elif errcause:
|
|
self.ProcLast = Proc.Code
|
|
return Proc.send()
|
|
else:
|
|
pdu_tx = []
|
|
for ProcRet in Proc.trigger():
|
|
pdu_tx.extend( ProcRet.send() )
|
|
self.ProcLast = ProcRet.Code
|
|
return pdu_tx
|
|
|
|
def init_ngap_proc(self, ProcClass, **IEs):
|
|
"""initialize a CN-initiated NGAP procedure of class `ProcClass' for
|
|
UE-associated signalling, encode the initiatingMessage PDU with given
|
|
**IEs and return the procedure
|
|
"""
|
|
Proc = self._init_ngap_proc(ProcClass)
|
|
if not Proc:
|
|
return None
|
|
else:
|
|
self._encode_ngap_proc(Proc, **IEs)
|
|
return Proc
|
|
|
|
def _init_ngap_proc(self, ProcClass):
|
|
if not issubclass(ProcClass, NGAPSigProc):
|
|
self._log('WNG', 'starting an invalid procedure for UE-associated NG signalling')
|
|
if ProcClass.Code in self.Proc:
|
|
self._log('ERR', 'an NGAP procedure %s is already ongoing' % ProcClass.__name__)
|
|
return None
|
|
try:
|
|
Proc = ProcClass(self)
|
|
except Exception:
|
|
# no active NG link
|
|
self._log('ERR', 'no active NG link to initialize the NGAP procedure %s'\
|
|
% ProcClass.__name__)
|
|
return None
|
|
if Proc.Code in NGAPProcCNDispatcher and Proc.Class == 1:
|
|
# store the procedure, which requires a response from the gNB
|
|
self.Proc[Proc.Code] = Proc
|
|
if self.TRACK_PROC:
|
|
self._proc.append( Proc )
|
|
return Proc
|
|
|
|
def _encode_ngap_proc(self, Proc, **IEs):
|
|
if Proc.Name != 'NGAPUEContextRelease':
|
|
IEs['AMF_UE_NGAP_ID'], IEs['RAN_UE_NGAP_ID'] = self.CtxId, self.CtxId
|
|
else:
|
|
IEs['UE_NGAP_IDs'] = ('uE-NGAP-ID-pair', {'aMF-UE-NGAP-ID': self.CtxId,
|
|
'rAN-UE-NGAP-ID': self.CtxId})
|
|
Proc.encode_pdu('ini', **IEs)
|
|
|
|
def start_ngap_proc(self, ProcClass, **IEs):
|
|
"""initialize a CN-initiated NGAP procedure of class `ProcClass' for
|
|
UE-associated signalling, encode the initiatingMessage PDU with given
|
|
**IEs and send the PDU generated by the procedure to the gNB
|
|
"""
|
|
if not self.is_connected():
|
|
self._log('ERR', 'not connected')
|
|
return 0
|
|
Proc = self.init_ngap_proc(ProcClass, **IEs)
|
|
if Proc is None:
|
|
return 0
|
|
self.ProcLast, cnt = Proc.Code, 0
|
|
for pdu_tx in Proc.send():
|
|
if self.UE.Server.send_ngap_pdu(self.GNB, pdu_tx, self.SID):
|
|
cnt += 1
|
|
return cnt
|
|
|
|
def transmit_ngap_proc(self, NgapTxProc):
|
|
"""send the NGAP PDU as returned by the .send() method of the NGAP procedures
|
|
in the NgapTxProc list to the gNB
|
|
"""
|
|
cnt = 0
|
|
for Proc in NgapTxProc:
|
|
self.ProcLast = Proc.Code
|
|
for pdu_tx in Proc.send():
|
|
if self.UE.Server.send_ngap_pdu(self.GNB, pdu_tx, self.SID):
|
|
cnt += 1
|
|
return cnt
|
|
|
|
def clear(self):
|
|
# clears all running NGAP procedures
|
|
for Proc in list(self.Proc.values()):
|
|
Proc.abort()
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# handling of NAS messages dispatching
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def process_nas(self, buf):
|
|
"""process a NAS message buffer for the 5GS domain sent by the mobile
|
|
and return a list (possibly empty) of NGAP procedure(s) to be sent back
|
|
to the gNB
|
|
"""
|
|
if self.RX_HOOK:
|
|
return self.RX_HOOK(buf)
|
|
NasRxSec, err = NAS.parse_NAS5G(buf, inner=False)
|
|
if err:
|
|
self._log('WNG', 'invalid 5GS NAS message: %s' % hexlify(buf).decode('ascii'))
|
|
return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': err}, sec=False))
|
|
#
|
|
# 5GS NAS security handling
|
|
sh, pd = NasRxSec[0]['SecHdr'].get_val(), NasRxSec[0]['EPD'].get_val()
|
|
if sh == 0:
|
|
# clear-text NAS message
|
|
if self.UE.TRACE_NAS_5GS:
|
|
self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
|
|
if pd == 126:
|
|
return self.process_nas_nosec(NasRxSec)
|
|
#
|
|
elif pd == 126 and sh in (1, 2, 4):
|
|
# protected NAS message
|
|
# 1: integrity prot only, 2: current sec ctx, 4: new sec ctx (SMC after fresh auth)
|
|
return self.process_nas_sec(NasRxSec)
|
|
#
|
|
# invalid NAS message
|
|
self._log('WNG', 'invalid 5GS NAS message security status')
|
|
if self.UE.TRACE_NAS_5GS:
|
|
self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
|
|
# err cause 98: Message type not compatible with the protocol state
|
|
return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': 98}, sec=False))
|
|
|
|
def process_nas_nosec(self, NasRx):
|
|
# Check if the message type is valid or not, log it and eventually drop it
|
|
typ = NasRx[0]['Type'].get_val()
|
|
#
|
|
if typ in (65, 76, 79):
|
|
# initial NAS message
|
|
ct_ies, vln_ies = self.SECNAS_RX_CT_IES[typ], []
|
|
for ie in list(NasRx)[1:]:
|
|
if ie._name not in ct_ies:
|
|
vln_ies.append(ie._name)
|
|
if vln_ies:
|
|
self._log('VLN', 'unprotected IEs in initial NAS message: %s' % ', '.join(vln_ies))
|
|
#
|
|
elif typ == 92:
|
|
# ident resp
|
|
ie = NasRx['5GSID']['V'].get_val_d()
|
|
if isinstance(ie, dict) and ie['Type'] not in self.SECNAS_RX_CT_IDTYPE:
|
|
self._log('VLN', 'unprotected UE Identity: %r' % ie)
|
|
#
|
|
elif typ not in {69, 70, 87, 89, 95, 100}:
|
|
# not dereg req, dereg acc, auth resp, auth fail, sec mode rej, status
|
|
self._log('VLN', 'invalid unprotected NAS message: %s' % NasRx._name)
|
|
if self.SECNAS_RX_DROP_INVAL:
|
|
return []
|
|
#
|
|
return self.dispatch_nas(NasRx)
|
|
|
|
def process_nas_sec(self, NasRx):
|
|
# get the KSI and sec ctx
|
|
# verify MAC
|
|
# decrypt
|
|
#
|
|
# in case of MAC only (no encr), we need to go through process_nas_nosec()
|
|
# whatever result of the MAC check
|
|
|
|
if self.SEC['KSI'] in self.SEC:
|
|
sec_ctx = self.SEC[self.SEC['KSI']]
|
|
if self.SEC['KSI'] is not None:
|
|
self.SEC['KSI'] = None
|
|
else:
|
|
# TODO: no readily-available security context
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#
|
|
return self.dispatch_nas(NasRxNosec)
|
|
|
|
|
|
def dispatch_nas(self, NasRx):
|
|
epd = NasRx[0]['EPD'].get_val()
|
|
if epd == 126:
|
|
return self.FGMM.process(NasRx)
|
|
elif epd == 46:
|
|
return self.FGSM.process(NasRx)
|
|
else:
|
|
self._log('WNG', 'invalid 5G NAS message, header: %r' % NasRx[0])
|
|
return []
|
|
|
|
|
|
def ret_ngap_dnt(self, NasTx, **IEs):
|
|
"""returns an NGAPDownlinkNASTransport procedure initialized with the
|
|
NAS PDU and optional IEs to be sent
|
|
"""
|
|
if not NasTx:
|
|
return []
|
|
else:
|
|
buf = self.output_nas_sec(NasTx)
|
|
if buf is None:
|
|
return self._ngap_nas_sec_err()
|
|
IEs['NAS_PDU'] = buf
|
|
S1apProc = self.init_s1ap_proc(S1APDownlinkNASTransport, **IEs)
|
|
if S1apProc:
|
|
return [S1apProc]
|
|
else:
|
|
return []
|
|
|
|
def _ngap_nas_sec_err(self):
|
|
# TODO: maybe release the NG-UE link ?
|
|
return []
|
|
|
|
def clear_nas_proc(self):
|
|
# clears all NAS EPS procedures
|
|
self.FGMM.clear()
|
|
self.FGSM.clear()
|
|
self.SMS.clear()
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# network-initiated method (fg task, to be used from the interpreter)
|
|
#--------------------------------------------------------------------------#
|
|
|
|
def _get_paging_ies(self):
|
|
guami = self.Server.AMF_GUAMI[self.UE.PLMN]
|
|
# only supporting mandatory IEs
|
|
IEs = {
|
|
'TAIListForPaging': [{
|
|
'tAI': {
|
|
'pLMNIdentity': plmn_str_to_buf(self.UE.PLMN),
|
|
'tAC': uint_to_bytes(self.UE.TAC, 24)
|
|
}
|
|
}],
|
|
'UEPagingIdentity': {
|
|
'fiveG-S-TMSI': {
|
|
'aMFSetID': (guami[1], 10),
|
|
'aMFPointer': (guami[2], 6),
|
|
'fiveG-TMSI': uint_to_bytes(self.UE.FGTMSI, 32),
|
|
}
|
|
}
|
|
}
|
|
#
|
|
return IEs
|
|
|
|
def page(self):
|
|
"""send NGAP Paging command to gNB responsible for the UE TAI
|
|
"""
|
|
# send a NGAPPaging for the 5GS domain
|
|
if self.connected.is_set():
|
|
self._log('DBG', 'paging: UE already connected')
|
|
return
|
|
# get the set of gNBs serving the UE TAI
|
|
# gNB id is 3-tuple whereas eNB id is 2-tuple
|
|
tai = (self.UE.PLMN, self.UE.TAC)
|
|
try:
|
|
gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
|
|
except Exception:
|
|
self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
|
|
return
|
|
#
|
|
# only mandatory IEs supported yet
|
|
IEs = self._get_paging_ies()
|
|
#
|
|
# start a NGAPPaging procedure on all gNBs
|
|
for gnb in enbs:
|
|
gnb.page(**IEs)
|
|
self._log('INF', 'paging: ongoing')
|
|
|
|
def page_block(self):
|
|
"""page the UE and wait for it to connect, or the paging procedure to timeout.
|
|
Returns True if UE gets connected, False otherwise.
|
|
"""
|
|
# send a NGAPPaging for the 5GS domain
|
|
if self.connected.is_set():
|
|
self._log('DBG', 'paging: UE already connected')
|
|
return True
|
|
# get the set of gNBs serving the UE TAI
|
|
# gNB id is 3-tuple whereas eNB id is 2-tuple
|
|
tai = (self.UE.PLMN, self.UE.TAC)
|
|
try:
|
|
gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
|
|
except Exception:
|
|
self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
|
|
return False
|
|
#
|
|
IEs = self._get_paging_ies()
|
|
#
|
|
# retries paging as defined in case UE does not connect
|
|
i = 0
|
|
while i <= self.PAG_RETR:
|
|
# start an S1APPaging procedure on all RNCs
|
|
for enb in enbs:
|
|
enb.page(**IEs)
|
|
# check until UE gets connected or timer expires
|
|
if self.connected.wait(self.PAG_WAIT):
|
|
self._log('INF', 'paging: UE connected')
|
|
return True
|
|
else:
|
|
# timeout
|
|
i += 1
|
|
self._log('WNG', 'paging: timeout, UE not connected')
|
|
return False
|
|
|
|
def send_ng_rel(self, cause=('nas', 'normal-release')):
|
|
"""send an UEContextRelease over the NG link with the given NGAP cause
|
|
"""
|
|
if not self.connected.is_set():
|
|
# nothing to release
|
|
self._log('DBG', 'release: UE not connected')
|
|
return True
|
|
# prepare the NGAPUEContextRelease procedure
|
|
NgapProc = self.init_ngap_proc(NGAPUEContextRelease, Cause=cause)
|
|
if not NgapProc:
|
|
return False
|
|
if not self.transmit_ngap_proc([NgapProc]):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def send_ng_err(self, cause, **IEs):
|
|
"""send an ErrorIndication over the NG link with the given AP cause
|
|
IEs can contain any of the optional or extended IEs: CriticalityDiagnostics
|
|
"""
|
|
if not self.connected.is_set():
|
|
# NGAP link disconnected
|
|
if self.NGAP_FORCE_PAGE:
|
|
# force to connect
|
|
if not self._net_init_con():
|
|
# unable to connect with the UE
|
|
return False
|
|
else:
|
|
return False
|
|
# prepare the S1AP procedure
|
|
IEs['Cause'] = cause
|
|
NgapProc = self.init_ngap_proc(NGAPErrorIndCN, **IEs)
|
|
if not NgapProc:
|
|
return False
|
|
if not self.transmit_s1ap_proc([NgapProc]):
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# to send arbitrary NAS buffers to the UE
|
|
#--------------------------------------------------------------------------#
|
|
# TODO
|
|
|
|
|
|
#--------------------------------------------------------------------------#
|
|
# 5G bearer activation
|
|
#--------------------------------------------------------------------------#
|
|
# TODO
|
|
|
|
|