mirror of https://gerrit.osmocom.org/libosmocore
osmo_io: SCTP support
Add support osmo_io operations resembling sctp_send() and sctp_recvmsg() that is provided by libsctp of lk-sctp. Change-Id: I89eb519b22d21011d61a7855b2364bc3c295df82 Related: OS#5751
This commit is contained in:
parent
8dce0e95c5
commit
9269f20fc7
|
@ -14,6 +14,16 @@
|
|||
#define LOGPIO(iofd, level, fmt, args...) \
|
||||
LOGP(DLIO, level, "iofd(%s)" fmt, iofd->name, ## args)
|
||||
|
||||
/*! \brief Access SCTP flags from the msgb control buffer */
|
||||
#define OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION 0x80 /* sctp_recvmsg() flags=MSG_NOTIFICATION, msgb_data() contains "union sctp_notification*" */
|
||||
#define msgb_sctp_msg_flags(msg) (msg)->cb[2]
|
||||
|
||||
/*! \brief Access the SCTP PPID from the msgb control buffer */
|
||||
#define msgb_sctp_ppid(msg) (msg)->cb[3]
|
||||
/*! \brief Access the SCTP Stream ID from the msgb control buffer */
|
||||
#define msgb_sctp_stream(msg) (msg)->cb[4]
|
||||
|
||||
struct sctp_sndrcvinfo;
|
||||
struct osmo_io_fd;
|
||||
|
||||
enum osmo_io_fd_mode {
|
||||
|
@ -36,7 +46,7 @@ static inline const char *osmo_io_backend_name(enum osmo_io_backend val)
|
|||
|
||||
struct osmo_io_ops {
|
||||
union {
|
||||
/* mode OSMO_IO_FD_MODE_READ_WRITE: */
|
||||
/* mode OSMO_IO_FD_MODE_READ_WRITE and OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND: */
|
||||
struct {
|
||||
/*! call-back function when something was read from fd */
|
||||
void (*read_cb)(struct osmo_io_fd *iofd, int res, struct msgb *msg);
|
||||
|
@ -85,6 +95,7 @@ void osmo_iofd_notify_connected(struct osmo_io_fd *iofd);
|
|||
int osmo_iofd_write_msgb(struct osmo_io_fd *iofd, struct msgb *msg);
|
||||
int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_flags,
|
||||
const struct osmo_sockaddr *dest);
|
||||
int osmo_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags);
|
||||
|
||||
void osmo_iofd_set_alloc_info(struct osmo_io_fd *iofd, unsigned int size, unsigned int headroom);
|
||||
void osmo_iofd_set_txqueue_max_length(struct osmo_io_fd *iofd, unsigned int size);
|
||||
|
|
|
@ -266,6 +266,7 @@ osmo_iofd_init;
|
|||
osmo_iofd_ops;
|
||||
osmo_iofd_register;
|
||||
osmo_iofd_sendto_msgb;
|
||||
osmo_iofd_sctp_send_msgb;
|
||||
osmo_iofd_set_alloc_info;
|
||||
osmo_iofd_set_data;
|
||||
osmo_iofd_set_ioops;
|
||||
|
|
|
@ -329,6 +329,8 @@ void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int r
|
|||
* \param[in] hdr serialized msghdr containing state of completed I/O */
|
||||
void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct iofd_msghdr *hdr)
|
||||
{
|
||||
struct cmsghdr *cmsg = NULL;
|
||||
|
||||
talloc_steal(iofd->msgb_alloc.ctx, msg);
|
||||
switch (iofd->mode) {
|
||||
case OSMO_IO_FD_MODE_READ_WRITE:
|
||||
|
@ -338,7 +340,25 @@ void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct
|
|||
iofd->io_ops.recvfrom_cb(iofd, rc, msg, &hdr->osa);
|
||||
break;
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
/* TODO Implement */
|
||||
msgb_sctp_msg_flags(msg) = 0;
|
||||
if (hdr->hdr.msg_flags & MSG_NOTIFICATION) {
|
||||
msgb_sctp_msg_flags(msg) = OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION;
|
||||
} else {
|
||||
for (cmsg = CMSG_FIRSTHDR(&hdr->hdr); cmsg != NULL;
|
||||
cmsg = CMSG_NXTHDR(&hdr->hdr, cmsg)) {
|
||||
if (cmsg->cmsg_level == IPPROTO_SCTP && cmsg->cmsg_type == SCTP_SNDRCV) {
|
||||
struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
|
||||
msgb_sctp_ppid(msg) = htonl(sinfo->sinfo_ppid);
|
||||
msgb_sctp_stream(msg) = sinfo->sinfo_stream;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (rc > 0 && !cmsg)
|
||||
LOGPIO(iofd, LOGL_ERROR, "sctp_recvmsg without SNDRCV cmsg?!?\n");
|
||||
}
|
||||
iofd->io_ops.read_cb(iofd, rc, msg);
|
||||
break;
|
||||
default:
|
||||
OSMO_ASSERT(false);
|
||||
break;
|
||||
}
|
||||
|
@ -371,6 +391,7 @@ void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_ms
|
|||
/* All other failure and success cases are handled here */
|
||||
switch (msghdr->action) {
|
||||
case IOFD_ACT_WRITE:
|
||||
case IOFD_ACT_SCTP_SEND:
|
||||
iofd->io_ops.write_cb(iofd, rc, msg);
|
||||
break;
|
||||
case IOFD_ACT_SENDTO:
|
||||
|
@ -473,6 +494,63 @@ int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendto_
|
|||
return 0;
|
||||
}
|
||||
|
||||
/*! Send a message through a connected SCTP socket, similar to sctp_sendmsg().
|
||||
*
|
||||
* Appends the message to the internal transmit queue.
|
||||
* If the function returns success (0), it will take ownership of the msgb and
|
||||
* internally call msgb_free() after the write request completes.
|
||||
* In case of an error the msgb needs to be freed by the caller.
|
||||
* \param[in] iofd file descriptor to write to
|
||||
* \param[in] msg message buffer to send; uses msgb_sctp_ppid/msg_sctp_stream
|
||||
* \param[in] sendmsg_flags Flags to pass to the send call
|
||||
* \returns 0 in case of success; a negative value in case of error
|
||||
*/
|
||||
int osmo_iofd_sctp_send_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int sendmsg_flags)
|
||||
{
|
||||
int rc;
|
||||
struct cmsghdr *cmsg;
|
||||
struct sctp_sndrcvinfo *sinfo;
|
||||
|
||||
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND);
|
||||
if (OSMO_UNLIKELY(!iofd->io_ops.write_cb)) {
|
||||
LOGPIO(iofd, LOGL_ERROR, "write_cb not set, Rejecting msgb\n");
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
struct iofd_msghdr *msghdr = iofd_msghdr_alloc(iofd, IOFD_ACT_SCTP_SEND, msg);
|
||||
if (!msghdr)
|
||||
return -ENOMEM;
|
||||
|
||||
msghdr->hdr.msg_flags = sendmsg_flags;
|
||||
msghdr->iov[0].iov_base = msgb_data(msghdr->msg);
|
||||
msghdr->iov[0].iov_len = msgb_length(msghdr->msg);
|
||||
msghdr->hdr.msg_iov = &msghdr->iov[0];
|
||||
msghdr->hdr.msg_iovlen = 1;
|
||||
|
||||
/* put together sctp_sndrcvinfo, just like libsctp's sctp_sensmsg() */
|
||||
msghdr->hdr.msg_control = msghdr->cmsg;
|
||||
msghdr->hdr.msg_controllen = sizeof(msghdr->cmsg);
|
||||
|
||||
cmsg = CMSG_FIRSTHDR(&msghdr->hdr);
|
||||
cmsg->cmsg_level = IPPROTO_SCTP;
|
||||
cmsg->cmsg_type = SCTP_SNDRCV;
|
||||
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
|
||||
msghdr->hdr.msg_controllen = cmsg->cmsg_len;
|
||||
sinfo = (struct sctp_sndrcvinfo *) CMSG_DATA(cmsg);
|
||||
sinfo->sinfo_ppid = htonl(msgb_sctp_ppid(msg));
|
||||
sinfo->sinfo_stream = msgb_sctp_stream(msg);
|
||||
|
||||
rc = iofd_txqueue_enqueue(iofd, msghdr);
|
||||
if (rc < 0) {
|
||||
iofd_msghdr_free(msghdr);
|
||||
LOGPIO(iofd, LOGL_ERROR, "enqueueing message failed (%d). Rejecting msgb\n", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
/*! Allocate and setup a new iofd.
|
||||
* \param[in] ctx the parent context from which to allocate
|
||||
* \param[in] fd the underlying system file descriptor
|
||||
|
@ -491,6 +569,7 @@ struct osmo_io_fd *osmo_iofd_setup(const void *ctx, int fd, const char *name, en
|
|||
switch (mode) {
|
||||
case OSMO_IO_FD_MODE_READ_WRITE:
|
||||
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
break;
|
||||
default:
|
||||
return NULL;
|
||||
|
@ -728,6 +807,7 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
|
|||
|
||||
switch (iofd->mode) {
|
||||
case OSMO_IO_FD_MODE_READ_WRITE:
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
if (iofd->io_ops.read_cb)
|
||||
osmo_iofd_ops.read_enable(iofd);
|
||||
else
|
||||
|
@ -739,7 +819,6 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
|
|||
else
|
||||
osmo_iofd_ops.read_disable(iofd);
|
||||
break;
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
default:
|
||||
OSMO_ASSERT(0);
|
||||
}
|
||||
|
@ -750,7 +829,8 @@ void osmo_iofd_set_ioops(struct osmo_io_fd *iofd, const struct osmo_io_ops *ioop
|
|||
* \param[in] iofd the file descriptor */
|
||||
void osmo_iofd_notify_connected(struct osmo_io_fd *iofd)
|
||||
{
|
||||
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE);
|
||||
OSMO_ASSERT(iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
|
||||
iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND);
|
||||
OSMO_ASSERT(osmo_iofd_ops.notify_connected);
|
||||
osmo_iofd_ops.notify_connected(iofd);
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
#include <unistd.h>
|
||||
#include <stdbool.h>
|
||||
#include <netinet/sctp.h>
|
||||
|
||||
#include <osmocom/core/osmo_io.h>
|
||||
#include <osmocom/core/linuxlist.h>
|
||||
|
@ -109,7 +110,8 @@ enum iofd_msg_action {
|
|||
IOFD_ACT_WRITE,
|
||||
IOFD_ACT_RECVFROM,
|
||||
IOFD_ACT_SENDTO,
|
||||
// TODO: SCTP_*
|
||||
IOFD_ACT_SCTP_RECVMSG,
|
||||
IOFD_ACT_SCTP_SEND,
|
||||
};
|
||||
|
||||
|
||||
|
@ -125,6 +127,8 @@ struct iofd_msghdr {
|
|||
/*! io-vector we need to pass as argument to sendmsg/recvmsg; is set up
|
||||
* to point into msg below */
|
||||
struct iovec iov[1];
|
||||
/*! control message buffer for passing sctp_sndrcvinfo along */
|
||||
char cmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
|
||||
/*! flags we pass as argument to sendmsg / recvmsg */
|
||||
int flags;
|
||||
|
||||
|
|
|
@ -64,6 +64,10 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
|
|||
.msg_name = &hdr.osa.u.sa,
|
||||
.msg_namelen = sizeof(struct osmo_sockaddr),
|
||||
};
|
||||
if (iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND) {
|
||||
hdr.hdr.msg_control = hdr.cmsg;
|
||||
hdr.hdr.msg_controllen = sizeof(hdr.cmsg);
|
||||
}
|
||||
|
||||
rc = recvmsg(ofd->fd, &hdr.hdr, flags);
|
||||
if (rc > 0)
|
||||
|
@ -81,10 +85,16 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
|
|||
rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags);
|
||||
iofd_handle_send_completion(iofd, rc, msghdr);
|
||||
} else {
|
||||
if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE)
|
||||
switch (iofd->mode) {
|
||||
case OSMO_IO_FD_MODE_READ_WRITE:
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
/* Socket is writable, but we have no data to send. A non-blocking/async
|
||||
connect() is signalled this way. */
|
||||
iofd->io_ops.write_cb(iofd, 0, NULL);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (osmo_iofd_txqueue_len(iofd) == 0)
|
||||
iofd_poll_ops.write_disable(iofd);
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
*
|
||||
* (C) 2022-2023 by sysmocom s.f.m.c.
|
||||
* Author: Daniel Willmann <daniel@sysmocom.de>
|
||||
* (C) 2023 by Harald Welte <laforge@osmocom.org>
|
||||
*
|
||||
* All Rights Reserved.
|
||||
*
|
||||
|
@ -35,6 +36,8 @@
|
|||
#include <stdbool.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/sctp.h>
|
||||
#include <sys/eventfd.h>
|
||||
#include <liburing.h>
|
||||
|
||||
|
@ -126,6 +129,10 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
|
|||
switch (action) {
|
||||
case IOFD_ACT_READ:
|
||||
break;
|
||||
case IOFD_ACT_SCTP_RECVMSG:
|
||||
msghdr->hdr.msg_control = msghdr->cmsg;
|
||||
msghdr->hdr.msg_controllen = sizeof(msghdr->cmsg);
|
||||
/* fall-through */
|
||||
case IOFD_ACT_RECVFROM:
|
||||
msghdr->hdr.msg_iov = &msghdr->iov[0];
|
||||
msghdr->hdr.msg_iovlen = 1;
|
||||
|
@ -146,6 +153,7 @@ static void iofd_uring_submit_recv(struct osmo_io_fd *iofd, enum iofd_msg_action
|
|||
case IOFD_ACT_READ:
|
||||
io_uring_prep_readv(sqe, iofd->fd, msghdr->iov, 1, 0);
|
||||
break;
|
||||
case IOFD_ACT_SCTP_RECVMSG:
|
||||
case IOFD_ACT_RECVFROM:
|
||||
io_uring_prep_recvmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
|
||||
break;
|
||||
|
@ -210,10 +218,12 @@ static void iofd_uring_handle_completion(struct iofd_msghdr *msghdr, int res)
|
|||
switch (msghdr->action) {
|
||||
case IOFD_ACT_READ:
|
||||
case IOFD_ACT_RECVFROM:
|
||||
case IOFD_ACT_SCTP_RECVMSG:
|
||||
iofd_uring_handle_recv(msghdr, res);
|
||||
break;
|
||||
case IOFD_ACT_WRITE:
|
||||
case IOFD_ACT_SENDTO:
|
||||
case IOFD_ACT_SCTP_SEND:
|
||||
iofd_uring_handle_tx(msghdr, res);
|
||||
break;
|
||||
default:
|
||||
|
@ -273,6 +283,7 @@ static int iofd_uring_submit_tx(struct osmo_io_fd *iofd)
|
|||
switch (msghdr->action) {
|
||||
case IOFD_ACT_WRITE:
|
||||
case IOFD_ACT_SENDTO:
|
||||
case IOFD_ACT_SCTP_SEND:
|
||||
io_uring_prep_sendmsg(sqe, msghdr->iofd->fd, &msghdr->hdr, msghdr->flags);
|
||||
break;
|
||||
default:
|
||||
|
@ -325,7 +336,8 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
|
|||
|
||||
if (osmo_iofd_txqueue_len(iofd) > 0)
|
||||
iofd_uring_submit_tx(iofd);
|
||||
else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE) {
|
||||
else if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE ||
|
||||
iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND) {
|
||||
/* Empty write request to check when the socket is connected */
|
||||
struct iofd_msghdr *msghdr;
|
||||
struct io_uring_sqe *sqe;
|
||||
|
@ -341,7 +353,7 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
|
|||
}
|
||||
|
||||
msghdr->iov[0].iov_base = msgb_data(msg);
|
||||
msghdr->iov[0].iov_len = msgb_tailroom(msg);
|
||||
msghdr->iov[0].iov_len = msgb_length(msg);
|
||||
|
||||
sqe = io_uring_get_sqe(&g_ring.ring);
|
||||
if (!sqe) {
|
||||
|
@ -349,6 +361,12 @@ static void iofd_uring_write_enable(struct osmo_io_fd *iofd)
|
|||
OSMO_ASSERT(0);
|
||||
}
|
||||
// Prep msgb/iov
|
||||
if (iofd->mode == OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND) {
|
||||
msghdr->hdr.msg_flags = MSG_NOSIGNAL;
|
||||
msghdr->hdr.msg_iov = &msghdr->iov[0];
|
||||
msghdr->hdr.msg_iovlen = 1;
|
||||
io_uring_prep_sendmsg(sqe, iofd->fd, &msghdr->hdr, msghdr->flags);
|
||||
} else
|
||||
io_uring_prep_writev(sqe, iofd->fd, msghdr->iov, 1, 0);
|
||||
io_uring_sqe_set_data(sqe, msghdr);
|
||||
|
||||
|
@ -376,6 +394,9 @@ static void iofd_uring_read_enable(struct osmo_io_fd *iofd)
|
|||
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
|
||||
iofd_uring_submit_recv(iofd, IOFD_ACT_RECVFROM);
|
||||
break;
|
||||
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
|
||||
iofd_uring_submit_recv(iofd, IOFD_ACT_SCTP_RECVMSG);
|
||||
break;
|
||||
default:
|
||||
OSMO_ASSERT(0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue