osmo_io: sendmsg/recvmsg support

Add support osmo_io operations resembling sendmsg() and recvmsg() socket
operations.  This is what will enable the implementation of higher-layer
functions like equivalents of sctp_recvmsg() and sctp_send() in
libosmo-netif and/or other users.

Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Related: OS#5751
This commit is contained in:
Harald Welte 2023-11-18 18:51:58 +01:00 committed by Andreas Eversberg
parent 848faf9256
commit 1047ed7255
6 changed files with 169 additions and 20 deletions

View File

@ -4,6 +4,8 @@
#pragma once
#include <sys/socket.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/msgb.h>
@ -21,8 +23,8 @@ enum osmo_io_fd_mode {
OSMO_IO_FD_MODE_READ_WRITE,
/*! use recvfrom() / sendto() calls */
OSMO_IO_FD_MODE_RECVFROM_SENDTO,
/*! emulate sctp_recvmsg() and sctp_send() */
OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND,
/*! emulate recvmsg() / sendmsg() */
OSMO_IO_FD_MODE_RECVMSG_SENDMSG,
};
enum osmo_io_backend {
@ -65,12 +67,20 @@ struct osmo_io_ops {
struct msgb *msg,
const struct osmo_sockaddr *daddr);
};
/* mode OSMO_IO_FD_MODE_RECVMSG_SENDMSG: */
struct {
void (*recvmsg_cb)(struct osmo_io_fd *iofd, int res,
struct msgb *msg, const struct msghdr *msgh);
void (*sendmsg_cb)(struct osmo_io_fd *iofd, int res, struct msgb *msg);
};
};
void osmo_iofd_init(void);
struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name,
enum osmo_io_fd_mode mode, const struct osmo_io_ops *ioops, void *data);
int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size);
int osmo_iofd_register(struct osmo_io_fd *iofd, int fd);
int osmo_iofd_unregister(struct osmo_io_fd *iofd);
unsigned int osmo_iofd_txqueue_len(struct osmo_io_fd *iofd);
@ -83,6 +93,8 @@ void osmo_iofd_notify_connected(struct osmo_io_fd *iofd);
int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg);
int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_flags,
const struct osmo_sockaddr *dest);
int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags,
const struct msghdr *msgh);
void osmo_iofd_set_alloc_info(struct osmo_io_fd *iofd, unsigned int size, unsigned int headroom);
void osmo_iofd_set_txqueue_max_length(struct osmo_io_fd *iofd, unsigned int size);

View File

@ -266,7 +266,10 @@ osmo_iofd_init;
osmo_iofd_ops;
osmo_iofd_register;
osmo_iofd_sendto_msgb;
osmo_iofd_sctp_send_msgb;
osmo_iofd_sendmsg_msgb;
osmo_iofd_set_alloc_info;
osmo_iofd_set_cmsg_size;
osmo_iofd_set_data;
osmo_iofd_set_ioops;
osmo_iofd_set_priv_nr;

View File

