osmo-ttcn3-hacks/asterisk/Asterisk_Tests.ttcn

299 lines
8.1 KiB
Plaintext

module Asterisk_Tests {
/* Asterisk test suite in TTCN-3
* (C) 2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* All rights reserved.
* Author: Pau Espin Pedrol <pespin@sysmocom.de>
*
* Released under the terms of GNU General Public License, Version 2 or
* (at your option) any later version.
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
import from TCCOpenSecurity_Functions all;
import from General_Types all;
import from Osmocom_Types all;
import from Native_Functions all;
import from Misc_Helpers all;
import from SDP_Types all;
import from SDP_Templates all;
import from SIP_Emulation all;
import from SIPmsg_Types all;
import from SIP_Templates all;
modulepar {
charstring mp_local_sip_host := "127.0.0.2";
integer mp_local_sip_port := 5060;
charstring mp_remote_sip_host := "127.0.0.1";
integer mp_remote_sip_port := 5060;
}
type component test_CT {
var SIP_Emulation_CT vc_SIP;
}
type component ConnHdlr extends SIP_ConnHdlr {
var ConnHdlrPars g_pars;
timer g_Tguard;
var PDU_SIP_Request g_rx_sip_req;
var PDU_SIP_Response g_rx_sip_resp;
}
type record ConnHdlrPars {
float t_guard,
charstring user,
charstring password,
SipUrl registrar_sip_url,
SipAddr registrar_sip_record,
CallidString registrar_sip_call_id,
Via registrar_via,
integer registrar_sip_seq_nr,
SipAddr sip_url_ext,
Contact local_contact,
CallPars cp optional
}
template (value) ConnHdlrPars t_Pars(charstring user,
charstring displayname := "\"Anonymous\"",
charstring password := "secret") := {
t_guard := 30.0,
user := user,
password := password,
registrar_sip_url := valueof(ts_SipUrlHost(mp_remote_sip_host)),
registrar_sip_record := ts_SipAddr(ts_HostPort(mp_remote_sip_host),
ts_UserInfo(user),
displayName := displayname),
registrar_sip_call_id := hex2str(f_rnd_hexstring(15)) & "@" & mp_local_sip_host,
registrar_via := ts_Via_from(ts_HostPort(mp_local_sip_host, mp_local_sip_port)),
registrar_sip_seq_nr := f_sip_rand_seq_nr(),
sip_url_ext := ts_SipAddr(ts_HostPort(mp_local_sip_host, mp_local_sip_port),
ts_UserInfo(user)),
local_contact := valueof(ts_Contact({
ts_ContactAddress(
ts_Addr_Union_SipUrl(ts_SipUrl(ts_HostPort(
mp_local_sip_host,
mp_local_sip_port),
ts_UserInfo(user))),
omit)
})),
cp := omit
}
function f_init_ConnHdlrPars(integer idx := 1) runs on test_CT return ConnHdlrPars {
var ConnHdlrPars pars := valueof(t_Pars("0" & int2str(500 + idx)));
return pars;
}
type record CallPars {
boolean is_mo,
charstring calling,
charstring called,
CallParsComputed comp optional,
charstring sip_rtp_addr,
uint16_t sip_rtp_port,
charstring cn_rtp_addr,
uint16_t cn_rtp_port
}
type record CallParsComputed {
CallidString sip_call_id,
charstring sip_body,
integer sip_seq_nr
}
private template (value) CallPars t_CallPars(boolean is_mo) := {
is_mo := is_mo,
calling := "12345",
called := "98766",
comp := {
sip_call_id := hex2str(f_rnd_hexstring(15)),
sip_body := "",
sip_seq_nr := f_sip_rand_seq_nr()
},
sip_rtp_addr := "1.2.3.4",
sip_rtp_port := 1234,
cn_rtp_addr := "5.6.7.8",
cn_rtp_port := 5678
}
function f_init() runs on test_CT {
f_init_sip(vc_SIP, "Asterisk_Test");
log("end of f_init");
}
type function void_fn(charstring id) runs on ConnHdlr;
function f_start_handler(void_fn fn, ConnHdlrPars pars)
runs on test_CT return ConnHdlr {
var ConnHdlr vc_conn;
var charstring id := testcasename();
vc_conn := ConnHdlr.create(id);
connect(vc_conn:SIP, vc_SIP:CLIENT);
connect(vc_conn:SIP_PROC, vc_SIP:CLIENT_PROC);
vc_conn.start(f_handler_init(fn, id, pars));
return vc_conn;
}
private altstep as_Tguard() runs on ConnHdlr {
[] g_Tguard.timeout {
setverdict(fail, "Tguard timeout");
mtc.stop;
}
}
private function f_handler_init(void_fn fn, charstring id, ConnHdlrPars pars)
runs on ConnHdlr {
g_pars := pars;
g_Tguard.start(pars.t_guard);
activate(as_Tguard());
// Make sure the UA is deregistered before starting the test:
// sends REGISTER with Contact = "*" and Expires = 0
//f_SIP_deregister();
/* call the user-supied test case function */
fn.apply(id);
}
altstep as_SIP_expect_req(template PDU_SIP_Request sip_expect) runs on ConnHdlr
{
[] SIP.receive(sip_expect) -> value g_rx_sip_req;
[] SIP.receive {
log("FAIL: expected SIP message ", sip_expect);
Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail, "Received unexpected SIP message");
}
}
altstep as_SIP_expect_resp(template PDU_SIP_Response sip_expect) runs on ConnHdlr
{
[] SIP.receive(sip_expect) -> value g_rx_sip_resp;
[] SIP.receive {
log("FAIL: expected SIP message ", sip_expect);
Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail, "Received unexpected SIP message");
}
}
private function f_tr_Via_response(Via via_req) return template (present) Via {
template (present) SemicolonParam_List via_resp_params := ?;
/*via_resp_params := {
{ id := "rport", paramValue := int2str(mp_remote_sip_port) },
{ id := "received", paramValue := mp_remote_sip_host }
}; */
return tr_Via_from(via_req.viaBody[0].sentBy,
via_resp_params);
}
private function f_tr_To_response(SipAddr to_req) return template (present) SipAddr {
return tr_SipAddr_from_val(to_req);
}
function f_SIP_register() runs on ConnHdlr return PDU_SIP_Response
{
var template (present) PDU_SIP_Response exp;
var Authorization authorization;
var Via via := g_pars.registrar_via;
var SipAddr from_sipaddr := g_pars.registrar_sip_record;
var charstring branch_value;
branch_value := f_sip_gen_branch(f_sip_SipAddr_to_str(g_pars.registrar_sip_record),
f_sip_SipAddr_to_str(g_pars.registrar_sip_record),
g_pars.registrar_sip_call_id,
g_pars.registrar_sip_seq_nr);
via.viaBody[0].viaParams := f_sip_param_set(via.viaBody[0].viaParams, "branch", branch_value);
from_sipaddr.params := f_sip_param_set(from_sipaddr.params, "tag", f_sip_rand_tag());
SIP.send(ts_SIP_REGISTER(g_pars.registrar_sip_url,
g_pars.registrar_sip_call_id,
from_sipaddr,
g_pars.registrar_sip_record,
via,
g_pars.registrar_sip_seq_nr,
g_pars.local_contact,
ts_Expires("7200")));
exp := tr_SIP_Response_REGISTER_Unauthorized(
g_pars.registrar_sip_call_id,
from_sipaddr,
f_tr_To_response(g_pars.registrar_sip_record),
f_tr_Via_response(via),
*,
tr_WwwAuthenticate({tr_Challenge_digestCln(?)}),
g_pars.registrar_sip_seq_nr);
as_SIP_expect_resp(exp);
/* Digest Auth: RFC 2617 */
authorization := f_sip_digest_gen_Authorization(g_rx_sip_resp.msgHeader.wwwAuthenticate,
g_pars.user, g_pars.password,
"REGISTER", "sip:" & mp_remote_sip_host)
/* New transaction: */
g_pars.registrar_sip_seq_nr := g_pars.registrar_sip_seq_nr + 1;
branch_value := f_sip_gen_branch(f_sip_SipAddr_to_str(g_pars.registrar_sip_record),
f_sip_SipAddr_to_str(g_pars.registrar_sip_record),
g_pars.registrar_sip_call_id,
g_pars.registrar_sip_seq_nr);
via.viaBody[0].viaParams := f_sip_param_set(via.viaBody[0].viaParams, "branch", branch_value);
SIP.send(ts_SIP_REGISTER(g_pars.registrar_sip_url,
g_pars.registrar_sip_call_id,
from_sipaddr,
g_pars.registrar_sip_record,
via,
g_pars.registrar_sip_seq_nr,
g_pars.local_contact,
ts_Expires("7200"),
authorization := authorization));
/* Wait for OK answer */
exp := tr_SIP_Response(
g_pars.registrar_sip_call_id,
from_sipaddr,
f_tr_To_response(g_pars.registrar_sip_record),
f_tr_Via_response(via),
*,
"REGISTER", 200,
g_pars.registrar_sip_seq_nr, "OK");
as_SIP_expect_resp(exp);
/* Prepare for next use: */
g_pars.registrar_sip_seq_nr := g_pars.registrar_sip_seq_nr + 1;
return g_rx_sip_resp;
}
/* Test SIP registration of local clients */
private function f_TC_internal_registration(charstring id) runs on ConnHdlr {
f_SIP_register();
// f_SIP_deregister();
setverdict(pass);
}
testcase TC_internal_registration() runs on test_CT {
var ConnHdlrPars pars;
var ConnHdlr vc_conn;
f_init();
pars := f_init_ConnHdlrPars();
vc_conn := f_start_handler(refers(f_TC_internal_registration), pars);
vc_conn.done;
}
testcase TC_selftest() runs on test_CT {
f_sip_digest_selftest();
setverdict(pass);
}
control {
execute( TC_internal_registration() );
}
}