libosmocore/utils/osmo-gsm-shark

3595 lines
99 KiB
Python
Executable File

#!/usr/bin/env python3
# osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber.
# Copyright (C) 2019 by Neels Hofmeyr <neels@hofmeyr.de>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber.
Copyright (C) 2019 by Neels Hofmeyr <neels@hofmeyr.de>
SPDX-License-Identifier: GPL-2.0+
This tool uses tshark (pyshark) to analyze a pcap file or a live network capture to:
- Associate almost all messages with a subscriber. It is possible to filter by subscriber.
- Separate the different network elements (BSC, MSC, hNodeB, ...).
- Output a ladder diagram.
- Combine repetitive messages.
- Combine/abstract messages into short activity summary.
Examples:
osmo-gsm-shark -f trace.pcapng
osmo-gsm-shark -l any
osmo-gsm-shark -l any --filter-subscr 901701234567123
osmo-gsm-shark -f trace.pcapng --filter-msg dtap
'''
import collections
import pyshark
import re
import sys
import types
import time
import traceback
# Module-wide switches and lookup tables.
# Initial value of Message.debug; Message.dbg() only records text when that flag is set.
SHOW_ALL_DEBUG = False
# When True, Message.label() includes layers that are marked minor or hidden.
SHOW_ALL_LAYERS = False
# NOTE(review): not referenced in this part of the file; presumably folds STP hops out of SCCP traffic -- confirm.
SCCP_COLLAPSE_STP = True
# NOTE(review): not referenced in this part of the file; the original author remarks that it does not work.
IUH_COLLAPSE_HNBGW = True # does not work
# Message type names, spelled the way sane_msgtype() produces them (dashes instead of spaces).
# NOTE(review): presumably the DTAP / GMM messages that may arrive as "Complete Layer 3" -- confirm against users.
DTAP_COMPL_L3 = ('Location-Updating-Request', 'CM-Service-Request', 'Paging-Response', 'IMSI-Detach-Indication')
GMM_COMPL_L3 = ('Attach-Request', 'Detach-Request')
class Color:
    '''ANSI terminal color escape sequences and a helper to wrap text in them.'''

    # (name, escape sequence) pairs. The order matters: colored() also accepts an
    # integer, which is used as an index into this tuple (modulo its length).
    codes = (
        ('red', '\033[1;31m'),
        ('green', '\033[1;32m'),
        ('yellow', '\033[1;33m'),
        ('blue', '\033[1;34m'),
        ('purple', '\033[1;35m'),
        ('cyan', '\033[1;36m'),
        ('darkred', '\033[31m'),
        ('darkgreen', '\033[32m'),
        ('darkyellow', '\033[33m'),
        ('darkblue', '\033[34m'),
        ('darkpurple', '\033[35m'),
        ('darkcyan', '\033[36m'),
        ('darkgrey', '\033[1;30m'),
        ('brightwhite', '\033[1;37m'),
    )
    # Same table, keyed by color name.
    codes_dict = dict(codes)
    # Sequence that resets the terminal attributes again.
    end = '\033[0;m'

    @staticmethod
    def colored(code, text):
        '''Return text wrapped in the escape sequence for code, followed by the reset sequence.

        code is either a color name from Color.codes (KeyError if unknown) or an int,
        which selects Color.codes[code % len(Color.codes)].

        Declared as a staticmethod so that it works both as Color.colored(...) and on
        an instance; without the decorator an instance call would pass the instance
        as code.
        '''
        if type(code) is int:
            code = Color.codes[code % len(Color.codes)][1]
        else:
            code = Color.codes_dict[code]
        return f'{code}{text}{Color.end}'
def set_instance_vars_from_args(*ignore, self='s'):
    '''Copy the calling function's local variables onto its instance as attributes.

    Meant to be called first thing in an __init__(): the caller's frame is
    inspected, the instance is looked up under the local name given by self
    (default 's'), and every other local is stored on it with setattr().
    Locals that are the instance itself, and names listed in ignore, are skipped.
    '''
    caller_locals = sys._getframe(1).f_locals
    instance = caller_locals.get(self)
    for var_name, var_value in caller_locals.items():
        if var_value is instance or var_name in ignore:
            continue
        setattr(instance, var_name, var_value)
def same_nonempty(a, b):
    '''Compare a and b, treating an empty/unset a as "not the same".

    If a is a generator, both arguments are consumed into lists and compared.
    Otherwise the result is `a and a == b`, i.e. a falsy a is returned as-is.
    '''
    if not isinstance(a, types.GeneratorType):
        return a and a == b
    return list(a) == list(b)
def str_drop(a_str, drop_str):
    '''Return a_str without a leading drop_str; return a_str unchanged if it is empty or has no such prefix.'''
    if not a_str or not a_str.startswith(drop_str):
        return a_str
    return a_str[len(drop_str):]
def sane_msgtype(msgtype):
    '''Clean up a message type name with sane_showname() and replace spaces by dashes.

    A falsy msgtype (None, '') is returned unchanged.
    '''
    if msgtype:
        return sane_showname(msgtype).replace(' ', '-')
    return msgtype
# Matches a "message type" label (any single character between the two words,
# case-insensitive) plus trailing spaces, so that it can be cut out of a name.
re_msgtype_label = re.compile('message.type *', re.I)
def sane_showname(showname):
    '''Reduce a field's display name to its bare value.

    Drops everything up to the first ': ', everything from the first '('
    onwards, any "message type" label, and surrounding whitespace.
    A falsy showname is returned unchanged.
    '''
    if not showname:
        return showname
    colon_at = showname.find(': ')
    if colon_at >= 0:
        showname = showname[colon_at + 1:]
    paren_at = showname.find('(')
    if paren_at >= 0:
        showname = showname[:paren_at]
    return re_msgtype_label.sub('', showname).strip()
def dir_vals(elem):
    '''Return a debug dump of all public attributes of elem.

    The result starts with a newline, followed by one 'name'=value line (both
    in repr() form) per name from dir(elem) that does not start with '_'.
    An attribute whose access or repr() raises is listed as 'name'=<Exception>.
    '''
    strs = []
    for name in dir(elem):
        if name.startswith('_'):
            continue
        try:
            strs.append('%r=%r' % (name, getattr(elem, name)))
        except Exception:
            # Only ordinary errors are reported in-line; KeyboardInterrupt and
            # SystemExit must still be able to stop the program.
            strs.append('%r=<Exception>' % (name))
    return '\n' + '\n'.join(strs)
def dir_p(p, name):
    '''Return the dir_vals() dump of p.get(name), framed by '=== name' and '---name' marker lines.'''
    dump = dir_vals(p.get(name))
    return f'=== {name}\n{dump}\n---{name}'
def str_to_int(nr_str):
    '''Convert a hex ('0x...') or decimal string to int.

    Returns None for an empty or unset string; raises ValueError if the digits
    do not parse in the selected base.
    '''
    if not nr_str:
        return None
    base = 16 if nr_str.startswith('0x') else 10
    return int(nr_str, base)
def out_text(*args, **kwargs):
    '''Print the given arguments; a plain pass-through to print(), keyword arguments included.'''
    return print(*args, **kwargs)
# Output sink: when not None, out_text_now() and out_error() forward to its methods of the same name
# instead of printing.
g_ui = None
# The Message currently being processed: when not None, LOG() and DBG() attach their text to it and
# out_error() includes it in the report. Message.absorb_msg() re-points it when that message is absorbed.
g_current_msg = None
# NOTE(review): not referenced in this part of the file.
g_debug_full = False
def to_text(*args, **kwargs):
    '''Join all positional arguments, converted with str(), by single spaces.

    If keyword arguments are given, the repr() of the kwargs dict is appended
    as one more item.
    '''
    parts = [str(arg) for arg in args]
    if kwargs:
        parts.append(repr(kwargs))
    return ' '.join(parts)
def out_text_now(*args, **kwargs):
    '''Emit text immediately: via g_ui.out_text_now() if a UI is set, else print the to_text() rendering.'''
    if g_ui is None:
        print(to_text(*args, **kwargs))
        return
    g_ui.out_text_now(*args, **kwargs)
def LOG(*args, **kwargs):
    '''Log text on the message currently being processed, or emit it right away if there is none.'''
    if g_current_msg is None:
        out_text_now(*args, **kwargs)
        return
    g_current_msg.log(*args, **kwargs)
def DBG(*args, **kwargs):
    '''Like LOG(), but hands the text to the current message's dbg() method.

    Without a current message the text is emitted right away.
    '''
    if g_current_msg is None:
        out_text_now(*args, **kwargs)
        return
    g_current_msg.dbg(*args, **kwargs)
def ERR(*args, **kwargs):
    '''LOG() the given text, prefixed with a red '***** ERROR:' marker.'''
    marker = Color.colored('red', '***** ERROR:')
    LOG(marker, *args, **kwargs)
def trace():
    '''Return the current call stack, formatted by traceback.format_stack(), as one string.'''
    stack_lines = traceback.format_stack()
    return ''.join(stack_lines)
def because_str(because):
    '''Render a (possibly nested) list of reasons as text.

    Returns '-' if because is empty. Otherwise the result is the line 'BECAUSE'
    followed by one item per reason, joined by a newline plus '|'. Nested
    tuples/lists are rendered recursively and spliced in line by line, strings
    are used as-is, Message instances are rendered with traits and conns, and
    anything else goes through str().
    '''
    if not because:
        return '-'
    lines = ['BECAUSE']
    for reason in because:
        if isinstance(reason, (tuple, list)):
            lines.extend(because_str(reason).splitlines())
        elif isinstance(reason, str):
            lines.append(reason)
        elif isinstance(reason, Message):
            lines.append(reason.str(show_traits=True, show_conns=True))
        else:
            lines.append(str(reason))
    return '\n|'.join(lines)
# Report an error: forwarded to g_ui.out_error() when a UI object is set, otherwise
# written through out_text_now() with a red '*** ERROR:' prefix.
def out_error(*args, **kwargs):
if g_ui is not None:
g_ui.out_error(*args, **kwargs)
else:
out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs)
# NOTE(review): the nesting of the remaining statements is not recoverable here. Confirm
# whether the message context and the stack trace are emitted only in the no-UI branch or always.
# If a message is being processed, also show it with its traits and conns.
if g_current_msg:
out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True))
# Show where the error was reported from.
out_text_now(trace())
def tmsi_standardize(tmsi):
    '''Return tmsi as a zero-padded, 8-digit lowercase hex string, or None if it is not convertible to int.

    tmsi may be anything int() accepts (an int, a decimal string, ...).
    '''
    try:
        return format(int(tmsi), '08x')
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "no usable TMSI"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed here.
        return None
# a dict containing a list for each key; l.add(name, item) adds item to the list at key=name.
class listdict(dict):
    '''A dict where each entry is a list of items.

    Note that update() is overridden: it appends the other listdict's items to
    the lists found here instead of replacing the entries.
    '''

    def _have(ld, name):
        '''Return the list stored at name, storing a new empty list first if there is none (or an empty one).'''
        l = ld.get(name)
        if not l:
            l = []
            ld[name] = l
        return l

    def add(ld, name, item):
        '''Append item to the list at name; returns the listdict itself.'''
        l = ld._have(name)
        l.append(item)
        return ld

    def add_dict(ld, d):
        '''For each key/value pair in d, append the value to the list at that key.'''
        for k,v in d.items():
            ld.add(k, v)

    def update(ld, other_ld):
        '''Append all items of each list in other_ld to the list of the same name here; returns the listdict itself.'''
        for name, items in other_ld.items():
            ld.extend(name, items)
        return ld

    def extend(ld, name, vals):
        '''Append all of vals to the list at name; returns the listdict itself.'''
        l = ld._have(name)
        l.extend(vals)
        return ld

    def remove(ld, name, item):
        '''Remove the first occurrence of item from the list at name.

        Does nothing if there is no list at name or the item is not in it.
        '''
        l = ld.get(name)
        # An unknown name yields None, on which the membership test would raise TypeError.
        if l and item in l:
            l.remove(item)
class UniqueList(list):
    '''A list that refuses None and items it already contains.

    append() and insert() return True if the item was stored and False if it
    was rejected; extend() returns the number of items actually added.
    '''

    def append(s, item):
        if item is None or item in s:
            return False
        super().append(item)
        return True

    def insert(s, idx, item):
        if item is None or item in s:
            return False
        super().insert(idx, item)
        return True

    def extend(s, items):
        return sum(1 for item in items if s.append(item))
class NamedIds(dict):
    '''Per-name counters: maps each name to the id that next() will hand out for it.'''

    def __init__(s, start_id:int=1):
        # Stores start_id on the instance.
        set_instance_vars_from_args()

    def next(s, name):
        '''Return the current id for name (start_id on first use) and advance the counter by one.'''
        current_id = s.get(name, s.start_id)
        s[name] = current_id + 1
        return current_id
class Packet:
    '''Wrapper around one captured packet object (cap_p) with dotted-path field lookup and debug dumps.

    idx is stored as given; Message.str() shows it as the packet's number.
    '''

    def __init__(s, idx, cap_p):
        set_instance_vars_from_args()
        # sanitize impossible attr with dot in its name,
        # seen gsm_a.bssmap and gsm_a.dtap
        for name in dir(s.cap_p):
            if '.' in name:
                new_name = name.replace('.', '_')
            elif not name:
                new_name = 'unnamed'
            else:
                continue
            # Make the same value reachable under a name usable in attribute syntax.
            setattr(s.cap_p, new_name, getattr(s.cap_p, name))

    @classmethod
    def pget(cls, cap_p, tokens, ifnone=None):
        '''Follow the attribute path given by the list tokens, starting at cap_p.

        At each step the first token is tried as an attribute name; if that
        yields nothing, all remaining tokens joined by '.' are tried as one
        attribute name. Returns ifnone if cap_p is None, tokens is empty or
        the path cannot be resolved.
        '''
        if cap_p is None or len(tokens) < 1:
            return ifnone
        p_field = getattr(cap_p, tokens[0], None)
        if p_field is None:
            p_field = getattr(cap_p, '.'.join(tokens), None)
        if p_field is None:
            return ifnone
        if len(tokens) > 1:
            return Packet.pget(p_field, tokens[1:], ifnone=ifnone)
        return p_field

    def get(s, field, ifnone=None):
        '''Look up a dotted field path like 'ip.src' in this packet; returns ifnone if it is not present.'''
        return Packet.pget(s.cap_p, field.split('.'), ifnone=ifnone)

    def str(s, elem_name=None):
        '''Return the dir_vals() dump of the element at elem_name, or of the whole packet if no name is given.'''
        if elem_name:
            elem = s.get(elem_name)
        else:
            elem = s.cap_p
        return dir_vals(elem)

    def field_names(s, elem_name=None, elem=None):
        '''Return a debug listing of the public attributes of every field of an element.

        The element is elem, or the one found at elem_name if elem is None.
        The fields come from the element's _get_all_fields_with_alternates().
        '''
        strs = ['', f'=== {elem_name} ===']
        if elem is None:
            elem = s.get(elem_name)
        if not elem:
            strs.append('None')
        else:
            for f in elem._get_all_fields_with_alternates():
                for n in dir(f):
                    if n.startswith('_'):
                        continue
                    strs.append('%r=%r' % (n, getattr(f, n)))
        return '\n'.join(strs)

    def all_str(s, elem_name=None, elem=None, depth=1000):
        '''Return a recursive debug dump of an element: its attributes and the attributes of its fields.

        With neither elem nor elem_name given, the whole packet is dumped under
        the name '/'. Recursion goes at most depth levels deep. Private names,
        names ending in '_value', callables and a fixed set of list/str method
        names are left out. Attributes that fail to read are skipped.
        '''
        strs = []
        if elem is None:
            if elem_name:
                elem = s.get(elem_name)
            else:
                elem = s.cap_p
                elem_name = '/'
        strs.append('%s:' % elem_name)
        for name in dir(elem):
            if name.startswith('_') or name.endswith('_value') or name in [
                    'sort', 'reverse', 'remove', 'pop', 'insert', 'index', 'extend', 'count',
                    'copy', 'clear', 'append', 'zfill', 'max', 'min', 'resolution',
                ]:
                continue
            try:
                full_name = '%s.%s' % (elem_name, name)
                val = getattr(elem, name)
                if callable(val) or name in ['base16_value']:
                    continue
                strs.append('%r=%r' % (full_name, val))
                if depth and type(val) not in [int, str]:
                    strs.append(s.all_str(full_name, val, depth-1))
            except Exception:
                # Best effort: an attribute that cannot be read or dumped is left out,
                # but KeyboardInterrupt / SystemExit still get through.
                pass
        if hasattr(elem, '_get_all_fields_with_alternates'):
            for f in elem._get_all_fields_with_alternates():
                for n in dir(f):
                    if n.startswith('_'):
                        continue
                    full_name = '%r[%r]' % (elem_name, n)
                    try:
                        val = getattr(f, n)
                    except Exception:
                        continue
                    if callable(val):
                        continue
                    strs.append('%s=%r' % (full_name, val))
                    if depth:
                        strs.append(s.all_str(full_name, val, depth-1))
        return '\n'.join(strs)
class IpPort:
    '''An IP address and port pair, optionally tagged with the entity and protocol it belongs to.

    Every instance registers itself in IpPort.all_ports under its 'ip:port'
    key; IpPort.get() returns the registered instance for an address, so that
    identity comparison (see __eq__) is meaningful.
    '''

    # All known ports, keyed by the 'ip:port' string from _key().
    all_ports = {}

    @classmethod
    def _key(cls, ip, port):
        return f'{ip}:{port}'

    @classmethod
    def get(cls, ip, port, entity=None, proto=None, create=True):
        '''Return the registered IpPort for ip and port.

        If there is none, a new one is created (unless create is False, then
        None is returned). If one exists and entity is given, the port is added
        to that entity under the port's own proto; an error is logged if the
        port already belonged to a different entity.
        '''
        o = IpPort.all_ports.get(IpPort._key(ip, port))
        if o is None and create:
            return IpPort(ip, port, entity, proto)
        if o is not None:
            if entity is not None:
                if o.entity is not None and o.entity is not entity:
                    ERR('Port changes:', o, 'to', entity)
                entity.add_port(o.proto, o)
        return o

    def __init__(s, ip:str=None, port:str=None, entity=None, proto=None):
        set_instance_vars_from_args()
        IpPort.all_ports[s.key()] = s

    def __repr__(s):
        '''Return entity, proto (each only if set) and the 'ip:port' key, joined by dashes.'''
        tokens = []
        if s.entity:
            tokens.append(f'{s.entity}')
        if s.proto:
            tokens.append(f'{s.proto}')
        tokens.append(s.key())
        return '-'.join(tokens)

    def __eq__(s, other):
        # Ports compare by identity; IpPort.get() hands out one instance per address.
        return s is other

    def __hash__(s):
        return hash(s.key())

    def key(s):
        '''Return this port's 'ip:port' registry key.'''
        return IpPort._key(s.ip, s.port)

    @classmethod
    def from_sdp(cls, p:Packet):
        '''Return the IpPort named by the packet's SDP connection address and media port.'''
        ip = p.get('sdp.connection_info_address')
        port = p.get('sdp.media_port')
        return IpPort.get(ip, port)

    @classmethod
    def _from_ip(cls, p:Packet, src_dst:str, port:int):
        '''Return the IpPort for the packet's IP address on the given side ('src' or 'dst') and port.

        The IPv4 address is preferred; an IPv6 address is used in square
        brackets if there is no IPv4 one.
        '''
        ip = p.get('ip.' + src_dst)
        if ip is None:
            ipv6 = p.get('ipv6.' + src_dst)
            if ipv6 is not None:
                ip = '[' + ipv6 + ']'
        return IpPort.get(ip, port)

    @classmethod
    def from_ip_source(cls, p:Packet, port:int):
        return cls._from_ip(p, 'src', port)

    @classmethod
    def from_ip_dest(cls, p:Packet, port:int):
        return cls._from_ip(p, 'dst', port)

    @classmethod
    def from_udp_source(cls, p:Packet):
        return cls.from_ip_source(p, p.get('udp.srcport'))

    @classmethod
    def from_udp_dest(cls, p:Packet):
        return cls.from_ip_dest(p, p.get('udp.dstport'))

    @classmethod
    def from_tcp_source(cls, p:Packet):
        return cls.from_ip_source(p, p.get('tcp.srcport'))

    @classmethod
    def from_tcp_dest(cls, p:Packet):
        return cls.from_ip_dest(p, p.get('tcp.dstport'))

    @classmethod
    def from_sctp_source(cls, p:Packet):
        return cls.from_ip_source(p, p.get('sctp.srcport'))

    @classmethod
    def from_sctp_dest(cls, p:Packet):
        return cls.from_ip_dest(p, p.get('sctp.dstport'))
