72 lines
2.6 KiB
Python
72 lines
2.6 KiB
Python
# Client for the RESTful HTTP service for performing power management tasks
|
|
#
|
|
# (C) 2023 by Harald Welte <laforge@osmocom.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from urllib.parse import quote

import requests
|
|
|
|
class HttpException(Exception):
    """Error raised when a REST API request is answered with a failure status.

    Attributes:
        api_suffix: The API path suffix that was given to the constructor.
        response: The response object that was given to the constructor.
    """

    def __init__(self, api_suffix, response):
        # Hand both values to Exception so that ``args`` is populated through
        # the regular base-class initialiser (and stays ``(api_suffix,
        # response)``, which keeps copying/pickling round-trips working).
        super().__init__(api_suffix, response)
        self.api_suffix = api_suffix
        self.response = response

    def __str__(self):
        """Return a one-line description: the suffix plus status, if known."""
        # getattr() keeps this usable even if ``response`` is None or some
        # object that lacks the usual status attributes.
        status = getattr(self.response, "status_code", None)
        reason = getattr(self.response, "reason", None)
        detail = " ".join(str(part) for part in (status, reason) if part)
        message = "HTTP request to %r failed" % (self.api_suffix,)
        return "%s: %s" % (message, detail) if detail else message
|
|
|
|
class ApiClient:
    """Implementation of an API client for the osmo-lpmgd REST API.

    Each public method performs one HTTP request against
    ``http://<server_host>:<server_port>/api/v1<suffix>`` and returns the
    JSON-decoded body of the response.  If the response is not ``ok`` an
    HttpException carrying the request suffix and the response is raised.
    """

    # Path prefix shared by every endpoint of the API.
    BASE_PATH = "/api/v1"

    def __init__(self, server_host: str, server_port: int, timeout=None):
        """Remember where the server lives; no connection is made here.

        Args:
            server_host: Host name or address placed into the request URL.
            server_port: TCP port placed into the request URL.
            timeout: Optional value passed as ``timeout=`` to every
                ``requests`` call.  The default ``None`` keeps the previous
                behaviour of waiting without a time limit.
        """
        self.server_host = server_host
        self.server_port = server_port
        self.timeout = timeout

    @staticmethod
    def _path(*segments):
        """Join *segments* into an URL path suffix, percent-encoding each one.

        ``safe=""`` makes ``/`` inside a segment get encoded as well, so a
        caller-supplied resource or token name can never add or remove path
        components (or start a query string / fragment).
        """
        return "".join("/" + quote(str(seg), safe="") for seg in segments)

    def _build_url(self, suffix):
        """Return the absolute URL for the API path *suffix*."""
        return "http://%s:%u%s%s" % (self.server_host, self.server_port, self.BASE_PATH, suffix)

    def _request(self, method, suffix, **kwargs):
        """Call *method* (``requests.get`` / ``requests.post``) for *suffix*.

        Returns:
            The JSON-decoded response body.

        Raises:
            HttpException: if the response is not ``ok``.
        """
        resp = method(self._build_url(suffix), timeout=self.timeout, **kwargs)
        if not resp.ok:
            raise HttpException(suffix, resp)
        return resp.json()

    def _rest_get(self, suffix):
        """Issue a GET request for *suffix* and return the decoded JSON."""
        return self._request(requests.get, suffix)

    def _rest_post(self, suffix, js = None):
        """Issue a POST request for *suffix* with *js* as the JSON body."""
        return self._request(requests.post, suffix, json=js)

    def get_resource_status(self, resource: str):
        """GET ``/resource/<resource>/status`` and return the decoded JSON."""
        return self._rest_get(self._path("resource", resource, "status"))

    def obtain_usage_token(self, resource: str, user_name: str, usage: str, duration_s: int):
        """POST to ``/resource/<resource>/obtain_usage_token``.

        The JSON request body carries *user_name*, *usage* and *duration_s*
        (the latter under the key ``duration_seconds``).  Returns the decoded
        JSON response.
        """
        body = {
            'user_name': user_name,
            'usage': usage,
            'duration_seconds': duration_s,
        }
        return self._rest_post(self._path("resource", resource, "obtain_usage_token"), js=body)

    def release_usage_token(self, resource: str, token: str):
        """GET ``/resource/<resource>/token/<token>/release``."""
        return self._rest_get(self._path("resource", resource, "token", token, "release"))

    def show_usage_token(self, resource: str, token: str):
        """GET ``/resource/<resource>/token/<token>``."""
        return self._rest_get(self._path("resource", resource, "token", token))

    def list_resources(self):
        """GET ``/resource`` and return the decoded JSON."""
        return self._rest_get(self._path("resource"))

    def list_usage_tokens(self, resource:str):
        """GET ``/resource/<resource>/token`` and return the decoded JSON."""
        return self._rest_get(self._path("resource", resource, "token"))
|