osmo_io: SCTP support

Add support osmo_io operations resembling sctp_send() and sctp_recvmsg()
that is provided by libsctp of lk-sctp.

Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82
Related: OS#5751
This commit is contained in:
Harald Welte 2023-11-18 18:51:58 +01:00 committed by Andreas Eversberg
parent 34518d3c6d
commit 4a22c184f0
6 changed files with 128 additions and 8 deletions

View File

@ -14,6 +14,16 @@
#define LOGPIO(iofd, level, fmt, args...) \
LOGP(DLIO, level, "iofd(%s)" fmt, iofd->name, ## args)
/*! \brief Access SCTP flags from the msgb control buffer */
#define OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION 0x80 /* sctp_recvmsg() flags=MSG_NOTIFICATION, msgb_data() contains "union sctp_notification*" */
#define msgb_sctp_msg_flags(msg) (msg)->cb[2]
/*! \brief Access the SCTP PPID from the msgb control buffer */
#define msgb_sctp_ppid(msg) (msg)->cb[3]
/*! \brief Access the SCTP Stream ID from the msgb control buffer */
#define msgb_sctp_stream(msg) (msg)->cb[4]
struct sctp_sndrcvinfo;
struct osmo_io_fd;
enum osmo_io_fd_mode {
@ -36,7 +46,7 @@ static inline const char *osmo_io_backend_name(enum osmo_io_backend val)
struct osmo_io_ops {
union {
/* mode OSMO_IO_FD_MODE_READ_WRITE: */
/* mode OSMO_IO_FD_MODE_READ_WRITE and OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND: */
struct {
/*! call-back function when something was read from fd */
void (*read_cb)(struct osmo_io_fd *iofd, int res, struct msgb *msg);
@ -85,6 +95,7 @@ void osmo_iofd_notify_connected(struct osmo_io_fd *iofd);
int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg);
int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_flags,
const struct osmo_sockaddr *dest);
int osmo_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags);
void osmo_iofd_set_alloc_info(struct osmo_io_fd *iofd, unsigned int size, unsigned int headroom);
void osmo_iofd_set_txqueue_max_length(struct osmo_io_fd *iofd, unsigned int size);

View File

@ -266,6 +266,7 @@ osmo_iofd_init;
osmo_iofd_ops;
osmo_iofd_register;
osmo_iofd_sendto_msgb;
osmo_iofd_sctp_send_msgb;
osmo_iofd_set_alloc_info;
osmo_iofd_set_data;
osmo_iofd_set_ioops;

View File

@ -329,6 +329,8 @@ void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int r
* \param[in] hdr serialized msghdr containing state of completed I/O */
void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct iofd_msghdr *hdr)
{
struct cmsghdr *cmsg = NULL;
talloc_steal(iofd->msgb_alloc.ctx, msg);
switch (iofd->mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
@ -338,7 +340,25 @@ void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct
iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
/* TODO Implement */
msgb_sctp_msg_flags(msg) = 0;
if (hdr->hdr.msg_flags & MSG_NOTIFICATION) {
msgb_sctp_msg_flags(msg) = OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION;
} else {
for (cmsg = CMSG_FIRSTHDR(&hdr->hdr); cmsg != NULL;
cmsg = CMSG_NXTHDR(&hdr->hdr, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) {
struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
msgb_sctp_ppid(msg) = htonl(sinfo->sinfo_ppid);
msgb_sctp_stream(msg) = sinfo->sinfo_stream;
break;
}
}
if (rc > 0 && !cmsg)
LOGPIO(iofd, LOGL_ERROR, "sctp_recvmsg without SNDRCV cmsg?!?\n");
}
iofd->io_ops.read_cb(iofd, rc, msg);
break;
default:
OSMO_ASSERT(false);
break;
}
@ -371,6 +391,7 @@ void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_ms
/* All other failure and success cases are handled here */
switch (msghdr->action) {
case IOFD_ACT_WRITE:
case IOFD_ACT_SCTP_SEND:
iofd->io_ops.write_cb(iofd, rc, msg);
break;
case IOFD_ACT_SENDTO:
@ -473,6 +494,63 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_
return 0;
}
/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg().
*
* Appends the message to the internal transmit queue.
* If the function returns success (0), it will take ownership of the msgb and
* internally call msgb_free() after the write request completes.
* In case of an error the msgb needs to be freed by the caller.
* \param[in] iofd file descriptor to write to
* \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream
* \param[in] sendmsg_flags Flags to pass to the send call
* \returns 0 in case of success; a negative value in case of error
*/
int osmo_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags)
{
int rc;
struct cmsghdr *cmsg;
struct sctp_sndrcvinfo *sinfo;
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND);
if (OSMO_UNLIKELY(!iofd->io_ops.write_cb)) {
LOGPIO(iofd, LOGL_ERROR, "write_cb not set, Rejecting msgb\n");
return -EINVAL;
}
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SCTP_SEND, msg);
if (!msghdr)
return -ENOMEM;
msghdr->hdr.msg_flags = sendmsg_flags;
msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
msghdr->hdr.msg_iov = &msghdr->iov[0];
msghdr->hdr.msg_iovlen = 1;
/* put together sctp_sndrcvinfo, just like libsctp's sctp_sensmsg() */
msghdr->hdr.msg_control = msghdr->cmsg;
msghdr->hdr.msg_controllen = sizeof(msghdr->cmsg);
cmsg = CMSG_FIRSTHDR(&msghdr->hdr);
cmsg->cmsg_level = IPPROTO_SCTP;
cmsg->cmsg_type = SCTP_SNDRCV;
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
msghdr->hdr.msg_controllen = cmsg->cmsg_len;
sinfo = (struct sctp_sndrcvinfo *) CMSG_DATA(cmsg);
sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg));
sinfo->sinfo_stream = msgb_sctp_stream(msg);
rc = iofd_txqueue_enqueue(iofd, msghdr);
if (rc < 0) {
iofd_msghdr_free(msghdr);
LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc);
return rc;
}
return 0;
}
/*! Allocate and setup a new iofd.
* \param[in] ctx the parent context from which to allocate
* \param[in] fd the underlying system file descriptor
@ -491,6 +569,7 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en
switch (mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
break;
default:
return NULL;
@ -728,6 +807,7 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
switch (iofd->mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
if (iofd->io_ops.read_cb)
osmo_iofd_ops.read_enable(iofd);
else
@ -739,7 +819,6 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
else
osmo_iofd_ops.read_disable(iofd);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
default:
OSMO_ASSERT(0);
}
@ -750,7 +829,8 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
* \param[in] iofd the file descriptor */
void osmo_iofd_notify_connected(struct osmo_io_fd *iofd)
{
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND);
OSMO_ASSERT(osmo_iofd_ops.notify_connected);
osmo_iofd_ops.notify_connected(iofd);
}

