titan.TestPorts.USB/src/USB_Component.ttcn

307 lines
9.4 KiB
Plaintext

module USB_Component {
import from General_Types all;
import from USB_Types all;
import from USB_Templates all;
import from USB_PortType all;
import from USB_PortTypes all;
type component USB_CT {
port USB_PT USB;
var integer g_dev_hdl := 42;
var integer g_next_req_hdl := 13;
timer g_USB_Tguard := 5.0;
}
type record USB_Device_Match_vidpid {
HEX4n vid,
HEX4n pid
};
type record USB_Device_Match_path {
charstring path
};
type union USB_Device_Match {
USB_Device_Match_vidpid vid_pid,
USB_Device_Match_path path
};
function f_usb_init_vid_pid(HEX4n vendor_id, HEX4n product_id) runs on USB_CT {
map(self:USB, system:USB);
var USB_result res;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_USB_open_vid_pid(hex2int(vendor_id), hex2int(product_id),
device_hdl := g_dev_hdl, req_hdl := req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_USB_result(req_hdl := req_hdl, device_hdl := g_dev_hdl)) -> value res {
log("Received ", res);
}
[] USB.receive {
testcase.stop("Couldn't open requested USB device");
}
}
g_USB_Tguard.stop;
}
function f_usb_init_path(charstring path) runs on USB_CT {
map(self:USB, system:USB);
var USB_result res;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_USB_open_path(path, device_hdl := g_dev_hdl, req_hdl := req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_USB_result(req_hdl := req_hdl, device_hdl := g_dev_hdl)) -> value res {
log("Received ", res);
}
[] USB.receive {
testcase.stop("Couldn't open requested USB device");
}
}
g_USB_Tguard.stop;
}
private altstep as_Tguard() runs on USB_CT {
[] g_USB_Tguard.timeout {
testcase.stop("Timeout of global USB guard timer");
}
}
function f_usb_init(USB_Device_Match udm) runs on USB_CT {
activate(as_Tguard());
if (ischosen(udm.vid_pid)) {
f_usb_init_vid_pid(udm.vid_pid.vid, udm.vid_pid.pid);
} else if (ischosen(udm.path)) {
f_usb_init_path(udm.path.path);
} else {
testcase.stop("Unsupported USB_Device_Match");
}
}
template (value) USB_transfer ts_UsbXfer_DevReq(template (value) USB_DeviceRequest req,
integer device_hdl, integer xfer_hdl) := {
device_hdl := device_hdl,
transfer_hdl := xfer_hdl,
endpoint := 0,
ttype := USB_TRANSFER_TYPE_CONTROL,
data := enc_USB_DeviceRequest(valueof(req)),
timeout_msec := 1000
}
template USB_transfer_compl tr_UsbXfer_compl(template USB_endpoint ep := ?,
template USB_transfer_type ttype := ?,
template USB_transfer_status sts := ?,
template integer device_hdl := ?,
template integer xfer_hdl := ?) := {
device_hdl := device_hdl,
transfer_hdl := xfer_hdl,
endpoint := ep,
ttype := ttype,
data := ?,
status := sts
}
function f_usb_get_req_hdl() runs on USB_CT return integer
{
var integer i := g_next_req_hdl;
g_next_req_hdl := g_next_req_hdl + 1;
return i;
}
function f_usb_claim_interface(integer dev_hdl, integer bInterface)
runs on USB_CT {
var USB_result res;
var integer req_hdl := f_usb_get_req_hdl();
g_USB_Tguard.start;
USB.send(ts_USB_claim_interface(dev_hdl, bInterface, req_hdl));
USB.receive(USB_result:{req_hdl, dev_hdl, ?}) -> value res { log("Received ", res); }
g_USB_Tguard.stop;
}
function f_usb_set_configuration(integer dev_hdl, integer bConfigurationValue)
runs on USB_CT {
var USB_result res;
var integer req_hdl := f_usb_get_req_hdl();
g_USB_Tguard.start;
USB.send(ts_USB_set_configuration(dev_hdl, bConfigurationValue, req_hdl));
USB.receive(USB_result:{req_hdl, dev_hdl, ?}) -> value res { log("Received ", res); }
g_USB_Tguard.stop;
}
function f_usb_reset_device()
runs on USB_CT {
var USB_result res;
var integer req_hdl := f_usb_get_req_hdl();
g_USB_Tguard.start;
USB.send(ts_USB_reset_device(g_dev_hdl, req_hdl));
USB.receive(USB_result:{req_hdl, g_dev_hdl, ?}) -> value res { log("Received USB reset response", res); }
g_USB_Tguard.stop;
}
/* Send a USB Device Request and wait for its completion to arrive */
function f_usb_dev_req(template (value) USB_DeviceRequest req)
runs on USB_CT return USB_transfer_compl
{
var USB_transfer_compl tc;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_UsbXfer_DevReq(req, g_dev_hdl, req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc { }
[] USB.receive {
testcase.stop("Received unexpected primitive from USB");
}
}
g_USB_Tguard.stop;
return tc;
}
/* Send a USB Device Request and expect it to fail with a certain transfer status */
function f_usb_dev_req_exp_fail(template (value) USB_DeviceRequest req,
template USB_transfer_status sts := USB_TRANSFER_STALL)
runs on USB_CT
{
var USB_transfer_compl tc := f_usb_dev_req(req);
if (match(tc.status, sts)) {
setverdict(pass);
} else {
setverdict(fail, "Unexpected USB transfer status ", tc.status);
}
}
function f_usb_get_desc(USB_DescriptorType dtype, integer idx, integer len)
runs on USB_CT return octetstring
{
var USB_transfer_compl tc;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_DESCRIPTOR(dtype, idx, len), g_dev_hdl, req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc {
var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8);
g_USB_Tguard.stop;
return raw_desc;
}
[] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{
setverdict(fail, "Transfer completed unsuccessfully: ", tc);
}
}
return ''O;
}
function f_usb_get_desc_std(USB_DescriptorType dtype, integer idx, integer len)
runs on USB_CT return USB_StandardDescriptors
{
var octetstring raw := f_usb_get_desc(dtype, idx, len);
return dec_USB_StandardDescriptors(raw);
}
function f_usb_get_desc_exp_fail(USB_DescriptorType dtype, integer idx, integer len,
template USB_transfer_status exp_status := USB_TRANSFER_STALL)
runs on USB_CT
{
f_usb_dev_req_exp_fail(ts_DevReq_GET_DESCRIPTOR(dtype, idx, len), exp_status);
}
type record USB_Descriptor_Node {
USB_StandardDescriptor desc,
USB_Descriptor_Nodes children
};
type record of USB_Descriptor_Node USB_Descriptor_Nodes;
/* Generata hierarchical tree of USB Descriptors; easier to parse for most use cases */
function f_usb_get_desc_tree() runs on USB_CT return USB_Descriptor_Node
{
var USB_Descriptor_Node device_node;
var USB_StandardDescriptors dev_descs;
var integer i, j, cur_config, cur_interface;
dev_descs := f_usb_get_desc_std(USB_DescriptorType_DEVICE, 0, 255);
device_node.desc := dev_descs[0];
for (i := 0; i < device_node.desc.device.bNumConfigurations; i := i+1) {
var USB_StandardDescriptors cfg_descs;
cfg_descs := f_usb_get_desc_std(USB_DescriptorType_CONFIGURATION, i, 255);
for (j := 0; j < lengthof(cfg_descs); j:= j+1) {
var USB_StandardDescriptor desc := cfg_descs[j];
log("Descriptor ", j, ": ", desc);
if (ischosen(desc.configuration)) {
cur_config := desc.configuration.bConfigurationValue;
cur_interface := 0;
device_node.children[cur_config] := { desc := desc, children := {} };
} else if (ischosen(desc.interface)) {
cur_interface := desc.interface.bInterfaceNumber;
device_node.children[cur_config].children[cur_interface] :=
{ desc := desc, children := {} };
} else {
var integer next := lengthof(device_node.children[cur_config].children[cur_interface].children);
device_node.children[cur_config].children[cur_interface].children[next] :=
{ desc := desc, children := {} };
}
}
}
return device_node;
}
function f_usb_get_config()
runs on USB_CT return integer {
var USB_transfer_compl tc;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_CONFIGURATION, g_dev_hdl, req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc {
var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8);
g_USB_Tguard.stop;
return oct2int(raw_desc[0]);
}
[] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{
setverdict(fail, "Transfer completed unsuccessfully: ", tc);
}
}
return -1;
};
function f_usb_get_interface(integer intf)
runs on USB_CT return integer {
var USB_transfer_compl tc;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_INTERFACE(intf), g_dev_hdl, req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc {
var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8);
g_USB_Tguard.stop;
return oct2int(raw_desc[0]);
}
[] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{
setverdict(fail, "Transfer completed unsuccessfully: ", tc);
}
}
return -1;
};
function f_usb_get_status(USB_RequestType_Recipient recipient, u16le_t wIndex)
runs on USB_CT return integer
{
var USB_transfer_compl tc;
var integer req_hdl := f_usb_get_req_hdl();
USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_STATUS(recipient, wIndex), g_dev_hdl, req_hdl));
g_USB_Tguard.start;
alt {
[] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc {
var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8);
g_USB_Tguard.stop;
return oct2int(raw_desc);
}
[] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, ?, g_dev_hdl, req_hdl)) -> value tc{
setverdict(fail, "Transfer completed unsuccessfully: ", tc);
}
}
return -1;
}
}