#!/usr/bin/env python3 # Pau Espin Pedrol # MIT # manage netns for ofono modems import os import sys import subprocess import usb.core import usb.util from pprint import pprint def get_path_ids(bus, port_numbers): port_numbers = [str(port) for port in port_numbers] ports = '.'.join(port_numbers) return '{}-{}'.format(bus, ports) def get_usb_dir(bus, port_numbers): return '/sys/bus/usb/devices/' + get_path_ids(bus, port_numbers) + '/' def get_net_from_usb(bus, port_numbers): net_ifaces = [] path = get_usb_dir(bus, port_numbers) path_ids = get_path_ids(bus, port_numbers) usb_interfaces = [f for f in os.listdir(path) if f.startswith(path_ids)] for usb_iface in usb_interfaces: listdir = [f for f in os.listdir(path + usb_iface) if f == ('net')] if listdir: # found a net iface net_ifaces += os.listdir(path + usb_iface + '/net/') return net_ifaces def move_modem_to_netns(usb_path_id, net_li): if len(net_li) == 0: print("%s: Device has no net ifaces, skipping" %(usb_path_id)) return if not os.path.exists("/var/run/netns/%s" % usb_path_id): print("%s: Creating netns" % (usb_path_id)) subprocess.check_call(["ip", "netns", "add", usb_path_id]) else: print("%s: netns already exists" % (usb_path_id)) for netif in net_li: print("%s: Moving iface %s to netns" % (usb_path_id, netif)) subprocess.check_call(["ip", "link", "set", netif, "netns", usb_path_id]) # iface Must be set up AFTER pdp ctx is activated, otherwise we get no DHCP response. #print("%s: Setting up iface %s" % (usb_path_id, netif)) #subprocess.check_call(["ip", "netns", "exec", usb_path_id, "ip", "link", "set", "dev", netif, "up"]) #subprocess.check_call(["ip", "netns", "exec", usb_path_id, "udhcpc", "-i", netif]) def delete_modem_netns(usb_path_id): if os.path.exists("/var/run/netns/%s" % usb_path_id): print("%s: Deleting netns" % (usb_path_id)) subprocess.check_call(["ip", "netns", "delete", usb_path_id]) else: print("%s: netns doesn't exist" % (usb_path_id)) def print_help(): print("Usage: %s start|stop" % sys.argv[0]) exit(1) if __name__ == '__main__': if len(sys.argv) != 2: print_help() USB_DEVS = [dev for dev in usb.core.find(find_all=True)] RESULT = {} for device in USB_DEVS: result = {} if not device.port_numbers: continue usb_path_id = get_path_ids(device.bus, device.port_numbers) net_li = get_net_from_usb(device.bus, device.port_numbers) if sys.argv[1] == "start": move_modem_to_netns(usb_path_id, net_li) elif sys.argv[1] == "stop": delete_modem_netns(usb_path_id) else: print_help()