# Placeholder so that the name Message exists for the code between here and the
# full "class Message" definition further down (it is used in annotations there).
class Message:
pass
class Trait:
    '''A single named value, created as Trait(name=value).'''

    def __init__(s, **kwargs):
        if len(kwargs) > 1:
            raise Exception('only one trait allowed per Trait(): %r' % kwargs)
        for trait_name, trait_val in kwargs.items():
            s.name, s.val = trait_name, trait_val

    def __repr__(s):
        return '%r=%r' % (s.name, s.val)
class Traits(collections.OrderedDict):
    '''An ordered collection of Trait instances, keyed by trait name.

    Construct from Trait instances and/or name=value keyword arguments.
    '''

    def __init__(s, *args, **kwargs):
        for trait in args:
            s.add(trait)
        s.set_vals(**kwargs)

    def add(s, trait:Trait):
        '''Store trait under its name, replacing any trait of the same name.'''
        s[trait.name] = trait

    def set(s, name, val):
        '''Store val as the trait called name; a val of None is ignored.'''
        if val is None:
            return
        s.add(Trait(**{name: val}))

    def set_vals(s, **kwargs):
        '''Store each name=value pair as a trait, skipping None values.

        Values that are not str, int, float, bool or IpPort are converted with str() first.
        '''
        for trait_name, trait_val in kwargs.items():
            if trait_val is None:
                continue
            if type(trait_val) not in (str, int, float, bool, IpPort):
                trait_val = str(trait_val)
            s.set(trait_name, trait_val)

    def __repr__(s):
        shown = []
        for key, trait in s.items():
            assert key == trait.name
            shown.append(repr(trait))
        return '{%s}' % ', '.join(shown)
def find_recent_msg(msg:Message, messages:list, my_idx:int, condition_func, max_t=1):
    '''Generate the messages before index my_idx, newest first, for which condition_func(prev_msg) is true.

    Empty slots in messages are skipped. The search stops at the first message
    that is finalized, or whose timestamp lies more than max_t before msg's.
    '''
    for idx in range(my_idx - 1, -1, -1):
        candidate = messages[idx]
        if not candidate:
            continue
        too_old = msg.timestamp - candidate.timestamp > max_t
        if candidate.finalized or too_old:
            return None
        if condition_func(candidate):
            yield candidate
    return None
def find_same_trait(msg:Message, messages:list, my_idx:int, proto:str, name:str, max_t=1, operator=any):
    '''Generate the recent messages (see find_recent_msg) that msg.same_traits() matches for proto and name.

    operator is passed on to same_traits().
    '''
    return find_recent_msg(
        msg, messages, my_idx,
        lambda prev_msg: msg.same_traits(prev_msg, proto, name, operator=operator),
        max_t)
class dddict(dict):
    '''A dict of nested dicts, addressed by a sequence of keys (one key per nesting level).'''

    @classmethod
    def _get(cls, d, keys, create=False):
        '''Walk down from d along keys and return what is found there, or None.

        With create=True, missing levels are filled in with empty dicts.
        '''
        if not keys:
            return d
        head, rest = keys[0], keys[1:]
        child = d.get(head)
        if child is None:
            if not create:
                return None
            child = d[head] = {}
        return cls._get(child, rest, create=create)

    def gget(s, keys, create=None):
        '''Return the item at keys. If there is none: store and return create, unless create is None.'''
        found = dddict._get(s, keys, create=False)
        if found is not None:
            return found
        if create is None:
            return None
        return s.sset(keys, create)

    def sset(s, keys, item):
        '''Store item at keys, creating intermediate dicts as needed; returns item.'''
        parent = dddict._get(s, keys[:-1], create=True)
        parent[keys[-1]] = item
        return item

    def ppop(s, keys):
        '''Remove and return the item at keys, then drop intermediate dicts that became empty.

        Returns None if the path leading to the item does not exist.
        '''
        parent = dddict._get(s, keys[:-1])
        if parent is None:
            return None
        popped = parent.pop(keys[-1])
        if not parent and len(keys) > 1:
            s.ppop(keys[:-1])
        return popped

    @classmethod
    def _count(cls, d):
        if not isinstance(d, dict):
            return 1
        return sum(cls._count(val) for val in d.values())

    def count(s):
        '''Return the number of non-dict items stored at any depth.'''
        return dddict._count(s)

    @classmethod
    def _all(cls, d):
        if not isinstance(d, dict):
            yield d
            return
        for val in d.values():
            yield from cls._all(val)

    def all(s):
        '''Generate all non-dict items stored at any depth.'''
        return dddict._all(s)
class Conn:
    '''One end of a time-limited connection for a protocol layer'''

    # Registries, addressed by the keys (proto, port.key(), conn_id).
    # open_conns holds one Conn per key; closed_conns holds a list of Conns per key.
    open_conns = dddict()
    closed_conns = dddict()

    def __init__(s, proto:str, port:IpPort, conn_id:str, start_msg:Message, close_msg:Message=None, idx=-1,
                 entity=None, counterparts:list=None, subscriber_conn=None, merge_counterparts=True, add_message=True,
                 meta=False):
        '''Create a conn, register it as open and link it to its counterparts.

        proto, port, conn_id -- identify the conn; together they form its registry key.
        start_msg -- the message that opened the conn; added to the conn unless add_message is False.
        entity -- if given, port is added to this entity for proto.
        counterparts -- the same conn as seen from the other side(s); linked in both directions.
        merge_counterparts -- whether to merge subscriber conns with the counterparts.
        meta -- messages of a meta conn are filed under msg.meta_conns instead of src_conns/dst_conns.
        '''
        # Must remain the first statement: it copies the arguments (except the listed ones) onto s.
        # 'add_message' in particular must not become an attribute, it would hide the method.
        set_instance_vars_from_args('entity', 'add_message', 'merge_counterparts')
        s.tx_messages = UniqueList()
        s.rx_messages = UniqueList()
        if s.subscriber_conn:
            s.subscriber_conn.conns.append(s)
        s.keys = (proto, port.key(), conn_id)
        Conn.open_conns.sset(s.keys, s)
        # A counterpart is the same conn seen from the other side.
        # For example, if a BSC opens a conn, the counterpart is the MSC's perspective on the same conn.
        s.counterparts = UniqueList()
        for cp in counterparts or ():
            if cp is None:
                continue
            s.counterparts.append(cp)
            cp.counterparts.append(s)
        if entity:
            entity.add_port(proto, port, from_msg=start_msg)
        if add_message:
            s.add_message(start_msg)
        if not counterparts:
            LOG(Color.colored('green', f'New conn (now {Conn.count_open_conns()})'), s.proto, '%r' % s.conn_id)
        if merge_counterparts:
            for other_conn in s.counterparts:
                s.merge_subscr_conns(other_conn)

    @classmethod
    def _find(cls, keys, find_in_closed_conns=False):
        '''Return the open conn at keys; optionally fall back to the closed_conns entry; else None.'''
        ret = Conn.open_conns.gget(keys)
        if ret is not None:
            return ret
        if find_in_closed_conns:
            return Conn.closed_conns.gget(keys)
        return None

    @classmethod
    def find(cls, proto:str, port:IpPort, conn_id:str, find_in_closed_conns=False):
        '''Like _find(), with the key given as proto, port and conn_id.'''
        return cls._find((proto, port.key(), conn_id), find_in_closed_conns=find_in_closed_conns)

    @classmethod
    def open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs):
        '''Create a new conn; logs an error (with stack trace) if one with this key is already open.'''
        exists = cls.find(proto, port, conn_id)
        if exists is not None:
            ERR('Conn already open:', type(exists), repr(exists))
            LOG(trace())
        return Conn(proto, port, conn_id, *args, **kwargs)

    @classmethod
    def find_or_open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs):
        '''Return the open conn with this key, or create one.'''
        exists = cls.find(proto, port, conn_id)
        if exists is not None:
            return exists
        return Conn(proto, port, conn_id, *args, **kwargs)

    @classmethod
    def open_2way(cls, proto:str, conn_id:str, msg:Message, *args, **kwargs):
        '''Open a conn at msg.src and its counterpart at msg.dst; returns the conn at msg.src.'''
        conn1 = cls.open(proto, msg.src, conn_id, msg, *args, **kwargs)
        cls.open(proto, msg.dst, conn_id, msg, *args, counterparts=[conn1], **kwargs)
        return conn1

    @classmethod
    def close_conn(cls, conn, msg):
        '''Record msg as the closing message of conn and move conn from the open to the closed registry.'''
        conn._add_message(msg)
        conn.close_msg = msg
        Conn.open_conns.ppop(conn.keys)
        l = Conn.closed_conns.gget(conn.keys, create=[])
        l.append(conn)
        LOG(Color.colored('blue', f'Close conn ({Conn.count_open_conns()} left)'), conn)

    @classmethod
    def close(cls, proto, port, conn_id, close_msg, close_counterparts=True, if_exists=False):
        '''Close the open conn with the given key, by default together with its open counterparts.

        Returns the closed conn, or None if no such conn is open; in that case
        an error is logged unless if_exists was set.
        '''
        conn = Conn.find(proto, port, conn_id)
        if conn is None or not conn.is_open():
            if if_exists == False:
                ERR('Cannot close, conn not open:', Conn.key_label(proto, port, conn_id))
            return None
        Conn.close_conn(conn, close_msg)
        assert Conn.find(proto, port, conn_id) is None
        if close_counterparts:
            for cp in conn.counterparts:
                if cp.is_open():
                    Conn.close_conn(cp, close_msg)
        return conn

    @classmethod
    def message(cls, proto, port, conn_id, msg):
        '''Add msg to the open conn with the given key and return that conn, or None if there is none.'''
        conn = cls.find(proto, port, conn_id)
        if conn is None:
            return None
        conn.add_message(msg)
        return conn

    def _add_message(s, msg):
        '''File msg as sent (this conn's port is msg.src) or received (it is msg.dst) on this conn.

        The conn is also recorded on the message: in meta_conns for a meta conn,
        else in src_conns or dst_conns. Nothing happens if the port is neither.
        '''
        if s.port == msg.src:
            if s.meta:
                msg.meta_conns.append(s)
            else:
                msg.src_conns.append(s)
            s.tx_messages.append(msg)
        elif s.port == msg.dst:
            if s.meta:
                msg.meta_conns.append(s)
            else:
                msg.dst_conns.append(s)
            s.rx_messages.append(msg)

    def add_message(s, msg, add_to_counterparts=True):
        '''Add msg to this conn and, by default, to its counterparts; reports an error if the conn is closed.'''
        if s.close_msg:
            out_error('Message on a closed conn', s, msg)
        s._add_message(msg)
        if add_to_counterparts:
            for cp in s.counterparts:
                cp._add_message(msg)

    @classmethod
    def key_label(cls, proto, port, conn_id):
        '''Return 'proto:conn_id@port', or 'proto@port' if conn_id is empty.'''
        if conn_id:
            return f'{proto}:{conn_id}@{port}'
        else:
            return f'{proto}@{port}'

    def label(s):
        return s.key_label(s.proto, s.port, s.conn_id)

    def __repr__(s):
        '''Return this conn's label followed by its counterparts' labels, joined by ' -> '.'''
        tokens = [s.label()]
        for r in s.counterparts:
            if r is None:
                tokens.append('None')
            else:
                tokens.append(r.label())
        return ' -> '.join(tokens)

    def merge_subscr_conns(s, other_conn):
        '''Put this conn and other_conn under one shared SubscriberConn.

        other_conn may be a Conn, None (ignored) or an iterable of those.
        '''
        if other_conn is None:
            return
        if not isinstance(other_conn, Conn):
            for item in other_conn:
                s.merge_subscr_conns(item)
            return
        assert isinstance(other_conn, Conn)
        if s.subscriber_conn is not None and s.subscriber_conn is other_conn.subscriber_conn:
            # already merged
            return
        s.subscriber_conn = SubscriberConn.merge(s.subscriber_conn, other_conn.subscriber_conn)
        if s.subscriber_conn is None:
            s.subscriber_conn = SubscriberConn()
        s.subscriber_conn.add_conn(s)
        other_conn.subscriber_conn = s.subscriber_conn
        s.subscriber_conn.add_conn(other_conn)

    def find_entity(s, kind, with_port=None):
        '''Ask this conn's subscriber conn for an entity of the given kind; (None, None) without subscriber conn.'''
        if s.subscriber_conn:
            return s.subscriber_conn.find_entity(kind, with_port=with_port)
        return None, None

    def is_open(s):
        '''True if no closing message was recorded and this conn is the one registered as open for its key.'''
        return s.close_msg is None and Conn._find(s.keys) is s

    @classmethod
    def count_open_conns(cls):
        '''Return the number of open conns, counting a conn and its counterparts only once.'''
        count = 0
        have = set()
        for conn in cls.open_conns.all():
            do_count = True
            for conn2 in conn.counterparts:
                if conn2.label() in have:
                    do_count = False
                have.add(conn2.label())
            if conn.label() in have:
                do_count = False
            have.add(conn.label())
            if do_count:
                count += 1
        return count
class Layer:
    '''Base class for one protocol layer of a Message.

    Protocol implementations are the direct subclasses named Layer_<proto>;
    Layer.parse() instantiates each of them whose <proto> is present in a packet.
    '''

    # Cache for classes(): maps <proto> to the class Layer_<proto>.
    _classes = {}
    traits = None
    proto = None
    cap_p_name = None

    def __init__(s, msg:Message, proto:str, msgtype:str, traits:Traits, minor=False, hidden=False, cap_p_name:str=None):
        '''Store the arguments on the instance and register this layer in msg.layers under proto.

        The msgtype is normalized with sane_msgtype(); cap_p_name defaults to proto.
        proto and cap_p_name are also stored on the instance's class.
        Raises Exception if msg already has a layer for proto.
        '''
        set_instance_vars_from_args()
        if proto in msg.layers:
            raise Exception(f'duplicate proto {proto} for message')
        msg.layers[proto] = s
        s.msgtype = sane_msgtype(s.msgtype)
        s.cap_p_name = s.cap_p_name or s.proto
        layer_class = s.__class__
        layer_class.proto = s.proto
        layer_class.cap_p_name = s.cap_p_name

    def label(s):
        '''Return 'proto.msgtype', or just the proto if there is no msgtype.'''
        if not s.msgtype:
            return s.proto
        return f'{s.proto}.{s.msgtype}'

    def identify_entities(s, msg:Message, messages, my_idx):
        '''return a list of Message.EntityIdent instances describing source and/or destination entity that message msg identifies.'''
        return None

    @classmethod
    def identify_conns(s, messages, my_idx):
        '''Hook for subclasses; the base implementation does nothing.'''
        pass

    def collapse(s, messages, my_idx):
        '''return the message itself if it remains in messages, if another absorbed it return the other message,
        or if it is dropped completely return None'''
        return messages[my_idx]

    @classmethod
    def classes(cls):
        '''Return the dict of protocol name to Layer_<proto> class, collecting it on first use.'''
        if not Layer._classes:
            prefix = 'Layer_'
            for subclass in Layer.__subclasses__():
                class_name = subclass.__name__
                if class_name.startswith(prefix):
                    Layer._classes[class_name[len(prefix):]] = subclass
        return Layer._classes

    @classmethod
    def parse(cls, msg:Message):
        '''Instantiate every known layer class whose protocol is present in the message's packet.'''
        for proto_name, child_class in Layer.classes().items():
            if msg.p.get(proto_name):
                child_class(msg)
