# pycrate/pycrate_corenet/HdlrUENG.py
# (repository viewer metadata: 981 lines, 34 KiB, Python)

# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2020. Benoit Michau. P1Sec.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/HdlrUENG.py
# * Created : 2020-04-29
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
from .utils import *
from .ProcCNNgap import *
from .ProcCNFGMM import *
from .ProcCNFGSM import *
from .HdlrUESMS import *
from pycrate_mobile.TS24501_IE import (
FGSIDTYPE_NO, # 0
FGSIDTYPE_SUPI,
FGSIDTYPE_GUTI,
FGSIDTYPE_IMEI,
FGSIDTYPE_STMSI,
FGSIDTYPE_IMEISV,
FGSIDTYPE_MAC,
FGSIDTYPE_EUI64, # 7
FGSIDFMT_IMSI, # 0
FGSIDFMT_NSI,
FGSIDFMT_GCI,
FGSIDFMT_GLI, # 3
UESecCap as FGSUESecCap
)
class UEFGMMd(SigStack):
"""UE 5GMM handler within a UENGd instance
responsible for 5G Mobility Management signalling procedures
"""
# when True, procedures are meant to be recorded in the self._proc list
# created in __init__()
TRACK_PROC = True
# reference to the UEd (rebound per instance in __init__())
UE = None
# reference to the UENGd (rebound per instance through set_ng())
NG = None
# state: DEREGISTERED (cannot be paged) <-> CONNECTED <-> IDLE
state = 'DEREGISTERED'
# to bypass the process() server loop with a custom NAS PDU handler
RX_HOOK = None
# additional delay left to background tasks so that they run first:
# slept before waiting on the ready event in _net_init_con() and added to the
# procedure timer in run_proc() (presumably in seconds -- TODO confirm)
_WAIT_ADD = 0.005
#--------------------------------------------------------------------------#
# FGMMAuthentication policy
#--------------------------------------------------------------------------#
# this will systematically bypass all authentication procedures
AUTH_DISABLED = False
# 5GMM procedure timer for auth and smc
# NOTE: T3560 is assigned a second time, with the same value, in the
# "FGMM timers" section below
T3560 = 2
# Authentication Management Field
AUTH_AMF = b'\x80\x00'
# Authentication ABBA (passed to the Kamf derivation in set_sec_ctx())
AUTH_ABBA = b'\x00\x00'
# if AUTH_PLMN is not None, it will be used for building the 5G auth vector
# otherwise the main Corenet PLMN will be used
AUTH_PLMN = None
# this is to force a 2G or 3G authentication instead of a 5G one
AUTH_2G = False
AUTH_3G = False
# this is to extend AUTN with arbitrary data
AUTH_AUTN_EXT = None
#
# re-authentication policy:
# this forces an auth procedure every X 5GMM Reg / Service / Detach procedures
# even if a valid KSI is provided by the UE
AUTH_REG = 1
AUTH_SER = 3
AUTH_DET = 1 # only applied to Detach without UE power off
#--------------------------------------------------------------------------#
# FGMM timers
# (units are not stated in this file, presumably seconds -- TODO confirm)
#--------------------------------------------------------------------------#
# MT Deregistration
T3522 = 1
# Registration
T3550 = 1
# UE Config Update
T3555 = 2
# AKA, SMC (same value as the T3560 assignment in the policy section above)
T3560 = 2
# Identification
T3570 = 1
# NSSAI Auth
T3575 = 2
def _log(self, logtype, msg):
    """forward a log record to the parent UENGd handler, tagged with [5GMM]
    """
    tagged = '[5GMM] %s' % msg
    self.NG._log(logtype, tagged)
def __init__(self, ued, uengd):
    """bind the 5GMM handler to its UEd and UENGd and reset its runtime state
    """
    self.UE = ued
    self.set_ng(uengd)
    #
    # ready event, used by foreground tasks (network / interpreter initiated);
    # it is set from the start
    ready = Event()
    ready.set()
    self.ready = ready
    # stack of ongoing 5GMM procedures (i.e. common procedures can be run
    # within specific procedure)
    self.Proc = []
    # list of tracked procedures (requires TRACK_PROC = True)
    self._proc = []
def set_ng(self, uengd):
    """attach the UENGd instance this 5GMM handler belongs to
    """
    setattr(self, 'NG', uengd)
def process(self, NasRx):
    """process a NAS 5GMM message (NasRx) sent by the UE,
    and return a list (possibly empty) of NGAP procedure(s) to be sent back
    to the gNB

    not implemented yet: the message is ignored and an empty list is returned
    """
    # TODO
    ngap_procs = []
    return ngap_procs
def init_proc(self, ProcClass, encod=None, fgmm_preempt=False):
    """initialize a CN-initiated 5GMM procedure of class `ProcClass' and
    given encoder(s), and return the procedure

    not implemented yet: no procedure is created and None is returned
    """
    # TODO
    return None
def clear(self):
    """abort all running 5GMM procedures, starting with the last one stacked
    """
    # work on a reversed snapshot of the stack, which is left untouched here
    for ongoing in list(reversed(self.Proc)):
        ongoing.abort()
