contrib: add fsm-to-dot.py to draw osmo_fsm dotty graphs

Add a first version of a python script that tries to analyze .c source files to
draw graphs of osmo_fsm implementations. So far it uses quick-and-dirty
regexes.

Change-Id: I155f57a608d600f59aedfd27ef66eb9772c124e7
This commit is contained in:
Neels Hofmeyr 2016-11-16 14:36:29 +01:00 committed by Harald Welte
parent c7f52c4c84
commit 0898a007ba
1 changed files with 710 additions and 0 deletions

710
contrib/fsm-to-dot.py Executable file
View File

@ -0,0 +1,710 @@
#!/usr/bin/env python
__doc__ = '''
fsm-to-dot: convert FSM definitons to graph images
Usage:
./fsm-to-dot.py ~/openbsc/openbsc/src/libvlr/*.c
for f in *.dot ; do dot -Tpng $f > $f.png; done
# dot comes from 'apt-get install graphviz'
Looks for osmo_fsm finite state machine definitions and madly parses .c files
to draw graphs of them. This uses wild regexes that rely on coding style etc..
No proper C parsing is done here (pycparser sucked, unfortunately).
'''
import sys, re
def err(msg):
sys.stderr.write(msg + '\n')
class listdict(object):
def __getattr__(ld, name):
if name == 'add':
return ld.__getattribute__(name)
return ld.__dict__.__getattribute__(name)
def _have(ld, name):
l = ld.__dict__.get(name)
if not l:
l = []
ld.__dict__[name] = l
return l
def add(ld, name, item):
l = ld._have(name)
l.append(item)
return ld
def add_dict(ld, d):
for k,v in d.items():
ld.add(k, v)
def __setitem__(ld, name, val):
return ld.__dict__.__setitem__(name, val)
def __getitem__(ld, name):
return ld.__dict__.__getitem__(name)
def __str__(ld):
return ld.__dict__.__str__()
def __repr__(ld):
return ld.__dict__.__repr__()
def update(ld, other_ld):
for name, items in other_ld.items():
ld.extend(name, items)
return ld
def extend(ld, name, vals):
l = ld._have(name)
l.extend(vals)
return ld
re_state_start = re.compile(r'\[([A-Z_][A-Z_0-9]*)\]')
re_event = re.compile(r'S\(([A-Z_][A-Z_0-9]*)\)')
re_action = re.compile(r'.action *= *([a-z_][a-z_0-9]*)')
def state_starts(line):
m = re_state_start.search(line)
if m:
return m.group(1)
return None
def in_event_starts(line):
return line.find('in_event_mask') >= 0
def out_state_starts(line):
return line.find('out_state_mask') >= 0
def states_or_events(line):
return re_event.findall(line)
def parse_action(line):
a = re_action.findall(line)
if a:
return a[0]
return None
def _common_prefix(a, b):
for l in reversed(range(1,len(a))):
aa = a[:l+1]
if b.startswith(aa):
return aa
return ''
def common_prefix(strs):
if not strs:
return ''
p = None
for s in strs:
if p is None:
p = s
continue
p = _common_prefix(p, s)
if not p:
return ''
return p
KIND_STATE = 'KIND_STATE'
KIND_FUNC = 'KIND_FUNC'
KIND_FSM = 'KIND_FSM'
BOX_SHAPES = {
KIND_STATE : None,
KIND_FUNC : 'box',
KIND_FSM : 'box3d',
}
class Event:
def __init__(event, name):
event.name = name
event.short_name = name
def __cmp__(event, other):
return cmp(event.name, other.name)
class Edge:
def __init__(edge, to_state, event_name=None, style=None, action=None):
edge.to_state = to_state
edge.style = style
edge.events = []
edge.actions = []
edge.add_event_name(event_name)
edge.add_action(action)
def add_event_name(edge, event_name):
if not event_name:
return
edge.add_event(Event(event_name))
def add_event(edge, event):
if not event:
return
if event in edge.events:
return
edge.events.append(event)
def add_events(edge, events):
for event in events:
edge.add_event(event)
def add_action(edge, action):
if not action or action in edge.actions:
return
edge.actions.append(action)
def add_actions(edge, actions):
for action in actions:
edge.add_action(action)
def event_names(edge):
return sorted([event.name for event in edge.events])
def event_labels(edge):
return sorted([event.short_name for event in edge.events])
def action_labels(edge):
return sorted([action + '()' for action in edge.actions])
def has_event_name(edge, event_name):
return event_name in edge.event_names()
class State:
name = None
short_name = None
action = None
label = None
in_event_names = None
out_state_names = None
out_edges = None
kind = None
def __init__(state):
state.in_event_names = []
state.out_state_names = []
state.out_edges = []
state.kind = KIND_STATE
def add_out_edge(state, edge):
for out_edge in state.out_edges:
if out_edge.to_state is edge.to_state:
if out_edge.style == edge.style:
out_edge.add_events(edge.events)
out_edge.add_actions(edge.actions)
return
# sanity
if out_edge.to_state.get_label() == edge.to_state.get_label():
raise Exception('Two distinct states exist with identical labels.')
state.out_edges.append(edge)
def get_label(state):
if state.label:
return state.label
l = [state.short_name]
if state.action:
if state.short_name == state.action:
l = []
l.append(state.action + '()')
return r'\n'.join(l)
def event_names(state):
event_names = []
for out_edge in state.out_edges:
event_names.extend(out_edge.event_names())
return event_names
def shape_str(state):
shape = BOX_SHAPES.get(state.kind, None)
if not shape:
return ''
return ',shape=%s' % shape
def __repr__(state):
return 'State(name=%r,short_name=%r,out=%d)' % (state.name, state.short_name, len(state.out_edges))
class Fsm:
def __init__(fsm, struct_name, states_struct_name, from_file=None):
fsm.states = []
fsm.struct_name = struct_name
fsm.states_struct_name = states_struct_name
fsm.from_file = from_file
fsm.action_funcs = set()
fsm.event_names = set()
def parse_states(fsm, src):
state = None
started = None
IN_EVENTS = 'events'
OUT_STATES = 'states'
lines = src.splitlines()
for line in lines:
state_name = state_starts(line)
if state_name:
state = State()
fsm.states.append(state)
started = None
state.name = state_name
if in_event_starts(line):
started = IN_EVENTS
if out_state_starts(line):
started = OUT_STATES
if not state or not started:
continue
tokens = states_or_events(line)
if started == IN_EVENTS:
state.in_event_names.extend(tokens)
elif started == OUT_STATES:
state.out_state_names.extend(tokens)
else:
err('ignoring: %r' % tokens)
a = parse_action(line)
if a:
state.action = a
for state in fsm.states:
if state.action:
fsm.action_funcs.add(state.action)
if state.in_event_names:
fsm.event_names.update(state.in_event_names)
fsm.make_states_short_names()
fsm.ref_out_states()
def make_states_short_names(fsm):
p = common_prefix([s.name for s in fsm.states])
for s in fsm.states:
s.short_name = s.name[len(p):]
return p
def make_events_short_names(fsm):
p = common_prefix(fsm.event_names)
for state in fsm.states:
for edge in state.out_edges:
for event in edge.events:
event.short_name = event.name[len(p):]
def ref_out_states(fsm):
for state in fsm.states:
for e in [Edge(fsm.find_state_by_name(n, True)) for n in state.out_state_names]:
state.add_out_edge(e)
def find_state_by_name(fsm, name, strict=False):
for state in fsm.states:
if state.name == name:
return state
if strict:
raise Exception("State not found: %r" % name);
return None
def find_state_by_action(fsm, action):
for state in fsm.states:
if state.action == action:
return state
return None
def add_special_state(fsm, additional_states, name, in_state=None,
out_state=None, event_name=None, kind=KIND_FUNC,
state_action=None, label=None, edge_action=None):
additional_state = None
for s in additional_states:
if s.short_name == name:
additional_state = s
break;
if not additional_state:
for s in fsm.states:
if s.short_name == name:
additional_state = s
break;
if kind == KIND_FUNC and not state_action:
state_action = name
if not additional_state:
additional_state = State()
additional_state.short_name = name
additional_state.action = state_action
additional_state.kind = kind
additional_state.label = label
additional_states.append(additional_state)
if out_state:
additional_state.out_state_names.append(out_state.name)
additional_state.add_out_edge(Edge(out_state, event_name, 'dotted',
action=edge_action))
if in_state:
in_state.out_state_names.append(additional_state.name)
in_state.add_out_edge(Edge(additional_state, event_name, 'dotted',
action=edge_action))
def find_event_edges(fsm, c_files):
# enrich state transitions between the states with event labels
func_to_state_transitions = listdict()
for c_file in c_files:
func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
# edges between explicit states
for state in fsm.states:
transitions = func_to_state_transitions.get(state.action)
if not transitions:
continue
for to_state_name, event_name in transitions:
if not event_name:
continue
for out_edge in state.out_edges:
if out_edge.to_state.name == to_state_name:
out_edge.add_event_name(event_name)
additional_states = []
# functions that aren't state actions but still effect state transitions
for func_name, transitions in func_to_state_transitions.items():
if func_name in fsm.action_funcs:
continue
for to_state_name, event_name in transitions:
to_state = fsm.find_state_by_name(to_state_name)
if not to_state:
continue
fsm.add_special_state(additional_states, func_name, None, to_state, event_name)
event_sources = c_files.find_event_sources(fsm.event_names)
for state in fsm.states:
for in_event_name in state.in_event_names:
funcs_for_in_event = event_sources.get(in_event_name)
if not funcs_for_in_event:
continue
found = False
for out_edge in state.out_edges:
if out_edge.has_event_name(in_event_name):
out_edge.action = r'\n'.join([(f + '()') for f in funcs_for_in_event
if f != state.action])
# if any functions that don't belong to a state trigger events, add
# them to the graph as well
additional_funcs = [f for f in funcs_for_in_event if f not in fsm.action_funcs]
for af in additional_funcs:
fsm.add_special_state(additional_states, af, None, state, in_event_name)
fsm.states.extend(additional_states)
# do any existing action functions by chance call other action functions?
for state in fsm.states:
if not state.action:
continue
callers = c_files.find_callers(state.action)
if not callers:
continue
for other_state in fsm.states:
if other_state.action in callers:
other_state.add_out_edge(Edge(state, None, 'dotted'))
def add_fsm_alloc(fsm, c_files):
allocating_funcs = []
for c_file in c_files:
allocating_funcs.extend(c_file.fsm_allocators.get(fsm.struct_name, []))
starting_state = None
if fsm.states:
# assume the first state starts
starting_state = fsm.states[0]
additional_states = []
for func_name in allocating_funcs:
fsm.add_special_state(additional_states, func_name, None, starting_state)
fsm.states.extend(additional_states)
def add_cross_fsm_links(fsm, fsms, c_files, fsm_meta):
for state in fsm.states:
if not state.action:
continue
if state.kind == KIND_FSM:
continue
callers = c_files.find_callers(state.action)
if state.kind == KIND_FUNC:
callers.append(state.action)
if not callers:
continue
for caller in callers:
for calling_fsm in fsms:
if calling_fsm is fsm:
continue
calling_state = calling_fsm.find_state_by_action(caller)
if not calling_state:
continue
if calling_state.kind == KIND_FSM:
continue
label = None
if state.kind == KIND_STATE:
label=fsm.struct_name + ': ' + state.short_name
edge_action = caller
if calling_state.action == edge_action:
edge_action = None
calling_fsm.add_special_state(calling_fsm.states, fsm.struct_name,
calling_state, kind=KIND_FSM, edge_action=edge_action, label=label)
label = None
if calling_state.kind == KIND_STATE:
label=calling_fsm.struct_name + ': ' + calling_state.short_name
edge_action = caller
if state.action == edge_action:
edge_action = None
fsm.add_special_state(fsm.states, calling_fsm.struct_name, None,
state, kind=KIND_FSM, edge_action=edge_action,
label=label)
# meta overview
meta_called_fsm = fsm_meta.have_state(fsm.struct_name, KIND_FSM)
meta_calling_fsm = fsm_meta.have_state(calling_fsm.struct_name, KIND_FSM)
meta_calling_fsm.add_out_edge(Edge(meta_called_fsm))
def have_state(fsm, name, kind=KIND_STATE):
state = fsm.find_state_by_name(name)
if not state:
state = State()
state.name = name
state.short_name = name
state.kind = kind
fsm.states.append(state)
return state
def to_dot(fsm):
out = ['digraph G {', 'rankdir=LR;']
for state in fsm.states:
out.append('%s [label="%s"%s]' % (state.short_name, state.get_label(),
state.shape_str()))
for state in fsm.states:
for out_edge in state.out_edges:
attrs = []
labels = []
if out_edge.events:
labels.extend(out_edge.event_labels())
if out_edge.actions:
labels.extend(out_edge.action_labels())
if labels:
attrs.append('label="%s"' % (r'\n'.join(labels)))
if out_edge.style:
attrs.append('style=%s'% out_edge.style)
attrs_str = ''
if attrs:
attrs_str = ' [%s]' % (','.join(attrs))
out.append('%s->%s%s' % (state.short_name, out_edge.to_state.short_name, attrs_str))
out.append('}\n')
return '\n'.join(out)
def write_dot_file(fsm):
dot_path = '%s.dot' % fsm.struct_name
f = open(dot_path, 'w')
f.write(fsm.to_dot())
f.close()
print(dot_path)
re_fsm = re.compile(r'struct osmo_fsm ([a-z_][a-z_0-9]*) =')
re_fsm_states_struct_name = re.compile(r'\bstates = ([a-z_][a-z_0-9]*)\W*,')
re_fsm_states = re.compile(r'struct osmo_fsm_state ([a-z_][a-z_0-9]*)\[\] =')
re_func = re.compile(r'(\b[a-z_][a-z_0-9]*\b)\([^)]*\)\W*^{', re.MULTILINE)
re_state_trigger = re.compile(r'osmo_fsm_inst_state_chg\([^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
re_fsm_alloc = re.compile(r'osmo_fsm_inst_alloc[_child]*\(\W*&([a-z_][a-z_0-9]*),', re.M)
re_fsm_event_dispatch = re.compile(r'osmo_fsm_inst_dispatch\(\W*[^,]+,\W*([A-Z_][A-Z_0-9]*)\W*,', re.M)
class CFile():
def __init__(c_file, path):
c_file.path = path
c_file.src = open(path).read()
c_file.funcs = {}
c_file.fsm_allocators = listdict()
def extract_block(c_file, brace_open, brace_close, start):
pos = 0
try:
src = c_file.src
block_start = src.find(brace_open, start)
pos = block_start
level = 1
while level > 0:
pos += 1
if src[pos] == brace_open:
level += 1
elif src[pos] == brace_close:
level -= 1
return src[block_start+1:pos]
except:
print("Error while trying to extract a code block from %r char pos %d" % (c_file.path, pos))
print("Block start at char pos %d" % block_start)
try:
print(src[block_start - 20 : block_start + 20])
print('...')
print(src[pos - 20 : pos + 20])
except:
pass
return ''
def find_fsms(c_file):
fsms = []
for m in re_fsm.finditer(c_file.src):
struct_name = m.group(1)
struct_def = c_file.extract_block('{', '}', m.start())
states_struct_name = re_fsm_states_struct_name.findall(struct_def)[0]
fsm = Fsm(struct_name, states_struct_name, c_file)
fsms.append(fsm)
return fsms
def find_fsm_states(c_file, fsms):
for m in re_fsm_states.finditer(c_file.src):
states_struct_name = m.group(1)
for fsm in fsms:
if states_struct_name == fsm.states_struct_name:
fsm.parse_states(c_file.extract_block('{', '}', m.start()))
def parse_functions(c_file):
funcs = {}
for m in re_func.finditer(c_file.src):
name = m.group(1)
func_src = c_file.extract_block('{', '}', m.start())
funcs[name] = func_src
c_file.funcs = funcs
c_file.find_fsm_allocators()
def find_callers(c_file, func_name):
func_call = func_name + '('
callers = []
for func_name, src in c_file.funcs.items():
if src.find(func_call) >= 0:
callers.append(func_name)
return callers
def find_fsm_allocators(c_file):
c_file.fsm_allocators = listdict()
for func_name, src in c_file.funcs.items():
for m in re_fsm_alloc.finditer(src):
fsm_struct_name = m.group(1)
c_file.fsm_allocators.add(fsm_struct_name, func_name)
def find_state_transitions(c_file, event_names):
TO_STATE = 'TO_STATE'
EVENT = 'EVENT'
func_to_state_transitions = listdict()
for func_name, src in c_file.funcs.items():
found_tokens = []
for m in re_state_trigger.finditer(src):
to_state = m.group(1)
found_tokens.append((m.start(), TO_STATE, to_state))
for event in event_names:
re_event = re.compile(r'\b(' + event + r')\b')
for m in re_event.finditer(src):
event = m.group(1)
found_tokens.append((m.start(), EVENT, event))
found_tokens = sorted(found_tokens)
last_event = None
for start, kind, name in found_tokens:
if kind == EVENT:
last_event = name
else:
func_to_state_transitions.add(func_name, (name, last_event))
return func_to_state_transitions
def find_event_sources(c_file, event_names):
c_file.event_sources = listdict()
for func_name, src in c_file.funcs.items():
for m in re_fsm_event_dispatch.finditer(src):
event_name = m.group(1)
c_file.event_sources.add(event_name, func_name)
class CFiles(list):
def find_callers(c_files, func_name):
callers = []
for c_file in c_files:
callers.extend(c_file.find_callers(func_name))
return callers
def find_func_to_state_transitions(c_files):
func_to_state_transitions = listdict()
for c_file in c_files:
func_to_state_transitions.update( c_file.find_state_transitions(fsm.event_names) )
return func_to_state_transitions
def find_event_sources(c_files, event_names):
event_sources = listdict()
for c_file in c_files:
for event, sources in c_file.event_sources.items():
if event in event_names:
event_sources.extend(event, sources)
return event_sources
c_files = CFiles()
paths_seen = set()
for path in sys.argv[1:]:
if path in paths_seen:
continue
paths_seen.add(path)
c_file = CFile(path)
c_files.append(c_file)
for c_file in c_files:
c_file.parse_functions()
fsms = []
for c_file in c_files:
fsms.extend(c_file.find_fsms())
for c_file in c_files:
c_file.find_fsm_states(fsms)
c_file.find_event_sources(fsms)
for fsm in fsms:
fsm.find_event_edges(c_files)
fsm.add_fsm_alloc(c_files)
fsm_meta = Fsm("meta", "meta")
for fsm in fsms:
fsm.add_cross_fsm_links(fsms, c_files, fsm_meta)
for fsm in fsms:
fsm.make_events_short_names()
for fsm in fsms:
fsm.write_dot_file()
fsm_meta.write_dot_file()
# vim: tabstop=2 shiftwidth=2 expandtab