pycrate/pycrate_corenet/HdlrUE.py

797 lines
33 KiB
Python

# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2017. Benoit Michau. ANSSI.
# * Copyright 2020. Benoit Michau. P1Sec.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/HdlrUE.py
# * Created : 2017-06-28
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
from .HdlrUEIuCS import *
from .HdlrUEIuPS import *
from .HdlrUES1 import *
from .HdlrUENG import *
from .utils import *
class UEd(SigStack):
    """Handler for a single UE within a CorenetServer instance

    It takes care of the UE-related RAN signaling and of the NAS signaling.
    """
    #--------------------------------------------------------------------------#
    # debug and tracing level
    #--------------------------------------------------------------------------#
    #
    # verbosity level: log types which are effectively printed
    DEBUG = ('VLN', 'ERR', 'WNG', 'INF', 'DBG')
    # logging of the UE-related RANAP (CS and PS domains), for all UE
    TRACE_ASN_RANAP_CS = TRACE_ASN_RANAP_PS = False
    # logging of the UE-related S1AP and NGAP, for all UE
    TRACE_ASN_S1AP = TRACE_ASN_NGAP = False
    # logging of the UE NAS over IuCS (except SMS), for all UE
    TRACE_NAS_CS = False
    # logging of the UE NAS over IuPS, for all UE
    TRACE_NAS_PS = False
    # logging of the UE LTE NAS (potentially) encrypted EMM / ESM, for all UE
    TRACE_NAS_EPS_SEC = False
    # logging of the UE LTE NAS clear-text EMM / ESM, for all UE
    TRACE_NAS_EPS = False
    # logging of the UE NAS containing SMS, for all UE
    TRACE_NAS_SMS = False
    # logging of the UE 5G NAS (potentially) encrypted signalling, for all UE
    TRACE_NAS_5GS_SEC = False
    # logging of the UE 5G NAS clear-text signalling, for all UE
    TRACE_NAS_5GS = False
    #--------------------------------------------------------------------------#
    # UE global informations
    #--------------------------------------------------------------------------#
    #
    # fixed identities
    IMSI = IMEI = IMEISV = None
    # temporary identities, all uint32:
    # TMSI for the CS domain, PTMSI for the PS domain,
    # MTMSI for the EPS domain, FGTMSI for the 5GS domain
    TMSI = PTMSI = MTMSI = FGTMSI = None
    #--------------------------------------------------------------------------#
    # CorenetServer reference
    #--------------------------------------------------------------------------#
    #
    Server = None
    #--------------------------------------------------------------------------#
    # RAN-related infos
    #--------------------------------------------------------------------------#
    #
    # Radio Access Technology (str)
    RAT = None
    # specific Iu / S1 / NG signaling handlers
    IuCS = IuPS = None
    S1 = None
    NG = None
    #
    # location parameters
    PLMN = None  # string of digits
    LAC = None   # uint16
    RAC = None   # uint8
    SAC = None   # uintX
    TAC = None   # uint16 (S1) or uint24 (NG)
def _log(self, logtype, msg):
    """Log `msg` for this UE, tagged with its IMSI

    A log type starting with 'TRA' is a trace: `msg` is split at its first
    newline into a header and a content, the content being printed between
    the trace color markers. Any other log type is printed only when it is
    listed in self.DEBUG.
    """
    if logtype[:3] != 'TRA':
        if logtype in self.DEBUG:
            log('[%s] [UE: %s] %s' % (logtype, self.IMSI, msg))
        return
    # trace: the 1st line of msg is the header, what follows is the content
    hdr, content = msg.split('\n', 1)
    log('[TRA] [UE: %s] %s[%s]\n%s%s%s' % (
        self.IMSI, hdr, logtype[6:], TRACE_COLOR_START, content, TRACE_COLOR_END))
def __init__(self, server, imsi, **kw):
    """Initialize the UE handler

    Args:
        server: CorenetServer reference, stored in self.Server
        imsi: UE IMSI; when empty, the first temporary identity found in kw
            is stored instead
        **kw: optional 'tmsi' (CS domain, 3G), 'ptmsi' (PS domain, 3G),
            'mtmsi' (EPS domain, 4G), 'fgtmsi' (5GS domain, 5G) and 'config'
            (passed to set_config())
    """
    self.Server = server
    if imsi:
        self.IMSI = imsi
    else:
        # no IMSI: keep only the first temporary identity provided,
        # looked up in the order CS, PS, EPS, 5GS
        for key, attr in (('tmsi', 'TMSI'),
                          ('ptmsi', 'PTMSI'),
                          ('mtmsi', 'MTMSI'),
                          ('fgtmsi', 'FGTMSI')):
            if key in kw:
                setattr(self, attr, kw[key])
                break
    #
    # init capabilities
    self.Cap = {}
    #
    # set one handler per type of link: IuCS, IuPS, S1 and NG
    self.IuCS = UEIuCSd(self)
    self.IuPS = UEIuPSd(self)
    self.S1 = UES1d(self)
    self.NG = UENGd(self)
    self._last_ran = None
    #
    if 'config' in kw:
        self.set_config(kw['config'])