#--------------------------------------------------------------------------#
# SMC and security-related methods
#--------------------------------------------------------------------------#
def require_auth(self, Proc, ksi=None):
    """indicate whether an authentication is required for the procedure Proc

    no policy is evaluated here: Proc and ksi are ignored and True is
    always returned
    """
    return True
def require_smc(self, Proc):
    """indicate whether a security mode control is required for the procedure Proc

    no policy is evaluated here: Proc is ignored and True is always returned
    """
    return True
def get_new_ksi(self):
    """return a KSI value in 0..6 that can be used for a new security context

    the lowest value not yet used as a key of self.NG.SEC is returned;
    when all 7 values are in use, every context except the current one is
    deleted and 0 is returned (or 1 when the current KSI is 0)
    """
    sec = self.NG.SEC
    unused = next((ksi for ksi in range(7) if ksi not in sec), None)
    if unused is not None:
        return unused
    # all native KSI have been used, clear all of them except the current one
    # if defined
    current = sec['KSI']
    for ksi in range(7):
        if ksi != current:
            del sec[ksi]
    return 1 if current == 0 else 0
def set_sec_ctx(self, ksi, ctx, vect, snid):
    """derive a 5G security context from an authentication vector, store it
    in self.NG.SEC and make it the current one

    ksi  : 2-tuple, folded into a single index as (ksi[0] << 3) + ksi[1]
    ctx  : type of authentication vector: 2, 3, or anything else (handled as 5)
    vect : authentication vector, indexed according to ctx:
           ctx 2: vect[2] is stored as Kc
           ctx 3: vect[3] / vect[4] are stored as CK / IK, and the first
                  6 bytes of vect[2] are used in the Kausf derivation
                  (presumably SQN xor AK taken from AUTN -- TODO confirm)
           ctx 5: vect[3] is stored as Kausf
    snid : serving network identifier; used as given for ctx 5, rebuilt from
           AUTH_PLMN (or the server PLMN) for ctx 2 and 3
    """
    ksi = (ksi[0] << 3) + ksi[1]
    if ctx in (2, 3):
        # WNG: deriving a 5G context from a 2G / 3G vector is undefined /
        # illegal and won't work (hopefully)
        # self is the 5GMM handler: AUTH_PLMN is read directly on it
        plmn = self.AUTH_PLMN if self.AUTH_PLMN else self.UE.Server.PLMN
        snid = make_5g_snn(plmn)
    #
    if ctx == 2:
        CK, IK = conv_102_C4(vect[2]), conv_102_C5(vect[2])
        # NOTE(review): the Kausf derivation takes a 4th input, filled from
        # vect[2][:6] in the 3G case; a 2G vector is only indexed up to
        # vect[2] (Kc) here, so 6 null bytes are used instead -- to be confirmed
        Kausf = conv_501_A2(CK, IK, snid, 6 * b'\x00')
        secctx = {'VEC'  : vect,
                  'CTX'  : ctx,
                  'Kc'   : vect[2],
                  'CK'   : CK,
                  'IK'   : IK,
                  'Kausf': Kausf}
    elif ctx == 3:
        Kausf = conv_501_A2(vect[3], vect[4], snid, vect[2][:6])
        secctx = {'VEC'  : vect,
                  'CTX'  : ctx,
                  'CK'   : vect[3],
                  'IK'   : vect[4],
                  'Kausf': Kausf}
    else:
        # ctx == 5
        secctx = {'VEC'  : vect,
                  'CTX'  : ctx,
                  'Kausf': vect[3]}
    #
    # key hierarchy: Kausf -> Kseaf -> Kamf
    secctx['Kseaf'] = conv_501_A6(secctx['Kausf'], snid)
    secctx['Kamf'] = conv_501_A7(secctx['Kseaf'], self.UE.IMSI, self.AUTH_ABBA)
    secctx['UL'], secctx['DL'] = 0, 0
    # TODO: check if a custom UL counter is still required for gNB key derivation
    #secctx['UL_gnb'] = 0
    self.NG.SEC[ksi] = secctx
    self.NG.SEC['KSI'] = ksi
#--------------------------------------------------------------------------#
# network-initiated method (fg task, to be used from the interpreter)
#--------------------------------------------------------------------------#
def _net_init_con(self):
    """page the UE if needed and wait for the 5GMM stack to become ready

    returns True when the UE is connected and no 5GMM procedure is blocking,
    False otherwise
    """
    if not self.NG.page_block():
        return False
    # need to wait for potential 5GMM serving / common procedures to happen and end
    sleep(self._WAIT_ADD)
    # a timeout on the ready event means something is blocking in the serving /
    # common procedures; a cleared connected event means something went wrong
    # during those procedures
    if self.ready.wait(5) and self.NG.connected.is_set():
        return True
    return False
