From e57ed39fbc55994471b6e3815fa6f4dfc928d9a6 Mon Sep 17 00:00:00 2001 From: Neels Janosch Hofmeyr Date: Thu, 6 Apr 2023 04:53:30 +0200 Subject: [PATCH] wip: write queue Change-Id: Ic058e0b1a8086075c899e197066b408a00a7e4d8 --- src/libosmo-pfcp/pfcp_endpoint.c | 145 +++++++++++++++++++------------ 1 file changed, 91 insertions(+), 54 deletions(-) diff --git a/src/libosmo-pfcp/pfcp_endpoint.c b/src/libosmo-pfcp/pfcp_endpoint.c index 7e08d8e..ba4d999 100644 --- a/src/libosmo-pfcp/pfcp_endpoint.c +++ b/src/libosmo-pfcp/pfcp_endpoint.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -37,7 +38,7 @@ struct osmo_pfcp_endpoint { struct osmo_pfcp_endpoint_cfg cfg; /* PFCP socket */ - struct osmo_fd pfcp_fd; + struct osmo_wqueue wq; /* The time at which this endpoint last restarted, as seconds since unix epoch. */ uint32_t recovery_time_stamp; @@ -145,7 +146,9 @@ struct osmo_pfcp_endpoint *osmo_pfcp_endpoint_create(void *ctx, const struct osm INIT_LLIST_HEAD(&ep->sent_requests); INIT_LLIST_HEAD(&ep->sent_responses); - ep->pfcp_fd.fd = -1; + /* proper init happens in osmo_pfcp_endpoint_bind() */ + osmo_wqueue_init(&ep->wq, 0); + ep->wq.bfd.fd = -1; /* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format, which is * seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0. @@ -241,25 +244,42 @@ static void pfcp_queue_sent_resp_timer_cb(void *data) static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m) { int rc; + struct msgb *msg; - if (!m->encoded) { - /* Allocate msgb as child of the message m, so that when m gets deallocated at the end of - * retransmission queueing, the msgb gets deallocated with it. */ - m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx"); - OSMO_ASSERT(m->encoded); - rc = osmo_pfcp_msg_encode(m->encoded, m); + if (m->encoded) { + /* Snatch the already encoded message from osmo_pfcp_msg. + * This is a tradeoff decision: + * The osmo_pfcp_msg may be queued and retransmitted, and we'd still need the encoded msgb then. + * If we steal the msgb from the osmo_pfcp_msg, it has to call the encoding again on retransmission. + * If we don't steal the msgb, we need have to make a copy of it for every transmission. + * Let's not optimize the retransmission case, but rather the common successful case. + */ + msg = m->encoded; + m->encoded = NULL; + /* Make sure the msg in the write queue doesn't get deallocated along with the osmo_pfcp_msg */ + talloc_steal(ep, msg); + } else { + /* Allocate new msgb for the write queue. */ + msg = msgb_alloc_c(ep, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx"); + OSMO_ASSERT(msg); + rc = osmo_pfcp_msg_encode(msg, m); if (rc) { - msgb_free(m->encoded); - m->encoded = NULL; + msgb_free(msg); return rc; } } - rc = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), msgb_length(m->encoded), 0, - (struct sockaddr *)&m->remote_addr, sizeof(m->remote_addr)); - if (rc != msgb_length(m->encoded)) { - OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n", - rc, msgb_length(m->encoded)); + /* pass on the destination address; hope dearly that the PFCP retransmission queue keeps the pointer valid long + * enough (TEMPORARy hACK)*/ + *(void**)msg->cb = (void*)&m->remote_addr; + + rc = osmo_wqueue_enqueue(&ep->wq, msg); + if (rc) { + OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "failed to add PFCP message to write queue (rc=%d, wq.len=%u)\n", + rc, ep->wq.current_length); + /* Free msgb: instead, just tag it back to the osmo_pfcp_msg */ + m->encoded = msg; + talloc_steal(m, msg); return -EIO; } return 0; @@ -420,50 +440,64 @@ static void osmo_pfcp_endpoint_handle_rx(struct osmo_pfcp_endpoint *ep, struct o } /* call-back for PFCP socket file descriptor */ -static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what) +static int osmo_pfcp_fd_read_cb(struct osmo_fd *ofd) { int rc; struct osmo_pfcp_endpoint *ep = ofd->data; + struct osmo_sockaddr remote; + socklen_t remote_len = sizeof(remote); + struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx"); + if (!msg) + return -ENOMEM; - if (what & OSMO_FD_READ) { - struct osmo_sockaddr remote; - socklen_t remote_len = sizeof(remote); - struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx"); - if (!msg) - return -ENOMEM; + msg->l3h = msg->tail; + rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len); + if (rc <= 0) + return -EIO; + msgb_put(msg, rc); - msg->l3h = msg->tail; - rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len); - if (rc <= 0) - return -EIO; - msgb_put(msg, rc); + OSMO_ASSERT(ep->cfg.rx_msg_cb); - OSMO_ASSERT(ep->cfg.rx_msg_cb); + /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h + * through the message bundle. */ + msg->l4h = msg->l3h; + while (msgb_l4len(msg)) { + struct osmo_gtlv_load tlv; + struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote); + m->encoded = msg; - /* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h - * through the message bundle. */ - msg->l4h = msg->l3h; - while (msgb_l4len(msg)) { - struct osmo_gtlv_load tlv; - struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote); - m->encoded = msg; + rc = osmo_pfcp_msg_decode_header(&tlv, m, msg); + if (rc < 0) + break; + msg->l4h += rc; - rc = osmo_pfcp_msg_decode_header(&tlv, m, msg); - if (rc < 0) - break; - msg->l4h += rc; - - rc = osmo_pfcp_msg_decode_tlv(m, &tlv); - /* If errors occurred, they have already been logged on DLPFCP. */ - if (rc == 0) - osmo_pfcp_endpoint_handle_rx(ep, m); - osmo_pfcp_msg_free(m); - } - msgb_free(msg); + rc = osmo_pfcp_msg_decode_tlv(m, &tlv); + /* If errors occurred, they have already been logged on DLPFCP. */ + if (rc == 0) + osmo_pfcp_endpoint_handle_rx(ep, m); + osmo_pfcp_msg_free(m); } + msgb_free(msg); return 0; } +static int osmo_pfcp_fd_write_cb(struct osmo_fd *bfd, struct msgb *msg) +{ + int rc; + const int limit = 42; + struct sockaddr *dest_addr = *(void**)msg->cb; + + rc = sendto(bfd->fd, msgb_data(msg), msgb_length(msg), 0, + dest_addr, sizeof(*dest_addr)); + if (rc != msg->len) + LOGP(DLPFCP, LOGL_ERROR, "Failed to write to PFCP fd: %s: %d='%s'; msg: len=%u %s%s\n", + osmo_sock_get_name2(bfd->fd), errno, strerror(errno), + msg->len, + osmo_hexdump_nospc(msg->data, OSMO_MIN(limit, msg->len)), + limit < msg->len ? "..." : ""); + return rc; +} + /*! bind a PFCP endpoint to its configured address (ep->cfg.local_addr). * \return 0 on success, negative on error. */ int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep) @@ -478,9 +512,12 @@ int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep) } /* create the new socket, binding to configured local address */ - ep->pfcp_fd.cb = osmo_pfcp_fd_cb; - ep->pfcp_fd.data = ep; - rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND); + osmo_wqueue_init(&ep->wq, 10000); + ep->wq.read_cb = osmo_pfcp_fd_read_cb; + ep->wq.write_cb = osmo_pfcp_fd_write_cb; + osmo_fd_setup(&ep->wq.bfd, -1, OSMO_FD_READ, osmo_wqueue_bfd_cb, ep, 0); + + rc = osmo_sock_init_osa_ofd(&ep->wq.bfd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND); if (rc < 0) return rc; return 0; @@ -494,10 +531,10 @@ void osmo_pfcp_endpoint_close(struct osmo_pfcp_endpoint *ep) while ((qe = llist_first_entry_or_null(&ep->sent_responses, struct osmo_pfcp_queue_entry, entry))) osmo_pfcp_queue_del(qe); - if (ep->pfcp_fd.fd != -1) { - osmo_fd_unregister(&ep->pfcp_fd); - close(ep->pfcp_fd.fd); - ep->pfcp_fd.fd = -1; + if (ep->wq.bfd.fd != -1) { + osmo_fd_unregister(&ep->wq.bfd); + close(ep->wq.bfd.fd); + ep->wq.bfd.fd = -1; } }