def set_config(self, config):
    """Apply the subscriber configuration `config` to the UE

    Sets self.MSISDN and self.USIM, then rebuilds the per-APN dict
    IuPS.SM.PDPConfig from config['PDP'] and S1.ESM.PDNConfig from
    config['PDN']. An APN without server-wide parameters or without default
    RAB QoS is skipped with a warning. NG.FGSM.PDUConfig is reset to an empty
    dict, config['PDU'] being only iterated for now.
    """
    self.MSISDN = config['MSISDN']
    self.USIM = config['USIM']
    #
    # 3G PS domain: each entry is the APN followed by its PDP address
    self.IuPS.SM.PDPConfig = {}
    for entry in config['PDP']:
        apn, addr = entry[0], entry[1:]
        # Server.ConfigPDP provides the DNS servers for each APN (and some
        # more common parameters)
        if apn not in self.Server.ConfigPDP:
            self._log('WNG', 'unable to configure PDP connectivity for APN %s, '
                      'no DNS servers' % apn)
            continue
        # UE.IuPS.SM.RABConfig provides the default RAB QoS for each APN
        if apn not in self.IuPS.SM.RABConfig:
            self._log('WNG', 'unable to configure PDP connectivity for APN %s, '
                      'no IuPS QoS parameters' % apn)
            continue
        cfg = cpdict(self.Server.ConfigPDP[apn])
        cfg['Addr'] = addr
        cfg['RAB'] = cpdict(self.IuPS.SM.RABConfig[apn])
        self.IuPS.SM.PDPConfig[apn] = cfg
    #
    # EPS domain: each entry is the APN followed by its PDN address
    self.S1.ESM.PDNConfig = {}
    for entry in config['PDN']:
        apn, addr = entry[0], entry[1:]
        # Server.ConfigPDN provides the DNS servers for each APN (and some
        # more common parameters)
        if apn not in self.Server.ConfigPDN:
            self._log('WNG', 'unable to configure PDN connectivity for APN %s, '
                      'no DNS servers' % apn)
            continue
        # UE.S1.ESM.RABConfig provides the default RAB QoS for each APN
        if apn not in self.S1.ESM.RABConfig:
            self._log('WNG', 'unable to configure PDN connectivity for APN %s, '
                      'no S1 QoS parameters' % apn)
            continue
        cfg = cpdict(self.Server.ConfigPDN[apn])
        cfg['Addr'] = addr
        cfg['RAB'] = cpdict(self.S1.ESM.RABConfig[apn])
        # the QCI of the APN overrides the one of the default RAB
        cfg['RAB']['QCI'] = cfg['QCI']
        self.S1.ESM.PDNConfig[apn] = cfg
    #
    # 5GS domain
    self.NG.FGSM.PDUConfig = {}
    for _ in config['PDU']:
        # TODO: initialize 5G SM config with allowed PDU Sessions
        pass