def run_proc(self, ProcClass, **IEs):
    """run a network-initiated procedure ProcClass in the context of the 5GMM stack,
    after setting the given IEs in the NAS message to be sent to the UE

    returns a 2-tuple (success, proc)
        success is a bool
        proc is the instance of ProcClass, or None when the procedure could
        not be started (invalid class, UE unreachable, initialization failure)
    """
    if ProcClass.Init is None:
        self._log('ERR', 'invalid network-initiated procedure %s' % ProcClass.Name)
        return False, None
    if not self._net_init_con():
        return False, None
    #
    Proc = self.init_proc(ProcClass, encod={ProcClass.Init: IEs}, fgmm_preempt=True)
    if Proc is None:
        # init_proc() did not return any procedure: stop here instead of
        # failing on Proc.output() and again on Proc.Name in the error path
        self._log('ERR', 'unable to initialize network-initiated procedure %s'\
                  % ProcClass.Name)
        return False, None
    try:
        NgapTxProc = Proc.output()
    except Exception:
        self._log('ERR', 'invalid IEs for network-initiated procedure %s' % Proc.Name)
        Proc.abort()
        return False, Proc
    if not self.NG.transmit_ngap_proc(NgapTxProc):
        return False, Proc
    #
    # check if a response is expected
    if not hasattr(Proc, 'TimerValue'):
        return True, Proc
    elif not self.ready.wait(Proc.TimerValue + self._WAIT_ADD):
        # procedure is stuck, will be aborted in the server loop
        # WNG: this means the routine for cleaning NAS procedures in timeout
        # should be enabled in CorenetServer
        return False, Proc
    #
    # a response was received if the procedure got its UEInfo attribute
    return hasattr(Proc, 'UEInfo'), Proc
class UEFGSMd(SigStack):
    """UE 5GSM handler within a UENGd instance

    responsible for 5G Session Management signalling procedures
    """

    # when True, procedures are meant to be recorded in self._proc
    TRACK_PROC = True

    # reference to the UEd
    UE = None
    # reference to the UENGd
    NG = None

    # to bypass the process() server loop with a custom NAS PDU handler
    RX_HOOK = None

    def _log(self, logtype, msg):
        """forward a log record to the parent UENGd handler, tagged with [5GSM]
        """
        tagged = '[5GSM] %s' % msg
        self.NG._log(logtype, tagged)

    def __init__(self, ued, uengd):
        """bind the 5GSM handler to its UEd and UENGd and reset its runtime state
        """
        self.UE = ued
        self.set_ng(uengd)
        #
        # dict of ongoing 5GSM procedures, indexed by 5GS bearer ID (0 to 15)
        self.Proc = {bearer_id: [] for bearer_id in range(16)}
        # dict of configured PDU, indexed by 5GS bearer ID
        self.PDU = {}
        # dict of ongoing 5GSM transactions IEs
        self.Trans = {}
        # list of tracked procedures (requires TRACK_PROC = True)
        self._proc = []

    def set_ng(self, uengd):
        """attach the UENGd instance this 5GSM handler belongs to
        """
        self.NG = uengd

    def process(self, NasRx, FGMMProc=None):
        """process a NAS 5GSM message (NasRx) sent by the UE,
        and return a list (possibly empty) of NGAP procedure(s) to be sent back
        to the gNB

        FGMMProc [5GMM procedure or None], indicates if the NAS 5GSM message is
            handled in the context of a 5GMM procedure

        not implemented yet: the message is ignored and an empty list is returned
        """
        # TODO
        ngap_procs = []
        return ngap_procs

    def init_proc(self, ProcClass, **kw):
        """initialize a CN-initiated 5GSM procedure of class `ProcClass' and
        given encoder(s), and return the procedure

        not implemented yet: no procedure is created and None is returned
        """
        # TODO
        return None

    def clear(self, ebi=None):
        """abort all running procedures, optionally for a single 5GS Bearer ID

        not implemented yet: nothing is aborted
        """
        return None

    #--------------------------------------------------------------------------#
    # transaction processing
    #--------------------------------------------------------------------------#

    def process_trans(self, trans_id):
        """process a 5GSM transaction initiated by the UE, and return a network-initiated
        procedure with IEs configured and None, or None and the 5GSM error code

        not implemented yet: None is returned
        """
        return None

    #--------------------------------------------------------------------------#
    # protocol configuration processing
    #--------------------------------------------------------------------------#
