osmo_io: Add io_uring backend

Change-Id: I5152129eb84b31ccc9e02bc2a5c5bdb046d331bc
This commit is contained in:
Daniel Willmann 2023-01-04 18:20:55 +01:00
parent e486e012c8
commit f91d2aac9d
8 changed files with 476 additions and 3 deletions

View File

@ -175,6 +175,20 @@ AC_CONFIG_HEADER(config.h)
PKG_CHECK_MODULES(TALLOC, [talloc >= 2.1.0])
AC_ARG_ENABLE([uring], [AS_HELP_STRING([--disable-uring], [Build without io_uring support])],
[
ENABLE_URING=$enableval
],
[
ENABLE_URING="yes"
])
AS_IF([test "x$ENABLE_URING" = "xyes"], [
PKG_CHECK_MODULES(URING, [liburing >= 0.7])
AC_DEFINE([HAVE_URING],[1],[Build with io_uring support])
])
AM_CONDITIONAL(ENABLE_URING, test "x$ENABLE_URING" = "xyes")
AC_SUBST(ENABLE_URING)
AC_ARG_ENABLE([pcsc], [AS_HELP_STRING([--disable-pcsc], [Build without PC/SC support])],
[
ENABLE_PCSC=$enableval

View File

@ -32,6 +32,7 @@ BuildRequires: pkgconfig(libusb-1.0)
BuildRequires: pkgconfig(talloc) >= 2.1.0
BuildRequires: pkgconfig(libmnl)
BuildRequires: pkgconfig(libsystemd)
BuildRequires: pkgconfig(liburing)
%description
libosmocore is a package with various utility functions that were

View File

@ -27,6 +27,7 @@ enum osmo_io_fd_mode {
enum osmo_io_backend {
OSMO_IO_BACKEND_POLL,
OSMO_IO_BACKEND_IO_URING,
};
extern const struct value_string osmo_io_backend_names[];

View File

@ -4,7 +4,7 @@
LIBVERSION=20:0:0
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_builddir)
AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS)
AM_CFLAGS = -Wall $(TALLOC_CFLAGS) $(PTHREAD_CFLAGS) $(LIBSCTP_CFLAGS) $(LIBMNL_CFLAGS) $(URING_CFLAGS)
if ENABLE_PSEUDOTALLOC
AM_CPPFLAGS += -I$(top_srcdir)/src/pseudotalloc
@ -18,6 +18,7 @@ libosmocore_la_LIBADD = \
$(LIBRARY_RT) \
$(PTHREAD_LIBS) \
$(LIBSCTP_LIBS) \
$(URING_LIBS) \
$(NULL)
libosmocore_la_SOURCES = \
@ -156,5 +157,9 @@ BUILT_SOURCES += probes.h probes.lo
libosmocore_la_LIBADD += probes.lo
endif
if ENABLE_URING
libosmocore_la_SOURCES += osmo_io_uring.c
endif
crc%gen.c: crcXXgen.c.tpl
$(AM_V_GEN)sed -e's/XX/$*/g' $< > $@

View File

@ -47,6 +47,7 @@
const struct value_string osmo_io_backend_names[] = {
{ OSMO_IO_BACKEND_POLL, "poll" },
{ OSMO_IO_BACKEND_IO_URING, "io_uring" },
{ 0, NULL }
};
@ -55,12 +56,21 @@ static enum osmo_io_backend g_io_backend;
/* Used by some tests, can't be static */
struct iofd_backend_ops osmo_iofd_ops;
#if defined(HAVE_URING)
void osmo_iofd_uring_init(void);
#endif
/*! initialize osmo_io for the current thread */
void osmo_iofd_init(void)
{
switch (g_io_backend) {
case OSMO_IO_BACKEND_POLL:
break;
#if defined(HAVE_URING)
case OSMO_IO_BACKEND_IO_URING:
osmo_iofd_uring_init();
break;
#endif
default:
OSMO_ASSERT(0);
break;
@ -78,6 +88,11 @@ static __attribute__((constructor(103))) void on_dso_load_osmo_io(void)
if (!strcmp("POLL", backend)) {
g_io_backend = OSMO_IO_BACKEND_POLL;
osmo_iofd_ops = iofd_poll_ops;
#if defined(HAVE_URING)
} else if (!strcmp("IO_URING", backend)) {
g_io_backend = OSMO_IO_BACKEND_IO_URING;
osmo_iofd_ops = iofd_uring_ops;
#endif
} else {
fprintf(stderr, "Invalid osmo_io backend requested: \"%s\"\nCheck the environment variable %s\n", backend, OSMO_IO_BACKEND_ENV);
exit(1);

View File

@ -19,6 +19,10 @@
extern const struct iofd_backend_ops iofd_poll_ops;
#define OSMO_IO_BACKEND_DEFAULT "POLL"
#if defined(HAVE_URING)
extern const struct iofd_backend_ops iofd_uring_ops;
#endif
struct iofd_backend_ops {
int (*register_fd)(struct osmo_io_fd *iofd);
int (*unregister_fd)(struct osmo_io_fd *iofd);
@ -90,9 +94,9 @@ struct osmo_io_fd {
} poll;
struct {
bool read_enabled;
bool read_pending;
bool write_pending;
bool write_enabled;
void *read_msghdr;
void *write_msghdr;
/* TODO: index into array of registered fd's? */
} uring;
} u;

427
src/core/osmo_io_uring.c Normal file
View File

@ -0,0 +1,427 @@
/*! \file osmo_io_uring.c
* io_uring backend for osmo_io.
*
* (C) 2022-2023 by sysmocom s.f.m.c.
* Author: Daniel Willmann <daniel@sysmocom.de>
*
* All Rights Reserved.
*
* SPDX-License-Identifier: GPL-2.0+
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
/* TODO:
* Parameters:
* - number of simultaneous read/write in uring for given fd
*
*/
#include "../config.h"
#if defined(__linux__)
#include <stdio.h>
#include <talloc.h>
#include <unistd.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <liburing.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/select.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/utils.h>
#include <osmocom/core/socket.h>
#include "osmo_io_internal.h"
#define IOFD_URING_ENTRIES 4096
struct osmo_io_uring {
struct osmo_fd event_ofd;
struct io_uring ring;
};
static __thread struct osmo_io_uring g_ring;
static void iofd_uring_cqe(struct io_uring *ring);
static int iofd_uring_poll_cb(struct osmo_fd *ofd, unsigned int what)
{
struct io_uring *ring = ofd->data;
eventfd_t val;
int rc;
if (what & OSMO_FD_READ) {
rc = eventfd_read(ofd->fd, &val);
if (rc < 0) {
LOGP(DLIO, LOGL_ERROR, "eventfd_read() returned error\n");
return rc;
}
iofd_uring_cqe(ring);
}
if (what & OSMO_FD_WRITE)
OSMO_ASSERT(0);
return 0;
}
/*! initialize the uring and tie it into our event loop */
void osmo_iofd_uring_init(void)
{
int rc;
rc = io_uring_queue_init(IOFD_URING_ENTRIES, &g_ring.ring, 0);
if (rc < 0)
OSMO_ASSERT(0);
rc = eventfd(0, 0);
if (rc < 0) {
io_uring_queue_exit(&g_ring.ring);
OSMO_ASSERT(0);
}
osmo_fd_setup(&g_ring.event_ofd, rc, OSMO_FD_READ, iofd_uring_poll_cb, &g_ring.ring, 0);
osmo_fd_register(&g_ring.event_ofd);
io_uring_register_eventfd(&g_ring.ring, rc);
}
static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action action)
{
struct msgb *msg;
struct iofd_msghdr *msghdr;
struct io_uring_sqe *sqe;
msg = iofd_msgb_pending_or_alloc(iofd);
if (!msg) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
OSMO_ASSERT(0);
}
msghdr = iofd_msghdr_alloc(iofd, action, msg);
if (!msghdr) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
OSMO_ASSERT(0);
}
msghdr->iov[0].iov_base = msg->tail;
msghdr->iov[0].iov_len = msgb_tailroom(msg);
switch (action) {
case IOFD_ACT_READ:
break;
case IOFD_ACT_RECVFROM:
msghdr->hdr.msg_iov = &msghdr->iov[0];
msghdr->hdr.msg_iovlen = 1;
msghdr->hdr.msg_name = &msghdr->osa.u.sa;
msghdr->hdr.msg_namelen = osmo_sockaddr_size(&msghdr->osa);
break;
default:
OSMO_ASSERT(0);
}
sqe = io_uring_get_sqe(&g_ring.ring);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
}
switch (action) {
case IOFD_ACT_READ:
io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
break;
case IOFD_ACT_RECVFROM:
io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
break;
default:
OSMO_ASSERT(0);
}
io_uring_sqe_set_data(sqe, msghdr);
io_uring_submit(&g_ring.ring);
/* NOTE: This only works if we have one read per fd */
iofd->u.uring.read_msghdr = msghdr;
}
static void iofd_uring_handle_recv(struct iofd_msghdr *msghdr, int rc)
{
struct osmo_io_fd *iofd = msghdr->iofd;
struct msgb *msg = msghdr->msg;
if (rc > 0)
msgb_put(msg, rc);
if (!IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_handle_recv(iofd, msg, rc, msghdr);
if (iofd->u.uring.read_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_submit_recv(iofd, msghdr->action);
else
iofd->u.uring.read_msghdr = NULL;
iofd_msghdr_free(msghdr);
}
static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
{
struct osmo_io_fd *iofd = msghdr->iofd;
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
goto out_free;
/* Error during write */
if (rc < 0) {
if (msghdr->action == IOFD_ACT_WRITE)
iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
else if (msghdr->action == IOFD_ACT_SENDTO)
iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
else
OSMO_ASSERT(0);
goto out_free;
}
/* Incomplete write */
if (rc < msgb_length(msghdr->msg)) {
/* Re-enqueue remaining data */
msgb_pull(msghdr->msg, rc);
msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
iofd_txqueue_enqueue_front(iofd, msghdr);
goto out;
}
if (msghdr->action == IOFD_ACT_WRITE)
iofd->io_ops.write_cb(iofd, rc, msghdr->msg);
else if (msghdr->action == IOFD_ACT_SENDTO)
iofd->io_ops.sendto_cb(iofd, rc, msghdr->msg, &msghdr->osa);
else
OSMO_ASSERT(0);
out_free:
msgb_free(msghdr->msg);
iofd_msghdr_free(msghdr);
out:
iofd->u.uring.write_msghdr = NULL;
if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
iofd_uring_submit_tx(iofd);
}
static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
{
struct osmo_io_fd *iofd = msghdr->iofd;
IOFD_FLAG_SET(iofd, IOFD_FLAG_IN_CALLBACK);
switch (msghdr->action) {
case IOFD_ACT_READ:
case IOFD_ACT_RECVFROM:
iofd_uring_handle_recv(msghdr, res);
break;
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
iofd_uring_handle_tx(msghdr, res);
break;
default:
OSMO_ASSERT(0)
}
if (!iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_IN_CALLBACK);
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_TO_FREE) && !iofd->u.uring.read_msghdr && !iofd->u.uring.write_msghdr)
talloc_free(iofd);
}
static void iofd_uring_cqe(struct io_uring *ring)
{
int rc;
struct io_uring_cqe *cqe;
struct iofd_msghdr *msghdr;
while (io_uring_peek_cqe(ring, &cqe) == 0) {
msghdr = io_uring_cqe_get_data(cqe);
if (!msghdr) {
LOGP(DLIO, LOGL_DEBUG, "Cancellation returned\n");
io_uring_cqe_seen(ring, cqe);
continue;
}
rc = cqe->res;
/* Hand the entry back to the kernel before */
io_uring_cqe_seen(ring, cqe);
iofd_uring_handle_completion(msghdr, rc);
}
}
static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
{
struct io_uring_sqe *sqe;
struct iofd_msghdr *msghdr;
msghdr = iofd_txqueue_dequeue(iofd);
if (!msghdr)
return -ENODATA;
sqe = io_uring_get_sqe(&g_ring.ring);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
}
io_uring_sqe_set_data(sqe, msghdr);
switch (msghdr->action) {
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
break;
default:
OSMO_ASSERT(0);
}
io_uring_submit(&g_ring.ring);
iofd->u.uring.write_msghdr = msghdr;
return 0;
}
static void iofd_uring_write_enable(struct osmo_io_fd *iofd);
static void iofd_uring_read_enable(struct osmo_io_fd *iofd);
static int iofd_uring_register(struct osmo_io_fd *iofd)
{
return 0;
}
static int iofd_uring_unregister(struct osmo_io_fd *iofd)
{
struct io_uring_sqe *sqe;
if (iofd->u.uring.read_msghdr) {
sqe = io_uring_get_sqe(&g_ring.ring);
OSMO_ASSERT(sqe != NULL);
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling read\n");
io_uring_prep_cancel(sqe, iofd->u.uring.read_msghdr, 0);
}
if (iofd->u.uring.write_msghdr) {
sqe = io_uring_get_sqe(&g_ring.ring);
OSMO_ASSERT(sqe != NULL);
io_uring_sqe_set_data(sqe, NULL);
LOGPIO(iofd, LOGL_DEBUG, "Cancelling write\n");
io_uring_prep_cancel(sqe, iofd->u.uring.write_msghdr, 0);
}
io_uring_submit(&g_ring.ring);
return 0;
}
static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
{
iofd->u.uring.write_enabled = true;
if (iofd->u.uring.write_msghdr)
return;
if (osmo_iofd_txqueue_len(iofd) > 0)
iofd_uring_submit_tx(iofd);
else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
/* Empty write request to check when the socket is connected */
struct iofd_msghdr *msghdr;
struct io_uring_sqe *sqe;
struct msgb *msg = msgb_alloc_headroom(0, 0, "io_uring write dummy");
if (!msg) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
OSMO_ASSERT(0);
}
msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
if (!msghdr) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
OSMO_ASSERT(0);
}
msghdr->iov[0].iov_base = msgb_data(msg);
msghdr->iov[0].iov_len = msgb_tailroom(msg);
sqe = io_uring_get_sqe(&g_ring.ring);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
}
// Prep msgb/iov
io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
io_uring_sqe_set_data(sqe, msghdr);
io_uring_submit(&g_ring.ring);
iofd->u.uring.write_msghdr = msghdr;
}
}
static void iofd_uring_write_disable(struct osmo_io_fd *iofd)
{
iofd->u.uring.write_enabled = false;
}
static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
{
iofd->u.uring.read_enabled = true;
if (iofd->u.uring.read_msghdr)
return;
switch (iofd->mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
iofd_uring_submit_recv(iofd, IOFD_ACT_READ);
break;
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
break;
default:
OSMO_ASSERT(0);
}
}
static void iofd_uring_read_disable(struct osmo_io_fd *iofd)
{
iofd->u.uring.read_enabled = false;
}
static int iofd_uring_close(struct osmo_io_fd *iofd)
{
iofd_uring_read_disable(iofd);
iofd_uring_write_disable(iofd);
iofd_uring_unregister(iofd);
return close(iofd->fd);
}
const struct iofd_backend_ops iofd_uring_ops = {
.register_fd = iofd_uring_register,
.unregister_fd = iofd_uring_unregister,
.close = iofd_uring_close,
.write_enable = iofd_uring_write_enable,
.write_disable = iofd_uring_write_disable,
.read_enable = iofd_uring_read_enable,
.read_disable = iofd_uring_read_disable,
};
#endif /* defined(__linux__) */

View File

@ -95,6 +95,9 @@ static void test_connected(void)
osmo_iofd_free(iofd1);
osmo_iofd_free(iofd2);
for (int i = 0; i < 128; i++)
osmo_select_main(1);
}
static void recvfrom_cb(struct osmo_io_fd *iofd, int rc, struct msgb *msg,
@ -147,6 +150,9 @@ static void test_unconnected(void)
osmo_iofd_free(iofd1);
osmo_iofd_free(iofd2);
for (int i = 0; i < 128; i++)
osmo_select_main(1);
}
static const struct log_info_cat default_categories[] = {
};