def set_ran(self, ran, ctx_id, sid=None, dom=None):
    """Link the UE to the RAN node `ran` (UE going connected)

    The stack to connect is selected from the class name of `ran`:
    'HNBd' for Iu (IuPS when dom is 'PS', IuCS otherwise), 'ENBd' for S1
    and 'GNBd' for NG. self._last_ran and self.RAT are updated accordingly.

    Args:
        ran: RAN node handler, which must have a RAT attribute
        ctx_id: context identifier passed to the set_ctx() of the stack
        sid: passed as 2nd argument to set_ctx() for the S1 and NG stacks
        dom: Iu domain, only used with an HNBd

    Raises:
        CorenetErr: the UE is already connected through another type of
            link, or through the same type of link with another RAN node
        AssertionError: `ran` is of none of the supported classes
    """
    rantype = ran.__class__.__name__
    #
    if rantype == 'HNBd':
        # error: already linked with another type of ran
        if self.S1.is_connected():
            raise CorenetErr('UE already connected through a S1 link')
        if self.NG.is_connected():
            raise CorenetErr('UE already connected through a NG link')
        #
        # select the Iu stack according to the domain
        if dom == 'PS':
            iu = self.IuPS
            errmsg = 'UE already connected through another IuPS link'
        else:
            iu = self.IuCS
            errmsg = 'UE already connected through another IuCS link'
        if not iu.is_connected():
            iu.set_ran(ran)
            iu.set_ctx(ctx_id)
        elif iu.RNC == ran:
            iu.set_ctx(ctx_id)
        else:
            # error: already linked with another HNB
            raise CorenetErr(errmsg)
        # the IuCS stack is referenced whatever the domain
        self._last_ran = self.IuCS
    #
    elif rantype == 'ENBd':
        # error: already linked with another type of ran
        if self.IuCS.is_connected() or self.IuPS.is_connected():
            raise CorenetErr('UE already connected through an Iu link')
        if self.NG.is_connected():
            raise CorenetErr('UE already connected through a NG link')
        #
        # S1 stack
        s1 = self.S1
        if not s1.is_connected():
            s1.set_ran(ran)
            s1.set_ctx(ctx_id, sid)
        elif s1.ENB == ran:
            s1.set_ctx(ctx_id, sid)
        else:
            # error: already linked with another ENB
            raise CorenetErr('UE already connected through another S1 link')
        self._last_ran = self.S1
    #
    elif rantype == 'GNBd':
        # error: already linked with another type of ran
        if self.IuCS.is_connected() or self.IuPS.is_connected():
            raise CorenetErr('UE already connected through an Iu link')
        if self.S1.is_connected():
            raise CorenetErr('UE already connected through a S1 link')
        #
        # NG stack
        ng = self.NG
        if not ng.is_connected():
            ng.set_ran(ran)
            ng.set_ctx(ctx_id, sid)
        elif ng.GNB == ran:
            ng.set_ctx(ctx_id, sid)
        else:
            # error: already linked with another GNB
            raise CorenetErr('UE already connected through another NG link')
        self._last_ran = self.NG
    #
    else:
        # unsupported RAN node: asserting an empty tuple always fails
        assert()
    #
    self.RAT = ran.RAT
def unset_ran(self):
    """Unlink the UE from its RAN node(s) (UE going IDLE)

    Every connected stack among IuCS, IuPS, S1 and NG gets its RAN and its
    context unset, then the instance-level RAT is removed, so that self.RAT
    falls back on the class-level default.

    The method is idempotent: calling it while no RAT is set on the instance
    (e.g. twice in a row) does not raise.
    """
    for stack in (self.IuCS, self.IuPS, self.S1, self.NG):
        if stack.is_connected():
            stack.unset_ran()
            stack.unset_ctx()
    try:
        del self.RAT
    except AttributeError:
        # RAT was never set on this instance, or was already removed
        pass
def merge_cs_handler(self, iucsd):
    """Replace the IuCS handler of the UE with `iucsd`

    When a handler is already set and its MM state is not 'ACTIVE', its
    ongoing procedures are prepended to those of `iucsd` and its security
    contexts are copied for each CKSN that `iucsd` does not have. Then
    `iucsd` and its MM, CC, SMS and SS sublayers are attached to the UE.

    Args:
        iucsd: the IuCS handler to attach to the UE

    Returns:
        False if the current handler has its MM in state 'ACTIVE' (nothing
        is changed), True otherwise
    """
    current = self.IuCS
    if current is not None:
        if current.MM.state == 'ACTIVE':
            self._log('WNG', 'unable to merge IuCS handler')
            return False
        self._log('INF', 'merging IuCS handler')
        # prepend the procedures of the current handler into iucsd
        iucsd._proc = current._proc + iucsd._proc
        iucsd.MM._proc = current.MM._proc + iucsd.MM._proc
        iucsd.CC._proc = current.CC._proc + iucsd.CC._proc
        iucsd.SMS._proc = current.SMS._proc + iucsd.SMS._proc
        iucsd.SS._proc = current.SS._proc + iucsd.SS._proc
        # merge security contexts, those already in iucsd taking precedence
        for cksn in range(8):
            if cksn in current.SEC and cksn not in iucsd.SEC:
                iucsd.SEC[cksn] = current.SEC[cksn]
    # transfer UE's reference
    self.IuCS = iucsd
    iucsd.UE = self
    iucsd.MM.UE = self
    iucsd.CC.UE = self
    iucsd.SMS.UE = self
    iucsd.SS.UE = self
    return True
