#!/usr/bin/env python2
"""Start osmo-nitb, provision a subscriber over its control port, run the USSD test.

Flow of the script:

1. Launch ``osmo-nitb`` with the example nanoBTS configuration.
2. Connect to ``localhost:4249`` and issue a ``SET subscriber-modify-v1``
   command framed with the IPA control header.
3. Run the ``ussd/USSDTest.st`` script with ``gst`` under a timeout.
4. Terminate ``osmo-nitb`` and print its exit status.
"""

import random
import socket
import struct
import subprocess
import sys
import time

import timeout_decorator

verbose = True


def prefix_ipa_ctrl_header(data):
    """Return ``data`` prefixed with the 4-byte IPA control header.

    The header is packed big-endian as (length, 0xee, 0). The length field
    is ``len(data) + 1``, i.e. it counts the payload plus the final header
    byte.
    """
    return struct.pack(">HBB", len(data) + 1, 0xee, 0) + data


def ipa_ctrl_header(header):
    """Parse a 4-byte IPA control header.

    Returns the length field, or ``None`` when the two protocol bytes are
    not ``0xee`` and ``0``.
    """
    (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", header)
    if ipa_proto != 0xee or osmo_proto != 0:
        return None
    return plen


def remove_ipa_ctrl_header(data):
    """Strip the IPA control header from ``data``.

    Returns a tuple ``(payload, rest)`` where ``payload`` is the first
    message body and ``rest`` is whatever followed it in ``data``.

    Raises ``BaseException`` when ``data`` is shorter than a header or the
    header carries unexpected protocol bytes. A length field larger than the
    available data only produces a warning; the (truncated) payload is still
    returned.
    """
    if len(data) < 4:
        raise BaseException("Answer too short!")
    plen = ipa_ctrl_header(data[:4])
    if plen is None:
        raise BaseException("Wrong protocol in answer!")
    if plen + 3 > len(data):
        print("Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3))
    # The length field includes one header byte, hence the "+ 3" offset
    # instead of "+ 4" when locating the end of the payload.
    return data[4:plen + 3], data[plen + 3:]


def connect(host, port):
    """Open a blocking TCP connection to ``host:port`` and return the socket."""
    if verbose:
        print("Connecting to host %s:%i" % (host, port))

    sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sck.setblocking(1)
    sck.connect((host, port))
    return sck


def send(sck, data):
    """Frame ``data`` with the IPA control header and send it on ``sck``."""
    if verbose:
        print("Sending \"%s\"" % (data))
    data = prefix_ipa_ctrl_header(data)
    # sendall() keeps writing until the whole frame is out; a bare send()
    # may transmit only part of it.
    sck.sendall(data)


def do_set(var, value, op_id, sck):
    """Send ``SET <op_id> <var> <value>`` on ``sck``."""
    setmsg = "SET %s %s %s" % (op_id, var, value)
    send(sck, setmsg)


def do_get(var, op_id, sck):
    """Send ``GET <op_id> <var>`` on ``sck``."""
    getmsg = "GET %s %s" % (op_id, var)
    send(sck, getmsg)


def do_set_get(sck, var, value=None):
    """Issue a SET (when ``value`` is given) or GET and validate the reply.

    Returns the fourth whitespace-separated field of the reply when the
    reply type, operation id and variable name match the request. Returns
    ``None`` when they do not match, when the reply has fewer than four
    fields, or when a SET reply reports a value different from ``value``.
    """
    r = random.randint(1, sys.maxint)
    if value is not None:
        s = 'SET_REPLY'
        do_set(var, value, r, sck)
    else:
        s = 'GET_REPLY'
        do_get(var, r, sck)
    # NOTE(review): a single recv() is assumed to deliver the whole reply.
    (answer, data) = remove_ipa_ctrl_header(sck.recv(4096))
    x = answer.split()
    if len(x) < 4:
        # Too short to be a valid reply; treat it like any other mismatch
        # rather than failing with IndexError.
        return None
    if s == x[0] and str(r) == x[1] and var == x[2]:
        if 'SET_REPLY' == s and value != x[3]:
            return None
        return x[3]
    return None


def set_var(sck, var, val):
    """Set control variable ``var`` to ``val``; see ``do_set_get`` for the result."""
    return do_set_get(sck, var, val)


@timeout_decorator.timeout(20)
def run_ussd_test():
    """Run ``gst ussd/USSDTest.st`` and require exit status 23.

    Prints the exit status and raises ``Exception`` for any other status.
    The call is bounded by the ``timeout_decorator.timeout(20)`` decorator.
    """
    gst = subprocess.Popen(["gst", "ussd/USSDTest.st"])
    res = gst.wait()
    print(res)
    if res != 23:
        raise Exception("Non success error code")


# Start OpenBSC and give it some time to start
obsc = subprocess.Popen(["../openbsc/openbsc/src/osmo-nitb/osmo-nitb", "-c",
                         "../openbsc/openbsc/doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-e", "1"])
try:
    time.sleep(10)

    # try to create and authorize subscriber
    sck = connect("localhost", 4249)
    try:
        # The reply is not checked, matching the previous behaviour.
        res = set_var(sck, b"subscriber-modify-v1", b"901010000001111,445567")
    finally:
        sck.close()

    # Okay now.. execute the tests..
    run_ussd_test()
finally:
    # Always stop osmo-nitb, even when provisioning or the test fails, so no
    # stray process is left behind.
    obsc.terminate()
    res = obsc.wait()

print(res)
# NOTE(review): the script exits with status 1 even after a successful run.
# This is kept as-is because the previous code also always ended with a
# non-zero status; confirm whether 0 is intended on success.
sys.exit(1)