stm32: usb gadget0: implement loopback

The loopback functionality was never implemented, not for regular bulk
endpoints.  By adding it, and adding pairs of endpoints, we can easily
catch buffer management problems.  These tests currently fail on
st_usbfs devices.

This did require renumbering the endpoints, as dwc_otg_fs only offers
three endpoints in each direction, and they can't be arbitrary numbers,
unlike on st_usbfs.

See https://github.com/libopencm3/libopencm3/pull/880 and related tickets.
This commit is contained in:
Karl Palsson 2018-08-17 00:02:29 +00:00
parent 0e58ee2f65
commit 144911a25b
2 changed files with 114 additions and 13 deletions

View File

@ -1,5 +1,6 @@
import array
import datetime
import random
import usb.core
import usb.util as uu
import sys
@ -181,6 +182,78 @@ class TestConfigSourceSink(unittest.TestCase):
self.assertIn("Pipe", e.strerror)
class TestConfigLoopBack(unittest.TestCase):
"""
We could inherit, but it doesn't save much, and this saves me from remembering how to call super.
"""
def setUp(self):
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID, custom_match=find_by_serial(DUT_SERIAL))
self.assertIsNotNone(self.dev, "Couldn't find locm3 gadget0 device")
self.cfg = uu.find_descriptor(self.dev, bConfigurationValue=3)
self.assertIsNotNone(self.cfg, "Config 3 should exist")
self.dev.set_configuration(self.cfg)
self.intf = self.cfg[(0, 0)]
# heh, kinda gross...
self.eps_out = [ep for ep in self.intf if uu.endpoint_direction(ep.bEndpointAddress) == uu.ENDPOINT_OUT]
self.eps_in = [ep for ep in self.intf if uu.endpoint_direction(ep.bEndpointAddress) == uu.ENDPOINT_IN]
def tearDown(self):
uu.dispose_resources(self.dev)
def _inner_basic(self, ep_out, ep_in, data):
written = self.dev.write(ep_out, data)
self.assertEqual(written, len(data), "Should have written all bytes plz")
read = self.dev.read(ep_in, len(data))
self.assertEqual(len(data), len(read))
expected = array.array('B', [x for x in data])
self.assertEqual(expected, read, "should have read back what we wrote")
def test_simple_loop(self):
"""Plain simple loopback, does it work at all"""
eout = self.eps_out[0]
ein = self.eps_in[0]
data = [random.randrange(255) for _ in range(eout.wMaxPacketSize)]
self._inner_basic(eout, ein, data)
def test_dual_loop(self):
"""Testing that we don't mix our data up, just plain and simple"""
dlen = self.eps_out[0].wMaxPacketSize
data = [
[0xaa for _ in range(dlen)],
[0xbb for _ in range(dlen)],
]
for epo, epi, data in zip(self.eps_out, self.eps_in, data):
self._inner_basic(epo, epi, data)
def test_dual_loop_back_to_back(self):
"""
write to both, _before_ we read back...
This can expose problems with buffer management
"""
dlen = self.eps_out[0].wMaxPacketSize
data = [
[0xaa for _ in range(dlen)],
[0xbb for _ in range(dlen)],
]
written = [
self.dev.write(self.eps_out[0], data[0]),
self.dev.write(self.eps_out[1], data[1]),
]
read = [
self.dev.read(self.eps_in[0], dlen),
self.dev.read(self.eps_in[1], dlen),
]
for w, r, dat in zip(written, read, data):
self.assertEqual(w, len(dat), "Should have written all bytes plz")
self.assertEqual(len(dat), len(r), "Should have read back same size")
expected = array.array('B', [x for x in dat])
self.assertEqual(expected, r, "should have read back what we wrote")
@unittest.skip("Perf tests only on demand (comment this line!)")
class TestConfigSourceSinkPerformance(unittest.TestCase):
"""

View File

@ -88,6 +88,22 @@ static const struct usb_endpoint_descriptor endp_bulk[] = {
.wMaxPacketSize = BULK_EP_MAXPACKET,
.bInterval = 1,
},
{
.bLength = USB_DT_ENDPOINT_SIZE,
.bDescriptorType = USB_DT_ENDPOINT,
.bEndpointAddress = 0x81,
.bmAttributes = USB_ENDPOINT_ATTR_BULK,
.wMaxPacketSize = BULK_EP_MAXPACKET,
.bInterval = 1,
},
{
.bLength = USB_DT_ENDPOINT_SIZE,
.bDescriptorType = USB_DT_ENDPOINT,
.bEndpointAddress = 0x2,
.bmAttributes = USB_ENDPOINT_ATTR_BULK,
.wMaxPacketSize = BULK_EP_MAXPACKET,
.bInterval = 1,
},
{
.bLength = USB_DT_ENDPOINT_SIZE,
.bDescriptorType = USB_DT_ENDPOINT,
@ -95,7 +111,7 @@ static const struct usb_endpoint_descriptor endp_bulk[] = {
.bmAttributes = USB_ENDPOINT_ATTR_BULK,
.wMaxPacketSize = BULK_EP_MAXPACKET,
.bInterval = 1,
}
},
};
static const struct usb_interface_descriptor iface_sourcesink[] = {
@ -117,7 +133,7 @@ static const struct usb_interface_descriptor iface_loopback[] = {
.bDescriptorType = USB_DT_INTERFACE,
.bInterfaceNumber = 0, /* still 0, as it's a different config...? */
.bAlternateSetting = 0,
.bNumEndpoints = 2,
.bNumEndpoints = 4,
.bInterfaceClass = USB_CLASS_VENDOR,
.iInterface = 0,
.endpoint = endp_bulk,
@ -239,18 +255,20 @@ static void gadget0_ss_in_cb(usbd_device *usbd_dev, uint8_t ep)
/*assert(x == sizeof(buf));*/
}
static void gadget0_rx_cb_loopback(usbd_device *usbd_dev, uint8_t ep)
static void gadget0_in_cb_loopback(usbd_device *usbd_dev, uint8_t ep)
{
(void) usbd_dev;
ER_DPRINTF("loop rx %x\n", ep);
/* TODO - unimplemented - consult linux source on proper behaviour */
ER_DPRINTF("loop IN %x\n", ep);
/* Nothing to do here, basically just indicates they read us. */
}
static void gadget0_tx_cb_loopback(usbd_device *usbd_dev, uint8_t ep)
static void gadget0_out_cb_loopback(usbd_device *usbd_dev, uint8_t ep)
{
(void) usbd_dev;
ER_DPRINTF("loop tx %x\n", ep);
/* TODO - unimplemented - consult linux source on proper behaviour */
uint8_t buf[BULK_EP_MAXPACKET];
/* Copy data we received on OUT ep back to the paired IN ep */
int x = usbd_ep_read_packet(usbd_dev, ep, buf, BULK_EP_MAXPACKET);
int y = usbd_ep_write_packet(usbd_dev, 0x80 | ep, buf, x);
ER_DPRINTF("loop OUT %x got %d => %d\n", ep, x, y);
}
static enum usbd_request_return_codes gadget0_control_request(usbd_device *usbd_dev,
@ -310,7 +328,7 @@ static void gadget0_set_config(usbd_device *usbd_dev, uint16_t wValue)
state.test_unaligned = 0;
usbd_ep_setup(usbd_dev, 0x01, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_ss_out_cb);
usbd_ep_setup(usbd_dev, 0x82, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
usbd_ep_setup(usbd_dev, 0x81, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_ss_in_cb);
usbd_register_control_callback(
usbd_dev,
@ -318,13 +336,23 @@ static void gadget0_set_config(usbd_device *usbd_dev, uint16_t wValue)
USB_REQ_TYPE_TYPE | USB_REQ_TYPE_RECIPIENT,
gadget0_control_request);
/* Prime source for IN data. */
gadget0_ss_in_cb(usbd_dev, 0x82);
gadget0_ss_in_cb(usbd_dev, 0x81);
break;
case GZ_CFG_LOOPBACK:
/*
* The ordering here is important, as it defines the addresses
* locality. We want to have both out endpoints in sequentially,
* so we can test for overrunning our memory space, if that's a
* concern on the usb peripheral.
*/
usbd_ep_setup(usbd_dev, 0x01, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_rx_cb_loopback);
gadget0_out_cb_loopback);
usbd_ep_setup(usbd_dev, 0x02, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_out_cb_loopback);
usbd_ep_setup(usbd_dev, 0x81, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_in_cb_loopback);
usbd_ep_setup(usbd_dev, 0x82, USB_ENDPOINT_ATTR_BULK, BULK_EP_MAXPACKET,
gadget0_tx_cb_loopback);
gadget0_in_cb_loopback);
break;
default:
ER_DPRINTF("set configuration unknown: %d\n", wValue);