implement modem and nitb bits for a real test

Change-Id: I1ca3bd874aed1265d83a46340e9b3e469075c7ee
This commit is contained in:
Neels Hofmeyr 2017-04-09 14:18:34 +02:00
parent e352844011
commit b3daaea6b5
4 changed files with 97 additions and 29 deletions

View File

@ -8,19 +8,18 @@ ms_mo = suite.modem()
ms_mt = suite.modem()
print('start nitb and bts...')
nitb.add_bts(bts)
nitb.bts_add(bts)
nitb.start()
sleep(.1)
assert nitb.running()
bts.start()
nitb.add_subscriber(ms_mo)
nitb.add_subscriber(ms_mt)
nitb.subscriber_add(ms_mo)
nitb.subscriber_add(ms_mt)
ms_mo.connect(nitb)
ms_mt.connect(nitb)
wait(nitb.subscriber_attached, ms_mo, ms_mt)
sms = ms_mo.sms_send(ms_mt.msisdn)
sleep(3)
wait(nitb.sms_received, sms)
wait(ms_mt.sms_received, sms)

20
selftest/sms_test.py Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
import _prep
from osmo_gsm_tester import ofono_client
print(ofono_client.Sms())
print(ofono_client.Sms())
print(ofono_client.Sms())
sms = ofono_client.Sms('123', '456')
print(str(sms))
sms2 = ofono_client.Sms('123', '456')
print(str(sms2))
assert sms != sms2
sms2.msg = str(sms.msg)
print(str(sms2))
assert sms == sms2
# vim: expandtab tabstop=4 shiftwidth=4

View File

@ -17,7 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import log
from . import log, test
from pydbus import SystemBus, Variant
import time
@ -28,6 +28,9 @@ glib_main_loop = GLib.MainLoop()
glib_main_ctx = glib_main_loop.get_context()
bus = SystemBus()
I_NETREG = 'org.ofono.NetworkRegistration'
I_SMS = 'org.ofono.MessageManager'
def poll():
global glib_main_ctx
while glib_main_ctx.pending():
@ -52,8 +55,8 @@ class Modem(log.Origin):
self.set_name(self.path)
self.set_log_category(log.C_BUS)
self._dbus_obj = None
self._interfaces_was = set()
poll()
self._interfaces = set()
test.poll()
def set_msisdn(self, msisdn):
self.msisdn = msisdn
@ -65,7 +68,9 @@ class Modem(log.Origin):
return self.conf.get('ki')
def set_powered(self, on=True):
test.poll()
self.dbus_obj.SetProperty('Powered', Variant('b', on))
test.poll()
def dbus_obj(self):
if self._dbus_obj is not None:
@ -83,9 +88,9 @@ class Modem(log.Origin):
def _on_interfaces_change(self, interfaces_now):
now = set(interfaces_now)
additions = now - self._interfaces_was
removals = self._interfaces_was - now
self._interfaces_was = now
additions = now - self._interfaces
removals = self._interfaces - now
self._interfaces = now
for iface in removals:
with log.Origin('modem.disable(%s)' % iface):
try:
@ -101,19 +106,62 @@ class Modem(log.Origin):
def _on_interface_enabled(self, interface_name):
self.dbg('Interface enabled:', interface_name)
# todo: when the messages service comes up, connect a message reception signal
if interface_name == I_SMS:
self._dbus_obj[I_SMS].IncomingMessage.connect(self._on_incoming_message)
def _on_interface_disabled(self, interface_name):
self.dbg('Interface disabled:', interface_name)
def has_interface(self, name):
return name in self._interfaces
def connect(self, nitb):
'set the modem up to connect to MCC+MNC from NITB config'
self.log('connect to', nitb)
self.err('Modem.connect() is still a fake and does not do anything')
self.set_powered()
if not self.has_interface(I_NETREG):
self.log('No %r interface, hoping that the modem connects by itself' % I_NETREG)
else:
self.log('Use of %r interface not implemented yet, hoping that the modem connects by itself' % I_NETREG)
def sms_send(self, msisdn):
self.log('send sms to MSISDN', msisdn)
self.err('Modem.sms_send() is still a fake and does not do anything')
return 'todo'
def sms_send(self, to_msisdn):
if hasattr(to_msisdn, 'msisdn'):
to_msisdn = to_msisdn.msisdn
self.log('sending sms to MSISDN', to_msisdn)
if not self.has_interface(I_SMS):
raise RuntimeError('Modem cannot send SMS, interface not active: %r' % I_SMS)
sms = Sms(self.msisdn(), to_msisdn)
mm = self.dbus_obj[I_SMS]
mm.SendMessage(to_msisdn, str(sms))
return sms
def _on_incoming_message(self, message, info):
self.log('Incoming SMS:', repr(message), **info)
def sms_received(self, sms):
pass
class Sms:
_last_sms_idx = 0
msg = None
def __init__(self, from_msisdn=None, to_msisdn=None):
Sms._last_sms_idx += 1
msgs = ['message nr. %d' % Sms._last_sms_idx]
if from_msisdn or to_msisdn:
msgs.append(' sent')
if from_msisdn:
msgs.append(' from %s' % from_msisdn)
if to_msisdn:
msgs.append(' to %s' % to_msisdn)
self.msg = ''.join(msgs)
def __str__(self):
return self.msg
def __eq__(self, other):
if isinstance(other, Sms):
return self.msg == other.msg
return inself.msg == other
# vim: expandtab tabstop=4 shiftwidth=4