def merge_ps_handler(self, iupsd):
    """Replace the IuPS handler of the UE with `iupsd`

    When a handler is already set and its GMM state is not 'ACTIVE', its
    ongoing procedures are prepended to those of `iupsd`, its security
    contexts are copied for each CKSN that `iupsd` does not have, its PDP
    configuration is handed over and its PDP contexts are copied for each
    NSAPI that `iupsd` does not have. Then `iupsd` and its GMM and SM
    sublayers are attached to the UE.

    Args:
        iupsd: the IuPS handler to attach to the UE

    Returns:
        False if the current handler has its GMM in state 'ACTIVE' (nothing
        is changed), True otherwise
    """
    current = self.IuPS
    if current is not None:
        if current.GMM.state == 'ACTIVE':
            self._log('WNG', 'unable to merge IuPS handler')
            return False
        self._log('INF', 'merging IuPS handler')
        # prepend the procedures of the current handler into iupsd
        iupsd._proc = current._proc + iupsd._proc
        iupsd.GMM._proc = current.GMM._proc + iupsd.GMM._proc
        iupsd.SM._proc = current.SM._proc + iupsd.SM._proc
        # merge security contexts, those already in iupsd taking precedence
        for cksn in range(8):
            if cksn in current.SEC and cksn not in iupsd.SEC:
                iupsd.SEC[cksn] = current.SEC[cksn]
        # merge PDP contexts: the PDP config lives in the SM sublayer of the
        # IuPS handler, the UE itself has no SM attribute
        iupsd.SM.PDPConfig = current.SM.PDPConfig
        for nsapi in range(16):
            if nsapi in current.SM.PDP and nsapi not in iupsd.SM.PDP:
                iupsd.SM.PDP[nsapi] = current.SM.PDP[nsapi]
    # transfer UE's reference
    self.IuPS = iupsd
    iupsd.UE = self
    iupsd.GMM.UE = self
    iupsd.SM.UE = self
    return True
def merge_eps_handler(self, s1d):
    """Replace the S1 handler of the UE with `s1d`

    When a handler is already set and its EMM state is not 'ACTIVE', its
    ongoing procedures are prepended to those of `s1d`, its security contexts
    are copied for each KSI that `s1d` does not have, its PDN configuration is
    handed over and its PDN contexts are copied for each EBI that `s1d` does
    not have. Then `s1d` and its EMM, ESM and SMS sublayers are attached to
    the UE.

    Returns:
        False if the current handler has its EMM in state 'ACTIVE' (nothing
        is changed), True otherwise
    """
    sublayers = ('EMM', 'ESM', 'SMS')
    old = self.S1
    if old is not None:
        if old.EMM.state == 'ACTIVE':
            self._log('WNG', 'unable to merge S1 handler')
            return False
        self._log('INF', 'merging S1 handler')
        # procedures of the current handler go in front of those of s1d
        s1d._proc = old._proc + s1d._proc
        for name in sublayers:
            sub = getattr(s1d, name)
            sub._proc = getattr(old, name)._proc + sub._proc
        # security contexts: only the KSI unknown to s1d are imported
        for ksi in range(16):
            if ksi not in s1d.SEC and ksi in old.SEC:
                s1d.SEC[ksi] = old.SEC[ksi]
        # PDN config and contexts: only the EBI unknown to s1d are imported
        s1d.ESM.PDNConfig = old.ESM.PDNConfig
        for ebi in range(16):
            if ebi not in s1d.ESM.PDN and ebi in old.ESM.PDN:
                s1d.ESM.PDN[ebi] = old.ESM.PDN[ebi]
    # the UE now references s1d, and the other way around
    self.S1 = s1d
    s1d.UE = self
    for name in sublayers:
        getattr(s1d, name).UE = self
    return True
def merge_5gs_handler(self, ngd):
    """Placeholder for merging the NG handler `ngd`: does nothing yet

    TODO: implement it
    """
    return None
#--------------------------------------------------------------------------#
# UE identity
#--------------------------------------------------------------------------#
def set_ident_from_ue(self, idtype, ident, dom='CS'):
    """Store an identity reported by the UE

    To be used only to set identities reported by the UE.
    An identity not yet known is stored. When it differs from the one
    already known, a warning is logged; only the IMEI and IMEISV are then
    replaced by the reported value.

    Args:
        idtype: 1 for IMSI, 2 for IMEI, 3 for IMEISV, 4 for TMSI / P-TMSI,
            6 for an EPS identity of which ident[3] is the M-TMSI; any
            other type is only logged
        ident: the identity value
        dom: 'CS' or 'PS' to select TMSI or P-TMSI with idtype 4,
            'EPS' to handle idtype 6
    """
    if idtype == 1:
        if self.IMSI is None:
            self.IMSI = ident
        elif ident != self.IMSI:
            self._log('WNG', 'incorrect IMSI, %s instead of %s' % (ident, self.IMSI))
    elif idtype == 2:
        if self.IMEI is None:
            self.IMEI = ident
        elif ident != self.IMEI:
            self._log('WNG', 'IMEI changed, new %s, old %s' % (ident, self.IMEI))
            self.IMEI = ident
    elif idtype == 3:
        if self.IMEISV is None:
            self.IMEISV = ident
        elif ident != self.IMEISV:
            self._log('WNG', 'IMEISV changed, new %s, old %s' % (ident, self.IMEISV))
            self.IMEISV = ident
    elif idtype == 4:
        if dom == 'CS':
            if self.TMSI is None:
                self.TMSI = ident
            elif ident != self.TMSI:
                self._log('WNG', 'incorrect TMSI, %s instead of %s' % (ident, self.TMSI))
        elif dom == 'PS':
            if self.PTMSI is None:
                self.PTMSI = ident
            elif ident != self.PTMSI:
                self._log('WNG', 'incorrect P-TMSI, %s instead of %s' % (ident, self.PTMSI))
    elif idtype == 6:
        if dom == 'EPS':
            mtmsi = ident[3]
            if self.MTMSI is None:
                self.MTMSI = mtmsi
            elif mtmsi != self.MTMSI:
                # report the M-TMSI itself, which is the value compared,
                # and not the whole identity
                self._log('WNG', 'incorrect M-TMSI, %s instead of %s' % (mtmsi, self.MTMSI))
        # TODO: check what we can have within the 5GS domain
        # as per TS 24.501, 9.11.3.4
    else:
        self._log('INF', 'unhandled identity, type %i, ident %s' % (idtype, ident))
