pycrate/pycrate_corenet/utils_fmt.py

383 lines
12 KiB
Python

# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2021. Benoit Michau. P1Sec.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/utils_fmt.py
# * Created : 2021-04-01
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
from binascii import hexlify, unhexlify
from struct import pack, unpack
from socket import inet_aton, inet_ntoa, inet_pton, inet_ntop
from pycrate_core.utils import *
from pycrate_mobile import TS24008_IE
from pycrate_mobile import NAS
#------------------------------------------------------------------------------#
# various formatting routines
#------------------------------------------------------------------------------#
def pythonize_name(name):
return name.replace('-', '_')
def plmn_buf_to_str(buf):
d = []
[d.extend([0x30 + (b&0xf), 0x30+(b>>4)]) for b in buf]
if d[3] == 0x3f:
# filler, 5 digits MCC MNC
del d[3]
return bytes(d).decode()
def plmn_str_to_buf(s):
s = s.encode()
if len(s) == 5:
return bytes([
((s[1]-0x30)<<4) + (s[0]-0x30),
0xf0 + (s[2]-0x30),
((s[4]-0x30)<<4) + (s[3]-0x30)])
else:
return bytes([
((s[1]-0x30)<<4) + (s[0]-0x30),
((s[3]-0x30)<<4) + (s[2]-0x30),
((s[5]-0x30)<<4) + (s[4]-0x30)])
__IMSI = TS24008_IE.ID()
def imsi_buf_to_str(buf):
__IMSI.from_bytes(buf)
return __IMSI.decode()[1]
def imsi_str_to_buf(s):
__IMSI.encode(type=TS24008_IE.IDTYPE_IMSI, ident=s)
return __IMSI.to_bytes()
def get_ueseccap_null_alg():
seccap = NAS.UESecCap(val={'EEA0': 1, 'EIA0': 1, 'UEA0': 1})
return seccap
def get_ueseccap_null_alg_lte():
seccap = NAS.UESecCap(val={'EEA0': 1, 'EIA0': 1})
seccap.disable_from('UEA0')
return seccap
def cellid_bstr_to_str(bstr):
# 20 or 28 bits
return hexlify(int_to_bytes(*bstr)).decode('ascii')[:-1]
def globenbid_to_hum(seq):
return {'pLMNidentity': plmn_buf_to_str(seq['pLMNidentity']),
'eNB-ID': (seq['eNB-ID'][0], cellid_bstr_to_str(seq['eNB-ID'][1]))}
def supptas_to_hum(seqof):
return [{'broadcastPLMNs': [plmn_buf_to_str(plmn) for plmn in sta['broadcastPLMNs']],
'tAC': bytes_to_uint(sta['tAC'], 16)} for sta in seqof]
def gummei_to_asn(plmnid, mmegid, mmec):
return {'pLMN-Identity': plmn_str_to_buf(plmnid),
'mME-Group-ID' : uint_to_bytes(mmegid, 16),
'mME-Code' : uint_to_bytes(mmec, 8)}
def served_gummei_to_asn(val):
return {'servedGroupIDs': [uint_to_bytes(gid, 16) for gid in val['GroupIDs']],
'servedMMECs' : [uint_to_bytes(mmec, 8) for mmec in val['MMECs']],
'servedPLMNs' : [plmn_str_to_buf(plmn) for plmn in val['PLMNs']]}
def mac_aton(mac='00:00:00:00:00:00'):
return unhexlify(mac.replace(':', ''))
def inet_aton_cn(*pdnaddr, **kw):
"""convert a PDN / PDP address tuple to a buffer
kw can be:
- dom: 'PS' or 'EPS'
"""
if pdnaddr[0] == 0:
# PPP address
return pdnaddr[1]
elif pdnaddr[0] == 1:
# IPv4 address
try:
return inet_aton(pdnaddr[1])
except Exception:
log('WNG: IPv4 address conversion error, %r' % pdnaddr[1])
return pdnaddr[1]
elif pdnaddr[0] == 2:
# accept 64-bit IPv6 prefix / subnet or full 128-bit IPv6 address
ipaddr = pdnaddr[1]
if ipaddr.count(':') == 3:
# IPv6 prefix / subnet only
return pack('>HHHH', *map(lambda x:int(x, 16), ipaddr.split(':')))
else:
try:
return inet_pton(AF_INET6, ipaddr)
except Exception:
log('WNG: IPv6 address conversion error, %r' % pdnaddr[1])
return ipaddr
elif pdnaddr[0] == 3:
# IPv4v6 addresses
if 'dom' in kw and kw['dom'] == 'EPS':
# PDN address
try:
return inet_aton_cn(2, pdnaddr[2]) + inet_aton_cn(1, pdnaddr[1])
except Exception:
log('WNG: IPv4v6 PDN address conversion error, %r' % pdnaddr[1])
return pdnaddr[1]
else:
# PDP address
try:
return inet_aton_cn(1, pdnaddr[1]) + inet_aton_cn(2, pdnaddr[2])
except Exception:
log('WNG: IPv4v6 PDP address conversion error, %r' % pdnaddr[1])
return pdnaddr[1]
else:
# unknown address type
return pdnaddr[1]
def inet_ntoa_cn(pdntype, buf, dom='EPS'):
"""convert a buffer for a given pdntype and domain to a humane-readable address
"""
if pdntype == 0:
# PPP address
return (pdntype, buf)
if pdntype == 1:
# IPv4 address
try:
return (1, inet_ntoa(buf))
except Exception:
log('WNG: IPv4 buffer conversion error, %s' % hexlify(buf).decode('ascii'))
return None
elif pdntype == 2:
# accept 64-bit IPv6 local if or full 128-bit IPv6 address
if len(buf) == 8:
return (2, '%x:%x:%x:%x' % unpack('>HHHH', buf))
else:
try:
return (2, inet_ntop(AF_INET6, buf))
except Exception:
log('WNG: IPv6 buffer conversion error, %s' % hexlify(buf).decode('ascii'))
return None
elif pdntype == 3:
if dom == 'EPS':
# PDN address
try:
return (3, inet_ntoa(buf[8:12]), inet_ntoa_cn(2, buf[:8])[1])
except Exception:
log('WNG: IPv4v6 PDN buffer conversion error, %s' % hexlify(buf).decode('ascii'))
else:
# PDP address
try:
return (3, inet_ntoa(buf[:4]), inet_ntop(AF_INET6, buf[4:20]))
except Exception:
log('WNG: IPv4v6 PDP buffer conversion error, %s' % hexlify(buf).decode('ascii'))
else:
return (pdntype, buf)
# routines for dealing with structures for NGAP
def globranid_to_hum(cho):
"""returns a 3-tuple:
- plmn id (str)
- node type (str)
- node id (bit-str value, 2-tuple of int)
"""
if cho[0] == 'globalGNB-ID':
# std gNB-ID
return (
plmn_buf_to_str(cho[1]['pLMNIdentity']),
cho[1]['gNB-ID'][0],
cho[1]['gNB-ID'][1]
)
elif cho[0] == 'globalNgENB-ID':
# std ng-eNB-ID
return (
plmn_buf_to_str(cho[1]['pLMNIdentity']),
cho[1]['ngENB-ID'][0],
cho[1]['ngENB-ID'][1]
)
elif cho[0] == 'globalN3IWF-ID':
return (
plmn_buf_to_str(cho[1]['pLMNIdentity']),
cho[1]['n3IWF-ID'][0],
cho[1]['n3IWF-ID'][1]
)
return None
def globranid_to_asn(granid):
if granid[1] == 'gNB-ID':
return (
'globalGNB-ID', {
'pLMNIdentity' : plmn_str_to_buf(granid[0]),
'gNB-ID' : (granid[1], granid[2])
}
)
elif granid[1] == 'n3IWF-ID':
return (
'globalN3IWF-ID', {
'pLMNIdentity' : plmn_str_to_buf(granid[0]),
'n3IWF-ID' : (granid[1], granid[2])
}
)
else:
return (
'globalNgENB-ID', {
'pLMNIdentity' : plmn_str_to_buf(granid[0]),
'ngENB-ID' : (granid[1], granid[2])
}
)
def bcastplmn_to_hum(seq):
"""returns a 2-tuple:
- plmn id (str)
- list 1 or 2-tuple corresponding to an snssai value with SST (uint8) and an optional SD (uint24)
"""
return (
plmn_buf_to_str(seq['pLMNIdentity']),
[(bytes_to_uint(snssai['s-NSSAI']['sST'], 8), bytes_to_uint(snssai['s-NSSAI']['sD'], 24)) \
if len(snssai['s-NSSAI']) > 1 else \
(bytes_to_uint(snssai['s-NSSAI']['sST'], 8), ) \
for snssai in seq['tAISliceSupportList']]
)
def supptalist_to_hum(seqof):
"""returns a list of 2-tuple, each 2-tuple is:
- TAC (uint24, was uint16 in S1AP)
- list of broadcasted PLMN 2-tuple (see bcastplmn_to_hum)
"""
# warning: in case of duplicate TAC, this will override GNB TAC of the first one(s)
return {
bytes_to_uint(seq['tAC'], 24) : [
bcastplmn_to_hum(bcastplmn) for bcastplmn in seq['broadcastPLMNList']] \
for seq in seqof
}
def supptalist_to_asn(supptalist):
return [
{'tAC': uint_to_bytes(tac, bitlen=24),
'broadcastPLMNList': [{
'pLMNIdentity': plmn_str_to_buf(plmn),
'tAISliceSupportList': [
{'s-NSSAI': {
'sST': uint_to_bytes(snssai[0], 8),
'sD' : uint_to_bytes(snssai[1], 24)}} if len(snssai) > 1 else \
{'s-NSSAI': {
'sST': uint_to_bytes(snssai[0], 8)}} \
for snssai in snssailist]
} for (plmn, snssailist) in bcastplmnlist]
} for (tac, bcastplmnlist) in sorted(supptalist.items())
]
def guamilist_to_hum(seqof):
# warning: in case of duplicate PLMN, this will override AMF ID of the first one(s)
return {
plmn_buf_to_str(guami['gUAMI']['pLMNIdentity']): (
guami['gUAMI']['aMFRegionID'][0],
guami['gUAMI']['aMFSetID'][0],
guami['gUAMI']['aMFPointer'][0]
) for guami in seqof
}
def guamilist_to_asn(guamilist):
return [
{'gUAMI': {
'pLMNIdentity': plmn_str_to_buf(plmn),
'aMFRegionID' : (rid, 8),
'aMFSetID' : (sid, 10),
'aMFPointer' : (ptr, 6)
}
} for (plmn, (rid, sid, ptr)) in sorted(guamilist.items())
]
def plmnsupplist_to_hum(seqof):
# warning: in case of duplicate PLMN, this will override S-NSSAI of the first one(s)
return {
plmn_buf_to_str(seq['pLMNIdentity']): [
(bytes_to_uint(snssai['s-NSSAI']['sST'], 8), bytes_to_uint(snssai['s-NSSAI']['sD'], 24)) \
if len(snssai['s-NSSAI']) > 1 else \
(bytes_to_uint(snssai['s-NSSAI']['sST'], 8), ) \
for snssai in seq['sliceSupportList']
]
for seq in seqof
}
def plmnsupplist_to_asn(plmnsupplist):
return [
{'pLMNIdentity': plmn_str_to_buf(plmn),
'sliceSupportList': [
{'s-NSSAI': {'sST': uint_to_bytes(snssai[0], 8), 'sD': uint_to_bytes(snssai[1], 24)}} if len(snssai) > 1 else \
{'s-NSSAI': {'sST': uint_to_bytes(snssai[0], 8)}} for snssai in snssais],
} for (plmn, snssais) in sorted(plmnsupplist.items())
]
def ngap_userloc_to_hum(cho):
if cho[0] == 'userLocationInformationNR':
# return NR-CGI and TAI
return {
'TAI': (
plmn_buf_to_str(cho[1]['tAI']['pLMNIdentity']),
bytes_to_uint(cho[1]['tAI']['tAC'], 24)
),
'NR-CGI': (
plmn_buf_to_str(cho[1]['nR-CGI']['pLMNIdentity']),
cho[1]['nR-CGI']['nRCellIdentity']
)
}
elif cho[0] == 'userLocationInformationEUTRA':
# return EUTRA-CGI and TAI
return {
'TAI': (
plmn_buf_to_str(cho[1]['tAI']['pLMNIdentity']),
bytes_to_uint(cho[1]['tAI']['tAC'], 16)
),
'EUTRA-CGI': (
plmn_buf_to_str(cho[1]['eUTRA-CGI']['pLMNIdentity']),
cho[1]['eUTRA-CGI']['eUTRACellIdentity']
)
}
def make_5g_snn(plmn_id, nid=None):
"""encodes the 5G Serving Network Name as in TS 24.501, 9.12.1
plmn_id: 5 or 6 digit str
nid: None or hexa-str
"""
snn = b'5G:mnc%.3i.mcc%.3i.3gppnetwork.org' % (int(mccmnc[3:]), int(mccmnc[:3]))
if nid:
snn += ":%s" % nid.encode()
return snn