pycrate/pycrate_corenet/ProcCNRua.py

497 lines
15 KiB
Python

# -*- coding: UTF-8 -*-
#/**
# * Software Name : pycrate
# * Version : 0.4
# *
# * Copyright 2017. Benoit Michau. ANSSI.
# *
# * This library is free software; you can redistribute it and/or
# * modify it under the terms of the GNU Lesser General Public
# * License as published by the Free Software Foundation; either
# * version 2.1 of the License, or (at your option) any later version.
# *
# * This library is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# * Lesser General Public License for more details.
# *
# * You should have received a copy of the GNU Lesser General Public
# * License along with this library; if not, write to the Free Software
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# * MA 02110-1301 USA
# *
# *--------------------------------------------------------
# * File Name : pycrate_corenet/ProcCNRuapy
# * Created : 2017-07-13
# * Authors : Benoit Michau
# *--------------------------------------------------------
#*/
__all__ = [
'RUASigProc',
'RUAConnect',
'RUADirectTransfer',
'RUADisconnect',
'RUAConnectlessTransfer',
'RUAErrorInd',
'RUAPrivateMessage',
#
'RUAProcDispatcher'
]
from .utils import *
from .ProcProto import *
#------------------------------------------------------------------------------#
# RUA signalling procedure
# TS 25.468, version d10
# HNB-GW side
#------------------------------------------------------------------------------#
class RUASigProc(LinkSigProc):
"""RUA signalling procedure handler
instance attributes:
- Name : procedure name
- HNB : reference to the HNBd instance running this procedure
- Server: reference to the CorenetServer instance handling the HNB
- Desc : ASN.1 procedure description
- Code : procedure code
- Crit : procedure criticality
- Cont : ASN.1 procedure PDU(s) content
- Encod : custom PDU encoders with fixed values
- Decod : custom PDU decoders with tranform functions
"""
TRACK_PDU = True
def __init__(self, hnbd):
#
self.Name = self.__class__.__name__
self.HNB = hnbd
self.Server = hnbd.Server
#
# to store PDU traces
self._pdu = []
# list of PDU to be sent to the HNB
self._pdu_tx = []
#
self._log('DBG', 'instantiating procedure')
def _log(self, logtype, msg):
self.HNB._log(logtype, '[%s] %s' % (self.Name, msg))
def _recv(self, pdu_rx):
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu_rx) )
self.errcause, self.ConInfo = None, {}
try:
self.decode_pdu(pdu_rx, self.ConInfo)
except Exception as err:
self._err = err
self._log('ERR', 'decode_pdu (%s), sending error indication' % err)
self.errcause = ('protocol', 'abstract-syntax-error-reject')
def recv(self, pdu_rx):
self._recv(pdu_rx)
self._log('ERR', 'recv() not implemented')
def send(self):
if self.TRACK_PDU:
for pdu in self._pdu_tx:
self._pdu.append( (time(), 'DL', pdu) )
return self._pdu_tx
def trigger(self):
self._log('ERR', 'trigger() not implemented')
return []
def abort(self):
self._log('INF', 'aborting')
# All following RUA procedures have recv() and trigger() methods defined
# corresponding to HNB-initiated procedures
class RUAConnect(RUASigProc):
"""Connect: TS 25.468, section 8.2
HNB-initiated or GW-initiated
request only
InitiatingMessage:
IEs:
- 3: Context_ID (M)
- 4: RANAP_Message (M)
- 5: IntraDomainNasNodeSelector (O)
- 6: Establishment_Cause (M)
- 7: CN_DomainIndicator (M)
Extensions:
- 9: CSGMembershipStatus (O)
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.connectionRequest
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
def recv(self, pdu_rx):
self._recv(pdu_rx)
if not self.errcause:
# log EC request
if self.ConInfo['Establishment_Cause'] == 'emergency-call':
self._log('WNG', 'emergency call requested')
# get the UE corresponding to the RUA ctx_id
ctx_id = self.ConInfo['Context_ID'][0]
try:
ued = self.HNB.UE_HNBAP[ctx_id]
except Exception:
self._log('ERR', 'no UE associated to context-id %i'\
% self.ConInfo['Context_ID'][0])
self.errcause = ('radioNetwork', 'connect-failed')
else:
#
# set the Iu context-id and dispatch the RANAP PDU
if self.ConInfo['CN_DomainIndicator'] == 'cs-domain':
self.HNB.set_ue_iucs(ued, ctx_id)
try:
ued.set_ran(self.HNB, ctx_id, dom='CS')
except Exception as err:
self._log('ERR', 'UE connected to several RAN, %r' % err)
return []
self.retpdu = ued.IuCS.process_ranap(self.ConInfo['RANAP_Message'])
else:
#self.ConInfo['CN_DomainIndicator'] == 'ps-domain'
self.HNB.set_ue_iups(ued, ctx_id)
try:
ued.set_ran(self.HNB, ctx_id, dom='PS')
except Exception as err:
self._log('ERR', 'UE connected to several RAN, %r' % err)
return []
self.retpdu = ued.IuPS.process_ranap(self.ConInfo['RANAP_Message'])
def trigger(self):
if self.errcause:
Err = self.HNB.init_rua_proc(RUAErrorInd, Cause=self.errcause)
return [Err]
else:
Trans = []
for pdu in self.retpdu:
# wrap each RANAP PDU into a RUA direct transfer PDU
Trans.append( self.HNB.init_rua_proc(RUADirectTransfer,
Context_ID=self.ConInfo['Context_ID'],
RANAP_Message=pdu,
CN_DomainIndicator=self.ConInfo['CN_DomainIndicator']) )
return Trans
class RUADirectTransfer(RUASigProc):
"""Direct Transfer: TS 25.468, section 8.3
HNB-initiated or GW-initiated
request only
InitiatingMessage:
IEs:
- 3: Context_ID (M)
- 4: RANAP_Message (M)
- 7: CN_DomainIndicator (M)
Extensions:
None
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.directTransfer
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
def recv(self, pdu_rx):
self._recv(pdu_rx)
if not self.errcause:
# get the UE corresponding to the ctx_id and dispatch the RANAP PDU
if self.ConInfo['CN_DomainIndicator'] == 'cs-domain':
try:
ued = self.HNB.UE_IuCS[self.ConInfo['Context_ID'][0]]
except Exception:
self._log('ERR', 'no UE associated to context-id %i'\
% self.ConInfo['Context_ID'][0])
self.errcause = ('protocol', 'abstract-syntax-error-reject')
else:
self.retpdu = ued.IuCS.process_ranap(self.ConInfo['RANAP_Message'])
else:
try:
ued = self.HNB.UE_IuPS[self.ConInfo['Context_ID'][0]]
except Exception:
self._log('ERR', 'no UE associated to context-id %i'\
% self.ConInfo['Context_ID'][0])
self.errcause = ('protocol', 'abstract-syntax-error-reject')
else:
self.retpdu = ued.IuPS.process_ranap(self.ConInfo['RANAP_Message'])
def trigger(self):
if self.errcause:
Err = self.HNB.init_rua_proc(RUAErrorInd, Cause=self.errcause)
return [Err]
else:
Trans = []
for pdu in self.retpdu:
# wrap each RANAP PDU into a RUA direct transfer PDU
Trans.append( self.HNB.init_rua_proc(RUADirectTransfer,
Context_ID=self.ConInfo['Context_ID'],
RANAP_Message=pdu,
CN_DomainIndicator=self.ConInfo['CN_DomainIndicator']) )
return Trans
class RUADisconnect(RUASigProc):
"""Disconnect: TS 25.468, section 8.4
HNB-initiated or GW-initiated
request only
InitiatingMessage:
IEs:
- 1: Cause (M)
- 3: Context_ID (M)
- 4: RANAP_Message (C)
- 7: CN_DomainIndicator (M)
Extensions:
None
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.disconnectRequest
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
def recv(self, pdu_rx):
self._recv(pdu_rx)
if not self.errcause:
# get the UE corresponding to the ctx_id and dispatch the RANAP PDU
if self.ConInfo['CN_DomainIndicator'] == 'cs-domain':
try:
ued = self.HNB.UE_IuCS[self.ConInfo['Context_ID'][0]]
except Exception:
self._log('ERR', 'no UE associated to context-id %i'\
% self.ConInfo['Context_ID'][0])
self.errcause = ('protocol', 'abstract-syntax-error-reject')
else:
self.retpdu = ued.IuCS.process_ranap(self.ConInfo['RANAP_Message'])
# RANAP_Message should be an IuRelease response
# hence, there should be no RANAP answer from the CN
# after receiving an RUA disconnect
assert( self.retpdu == [] )
self.HNB.unset_ue_iucs(self.ConInfo['Context_ID'][0])
else:
try:
ued = self.HNB.UE_IuPS[self.ConInfo['Context_ID'][0]]
except Exception:
self._log('ERR', 'no UE associated to context-id %i'\
% self.ConInfo['Context_ID'][0])
self.errcause = ('protocol', 'abstract-syntax-error-reject')
else:
self.retpdu = ued.IuPS.process_ranap(self.ConInfo['RANAP_Message'])
assert( self.retpdu == [] )
self.HNB.unset_ue_iups(self.ConInfo['Context_ID'][0])
def trigger(self):
if self.errcause is not None:
Err = self.HNB.init_rua_proc(RUAErrorInd, Cause=self.errcause)
return [Err]
else:
# nothing to trigger after an RUA disconnect
return []
class RUAConnectlessTransfer(RUASigProc):
"""Connectionless Transfer : TS 25.468, section 8.5
HNB-initiated or GW-initiated
request only
InitiatingMessage:
IEs:
- 4: RANAP_Message (M)
Extensions:
None
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.connectionlessTransfer
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
def recv(self, pdu_rx):
self._recv(pdu_rx)
if not self.errcause:
# connection-less transfer, process the RANAP PDU directly in the RNC handler
self.retpdu = self.HNB.process_ranap(self.ConInfo['RANAP_Message'])
def trigger(self):
if self.errcause is not None:
Err = self.HNB.init_rua_proc(RUAErrorInd, Cause=self.errcause)
return [Err]
else:
Trans = []
for pdu in self.retpdu:
# wrap each RANAP PDU into a RUA connect less transfer PDU
Trans.append( self.HNB.init_rua_proc(RUAConnectlessTransfer,
RANAP_Message=pdu) )
return Trans
class RUAErrorInd(RUASigProc):
"""Error Indication: TS 25.468, section 8.6
HNB-initiated or GW-initiated
request only
InitiatingMessage:
IEs:
- 1: Cause (M)
- 2: CriticalityDiagnostics (O)
Extensions:
None
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.errorIndication
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# we don't respond to a malformed error ind with another error ind
errcause = None
def recv(self, pdu_rx):
if self.TRACK_PDU:
self._pdu.append( (time(), 'UL', pdu_rx) )
#
# WNG: this is the same error ind procedure used for
# HNB-initiated and GW-initiated procedure
#
if pdu_rx[0] == 'initiatingMessage' and pdu_rx[1]['procedureCode'] == 1:
# HNB-initiated
self.ErrInfo = {}
try:
self.decode_pdu(pdu_rx, self.ErrInfo)
except Exception as err:
self._err = err
self._log('ERR', 'decode_pdu: %s' % err)
# don't respond with another error ind
else:
self._log('WNG', 'error ind received: %s.%s' % self.ErrInfo['Cause'])
#
# otherwise: GW-initiated, nothing to do
def trigger(self):
# nothing to trigger after an error ind
return []
class RUAPrivateMessage(RUASigProc):
"""Error Indication: TS 25.468
HNB-initiated or GW-initiated
request only
InitiatingMessage:
None
"""
# ASN.1 procedure description
Desc = RUA.RUA_PDU_Descriptions.errorIndication
# Custom decoders
Decod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# Custom encoders
Encod = {
'ini': ({}, {}),
'suc': None,
'uns': None
}
# initializing all RUA procedures classes
RUAConnect.init()
RUADirectTransfer.init()
RUADisconnect.init()
RUAConnectlessTransfer.init()
RUAErrorInd.init()
RUAPrivateMessage.init()
# RUA procedures dispatcher
RUAProcDispatcher = {
1 : RUAConnect,
2 : RUADirectTransfer,
3 : RUADisconnect,
4 : RUAConnectlessTransfer,
5 : RUAErrorInd,
6 : RUAPrivateMessage
}