osmo_io: Factor out and use common send function from backend

This handles reenqueuing a message on EAGAIN and incomplete write

Change-Id: I6da2653d32aedd0e7872be0cf90a841b56462e59
This commit is contained in:
Daniel Willmann 2023-11-21 10:17:00 +01:00
parent be3c38ca55
commit 84611881c9
4 changed files with 47 additions and 60 deletions

View File

@ -344,6 +344,46 @@ void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct
}
}
/*! completion handler: Calld by osmo_io backend after a given I/O operation has completed
* \param[in] iofd I/O file-descriptor on which I/O has completed
* \param[in] rc return value of the I/O operation
* \param[in] msghdr serialized msghdr containing state of completed I/O
*/
void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr)
{
struct msgb *msg = msghdr->msg;
/* Incomplete write */
if (rc > 0 && rc < msgb_length(msg)) {
/* Re-enqueue remaining data */
msgb_pull(msg, rc);
msghdr->iov[0].iov_len = msgb_length(msg);
iofd_txqueue_enqueue_front(iofd, msghdr);
return;
}
/* Reenqueue the complete msgb */
if (rc == -EAGAIN) {
iofd_txqueue_enqueue_front(iofd, msghdr);
return;
}
/* All other failure and success cases are handled here */
switch (msghdr->action) {
case IOFD_ACT_WRITE:
iofd->io_ops.write_cb(iofd, rc, msg);
break;
case IOFD_ACT_SENDTO:
iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
break;
default:
OSMO_ASSERT(0);
}
msgb_free(msghdr->msg);
iofd_msghdr_free(msghdr);
}
/* Public functions */
/*! Send a message through a connected socket.

View File

@ -146,6 +146,7 @@ struct msgb *iofd_msgb_pending(struct osmo_io_fd *iofd);
struct msgb *iofd_msgb_pending_or_alloc(struct osmo_io_fd *iofd);
void iofd_handle_recv(struct osmo_io_fd *iofd, struct msgb *msg, int rc, struct iofd_msghdr *msghdr);
void iofd_handle_send_completion(struct osmo_io_fd *iofd, int rc, struct iofd_msghdr *msghdr);
void iofd_handle_segmented_read(struct osmo_io_fd *iofd, struct msgb *msg, int rc);
int iofd_txqueue_enqueue(struct osmo_io_fd *iofd, struct iofd_msghdr *msghdr);

View File

@ -78,33 +78,8 @@ static void iofd_poll_ofd_cb_recvmsg_sendmsg(struct osmo_fd *ofd, unsigned int w
if (what & OSMO_FD_WRITE) {
struct iofd_msghdr *msghdr = iofd_txqueue_dequeue(iofd);
if (msghdr) {
msg = msghdr->msg;
rc = sendmsg(ofd->fd, &msghdr->hdr, msghdr->flags);
if (rc > 0 && rc < msgb_length(msg)) {
msgb_pull(msg, rc);
iofd_txqueue_enqueue_front(iofd, msghdr);
return;
}
if (rc == -EAGAIN) {
iofd_txqueue_enqueue_front(iofd, msghdr);
return;
}
switch (iofd->mode) {
case OSMO_IO_FD_MODE_READ_WRITE:
iofd->io_ops.write_cb(iofd, rc, msg);
break;
case OSMO_IO_FD_MODE_RECVFROM_SENDTO:
iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
break;
case OSMO_IO_FD_MODE_SCTP_RECVMSG_SEND:
OSMO_ASSERT(false);
break;
}
talloc_free(msghdr);
msgb_free(msg);
iofd_handle_send_completion(iofd, rc, msghdr);
} else {
if (iofd->mode == OSMO_IO_FD_MODE_READ_WRITE)
/* Socket is writable, but we have no data to send. A non-blocking/async

View File

@ -186,43 +186,14 @@ static int iofd_uring_submit_tx(struct osmo_io_fd *iofd);
static void iofd_uring_handle_tx(struct iofd_msghdr *msghdr, int rc)
{
struct osmo_io_fd *iofd = msghdr->iofd;
struct msgb *msg = msghdr->msg;
if (IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))
goto out_free;
/* Error during write */
if (rc < 0) {
if (msghdr->action == IOFD_ACT_WRITE)
iofd->io_ops.write_cb(iofd, rc, msg);
else if (msghdr->action == IOFD_ACT_SENDTO)
iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
else
OSMO_ASSERT(0);
goto out_free;
if (OSMO_UNLIKELY(IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))) {
msgb_free(msghdr->msg);
iofd_msghdr_free(msghdr);
} else {
iofd_handle_send_completion(iofd, rc, msghdr);
}
/* Incomplete write */
if (rc < msgb_length(msg)) {
/* Re-enqueue remaining data */
msgb_pull(msg, rc);
msghdr->iov[0].iov_len = msgb_length(msg);
iofd_txqueue_enqueue_front(iofd, msghdr);
goto out;
}
if (msghdr->action == IOFD_ACT_WRITE)
iofd->io_ops.write_cb(iofd, rc, msg);
else if (msghdr->action == IOFD_ACT_SENDTO)
iofd->io_ops.sendto_cb(iofd, rc, msg, &msghdr->osa);
else
OSMO_ASSERT(0);
out_free:
msgb_free(msghdr->msg);
iofd_msghdr_free(msghdr);
out:
iofd->u.uring.write_msghdr = NULL;
/* submit the next to-be-transmitted message for this file descriptor */
if (iofd->u.uring.write_enabled && !IOFD_FLAG_ISSET(iofd, IOFD_FLAG_CLOSED))