#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Reiniger: S1AP / NAS packet capture anonymizer.
#
# (C) 2019 by Vadim Yanitskiy
#
# All rights reserved.
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Released under the terms of GNU General Public License,
# Version 2 or (at your option) any later version.
#
# Based on Scapy and pycrate:
# https://github.com/secdev/scapy
# https://github.com/P1sec/pycrate

import logging as log
import sys

from scapy.all import PcapReader, PcapWriter
from scapy.all import SCTPChunkData, NoPayload
from scapy.all import SCTP
from scapy.all import IP, IPv6

from pycrate_asn1dir import S1AP
from pycrate_mobile import NAS
from pycrate_core.charpy import Charpy

# Replacement IMSI: 262420000000000, Vodafone GmbH, Germany
IMSI_REPLACEMENT_BYTES = b'\x29\x26\x24' + b'\x00' * 5

# Replacement IMEISV: 3555720187847840 (Motorola C113)
IMEISV_REPLACEMENT_BYTES = b'\x33\x55\x75\x02\x81\x87\x74\x48\xf0'

# Value of the 'Type' field that marks a NAS identity as an IMSI
NAS_ID_TYPE_IMSI = 1

# SCTP payload protocol identifier of the Data chunks carrying S1AP
S1AP_SCTP_PPID = 0x12


class UnsupportedIdentityError(Exception):
    """A NAS identity IE holds something other than an IMSI.

    Only IMSI replacement is implemented.  Raising makes the caller
    (handle_sctp_chunk) drop the whole packet rather than write a
    possibly sensitive identity to the destination capture.
    """


def get_key_or_none(elem, k):
    """Return elem[k], or None if the lookup raises ValueError."""
    try:
        return elem[k]
    except ValueError:
        return None


def has_key(elem, k):
    """Return True if elem[k] can be looked up without an error."""
    try:
        elem[k]
    except Exception:
        # The lookup error type depends on the container, so any
        # ordinary exception is treated as "key is absent".
        return False
    return True


def _scrub_nas_identity(msg, key):
    """Overwrite the IMSI held by the identity IE `key` of a NAS message.

    Returns True if the IE was present and has been overwritten, False
    if the message has no such IE.  Raises UnsupportedIdentityError if
    the IE is present but is not an IMSI.
    """
    if not has_key(msg, key):
        return False

    ident = msg[key][1]
    id_type = ident['Type'].get_val()
    log.debug("%s identity type: %s", key, id_type)

    # TODO: also patch IMEI / IMEISV
    if id_type != NAS_ID_TYPE_IMSI:
        raise UnsupportedIdentityError(
            "%s identity type %s is not supported" % (key, id_type))

    log.info("Cleaning %s", ident.repr())
    ident.from_bytes(Charpy(IMSI_REPLACEMENT_BYTES))
    log.debug("%s replaced with %s", key, ident)
    return True


def handle_nas_pdu(pdu, dl, regen = False):
    """Anonymize the identities found in an encoded NAS PDU.

    pdu   -- encoded NAS message (bytes)
    dl    -- True to parse as a Downlink message, False as an Uplink one
    regen -- force re-encoding even if nothing has been replaced

    Returns the re-encoded message if something was replaced (or regen
    is set), the untouched pdu otherwise, and None if the payload could
    not be parsed.  Raises UnsupportedIdentityError if an identity IE
    holds something other than an IMSI.
    """
    log.debug("Processing %s NAS PDU: %s",
              "Downlink" if dl else "Uplink", pdu.hex())

    parse = NAS.parse_NASLTE_MT if dl else NAS.parse_NASLTE_MO
    msg, code = parse(pdu)
    if code:
        log.error("Failed to parse NAS payload")
        return None

    # Both 'ID' and 'EPSID' may contain an IMSI
    for key in ('ID', 'EPSID'):
        if _scrub_nas_identity(msg, key):
            regen = True

    return msg.to_bytes() if regen else pdu


def handle_s1ap_imsi(imsi):
    """Log the given IMSI and return the replacement IMSI (bytes)."""
    log.info("Cleaning IMSI: %s" % NAS.decode_bcd(imsi))
    return IMSI_REPLACEMENT_BYTES


def handle_s1ap_tmsi(tmsi):
    """Log the given TMSI and return it as is."""
    log.info("Cleaning TMSI: %s" % tmsi.hex())
    return tmsi # NOTE: for now, keep TMSI unchanged


def handle_s1ap_imeisv(imeisv):
    """Log the given IMEISV and return the replacement IMEISV (bytes)."""
    # NOTE(review): the only caller passes the value of the Masked-IMEISV
    # IE.  Confirm that pycrate represents it as bytes: if it does not,
    # decode_bcd() raises and the packet is dropped by handle_sctp_chunk.
    log.info("Cleaning IMEISV: %s" % NAS.decode_bcd(imeisv))
    return IMEISV_REPLACEMENT_BYTES