class Message:
    '''One captured packet, split into protocol layers, with source and destination port.

    Also carries what later processing attaches to it: the conns it belongs
    to, other messages it has absorbed, and log lines shown below it in the output.
    '''

    def __init__(s, p:Packet, finalized=False):
        # Must remain the first statement: stores p and finalized on the instance.
        set_instance_vars_from_args()
        # proto name -> Layer, in the order the layers were added
        s.layers = collections.OrderedDict()
        # how many messages this entry stands for, in src->dst and in dst->src direction (see str())
        s.count = 1
        s.count_back = 0
        s.timestamp = float(p.cap_p.sniff_timestamp)
        s.hide = False
        # IpPort of sender and receiver, set by Message.parse()
        s.src = None
        s.dst = None
        # Conns this message was added to, see Conn._add_message()
        s.src_conns = []
        s.dst_conns = []
        s.meta_conns = []
        # messages that were folded into this one, see absorb_msg()
        s.absorbed = UniqueList()
        s.strong_relation = True
        s.debug = SHOW_ALL_DEBUG
        # text lines shown below the message in str()
        s._log = []

    def log(s, *text, **kwtext):
        '''Attach a line of text (rendered by to_text()) to this message.'''
        s._log.append(to_text(*text, **kwtext))

    def dbg(s, *text, **kwtext):
        '''Like log(), but only if debugging is enabled for this message.'''
        if s.debug:
            s._log.append(to_text(*text, **kwtext))

    def is_minor(s):
        '''True if every layer of this message is marked minor.'''
        return all(l.minor for l in s.layers.values())

    def get_trait(s, proto:str, name:str, ifnone=None):
        '''Return the value of trait name in layer proto, or ifnone if there is no such value.

        proto and name may each be a sequence of alternatives, the first one
        yielding a value wins; proto=None means any layer. The names 'timestamp'
        (int part of the message timestamp) and 'msgtype' (the layer's message
        type) are served specially.
        '''
        # allow alternative lists for proto, like s.get_trait(('tcp', 'udp'), 'src')
        if proto is None:
            proto = s.layers.keys()
        if type(proto) is not str:
            for proto_ in proto:
                val = s.get_trait(proto_, name, None)
                if val is not None:
                    return val
            return ifnone
        # allow alternative lists for name, like s.get_trait('tcp', ('src', 'dst'))
        if type(name) is not str:
            for name_ in name:
                val = s.get_trait(proto, name_, None)
                if val is not None:
                    return val
            return ifnone
        if name == 'timestamp':
            return int(s.timestamp)
        layer = s.layers.get(proto, None)
        if not layer:
            return ifnone
        if name == 'msgtype':
            return layer.msgtype
        trait = layer.traits.get(name, None)
        if trait is None:
            return ifnone
        if trait.val is None:
            return ifnone
        return trait.val

    def get_traits(s, proto=None, names=None, proto_and_names=None):
        '''Generate (proto, name, value) for every requested trait that has a value.

        Requests are given as proto plus names, and/or as a list of
        (proto, names) tuples in proto_and_names. A proto of None means all
        layers; names of None means all traits of the respective layer.
        '''
        pn = []
        if proto or names:
            if proto is None or isinstance(proto, str):
                proto = [proto]
            for p in proto:
                pn.append((p, names))
        if proto_and_names:
            pn.extend(proto_and_names)
        for protos, wanted_names in pn:
            if protos is None:
                protos = s.layers.keys()
            if isinstance(protos, str):
                protos = [protos]
            for proto_name in protos:
                # Resolve "all traits" separately for each layer: the trait names of one
                # layer must not be carried over to the next one.
                trait_names = wanted_names
                if trait_names is None:
                    l = s.layers.get(proto_name, None)
                    if not l:
                        continue
                    trait_names = l.traits.keys()
                if type(trait_names) is str:
                    trait_names = [trait_names]
                for name in trait_names:
                    result = s.get_trait(proto_name, name, ifnone=None)
                    if result is not None:
                        yield (proto_name, name, result)

    def get_all_traits(s, proto:str):
        '''Return the traits of layer proto, or an empty dict if this message has no such layer.'''
        layer = s.layers.get(proto)
        if not layer:
            return {}
        return layer.traits

    def same_traits(s, other_msg, proto:str, name:str, allow_unset=False, operator=all):
        '''Tell whether this message and other_msg have equal values for trait name in layer proto.

        proto and name may be sequences, the individual results are then combined
        with operator (all or any); name=None means all traits either message
        has in that layer. Unless allow_unset is True, a trait missing on either
        side counts as different.
        '''
        if type(proto) is not str:
            return operator(
                s.same_traits(other_msg, proto_, name, allow_unset=allow_unset)
                for proto_ in proto
            )
        if name is None:
            my_traits = s.get_all_traits(proto)
            other_traits = other_msg.get_all_traits(proto)
            names = set(my_traits.keys())
            names.update(other_traits.keys())
            name = list(names)
        if type(name) is not str:
            return operator(
                s.same_traits(other_msg, proto, name_, allow_unset=allow_unset)
                for name_ in name
            )
        val = s.get_trait(proto, name)
        other_val = other_msg.get_trait(proto, name)
        if not allow_unset:
            if val is None or other_val is None:
                return False
        return val == other_val

    def set_trait(s, proto, name, val):
        '''Set trait name to val in layer proto, adding a bare Layer for proto if the message has none.'''
        layer = s.layers.get(proto, None)
        if layer is None:
            # Trait() only takes a single keyword argument; Traits.set() builds it accordingly.
            traits = Traits()
            traits.set(name, val)
            layer = Layer(s, proto, None, traits)
        else:
            layer.traits.set(name, val)

    def collapse(s, messages, my_idx):
        '''iterate backwards over recent messages and see if messages can be combined

        Each layer's collapse() is asked in turn until one returns something
        other than this message; that result is returned. Without layers the
        message itself is returned.
        '''
        orig_msg = messages[my_idx]
        msg = orig_msg
        for layer in s.layers.values():
            msg = layer.collapse(messages, my_idx)
            if orig_msg is not msg:
                break
        return msg

    def absorb_msg(s, other_msg):
        '''Fold other_msg into this message: take over its log lines and everything it had absorbed itself.'''
        global g_current_msg
        if g_current_msg is other_msg:
            # further LOG() output belongs to the message that remains
            g_current_msg = s
        s._log.extend(other_msg._log)
        if other_msg and other_msg is not s:
            s.absorbed.append(other_msg)
            if other_msg.absorbed:
                other_absorbed = other_msg.absorbed
                # keep the attribute a UniqueList, as set up in __init__()
                other_msg.absorbed = UniqueList()
                for oa in other_absorbed:
                    s.absorb_msg(oa)

    def identify_conns(s, messages, my_idx):
        '''Call identify_conns() of every known layer class whose proto is present in messages[my_idx].'''
        msg = messages[my_idx]
        for layer_class in Layer.classes().values():
            if layer_class.proto not in msg.layers:
                continue
            layer_class.identify_conns(messages, my_idx)

    class EntityIdent:
        '''What a layer found out about the entities at the source and/or destination of a message.

        rename may be True (both sides), 'src' or 'dst', see Message.identify_entities().
        '''
        def __init__(s, proto=None, src_port=None, src_kind=None, src_entity=None, dst_port=None, dst_kind=None, dst_entity=None, rename=False):
            set_instance_vars_from_args()

    def identify_entities(s, messages, my_idx):
        '''From protocol and message discriminators, see if we can identify the src and dst port of the message
        to be of a specific core network entity.

        Raises Exception, naming the layer and message, if a layer fails.
        '''
        for layer in s.layers.values():
            try:
                identifieds = layer.identify_entities(s, messages, my_idx)
                if identifieds is None:
                    continue
                if isinstance(identifieds, Message.EntityIdent):
                    identifieds = [identifieds]
                for ident in identifieds:
                    if ident is None:
                        continue
                    Entity.find_or_create(ident.proto, ident.src_kind,
                                          ident.src_port or s.src,
                                          ident.src_entity, from_msg=s,
                                          rename=(ident.rename is True or ident.rename == 'src'))
                    Entity.find_or_create(ident.proto, ident.dst_kind,
                                          ident.dst_port or s.dst,
                                          ident.dst_entity, from_msg=s,
                                          rename=(ident.rename is True or ident.rename == 'dst'))
            except Exception as e:
                raise Exception(f'In layer {layer} {s}') from e

    def find_entity(s, kind, with_port=None):
        '''Return the first result with a non-None first item that any of this message's conns finds, else (None, None).'''
        for conn in (s.src_conns + s.dst_conns):
            ret = conn.find_entity(kind, with_port=with_port)
            if ret and ret[0] is not None:
                return ret
        return None, None

    def get_port(s, entity_kind:str):
        '''Return the src or dst port whose entity is of the given kind (src checked first), or None.'''
        if s.src_entity_is(entity_kind):
            return s.src
        elif s.dst_entity_is(entity_kind):
            return s.dst
        return None

    def entity(s, *kinds):
        '''Return the src or dst entity whose kind is one of kinds (src checked first), or None.'''
        if s.src.entity and s.src.entity.kind in kinds:
            return s.src.entity
        if s.dst.entity and s.dst.entity.kind in kinds:
            return s.dst.entity
        return None

    def src_entity_is(s, *kinds):
        return s.src.entity and s.src.entity.kind in kinds

    def dst_entity_is(s, *kinds):
        return s.dst.entity and s.dst.entity.kind in kinds

    def same_src_dst(s, other, forward=None, reverse=None):
        '''Tell whether other runs between the same two ports, in the same and/or the reverse direction.'''
        # assume forward and reverse if neither are set.
        # if one of them is set to True, assume the other as False.
        if forward is None and reverse is None:
            forward = True
            reverse = True
        a = (s.src.key(), s.dst.key())
        b = (other.src.key(), other.dst.key())
        if forward and reverse:
            return set(a) == set(b)
        elif forward:
            return a == b
        elif reverse:
            return a == tuple(reversed(b))
        else:
            return False

    @classmethod
    def parse(cls, p:Packet):
        '''Build a Message from a packet; returns None if no tcp, udp or sctp layer provides src and dst.'''
        msg = Message(p)
        Layer.parse(msg)
        msg.src = msg.get_trait(('tcp','udp','sctp'), 'src')
        msg.dst = msg.get_trait(('tcp','udp','sctp'), 'dst')
        if msg.src is None or msg.dst is None:
            return None
        assert isinstance(msg.src, IpPort)
        assert isinstance(msg.dst, IpPort)
        return msg

    def label(s):
        '''Return the layer labels joined by '/', last layer first.

        Unless SHOW_ALL_LAYERS is set, minor layers are left out, and so are
        hidden layers unless all layers are minor or hidden.
        '''
        label = []
        for l in s.layers.values():
            if not SHOW_ALL_LAYERS:
                if l.minor:
                    continue
                if l.hidden and not all((ll.minor or ll.hidden) for ll in s.layers.values()):
                    continue
            label.insert(0, l.label())
        return '/'.join(label)

    def related_subscribers(s):
        '''Return the subscribers of the src and dst subscriber conns of this message and of all absorbed messages.'''
        subscribers = UniqueList()
        src_sc = s.src_subscriber_conn()
        if src_sc:
            subscribers.append(src_sc.subscriber)
        dst_sc = s.dst_subscriber_conn()
        if dst_sc:
            subscribers.append(dst_sc.subscriber)
        for a in s.absorbed:
            subscribers.extend(a.related_subscribers())
        return subscribers

    def is_subscriber_related(s, subscriber):
        return subscriber in s.related_subscribers()

    def __repr__(s):
        return s.__str__()

    def __str__(s):
        return s.str()

    def str(s, ladder=False, one_column_per_kind=False, show_traits=True, show_conns=True):
        '''Render the message as text: an arrow from sender to receiver, then a description, then the log lines.

        ladder -- place the two ends at their entities' label columns instead of side by side.
        one_column_per_kind -- passed to the entities' labelcolumn(); selects the matching text column.
        show_traits -- True for all layers, or a proto name / list of proto names to show traits for.
        show_conns -- whether to list the conns the message belongs to.
        '''
        # description parts, shown to the right of the arrow
        t_name = []
        t_name.extend(subscr.label() for subscr in s.related_subscribers())
        name = s.label()
        if name:
            t_name.append(name)
        src = str(s.src)
        dst = str(s.dst)
        if s.src.entity is not None:
            src_str = s.src.entity.label()
        else:
            src_str = src
        if s.count and dst == src:
            dst_str = '(self)'
        elif s.dst.entity is not None:
            dst_str = s.dst.entity.label()
        else:
            dst_str = dst
        src_pos = 0
        dst_pos = 0
        if s.src.entity:
            src_pos = s.src.entity.labelcolumn(one_column_per_kind)
        if s.dst.entity:
            dst_pos = s.dst.entity.labelcolumn(one_column_per_kind)
        # allows injecting informational fake messages (Entity.news)
        if not s.count and not s.count_back:
            dst_pos = src_pos
        if not s.src.entity and not s.dst.entity:
            # no columns known: order the two ends by their port strings
            if src > dst:
                src_pos = dst_pos + 1
            else:
                dst_pos = src_pos + 1
        if not ladder:
            # only the left/right order matters
            if src_pos > dst_pos:
                src_pos = 1
                dst_pos = 0
            else:
                src_pos = 0
                dst_pos = 1
        if src_pos <= dst_pos:
            left_pos = src_pos
            right_pos = dst_pos
            left_label = src_str
            right_label = dst_str
            to_left_count = s.count_back
            to_right_count = s.count
        else:
            left_pos = dst_pos
            right_pos = src_pos
            left_label = dst_str
            right_label = src_str
            to_left_count = s.count
            to_right_count = s.count_back
        # arrow heads, with a repeat count if more than one message went that way
        left_strs = []
        left_strs.append(left_label)
        if to_left_count:
            left_strs.append('<')
            if to_left_count > 1:
                left_strs.append(f'{to_left_count}')
        right_strs = []
        if to_right_count:
            if to_right_count > 1:
                right_strs.append(f'{to_right_count}')
            right_strs.append('>')
        right_strs.append(right_label)
        # center the labels on their columns
        real_left_pos = max(0, left_pos - (len(left_label)//2))
        real_right_pos = right_pos - (len(right_label)//2) + len(right_label) + 1 - (len(right_label)&1)
        left_str = ''.join(left_strs)
        right_str = ''.join(right_strs)
        mid_gap = real_right_pos - real_left_pos - len(right_str) - len(left_str)
        mid_gap = max(1, mid_gap)
        if not ladder:
            mid_name_margin = 6
        else:
            mid_name_margin = mid_gap - len(name)
        if to_left_count or to_right_count:
            if mid_name_margin > 50:
                # a long arrow carries the message name in its middle
                mid_gap_strs = ['-' * int(mid_name_margin // 2),
                                name,
                                '-' * int(mid_name_margin - (mid_name_margin//2))]
                name = ''
            else:
                mid_gap_strs = ['-' * int(mid_gap)]
        else:
            mid_gap_strs = []
        t = [' ' * int(real_left_pos),
             left_str,]
        t.extend(mid_gap_strs)
        t.append(right_str)
        if ladder:
            # pad up to the column where the description text starts
            t = [''.join(t)]
            right_end = len(t[0])
            label_pos = Entity.textcol_one_per_kind if one_column_per_kind else Entity.textcol_one_per_entity
            diff = label_pos - right_end
            if diff > 0:
                t.append(' ' * int(diff))
        if show_traits:
            if isinstance(show_traits, str):
                show_traits = [show_traits]
            for proto,l in s.layers.items():
                if not l.traits:
                    continue
                if (show_traits is not True) and (proto not in show_traits):
                    continue
                t_name.append('%s%s' % (proto, l.traits))
        if show_conns:
            conns = set()
            for c in (s.src_conns + s.dst_conns):
                conns.add(f'{c.proto}:{c.conn_id}')
            t_name.append(' '.join(conns))
        # packet numbers of this and all absorbed messages: listed if few, else as a range
        idxs = [s.p.idx] + [a.p.idx for a in s.absorbed]
        if len(idxs) <= 3:
            t_name.append('+'.join(str(i) for i in sorted(idxs)))
        else:
            t_name.append(f'{min(idxs)}-{max(idxs)}')
        t_name.append(f'{s.timestamp:.3f}')
        t.append(' ')
        t = [''.join(t)]
        # log lines go below, indented to where the description starts
        indent = '\n' + (' ' * len(t[0]) + ' | ')
        t.append(' '.join(t_name))
        for l in s._log:
            t.append(indent)
            t.append(l)
        return ''.join(t)

    def src_subscriber_conn(s):
        '''Return the subscriber conn of the first source-side conn that has one, or None.'''
        for conn in s.src_conns:
            if conn.subscriber_conn is not None:
                return conn.subscriber_conn
        return None

    def dst_subscriber_conn(s):
        '''Return the subscriber conn of the first destination-side conn that has one, or None.'''
        for conn in s.dst_conns:
            if conn.subscriber_conn is not None:
                return conn.subscriber_conn
        return None

    def find_message(s, proto, trait, value):
        '''Ask the src and dst subscriber conns, in that order, for a message matching proto, trait and value.'''
        for subscr_conn in (s.src_subscriber_conn(), s.dst_subscriber_conn()):
            if subscr_conn is None:
                continue
            msg = subscr_conn.find_message(proto, trait, value)
            if msg:
                return msg
        return None
class Entity:
'''A core network program like BSC, MSC, ...'''
# Preferred order of entity kinds: Entity.add() lists the kinds found here first, in this order,
# and any other kinds after them.
KINDS_SORTING = ('MS', 'BTS', 'PCU', 'hNodeB', 'BSC', 'MGW@BSC', 'HNBGW', 'STP', 'MSC', 'MGW@MSC', 'MGW',
'SGSN', 'HLR', 'SIP', 'SIPcon', 'PBX', 'GGSN')
# The kinds that currently have an entry in Entity.entities, rebuilt by Entity.add();
# kind_idx() is an index into this tuple.
KINDS_SORTING_EXIST = ()
# All entities, as a listdict keyed by kind.
entities = listdict()
# Changed by Entity.add(); each instance also keeps a state_version of its own, starting at 0.
state_version = 1 # whether to update cached text columns
# NOTE(review): spacing and label_spacing are not used in this part of the file; presumably layout distances -- confirm.
spacing = 5
label_spacing = 2
# Column at which Message.str() starts the description text in ladder mode, one value per column layout.
textcol_one_per_kind = 0
textcol_one_per_entity = 0
# proxy / forwarding addresses to ignore, like the STP port
blacklist = []
def __init__(s, kind:str):
    '''Create an entity of the given kind and register it via Entity.add(), which also assigns s.idx.'''
    # Must remain the first statement: stores kind on the instance.
    set_instance_vars_from_args()
    # ports of this entity, as a listdict keyed by protocol
    s.ports = listdict()
    s.idx = None
    s.state_version = 0
    s.labelcol_one_per_kind = 0
    s.labelcol_one_per_entity = 0
    Entity.add(s)
@classmethod
def add(cls, entity):
    '''Register entity under its kind, assign its idx and refresh the list of existing kinds if needed.'''
    Entity.entities.add(entity.kind, entity)
    entity.idx = entity.idx_in_kind()
    if entity.kind not in Entity.KINDS_SORTING_EXIST:
        # a new kind has come up, refresh Entity.KINDS_SORTING_EXIST:
        # first the known kinds in their preferred order, then all others
        present = Entity.entities.keys()
        ordered = [kind for kind in Entity.KINDS_SORTING if kind in present]
        for kind in present:
            if kind not in ordered:
                ordered.append(kind)
        Entity.KINDS_SORTING_EXIST = tuple(ordered)
    Entity.state_version += 1
@classmethod
def count_entities(cls, kind):
    '''Return the number of registered entities of the given kind; 0 for a kind never seen.'''
    l = Entity.entities.get(kind)
    if not l:
        # Entity.entities has no entry for a kind that never came up
        return 0
    return len(l)
@classmethod
def add_to_blacklist(cls, port:IpPort):
    '''Put port on the blacklist, unless it is already there.'''
    if port not in cls.blacklist:
        cls.blacklist.append(port)
def rename(s, to_kind):
Entity.entities.remove(s.kind, s)
was_kind = s.kind
s.kind = to_kind
Entity.add(s)
for proto,l in s.ports.items():
for port in l:
LOG(Color.colored('yellow', 'Rename port'), 'from', was_kind, 'to', s, proto, port)
@classmethod
def find_or_create(cls, proto, kind, port, matched_entity=None, from_msg=None, rename=False):
if not port:
return None
if port in Entity.blacklist:
return None
found_entity = port.entity
if found_entity and matched_entity and (found_entity is not matched_entity):
LOG(Color.colored('purple', 'Renaming entity port:'), port, 'to', matched_entity)
if not matched_entity:
matched_entity = found_entity
if matched_entity:
if kind and matched_entity.kind != kind and rename:
matched_entity.rename(kind)
matched_entity.add_port(proto, port, from_msg=from_msg)
return matched_entity
if kind is None or rename:
for l in Entity.entities.values():
for e in l:
if e.has_port(port):
if kind and e.kind != kind and rename:
e.rename(kind)
return e
if kind is None:
return None
else:
l = Entity.entities.get(kind)
if l:
for e in l:
if e.has_port(port):
return e
e = Entity(kind)
e.add_port(proto, port, from_msg=from_msg)
return e
def absorb(s, other_entity):
'''Merge two entities to one, adopting the other's ports'''
for port in other_entity.ports:
s.add_port(port)
del other_entity
def label(s):
idx = ''
if s.idx:
idx = str(s.idx + 1)
return f'{s.kind}{idx}'
def __repr__(s):
return s.label()
def __str__(s):
return repr(s)
def kind_idx(s):
'''this entity kind's position in the currently known entity kinds:
For 'BSC', if we've seen BTS, BSC and MSC, return 1.'''
return Entity.KINDS_SORTING_EXIST.index(s.kind)
def idx_in_all(s):
'''this entity kind's position in all currently known entities:
For the second 'BSC', if we've seen 2 BTS, 3 BSC and 1 MSC, return 2 (BTS) + 1 (second BSC) = 3.'''
idx = 0
for k in Entity.KINDS_SORTING_EXIST:
if k == s.kind:
idx += Entity.entities.get(s.kind).index(s)
return idx
idx += Entity.count_entities(k)
return idx
def idx_in_kind(s):
'''this entity kind's position in the list of entities of the same kind'''
return Entity.entities.get(s.kind).index(s)
def check_update_state(s):
if s.state_version == Entity.state_version:
return
Entity.calculate_textcolumns()
s.state_version = Entity.state_version
def labelcolumn(s, one_column_per_kind=False, mid=True):
s.check_update_state()
if one_column_per_kind:
midcol = s.labelcol_one_per_kind
else:
midcol = s.labelcol_one_per_entity
if mid:
ret = midcol
else:
ret = int(midcol - (len(s.label()) // 2))
return ret
@classmethod
def calculate_textcolumns(cls):
'''In text rendering of a ladder diagram, return the text column for this entity,
if rendering each entity in its own column (not sharing one column per entity kind)'''
entity_col = 0
kind_col = 0
for k in Entity.KINDS_SORTING_EXIST:
l = Entity.entities.get(k)
kind_col += len(k) // 2
for e in l:
e.labelcol_one_per_kind = kind_col
Entity.textcol_one_per_kind = kind_col + len(e.label()) + Entity.spacing
entity_col += len(e.kind)//2
e.labelcol_one_per_entity = entity_col
entity_col += len(e.label()) + Entity.spacing
Entity.textcol_one_per_entity = entity_col
kind_col += len(k) + Entity.spacing
def has_port(s, port, proto=None):
if proto:
if port in s.ports.get(proto, []):
return proto
return None
for proto,l in s.ports.items():
if port in l:
return proto
return None
def remove_port(s, port):
for proto,l in s.ports.items():
if port in l:
l.remove(port)
port.entity = None
return
def add_port(s, proto, port, from_msg=None):
if port.entity and port.entity is not s:
port.entity.remove_port(port)
if s.has_port(port, proto=proto):
return
s.ports.add(proto, port)
port.entity = s
port.proto = proto
LOG(Color.colored('cyan', 'New port:'), port)
class Subscriber:
    '''A subscriber as identified by any of IMSI, TMSI, IMEI and MSISDN.

    Class-wide dicts map each known identity to its Subscriber; merge() folds two Subscriber instances
    into one when they turn out to be the same.'''

    next_ident = 1
    next_tmsi_idx = 1

    # class-wide lookups: identity -> Subscriber
    imsis = {}
    # NOTE: every instance shadows this name with its own set of TMSIs (s.tmsis); the class-wide dict
    # is therefore always accessed as Subscriber.tmsis.
    tmsis = {}
    imeis = {}
    msisdns = {}

    def __init__(s, imsi:str=None, tmsi=None, imei=None, msisdn=None):
        # presumably stores the arguments as s.imsi, s.tmsi, s.imei, s.msisdn -- these attributes are
        # read by update_dicts() and label()
        set_instance_vars_from_args()
        s.subscriber_conns = UniqueList()
        s.tmsis = set()
        s.tmsi_idx = 0
        s.set_last_tmsi(tmsi)
        s.ident = Subscriber.next_ident
        Subscriber.next_ident += 1
        Subscriber.update_dicts(s)

    def set_last_tmsi(s, tmsi):
        '''Make tmsi this subscriber's current TMSI and register it in Subscriber.tmsis.
        None is ignored. Raises Exception if tmsi is not a str of at least 8 characters.'''
        if tmsi is None:
            return
        s.tmsi = tmsi
        if (not isinstance(tmsi, str)) or len(str(tmsi)) < 8:
            raise Exception('Invalid TMSI: ' + str(tmsi))
        # a class-wide running counter tells which subscriber saw its TMSI most recently, see merge()
        s.tmsi_idx = Subscriber.next_tmsi_idx
        Subscriber.next_tmsi_idx += 1
        s.tmsis.add(tmsi)
        Subscriber.tmsis[s.tmsi] = s

    @classmethod
    def update_dicts(cls, s):
        '''Point the class-wide identity lookups at subscriber s for every identity s has.'''
        if s.imsi:
            Subscriber.imsis[s.imsi] = s
        for tmsi in s.tmsis:
            Subscriber.tmsis[tmsi] = s
        if s.imei:
            Subscriber.imeis[s.imei] = s
        if s.msisdn:
            Subscriber.msisdns[s.msisdn] = s

    def label(s, full=False):
        '''Return a colored, ':'-joined label of this subscriber's identities.
        With full=True all known identities are listed; otherwise only the most significant ones.'''
        l = []
        if full or (not s.imsi and not s.tmsi and not s.imei and not s.msisdn):
            l.append(f'subscr{s.ident}')
        if s.imsi and (full or not s.msisdn):
            l.append(f'imsi{s.imsi}')
        if s.tmsi and (full or not s.imsi):
            l.append(f'tmsi{s.tmsi}')
        if s.imei and (full or (not s.imsi and not s.tmsi)):
            l.append(f'imei{s.imei}')
        if s.msisdn:
            l.append(f'msisdn{s.msisdn}')
        return Color.colored(s.ident, ':'.join(l))

    def __repr__(s):
        return s.label(full=True)

    def __str__(s):
        return s.label()

    @classmethod
    def identify_subscriber(cls, msg:Message):
        '''Derive the subscriber from the identities carried in msg and associate all of the message's
        src and dst conns with that subscriber's SubscriberConn.'''
        imsi = msg.get_trait(('dtap','bssmap','rsl','gsup'), 'imsi')
        tmsi = tmsi_standardize(msg.get_trait(('dtap','bssmap','rsl'), 'tmsi'))
        imei = msg.get_trait('dtap', 'imei')
        msisdn = msg.get_trait('gsup', 'msisdn')

        if not (imsi or tmsi or imei or msisdn):
            return

        #if not msg.src_conns and not msg.dst_conns:
        #  return

        # could start out with subscr = None, but to use a few less subscriber.ids start out with a present
        # subscriber, if any.
        subscr_conn = msg.src_subscriber_conn() or msg.dst_subscriber_conn()
        if subscr_conn is not None:
            subscr = subscr_conn.subscriber
        else:
            subscr = None

        if imsi:
            subscr = Subscriber.merge(Subscriber.by_imsi(imsi), subscr)
        if tmsi:
            subscr = Subscriber.merge(Subscriber.by_tmsi(tmsi), subscr)
        if imei:
            subscr = Subscriber.merge(Subscriber.by_imei(imei), subscr)
        if msisdn:
            subscr = Subscriber.merge(Subscriber.by_msisdn(msisdn), subscr)

        if subscr is None:
            # merge() returns None on conflicting identities
            return

        if subscr_conn is None:
            subscr_conn = SubscriberConn()
        subscr_conn.set_subscriber(subscr)

        for c in (msg.src_conns + msg.dst_conns):
            if c.subscriber_conn is None:
                subscr_conn.add_conn(c)
                c.subscriber_conn = subscr_conn
            else:
                c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, subscr_conn)

    @classmethod
    def by_imsi(cls, imsi):
        '''Return the subscriber known by this IMSI, creating one if there is none yet.'''
        subscr = Subscriber.imsis.get(imsi)
        if not subscr:
            subscr = Subscriber(imsi=imsi)
        return subscr

    @classmethod
    def by_tmsi(cls, tmsi):
        '''Return the subscriber known by this TMSI, creating one if there is none yet.'''
        subscr = Subscriber.tmsis.get(tmsi)
        if not subscr:
            subscr = Subscriber(tmsi=tmsi)
        return subscr

    @classmethod
    def by_imei(cls, imei):
        '''Return the subscriber known by this IMEI, creating one if there is none yet.'''
        subscr = Subscriber.imeis.get(imei)
        if not subscr:
            subscr = Subscriber(imei=imei)
        return subscr

    @classmethod
    def by_msisdn(cls, msisdn):
        '''Return the subscriber known by this MSISDN, creating one if there is none yet.'''
        subscr = Subscriber.msisdns.get(msisdn)
        if not subscr:
            subscr = Subscriber(msisdn=msisdn)
        return subscr

    @classmethod
    def merge(cls, a, b):
        '''Fold subscriber b into subscriber a and return the survivor.

        Either argument may be None, in which case the other one is returned unchanged.
        The survivor adopts b's identities and subscriber conns. Returns None (after reporting an
        error) if both carry an IMSI or both carry an IMEI and these differ.'''
        assert a is None or isinstance(a, Subscriber)
        assert b is None or isinstance(b, Subscriber)
        if a is None and b is None:
            return None
        if a is None:
            return b
        if b is None or b is a:
            return a

        # without an IMSI to prefer, let the older subscriber (lower ident) survive
        if not a.imsi and a.ident > b.ident:
            return cls.merge(b, a)

        if a.imsi and b.imsi and a.imsi != b.imsi:
            out_error(f'cannot absorb, subscriber would change IMSI: {b.imsi} -> {a.imsi}')
            return None

        if a.imei and b.imei and a.imei != b.imei:
            out_error(f'cannot absorb, subscriber would change IMEI: {b.imei} -> {a.imei}')
            return None

        if b.imsi:
            a.imsi = b.imsi
            b.imsi = None

        if b.tmsis:
            a.tmsis.update(b.tmsis)
            b.tmsis = set()

        if b.tmsi_idx > a.tmsi_idx:
            # b saw its TMSI more recently than a
            a.set_last_tmsi(b.tmsi)
            b.tmsi = None

        if b.imei:
            a.imei = b.imei
            b.imei = None

        if b.msisdn:
            if a.msisdn and a.msisdn != b.msisdn:
                LOG(f'subscriber {a} changes MSISDN: {a.msisdn} -> {b.msisdn}')
            a.msisdn = b.msisdn
            b.msisdn = None

        Subscriber.update_dicts(a)

        # add_subscriber_conn() removes each conn from b.subscriber_conns; iterate over a copy, or
        # every second conn would be skipped and remain attached to the discarded subscriber b.
        for sc in list(b.subscriber_conns):
            a.add_subscriber_conn(sc)
        b.subscriber_conns = UniqueList()
        return a

    def add_subscriber_conn(s, subscriber_conn):
        '''Move subscriber_conn from its current subscriber (if any) over to this subscriber.'''
        if subscriber_conn.subscriber is s:
            return
        if subscriber_conn.subscriber:
            subscriber_conn.subscriber.subscriber_conns.remove(subscriber_conn)
        s.subscriber_conns.append(subscriber_conn)
        subscriber_conn.subscriber = s
        assert subscriber_conn in subscriber_conn.subscriber.subscriber_conns

    def find_entity(s, kind, with_port=None):
        '''Search this subscriber's conns, most recently added SubscriberConn first, for an entity of
        the given kind. Return (entity, subscriber_conn), or (None, None) if there is none.'''
        for subscriber_conn in reversed(s.subscriber_conns):
            found, found_subscriber_conn = subscriber_conn.find_entity(kind, ask_subscriber=False,
                                                                       with_port=with_port)
            if found is not None:
                return found, found_subscriber_conn
        return None, None
class SubscriberConn:
    '''A SubscriberConn is a collection of conns that feed into each other.

    For example, the Abis and the BSSMAP link for the same subscriber are related, as well as the MGCP and
    RTP spoken for that subscriber.

    If a subscriber disconnects and connects again, that is a new separate SubscriberConn;
    also if a subscriber would concurrently attach in twice somehow, that would be separate SubscriberConn
    instances.

    Note that a Message's src_conns and a dst_conns are not necessarily listed in the same SubscriberConn, for example
    for RTP, SIP or SMS, the messages may pass from one subscriber to another.'''

    def __init__(s, subscriber=None):
        set_instance_vars_from_args()
        s.conns = UniqueList()

    @classmethod
    def merge(cls, a, b):
        '''Fold SubscriberConn b into a and return a. If either is None, return the other one as is.'''
        for candidate in (a, b):
            assert candidate is None or isinstance(candidate, SubscriberConn)
        if a is None:
            # also covers both being None
            return b
        if b is None or b is a:
            return a

        # detach b from its subscriber, then let a carry the merge of both subscribers
        b_subscr = b.subscriber
        if b_subscr:
            b_subscr.subscriber_conns.remove(b)
            b.subscriber = None
        merged_subscr = Subscriber.merge(a.subscriber, b_subscr)
        a.subscriber = merged_subscr
        if merged_subscr:
            merged_subscr.subscriber_conns.append(a)

        # hand all of b's conns over to a
        for moved in b.conns:
            moved.subscriber_conn = a
            a.conns.append(moved)
        b.conns = None
        return a

    def find_entity(s, kind, with_port=None, ask_subscriber=True):
        '''Find an entity of the given kind among the ports of this SubscriberConn's conns, latest conn
        first; by default the lookup is delegated to the subscriber, if one is set.
        with_port: a protocol name or a collection of names the conn's port must be filed under.
        Return (entity, subscriber_conn), or (None, None).'''
        if ask_subscriber and s.subscriber is not None:
            return s.subscriber.find_entity(kind, with_port=with_port)

        if isinstance(with_port, str):
            with_port = [with_port]

        for conn in reversed(s.conns):
            if with_port is not None and conn.port.proto not in with_port:
                continue
            entity = conn.port.entity
            if entity is not None and entity.kind == kind:
                return entity, s
        return None, None

    def find_message(s, proto, trait, val):
        '''Return the first message, latest conn first and tx before rx, that has the given trait;
        if val is not None the trait's value must also equal val. Return None if there is none.'''
        for conn in reversed(s.conns):
            for msgs in (conn.tx_messages, conn.rx_messages):
                for msg in msgs:
                    if any(val is None or val == v for _p, _t, v in msg.get_traits(proto, trait)):
                        return msg
        return None

    def set_subscriber(s, subscriber):
        '''Set this conn's subscriber and list this conn in the subscriber's subscriber_conns.'''
        s.subscriber = subscriber
        if subscriber is None:
            return
        subscriber.subscriber_conns.append(s)

    def add_conn(s, conn):
        '''List conn in this SubscriberConn and set the conn's back-reference.'''
        s.conns.append(conn)
        conn.subscriber_conn = s

    def __repr__(s):
        return f'{s.subscriber}~{s.conns}'
class Layer_tcp(Layer):
    '''TCP layer: carries only the source and destination IpPort as traits, flagged as minor.'''

    def __init__(s, msg:Message):
        pkt = msg.p
        src_port = IpPort.from_tcp_source(pkt)
        dst_port = IpPort.from_tcp_dest(pkt)
        super().__init__(msg=msg, proto='tcp', msgtype=None,
                         traits=Traits(src=src_port, dst=dst_port), minor=True)
class Layer_udp(Layer):
    '''UDP layer: carries only the source and destination IpPort as traits, flagged as minor.'''

    def __init__(s, msg:Message):
        pkt = msg.p
        src_port = IpPort.from_udp_source(pkt)
        dst_port = IpPort.from_udp_dest(pkt)
        super().__init__(msg=msg, proto='udp', msgtype=None,
                         traits=Traits(src=src_port, dst=dst_port), minor=True)
class Layer_sctp(Layer):
    '''SCTP layer: source and destination IpPort plus the data chunk's stream id and stream sequence
    number as traits, flagged as minor.'''

    def __init__(s, msg:Message):
        pkt = msg.p
        src_port = IpPort.from_sctp_source(pkt)
        dst_port = IpPort.from_sctp_dest(pkt)
        sid = pkt.get('sctp.data_sid')
        ssn = pkt.get('sctp.data_ssn')
        traits = Traits(src=src_port, dst=dst_port, stream_id=sid, stream_seq=ssn)
        super().__init__(msg=msg, proto='sctp', msgtype=None, traits=traits, minor=True)
class Layer_rtp(Layer):
    '''RTP layer. The message type is the RTP payload type, suffixed with the IuUP PDU type if the
    packet carries one.'''

    def __init__(s, msg:Message):
        pt = msg.p.get('rtp.p_type')
        iuup_msgtype = sane_showname(msg.p.get('iuup.pdu_type.showname'))
        if iuup_msgtype is not None:
            msgtype = f'{pt}.{iuup_msgtype}'
        else:
            msgtype = pt
        traits = Traits(
            pt=pt,
            iuup=iuup_msgtype
        )
        super().__init__(msg=msg, proto='rtp', msgtype=msgtype, traits=traits)

    def collapse(s, messages, my_idx):
        '''Fold this RTP packet into a recent RTP message of the same message type between the same two
        ends, counting it in prev_msg.count (same direction) or prev_msg.count_back (opposite direction).

        messages: list of messages so far; the slot at my_idx is set to None when this one is absorbed.
        Return the message that now represents this packet: the absorbing one, or s.msg itself.'''
        msgtype = s.msg.get_trait('rtp', 'msgtype')

        for i in reversed(range(my_idx)):
            prev_msg = messages[i]
            if not prev_msg:
                continue
            if prev_msg.finalized:
                break
            if 'rtp' not in prev_msg.layers:
                # minor messages may sit in between, anything else ends the run of RTP packets
                if prev_msg.is_minor():
                    continue
                break
            if prev_msg.get_trait('rtp', 'msgtype') != msgtype:
                continue
            if s.msg.same_src_dst(prev_msg, forward=True):
                # found a recent similar RTP packet, combine
                prev_msg.count += 1
            elif s.msg.same_src_dst(prev_msg, reverse=True):
                # same but backwards
                prev_msg.count_back += 1
            else:
                continue
            messages[my_idx] = None
            prev_msg.absorb_msg(s.msg)
            return prev_msg
        return s.msg

    # identify_entities: RTP ports are identified by watching RSL and MGCP, see Layer_gsm_abis_rsl.identify_conns_rtp

    @classmethod
    def identify_conns(cls, messages:list, my_idx:int):
        '''Add the message to the already known RTP conns of its source and of its destination port.'''
        msg = messages[my_idx]
        for port in (msg.src, msg.dst):
            conn = Conn.find('rtp', port, conn_id=port.key())
            if conn is not None:
                conn.add_message(msg)
class Layer_mgcp(Layer):
def __init__(s, msg:Message):
    '''Read the MGCP verb or response string, transaction id, endpoint, connection id and the RTP
    address announced in SDP from the packet.'''
    pkt = msg.p
    verb = pkt.get('mgcp.req_verb')
    rsp = pkt.get('mgcp.rsp_rspstring')
    tid = pkt.get('mgcp.transid', '')

    # RTP address announced in the SDP part, if complete
    sdp_rtp_ip = pkt.get('sdp.connection_info_address')
    sdp_rtp_port = pkt.get('sdp.media_port')
    rtp_port = IpPort.get(sdp_rtp_ip, sdp_rtp_port) if (sdp_rtp_ip and sdp_rtp_port) else None

    # a response names the endpoint in a different field than a request does
    endp = pkt.get('mgcp.param_specificendpointid' if rsp else 'mgcp.req_endpoint')
    if endp and endp.startswith('rtpbridge/*@'):
        # a wildcarded endpoint name does not identify any endpoint
        endp = None

    traits = Traits(
        tid=tid,
        endp=endp,
        ci=pkt.get('mgcp.param_connectionid'),
        verb=verb,
        rsp=rsp,
        rtp_port=rtp_port,
    )
    s.tid = tid
    super().__init__(msg=msg, proto='mgcp', msgtype=(verb or rsp or '?'), traits=traits)
def label(s):
    '''Return 'mgcp.<transaction id>.<message type>'.'''
    return '.'.join(('mgcp', f'{s.tid}', f'{s.msgtype}'))
def identify_entities(s, msg:Message, messages, my_idx):
    '''A CRCX identifies its destination as a media gateway, named after the BSC or MSC that sent it.
    A response from a media gateway that announces an RTP port adds that port to the gateway entity.'''
    if msg.get_trait('mgcp', 'verb') == 'CRCX':
        if msg.src_entity_is('BSC'):
            mgw_kind = 'MGW@BSC'
        elif msg.src_entity_is('MSC'):
            mgw_kind = 'MGW@MSC'
        else:
            mgw_kind = 'MGW'
        return Message.EntityIdent(proto='mgcp', dst_kind=mgw_kind)

    if msg.get_trait('mgcp', 'rsp') and msg.src_entity_is('MGW', 'MGW@BSC','MGW@MSC'):
        announced = msg.get_trait('mgcp', 'rtp_port')
        if announced:
            msg.src.entity.add_port('rtp', announced)
    return None
@classmethod
def find_req(cls, messages, my_idx):
    '''Return the request matching the MGCP message at my_idx: a message found by find_same_trait()
    with the same transaction id that is not a response and travels in the opposite direction.
    Return None if there is none.'''
    rsp_msg = messages[my_idx]
    for candidate in find_same_trait(rsp_msg, messages, my_idx, 'mgcp', 'tid'):
        is_request = not candidate.get_trait('mgcp', 'rsp')
        if is_request and candidate.same_src_dst(rsp_msg, reverse=True):
            return candidate
    return None
# Track MGCP conns at two levels, keyed on the MGW port: a "meta" conn per endpoint (conn_id = endp),
# and one conn per endpoint + connection identifier (conn_id = 'endp:ci'). RTP conns are opened for RTP
# ports announced in a CRCX OK response or in an MDCX request.
@classmethod
def identify_conns(cls, messages:list, my_idx:int):
msg = messages[my_idx]
proto = 'mgcp'
# Requests (messages carrying a verb): add the message to the already known endpoint conn and
# endpoint+ci conn at the destination port.
if msg.get_trait('mgcp', 'verb'):
endp = msg.get_trait('mgcp', 'endp')
mgw_port = msg.dst
endp_conn = Conn.find(proto, mgw_port, endp)
if endp_conn is not None:
endp_conn.add_message(msg)
for c in (msg.src_conns + msg.dst_conns):
endp_conn.merge_subscr_conns(c)
ci = msg.get_trait('mgcp', 'ci')
conn_id = f'{endp}:{ci}'
conn = Conn.find(proto, mgw_port, conn_id)
if conn:
conn.add_message(msg)
for c in (msg.src_conns + msg.dst_conns + msg.meta_conns):
conn.merge_subscr_conns(c)
# 'OK' responses: look up the request to learn the verb and the MGW port (the request's destination).
if msg.get_trait('mgcp', 'rsp') == 'OK':
req = cls.find_req(messages, my_idx)
if req is None:
ERR('MGCP response without request')
return
mgw_port = req.dst
verb = req.get_trait('mgcp', 'verb')
if verb == 'CRCX':
# The MGCP connection
# the response's endpoint is preferred, falling back to the one named in the request
endp = msg.get_trait('mgcp', 'endp') or req.get_trait('mgcp', 'endp')
ci = msg.get_trait('mgcp', 'ci')
if not endp or not ci:
ERR('MGCP CRCX with endp =', endp, 'ci =', ci)
# creating two levels of conn: a meta conn with just endp,
# and a proper one with endp+ci
# endp:
endp_conn = Conn.find(proto, mgw_port, endp)
if not endp_conn:
endp_conn = Conn.open(proto, mgw_port, conn_id=endp, start_msg=msg)
# endp+ci:
conn_id = f'{endp}:{ci}'
conn = Conn.find(proto, mgw_port, conn_id)
if not conn:
conn = Conn.open(proto, mgw_port, conn_id, msg)
conn.add_message(req)
conn.add_message(msg)
conn.merge_subscr_conns(endp_conn)
# The RTP connection set up by MGCP
rtp_port = msg.get_trait('mgcp', 'rtp_port')
if rtp_port:
rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False,
entity=conn.port.entity)
rtp_conn.merge_subscr_conns(conn)
# bssmap or ranap layer will mention this rtp_port in their Assignment / RAB-Assignment
else:
# responses to other verbs: endpoint and ci are taken from the request
endp = req.get_trait('mgcp', 'endp')
ci = req.get_trait('mgcp', 'ci')
Conn.message(proto, mgw_port, endp, msg)
conn_id = f'{endp}:{ci}'
Conn.message(proto, mgw_port, conn_id, msg)
if verb == 'DLCX':
# The MGCP connection
endp = req.get_trait('mgcp', 'endp')
ci = req.get_trait('mgcp', 'ci')
if not endp:
ERR('MGCP DLCX without endp')
# Close the endp+ci conn, as well as the RTP conns of all RTP ports seen in its messages.
def close_ci(ci):
ci_conn_id = f'{endp}:{ci}'
# go through all RTP ports created in this conn
ci_conn = Conn.find(proto, mgw_port, ci_conn_id)
if ci_conn is not None:
all_rtp_ports = UniqueList()
for msgs in (ci_conn.rx_messages, ci_conn.tx_messages):
# NOTE(review): 'msg' as loop variable makes 'msg' a local of close_ci(). The Conn.close() calls
# below in close_ci() therefore receive the last message iterated here instead of the DLCX
# response, and 'msg' is unbound there if no message was iterated -- presumably unintended, confirm.
for msg in msgs:
# NOTE(review): get_trait() may yield None for messages without an RTP port, while
# rtp_port.key() is called on every collected item below -- verify how UniqueList treats None.
all_rtp_ports.append(msg.get_trait('mgcp', 'rtp_port'))
for rtp_port in all_rtp_ports:
Conn.close('rtp', rtp_port, conn_id=rtp_port.key(), close_msg=msg, if_exists=True)
Conn.close(proto, mgw_port, ci_conn_id, msg)
if ci:
close_ci(ci)
else:
# DLCX without a connection identifier: close every ci seen on this endpoint
endp_conn = Conn.find(proto, mgw_port, endp)
all_ci = UniqueList()
if endp_conn:
for msgs in (endp_conn.tx_messages, endp_conn.rx_messages):
# NOTE(review): this loop rebinds identify_conns()'s own 'msg'; a later use of 'msg' on this
# code path (see the Conn.close() of the endpoint conn below) then no longer refers to the
# DLCX response -- presumably unintended, confirm.
for msg in msgs:
all_ci.append(msg.get_trait('mgcp', 'ci'))
for ci in all_ci:
close_ci(ci)
Conn.close(proto, mgw_port, endp, msg, if_exists=True)
elif msg.get_trait('mgcp', 'verb') == 'MDCX':
# The RTP connection set up by BSC or MSC
rtp_port = msg.get_trait('mgcp', 'rtp_port')
if rtp_port and Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) is None:
rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False)
for c in msg.src_conns:
rtp_conn.merge_subscr_conns(c)
class Layer_sccp(Layer):
    '''SCCP layer: hidden layer carrying the source and destination local references, which key the
    SCCP conns.'''

    def __init__(s, msg:Message):
        pkt = msg.p
        lrefs = Traits(
            src_lref=pkt.get('sccp.slr'),
            dst_lref=pkt.get('sccp.dlr'),
        )
        super().__init__(msg=msg, proto='sccp', msgtype=pkt.get('sccp.message_type.showname'),
                         traits=lrefs, hidden=True)

    def collapse(s, messages, my_idx):
        '''If SCCP_COLLAPSE_STP is set, fold this message into the matching message seen at most one
        second earlier, so that the two hops read as one message; the absorbing message takes over
        this message's destination and this message's source port gets blacklisted.
        Return the absorbing message, or this message if none matched.'''
        msg = s.msg
        if not SCCP_COLLAPSE_STP:
            return msg

        # cut out STP hop
        src = msg.src
        t = msg.timestamp
        # traits that have to agree between the two hops of the same message
        same_trait_checks = (
            ('sccp', ('src_lref', 'dst_lref'), {'allow_unset': True}),
            ('sctp', 'stream_id', {}),
            ('m3ua', ('opc', 'dpc'), {}),
            ('m3ua', 'message_length', {}),
        )

        for i in range(my_idx - 1, -1, -1):
            prev_msg = messages[i]
            if not prev_msg:
                continue
            if t - prev_msg.timestamp > 1:
                break
            if prev_msg.absorbed:
                continue
            prev_sccp = prev_msg.layers.get(s.proto, None)
            if prev_sccp is None:
                continue
            #if src != prev_msg.dst:
            #  continue
            if s.msgtype != prev_sccp.msgtype:
                continue
            if not all(msg.same_traits(prev_msg, proto, names, **kwargs)
                       for proto, names, kwargs in same_trait_checks):
                continue

            prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst'))
            prev_msg.dst = msg.dst
            prev_msg.absorb_msg(msg)
            messages[i] = None
            messages[my_idx] = prev_msg
            Entity.add_to_blacklist(src)
            return prev_msg
        return msg

    @classmethod
    def identify_conns(cls, messages:list, my_idx:int):
        '''Open, confirm, close or feed SCCP conns keyed on the local references; with
        SCCP_COLLAPSE_STP set, only messages that absorbed another one are considered.'''
        msg = messages[my_idx]
        proto = 'sccp'
        msgtype = msg.get_trait(proto, 'msgtype')

        if SCCP_COLLAPSE_STP and not msg.absorbed:
            return

        src_id = msg.get_trait(proto, 'src_lref')
        dst_id = msg.get_trait(proto, 'dst_lref')

        if msgtype == 'Connection-Request':
            Conn.open(proto, msg.src, src_id, msg)
            return
        if msgtype == 'Connection-Confirm':
            requester = Conn.find(proto, msg.dst, dst_id)
            Conn.open(proto, msg.src, src_id, msg,
                      counterparts=[requester])
            return
        if msgtype in ('Release-Complete',):
            Conn.close(proto, msg.src, src_id, msg)
            return

        # any other message type: record it on whichever side's reference it carries
        if src_id:
            Conn.message(proto, msg.src, src_id, msg)
        if dst_id:
            Conn.message(proto, msg.dst, dst_id, msg)
class Layer_m3ua(Layer):
    '''M3UA layer: originating / destination point code and message length as traits, flagged as minor.'''

    def __init__(s, msg:Message):
        pkt_get = msg.p.get
        opc = pkt_get('m3ua.protocol_data_opc')
        dpc = pkt_get('m3ua.protocol_data_dpc')
        length = pkt_get('m3ua.message_length')
        super().__init__(msg=msg, proto='m3ua', msgtype=None,
                         traits=Traits(opc=opc, dpc=dpc, message_length=length), minor=True)
# wireshark commonly falsely classifies a BSSMAP Ciphering Mode Command as RNSAP PDU
class Layer_rnsap(Layer):
    '''Stand-in for a misdetected RNSAP layer: always reported as a bssmap 'Cipher Mode Command'
    without any traits.'''

    def __init__(s, msg:Message):
        p = msg.p
        super().__init__(msg=msg, proto='bssmap', msgtype='Cipher Mode Command', traits=Traits())
class Layer_bssap(Layer):
    'BSSAP, either BSS Management (see Layer_gsm_a_bssmap) or Direct Transfer (see Layer_gsm_a_dtap)'

    def __init__(s, msg:Message):
        # the message type is the name of the BSSAP PDU type
        msgtype = msg.p.get('bssap.pdu_type.showname')
        traits = Traits(
            # the PDU type is parsed as a hexadecimal number
            msgtype_nr=int(msg.p.get('bssap.pdu_type'), 16),
        )
        super().__init__(msg=msg, proto='bssap', msgtype=msgtype, traits=traits, minor=True)
class Layer_bssgp(Layer):
    '''BSSGP layer; its conns are keyed on the TLLI.'''

    def __init__(s, msg:Message):
        pdu_name = sane_msgtype(msg.p.get('bssgp.pdu_type.showname'))
        tlli = msg.p.get('bssgp.gsm_a_rr_tlli')
        super().__init__(msg=msg, proto='bssgp', msgtype=pdu_name, traits=Traits(tlli=tlli))

    def identify_entities(s, msg:Message, messages, my_idx):
        '''A FLOW-CONTROL-BVC identifies its source as PCU and its destination as SGSN.'''
        if msg.get_trait('bssgp', 'msgtype') != 'FLOW-CONTROL-BVC':
            return None
        return Message.EntityIdent(proto='bssgp', src_kind='PCU', dst_kind='SGSN')

    @classmethod
    def identify_conns(cls, messages:list, my_idx:int):
        '''Follow the Attach procedure seen on the dtap layer: Attach-Request opens a conn pair keyed on
        the TLLI, Attach-Accept replaces it with a pair keyed on the dtap tmsi trait, Attach-Complete
        closes the conn; any other message is recorded on the source side's conn.
        Messages without a TLLI are ignored.'''
        msg = messages[my_idx]
        proto = 'bssgp'
        msgtype = msg.get_trait('dtap', 'msgtype')
        conn_id = msg.get_trait('bssgp', 'tlli')
        if not conn_id:
            return

        if msgtype == 'Attach-Request':
            peer = Conn.open(proto, msg.dst, conn_id, msg)
            Conn.open(proto, msg.src, conn_id, msg,
                      counterparts=[peer])
        elif msgtype == 'Attach-Accept':
            old_conn = Conn.close(proto, msg.src, conn_id, msg)
            new_conn_id = msg.get_trait('dtap', 'tmsi')
            new_peer = Conn.open(proto, msg.dst, new_conn_id, msg)
            new_conn = Conn.open(proto, msg.src, new_conn_id, msg,
                                 counterparts=[new_peer])
            new_conn.merge_subscr_conns(old_conn)
        elif msgtype == 'Attach-Complete':
            Conn.close(proto, msg.src, conn_id, msg)
        else:
            Conn.message(proto, msg.src, conn_id, msg)
class Layer_hnbap(Layer):
    '''HNBAP layer: message type and PDU type are the parts of the respective shownames that follow
    the last dash.'''

    def __init__(s, msg:Message):
        def strip_till_dash(dashstr):
            # keep only what follows the last '-'; strings without a dash (and None) pass through
            if not dashstr or '-' not in dashstr:
                return dashstr
            return dashstr[dashstr.rindex('-') + 1:]

        pkt = msg.p
        msgtype = strip_till_dash(pkt.get('hnbap.procedurecode.showname'))
        pdutype = strip_till_dash(sane_msgtype(pkt.get('hnbap.hnbap_pdu.showname')))
        pdutype_nr = pkt.get('hnbap.hnbap_pdu')
        traits = Traits(
            msgtype_nr=int(pkt.get('hnbap.procedurecode')),
            pdutype=pdutype,
            pdutype_nr=int(pdutype_nr),
        )
        super().__init__(msg=msg, proto='hnbap', msgtype=msgtype, traits=traits)

    def identify_entities(s, msg:Message, messages, my_idx):
        '''A register message with PDU type number 0 identifies its source as hNodeB and its
        destination as HNBGW.'''
        is_register = msg.get_trait('hnbap', 'msgtype') in ('Register', 'HNBRegister', 'UERegister')
        if is_register and (msg.get_trait('hnbap', 'pdutype_nr') == 0):
            return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW')
        return None
class Layer_rua(Layer):
    '''RUA layer; its conns are keyed on '<cn domain>:<context id>'.'''

    def __init__(s, msg:Message):
        def strip_till_dash(dashstr):
            # keep only what follows the last '-'; strings without a dash (and None) pass through
            if not dashstr or '-' not in dashstr:
                return dashstr
            return dashstr[dashstr.rindex('-') + 1:]

        pkt = msg.p
        msgtype = strip_till_dash(pkt.get('rua.procedurecode.showname'))
        pdutype = strip_till_dash(sane_msgtype(pkt.get('rua.rua_pdu.showname')))
        pdutype_nr = pkt.get('rua.rua_pdu')

        # CN domain indicator '0' is reported as 'cs', '1' as 'ps', anything else as None
        cn_domain = {'0': 'cs', '1': 'ps'}.get(pkt.get('rua.cn_domainindicator'))

        traits = Traits(
            msgtype_nr=int(pkt.get('rua.procedurecode')),
            pdutype=pdutype,
            cn_domain=cn_domain,
            rua_ctx=pkt.get('rua.context_id'),
        )
        super().__init__(msg=msg, proto='rua', msgtype=msgtype, traits=traits, cap_p_name='rua')

    @classmethod
    def identify_conns(cls, messages:list, my_idx:int):
        '''Connect opens a conn at the source and a counterpart at the destination, Disconnect closes
        the conn at the destination, any other message is recorded on the source side's conn.'''
        msg = messages[my_idx]
        proto = 'rua'
        msgtype = msg.get_trait(proto, 'msgtype')
        domain_part = msg.get_trait(proto, 'cn_domain') or '?'
        ctx_part = msg.get_trait(proto, 'rua_ctx') or '?'
        conn_id = domain_part + ':' + ctx_part

        if msgtype == 'Connect':
            src_conn = Conn.open(proto, msg.src, conn_id, msg)
            Conn.open(proto, msg.dst, conn_id, msg, counterparts=[src_conn])
        elif msgtype == 'Disconnect':
            Conn.close(proto, msg.dst, conn_id, msg)
        else:
            Conn.message(proto, msg.src, conn_id, msg)
class Layer_ranap(Layer):
    '''RANAP layer: extracts the RTP address from RAB assignments and helps identifying the entities
    on the Iu interfaces.'''

    def __init__(s, msg:Message):
        # the first of these fields with a truthy value names the message type
        msgtype = None
        for field in ('ranap.rab_assignmentrequest_element',
                      'ranap.rab_assignmentresponse_element',
                      'ranap.initiatingmessage_element'):
            msgtype = msg.p.get(field)
            if msgtype:
                break

        rtp_port = None
        ip = msg.p.get('ranap.nsap_ipv4_addr')
        port_bin = msg.p.get('ranap.bindingid.binary_value')
        if ip and port_bin and len(port_bin) >= 2:
            # the first two bytes of the binding id hold the port number, big endian
            rtp_port = IpPort.get(ip, int.from_bytes(port_bin[:2], "big"))

        super().__init__(msg=msg, proto='ranap', msgtype=msgtype, traits=Traits(rtp_port=rtp_port,))

    def collapse(s, messages, my_idx):
        '''If IUH_COLLAPSE_HNBGW is set, fold this message into the matching message seen at most one
        second earlier whose destination is this message's source, so that the two hops read as one.
        Return the absorbing message, or this message if none matched.'''
        msg = s.msg
        if not IUH_COLLAPSE_HNBGW:
            return msg

        # cut out HNBGW hop
        src = msg.src
        t = msg.timestamp

        for i in range(my_idx - 1, -1, -1):
            prev_msg = messages[i]
            if not prev_msg:
                continue
            if t - prev_msg.timestamp > 1:
                break
            if src != prev_msg.dst:
                continue
            if msg.src.entity is not prev_msg.dst.entity:
                continue

            # DOESNT WORK
            if not msg.same_traits(prev_msg, 'ranap', None):
                continue
            if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True):
                continue
            if not msg.same_traits(prev_msg, 'sctp', 'stream_id'):
                continue
            if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')):
                continue

            prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst'))
            prev_msg.dst = msg.dst
            prev_msg.absorb_msg(msg)
            messages[i] = None
            messages[my_idx] = prev_msg
            Entity.add_to_blacklist(src)
            return prev_msg
        return msg

    def identify_entities(s, msg:Message, messages, my_idx):
        '''Return a list of entity identifications: the result of identify_attach() (possibly None),
        plus MGW / hNodeB identifications derived from the RTP port of a RAB assignment.'''
        ids = [s.identify_attach(msg, messages, my_idx)]

        msgtype = msg.get_trait('ranap', 'msgtype')
        rtp_port = msg.get_trait('ranap', 'rtp_port')

        if rtp_port and msgtype == 'RAB-AssignmentRequest':
            # associate the MSC's MGCP port, but take care to not say the STP is an MSC
            crcx_ok = msg.find_message('mgcp', 'rtp_port', rtp_port)
            if crcx_ok and crcx_ok.src_entity_is('MGW'):
                mgw = crcx_ok.src.entity
                msc = msg.src.entity if msg.src_entity_is('MSC') else None
                ids.append(Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True,
                                               dst_entity=msc, dst_port=crcx_ok.dst if msc else None))

        if rtp_port and msgtype == 'RAB-AssignmentResponse' and msg.src_entity_is('hNodeB'):
            ids.append(Message.EntityIdent(proto='rtp', src_kind='hNodeB', src_port=rtp_port, src_entity=msg.src.entity))
        return ids

    def identify_attach(s, msg:Message, messages, my_idx):
        '''For the initial dtap message of a subscriber (one of DTAP_COMPL_L3 or GMM_COMPL_L3), identify
        the hNodeB and HNBGW if the message carries a rua layer, else the HNBGW and the MSC or SGSN.
        Return a Message.EntityIdent, or None.'''
        ids = []
        msgtype = msg.get_trait('dtap', 'msgtype')
        if msgtype in DTAP_COMPL_L3:
            dst_kind, proto = 'MSC', 'IuCS'
        elif msgtype in GMM_COMPL_L3:
            dst_kind, proto = 'SGSN', 'IuPS'
        else:
            return None

        if 'rua' in msg.layers:
            return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW')

        # don't mistake the STP as MSC or SGSN
        if SCCP_COLLAPSE_STP and not msg.absorbed:
            return None
        if not SCCP_COLLAPSE_STP and msg.src_entity_is('HNBGW'):
            return None

        # FIXME: below only makes sense with SCCP_COLLAPSE_STP == True
        if not SCCP_COLLAPSE_STP:
            return None

        # find a HNBGW that has recently received the same LU,
        # associate IuCS port
        src_entity = None
        for match in find_same_trait(msg, messages, my_idx, 'dtap', None):
            if 'rua' in match.layers and match.dst_entity_is('HNBGW'):
                src_entity = match.dst.entity
                if src_entity:
                    break
        return Message.EntityIdent(proto=proto, src_kind='HNBGW', src_entity=src_entity, dst_kind=dst_kind)

    @classmethod
    def identify_conns(cls, messages, my_idx):
        '''Merge the message's source conns into the already known RTP conn of the RTP port named in
        this message; with SCCP_COLLAPSE_STP set, only messages that absorbed another are considered.'''
        msg = messages[my_idx]
        if SCCP_COLLAPSE_STP and not msg.absorbed:
            return

        rtp_port = msg.get_trait('ranap', 'rtp_port')
        if not rtp_port:
            return
        rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key())
        if not rtp_conn:
            return
        for src_conn in msg.src_conns:
            rtp_conn.merge_subscr_conns(src_conn)
class Layer_gsm_a_bssmap(Layer):
    '''BSSMAP layer: extracts the AoIP RTP address, IMSI and TMSI, and helps identifying the BSC, MSC
    and the MSC's MGW.'''

    def __init__(s, msg:Message):
        pkt = msg.p
        msgtype = pkt.get('gsm_a_bssmap.msgtype.showname')

        ip = pkt.get('gsm_a_bssmap.aoip_trans_ipv4')
        port = pkt.get('gsm_a_bssmap.aoip_trans_port')
        rtp_port = IpPort.get(ip, port) if (ip and port) else None

        tmsi = tmsi_standardize(pkt.get('gsm_a_bssmap.gsm_a_tmsi'))

        traits = Traits(
            msgtype_nr=str_to_int(pkt.get('gsm_a_bssmap.msgtype')),
            rtp_port=rtp_port,
            imsi=pkt.get('gsm_a_bssmap.e212_imsi'),
            tmsi=tmsi,
        )
        super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_bssmap')

    def identify_entities(s, msg:Message, messages, my_idx):
        '''Complete-Layer-3-Information and Clear-Complete identify BSC (source) and MSC (destination).
        An Assignment-Request whose RTP port was announced in an earlier MGCP message identifies that
        message's sender as the MSC's MGW. Return a Message.EntityIdent, or None.'''
        # don't mistake the STP as MSC or BSC
        if SCCP_COLLAPSE_STP and not msg.absorbed:
            return None

        msgtype = msg.get_trait('bssmap', 'msgtype')

        if msgtype in ('Complete-Layer-3-Information', 'Clear-Complete'):
            # associate BSC BSSMAP port with BSC RSL port
            src_entity = None
            for match in find_same_trait(msg, messages, my_idx, 'dtap', ('tmsi', 'imsi')):
                if 'rsl' not in match.layers:
                    continue
                rsl_dst = match.dst.entity
                if rsl_dst and rsl_dst.kind == 'BSC':
                    src_entity = rsl_dst
                    break
            return Message.EntityIdent(proto='bssmap', src_kind='BSC', dst_kind='MSC', src_entity = src_entity)

        if msgtype == 'Assignment-Request':
            # This Assignment-Request's rtp_port should match an earlier MGCP CRCX-OK message, and we now
            # know that this MSC asked for it.
            rtp_port = msg.get_trait('bssmap', 'rtp_port')
            if not rtp_port:
                return None

            def cond(prev_msg):
                return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port

            # only the first (most recent) match is of interest
            crcx_ok = next(iter(find_recent_msg(msg, messages, my_idx, cond)), None)
            if not crcx_ok:
                return None

            msc = msg.src.entity
            mgw = crcx_ok.src.entity
            return Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True,
                                       dst_port=crcx_ok.dst, dst_entity=msc, dst_kind='MSC')
        return None

    @classmethod
    def identify_conns(cls, messages, my_idx):
        '''Open and directly close a conn per paged IMSI / TMSI, and merge the conns related to the
        RTP port named in this message into that port's RTP conn.'''
        msg = messages[my_idx]
        msgtype = msg.get_trait('bssmap', 'msgtype')

        if SCCP_COLLAPSE_STP and not msg.absorbed:
            return

        # Paging does not have a proper end, it may never be answered.
        # The RSL paging command hopefully happens, and closes this Conn.
        if msgtype == 'Paging':
            for id_name in ('imsi', 'tmsi'):
                paged_id = msg.get_trait('bssmap', id_name)
                if paged_id is not None:
                    c = Conn.open('bssmap', msg.dst, f'page_{id_name}{paged_id}', msg)
                    Conn.close_conn(c, msg)

        rtp_port = msg.get_trait('bssmap', 'rtp_port')
        if rtp_port is None:
            return

        rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key())
        if rtp_conn is None:
            return

        for c in msg.src_conns:
            rtp_conn.merge_subscr_conns(c)

        def cond(prev_msg):
            return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port

        # also pull in the conns of every recent MGCP message that mentioned the same RTP port
        for prev_msg in find_recent_msg(msg, messages, my_idx, cond):
            for c in (prev_msg.src_conns + prev_msg.dst_conns):
                rtp_conn.merge_subscr_conns(c)
