Add basic CTRL test
All the CTRL tests were skipped automatically because they were inherited from before repo split time. This means that MSC CTRL interface was not tested at all. Add trivial test which uses generic rate counter introspection so we at least check that MSC's CTRL interface is not completely broken. Change-Id: I784feece666b00752a81f2c126e6f255505445be
This commit is contained in:
parent
82be67de2b
commit
abb24a2725
|
@ -148,395 +148,18 @@ class TestCtrlBase(unittest.TestCase):
|
|||
return responses
|
||||
|
||||
|
||||
class TestCtrlBSC(TestCtrlBase):
|
||||
|
||||
def tearDown(self):
|
||||
TestCtrlBase.tearDown(self)
|
||||
os.unlink("tmp_dummy_sock")
|
||||
|
||||
class TestCtrlMSC(TestCtrlBase):
|
||||
def ctrl_command(self):
|
||||
return ["./src/osmo-bsc/osmo-bsc", "-r", "tmp_dummy_sock", "-c",
|
||||
"doc/examples/osmo-bsc/osmo-bsc.cfg"]
|
||||
return ["./src/osmo-msc/osmo-msc", "-c",
|
||||
"doc/examples/osmo-msc/osmo-msc.cfg"]
|
||||
|
||||
def ctrl_app(self):
|
||||
return (4249, "./src/osmo-bsc/osmo-bsc", "OsmoBSC", "bsc")
|
||||
return (4255, "./src/osmo-msc/osmo-msc", "OsmoMSC", "msc")
|
||||
|
||||
def testCtrlErrs(self):
|
||||
r = self.do_get('invalid')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Command not found')
|
||||
def testRateCounters(self):
|
||||
r = self.do_get('rate_ctr.*')
|
||||
|
||||
r = self.do_set('rf_locked', '999')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Value failed verification.')
|
||||
|
||||
r = self.do_get('bts')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Error while parsing the index.')
|
||||
|
||||
r = self.do_get('bts.999')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Error while resolving object')
|
||||
|
||||
def testBtsLac(self):
|
||||
r = self.do_get('bts.0.location-area-code')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
||||
self.assertEquals(r['value'], '1')
|
||||
|
||||
r = self.do_set('bts.0.location-area-code', '23')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
r = self.do_get('bts.0.location-area-code')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.location-area-code')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
r = self.do_set('bts.0.location-area-code', '-1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Input not within the range')
|
||||
|
||||
def testBtsCi(self):
|
||||
r = self.do_get('bts.0.cell-identity')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
||||
self.assertEquals(r['value'], '0')
|
||||
|
||||
r = self.do_set('bts.0.cell-identity', '23')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
r = self.do_get('bts.0.cell-identity')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.cell-identity')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
r = self.do_set('bts.0.cell-identity', '-1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Input not within the range')
|
||||
|
||||
def testBtsGenerateSystemInformation(self):
|
||||
r = self.do_get('bts.0.send-new-system-informations')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Write Only attribute')
|
||||
|
||||
# No RSL links so it will fail
|
||||
r = self.do_set('bts.0.send-new-system-informations', '1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Failed to generate SI')
|
||||
|
||||
def testBtsChannelLoad(self):
|
||||
r = self.do_set('bts.0.channel-load', '1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Read Only attribute')
|
||||
|
||||
# No RSL link so everything is 0
|
||||
r = self.do_get('bts.0.channel-load')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['value'],
|
||||
'CCCH+SDCCH4,0,0 TCH/F,0,0 TCH/H,0,0 SDCCH8,0,0'
|
||||
+ ' TCH/F_PDCH,0,0 CCCH+SDCCH4+CBCH,0,0'
|
||||
+ ' SDCCH8+CBCH,0,0 TCH/F_TCH/H_PDCH,0,0')
|
||||
|
||||
def testBtsOmlConnectionState(self):
|
||||
"""Check OML state. It will not be connected"""
|
||||
r = self.do_set('bts.0.oml-connection-state', '1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Read Only attribute')
|
||||
|
||||
# No RSL link so everything is 0
|
||||
r = self.do_get('bts.0.oml-connection-state')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['value'], 'disconnected')
|
||||
|
||||
def testTrxPowerRed(self):
|
||||
r = self.do_get('bts.0.trx.0.max-power-reduction')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
||||
self.assertEquals(r['value'], '20')
|
||||
|
||||
r = self.do_set('bts.0.trx.0.max-power-reduction', '22')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
||||
self.assertEquals(r['value'], '22')
|
||||
|
||||
r = self.do_get('bts.0.trx.0.max-power-reduction')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.max-power-reduction')
|
||||
self.assertEquals(r['value'], '22')
|
||||
|
||||
r = self.do_set('bts.0.trx.0.max-power-reduction', '1')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Value must be even')
|
||||
|
||||
def testTrxArfcn(self):
|
||||
r = self.do_get('bts.0.trx.0.arfcn')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
||||
self.assertEquals(r['value'], '871')
|
||||
|
||||
r = self.do_set('bts.0.trx.0.arfcn', '873')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
||||
self.assertEquals(r['value'], '873')
|
||||
|
||||
r = self.do_get('bts.0.trx.0.arfcn')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.trx.0.arfcn')
|
||||
self.assertEquals(r['value'], '873')
|
||||
|
||||
r = self.do_set('bts.0.trx.0.arfcn', '2000')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
self.assertEquals(r['error'], 'Input not within the range')
|
||||
|
||||
def testRfLock(self):
|
||||
r = self.do_get('bts.0.rf_state')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.rf_state')
|
||||
self.assertEquals(r['value'], 'inoperational,unlocked,on')
|
||||
|
||||
r = self.do_set('rf_locked', '1')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'rf_locked')
|
||||
self.assertEquals(r['value'], '1')
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
r = self.do_get('bts.0.rf_state')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.rf_state')
|
||||
self.assertEquals(r['value'], 'inoperational,locked,off')
|
||||
|
||||
r = self.do_get('rf_locked')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'rf_locked')
|
||||
self.assertEquals(r['value'], 'state=off,policy=off')
|
||||
|
||||
r = self.do_set('rf_locked', '0')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'rf_locked')
|
||||
self.assertEquals(r['value'], '0')
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
r = self.do_get('bts.0.rf_state')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'bts.0.rf_state')
|
||||
self.assertEquals(r['value'], 'inoperational,unlocked,on')
|
||||
|
||||
r = self.do_get('rf_locked')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'rf_locked')
|
||||
self.assertEquals(r['value'], 'state=off,policy=on')
|
||||
|
||||
def testTimezone(self):
|
||||
r = self.do_get('timezone')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], 'off')
|
||||
|
||||
r = self.do_set('timezone', '-2,15,2')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], '-2,15,2')
|
||||
|
||||
r = self.do_get('timezone')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], '-2,15,2')
|
||||
|
||||
# Test invalid input
|
||||
r = self.do_set('timezone', '-2,15,2,5,6,7')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], '-2,15,2')
|
||||
|
||||
r = self.do_set('timezone', '-2,15')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
r = self.do_set('timezone', '-2')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
r = self.do_set('timezone', '1')
|
||||
|
||||
r = self.do_set('timezone', 'off')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], 'off')
|
||||
|
||||
r = self.do_get('timezone')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'timezone')
|
||||
self.assertEquals(r['value'], 'off')
|
||||
|
||||
def testMcc(self):
|
||||
r = self.do_set('mcc', '23')
|
||||
r = self.do_get('mcc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
r = self.do_set('mcc', '023')
|
||||
r = self.do_get('mcc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc')
|
||||
self.assertEquals(r['value'], '23')
|
||||
|
||||
def testMnc(self):
|
||||
r = self.do_set('mnc', '9')
|
||||
r = self.do_get('mnc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mnc')
|
||||
self.assertEquals(r['value'], '9')
|
||||
|
||||
r = self.do_set('mnc', '09')
|
||||
r = self.do_get('mnc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mnc')
|
||||
self.assertEquals(r['value'], '9')
|
||||
|
||||
|
||||
def testMccMncApply(self):
|
||||
# Test some invalid input
|
||||
r = self.do_set('mcc-mnc-apply', 'WRONG')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
|
||||
r = self.do_set('mcc-mnc-apply', '1,')
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
|
||||
r = self.do_set('mcc-mnc-apply', '200,3')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
||||
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
||||
|
||||
# Set it again
|
||||
r = self.do_set('mcc-mnc-apply', '200,3')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
||||
self.assertEquals(r['value'], 'Nothing changed')
|
||||
|
||||
# Change it
|
||||
r = self.do_set('mcc-mnc-apply', '200,4')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
||||
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
||||
|
||||
# Change it
|
||||
r = self.do_set('mcc-mnc-apply', '201,4')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
||||
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
||||
|
||||
# Verify
|
||||
r = self.do_get('mnc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mnc')
|
||||
self.assertEquals(r['value'], '4')
|
||||
|
||||
r = self.do_get('mcc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc')
|
||||
self.assertEquals(r['value'], '201')
|
||||
|
||||
# Change it
|
||||
r = self.do_set('mcc-mnc-apply', '202,03')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc-mnc-apply')
|
||||
self.assertEquals(r['value'], 'Tried to drop the BTS')
|
||||
|
||||
r = self.do_get('mnc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mnc')
|
||||
self.assertEquals(r['value'], '3')
|
||||
|
||||
r = self.do_get('mcc')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'mcc')
|
||||
self.assertEquals(r['value'], '202')
|
||||
|
||||
class TestCtrlNAT(TestCtrlBase):
|
||||
|
||||
def ctrl_command(self):
|
||||
return ["./src/osmo-bsc_nat/osmo-bsc_nat", "-c",
|
||||
"doc/examples/osmo-bsc_nat/osmo-bsc_nat.cfg"]
|
||||
|
||||
def ctrl_app(self):
|
||||
return (4250, "./src/osmo-bsc_nat/osmo-bsc_nat", "OsmoNAT", "nat")
|
||||
|
||||
def testAccessList(self):
|
||||
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'net')
|
||||
self.assertEquals(r['value'], None)
|
||||
|
||||
r = self.do_set('net.0.bsc_cfg.0.access-list-name', 'bla')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'net')
|
||||
self.assertEquals(r['value'], 'bla')
|
||||
|
||||
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'net')
|
||||
self.assertEquals(r['value'], 'bla')
|
||||
|
||||
r = self.do_set('net.0.bsc_cfg.0.no-access-list-name', '1')
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'net')
|
||||
self.assertEquals(r['value'], None)
|
||||
|
||||
r = self.do_get('net.0.bsc_cfg.0.access-list-name')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'net')
|
||||
self.assertEquals(r['value'], None)
|
||||
|
||||
def testAccessListManagement(self):
|
||||
r = self.do_set("net.0.add.allow.access-list.404", "abc")
|
||||
self.assertEquals(r['mtype'], 'ERROR')
|
||||
|
||||
r = self.do_set("net.0.add.allow.access-list.bla", "^234$")
|
||||
self.assertEquals(r['mtype'], 'SET_REPLY')
|
||||
self.assertEquals(r['var'], 'net.0.add.allow.access-list.bla')
|
||||
self.assertEquals(r['value'], 'IMSI allow added to access list')
|
||||
|
||||
# TODO.. find a way to actually see if this rule has been
|
||||
# added. e.g. by implementing a get for the list.
|
||||
|
||||
class TestCtrlSGSN(TestCtrlBase):
|
||||
def ctrl_command(self):
|
||||
return ["./src/gprs/osmo-sgsn", "-c",
|
||||
"doc/examples/osmo-sgsn/osmo-sgsn.cfg"]
|
||||
|
||||
def ctrl_app(self):
|
||||
return (4251, "./src/gprs/osmo-sgsn", "OsmoSGSN", "sgsn")
|
||||
|
||||
def testListSubscribers(self):
|
||||
# TODO. Add command to mark a subscriber as active
|
||||
r = self.do_get('subscriber-list-active-v1')
|
||||
self.assertEquals(r['mtype'], 'GET_REPLY')
|
||||
self.assertEquals(r['var'], 'subscriber-list-active-v1')
|
||||
self.assertEquals(r['value'], None)
|
||||
|
||||
def add_bsc_test(suite, workdir):
|
||||
if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc/osmo-bsc")):
|
||||
print("Skipping the BSC test")
|
||||
return
|
||||
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlBSC)
|
||||
suite.addTest(test)
|
||||
|
||||
def add_nat_test(suite, workdir):
|
||||
if not os.path.isfile(os.path.join(workdir, "src/osmo-bsc_nat/osmo-bsc_nat")):
|
||||
print("Skipping the NAT test")
|
||||
return
|
||||
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlNAT)
|
||||
suite.addTest(test)
|
||||
|
||||
def add_sgsn_test(suite, workdir):
|
||||
if not os.path.isfile(os.path.join(workdir, "src/gprs/osmo-sgsn")):
|
||||
print("Skipping the SGSN test")
|
||||
return
|
||||
test = unittest.TestLoader().loadTestsFromTestCase(TestCtrlSGSN)
|
||||
suite.addTest(test)
|
||||
# FIXME: add MSC-specific CTRL test in here
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
|
@ -568,8 +191,6 @@ if __name__ == '__main__':
|
|||
os.chdir(workdir)
|
||||
print "Running tests for specific control commands"
|
||||
suite = unittest.TestSuite()
|
||||
add_bsc_test(suite, workdir)
|
||||
add_nat_test(suite, workdir)
|
||||
add_sgsn_test(suite, workdir)
|
||||
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestCtrlMSC))
|
||||
res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
|
||||
sys.exit(len(res.errors) + len(res.failures))
|
||||
|
|
Loading…
Reference in New Issue