commit 9dcedc7cfd864b47e4147598b3c9f281247ff71e Author: Vadim Yanitskiy Date: Fri Sep 27 09:32:52 2019 +0200 Initial import of s1ap_reiniger.py diff --git a/s1ap_reiniger.py b/s1ap_reiniger.py new file mode 100644 index 0000000..82b90b3 --- /dev/null +++ b/s1ap_reiniger.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Reiniger: S1AP / NAS packet capture anonymizer. +# +# (C) 2019 by Vadim Yanitskiy +# +# All rights reserved. +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Released under the terms of GNU General Public License, +# Version 2 or (at your option) any later version. +# +# Based on Scapy and pycrate: +# https://github.com/secdev/scapy +# https://github.com/P1sec/pycrate + +import logging as log +import sys + +from scapy.all import PcapReader, PcapWriter +from scapy.all import SCTPChunkData, NoPayload +from scapy.all import SCTP + +from pycrate_asn1dir import S1AP +from pycrate_mobile import NAS + +def handle_nas_pdu(pdu, dl, regen = False): + log.debug("Processing %s NAS PDU: %s" % ("Downlink" if dl else "Uplink", pdu.encode('hex'))) + (msg, code) = NAS.parse_NASLTE_MT(pdu) if dl else NAS.parse_NASLTE_MO(pdu) + if code: + log.error("Failed to parse NAS payload") + return None + + # Try to find EPSID (may contain IMSI) + # TODO: also patch IMEI / IMEISV + try: + epsid = msg['EPSID'][1] + # Check if EPSID contains exactly IMSI + if epsid[2].get_val() == 1: + log.info("Cleaning %s" % epsid.repr()) + # 262420000000000, Vodafone GmbH, Germany + epsid.from_bytes('\x29\x26\x24' + '\x00' * 5) + regen = True + except: + pass + + return msg.to_bytes() if regen else pdu + +def handle_s1ap_imsi(imsi): + log.info("Cleaning IMSI: %s" % NAS.decode_bcd(imsi)) + # 262420000000000, Vodafone GmbH, Germany + return '\x29\x26\x24' + '\x00' * 5 + +def handle_s1ap_tmsi(tmsi): + log.info("Cleaning TMSI: %s" % tmsi.encode('hex')) + return tmsi # NOTE: for now, keep TMSI unchanged + +def handle_s1ap_imeisv(imeisv): + log.info("Cleaning IMEISV: %s" % NAS.decode_bcd(imeisv)) + # 3555720187847840 (Motorola C113) + return '\x33\x55\x75\x02\x81\x87\x74\x48\xf0' + +def find_s1ap_ie(ie_list, ie_id): + for ie in ie_list: + if ie['id'] == ie_id: + return ie + return None + +def find_s1ap_ies(ie_list, ie_ids): + for ie in ie_list: + if ie['id'] in ie_ids: + return ie + return None + +def handle_s1ap_paging(msg): + ie = find_s1ap_ie(msg['protocolIEs'], S1AP.S1AP_Constants.id_UEPagingID.get_val()) + if ie is None: + return + + ueid = ie['value'][1] + if ueid[0] == 's-TMSI': + tmsi = handle_s1ap_tmsi(ueid[1]['m-TMSI']) + ueid[1]['m-TMSI'] = tmsi + elif ueid[0] == 'iMSI': + imsi = handle_s1ap_imsi(ueid[1]) + # FIXME: I am not 100% sure if this is correct + ie['value'] = ('iMSI', imsi) + else: + log.warn("Unknown Paging identity type '%s'" % ueid[0]) + +def find_and_handle_s1ap_imei_ie(msg): + ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_Masked_IMEISV.get_val()) + if ie is not None: + imeisv = handle_s1ap_imeisv(ie['value'][1]) + # FIXME: I am not 100% sure if this is correct + ie['value'] = (ie['value'][0], imeisv) + +def find_and_handle_s1ap_nas_pdu(msg, dl): + ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_NAS_PDU.get_val()) + if ie is not None: + pdu = handle_nas_pdu(ie['value'][1], dl) + ie['value'] = ('NAS-PDU', pdu) + +def find_and_handle_s1ap_nested_nas_pdu(msg): + IEs = { + S1AP.S1AP_Constants.id_E_RABToBeSetupListCtxtSUReq.get_val() : 'E-RABToBeSetupItemCtxtSUReq', + S1AP.S1AP_Constants.id_E_RABToBeSetupListBearerSUReq.get_val() : 'E-RABToBeSetupItemBearerSUReq', + S1AP.S1AP_Constants.id_E_RABToBeModifiedListBearerModReq.get_val() : 'E-RABToBeModifiedItemBearerModReq'} + ie = find_s1ap_ies(msg[1]['protocolIEs'], IEs.keys()) + if ie is None: + return + + for item in ie['value'][1]: + if item['value'][0] in IEs.values(): + pdu = handle_nas_pdu(item['value'][1]['nAS-PDU'], dl = True) + item['value'][1]['nAS-PDU'] = pdu + +def handle_s1ap(msg): + log.info("Processing S1AP message '%s:%s'" % (msg[0], msg[1]['value'][0])) + + # Paging may contain + if msg[0] != u'initiatingMessage': + return + + # Look for IMSI, TMSI and IMEISV in S1AP + if msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_Paging.get_val(): + handle_s1ap_paging(msg[1]['value'][1]) + elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_InitialContextSetup.get_val(): + find_and_handle_s1ap_imei_ie(msg[1]['value']) + elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_HandoverResourceAllocation.get_val(): + find_and_handle_s1ap_imei_ie(msg[1]['value']) + + # Look for NAS payload (which may also contain sensitive info) + procCodeListNested = ( # MME-originated NAS payload + S1AP.S1AP_Constants.id_E_RABSetup.get_val(), + S1AP.S1AP_Constants.id_E_RABModify.get_val(), + S1AP.S1AP_Constants.id_InitialContextSetup.get_val()) + procCodeListDL = ( # MME-originated NAS payload + S1AP.S1AP_Constants.id_E_RABRelease.get_val(), + S1AP.S1AP_Constants.id_downlinkNASTransport.get_val()) + procCodeListUL = ( # UE-originated payload + S1AP.S1AP_Constants.id_initialUEMessage.get_val(), + S1AP.S1AP_Constants.id_uplinkNASTransport.get_val(), + S1AP.S1AP_Constants.id_NASNonDeliveryIndication.get_val()) + if msg[1]['procedureCode'] in procCodeListNested: + find_and_handle_s1ap_nested_nas_pdu(msg[1]['value']) + elif msg[1]['procedureCode'] in procCodeListDL: + find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = True) + elif msg[1]['procedureCode'] in procCodeListUL: + find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = False) + +def handle_sctp_chunk(chunk): + log.debug("Processing an ASN.1 encoded S1AP PDU") + + # Parse ASN.1 encoded S1AP PDU + s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU + # Guard against malformed packets + try: + s1ap.from_aper(chunk.data) + except: + log.error("Malformed packet, skipping...") + return False + + handle_s1ap(s1ap()) + + # Encapsulate the new payload + # TODO: reset checksum fields + try: + # Encode the S1AP payload + payload = s1ap.to_aper() + chunk.data = payload + + # Scapy will calculate the length and padding + del chunk.len + except: + log.error("Failed to encode a S1AP payload") + return False + + return True + +def handle_sctp_pkt(pkt): + # Find and decapsulate the S1AP payload + ip = pkt.payload + sctp = ip.payload + + # There can be multiple chunks, look for Data with S1AP + chunk = sctp.payload + while not isinstance(chunk, NoPayload): + if isinstance(chunk, SCTPChunkData) and chunk.proto_id == 0x12: + success = handle_sctp_chunk(chunk) + if not success: + return None + chunk = chunk.payload + + return pkt + +def handle_pcap(src_path, dst_path = '/dev/null'): + # Open the destination capture + dst_pcap = PcapWriter(dst_path) + + # Open source capture (with sensitive info) + src_pcap = PcapReader(src_path) + + # Get the first packet + pkt = src_pcap.read_packet() + + # Process all packets in loop + while pkt: + # TODO: S1AP can be carried over TCP too + if pkt.haslayer(SCTP): + pkt = handle_sctp_pkt(pkt) + + # Write potentially cleaned packet to the new file + if pkt is not None: + dst_pcap.write(pkt) + + # Get the next packet + pkt = src_pcap.read_packet() + +if __name__ == '__main__': + # Configure logging format + # Example: [DEBUG] foo_bar.py:71 Mahlzeit! + LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s" + LOG_LEVEL_DEFAULT = log.INFO + + # Default logging handler (stderr) + sh = log.StreamHandler() + sh.setLevel(log.getLevelName(LOG_LEVEL_DEFAULT)) + sh.setFormatter(log.Formatter(LOG_FMT_DEFAULT)) + log.root.addHandler(sh) + + # Set DEBUG for the root logger + log.root.setLevel(log.DEBUG) + + # TODO: use argparse + if len(sys.argv) < 3: + print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0]) + sys.exit(1) + + handle_pcap(sys.argv[1], sys.argv[2])