class Layer_gsm_abis_rsl(Layer):
def __init__(s, msg:Message):
    '''Read message type, channel identification, ip.access RTP addresses, request reference and
    subscriber identities from the RSL part of the packet.'''
    p = msg.p
    msgtype = sane_msgtype(p.get('gsm_abis_rsl.msg_type.showname'))
    msgtype_nr = p.get('gsm_abis_rsl.msg_type')
    msgtype_nr = str_to_int(msgtype_nr)

    ch = None
    ch_imm_ass = None

    # For Immediate Assignment, the assigned TS/chan is more interesting
    ts = p.get('gsm_a_ccch.gsm_a_rr_timeslot')
    cbits = (p.get('gsm_a_ccch.gsm_a_rr_sdcch4_sdcchc4_cbch')
             or p.get('gsm_a_ccch.gsm_a_rr_sdcch8_sdcchc8_cbch'))
    try:
        ch_ts = int(ts)
        ch_cbits = int(cbits)
        ch_imm_ass = f'{ch_ts}-{ch_cbits}'
    except (TypeError, ValueError):
        # best effort: the fields are missing (None) or not numeric, leave ch_imm_ass unset
        pass

    # normal RSL messages on a given TS/chan
    ts = p.get('gsm_abis_rsl.ch_no_tn')
    cbits = p.get('gsm_abis_rsl.ch_no_cbits')
    if ts is not None and cbits is not None:
        # here a non-numeric value is an error and propagates to the caller
        ch_ts = int(ts)
        ch_cbits = int(cbits)
        ch = f'{ch_ts}-{ch_cbits}'

    ch_assign = None
    new_ch_ts = p.get('gsm_a_dtap.gsm_a_rr_timeslot')
    new_ch_ss = p.get('gsm_a_dtap.gsm_a_rr_tch_facch_sacchf')
    if new_ch_ts and new_ch_ss:
        ch_assign = f'{new_ch_ts}-{new_ch_ss}'

    rtp_local_port = None
    ipacc_rtp_local_ip = p.get('gsm_abis_rsl.ipacc_local_ip')
    ipacc_rtp_local_port = p.get('gsm_abis_rsl.ipacc_local_port')
    if ipacc_rtp_local_ip and ipacc_rtp_local_port:
        rtp_local_port = IpPort.get(ipacc_rtp_local_ip, ipacc_rtp_local_port)

    rtp_remote_port = None
    ipacc_rtp_remote_ip = p.get('gsm_abis_rsl.ipacc_remote_ip')
    ipacc_rtp_remote_port = p.get('gsm_abis_rsl.ipacc_remote_port')
    if ipacc_rtp_remote_ip and ipacc_rtp_remote_port:
        rtp_remote_port = IpPort.get(ipacc_rtp_remote_ip, ipacc_rtp_remote_port)

    # the request reference is taken from the RSL fields, or else from the CCCH fields; it is only
    # used if all four parts are present
    req_ref_l = (p.get('gsm_abis_rsl.req_ref_ra'),
                 p.get('gsm_abis_rsl.req_ref_t1prim'),
                 p.get('gsm_abis_rsl.req_ref_t2'),
                 p.get('gsm_abis_rsl.req_ref_t3'))
    if not all(req_ref_l):
        req_ref_l = (p.get('gsm_a_ccch.gsm_a_rr_ra'),
                     p.get('gsm_a_ccch.gsm_a_rr_t1prim'),
                     p.get('gsm_a_ccch.gsm_a_rr_t2'),
                     p.get('gsm_a_ccch.gsm_a_rr_t3'))
    req_ref = None
    if all(req_ref_l):
        req_ref = '-'.join(req_ref_l)

    try:
        page_tmsi = tmsi_standardize(msg.p.cap_p.gsm_abis_rsl._all_fields['3gpp.tmsi'])
    except Exception:
        # best effort: no such layer or field, or the value is not usable as TMSI
        page_tmsi = None

    traits = Traits(
        msgtype_nr=msgtype_nr,
        ch=ch,
        ch_imm_ass=ch_imm_ass,
        ch_assign=ch_assign,
        chan_type=p.get('gsm_abis_rsl.ch_type'),
        rtp_port=rtp_local_port,
        rtp_remote_port=rtp_remote_port,
        tmsi=tmsi_standardize(p.get('gsm_abis_rsl.gsm_a_tmsi')),
        imsi=p.get('gsm_abis_rsl.gsm_a_imsi'),
        page_tmsi=page_tmsi,
        arfcn = p.get('gsm_a_dtap.gsm_a_rr_single_channel_arfcn') or p.get('gsm_abis_rsl.gsm_a_rr_single_channel_arfcn'),
        req_ref = req_ref,
    )
    super().__init__(msg=msg, proto='rsl', msgtype=msgtype, traits=traits, cap_p_name='gsm_abis_rsl')

    # ignore CCCH Load INDication
    #if msgtype_nr == 18:
    #  msg.hide = True
