diff --git a/debian/libosmocore-utils.install b/debian/libosmocore-utils.install index 9501bec96..a090eeceb 100644 --- a/debian/libosmocore-utils.install +++ b/debian/libosmocore-utils.install @@ -2,3 +2,4 @@ usr/bin/osmo-arfcn usr/bin/osmo-auc-gen usr/bin/osmo-aka-verify usr/bin/osmo-config-merge +usr/bin/osmo-gsm-shark diff --git a/utils/Makefile.am b/utils/Makefile.am index 6e11dcd9e..5ca7a4501 100644 --- a/utils/Makefile.am +++ b/utils/Makefile.am @@ -9,6 +9,7 @@ if ENABLE_UTILITIES EXTRA_DIST = conv_gen.py conv_codes_gsm.py bin_PROGRAMS += osmo-arfcn osmo-auc-gen osmo-config-merge osmo-aka-verify +bin_SCRIPTS = osmo-gsm-shark osmo_arfcn_SOURCES = osmo-arfcn.c diff --git a/utils/osmo-gsm-shark b/utils/osmo-gsm-shark new file mode 100755 index 000000000..abfd4e422 --- /dev/null +++ b/utils/osmo-gsm-shark @@ -0,0 +1,3594 @@ +#!/usr/bin/env python3 + +# osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. +# Copyright (C) 2019 by Neels Hofmeyr +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +'''osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. + +Copyright (C) 2019 by Neels Hofmeyr +SPDX-License-Identifier: GPL-2.0+ + +This tool uses tshark (pyshark) to analyze a pcap file or a live network capture to: + +- Associate almost all messages with a subscriber. It is possible to filter by subscriber. +- Separate the different network elements (BSC, MSC, hNodeB, ...). +- Output a ladder diagram. +- Combine repetitive messages. +- Combine/abstract messages into short activity summary. + +Examples: + + osmo-gsm-shark -f trace.pcapng + osmo-gsm-shark -l any + + osmo-gsm-shark -l any --filter-subscr 901701234567123 + osmo-gsm-shark -f trace.pcapng --filter-msg dtap +''' + +import collections +import pyshark +import re +import sys +import types +import time +import traceback + +SHOW_ALL_DEBUG = False + +SHOW_ALL_LAYERS = False +SCCP_COLLAPSE_STP = True +IUH_COLLAPSE_HNBGW = True # doesnt work + +DTAP_COMPL_L3 = ('Location-Updating-Request', 'CM-Service-Request', 'Paging-Response', 'IMSI-Detach-Indication') +GMM_COMPL_L3 = ('Attach-Request', 'Detach-Request') + +class Color: + codes = ( + ('red', '\033[1;31m'), + ('green', '\033[1;32m'), + ('yellow', '\033[1;33m'), + ('blue', '\033[1;34m'), + ('purple', '\033[1;35m'), + ('cyan', '\033[1;36m'), + ('darkred', '\033[31m'), + ('darkgreen', '\033[32m'), + ('darkyellow', '\033[33m'), + ('darkblue', '\033[34m'), + ('darkpurple', '\033[35m'), + ('darkcyan', '\033[36m'), + ('darkgrey', '\033[1;30m'), + ('brightwhite', '\033[1;37m'), + ) + codes_dict = dict(codes) + end = '\033[0;m' + + def colored(code, text): + if type(code) is int: + code = Color.codes[code % len(Color.codes)][1] + else: + code = Color.codes_dict[code] + return f'{code}{text}{Color.end}' + + +def set_instance_vars_from_args(*ignore, self='s'): + f = sys._getframe(1).f_locals + s = f.get(self) + for k,v in f.items(): + if v is s: + continue + if k in ignore: + continue + setattr(s, k, v) + +def same_nonempty(a, b): + if isinstance(a, types.GeneratorType): + return list(a) == list(b) + return a and a == b + +def str_drop(a_str, drop_str): + if a_str and a_str.startswith(drop_str): + return a_str[len(drop_str):] + return a_str + +def sane_msgtype(msgtype): + if not msgtype: + return msgtype + return sane_showname(msgtype).replace(' ','-') + +re_msgtype_label = re.compile('message.type *', re.I) +def sane_showname(showname): + if not showname: + return showname + if ': ' in showname: + showname = showname[showname.index(': ')+1:] + if '(' in showname: + showname = showname[:showname.index('(')] + showname = re_msgtype_label.sub('', showname) + return showname.strip() + +def dir_vals(elem): + strs = [] + for name in dir(elem): + if name.startswith('_'): + continue + try: + strs.append('%r=%r' % (name, getattr(elem, name))) + except: + strs.append('%r=' % (name)) + return '\n' + '\n'.join(strs) + +def dir_p(p, name): + return f'=== {name}\n{dir_vals(p.get(name))}\n---{name}' + +def str_to_int(nr_str): + 'convert hex or decimal string to int' + if not nr_str: + return None + elif nr_str.startswith('0x'): + return int(nr_str, 16) + else: + return int(nr_str, 10) + +def out_text(*args, **kwargs): + print(*args, **kwargs) + +g_ui = None +g_current_msg = None +g_debug_full = False + +def to_text(*args, **kwargs): + if kwargs: + args = list(args) + [repr(kwargs)] + return ' '.join(str(arg) for arg in args) + +def out_text_now(*args, **kwargs): + if g_ui is not None: + g_ui.out_text_now(*args, **kwargs) + else: + print(to_text(*args, **kwargs)) + +def LOG(*args, **kwargs): + if g_current_msg is not None: + g_current_msg.log(*args, **kwargs) + else: + out_text_now(*args, **kwargs) + +def DBG(*args, **kwargs): + if g_current_msg is not None: + g_current_msg.dbg(*args, **kwargs) + else: + out_text_now(*args, **kwargs) + +def ERR(*args, **kwargs): + LOG(Color.colored('red', '***** ERROR:'), *args, **kwargs) + + +def trace(): + return ''.join(traceback.format_stack()) + +def because_str(because): + if not because: + return '-' + t = ['BECAUSE'] + for b in because: + if isinstance(b, tuple) or isinstance(b, list): + t.extend(because_str(b).splitlines()) + elif isinstance(b, str): + t.append(b) + elif isinstance(b, Message): + t.append(b.str(show_traits=True, show_conns=True)) + else: + t.append(str(b)) + return '\n|'.join(t) + +def out_error(*args, **kwargs): + if g_ui is not None: + g_ui.out_error(*args, **kwargs) + else: + out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs) + if g_current_msg: + out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True)) + out_text_now(trace()) + +def tmsi_standardize(tmsi): + try: + return format(int(tmsi), '08x') + except: + return None + +# a dict containing a list for each key; l.add(name, item) adds item to the list at key=name. +class listdict(dict): + '''A dict where each entry is a list of items''' + def _have(ld, name): + l = ld.get(name) + if not l: + l = [] + ld[name] = l + return l + + def add(ld, name, item): + l = ld._have(name) + l.append(item) + return ld + + def add_dict(ld, d): + for k,v in d.items(): + ld.add(k, v) + + def update(ld, other_ld): + for name, items in other_ld.items(): + ld.extend(name, items) + return ld + + def extend(ld, name, vals): + l = ld._have(name) + l.extend(vals) + return ld + + def remove(ld, name, item): + l = ld.get(name) + if item in l: + l.remove(item) + def _have(ld, name): + l = ld.get(name) + if not l: + l = [] + ld[name] = l + return l + + def add(ld, name, item): + l = ld._have(name) + l.append(item) + return ld + + def add_dict(ld, d): + for k,v in d.items(): + ld.add(k, v) + + def update(ld, other_ld): + for name, items in other_ld.items(): + ld.extend(name, items) + return ld + + def extend(ld, name, vals): + l = ld._have(name) + l.extend(vals) + return ld + + def remove(ld, name, item): + l = ld.get(name) + if item in l: + l.remove(item) + +class UniqueList(list): + def append(s, item): + if item in s or item is None: + return False + super().append(item) + return True + + def insert(s, idx, item): + if item in s or item is None: + return False + super().insert(idx, item) + return True + + def extend(s, items): + added = 0 + for item in items: + if s.append(item): + added += 1 + return added + +class NamedIds(dict): + def __init__(s, start_id:int=1): + set_instance_vars_from_args() + + def next(s, name): + next_id = s.get(name, s.start_id) + s[name] = next_id + 1 + return next_id + +class Packet: + def __init__(s, idx, cap_p): + set_instance_vars_from_args() + # sanitize impossible attr with dot in its name, + # seen gsm_a.bssmap and gsm_a.dtap + for name in dir(s.cap_p): + if '.' in name: + new_name = name.replace('.', '_') + elif not name: + new_name = 'unnamed' + else: + continue + setattr(s.cap_p, new_name, getattr(s.cap_p, name)) + + @classmethod + def pget(cls, cap_p, tokens, ifnone=None): + if cap_p is None or len(tokens) < 1: + return ifnone + p_field = getattr(cap_p, tokens[0], None) + if p_field is None: + p_field = getattr(cap_p, '.'.join(tokens), None) + if p_field is None: + return ifnone + if len(tokens) > 1: + return Packet.pget(p_field, tokens[1:], ifnone=ifnone) + return p_field + + def get(s, field, ifnone=None): + return Packet.pget(s.cap_p, field.split('.'), ifnone=ifnone) + + def str(s, elem_name=None): + strs = [] + if elem_name: + elem = s.get(elem_name) + else: + elem = s.cap_p + return dir_vals(elem); + + def field_names(s, elem_name=None, elem=None): + strs = ['', f'=== {elem_name} ==='] + if elem is None: + elem = s.get(elem_name) + if not elem: + strs.append('None') + else: + for f in elem._get_all_fields_with_alternates(): + for n in dir(f): + if n.startswith('_'): + continue + strs.append('%r=%r' % (n, getattr(f, n))) + return '\n'.join(strs) + + def all_str(s, elem_name=None, elem=None, depth=1000): + strs = [] + if elem is None: + if elem_name: + elem = s.get(elem_name) + else: + elem = s.cap_p + elem_name = '/' + strs.append('%s:' % elem_name) + for name in dir(elem): + if name.startswith('_') or name.endswith('_value') or name in [ + 'sort', 'reverse', 'remove', 'pop', 'insert', 'index', 'extend', 'count', + 'copy', 'clear', 'append', 'zfill', 'max', 'min', 'resolution', + ]: + continue + try: + full_name = '%s.%s' % (elem_name, name) + val = getattr(elem, name) + if callable(val) or name in ['base16_value']: + continue + strs.append('%r=%r' % (full_name, val)) + if depth and type(val) not in [int, str]: + strs.append(s.all_str(full_name, val, depth-1)) + except: + pass + if hasattr(elem, '_get_all_fields_with_alternates'): + for f in elem._get_all_fields_with_alternates(): + for n in dir(f): + if n.startswith('_'): + continue + full_name = '%r[%r]' % (elem_name, n) + try: + val = getattr(f, n) + except: + continue + if callable(val): + continue + strs.append('%s=%r' % (full_name, val)) + if depth: + strs.append(s.all_str(full_name, val, depth-1)) + return '\n'.join(strs) + +class IpPort: + all_ports = {} + + @classmethod + def _key(cls, ip, port): + return f'{ip}:{port}' + + @classmethod + def get(cls, ip, port, entity=None, proto=None, create=True): + o = IpPort.all_ports.get(IpPort._key(ip, port)) + if o is None and create: + return IpPort(ip, port, entity, proto) + if o is not None: + if entity is not None: + if o.entity is not None and o.entity is not entity: + ERR('Port changes:', o, 'to', entity) + entity.add_port(o.proto, o) + return o + + def __init__(s, ip:str=None, port:str=None, entity=None, proto=None): + set_instance_vars_from_args() + IpPort.all_ports[s.key()] = s + + def __repr__(s): + r = '' + tokens = [] + if s.entity: + tokens.append(f'{s.entity}') + if s.proto: + tokens.append(f'{s.proto}') + tokens.append(s.key()) + return '-'.join(tokens) + + def __eq__(s, other): + return s is other + + def __hash__(s): + return hash(s.key()) + + def key(s): + return IpPort._key(s.ip, s.port) + + @classmethod + def from_sdp(p:Packet): + ip = p.get('sdp.connection_info_address') + port = p.get('sdp.media_port') + return IpPort.get(ip, port) + + @classmethod + def _from_ip(cls, p:Packet, src_dst:str, port:int): + ip = p.get('ip.' + src_dst) + if ip is None: + ipv6 = p.get('ipv6.' + src_dst) + if ipv6 is not None: + ip = '[' + ipv6 + ']' + return IpPort.get(ip, port) + + @classmethod + def from_ip_source(cls, p:Packet, port:int): + return cls._from_ip(p, 'src', port) + + @classmethod + def from_ip_dest(cls, p:Packet, port:int): + return cls._from_ip(p, 'dst', port) + + @classmethod + def from_udp_source(cls, p:Packet): + return cls.from_ip_source(p, p.get('udp.srcport')) + + @classmethod + def from_udp_dest(cls, p:Packet): + return cls.from_ip_dest(p, p.get('udp.dstport')) + + @classmethod + def from_tcp_source(cls, p:Packet): + return cls.from_ip_source(p, p.get('tcp.srcport')) + + @classmethod + def from_tcp_dest(cls, p:Packet): + return cls.from_ip_dest(p, p.get('tcp.dstport')) + + @classmethod + def from_sctp_source(cls, p:Packet): + return cls.from_ip_source(p, p.get('sctp.srcport')) + + @classmethod + def from_sctp_dest(cls, p:Packet): + return cls.from_ip_dest(p, p.get('sctp.dstport')) + +class Message: + pass + +class Trait: + def __init__(s, **kwargs): + if len(kwargs) > 1: + raise Exception('only one trait allowed per Trait(): %r' % kwargs) + for k, v in kwargs.items(): + s.name = k + s.val = v + + def __repr__(s): + return '%r=%r' % (s.name, s.val) + +class Traits(collections.OrderedDict): + def __init__(s, *args, **kwargs): + for arg in args: + s.add(arg) + s.set_vals(**kwargs) + + def add(s, trait:Trait): + s[trait.name] = trait + + def set(s, name, val): + if val is not None: + s.add(Trait(**{name: val})) + + def set_vals(s, **kwargs): + for k,v in kwargs.items(): + if v is None: + continue + if type(v) not in (str, int, float, bool, IpPort): + v = str(v) + s.set(k, v) + + def __repr__(s): + strs = [] + for k,trait in s.items(): + assert k == trait.name + strs.append(repr(trait)) + return '{%s}' % ', '.join(strs) + +def find_recent_msg(msg:Message, messages:list, my_idx:int, condition_func, max_t=1): + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + return None + if msg.timestamp - prev_msg.timestamp > max_t: + return None + if condition_func(prev_msg): + yield prev_msg + return None + +def find_same_trait(msg:Message, messages:list, my_idx:int, proto:str, name:str, max_t=1, operator=any): + def same_traits(prev_msg): + return msg.same_traits(prev_msg, proto, name, operator=operator) + return find_recent_msg(msg, messages, my_idx, same_traits, max_t) + +class dddict(dict): + @classmethod + def _get(cls, d, keys, create=False): + if not keys: + return d + k = keys[0] + v = d.get(k) + if len(keys) > 0: + if v is None: + if create: + v = {} + d[k] = v + else: + return None + r = cls._get(v, keys[1:], create=create) + return r + + def gget(s, keys, create=None): + v = dddict._get(s, keys, create=False) + if v is None: + if create is None: + return None + else: + return s.sset(keys, create) + return v + + def sset(s, keys, item): + d = dddict._get(s, keys[:-1], create=True) + d[keys[-1]] = item + return item + + def ppop(s, keys): + d = dddict._get(s, keys[:-1]) + if d is None: + return None + r = d.pop(keys[-1]) + if not d: + if len(keys) > 1: + s.ppop(keys[:-1]) + return r + + @classmethod + def _count(cls, d): + if isinstance(d, dict): + count = 0 + for val in d.values(): + count += cls._count(val) + return count + return 1 + + def count(s): + return dddict._count(s) + + @classmethod + def _all(cls, d): + if isinstance(d, dict): + for val in d.values(): + yield from cls._all(val) + else: + yield d + + def all(s): + return dddict._all(s) + + +class Conn: + open_conns = dddict() + closed_conns = dddict() + + '''One end of a time-limited connection for a protocol layer''' + def __init__(s, proto:str, port:IpPort, conn_id:str, start_msg:Message, close_msg:Message=None, idx=-1, + entity=None, counterparts:list=[], subscriber_conn=None, merge_counterparts=True, add_message=True, + meta=False): + set_instance_vars_from_args('entity', 'add_message', 'merge_counterparts') + s.tx_messages = UniqueList() + s.rx_messages = UniqueList() + if s.subscriber_conn: + s.subscriber_conn.conns.append(s) + + s.keys = (proto, port.key(), conn_id) + Conn.open_conns.sset(s.keys, s) + # A counterpart is the same conn seen from the other side. + # For example, if a BSC opens a conn, the conterpart is the MSC's perspective on the same conn. + s.counterparts = UniqueList() + for cp in counterparts: + if cp is None: + continue + s.counterparts.append(cp) + cp.counterparts.append(s) + + if entity: + entity.add_port(proto, port, from_msg=start_msg) + + if add_message: + s.add_message(start_msg) + + if not counterparts: + LOG(Color.colored('green', f'New conn (now {Conn.count_open_conns()})'), s.proto, '%r' % s.conn_id) + if merge_counterparts: + for other_conn in s.counterparts: + s.merge_subscr_conns(other_conn) + + @classmethod + def _find(cls, keys, find_in_closed_conns=False): + ret = Conn.open_conns.gget(keys) + if ret is not None: + return ret + if find_in_closed_conns: + return Conn.closed_conns.gget(keys) + return None + + @classmethod + def find(cls, proto:str, port:IpPort, conn_id:str, find_in_closed_conns=False): + return cls._find((proto, port.key(), conn_id), find_in_closed_conns=find_in_closed_conns) + + @classmethod + def open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs): + exists = cls.find(proto, port, conn_id) + if exists is not None: + ERR('Conn already open:', type(exists), repr(exists)) + LOG(trace()) + return Conn(proto, port, conn_id, *args, **kwargs) + + @classmethod + def find_or_open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs): + exists = cls.find(proto, port, conn_id) + if exists is not None: + return exists + return Conn(proto, port, conn_id, *args, **kwargs) + + @classmethod + def open_2way(cls, proto:str, conn_id:str, msg:Message, *args, **kwargs): + conn1 = cls.open(proto, msg.src, conn_id, msg, *args, **kwargs) + cls.open(proto, msg.dst, conn_id, msg, *args, counterparts=[conn1], **kwargs) + return conn1 + + @classmethod + def close_conn(cls, conn, msg): + conn._add_message(msg) + conn.close_msg = msg + Conn.open_conns.ppop(conn.keys) + l = Conn.closed_conns.gget(conn.keys, create=[]) + l.append(conn) + LOG(Color.colored('blue', f'Close conn ({Conn.count_open_conns()} left)'), conn) + + @classmethod + def close(cls, proto, port, conn_id, close_msg, close_counterparts=True, if_exists=False): + conn = Conn.find(proto, port, conn_id) + if conn is None or not conn.is_open(): + if if_exists == False: + ERR('Cannot close, conn not open:', Conn.key_label(proto, port, conn_id)) + return None + Conn.close_conn(conn, close_msg) + assert Conn.find(proto, port, conn_id) is None + if close_counterparts: + for cp in conn.counterparts: + if cp.is_open(): + Conn.close_conn(cp, close_msg) + return conn + + @classmethod + def message(cls, proto, port, conn_id, msg): + conn = cls.find(proto, port, conn_id) + if conn is None: + return None + conn.add_message(msg) + return conn + + def _add_message(s, msg): + if s.port == msg.src: + if s.meta: + msg.meta_conns.append(s) + else: + msg.src_conns.append(s) + s.tx_messages.append(msg) + elif s.port == msg.dst: + if s.meta: + msg.meta_conns.append(s) + else: + msg.dst_conns.append(s) + s.rx_messages.append(msg) + + def add_message(s, msg, add_to_counterparts=True): + if s.close_msg: + out_error('Message on a closed conn', s, msg) + s._add_message(msg) + if add_to_counterparts: + for cp in s.counterparts: + cp._add_message(msg) + + @classmethod + def key_label(cls, proto, port, conn_id): + if conn_id: + return f'{proto}:{conn_id}@{port}' + else: + return f'{proto}@{port}' + + def label(s): + return s.key_label(s.proto, s.port, s.conn_id) + + def __repr__(s): + tokens = [s.label()] + for r in s.counterparts: + if r is None: + tokens.append('None') + else: + tokens.append(r.label()) + return ' -> '.join(tokens) + + def merge_subscr_conns(s, other_conn): + if other_conn is None: + return + if not isinstance(other_conn, Conn): + for item in other_conn: + s.merge_subscr_conns(item) + return + assert isinstance(other_conn, Conn) + if s.subscriber_conn is not None and s.subscriber_conn is other_conn.subscriber_conn: + return + s.subscriber_conn = SubscriberConn.merge(s.subscriber_conn, other_conn.subscriber_conn) + if s.subscriber_conn is None: + s.subscriber_conn = SubscriberConn() + s.subscriber_conn.add_conn(s) + other_conn.subscriber_conn = s.subscriber_conn + s.subscriber_conn.add_conn(other_conn) + + def find_entity(s, kind, with_port=None): + if s.subscriber_conn: + return s.subscriber_conn.find_entity(kind, with_port=with_port) + return None, None + + def is_open(s): + return s.close_msg is None and Conn._find(s.keys) is s + + @classmethod + def count_open_conns(cls): + count = 0 + have = set() + for conn in cls.open_conns.all(): + do_count = True + for conn2 in conn.counterparts: + if conn2.label() in have: + do_count = False + have.add(conn2.label()) + if conn.label() in have: + do_count = False + have.add(conn.label()) + if do_count: + count += 1 + return count + + +class Layer: + + _classes = {} + traits = None + proto = None + cap_p_name = None + + def __init__(s, msg:Message, proto:str, msgtype:str, traits:Traits, minor=False, hidden=False, cap_p_name:str=None): + set_instance_vars_from_args() + if proto in msg.layers: + raise Exception(f'duplicate proto {proto} for message') + msg.layers[proto] = s + s.msgtype = sane_msgtype(s.msgtype) + if not s.cap_p_name: + s.cap_p_name = s.proto + s.__class__.proto = s.proto + s.__class__.cap_p_name = s.cap_p_name + + def label(s): + if s.msgtype: + return f'{s.proto}.{s.msgtype}' + else: + return s.proto + + def identify_entities(s, msg:Message, messages, my_idx): + '''return a list of Message.EntityIdent instances describing source and/or destination entity that message msg identifies.''' + return None + + @classmethod + def identify_conns(s, messages, my_idx): + pass + + def collapse(s, messages, my_idx): + '''return the message itself if it remains in messages, if another absorbed it return the other message, + or if if it is dropped completely return None''' + return messages[my_idx] + + @classmethod + def classes(cls): + if not Layer._classes: + for cls in Layer.__subclasses__(): + name = cls.__name__ + if not name.startswith('Layer_'): + continue + proto_name = name[len('Layer_'):] + Layer._classes[proto_name] = cls + #cls.proto = proto_name + return Layer._classes + + @classmethod + def parse(cls, msg:Message): + + for proto_name,child_class in Layer.classes().items(): + if not msg.p.get(proto_name): + continue + child_class(msg) + + +class Message: + + def __init__(s, p:Packet, finalized=False): + set_instance_vars_from_args() + s.layers = collections.OrderedDict() + s.count = 1 + s.count_back = 0 + s.timestamp = float(p.cap_p.sniff_timestamp) + s.hide = False + s.src = None + s.dst = None + s.src_conns = [] + s.dst_conns = [] + s.meta_conns = [] + s.absorbed = UniqueList() + s.strong_relation = True + s.debug = SHOW_ALL_DEBUG + s._log = [] + + def log(s, *text, **kwtext): + s._log.append(to_text(*text, *kwtext)) + + def dbg(s, *text, **kwtext): + if s.debug: + s._log.append(to_text(*text, *kwtext)) + + def is_minor(s): + return all(l.minor for l in s.layers.values()) + + def get_trait(s, proto:str, name:str, ifnone=None): + # allow alternative lists for proto, like s.get_trait(('tcp', 'udp'), 'src') + if proto is None: + proto = s.layers.keys() + if type(proto) is not str: + for proto_ in proto: + val = s.get_trait(proto_, name, None) + if val is not None: + return val + return ifnone + # allow alternative lists for name, like s.get_trait('tcp', ('src', 'dst)) + if type(name) is not str: + for name_ in name: + val = s.get_trait(proto, name_, None) + if val is not None: + return val + return ifnone + + if name == 'timestamp': + return int(s.timestamp) + layer = s.layers.get(proto, None) + if not layer: + return ifnone + if name == 'msgtype': + return layer.msgtype + trait = layer.traits.get(name, None) + if trait is None: + return ifnone + if trait.val is None: + return ifnone + return trait.val + + def get_traits(s, proto=None, names=None, proto_and_names=None): + pn = [] + if proto or names: + if proto is None or isinstance(proto, str): + proto = [proto] + for p in proto: + pn.append((p, names)) + if proto_and_names: + pn.extend(proto_and_names) + for proto_, names in pn: + if proto_ is None: + proto_ = s.layers.keys() + if isinstance(proto_, str): + proto_ = [proto_] + for proto in proto_: + if names is None: + l = s.layers.get(proto, None) + if not l: + continue + names = l.traits.keys() + if type(names) is str: + names = [names] + for name in names: + result = s.get_trait(proto, name, ifnone=None) + if result is not None: + yield (proto, name, result) + + def get_all_traits(s, proto:str): + layer = s.layers.get(proto) + if not layer: + return {} + return layer.traits + + def same_traits(s, other_msg, proto:str, name:str, allow_unset=False, operator=all): + if type(proto) is not str: + return operator( + s.same_traits(other_msg, proto_, name, allow_unset=allow_unset) + for proto_ in proto + ) + + if name is None: + my_traits = s.get_all_traits(proto) + other_traits = other_msg.get_all_traits(proto) + names = set(my_traits.keys()) + names.update(other_traits.keys()) + name = list(names) + + if type(name) is not str: + return operator( + s.same_traits(other_msg, proto, name_, allow_unset=allow_unset) + for name_ in name + ) + + val = s.get_trait(proto, name) + other_val = other_msg.get_trait(proto, name) + if not allow_unset: + if val is None or other_val is None: + return False + return val == other_val + + def set_trait(s, proto, name, val): + layer = s.layers.get(proto, None) + if layer is None: + layer = Layer(s, proto, None, Traits(Trait(name, val))) + else: + layer.traits.set(name, val) + + def collapse(s, messages, my_idx): + '''iterate backwards over recent messages and see if messages can be combined''' + orig_msg = messages[my_idx] + for layer in s.layers.values(): + msg = layer.collapse(messages, my_idx) + if orig_msg is not msg: + break + return msg + + def absorb_msg(s, other_msg): + global g_current_msg + if g_current_msg is other_msg: + g_current_msg = s + s._log.extend(other_msg._log) + if other_msg and other_msg is not s: + s.absorbed.append(other_msg) + if other_msg.absorbed: + other_absorbed = other_msg.absorbed + other_msg.absorbed = [] + for oa in other_absorbed: + s.absorb_msg(oa) + + def identify_conns(s, messages, my_idx): + msg = messages[my_idx] + for layer_class in Layer.classes().values(): + if layer_class.proto not in msg.layers: + continue + layer_class.identify_conns(messages, my_idx) + + class EntityIdent: + def __init__(s, proto=None, src_port=None, src_kind=None, src_entity=None, dst_port=None, dst_kind=None, dst_entity=None, rename=False): + set_instance_vars_from_args() + + def identify_entities(s, messages, my_idx): + '''From protocol and message discriminators, see if we can identify the src and dst port of the message + to be of a specific core network entity.''' + for layer in s.layers.values(): + try: + identifieds = layer.identify_entities(s, messages, my_idx) + if identifieds is None: + continue + if isinstance(identifieds, Message.EntityIdent): + identifieds = [identifieds] + + for ident in identifieds: + if ident is None: + continue + Entity.find_or_create(ident.proto, ident.src_kind, + ident.src_port or s.src, + ident.src_entity, from_msg=s, + rename=(ident.rename is True or ident.rename == 'src')) + Entity.find_or_create(ident.proto, ident.dst_kind, + ident.dst_port or s.dst, + ident.dst_entity, from_msg=s, + rename=(ident.rename is True or ident.rename == 'dst')) + except: + raise Exception(f'In layer {layer} {s}') + + def find_entity(s, kind, with_port=None): + for conn in (s.src_conns + s.dst_conns): + ret = conn.find_entity(kind, with_port=with_port) + if ret and ret[0] is not None: + return ret + return None, None + + def get_port(s, entity_kind:str): + if s.src_entity_is(entity_kind): + return s.src + elif s.dst_entity_is(entity_kind): + return s.dst + return None + + def entity(s, *kinds): + if s.src.entity and s.src.entity.kind in kinds: + return s.src.entity + if s.dst.entity and s.dst.entity.kind in kinds: + return s.dst.entity + + def src_entity_is(s, *kinds): + return s.src.entity and s.src.entity.kind in kinds + + def dst_entity_is(s, *kinds): + return s.dst.entity and s.dst.entity.kind in kinds + + def same_src_dst(s, other, forward=None, reverse=None): + # assume forward and reverse if neither are set. + # if one of them is set to True, assume the other as False. + if forward is None and reverse is None: + forward = True + reverse = True + a = (s.src.key(), s.dst.key()) + b = (other.src.key(), other.dst.key()) + if forward and reverse: + return set(a) == set(b) + elif forward: + return a == b + elif reverse: + return a == tuple(reversed(b)) + else: + return False + + @classmethod + def parse(cls, p:Packet): + msg = Message(p) + Layer.parse(msg) + msg.src = msg.get_trait(('tcp','udp','sctp'), 'src') + msg.dst = msg.get_trait(('tcp','udp','sctp'), 'dst') + if msg.src is None or msg.dst is None: + return None + assert isinstance(msg.src, IpPort) + assert isinstance(msg.dst, IpPort) + return msg + + def label(s): + label = [] + for l in s.layers.values(): + if not SHOW_ALL_LAYERS: + if l.minor: + continue + if l.hidden and not all((ll.minor or ll.hidden) for ll in s.layers.values()): + continue + label.insert(0, l.label()) + return '/'.join(label) + + def related_subscribers(s): + subscribers = UniqueList() + src_sc = s.src_subscriber_conn() + if src_sc: + subscribers.append(src_sc.subscriber) + dst_sc = s.dst_subscriber_conn() + if dst_sc: + subscribers.append(dst_sc.subscriber) + for a in s.absorbed: + subscribers.extend(a.related_subscribers()) + return subscribers + + def is_subscriber_related(s, subscriber): + return subscriber in s.related_subscribers() + + def __repr__(s): + return s.__str__() + + def __str__(s): + return s.str() + + def str(s, ladder=False, one_column_per_kind=False, show_traits=True, show_conns=True): + t_name = [] + t_name.extend(subscr.label() for subscr in s.related_subscribers()) + name = s.label() + if name: + t_name.append(name) + + src = str(s.src) + dst = str(s.dst) + + if s.src.entity is not None: + src_str = s.src.entity.label() + else: + src_str = src + + if s.count and dst == src: + dst_str = '(self)' + elif s.dst.entity is not None: + dst_str = s.dst.entity.label() + else: + dst_str = dst + + src_pos = 0 + dst_pos = 0 + if s.src.entity: + src_pos = s.src.entity.labelcolumn(one_column_per_kind) + if s.dst.entity: + dst_pos = s.dst.entity.labelcolumn(one_column_per_kind) + + # allows injecting informational fake messages (Entity.news) + if not s.count and not s.count_back: + dst_pos = src_pos + + if not s.src.entity and not s.dst.entity: + if src > dst: + src_pos = dst_pos + 1 + else: + dst_pos = src_pos + 1 + + if not ladder: + if src_pos > dst_pos: + src_pos = 1 + dst_pos = 0 + else: + src_pos = 0 + dst_pos = 1 + + if src_pos <= dst_pos: + left_pos = src_pos + right_pos = dst_pos + left_label = src_str + right_label = dst_str + to_left_count = s.count_back + to_right_count = s.count + else: + left_pos = dst_pos + right_pos = src_pos + left_label = dst_str + right_label = src_str + to_left_count = s.count + to_right_count = s.count_back + + left_strs = [] + left_strs.append(left_label) + if to_left_count: + left_strs.append('<') + if to_left_count > 1: + left_strs.append(f'{to_left_count}') + + right_strs = [] + if to_right_count: + if to_right_count > 1: + right_strs.append(f'{to_right_count}') + right_strs.append('>') + right_strs.append(right_label) + + real_left_pos = max(0, left_pos - (len(left_label)//2)) + real_right_pos = right_pos - (len(right_label)//2) + len(right_label) + 1 - (len(right_label)&1) + + left_str = ''.join(left_strs) + right_str = ''.join(right_strs) + + mid_gap = real_right_pos - real_left_pos - len(right_str) - len(left_str) + mid_gap = max(1, mid_gap) + + if not ladder: + mid_name_margin = 6 + else: + mid_name_margin = mid_gap - len(name) + if to_left_count or to_right_count: + if mid_name_margin > 50: + mid_gap_strs = ['-' * int(mid_name_margin // 2), + name, + '-' * int(mid_name_margin - (mid_name_margin//2))] + name = '' + else: + mid_gap_strs = ['-' * int(mid_gap)] + else: + mid_gap_strs = [] + + t = [' ' * int(real_left_pos), + left_str,] + t.extend(mid_gap_strs) + t.append(right_str) + + if ladder: + t = [''.join(t)] + right_end = len(t[0]) + label_pos = Entity.textcol_one_per_kind if one_column_per_kind else Entity.textcol_one_per_entity + diff = label_pos - right_end + if diff > 0: + t.append(' ' * int(diff)) + + if show_traits: + if isinstance(show_traits, str): + show_traits = [show_traits] + for proto,l in s.layers.items(): + if not l.traits: + continue + if (show_traits is not True) and (proto not in show_traits): + continue + t_name.append('%s%s' % (proto, l.traits)) + + if show_conns: + conns = set() + for c in (s.src_conns + s.dst_conns): + conns.add(f'{c.proto}:{c.conn_id}') + #conns.add(f'{c}') + t_name.append(' '.join(conns)) + + idxs = [s.p.idx] + [a.p.idx for a in s.absorbed] + if len(idxs) <= 3: + t_name.append('+'.join(str(i) for i in sorted(idxs))) + else: + t_name.append(f'{min(idxs)}-{max(idxs)}') + t_name.append(f'{s.timestamp:.3f}') + t.append(' ') + t = [''.join(t)] + indent = '\n' + (' ' * len(t[0]) + ' | ') + t.append(' '.join(t_name)) + + for l in s._log: + t.append(indent) + t.append(l) + return ''.join(t) + + + def src_subscriber_conn(s): + for conn in s.src_conns: + if conn.subscriber_conn is not None: + return conn.subscriber_conn + return None + + def dst_subscriber_conn(s): + for conn in s.dst_conns: + if conn.subscriber_conn is not None: + return conn.subscriber_conn + return None + + def find_message(s, proto, trait, value): + for subscr_conn in (s.src_subscriber_conn(), s.dst_subscriber_conn()): + if subscr_conn is None: + continue + msg = subscr_conn.find_message(proto, trait, value) + if msg: + return msg + return None + + + +class Entity: + '''A core network program like BSC, MSC, ...''' + KINDS_SORTING = ('MS', 'BTS', 'PCU', 'hNodeB', 'BSC', 'MGW@BSC', 'HNBGW', 'STP', 'MSC', 'MGW@MSC', 'MGW', + 'SGSN', 'HLR', 'SIP', 'SIPcon', 'PBX', 'GGSN') + KINDS_SORTING_EXIST = () + entities = listdict() + state_version = 1 # whether to update cached text columns + spacing = 5 + label_spacing = 2 + textcol_one_per_kind = 0 + textcol_one_per_entity = 0 + + # proxy / forwarding addresses to ignore, like the STP port + blacklist = [] + + def __init__(s, kind:str): + set_instance_vars_from_args() + s.idx = None + s.state_version = 0 + s.ports = listdict() + s.labelcol_one_per_kind = 0 + s.labelcol_one_per_entity = 0 + Entity.add(s) + + @classmethod + def add(cls, entity): + Entity.entities.add(entity.kind, entity) + entity.idx = entity.idx_in_kind() + if entity.kind not in Entity.KINDS_SORTING_EXIST: + # a new kind has come up, refresh Entity.KINDS_SORTING_EXIST + exist = [] + for k in Entity.KINDS_SORTING: + if k in Entity.entities.keys(): + exist.append(k) + for k in Entity.entities.keys(): + if k not in exist: + exist.append(k) + Entity.KINDS_SORTING_EXIST = tuple(exist) + + Entity.state_version += 1 + + @classmethod + def count_entities(cls, kind): + l = Entity.entities.get(kind) + return len(l) + + @classmethod + def add_to_blacklist(cls, port:IpPort): + if port in cls.blacklist: + return + cls.blacklist.append(port); + + def rename(s, to_kind): + Entity.entities.remove(s.kind, s) + was_kind = s.kind + s.kind = to_kind + Entity.add(s) + for proto,l in s.ports.items(): + for port in l: + LOG(Color.colored('yellow', 'Rename port'), 'from', was_kind, 'to', s, proto, port) + + @classmethod + def find_or_create(cls, proto, kind, port, matched_entity=None, from_msg=None, rename=False): + if not port: + return None + if port in Entity.blacklist: + return None + + found_entity = port.entity + if found_entity and matched_entity and (found_entity is not matched_entity): + LOG(Color.colored('purple', 'Renaming entity port:'), port, 'to', matched_entity) + if not matched_entity: + matched_entity = found_entity + if matched_entity: + if kind and matched_entity.kind != kind and rename: + matched_entity.rename(kind) + matched_entity.add_port(proto, port, from_msg=from_msg) + return matched_entity + + if kind is None or rename: + for l in Entity.entities.values(): + for e in l: + if e.has_port(port): + if kind and e.kind != kind and rename: + e.rename(kind) + return e + + if kind is None: + return None + else: + l = Entity.entities.get(kind) + if l: + for e in l: + if e.has_port(port): + return e + e = Entity(kind) + e.add_port(proto, port, from_msg=from_msg) + return e + + def absorb(s, other_entity): + '''Merge two entities to one, adopting the other's ports''' + for port in other_entity.ports: + s.add_port(port) + del other_entity + + def label(s): + idx = '' + if s.idx: + idx = str(s.idx + 1) + return f'{s.kind}{idx}' + + def __repr__(s): + return s.label() + def __str__(s): + return repr(s) + + def kind_idx(s): + '''this entity kind's position in the currently known entity kinds: + For 'BSC', if we've seen BTS, BSC and MSC, return 1.''' + return Entity.KINDS_SORTING_EXIST.index(s.kind) + + def idx_in_all(s): + '''this entity kind's position in all currently known entities: + For the second 'BSC', if we've seen 2 BTS, 3 BSC and 1 MSC, return 2 (BTS) + 1 (second BSC) = 3.''' + idx = 0 + for k in Entity.KINDS_SORTING_EXIST: + if k == s.kind: + idx += Entity.entities.get(s.kind).index(s) + return idx + idx += Entity.count_entities(k) + return idx + + def idx_in_kind(s): + '''this entity kind's position in the list of entities of the same kind''' + return Entity.entities.get(s.kind).index(s) + + def check_update_state(s): + if s.state_version == Entity.state_version: + return + Entity.calculate_textcolumns() + s.state_version = Entity.state_version + + def labelcolumn(s, one_column_per_kind=False, mid=True): + s.check_update_state() + if one_column_per_kind: + midcol = s.labelcol_one_per_kind + else: + midcol = s.labelcol_one_per_entity + + if mid: + ret = midcol + else: + ret = int(midcol - (len(s.label()) // 2)) + return ret + + @classmethod + def calculate_textcolumns(cls): + '''In text rendering of a ladder diagram, return the text column for this entity, + if rendering each entity in its own column (not sharing one column per entity kind)''' + entity_col = 0 + kind_col = 0 + for k in Entity.KINDS_SORTING_EXIST: + l = Entity.entities.get(k) + + kind_col += len(k) // 2 + for e in l: + e.labelcol_one_per_kind = kind_col + Entity.textcol_one_per_kind = kind_col + len(e.label()) + Entity.spacing + + entity_col += len(e.kind)//2 + e.labelcol_one_per_entity = entity_col + entity_col += len(e.label()) + Entity.spacing + Entity.textcol_one_per_entity = entity_col + + kind_col += len(k) + Entity.spacing + + def has_port(s, port, proto=None): + if proto: + if port in s.ports.get(proto, []): + return proto + return None + for proto,l in s.ports.items(): + if port in l: + return proto + return None + + def remove_port(s, port): + for proto,l in s.ports.items(): + if port in l: + l.remove(port) + port.entity = None + return + + def add_port(s, proto, port, from_msg=None): + if port.entity and port.entity is not s: + port.entity.remove_port(port) + if s.has_port(port, proto=proto): + return + s.ports.add(proto, port) + port.entity = s + port.proto = proto + LOG(Color.colored('cyan', 'New port:'), port) + + +class Subscriber: + next_ident = 1 + next_tmsi_idx = 1 + imsis = {} + tmsis = {} + imeis = {} + msisdns = {} + + def __init__(s, imsi:str=None, tmsi=None, imei=None, msisdn=None): + set_instance_vars_from_args() + s.subscriber_conns = UniqueList() + s.tmsis = set() + s.tmsi_idx = 0 + s.set_last_tmsi(tmsi) + s.ident = Subscriber.next_ident + Subscriber.next_ident += 1 + Subscriber.update_dicts(s) + + def set_last_tmsi(s, tmsi): + if tmsi is None: + return + s.tmsi = tmsi + if (not isinstance(tmsi, str)) or len(str(tmsi)) < 8: + raise Exception('Invalid TMSI: ' + str(tmsi)) + s.tmsi_idx = Subscriber.next_tmsi_idx + Subscriber.next_tmsi_idx += 1 + s.tmsis.add(tmsi) + Subscriber.tmsis[s.tmsi] = s + + @classmethod + def update_dicts(cls, s): + if s.imsi: + Subscriber.imsis[s.imsi] = s + for tmsi in s.tmsis: + Subscriber.tmsis[tmsi] = s + if s.imei: + Subscriber.imeis[s.imei] = s + if s.msisdn: + Subscriber.msisdns[s.msisdn] = s + + def label(s, full=False): + l = [] + if full or (not s.imsi and not s.tmsi and not s.imei and not s.msisdn): + l.append(f'subscr{s.ident}') + if s.imsi and (full or not s.msisdn): + l.append(f'imsi{s.imsi}') + + if s.tmsi and (full or not s.imsi): + l.append(f'tmsi{s.tmsi}') + if s.imei and (full or (not s.imsi and not s.tmsi)): + l.append(f'imei{s.imei}') + if s.msisdn: + l.append(f'msisdn{s.msisdn}') + return Color.colored(s.ident, ':'.join(l)) + + def __repr__(s): + return s.label(full=True) + def __str__(s): + return s.label() + + @classmethod + def identify_subscriber(cls, msg:Message): + imsi = msg.get_trait(('dtap','bssmap','rsl','gsup'), 'imsi') + tmsi = tmsi_standardize(msg.get_trait(('dtap','bssmap','rsl'), 'tmsi')) + imei = msg.get_trait('dtap', 'imei') + msisdn = msg.get_trait('gsup', 'msisdn') + + if not (imsi or tmsi or imei or msisdn): + return + #if not msg.src_conns and not msg.dst_conns: + # return + + # could start out with subscr = None, but to use a few less subscriber.ids start out with a present + # subscriber, if any. + subscr_conn = msg.src_subscriber_conn() or msg.dst_subscriber_conn() + if subscr_conn is not None: + subscr = subscr_conn.subscriber + else: + subscr = None + + if imsi: + subscr = Subscriber.merge(Subscriber.by_imsi(imsi), subscr) + if tmsi: + subscr = Subscriber.merge(Subscriber.by_tmsi(tmsi), subscr) + if imei: + subscr = Subscriber.merge(Subscriber.by_imei(imei), subscr) + if msisdn: + subscr = Subscriber.merge(Subscriber.by_msisdn(msisdn), subscr) + + if subscr is None: + return + if subscr_conn is None: + subscr_conn = SubscriberConn() + subscr_conn.set_subscriber(subscr) + + for c in (msg.src_conns + msg.dst_conns): + if c.subscriber_conn is None: + subscr_conn.add_conn(c) + c.subscriber_conn = subscr_conn + else: + c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, subscr_conn) + + @classmethod + def by_imsi(cls, imsi): + subscr = Subscriber.imsis.get(imsi) + if not subscr: + subscr = Subscriber(imsi=imsi) + return subscr + + @classmethod + def by_tmsi(cls, tmsi): + subscr = Subscriber.tmsis.get(tmsi) + if not subscr: + subscr = Subscriber(tmsi=tmsi) + return subscr + + @classmethod + def by_imei(cls, imei): + subscr = Subscriber.imeis.get(imei) + if not subscr: + subscr = Subscriber(imei=imei) + return subscr + + @classmethod + def by_msisdn(cls, msisdn): + subscr = Subscriber.msisdns.get(msisdn) + if not subscr: + subscr = Subscriber(msisdn=msisdn) + return subscr + + @classmethod + def merge(cls, a, b): + assert a is None or isinstance(a, Subscriber) + assert b is None or isinstance(b, Subscriber) + if a is None and b is None: + return None + if a is None: + return b + if b is None or b is a: + return a + + if not a.imsi and a.ident > b.ident: + return cls.merge(b, a) + + if a.imsi and b.imsi and a.imsi != b.imsi: + out_error(f'cannot absorb, subscriber would change IMSI: {b.imsi} -> {a.imsi}') + return None + if a.imei and b.imei and a.imei != b.imei: + out_error(f'cannot absorb, subscriber would change IMEI: {b.imei} -> {a.imei}') + return None + + if b.imsi: + a.imsi = b.imsi + b.imsi = None + + if b.tmsis: + a.tmsis.update(b.tmsis) + b.tmsis = set() + if b.tmsi_idx > a.tmsi_idx: + a.set_last_tmsi(b.tmsi) + b.tmsi = None + + if b.imei: + a.imei = b.imei + b.imei = None + + if b.msisdn: + if a.msisdn and a.msisdn != b.msisdn: + LOG(f'subscriber {a} changes MSISDN: {a.msisdn} -> {b.msisdn}') + a.msisdn = b.msisdn + b.msisdn = None + + Subscriber.update_dicts(a) + + for sc in b.subscriber_conns: + a.add_subscriber_conn(sc) + b.subscriber_conns = [] + return a + + def add_subscriber_conn(s, subscriber_conn): + if subscriber_conn.subscriber is s: + return + if subscriber_conn.subscriber: + subscriber_conn.subscriber.subscriber_conns.remove(subscriber_conn) + s.subscriber_conns.append(subscriber_conn) + subscriber_conn.subscriber = s + assert subscriber_conn in subscriber_conn.subscriber.subscriber_conns + + def find_entity(s, kind, with_port=None): + for subscriber_conn in reversed(s.subscriber_conns): + found, found_subscriber_conn = subscriber_conn.find_entity(kind, ask_subscriber=False, + with_port=with_port) + if found is not None: + return found, found_subscriber_conn + return None, None + +class SubscriberConn: + '''A SubscriberConn is a collection of conns that feed into each other. + For example, the Abis and the BSSMAP link for the same subscriber are related, as well as the MGCP and + RTP spoken for that subscriber. + If a subscriber disconnects and connects again, that is a new separate SubscriberConn; + also if a subscriber would concurrently attach in twice somehow, that would be separate SubscriberConn + instances. + + Note that a Message's src_conns and a dst_conns are not necessarily listed in the same SubscriberConn, for example + for RTP, SIP or SMS, the messages may pass from one subscriber to another.''' + + def __init__(s, subscriber=None): + set_instance_vars_from_args() + s.conns = UniqueList() + + @classmethod + def merge(cls, a, b): + assert a is None or isinstance(a, SubscriberConn) + assert b is None or isinstance(b, SubscriberConn) + if a is None and b is None: + return None + if a is None: + return b + if b is None or b is a: + return a + b_subscr = b.subscriber + if b.subscriber: + b.subscriber.subscriber_conns.remove(b) + b.subscriber = None + a.subscriber = Subscriber.merge(a.subscriber, b_subscr) + if a.subscriber: + a.subscriber.subscriber_conns.append(a) + for conn in b.conns: + conn.subscriber_conn = a + a.conns.append(conn) + b.conns = None + return a + + def find_entity(s, kind, with_port=None, ask_subscriber=True): + if ask_subscriber and s.subscriber is not None: + return s.subscriber.find_entity(kind, with_port=with_port) + if with_port is not None and isinstance(with_port, str): + with_port = [with_port] + for conn in reversed(s.conns): + if with_port is not None and conn.port.proto not in with_port: + continue + if conn.port.entity is not None and conn.port.entity.kind == kind: + return conn.port.entity, s + return None, None + + def find_message(s, proto, trait, val): + for conn in reversed(s.conns): + for msgs in (conn.tx_messages, conn.rx_messages): + for msg in msgs: + for p, t, v in msg.get_traits(proto, trait): + if val is None or val == v: + return msg + return None + + def set_subscriber(s, subscriber): + s.subscriber = subscriber + if subscriber is not None: + subscriber.subscriber_conns.append(s) + + def add_conn(s, conn): + s.conns.append(conn) + conn.subscriber_conn = s + + def __repr__(s): + return f'{s.subscriber}~{s.conns}' + + +class Layer_tcp(Layer): + def __init__(s, msg:Message): + p = msg.p + traits = Traits( + src=IpPort.from_tcp_source(p), + dst=IpPort.from_tcp_dest(p), + ) + super().__init__(msg=msg, proto='tcp', msgtype=None, traits=traits, minor=True) + +class Layer_udp(Layer): + def __init__(s, msg:Message): + p = msg.p + traits = Traits( + src=IpPort.from_udp_source(p), + dst=IpPort.from_udp_dest(p), + ) + super().__init__(msg=msg, proto='udp', msgtype=None, traits=traits, minor=True) + +class Layer_sctp(Layer): + def __init__(s, msg:Message): + p = msg.p + traits = Traits( + src=IpPort.from_sctp_source(p), + dst=IpPort.from_sctp_dest(p), + stream_id = p.get('sctp.data_sid'), + stream_seq = p.get('sctp.data_ssn'), + ) + super().__init__(msg=msg, proto='sctp', msgtype=None, traits=traits, minor=True) + +class Layer_rtp(Layer): + def __init__(s, msg:Message): + pt = msg.p.get('rtp.p_type') + + iuup_msgtype = sane_showname(msg.p.get('iuup.pdu_type.showname')) + + if iuup_msgtype is not None: + msgtype = f'{pt}.{iuup_msgtype}' + else: + msgtype = pt + + traits = Traits( + pt=pt, + iuup=iuup_msgtype + ) + super().__init__(msg=msg, proto='rtp', msgtype=msgtype, traits=traits) + + def collapse(s, messages, my_idx): + msgtype = s.msg.get_trait('rtp', 'msgtype') + src = s.msg.src + dst = s.msg.dst + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + break + if not 'rtp' in prev_msg.layers: + if prev_msg.is_minor(): + continue + else: + break + if prev_msg.get_trait('rtp', 'msgtype') != msgtype: + continue + if s.msg.same_src_dst(prev_msg, forward=True): + # found a recent RTP similar RTP packet, combine + prev_msg.count += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.msg) + return prev_msg + if 1 and s.msg.same_src_dst(prev_msg, reverse=True): + # same but backwards + prev_msg.count_back += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.msg) + return prev_msg + return s.msg + + # identify_entities: RTP ports are identified by watching RSL and MGCP, see Layer_gsm_abis_rsl.identify_conns_rtp + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + + conn = Conn.find('rtp', msg.src, conn_id=msg.src.key()) + if conn is not None: + conn.add_message(msg) + + conn = Conn.find('rtp', msg.dst, conn_id=msg.dst.key()) + if conn is not None: + conn.add_message(msg) + +class Layer_mgcp(Layer): + def __init__(s, msg:Message): + p = msg.p + verb = p.get('mgcp.req_verb') + rsp = p.get('mgcp.rsp_rspstring') + msgtype = verb or rsp or '?' + tid = p.get('mgcp.transid', '') + + rtp_port = None + sdp_rtp_ip = p.get('sdp.connection_info_address') + sdp_rtp_port = p.get('sdp.media_port') + if sdp_rtp_ip and sdp_rtp_port: + rtp_port = IpPort.get(sdp_rtp_ip, sdp_rtp_port) + + if rsp: + endp = p.get('mgcp.param_specificendpointid') + else: + endp = p.get('mgcp.req_endpoint') + if endp and endp.startswith('rtpbridge/*@'): + endp = None + traits = Traits( + tid=tid, + endp=endp, + ci=p.get('mgcp.param_connectionid'), + verb=verb, + rsp=rsp, + rtp_port=rtp_port, + ) + + s.tid = tid + super().__init__(msg=msg, proto='mgcp', msgtype=msgtype, traits=traits) + + def label(s): + return f'mgcp.{s.tid}.{s.msgtype}' + + def identify_entities(s, msg:Message, messages, my_idx): + if msg.get_trait('mgcp', 'verb') == 'CRCX': + dst_kind = 'MGW' + if msg.src_entity_is('BSC'): + dst_kind = 'MGW@BSC' + elif msg.src_entity_is('MSC'): + dst_kind = 'MGW@MSC' + return Message.EntityIdent(proto='mgcp', dst_kind=dst_kind) + elif msg.get_trait('mgcp', 'rsp') and msg.src_entity_is('MGW', 'MGW@BSC','MGW@MSC'): + rtp = msg.get_trait('mgcp', 'rtp_port') + if rtp: + msg.src.entity.add_port('rtp', rtp) + return None + + @classmethod + def find_req(cls, messages, my_idx): + msg = messages[my_idx] + for match in find_same_trait(msg, messages, my_idx, 'mgcp', 'tid'): + if match.get_trait('mgcp', 'rsp'): + continue + if not match.same_src_dst(msg, reverse=True): + continue + return match + return None + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + proto = 'mgcp' + + if msg.get_trait('mgcp', 'verb'): + endp = msg.get_trait('mgcp', 'endp') + mgw_port = msg.dst + endp_conn = Conn.find(proto, mgw_port, endp) + if endp_conn is not None: + endp_conn.add_message(msg) + for c in (msg.src_conns + msg.dst_conns): + endp_conn.merge_subscr_conns(c) + + ci = msg.get_trait('mgcp', 'ci') + conn_id = f'{endp}:{ci}' + conn = Conn.find(proto, mgw_port, conn_id) + if conn: + conn.add_message(msg) + for c in (msg.src_conns + msg.dst_conns + msg.meta_conns): + conn.merge_subscr_conns(c) + + if msg.get_trait('mgcp', 'rsp') == 'OK': + req = cls.find_req(messages, my_idx) + if req is None: + ERR('MGCP response without request') + return + mgw_port = req.dst + + verb = req.get_trait('mgcp', 'verb') + if verb == 'CRCX': + # The MGCP connection + endp = msg.get_trait('mgcp', 'endp') or req.get_trait('mgcp', 'endp') + ci = msg.get_trait('mgcp', 'ci') + if not endp or not ci: + ERR('MGCP CRCX with endp =', endp, 'ci =', ci) + + # creating two levels of conn: a meta conn with just endp, + # and a proper one with endp+ci + # endp: + endp_conn = Conn.find(proto, mgw_port, endp) + if not endp_conn: + endp_conn = Conn.open(proto, mgw_port, conn_id=endp, start_msg=msg) + + # endp+ci: + conn_id = f'{endp}:{ci}' + conn = Conn.find(proto, mgw_port, conn_id) + if not conn: + conn = Conn.open(proto, mgw_port, conn_id, msg) + conn.add_message(req) + conn.add_message(msg) + conn.merge_subscr_conns(endp_conn) + + # The RTP connection set up by MGCP + rtp_port = msg.get_trait('mgcp', 'rtp_port') + if rtp_port: + rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False, + entity=conn.port.entity) + rtp_conn.merge_subscr_conns(conn) + # bssmap or ranap layer will mention this rtp_port in their Assignment / RAB-Assignment + + else: + endp = req.get_trait('mgcp', 'endp') + ci = req.get_trait('mgcp', 'ci') + Conn.message(proto, mgw_port, endp, msg) + + conn_id = f'{endp}:{ci}' + Conn.message(proto, mgw_port, conn_id, msg) + + if verb == 'DLCX': + # The MGCP connection + endp = req.get_trait('mgcp', 'endp') + ci = req.get_trait('mgcp', 'ci') + if not endp: + ERR('MGCP DLCX without endp') + + def close_ci(ci): + ci_conn_id = f'{endp}:{ci}' + # go through all RTP ports created in this conn + ci_conn = Conn.find(proto, mgw_port, ci_conn_id) + if ci_conn is not None: + all_rtp_ports = UniqueList() + for msgs in (ci_conn.rx_messages, ci_conn.tx_messages): + for msg in msgs: + all_rtp_ports.append(msg.get_trait('mgcp', 'rtp_port')) + + for rtp_port in all_rtp_ports: + Conn.close('rtp', rtp_port, conn_id=rtp_port.key(), close_msg=msg, if_exists=True) + Conn.close(proto, mgw_port, ci_conn_id, msg) + + if ci: + close_ci(ci) + else: + endp_conn = Conn.find(proto, mgw_port, endp) + all_ci = UniqueList() + if endp_conn: + for msgs in (endp_conn.tx_messages, endp_conn.rx_messages): + for msg in msgs: + all_ci.append(msg.get_trait('mgcp', 'ci')) + for ci in all_ci: + close_ci(ci) + + Conn.close(proto, mgw_port, endp, msg, if_exists=True) + + + elif msg.get_trait('mgcp', 'verb') == 'MDCX': + # The RTP connection set up by BSC or MSC + rtp_port = msg.get_trait('mgcp', 'rtp_port') + if rtp_port and Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) is None: + rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False) + for c in msg.src_conns: + rtp_conn.merge_subscr_conns(c) + +class Layer_sccp(Layer): + def __init__(s, msg:Message): + p = msg.p + msgtype = p.get('sccp.message_type.showname') + traits = Traits( + src_lref=p.get('sccp.slr'), + dst_lref=p.get('sccp.dlr'), + ) + super().__init__(msg=msg, proto='sccp', msgtype=msgtype, traits=traits, hidden=True) + + def collapse(s, messages, my_idx): + msg = s.msg + + # cut out STP hop + if SCCP_COLLAPSE_STP: + src = msg.src + t = msg.timestamp + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if t - prev_msg.timestamp > 1: + break + if prev_msg.absorbed: + continue + prev_sccp = prev_msg.layers.get(s.proto, None) + if prev_sccp is None: + continue + #if src != prev_msg.dst: + # continue + if s.msgtype != prev_sccp.msgtype: + continue + if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): + continue + if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): + continue + if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): + continue + if not msg.same_traits(prev_msg, 'm3ua', 'message_length'): + continue + + prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) + prev_msg.dst = msg.dst + prev_msg.absorb_msg(msg) + messages[i] = None + messages[my_idx] = prev_msg + Entity.add_to_blacklist(src) + return prev_msg + return msg + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + proto = 'sccp' + msgtype = msg.get_trait(proto, 'msgtype') + if SCCP_COLLAPSE_STP and not msg.absorbed: + return + src_id = msg.get_trait(proto, 'src_lref') + dst_id = msg.get_trait(proto, 'dst_lref') + if msgtype == 'Connection-Request': + Conn.open(proto, msg.src, src_id, msg) + elif msgtype == 'Connection-Confirm': + Conn.open(proto, msg.src, src_id, msg, + counterparts=[Conn.find(proto, msg.dst, dst_id)]) + elif msgtype in ('Release-Complete',): + Conn.close(proto, msg.src, src_id, msg) + else: + if src_id: + Conn.message(proto, msg.src, src_id, msg) + if dst_id: + Conn.message(proto, msg.dst, dst_id, msg) + + +class Layer_m3ua(Layer): + def __init__(s, msg:Message): + traits = Traits( + opc = msg.p.get('m3ua.protocol_data_opc'), + dpc = msg.p.get('m3ua.protocol_data_dpc'), + message_length = msg.p.get('m3ua.message_length'), + ) + super().__init__(msg=msg, proto='m3ua', msgtype=None, traits=traits, minor=True) + +# wireshark commonly falsely classifies a BSSMAP Ciphering Mode Command as RNSAP PDU +class Layer_rnsap(Layer): + def __init__(s, msg:Message): + p = msg.p + traits = Traits() + msgtype = 'Cipher Mode Command' + super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits) + +class Layer_bssap(Layer): + 'BSSAP, either BSS Management (see Layer_gsm_a_bssmap) or Direct Transfer (see Layer_gsm_a_dtap)' + def __init__(s, msg:Message): + msgtype = msg.p.get('bssap.msgtype.showname') + msgtype = msg.p.get('bssap.pdu_type.showname') + + traits = Traits( + msgtype_nr=int(msg.p.get('bssap.pdu_type'), 16), + ) + + super().__init__(msg=msg, proto='bssap', msgtype=msgtype, traits=traits, minor=True) + +class Layer_bssgp(Layer): + def __init__(s, msg:Message): + msgtype = sane_msgtype(msg.p.get('bssgp.pdu_type.showname')) + traits = Traits( + tlli=msg.p.get('bssgp.gsm_a_rr_tlli'), + ) + super().__init__(msg=msg, proto='bssgp', msgtype=msgtype, traits=traits) + + def identify_entities(s, msg:Message, messages, my_idx): + if msg.get_trait('bssgp', 'msgtype') == 'FLOW-CONTROL-BVC': + return Message.EntityIdent(proto='bssgp', src_kind='PCU', dst_kind='SGSN') + return None + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + proto = 'bssgp' + msgtype = msg.get_trait('dtap', 'msgtype') + tlli = msg.get_trait('bssgp', 'tlli') + conn_id = tlli + if not conn_id: + return + if msgtype == 'Attach-Request': + Conn.open(proto, msg.src, conn_id, msg, + counterparts=[Conn.open(proto, msg.dst, conn_id, msg)]) + elif msgtype == 'Attach-Accept': + conn = Conn.close(proto, msg.src, conn_id, msg) + new_conn_id = msg.get_trait('dtap', 'tmsi') + new_conn = Conn.open(proto, msg.src, new_conn_id, msg, + counterparts=[Conn.open(proto, msg.dst, new_conn_id, msg)]) + new_conn.merge_subscr_conns(conn) + elif msgtype == 'Attach-Complete': + Conn.close(proto, msg.src, conn_id, msg) + else: + conn = Conn.message(proto, msg.src, conn_id, msg) + + + + +class Layer_hnbap(Layer): + def __init__(s, msg:Message): + def strip_till_dash(dashstr): + if not dashstr or not '-' in dashstr: + return dashstr + dash = dashstr.rindex('-') + return dashstr[dash+1:] + + msgtype = strip_till_dash(msg.p.get('hnbap.procedurecode.showname')) + pdutype = strip_till_dash(sane_msgtype(msg.p.get('hnbap.hnbap_pdu.showname'))) + pdutype_nr = msg.p.get('hnbap.hnbap_pdu') + traits = Traits( + msgtype_nr=int(msg.p.get('hnbap.procedurecode')), + pdutype=pdutype, + pdutype_nr=int(pdutype_nr), + ) + super().__init__(msg=msg, proto='hnbap', msgtype=msgtype, traits=traits) + + def identify_entities(s, msg:Message, messages, my_idx): + if (msg.get_trait('hnbap', 'msgtype') in ('Register', 'HNBRegister', 'UERegister')) and (msg.get_trait('hnbap', 'pdutype_nr') == 0): + return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW') + return None + +class Layer_rua(Layer): + def __init__(s, msg:Message): + def strip_till_dash(dashstr): + if not dashstr or not '-' in dashstr: + return dashstr + dash = dashstr.rindex('-') + return dashstr[dash+1:] + + msgtype = strip_till_dash(msg.p.get('rua.procedurecode.showname')) + pdutype = strip_till_dash(sane_msgtype(msg.p.get('rua.rua_pdu.showname'))) + pdutype_nr = msg.p.get('rua.rua_pdu') + cn_domain_i = msg.p.get('rua.cn_domainindicator') + cn_domain = None + if cn_domain_i == '0': + cn_domain = 'cs' + elif cn_domain_i == '1': + cn_domain = 'ps' + traits = Traits( + msgtype_nr=int(msg.p.get('rua.procedurecode')), + pdutype=pdutype, + cn_domain=cn_domain, + rua_ctx=msg.p.get('rua.context_id'), + ) + super().__init__(msg=msg, proto='rua', msgtype=msgtype, traits=traits, cap_p_name='rua') + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + proto = 'rua' + msgtype = msg.get_trait(proto, 'msgtype') + conn_id = (msg.get_trait(proto, 'cn_domain') or '?') + ':' + (msg.get_trait(proto, 'rua_ctx') or '?') + if msgtype == 'Connect': + conn = Conn.open(proto, msg.src, conn_id, msg) + Conn.open(proto, msg.dst, conn_id, msg, counterparts=[conn]) + elif msgtype == 'Disconnect': + Conn.close(proto, msg.dst, conn_id, msg) + else: + Conn.message(proto, msg.src, conn_id, msg) + + +class Layer_ranap(Layer): + def __init__(s, msg:Message): + msgtype = msg.p.get('ranap.rab_assignmentrequest_element' + ) or msg.p.get('ranap.rab_assignmentresponse_element' + ) or msg.p.get('ranap.initiatingmessage_element') + + rtp_port = None + ip = msg.p.get('ranap.nsap_ipv4_addr') + port_bin = msg.p.get('ranap.bindingid.binary_value') + if ip and port_bin and len(port_bin) >= 2: + port = int.from_bytes(port_bin[:2], "big") + rtp_port = IpPort.get(ip, port) + + traits = Traits( + rtp_port=rtp_port, + ) + super().__init__(msg=msg, proto='ranap', msgtype=msgtype, traits=traits) + + def collapse(s, messages, my_idx): + msg = s.msg + + # cut out HNBGW hop + if IUH_COLLAPSE_HNBGW: + src = msg.src + t = msg.timestamp + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if t - prev_msg.timestamp > 1: + break + if src != prev_msg.dst: + continue + if msg.src.entity is not prev_msg.dst.entity: + continue + # DOESNT WORK + if not msg.same_traits(prev_msg, 'ranap', None): + continue + if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): + continue + if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): + continue + if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): + continue + + prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) + prev_msg.dst = msg.dst + prev_msg.absorb_msg(msg) + messages[i] = None + messages[my_idx] = prev_msg + Entity.add_to_blacklist(src) + return prev_msg + return msg + + def identify_entities(s, msg:Message, messages, my_idx): + ids = [] + ids.append(s.identify_attach(msg, messages, my_idx)) + + msgtype = msg.get_trait('ranap', 'msgtype') + rtp_port = msg.get_trait('ranap', 'rtp_port') + + if rtp_port and msgtype == 'RAB-AssignmentRequest': + # associate the MSC's MGCP port, but take care to not say the STP is an MSC + crcx_ok = msg.find_message('mgcp', 'rtp_port', rtp_port) + if crcx_ok and crcx_ok.src_entity_is('MGW'): + mgw = crcx_ok.src.entity + + msc = None + if msg.src_entity_is('MSC'): + msc = msg.src.entity + + ids.append(Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True, + dst_entity=msc, dst_port=crcx_ok.dst if msc else None)) + + if rtp_port and msgtype == 'RAB-AssignmentResponse' and msg.src_entity_is('hNodeB'): + ids.append(Message.EntityIdent(proto='rtp', src_kind='hNodeB', src_port=rtp_port, src_entity=msg.src.entity)) + + return ids + + def identify_attach(s, msg:Message, messages, my_idx): + ids = [] + dst_kind = None + msgtype = msg.get_trait('dtap', 'msgtype') + if msgtype in DTAP_COMPL_L3: + dst_kind = 'MSC' + proto = 'IuCS' + elif msgtype in GMM_COMPL_L3: + dst_kind = 'SGSN' + proto = 'IuPS' + + if not dst_kind: + return None + + if 'rua' in msg.layers: + return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW') + + # don't mistake the STP as MSC or SGSN + if SCCP_COLLAPSE_STP and not msg.absorbed: + return None + if not SCCP_COLLAPSE_STP and msg.src_entity_is('HNBGW'): + return None + + # FIXME: below only makes sense with SCCP_COLLAPSE_STP == True + if not SCCP_COLLAPSE_STP: + return None + + # find a HNBGW that has recently received the same LU, + # associate IuCS port + src_entity = None + for match in find_same_trait(msg, messages, my_idx, 'dtap', None): + if 'rua' not in match.layers: + continue + if not match.dst_entity_is('HNBGW'): + continue + src_entity = match.dst.entity + if src_entity: + break + + return Message.EntityIdent(proto=proto, src_kind='HNBGW', src_entity=src_entity, dst_kind=dst_kind) + + @classmethod + def identify_conns(cls, messages, my_idx): + msg = messages[my_idx] + + if SCCP_COLLAPSE_STP and not msg.absorbed: + return + + rtp_port = msg.get_trait('ranap', 'rtp_port') + if rtp_port: + rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key()) + if rtp_conn: + for c in msg.src_conns: + rtp_conn.merge_subscr_conns(c) + + +class Layer_gsm_a_bssmap(Layer): + def __init__(s, msg:Message): + p = msg.p + msgtype = p.get('gsm_a_bssmap.msgtype.showname') + + rtp_port = None + ip = p.get('gsm_a_bssmap.aoip_trans_ipv4') + port = p.get('gsm_a_bssmap.aoip_trans_port') + if ip and port: + rtp_port = IpPort.get(ip, port) + + tmsi = tmsi_standardize(p.get('gsm_a_bssmap.gsm_a_tmsi')) + + traits = Traits( + msgtype_nr=str_to_int(p.get('gsm_a_bssmap.msgtype')), + rtp_port=rtp_port, + imsi=p.get('gsm_a_bssmap.e212_imsi'), + tmsi=tmsi, + ) + super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_bssmap') + + def identify_entities(s, msg:Message, messages, my_idx): + # don't mistake the STP as MSC or BSC + if SCCP_COLLAPSE_STP and not msg.absorbed: + return None + + msgtype = msg.get_trait('bssmap', 'msgtype') + if msgtype in ('Complete-Layer-3-Information', 'Clear-Complete'): + + # associate BSC BSSMAP port with BSC RSL port + src_entity = None + for match in find_same_trait(msg, messages, my_idx, 'dtap', ('tmsi', 'imsi')): + if 'rsl' not in match.layers: + continue + if not match.dst.entity or match.dst.entity.kind != 'BSC': + continue + src_entity = match.dst.entity + if src_entity: + break + + return Message.EntityIdent(proto='bssmap', src_kind='BSC', dst_kind='MSC', src_entity = src_entity) + + if msgtype == 'Assignment-Request': + # This Assignment-Request's rtp_port should match an earlier MGCP CRCX-OK message, and we now + # know that this MSC asked for it. + rtp_port = msg.get_trait('bssmap', 'rtp_port') + if not rtp_port: + return None + + def cond(prev_msg): + return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port + crcx_ok = None + for prev_msg in find_recent_msg(msg, messages, my_idx, cond): + crcx_ok = prev_msg + break + if not crcx_ok: + return None + msc = msg.src.entity + mgw = crcx_ok.src.entity + return Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True, + dst_port=crcx_ok.dst, dst_entity=msc, dst_kind='MSC') + + + return None + + @classmethod + def identify_conns(cls, messages, my_idx): + msg = messages[my_idx] + msgtype = msg.get_trait('bssmap', 'msgtype') + + if SCCP_COLLAPSE_STP and not msg.absorbed: + return + + # Paging does not have a proper end, it may never be answered. + # The RSL paging command hopefully happens, and closes this Conn. + if msgtype == 'Paging': + imsi = msg.get_trait('bssmap', 'imsi') + if imsi is not None: + c = Conn.open('bssmap', msg.dst, f'page_imsi{imsi}', msg) + Conn.close_conn(c, msg) + tmsi = msg.get_trait('bssmap', 'tmsi') + if tmsi is not None: + c = Conn.open('bssmap', msg.dst, f'page_tmsi{tmsi}', msg) + Conn.close_conn(c, msg) + + rtp_port = msg.get_trait('bssmap', 'rtp_port') + if rtp_port is None: + return + + rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key()) + if rtp_conn is None: + return + for c in msg.src_conns: + rtp_conn.merge_subscr_conns(c) + + def cond(prev_msg): + return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port + crcx_ok = None + for prev_msg in find_recent_msg(msg, messages, my_idx, cond): + for c in (prev_msg.src_conns + prev_msg.dst_conns): + rtp_conn.merge_subscr_conns(c) + + +class Layer_gsm_abis_rsl(Layer): + def __init__(s, msg:Message): + p = msg.p + msgtype = sane_msgtype(p.get('gsm_abis_rsl.msg_type.showname')) + msgtype_nr = p.get('gsm_abis_rsl.msg_type') + msgtype_nr = str_to_int(msgtype_nr) + + ch = None + ch_imm_ass = None + + # For Immediate Assignment, the assigned TS/chan is more interesting + ts = p.get('gsm_a_ccch.gsm_a_rr_timeslot') + cbits = (p.get('gsm_a_ccch.gsm_a_rr_sdcch4_sdcchc4_cbch') + or p.get('gsm_a_ccch.gsm_a_rr_sdcch8_sdcchc8_cbch')) + try: + ch_ts = int(ts) + ch_cbits = int(cbits) + ch_imm_ass = f'{ch_ts}-{ch_cbits}' + except: + pass + + # normal RSL messages on a given TS/chan + ts = p.get('gsm_abis_rsl.ch_no_tn') + cbits = p.get('gsm_abis_rsl.ch_no_cbits') + if ts is not None and cbits is not None: + try: + ch_ts = int(ts) + ch_cbits = int(cbits) + ch = f'{ch_ts}-{ch_cbits}' + except: + raise + + ch_assign = None + new_ch_ts = p.get('gsm_a_dtap.gsm_a_rr_timeslot') + new_ch_ss = p.get('gsm_a_dtap.gsm_a_rr_tch_facch_sacchf') + if new_ch_ts and new_ch_ss: + ch_assign = f'{new_ch_ts}-{new_ch_ss}' + + rtp_local_port = None + ipacc_rtp_local_ip = p.get('gsm_abis_rsl.ipacc_local_ip') + ipacc_rtp_local_port = p.get('gsm_abis_rsl.ipacc_local_port') + if ipacc_rtp_local_ip and ipacc_rtp_local_port: + rtp_local_port = IpPort.get(ipacc_rtp_local_ip, ipacc_rtp_local_port) + + rtp_remote_port = None + ipacc_rtp_remote_ip = p.get('gsm_abis_rsl.ipacc_remote_ip') + ipacc_rtp_remote_port = p.get('gsm_abis_rsl.ipacc_remote_port') + if ipacc_rtp_remote_ip and ipacc_rtp_remote_port: + rtp_remote_port = IpPort.get(ipacc_rtp_remote_ip, ipacc_rtp_remote_port) + + req_ref_l = (p.get('gsm_abis_rsl.req_ref_ra'), + p.get('gsm_abis_rsl.req_ref_t1prim'), + p.get('gsm_abis_rsl.req_ref_t2'), + p.get('gsm_abis_rsl.req_ref_t3')) + if not all(req_ref_l): + req_ref_l = (p.get('gsm_a_ccch.gsm_a_rr_ra'), + p.get('gsm_a_ccch.gsm_a_rr_t1prim'), + p.get('gsm_a_ccch.gsm_a_rr_t2'), + p.get('gsm_a_ccch.gsm_a_rr_t3')) + req_ref = None + if all(req_ref_l): + req_ref = '-'.join(req_ref_l) + + try: + page_tmsi = tmsi_standardize(msg.p.cap_p.gsm_abis_rsl._all_fields['3gpp.tmsi']) + except: + page_tmsi = None + + traits = Traits( + msgtype_nr=msgtype_nr, + ch=ch, + ch_imm_ass=ch_imm_ass, + ch_assign=ch_assign, + chan_type=p.get('gsm_abis_rsl.ch_type'), + rtp_port=rtp_local_port, + rtp_remote_port=rtp_remote_port, + tmsi=tmsi_standardize(p.get('gsm_abis_rsl.gsm_a_tmsi')), + imsi=p.get('gsm_abis_rsl.gsm_a_imsi'), + page_tmsi=page_tmsi, + arfcn = p.get('gsm_a_dtap.gsm_a_rr_single_channel_arfcn') or p.get('gsm_abis_rsl.gsm_a_rr_single_channel_arfcn'), + req_ref = req_ref, + ) + + super().__init__(msg=msg, proto='rsl', msgtype=msgtype, traits=traits, cap_p_name='gsm_abis_rsl') + # ignore CCCH Load INDication + #if msgtype_nr == 18: + # msg.hide = True + + def identify_entities(s, msg:Message, messages, my_idx): + ids = [] + msgtype = msg.get_trait('rsl', 'msgtype') + + if msgtype in ('RF-RESource-INDication', 'CCCH-LOAD-INDication', 'CHANnel-ReQuireD', ): + # INDication from BTS to BSC + ids.append(Message.EntityIdent(proto='rsl', src_kind='BTS', dst_kind='BSC')) + if msgtype in ('CHANnel-ACTIVation', 'IMMEDIATE-ASSIGN-COMMAND'): + # from BSC to BTS + ids.append(Message.EntityIdent(proto='rsl', dst_kind='BTS', src_kind='BSC')) + + rtp_port = msg.get_trait('rsl', 'rtp_port') + if rtp_port and msg.src_entity_is('BTS') and msgtype in ( + 'ip.access-CRCX-ACK', 'ip.access-MDCX-ACK'): + ids.append(Message.EntityIdent(proto='rtp', src_kind='BTS', src_entity=msg.src.entity, src_port=rtp_port)) + + if msgtype == 'ip.access-MDCX': + # This ip.a MDCX's rtp_remote_port should match an earlier MGCP CRCX-OK message, and we now + # know that this BSC asked for it. + + rtp_port = msg.get_trait('rsl', 'rtp_remote_port') + if not rtp_port: + return None + + def cond(prev_msg): + return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port + crcx_ok = None + for prev_msg in find_recent_msg(msg, messages, my_idx, cond): + crcx_ok = prev_msg + break + if not crcx_ok: + return None + bsc = msg.src.entity + mgw = crcx_ok.src.entity + ids.append( Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@BSC', rename=True, + dst_port=crcx_ok.dst, dst_entity=bsc, dst_kind='BSC') ) + + return ids + + def collapse(s, messages, my_idx): + # combine duplicates like rsl.CCCH-LOAD-INDication + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + break + # stop combining at any non-rsl (and non-minor) message + if not 'rsl' in prev_msg.layers: + if all(l.minor for l in prev_msg.layers.values()): + continue + else: + break + if not same_nonempty(prev_msg.get_traits('rsl'), s.msg.get_traits('rsl')): + continue + if s.msg.same_src_dst(prev_msg, forward=True): + # found a recent similar packet, combine + prev_msg.count += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.msg) + return prev_msg + return s.msg + + @classmethod + def identify_conns_ra(cls, messages:list, my_idx:int): + msg = messages[my_idx] + + msgtype = msg.get_trait('rsl', 'msgtype') + + bts = msg.entity('BTS') + if bts is None: + return None + bts_port = msg.get_port('BTS') + if bts_port is None: + return None + ra = msg.get_trait('rsl', 'req_ref') + if ra is None: + return None + bts_ra = f'{bts_port.entity.label()}.ra{ra}' + + proto = 'rsl' + conn_id = bts_ra + + conn = None + + if msgtype == 'CHANnel-ReQuireD': + if not msg.src_entity_is('BTS'): + return + bts = msg.src.entity + bts_port = msg.src + bsc = msg.dst.entity + bsc_port = msg.dst + conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts) + Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn]) + + elif msgtype in ('CHANnel-ACTIVation'): + conn = Conn.message(proto, msg.dst, conn_id, msg) + + elif msgtype == 'IMMEDIATE-ASSIGN-COMMAND': + # the RA token has fulfilled its use as soon as an IMM ASS happened + Conn.close(proto, msg.dst, conn_id, msg) + + else: + Conn.message(proto, msg.dst, conn_id, msg) + + if conn: + for c in (msg.src_conns + msg.dst_conns): + conn.merge_subscr_conns(c) + + @classmethod + def identify_conns_ch(cls, messages:list, my_idx:int): + msg = messages[my_idx] + proto = 'rsl' + msgtype = msg.get_trait('rsl', 'msgtype') + + # For Immediate Assignment, the assigned TS/chan is the one to match on + if msgtype == 'IMMEDIATE-ASSIGN-COMMAND': + ch = msg.get_trait('rsl', 'ch_imm_ass') + else: + ch = msg.get_trait('rsl', 'ch') + + if ch is None: + return None + + if msgtype in ('CHANnel-ACTIVation') or msg.src_entity_is('BSC'): + bsc = msg.src.entity + bsc_port = msg.src + bts = msg.dst.entity + bts_port = msg.dst + elif msg.src_entity_is('BTS'): + bts = msg.src.entity + bts_port = msg.src + bsc = msg.dst.entity + bsc_port = msg.dst + else: + return None + + if bts_port.entity is None: + return None + + bts_ch = f'{bts_port.entity.label()}.ch{ch}' + conn_id = bts_ch + + if msgtype == 'CHANnel-ACTIVation': + conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts) + Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn]) + elif msgtype == 'RF-CHANnel-RELease-ACKnowledge': + conn = Conn.close(proto, msg.src, conn_id, msg) + else: + conn = Conn.message(proto, bts_port, conn_id, msg) + if conn: + for c in (msg.src_conns + msg.dst_conns): + conn.merge_subscr_conns(c) + + # when changing to a new ch, e.g. a regular Assignment Command to change from SDCCH to TCH, + # associate the new channel with the conn + ch_assign = msg.get_trait('rsl', 'ch_assign') + if ch_assign: + conn_id = f'{bts_port.entity.label()}.ch{ch_assign}' + conn = Conn.find(proto, bts_port, conn_id) + if conn: + conn.add_message(msg) + for c in (msg.src_conns + msg.dst_conns): + conn.merge_subscr_conns(c) + + + @classmethod + def identify_conns_rtp(cls, messages:list, my_idx:int): + msg = messages[my_idx] + # BTS RTP + rtp_port = msg.get_trait('rsl', 'rtp_port') + if rtp_port and msg.src_entity_is('BTS') and msg.get_trait('rsl','msgtype') == 'ip.access-CRCX-ACK': + rtp_port.entity = msg.src.entity + rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, add_message=False) + for c in (msg.src_conns + msg.dst_conns): + rtp_conn.merge_subscr_conns(c) + + # MGW@BSC RTP towards BTS + rtp_port = msg.get_trait('rsl', 'rtp_remote_port') + if rtp_port: + rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) + if rtp_conn: + rtp_conn.add_message(msg) + for c in (msg.src_conns + msg.dst_conns): + rtp_conn.merge_subscr_conns(c) + + @classmethod + def identify_conns_paging(cls, messages:list, my_idx:int): + msg = messages[my_idx] + msgtype = msg.get_trait('rsl', 'msgtype') + + if msgtype != 'PAGING-CoMmanD': + return + + tmsi = msg.get_trait('rsl', 'page_tmsi') + if tmsi is None: + return + conn_id = f'page_tmsi{tmsi}' + + subscr = Subscriber.by_tmsi(tmsi) + if subscr is None: + return + + rsl_conn = Conn.open('rsl', msg.src, conn_id, msg) + subscr_conn = SubscriberConn() + subscr.add_subscriber_conn(subscr_conn) + subscr_conn.add_conn(rsl_conn) + + # Paging does not have a proper end, it may never be answered. + # A Conn wants to be closed at some point. Just close it directly. + Conn.close_conn(rsl_conn, msg) + + # Expecting a recent Paging on BSSMAP + bssmap_paging = None + def cond(prev_msg): + return (prev_msg.get_trait('bssmap', 'msgtype') == 'Paging' + and prev_msg.get_trait('bssmap', 'tmsi') == tmsi) + for match in find_recent_msg(msg, messages, my_idx, cond): + bssmap_paging = match + break + + if bssmap_paging is None: + return + + bssmap_conn = Conn.find('bssmap', bssmap_paging.dst, conn_id, find_in_closed_conns=True) + if bssmap_conn: + rsl_conn.merge_subscr_conns(bssmap_conn) + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + cls.identify_conns_ra(messages, my_idx) + cls.identify_conns_ch(messages, my_idx) + cls.identify_conns_rtp(messages, my_idx) + cls.identify_conns_paging(messages, my_idx) + +class Layer_gsm_a_dtap(Layer): + def __init__(s, msg:Message): + dtap = msg.p.get('gsm_a_dtap') + assert dtap is not None + + msgtype = None + msgtype_nr = None + for f in dtap._get_all_fields_with_alternates(): + if f.name.startswith('gsm_a.dtap.msg_') and f.name.endswith('_type'): + msgtype = f.showname_value + try: + msgtype_nr = int(f.raw_value) + except: + pass + traits = Traits( + msgtype_nr=msgtype_nr, + imsi=msg.p.get('gsm_a_dtap.e212_imsi') or msg.p.get('gsm_a_dtap.gsm_a_imsi'), + tmsi=tmsi_standardize(msg.p.get('gsm_a_dtap.gsm_a_tmsi') or msg.p.get('gsm_a_dtap.3gpp_tmsi')), + imei=msg.p.get('gsm_a_dtap.gsm_a_imei'), + to_msisdn=msg.p.get('gsm_a_dtap.cld_party_bcd_num'), + ) + super().__init__(msg=msg, proto='dtap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_dtap') + + +class Layer_gsup(Layer): + def __init__(s, msg:Message): + msgtype = sane_msgtype(msg.p.get('gsup.msg_type.showname')) + msisdn = None + if '-forwardSM-' not in msgtype: + msisdn = msg.p.get('gsup.e164_msisdn') + to_msisdn = msg.p.get('gsup.gsm_sms_tp_da') + + msgtype_nr = None + is_request = None + try: + msgtype_nr = int(msg.p.get('gsup.msg_type')) + is_request = not (msgtype_nr & 0x3) + except: + pass + + session_state = None + try: + session_state = int(msg.p.get('gsup.session_state')) + except: + pass + + traits = Traits( + imsi=msg.p.get('gsup.e212_imsi'), + msgtype_nr=msgtype_nr, + is_request=is_request, + cn_domain=sane_showname(msg.p.get('gsup.cn_domain.showname')), + msisdn=msisdn, + to_msisdn=to_msisdn, + source_name=msg.p.get('gsup.source_name_text'), + destination_name=msg.p.get('gsup.destination_name_text'), + session_id=msg.p.get('gsup.session_id'), + session_state=session_state, + ) + super().__init__(msg=msg, proto='gsup', msgtype=msgtype, traits=traits) + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + imsi = msg.get_trait('gsup', 'imsi') + is_request = msg.get_trait('gsup', 'is_request') + session_id = msg.get_trait('gsup', 'session_id') + session_state = msg.get_trait('gsup', 'session_state') + if not imsi: + return + proto = 'gsup' + if session_id: + conn_id = f'{imsi}:{session_id}' + have = Conn.find(proto, msg.src, conn_id) or Conn.find(proto, msg.dst, conn_id) + if have is None: + have = Conn.open(proto, msg.src, conn_id, msg, + counterparts=[Conn.open(proto, msg.dst, conn_id, msg)]) + if session_state is not None and session_state == 0x3: + Conn.close(proto, msg.src, conn_id, msg) + else: + have.add_message(msg) + + else: + conn_id = f'{imsi}' + + if is_request: + have = Conn.find(proto, msg.src, conn_id) + if have is None: + Conn.open(proto, msg.src, conn_id, msg) + else: + have.add_message(msg) + else: + have = Conn.find(proto, msg.dst, conn_id) + if have is None: + # it's a stray response? anyway create a conn for association with a subscriber + Conn.open(proto, msg.dst, conn_id, msg) + Conn.close(proto, msg.dst, conn_id, msg) + + def identify_entities(s, msg:Message, messages, my_idx): + if msg.get_trait('gsup', 'msgtype') in ('SendAuthInfo-Request', 'UpdateLocation-Request', 'PurgeMS-Request'): + cn = msg.get_trait('gsup', 'cn_domain') + src_kind = None + src_entity = None + src_subscr_conn = None + if msg.get_trait('gsup', 'source_name'): + # proxy forwarding + src_kind = 'HLR' + traits = [name for proto, name, result in msg.get_traits('gsup') if name not in ('source_name', 'destination_name')] + for match in find_same_trait(msg, messages, my_idx, 'gsup', traits): + if not match.dst_entity_is('HLR'): + continue + src_entity = match.dst.entity + break + else: + if cn == 'CS': + src_kind = 'MSC' + elif cn == 'PS': + src_kind = 'SGSN' + + if src_kind: + # associate MSC GSUP port with MSC BSSMAP port + imsi = msg.get_trait('gsup', 'imsi') + if imsi: + subscr = Subscriber.by_imsi(imsi) + def cond(prev_msg): + return (prev_msg.dst_entity_is(src_kind) + and prev_msg.is_subscriber_related(subscr)) + for match in find_recent_msg(msg, messages, my_idx, cond): + src_entity = match.dst.entity + break + + # i forgot whatever this does: + if not src_entity: + src_entity, src_subscr_conn = s.msg.find_entity(src_kind, + with_port=('bssgp', 'IuPS')) + else: + # no cn_domain in the GSUP message, try to guess + msc, msc_subscr_conn = s.msg.find_entity('MSC') + sgsn, sgsn_subscr_conn = s.msg.find_entity('SGSN') + if msc and not sgsn: + src_entity = msc + src_subscr_conn = msc_subscr_conn + if sgsn and not msc: + src_entity = sgsn + src_subscr_conn = sgsn_subscr_conn + + if src_subscr_conn is not None and msg.src_conns: + for c in msg.src_conns: + c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, src_subscr_conn) + + return Message.EntityIdent(proto='gsup', src_kind=src_kind, dst_kind='HLR', src_entity=src_entity, dst_entity=msg.dst.entity) + +class Layer_sip(Layer): + def __init__(s, msg:Message): + method = msg.p.get('sip.method') + cseq_method = msg.p.get('sip.cseq_method') + status_code = msg.p.get('sip.status_code') + status_line = msg.p.get('sip.status_line') + if status_line: + if '--' in status_line: + status_line = status_line[:status_line.index('--')] + status_line = status_line.strip() + status = status_line.split()[-1] + else: + status = status_code + if status: + msgtype = f'{cseq_method}-{status}' + elif method: + msgtype = method + else: + msgtype = cseq_method + + sip_from_host = msg.p.get('sip.from_host') + sip_from_port = msg.p.get('sip.from_port') + sip_from = IpPort.get(sip_from_host, sip_from_port) + + rtp_port = None + ip = msg.p.get('sip.sdp_connection_info_address') + port = msg.p.get('sip.sdp_media_port') + if ip and port: + rtp_port = IpPort.get(ip, port) + + server = msg.p.get('sip.Server') + agent = msg.p.get('sip.User-Agent') + + traits = Traits( + sip_agent=server or agent, + sip_from=sip_from, + call_id = msg.p.get('sip.call_id'), + method = cseq_method, + seq = msg.p.get('sip.cseq_seq'), + to_msisdn = msg.p.get('sip.to_user'), + from_msisdn = msg.p.get('sip.from_user'), + from_tag = msg.p.get('sip.from_tag'), + r_uri = msg.p.get('sip.r_uri'), + status_code = status_code, + rtp_port=rtp_port, + ) + super().__init__(msg=msg, proto='sip', msgtype=msgtype, traits=traits) + + def identify_entities(s, msg:Message, messages, my_idx): + src_kind = 'SIP' + sip_from = msg.get_trait('sip', 'sip_from') + agent = msg.get_trait('sip', 'sip_agent') + + rename = False + if agent and sip_from and sip_from == msg.src: + rename = 'src' + if agent.startswith('kamailio'): + src_kind = 'PBX' + elif agent.startswith('sofia'): + src_kind = 'SIPCON' + else: + rename = False + + return Message.EntityIdent(proto='sip', src_kind=src_kind, dst_kind='SIP', rename=rename) + + @classmethod + def identify_conns(cls, messages:list, my_idx:int): + msg = messages[my_idx] + + msgtype = msg.get_trait('sip', 'msgtype') + call_id = msg.get_trait('sip', 'call_id') + sip_from = msg.get_trait('sip', 'sip_from') + + if msgtype in ('INVITE','INVITE-OK') and sip_from and sip_from == msg.src: + conn = Conn.open('sip', msg.src, call_id, start_msg=msg) + rtp_port = msg.get_trait('sip', 'rtp_port') + if rtp_port is None: + return + + rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) + if rtp_conn is None: + return + if not conn.subscriber_conn: + conn.merge_subscr_conns(rtp_conn) + else: + conn = Conn.message('sip', msg.src, call_id, msg) + + if conn is None: + return + + +class Layer_gsmtap_log(Layer): + def __init__(s, msg:Message): + app = msg.p.get('gsmtap_log.ident') + level = msg.p.get('gsmtap_log.level') + level_str = sane_showname(msg.p.get('gsmtap_log.level.showname')) + logmsg = msg.p.get('gsmtap_log.string') + cat = msg.p.get('gsmtap_log.subsys') + + if level_str != 'ERROR': + return + + return + + msgtype = f'{app}.{level_str}' + msg.log(Color.colored('red', logmsg)) + + traits = Traits( + msgtype=msgtype, + level=level, + app=app, + cat=cat, + logmsg=logmsg, + ) + super().__init__(msg=msg, proto='log', msgtype=msgtype, traits=traits) + + def identify_entities(s, msg:Message, messages, my_idx): + app = msg.get_trait('log', 'app') + if app.startswith('Osmo'): + app = app[4:] + return Message.EntityIdent(proto='log', src_kind=app, dst_kind='LOG') + + +class MessageFilter: + def __init__(s, layer=None, idx=None, values=[], negate=False): + set_instance_vars_from_args() + + def matches(s, msg:Message): + r = s._matches(msg) + if s.negate: + return not r + return r + + def _matches(s, msg:Message): + if s.layer and s.layer not in msg.layers: + return False + if s.idx and not (msg.p.idx == s.idx or any(a.p.idx == s.idx for a in msg.absorbed)): + return False + if s.values: + layers = s.layer or msg.layers.keys() + for k,v in s.values: + for proto, name, result in msg.get_traits(layers, k): + if v is None and name == k: + return True + if result is None or result == v: + return True + p_layers = None + if s.layer: + msg_layer = msg.layers.get(s.layer) + if msg_layer: + p_layers = [msg_layer.cap_p_name] + else: + p_layers = [s.layer] + else: + p_layers = [layer.cap_p_name for layer in msg.layers.values()] + for k,v in s.values: + for p_layer in p_layers: + p_val = msg.p.get(p_layer + '.' + k) + if p_val is None: + continue + if v is None: + return True + if v == str(p_val): + return True + return False + return True + + @classmethod + def debug(cls, flt_list, msg:Message): + layers = set() + for flt in flt_list: + if flt.negate: + continue + if flt.layer is None: + layers.update(msg.layers.keys()) + else: + layers.add(flt.layer) + for layer_name, layer in msg.layers.items(): + if layer_name not in layers: + continue + LOG(dir_p(msg.p, layer.cap_p_name)) + + def __repr__(s): + t = [] + if s.negate: + t.append('NOT') + if s.layer: + t.append(f'layer {s.layer!r}') + if s.idx: + t.append(f'packet nr {s.idx}') + if s.values: + t_v = [(f'{k} == {v!r}' if v is not None else f'has {k}') for k,v in s.values] + t.append('values: ' + (' or '.join(t_v))) + return ', '.join(t) + + word_re = re.compile('^[a-zA-Z0-9_=-]*') + @classmethod + def parse(cls, spec_str): + if spec_str is None: + return [] + if not spec_str: + return [MessageFilter()] + filters = [] + token = None + try: + for token in spec_str.split(','): + negate = False + if token.startswith('!'): + token = token[1:] + negate = True + layer = cls.word_re.match(token).group() + flt = MessageFilter(layer=layer, negate=negate) + rest = cls.word_re.split(token)[1] + while rest: + char = rest[0] + rest = rest[1:] + word = cls.word_re.match(rest).group() + rest = cls.word_re.split(rest)[1] + if char == '#': + flt.idx = int(word) + elif char == '.': + if not flt.values: + flt.values = [] + if '=' in word: + name = word[:word.index('=')] + val = word[word.index('=')+1:] + flt.values.append((name, val)) + else: + flt.values.append((word, None)) + else: + raise Exception('Unknown token: %r' % (char + word)) + filters.append(flt) + return filters + except: + out_error('Some mistake in message filter: %r in token %r' % (spec_str, token)) + raise + + @classmethod + def match(cls, flt_list, msg): + any_match = False + for flt in flt_list: + r = flt.matches(msg) + if flt.negate and not r: + return False + any_match = any_match or r + return any_match + + + DOC = '''messagefilter examples, for --filter-msg and --debug: + dtap + All messages that contain a DTAP layer. + dtap.msgtype=Identity-Request + All DTAP msgtype=Identity-Request messages; for debug, show + dtap layer. + .msgtype=Identity-Request + All msgtype=Identity-Request messages; for debug, show all + layers. (Value names can be either parsed traits or raw packet + names.) + sccp.src_lref=0x00010000.dst_lref=0x00010000 + All messages with an SCCP layer and either src_lref or dst_lref + == 0x00010000. + .imsi + All messages where any layer contains a value named 'imsi'. + '#123' + Message number 123 (don't forget to quote for the shell). + 'gsup,#614,sccp.src_lref=0x00010000' + Show all GSUP messages, packet number 614 and all SCCP with + given source local reference. + '!rsl.msgtype=CCCH-LOAD-INDication' + Don't show CCCH-LOAD-INDication message types (quote for the + shell). + ''' + +class UI: + def __init__(s, opts, finalize_after_seconds=5): + set_instance_vars_from_args() + + s.messages = [] + s.finalized_idx = -1 + + s.show_traits = None + if s.opts.show_traits: + if s.opts.show_traits == 'all': + s.show_traits = True + else: + s.show_traits = s.opts.show_traits.split(',') + s.show_conns = None + if s.opts.show_conns: + if s.opts.show_conns == 'all': + s.show_conns = True + else: + s.show_conns = s.opts.show_conns.split(',') + s.filter_msg = MessageFilter.parse(s.opts.filter_msg) + for flt in s.filter_msg: + out_text_now('Filter-msg:', flt) + s.debug = MessageFilter.parse(s.opts.debug) + for dbg in s.debug: + out_text_now('Debug:', dbg) + + s.filter_subscr = [] + if s.opts.filter_subscr: + tokens = s.opts.filter_subscr.split(',') + names = ('imsi', '0x', 'imei', 'msisdn', 'tmsi') + for token in tokens: + handled = False + for name in names: + if token.startswith(name): + token_val = token[len(name):] + if not token_val.isdigit(): + continue + s.filter_subscr.append(name + token[len(name):]) + handled = True + break + if not handled: + s.filter_subscr.append(token) + s.filter_subscr.extend([name + token for name in names]) + + def out_text_now(s, *args, **kwargs): + 'to be implemented by child class' + assert False + + def out_error(s, *args, **kwargs): + 'to be implemented by child class' + assert False + + def msg_finalized(s, msg, apply_filter=True): + 'to be implemented by child class' + assert False + + def flush_msg(s, msg): + msg.finalized = True + s.msg_finalized(msg) + + def msg_filter(s, msg): + '''Return True when the message passes active message filtering, False if it should be hidden''' + if s.filter_msg and not MessageFilter.match(s.filter_msg, msg): + return False + if all(l.minor for l in msg.layers.values()): + return False + if msg.hide: + return False + if s.filter_subscr: + match = False + match_vals = set() + for subscr in msg.related_subscribers(): + match_vals.update((f'imsi{subscr.imsi}', f'imei{subscr.imei}', f'msisdn{subscr.msisdn}')) + match_vals.update(subscr.tmsis) + match_vals.update(f'tmsi{tmsi[2:]}' for tmsi in subscr.tmsis) + match_vals.update(f'tmsi{tmsi}' for tmsi in subscr.tmsis) + + if not any(token in match_vals for token in s.filter_subscr): + return False + return True + + def flush(s, timestamp_now=0, finalize_after_seconds=0): + flush_t = timestamp_now - finalize_after_seconds + for i in range(s.finalized_idx+1, len(s.messages)): + msg = s.messages[i] + if not msg: + continue + if timestamp_now and msg.timestamp > flush_t: + break + s.finalized_idx = i + s.flush_msg(msg) + + def start(s): + pass + + def stop(s): + pass + + def add_msg(s, msg): + global g_current_msg + s.flush(msg.timestamp, s.finalize_after_seconds) + try: + if s.debug and any(dbg_filter.matches(msg) for dbg_filter in s.debug): + msg.debug = True + g_current_msg = msg + if not msg.layers: + return + s.messages.append(msg) + idx = len(s.messages) - 1 + changed_msg = msg.collapse(s.messages, idx) + # if the received message was absorbed by another, continue to identify the modified message using the + # new index + if changed_msg is not None and changed_msg is not msg: + msg = changed_msg + idx = s.messages.index(msg) + msg.identify_entities(s.messages, idx) + msg.identify_conns(s.messages, idx) + Subscriber.identify_subscriber(msg) + + except: + s.flush() + + out_error('Exception') + raise + + def process_messages(s, msg_src): + for msg in msg_src.next(): + if 0 and msg.p.idx in (3676,): + msg.log('rsl all_fields ', msg.p.cap_p.gsm_abis_rsl._all_fields) + msg.log(msg.p.all_str('gsm_abis_rsl')) + + s.add_msg(msg) + s.flush() + msg_src.done() + +class UI_Plain(UI): + def out_text_now(s, *args, **kwargs): + print(to_text(*args, **kwargs)) + + def out_error(s, *args, **kwargs): + s.out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs) + if g_current_msg: + s.out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True)) + s.out_text_now(trace()) + + def msg_print(s, msg, apply_filter=True): + if apply_filter and not s.msg_filter(msg): + return + s.out_text_now(msg.str(ladder=True, one_column_per_kind=True, show_traits=s.show_traits, show_conns=s.show_conns)) + if s.debug and MessageFilter.match(s.debug, msg): + MessageFilter.debug(s.debug, msg) + + def msg_finalized(s, msg): + s.msg_print(msg, apply_filter=True) + +class UI_Quiet(UI_Plain): + def msg_finalized(s, msg): + pass + +class UI_Curses(UI): + pass + +class MsgSource: + def __init__(s, opts): + set_instance_vars_from_args() + s.start_t = None + s.p_min_t = None + s.p_max_t = None + s.end_t = None + + def _next_cap_p(s): + assert False + + def next(s) -> Message: + p_idx = 0 + s.start_t = time.time() + warn_t = s.start_t + warn_p_t = None + for cap_p in s._next_cap_p(): + p_idx += 1 + if p_idx < s.opts.packet_start: + continue + if s.opts.packet_count and (p_idx - s.opts.packet_start) > s.opts.packet_count: + break + if s.opts.packet_end and p_idx > s.opts.packet_end: + break + msg = Message.parse(Packet(p_idx, cap_p)) + if msg is None or not msg.layers: + continue + s.p_min_t = msg.timestamp if s.p_min_t is None else min(s.p_min_t, msg.timestamp) + s.p_max_t = msg.timestamp if s.p_max_t is None else max(s.p_max_t, msg.timestamp) + + now = time.time() + if warn_p_t is None or now > warn_t + 3: + if warn_p_t: + packet_time = s.p_max_t - warn_p_t + real_time = now - warn_t + if real_time > (1.3 * packet_time): + out_text_now(f'! taking longer to calculate than packets arrive by {100.*(real_time - packet_time)/packet_time:.1f}%') + warn_t = now + warn_p_t = s.p_max_t + + yield msg + + def done(s): + if s.start_t is None or s.p_min_t is None: + out_text_now('Nothing processed.') + s.end_t = time.time() + out_text_now(f'packet time: {s.p_max_t - s.p_min_t:.1f}s in real time: {s.end_t - s.start_t:.1f}s') + +class MsgSource_File(MsgSource): + def __init__(s, path, opts): + set_instance_vars_from_args() + super().__init__(opts) + + def _next_cap_p(s): + for cap_p in pyshark.FileCapture(s.path): + yield cap_p + +class MsgSource_Live(MsgSource): + def __init__(s, iface, opts): + set_instance_vars_from_args() + super().__init__(opts) + + def _next_cap_p(s): + for cap_p in pyshark.LiveCapture(s.iface).sniff_continuously(): + yield cap_p + +class MsgSource_Pipe(MsgSource): + def __init__(s, opts): + super().__init__(opts) + + def _next_cap_p(s): + from pyshark.capture.pipe_capture import PipeCapture + for cap_p in PipeCapture(sys.stdin): + yield cap_p + +def run_tests(): + def out_test(*args, **kwargs): + print(*args, **kwargs) + + d = dddict() + d.sset(('a', 'b', 'c'), 'abc') + d.sset(('a', 'b', 'd'), 'abd') + out_test(d) + assert d == {'a': {'b': {'c': 'abc', 'd': 'abd'}}} + def verify_gget(keys, expect): + val = d.gget(keys) + out_test('gget:', keys,'=',val) + assert val == expect + verify_gget(('a', 'b', 'c'), 'abc') + verify_gget(('a', 'b', 'd'), 'abd') + verify_gget(('a', 'b', 'x'), None) + verify_gget(('a', 'b'), {'c': 'abc', 'd': 'abd'}) + verify_gget(('a',), {'b': {'c': 'abc', 'd': 'abd'}}) + verify_gget(('x',), None) + def verify_ppop(keys, expect): + try: + val = d.ppop(keys) + assert expect is not None + except KeyError: + assert expect is None + out_test('ppop:', keys,'=',None) + return + out_test('ppop:', keys,'=',val) + assert val == expect + assert d.gget(keys) is None + verify_ppop(('a', 'b', 'c'), 'abc') + verify_ppop(('a', 'b', 'd'), 'abd') + verify_ppop(('a', 'b', 'x'), None) + d.sset(('a', 'b', 'c'), 'abc') + d.sset(('a', 'b', 'd'), 'abd') + verify_ppop(('a', 'b'), {'c': 'abc', 'd': 'abd'}) + d.sset(('a', 'b', 'c'), 'abc') + d.sset(('a', 'b', 'd'), 'abd') + verify_ppop(('a',), {'b': {'c': 'abc', 'd': 'abd'}}) + verify_ppop(('x',), None) + +SUBSCRIBERFILTER_DOC = '''subscriberfilter examples, for --filter-subscr: + 123 + Show subscriber where any value matches 123 (probably only MSISDN will + match, because '123' is too short for IMSI etc). + imsi123456789012345 + imei123456789012345 + msisdn123456789012345 + tmsi1234abcd + Show subscriber with the given IMSI/IMEI/MSISDN/TMSI. + imsi123456,imsi987654 + Show both these IMSIs. + imsi123456,msisdn123,1234abcd,imei987654 + Show all of these subscribers: IMSI 123456, MSISDN 123, TMSI 0x1234abcd + and IMEI 987654. +''' +def parse_args(): + import argparse + parser = argparse.ArgumentParser(description=__doc__ + + '\n' + SUBSCRIBERFILTER_DOC + +'\n' + MessageFilter.DOC, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--pcap-file', '-f', metavar='file') + parser.add_argument('--live-capture', '-l', metavar='interface') + parser.add_argument('--stdin-capture', '-i', action='store_true') + parser.add_argument('--packet-start', '-S', default=0, type=int) + parser.add_argument('--packet-count', '-C', default=0, type=int) + parser.add_argument('--packet-end', '-E', default=0, type=int) + parser.add_argument('--ui', '-u', metavar='USER-INTERFACE', + help='How to display messages: plain / p = print to stdout as plain-text;' + ' curses / c = interactive curses interface; none / n = quiet', + default='plain') + parser.add_argument('--filter-subscr', default=None, + help='Show only messages related to the given subscriberfilter') + parser.add_argument('--filter-msg', default=None, metavar='messagefilter', + help='Show only messages matching this messagefilter') + parser.add_argument('--show-traits', default=None) + parser.add_argument('--show-conns', default=None, help="'all' for all, or specific conn names (comma separated)") + parser.add_argument('--collapse-stp', '-s', default=None, help="BSSAP via STP may appear duplicated. '-s1' collapses the duplicates, -s0 does not.") + parser.add_argument('--test', action='store_true') + parser.add_argument('--debug', metavar='messagefilter', + help='Show a lot more info on messages matching this messagefilter') + return parser.parse_args() + +def main(): + opts = parse_args() + + if opts.test: + run_tests() + else: + if (opts.collapse_stp or '').upper() in ['1', 'Y', 'YES', 'TRUE']: + SCCP_COLLAPSE_STP = True + if (opts.collapse_stp or '').upper() in ['0', 'N', 'NO', 'FALSE']: + SCCP_COLLAPSE_STP = False + ui_class = None + ui_type = opts.ui or 'none' + if 'plain'.startswith(ui_type): + ui_class = UI_Plain + elif 'curses'.startswith(ui_type): + ui_class = UI_Curses + elif 'none'.startswith(ui_type): + ui_class = UI_Quiet + else: + ERR('Unknown UI type:', repr(ui_type)) + return 1 + g_ui = ui_class(opts) + msg_source = None + if opts.pcap_file: + msg_source = MsgSource_File(opts.pcap_file, opts) + elif opts.live_capture: + msg_source = MsgSource_Live(opts.live_capture, opts) + elif opts.stdin_capture: + msg_source = MsgSource_Pipe(opts) + else: + ERR('No message source, try `-l any` or `-f my.pcap`') + return 1 + g_ui.process_messages(msg_source) + g_ui.flush() + if Conn.open_conns: + print('still open conns:', repr(Conn.open_conns)) + return 0 + +if __name__ == '__main__': + if False: + import cProfile + cProfile.run('main()', sort='tottime') + else: + exit(main()) +# vim: noexpandtab tabstop=8 shiftwidth=8