osmo-gsm-tester/src/osmo_gsm_tester/obj/ms_ofono.py

835 lines
33 KiB
Python
Raw Normal View History

# osmo_gsm_tester: DBUS client to talk to ofono
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from ..core import log, util, process
from ..core.event_loop import MainLoop
from .ms import MS
from . import sms
_import_external_modules_done = False
bus = None
Gio = None
GLib = None
Variant = None
def _import_external_modules():
global _import_external_modules_done, bus, Gio, GLib, Variant
if _import_external_modules_done:
return
_import_external_modules_done = True
# Required for Gio.Cancellable.
# See https://lazka.github.io/pgi-docs/Gio-2.0/classes/Cancellable.html#Gio.Cancellable
from gi.module import get_introspection_module
Gio = get_introspection_module('Gio')
from gi.repository import GLib as glib_module
GLib = glib_module
from pydbus import SystemBus, Variant
bus = SystemBus()
from pydbus import Variant as variant_class
Variant = variant_class
I_MODEM = 'org.ofono.Modem'
I_NETREG = 'org.ofono.NetworkRegistration'
I_SMS = 'org.ofono.MessageManager'
I_CONNMGR = 'org.ofono.ConnectionManager'
I_CALLMGR = 'org.ofono.VoiceCallManager'
I_CALL = 'org.ofono.VoiceCall'
I_SS = 'org.ofono.SupplementaryServices'
I_SIMMGR = 'org.ofono.SimManager'
I_VOICECALL = 'org.ofono.VoiceCall'
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
# See https://github.com/intgr/ofono/blob/master/doc/network-api.txt#L78
NETREG_ST_REGISTERED = 'registered'
NETREG_ST_ROAMING = 'roaming'
NETREG_MAX_REGISTER_ATTEMPTS = 3
class DeferredDBus:
def __init__(self, dbus_iface, handler):
self.handler = handler
self.subscription_id = dbus_iface.connect(self.receive_signal)
def receive_signal(self, *args, **kwargs):
MainLoop.defer(self.handler, *args, **kwargs)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def dbus_connect(dbus_iface, handler):
'''This function shall be used instead of directly connecting DBus signals.
It ensures that we don't nest a glib main loop within another, and also
that we receive exceptions raised within the signal handlers. This makes it
so that a signal handler is invoked only after the DBus polling is through
by enlisting signals that should be handled in the
DeferredHandling.defer_queue.'''
return DeferredDBus(dbus_iface, handler).subscription_id
def systembus_get(path):
return bus.get('org.ofono', path)
def list_modems():
root = systembus_get('/')
return sorted(root.GetModems())
def get_dbuspath_from_syspath(syspath):
modems = list_modems()
for dbuspath, props in modems:
if props.get('SystemPath', '') == syspath:
return dbuspath
raise ValueError('could not find %s in modem list: %s' % (syspath, modems))
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def _async_result_handler(obj, result, user_data):
'''Generic callback dispatcher called from glib loop when an async method
call has returned. This callback is set up by method dbus_async_call.'''
(result_callback, error_callback, real_user_data) = user_data
try:
ret = obj.call_finish(result)
except Exception as e:
if isinstance(e, GLib.Error) and e.code == Gio.IOErrorEnum.CANCELLED:
log.dbg('DBus method cancelled')
return
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
if error_callback:
error_callback(obj, e, real_user_data)
else:
result_callback(obj, e, real_user_data)
return
ret = ret.unpack()
# to be compatible with standard Python behaviour, unbox
# single-element tuples and return None for empty result tuples
if len(ret) == 1:
ret = ret[0]
elif len(ret) == 0:
ret = None
result_callback(obj, ret, real_user_data)
def dbus_async_call(instance, proxymethod, *proxymethod_args,
result_handler=None, error_handler=None,
user_data=None, timeout=30, cancellable=None,
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
**proxymethod_kwargs):
'''pydbus doesn't support asynchronous methods. This method adds support for
it until pydbus implements it'''
argdiff = len(proxymethod_args) - len(proxymethod._inargs)
if argdiff < 0:
raise TypeError(proxymethod.__qualname__ + " missing {} required positional argument(s)".format(-argdiff))
elif argdiff > 0:
raise TypeError(proxymethod.__qualname__ + " takes {} positional argument(s) but {} was/were given".format(len(proxymethod._inargs), len(proxymethod_args)))
timeout = timeout * 1000
user_data = (result_handler, error_handler, user_data)
# See https://lazka.github.io/pgi-docs/Gio-2.0/classes/DBusProxy.html#Gio.DBusProxy.call
instance._bus.con.call(
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
instance._bus_name, instance._path,
proxymethod._iface_name, proxymethod.__name__,
GLib.Variant(proxymethod._sinargs, proxymethod_args),
GLib.VariantType.new(proxymethod._soutargs),
0, timeout, cancellable,
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
_async_result_handler, user_data)
def dbus_call_dismiss_error(log_obj, err_str, method):
try:
method()
except GLib.Error as e:
if Gio.DBusError.is_remote_error(e) and Gio.DBusError.get_remote_error(e) == err_str:
log_obj.log('Dismissed Dbus method error: %r' % e)
return
raise e
class ModemDbusInteraction(log.Origin):
'''Work around inconveniences specific to pydbus and ofono.
ofono adds and removes DBus interfaces and notifies about them.
Upon changes we need a fresh pydbus object to benefit from that.
Watching the interfaces change is optional; be sure to call
watch_interfaces() if you'd like to have signals subscribed.
Related: https://github.com/LEW21/pydbus/issues/56
'''
fix and refactor logging: drop 'with', simplify With the recent fix of the junit report related issues, another issue arose: the 'with log.Origin' was changed to disallow __enter__ing an object twice to fix problems, now still code would fail because it tries to do 'with' on the same object twice. The only reason is to ensure that logging is associated with a given object. Instead of complicating even more, implement differently. Refactor logging to simplify use: drop the 'with Origin' style completely, and instead use the python stack to determine which objects are created by which, and which object to associate a log statement with. The new way: we rely on the convention that each class instance has a local 'self' referencing the object instance. If we need to find an origin as a new object's parent, or to associate a log message with, we traverse each stack frame, fetching the first local 'self' object that is a log.Origin class instance. How to use: Simply call log.log() anywhere, and it finds an Origin object to log for, from the stack. Alternatively call self.log() for any Origin() object to skip the lookup. Create classes as child class of log.Origin and make sure to call super().__init__(category, name). This constructor will magically find a parent Origin on the stack. When an exception happens, we first escalate the exception up through call scopes to where ever it is handled by log.log_exn(). This then finds an Origin object in the traceback's stack frames, no need to nest in 'with' scopes. Hence the 'with log.Origin' now "happens implicitly", we can write pure natural python code, no more hassles with scope ordering. Furthermore, any frame can place additional logging information in a frame by calling log.ctx(). This is automatically inserted in the ancestry associated with a log statement / exception. Change-Id: I5f9b53150f2bb6fa9d63ce27f0806f0ca6a45e90
2017-06-09 23:18:27 +00:00
modem_path = None
watch_props_subscription = None
_dbus_obj = None
interfaces = None
def __init__(self, modem_path):
self.modem_path = modem_path
fix and refactor logging: drop 'with', simplify With the recent fix of the junit report related issues, another issue arose: the 'with log.Origin' was changed to disallow __enter__ing an object twice to fix problems, now still code would fail because it tries to do 'with' on the same object twice. The only reason is to ensure that logging is associated with a given object. Instead of complicating even more, implement differently. Refactor logging to simplify use: drop the 'with Origin' style completely, and instead use the python stack to determine which objects are created by which, and which object to associate a log statement with. The new way: we rely on the convention that each class instance has a local 'self' referencing the object instance. If we need to find an origin as a new object's parent, or to associate a log message with, we traverse each stack frame, fetching the first local 'self' object that is a log.Origin class instance. How to use: Simply call log.log() anywhere, and it finds an Origin object to log for, from the stack. Alternatively call self.log() for any Origin() object to skip the lookup. Create classes as child class of log.Origin and make sure to call super().__init__(category, name). This constructor will magically find a parent Origin on the stack. When an exception happens, we first escalate the exception up through call scopes to where ever it is handled by log.log_exn(). This then finds an Origin object in the traceback's stack frames, no need to nest in 'with' scopes. Hence the 'with log.Origin' now "happens implicitly", we can write pure natural python code, no more hassles with scope ordering. Furthermore, any frame can place additional logging information in a frame by calling log.ctx(). This is automatically inserted in the ancestry associated with a log statement / exception. Change-Id: I5f9b53150f2bb6fa9d63ce27f0806f0ca6a45e90
2017-06-09 23:18:27 +00:00
super().__init__(log.C_BUS, self.modem_path)
self.interfaces = set()
# A dict listing signal handlers to connect, e.g.
# { I_SMS: ( ('IncomingMessage', self._on_incoming_message), ), }
self.required_signals = {}
# A dict collecting subscription tokens for connected signal handlers.
# { I_SMS: ( token1, token2, ... ), }
self.connected_signals = util.listdict()
def cleanup(self):
self.set_powered(False)
self.unwatch_interfaces()
for interface_name in list(self.connected_signals.keys()):
self.remove_signals(interface_name)
def __del__(self):
self.cleanup()
def get_new_dbus_obj(self):
return systembus_get(self.modem_path)
def dbus_obj(self):
if self._dbus_obj is None:
self._dbus_obj = self.get_new_dbus_obj()
return self._dbus_obj
def interface(self, interface_name):
try:
return self.dbus_obj()[interface_name]
except KeyError:
fix and refactor logging: drop 'with', simplify With the recent fix of the junit report related issues, another issue arose: the 'with log.Origin' was changed to disallow __enter__ing an object twice to fix problems, now still code would fail because it tries to do 'with' on the same object twice. The only reason is to ensure that logging is associated with a given object. Instead of complicating even more, implement differently. Refactor logging to simplify use: drop the 'with Origin' style completely, and instead use the python stack to determine which objects are created by which, and which object to associate a log statement with. The new way: we rely on the convention that each class instance has a local 'self' referencing the object instance. If we need to find an origin as a new object's parent, or to associate a log message with, we traverse each stack frame, fetching the first local 'self' object that is a log.Origin class instance. How to use: Simply call log.log() anywhere, and it finds an Origin object to log for, from the stack. Alternatively call self.log() for any Origin() object to skip the lookup. Create classes as child class of log.Origin and make sure to call super().__init__(category, name). This constructor will magically find a parent Origin on the stack. When an exception happens, we first escalate the exception up through call scopes to where ever it is handled by log.log_exn(). This then finds an Origin object in the traceback's stack frames, no need to nest in 'with' scopes. Hence the 'with log.Origin' now "happens implicitly", we can write pure natural python code, no more hassles with scope ordering. Furthermore, any frame can place additional logging information in a frame by calling log.ctx(). This is automatically inserted in the ancestry associated with a log statement / exception. Change-Id: I5f9b53150f2bb6fa9d63ce27f0806f0ca6a45e90
2017-06-09 23:18:27 +00:00
raise log.Error('Modem interface is not available:', interface_name)
def signal(self, interface_name, signal):
return getattr(self.interface(interface_name), signal)
def watch_interfaces(self):
self.unwatch_interfaces()
# Note: we are watching the properties on a get_new_dbus_obj() that is
# separate from the one used to interact with interfaces. We need to
# refresh the pydbus object to interact with Interfaces that have newly
# appeared, but exchanging the DBus object to watch Interfaces being
# enabled and disabled is racy: we may skip some removals and
# additions. Hence do not exchange this DBus object. We don't even
# need to store the dbus object used for this, we will not touch it
# again. We only store the signal subscription.
self.watch_props_subscription = dbus_connect(self.get_new_dbus_obj().PropertyChanged,
self.on_property_change)
self.on_interfaces_change(self.properties().get('Interfaces'))
def unwatch_interfaces(self):
if self.watch_props_subscription is None:
return
self.watch_props_subscription.disconnect()
self.watch_props_subscription = None
def on_property_change(self, name, value):
if name == 'Interfaces':
self.on_interfaces_change(value)
modem: log property changes from Modem interface The purpose of this patch is to make sure we output to the extended log some information regarding the firwmare version of the modem being used, even if the test being run doesn't call print(modem.info) explicitly. Printing self.info() just after waiting for Powered==true doesn't always work, because the properties are not populated after a while after. Calling Modem.GetProperties() immediately after receiving Powered==true, will provide with empty strings on properties such as Revision. 18:59:55.747412 bus /sierra_1: Setting Powered True 18:59:55.938889 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Powered=True 18:59:55.964694 bus /sierra_1: DBG: Powered == True 18:59:55.987777 bus /sierra_1: DBG: interface enabled: org.ofono.VoiceCallManager 18:59:56.006278 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Features=[] 18:59:56.027366 bus /sierra_1: Setting Online True 18:59:56.189991 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Manufacturer=Sierra Wireless, Incorporated 18:59:56.210935 bus /sierra_1: DBG: interface enabled: org.ofono.LocationReporting 18:59:56.233217 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Features=['gps'] 18:59:56.255807 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Model=MC7304 18:59:56.279374 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Online=True 18:59:56.315008 bus /sierra_1: DBG: Online == True 18:59:56.338779 bus /sierra_1: DBG: interface enabled: org.ofono.SimManager 18:59:56.357744 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Features=['sim', 'gps'] 18:59:56.379532 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Revision=SWI9X15C_06.03.32.04 r28324 CNSHZ-AR-BUILD 2015/05/25 01:09:15 18:59:56.401241 bus /sierra_1: DBG: 'org.ofono.Modem'.PropertyChanged() -> Serial=356853054230919 Change-Id: If215c7d63ce1b86314ed25f8e76413b15676b7f5
2017-09-05 17:04:06 +00:00
else:
self.dbg('%r.PropertyChanged() -> %s=%s' % (I_MODEM, name, value))
def on_interfaces_change(self, interfaces_now):
# First some logging.
now = set(interfaces_now)
additions = now - self.interfaces
removals = self.interfaces - now
self.interfaces = now
if not (additions or removals):
# nothing changed.
return
if additions:
self.dbg('interface enabled:', ', '.join(sorted(additions)))
if removals:
self.dbg('interface disabled:', ', '.join(sorted(removals)))
# The dbus object is now stale and needs refreshing before we
# access the next interface function.
self._dbus_obj = None
# If an interface disappeared, disconnect the signal handlers for it.
# Even though we're going to use a fresh dbus object for new
# subscriptions, we will still keep active subscriptions alive on the
# old dbus object which will linger, associated with the respective
# signal subscription.
for removed in removals:
self.remove_signals(removed)
# Connect signals for added interfaces.
for interface_name in additions:
self.connect_signals(interface_name)
def remove_signals(self, interface_name):
got = self.connected_signals.pop(interface_name, [])
if not got:
return
self.dbg('Disconnecting', len(got), 'signals for', interface_name)
for subscription in got:
subscription.disconnect()
def connect_signals(self, interface_name):
# If an interface was added, it must not have existed before. For
# paranoia, make sure we have no handlers for those.
self.remove_signals(interface_name)
want = self.required_signals.get(interface_name, [])
if not want:
return
self.dbg('Connecting', len(want), 'signals for', interface_name)
for signal, cb in self.required_signals.get(interface_name, []):
subscription = dbus_connect(self.signal(interface_name, signal), cb)
self.connected_signals.add(interface_name, subscription)
def has_interface(self, *interface_names):
try:
for interface_name in interface_names:
self.dbus_obj()[interface_name]
result = True
except KeyError:
result = False
self.dbg('has_interface(%s) ==' % (', '.join(interface_names)), result)
return result
def properties(self, iface=I_MODEM):
return self.dbus_obj()[iface].GetProperties()
def property_is(self, name, val, iface=I_MODEM):
is_val = self.properties(iface).get(name)
self.dbg(name, '==', is_val)
return is_val is not None and is_val == val
def set_bool(self, name, bool_val, iface=I_MODEM):
# to make sure any pending signals are received before we send out more DBus requests
MainLoop.poll()
val = bool(bool_val)
self.log('Setting', name, val)
self.interface(iface).SetProperty(name, Variant('b', val))
MainLoop.wait(self, self.property_is, name, bool_val)
def set_powered(self, powered=True):
self.set_bool('Powered', powered)
def set_online(self, online=True):
self.set_bool('Online', online)
def is_powered(self):
return self.property_is('Powered', True)
def is_online(self):
return self.property_is('Online', True)
class ModemCall(log.Origin):
'ofono Modem voicecall dbus object'
def __init__(self, modem, dbuspath):
super().__init__(log.C_TST, dbuspath)
self.modem = modem
self.dbuspath = dbuspath
self.signal_list = []
self.register_signals()
def register_signals(self):
call_dbus_obj = systembus_get(self.dbuspath)
subscr = dbus_connect(call_dbus_obj.PropertyChanged, lambda name, value: self.on_voicecall_property_change(self.dbuspath, name, value))
self.signal_list.append(subscr)
subscr = dbus_connect(call_dbus_obj.DisconnectReason, lambda reason: self.on_voicecall_disconnect_reason(self.dbuspath, reason))
self.signal_list.append(subscr)
def unregister_signals(self):
for subscr in self.signal_list:
subscr.disconnect()
self.signal_list = []
def cleanup(self):
self.unregister_signals()
def __del__(self):
self.cleanup()
def on_voicecall_property_change(self, obj_path, name, value):
self.dbg('%r:%r.PropertyChanged() -> %s=%s' % (obj_path, I_VOICECALL, name, value))
def on_voicecall_disconnect_reason(self, obj_path, reason):
self.dbg('%r:%r.DisconnectReason() -> %s' % (obj_path, I_VOICECALL, reason))
class Modem(MS):
'convenience for ofono Modem interaction'
CTX_PROT_IPv4 = 'ip'
CTX_PROT_IPv6 = 'ipv6'
CTX_PROT_IPv46 = 'dual'
def __init__(self, testenv, conf):
super().__init__('modem', conf)
_import_external_modules()
self.syspath = conf.get('path')
self.dbuspath = get_dbuspath_from_syspath(self.syspath)
self.set_name(self.dbuspath)
self.dbg('creating from syspath %s' % self.syspath)
self._ki = None
self._imsi = None
self._apn_ipaddr = None
self.run_dir = util.Dir(testenv.suite().get_run_dir().new_dir(self.name().strip('/')))
self.sms_received_list = []
self.dbus = ModemDbusInteraction(self.dbuspath)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
self.register_attempts = 0
self.call_list = []
# one Cancellable can handle several concurrent methods.
self.cancellable = Gio.Cancellable.new()
self.dbus.required_signals = {
I_SMS: ( ('IncomingMessage', self._on_incoming_message), ),
I_NETREG: ( ('PropertyChanged', self._on_netreg_property_changed), ),
I_CONNMGR: ( ('PropertyChanged', self._on_connmgr_property_changed), ),
I_CALLMGR: ( ('PropertyChanged', self._on_callmgr_property_changed),
('CallAdded', self._on_callmgr_call_added),
('CallRemoved', self._on_callmgr_call_removed), ),
}
self.dbus.watch_interfaces()
def cleanup(self):
self.dbg('cleanup')
if self.cancellable:
self.cancel_pending_dbus_methods()
self.cancellable = None
if self.is_powered():
self.power_off()
for call_obj in self.call_list:
call_obj.cleanup()
self.call_list = []
self.dbus.cleanup()
self.dbus = None
def netns(self):
return os.path.basename(self.syspath.rstrip('/'))
def properties(self, *args, **kwargs):
'''Return a dict of properties on this modem. For the actual arguments,
see ModemDbusInteraction.properties(), which this function calls. The
returned dict is defined by ofono. An example is:
{'Lockdown': False,
'Powered': True,
'Model': 'MC7304',
'Revision': 'SWI9X15C_05.05.66.00 r29972 CARMD-EV-FRMWR1 2015/10/08 08:36:28',
'Manufacturer': 'Sierra Wireless, Incorporated',
'Emergency': False,
'Interfaces': ['org.ofono.SmartMessaging',
'org.ofono.PushNotification',
'org.ofono.MessageManager',
'org.ofono.NetworkRegistration',
'org.ofono.ConnectionManager',
'org.ofono.SupplementaryServices',
'org.ofono.RadioSettings',
'org.ofono.AllowedAccessPoints',
'org.ofono.SimManager',
'org.ofono.LocationReporting',
'org.ofono.VoiceCallManager'],
'Serial': '356853054230919',
'Features': ['sms', 'net', 'gprs', 'ussd', 'rat', 'sim', 'gps'],
'Type': 'hardware',
'Online': True}
'''
return self.dbus.properties(*args, **kwargs)
def set_powered(self, powered=True):
return self.dbus.set_powered(powered=powered)
def set_online(self, online=True):
return self.dbus.set_online(online=online)
def is_powered(self):
return self.dbus.is_powered()
def is_online(self):
return self.dbus.is_online()
def imsi(self):
if self._imsi is None:
if 'sim' in self.features():
if not self.is_powered():
self.set_powered()
# wait for SimManager iface to appear after we power on
MainLoop.wait(self, self.dbus.has_interface, I_SIMMGR, timeout=10)
simmgr = self.dbus.interface(I_SIMMGR)
# If properties are requested quickly, it may happen that Sim property is still not there.
MainLoop.wait(self, lambda: simmgr.GetProperties().get('SubscriberIdentity', None) is not None, timeout=10)
props = simmgr.GetProperties()
self.dbg('got SIM properties', props)
self._imsi = props.get('SubscriberIdentity', None)
else:
self._imsi = super().imsi()
if self._imsi is None:
raise log.Error('No IMSI')
return self._imsi
def set_ki(self, ki):
self._ki = ki
def ki(self):
if self._ki is not None:
return self._ki
return super().ki()
def apn_ipaddr(self):
if self._apn_ipaddr is not None:
return self._apn_ipaddr
return 'dynamic'
def features(self):
return self._conf.get('features', [])
def _required_ifaces(self):
req_ifaces = (I_NETREG,)
req_ifaces += (I_SMS,) if 'sms' in self.features() else ()
req_ifaces += (I_SS,) if 'ussd' in self.features() else ()
req_ifaces += (I_CONNMGR,) if 'gprs' in self.features() else ()
req_ifaces += (I_SIMMGR,) if 'sim' in self.features() else ()
return req_ifaces
def _on_netreg_property_changed(self, name, value):
self.dbg('%r.PropertyChanged() -> %s=%s' % (I_NETREG, name, value))
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def is_connected(self, mcc_mnc=None):
netreg = self.dbus.interface(I_NETREG)
prop = netreg.GetProperties()
status = prop.get('Status')
self.dbg('status:', status)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
if not (status == NETREG_ST_REGISTERED or status == NETREG_ST_ROAMING):
return False
if mcc_mnc is None: # Any network is fine and we are registered.
return True
mcc = prop.get('MobileCountryCode')
mnc = prop.get('MobileNetworkCode')
if (mcc, mnc) == mcc_mnc:
return True
return False
def schedule_scan_register(self, mcc_mnc):
if self.register_attempts > NETREG_MAX_REGISTER_ATTEMPTS:
raise log.Error('Failed to find Network Operator', mcc_mnc=mcc_mnc, attempts=self.register_attempts)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
self.register_attempts += 1
netreg = self.dbus.interface(I_NETREG)
self.dbg('Scanning for operators...')
# Scan method can take several seconds, and we don't want to block
# waiting for that. Make it async and try to register when the scan is
# finished.
register_func = self.scan_cb_register_automatic if mcc_mnc is None else self.scan_cb_register
result_handler = lambda obj, result, user_data: MainLoop.defer(register_func, result, user_data)
error_handler = lambda obj, e, user_data: MainLoop.defer(self.scan_cb_error_handler, e, mcc_mnc)
dbus_async_call(netreg, netreg.Scan, timeout=30, cancellable=self.cancellable,
result_handler=result_handler, error_handler=error_handler,
user_data=mcc_mnc)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def scan_cb_error_handler(self, e, mcc_mnc):
# It was detected that Scan() method can fail for some modems on some
# specific circumstances. For instance it fails with org.ofono.Error.Failed
# if the modem starts to register internally after we started Scan() and
# the registering succeeds while we are still waiting for Scan() to finsih.
# So far the easiest seems to check if we are now registered and
# otherwise schedule a scan again.
self.err('Scan() failed, retrying if needed:', e)
if not self.is_connected(mcc_mnc):
self.schedule_scan_register(mcc_mnc)
else:
self.log('Already registered with network', mcc_mnc)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def scan_cb_register_automatic(self, scanned_operators, mcc_mnc):
self.dbg('scanned operators: ', scanned_operators);
for op_path, op_prop in scanned_operators:
if op_prop.get('Status') == 'current':
mcc = op_prop.get('MobileCountryCode')
mnc = op_prop.get('MobileNetworkCode')
self.log('Already registered with network', (mcc, mnc))
return
self.log('Registering with the default network')
netreg = self.dbus.interface(I_NETREG)
dbus_call_dismiss_error(self, 'org.ofono.Error.InProgress', netreg.Register)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def scan_cb_register(self, scanned_operators, mcc_mnc):
self.dbg('scanned operators: ', scanned_operators);
matching_op_path = None
for op_path, op_prop in scanned_operators:
mcc = op_prop.get('MobileCountryCode')
mnc = op_prop.get('MobileNetworkCode')
if (mcc, mnc) == mcc_mnc:
if op_prop.get('Status') == 'current':
self.log('Already registered with network', mcc_mnc)
# We discovered the network and we are already registered
# with it. Avoid calling op.Register() in this case (it
# won't act as a NO-OP, it actually returns an error).
return
matching_op_path = op_path
break
if matching_op_path is None:
self.dbg('Failed to find Network Operator', mcc_mnc=mcc_mnc, attempts=self.register_attempts)
self.schedule_scan_register(mcc_mnc)
return
dbus_op = systembus_get(matching_op_path)
self.log('Registering with operator', matching_op_path, mcc_mnc)
try:
dbus_call_dismiss_error(self, 'org.ofono.Error.InProgress', dbus_op.Register)
except GLib.Error as e:
if Gio.DBusError.is_remote_error(e) and Gio.DBusError.get_remote_error(e) == 'org.ofono.Error.NotSupported':
self.log('modem does not support manual registering, attempting automatic registering')
self.scan_cb_register_automatic(scanned_operators, mcc_mnc)
return
raise e
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def cancel_pending_dbus_methods(self):
self.cancellable.cancel()
# Cancel op is applied as a signal coming from glib mainloop, so we
# need to run it and wait for the callbacks to handle cancellations.
MainLoop.poll()
# once it has been triggered, create a new one for next operation:
self.cancellable = Gio.Cancellable.new()
def power_off(self):
if self.dbus.has_interface(I_CONNMGR) and self.is_attached():
self.detach()
self.set_online(False)
self.set_powered(False)
req_ifaces = self._required_ifaces()
for iface in req_ifaces:
MainLoop.wait(self, lambda: not self.dbus.has_interface(iface), timeout=10)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def power_cycle(self):
'Power the modem and put it online, power cycle it if it was already on'
req_ifaces = self._required_ifaces()
if self.is_powered():
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
self.dbg('Power cycling')
self.power_off()
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
else:
self.dbg('Powering on')
self.set_powered()
self.set_online()
MainLoop.wait(self, self.dbus.has_interface, *req_ifaces, timeout=10)
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
def connect(self, mcc_mnc=None):
'Connect to MCC+MNC'
if (mcc_mnc is not None) and (len(mcc_mnc) != 2 or None in mcc_mnc):
raise log.Error('mcc_mnc value is invalid. It should be None or contain both valid mcc and mnc values:', mcc_mnc=mcc_mnc)
# if test called connect() before and async scanning has not finished, we need to get rid of it:
self.cancel_pending_dbus_methods()
ofono_client: Implement network registration during connect() A new mcc_mnc parameter is now optionally passed to connect() in order to manually register to a specific network with a given MCC+MNC pair. If no parameter is passed (or None), then the modem will be instructed to attempt an automatic registration with any available network which permits it. We get the MCC+MNC parameter from the MSC/NITB and we pass it to the modem object at connect time as shown in the modified tests. Two new simple tests to check network registration is working are added in this commit. Ofono modems seem to be automatically registering at some point after they are set Online=true, and we were actually using that 'feature' before this patch. Thus, it is possible that a modem quickly becomes registered, and we then check so before starting the scan+registration process, which can take a few seconds. The scanning method can take a few seconds to complete. To avoid blocking in the dbus ofono Scan() method, this commit adds some code to make use of glib/gdbus async methods, which are not yet supported directly by pydbus. This way, we can continue polling while waiting for the scan process to complete and we can register several modems in parallel. When scan completes, a callback is run which attempts to register. If no MCC+MNC was passed, as we just finished scanning the modem should have enough fresh operator information to take good and quick decisions on where to connect. If we have an MCC+MNC, then we check the operator list received by Scan() method. If operator with desired MCC+MNC is there, we register with it. If it's not there, we start scanning() again asynchronously hoping the operator will show up in next scan. As scanning() and registration is done in the background, tests are expected to call connect(), and then later on wait for the modem to register by waiting/polling the method "modem.is_connected()". Tests first check for the modem being connected and after with MSC subscriber_attached(). The order is intentional because the later has to poll through network and adds unneeded garbage to the pcap files bein recorded. Change-Id: I8d9eb47eac1044550d3885adb55105c304b0c15c
2017-05-29 12:25:22 +00:00
self.power_cycle()
self.register_attempts = 0
if self.is_connected(mcc_mnc):
self.log('Already registered with', mcc_mnc if mcc_mnc else 'default network')
else:
self.log('Connect to', mcc_mnc if mcc_mnc else 'default network')
self.schedule_scan_register(mcc_mnc)
def is_attached(self):
connmgr = self.dbus.interface(I_CONNMGR)
prop = connmgr.GetProperties()
attached = prop.get('Attached')
self.dbg('attached:', attached)
return attached
def attach(self, allow_roaming=False):
self.dbg('attach')
if self.is_attached():
self.detach()
connmgr = self.dbus.interface(I_CONNMGR)
connmgr.SetProperty('RoamingAllowed', Variant('b', allow_roaming))
connmgr.SetProperty('Powered', Variant('b', True))
def detach(self):
self.dbg('detach')
connmgr = self.dbus.interface(I_CONNMGR)
connmgr.SetProperty('RoamingAllowed', Variant('b', False))
connmgr.SetProperty('Powered', Variant('b', False))
connmgr.DeactivateAll()
connmgr.ResetContexts() # Requires Powered=false
def activate_context(self, apn='internet', user='ogt', pwd='', protocol='ip'):
self.dbg('activate_context', apn=apn, user=user, protocol=protocol)
connmgr = self.dbus.interface(I_CONNMGR)
ctx_path = connmgr.AddContext('internet')
ctx = systembus_get(ctx_path)
ctx.SetProperty('AccessPointName', Variant('s', apn))
ctx.SetProperty('Username', Variant('s', user))
ctx.SetProperty('Password', Variant('s', pwd))
ctx.SetProperty('Protocol', Variant('s', protocol))
# Activate can only be called after we are attached
ctx.SetProperty('Active', Variant('b', True))
MainLoop.wait(self, lambda: ctx.GetProperties()['Active'] == True)
self.log('context activated', path=ctx_path, apn=apn, user=user, properties=ctx.GetProperties())
return ctx_path
def deactivate_context(self, ctx_id):
self.dbg('deactivate_context', path=ctx_id)
ctx = systembus_get(ctx_id)
ctx.SetProperty('Active', Variant('b', False))
MainLoop.wait(self, lambda: ctx.GetProperties()['Active'] == False)
self.dbg('deactivate_context active=false, removing', path=ctx_id)
connmgr = self.dbus.interface(I_CONNMGR)
connmgr.RemoveContext(ctx_id)
self.log('context deactivated', path=ctx_id)
def run_netns_wait(self, name, popen_args):
proc = process.NetNSProcess(name, self.run_dir.new_dir(name), self.netns(), popen_args,
env={})
proc.launch_sync()
return proc
def setup_context_data_plane(self, ctx_id):
self.dbg('setup_context_data', path=ctx_id)
ctx = systembus_get(ctx_id)
ctx_settings = ctx.GetProperties().get('Settings', None)
if not ctx_settings:
raise log.Error('%s no Settings found! No way to get iface!' % ctx_id)
iface = ctx_settings.get('Interface', None)
if not iface:
raise log.Error('%s Settings contains no iface! %r' % (ctx_id, repr(ctx_settings)))
util.move_iface_to_netns(iface, self.netns(), self.run_dir.new_dir('move_netns'))
self.run_netns_wait('ifup', ('ip', 'link', 'set', 'dev', iface, 'up'))
self.run_netns_wait('dhcp', ('udhcpc', '-q', '-i', iface))
def sms_send(self, to_msisdn_or_modem, *tokens):
if isinstance(to_msisdn_or_modem, Modem):
to_msisdn = to_msisdn_or_modem.msisdn
tokens = list(tokens)
tokens.append('to ' + to_msisdn_or_modem.name())
else:
to_msisdn = str(to_msisdn_or_modem)
msg = sms.Sms(self.msisdn, to_msisdn, 'from ' + self.name(), *tokens)
self.log('sending sms to MSISDN', to_msisdn, sms=msg)
mm = self.dbus.interface(I_SMS)
mm.SendMessage(to_msisdn, str(msg))
return msg
def _on_incoming_message(self, message, info):
self.log('Incoming SMS:', repr(message))
self.dbg(info=info)
2017-05-02 14:29:09 +00:00
self.sms_received_list.append((message, info))
def sms_was_received(self, sms_obj):
2017-05-02 14:29:09 +00:00
for msg, info in self.sms_received_list:
if sms_obj.matches(msg):
self.log('SMS received as expected:', repr(msg))
self.dbg(info=info)
2017-05-02 14:29:09 +00:00
return True
return False
def call_id_list(self):
li = [call.dbuspath for call in self.call_list]
self.dbg('call_id_list: %r' % li)
return li
def call_find_by_id(self, id):
for call in self.call_list:
if call.dbuspath == id:
return call
return None
def call_dial(self, to_msisdn_or_modem):
if isinstance(to_msisdn_or_modem, Modem):
to_msisdn = to_msisdn_or_modem.msisdn
else:
to_msisdn = str(to_msisdn_or_modem)
self.dbg('Dialing:', to_msisdn)
cmgr = self.dbus.interface(I_CALLMGR)
call_obj_path = cmgr.Dial(to_msisdn, 'default')
if self.call_find_by_id(call_obj_path) is None:
self.dbg('Adding %s to call list' % call_obj_path)
self.call_list.append(ModemCall(self, call_obj_path))
else:
self.dbg('Dial returned already existing call')
return call_obj_path
def _find_call_msisdn_state(self, msisdn, state):
cmgr = self.dbus.interface(I_CALLMGR)
ret = cmgr.GetCalls()
for obj_path, props in ret:
if props['LineIdentification'] == msisdn and props['State'] == state:
return obj_path
return None
def call_wait_incoming(self, caller_msisdn_or_modem, timeout=60):
if isinstance(caller_msisdn_or_modem, Modem):
caller_msisdn = caller_msisdn_or_modem.msisdn
else:
caller_msisdn = str(caller_msisdn_or_modem)
self.dbg('Waiting for incoming call from:', caller_msisdn)
MainLoop.wait(self, lambda: self._find_call_msisdn_state(caller_msisdn, 'incoming') is not None, timeout=timeout)
return self._find_call_msisdn_state(caller_msisdn, 'incoming')
def call_answer(self, call_id):
self.dbg('Answer call %s' % call_id)
assert self.call_state(call_id) == 'incoming'
call_dbus_obj = systembus_get(call_id)
call_dbus_obj.Answer()
self.dbg('Answered call %s' % call_id)
def call_hangup(self, call_id):
self.dbg('Hang up call %s' % call_id)
call_dbus_obj = systembus_get(call_id)
call_dbus_obj.Hangup()
def call_is_active(self, call_id):
return self.call_state(call_id) == 'active'
def call_state(self, call_id):
try:
call_dbus_obj = systembus_get(call_id)
props = call_dbus_obj.GetProperties()
state = props.get('State')
except Exception:
self.log('asking call state for non existent call')
log.log_exn()
state = 'disconnected'
self.dbg('call state: %s' % state, call_id=call_id)
return state
def _on_callmgr_call_added(self, obj_path, properties):
self.dbg('%r.CallAdded() -> %s=%r' % (I_CALLMGR, obj_path, repr(properties)))
if self.call_find_by_id(obj_path) is None:
self.call_list.append(ModemCall(self, obj_path))
else:
self.dbg('Call already exists %r' % obj_path)
def _on_callmgr_call_removed(self, obj_path):
self.dbg('%r.CallRemoved() -> %s' % (I_CALLMGR, obj_path))
call_obj = self.call_find_by_id(obj_path)
if call_obj is not None:
self.call_list.remove(call_obj)
call_obj.cleanup()
else:
self.dbg('Trying to remove non-existing call %r' % obj_path)
def _on_callmgr_property_changed(self, name, value):
self.dbg('%r.PropertyChanged() -> %s=%s' % (I_CALLMGR, name, value))
def _on_connmgr_property_changed(self, name, value):
self.dbg('%r.PropertyChanged() -> %s=%s' % (I_CONNMGR, name, value))
def info(self, keys=('Manufacturer', 'Model', 'Revision', 'Serial')):
props = self.properties()
return ', '.join(['%s: %r'%(k,props.get(k)) for k in keys])
def log_info(self, *args, **kwargs):
self.log(self.info(*args, **kwargs))
def ussd_send(self, command):
ss = self.dbus.interface(I_SS)
service_type, response = ss.Initiate(command)
return response
# vim: expandtab tabstop=4 shiftwidth=4