Add a manul test to the contrib folder that helped and can help during the development of osmo-sip-connector or similar software. This avoids having to create a separate mncc module. It can be started like: $ PYTHONPATH=$PWD/../ python manual_test_server.py Change-Id: I7f62efbc62455e6fcb1f091afb5fa120099834f0changes/52/2452/1
parent
03980b4e31
commit
ce184f83f0
@ -0,0 +1,126 @@ |
||||
import mncc |
||||
import mncc_sock |
||||
import ctypes |
||||
import socket |
||||
|
||||
GSM340_PLAN_ISDN = 1 |
||||
GSM340_TYPE_NATIONAL = 2 |
||||
|
||||
class MnccMessageBuilder(object): |
||||
""" |
||||
I help in creating messages... |
||||
""" |
||||
@staticmethod |
||||
def build_hello(): |
||||
hello = mncc.gsm_mncc_hello() |
||||
hello.msg_type = mncc.MNCC_SOCKET_HELLO |
||||
hello.version = mncc.MNCC_SOCK_VERSION |
||||
hello.mncc_size = ctypes.sizeof(mncc.gsm_mncc) |
||||
hello.data_frame_size = ctypes.sizeof(mncc.gsm_data_frame) |
||||
hello.called_offset = mncc.gsm_mncc.called.offset |
||||
hello.signal_offset = mncc.gsm_mncc.signal.offset |
||||
hello.emergency_offset = mncc.gsm_mncc.emergency.offset |
||||
hello.lchan_type_offset = mncc.gsm_mncc.lchan_type.offset |
||||
return hello |
||||
|
||||
@staticmethod |
||||
def build_mncc_number(number): |
||||
return mncc.gsm_mncc_number( |
||||
type=GSM340_TYPE_NATIONAL, |
||||
plan=GSM340_PLAN_ISDN, |
||||
number=number) |
||||
|
||||
@staticmethod |
||||
def build_setup_ind(calling, called, callref=1): |
||||
setup = mncc.gsm_mncc() |
||||
setup.msg_type = mncc.MNCC_SETUP_IND |
||||
setup.callref = callref |
||||
setup.fields = mncc.MNCC_F_CALLED | mncc.MNCC_F_CALLING |
||||
setup.called = MnccMessageBuilder.build_mncc_number(called) |
||||
setup.calling = MnccMessageBuilder.build_mncc_number(calling) |
||||
return setup |
||||
|
||||
@staticmethod |
||||
def build_setup_cmpl_ind(callref=1): |
||||
setup = mncc.gsm_mncc() |
||||
setup.msg_type = mncc.MNCC_SETUP_COMPL_IND |
||||
setup.callref = callref |
||||
return setup |
||||
|
||||
@staticmethod |
||||
def build_rtp_msg(msg_type, callref, addr, port): |
||||
return mncc.gsm_mncc_rtp( |
||||
msg_type=msg_type, callref=callref, |
||||
ip=addr, port=port, |
||||
#payload_type=3, |
||||
#payload_msg_type=mncc.GSM_TCHF_FRAME) |
||||
payload_type=98, |
||||
payload_msg_type=mncc.GSM_TCH_FRAME_AMR) |
||||
|
||||
@staticmethod |
||||
def build_dtmf_start(callref, data): |
||||
return mncc.gsm_mncc( |
||||
msg_type=mncc.MNCC_START_DTMF_IND, |
||||
callref=callref, |
||||
fields=mncc.MNCC_F_KEYPAD, |
||||
keypad=ord(data)) |
||||
|
||||
@staticmethod |
||||
def build_dtmf_stop(callref, data): |
||||
return mncc.gsm_mncc( |
||||
callref=callref, |
||||
msg_type=mncc.MNCC_STOP_DTMF_IND) |
||||
|
||||
def send_dtmf(callref): |
||||
global conn |
||||
|
||||
conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '1')) |
||||
conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '1')) |
||||
conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '2')) |
||||
conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '2')) |
||||
|
||||
|
||||
server = mncc_sock.MnccSocketServer() |
||||
conn = server.accept() |
||||
|
||||
# Say hello and set-up a call |
||||
conn.send_msg(MnccMessageBuilder.build_hello()) |
||||
conn.send_msg(MnccMessageBuilder.build_setup_ind("1234", "5000")) |
||||
print("=> Sent hello + setup indication") |
||||
|
||||
# Wait for the RTP crate.. and actknowledge it.. |
||||
msg = conn.recv() |
||||
assert msg.msg_type == mncc.MNCC_RTP_CREATE |
||||
print("<= Received request to create a RTP socket") |
||||
conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CREATE, |
||||
msg.callref, |
||||
#socket.INADDR_LOOPBACK, 4000)) |
||||
socket.INADDR_ANY, 4000)) |
||||
print("=> Claimed socket was created...") |
||||
|
||||
msg = conn.recv() |
||||
assert msg.msg_type == mncc.MNCC_CALL_PROC_REQ |
||||
print("<= Received proceeding...") |
||||
|
||||
|
||||
|
||||
while True: |
||||
msg = conn.recv() |
||||
if msg.msg_type == mncc.MNCC_ALERT_REQ: |
||||
print("=> I should alert...") |
||||
continue |
||||
if msg.msg_type == mncc.MNCC_RTP_CONNECT: |
||||
conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CONNECT, |
||||
msg.callref, |
||||
socket.INADDR_LOOPBACK, 4000)) |
||||
print("=> I needed to connect RTP...") |
||||
continue |
||||
if msg.msg_type == mncc.MNCC_SETUP_RSP: |
||||
print("=> Call is connected?") |
||||
conn.send_msg(MnccMessageBuilder.build_setup_cmpl_ind(msg.callref)) |
||||
send_dtmf(msg.callref) |
||||
continue |
||||
|
||||
print(msg) |
||||
print(msg.msg_type) |
||||
print(type(msg)) |
Loading…
Reference in new issue