def identify_entities(s, msg:Message, messages, my_idx):
    '''Derive entity identifications (BTS, BSC, MGW@BSC) from an RSL message.

    Returns a list of Message.EntityIdent, possibly empty. For an ip.access-MDCX that has no
    rtp_remote_port trait, or for which no earlier message with an equal MGCP rtp_port trait is
    found, None is returned instead (and anything collected so far is dropped).
    '''
    found = []
    rsl_type = msg.get_trait('rsl', 'msgtype')

    # message types that only ever travel BTS -> BSC
    sent_by_bts = ('RF-RESource-INDication', 'CCCH-LOAD-INDication', 'CHANnel-ReQuireD', )
    # message types that only ever travel BSC -> BTS
    sent_by_bsc = ('CHANnel-ACTIVation', 'IMMEDIATE-ASSIGN-COMMAND')

    if rsl_type in sent_by_bts:
        found.append(Message.EntityIdent(proto='rsl', src_kind='BTS', dst_kind='BSC'))
    if rsl_type in sent_by_bsc:
        found.append(Message.EntityIdent(proto='rsl', dst_kind='BTS', src_kind='BSC'))

    # an acknowledged CRCX/MDCX from a known BTS tells us the BTS side RTP port
    local_rtp = msg.get_trait('rsl', 'rtp_port')
    if (local_rtp and msg.src_entity_is('BTS')
        and rsl_type in ('ip.access-CRCX-ACK', 'ip.access-MDCX-ACK')):
        found.append(Message.EntityIdent(proto='rtp', src_kind='BTS', src_entity=msg.src.entity,
                                         src_port=local_rtp))

    if rsl_type != 'ip.access-MDCX':
        return found

    # This ip.a MDCX's rtp_remote_port should match an earlier MGCP CRCX-OK message, and we now
    # know that this BSC asked for it.
    remote_rtp = msg.get_trait('rsl', 'rtp_remote_port')
    if not remote_rtp:
        return None

    def same_mgcp_rtp_port(earlier):
        return earlier.get_trait('mgcp', 'rtp_port') == remote_rtp

    # only the first (most recent) match is of interest
    crcx_ok = next(iter(find_recent_msg(msg, messages, my_idx, same_mgcp_rtp_port)), None)
    if not crcx_ok:
        return None

    bsc_entity = msg.src.entity
    mgw_entity = crcx_ok.src.entity
    found.append(Message.EntityIdent(proto='mgcp',
                                     src_port=crcx_ok.src, src_entity=mgw_entity,
                                     src_kind='MGW@BSC', rename=True,
                                     dst_port=crcx_ok.dst, dst_entity=bsc_entity,
                                     dst_kind='BSC'))
    return found
