event_loop: Use glib as mainloop impl and move modem to use event_loop
Several benefits: - We can add APIs to poll on fds in the future (for smpp socket for instance) instead of using busy polling. - During wait(), we now block in the glib mainloop instead of sleeping 0.1 secs and not handling events during that time. - We remove glib mainloop specific bits from modem.py Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767
This commit is contained in:
parent
2c0ae6288d
commit
bf176e420e
|
@ -18,48 +18,129 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import time
|
||||
from gi.repository import GLib, GObject
|
||||
|
||||
from . import log
|
||||
|
||||
poll_funcs = []
|
||||
class DeferredHandling:
|
||||
defer_queue = []
|
||||
|
||||
def handle_queue(self):
|
||||
while DeferredHandling.defer_queue:
|
||||
handler, args, kwargs = self.defer_queue.pop(0)
|
||||
handler(*args, **kwargs)
|
||||
|
||||
def defer(self, handler, *args, **kwargs):
|
||||
self.defer_queue.append((handler, args, kwargs))
|
||||
|
||||
class WaitRequest:
|
||||
timeout_ack = False
|
||||
condition_ack = False
|
||||
|
||||
def __init__(self, condition, condition_args, condition_kwargs, timeout, timestep):
|
||||
self.timeout_started = time.time()
|
||||
self.timeout = timeout
|
||||
self.condition = condition
|
||||
self.condition_args = condition_args
|
||||
self.condition_kwargs = condition_kwargs
|
||||
|
||||
def condition_check(self):
|
||||
#print("_wait_condition_check")
|
||||
waited = time.time() - self.timeout_started
|
||||
if self.condition(*self.condition_args, **self.condition_kwargs):
|
||||
self.condition_ack = True
|
||||
elif waited > self.timeout:
|
||||
self.timeout_ack = True
|
||||
|
||||
class EventLoop:
|
||||
poll_funcs = []
|
||||
gloop = None
|
||||
gctx = None
|
||||
deferred_handling = None
|
||||
|
||||
def __init__(self):
|
||||
self.gloop = GLib.MainLoop()
|
||||
self.gctx = self.gloop.get_context()
|
||||
self.deferred_handling = DeferredHandling()
|
||||
|
||||
def _trigger_cb_func(self, user_data):
|
||||
self.defer(user_data)
|
||||
return True #to retrigger the timeout
|
||||
|
||||
def defer(self, handler, *args, **kwargs):
|
||||
self.deferred_handling.defer(handler, *args, **kwargs)
|
||||
|
||||
def register_poll_func(self, func, timestep=1):
|
||||
id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, func) # in 1/1000th of a sec
|
||||
self.poll_funcs.append((func, id))
|
||||
|
||||
def unregister_poll_func(self, func):
|
||||
for pair in self.poll_funcs:
|
||||
f, id = pair
|
||||
if f == func:
|
||||
GObject.source_remove(id)
|
||||
self.poll_funcs.remove(pair)
|
||||
return
|
||||
|
||||
def poll(self, may_block=False):
|
||||
self.gctx.iteration(may_block)
|
||||
self.deferred_handling.handle_queue()
|
||||
|
||||
def wait_no_raise(self, log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
|
||||
if not timeout or timeout < 0:
|
||||
self = log_obj
|
||||
raise log.Error('wait() *must* time out at some point.', timeout=timeout)
|
||||
if timestep < 0.1:
|
||||
timestep = 0.1
|
||||
|
||||
wait_req = WaitRequest(condition, condition_args, condition_kwargs, timeout, timestep)
|
||||
wait_id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, wait_req.condition_check)
|
||||
while True:
|
||||
self.poll(may_block=True)
|
||||
if wait_req.condition_ack or wait_req.timeout_ack:
|
||||
GObject.source_remove(wait_id)
|
||||
success = wait_req.condition_ack
|
||||
return success
|
||||
|
||||
def wait(self, log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
|
||||
if not self.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
|
||||
log.ctx(log_obj)
|
||||
raise log.Error('Wait timeout')
|
||||
|
||||
def sleep(self, log_obj, seconds):
|
||||
assert seconds > 0.
|
||||
self.wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=seconds)
|
||||
|
||||
|
||||
evloop = EventLoop()
|
||||
|
||||
def register_poll_func(func):
|
||||
global poll_funcs
|
||||
poll_funcs.append(func)
|
||||
global evloop
|
||||
evloop.register_poll_func(func)
|
||||
|
||||
def unregister_poll_func(func):
|
||||
global poll_funcs
|
||||
poll_funcs.remove(func)
|
||||
global evloop
|
||||
evloop.unregister_poll_func(func)
|
||||
|
||||
def poll():
|
||||
global poll_funcs
|
||||
for func in poll_funcs:
|
||||
func()
|
||||
global evloop
|
||||
evloop.poll()
|
||||
|
||||
def wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
|
||||
if not timeout or timeout < 0:
|
||||
self = log_obj
|
||||
raise log.Error('wait() *must* time out at some point.', timeout=timeout)
|
||||
if timestep < 0.1:
|
||||
timestep = 0.1
|
||||
|
||||
started = time.time()
|
||||
while True:
|
||||
poll()
|
||||
if condition(*condition_args, **condition_kwargs):
|
||||
return True
|
||||
waited = time.time() - started
|
||||
if waited > timeout:
|
||||
return False
|
||||
time.sleep(timestep)
|
||||
global evloop
|
||||
evloop.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep)
|
||||
|
||||
def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
|
||||
if not wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
|
||||
log.ctx(log_obj)
|
||||
raise log.Error('Wait timeout')
|
||||
global evloop
|
||||
evloop.wait(log_obj, condition, *condition_args, timeout=timeout, timestep=timestep, **condition_kwargs)
|
||||
|
||||
def sleep(log_obj, seconds):
|
||||
assert seconds > 0.
|
||||
wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=min(seconds, 1))
|
||||
global evloop
|
||||
evloop.sleep(log_obj, seconds)
|
||||
|
||||
def defer(handler, *args, **kwargs):
|
||||
global evloop
|
||||
evloop.defer(handler, *args, **kwargs)
|
||||
|
||||
|
||||
# vim: expandtab tabstop=4 shiftwidth=4
|
||||
|
|
|
@ -30,8 +30,6 @@ from gi.module import get_introspection_module
|
|||
Gio = get_introspection_module('Gio')
|
||||
|
||||
from gi.repository import GLib
|
||||
glib_main_loop = GLib.MainLoop()
|
||||
glib_main_ctx = glib_main_loop.get_context()
|
||||
bus = SystemBus()
|
||||
|
||||
I_MODEM = 'org.ofono.Modem'
|
||||
|
@ -49,24 +47,14 @@ NETREG_ST_ROAMING = 'roaming'
|
|||
|
||||
NETREG_MAX_REGISTER_ATTEMPTS = 3
|
||||
|
||||
class DeferredHandling:
|
||||
defer_queue = []
|
||||
class DeferredDBus:
|
||||
|
||||
def __init__(self, dbus_iface, handler):
|
||||
self.handler = handler
|
||||
self.subscription_id = dbus_iface.connect(self.receive_signal)
|
||||
|
||||
def receive_signal(self, *args, **kwargs):
|
||||
DeferredHandling.defer_queue.append((self.handler, args, kwargs))
|
||||
|
||||
@staticmethod
|
||||
def handle_queue():
|
||||
while DeferredHandling.defer_queue:
|
||||
handler, args, kwargs = DeferredHandling.defer_queue.pop(0)
|
||||
handler(*args, **kwargs)
|
||||
|
||||
def defer(handler, *args, **kwargs):
|
||||
DeferredHandling.defer_queue.append((handler, args, kwargs))
|
||||
event_loop.defer(self.handler, *args, **kwargs)
|
||||
|
||||
def dbus_connect(dbus_iface, handler):
|
||||
'''This function shall be used instead of directly connecting DBus signals.
|
||||
|
@ -75,15 +63,7 @@ def dbus_connect(dbus_iface, handler):
|
|||
so that a signal handler is invoked only after the DBus polling is through
|
||||
by enlisting signals that should be handled in the
|
||||
DeferredHandling.defer_queue.'''
|
||||
return DeferredHandling(dbus_iface, handler).subscription_id
|
||||
|
||||
def poll_glib():
|
||||
global glib_main_ctx
|
||||
while glib_main_ctx.pending():
|
||||
glib_main_ctx.iteration()
|
||||
DeferredHandling.handle_queue()
|
||||
|
||||
event_loop.register_poll_func(poll_glib)
|
||||
return DeferredDBus(dbus_iface, handler).subscription_id
|
||||
|
||||
def systembus_get(path):
|
||||
global bus
|
||||
|
@ -493,8 +473,8 @@ class Modem(log.Origin):
|
|||
# waiting for that. Make it async and try to register when the scan is
|
||||
# finished.
|
||||
register_func = self.scan_cb_register_automatic if mcc_mnc is None else self.scan_cb_register
|
||||
result_handler = lambda obj, result, user_data: defer(register_func, result, user_data)
|
||||
error_handler = lambda obj, e, user_data: defer(self.scan_cb_error_handler, e, mcc_mnc)
|
||||
result_handler = lambda obj, result, user_data: event_loop.defer(register_func, result, user_data)
|
||||
error_handler = lambda obj, e, user_data: event_loop.defer(self.scan_cb_error_handler, e, mcc_mnc)
|
||||
dbus_async_call(netreg, netreg.Scan, timeout=30, cancellable=self.cancellable,
|
||||
result_handler=result_handler, error_handler=error_handler,
|
||||
user_data=mcc_mnc)
|
||||
|
@ -559,7 +539,7 @@ class Modem(log.Origin):
|
|||
self.cancellable.cancel()
|
||||
# Cancel op is applied as a signal coming from glib mainloop, so we
|
||||
# need to run it and wait for the callbacks to handle cancellations.
|
||||
poll_glib()
|
||||
event_loop.poll()
|
||||
# once it has been triggered, create a new one for next operation:
|
||||
self.cancellable = Gio.Cancellable.new()
|
||||
|
||||
|
|
Loading…
Reference in New Issue