From c3d9fa81326ead4d7f45bc7c84927579d640152b Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 4 Feb 2022 22:43:51 +0600 Subject: [PATCH 1/2] pycrate_osmo.RRCTL: fix wrong operator in RRCTLMsg.match() --- pycrate_osmo/RRCTL.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pycrate_osmo/RRCTL.py b/pycrate_osmo/RRCTL.py index c4e03ac..6fc7c8c 100644 --- a/pycrate_osmo/RRCTL.py +++ b/pycrate_osmo/RRCTL.py @@ -297,8 +297,8 @@ class RRCTLMsg(Envelope): self['Hdr']['Disc'].set_val(msg_disc) def match(self, msg_type, msg_disc): - if self['Hdr']['Type'].get_val() is not msg_type: + if self['Hdr']['Type'].get_val() != msg_type: return False - if self['Hdr']['Disc'].get_val() is not msg_disc: + if self['Hdr']['Disc'].get_val() != msg_disc: return False return True From 5723fc16c52c2fcca26e673633469a69675457fe Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 4 Feb 2022 21:06:28 +0600 Subject: [PATCH 2/2] tools: add simple extNAS/RRCTL demo sending Detach Request --- setup.py | 1 + tools/pycrate_extnas_demo.py | 250 +++++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100755 tools/pycrate_extnas_demo.py diff --git a/setup.py b/setup.py index bcbd284..0736752 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ setup( "tools/pycrate_showmedia.py", "tools/pycrate_shownas.py", "tools/pycrate_map_op_info.py", + "tools/pycrate_extnas_demo.py", ], # potential dependencies diff --git a/tools/pycrate_extnas_demo.py b/tools/pycrate_extnas_demo.py new file mode 100755 index 0000000..89426c5 --- /dev/null +++ b/tools/pycrate_extnas_demo.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +#/** +# * Software Name : pycrate +# * Version : 0.4 +# * +# * Copyright 2022. Vadim Yanitskiy +# * +# * This library is free software; you can redistribute it and/or +# * modify it under the terms of the GNU Lesser General Public +# * License as published by the Free Software Foundation; either +# * version 2.1 of the License, or (at your option) any later version. +# * +# * This library is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# * Lesser General Public License for more details. +# * +# * You should have received a copy of the GNU Lesser General Public +# * License along with this library; if not, write to the Free Software +# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# * MA 02110-1301 USA +# * +# *-------------------------------------------------------- +# * File Name : tools/pycrate_extnas_demo.py +# * Created : 2020-03-03 +# * Authors : Vadim Yanitskiy +# *-------------------------------------------------------- +#*/ + +from binascii import hexlify +import logging as log +import argparse +import socket + +from pycrate_mobile.NASLTE import parse_NASLTE_MT +from pycrate_mobile import TS24301_EMM as NAS +from pycrate_mobile import TS24301_IE as IE + +from pycrate_osmo.RRCTL import RRCTLMsgType, RRCTLMsgDisc +from pycrate_osmo.RRCTL import RRCTLConnEstCause +from pycrate_osmo.RRCTL import PLMNInfo +from pycrate_osmo.RRCTL import RRCTLMsg + +class Connection: + def __init__(self, path='/tmp/ue_extnas.sock'): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.connected = False + self.sock_path = path + + def connect(self): + log.info('Connecting to \'%s\'', self.sock_path) + self.sock.connect(self.sock_path) + self.connected = True + + def disconnect(self): + self.sock.close() + self.connected = False + +class RRCTLInterface(Connection): + def send(self, msg): + log.debug('Tx RRCTL message: %s', msg['Hdr']) + self.sock.send(msg.to_bytes()) + + def receive(self): + data = self.sock.recv(1024) + if not data: + raise IOError + + msg = RRCTLMsg() + msg.from_bytes(data) + log.debug('Rx RRCTL message: %s', msg['Hdr']) + + return msg + + def expect(self, msg_type, msg_disc): + msg = self.receive() + if not msg.match(msg_type, msg_disc): + raise AssertionError('Rx unexpected RRCTL PDU', msg['Hdr']) + return msg + + def reset(self): + ''' Send Reset.Req, wait for Reset.Cnf ''' + msg = RRCTLMsg() + msg.set_type(RRCTLMsgType.Reset) + self.send(msg) + + # Wait for the confirmation + self.expect(RRCTLMsgType.Reset, RRCTLMsgDisc.Cnf) + +class TestCase: + def __init__(self, iface, params): + assert iface.connected + self.iface = iface + self.iface.reset() + self.params = params + + def do_plmn_search(self): + ''' Search for networks on the configured EARFCN ''' + msg = RRCTLMsg() + msg.set_type(RRCTLMsgType.NetworkSearch) + self.iface.send(msg) + + # Wait for the search results + msg = self.iface.expect(RRCTLMsgType.NetworkSearch, RRCTLMsgDisc.Res) + + cell_num = msg['Data']['NofPLMNs'].get_val() + cell_list = msg['Data']['PLMNs'] + if cell_num != cell_list.get_num(): + raise ValueError('PLMN list is longer / shorter than indicated') + + log.info('PLMN search completed, %u cell(s) found', cell_num) + for idx, cell in enumerate(cell_list): + log.info("Cell #%02d: %s", idx, cell) + + return cell_list + + def do_plmn_select(self, mcc, mnc): + ''' Select a cell defined by a given pair of MCC/MNC ''' + msg = RRCTLMsg() + msg.set_type(RRCTLMsgType.NetworkSelect) + msg['Data']['MCC'].set_val(mcc) + msg['Data']['MNC'].set_val(mnc) + self.iface.send(msg) + + # Wait for the network selection result + self.iface.expect(RRCTLMsgType.NetworkSelect, RRCTLMsgDisc.Cnf) + + def do_conn_establish(self, pdu, cause=RRCTLConnEstCause.MO_Signalling): + ''' Establish connection with the previously selected network ''' + msg = RRCTLMsg() + msg.set_type(RRCTLMsgType.ConnEstabish) + msg['Data']['Cause'].set_val(cause) + msg['Data']['PDU'].set_val(pdu) + self.iface.send(msg) + + # Wait for the establishment result + self.iface.expect(RRCTLMsgType.ConnEstabish, RRCTLMsgDisc.Cnf) + + def search_select(self): + cell_list = self.do_plmn_search() + if cell_list.get_num() == 0: + raise AssertionError('No PLMNs found :/') + + # Find a cell by MCC/MNC (if specified) + cell = None + for c in cell_list: + if self.params.plmn_mcc is not None: + if self.params.plmn_mcc != c['MCC'].decode(): + continue + if self.params.plmn_mnc is not None: + if self.params.plmn_mnc != c['MNC'].decode(): + continue + cell = c + break + if cell is None: + raise AssertionError('No matching PLMN found') + + log.info('Selecting PLMN: %s', cell) + (mcc, mnc, tac) = cell() + self.do_plmn_select(mcc, mnc) + + log.info('PLMN has been found and selected \o/') + + def establish(self, pdu): + log.info('Establishing MO connection to the network...') + self.do_conn_establish(pdu) + log.info('Connection has been established \o/') + + def test(self): + ''' The actual test case body ''' + raise NotImplementedError + +class TC_DetachRequest(TestCase): + def test(self): + # Search and select the network + self.search_select() + + pdu = NAS.EMMDetachRequestMO() + + pdu['EPSDetachType'].set_IE(val = { + 'SwitchOff' : 1, # Yes + 'Type' : self.params.detach_type, + }) + + # Oh, that looks so ugly... is there a better way? + epsid = IE.EPSID() + epsid.encode(IE.IDTYPE_IMSI, self.params.subscr_imsi) + pdu['EPSID'].set_val({'V' : epsid.to_bytes()}) + + log.info('Sending Detach Request:\n%s', pdu.show()) + self.establish(pdu.to_bytes()) + + # Wait for any RRCTL messages + while True: + msg = self.iface.receive() + if not msg.match(RRCTLMsgType.ConnData, RRCTLMsgDisc.Ind): + log.info('Rx unexpected message %s', msg[1]) + + lcid = msg['Data']['LCID'].get_val() + pdu_enc = msg['Data']['PDU'].get_val() + pdu_len = len(pdu_enc) + + log.info('Rx MT NAS message (lcid=0x%08x, len=%u): %s', + lcid, pdu_len, hexlify(pdu_enc)) + try: + (pdu, err) = parse_NASLTE_MT(pdu_enc) + print(pdu.show()) + except: + log.info('Failed to parse MT NAS PDU') + continue + # We're done + break + +ap = argparse.ArgumentParser(prog='pycrate_extnas_demo') + +ap.add_argument('-s', '--socket-path', metavar='PATH', + type=str, default='/tmp/ue_extnas.sock', + help='UNIX socket patch of the RRC interface (default \'%(default)s\')') +ap.add_argument('--plmn-mcc', metavar='MCC', + type=str, nargs='?', + help='Mobile Country Code (select any by default)') +ap.add_argument('--plmn-mnc', metavar='MNC', + type=str, nargs='?', + help='Mobile Network Code (select any by default)') +ap.add_argument('--subscr-imsi', metavar='IMSI', + type=str, default='001010000000000', + help='IMSI to use (default \'%(default)s\')') +ap.add_argument('--detach-type', metavar='TYPE', + type=int, default=2, choices=range(8), + help='IMSI detach type (default %(default)s, see _EPSDetTypeMO_dict)') + +log.basicConfig(format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO) + +if __name__ == '__main__': + # Parse the arguments + argv = ap.parse_args() + + # Init the RRCTL interface + iface = RRCTLInterface(argv.socket_path) + iface.connect() + + test_cases = list() + test_cases.append(TC_DetachRequest(iface, argv)) + + for test in test_cases: + log.info('Starting test case \'%s\'', test.__class__.__name__) + test.test() + log.info('Test case \'%s\' completed', test.__class__.__name__)