def get_new_tmsi(self):
    """Return a new random 32-bit temporary identity

    WARNING: it relies on the Python random generator, which is not good
    for randomness, but good enough for corenet and at least with some good
    uniqueness.
    """
    tmsi = random.getrandbits(32)
    return tmsi
def set_tmsi(self, tmsi):
    """Set `tmsi` as the TMSI of the UE and keep the Server LUT in sync

    The entry of the previous TMSI, if any, is dropped from Server.TMSI
    (best effort), then the new TMSI is mapped to the IMSI of the UE.
    """
    previous = self.TMSI
    if previous is not None:
        try:
            del self.Server.TMSI[previous]
        except Exception:
            # previous TMSI not referenced in the LUT: nothing to clean up
            pass
    self.TMSI = tmsi
    self.Server.TMSI[tmsi] = self.IMSI
def set_ptmsi(self, ptmsi):
    """Set `ptmsi` as the P-TMSI of the UE and keep the Server LUT in sync

    The entry of the previous P-TMSI, if any, is dropped from Server.PTMSI
    (best effort), then the new P-TMSI is mapped to the IMSI of the UE.
    """
    previous = self.PTMSI
    if previous is not None:
        try:
            del self.Server.PTMSI[previous]
        except Exception:
            # previous P-TMSI not referenced in the LUT: nothing to clean up
            pass
    self.PTMSI = ptmsi
    self.Server.PTMSI[ptmsi] = self.IMSI
def set_mtmsi(self, mtmsi):
    """Set `mtmsi` as the M-TMSI of the UE and keep the Server LUT in sync

    The entry of the previous M-TMSI, if any, is dropped from Server.MTMSI
    (best effort), then the new M-TMSI is mapped to the IMSI of the UE.
    """
    previous = self.MTMSI
    if previous is not None:
        try:
            del self.Server.MTMSI[previous]
        except Exception:
            # previous M-TMSI not referenced in the LUT: nothing to clean up
            pass
    self.MTMSI = mtmsi
    self.Server.MTMSI[mtmsi] = self.IMSI
def set_fgtmsi(self, fgtmsi):
    """Set `fgtmsi` as the 5G-TMSI of the UE and keep the Server LUT in sync

    The entry of the previous 5G-TMSI, if any, is dropped from Server.FGTMSI
    (best effort), then the new 5G-TMSI is mapped to the IMSI of the UE, in
    the same way as set_tmsi(), set_ptmsi() and set_mtmsi() do for their
    own LUT.
    """
    # delete current 5GTMSI from the Server LUT
    if self.FGTMSI is not None:
        try:
            del self.Server.FGTMSI[self.FGTMSI]
        except Exception:
            # previous 5G-TMSI not referenced in the LUT: nothing to clean up
            pass
    # set the new 5G TMSI
    self.FGTMSI = fgtmsi
    # update the Server LUT: the temporary identity resolves to the IMSI
    self.Server.FGTMSI[fgtmsi] = self.IMSI
#--------------------------------------------------------------------------#
# UE location
#--------------------------------------------------------------------------#
def set_plmn(self, plmn):
    """Update the PLMN of the UE and log it, only when it changes"""
    if plmn == self.PLMN:
        return
    self.PLMN = plmn
    self._log('INF', 'located in PLMN %s' % plmn)
def set_lac(self, lac):
    """Update the LAC of the UE and log it (4 hex digits), only when it changes"""
    if lac == self.LAC:
        return
    self.LAC = lac
    self._log('INF', 'located in LAC %.4x' % lac)
def set_rac(self, rac):
    """Update the RAC of the UE and log it (2 hex digits), only when it changes"""
    if rac == self.RAC:
        return
    self.RAC = rac
    self._log('INF', 'routed in RAC %.2x' % rac)
