#!/usr/bin/env python3 """ SPDX-License-Identifier: AGPL-3.0-or-later Copyright 2019 sysmocom s.f.m.c GmbH This is a freeswitch dialplan implementation, see: https://freeswitch.org/confluence/display/FREESWITCH/mod_python Find the right SIP server with mslookup (depending on the destination number) and bridge calls accordingly. """ import json import subprocess def query_mslookup(service_type, id, id_type='msisdn'): query_str = '%s.%s.%s' % (service_type, id, id_type) print('[dialplan-dgsm] mslookup: ' + query_str) result_line = subprocess.check_output([ 'osmo-mslookup-client', query_str, '-f', 'json']) if isinstance(result_line, bytes): result_line = result_line.decode('ascii') print('[dialplan-dgsm] mslookup result: ' + result_line) return json.loads(result_line) def handler(session, args): """ Handle calls: bridge to the SIP server found with mslookup. """ print('[dialplan-dgsm] call handler') msisdn = session.getVariable('destination_number') # Run osmo-mslookup-client binary. We have also tried to directly call the # C functions with ctypes but this has lead to hard-to-debug segfaults. try: result = query_mslookup("sip.voice", msisdn) # This example only makes use of IPv4 if not result['v4']: print('[dialplan-dgsm] no IPv4 result from mslookup') session.hangup('UNALLOCATED_NUMBER') return sip_ip, sip_port = result['v4'] dial_str = 'sofia/internal/sip:{}@{}:{}'.format( msisdn, sip_ip, sip_port) print('[dialplan-dgsm] dial_str: ' + str(dial_str)) session.execute('bridge', dial_str) except: print('[dialplan-dgsm]: exception during call handler') session.hangup('UNALLOCATED_NUMBER') def fsapi(session, stream, env, args): """ Freeswitch refuses to load the module without this. """ stream.write(env.serialize()) def main(): import argparse parser = argparse.ArgumentParser() parser.add_argument('id', type=int) parser.add_argument('-i', '--id-type', default='msisdn', help='default: "msisdn"') parser.add_argument('-s', '--service', default='sip.voice', help='default: "sip.voice"') args = parser.parse_args() result = query_mslookup(args.service, args.id, args.id_type) print(json.dumps(result)) if __name__ == '__main__': main()