gr-gsm/python/qa_burst_printer.py

298 lines
20 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @file
# @author (C) 2015 by Roman Khassraf <rkhassraf@gmail.com>
# @section LICENSE
#
# Gr-gsm is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# Gr-gsm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gr-gsm; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
#
from gnuradio import gr, gr_unittest, blocks
import grgsm_swig as grgsm
import os
import pmt
import sys
import tempfile
class qa_burst_printer(gr_unittest.TestCase):
    """
    QA for grgsm.bursts_printer.

    Each test feeds bursts from a grgsm.burst_source into a bursts_printer,
    captures whatever is written to the stdout file descriptor while the
    flowgraph runs, and compares it with the expected text.
    """

    # Four bursts on timeslot 2, their frame numbers, and the frame counts the
    # printer is expected to show for them.
    NORMAL_FRAMENUMBERS = [2569043, 2569044, 2569045, 2569046]
    NORMAL_FRAMECOUNTS = [3967625, 3967658, 3967691, 3967724]
    NORMAL_TIMESLOTS = [2, 2, 2, 2]
    NORMAL_BURSTS = [
        "0001100000010010011110111110011111000000001010001111000000001000110101110010000011010111100101101010000001111010100010110111101011101011100000101000",
        "0001000101111101111110000010100001011011111010111110101011101000110101110010000011010111110110111101101111110000011011010111011111001011101000011000",
        "0000001000011110111110101011001000110000000000110110101100011000110101110010000011010111001010100101011111001000111100000100000111111000000101110000",
        "0001101010111110010001010110101100000011101100011111110100101000110101110010000011010111111000000001010010111001111111011001000000001001000011101000",
    ]
    # Expected "payload only" rendering of NORMAL_BURSTS. Kept as literals so
    # the tests do not re-implement the printer's payload extraction.
    NORMAL_PAYLOADS = [
        "110000001001001111011111001111100000000101000111100000000100101101010000001111010100010110111101011101011100000101",
        "100010111110111111000001010000101101111101011111010101110110110111101101111110000011011010111011111001011101000011",
        "000100001111011111010101100100011000000000011011010110001001010100101011111001000111100000100000111111000000101110",
        "110101011111001000101011010110000001110110001111111010010111000000001010010111001111111011001000000001001000011101",
    ]

    # The same dummy burst sent four times on timeslot 4.
    DUMMY_FRAMENUMBERS = [1099602, 1099603, 1099604, 1099605]
    DUMMY_FRAMECOUNTS = [1699146, 1699179, 1699212, 1699245]
    DUMMY_TIMESLOTS = [4, 4, 4, 4]
    DUMMY_BURSTS = [
        "0001111101101110110000010100100111000001001000100000001111100011100010111000101110001010111010010100011001100111001111010011111000100101111101010000",
    ] * 4
    # Expected "payload only" rendering of DUMMY_BURSTS.
    DUMMY_PAYLOADS = [
        "111110110111011000001010010011100000100100010000000111110111010010100011001100111001111010011111000100101111101010",
    ] * 4

    # Input used by the dummy-burst tests: the normal bursts followed by the
    # dummy bursts (so bursts 4-7 are the dummy ones).
    MIXED_FRAMENUMBERS = NORMAL_FRAMENUMBERS + DUMMY_FRAMENUMBERS
    MIXED_FRAMECOUNTS = NORMAL_FRAMECOUNTS + DUMMY_FRAMECOUNTS
    MIXED_TIMESLOTS = NORMAL_TIMESLOTS + DUMMY_TIMESLOTS
    MIXED_BURSTS = NORMAL_BURSTS + DUMMY_BURSTS

    def setUp(self):
        """
        Create the flowgraph and point the stdout file descriptor at a
        temporary file, so output written at descriptor level is captured.
        Python's own sys.stdout is re-targeted at a duplicate of the original
        descriptor so Python-level output still reaches the real stdout.
        """
        self.tb = gr.top_block()
        self.tmpfile = tempfile.NamedTemporaryFile()
        # Push out anything still buffered in sys.stdout before its descriptor
        # is swapped; otherwise that text could be emitted out of order or end
        # up in the capture file.
        sys.stdout.flush()
        self.prevfd = os.dup(sys.stdout.fileno())
        os.dup2(self.tmpfile.fileno(), sys.stdout.fileno())
        self.prev = sys.stdout
        # Keep a handle on the pass-through stream so tearDown can close it.
        self.passthrough = os.fdopen(self.prevfd, "w")
        sys.stdout = self.passthrough

    def tearDown(self):
        """
        Undo the redirection done in setUp and release the temporary file.
        """
        self.tb = None
        # Flush while the pass-through stream is still installed, then restore
        # the original descriptor before closing the duplicate it was copied
        # from (closing the stream closes self.prevfd).
        self.passthrough.flush()
        os.dup2(self.prevfd, self.prev.fileno())
        sys.stdout = self.prev
        self.passthrough.close()
        self.tmpfile.close()

    def getOutput(self):
        """
        Return everything captured in the temporary file so far.
        """
        self.tmpfile.seek(0)
        return self.tmpfile.read()

    def getOutputExpected(self, expected_lines):
        """
        Return expected_lines as UTF-8 bytes, each line terminated by a
        newline, for comparison with getOutput().
        """
        return "".join(line + "\n" for line in expected_lines).encode('utf-8')

    def _run_printer(self, framenumbers, timeslots, bursts, prefix, *flags):
        """
        Send the given bursts through a bursts_printer and return the
        captured output.

        prefix is interned and passed as the printer's first argument; flags
        are forwarded positionally after it. Judging by the tests in this
        class the four booleans select, in order: print frame number, print
        frame count, print payload only, ignore dummy bursts (the real
        parameter names are not visible in this file).
        """
        src = grgsm.burst_source(framenumbers, timeslots, bursts)
        printer = grgsm.bursts_printer(pmt.intern(prefix), *flags)
        self.tb.msg_connect(src, "out", printer, "bursts")
        self.tb.run()
        return self.getOutput()

    @staticmethod
    def _labelled(prefix, framenumbers, framecounts, bodies):
        """
        Build expected lines of the form "<prefix><fnr> <fcount>: <body>".
        """
        return ["%s%d %d: %s" % (prefix, fnr, fcount, body)
                for fnr, fcount, body in zip(framenumbers, framecounts, bodies)]

    def test_001_complete_bursts_prefix(self):
        """
        Complete bursts, without any prefix
        """
        output = self._run_printer(
            self.NORMAL_FRAMENUMBERS, self.NORMAL_TIMESLOTS, self.NORMAL_BURSTS,
            "", False, False, False, False)
        self.assertEqual(output, self.getOutputExpected(self.NORMAL_BURSTS))

    def test_002_complete_bursts_prefix(self):
        """
        Complete bursts, with a string prefix
        """
        expected_lines = ["Test " + burst for burst in self.NORMAL_BURSTS]
        output = self._run_printer(
            self.NORMAL_FRAMENUMBERS, self.NORMAL_TIMESLOTS, self.NORMAL_BURSTS,
            "Test ", False, False, False, False)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_003_complete_bursts_fnr(self):
        """
        Complete bursts, no prefix, with frame number
        """
        expected_lines = [
            "%d: %s" % (fnr, burst)
            for fnr, burst in zip(self.NORMAL_FRAMENUMBERS, self.NORMAL_BURSTS)
        ]
        output = self._run_printer(
            self.NORMAL_FRAMENUMBERS, self.NORMAL_TIMESLOTS, self.NORMAL_BURSTS,
            "", True, False, False, False)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_004_complete_bursts_fcount(self):
        """
        Complete bursts, no prefix, with frame count
        """
        expected_lines = [
            "%d: %s" % (fcount, burst)
            for fcount, burst in zip(self.NORMAL_FRAMECOUNTS, self.NORMAL_BURSTS)
        ]
        output = self._run_printer(
            self.NORMAL_FRAMENUMBERS, self.NORMAL_TIMESLOTS, self.NORMAL_BURSTS,
            "", False, True, False, False)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_005_complete_bursts_fnr_fcount(self):
        """
        Complete bursts, no prefix, with frame number and frame count
        """
        expected_lines = self._labelled(
            "", self.NORMAL_FRAMENUMBERS, self.NORMAL_FRAMECOUNTS,
            self.NORMAL_BURSTS)
        output = self._run_printer(
            self.NORMAL_FRAMENUMBERS, self.NORMAL_TIMESLOTS, self.NORMAL_BURSTS,
            "", True, True, False, False)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_006_payload_prefix_fnr_fcount(self):
        """
        Payload only, with prefix, frame number and frame count
        Bursts 4-7 are dummy bursts
        """
        expected_lines = self._labelled(
            "test_006: ", self.MIXED_FRAMENUMBERS, self.MIXED_FRAMECOUNTS,
            self.NORMAL_PAYLOADS + self.DUMMY_PAYLOADS)
        output = self._run_printer(
            self.MIXED_FRAMENUMBERS, self.MIXED_TIMESLOTS, self.MIXED_BURSTS,
            "test_006: ", True, True, True, False)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_007_payload_prefix_fnr_fcount(self):
        """
        Payload only, with prefix, frame number, frame count, and ignoring dummy bursts
        Bursts 4-7 are dummy bursts
        """
        # Only the four normal bursts are expected in the output.
        expected_lines = self._labelled(
            "test_007: ", self.NORMAL_FRAMENUMBERS, self.NORMAL_FRAMECOUNTS,
            self.NORMAL_PAYLOADS)
        output = self._run_printer(
            self.MIXED_FRAMENUMBERS, self.MIXED_TIMESLOTS, self.MIXED_BURSTS,
            "test_007: ", True, True, True, True)
        self.assertEqual(output, self.getOutputExpected(expected_lines))

    def test_008_complete_prefix_fnr_fcount(self):
        """
        Complete bursts, with prefix, frame number, frame count, and ignoring dummy bursts
        Bursts 4-7 are dummy bursts
        """
        # Only the four normal bursts are expected in the output.
        expected_lines = self._labelled(
            "test_008: ", self.NORMAL_FRAMENUMBERS, self.NORMAL_FRAMECOUNTS,
            self.NORMAL_BURSTS)
        output = self._run_printer(
            self.MIXED_FRAMENUMBERS, self.MIXED_TIMESLOTS, self.MIXED_BURSTS,
            "test_008: ", True, True, False, True)
        self.assertEqual(output, self.getOutputExpected(expected_lines))
if __name__ == "__main__":
    # Executed as a script: hand the test case to gr_unittest's runner.
    gr_unittest.run(qa_burst_printer)