def set_tac(self, tac):
    """Update the TAC of the UE and log it (6 hex digits), only when it changes"""
    if tac == self.TAC:
        return
    self.TAC = tac
    self._log('INF', 'tracked in TAC %.6x' % tac)
def set_lai(self, plmn, lac):
    """Update the location area of the UE: its PLMN first, then its LAC"""
    self.set_plmn(plmn)
    self.set_lac(lac)
    return None
def set_tai(self, plmn, tac):
    """Update the tracking area of the UE: its PLMN first, then its TAC"""
    self.set_plmn(plmn)
    self.set_tac(tac)
    return None
#--------------------------------------------------------------------------#
# (E)SM protocol configuration options handling
#--------------------------------------------------------------------------#
def process_protconfig(self, smd, config, request):
    """process an (E)PS session management Protocol Configuration Options request,
    return the list of message's elements of the Protocol Configuration Options response,
    and a bool indicating if the PDN address for the UE is required in the NAS signalling

    Args:
        smd: session management handler, providing _log() and the
            AUTH_PAP_BYPASS / AUTH_CHAP_BYPASS flags
        config: APN configuration dict; 'Addr' is required to answer an IPCP
            IPv4 address request, 'DNS', 'PAP' and 'MTU' are optional
        request: decoded Protocol Configuration Options, with request[2] the
            configuration protocol and request[3] the requested elements

    Returns:
        2-tuple (list of response elements as dict, bool); the list is empty
        when the configuration protocol is not 0
    """
    RespElt, pdnaddrreq = [], False
    #
    if request[2].get_val() != 0:
        # not PPP with IP PDP
        smd._log('WNG', 'Protocol Config, not for PPP with IP PDP')
        return RespElt, pdnaddrreq
    #
    #smd._log('DBG', 'Protocol Config, config : %r' % config)
    #smd._log('DBG', 'Protocol Config, request: %r' % request)
    for ReqElt in request[3]:
        pcid = ReqElt[0].get_val()
        #
        if pcid == 0x8021:
            # IPCP
            if isinstance(ReqElt[2], NAS.NCP) and ReqElt[2][0].get_val() == 1 \
            and isinstance(ReqElt[2][3], NAS.NCPDataConf):
                # NCP config req: collect the type of all requested options
                ncpreq = [NcpOpt[0].get_val() for NcpOpt in ReqElt[2][3]]
                # dnsind counts the DNS entries consumed by the 1st DNS
                # request, so that the 2nd DNS request resumes after them
                NcpOptResp, dnsind = [], 0
                if 3 in ncpreq:
                    # IPv4 addr
                    ip = None
                    for ipaddr in config['Addr']:
                        if ipaddr[0] == 1:
                            ip = inet_aton_cn(*ipaddr)
                            break
                        elif ipaddr[0] == 3:
                            # type 3 address: ipaddr[1] is encoded as a type 1 one
                            ip = inet_aton_cn(1, ipaddr[1])
                            break
                    if ip is None:
                        smd._log('WNG', 'Protocol Config, no config available for '\
                                 'the IPCP IPv4 address request')
                    else:
                        NcpOptResp.append({'Type': 3, 'Data': ip})
                    ncpreq.remove(3)
                if 129 in ncpreq:
                    # 1st DNS IPv4 addr
                    dns = None
                    if 'DNS' in config:
                        for dnsaddr in config['DNS']:
                            dnsind += 1
                            if dnsaddr[0] == 1:
                                dns = inet_aton_cn(*dnsaddr)
                                break
                    if dns is None:
                        smd._log('WNG', 'Protocol Config, no config available for '\
                                 'the IPCP 1st DNS IPv4 request')
                    else:
                        NcpOptResp.append({'Type': 129, 'Data': dns})
                    ncpreq.remove(129)
                if 131 in ncpreq:
                    # 2nd DNS IPv4 addr
                    dns = None
                    if 'DNS' in config:
                        for dnsaddr in config['DNS'][dnsind:]:
                            if dnsaddr[0] == 1:
                                dns = inet_aton_cn(*dnsaddr)
                                break
                    if dns is None:
                        smd._log('WNG', 'Protocol Config, no config available for '\
                                 'the IPCP 2nd DNS IPv4 request')
                    else:
                        NcpOptResp.append({'Type': 131, 'Data': dns})
                    ncpreq.remove(131)
                if ncpreq:
                    # whatever remains was not processed above
                    smd._log('WNG', 'Protocol Config, unsupported IPCP requests, %r' % ncpreq)
                RespElt.append({'ID': 0x8021,
                                'Cont':{'Code': 2,
                                        'Id': ReqElt[2][1].get_val(),
                                        'Data': NcpOptResp}})
            else:
                smd._log('WNG', 'Protocol Config, invalid IPCP request format, %r' % ReqElt)
        #
        elif pcid == 0xC021:
            # LCP
            if isinstance(ReqElt[2], NAS.LCP) and ReqElt[2][0].get_val() == 1 \
            and isinstance(ReqElt[2][3], NAS.LCPDataConf):
                # LCP config req: the options are in the element which has
                # just been type-checked, at index 3 as for IPCP
                lcpreq = [LcpOpt[0].get_val() for LcpOpt in ReqElt[2][3]]
                # TODO: handle LCP elements
                #
                if lcpreq:
                    smd._log('ERR', 'Protocol Config, unsupported LCP requests, %r' % ReqElt[2])
            else:
                smd._log('WNG', 'Protocol Config, invalid LCP request format, %r' % ReqElt)
        #
        elif pcid == 0xC023:
            # PAP
            if isinstance(ReqElt[2], NAS.PAP) and ReqElt[2][0].get_val() == 1:
                # PAP req
                if smd.AUTH_PAP_BYPASS:
                    RespElt.append({'ID': 0xC023,
                                    'Cont': {'Code': 2, # Ack
                                             'Id': ReqElt[2][1].get_val(),
                                             'Data':{'Msg': b''}}})
                else:
                    authreq = ReqElt[2][3]
                    peerid, passwd = authreq[1].get_val(), authreq[3].get_val()
                    if 'PAP' in config and peerid in config['PAP'] and passwd == config['PAP'][peerid]:
                        RespElt.append({'ID': 0xC023,
                                        'Cont': {'Code': 2, # Ack
                                                 'Id': ReqElt[2][1].get_val(),
                                                 'Data':{'Msg': b''}}})
                    else:
                        if 'PAP' not in config:
                            smd._log('WNG', 'Protocol Config, no config available for '\
                                     'the PAP authentication')
                        RespElt.append({'ID': 0xC023,
                                        'Cont': {'Code': 3, # Nak
                                                 'Id': ReqElt[2][1].get_val(),
                                                 'Data':{'Msg': b'you loose'}}})
            else:
                smd._log('WNG', 'Protocol Config, invalid PAP request format, %r' % ReqElt)
        #
        elif pcid == 0xC223:
            # CHAP
            if isinstance(ReqElt[2], NAS.CHAP) and ReqElt[2][0].get_val() == 1:
                # CHAP req
                if smd.AUTH_CHAP_BYPASS:
                    RespElt.append({'ID': 0xC223,
                                    'Cont': {'Code': 3, # success
                                             'Id': ReqElt[2][1].get_val(),
                                             'Data': b''}})
                else:
                    # TODO: handle CHAP auth
                    smd._log('ERR', 'Protocol Config, unsupported CHAP authentication')
                    RespElt.append({'ID': 0xC223,
                                    'Cont': {'Code': 4, # failure
                                             'Id': ReqElt[2][1].get_val(),
                                             'Data': b''}})
            else:
                smd._log('WNG', 'Protocol Config, invalid CHAP request format, %r' % ReqElt)
        #
        elif pcid == 0x3:
            # DNS IPv6: first configured DNS entry of type 2
            dns = None
            if 'DNS' in config:
                for dnsaddr in config['DNS']:
                    if dnsaddr[0] == 2:
                        dns = inet_aton_cn(*dnsaddr)
                        break
            if dns is None:
                smd._log('WNG', 'Protocol Config, no config available for the DNS IPv6 request')
            else:
                RespElt.append({'ID': 0x3, 'Cont': dns})
        #
        elif pcid == 0xA:
            # IP alloc via NAS
            pdnaddrreq = True
        #
        elif pcid == 0xD:
            # DNS IPv4: first configured DNS entry of type 1
            dns = None
            if 'DNS' in config:
                for dnsaddr in config['DNS']:
                    if dnsaddr[0] == 1:
                        dns = inet_aton_cn(*dnsaddr)
                        break
            if dns is None:
                smd._log('WNG', 'Protocol Config, no config available for the DNS IPv4 request')
            else:
                RespElt.append({'ID': 0xD, 'Cont': dns})
        #
        elif pcid == 0x10:
            # IPv4 link MTU: 1st item of config['MTU'], given as an uint16
            # or as already-packed bytes; anything else is silently ignored
            if 'MTU' in config:
                mtu = config['MTU'][0]
                if isinstance(mtu, integer_types) and 0 <= mtu <= 65535:
                    mtu = pack('>H', mtu)
                if isinstance(mtu, bytes_types):
                    RespElt.append({'ID': 0x10, 'Cont': mtu})
            else:
                smd._log('DBG', 'Protocol Config, no config available for the IPv4 MTU request')
        #
        elif pcid == 0x15:
            # non-IP link MTU: 2nd item of config['MTU'], same format as above
            if 'MTU' in config:
                mtu = config['MTU'][1]
                if isinstance(mtu, integer_types) and 0 <= mtu <= 65535:
                    mtu = pack('>H', mtu)
                if isinstance(mtu, bytes_types):
                    RespElt.append({'ID': 0x15, 'Cont': mtu})
            else:
                smd._log('DBG', 'Protocol Config, no config available for the non-IP MTU request')
        #
        else:
            smd._log('WNG', 'Protocol Config, unsupported request element, %r' % ReqElt)
    #
    return RespElt, pdnaddrreq
