From b3daaea6b5207b3b19c9ccd10b8f76a56d2e6711 Mon Sep 17 00:00:00 2001 From: Neels Hofmeyr Date: Sun, 9 Apr 2017 14:18:34 +0200 Subject: [PATCH] implement modem and nitb bits for a real test Change-Id: I1ca3bd874aed1265d83a46340e9b3e469075c7ee --- selftest/real_suite/suites/sms/mo_mt_sms.py | 9 ++- selftest/sms_test.py | 20 ++++++ src/osmo_gsm_tester/ofono_client.py | 72 +++++++++++++++++---- src/osmo_gsm_tester/osmo_nitb.py | 25 +++---- 4 files changed, 97 insertions(+), 29 deletions(-) create mode 100755 selftest/sms_test.py diff --git a/selftest/real_suite/suites/sms/mo_mt_sms.py b/selftest/real_suite/suites/sms/mo_mt_sms.py index 05be48ce..b97d3321 100755 --- a/selftest/real_suite/suites/sms/mo_mt_sms.py +++ b/selftest/real_suite/suites/sms/mo_mt_sms.py @@ -8,19 +8,18 @@ ms_mo = suite.modem() ms_mt = suite.modem() print('start nitb and bts...') -nitb.add_bts(bts) +nitb.bts_add(bts) nitb.start() sleep(.1) assert nitb.running() bts.start() -nitb.add_subscriber(ms_mo) -nitb.add_subscriber(ms_mt) +nitb.subscriber_add(ms_mo) +nitb.subscriber_add(ms_mt) ms_mo.connect(nitb) ms_mt.connect(nitb) wait(nitb.subscriber_attached, ms_mo, ms_mt) sms = ms_mo.sms_send(ms_mt.msisdn) -sleep(3) -wait(nitb.sms_received, sms) +wait(ms_mt.sms_received, sms) diff --git a/selftest/sms_test.py b/selftest/sms_test.py new file mode 100755 index 00000000..25fb551c --- /dev/null +++ b/selftest/sms_test.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +import _prep +from osmo_gsm_tester import ofono_client + +print(ofono_client.Sms()) +print(ofono_client.Sms()) +print(ofono_client.Sms()) +sms = ofono_client.Sms('123', '456') +print(str(sms)) + +sms2 = ofono_client.Sms('123', '456') +print(str(sms2)) +assert sms != sms2 + +sms2.msg = str(sms.msg) +print(str(sms2)) +assert sms == sms2 + +# vim: expandtab tabstop=4 shiftwidth=4 diff --git a/src/osmo_gsm_tester/ofono_client.py b/src/osmo_gsm_tester/ofono_client.py index dcca4e29..5f917815 100644 --- a/src/osmo_gsm_tester/ofono_client.py +++ b/src/osmo_gsm_tester/ofono_client.py @@ -17,7 +17,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from . import log +from . import log, test from pydbus import SystemBus, Variant import time @@ -28,6 +28,9 @@ glib_main_loop = GLib.MainLoop() glib_main_ctx = glib_main_loop.get_context() bus = SystemBus() +I_NETREG = 'org.ofono.NetworkRegistration' +I_SMS = 'org.ofono.MessageManager' + def poll(): global glib_main_ctx while glib_main_ctx.pending(): @@ -52,8 +55,8 @@ class Modem(log.Origin): self.set_name(self.path) self.set_log_category(log.C_BUS) self._dbus_obj = None - self._interfaces_was = set() - poll() + self._interfaces = set() + test.poll() def set_msisdn(self, msisdn): self.msisdn = msisdn @@ -65,7 +68,9 @@ class Modem(log.Origin): return self.conf.get('ki') def set_powered(self, on=True): + test.poll() self.dbus_obj.SetProperty('Powered', Variant('b', on)) + test.poll() def dbus_obj(self): if self._dbus_obj is not None: @@ -83,9 +88,9 @@ class Modem(log.Origin): def _on_interfaces_change(self, interfaces_now): now = set(interfaces_now) - additions = now - self._interfaces_was - removals = self._interfaces_was - now - self._interfaces_was = now + additions = now - self._interfaces + removals = self._interfaces - now + self._interfaces = now for iface in removals: with log.Origin('modem.disable(%s)' % iface): try: @@ -101,19 +106,62 @@ class Modem(log.Origin): def _on_interface_enabled(self, interface_name): self.dbg('Interface enabled:', interface_name) - # todo: when the messages service comes up, connect a message reception signal + if interface_name == I_SMS: + self._dbus_obj[I_SMS].IncomingMessage.connect(self._on_incoming_message) def _on_interface_disabled(self, interface_name): self.dbg('Interface disabled:', interface_name) + def has_interface(self, name): + return name in self._interfaces + def connect(self, nitb): 'set the modem up to connect to MCC+MNC from NITB config' self.log('connect to', nitb) - self.err('Modem.connect() is still a fake and does not do anything') + self.set_powered() + if not self.has_interface(I_NETREG): + self.log('No %r interface, hoping that the modem connects by itself' % I_NETREG) + else: + self.log('Use of %r interface not implemented yet, hoping that the modem connects by itself' % I_NETREG) - def sms_send(self, msisdn): - self.log('send sms to MSISDN', msisdn) - self.err('Modem.sms_send() is still a fake and does not do anything') - return 'todo' + def sms_send(self, to_msisdn): + if hasattr(to_msisdn, 'msisdn'): + to_msisdn = to_msisdn.msisdn + self.log('sending sms to MSISDN', to_msisdn) + if not self.has_interface(I_SMS): + raise RuntimeError('Modem cannot send SMS, interface not active: %r' % I_SMS) + sms = Sms(self.msisdn(), to_msisdn) + mm = self.dbus_obj[I_SMS] + mm.SendMessage(to_msisdn, str(sms)) + return sms + + def _on_incoming_message(self, message, info): + self.log('Incoming SMS:', repr(message), **info) + + def sms_received(self, sms): + pass + +class Sms: + _last_sms_idx = 0 + msg = None + + def __init__(self, from_msisdn=None, to_msisdn=None): + Sms._last_sms_idx += 1 + msgs = ['message nr. %d' % Sms._last_sms_idx] + if from_msisdn or to_msisdn: + msgs.append(' sent') + if from_msisdn: + msgs.append(' from %s' % from_msisdn) + if to_msisdn: + msgs.append(' to %s' % to_msisdn) + self.msg = ''.join(msgs) + + def __str__(self): + return self.msg + + def __eq__(self, other): + if isinstance(other, Sms): + return self.msg == other.msg + return inself.msg == other # vim: expandtab tabstop=4 shiftwidth=4 diff --git a/src/osmo_gsm_tester/osmo_nitb.py b/src/osmo_gsm_tester/osmo_nitb.py index 027897dd..8fce3711 100644 --- a/src/osmo_gsm_tester/osmo_nitb.py +++ b/src/osmo_gsm_tester/osmo_nitb.py @@ -82,28 +82,28 @@ class OsmoNitb(log.Origin): def addr(self): return self.nitb_iface.get('addr') - def add_bts(self, bts): + def bts_add(self, bts): self.bts.append(bts) bts.set_nitb(self) - def add_subscriber(self, modem, msisdn=None): + def subscriber_add(self, modem, msisdn=None): if msisdn is None: msisdn = self.suite_run.resources_pool.next_msisdn(modem) modem.set_msisdn(msisdn) self.log('Add subscriber', msisdn=msisdn, imsi=modem.imsi()) with self: - OsmoNitbCtrl(self).add_subscriber(modem.imsi(), msisdn, modem.ki()) + OsmoNitbCtrl(self).subscriber_add(modem.imsi(), msisdn, modem.ki()) def subscriber_attached(self, *modems): - return all([self.imsi_attached(m.imsi()) for m in modems]) + return self.imsi_attached([m.imsi() for m in modems]) - def imsi_attached(self, imsi): - self.err('OsmoNitb.imsi_attached() is still fake and does not do anything') - return random.choice((True, False)) + def imsi_attached(self, *imsis): + attached = self.imsi_list_attached() + return all([imsi in attached for imsi in imsis]) - def sms_received(self, sms): - self.err('OsmoNitb.sms_received() is still fake and does not do anything') - return random.choice((True, False)) + def imsi_list_attached(self): + with self: + OsmoNitbCtrl(self).subscriber_list_active() def running(self): return not self.process.terminated() @@ -123,7 +123,7 @@ class OsmoNitbCtrl(log.Origin): def ctrl(self): return osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT) - def add_subscriber(self, imsi, msisdn, ki=None, algo=None): + def subscriber_add(self, imsi, msisdn, ki=None, algo=None): created = False if ki and not algo: algo = 'comp128v1' @@ -149,7 +149,8 @@ class OsmoNitbCtrl(log.Origin): aslist_str = "" with osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT) as ctrl: self.ctrl.do_get(OsmoNitbCtrl.SUBSCR_LIST_ACTIVE_VAR) - # this looks like it doesn't work for long data. It's legacy code from the old osmo-gsm-tester. + # This is legacy code from the old osmo-gsm-tester. + # looks like this doesn't work for long data. data = self.ctrl.receive() while (len(data) > 0): (answer, data) = self.ctrl.remove_ipa_ctrl_header(data)