@ -1,8 +1,8 @@
/*! \file osmo_io.c
* New osmocom async I/O API.
*
* (C) 2022 by Harald Welte <laforge@osmocom.org>
* (C) 2022-2023 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* (C) 2022-2024 by Harald Welte <laforge@osmocom.org>
* (C) 2022-2024 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
* Author: Daniel Willmann <dwillmann@sysmocom.de>
*
* All Rights Reserved.
@ -105,8 +105,10 @@ static __attribute__((constructor(103))) void on_dso_load_osmo_io(void)
* \param[in] iofd the osmo_io file structure
* \param[in] action the action this msg(hdr) is for (read, write, ..)
* \param[in] msg the msg buffer to use. Will allocate a new one if NULL
* \param[in] cmsg_size size (in bytes) of iofd_msghdr.cmsg buffer. Can be 0 if cmsg is not used.
* \returns the newly allocated msghdr or NULL in case of error */
struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg)
struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg,
size_t cmsg_size)
{
bool free_msg = false;
struct iofd_msghdr *hdr;
@ -120,7 +122,7 @@ struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_act
talloc_steal(iofd, msg);
}
hdr = talloc_zero(iofd, struct iofd_msghdr);
hdr = talloc_zero_size(iofd, sizeof(struct iofd_msghdr) + cmsg_size);
if (!hdr) {
if (free_msg)
talloc_free(msg);
@ -339,8 +341,10 @@ void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
/* TODO Implement */
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
iofd->io_ops.recvmsg_cb(iofd, rc, msg, &hdr->hdr);
break;
default:
OSMO_ASSERT(false);
break;
}
@ -378,6 +382,9 @@ void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_ms
case IOFD_ACT_SENDTO:
iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
break;
case IOFD_ACT_SENDMSG:
iofd->io_ops.sendmsg_cb(iofd, rc, msg);
break;
default:
OSMO_ASSERT(0);
}
@ -408,7 +415,7 @@ int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg)
return -EINVAL;
}
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
if (!msghdr)
return -ENOMEM;
@ -450,7 +457,7 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_
return -EINVAL;
}
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg);
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg, 0);
if (!msghdr)
return -ENOMEM;
@ -475,14 +482,95 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_
return 0;
}
/*! ismo_io equivalent of the sendmsg(2) socket API call
*
* Appends the message to the internal transmit queue.
* If the function returns success (0), it will take ownership of the msgb and
* internally call msgb_free() after the write request completes.
* In case of an error the msgb needs to be freed by the caller.
* \param[in] iofd file descriptor to write to
* \param[in] msg message buffer to send; is used to fill msgh->iov[]
* \param[in] sendmsg_flags Flags to pass to the send call
* \param[in] msgh 'struct msghdr' for name/control/flags. iov must be empty!
* \returns 0 in case of success; a negative value in case of error
*/
int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags, const struct msghdr *msgh)
{
int rc;
struct iofd_msghdr *msghdr;
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
if (OSMO_UNLIKELY(!iofd->io_ops.sendmsg_cb)) {
LOGPIO(iofd, LOGL_ERROR, "sendmsg_cb not set, Rejecting msgb\n");
return -EINVAL;
}
if (OSMO_UNLIKELY(msgh->msg_namelen > sizeof(msghdr->osa))) {
LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg msg_namelen (%u) > supported %zu bytes\n",
msgh->msg_namelen, sizeof(msghdr->osa));
return -EINVAL;
}
if (OSMO_UNLIKELY(msgh->msg_iovlen)) {
LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg must have all in 'struct msgb', not in 'msg_iov'\n");
return -EINVAL;
}
msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDMSG, msg, msgh->msg_controllen);
if (!msghdr)
return -ENOMEM;
/* copy over optional address */
if (msgh->msg_name) {
memcpy(&msghdr->osa, msgh->msg_name, msgh->msg_namelen);
msghdr->hdr.msg_name = &msghdr->osa.u.sa;
msghdr->hdr.msg_namelen = msgh->msg_namelen;
}
/* build iov from msgb */
msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
msghdr->hdr.msg_iov = &msghdr->iov[0];
msghdr->hdr.msg_iovlen = 1;
/* copy over the cmsg from the msghdr */
if (msgh->msg_control && msgh->msg_controllen) {
msghdr->hdr.msg_control = msghdr->cmsg;
msghdr->hdr.msg_controllen = msgh->msg_controllen;
memcpy(msghdr->cmsg, msgh->msg_control, msgh->msg_controllen);
}
/* copy over msg_flags */
msghdr->hdr.msg_flags = sendmsg_flags;
rc = iofd_txqueue_enqueue(iofd, msghdr);
if (rc < 0) {
iofd_msghdr_free(msghdr);
LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc);
return rc;
}
return 0;
}
static int check_mode_callback_compat(enum osmo_io_fd_mode mode, const struct osmo_io_ops *ops)
{
switch (mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
if (ops->recvfrom_cb || ops->sendto_cb)
return false;
if (ops->recvmsg_cb || ops->sendmsg_cb)
return false;
break;
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
if (ops->read_cb || ops->write_cb)
return false;
if (ops->recvmsg_cb || ops->sendmsg_cb)
return false;
break;
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
if (ops->recvfrom_cb || ops->sendto_cb)
return false;
if (ops->read_cb || ops->write_cb)
return false;
break;
@ -511,6 +599,7 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en
switch (mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
break;
default:
return NULL;
@ -547,6 +636,16 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en
return iofd;
}
/*! Set the size of the control message buffer allocated when submitting recvmsg */
int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size)
{
if (iofd->mode != OSMO_IO_FD_MODE_RECVMSG_SENDMSG)
return -EINVAL;
iofd->cmsg_size = cmsg_size;
return 0;
}
/*! Register the fd with the underlying backend.
*
* \param[in] iofd the iofd file descriptor
@ -567,7 +666,8 @@ int osmo_iofd_register(struct osmo_io_fd *iofd, int fd)
IOFD_FLAG_UNSET(iofd, IOFD_FLAG_CLOSED);
if ((iofd->mode == OSMO_IO_FD_MODE_READ_WRITE && iofd->io_ops.read_cb) ||
(iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && iofd->io_ops.recvfrom_cb)) {
(iofd->mode == OSMO_IO_FD_MODE_RECVFROM_SENDTO && iofd->io_ops.recvfrom_cb) ||
(iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG && iofd->io_ops.recvmsg_cb)) {
osmo_iofd_ops.read_enable(iofd);
}
@ -767,7 +867,12 @@ int osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioops
else
osmo_iofd_ops.read_disable(iofd);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
if (iofd->io_ops.recvmsg_cb)
osmo_iofd_ops.read_enable(iofd);
else
osmo_iofd_ops.read_disable(iofd);
break;
default:
OSMO_ASSERT(0);
}
@ -780,7 +885,8 @@ int osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioops
* \param[in] iofd the file descriptor */
void osmo_iofd_notify_connected(struct osmo_io_fd *iofd)
{
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG);
OSMO_ASSERT(osmo_iofd_ops.notify_connected);
osmo_iofd_ops.notify_connected(iofd);
}

View File

