module USB_Component { import from General_Types all; import from USB_Types all; import from USB_Templates all; import from USB_PortType all; import from USB_PortTypes all; type component USB_CT { port USB_PT USB; var integer g_dev_hdl := 42; var integer g_next_req_hdl := 13; timer g_USB_Tguard := 5.0; } type record USB_Device_Match_vidpid { HEX4n vid, HEX4n pid }; type record USB_Device_Match_path { charstring path }; type union USB_Device_Match { USB_Device_Match_vidpid vid_pid, USB_Device_Match_path path }; function f_usb_init_vid_pid(HEX4n vendor_id, HEX4n product_id) runs on USB_CT { map(self:USB, system:USB); var USB_result res; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_USB_open_vid_pid(hex2int(vendor_id), hex2int(product_id), device_hdl := g_dev_hdl, req_hdl := req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_USB_result(req_hdl := req_hdl, device_hdl := g_dev_hdl)) -> value res { log("Received ", res); } [] USB.receive { testcase.stop("Couldn't open requested USB device"); } } g_USB_Tguard.stop; } function f_usb_init_path(charstring path) runs on USB_CT { map(self:USB, system:USB); var USB_result res; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_USB_open_path(path, device_hdl := g_dev_hdl, req_hdl := req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_USB_result(req_hdl := req_hdl, device_hdl := g_dev_hdl)) -> value res { log("Received ", res); } [] USB.receive { testcase.stop("Couldn't open requested USB device"); } } g_USB_Tguard.stop; } private altstep as_Tguard() runs on USB_CT { [] g_USB_Tguard.timeout { testcase.stop("Timeout of global USB guard timer"); } } function f_usb_init(USB_Device_Match udm) runs on USB_CT { activate(as_Tguard()); if (ischosen(udm.vid_pid)) { f_usb_init_vid_pid(udm.vid_pid.vid, udm.vid_pid.pid); } else if (ischosen(udm.path)) { f_usb_init_path(udm.path.path); } else { testcase.stop("Unsupported USB_Device_Match"); } } template (value) USB_transfer ts_UsbXfer_DevReq(template (value) USB_DeviceRequest req, integer device_hdl, integer xfer_hdl) := { device_hdl := device_hdl, transfer_hdl := xfer_hdl, endpoint := 0, ttype := USB_TRANSFER_TYPE_CONTROL, data := enc_USB_DeviceRequest(valueof(req)), timeout_msec := 1000 } template USB_transfer_compl tr_UsbXfer_compl(template USB_endpoint ep := ?, template USB_transfer_type ttype := ?, template USB_transfer_status sts := ?, template integer device_hdl := ?, template integer xfer_hdl := ?) := { device_hdl := device_hdl, transfer_hdl := xfer_hdl, endpoint := ep, ttype := ttype, data := ?, status := sts } function f_usb_get_req_hdl() runs on USB_CT return integer { var integer i := g_next_req_hdl; g_next_req_hdl := g_next_req_hdl + 1; return i; } function f_usb_claim_interface(integer dev_hdl, integer bInterface) runs on USB_CT { var USB_result res; var integer req_hdl := f_usb_get_req_hdl(); g_USB_Tguard.start; USB.send(ts_USB_claim_interface(dev_hdl, bInterface, req_hdl)); USB.receive(USB_result:{req_hdl, dev_hdl, ?}) -> value res { log("Received ", res); } g_USB_Tguard.stop; } function f_usb_set_configuration(integer dev_hdl, integer bConfigurationValue) runs on USB_CT { var USB_result res; var integer req_hdl := f_usb_get_req_hdl(); g_USB_Tguard.start; USB.send(ts_USB_set_configuration(dev_hdl, bConfigurationValue, req_hdl)); USB.receive(USB_result:{req_hdl, dev_hdl, ?}) -> value res { log("Received ", res); } g_USB_Tguard.stop; } function f_usb_reset_device() runs on USB_CT { var USB_result res; var integer req_hdl := f_usb_get_req_hdl(); g_USB_Tguard.start; USB.send(ts_USB_reset_device(g_dev_hdl, req_hdl)); USB.receive(USB_result:{req_hdl, g_dev_hdl, ?}) -> value res { log("Received USB reset response", res); } g_USB_Tguard.stop; } /* Send a USB Device Request and wait for its completion to arrive */ function f_usb_dev_req(template (value) USB_DeviceRequest req) runs on USB_CT return USB_transfer_compl { var USB_transfer_compl tc; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_UsbXfer_DevReq(req, g_dev_hdl, req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc { } [] USB.receive { testcase.stop("Received unexpected primitive from USB"); } } g_USB_Tguard.stop; return tc; } /* Send a USB Device Request and expect it to fail with a certain transfer status */ function f_usb_dev_req_exp_fail(template (value) USB_DeviceRequest req, template USB_transfer_status sts := USB_TRANSFER_STALL) runs on USB_CT { var USB_transfer_compl tc := f_usb_dev_req(req); if (match(tc.status, sts)) { setverdict(pass); } else { setverdict(fail, "Unexpected USB transfer status ", tc.status); } } function f_usb_get_desc(USB_DescriptorType dtype, integer idx, integer len) runs on USB_CT return octetstring { var USB_transfer_compl tc; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_DESCRIPTOR(dtype, idx, len), g_dev_hdl, req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc { var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8); g_USB_Tguard.stop; return raw_desc; } [] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{ setverdict(fail, "Transfer completed unsuccessfully: ", tc); } } return ''O; } function f_usb_get_desc_std(USB_DescriptorType dtype, integer idx, integer len) runs on USB_CT return USB_StandardDescriptors { var octetstring raw := f_usb_get_desc(dtype, idx, len); return dec_USB_StandardDescriptors(raw); } function f_usb_get_desc_exp_fail(USB_DescriptorType dtype, integer idx, integer len, template USB_transfer_status exp_status := USB_TRANSFER_STALL) runs on USB_CT { f_usb_dev_req_exp_fail(ts_DevReq_GET_DESCRIPTOR(dtype, idx, len), exp_status); } type record USB_Descriptor_Node { USB_StandardDescriptor desc, USB_Descriptor_Nodes children }; type record of USB_Descriptor_Node USB_Descriptor_Nodes; /* Generata hierarchical tree of USB Descriptors; easier to parse for most use cases */ function f_usb_get_desc_tree() runs on USB_CT return USB_Descriptor_Node { var USB_Descriptor_Node device_node; var USB_StandardDescriptors dev_descs; var integer i, j, cur_config, cur_interface; dev_descs := f_usb_get_desc_std(USB_DescriptorType_DEVICE, 0, 255); device_node.desc := dev_descs[0]; for (i := 0; i < device_node.desc.device.bNumConfigurations; i := i+1) { var USB_StandardDescriptors cfg_descs; cfg_descs := f_usb_get_desc_std(USB_DescriptorType_CONFIGURATION, i, 255); for (j := 0; j < lengthof(cfg_descs); j:= j+1) { var USB_StandardDescriptor desc := cfg_descs[j]; log("Descriptor ", j, ": ", desc); if (ischosen(desc.configuration)) { cur_config := desc.configuration.bConfigurationValue; cur_interface := 0; device_node.children[cur_config] := { desc := desc, children := {} }; } else if (ischosen(desc.interface)) { cur_interface := desc.interface.bInterfaceNumber; device_node.children[cur_config].children[cur_interface] := { desc := desc, children := {} }; } else { var integer next := lengthof(device_node.children[cur_config].children[cur_interface].children); device_node.children[cur_config].children[cur_interface].children[next] := { desc := desc, children := {} }; } } } return device_node; } function f_usb_get_config() runs on USB_CT return integer { var USB_transfer_compl tc; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_CONFIGURATION, g_dev_hdl, req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc { var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8); g_USB_Tguard.stop; return oct2int(raw_desc[0]); } [] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{ setverdict(fail, "Transfer completed unsuccessfully: ", tc); } } return -1; }; function f_usb_get_interface(integer intf) runs on USB_CT return integer { var USB_transfer_compl tc; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_INTERFACE(intf), g_dev_hdl, req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc { var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8); g_USB_Tguard.stop; return oct2int(raw_desc[0]); } [] USB.receive(tr_UsbXfer_compl(0, ?, ?, g_dev_hdl, req_hdl)) -> value tc{ setverdict(fail, "Transfer completed unsuccessfully: ", tc); } } return -1; }; function f_usb_get_status(USB_RequestType_Recipient recipient, u16le_t wIndex) runs on USB_CT return integer { var USB_transfer_compl tc; var integer req_hdl := f_usb_get_req_hdl(); USB.send(ts_UsbXfer_DevReq(ts_DevReq_GET_STATUS(recipient, wIndex), g_dev_hdl, req_hdl)); g_USB_Tguard.start; alt { [] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, USB_TRANSFER_COMPLETED, g_dev_hdl, req_hdl)) -> value tc { var octetstring raw_desc := substr(tc.data, 8, lengthof(tc.data)-8); g_USB_Tguard.stop; return oct2int(raw_desc); } [] USB.receive(tr_UsbXfer_compl(0, USB_TRANSFER_TYPE_CONTROL, ?, g_dev_hdl, req_hdl)) -> value tc{ setverdict(fail, "Transfer completed unsuccessfully: ", tc); } } return -1; } }