class UENGd(SigStack):
"""UE NG handler within a CorenetServer instance
responsible for UE-associated NGAP signalling
"""
# to keep track of all NGAP procedures (in the self._proc list)
TRACK_PROC = True
# domain
DOM = '5GS'
# reference to the UEd
UE = None
# reference to the GNBd, SCTP stream id
GNB = None
SID = None
# to bypass the process_nas() server loop with a custom NAS PDU handler
RX_HOOK = None
# for pure NGAP procedures (no NAS traffic, nor PDU-oriented stuff):
# should we page the UE to run the procedure successfully when UE is idle
NGAP_FORCE_PAGE = False
#--------------------------------------------------------------------------#
# global security policy
#--------------------------------------------------------------------------#
#
# 1) NAS Rx path
#
# IEs allowed in clear-text initial NAS messages, indexed by NAS message type;
# process_nas_nosec() logs any other IE found in such a message
SECNAS_RX_CT_IES = {
# Registration Req
65 : {
'NAS_KSI',
'5GSRegType',
'5GSID', # needs to be a temp id or SUCI
'UESecCap',
'UEStatus',
'AddGUTI',
'EPSNASContainer',
'NASContainer',
},
# Service Req
76 : {
'ServiceType',
'NAS_KSI',
'5GSID', # needs to be a temp id
'NASContainer',
},
# Ctrl Plane Service Req
79 : {
'NAS_KSI',
'CtrlPlaneServiceType',
'NASContainer',
}
}
#
# Identity types allowed in a clear-text IdentityResponse;
# process_nas_nosec() logs any other identity type
SECNAS_RX_CT_IDTYPE = {
FGSIDTYPE_NO,
FGSIDTYPE_SUPI,
#FGSIDTYPE_GUTI,
#FGSIDTYPE_STMSI
}
#
# dropping invalid Rx message is the default behaviour
# (applies to clear-text messages of an unexpected type in process_nas_nosec())
SECNAS_RX_DROP_INVAL = True
# NOTE: the triple-quoted block below is a plain string expression, used to
# keep the settings it contains disabled
'''
# this will systematically bypass all auth and smc procedures,
# NAS MAC and UL count verification in the uplink
# and setting of the 5GMM security header (and encryption) in the downlink
SECNAS_DISABLED = False
#
# finer grained NAS security checks:
# True to drop NAS PDU when NAS MAC verification fails
SECNAS_UL_MAC = False
# True to drop NAS PDU when NAS UL count verification fails
SECNAS_UL_CNT = False
# WNG: 5GMM and 5GSM stacks have further control on accepting or not certain
# NAS message even if security control have failed
#
# this will disable the setting of the 5GMM security header (and encryption)
# in the downlink for given NAS message (by name)
SECNAS_PDU_NOSEC = set()
'''
#
# format of the security context dict self.SEC:
# self.SEC is a dict of available 5G security contexts indexed by KSI,
# and current KSI in use
#
# when self.SEC['KSI'] is not None, the context is enabled at the NAS level, e.g.
# self.SEC = {'KSI': 0,
# 0: {'RAND': b'...', 'RES': b'...', 'AUTN': b'...', 'CK': b'...', 'IK': b'...',
# 'SNName': b'...', 'ABBA': b'...', 'RESstar': b'...',
# 'Kausf': b'...', 'Kseaf': b'...', 'Kamf': b'...',
# 'Knasenc': b'...', 'Knasint': b'...',
# 'UL': 0, 'DL': 0, 'NASEA': 0, 'NASIA': 0,
# 'Kgnb': b'...'},
# ...,
# 'POL': {'REG': 0, 'SER': 0, 'DET': 0}}
#
# The POL dict indicates the authentication policy for each procedure;
# reset_sec_ctx() initializes it with the 'REG', 'DET' and 'SER' keys
#--------------------------------------------------------------------------#
# NGAPPaging policy
#--------------------------------------------------------------------------#
# page_block() parameters:
# number of retries when not successful (PAG_RETR + 1 paging attempts in total)
PAG_RETR = 2
# timer in sec between retries
PAG_WAIT = 2
#--------------------------------------------------------------------------#
# NGAPInitialContextSetup policy
#--------------------------------------------------------------------------#
#
#--------------------------------------------------------------------------#
# NGAPTraceStart policy
#--------------------------------------------------------------------------#
#
def _log(self, logtype, msg):
    """forward a log record to the UEd, prefixed with the NG context id
    """
    prefix = '[UENGd: %3i]' % self.CtxId
    self.UE._log(logtype, '%s %s' % (prefix, msg))
def __init__(self, ued, gnbd=None, ctx_id=-1, sid=None):
    """initialize the NG handler of the UE handler ued

    gnbd   : handler of the gNB the UE is connected through, or None when
             the UE is not connected
    ctx_id : NG context id, applied together with gnbd
    sid    : SCTP stream id, applied together with gnbd
    """
    self.UE = ued
    self.Server = ued.Server
    self.Config = self.Server.ConfigNG
    #
    # dict of ongoing NGAP procedures (indexed by their procedure code)
    self.Proc = {}
    # list of tracked procedures (requires TRACK_PROC = True)
    self._proc = []
    #
    # dict of available 5G security contexts, indexed by KSI
    # and current KSI in use
    self.SEC = {}
    self.reset_sec_ctx()
    #
    # state for NG / radio connection: set with InitialUEMessage, unset with UEContextRelease
    self.connected = Event()
    # state for processing the initial NAS message: unset after InitialUEMessage processed
    self.nasinit = Event()
    if gnbd is not None:
        self.set_ran(gnbd)
        # apply the context id and stream id provided by the caller:
        # without this, CtxId stays undefined and _log() cannot format it
        self.set_ctx(ctx_id, sid)
    else:
        self.CtxId = -1
    #
    # init 5GMM, 5GSM and SMS sig stacks
    self.FGMM = UEFGMMd(ued, self)
    self.FGSM = UEFGSMd(ued, self)
    self.SMS = UESMSd(ued, self)
def set_ran(self, gnbd):
    """attach the gNB handler gnbd and mark the UE as connected

    the current KSI is reset to None, and both the connected and nasinit
    events are set
    """
    self.SEC['KSI'] = None
    self.GNB = gnbd
    for state in (self.connected, self.nasinit):
        state.set()
def unset_ran(self):
    """detach from the current gNB handler and mark the UE as disconnected

    the NG context is unregistered from the gNB handler, the current KSI is
    reset to None, ongoing NGAP procedures are cleared and the connected
    event is cleared
    """
    gnb = self.GNB
    gnb.unset_ue_ng(self.CtxId)
    del self.GNB
    self.SEC['KSI'] = None
    self.clear()
    self.connected.clear()