def collapse(s, messages, my_idx):
    '''Fold this message into a recent, not yet finalized RSL message with the same traits.

    This combines duplicates like rsl.CCCH-LOAD-INDication. When a matching earlier message
    with the same src/dst is found, its count is incremented, messages[my_idx] is set to None,
    this message is absorbed into it, and the earlier message is returned. Otherwise s.msg is
    returned unchanged.
    '''
    idx = my_idx
    while idx > 0:
        idx -= 1
        earlier = messages[idx]
        if not earlier:
            # slot of a message that was itself absorbed
            continue
        if earlier.finalized:
            return s.msg

        if 'rsl' not in earlier.layers:
            # minor messages are skipped over, any other non-rsl message stops the combining
            if all(l.minor for l in earlier.layers.values()):
                continue
            return s.msg

        if not same_nonempty(earlier.get_traits('rsl'), s.msg.get_traits('rsl')):
            continue
        if not s.msg.same_src_dst(earlier, forward=True):
            continue

        # found a recent similar packet, combine
        earlier.count += 1
        messages[my_idx] = None
        earlier.absorb_msg(s.msg)
        return earlier
    return s.msg
@classmethod
def identify_conns_ra(cls, messages:list, my_idx:int):
    '''Track the short-lived RSL conn identified by a BTS and a Request Reference ("RA").

    messages: list of all messages so far; my_idx: index of the message to examine.

    The conn is opened by CHANnel-ReQuireD (on both the BTS and the BSC port), receives
    CHANnel-ACTIVation and other messages carrying the same req_ref, and is closed by
    IMMEDIATE-ASSIGN-COMMAND. Returns None; nothing is done when the BTS, its port or the
    req_ref trait is unknown.
    '''
    msg = messages[my_idx]
    msgtype = msg.get_trait('rsl', 'msgtype')

    if msg.entity('BTS') is None:
        return None
    bts_port = msg.get_port('BTS')
    if bts_port is None:
        return None
    ra = msg.get_trait('rsl', 'req_ref')
    if ra is None:
        return None

    proto = 'rsl'
    conn_id = f'{bts_port.entity.label()}.ra{ra}'
    conn = None

    if msgtype == 'CHANnel-ReQuireD':
        if not msg.src_entity_is('BTS'):
            return
        conn = Conn.open(proto, msg.src, conn_id, msg, entity=msg.src.entity)
        Conn.open(proto, msg.dst, conn_id, msg, entity=msg.dst.entity, counterparts=[conn])
    elif msgtype == 'CHANnel-ACTIVation':
        # This used to read `msgtype in ('CHANnel-ACTIVation')`, which is a substring test
        # against a plain string (no tuple without a trailing comma); compare for equality.
        conn = Conn.message(proto, msg.dst, conn_id, msg)
    elif msgtype == 'IMMEDIATE-ASSIGN-COMMAND':
        # the RA token has fulfilled its use as soon as an IMM ASS happened
        Conn.close(proto, msg.dst, conn_id, msg)
    else:
        Conn.message(proto, msg.dst, conn_id, msg)

    if conn:
        for c in (msg.src_conns + msg.dst_conns):
            conn.merge_subscr_conns(c)
@classmethod
def identify_conns_ch(cls, messages:list, my_idx:int):
    '''Track the RSL conn identified by a BTS and a timeslot/channel ("ch").

    messages: list of all messages so far; my_idx: index of the message to examine.

    CHANnel-ACTIVation opens the conn on both the BTS and the BSC port,
    RF-CHANnel-RELease-ACKnowledge closes it, any other message with a channel is added to it.
    A message that carries a ch_assign trait is additionally added to the conn of that newly
    assigned channel, if such a conn exists. Returns None.
    '''
    msg = messages[my_idx]
    proto = 'rsl'
    msgtype = msg.get_trait('rsl', 'msgtype')

    # For Immediate Assignment, the assigned TS/chan is the one to match on
    if msgtype == 'IMMEDIATE-ASSIGN-COMMAND':
        ch = msg.get_trait('rsl', 'ch_imm_ass')
    else:
        ch = msg.get_trait('rsl', 'ch')
    if ch is None:
        return None

    # Work out which side is the BTS. This used to read `msgtype in ('CHANnel-ACTIVation')`,
    # which is a substring test against a plain string; compare for equality instead.
    if msgtype == 'CHANnel-ACTIVation' or msg.src_entity_is('BSC'):
        bsc = msg.src.entity
        bsc_port = msg.src
        bts = msg.dst.entity
        bts_port = msg.dst
    elif msg.src_entity_is('BTS'):
        bts = msg.src.entity
        bts_port = msg.src
        bsc = msg.dst.entity
        bsc_port = msg.dst
    else:
        return None

    if bts_port.entity is None:
        return None

    conn_id = f'{bts_port.entity.label()}.ch{ch}'

    if msgtype == 'CHANnel-ACTIVation':
        conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts)
        Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn])
    elif msgtype == 'RF-CHANnel-RELease-ACKnowledge':
        conn = Conn.close(proto, msg.src, conn_id, msg)
    else:
        conn = Conn.message(proto, bts_port, conn_id, msg)

    if conn:
        for c in (msg.src_conns + msg.dst_conns):
            conn.merge_subscr_conns(c)

    # when changing to a new ch, e.g. a regular Assignment Command to change from SDCCH to TCH,
    # associate the new channel with the conn
    ch_assign = msg.get_trait('rsl', 'ch_assign')
    if ch_assign:
        conn_id = f'{bts_port.entity.label()}.ch{ch_assign}'
        conn = Conn.find(proto, bts_port, conn_id)
        if conn:
            conn.add_message(msg)
            for c in (msg.src_conns + msg.dst_conns):
                conn.merge_subscr_conns(c)