View File

@ -82,28 +82,28 @@ class OsmoNitb(log.Origin):
def addr(self):
return self.nitb_iface.get('addr')
def add_bts(self, bts):
def bts_add(self, bts):
self.bts.append(bts)
bts.set_nitb(self)
def add_subscriber(self, modem, msisdn=None):
def subscriber_add(self, modem, msisdn=None):
if msisdn is None:
msisdn = self.suite_run.resources_pool.next_msisdn(modem)
modem.set_msisdn(msisdn)
self.log('Add subscriber', msisdn=msisdn, imsi=modem.imsi())
with self:
OsmoNitbCtrl(self).add_subscriber(modem.imsi(), msisdn, modem.ki())
OsmoNitbCtrl(self).subscriber_add(modem.imsi(), msisdn, modem.ki())
def subscriber_attached(self, *modems):
return all([self.imsi_attached(m.imsi()) for m in modems])
return self.imsi_attached([m.imsi() for m in modems])
def imsi_attached(self, imsi):
self.err('OsmoNitb.imsi_attached() is still fake and does not do anything')
return random.choice((True, False))
def imsi_attached(self, *imsis):
attached = self.imsi_list_attached()
return all([imsi in attached for imsi in imsis])
def sms_received(self, sms):
self.err('OsmoNitb.sms_received() is still fake and does not do anything')
return random.choice((True, False))
def imsi_list_attached(self):
with self:
OsmoNitbCtrl(self).subscriber_list_active()
def running(self):
return not self.process.terminated()
@ -123,7 +123,7 @@ class OsmoNitbCtrl(log.Origin):
def ctrl(self):
return osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT)
def add_subscriber(self, imsi, msisdn, ki=None, algo=None):
def subscriber_add(self, imsi, msisdn, ki=None, algo=None):
created = False
if ki and not algo:
algo = 'comp128v1'
@ -149,7 +149,8 @@ class OsmoNitbCtrl(log.Origin):
aslist_str = ""
with osmo_ctrl.OsmoCtrl(self.nitb.addr(), OsmoNitbCtrl.PORT) as ctrl:
self.ctrl.do_get(OsmoNitbCtrl.SUBSCR_LIST_ACTIVE_VAR)
# this looks like it doesn't work for long data. It's legacy code from the old osmo-gsm-tester.
# This is legacy code from the old osmo-gsm-tester.
# looks like this doesn't work for long data.
data = self.ctrl.receive()
while (len(data) > 0):
(answer, data) = self.ctrl.remove_ipa_ctrl_header(data)