osmo-ttcn3-hacks/asterisk/AMI_Functions.ttcn

183 lines
5.3 KiB
Plaintext

/* Asterisk's AMI interface functions in TTCN-3
* (C) 2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* Author: Pau Espin Pedrol <pespin@sysmocom.de>
* All rights reserved.
*
* Released under the terms of GNU General Public License, Version 2 or
* (at your option) any later version.
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
/*
* https://docs.asterisk.org/Configuration/Interfaces/Asterisk-Manager-Interface-AMI/AMI-v2-Specification/
*/
module AMI_Functions {
import from Misc_Helpers all;
import from TELNETasp_PortType all;
import from Osmocom_Types all;
import from TCCConversion_Functions all;
import from Socket_API_Definitions all;
/* Module parameters (overridable from the test configuration). */
modulepar {
/* Duration handed to the guard timer in f_ami_wait_for_prompt_str():
 * the maximum time to wait for the end of an AMI message before the
 * test is shut down with a failure. TTCN-3 timer durations are in seconds. */
float mp_ami_prompt_timeout := 10.0;
}
/* Key names of the AMI header fields referenced by the field templates
 * in this module. They are used verbatim as the 'key' part of AMI_Field. */
const charstring AMI_FIELD_ACTION := "Action";
const charstring AMI_FIELD_USERNAME := "Username";
const charstring AMI_FIELD_SECRET := "Secret";
const charstring AMI_FIELD_RESPONSE := "Response";
/* A single AMI header field, i.e. one "key: value" pair of a message.
 *
 * Coded with the TEXT codec. The SEPARATOR attribute places ': ' between
 * key and val; NOTE(review): per TITAN's TEXT attribute syntax the first
 * argument is presumably the token written when encoding and the second
 * (':\s+') the pattern accepted when decoding -- confirm against the
 * TITAN reference guide. */
type record AMI_Field {
charstring key,
charstring val
} with {
encode "TEXT"
variant "SEPARATOR(': ', ':\s+')"
};
/* A complete AMI message: an unordered collection of AMI_Field entries.
 *
 * Coded with the TEXT codec. Fields are separated by '\r\n' and the message
 * is closed by an additional '\r\n' (END attribute).
 * NOTE(review): the second argument of SEPARATOR/END ('(\r\n)|[\n]') is
 * presumably the decode-side pattern, which would make a bare '\n' acceptable
 * on reception -- confirm against the TITAN reference guide. */
type set of AMI_Field AMI_Msg with {
encode "TEXT"
variant "SEPARATOR('\r\n', '(\r\n)|[\n]')"
variant "END('\r\n', '(\r\n)|[\n]')"
};
/* Encode an AMI_Msg into its textual representation using the TEXT codec
 * attributes declared on AMI_Msg / AMI_Field.
 * External function: no body in this module; presumably generated by the
 * TITAN compiler from the 'extension' attribute below. */
external function enc_AMI_Msg(in AMI_Msg msg) return charstring
with { extension "prototype(convert) encode(TEXT)" }
/* Decode a text buffer into an AMI_Msg using the TEXT codec attributes
 * declared on AMI_Msg / AMI_Field.
 * External function: no body in this module; presumably generated by the
 * TITAN compiler from the 'extension' attribute below.
 * NOTE(review): behaviour on undecodable input is not visible here. */
external function dec_AMI_Msg(in charstring stream) return AMI_Msg
with { extension "prototype(convert) decode(TEXT)" }
/* Generic send template for one AMI field: pairs the given key with the
 * given value. AMI_Field is a record of (key, val), so the value-list
 * below fills the fields in declaration order. */
template (value) AMI_Field
ts_AMI_Field(template (value) charstring key,
	     template (value) charstring val) := { key, val };
/* Generic receive template for one AMI field. Both key and value default
 * to '?' (any value), so an unparameterized use matches every field.
 * AMI_Field is a record of (key, val), so the value-list below fills the
 * fields in declaration order. */
template (present) AMI_Field
tr_AMI_Field(template (present) charstring key := ?,
	     template (present) charstring val := ?) := { key, val };
/*
* Field Templates:
*/
/* Send template: "Action" field carrying the given action name. */
template (value) AMI_Field
ts_AMI_Field_Action(template (value) charstring val) :=
	ts_AMI_Field(key := AMI_FIELD_ACTION, val := val);
/* Send template: "Username" field carrying the given user name. */
template (value) AMI_Field
ts_AMI_Field_Username(template (value) charstring val) :=
	ts_AMI_Field(key := AMI_FIELD_USERNAME, val := val);
/* Send template: "Secret" field carrying the given secret. */
template (value) AMI_Field
ts_AMI_Field_Secret(template (value) charstring val) :=
	ts_AMI_Field(key := AMI_FIELD_SECRET, val := val);
/* Receive template: "Action" field; the value defaults to '?' (any). */
template (present) AMI_Field
tr_AMI_Field_Action(template (present) charstring val := ?) :=
	tr_AMI_Field(key := AMI_FIELD_ACTION, val := val);
/* Receive template: "Username" field; the value defaults to '?' (any). */
template (present) AMI_Field
tr_AMI_Field_Username(template (present) charstring val := ?) :=
	tr_AMI_Field(key := AMI_FIELD_USERNAME, val := val);
/* Receive template: "Secret" field; the value defaults to '?' (any). */
template (present) AMI_Field
tr_AMI_Field_Secret(template (present) charstring val := ?) :=
	tr_AMI_Field(key := AMI_FIELD_SECRET, val := val);
/* Receive template: "Response" field; the value defaults to '?' (any). */
template (present) AMI_Field
tr_AMI_Field_Response(template (present) charstring val := ?) :=
	tr_AMI_Field(key := AMI_FIELD_RESPONSE, val := val);
/* Receive template: "Response" field whose value is exactly "Success". */
template (present) AMI_Field
tr_AMI_Field_ResponseSuccess := tr_AMI_Field_Response("Success");
/*
* Message Templates:
*/
/* Send template for the AMI "Login" action: an Action field set to "Login"
 * followed by the Username and Secret fields built from the arguments. */
template (value) AMI_Msg
ts_AMI_Action_Login(charstring username, charstring secret) := {
	ts_AMI_Field(AMI_FIELD_ACTION, "Login"),
	ts_AMI_Field(AMI_FIELD_USERNAME, username),
	ts_AMI_Field(AMI_FIELD_SECRET, secret)
};
/* Receive template for the AMI "Login" action. Uses superset matching, so
 * the message must contain at least an Action field equal to "Login" plus
 * Username and Secret fields matching the arguments (both default to '?');
 * additional fields are tolerated. */
template (present) AMI_Msg
tr_AMI_Action_Login(template (present) charstring username := ?,
		    template (present) charstring secret := ?) := superset(
	tr_AMI_Field(AMI_FIELD_ACTION, "Login"),
	tr_AMI_Field(AMI_FIELD_USERNAME, username),
	tr_AMI_Field(AMI_FIELD_SECRET, secret)
);
/* Receive template matching any AMI message that contains a
 * "Response" field with value "Success"; other fields are tolerated
 * because of the superset matching. */
template (present) AMI_Msg
tr_AMI_Response_Success := superset(tr_AMI_Field_Response("Success"));
/*
* Functions:
*/
/* Accumulate the text received on the TELNET port until a message consisting
 * solely of a newline shows up, then return everything collected (with a
 * trailing "\n" appended for that terminating message).
 *
 * pt:        TELNET port to read from.
 * log_label: text appended to the failure reason if waiting fails.
 *
 * Integer messages on the port are treated as connection status: -1 leads to
 * a shutdown with verdict 'fail', any other value is ignored. A single guard
 * timer of mp_ami_prompt_timeout covers the whole wait (it is not restarted
 * per received message); on expiry the test is shut down with 'fail'. */
private function f_ami_wait_for_prompt_str(TELNETasp_PT pt, charstring log_label := "(?)")
return charstring {
	var charstring collected := "";
	var charstring chunk;
	var integer conn_status;
	timer T_guard;

	T_guard.start(mp_ami_prompt_timeout);
	alt {
	/* newline-only message: end of the AMI message, leave the alt */
	[] pt.receive(pattern "\n") {
		collected := collected & "\n";
		}
	/* any other text: keep it and continue waiting */
	[] pt.receive(charstring:?) -> value chunk {
		collected := collected & chunk;
		repeat;
		}
	[] pt.receive(integer:?) -> value conn_status {
		if (conn_status != -1) {
			/* telnet connection succeeded */
			repeat;
		}
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"AMI Telnet Connection Failure: " & log_label);
		}
	[] T_guard.timeout {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"AMI Timeout for prompt: " & log_label);
		}
	}
	T_guard.stop;
	return collected;
}
/* Wait for a complete AMI message on the TELNET port and return it decoded.
 *
 * pt:        TELNET port to read from.
 * log_label: text appended to the failure reason if waiting fails. */
