msc_log_to_ladder.py: various tweaks

(multiple changes in one patch because who cares about this script)

tweak regexes -- they worked ok, but some of the '[^:]' should really be
'[^:)]', and they also look happier that way.

don't skip RAN=NONE, so we also see messages before Complete Layer 3.

s/sip/mncc, to generally be valid for both internal and external MNCC.

pick up RTP port information from MGCP OK

pick up RTP port information from MNCC rx and tx

add --verbose flag, to be able to check whether the regex rules are
still working (getting any hits).

fix rule_imsi_detach: should return True to be counted in --verbose.

tweak comment 'Generated by...' to include the full git path.

Change-Id: If619182ba76c6b238a1fa105a3c3449d7f473dd1
This commit is contained in:
Neels Hofmeyr 2023-03-08 00:53:47 +01:00
parent 58f408839c
commit 075ca60b06
1 changed files with 47 additions and 18 deletions

View File

@ -53,7 +53,7 @@ UE = 'ms' #'ue'
MS_UE_UNKNOWN = 'ms' #None
MSC = 'msc'
MGW = 'mgw'
SIP = 'sip'
MNCC = 'mncc'
MO = 'mo'
MT = 'mt'
@ -69,7 +69,7 @@ class OutputBase:
self.start_with_re = re.compile(start_with_re)
def head(self):
self.writeln('# Generated by msc_log_to_ladder.py')
self.writeln('# Generated by osmo-msc.git/doc/sequence_charts/msc_log_to_ladder.py')
def tail(self):
pass
@ -129,7 +129,7 @@ class OutputLadder(OutputBase):
mo_mt = arrow.mo_mt or MO
def prepend_mo_mt(name):
if name in ('.', SIP):
if name in ('.', MNCC):
return name
return '%s%s' % (mo_mt, name)
@ -222,7 +222,7 @@ class OutputMscgen(OutputBase):
mo_mt = arrow.mo_mt or MO
def prepend_mo_mt(name):
if name in ('.', SIP):
if name in ('.', MNCC):
return name
return '%s%s' % (mo_mt, name)
@ -371,6 +371,7 @@ class Parse:
self.output = output
self.linenr = 0
self.rules = []
self.rules_hit = {}
self.seen_udtrace_mncc = False
self.callrefs_mo_mt = {}
@ -390,6 +391,7 @@ class Parse:
re_str = docstr.splitlines()[0]
self.rules.append(Rule(name=member, re_str=re_str, handler=func))
self.rules_hit[member] = 0
@ -403,7 +405,7 @@ class Parse:
#('moue', 'UE,hNodeB (MO)'),
('momgw', 'MGW for MSC (MO)'),
('momsc', 'MSC (MO)'),
('sip', 'MNCC to PBX via\nosmo-sip-connector'),
('mncc', 'MNCC'),
('mtmsc', 'MSC (MT)'),
('mtmgw', 'MGW for MSC (MT)'),
('mtms', 'BSS,MS (MT)\\nhNodeB,UE (MT)'),
@ -448,6 +450,7 @@ class Parse:
for rule in self.rules:
if rule.match(line):
self.rules_hit[rule.name] = self.rules_hit.get(rule.name, 0) + 1
break
RE_DTAP_NAME = re.compile('.*GSM48_MT_([^_]+)_(.+)')
@ -473,7 +476,8 @@ class Parse:
dtap = '%s %s' % m.groups()
if 'IMSI_DETACH_IND' in dtap:
# detecting IMSI Detach separately
# detecting IMSI Detach separately, because this log line does not contain the IMSI.
# By using the rule_imsi_detach(), we can accurately put it on the MO/MT side.
return True
if l3type == 'NONE' and not tx and dtap.endswith('PAG_RESP'):
@ -497,6 +501,7 @@ class Parse:
imsi = m.group(1)
e = MO_MT_UNKNOWN
self.diagram.add_line(Arrow(e, MS_UE_UNKNOWN, '>', MSC, 'IMSI Detach', imsi=imsi))
return True
def rule_mgcp_tx(self, m):
r'.*mgw-endp\([^)]*:([^:]+):([^:]+)\).* (rtpbridge[^ ]+) .* RTP_TO_(RAN|CN)( CI=([^:]+)|): ([^ :]+).*: Sending'
@ -512,19 +517,20 @@ class Parse:
return True
def rule_mgcp_rx(self, m):
r'.*mgw-endp\([^)]*:([^:]+):([^:]+)\).* (rtpbridge[^ ]+) .* RTP_TO_(RAN|CN)( CI=([^:]+)|).*: received successful response to ([^:]+): (.*)'
ran, l3type, endp, rtp_to, cond_ci, ci, verb, details = m.groups()
r'.*mgw-endp\(([^)]+):([^:)]+):([^:)]+)\).* (rtpbridge[^ ]+) .* RTP_TO_(RAN|CN)( CI=([^:]+)|).*: received successful response to ([^:]+): RTP=[^:]+:([0-9.:]+)'
subscr, ran_conn, l3type, endp, rtp_to, cond_ci, ci, verb, rtp = m.groups()
e = mo_mt_from_l3type(l3type)
endp = self.mask_value('EP', endp)
ci = self.mask_value('CI', ci)
ci_str = ''
if ci:
ci_str = ' %s' % ci
self.diagram.add_line(Arrow(e, MGW, '>', MSC, 'for %s: %s OK\\n%s%s' % (rtp_to, verb, endp, ci_str)))
rtp = self.mask_value('IP:port', rtp)
self.diagram.add_line(Arrow(e, MGW, '>', MSC, 'for %s: %s OK\\n%s%s %s' % (rtp_to, verb, endp, ci_str, rtp)))
return True
def rule_ran_tx(self, m):
r'.*msc_a\(([^)]*):([^:]+):([^:]+)\).* RAN encode: ([^: ]+): (.+)$'
r'.*msc_a\(([^)]+):([^:)]+):([^:)]+)\).* RAN encode: ([^: ]+): (.+)$'
subscr, ran_conn, l3type, ran_type, msg_type = m.groups()
@ -538,8 +544,6 @@ class Parse:
# skip 'RAB Assignment: rab_id=1, rtp=192.168.178.66:50008, use_x213_nsap=1'
return True
if l3type == 'NONE':
return True
e = mo_mt_from_l3type(l3type)
@ -555,7 +559,7 @@ class Parse:
return True
def rule_ran_rx(self, m):
r'.*msc_a\(([^)]*):([^:]+):([^:]+)\).* RAN decode: ([^: ]+) (.+)$'
r'.*msc_a\(([^)]+):([^:)]+):([^:)]+)\).* RAN decode: ([^: ]+) (.+)$'
subscr, ran_conn, l3type, ran_type, msg_type = m.groups()
@ -563,8 +567,6 @@ class Parse:
# will get DTAP details from rule_dtap() instead, not from BSSMAP logging
return True
if l3type == 'NONE':
return True
e = mo_mt_from_l3type(l3type)
@ -589,7 +591,7 @@ class Parse:
self.diagram.add_line(Arrow(e, MSC, '<>', '.', 'CC state:\\n%s' % to_state))
return True
def rule_log_mncc(self, m):
def rule_log_mncc_no_rtp(self, m):
r'.*trans\(CC[^) ]* [^ )]+:([^:)]+) callref-([^ ]+) [^)]+\) (tx|rx) (MNCC_[^ ]*)$'
l3type, callref_hex, tx_rx, mncc_msg = m.groups()
@ -605,12 +607,34 @@ class Parse:
except:
e = MT
self.diagram.add_line(Arrow(e, MSC, '>' if tx else '<', 'sip', mncc_msg))
self.diagram.add_line(Arrow(e, MSC, '>' if tx else '<', 'mncc', mncc_msg))
return True
def rule_log_mncc_with_rtp(self, m):
r'.*trans\(CC[^) ]* [^ )]+:([^:)]+) callref-([^ ]+) [^)]+\) (tx|rx) (MNCC_[^ ]*) \(RTP=([^){]+)(|{.*})\)$'
l3type, callref_hex, tx_rx, mncc_msg, rtp, codec = m.groups()
if self.seen_udtrace_mncc:
# If no udtrace is present, take the MNCC logging.
# But if there is udtrace logging available, we should not duplicate those MNCC lines.
return True
tx = (tx_rx == 'tx')
try:
e = self.callrefs_mo_mt.get(callref_hex, MT)
except:
e = MT
rtp = self.mask_value('IP:port', rtp)
self.diagram.add_line(Arrow(e, MSC, '>' if tx else '<', 'mncc', f'{mncc_msg}\\n{rtp}'))
return True
RE_MNCC_RTP = re.compile(' ip := ([^, ]+), rtp_port := ([0-9]+),')
RE_MNCC_CALLREF = re.compile(' callref := ([^ ,]+), ')
# detecting MNCC with udtrace has the advantage that we also get an indication whether RTP information is
# present
def rule_udtrace_mncc(self, m):
r'.*(write|recv).* (Tx|Rx): \{ msg_type := ([^ ]+) .* u := \{ (.*) \} \}$'
write_recv, tx_rx, msg_type, u = m.groups()
@ -650,7 +674,7 @@ class Parse:
descr = '%s\\n%s' % (descr, rtp_info)
break
self.diagram.add_line(Arrow(e, MSC, '>' if tx else '<', 'sip', descr))
self.diagram.add_line(Arrow(e, MSC, '>' if tx else '<', 'mncc', descr))
return True
def rule_cc_timer(self, m):
@ -680,6 +704,9 @@ def translate(inf, outf, cmdline):
break;
parse.add_line(line)
parse.end()
if cmdline.verbose:
for name, count in parse.rules_hit.items():
print(f" {name}: {count}")
def open_output(inf, cmdline):
if cmdline.output_file == '-':
@ -716,6 +743,8 @@ if __name__ == '__main__':
' present. This makes the output reproducible across various logs.')
parser.add_argument('-s', '--start-with', dest='start_with_re', default=None,
help='Skip until the first message with this label (regex), e.g. -s "CC SETUP"')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='show some debug info, like which regex rules were hit and which were not.')
cmdline = parser.parse_args()