libosmocore/contrib/fsm-to-dot.py

829 lines
24 KiB
Python
Executable File

#!/usr/bin/env python
__doc__ = '''
fsm-to-dot: convert FSM definitons to graph images
Usage:
./fsm-to-dot.py ~/openbsc/openbsc/src/libvlr/*.c
for f in *.dot ; do dot -Tpng "$f" > "$f.png"; done
# dot comes from 'apt-get install graphviz'
Looks for osmo_fsm finite state machine definitions and madly parses .c files
to draw graphs of them. This uses wild regexes that rely on coding style etc..
No proper C parsing is done here (pycparser sucked, unfortunately).
'''
import sys, re, os
if '-h' in sys.argv or '--help' in sys.argv:
print(__doc__)
exit(0)
def err(msg):
sys.stderr.write(msg + '\n')
class listdict(object):
def __getattr__(ld, name):
if name == 'add':
return ld.__getattribute__(name)
return ld.__dict__.__getattribute__(name)
def _have(ld, name):
l = ld.__dict__.get(name)
if not l:
l = []
ld.__dict__[name] = l
return l
def add(ld, name, item):
l = ld._have(name)
l.append(item)
return ld
def add_dict(ld, d):
for k,v in d.items():
ld.add(k, v)
def __setitem__(ld, name, val):
return ld.__dict__.__setitem__(name, val)
def __getitem__(ld, name):
return ld.__dict__.__getitem__(name)
def __str__(ld):
return ld.__dict__.__str__()
def __repr__(ld):
return ld.__dict__.__repr__()
def update(ld, other_ld):
for name, items in other_ld.items():
ld.extend(name, items)
return ld
def extend(ld, name, vals):
l = ld._have(name)
l.extend(vals)
return ld
re_state_start = re.compile(r'\[([A-Z_][A-Z_0-9]*)\]')
re_event_alternatives = [
re.compile(r'\(1 *<< *([A-Z_][A-Z_0-9]*)\)'),
re.compile(r'S\(([A-Z_][A-Z_0-9]*)\)'),
]
re_action = re.compile(r'.action *= *([a-z_][a-z_0-9]*)')
re_insane_dot_name_chars = re.compile('[^a-zA-Z_]')
def state_starts(line):
m = re_state_start.search(line)
if m:
return m.group(1)
return None
def in_event_starts(line):
return line.find('in_event_mask') >= 0
def out_state_starts(line):
return line.find('out_state_mask') >= 0
def states_or_events(line):
results = []
for one_re in re_event_alternatives:
results.extend(one_re.findall(line))
return results
def parse_action(line):
a = re_action.findall(line)
if a:
return a[0]
return None
def _common_prefix(a, b):
for l in reversed(range(1,len(a))):
aa = a[:l+1]
if b.startswith(aa):
return aa
return ''
def common_prefix(strs):
if not strs:
return ''
p = None
for s in strs:
if p is None:
p = s
continue
p = _common_prefix(p, s)
if not p:
return ''
return p
KIND_STATE = 'KIND_STATE'
KIND_FUNC = 'KIND_FUNC'
KIND_FSM = 'KIND_FSM'
BOX_SHAPES = {
KIND_STATE : None,
KIND_FUNC : 'box',
KIND_FSM : 'box3d',
}
class Event:
def __init__(event, name):
event.name = name
event.short_name = name
def __cmp__(event, other):
return cmp(event.name, other.name)
class Edge:
def __init__(edge, to_state, event_name=None, style=None, action=None, color=None, arrow_head=None):
edge.to_state = to_state
edge.style = style
edge.color = color
edge.arrow_head = arrow_head
edge.events = []
edge.actions = []
edge.add_event_name(event_name)
edge.add_action(action)
def add_event_name(edge, event_name):
if not event_name:
return
edge.add_event(Event(event_name))
def add_event(edge, event):
if not event:
return
if event in edge.events:
return
edge.events.append(event)
def add_events(edge, events):
for event in events:
edge.add_event(event)
def add_action(edge, action):
if not action or action in edge.actions:
return
edge.actions.append(action)
def add_actions(edge, actions):
for action in actions:
edge.add_action(action)
def event_names(edge):
return sorted([event.name for event in edge.events])
def event_labels(edge):
return sorted([event.short_name for event in edge.events])
def action_labels(edge):
return sorted([action + '()' for action in edge.actions])
def has_event_name(edge, event_name):
return event_name in edge.event_names()
class State:
name = None
short_name = None
action = None
label = None
in_event_names = None
out_state_names = None
out_edges = None
kind = None
color = None
def __init__(state):
state.in_event_names = []
state.out_state_names = []
state.out_edges = []
state.kind = KIND_STATE
def add_out_edge(state, edge):
for out_edge in state.out_edges:
if out_edge.to_state is edge.to_state:
if out_edge.style == edge.style:
out_edge.add_events(edge.events)
out_edge.add_actions(edge.actions)
return
elif out_edge.to_state.get_label() == edge.to_state.get_label():
# sanity: there already is an edge to a state that a) is not identical to the target state of the
# newly added edge but b) has the same label.
raise Exception('Two distinct states exist with identical label: %r: states %r and %r.'
% (out_edge.to_state.get_label(), out_edge.to_state, edge.to_state))
state.out_edges.append(edge)
def get_label(state):
if state.label:
return state.label
l = [state.short_name]
if state.action:
if state.short_name == state.action:
l = []
l.append(state.action + '()')
return r'\n'.join(l)
def event_names(state):
event_names = []
for out_edge in state.out_edges:
event_names.extend(out_edge.event_names())
return event_names
def shape_str(state):
shape = BOX_SHAPES.get(state.kind, None)
if not shape:
return ''
return ',shape=%s' % shape
def color_str(state):
if state.color is None:
return ''
return ',color="%s"' % state.color
def __repr__(state):
return 'State(name=%r,short_name=%r,out=%d)' % (state.name, state.short_name, len(state.out_edges))
class Fsm:
def __init__(fsm, struct_name, string_name, states_struct_name, from_file=None):
fsm.states = []
fsm.struct_name = struct_name
fsm.string_name = string_name
fsm.states_struct_name = states_struct_name
fsm.from_file = from_file
fsm.action_funcs = set()
fsm.event_names = set()
fsm.dot_name = fsm.all_names_sanitized()
def __repr__(fsm):
return str(fsm)
def __str__(fsm):
return 'Fsm(%r,%r)' % (fsm.struct_name, fsm.from_file)
def parse_states(fsm, src, c_file):
state = None
started = None
IN_EVENTS = 'events'
OUT_STATES = 'states'
lines = src.splitlines()
for line in lines:
state_name = state_starts(line)
if state_name:
state = fsm.find_state_by_name(state_name)
if state is not None:
if c_file is fsm.from_file:
print('ERROR: fsm %r has multiple definitions of state %r' % (fsm, state_name))
else:
print('ERROR: it appears two FSMs with identical name %r exist in %r and %r'
% (fsm.struct_name, fsm.from_file, c_file))
state = None
continue
state = State()
fsm.states.append(state)
started = None
state.name = state_name
if in_event_starts(line):
started = IN_EVENTS
if out_state_starts(line):
started = OUT_STATES
if not state or not started:
continue
tokens = states_or_events(line)
if started == IN_EVENTS:
state.in_event_names.extend(tokens)
elif started == OUT_STATES:
state.out_state_names.extend(tokens)
else:
err('ignoring: %r' % tokens)
a = parse_action(line)
if a:
state.action = a
for state in fsm.states:
if state.action:
fsm.action_funcs.add(state.action)
if state.in_event_names:
fsm.event_names.update(state.in_event_names)
fsm.make_states_short_names()
fsm.ref_out_states()
def make_states_short_names(fsm):
p = common_prefix([s.name for s in fsm.states])
for s in fsm.states:
s.short_name = s.name[len(p):]
return p
def make_events_short_names(fsm):
p = common_prefix(fsm.event_names)
for state in fsm.states:
for edge in state.out_edges:
for event in edge.events:
event.short_name = event.name[len(p):]
def ref_out_states(fsm):
for state in fsm.states:
for out_state_name in state.out_state_names:
out_state = fsm.find_state_by_name(out_state_name, False)
if out_state is None:
print('ERROR: fsm %r has a transition to state not part of the FSM: %r'
% (fsm, out_state_name))
out_state = fsm.have_state(out_state_name, KIND_STATE, color='red')
state.add_out_edge(Edge(out_state))
def find_state_by_name(fsm, name, strict=False):
for state in fsm.states:
if state.name == name:
return state
if strict:
raise Exception("State not found: %r" % name);
return None
def find_state_by_action(fsm, action):
for state in fsm.states:
if state.action == action:
return state
return None
def add_special_state(fsm, additional_states, name, in_state=None,
out_state=None, event_name=None, kind=KIND_FUNC,
state_action=None, label=None, edge_action=None,
style='dotted', arrow_head=None):
additional_state = None
for s in additional_states:
if s.short_name == name:
additional_state = s
break;
if not additional_state:
for s in fsm.states:
if s.short_name == name:
additional_state = s
break;
if kind == KIND_FUNC and not state_action:
state_action = name
if not additional_state:
additional_state = State()
additional_state.short_name = name
additional_state.action = state_action
additional_state.kind = kind
additional_state.label = label
additional_states.append(additional_state)
if out_state:
additional_state.out_state_names.append(out_state.name)
additional_state.add_out_edge(Edge(out_state, event_name, style=style,
action=edge_action, arrow_head=arrow_head))
if in_state:
in_state.out_state_names.append(additional_state.name)
in_state.add_out_edge(Edge(additional_state, event_name, style=style,
action=edge_action, arrow_head=arrow_head))
def find_event_edges(fsm, c_files):
# enrich state transitions between the states with event labels
func_to_state_transitions = listdict()
for c_file in c_files:
func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
# edges between explicit states
for state in fsm.states:
transitions = func_to_state_transitions.get(state.action)
if not transitions:
continue
for to_state_name, event_name in transitions:
if not event_name:
continue
found = False
for out_edge in state.out_edges:
if out_edge.to_state.name == to_state_name:
out_edge.add_event_name(event_name)
found = True
if not found:
sys.stderr.write(
"ERROR: %s() triggers a transition to %s, but this is not allowed by the FSM definition\n"
% (state.action, to_state_name))
state.add_out_edge(Edge(fsm.find_state_by_name(to_state_name, True), event_name,
color='red'))
additional_states = []
# functions that aren't state actions but still effect state transitions
for func_name, transitions in func_to_state_transitions.items():
if func_name in fsm.action_funcs:
continue
for to_state_name, event_name in transitions:
to_state = fsm.find_state_by_name(to_state_name)
if not to_state:
continue
fsm.add_special_state(additional_states, func_name, None, to_state, event_name)
event_sources = c_files.find_event_sources(fsm.event_names)
for state in fsm.states:
for in_event_name in state.in_event_names:
funcs_for_in_event = event_sources.get(in_event_name)
if not funcs_for_in_event:
continue
found = False
for out_edge in state.out_edges:
if out_edge.has_event_name(in_event_name):
out_edge.action = r'\n'.join([(f + '()') for f in funcs_for_in_event
if f != state.action])
# if any functions that don't belong to a state trigger events, add
# them to the graph as well
additional_funcs = [f for f in funcs_for_in_event if f not in fsm.action_funcs]
for af in additional_funcs:
fsm.add_special_state(additional_states, af, None, state, in_event_name,
arrow_head='halfopen')
fsm.states.extend(additional_states)
# do any existing action functions by chance call other action functions?
for state in fsm.states:
if not state.action:
continue
callers = c_files.find_callers(state.action)
if not callers:
continue
for other_state in fsm.states:
if other_state.action in callers:
other_state.add_out_edge(Edge(state, None, 'dotted'))
def add_fsm_alloc(fsm, c_files):
allocating_funcs = []
for c_file in c_files:
allocating_funcs.extend(c_file.fsm_allocators.get(fsm.struct_name, []))
starting_state = None
if fsm.states:
# assume the first state starts
starting_state = fsm.states[0]
additional_states = []
for func_name in allocating_funcs:
fsm.add_special_state(additional_states, func_name, None, starting_state)
fsm.states.extend(additional_states)
def add_cross_fsm_links(fsm, fsms, c_files, fsm_meta):
for state in fsm.states:
if not state.action:
continue
if state.kind == KIND_FSM:
continue
callers = c_files.find_callers(state.action)
if state.kind == KIND_FUNC:
callers.append(state.action)
if not callers:
continue
for caller in callers:
for calling_fsm in fsms:
if calling_fsm is fsm:
continue
calling_state = calling_fsm.find_state_by_action(caller)
if not calling_state:
continue
if calling_state.kind == KIND_FSM:
continue
label = None
if state.kind == KIND_STATE:
label=fsm.struct_name + ': ' + state.short_name
edge_action = caller
if calling_state.action == edge_action:
edge_action = None
calling_fsm.add_special_state(calling_fsm.states, fsm.dot_name,
calling_state, kind=KIND_FSM, edge_action=edge_action, label=' '.join(fsm.all_names()))
label = None
if calling_state.kind == KIND_STATE:
label=calling_fsm.struct_name + ': ' + calling_state.short_name
edge_action = caller
if state.action == edge_action:
edge_action = None
fsm.add_special_state(fsm.states, calling_fsm.dot_name, None,
state, kind=KIND_FSM, edge_action=edge_action,
label=label)
# meta overview
meta_called_fsm = fsm_meta.have_state(fsm.dot_name, KIND_FSM)
meta_calling_fsm = fsm_meta.have_state(calling_fsm.dot_name, KIND_FSM)
meta_calling_fsm.add_out_edge(Edge(meta_called_fsm))
def have_state(fsm, name, kind=KIND_STATE, color=None):
state = fsm.find_state_by_name(name)
if not state:
state = State()
state.name = name
state.short_name = name
state.kind = kind
state.color = color
fsm.states.append(state)
return state
def to_dot(fsm):
out = ['digraph G {', 'rankdir=LR;']
for state in fsm.states:
out.append('%s [label="%s"%s%s]' % (state.short_name, state.get_label(),
state.shape_str(), state.color_str()))
for state in fsm.states:
for out_edge in state.out_edges:
attrs = []
labels = []
if out_edge.events:
labels.extend(out_edge.event_labels())
if out_edge.actions:
labels.extend(out_edge.action_labels())
if labels:
label = r'\n'.join(labels)
else:
label = '-'
attrs.append('label="%s"' % label)
if out_edge.style:
attrs.append('style=%s'% out_edge.style)
if out_edge.color:
attrs.append('color=%s'% out_edge.color)
if out_edge.arrow_head:
attrs.append('arrowhead=%s'% out_edge.arrow_head)
attrs_str = ''
if attrs:
attrs_str = ' [%s]' % (','.join(attrs))
out.append('%s->%s%s' % (state.short_name, out_edge.to_state.short_name, attrs_str))
out.append('}\n')
return '\n'.join(out)
def all_names(fsm):
n = []
if fsm.from_file:
n.append(os.path.basename(fsm.from_file.path))
if fsm.struct_name:
n.append(fsm.struct_name)
if fsm.string_name:
n.append(fsm.string_name)
return n
def all_names_sanitized(fsm, sep='_'):
n = sep.join(fsm.all_names())
n = re_insane_dot_name_chars.sub('_', n)
return n
def write_dot_file(fsm):
dot_path = '%s.dot' % ('_'.join(fsm.all_names()))
f = open(dot_path, 'w')
f.write(fsm.to_dot())
f.close()
print(dot_path)
re_fsm = re.compile(r'struct osmo_fsm ([a-z_][a-z_0-9]*) =')
re_fsm_string_name = re.compile(r'\bname = "([^"]*)"')
re_fsm_states_struct_name = re.compile(r'\bstates = ([a-z_][a-z_0-9]*)\W*,')
re_fsm_states = re.compile(r'struct osmo_fsm_state ([a-z_][a-z_0-9]*)\[\] =')
re_func = re.compile(r'(\b[a-z_][a-z_0-9]*\b)\([^)]*\)\W*^{', re.MULTILINE)
re_state_trigger = re.compile(r'osmo_fsm_inst_state_chg\([^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
re_fsm_alloc = re.compile(r'osmo_fsm_inst_alloc[_child]*\(\W*&([a-z_][a-z_0-9]*),', re.M)
re_fsm_event_dispatch = re.compile(r'osmo_fsm_inst_dispatch\(\W*[^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
re_comment_multiline = re.compile(r'/\*.*?\*/', re.M | re.S)
re_comment_single_line = re.compile(r'//.*$', re.M | re.S)
re_break = re.compile(r'^\W*\bbreak;', re.M)
class CFile():
def __init__(c_file, path):
c_file.path = path
c_file.src = open(path).read()
c_file.funcs = {}
c_file.fsm_allocators = listdict()
def __repr__(c_file):
return str(c_file)
def __str__(c_file):
return 'CFile(%r)' % c_file.path
def extract_block(c_file, brace_open, brace_close, start):
pos = 0
try:
src = c_file.src
block_start = src.find(brace_open, start)
pos = block_start
level = 1
while level > 0:
pos += 1
if src[pos] == brace_open:
level += 1
elif src[pos] == brace_close:
level -= 1
return src[block_start+1:pos]
except:
print("Error while trying to extract a code block from %r char pos %d" % (c_file.path, pos))
print("Block start at char pos %d" % block_start)
try:
print(src[block_start - 20 : block_start + 20])
print('...')
print(src[pos - 20 : pos + 20])
except:
pass
return ''
def find_fsms(c_file):
fsms = []
for m in re_fsm.finditer(c_file.src):
struct_name = m.group(1)
struct_def = c_file.extract_block('{', '}', m.start())
string_name = (re_fsm_string_name.findall(struct_def) or [None])[0]
states_struct_name = re_fsm_states_struct_name.findall(struct_def)[0]
fsm = Fsm(struct_name, string_name, states_struct_name, c_file)
fsms.append(fsm)
return fsms
def find_fsm_states(c_file, fsms):
for m in re_fsm_states.finditer(c_file.src):
states_struct_name = m.group(1)
for fsm in fsms:
if states_struct_name == fsm.states_struct_name:
fsm.parse_states(c_file.extract_block('{', '}', m.start()), c_file)
def parse_functions(c_file):
funcs = {}
for m in re_func.finditer(c_file.src):
name = m.group(1)
func_src = c_file.extract_block('{', '}', m.start())
func_src = ''.join(re_comment_multiline.split(func_src))
func_src = ''.join(re_comment_single_line.split(func_src))
funcs[name] = func_src
c_file.funcs = funcs
c_file.find_fsm_allocators()
def find_callers(c_file, func_name):
func_call = func_name + '('
callers = []
for func_name, src in c_file.funcs.items():
if src.find(func_call) >= 0:
callers.append(func_name)
return callers
def find_fsm_allocators(c_file):
c_file.fsm_allocators = listdict()
for func_name, src in c_file.funcs.items():
for m in re_fsm_alloc.finditer(src):
fsm_struct_name = m.group(1)
c_file.fsm_allocators.add(fsm_struct_name, func_name)
def find_state_transitions(c_file, event_names):
TO_STATE = 'TO_STATE'
IF_EVENT = 'IF_EVENT'
CASE_EVENT = 'CASE_EVENT'
BREAK = 'BREAK'
func_to_state_transitions = listdict()
for func_name, src in c_file.funcs.items():
found_tokens = []
for m in re_state_trigger.finditer(src):
to_state = m.group(1)
found_tokens.append((m.start(), TO_STATE, to_state))
for event in event_names:
re_event = re.compile(r'\bif\w*\(.*\b(' + event + r')\b')
for m in re_event.finditer(src):
event = m.group(1)
found_tokens.append((m.start(), IF_EVENT, event))
re_event = re.compile(r'^\W*case\W\W*\b(' + event + r'):', re.M)
for m in re_event.finditer(src):
event = m.group(1)
found_tokens.append((m.start(), CASE_EVENT, event))
for m in re_break.finditer(src):
found_tokens.append((m.start(), BREAK, 'break'))
found_tokens = sorted(found_tokens)
last_events = []
saw_break = True
for start, kind, name in found_tokens:
if kind == IF_EVENT:
last_events = [name]
saw_break = True
elif kind == CASE_EVENT:
if saw_break:
last_events = []
saw_break = False
last_events.append(name)
elif kind == BREAK:
saw_break = True
elif kind == TO_STATE:
for event in (last_events or [None]):
func_to_state_transitions.add(func_name, (name, event))
return func_to_state_transitions
def find_event_sources(c_file, event_names):
c_file.event_sources = listdict()
for func_name, src in c_file.funcs.items():
for m in re_fsm_event_dispatch.finditer(src):
event_name = m.group(1)
c_file.event_sources.add(event_name, func_name)
class CFiles(list):
def find_callers(c_files, func_name):
callers = []
for c_file in c_files:
callers.extend(c_file.find_callers(func_name))
return callers
def find_func_to_state_transitions(c_files):
func_to_state_transitions = listdict()
for c_file in c_files:
func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
return func_to_state_transitions
def find_event_sources(c_files, event_names):
event_sources = listdict()
for c_file in c_files:
for event, sources in c_file.event_sources.items():
if event in event_names:
event_sources.extend(event, sources)
return event_sources
c_files = CFiles()
paths_seen = set()
for path in sys.argv[1:]:
if path in paths_seen:
continue
paths_seen.add(path)
c_file = CFile(path)
c_files.append(c_file)
for c_file in c_files:
c_file.parse_functions()
fsms = []
for c_file in c_files:
fsms.extend(c_file.find_fsms())
for fsm1 in fsms:
for fsm2 in fsms:
if fsm1 is fsm2:
continue
if fsm1.states_struct_name == fsm2.states_struct_name:
print('ERROR: two distinct FSMs share the same states-struct name: %r and %r both use %r'
% (fsm1, fsm2, fsm1.states_struct_name))
for c_file in c_files:
c_file.find_fsm_states(fsms)
c_file.find_event_sources(fsms)
for fsm in fsms:
fsm.find_event_edges(c_files)
fsm.add_fsm_alloc(c_files)
fsm_meta = Fsm("meta", None, "meta")
for fsm in fsms:
fsm.add_cross_fsm_links(fsms, c_files, fsm_meta)
for fsm in fsms:
fsm.make_events_short_names()
for fsm in fsms:
fsm.write_dot_file()
fsm_meta.write_dot_file()
# vim: tabstop=2 shiftwidth=2 expandtab