def find_s1ap_ie(ie_list, ie_id):
    """Return the first IE of ie_list with the given id, or None."""
    return next((ie for ie in ie_list if ie['id'] == ie_id), None)


def find_s1ap_ies(ie_list, ie_ids):
    """Return the first IE of ie_list whose id is in ie_ids, or None."""
    return next((ie for ie in ie_list if ie['id'] in ie_ids), None)


def handle_s1ap_paging(msg):
    """Anonymize the UE identity of a Paging message (modified in place)."""
    ie = find_s1ap_ie(msg['protocolIEs'],
                      S1AP.S1AP_Constants.id_UEPagingID.get_val())
    if ie is None:
        return

    # ie['value'] is a (type name, value) pair, where the value itself
    # is a (choice name, choice value) pair
    type_name, ueid = ie['value']
    choice = ueid[0]

    if choice == 's-TMSI':
        ueid[1]['m-TMSI'] = handle_s1ap_tmsi(ueid[1]['m-TMSI'])
    elif choice == 'iMSI':
        imsi = handle_s1ap_imsi(ueid[1])
        # Write back using the same nesting as it was read with,
        # i.e. keep the outer type name around the choice
        ie['value'] = (type_name, ('iMSI', imsi))
    else:
        log.warning("Unknown Paging identity type '%s'" % choice)


def find_and_handle_s1ap_imei_ie(msg):
    """Replace the Masked-IMEISV IE of a message, if there is one."""
    ie = find_s1ap_ie(msg[1]['protocolIEs'],
                      S1AP.S1AP_Constants.id_Masked_IMEISV.get_val())
    if ie is None:
        return

    type_name, value = ie['value']
    # FIXME: I am not 100% sure if this is correct
    ie['value'] = (type_name, handle_s1ap_imeisv(value))


def find_and_handle_s1ap_nas_pdu(msg, dl):
    """Anonymize the NAS-PDU IE of a message, if there is one."""
    ie = find_s1ap_ie(msg[1]['protocolIEs'],
                      S1AP.S1AP_Constants.id_NAS_PDU.get_val())
    if ie is None:
        return

    # NOTE(review): handle_nas_pdu() returns None on a parsing failure;
    # presumably the re-encoding then fails and the packet gets dropped
    # by handle_sctp_chunk -- confirm.
    ie['value'] = ('NAS-PDU', handle_nas_pdu(ie['value'][1], dl))


def find_and_handle_s1ap_nested_nas_pdu(msg):
    """Anonymize the NAS PDUs nested into the items of an E-RAB list.

    Only the first matching list IE of the message is processed, and
    the nested payload is always parsed as Downlink.
    """
    consts = S1AP.S1AP_Constants
    # List IE identifier -> type name of the items of that list
    item_names = {
        consts.id_E_RABToBeSetupListCtxtSUReq.get_val():
            'E-RABToBeSetupItemCtxtSUReq',
        consts.id_E_RABToBeSetupListBearerSUReq.get_val():
            'E-RABToBeSetupItemBearerSUReq',
        consts.id_E_RABToBeModifiedListBearerModReq.get_val():
            'E-RABToBeModifiedItemBearerModReq',
    }

    ie = find_s1ap_ies(msg[1]['protocolIEs'], item_names.keys())
    if ie is None:
        return

    known_items = set(item_names.values())
    for item in ie['value'][1]:
        type_name, fields = item['value']
        if type_name not in known_items:
            continue
        if 'nAS-PDU' in fields:
            fields['nAS-PDU'] = handle_nas_pdu(fields['nAS-PDU'], dl = True)


def handle_s1ap(msg, pkt_number):
    """Anonymize a decoded S1AP PDU value in place.

    msg        -- (choice name, message) pair of the decoded S1AP PDU
    pkt_number -- number of the packet in the capture (for logging)
    """
    log.info("Processing S1AP message '%s:%s' (Packet %d)"
             % (msg[0], msg[1]['value'][0], pkt_number))

    # Only initiating messages are inspected
    if msg[0] != u'initiatingMessage':
        return

    consts = S1AP.S1AP_Constants
    proc_code = msg[1]['procedureCode']
    value = msg[1]['value']

    # Look for IMSI, TMSI and IMEISV in S1AP
    if proc_code == consts.id_Paging.get_val():
        handle_s1ap_paging(value[1])
    elif proc_code in (consts.id_InitialContextSetup.get_val(),
                       consts.id_HandoverResourceAllocation.get_val()):
        find_and_handle_s1ap_imei_ie(value)

    # Look for NAS payload (which may also contain sensitive info)
    nested_codes = ( # MME-originated NAS payload (nested into E-RAB items)
        consts.id_E_RABSetup.get_val(),
        consts.id_E_RABModify.get_val(),
        consts.id_InitialContextSetup.get_val())
    dl_codes = ( # MME-originated NAS payload
        consts.id_E_RABRelease.get_val(),
        consts.id_downlinkNASTransport.get_val())
    ul_codes = ( # UE-originated payload
        consts.id_initialUEMessage.get_val(),
        consts.id_uplinkNASTransport.get_val(),
        consts.id_NASNonDeliveryIndication.get_val())

    if proc_code in nested_codes:
        find_and_handle_s1ap_nested_nas_pdu(value)
    elif proc_code in dl_codes:
        find_and_handle_s1ap_nas_pdu(value, dl = True)
    elif proc_code in ul_codes:
        find_and_handle_s1ap_nas_pdu(value, dl = False)