def set_ran_unconnected(self, gnbd):
    """attach the gNB handler gnbd without changing the connection state

    required for paging; the current KSI is reset to None
    """
    sec = self.SEC
    sec['KSI'] = None
    self.GNB = gnbd
def unset_ran_unconnected(self):
    """detach from the gNB handler without changing the connection state

    required for paging; the current KSI is reset to None
    """
    delattr(self, 'GNB')
    sec = self.SEC
    sec['KSI'] = None
def is_connected(self):
    """return True when the connected event is set, False otherwise
    """
    connected = self.connected
    return connected.is_set()
def set_ctx(self, ctx_id, sid):
    """store the NG context id and the SCTP stream id
    """
    self.CtxId, self.SID = ctx_id, sid
def unset_ctx(self):
    """reset the NG context id to -1 and drop the instance-level SCTP stream id
    """
    self.CtxId = -1
    # SID may never have been set on the instance (e.g. when no context was
    # configured since __init__()): deleting it must not fail in this case
    try:
        del self.SID
    except AttributeError:
        pass
def reset_sec_ctx(self):
    """drop all security contexts and restore the initial content of self.SEC

    the UESecCap entry of the UE capabilities is removed too, when present
    """
    sec = self.SEC
    sec.clear()
    sec['KSI'] = None
    sec['POL'] = {'REG': 0, 'DET': 0, 'SER': 0}
    ue_cap = self.UE.Cap
    if 'UESecCap' in ue_cap:
        del ue_cap['UESecCap']
def get_sec_ctx(self):
    """return the security context indexed by the current KSI, or None when
    self.SEC has no entry for it
    """
    current = self.SEC['KSI']
    return self.SEC.get(current)
#--------------------------------------------------------------------------#
# handling of NGAP procedures
#--------------------------------------------------------------------------#
def process_ngap_pdu(self, pdu_rx):
"""process a NGAP PDU sent by the gNB for UE-associated signalling
and return a list of NGAP PDU(s) to be sent back to it
"""
# pdu_rx is indexed as pdu_rx[0] (PDU type name) and
# pdu_rx[1]['procedureCode'] (NGAP procedure code)
# NOTE(review): indentation is lost in this listing; the `else:' following
# the first `except' block is presumably the else clause of the try statement
# (tracking of a successfully instantiated procedure) -- to be confirmed
#
# errcause is set only when no procedure matches the PDU
errcause = None
if pdu_rx[0] == 'initiatingMessage':
# gNB-initiated procedure, instantiate it
try:
Proc = NGAPProcRANDispatcher[pdu_rx[1]['procedureCode']](self)
except Exception:
# unknown procedure code (or failing constructor): an error indication
# procedure is prepared instead
self._log('ERR', 'invalid NGAP PDU, initiatingMessage, code %i'\
% pdu_rx[1]['procedureCode'])
errcause = ('protocol', 'abstract-syntax-error-reject')
Proc = self.init_ngap_proc(NGAPErrorIndCN, Cause=errcause)
if not Proc:
return []
else:
if self.TRACK_PROC:
self._proc.append( Proc )
# process the PDU within the procedure
Proc.recv( pdu_rx )
# three outcomes, depending on the procedure class and error state:
# 1) class 2 procedure reporting an error: send a new error indication
# 2) class 1 procedure, or error indication prepared above: send its PDU(s)
# 3) otherwise: send the PDU(s) of the procedures returned by trigger()
if Proc.Class == 2 and Proc.errcause:
Err = self.init_ngap_proc(NGAPErrorIndCN, Cause=Proc.errcause)
if not Err:
return []
self.ProcLast = Err.Code
return Err.send()
elif Proc.Class == 1 or errcause:
self.ProcLast = Proc.Code
return Proc.send()
else:
pdu_tx = []
for ProcRet in Proc.trigger():
pdu_tx.extend( ProcRet.send() )
# ProcLast ends up with the code of the last triggered procedure
self.ProcLast = ProcRet.Code
return pdu_tx
#
else:
# CN-initiated procedure, transfer the PDU to it
try:
# ongoing CN-initiated procedures are indexed by procedure code in self.Proc
Proc = self.Proc[pdu_rx[1]['procedureCode']]
except Exception:
# no ongoing procedure for this code: an error indication procedure is
# prepared instead
self._log('ERR', 'invalid NGAP PDU, %s, code %i'\
% (pdu_rx[0], pdu_rx[1]['procedureCode']))
errcause = ('protocol', 'message-not-compatible-with-receiver-state')
Proc = self.init_ngap_proc(NGAPErrorIndCN, Cause=errcause)
if not Proc:
return []
# process the PDU within the procedure
Proc.recv( pdu_rx )
# same three outcomes as above, without the test on the procedure class
if Proc.errcause:
Err = self.init_ngap_proc(NGAPErrorIndCN, Cause=Proc.errcause)
if not Err:
return []
self.ProcLast = Err.Code
return Err.send()
elif errcause:
self.ProcLast = Proc.Code
return Proc.send()
else:
pdu_tx = []
for ProcRet in Proc.trigger():
pdu_tx.extend( ProcRet.send() )
self.ProcLast = ProcRet.Code
return pdu_tx
def init_ngap_proc(self, ProcClass, **IEs):
    """initialize a CN-initiated NGAP procedure of class `ProcClass' for
    UE-associated signalling, encode the initiatingMessage PDU with given
    **IEs and return the procedure

    None is returned when the procedure cannot be instantiated
    """
    Proc = self._init_ngap_proc(ProcClass)
    if Proc:
        self._encode_ngap_proc(Proc, **IEs)
        return Proc
    return None