@classmethod
def identify_conns_rtp(cls, messages:list, my_idx:int):
    '''Associate RTP conns with the RTP ports mentioned in an RSL message.

    An ip.access-CRCX-ACK sent by a BTS opens an 'rtp' conn on the BTS' local RTP port. Any
    message naming a remote RTP port is added to an already existing 'rtp' conn on that port.
    In both cases the subscriber conns of the message's src and dst conns are merged in.
    '''
    msg = messages[my_idx]

    def adopt_subscr_conns(rtp_conn):
        for other in (msg.src_conns + msg.dst_conns):
            rtp_conn.merge_subscr_conns(other)

    # BTS RTP
    local_port = msg.get_trait('rsl', 'rtp_port')
    if (local_port and msg.src_entity_is('BTS')
        and msg.get_trait('rsl','msgtype') == 'ip.access-CRCX-ACK'):
        local_port.entity = msg.src.entity
        adopt_subscr_conns(Conn.open('rtp', local_port, conn_id=local_port.key(),
                                     start_msg=msg, add_message=False))

    # MGW@BSC RTP towards BTS
    remote_port = msg.get_trait('rsl', 'rtp_remote_port')
    if not remote_port:
        return
    remote_conn = Conn.find('rtp', remote_port, conn_id=remote_port.key())
    if remote_conn:
        remote_conn.add_message(msg)
        adopt_subscr_conns(remote_conn)
@classmethod
def identify_conns_paging(cls, messages:list, my_idx:int):
    '''Tie an RSL PAGING-CoMmanD to the paged subscriber and to a recent BSSMAP Paging.

    Only acts on PAGING-CoMmanD messages that carry a page_tmsi trait for which
    Subscriber.by_tmsi() returns a subscriber. An 'rsl' conn is opened on the source port,
    attached to a new SubscriberConn of that subscriber and closed again right away.
    '''
    msg = messages[my_idx]
    if msg.get_trait('rsl', 'msgtype') != 'PAGING-CoMmanD':
        return

    paged_tmsi = msg.get_trait('rsl', 'page_tmsi')
    if paged_tmsi is None:
        return
    conn_id = f'page_tmsi{paged_tmsi}'

    subscriber = Subscriber.by_tmsi(paged_tmsi)
    if subscriber is None:
        return

    rsl_conn = Conn.open('rsl', msg.src, conn_id, msg)
    paging_subscr_conn = SubscriberConn()
    subscriber.add_subscriber_conn(paging_subscr_conn)
    paging_subscr_conn.add_conn(rsl_conn)

    # Paging does not have a proper end, it may never be answered.
    # A Conn wants to be closed at some point. Just close it directly.
    Conn.close_conn(rsl_conn, msg)

    # Expecting a recent Paging on BSSMAP
    def is_bssmap_paging_for_tmsi(earlier):
        return (earlier.get_trait('bssmap', 'msgtype') == 'Paging'
                and earlier.get_trait('bssmap', 'tmsi') == paged_tmsi)

    bssmap_paging = next(iter(find_recent_msg(msg, messages, my_idx, is_bssmap_paging_for_tmsi)),
                         None)
    if bssmap_paging is None:
        return

    bssmap_conn = Conn.find('bssmap', bssmap_paging.dst, conn_id, find_in_closed_conns=True)
    if bssmap_conn:
        rsl_conn.merge_subscr_conns(bssmap_conn)
@classmethod
def identify_conns(cls, messages:list, my_idx:int):
    '''Run all RSL conn identification steps on messages[my_idx], in fixed order:
    request reference, channel, RTP, paging.'''
    steps = (
        cls.identify_conns_ra,
        cls.identify_conns_ch,
        cls.identify_conns_rtp,
        cls.identify_conns_paging,
    )
    for step in steps:
        step(messages, my_idx)
class Layer_gsm_a_dtap(Layer):
    '''DTAP layer: message type plus the subscriber identities found in the 'gsm_a_dtap' dissection.'''

    def __init__(s, msg:Message):
        '''Collect msgtype and traits (IMSI, TMSI, IMEI, called MSISDN) from msg's packet.

        The packet must contain a 'gsm_a_dtap' layer (asserted).
        '''
        dtap = msg.p.get('gsm_a_dtap')
        assert dtap is not None

        msgtype = None
        msgtype_nr = None
        # The message type field name varies ('gsm_a.dtap.msg_..._type'), so scan all fields.
        # If several fields match, the last one wins.
        for f in dtap._get_all_fields_with_alternates():
            if f.name.startswith('gsm_a.dtap.msg_') and f.name.endswith('_type'):
                msgtype = f.showname_value
                try:
                    # NOTE(review): raw_value is parsed as base 10 here; if pyshark hands out
                    # a hex string, values containing a-f are dropped and others misread --
                    # confirm against the pyshark field API before relying on msgtype_nr.
                    msgtype_nr = int(f.raw_value)
                except (TypeError, ValueError):
                    # best effort: keep msgtype_nr as it was when the raw value is not a number
                    pass

        traits = Traits(
            msgtype_nr=msgtype_nr,
            imsi=msg.p.get('gsm_a_dtap.e212_imsi') or msg.p.get('gsm_a_dtap.gsm_a_imsi'),
            tmsi=tmsi_standardize(msg.p.get('gsm_a_dtap.gsm_a_tmsi') or msg.p.get('gsm_a_dtap.3gpp_tmsi')),
            imei=msg.p.get('gsm_a_dtap.gsm_a_imei'),
            to_msisdn=msg.p.get('gsm_a_dtap.cld_party_bcd_num'),
        )

        super().__init__(msg=msg, proto='dtap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_dtap')
# GSUP layer: extracts GSUP traits, tracks GSUP conns keyed by IMSI (and session id, if present)
# and tries to work out which entity (MSC, SGSN or a proxying HLR) sent a request to the HLR.
class Layer_gsup(Layer):
# Read the GSUP fields from msg's packet and pass them on to Layer.__init__() as traits.
def __init__(s, msg:Message):
msgtype = sane_msgtype(msg.p.get('gsup.msg_type.showname'))
# the MSISDN field is only taken over for message types not containing '-forwardSM-'
msisdn = None
if '-forwardSM-' not in msgtype:
msisdn = msg.p.get('gsup.e164_msisdn')
to_msisdn = msg.p.get('gsup.gsm_sms_tp_da')
msgtype_nr = None
is_request = None
try:
msgtype_nr = int(msg.p.get('gsup.msg_type'))
# is_request is True when the two lowest bits of the message type number are both zero
is_request = not (msgtype_nr & 0x3)
except:
pass
session_state = None
try:
session_state = int(msg.p.get('gsup.session_state'))
except:
pass
traits = Traits(
imsi=msg.p.get('gsup.e212_imsi'),
msgtype_nr=msgtype_nr,
is_request=is_request,
cn_domain=sane_showname(msg.p.get('gsup.cn_domain.showname')),
msisdn=msisdn,
to_msisdn=to_msisdn,
source_name=msg.p.get('gsup.source_name_text'),
destination_name=msg.p.get('gsup.destination_name_text'),
session_id=msg.p.get('gsup.session_id'),
session_state=session_state,
)
super().__init__(msg=msg, proto='gsup', msgtype=msgtype, traits=traits)
# Open, continue or close the 'gsup' conn that messages[my_idx] belongs to.
# Messages without an IMSI are ignored.
@classmethod
def identify_conns(cls, messages:list, my_idx:int):
msg = messages[my_idx]
imsi = msg.get_trait('gsup', 'imsi')
is_request = msg.get_trait('gsup', 'is_request')
session_id = msg.get_trait('gsup', 'session_id')
session_state = msg.get_trait('gsup', 'session_state')
if not imsi:
return
proto = 'gsup'
if session_id:
# session based exchange: the conn id is '<imsi>:<session_id>', looked up on either port
conn_id = f'{imsi}:{session_id}'
have = Conn.find(proto, msg.src, conn_id) or Conn.find(proto, msg.dst, conn_id)
if have is None:
# not known yet: open a conn on the src port with a counterpart conn on the dst port
have = Conn.open(proto, msg.src, conn_id, msg,
counterparts=[Conn.open(proto, msg.dst, conn_id, msg)])
# a session_state of 0x3 closes the conn on the src port
if session_state is not None and session_state == 0x3:
Conn.close(proto, msg.src, conn_id, msg)
else:
have.add_message(msg)
else:
# no session id: the conn id is just the IMSI
conn_id = f'{imsi}'
if is_request:
# a request opens a conn on its src port, or is added to the one already there
have = Conn.find(proto, msg.src, conn_id)
if have is None:
Conn.open(proto, msg.src, conn_id, msg)
else:
have.add_message(msg)
else:
# a non-request is matched against a conn on its dst port and closes it
have = Conn.find(proto, msg.dst, conn_id)
if have is None:
# it's a stray response? anyway create a conn for association with a subscriber
Conn.open(proto, msg.dst, conn_id, msg)
Conn.close(proto, msg.dst, conn_id, msg)
# For SendAuthInfo-Request, UpdateLocation-Request and PurgeMS-Request, work out kind and entity
# of the sender and return a Message.EntityIdent with the HLR as destination.
# No return statement is visible for any other message type.
def identify_entities(s, msg:Message, messages, my_idx):
if msg.get_trait('gsup', 'msgtype') in ('SendAuthInfo-Request', 'UpdateLocation-Request', 'PurgeMS-Request'):
cn = msg.get_trait('gsup', 'cn_domain')
src_kind = None
src_entity = None
src_subscr_conn = None
if msg.get_trait('gsup', 'source_name'):
# proxy forwarding
src_kind = 'HLR'
# look for an earlier message with the same GSUP traits (ignoring source/destination name)
# that was sent to an HLR; that HLR is the sender of this forwarded message
traits = [name for proto, name, result in msg.get_traits('gsup') if name not in ('source_name', 'destination_name')]
for match in find_same_trait(msg, messages, my_idx, 'gsup', traits):
if not match.dst_entity_is('HLR'):
continue
src_entity = match.dst.entity
break
else:
# the CN domain names the kind of sender: CS -> MSC, PS -> SGSN
if cn == 'CS':
src_kind = 'MSC'
elif cn == 'PS':
src_kind = 'SGSN'
if src_kind:
# associate MSC GSUP port with MSC BSSMAP port
imsi = msg.get_trait('gsup', 'imsi')
if imsi:
subscr = Subscriber.by_imsi(imsi)
# matches an earlier message sent to an entity of kind src_kind and related to this subscriber
def cond(prev_msg):
return (prev_msg.dst_entity_is(src_kind)
and prev_msg.is_subscriber_related(subscr))
for match in find_recent_msg(msg, messages, my_idx, cond):
src_entity = match.dst.entity
break
# i forgot whatever this does:
if not src_entity:
src_entity, src_subscr_conn = s.msg.find_entity(src_kind,
with_port=('bssgp', 'IuPS'))
else:
# no cn_domain in the GSUP message, try to guess
msc, msc_subscr_conn = s.msg.find_entity('MSC')
sgsn, sgsn_subscr_conn = s.msg.find_entity('SGSN')
# only decide when exactly one of MSC and SGSN was found
if msc and not sgsn:
src_entity = msc
src_subscr_conn = msc_subscr_conn
if sgsn and not msc:
src_entity = sgsn
src_subscr_conn = sgsn_subscr_conn
# merge the subscriber conn found along with the entity into this message's src conns
if src_subscr_conn is not None and msg.src_conns:
for c in msg.src_conns:
c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, src_subscr_conn)
return Message.EntityIdent(proto='gsup', src_kind=src_kind, dst_kind='HLR', src_entity=src_entity, dst_entity=msg.dst.entity)
# SIP layer: derives a message type from method / CSeq method / status, collects call related
# traits and associates SIP conns (keyed by Call-ID) with RTP conns announced in SDP.
class Layer_sip(Layer):
# Read the SIP fields from msg's packet and pass them on to Layer.__init__() as traits.
def __init__(s, msg:Message):
method = msg.p.get('sip.method')
cseq_method = msg.p.get('sip.cseq_method')
status_code = msg.p.get('sip.status_code')
status_line = msg.p.get('sip.status_line')
if status_line:
# drop everything from a '--' on, then use the last word of the status line as status
if '--' in status_line:
status_line = status_line[:status_line.index('--')]
status_line = status_line.strip()
status = status_line.split()[-1]
else:
status = status_code
# msgtype: '<cseq method>-<status>' when a status is known, else the method, else the CSeq method
if status:
msgtype = f'{cseq_method}-{status}'
elif method:
msgtype = method
else:
msgtype = cseq_method
sip_from_host = msg.p.get('sip.from_host')
sip_from_port = msg.p.get('sip.from_port')
sip_from = IpPort.get(sip_from_host, sip_from_port)
# RTP address announced in the SDP part, only when both IP and port are present
rtp_port = None
ip = msg.p.get('sip.sdp_connection_info_address')
port = msg.p.get('sip.sdp_media_port')
if ip and port:
rtp_port = IpPort.get(ip, port)
server = msg.p.get('sip.Server')
agent = msg.p.get('sip.User-Agent')
traits = Traits(
sip_agent=server or agent,
sip_from=sip_from,
call_id = msg.p.get('sip.call_id'),
method = cseq_method,
seq = msg.p.get('sip.cseq_seq'),
to_msisdn = msg.p.get('sip.to_user'),
from_msisdn = msg.p.get('sip.from_user'),
from_tag = msg.p.get('sip.from_tag'),
r_uri = msg.p.get('sip.r_uri'),
status_code = status_code,
rtp_port=rtp_port,
)
super().__init__(msg=msg, proto='sip', msgtype=msgtype, traits=traits)
# Return a Message.EntityIdent for the SIP sender; the destination kind is always 'SIP'.
# The sender kind is refined from the agent string only when the From address equals the
# packet's source port: 'kamailio...' -> PBX, 'sofia...' -> SIPCON.
def identify_entities(s, msg:Message, messages, my_idx):
src_kind = 'SIP'
sip_from = msg.get_trait('sip', 'sip_from')
agent = msg.get_trait('sip', 'sip_agent')
rename = False
if agent and sip_from and sip_from == msg.src:
rename = 'src'
if agent.startswith('kamailio'):
src_kind = 'PBX'
elif agent.startswith('sofia'):
src_kind = 'SIPCON'
else:
rename = False
return Message.EntityIdent(proto='sip', src_kind=src_kind, dst_kind='SIP', rename=rename)
# Open or continue the 'sip' conn for messages[my_idx], using the Call-ID as conn id.
@classmethod
def identify_conns(cls, messages:list, my_idx:int):
msg = messages[my_idx]
msgtype = msg.get_trait('sip', 'msgtype')
call_id = msg.get_trait('sip', 'call_id')
sip_from = msg.get_trait('sip', 'sip_from')
# an INVITE or INVITE-OK sent from its own From address opens the conn on the src port
if msgtype in ('INVITE','INVITE-OK') and sip_from and sip_from == msg.src:
conn = Conn.open('sip', msg.src, call_id, start_msg=msg)
rtp_port = msg.get_trait('sip', 'rtp_port')
if rtp_port is None:
return
# if an 'rtp' conn already exists on the announced RTP port, merge its subscriber conns
# into the SIP conn, unless the SIP conn already has a subscriber conn
rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key())
if rtp_conn is None:
return
if not conn.subscriber_conn:
conn.merge_subscr_conns(rtp_conn)
else:
conn = Conn.message('sip', msg.src, call_id, msg)
if conn is None:
return
# Layer for log lines captured via GSMTAP logging ('gsmtap_log' dissection).
class Layer_gsmtap_log(Layer):
def __init__(s, msg:Message):
app = msg.p.get('gsmtap_log.ident')
level = msg.p.get('gsmtap_log.level')
level_str = sane_showname(msg.p.get('gsmtap_log.level.showname'))
logmsg = msg.p.get('gsmtap_log.string')
cat = msg.p.get('gsmtap_log.subsys')
# log lines of any level other than ERROR are ignored: return without calling Layer.__init__()
if level_str != 'ERROR':
return
# NOTE(review): a second return follows directly. If it sits at function level, everything
# below is unreachable and this layer is effectively switched off -- presumably on purpose;
# confirm against the indented original before touching it.
return
msgtype = f'{app}.{level_str}'
msg.log(Color.colored('red', logmsg))
traits = Traits(
msgtype=msgtype,
level=level,
app=app,
cat=cat,
logmsg=logmsg,
)
super().__init__(msg=msg, proto='log', msgtype=msgtype, traits=traits)
# The logging application is the source entity kind (with a leading 'Osmo' cut off),
# the destination kind is always 'LOG'.
def identify_entities(s, msg:Message, messages, my_idx):
app = msg.get_trait('log', 'app')
if app.startswith('Osmo'):
app = app[4:]
return Message.EntityIdent(proto='log', src_kind=app, dst_kind='LOG')
class MessageFilter:
    '''One term of a messagefilter as used by --filter-msg and --debug, see MessageFilter.DOC.

    A filter can require a layer to be present, a packet number, and/or any one of a list of
    (name, value) pairs; negate inverts the result.
    '''

    def __init__(s, layer=None, idx=None, values=(), negate=False):
        '''layer: layer name or None/'' for any layer; idx: packet number or None;
        values: sequence of (name, value) pairs, value None meaning "name is present";
        negate: invert the match result.'''
        # presumably stores layer, idx, values and negate as attributes of s -- defined elsewhere
        set_instance_vars_from_args()
        # parse() appends to s.values, so always work on a private list instead of sharing the
        # caller's (or a default) sequence between instances.
        s.values = list(values or ())

    def matches(s, msg:Message):
        '''Return whether msg passes this filter, taking s.negate into account.'''
        r = s._matches(msg)
        if s.negate:
            return not r
        return r

    def _matches(s, msg:Message):
        '''Return whether msg matches layer, packet number and values, ignoring s.negate.'''
        if s.layer and s.layer not in msg.layers:
            return False

        # the packet number may also be that of a packet absorbed into this message
        if s.idx and not (msg.p.idx == s.idx or any(a.p.idx == s.idx for a in msg.absorbed)):
            return False

        if not s.values:
            return True

        # First try the parsed traits ...
        layers = s.layer or msg.layers.keys()
        for k,v in s.values:
            for proto, name, result in msg.get_traits(layers, k):
                if v is None and name == k:
                    return True
                if result is None or result == v:
                    return True

        # ... then the raw packet fields of the matching capture layers.
        if s.layer:
            msg_layer = msg.layers.get(s.layer)
            if msg_layer:
                p_layers = [msg_layer.cap_p_name]
            else:
                p_layers = [s.layer]
        else:
            p_layers = [layer.cap_p_name for layer in msg.layers.values()]

        for k,v in s.values:
            for p_layer in p_layers:
                p_val = msg.p.get(p_layer + '.' + k)
                if p_val is None:
                    continue
                if v is None:
                    return True
                if v == str(p_val):
                    return True
        return False

    @classmethod
    def debug(cls, flt_list, msg:Message):
        '''Log the raw packet contents of those layers of msg that the non-negated filters
        in flt_list refer to; a filter without a layer selects all layers of msg.'''
        layers = set()
        for flt in flt_list:
            if flt.negate:
                continue
            # parse() yields an empty string, not None, for a filter without a layer name
            # (like '.msgtype=x' or '#123'); treat both as "all layers", as DOC promises.
            if not flt.layer:
                layers.update(msg.layers.keys())
            else:
                layers.add(flt.layer)
        for layer_name, layer in msg.layers.items():
            if layer_name not in layers:
                continue
            LOG(dir_p(msg.p, layer.cap_p_name))

    def __repr__(s):
        t = []
        if s.negate:
            t.append('NOT')
        if s.layer:
            t.append(f'layer {s.layer!r}')
        if s.idx:
            t.append(f'packet nr {s.idx}')
        if s.values:
            t_v = [(f'{k} == {v!r}' if v is not None else f'has {k}') for k,v in s.values]
            t.append('values: ' + (' or '.join(t_v)))
        return ', '.join(t)

    # A (possibly empty) word at the start of a string: layer name, packet number or name=value.
    word_re = re.compile('^[a-zA-Z0-9_=-]*')

    @classmethod
    def parse(cls, spec_str):
        '''Parse a comma separated messagefilter string into a list of MessageFilter.

        None yields an empty list (no filtering), an empty string yields a single filter without
        any condition. Each token is '[!][layer]' followed by any number of '#<packet-nr>',
        '.<name>' or '.<name>=<value>' parts. On a malformed spec, the error is reported via
        out_error() and the exception is re-raised.
        '''
        if spec_str is None:
            return []
        if not spec_str:
            return [MessageFilter()]

        filters = []
        token = None
        try:
            for token in spec_str.split(','):
                negate = False
                if token.startswith('!'):
                    token = token[1:]
                    negate = True

                layer = cls.word_re.match(token).group()
                flt = MessageFilter(layer=layer, negate=negate)

                # what remains after the layer name is a sequence of <char><word> parts
                rest = cls.word_re.split(token)[1]
                while rest:
                    char = rest[0]
                    rest = rest[1:]
                    word = cls.word_re.match(rest).group()
                    rest = cls.word_re.split(rest)[1]

                    if char == '#':
                        flt.idx = int(word)
                    elif char == '.':
                        if not flt.values:
                            flt.values = []
                        if '=' in word:
                            name = word[:word.index('=')]
                            val = word[word.index('=')+1:]
                            flt.values.append((name, val))
                        else:
                            flt.values.append((word, None))
                    else:
                        raise Exception('Unknown token: %r' % (char + word))
                filters.append(flt)
            return filters
        except Exception:
            out_error('Some mistake in message filter: %r in token %r' % (spec_str, token))
            raise

    @classmethod
    def match(cls, flt_list, msg):
        '''Return whether msg passes the list of filters: no negated filter may reject it,
        and at least one filter must accept it.'''
        any_match = False
        for flt in flt_list:
            r = flt.matches(msg)
            if flt.negate and not r:
                return False
            any_match = any_match or r
        return any_match

    DOC = '''messagefilter examples, for --filter-msg and --debug:
dtap
All messages that contain a DTAP layer.
dtap.msgtype=Identity-Request
All DTAP msgtype=Identity-Request messages; for debug, show
dtap layer.
.msgtype=Identity-Request
All msgtype=Identity-Request messages; for debug, show all
layers. (Value names can be either parsed traits or raw packet
names.)
sccp.src_lref=0x00010000.dst_lref=0x00010000
All messages with an SCCP layer and either src_lref or dst_lref
== 0x00010000.
.imsi
All messages where any layer contains a value named 'imsi'.
'#123'
Message number 123 (don't forget to quote for the shell).
'gsup,#614,sccp.src_lref=0x00010000'
Show all GSUP messages, packet number 614 and all SCCP with
given source local reference.
'!rsl.msgtype=CCCH-LOAD-INDication'
Don't show CCCH-LOAD-INDication message types (quote for the
shell).
'''
class UI:
    '''Base class of the user interfaces: keeps the list of messages, runs collapsing and
    identification on each new message, and hands messages to the child class for display once
    they are old enough to be finalized.'''

    def __init__(s, opts, finalize_after_seconds=5):
        set_instance_vars_from_args()
        s.messages = []
        s.finalized_idx = -1

        s.show_traits = s._all_or_names(s.opts.show_traits)
        s.show_conns = s._all_or_names(s.opts.show_conns)

        s.filter_msg = MessageFilter.parse(s.opts.filter_msg)
        for flt in s.filter_msg:
            out_text_now('Filter-msg:', flt)

        s.debug = MessageFilter.parse(s.opts.debug)
        for dbg in s.debug:
            out_text_now('Debug:', dbg)

        s.filter_subscr = []
        if s.opts.filter_subscr:
            prefixes = ('imsi', '0x', 'imei', 'msisdn', 'tmsi')
            for token in s.opts.filter_subscr.split(','):
                # the token itself is always a candidate
                s.filter_subscr.append(token)
                has_known_prefix = any(token.startswith(prefix) and token[len(prefix):].isdigit()
                                       for prefix in prefixes)
                if not has_known_prefix:
                    # no '<prefix><digits>' form: also try the token with each prefix
                    s.filter_subscr.extend([prefix + token for prefix in prefixes])

    @staticmethod
    def _all_or_names(spec):
        'None if spec is empty, True for "all", else the list of comma separated names'
        if not spec:
            return None
        if spec == 'all':
            return True
        return spec.split(',')

    def out_text_now(s, *args, **kwargs):
        'abstract: the child class provides this'
        assert False

    def out_error(s, *args, **kwargs):
        'abstract: the child class provides this'
        assert False

    def msg_finalized(s, msg, apply_filter=True):
        'abstract: the child class provides this'
        assert False

    def flush_msg(s, msg):
        'Mark msg as finalized and pass it to msg_finalized()'
        msg.finalized = True
        s.msg_finalized(msg)

    def msg_filter(s, msg):
        '''True if msg is to be shown, False if message filter, minor-only layers, the hide flag
        or the subscriber filter rule it out.'''
        if s.filter_msg and not MessageFilter.match(s.filter_msg, msg):
            return False
        if all(l.minor for l in msg.layers.values()):
            return False
        if msg.hide:
            return False
        if not s.filter_subscr:
            return True

        # every spelling by which a related subscriber may be named in the filter
        known_ids = set()
        for subscr in msg.related_subscribers():
            known_ids.update((f'imsi{subscr.imsi}', f'imei{subscr.imei}', f'msisdn{subscr.msisdn}'))
            for tmsi in subscr.tmsis:
                known_ids.add(tmsi)
                known_ids.add(f'tmsi{tmsi[2:]}')
                known_ids.add(f'tmsi{tmsi}')
        return not known_ids.isdisjoint(s.filter_subscr)

    def flush(s, timestamp_now=0, finalize_after_seconds=0):
        '''Finalize pending messages in order. With a timestamp_now, stop at the first message
        newer than timestamp_now - finalize_after_seconds; without, finalize all of them.'''
        newest_allowed = timestamp_now - finalize_after_seconds
        first_pending = s.finalized_idx + 1
        for idx in range(first_pending, len(s.messages)):
            pending = s.messages[idx]
            if not pending:
                continue
            if timestamp_now and pending.timestamp > newest_allowed:
                break
            s.finalized_idx = idx
            s.flush_msg(pending)

    def start(s):
        pass

    def stop(s):
        pass

    def add_msg(s, msg):
        '''Take in a newly parsed message: finalize messages that are old enough, then collapse
        and identify the new one. On any error, flush everything, report and re-raise.'''
        global g_current_msg
        s.flush(msg.timestamp, s.finalize_after_seconds)
        try:
            if s.debug and any(dbg_filter.matches(msg) for dbg_filter in s.debug):
                msg.debug = True

            g_current_msg = msg
            if not msg.layers:
                return

            s.messages.append(msg)
            idx = len(s.messages) - 1

            # if the received message was absorbed by another, continue to identify the modified
            # message using the new index
            absorbed_into = msg.collapse(s.messages, idx)
            if absorbed_into is not None and absorbed_into is not msg:
                msg = absorbed_into
                idx = s.messages.index(msg)

            msg.identify_entities(s.messages, idx)
            msg.identify_conns(s.messages, idx)
            Subscriber.identify_subscriber(msg)
        except:
            s.flush()
            out_error('Exception')
            raise

    def process_messages(s, msg_src):
        'Feed all messages of msg_src through add_msg(), then finalize the rest'
        for msg in msg_src.next():
            if 0 and msg.p.idx in (3676,):
                msg.log('rsl all_fields ', msg.p.cap_p.gsm_abis_rsl._all_fields)
                msg.log(msg.p.all_str('gsm_abis_rsl'))
            s.add_msg(msg)
        s.flush()
        msg_src.done()