@ -4,6 +4,7 @@
#include <unistd.h>
#include <stdbool.h>
#include <netinet/sctp.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/linuxlist.h>
@ -72,6 +73,9 @@ struct osmo_io_fd {
/*! private number, extending \a data */
unsigned int priv_nr;
/*! size of iofd_msghdr.cmsg[] when allocated in recvmsg path */
size_t cmsg_size;
struct {
/*! talloc context from which to allocate msgb when reading */
const void *ctx;
@ -109,7 +113,8 @@ enum iofd_msg_action {
IOFD_ACT_WRITE,
IOFD_ACT_RECVFROM,
IOFD_ACT_SENDTO,
// TODO: SCTP_*
IOFD_ACT_RECVMSG,
IOFD_ACT_SENDMSG,
};
@ -132,6 +137,9 @@ struct iofd_msghdr {
struct msgb *msg;
/*! I/O file descriptor on which we perform this I/O operation */
struct osmo_io_fd *iofd;
/*! control message buffer for passing sctp_sndrcvinfo along */
char cmsg[0]; /* size is determined by iofd->cmsg_size on recvmsg, and by mcghdr->msg_controllen on sendmsg */
};
enum iofd_seg_act {
@ -140,7 +148,7 @@ enum iofd_seg_act {
IOFD_SEG_ACT_DEFER,
};
struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg);
struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg, size_t cmsg_size);
void iofd_msghdr_free(struct iofd_msghdr *msghdr);
struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd);

View File

@ -49,6 +49,7 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
if (what & OSMO_FD_READ) {
struct iofd_msghdr hdr;
msg = iofd_msgb_pending_or_alloc(iofd);
if (!msg) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n");
@ -64,6 +65,10 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
.msg_name = &hdr.osa.u.sa,
.msg_namelen = sizeof(struct osmo_sockaddr),
};
if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) {
hdr.hdr.msg_control = alloca(iofd->cmsg_size);
hdr.hdr.msg_controllen = iofd->cmsg_size;
}
rc = recvmsg(ofd->fd, &hdr.hdr, flags);
if (rc > 0)
@ -90,13 +95,15 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd->io_ops.sendto_cb(iofd, 0, NULL, NULL);
break;
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
iofd->io_ops.sendmsg_cb(iofd, 0, NULL);
break;
default:
break;
}
if (osmo_iofd_txqueue_len(iofd) == 0)
iofd_poll_ops.write_disable(iofd);
}
}
}

View File

@ -3,6 +3,7 @@
*
* (C) 2022-2023 by sysmocom s.f.m.c.
* Author: Daniel Willmann <daniel@sysmocom.de>
* (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
*
* All Rights Reserved.
*
@ -35,6 +36,8 @@
#include <stdbool.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/sctp.h>
#include <sys/eventfd.h>
#include <liburing.h>
@ -114,7 +117,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
OSMO_ASSERT(0);
}
msghdr = iofd_msghdr_alloc(iofd, action, msg);
msghdr = iofd_msghdr_alloc(iofd, action, msg, iofd->cmsg_size);
if (!msghdr) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n");
OSMO_ASSERT(0);
@ -126,6 +129,10 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
switch (action) {
case IOFD_ACT_READ:
break;
case IOFD_ACT_RECVMSG:
msghdr->hdr.msg_control = msghdr->cmsg;
msghdr->hdr.msg_controllen = iofd->cmsg_size;
/* fall-through */
case IOFD_ACT_RECVFROM:
msghdr->hdr.msg_iov = &msghdr->iov[0];
msghdr->hdr.msg_iovlen = 1;
@ -146,6 +153,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
case IOFD_ACT_READ:
io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
break;
case IOFD_ACT_RECVMSG:
case IOFD_ACT_RECVFROM:
io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
break;
@ -210,10 +218,12 @@ static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
switch (msghdr->action) {
case IOFD_ACT_READ:
case IOFD_ACT_RECVFROM:
case IOFD_ACT_RECVMSG:
iofd_uring_handle_recv(msghdr, res);
break;
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
case IOFD_ACT_SENDMSG:
iofd_uring_handle_tx(msghdr, res);
break;
default:
@ -273,6 +283,7 @@ static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
switch (msghdr->action) {
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
case IOFD_ACT_SENDMSG:
io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
break;
default:
@ -334,21 +345,20 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n");
OSMO_ASSERT(0);
}
msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg);
msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0);
if (!msghdr) {
LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n");
OSMO_ASSERT(0);
}
msghdr->iov[0].iov_base = msgb_data(msg);
msghdr->iov[0].iov_len = msgb_tailroom(msg);
msghdr->iov[0].iov_len = msgb_length(msg);
sqe = io_uring_get_sqe(&g_ring.ring);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
}
// Prep msgb/iov
io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
io_uring_sqe_set_data(sqe, msghdr);
@ -376,6 +386,9 @@ static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
break;
case OSMO_IO_FD_MODE_RECVMSG_SENDMSG:
iofd_uring_submit_recv(iofd, IOFD_ACT_RECVMSG);
break;
default:
OSMO_ASSERT(0);
}