def _init_ngap_proc(self, ProcClass):
# instantiate the CN-initiated NGAP procedure ProcClass, without encoding
# any PDU; returns the procedure, or None when a procedure with the same
# code is already ongoing or when the instantiation fails
#
# a class which is not an NGAPSigProc is only reported, not rejected
if not issubclass(ProcClass, NGAPSigProc):
self._log('WNG', 'starting an invalid procedure for UE-associated NG signalling')
# a single ongoing procedure is allowed per procedure code
if ProcClass.Code in self.Proc:
self._log('ERR', 'an NGAP procedure %s is already ongoing' % ProcClass.__name__)
return None
try:
Proc = ProcClass(self)
except Exception:
# no active NG link
self._log('ERR', 'no active NG link to initialize the NGAP procedure %s'\
% ProcClass.__name__)
return None
if Proc.Code in NGAPProcCNDispatcher and Proc.Class == 1:
# store the procedure, which requires a response from the gNB
self.Proc[Proc.Code] = Proc
# NOTE(review): indentation is lost in this listing, so it cannot be told
# whether tracking applies to every procedure or only to the stored ones
if self.TRACK_PROC:
self._proc.append( Proc )
return Proc
def _encode_ngap_proc(self, Proc, **IEs):
    """complete IEs with the UE NGAP identifiers and encode the initiating PDU
    of the procedure Proc

    the NG context id is used for both the AMF and the RAN identifiers;
    the NGAPUEContextRelease procedure takes them as a single UE_NGAP_IDs pair
    """
    ctx_id = self.CtxId
    if Proc.Name == 'NGAPUEContextRelease':
        id_pair = {'aMF-UE-NGAP-ID': ctx_id,
                   'rAN-UE-NGAP-ID': ctx_id}
        IEs['UE_NGAP_IDs'] = ('uE-NGAP-ID-pair', id_pair)
    else:
        IEs['AMF_UE_NGAP_ID'] = ctx_id
        IEs['RAN_UE_NGAP_ID'] = ctx_id
    Proc.encode_pdu('ini', **IEs)
def start_ngap_proc(self, ProcClass, **IEs):
    """initialize a CN-initiated NGAP procedure of class `ProcClass' for
    UE-associated signalling, encode the initiatingMessage PDU with given
    **IEs and send the PDU generated by the procedure to the gNB

    returns the number of PDU successfully sent (0 when the UE is not
    connected or the procedure cannot be initialized)
    """
    if not self.is_connected():
        self._log('ERR', 'not connected')
        return 0
    Proc = self.init_ngap_proc(ProcClass, **IEs)
    if Proc is None:
        return 0
    self.ProcLast = Proc.Code
    # count the PDU for which the server reports a successful transmission
    return sum(1 for pdu in Proc.send()
               if self.UE.Server.send_ngap_pdu(self.GNB, pdu, self.SID))
def transmit_ngap_proc(self, NgapTxProc):
    """send the NGAP PDU as returned by the .send() method of the NGAP procedures
    in the NgapTxProc list to the gNB

    returns the number of PDU successfully sent
    """
    sent = 0
    for Proc in NgapTxProc:
        self.ProcLast = Proc.Code
        sent += sum(1 for pdu in Proc.send()
                    if self.UE.Server.send_ngap_pdu(self.GNB, pdu, self.SID))
    return sent
def clear(self):
    """abort all running NGAP procedures
    """
    # iterate over a snapshot of the ongoing procedures
    ongoing = tuple(self.Proc.values())
    for proc in ongoing:
        proc.abort()
#--------------------------------------------------------------------------#
# handling of NAS messages dispatching
#--------------------------------------------------------------------------#
def process_nas(self, buf):
    """process a NAS message buffer for the 5GS domain sent by the mobile
    and return a list (possibly empty) of NGAP procedure(s) to be sent back
    to the gNB

    when RX_HOOK is set, the buffer is handed over to it and its result is
    returned as is
    """
    hook = self.RX_HOOK
    if hook:
        return hook(buf)
    NasRxSec, err = NAS.parse_NAS5G(buf, inner=False)
    if err:
        self._log('WNG', 'invalid 5GS NAS message: %s' % hexlify(buf).decode('ascii'))
        return self.ret_ngap_dnt(NAS.FGMMStatus(val={'5GMMCause': err}, sec=False))
    #
    # 5GS NAS security handling, driven by the security header type (sh)
    # and the protocol discriminator (pd)
    sh = NasRxSec[0]['SecHdr'].get_val()
    pd = NasRxSec[0]['EPD'].get_val()
    is_fgmm = (pd == 126)
    if sh == 0:
        # clear-text NAS message
        if self.UE.TRACE_NAS_5GS:
            self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
        if is_fgmm:
            return self.process_nas_nosec(NasRxSec)
    #
    elif is_fgmm and sh in (1, 2, 4):
        # protected NAS message
        # 1: integrity prot only, 2: current sec ctx, 4: new sec ctx (SMC after fresh auth)
        return self.process_nas_sec(NasRxSec)
    #
    # invalid NAS message
    self._log('WNG', 'invalid 5GS NAS message security status')
    if self.UE.TRACE_NAS_5GS:
        self._log('TRACE_NAS_5GS_UL', '\n' + NasRxSec.show())
    # err cause 98: Message type not compatible with the protocol state
    status = NAS.FGMMStatus(val={'5GMMCause': 98}, sec=False)
    return self.ret_ngap_dnt(status)