def handle_sctp_chunk(chunk, pkt_number):
    """Anonymize the S1AP PDU carried by an SCTP Data chunk.

    The chunk is modified in place.  Returns True on success, False if
    the payload could not be decoded, processed or re-encoded.
    """
    log.debug("Processing an ASN.1 encoded S1AP PDU")

    # Parse ASN.1 encoded S1AP PDU
    s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU

    # Guard against malformed packets
    try:
        s1ap.from_aper(chunk.data)
    except Exception:
        log.error("Malformed packet, skipping...")
        return False

    try:
        handle_s1ap(s1ap(), pkt_number)
    except Exception:
        log.error("Failed to decode S1AP payload (Packet %d)", pkt_number)
        return False

    # Encapsulate the new payload
    try:
        chunk.data = s1ap.to_aper()
        # Scapy will calculate the length and padding
        del chunk.len
    except Exception:
        log.error("Failed to encode a S1AP payload")
        return False

    return True


def _reset_derived_fields(sctp):
    """Make Scapy recalculate the checksum / length fields on build.

    The fields are reset on the given SCTP layer and on the IP / IPv6
    layer carrying it, so they match the rewritten payload.
    """
    del sctp.chksum

    lower = sctp.underlayer
    if isinstance(lower, IP):
        del lower.len
        del lower.chksum
    elif isinstance(lower, IPv6):
        del lower.plen


def handle_sctp_pkt(pkt, pkt_number):
    """Anonymize all S1AP payloads of a packet containing SCTP.

    Returns the (modified in place) packet, or None if the packet
    shall be dropped because one of its payloads could not be handled.
    """
    # Find the SCTP layer by its type, not by its depth in the packet
    sctp = pkt.getlayer(SCTP)
    if sctp is None:
        return pkt

    # There can be multiple chunks, look for Data with S1AP
    modified = False
    chunk = sctp.payload
    while not isinstance(chunk, NoPayload):
        if isinstance(chunk, SCTPChunkData) \
           and chunk.proto_id == S1AP_SCTP_PPID:
            if not handle_sctp_chunk(chunk, pkt_number):
                return None
            modified = True
        chunk = chunk.payload

    if modified:
        _reset_derived_fields(sctp)

    return pkt


def handle_pcap(src_path, dst_path = '/dev/null'):
    """Copy a capture from src_path to dst_path, anonymizing S1AP / NAS.

    Packets which could not be anonymized are not written at all.
    """
    # Open source capture (with sensitive info) first, so nothing
    # is created at dst_path if the source cannot be opened
    src_pcap = PcapReader(src_path)
    try:
        # Open the destination capture
        dst_pcap = PcapWriter(dst_path)
        try:
            # Iterate the reader instead of calling read_packet():
            # iteration terminates cleanly at the end of the capture
            for pkt_number, pkt in enumerate(src_pcap, start = 1):
                # TODO: S1AP can be carried over TCP too
                if pkt.haslayer(SCTP):
                    pkt = handle_sctp_pkt(pkt, pkt_number)

                # Write potentially cleaned packet to the new file
                if pkt is not None:
                    dst_pcap.write(pkt)
        finally:
            dst_pcap.close()
    finally:
        src_pcap.close()


def main():
    """Configure logging and process the capture given via argv."""
    # Configure logging format
    # Example: [DEBUG] foo_bar.py:71 Mahlzeit!
    LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s"
    LOG_LEVEL_DEFAULT = log.INFO

    # Default logging handler (stderr)
    sh = log.StreamHandler()
    sh.setLevel(LOG_LEVEL_DEFAULT)
    sh.setFormatter(log.Formatter(LOG_FMT_DEFAULT))
    log.root.addHandler(sh)

    # Set DEBUG for the root logger
    log.root.setLevel(log.DEBUG)

    # TODO: use argparse
    if len(sys.argv) < 3:
        print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0])
        sys.exit(1)

    handle_pcap(sys.argv[1], sys.argv[2])


if __name__ == '__main__':
    main()