Used to quickly set modem net interfaces into their own net namespace (named after modem USB ID path). The idea is that since osmo-gsm-tester ofono modem.py knowns the USB path from a modem (path yml attr), it can infer the netns from it and run a ping process inside it. Related: OS#2308 Change-Id: Iadb2df2974e132044fba1f1bc2db8b559912e4e1changes/61/11461/4
parent
2decdba7e5
commit
e972828a01
@ -0,0 +1,87 @@ |
||||
#!/usr/bin/env python3 |
||||
# Pau Espin Pedrol <pespin@sysmocom.de> |
||||
# MIT |
||||
|
||||
# manage netns for ofono modems |
||||
|
||||
import os |
||||
import sys |
||||
import subprocess |
||||
import usb.core |
||||
import usb.util |
||||
from pprint import pprint |
||||
|
||||
def get_path_ids(bus, port_numbers): |
||||
port_numbers = [str(port) for port in port_numbers] |
||||
ports = '.'.join(port_numbers) |
||||
return '{}-{}'.format(bus, ports) |
||||
|
||||
def get_usb_dir(bus, port_numbers): |
||||
return '/sys/bus/usb/devices/' + get_path_ids(bus, port_numbers) + '/' |
||||
|
||||
def get_net_from_usb(bus, port_numbers): |
||||
net_ifaces = [] |
||||
path = get_usb_dir(bus, port_numbers) |
||||
path_ids = get_path_ids(bus, port_numbers) |
||||
|
||||
usb_interfaces = [f for f in os.listdir(path) if f.startswith(path_ids)] |
||||
for usb_iface in usb_interfaces: |
||||
listdir = [f for f in os.listdir(path + usb_iface) if f == ('net')] |
||||
if listdir: |
||||
# found a net iface |
||||
net_ifaces += os.listdir(path + usb_iface + '/net/') |
||||
return net_ifaces |
||||
|
||||
def move_modem_to_netns(usb_path_id, net_li): |
||||
|
||||
if len(net_li) == 0: |
||||
print("%s: Device has no net ifaces, skipping" %(usb_path_id)) |
||||
return |
||||
|
||||
if not os.path.exists("/var/run/netns/%s" % usb_path_id): |
||||
print("%s: Creating netns" % (usb_path_id)) |
||||
subprocess.check_call(["ip", "netns", "add", usb_path_id]) |
||||
else: |
||||
print("%s: netns already exists" % (usb_path_id)) |
||||
|
||||
for netif in net_li: |
||||
print("%s: Moving iface %s to netns" % (usb_path_id, netif)) |
||||
subprocess.check_call(["ip", "link", "set", netif, "netns", usb_path_id]) |
||||
# iface Must be set up AFTER pdp ctx is activated, otherwise we get no DHCP response. |
||||
#print("%s: Setting up iface %s" % (usb_path_id, netif)) |
||||
#subprocess.check_call(["ip", "netns", "exec", usb_path_id, "ip", "link", "set", "dev", netif, "up"]) |
||||
#subprocess.check_call(["ip", "netns", "exec", usb_path_id, "udhcpc", "-i", netif]) |
||||
|
||||
def delete_modem_netns(usb_path_id): |
||||
if os.path.exists("/var/run/netns/%s" % usb_path_id): |
||||
print("%s: Deleting netns" % (usb_path_id)) |
||||
subprocess.check_call(["ip", "netns", "delete", usb_path_id]) |
||||
else: |
||||
print("%s: netns doesn't exist" % (usb_path_id)) |
||||
|
||||
def print_help(): |
||||
print("Usage: %s start|stop" % sys.argv[0]) |
||||
exit(1) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
|
||||
if len(sys.argv) != 2: |
||||
print_help() |
||||
|
||||
USB_DEVS = [dev for dev in usb.core.find(find_all=True)] |
||||
RESULT = {} |
||||
for device in USB_DEVS: |
||||
result = {} |
||||
if not device.port_numbers: |
||||
continue |
||||
|
||||
usb_path_id = get_path_ids(device.bus, device.port_numbers) |
||||
net_li = get_net_from_usb(device.bus, device.port_numbers) |
||||
|
||||
if sys.argv[1] == "start": |
||||
move_modem_to_netns(usb_path_id, net_li) |
||||
elif sys.argv[1] == "stop": |
||||
delete_modem_netns(usb_path_id) |
||||
else: |
||||
print_help() |
Loading…
Reference in new issue