View File

@ -4,6 +4,7 @@
#include <unistd.h>
#include <stdbool.h>
#include <netinet/sctp.h>
#include <osmocom/core/osmo_io.h>
#include <osmocom/core/linuxlist.h>
@ -109,7 +110,8 @@ enum iofd_msg_action {
IOFD_ACT_WRITE,
IOFD_ACT_RECVFROM,
IOFD_ACT_SENDTO,
// TODO: SCTP_*
IOFD_ACT_SCTP_RECVMSG,
IOFD_ACT_SCTP_SEND,
};
@ -125,6 +127,8 @@ struct iofd_msghdr {
/*! io-vector we need to pass as argument to sendmsg/recvmsg; is set up
* to point into msg below */
struct iovec iov[1];
/*! control message buffer for passing sctp_sndrcvinfo along */
char cmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
/*! flags we pass as argument to sendmsg / recvmsg */
int flags;

View File

@ -64,6 +64,10 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
.msg_name = &hdr.osa.u.sa,
.msg_namelen = sizeof(struct osmo_sockaddr),
};
if (iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND) {
hdr.hdr.msg_control = hdr.cmsg;
hdr.hdr.msg_controllen = sizeof(hdr.cmsg);
}
rc = recvmsg(ofd->fd, &hdr.hdr, flags);
if (rc > 0)
@ -81,10 +85,16 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags);
iofd_handle_send_completion(iofd, rc, msghdr);
} else {
if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE)
switch (iofd->mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
/* Socket is writable, but we have no data to send. A non-blocking/async
connect() is signalled this way. */
iofd->io_ops.write_cb(iofd, 0, NULL);
break;
default:
break;
}
if (osmo_iofd_txqueue_len(iofd) == 0)
iofd_poll_ops.write_disable(iofd);
}

View File

@ -3,6 +3,7 @@
*
* (C) 2022-2023 by sysmocom s.f.m.c.
* Author: Daniel Willmann <daniel@sysmocom.de>
* (C) 2023 by Harald Welte <laforge@osmocom.org>
*
* All Rights Reserved.
*
@ -35,6 +36,8 @@
#include <stdbool.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/sctp.h>
#include <sys/eventfd.h>
#include <liburing.h>
@ -126,6 +129,10 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
switch (action) {
case IOFD_ACT_READ:
break;
case IOFD_ACT_SCTP_RECVMSG:
msghdr->hdr.msg_control = msghdr->cmsg;
msghdr->hdr.msg_controllen = sizeof(msghdr->cmsg);
/* fall-through */
case IOFD_ACT_RECVFROM:
msghdr->hdr.msg_iov = &msghdr->iov[0];
msghdr->hdr.msg_iovlen = 1;
@ -146,6 +153,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
case IOFD_ACT_READ:
io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
break;
case IOFD_ACT_SCTP_RECVMSG:
case IOFD_ACT_RECVFROM:
io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
break;
@ -210,10 +218,12 @@ static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
switch (msghdr->action) {
case IOFD_ACT_READ:
case IOFD_ACT_RECVFROM:
case IOFD_ACT_SCTP_RECVMSG:
iofd_uring_handle_recv(msghdr, res);
break;
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
case IOFD_ACT_SCTP_SEND:
iofd_uring_handle_tx(msghdr, res);
break;
default:
@ -273,6 +283,7 @@ static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
switch (msghdr->action) {
case IOFD_ACT_WRITE:
case IOFD_ACT_SENDTO:
case IOFD_ACT_SCTP_SEND:
io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
break;
default:
@ -341,14 +352,14 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
}
msghdr->iov[0].iov_base = msgb_data(msg);
msghdr->iov[0].iov_len = msgb_tailroom(msg);
msghdr->iov[0].iov_len = msgb_length(msg);
sqe = io_uring_get_sqe(&g_ring.ring);
if (!sqe) {
LOGPIO(iofd, LOGL_ERROR, "Could not get io_uring_sqe\n");
OSMO_ASSERT(0);
}
// Prep msgb/iov
io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
io_uring_sqe_set_data(sqe, msghdr);
@ -376,6 +387,9 @@ static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
iofd_uring_submit_recv(iofd, IOFD_ACT_SCTP_RECVMSG);
break;
default:
OSMO_ASSERT(0);
}