WIP: import sispm + intellinet switcher from osmo-gsm-tester

This commit is contained in:
Harald Welte 2023-05-07 12:29:56 +10:00
parent af0449d9e0
commit b91c00d034
2 changed files with 221 additions and 7 deletions

View File

@ -1,6 +1,105 @@
# osmo-lpmgd: class defining a Switcher object for Intellinet PDUs
# Import from osmo-gsm-tester.git/src/osmo_gsm_tester/obj/powersupply_intellinet.py
#
# Copyright (C) 2019,2023 by sysmocom - s.f.m.c. GmbH
#
# Author: Pau Espin Pedrol <pespin@sysmocom.de>
# Author: Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import urllib.request
import xml.etree.ElementTree as ET
from model import Switcher, SwitcherGroup
from model import Switcher
# FIXME: port from osmo-gsm-tester.git/src/osmo_gsm_tester/obj/powersupply_intellinet.py
logger = logging.getLogger(__name__)
class IntellinetSwitcher(Switcher):
"""Switcher implementation to control Intellinet PDU devices."""
# HTTP request timeout, in seconds
PDU_TIMEOUT = 5
PDU_CMD_ON = 0
PDU_CMD_OFF = 1
def _url_prefix(self):
return 'http://' + self.device_ip
def _url_status(self):
return self._url_prefix() + '/status.xml'
def _url_set_port_status(self, pdu_cmd):
return self._url_prefix() + "/control_outlet.htm?" + "outlet" + str(self.port - 1) + "=1" + "&op=" + str(pdu_cmd) + "&submit=Anwenden"
def _port_stat_name(self):
# Names start with idx 0, while in ogt we count sockets starting from 1.
return 'outletStat' + str(self.port - 1)
def _fetch_status(self):
data = urllib.request.urlopen(self._url_status(), timeout = self.PDU_TIMEOUT).read()
if not data:
raise self.log.Error('empty status xml')
return data
def _get_port_status(self):
data = self._fetch_status()
root = ET.fromstring(data)
for child in root:
if child.tag == self._port_stat_name():
return child.text
raise self.log.Error('no state for %s' % self._port_stat_name())
def _set_port_status(self, pdu_cmd):
urllib.request.urlopen(self._url_set_port_status(pdu_cmd),timeout = self.PDU_TIMEOUT).read()
########################
# PUBLIC - INTERNAL API
########################
def __init__(self, conf):
super().__init__(conf, 'intellinet')
self.log = logger
mydevid = conf.get('device')
if mydevid is None:
raise self.log.Error('No "device" attribute provided in supply conf!')
self.set_name('intellinet-'+mydevid)
myport = conf.get('port')
if myport is None:
raise self.log.Error('No "port" attribute provided in power_supply conf!')
if not int(myport):
raise self.log.Error('Wrong non numeric "port" attribute provided in power_supply conf!')
self.set_name('intellinet-'+mydevid+'-'+myport)
self.device_ip = mydevid
self.port = int(myport)
def _obtain_actual_status(self):
"""Get whether the device is powered on or off."""
return self._get_port_status()
def _status_change(self, new_status: str):
"""Turn on (onoff=True) or off (onoff=False) the device."""
if new_status == 'on':
self.log.debug('switchon %s:%u' % (self.device_ip, self.port))
self._set_port_status(self.PDU_CMD_ON)
else:
self.log.debug('switchoff %s:%u' % (self.device_ip, self.port))
self._set_port_status(self.PDU_CMD_OFF)
# vim: expandtab tabstop=4 shiftwidth=4

View File

@ -1,12 +1,127 @@
# Import from osmo-gsm-tester.git/src/osmo_gsm_tester/obj/powersupply_sispm.py
# osmo-lpmgd: class defining a sispm Power Supply object
#
# Copyright (C) 2018,2023 by sysmocom - s.f.m.c. GmbH
#
# Author: Pau Espin Pedrol <pespin@sysmocom.de>
# Author: Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import pysispm
from model import Switcher, SwitcherGroup
# FIXME: port from osmo-gsm-tester.git/src/osmo_gsm_tester/obj/powersupply_sispm.py
logger = logging.getLogger(__name__)
class SispmSwitcher(Switcher):
def __init__(self, group: SwitcherGroup, name: str, conf):
super().__init__(group, name)
self.conf = conf
"""Switcher implementation using pysispm.
The device object from sismpm is not cached into an attribute of the class
instance because it is actually a libusb object keeping the device assigned
to it until it destroyed, meaning it will block other users to use the whole
device until the object is released. Instead, we pick the object in the
smallest scope possible, and we re-try if we receive a "Resource Busy" error
because we know it will be available in short time.
"""
def _retry_usberr(self, func, *args):
"""Run function until it runs successfully, retry on spurious errors.
Sometimes when operating the usb device, libusb reports the following spurious exception:
[Errno 16] Resource busy -> This can appear if another instance is using the device.
[Errno 110] Operation timed out
Retrying after that it's usually enough.
"""
while True:
try:
ret = func(*args)
return ret
except USBError as e:
if e.errno == 16 or e.errno==110:
self.log.debug('skipping USB error, retrying', repr(e))
#MainLoop.sleep(0.1)
# FIXME: synchronous sleep here?
continue
raise e
def _get_device(self):
"""Get the sispm device object.
It should be kept alive as short as possible as it blocks other users
from using the device until the object is released.
"""
mydevid = self.conf.get('device')
devices = self._retry_usberr(sispm.connect)
for d in devices:
did = self._retry_usberr(sispm.getid, d)
self.dbg('detected device:', did)
if did == mydevid:
self.dbg('found matching device: %s' % did)
return d
return None
########################
# PUBLIC - INTERNAL API
########################
#def __init__(self, group: SwitcherGroup, name: str, conf):
def __init__(self, conf):
super().__init__(conf, 'sispm')
self.log = logger
import sispm
from usb.core import USBError
mydevid = conf.get('device')
if mydevid is None:
raise self.log.Error('No "device" attribute provided in supply conf!')
self.set_name('sispm-'+mydevid)
myport = conf.get('port')
if myport is None:
raise self.log.Error('No "port" attribute provided in power_supply conf!')
if not int(myport):
raise self.log.Error('Wrong non numeric "port" attribute provided in power_supply conf!')
self.set_name('sispm-'+mydevid+'-'+myport)
self.port = int(myport)
device = self._get_device()
if device is None:
raise self.log.Error('device with with id %s not found!' % mydevid)
dmin = self._retry_usberr(sispm.getminport, device)
dmax = self._retry_usberr(sispm.getmaxport, device)
if dmin > self.port or dmax < self.port:
raise self.log.Error('Out of range "port" attribute provided in power_supply conf!')
def _obtain_actual_status(self):
"""Get whether the device is powered on or off."""
if self._retry_usberr(sispm.getstatus, self._get_device(), self.port):
return 'on'
else:
return 'off'
def _status_change(self, new_status: str):
FIXME
"""Turn on or off the device."""
if new_status == 'on':
self.dbg('switchon')
self._retry_usberr(sispm.switchon, self._get_device(), self.port)
else:
self.dbg('switchoff')
self._retry_usberr(sispm.switchoff, self._get_device(), self.port)
# vim: expandtab tabstop=4 shiftwidth=4