tools: add simple extNAS/RRCTL demo sending Detach Request
This commit is contained in:
parent
c3d9fa8132
commit
5723fc16c5
1
setup.py
1
setup.py
|
@ -51,6 +51,7 @@ setup(
|
|||
"tools/pycrate_showmedia.py",
|
||||
"tools/pycrate_shownas.py",
|
||||
"tools/pycrate_map_op_info.py",
|
||||
"tools/pycrate_extnas_demo.py",
|
||||
],
|
||||
|
||||
# potential dependencies
|
||||
|
|
|
@ -0,0 +1,250 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
#/**
|
||||
# * Software Name : pycrate
|
||||
# * Version : 0.4
|
||||
# *
|
||||
# * Copyright 2022. Vadim Yanitskiy
|
||||
# *
|
||||
# * This library is free software; you can redistribute it and/or
|
||||
# * modify it under the terms of the GNU Lesser General Public
|
||||
# * License as published by the Free Software Foundation; either
|
||||
# * version 2.1 of the License, or (at your option) any later version.
|
||||
# *
|
||||
# * This library is distributed in the hope that it will be useful,
|
||||
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# * Lesser General Public License for more details.
|
||||
# *
|
||||
# * You should have received a copy of the GNU Lesser General Public
|
||||
# * License along with this library; if not, write to the Free Software
|
||||
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
# * MA 02110-1301 USA
|
||||
# *
|
||||
# *--------------------------------------------------------
|
||||
# * File Name : tools/pycrate_extnas_demo.py
|
||||
# * Created : 2020-03-03
|
||||
# * Authors : Vadim Yanitskiy
|
||||
# *--------------------------------------------------------
|
||||
#*/
|
||||
|
||||
from binascii import hexlify
|
||||
import logging as log
|
||||
import argparse
|
||||
import socket
|
||||
|
||||
from pycrate_mobile.NASLTE import parse_NASLTE_MT
|
||||
from pycrate_mobile import TS24301_EMM as NAS
|
||||
from pycrate_mobile import TS24301_IE as IE
|
||||
|
||||
from pycrate_osmo.RRCTL import RRCTLMsgType, RRCTLMsgDisc
|
||||
from pycrate_osmo.RRCTL import RRCTLConnEstCause
|
||||
from pycrate_osmo.RRCTL import PLMNInfo
|
||||
from pycrate_osmo.RRCTL import RRCTLMsg
|
||||
|
||||
class Connection:
|
||||
def __init__(self, path='/tmp/ue_extnas.sock'):
|
||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.connected = False
|
||||
self.sock_path = path
|
||||
|
||||
def connect(self):
|
||||
log.info('Connecting to \'%s\'', self.sock_path)
|
||||
self.sock.connect(self.sock_path)
|
||||
self.connected = True
|
||||
|
||||
def disconnect(self):
|
||||
self.sock.close()
|
||||
self.connected = False
|
||||
|
||||
class RRCTLInterface(Connection):
|
||||
def send(self, msg):
|
||||
log.debug('Tx RRCTL message: %s', msg['Hdr'])
|
||||
self.sock.send(msg.to_bytes())
|
||||
|
||||
def receive(self):
|
||||
data = self.sock.recv(1024)
|
||||
if not data:
|
||||
raise IOError
|
||||
|
||||
msg = RRCTLMsg()
|
||||
msg.from_bytes(data)
|
||||
log.debug('Rx RRCTL message: %s', msg['Hdr'])
|
||||
|
||||
return msg
|
||||
|
||||
def expect(self, msg_type, msg_disc):
|
||||
msg = self.receive()
|
||||
if not msg.match(msg_type, msg_disc):
|
||||
raise AssertionError('Rx unexpected RRCTL PDU', msg['Hdr'])
|
||||
return msg
|
||||
|
||||
def reset(self):
|
||||
''' Send Reset.Req, wait for Reset.Cnf '''
|
||||
msg = RRCTLMsg()
|
||||
msg.set_type(RRCTLMsgType.Reset)
|
||||
self.send(msg)
|
||||
|
||||
# Wait for the confirmation
|
||||
self.expect(RRCTLMsgType.Reset, RRCTLMsgDisc.Cnf)
|
||||
|
||||
class TestCase:
|
||||
def __init__(self, iface, params):
|
||||
assert iface.connected
|
||||
self.iface = iface
|
||||
self.iface.reset()
|
||||
self.params = params
|
||||
|
||||
def do_plmn_search(self):
|
||||
''' Search for networks on the configured EARFCN '''
|
||||
msg = RRCTLMsg()
|
||||
msg.set_type(RRCTLMsgType.NetworkSearch)
|
||||
self.iface.send(msg)
|
||||
|
||||
# Wait for the search results
|
||||
msg = self.iface.expect(RRCTLMsgType.NetworkSearch, RRCTLMsgDisc.Res)
|
||||
|
||||
cell_num = msg['Data']['NofPLMNs'].get_val()
|
||||
cell_list = msg['Data']['PLMNs']
|
||||
if cell_num != cell_list.get_num():
|
||||
raise ValueError('PLMN list is longer / shorter than indicated')
|
||||
|
||||
log.info('PLMN search completed, %u cell(s) found', cell_num)
|
||||
for idx, cell in enumerate(cell_list):
|
||||
log.info("Cell #%02d: %s", idx, cell)
|
||||
|
||||
return cell_list
|
||||
|
||||
def do_plmn_select(self, mcc, mnc):
|
||||
''' Select a cell defined by a given pair of MCC/MNC '''
|
||||
msg = RRCTLMsg()
|
||||
msg.set_type(RRCTLMsgType.NetworkSelect)
|
||||
msg['Data']['MCC'].set_val(mcc)
|
||||
msg['Data']['MNC'].set_val(mnc)
|
||||
self.iface.send(msg)
|
||||
|
||||
# Wait for the network selection result
|
||||
self.iface.expect(RRCTLMsgType.NetworkSelect, RRCTLMsgDisc.Cnf)
|
||||
|
||||
def do_conn_establish(self, pdu, cause=RRCTLConnEstCause.MO_Signalling):
|
||||
''' Establish connection with the previously selected network '''
|
||||
msg = RRCTLMsg()
|
||||
msg.set_type(RRCTLMsgType.ConnEstabish)
|
||||
msg['Data']['Cause'].set_val(cause)
|
||||
msg['Data']['PDU'].set_val(pdu)
|
||||
self.iface.send(msg)
|
||||
|
||||
# Wait for the establishment result
|
||||
self.iface.expect(RRCTLMsgType.ConnEstabish, RRCTLMsgDisc.Cnf)
|
||||
|
||||
def search_select(self):
|
||||
cell_list = self.do_plmn_search()
|
||||
if cell_list.get_num() == 0:
|
||||
raise AssertionError('No PLMNs found :/')
|
||||
|
||||
# Find a cell by MCC/MNC (if specified)
|
||||
cell = None
|
||||
for c in cell_list:
|
||||
if self.params.plmn_mcc is not None:
|
||||
if self.params.plmn_mcc != c['MCC'].decode():
|
||||
continue
|
||||
if self.params.plmn_mnc is not None:
|
||||
if self.params.plmn_mnc != c['MNC'].decode():
|
||||
continue
|
||||
cell = c
|
||||
break
|
||||
if cell is None:
|
||||
raise AssertionError('No matching PLMN found')
|
||||
|
||||
log.info('Selecting PLMN: %s', cell)
|
||||
(mcc, mnc, tac) = cell()
|
||||
self.do_plmn_select(mcc, mnc)
|
||||
|
||||
log.info('PLMN has been found and selected \o/')
|
||||
|
||||
def establish(self, pdu):
|
||||
log.info('Establishing MO connection to the network...')
|
||||
self.do_conn_establish(pdu)
|
||||
log.info('Connection has been established \o/')
|
||||
|
||||
def test(self):
|
||||
''' The actual test case body '''
|
||||
raise NotImplementedError
|
||||
|
||||
class TC_DetachRequest(TestCase):
|
||||
def test(self):
|
||||
# Search and select the network
|
||||
self.search_select()
|
||||
|
||||
pdu = NAS.EMMDetachRequestMO()
|
||||
|
||||
pdu['EPSDetachType'].set_IE(val = {
|
||||
'SwitchOff' : 1, # Yes
|
||||
'Type' : self.params.detach_type,
|
||||
})
|
||||
|
||||
# Oh, that looks so ugly... is there a better way?
|
||||
epsid = IE.EPSID()
|
||||
epsid.encode(IE.IDTYPE_IMSI, self.params.subscr_imsi)
|
||||
pdu['EPSID'].set_val({'V' : epsid.to_bytes()})
|
||||
|
||||
log.info('Sending Detach Request:\n%s', pdu.show())
|
||||
self.establish(pdu.to_bytes())
|
||||
|
||||
# Wait for any RRCTL messages
|
||||
while True:
|
||||
msg = self.iface.receive()
|
||||
if not msg.match(RRCTLMsgType.ConnData, RRCTLMsgDisc.Ind):
|
||||
log.info('Rx unexpected message %s', msg[1])
|
||||
|
||||
lcid = msg['Data']['LCID'].get_val()
|
||||
pdu_enc = msg['Data']['PDU'].get_val()
|
||||
pdu_len = len(pdu_enc)
|
||||
|
||||
log.info('Rx MT NAS message (lcid=0x%08x, len=%u): %s',
|
||||
lcid, pdu_len, hexlify(pdu_enc))
|
||||
try:
|
||||
(pdu, err) = parse_NASLTE_MT(pdu_enc)
|
||||
print(pdu.show())
|
||||
except:
|
||||
log.info('Failed to parse MT NAS PDU')
|
||||
continue
|
||||
# We're done
|
||||
break
|
||||
|
||||
ap = argparse.ArgumentParser(prog='pycrate_extnas_demo')
|
||||
|
||||
ap.add_argument('-s', '--socket-path', metavar='PATH',
|
||||
type=str, default='/tmp/ue_extnas.sock',
|
||||
help='UNIX socket patch of the RRC interface (default \'%(default)s\')')
|
||||
ap.add_argument('--plmn-mcc', metavar='MCC',
|
||||
type=str, nargs='?',
|
||||
help='Mobile Country Code (select any by default)')
|
||||
ap.add_argument('--plmn-mnc', metavar='MNC',
|
||||
type=str, nargs='?',
|
||||
help='Mobile Network Code (select any by default)')
|
||||
ap.add_argument('--subscr-imsi', metavar='IMSI',
|
||||
type=str, default='001010000000000',
|
||||
help='IMSI to use (default \'%(default)s\')')
|
||||
ap.add_argument('--detach-type', metavar='TYPE',
|
||||
type=int, default=2, choices=range(8),
|
||||
help='IMSI detach type (default %(default)s, see _EPSDetTypeMO_dict)')
|
||||
|
||||
log.basicConfig(format='[%(levelname)s] %(filename)s:%(lineno)d %(message)s', level=log.INFO)
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Parse the arguments
|
||||
argv = ap.parse_args()
|
||||
|
||||
# Init the RRCTL interface
|
||||
iface = RRCTLInterface(argv.socket_path)
|
||||
iface.connect()
|
||||
|
||||
test_cases = list()
|
||||
test_cases.append(TC_DetachRequest(iface, argv))
|
||||
|
||||
for test in test_cases:
|
||||
log.info('Starting test case \'%s\'', test.__class__.__name__)
|
||||
test.test()
|
||||
log.info('Test case \'%s\' completed', test.__class__.__name__)
|
Loading…
Reference in New Issue