smalltalk
/
osmo-st-all
Archived
1
0
Fork 0

Merge branch 'master' of git://git.osmocom.org/smalltalk/osmo-st-openbsc-test

This commit is contained in:
Holger Hans Peter Freyther 2012-11-22 10:13:13 +01:00
commit 80a1fdf9a9
23 changed files with 1116 additions and 60 deletions

View File

@ -1 +1,2 @@
*.sw?
*.py?

View File

@ -3,9 +3,11 @@ Eval [
fileIn: 'OMLMsg.st';
fileIn: 'IPAOMLMsg.st';
fileIn: 'OML.st';
fileIn: 'OMLDualTrx.st';
fileIn: 'OMLInit.st';
fileIn: 'RSLMsg.st';
fileIn: 'BTS.st';
fileIn: 'BTSDualTrx.st';
fileIn: 'BTSConnection.st';
fileIn: 'OpenBSCTest.st';
fileIn: 'ExampleTest.st'.

View File

@ -22,39 +22,39 @@ simulating failure condition..."
PackageLoader fileInPackage: #OsmoGSM.
RSLBCCHInformation extend [
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
"nothing todo"
]
]
RSLSACCHFilling extend [
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
"nothing todo"
]
]
RSLMessageBase extend [
btsChannelDispatch: aBTS [
trxChannelDispatch: aTrx [
| ts lchan |
<category: '*-BTS-Core'>
"Generic channel based dispatch."
ts := aBTS trx channel: self channelNumber timeslotNumber + 1.
ts := aTrx channel: self channelNumber timeslotNumber + 1.
lchan := ts lchan: self channelNumber subslotNumber + 1.
self btsDispatchOn: aBTS with: lchan.
self trxDispatchOn: aTrx with: lchan.
]
]
RSLDedicatedChannelManagement extend [
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
self btsChannelDispatch: aBTS.
self trxChannelDispatch: aTrx.
]
]
RSLChannelActivation extend [
btsAllocateChan: aBTS lchan: aChan [
trxAllocateChan: aTrx lchan: aChan [
| ack |
<category: '*-BTS-Core'>
@ -64,15 +64,15 @@ RSLChannelActivation extend [
channelNumber: self channelNumber;
frameNumber: #(23 42) asRSLAttributeData;
yourself.
aBTS sendRSL: ack toMessage.
aTrx mainBts sendRSL: ack toMessage on: aTrx.
]
btsNackChan: aBTS lchan: aLchan [
trxNackChan: aTrx lchan: aLchan [
<category: '*-BTS-Core'>
^ self notYetImplemented
]
btsDispatchOn: aBTS with: lchan [
trxDispatchOn: aTrx with: lchan [
<category: '*-BTS-Core'>
"Find the channel and activate it."
Transcript
@ -81,20 +81,20 @@ RSLChannelActivation extend [
"Allocate..."
lchan allocate
ifTrue: [self btsAllocateChan: aBTS lchan: lchan]
ifFalse: [self btsNackChan: aBTS lchan: lchan].
ifTrue: [self trxAllocateChan: aTrx lchan: lchan]
ifFalse: [self trxNackChan: aTrx lchan: lchan].
]
]
RSLRFChannelRelease extend [
btsDispatchOn: aBTS with: lchan [
trxDispatchOn: aTrx with: lchan [
<category: '*-BTS-Core'>
lchan releaseRequested.
]
]
RSLImmediateAssignment extend [
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
| gsm ts lchan chan_nr |
<category: '*-BTS-Core'>
self channelNumber isPchAgch
@ -111,47 +111,47 @@ RSLImmediateAssignment extend [
data: (ByteArray with: gsm channelOrPacketDescription data first).
"Find the lchan now."
ts := aBTS trx channel: chan_nr timeslotNumber + 1.
ts := aTrx channel: chan_nr timeslotNumber + 1.
lchan := ts lchan: chan_nr subslotNumber + 1.
"Check that the is allocated."
lchan isFree ifTrue: [^self error: 'The lchan should be allocated.'].
aBTS channelAssigned: lchan ra: gsm requestReference ra.
aTrx mainBts channelAssigned: lchan ra: gsm requestReference ra.
]
]
RSLDataRequest extend [
btsDispatchOn: aBTS with: lchan [
trxDispatchOn: aTrx with: lchan [
<category: '*-BTS-Core'>
lchan
dataRequest: self l3Information data
sapi: (self linkIdentifier data first bitAnd: 2r111).
]
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
self btsChannelDispatch: aBTS.
self trxChannelDispatch: aTrx.
]
]
RSLSacchDeactivate extend [
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
Transcript nextPutAll: 'Deactivating SACCH. Not doing anything'; nl.
]
]
RSLReleaseRequest extend [
btsDispatchOn: aBTS with: lchan [
trxDispatchOn: aTrx with: lchan [
<category: '*-BTS-Core'>
lchan releaseSapiRequest: self linkIdentifier data first.
]
btsDispatchOn: aBTS [
trxDispatchOn: aTrx [
<category: '*-BTS-Core'>
"A sapi has been released."
self btsChannelDispatch: aBTS.
self trxChannelDispatch: aTrx.
]
]
@ -161,6 +161,10 @@ Object subclass: BTS [
<comment: 'A fake BTS to test the state machine and inject
RSL messages to test a network without RF.'>
BTS class >> omlInitClass [
^ OMLBTSInit
]
connect: anAddress [
<category: 'connect'>
self stop.
@ -207,10 +211,11 @@ Object subclass: BTS [
"Forward all RSL data from the TRX."
site_mgr bts basebandTransceiver
onData: [:each | self sendRSL: each].
onData: [:each | self sendOnPrimaryRSL: each];
mainBts: self.
"Start the OML init now in a new thread"
oml_init := OMLBTSInit initWith: self.
oml_init := self class omlInitClass initWith: self.
[[oml_init run ] ensure: [Transcript nextPutAll: 'OML-Init exited'; nl]] fork.
]
@ -265,15 +270,14 @@ Object subclass: BTS [
[oml txQueueIsEmpty] whileFalse: [(Delay forMilliseconds: 500) wait].
]
startRSL: aPort streamId: anId [
startRSL: aPort streamId: anId on: aTrx [
<category: 'rsl'>
"TODO: handle the stream."
rsl isNil ifFalse: [rsl stop].
rsl := BTSRslConnection new
onData: [:each | self handleRsl: each];
onStop: [self rslStopped];
onConnect: [self rslConnected];
onData: [:each | self handleRsl: each on: aTrx];
onStop: [self rslStopped: rsl];
onConnect: [self rslConnected: rsl];
btsId: bts_id;
streamId: anId;
yourself.
@ -286,14 +290,14 @@ Object subclass: BTS [
bts_id := aId.
]
handleRsl: aMsg [
handleRsl: aMsg on: aTrx [
| rsl |
<category: 'rsl'>
[
| rsl |
rsl := RSLMessageBase parse: aMsg asByteArray readStream.
rsl btsDispatchOn: self.
rsl trxDispatchOn: aTrx.
] on: Exception do: [:e |
Transcript nextPutAll: 'RSL Parsing failed with'; nl.
e inspect.
@ -304,24 +308,29 @@ Object subclass: BTS [
]
]
rslStopped [
rslStopped: anInput [
<category: 'rsl'>
Transcript nextPutAll: 'RSL Stopped'; nl.
]
rslConnected [
rslConnected: anInput [
<category: 'rsl'>
Transcript nextPutAll: 'RSL Connected'; nl.
"Send anything so rsl will be initialized."
rsl send: #(1 2 3 4 5).
anInput send: #(1 2 3 4 5).
oml_up signal.
]
sendRSL: aMsg [
sendOnPrimaryRSL: aMsg [
<category: 'rsl'>
rsl send: aMsg.
]
sendRSL: aMsg on: aTrx [
<category: 'rsl'>
self sendOnPrimaryRSL: aMsg.
]
findRequestee: aRa [
<category: 'lchan'>
@ -357,7 +366,7 @@ Object subclass: BTS [
].
"Send the request"
self sendRSL: aMsg.
self sendOnPrimaryRSL: aMsg.
"Wait for a result and just return the out_chan, remove the entry"
(Delay forSeconds: 2) timedWaitOn: sem.

View File

@ -0,0 +1,96 @@
"
(C) 2012 by Holger Hans Peter Freyther
All Rights Reserved
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"
"
This is code for a dual trx bts
"
BTS subclass: DualTrxBTS [
| rsl2 |
<category: 'BTS-Core-DualTRX'>
<comment: 'I am a fake dual TRX bts.'>
stop [
<category: 'control'>
rsl2 isNil ifFalse: [rsl2 stop. rsl2 := nil].
^ super stop.
]
omlConnected [
<category: 'control'>
Transcript nextPutAll: 'OML Connected for dual TRX'; nl.
"Create a new SiteManager and forward OML data."
site_mgr := DualTrxSiteManager new
onData: [:each | self sendOML: each];
yourself.
"Forward all RSL data from the TRX."
(site_mgr bts basebandTransceiver: 1)
onData: [:each | rsl send: each];
mainBts: self.
(site_mgr bts basebandTransceiver: 2)
onData: [:each | rsl2 send: each];
mainBts: self.
"Start the OML init now in a new thread"
oml_init := OMLBTSInit initWith: self.
[[oml_init run ] ensure: [Transcript nextPutAll: 'OML-Init exited'; nl]] fork.
]
waitForBTSReady [
<category: 'oml'>
"Wait for one more RSL connection."
oml_up wait.
^ super waitForBTSReady.
]
startRSL: aPort streamId: anId on: aTrx [
^ aTrx fomInstance trx = 0
ifTrue: [super startRSL: aPort streamId: anId on: aTrx]
ifFalse: [self startSecondRSL: aPort streamId: anId on: aTrx].
]
startSecondRSL: aPort streamId: anId on: aTrx [
| trx_id |
"Make sure the RSL id ends with a /1"
trx_id := bts_id copyFrom: 1 to: bts_id size - 1.
trx_id := trx_id , '1'.
rsl2 isNil ifFalse: [rsl2 stop].
rsl2 := BTSRslConnection new
onData: [:each | self handleRsl: each on: aTrx];
onStop: [self rslStopped: rsl2];
onConnect: [self rslConnected: rsl2];
btsId: trx_id;
streamId: anId;
yourself.
rsl2 connect: oml address port: aPort.
]
sendRSL: aMsg on: aTrx [
<category: 'rsl'>
aTrx fomInstance trx = 0
ifTrue: [rsl send: aMsg]
ifFalse: [rsl2 send: aMsg].
]
]

View File

@ -389,12 +389,31 @@ OMLManagerBase subclass: BTSOML [
<category: 'creation'>
radio_carrier := RadioCarrierOML new
parent: self;
id: 1;
yourself.
baseband := BasebandTransceiverOML new
parent: self;
id: 1;
yourself.
]
availableTrx [
<category: 'accessing'>
^ 1
]
radioCarrier: aNr [
<category: 'accessing'>
aNr = 1 ifFalse: [^self error: 'Wrong RadioCarrier number: ', aNr printString].
^ self radioCarrier
]
basebandTransceiver: aNr [
<category: 'accessing'>
aNr = 1 ifFalse: [^self error: 'Wrong Baseband number: ', aNr printString].
^ self basebandTransceiver
]
radioCarrier [
<category: 'accessing'>
^ radio_carrier
@ -436,6 +455,7 @@ OMLManagerBase subclass: BTSOML [
]
OMLManagerBase subclass: RadioCarrierOML [
| id |
<category: 'BTS-OML'>
<comment: 'I am the GSM 12.21 Radio carrier'>
@ -492,9 +512,14 @@ OMLManagerBase subclass: RadioCarrierOML [
self basicStart.
]
id: anId [
<category: 'creation'>
id := anId
]
fomInstance [
^ parent fomInstance
trx: 16r0;
trx: id - 1;
ts: 16rFF;
yourself.
]
@ -512,7 +537,7 @@ OMLManagerBase subclass: RadioCarrierOML [
]
OMLManagerBase subclass: BasebandTransceiverOML [
| channels onData |
| channels onData id mainBts |
<category: 'BTS-OML'>
<comment: 'I am the GSM 12.21 Baseband Transceiver'>
@ -581,10 +606,15 @@ OMLManagerBase subclass: BasebandTransceiverOML [
channels do: [:each | each start]
]
id: anId [
<category: 'creation'>
id := anId
]
fomInstance [
<category: 'oml'>
^ parent fomInstance
trx: 16r0;
trx: id - 1;
yourself.
]
@ -610,6 +640,16 @@ OMLManagerBase subclass: BasebandTransceiverOML [
<category: 'sending'>
onData value: aMsg.
]
mainBts: aBTS [
<category: 'creation'>
mainBts := aBTS
]
mainBts [
<category: 'access'>
^ mainBts
]
]
OMLChannelCombination extend [

View File

@ -0,0 +1,94 @@
"
Create a SM, BTS for a dual trx sceneriao
(C) 2012 by Holger Hans Peter Freyther
All Rights Reserved
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"
SiteManagerOML subclass: DualTrxSiteManager [
<category: 'BTS-OML-DualTRX'>
initialize [
<category: 'creation'>
bts := DualTrxBTSOML new
parent: self;
yourself.
]
]
BTSOML subclass: DualTrxBTSOML [
| rc_two baseband_two |
<category: 'BTS-OML-DualTRX'>
availableTrx [
<category: 'accessing'>
^ 2
]
initialize [
<category: 'creation'>
super initialize.
rc_two := RadioCarrierOML new
parent: self;
id: 2;
yourself.
baseband_two := BasebandTransceiverOML new
parent: self;
id: 2;
yourself.
]
radioCarrier: nr [
<category: 'accessing'>
nr = 1 ifTrue: [^radio_carrier].
nr = 2 ifTrue: [^rc_two].
^ self error: 'RC(%1) not available' % {nr}.
]
basebandTransceiver: nr [
<category: 'accessing'>
nr = 1 ifTrue: [^baseband].
nr = 2 ifTrue: [^baseband_two].
^ self error: 'Baseband(%1) not available' % {nr}.
]
start [
<category: 'accessing'>
attributes := nil.
self basicStart.
(self basebandTransceiver: 1) start.
(self radioCarrier: 1) start.
(self basebandTransceiver: 2) start.
(self radioCarrier: 2) start.
]
findObject: fomKey [
| bb |
<category: 'accessing'>
self fomKey = fomKey
ifTrue: [^self].
fomKey second = radio_carrier class objectClass
ifTrue: [
| rc |
rc := self radioCarrier: (fomKey first trx + 1).
^ rc findObject: fomKey].
bb := self basebandTransceiver: (fomKey first trx + 1).
^ bb findObject: fomKey.
]
]

View File

@ -198,10 +198,10 @@ Object subclass: OMLBTSInit [
^self error: 'Failed to get a (IPA) Formatted O&M message'
]
startRSL: aRsl [
startRSL: aRsl on: aTrx [
| port |
port := (aRsl port data asByteArray ushortAt: 1) swap16.
bts startRSL: port streamId: aRsl streamId data first.
bts startRSL: port streamId: aRsl streamId data first on: aTrx.
^ true
]
@ -233,37 +233,47 @@ Object subclass: OMLBTSInit [
aSem signal.
]
initBtsInstance: bts withQueue: btsQueue [
^ OMLBTSInstanceInit on: bts withInit: self withQueue: btsQueue.
]
btsInit: aSem [
| btsQueue bts init trxProc trxSem rcProc rcSem |
| btsQueue bts init trxSem rcSem |
btsQueue := SharedQueue new.
bts := sm bts.
queues at: bts fomKey put: btsQueue.
"1. Activate the software"
self forwardOML: bts createSwActivateRequest toMessage.
init := OMLBTSInstanceInit on: bts withInit: self withQueue: btsQueue.
init := self initBtsInstance: bts withQueue: btsQueue.
"Get the SM into the enabled state"
trxSem := Semaphore new.
rcSem := Semaphore new.
init start: [
trxProc := [self trxInit: trxSem] fork.
trxProc name: 'TRX Init process'.
rcProc := [self rcInit: rcSem] fork.
rcProc name: 'Radio-Carrier Init process'.
"Start all trx and radio carriers"
1 to: bts availableTrx do: [:each |
| trxProc rcProc |
trxProc := [self trxInit: trxSem on: (bts basebandTransceiver: each)] fork.
trxProc name: 'TRX(%1) Init process' % {each}.
rcProc := [self rcInit: rcSem on: (bts radioCarrier: each)] fork.
rcProc name: 'Radio-Carrier(%1) Init process' % {each}.
].
].
trxSem wait.
rcSem wait.
"Now wait for all of them to initialize"
1 to: bts availableTrx do: [:each |
trxSem wait.
rcSem wait.
].
bts availabilityStatus state: nil.
bts sendStateChanged.
aSem signal.
]
trxInit: aSem [
| trxQueue trx init msg res ack tss |
trxInit: aSem on: trx [
| trxQueue init msg res ack tss |
trxQueue := SharedQueue new.
trx := sm bts basebandTransceiver.
queues at: trx fomKey put: trxQueue.
"1. Activate the software"
@ -277,7 +287,7 @@ Object subclass: OMLBTSInit [
msg := trxQueue next.
msg omDataField class = IPAOMLRSLConnect
ifFalse: [self error: 'Failed to get the RSL Connect'].
res := self startRSL: msg omDataField.
res := self startRSL: msg omDataField on: trx.
ack := msg createResponse: res.
self forwardOML: ack toMessage.
@ -300,10 +310,9 @@ Object subclass: OMLBTSInit [
aSem signal.
]
rcInit: aSem [
| rcQueue rc init |
rcInit: aSem on: rc [
| rcQueue init |
rcQueue := SharedQueue new.
rc := sm bts radioCarrier.
queues at: rc fomKey put: rcQueue.
"1. Activate the software"

View File

@ -958,7 +958,8 @@ OMLMessageBase subclass: FOMMessage [
createNack [
<category: 'acking'>
^ self notImplementedYet
^ self class new
omDataField: om_field createNack.
]
createResponse: aResponse [
@ -1334,6 +1335,19 @@ OMLDataField subclass: OMLSetBTSAttributes [
ack instVarNamed: name put: (self instVarNamed: name)].
^ ack
]
createNack [
| nack |
<category: 'acking'>
nack := OMLSetBTSAttributesNack new
objectClass: self objectClass;
objectInstance: self objectInstance;
yourself.
self class instVarNames do: [:name |
nack instVarNamed: name put: (self instVarNamed: name)].
^ nack
]
]
OMLSetBTSAttributes subclass: OMLSetBTSAttributesAck [
@ -1346,6 +1360,16 @@ OMLSetBTSAttributes subclass: OMLSetBTSAttributesAck [
]
]
OMLSetBTSAttributes subclass: OMLSetBTSAttributesNack [
<category: 'BTS-OML'>
<comment: 'I construct a GSM 12.21 O&M Data field as of 8.6.1'>
OMLSetBTSAttributesNack class >> attributeType [
<category: 'parsing'>
^ FOMMessage msgSetBTSAttributesNack
]
]
OMLDataField subclass: OMLChangeAdminState [
| adm_state |

View File

@ -116,6 +116,16 @@ Object subclass: OpenBSCTest [
bts waitForBTSReady.
]
createAndConnectBTS: aNr [
<category: 'bts'>
bts := BTS new.
bts
btsId: aNr;
connect: 'localhost';
waitForBTSReady.
]
stopBts [
<category: 'bts'>
bts stop.
@ -134,6 +144,7 @@ Object subclass: OpenBSCTest [
rsl accessDelay: #(23) asRSLAttributeData.
lchan := bts waitForChannel: rsl toMessage with: ra.
lchan isNil ifTrue: [^self error: 'No LCHAN allocated.'].
^ LogicalChannelWrapper initWith: lchan.
]

View File

@ -295,10 +295,14 @@ RoundTripTestCase subclass: OMLMsgTest [
]
testSetBTSAttributes [
| oml |
| oml nack |
oml := OMLMessageBase parse: self setBtsAttributesData readStream.
self assert: oml omDataField class = OMLSetBTSAttributes.
self assert: oml toMessage asByteArray = self setBtsAttributesData asByteArray.
"Create a nack now"
nack := oml createResponse: false.
self assert: nack omDataField class = OMLSetBTSAttributesNack.
]
testAdmState [
@ -533,3 +537,20 @@ TestCase subclass: RSLIETest [
should: [RSLChannelNumber ccchRach subslotNumber] raise: Exception.
]
]
TestCase subclass: DualTrxSiteManagerTest [
<category: 'BTS-OML-DualTRX'>
testCreation [
| sm rc1 rc2 bb1 bb2 |
"Verify we have two RC and two Basebands"
sm := DualTrxSiteManager new.
rc1 := sm bts radioCarrier: 1.
rc2 := sm bts radioCarrier: 2.
bb1 := sm bts basebandTransceiver: 1.
bb2 := sm bts basebandTransceiver: 2.
self deny: rc1 == rc2.
self deny: bb1 == bb2.
]
]

View File

@ -7,10 +7,12 @@
<filein>OMLMsg.st</filein>
<filein>IPAOMLMsg.st</filein>
<filein>OML.st</filein>
<filein>OMLDualTrx.st</filein>
<filein>OMLInit.st</filein>
<filein>RSLMsg.st</filein>
<filein>BTSConnection.st</filein>
<filein>BTS.st</filein>
<filein>BTSDualTrx.st</filein>
<filein>OpenBSCTest.st</filein>
<test>
@ -23,6 +25,7 @@
<sunit>FakeBTS.RSLSmokeTest</sunit>
<sunit>FakeBTS.RSLRoundTripTest</sunit>
<sunit>FakeBTS.RSLIETest</sunit>
<sunit>FakeBTS.DualTrxSiteManagerTest</sunit>
<filein>Test.st</filein>
</test>
</package>

View File

@ -0,0 +1,96 @@
"
(C) 2012 by Holger Hans Peter Freyther
All Rights Reserved
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"
PackageLoader fileInPackage: #FakeBTS.
FakeBTS.OMLBTSInstanceInit subclass: NACKBTSInit [
<import: OsmoGSM>
<comment: 'I will respond with a nack...'>
waitForAttributes [
| msg res nack |
<category: 'protected'>
msg := queue next.
msg omDataField class = OMLSetBTSAttributes
ifFalse: [self error: 'Failed to get SetBTSAttributes'].
nack := msg createResponse: false.
omlInit forwardOML: nack toMessage.
]
]
FakeBTS.OMLBTSInit subclass: NACKInit [
<import: OsmoGSM>
<comment: 'I am an INIT that will nack the BTS Attributes to check
what OpenBSC will do'>
initBtsInstance: aBts withQueue: aQueue [
"Called to initialize the BTS Object"
^ NACKBTSInit on: aBts withInit: self withQueue: aQueue.
]
]
FakeBTS.BTS subclass: NACKBTS [
| oml_gone |
<import: OsmoGSM>
<comment: 'I am a BTS that will NACK the OML init and this should
cause this BTS to be dropped.'>
NACKBTS class >> omlInitClass [
^ NACKInit
]
connect: aHost [
self stop.
rsl := nil.
oml_gone := Semaphore new.
^ super connect: aHost.
]
omlStopped [
oml_gone signal.
]
waitForOMLGone [
"Wait until the OML connection is gone"
oml_gone wait.
]
]
Eval [
| btsAck btsNack test lchan |
"Connect and wait for the BTS to be ready."
btsAck := FakeBTS.BTS new
btsId: '1801/0/0'; connect: 'localhost';
waitForBTSReady; yourself.
"Now connect a NACK bts.."
btsNack := NACKBTS new
btsId: '1802/0/0'; connect: 'localhost';
waitForOMLGone; yourself.
"Verify that the first BTS is still connected"
test := FakeBTS.OpenBSCTest initWith: btsAck.
lchan := test requireAnyChannel.
lchan isNil
ifTrue: [Transcript nextPutAll: 'FAILED TO ALLOCATE A LCHAN'; nl.]
ifFalse: [Transcript nextPutAll: 'Test passed'; nl.].
]

View File

@ -0,0 +1,2 @@
This should test two BTS connecting and the second one should NACK
some OML attributes while the first BTS remains connected.

View File

@ -0,0 +1,147 @@
!
! OpenBSC (0.12.0.18-1a6b8-dirty) configuration saved from vty
!!
password foo
!
log stderr
logging color 1
logging timestamp 0
!
line vty
no login
!
e1_input
e1_line 0 driver ipa
e1_line 0 port 0
network
network country code 1
mobile network code 1
short name OpenBSC
long name OpenBSC
auth policy closed
location updating reject cause 13
encryption a5 0
neci 1
paging any use tch 0
rrlp mode none
mm info 1
handover 0
handover window rxlev averaging 10
handover window rxqual averaging 1
handover window rxlev neighbor averaging 10
handover power budget interval 6
handover power budget hysteresis 3
handover maximum distance 9999
timer t3101 10
timer t3103 0
timer t3105 0
timer t3107 0
timer t3109 0
timer t3111 0
timer t3113 60
timer t3115 0
timer t3117 0
timer t3119 0
timer t3122 0
timer t3141 0
dtx-used 0
subscriber-keep-in-ram 0
bts 0
type nanobts
band DCS1800
cell_identity 0
location_area_code 1
training_sequence_code 7
base_station_id_code 63
ms max power 15
cell reselection hysteresis 4
rxlev access min 0
channel allocator ascending
rach tx integer 9
rach max transmission 7
ip.access unit_id 1801 0
oml ip.access stream_id 255 line 0
neighbor-list mode automatic
gprs mode none
description BTS 1.. not nacked
trx 0
rf_locked 0
arfcn 809
nominal power 23
max_power_red 20
rsl e1 tei 0
timeslot 0
phys_chan_config CCCH+SDCCH4
hopping enabled 0
timeslot 1
phys_chan_config SDCCH8
hopping enabled 0
timeslot 2
phys_chan_config TCH/H
hopping enabled 0
timeslot 3
phys_chan_config TCH/F
hopping enabled 0
timeslot 4
phys_chan_config TCH/F
hopping enabled 0
timeslot 5
phys_chan_config TCH/F
hopping enabled 0
timeslot 6
phys_chan_config TCH/F
hopping enabled 0
timeslot 7
phys_chan_config TCH/F
hopping enabled 0
bts 1
type nanobts
band DCS1800
cell_identity 0
location_area_code 1
training_sequence_code 7
base_station_id_code 63
ms max power 15
cell reselection hysteresis 4
rxlev access min 0
channel allocator ascending
rach tx integer 9
rach max transmission 7
ip.access unit_id 1802 0
oml ip.access stream_id 255 line 0
neighbor-list mode automatic
gprs mode none
description BTS 1.. nacked
trx 0
rf_locked 0
arfcn 809
nominal power 23
max_power_red 20
rsl e1 tei 0
timeslot 0
phys_chan_config CCCH+SDCCH4
hopping enabled 0
timeslot 1
phys_chan_config SDCCH8
hopping enabled 0
timeslot 2
phys_chan_config TCH/F
hopping enabled 0
timeslot 3
phys_chan_config TCH/F
hopping enabled 0
timeslot 4
phys_chan_config TCH/F
hopping enabled 0
timeslot 5
phys_chan_config TCH/F
hopping enabled 0
timeslot 6
phys_chan_config TCH/F
hopping enabled 0
timeslot 7
phys_chan_config TCH/F
hopping enabled 0
mncc-int
default-codec tch-f efr
default-codec tch-h hr

View File

@ -0,0 +1 @@
End to End tests for OpenBSC/sysmoBTS

View File

@ -0,0 +1,151 @@
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dbus
import sim, sms
class Modem(object):
def __init__(self, bus, name):
self.name = name
self.bus = bus
self.modem = dbus.Interface(bus.get_object('org.ofono', name), 'org.ofono.Modem')
def enable(self):
"""
Enable the given modem on the Bus
"""
self.modem.SetProperty("Powered", dbus.Boolean(1), timeout = 120)
def disable(self):
"""
Enable the given modem on the Bus
"""
self.modem.SetProperty("Powered", dbus.Boolean(0), timeout = 120)
def online(self):
"""
Switch-on on the RF Modem
"""
self.modem.SetProperty("Online", dbus.Boolean(1), timeout = 120)
def offline(self):
"""
Switch-off on the RF Modem
"""
self.modem.SetProperty("Online", dbus.Boolean(0), timeout = 120)
def is_enabled(self):
"""
Is the modem online?
"""
return bool(self._get_property('Powered'))
def manufacturer(self):
"""
Who is the owner of the mode?
"""
man = self.modem.GetProperties()
try:
return str(man['Manufacturer'])
except:
return None
def _get_property(self, name):
"""
Internal
"""
return self.modem.GetProperties()[name]
def sim(self):
return sim.Sim(self.bus, self.name)
def sms(self):
return sms.SmsManager(self.bus, self.name)
def register(self):
"""Ask for the module to register"""
network = dbus.Interface(
self.bus.get_object('org.ofono', self.name),
'org.ofono.NetworkRegistration')
network.Register()
def __repr__(self):
return "<Modem('%s')>" % self.name
def get(bus, name):
"""
Find the modem
"""
return Modem(bus, name)
def getmodems(bus):
"""
Find modems...
"""
obj = dbus.Interface(bus.get_object('org.ofono', '/'), 'org.ofono.Manager')
return [Modem(bus, str(x[0])) for x in obj.GetModems()]
def detect_modems(bus, sleep=True, poweroff=True):
"""
Detect the modems that can be used for the test...
"""
modems = getmodems(bus)
# Filter out the phonesim
modems = filter(lambda x: x.name != '/phonesim', modems)
wait = []
on = []
# Enable each modem...
for mod in modems:
if mod.is_enabled():
on.append(mod)
else:
print("Going to enable modem: %s" % mod.name)
mod.enable()
wait.append(mod)
# Now... wait a bit for the modem to do some init
if len(wait) >0 and sleep:
import time
print("I need to sleep some time for the modem to wake up")
time.sleep(20)
for mod in wait:
if mod.is_enabled():
on.append(mod)
# Now filter out the modems without a SIM Card
def modem_vendor(modem):
# Check if the modem vendor was queried
return modem.manufacturer() != None
def sim_present(modem):
return modem.sim().imsi() != None
on = filter(modem_vendor, on)
on = filter(sim_present, on)
# TODO: We could now disable all modems without a SIMcard
for mod in modems:
if mod in on or not poweroff:
continue
print("Modem %s is wihtout SIM card. Powering it down." % mod.name)
mod.disable()
return on

View File

@ -0,0 +1,63 @@
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dbus, modem, openbsc
class NitbTest(object):
"""
I help with testing NITB/pseudoMSC code. E.g. for call testing..
and SMS..
"""
def __init__(self, host):
self.host = host
self.socket = openbsc.NITBSocket(host)
def setup(self):
# Get the modems.. now check which ones can be used for the
# test with a proper IMSI
self.modems = modem.detect_modems(dbus.SystemBus())
self.avail_modems = []
self.ext2mod = {}
self.mod2ext = {}
for mod in self.modems:
imsi = mod.sim().imsi()
if not self.socket.imsi_present(imsi):
print("Modem('%s') doesn't have a provisioned SIM('%s')" % (mod.name, imsi))
continue
self.avail_modems.append(mod)
ext = self.socket.extension(imsi)
self.ext2mod[ext] = mod
self.mod2ext[mod] = ext
# Now register
try:
mod.register()
except:
print("Registering %s failed. Continuing anyway" % mod)
# TODO: Check if all modems are registered? But this would delay the
# test further. But the modem's SIM card is inside the NITB HLR so it
# should be able to register.
def teardown(self):
pass
def run(self):
self.setup()
self.run_test()
self.teardown()

View File

@ -0,0 +1,61 @@
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# VTY helper code for OpenBSC
#
import socket
class _VTYSocket(object):
def __init__(self, name, host, port):
self.name = name
self.host = host
self.port = port
def connectSendAndWait(self, request):
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sck.setblocking(1)
sck.connect((self.host, self.port))
sck.recv(4096)
end = '\r\n%s> ' % self.name
# Now send the command
sck.send("%s\r" % request)
res = ""
while True:
data = sck.recv(4096)
res = "%s%s" % (res, data)
if res.endswith(end):
break
sck.close()
return res[len(request) + 2: -len(end)]
class NITBSocket(object):
def __init__(self, host):
self.host = host
def _vty(self):
return _VTYSocket('OpenBSC', self.host, 4242)
def imsi_present(self, imsi):
res = self._vty().connectSendAndWait('show subscriber imsi %s' % imsi)
return not res.startswith('% No subscriber found for imsi ')
def extension(self, imsi):
if not self.imsi_present(imsi):
return None
res = self._vty().connectSendAndWait('show subscriber imsi %s' % imsi)
return res.split('\r\n')[2].split(': ')[1]

View File

@ -0,0 +1,41 @@
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dbus
class Sim(object):
def __init__(self, bus, path):
self.bus = bus
self.path = path
self.sim = dbus.Interface(bus.get_object('org.ofono', path), 'org.ofono.SimManager')
def imsi(self):
res = self.sim.GetProperties(['SubscriberIdentity'])
try:
return str(res['SubscriberIdentity'])
except:
return None
def present(self):
"""The Wavecom driver is broken and 'detects' a SIM when there is None"""
return self.imsi() != None
def __repr__(self):
return "<Sim(imsi=%s) of '%s'>" % (self.imsi(), self.path)
def get(bus, path):
"""Get the SIM manager"""
return Sim(bus, path)

View File

@ -0,0 +1,63 @@
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dbus
class Sms(object):
def __init__(self, bus, name):
self.bus = bus
self.name = name
self.sms = dbus.Interface(
bus.get_object('org.ofono', name),
'org.ofono.Message')
def cancel(self):
self.sms.Cancel()
def state(self):
return str(self.sms.GetProperties()['State'])
def __repr__(self):
return "<Sms('%s')>" % self.name
class SmsManager(object):
def __init__(self, bus, name):
self.sms = dbus.Interface(
bus.get_object('org.ofono', name),
'org.ofono.MessageManager')
self.bus = bus
self.name = name
def send_message(self, number, text, delivery_report):
self.sms.SetProperty('UseDeliveryReports', dbus.Boolean(int(delivery_report)))
return self.sms.SendMessage(number, text)
def all_message_names(self):
messages = self.sms.GetMessages()
return map(lambda x: str(x[0]), messages)
def get_message(self, path):
return Sms(self.bus, path)
def registerOnMsgAdded(self, cb):
self.sms.connect_to_signal('MessageAdded', cb)
def registerOnMsgRemoved(self, cb):
self.sms.connect_to_signal('MessageRemoved', cb)
def __repr__(self):
return "<SmsManager for Modem('%s')>" % self.name
def get(bus, name):
return SmsManager(bus, name)

View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import dbus
import dbus.mainloop.glib
import gobject
import time
from osmocom import modem
messages = []
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
mod = modem.detect_modems(bus, sleep=False, poweroff=False)[0]
print mod.name
sim = mod.sim()
print sim.present()
print sim.imsi()
sms = mod.sms()
sms.send_message('2233', 'BLA', False)
print sms.all_message_names()
for name in sms.all_message_names():
msg = sms.get_message(name)
print msg.state()
print msg
msg.cancel()
#watcher = sms.SmsWatcher(sm)
#for i in range(1, 2000):
# messages.append(sms.sendessage(sm, '39323', 'TEST %d' % i, False))
#time.sleep(2)
#sms.wait_for_sent(messages)
mainloop = gobject.MainLoop()
print dir(mainloop)
mainloop.run()

View File

@ -0,0 +1,67 @@
#!/usr/bin/env python
# Copyright (C) 2012 Holger Hans Peter Freyther
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Hi. I am going to send SMS between all available modules and then wait
# for it to complete and verify that all messages arrived and find out
# which one is missing.
from osmocom.nitb_test import NitbTest
class SmsMassTest(NitbTest):
def __init__(self, host, sms_to_send=1000):
NitbTest.__init__(self, host)
self.sms_to_send = sms_to_send
def run_test(self):
"""Run the test"""
self.clear_all_sms()
extensions = self.ext2mod.keys()
for nr in xrange(1, self.sms_to_send):
for modem in self.avail_modems:
self.send_sms(nr, modem, extensions)
# Now wait.... TODO. We could connect to the MessageAdded/Removed
# signal and enter the event loop here...
if len(self.avail_modems) > 0:
print("Queued a lot of SMS... now waiting.")
else:
print("No SMS queued due lack of modems.")
def clear_all_sms(self):
# Clear all SMS so we can easily verify we get SMS..
for modem in self.avail_modems:
sms = modem.sms()
for msg in sms.all_message_names():
try:
sms.get_message(msg).cancel()
except:
print("Deleting SMS('%s') failed. Continuing." % msg)
def send_sms(self, nr, modem, extension_lists):
# Send a SMS to all extensions
sms = modem.sms()
for extension in extension_lists:
# Do not send a SMS to myself (unless this wants to be tested)
if self.ext2mod[extension] == modem:
continue
text = "This is SMS %d from %s to %s" % (nr, self.mod2ext[modem], extension)
sms.send_message(extension, text, False)
if __name__ == "__main__":
SmsMassTest('localhost').run()