class UI_Plain(UI):
    '''Plain text UI: prints each finalized message as a ladder diagram line to stdout.'''

    def out_text_now(s, *args, **kwargs):
        'Print the arguments, rendered by to_text(), immediately'
        print(to_text(*args, **kwargs))

    def out_error(s, *args, **kwargs):
        'Print an error in red, followed by the message being processed (if any) and a trace'
        s.out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs)
        if g_current_msg:
            s.out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True))
        s.out_text_now(trace())

    def msg_print(s, msg, apply_filter=True):
        '''Print msg, unless apply_filter is set and msg_filter() rejects it. Messages matching
        the --debug filters additionally get their raw layers dumped.'''
        if apply_filter and not s.msg_filter(msg):
            return
        s.out_text_now(msg.str(ladder=True, one_column_per_kind=True, show_traits=s.show_traits, show_conns=s.show_conns))
        if s.debug and MessageFilter.match(s.debug, msg):
            MessageFilter.debug(s.debug, msg)

    def msg_finalized(s, msg, apply_filter=True):
        '''Print a finalized message. Takes the same apply_filter parameter as UI.msg_finalized()
        so that the override can be called like the base class method; default is unchanged.'''
        s.msg_print(msg, apply_filter=apply_filter)
class UI_Quiet(UI_Plain):
    '''Like UI_Plain, but finalized messages are not printed at all.'''

    def msg_finalized(s, msg, apply_filter=True):
        'Ignore the message; the signature matches UI.msg_finalized()'
        pass
# Selected by '--ui curses', described there as an interactive curses interface. The class
# body is empty, so the output methods are still the asserting stubs inherited from UI.
class UI_Curses(UI):
pass
class MsgSource:
    '''Base class of packet sources: turns captured packets into Message instances, applies the
    packet range options and keeps timing statistics.

    Child classes implement _next_cap_p() to yield the captured packets.
    '''

    def __init__(s, opts):
        set_instance_vars_from_args()
        s.start_t = None  # real time when next() started
        s.p_min_t = None  # earliest message timestamp seen
        s.p_max_t = None  # latest message timestamp seen
        s.end_t = None    # real time when done() was called

    def _next_cap_p(s):
        'to be implemented by child class: yield captured packets'
        assert False

    def next(s) -> Message:
        '''Generator: yield a Message for each captured packet that parses to at least one layer.

        Packets are counted from 1. Packets before opts.packet_start are skipped; iteration ends
        at opts.packet_end or once opts.packet_count is exceeded (0 means no limit). Roughly
        every 3 seconds of real time, a warning is printed if processing is slower than the
        packets' own timestamps advance.
        '''
        p_idx = 0
        s.start_t = time.time()
        warn_t = s.start_t
        warn_p_t = None
        for cap_p in s._next_cap_p():
            p_idx += 1
            if p_idx < s.opts.packet_start:
                continue
            if s.opts.packet_count and (p_idx - s.opts.packet_start) > s.opts.packet_count:
                break
            if s.opts.packet_end and p_idx > s.opts.packet_end:
                break

            msg = Message.parse(Packet(p_idx, cap_p))
            if msg is None or not msg.layers:
                continue

            s.p_min_t = msg.timestamp if s.p_min_t is None else min(s.p_min_t, msg.timestamp)
            s.p_max_t = msg.timestamp if s.p_max_t is None else max(s.p_max_t, msg.timestamp)

            now = time.time()
            if warn_p_t is None or now > warn_t + 3:
                if warn_p_t is not None:
                    packet_time = s.p_max_t - warn_p_t
                    real_time = now - warn_t
                    # Without any progress in packet time there is no rate to compare against
                    # (and the percentage below would divide by zero).
                    if packet_time > 0 and real_time > (1.3 * packet_time):
                        out_text_now(f'! taking longer to calculate than packets arrive by {100.*(real_time - packet_time)/packet_time:.1f}%')
                warn_t = now
                warn_p_t = s.p_max_t

            yield msg

    def done(s):
        '''Print the timing summary, or 'Nothing processed.' if no message was ever produced.'''
        s.end_t = time.time()
        if s.start_t is None or s.p_min_t is None:
            out_text_now('Nothing processed.')
            # there are no timestamps to compute a summary from
            return
        out_text_now(f'packet time: {s.p_max_t - s.p_min_t:.1f}s in real time: {s.end_t - s.start_t:.1f}s')
class MsgSource_File(MsgSource):
    '''Message source reading packets from a capture file.'''

    def __init__(s, path, opts):
        set_instance_vars_from_args()
        super().__init__(opts)

    def _next_cap_p(s):
        'Yield the packets of the capture file at s.path, in file order'
        yield from pyshark.FileCapture(s.path)
class MsgSource_Live(MsgSource):
    '''Message source capturing packets live from a network interface.'''

    def __init__(s, iface, opts):
        set_instance_vars_from_args()
        super().__init__(opts)

    def _next_cap_p(s):
        'Yield packets as they are captured on interface s.iface'
        yield from pyshark.LiveCapture(s.iface).sniff_continuously()
class MsgSource_Pipe(MsgSource):
    '''Message source reading capture data from stdin.'''

    def __init__(s, opts):
        super().__init__(opts)

    def _next_cap_p(s):
        'Yield the packets that pyshark reads from sys.stdin'
        from pyshark.capture.pipe_capture import PipeCapture
        yield from PipeCapture(sys.stdin)
def run_tests():
    '''Self test of dddict (nested dict with key-path access via sset/gget/ppop); run by --test.
    Prints each step and fails with an AssertionError on the first mismatch.'''
    out_test = print

    d = dddict()

    def fill_abc_abd():
        d.sset(('a', 'b', 'c'), 'abc')
        d.sset(('a', 'b', 'd'), 'abd')

    fill_abc_abd()
    out_test(d)
    assert d == {'a': {'b': {'c': 'abc', 'd': 'abd'}}}

    def verify_gget(keys, expect):
        val = d.gget(keys)
        out_test('gget:', keys,'=',val)
        assert val == expect

    gget_cases = (
        (('a', 'b', 'c'), 'abc'),
        (('a', 'b', 'd'), 'abd'),
        (('a', 'b', 'x'), None),
        (('a', 'b'), {'c': 'abc', 'd': 'abd'}),
        (('a',), {'b': {'c': 'abc', 'd': 'abd'}}),
        (('x',), None),
    )
    for keys, expect in gget_cases:
        verify_gget(keys, expect)

    def verify_ppop(keys, expect):
        # an expect of None means: the key path must be missing (KeyError)
        try:
            val = d.ppop(keys)
        except KeyError:
            assert expect is None
            out_test('ppop:', keys,'=',None)
            return
        assert expect is not None
        out_test('ppop:', keys,'=',val)
        assert val == expect
        # popping must have removed the entry
        assert d.gget(keys) is None

    verify_ppop(('a', 'b', 'c'), 'abc')
    verify_ppop(('a', 'b', 'd'), 'abd')
    verify_ppop(('a', 'b', 'x'), None)

    fill_abc_abd()
    verify_ppop(('a', 'b'), {'c': 'abc', 'd': 'abd'})

    fill_abc_abd()
    verify_ppop(('a',), {'b': {'c': 'abc', 'd': 'abd'}})
    verify_ppop(('x',), None)
# Help text explaining the --filter-subscr syntax; parse_args() appends it to the
# argparse description.
SUBSCRIBERFILTER_DOC = '''subscriberfilter examples, for --filter-subscr:
123
Show subscriber where any value matches 123 (probably only MSISDN will
match, because '123' is too short for IMSI etc).
imsi123456789012345
imei123456789012345
msisdn123456789012345
tmsi1234abcd
Show subscriber with the given IMSI/IMEI/MSISDN/TMSI.
imsi123456,imsi987654
Show both these IMSIs.
imsi123456,msisdn123,1234abcd,imei987654
Show all of these subscribers: IMSI 123456, MSISDN 123, TMSI 0x1234abcd
and IMEI 987654.
'''
def parse_args():
    '''Parse the command line and return the argparse namespace. The --help text is the module
    docstring followed by the subscriberfilter and messagefilter documentation.'''
    import argparse

    description = '\n'.join((__doc__, SUBSCRIBERFILTER_DOC, MessageFilter.DOC))
    ap = argparse.ArgumentParser(description=description,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    opt = ap.add_argument

    # where the packets come from
    opt('--pcap-file', '-f', metavar='file')
    opt('--live-capture', '-l', metavar='interface')
    opt('--stdin-capture', '-i', action='store_true')

    # which packets to look at
    opt('--packet-start', '-S', default=0, type=int)
    opt('--packet-count', '-C', default=0, type=int)
    opt('--packet-end', '-E', default=0, type=int)

    # what to show, and how
    opt('--ui', '-u', metavar='USER-INTERFACE',
        help='How to display messages: plain / p = print to stdout as plain-text;'
             ' curses / c = interactive curses interface; none / n = quiet',
        default='plain')
    opt('--filter-subscr', default=None,
        help='Show only messages related to the given subscriberfilter')
    opt('--filter-msg', default=None, metavar='messagefilter',
        help='Show only messages matching this messagefilter')
    opt('--show-traits', default=None)
    opt('--show-conns', default=None, help="'all' for all, or specific conn names (comma separated)")
    opt('--collapse-stp', '-s', default=None, help="BSSAP via STP may appear duplicated. '-s1' collapses the duplicates, -s0 does not.")

    # development aids
    opt('--test', action='store_true')
    opt('--debug', metavar='messagefilter',
        help='Show a lot more info on messages matching this messagefilter')

    return ap.parse_args()
def main():
    '''Program entry point: parse the command line, then either run the self tests or process
    the selected packet source with the selected UI.

    Returns the process exit code: 0 on success, 1 on an unknown UI type or a missing message
    source.
    '''
    # Without this declaration, the assignments below would only create a local variable and
    # --collapse-stp would have no effect on the module level setting.
    global SCCP_COLLAPSE_STP

    opts = parse_args()

    if opts.test:
        run_tests()
        return 0

    collapse_stp = (opts.collapse_stp or '').upper()
    if collapse_stp in ['1', 'Y', 'YES', 'TRUE']:
        SCCP_COLLAPSE_STP = True
    if collapse_stp in ['0', 'N', 'NO', 'FALSE']:
        SCCP_COLLAPSE_STP = False

    # The UI type may be abbreviated to any prefix of its name, e.g. 'p' for 'plain'.
    ui_class = None
    ui_type = opts.ui or 'none'
    if 'plain'.startswith(ui_type):
        ui_class = UI_Plain
    elif 'curses'.startswith(ui_type):
        ui_class = UI_Curses
    elif 'none'.startswith(ui_type):
        ui_class = UI_Quiet
    else:
        ERR('Unknown UI type:', repr(ui_type))
        return 1

    # NOTE(review): g_ui is a local variable here. If a module level g_ui is meant to be set
    # (the name suggests so), this needs a 'global g_ui' as well -- confirm against the rest
    # of the file.
    g_ui = ui_class(opts)

    msg_source = None
    if opts.pcap_file:
        msg_source = MsgSource_File(opts.pcap_file, opts)
    elif opts.live_capture:
        msg_source = MsgSource_Live(opts.live_capture, opts)
    elif opts.stdin_capture:
        msg_source = MsgSource_Pipe(opts)
    else:
        ERR('No message source, try `-l any` or `-f my.pcap`')
        return 1

    g_ui.process_messages(msg_source)
    g_ui.flush()

    if Conn.open_conns:
        print('still open conns:', repr(Conn.open_conns))
    return 0
# Script entry point. Flip the 'if False' to profile a run with cProfile instead.
if __name__ == '__main__':
    if False:
        import cProfile
        cProfile.run('main()', sort='tottime')
    else:
        # sys.exit() rather than the exit() helper, which is only present when the site
        # module has been loaded and is meant for interactive use
        sys.exit(main())
# vim: noexpandtab tabstop=8 shiftwidth=8