osmo-ttcn3-hacks/cbc/ECBE_Components.ttcn

140 lines
3.8 KiB
Plaintext

/* ECBE (REST) interface client of osmo-cbc test suite in TTCN-3
* (C) 2022 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* All rights reserved.
*
* Released under the terms of GNU General Public License, Version 2 or
* (at your option) any later version.
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
module ECBE_Components {
import from Osmocom_Types all;
import from HTTP_Adapter all;
import from HTTPmsg_Types all;
import from ECBE_Types all;
import from CBSP_Types all;
import from CBS_Message all;
/* Map a CBSP_Category value onto the corresponding EcbeCategory value.
 *
 * cat_in: category as used on the CBSP side
 * returns: high_priority, background or normal
 *
 * Any other category value cannot be translated: a fail verdict naming the
 * offending value is set and the test case is stopped via mtc.stop. */
private function f_cbs2ecbe_category(CBSP_Category cat_in) return EcbeCategory
{
	select (cat_in) {
	case (CBSP_CATEG_HIGH_PRIO) { return high_priority; }
	case (CBSP_CATEG_BACKGROUND) { return background; }
	case (CBSP_CATEG_NORMAL) { return normal; }
	case else {
		/* leave a diagnostic behind instead of terminating silently */
		setverdict(fail, "Unsupported CBSP_Category: ", cat_in);
		mtc.stop;
	}
	}
}
/* Render the payload of one CBS_MessageContent element as an EcbePage:
 * the payload is converted to a hexstring and from there to its string
 * of hex digits. */
private function f_cbs2ecbe_page(CBS_MessageContent inp) return EcbePage
{
	var hexstring payload_hex := oct2hex(inp.payload);
	return hex2str(payload_hex);
}
/* Build the EcbeCbcMessage for a CBS_Message that is to be sent with a
 * payload_etws payload.
 *
 * inp:      CBS message to convert
 * cbe_name: value stored in the cbe_name field of the result
 *
 * The decoded warning type is the enum value numbered (inp.msg_id - 4352).
 * emergency_user_alert and popup_on_display are always set to true and
 * warning_sec_info is always omitted. */
private function f_cbs2ecbe_etws(CBS_Message inp, charstring cbe_name) return EcbeCbcMessage
{
	/* 4352 == 0x1100; presumably the first ETWS message identifier - TODO confirm */
	const integer c_warn_type_offset := 4352;
	var EcbeWarningTypeDecoded decoded_warn_type;
	int2enum(inp.msg_id - c_warn_type_offset, decoded_warn_type);

	var EcbeCategory ecbe_category := f_cbs2ecbe_category(inp.category);

	var EcbeCbcMessage etws_msg := {
		cbe_name := cbe_name,
		category := ecbe_category,
		repetition_period := inp.rep_period,
		num_of_bcast := inp.num_bcast_req,
		scope := { scope_plmn := {} },
		smscb_message := {
			serial_nr := { serial_nr_encoded := inp.ser_nr },
			message_id := inp.msg_id,
			payload := {
				payload_etws := {
					warning_type := { warning_type_decoded := decoded_warn_type },
					emergency_user_alert := true,
					popup_on_display := true,
					warning_sec_info := omit
				}
			}
		}
	};
	return etws_msg;
}
/* Convert a CBS_Message into the EcbeCbcMessage used on the ECBE interface.
 *
 * inp:      CBS message to convert
 * cbe_name: value stored in the cbe_name field of the result
 *
 * Messages for which msg_id_is_etws() holds are handed to f_cbs2ecbe_etws().
 * All other messages get a payload_encoded payload carrying the DCS and one
 * page per element of inp.content, in the same order. */
function f_cbs2ecbe(CBS_Message inp, charstring cbe_name) return EcbeCbcMessage
{
	if (msg_id_is_etws(inp.msg_id)) {
		return f_cbs2ecbe_etws(inp, cbe_name);
	}

	var EcbeCbcMessage cbc_msg := {
		cbe_name := cbe_name,
		category := f_cbs2ecbe_category(inp.category),
		repetition_period := inp.rep_period,
		num_of_bcast := inp.num_bcast_req,
		scope := { scope_plmn := {} },
		smscb_message := {
			serial_nr := { serial_nr_encoded := inp.ser_nr },
			message_id := inp.msg_id,
			payload := {
				payload_encoded := {
					dcs := inp.dcs,
					pages := { } /* filled in by the loop below */
				}
			}
		}
	};

	/* the page list starts out empty, so writing index idx appends one page */
	var integer num_pages := lengthof(inp.content);
	for (var integer idx := 0; idx < num_pages; idx := idx + 1) {
		cbc_msg.smscb_message.payload.payload_encoded.pages[idx] :=
			f_cbs2ecbe_page(inp.content[idx]);
	}
	return cbc_msg;
}
/*********************************************************************************
* ECBE (REST) interface
*********************************************************************************/
/* Encode the given EcbeCbcMessage, log the encoded body and transmit it in
 * a HTTP POST request to /api/ecbe/v1/message.
 *
 * cbc: message to be encoded and sent as the request body
 *
 * This function only transmits; it does not receive the HTTP response. */
function f_ecbe_tx_post_cbs(EcbeCbcMessage cbc)
runs on http_CT {
	var charstring body := oct2char(enc_EcbeCbcMessage(cbc));
	log("TX POST CBS: ", body);
	f_http_tx_request(url := "/api/ecbe/v1/message", method := "POST", body := body);
}
/* Receive a HTTP response matching tr_HTTP_Resp(exp_sts) and return its
 * response part.
 *
 * exp_sts: template for the expected status code; defaults to (200..299) */
function f_ecbe_rx_resp(template integer exp_sts := (200..299))
runs on http_CT return HTTPResponse {
	/* value handed to f_http_rx_response as 'tout'; presumably seconds - TODO confirm */
	const float c_resp_tout := 20.0;
	var HTTPMessage rx_msg;

	rx_msg := f_http_rx_response(tr_HTTP_Resp(exp_sts), tout := c_resp_tout);
	return rx_msg.response;
}
/* Run a HTTP POST to add a new CBC message and return the HTTP response.
 *
 * cbc:     message to be posted
 * exp_sts: template for the expected status code; defaults to 201 */
function f_ecbe_post_cbs(EcbeCbcMessage cbc, template integer exp_sts := 201)
runs on http_CT return HTTPResponse {
	var HTTPResponse post_resp;

	f_ecbe_tx_post_cbs(cbc);
	post_resp := f_ecbe_rx_resp(exp_sts);
	return post_resp;
}
/* Transmit a HTTP DELETE request for /api/ecbe/v1/message/<msg_id>.
 *
 * msg_id: number appended (in decimal) to the message URL
 *
 * This function only transmits; it does not receive the HTTP response. */
function f_ecbe_tx_delete_cbs(integer msg_id)
runs on http_CT {
	var charstring msg_url := "/api/ecbe/v1/message/" & int2str(msg_id);
	f_http_tx_request(msg_url, method := "DELETE");
}
/* Run a HTTP DELETE for the message with the given msg_id and return the
 * HTTP response.
 *
 * msg_id:  number identifying the message in the request URL
 * exp_sts: template for the expected status code; defaults to 200 */
function f_ecbe_delete_cbs(integer msg_id, template integer exp_sts := 200)
runs on http_CT return HTTPResponse {
	var HTTPResponse delete_resp;

	f_ecbe_tx_delete_cbs(msg_id);
	delete_resp := f_ecbe_rx_resp(exp_sts);
	return delete_resp;
}
}