def process_nas_nosec(self, NasRx):
    """check a clear-text 5GMM message against the Rx security policy, log
    any violation, and dispatch the message unless it has to be dropped

    returns the result of dispatch_nas(), or an empty list when the message
    type is not acceptable in clear-text and SECNAS_RX_DROP_INVAL is set
    """
    msg_type = NasRx[0]['Type'].get_val()
    #
    if msg_type in (65, 76, 79):
        # initial NAS message: report IEs which should not be sent unprotected
        allowed = self.SECNAS_RX_CT_IES[msg_type]
        exposed = [ie._name for ie in list(NasRx)[1:] if ie._name not in allowed]
        if exposed:
            self._log('VLN', 'unprotected IEs in initial NAS message: %s' % ', '.join(exposed))
    #
    elif msg_type == 92:
        # ident resp: report identity types which should not be sent unprotected
        ident = NasRx['5GSID']['V'].get_val_d()
        if isinstance(ident, dict) and ident['Type'] not in self.SECNAS_RX_CT_IDTYPE:
            self._log('VLN', 'unprotected UE Identity: %r' % ident)
    #
    elif msg_type not in {69, 70, 87, 89, 95, 100}:
        # not dereg req, dereg acc, auth resp, auth fail, sec mode rej, status
        self._log('VLN', 'invalid unprotected NAS message: %s' % NasRx._name)
        if self.SECNAS_RX_DROP_INVAL:
            return []
    #
    return self.dispatch_nas(NasRx)
def process_nas_sec(self, NasRx):
    """handle a security-protected 5GMM message and dispatch it

    TODO: the KSI lookup, MAC verification and deciphering are not implemented
    yet; in case of MAC only (no encr), the message will need to go through
    process_nas_nosec() whatever the result of the MAC check.
    Until then, the message is dispatched as received.
    """
    ksi = self.SEC['KSI']
    if ksi in self.SEC:
        # a security context exists for the current KSI
        # NOTE(review): the current KSI is reset here, as in the previous
        # implementation, which disables the context at the NAS level --
        # intent to be confirmed once MAC verification is implemented
        if ksi is not None:
            self.SEC['KSI'] = None
    # else: TODO, no readily-available security context
    #
    # the previous code dispatched an undefined name (NasRxNosec), which
    # raised a NameError for every protected message
    return self.dispatch_nas(NasRx)
def dispatch_nas(self, NasRx):
    """hand the NAS message over to the 5GMM (EPD 126) or 5GSM (EPD 46) stack
    and return the list it produces

    a message with any other EPD is logged and an empty list is returned
    """
    header = NasRx[0]
    epd = header['EPD'].get_val()
    if epd == 126:
        return self.FGMM.process(NasRx)
    if epd == 46:
        return self.FGSM.process(NasRx)
    self._log('WNG', 'invalid 5G NAS message, header: %r' % header)
    return []
def ret_ngap_dnt(self, NasTx, **IEs):
    """returns an NGAPDownlinkNASTransport procedure initialized with the
    NAS PDU and optional IEs to be sent

    the procedure is returned within a list; the list is empty when there is
    no NAS message to send or when the NGAP procedure cannot be initialized
    """
    if not NasTx:
        return []
    # NOTE(review): output_nas_sec() is not defined in the visible part of
    # this class -- confirm it is provided elsewhere
    buf = self.output_nas_sec(NasTx)
    if buf is None:
        return self._ngap_nas_sec_err()
    IEs['NAS_PDU'] = buf
    # NGAP procedure and initializer: the previous code referred to the S1AP
    # ones (init_s1ap_proc / S1APDownlinkNASTransport), which do not exist here
    NgapProc = self.init_ngap_proc(NGAPDownlinkNASTransport, **IEs)
    if NgapProc:
        return [NgapProc]
    return []
def _ngap_nas_sec_err(self):
    """return the list of NGAP procedures to run when the NAS message to
    send cannot be security-protected: currently always empty
    """
    # TODO: maybe release the NG-UE link ?
    no_proc = []
    return no_proc
def clear_nas_proc(self):
    """clear the 5GMM, 5GSM and SMS stacks, in that order
    """
    for stack in (self.FGMM, self.FGSM, self.SMS):
        stack.clear()
