184 lines
6.0 KiB
Python
184 lines
6.0 KiB
Python
# coding=utf-8
|
|
"""
|
|
"""
|
|
|
|
# (C) 2023 by Harald Welte <laforge@osmocom.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from typing import Optional, Dict, Tuple
|
|
|
|
import abc
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
class Switcher(abc.ABC):
|
|
"""Base class for some device that can switch a power rail."""
|
|
def __init__(self, group: 'SwitcherGroup', name: str):
|
|
self.group = group
|
|
self.name = name
|
|
self.status = "unknown"
|
|
self.status = self._obtain_actual_status()
|
|
self.group.channel_add(self)
|
|
|
|
@staticmethod
|
|
def _is_valid_status(status: str) -> bool:
|
|
"""Determine if the given status string is valid."""
|
|
if status in ["unknown", "on", "off"]:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def _validate_status(status: str):
|
|
if not Switcher._is_valid_status(status):
|
|
raise ValueError("Invalid Status: %s" % status)
|
|
|
|
def _update_status(self, status: str):
|
|
"""Helper method used by derived class to update status"""
|
|
self._validate_status(status)
|
|
self.status = status
|
|
|
|
def get_status(self):
|
|
"""Return the (cached) status"""
|
|
return self.status
|
|
|
|
@abc.abstractmethod
|
|
def _status_change(self, new_status: str):
|
|
"""Derived class must implement this to actually change the status"""
|
|
pass
|
|
|
|
def _obtain_actual_status(self) -> str:
|
|
"""Derived class should override this method, if it can poll the hardware about its actual
|
|
current status."""
|
|
return "unknown"
|
|
|
|
def status_change(self, new_status: str) -> str:
|
|
"""Change the status of the switcher. Returns new status"""
|
|
if self.status != new_status:
|
|
self._update_status(new_status)
|
|
return self.status
|
|
|
|
|
|
class SwitcherGroup(abc.ABC):
|
|
"""Base class for some device that can switch a power rail."""
|
|
def __init__(self, name: str, channels: dict = {}):
|
|
self.name = name
|
|
self.channels = channels
|
|
|
|
def __getitem__(self, val):
|
|
return self.channel_get(val)
|
|
|
|
def channel_add(self, chan: Switcher):
|
|
"""Add a switcher channel to a switcher group."""
|
|
self.channels[chan.name] = chan
|
|
|
|
def channel_get(self, name: str):
|
|
"""Get a channel within a switcher group."""
|
|
return self.channels[name]
|
|
|
|
|
|
class UsageToken:
|
|
"""I represent one given (concurrent) usage of a given resource."""
|
|
def __init__(self, resource: 'Resource', user_name: str, usage_name: str, duration_s: int):
|
|
self.resource = resource
|
|
self.user_name = user_name
|
|
self.expires_at = datetime.now() + timedelta(seconds=duration_s)
|
|
self.uuid = uuid.uuid4()
|
|
|
|
def has_expired(self) -> bool:
|
|
if self.expires_at <= datetime.now():
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def to_dict(self):
|
|
return {'resource': self.resource.name,
|
|
'user_name': self.user_name,
|
|
'expires_at': self.expires_at.isoformat(),
|
|
'uuid': self.uuid.hex}
|
|
|
|
class AvailabilityChecker(abc.ABC):
|
|
@abc.abstractmethod
|
|
def is_available(self) -> bool:
|
|
pass
|
|
|
|
class Resource:
|
|
"""A Resource is some kind of powered device. A Resource tracks its users via UsageTokens
|
|
and it will be automatically powered up (by its associated Switcher) once the usage count
|
|
is > 0, and powered down once it gets back to 0."""
|
|
def __init__(self, name: str, switcher: Switcher, checker: AvailabilityChecker, desc: str = ""):
|
|
self.name = name
|
|
self.switcher = switcher
|
|
self.avail_checker = checker
|
|
self.description = desc
|
|
self.status = "unknown"
|
|
self.use_count = 0
|
|
self.usage_tokens = {}
|
|
self.determine_status()
|
|
|
|
def usage_inc(self):
|
|
self.use_count += 1
|
|
if self.use_count == 1:
|
|
self.switcher.status_change("on")
|
|
return self.use_count
|
|
|
|
def usage_dec(self):
|
|
assert(self.use_count >= 1)
|
|
self.use_count -= 1
|
|
if self.use_count == 0:
|
|
self.switcher.status_change("off")
|
|
return self.use_count
|
|
|
|
def usage_token_get(self, user_name: str, usage: str, duration_s: int) -> UsageToken:
|
|
"""Create/obtain a new usage token."""
|
|
token = UsageToken(self, user_name, usage, duration_s)
|
|
self.usage_tokens[token.uuid.hex] = token
|
|
self.usage_inc()
|
|
return token
|
|
|
|
def usage_token_put(self, token: UsageToken):
|
|
"""Release/put an existing usage token."""
|
|
uuid_hex = token.uuid.hex
|
|
del self.usage_tokens[uuid_hex]
|
|
self.usage_dec()
|
|
|
|
def usage_token_find(self, uuid_hex: str):
|
|
if uuid_hex in self.usage_tokens:
|
|
return self.usage_tokens[uuid_hex]
|
|
return None
|
|
|
|
def get_status(self):
|
|
"""Return the (cached) status of the resource."""
|
|
return self.status
|
|
|
|
def determine_status(self):
|
|
"""Re-determine the status of the resource, and return it."""
|
|
sw_status = self.switcher.get_status()
|
|
if sw_status == "on":
|
|
if self.avail_checker.is_available():
|
|
self.status = "available"
|
|
else:
|
|
self.status = "powered"
|
|
else:
|
|
self.status = sw_status
|
|
return self.get_status()
|
|
|
|
def to_dict(self):
|
|
return {'name': self.name,
|
|
'description': self.description,
|
|
'status': self.status,
|
|
'use_count': self.use_count}
|
|
|