function f_ami_wait_for_prompt(TELNETasp_PT pt, charstring log_label := "(?)") return AMI_Msg {
	return dec_AMI_Msg(f_ami_wait_for_prompt_str(pt, log_label));
}
/* Send an AMI command as raw text and return the raw text collected until
 * the end of the response. The transmitted text doubles as the log label
 * used in failure reasons while waiting. */
private function f_ami_transceive_ret_str(TELNETasp_PT pt, charstring tx) return charstring {
	pt.send(tx);
	var charstring response := f_ami_wait_for_prompt_str(pt, tx);
	return response;
}
/* Encode and send an AMI message, then wait for the response and return it
 * decoded.
 *
 * pt:     TELNET port used for sending and receiving.
 * tx_msg: message to transmit. */
function f_ami_transceive_ret(TELNETasp_PT pt, template (value) AMI_Msg tx_msg) return AMI_Msg {
	return dec_AMI_Msg(f_ami_transceive_ret_str(pt, enc_AMI_Msg(valueof(tx_msg))));
}
/* Send an AMI message and verify that the response matches an expectation.
 *
 * pt:      TELNET port used for sending and receiving.
 * tx_msg:  message to transmit.
 * exp_ret: template the response must match (defaults to '?', any message).
 *
 * On mismatch the test is shut down with verdict 'fail', logging both the
 * received message and the expected template. */
