From ce184f83f0ebad256fe05be75c2b53d03a5ee158 Mon Sep 17 00:00:00 2001 From: Holger Hans Peter Freyther Date: Sat, 29 Apr 2017 14:43:28 +0200 Subject: [PATCH] test: Add a manual test to simulate DTMF handling in osmo-sip-connector Add a manul test to the contrib folder that helped and can help during the development of osmo-sip-connector or similar software. This avoids having to create a separate mncc module. It can be started like: $ PYTHONPATH=$PWD/../ python manual_test_server.py Change-Id: I7f62efbc62455e6fcb1f091afb5fa120099834f0 --- contrib/manual_test_server.py | 126 ++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 contrib/manual_test_server.py diff --git a/contrib/manual_test_server.py b/contrib/manual_test_server.py new file mode 100644 index 0000000..5625670 --- /dev/null +++ b/contrib/manual_test_server.py @@ -0,0 +1,126 @@ +import mncc +import mncc_sock +import ctypes +import socket + +GSM340_PLAN_ISDN = 1 +GSM340_TYPE_NATIONAL = 2 + +class MnccMessageBuilder(object): + """ + I help in creating messages... + """ + @staticmethod + def build_hello(): + hello = mncc.gsm_mncc_hello() + hello.msg_type = mncc.MNCC_SOCKET_HELLO + hello.version = mncc.MNCC_SOCK_VERSION + hello.mncc_size = ctypes.sizeof(mncc.gsm_mncc) + hello.data_frame_size = ctypes.sizeof(mncc.gsm_data_frame) + hello.called_offset = mncc.gsm_mncc.called.offset + hello.signal_offset = mncc.gsm_mncc.signal.offset + hello.emergency_offset = mncc.gsm_mncc.emergency.offset + hello.lchan_type_offset = mncc.gsm_mncc.lchan_type.offset + return hello + + @staticmethod + def build_mncc_number(number): + return mncc.gsm_mncc_number( + type=GSM340_TYPE_NATIONAL, + plan=GSM340_PLAN_ISDN, + number=number) + + @staticmethod + def build_setup_ind(calling, called, callref=1): + setup = mncc.gsm_mncc() + setup.msg_type = mncc.MNCC_SETUP_IND + setup.callref = callref + setup.fields = mncc.MNCC_F_CALLED | mncc.MNCC_F_CALLING + setup.called = MnccMessageBuilder.build_mncc_number(called) + setup.calling = MnccMessageBuilder.build_mncc_number(calling) + return setup + + @staticmethod + def build_setup_cmpl_ind(callref=1): + setup = mncc.gsm_mncc() + setup.msg_type = mncc.MNCC_SETUP_COMPL_IND + setup.callref = callref + return setup + + @staticmethod + def build_rtp_msg(msg_type, callref, addr, port): + return mncc.gsm_mncc_rtp( + msg_type=msg_type, callref=callref, + ip=addr, port=port, + #payload_type=3, + #payload_msg_type=mncc.GSM_TCHF_FRAME) + payload_type=98, + payload_msg_type=mncc.GSM_TCH_FRAME_AMR) + + @staticmethod + def build_dtmf_start(callref, data): + return mncc.gsm_mncc( + msg_type=mncc.MNCC_START_DTMF_IND, + callref=callref, + fields=mncc.MNCC_F_KEYPAD, + keypad=ord(data)) + + @staticmethod + def build_dtmf_stop(callref, data): + return mncc.gsm_mncc( + callref=callref, + msg_type=mncc.MNCC_STOP_DTMF_IND) + +def send_dtmf(callref): + global conn + + conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '1')) + conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '1')) + conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '2')) + conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '2')) + + +server = mncc_sock.MnccSocketServer() +conn = server.accept() + +# Say hello and set-up a call +conn.send_msg(MnccMessageBuilder.build_hello()) +conn.send_msg(MnccMessageBuilder.build_setup_ind("1234", "5000")) +print("=> Sent hello + setup indication") + +# Wait for the RTP crate.. and actknowledge it.. +msg = conn.recv() +assert msg.msg_type == mncc.MNCC_RTP_CREATE +print("<= Received request to create a RTP socket") +conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CREATE, + msg.callref, + #socket.INADDR_LOOPBACK, 4000)) + socket.INADDR_ANY, 4000)) +print("=> Claimed socket was created...") + +msg = conn.recv() +assert msg.msg_type == mncc.MNCC_CALL_PROC_REQ +print("<= Received proceeding...") + + + +while True: + msg = conn.recv() + if msg.msg_type == mncc.MNCC_ALERT_REQ: + print("=> I should alert...") + continue + if msg.msg_type == mncc.MNCC_RTP_CONNECT: + conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CONNECT, + msg.callref, + socket.INADDR_LOOPBACK, 4000)) + print("=> I needed to connect RTP...") + continue + if msg.msg_type == mncc.MNCC_SETUP_RSP: + print("=> Call is connected?") + conn.send_msg(MnccMessageBuilder.build_setup_cmpl_ind(msg.callref)) + send_dtmf(msg.callref) + continue + + print(msg) + print(msg.msg_type) +print(type(msg))