#--------------------------------------------------------------------------#
# network-initiated method (fg task, to be used from the interpreter)
#--------------------------------------------------------------------------#
def _get_paging_ies(self):
    """build the dict of IEs for an NGAP Paging message targeting the UE

    only mandatory IEs are supported: the UE TAI (PLMN and TAC) and the UE
    paging identity (5G-S-TMSI, built from the AMF GUAMI of the UE PLMN and
    the UE 5G-TMSI)
    """
    guami = self.Server.AMF_GUAMI[self.UE.PLMN]
    tai = {
        'pLMNIdentity': plmn_str_to_buf(self.UE.PLMN),
        'tAC': uint_to_bytes(self.UE.TAC, 24)
        }
    fgstmsi = {
        'aMFSetID': (guami[1], 10),
        'aMFPointer': (guami[2], 6),
        'fiveG-TMSI': uint_to_bytes(self.UE.FGTMSI, 32),
        }
    # UEPagingIdentity is a CHOICE: it is provided as a 2-tuple (alternative
    # name, value), like UE_NGAP_IDs in _encode_ngap_proc(), not as a dict
    return {
        'TAIListForPaging': [{'tAI': tai}],
        'UEPagingIdentity': ('fiveG-S-TMSI', fgstmsi),
        }
def page(self):
    """send NGAP Paging command to gNB responsible for the UE TAI

    nothing is sent when the UE is already connected, or when the gNBs
    serving the UE TAI cannot be resolved; no value is returned
    """
    # send a NGAPPaging for the 5GS domain
    if self.connected.is_set():
        self._log('DBG', 'paging: UE already connected')
        return
    # get the set of gNBs serving the UE TAI
    # gNB id is 3-tuple whereas eNB id is 2-tuple
    tai = (self.UE.PLMN, self.UE.TAC)
    try:
        gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
    except Exception:
        self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
        return
    #
    # only mandatory IEs supported yet
    IEs = self._get_paging_ies()
    #
    # start a NGAPPaging procedure on all gNBs
    # (the loop previously iterated over an undefined name, `enbs')
    for gnb in gnbs:
        gnb.page(**IEs)
    self._log('INF', 'paging: ongoing')
def page_block(self):
    """page the UE and wait for it to connect, or the paging procedure to timeout.
    Returns True if UE gets connected, False otherwise.

    paging is attempted PAG_RETR + 1 times, waiting up to PAG_WAIT on the
    connected event after each attempt
    """
    # send a NGAPPaging for the 5GS domain
    if self.connected.is_set():
        self._log('DBG', 'paging: UE already connected')
        return True
    # get the set of gNBs serving the UE TAI
    # gNB id is 3-tuple whereas eNB id is 2-tuple
    tai = (self.UE.PLMN, self.UE.TAC)
    try:
        gnbs = [self.Server.RAN[gnbid] for gnbid in self.Server.TAI[tai] if len(gnbid) == 3]
    except Exception:
        self._log('ERR', 'paging: no gNB serving the UE TAI %s.%.6x' % tai)
        return False
    #
    IEs = self._get_paging_ies()
    #
    # retries paging as defined in case UE does not connect
    for _ in range(self.PAG_RETR + 1):
        # start an NGAPPaging procedure on all gNBs
        # (the loop previously iterated over an undefined name, `enbs')
        for gnb in gnbs:
            gnb.page(**IEs)
        # check until UE gets connected or timer expires
        if self.connected.wait(self.PAG_WAIT):
            self._log('INF', 'paging: UE connected')
            return True
    self._log('WNG', 'paging: timeout, UE not connected')
    return False
def send_ng_rel(self, cause=('nas', 'normal-release')):
    """send an UEContextRelease over the NG link with the given NGAP cause

    returns True when the release has been sent or when the UE is not
    connected, False otherwise
    """
    if not self.connected.is_set():
        # nothing to release
        self._log('DBG', 'release: UE not connected')
        return True
    # prepare the NGAPUEContextRelease procedure
    NgapProc = self.init_ngap_proc(NGAPUEContextRelease, Cause=cause)
    if not NgapProc:
        return False
    return True if self.transmit_ngap_proc([NgapProc]) else False
def send_ng_err(self, cause, **IEs):
    """send an ErrorIndication over the NG link with the given AP cause
    IEs can contain any of the optional or extended IEs: CriticalityDiagnostics

    returns True when the error indication has been sent, False otherwise;
    when the UE is not connected, it is paged first only if NGAP_FORCE_PAGE
    is set
    """
    if not self.connected.is_set():
        # NGAP link disconnected
        if not self.NGAP_FORCE_PAGE:
            return False
        # force to connect: the paging-and-wait routine is provided by the
        # 5GMM handler, it does not exist on this class
        if not self.FGMM._net_init_con():
            # unable to connect with the UE
            return False
    # prepare the NGAP procedure
    IEs['Cause'] = cause
    NgapProc = self.init_ngap_proc(NGAPErrorIndCN, **IEs)
    if not NgapProc:
        return False
    # the previous code called transmit_s1ap_proc(), which does not exist here
    if not self.transmit_ngap_proc([NgapProc]):
        return False
    return True
#--------------------------------------------------------------------------#
# to send arbitrary NAS buffers to the UE
#--------------------------------------------------------------------------#
# TODO
#--------------------------------------------------------------------------#
# 5G bearer activation
#--------------------------------------------------------------------------#
# TODO