#--------------------------------------------------------------------------#
# SMS delivery
#--------------------------------------------------------------------------#
def smsrp_downlink(self, rp_msg):
    """Send the SMS RP message `rp_msg` to the UE

    The stack referenced by self._last_ran is used, after calling its
    _net_init_con() when it is not connected. The RP message is wrapped
    into a CP procedure, whose single output message is sent through a
    RANAP direct transfer over IuCS, or through an EMM downlink NAS
    transport over S1.

    Returns:
        True if the message was sent; otherwise the RP message is discarded
        through Server.SMSd.discard_rp() and False is returned
    """
    ran = self._last_ran
    # the UE is reachable when it has a stack which is connected, or which
    # succeeds in initiating the connection
    if ran is not None and (ran.is_connected() or ran._net_init_con()):
        # UE connected over `ran'
        # init the CP procedure
        cpproc = ran.SMS.init_cpdata(rp_msg)
        pdus = cpproc.output() if cpproc else ()
        if len(pdus) == 1:
            smstx = pdus[0]
            # send the msg toward the hnb / enb
            if isinstance(ran, UEIuCSd):
                # wrap into a RANAP direct transfer
                ranapproc = ran.init_ranap_proc(RANAPDirectTransferCN,
                                                NAS_PDU=smstx.to_bytes(),
                                                SAPI='sapi-3')
                if ranapproc and ran._send_to_rnc_ranap([ranapproc]):
                    return True
            elif isinstance(ran, UES1d):
                # wrap into an EMM procedure first
                emmproc = ran.EMM.init_proc(
                    EMMDLNASTransport,
                    encod={(7, 98): {'NASContainer': smstx.to_bytes()}})
                if emmproc and ran.transmit_s1ap_proc(emmproc.output()):
                    return True
    # unable to send the SMS
    self.Server.SMSd.discard_rp(rp_msg, self.MSISDN)
    return False
