diff --git a/selftest/cdf_test.ok b/selftest/cdf_test.ok new file mode 100644 index 00000000..aa753e4a --- /dev/null +++ b/selftest/cdf_test.ok @@ -0,0 +1,57 @@ +Testing the immediate CDF +Done True +1 1.0 False +Testing linear with duration +Done False +0.0 0.0 True +Done False +0.2 0.2 True +Done False +0.4 0.4 True +Done False +0.6 0.6 True +Done False +0.8 0.8 True +Done True +1.0 1.0 True +Testing linear with duration scaled +Done False +0.0 0.0 True +0.0 0.0 True +Done False +0.2 0.2 True +200 200 True +Done False +0.4 0.4 True +400 400 True +Done False +0.6 0.6 True +600 600 True +Done False +0.8 0.8 True +800 800 True +Done True +1.0 1.0 True +100 100 True +Testing in_out +0.5 0.5 True +0.87 0.87 True +0.9 0.9 True +0.95 0.95 True +1.0 1.0 True +Testing ease In and Out +Done False +0.0 0.0 True +0.0 0.0 True +Done False +5.0 5.0 True +0.1 0.1 True +Done False +10.0 10.0 True +0.5 0.5 True +Done False +15.0 15.0 True +0.8 0.8 True +Done True +20.0 20 True +1.0 1.0 True diff --git a/selftest/cdf_test.py b/selftest/cdf_test.py new file mode 100755 index 00000000..8d837c1c --- /dev/null +++ b/selftest/cdf_test.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 + +import _prep + +from osmo_ms_driver import cdf +from datetime import timedelta + +def print_fuzzy_compare(want, expe, len=3): + want_str = str(want)[0:len] + expe_str = str(expe)[0:len] + print(want_str, expe_str, want_str == expe_str) + + +def check_steps(a, steps, fun): + print("Done", a.is_done()) + for step in steps: + # Verify we can step + + # Compare and step once + fun(a, step) + if a.is_done(): + break + a.step_once() + print("Done", a.is_done()) + +def compare_value(a, step): + print_fuzzy_compare(a.current_value(), step) + +def compare_scaled_value(a, val): + (step, scale) = val + print_fuzzy_compare(a.current_value(), step) + print_fuzzy_compare(a.current_scaled_value(), scale) + +def compare_x_value(a, val): + (x, step) = val + print(a._x, x, x == a._x) + print_fuzzy_compare(a.current_value(), step) + +def testImmediate(): + print("Testing the immediate CDF") + a = cdf.immediate() + print("Done", a.is_done()) + print_fuzzy_compare(a.current_value(), 1.0) + + +def testLinearWithDuration(): + print("Testing linear with duration") + a = cdf.linear_with_duration(timedelta(seconds=10), step_size=timedelta(seconds=2)) + steps = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0] + check_steps(a, steps, compare_value) + + print("Testing linear with duration scaled") + a = cdf.linear_with_duration(timedelta(seconds=10), step_size=timedelta(seconds=2)) + a.set_target(1000) + steps = [(0.0, 0.0), (0.2, 200), (0.4, 400), (0.6, 600), (0.8, 800), (1.0, 10000)] + check_steps(a, steps, compare_scaled_value) + +def testInOut(): + print("Testing in_out") + print_fuzzy_compare(cdf._in_out(0.5), 0.5, 3) + print_fuzzy_compare(cdf._in_out(0.75), 0.875, 4) + print_fuzzy_compare(cdf._in_out(0.8), 0.92, 3) + print_fuzzy_compare(cdf._in_out(0.85), 0.955, 4) + print_fuzzy_compare(cdf._in_out(1.0), 1.0, 3) + +def testEaseInOutDuration(): + print("Testing ease In and Out") + a = cdf.ease_in_out_duration(duration=timedelta(seconds=20), step_size=timedelta(seconds=5)) + steps = [(0.0, 0.0), (5.0, 0.125), (10.0, 0.5), (15.0, 0.875), (20, 1.0)] + check_steps(a, steps, compare_x_value) + +testImmediate() +testLinearWithDuration() +testInOut() +testEaseInOutDuration() diff --git a/src/osmo_ms_driver/__init__.py b/src/osmo_ms_driver/__init__.py new file mode 100644 index 00000000..0c7b4b9d --- /dev/null +++ b/src/osmo_ms_driver/__init__.py @@ -0,0 +1,22 @@ +# osmo_ms_driver: automated cellular network tests +# +# Copyright (C) 2018 by sysmocom - s.f.m.c. GmbH +# +# Authors: Holger Hans Peter Freyther +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from osmo_gsm_tester import __version__ + +# vim: expandtab tabstop=4 shiftwidth=4 diff --git a/src/osmo_ms_driver/cdf.py b/src/osmo_ms_driver/cdf.py new file mode 100644 index 00000000..781349bb --- /dev/null +++ b/src/osmo_ms_driver/cdf.py @@ -0,0 +1,105 @@ +# osmo_ms_driver: A cumululative distribution function class. +# Help to start processes over time. +# +# Copyright (C) 2018 by Holger Hans Peter Freyther +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from datetime import timedelta + +class DistributionFunctionHandler(object): + """ + The goal is to start n "mobile" processes. We like to see some + conflicts (RACH bursts being ignored) but starting n processes + at the same time is not a realistic model. + We use the concept of cumulative distribution function here. On + the x-axis we have time (maybe in steps of 10ms) and on the + y-axis we have the percentage (from 0.0 to 1.0) of how many + processes should run at the given time. + """ + + def __init__(self, step, duration, fun): + self._step = step + self._fun = fun + self._x = 0.0 + self._y = self._fun(self._x) + self._target = 1.0 + self._duration = duration + + def step_size(self): + return self._step + + def set_target(self, scale): + """ + Scale the percentage to the target value.. + """ + self._target = scale + + def is_done(self): + return self._y >= 1.0 + + def current_value(self): + return self._y + + def current_scaled_value(self): + return self._y * self._target + + def step_once(self): + self._x = self._x + self._step.total_seconds() + self._y = self._fun(self._x) + + def duration(self): + return self._duration + + +def immediate(step_size=timedelta(milliseconds=20)): + """ + Reaches 100% at the first step. + """ + duration = timedelta(seconds=0) + return DistributionFunctionHandler(step_size, duration, lambda x: 1) + +def linear_with_slope(slope, duration, step_size=timedelta(milliseconds=20)): + """ + Use the slope and step size you want + """ + return DistributionFunctionHandler(step_size, duration, lambda x: slope*x) + +def linear_with_duration(duration, step_size=timedelta(milliseconds=20)): + """ + Linear progression that reaches 100% after duration.total_seconds() + """ + slope = 1.0/duration.total_seconds() + return linear_with_slope(slope, duration, step_size) + +def _in_out(x): + """ + Internal in/out function inspired by Qt + """ + assert x <= 1.0 + # Needs to be between 0..1 and increase first + if x < 0.5: + return (x*x) * 2 + # deaccelerate now. in_out(0.5) == 0.5, in_out(1.0) == 1.0 + x = x * 2 - 1 + return -0.5 * (x*(x-2)- 1) + +def ease_in_out_duration(duration, step_size=timedelta(milliseconds=20)): + """ + Example invocation + """ + scale = 1.0/duration.total_seconds() + return DistributionFunctionHandler(step_size, duration, + lambda x: _in_out(x*scale))