function f_ami_transceive_match(TELNETasp_PT pt,
				template (value) AMI_Msg tx_msg,
				template (present) AMI_Msg exp_ret := ?) {
	var AMI_Msg rx_msg := f_ami_transceive_ret(pt, tx_msg);
	if (match(rx_msg, exp_ret)) {
		return;
	}
	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("Non-matching AMI response: ", rx_msg, " vs exp: ", exp_ret));
}
/* Send an AMI message and require the response to contain
 * "Response: Success" (see tr_AMI_Response_Success).
 *
 * pt:     TELNET port used for sending and receiving.
 * tx_msg: message to transmit. */
function f_ami_transceive_match_response_success(TELNETasp_PT pt,
						 template (value) AMI_Msg tx_msg) {
	var template (present) AMI_Msg exp_success := tr_AMI_Response_Success;
	f_ami_transceive_match(pt, tx_msg, exp_success);
}
/* Perform the AMI "Login" action with the given credentials and require a
 * successful response.
 *
 * pt:       TELNET port connected to the AMI.
 * username: value of the Username field.
 * secret:   value of the Secret field. */
function f_ami_action_login(TELNETasp_PT pt, charstring username, charstring secret) {
	var template (value) AMI_Msg login_req := ts_AMI_Action_Login(username, secret);
	f_ami_transceive_match_response_success(pt, login_req);
}
}