#--------------------------------------------------------------------------#
# pretty-print all capabilities
#--------------------------------------------------------------------------#
def show_cap(self, with_measparams=False):
    """returns a string representing all capabilities reported by the UE
    ready for printing on screen or writing in file

    Args:
        with_measparams: when False, the E-UTRA capability is rendered
            between calls to meas_params_to_asn1_patch() and
            meas_params_to_asn1_restore()

    Returns:
        the text of all capabilities, separated by blank lines, or an empty
        string when self.Cap is empty
    """
    # IPython is optional: fall back on repr() for pretty-printing
    try:
        from IPython.lib.pretty import pretty
    except Exception:
        pretty = repr
    #
    if not self.Cap:
        return ''
    #
    txt = []
    for k, cap in self.Cap.items():
        if k == 'UERadioCap' and isinstance(cap[2], dict):
            for kb, c in cap[2].items():
                # special format: one section per radio capability
                txt.append('<<< Capability : UERadioCap.%s >>>' % kb)
                if kb == 'geran-cs' and isinstance(c, tuple):
                    txt.append(c[0].show())
                    txt.append(c[1].show())
                elif kb == 'utra':
                    UEUTRACap = RRC3G.PDU_definitions.InterRATHandoverInfo
                    UEUTRACap.set_val(c)
                    txt.append(UEUTRACap.to_asn1())
                elif kb == 'eutra':
                    UEEUTRACap = RRCLTE.EUTRA_RRC_Definitions.UE_EUTRA_Capability
                    if not with_measparams:
                        meas_params_to_asn1_patch()
                    try:
                        UEEUTRACap.set_val(c)
                        txt.append(UEEUTRACap.to_asn1())
                    finally:
                        # undo the patch even if the rendering fails
                        if not with_measparams:
                            meas_params_to_asn1_restore()
                #
                elif hasattr(c, 'show'):
                    txt.append(c.show())
                else:
                    txt.append(pretty(c))
        else:
            txt.append('<<< Capability : %s >>>' % k)
            c = cap[1]
            if hasattr(c, 'show'):
                txt.append(c.show())
            else:
                txt.append(pretty(c))
    return '\n\n'.join(txt)