diff --git a/include/osmocom/core/osmo_io.h b/include/osmocom/core/osmo_io.h index 46e1a5399..ec2896b54 100644 --- a/include/osmocom/core/osmo_io.h +++ b/include/osmocom/core/osmo_io.h @@ -4,6 +4,8 @@ #pragma once +#include + #include #include #include @@ -21,8 +23,8 @@ enum osmo_io_fd_mode { OSMO_IO_FD_MODE_READ_WRITE, /*! use recvfrom() / sendto() calls */ OSMO_IO_FD_MODE_RECVFROM_SENDTO, - /*! emulate sctp_recvmsg() and sctp_send() */ - OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND, + /*! emulate recvmsg() / sendmsg() */ + OSMO_IO_FD_MODE_RECVMSG_SENDMSG, }; enum osmo_io_backend { @@ -66,6 +68,13 @@ struct osmo_io_ops { struct msgb *msg, const struct osmo_sockaddr *daddr); }; + + /* mode OSMO_IO_FD_MODE_RECVMSG_SENDMSG: */ + struct { + void (*recvmsg_cb)(struct osmo_io_fd *iofd, int res, + struct msgb *msg, const struct msghdr *msgh); + void (*sendmsg_cb)(struct osmo_io_fd *iofd, int res, struct msgb *msg); + }; }; }; @@ -73,6 +82,7 @@ void osmo_iofd_init(void); struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, enum osmo_io_fd_mode mode, const struct osmo_io_ops *ioops, void *data); +int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size); int osmo_iofd_register(struct osmo_io_fd *iofd, int fd); int osmo_iofd_unregister(struct osmo_io_fd *iofd); unsigned int osmo_iofd_txqueue_len(struct osmo_io_fd *iofd); @@ -85,6 +95,8 @@ void osmo_iofd_notify_connected(struct osmo_io_fd *iofd); int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg); int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_flags, const struct osmo_sockaddr *dest); +int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags, + const struct msghdr *msgh); void osmo_iofd_set_alloc_info(struct osmo_io_fd *iofd, unsigned int size, unsigned int headroom); void osmo_iofd_set_txqueue_max_length(struct osmo_io_fd *iofd, unsigned int size); diff --git a/src/core/libosmocore.map b/src/core/libosmocore.map index b66e37d65..a50a9ed45 100644 --- a/src/core/libosmocore.map +++ b/src/core/libosmocore.map @@ -266,7 +266,10 @@ osmo_iofd_init; osmo_iofd_ops; osmo_iofd_register; osmo_iofd_sendto_msgb; +osmo_iofd_sctp_send_msgb; +osmo_iofd_sendmsg_msgb; osmo_iofd_set_alloc_info; +osmo_iofd_set_cmsg_size; osmo_iofd_set_data; osmo_iofd_set_ioops; osmo_iofd_set_priv_nr; diff --git a/src/core/osmo_io.c b/src/core/osmo_io.c index 2f41bf4c7..44506ad6a 100644 --- a/src/core/osmo_io.c +++ b/src/core/osmo_io.c @@ -1,8 +1,8 @@ /*! \file osmo_io.c * New osmocom async I/O API. * - * (C) 2022 by Harald Welte - * (C) 2022-2023 by sysmocom - s.f.m.c. GmbH + * (C) 2022-2024 by Harald Welte + * (C) 2022-2024 by sysmocom - s.f.m.c. GmbH * Author: Daniel Willmann * * All Rights Reserved. @@ -105,8 +105,10 @@ static __attribute__((constructor(103))) void on_dso_load_osmo_io(void) * \param[in] iofd the osmo_io file structure * \param[in] action the action this msg(hdr) is for (read, write, ..) * \param[in] msg the msg buffer to use. Will allocate a new one if NULL + * \param[in] cmsg_size size (in bytes) of iofd_msghdr.cmsg buffer. Can be 0 if cmsg is not used. * \returns the newly allocated msghdr or NULL in case of error */ -struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg) +struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg, + size_t cmsg_size) { bool free_msg = false; struct iofd_msghdr *hdr; @@ -120,7 +122,7 @@ struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_act talloc_steal(iofd, msg); } - hdr = talloc_zero(iofd, struct iofd_msghdr); + hdr = talloc_zero_size(iofd, sizeof(struct iofd_msghdr) + cmsg_size); if (!hdr) { if (free_msg) talloc_free(msg); @@ -337,8 +339,10 @@ void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct case OSMO_IO_FD_MODE_RECVFROM_SENDTO: iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa); break; - case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND: - /* TODO Implement */ + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: + iofd->io_ops.recvmsg_cb(iofd, rc, msg, &hdr->hdr); + break; + default: OSMO_ASSERT(false); break; } @@ -376,6 +380,9 @@ void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_ms case IOFD_ACT_SENDTO: iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa); break; + case IOFD_ACT_SENDMSG: + iofd->io_ops.sendmsg_cb(iofd, rc, msg); + break; default: OSMO_ASSERT(0); } @@ -406,7 +413,7 @@ int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg) return -EINVAL; } - struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg); + struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0); if (!msghdr) return -ENOMEM; @@ -448,7 +455,7 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_ return -EINVAL; } - struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg); + struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDTO, msg, 0); if (!msghdr) return -ENOMEM; @@ -473,6 +480,77 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_ return 0; } +/*! ismo_io equivalent of the sendmsg(2) socket API call + * + * Appends the message to the internal transmit queue. + * If the function returns success (0), it will take ownership of the msgb and + * internally call msgb_free() after the write request completes. + * In case of an error the msgb needs to be freed by the caller. + * \param[in] iofd file descriptor to write to + * \param[in] msg message buffer to send; is used to fill msgh->iov[] + * \param[in] sendmsg_flags Flags to pass to the send call + * \param[in] msgh 'struct msghdr' for name/control/flags. iov must be empty! + * \returns 0 in case of success; a negative value in case of error + */ +int osmo_iofd_sendmsg_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags, const struct msghdr *msgh) +{ + int rc; + struct iofd_msghdr *msghdr; + + OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG); + if (OSMO_UNLIKELY(!iofd->io_ops.sendmsg_cb)) { + LOGPIO(iofd, LOGL_ERROR, "sendmsg_cb not set, Rejecting msgb\n"); + return -EINVAL; + } + + if (OSMO_UNLIKELY(msgh->msg_namelen > sizeof(msghdr->osa))) { + LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg msg_namelen (%u) > supported %zu bytes\n", + msgh->msg_namelen, sizeof(msghdr->osa)); + return -EINVAL; + } + + if (OSMO_UNLIKELY(msgh->msg_iovlen)) { + LOGPIO(iofd, LOGL_ERROR, "osmo_iofd_sendmsg must have all in 'struct msgb', not in 'msg_iov'\n"); + return -EINVAL; + } + + msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SENDMSG, msg, msgh->msg_controllen); + if (!msghdr) + return -ENOMEM; + + /* copy over optional address */ + if (msgh->msg_name) { + memcpy(&msghdr->osa, msgh->msg_name, msgh->msg_namelen); + msghdr->hdr.msg_name = &msghdr->osa.u.sa; + msghdr->hdr.msg_namelen = msgh->msg_namelen; + } + + /* build iov from msgb */ + msghdr->iov[0].iov_base = msgb_data(msghdr->msg); + msghdr->iov[0].iov_len = msgb_length(msghdr->msg); + msghdr->hdr.msg_iov = &msghdr->iov[0]; + msghdr->hdr.msg_iovlen = 1; + + /* copy over the cmsg from the msghdr */ + if (msgh->msg_control && msgh->msg_controllen) { + msghdr->hdr.msg_control = msghdr->cmsg; + msghdr->hdr.msg_controllen = msgh->msg_controllen; + memcpy(msghdr->cmsg, msgh->msg_control, msgh->msg_controllen); + } + + /* copy over msg_flags */ + msghdr->hdr.msg_flags = sendmsg_flags; + + rc = iofd_txqueue_enqueue(iofd, msghdr); + if (rc < 0) { + iofd_msghdr_free(msghdr); + LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc); + return rc; + } + + return 0; +} + /*! Allocate and setup a new iofd. * \param[in] ctx the parent context from which to allocate * \param[in] fd the underlying system file descriptor @@ -491,6 +569,7 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en switch (mode) { case OSMO_IO_FD_MODE_READ_WRITE: case OSMO_IO_FD_MODE_RECVFROM_SENDTO: + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: break; default: return NULL; @@ -524,6 +603,16 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en return iofd; } +/*! Set the size of the control message buffer allocated when submitting recvmsg */ +int osmo_iofd_set_cmsg_size(struct osmo_io_fd *iofd, size_t cmsg_size) +{ + if (iofd->mode != OSMO_IO_FD_MODE_RECVMSG_SENDMSG) + return -EINVAL; + + iofd->cmsg_size = cmsg_size; + return 0; +} + /*! Register the fd with the underlying backend. * * \param[in] iofd the iofd file descriptor @@ -739,7 +828,12 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop else osmo_iofd_ops.read_disable(iofd); break; - case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND: + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: + if (iofd->io_ops.recvmsg_cb) + osmo_iofd_ops.read_enable(iofd); + else + osmo_iofd_ops.read_disable(iofd); + break; default: OSMO_ASSERT(0); } @@ -750,7 +844,8 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop * \param[in] iofd the file descriptor */ void osmo_iofd_notify_connected(struct osmo_io_fd *iofd) { - OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE); + OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE || + iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG); OSMO_ASSERT(osmo_iofd_ops.notify_connected); osmo_iofd_ops.notify_connected(iofd); } diff --git a/src/core/osmo_io_internal.h b/src/core/osmo_io_internal.h index 9c86e0511..af47a3db2 100644 --- a/src/core/osmo_io_internal.h +++ b/src/core/osmo_io_internal.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -72,6 +73,9 @@ struct osmo_io_fd { /*! private number, extending \a data */ unsigned int priv_nr; + /*! size of iofd_msghdr.cmsg[] when allocated in recvmsg path */ + size_t cmsg_size; + struct { /*! talloc context from which to allocate msgb when reading */ const void *ctx; @@ -109,7 +113,8 @@ enum iofd_msg_action { IOFD_ACT_WRITE, IOFD_ACT_RECVFROM, IOFD_ACT_SENDTO, - // TODO: SCTP_* + IOFD_ACT_RECVMSG, + IOFD_ACT_SENDMSG, }; @@ -132,6 +137,9 @@ struct iofd_msghdr { struct msgb *msg; /*! I/O file descriptor on which we perform this I/O operation */ struct osmo_io_fd *iofd; + + /*! control message buffer for passing sctp_sndrcvinfo along */ + char cmsg[0]; /* size is determined by iofd->cmsg_size on recvmsg, and by mcghdr->msg_controllen on sendmsg */ }; enum iofd_seg_act { @@ -140,7 +148,7 @@ enum iofd_seg_act { IOFD_SEG_ACT_DEFER, }; -struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg); +struct iofd_msghdr *iofd_msghdr_alloc(struct osmo_io_fd *iofd, enum iofd_msg_action action, struct msgb *msg, size_t cmsg_size); void iofd_msghdr_free(struct iofd_msghdr *msghdr); struct msgb *iofd_msgb_alloc(struct osmo_io_fd *iofd); diff --git a/src/core/osmo_io_poll.c b/src/core/osmo_io_poll.c index 89ecbb67c..ff4cc19bb 100644 --- a/src/core/osmo_io_poll.c +++ b/src/core/osmo_io_poll.c @@ -49,6 +49,7 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w if (what & OSMO_FD_READ) { struct iofd_msghdr hdr; + msg = iofd_msgb_pending_or_alloc(iofd); if (!msg) { LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for reading\n"); @@ -64,6 +65,10 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w .msg_name = &hdr.osa.u.sa, .msg_namelen = sizeof(struct osmo_sockaddr), }; + if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) { + hdr.hdr.msg_control = alloca(iofd->cmsg_size); + hdr.hdr.msg_controllen = iofd->cmsg_size; + } rc = recvmsg(ofd->fd, &hdr.hdr, flags); if (rc > 0) @@ -81,10 +86,16 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags); iofd_handle_send_completion(iofd, rc, msghdr); } else { - if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) + switch (iofd->mode) { + case OSMO_IO_FD_MODE_READ_WRITE: + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: /* Socket is writable, but we have no data to send. A non-blocking/async connect() is signalled this way. */ iofd->io_ops.write_cb(iofd, 0, NULL); + break; + default: + break; + } if (osmo_iofd_txqueue_len(iofd) == 0) iofd_poll_ops.write_disable(iofd); } diff --git a/src/core/osmo_io_uring.c b/src/core/osmo_io_uring.c index 24d1e0857..1c47e64ae 100644 --- a/src/core/osmo_io_uring.c +++ b/src/core/osmo_io_uring.c @@ -3,6 +3,7 @@ * * (C) 2022-2023 by sysmocom s.f.m.c. * Author: Daniel Willmann + * (C) 2023-2024 by Harald Welte * * All Rights Reserved. * @@ -35,6 +36,8 @@ #include #include +#include +#include #include #include @@ -114,7 +117,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action OSMO_ASSERT(0); } - msghdr = iofd_msghdr_alloc(iofd, action, msg); + msghdr = iofd_msghdr_alloc(iofd, action, msg, iofd->cmsg_size); if (!msghdr) { LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for reading\n"); OSMO_ASSERT(0); @@ -126,6 +129,10 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action switch (action) { case IOFD_ACT_READ: break; + case IOFD_ACT_RECVMSG: + msghdr->hdr.msg_control = msghdr->cmsg; + msghdr->hdr.msg_controllen = iofd->cmsg_size; + /* fall-through */ case IOFD_ACT_RECVFROM: msghdr->hdr.msg_iov = &msghdr->iov[0]; msghdr->hdr.msg_iovlen = 1; @@ -146,6 +153,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action case IOFD_ACT_READ: io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0); break; + case IOFD_ACT_RECVMSG: case IOFD_ACT_RECVFROM: io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags); break; @@ -210,10 +218,12 @@ static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res) switch (msghdr->action) { case IOFD_ACT_READ: case IOFD_ACT_RECVFROM: + case IOFD_ACT_RECVMSG: iofd_uring_handle_recv(msghdr, res); break; case IOFD_ACT_WRITE: case IOFD_ACT_SENDTO: + case IOFD_ACT_SENDMSG: iofd_uring_handle_tx(msghdr, res); break; default: @@ -273,6 +283,7 @@ static int iofd_uring_submit_tx(struct osmo_io_fd *iofd) switch (msghdr->action) { case IOFD_ACT_WRITE: case IOFD_ACT_SENDTO: + case IOFD_ACT_SENDMSG: io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags); break; default: @@ -334,22 +345,29 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd) LOGPIO(iofd, LOGL_ERROR, "Could not allocate msgb for writing\n"); OSMO_ASSERT(0); } - msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg); + msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_WRITE, msg, 0); if (!msghdr) { LOGPIO(iofd, LOGL_ERROR, "Could not allocate msghdr for writing\n"); OSMO_ASSERT(0); } msghdr->iov[0].iov_base = msgb_data(msg); - msghdr->iov[0].iov_len = msgb_tailroom(msg); + msghdr->iov[0].iov_len = msgb_length(msg); sqe = io_uring_get_sqe(&g_ring.ring); if (!sqe) { LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n"); OSMO_ASSERT(0); } - // Prep msgb/iov - io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0); + // Prep msgb/iov FIXME DEAD CODE (see else if above) */ + if (iofd->mode == OSMO_IO_FD_MODE_RECVMSG_SENDMSG) { + msghdr->hdr.msg_flags = MSG_NOSIGNAL; + msghdr->hdr.msg_iov = &msghdr->iov[0]; + msghdr->hdr.msg_iovlen = 1; + io_uring_prep_sendmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags); + } else { + io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0); + } io_uring_sqe_set_data(sqe, msghdr); io_uring_submit(&g_ring.ring); @@ -376,6 +394,9 @@ static void iofd_uring_read_enable(struct osmo_io_fd *iofd) case OSMO_IO_FD_MODE_RECVFROM_SENDTO: iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM); break; + case OSMO_IO_FD_MODE_RECVMSG_SENDMSG: + iofd_uring_submit_recv(iofd, IOFD_ACT_RECVMSG); + break; default: OSMO_ASSERT(0); }