wip: write queue

Change-Id: Ic058e0b1a8086075c899e197066b408a00a7e4d8
This commit is contained in:
Neels Hofmeyr 2023-04-06 04:53:30 +02:00
parent 4c8ab89921
commit e57ed39fbc
1 changed files with 91 additions and 54 deletions

View File

@ -28,6 +28,7 @@
#include <osmocom/core/talloc.h>
#include <osmocom/core/timer.h>
#include <osmocom/core/tdef.h>
#include <osmocom/core/write_queue.h>
#include <osmocom/pfcp/pfcp_endpoint.h>
#include <osmocom/pfcp/pfcp_msg.h>
@ -37,7 +38,7 @@ struct osmo_pfcp_endpoint {
struct osmo_pfcp_endpoint_cfg cfg;
/* PFCP socket */
struct osmo_fd pfcp_fd;
struct osmo_wqueue wq;
/* The time at which this endpoint last restarted, as seconds since unix epoch. */
uint32_t recovery_time_stamp;
@ -145,7 +146,9 @@ struct osmo_pfcp_endpoint *osmo_pfcp_endpoint_create(void *ctx, const struct osm
INIT_LLIST_HEAD(&ep->sent_requests);
INIT_LLIST_HEAD(&ep->sent_responses);
ep->pfcp_fd.fd = -1;
/* proper init happens in osmo_pfcp_endpoint_bind() */
osmo_wqueue_init(&ep->wq, 0);
ep->wq.bfd.fd = -1;
/* time() returns seconds since 1970 (UNIX epoch), but the recovery_time_stamp is coded in the NTP format, which is
* seconds since 1900, the NTP era 0. 2208988800L is the offset between UNIX epoch and NTP era 0.
@ -241,25 +244,42 @@ static void pfcp_queue_sent_resp_timer_cb(void *data)
static int osmo_pfcp_endpoint_tx_data_no_logging(struct osmo_pfcp_endpoint *ep, struct osmo_pfcp_msg *m)
{
int rc;
struct msgb *msg;
if (!m->encoded) {
/* Allocate msgb as child of the message m, so that when m gets deallocated at the end of
* retransmission queueing, the msgb gets deallocated with it. */
m->encoded = msgb_alloc_c(m, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
OSMO_ASSERT(m->encoded);
rc = osmo_pfcp_msg_encode(m->encoded, m);
if (m->encoded) {
/* Snatch the already encoded message from osmo_pfcp_msg.
* This is a tradeoff decision:
* The osmo_pfcp_msg may be queued and retransmitted, and we'd still need the encoded msgb then.
* If we steal the msgb from the osmo_pfcp_msg, it has to call the encoding again on retransmission.
* If we don't steal the msgb, we need have to make a copy of it for every transmission.
* Let's not optimize the retransmission case, but rather the common successful case.
*/
msg = m->encoded;
m->encoded = NULL;
/* Make sure the msg in the write queue doesn't get deallocated along with the osmo_pfcp_msg */
talloc_steal(ep, msg);
} else {
/* Allocate new msgb for the write queue. */
msg = msgb_alloc_c(ep, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-tx");
OSMO_ASSERT(msg);
rc = osmo_pfcp_msg_encode(msg, m);
if (rc) {
msgb_free(m->encoded);
m->encoded = NULL;
msgb_free(msg);
return rc;
}
}
rc = sendto(ep->pfcp_fd.fd, msgb_data(m->encoded), msgb_length(m->encoded), 0,
(struct sockaddr *)&m->remote_addr, sizeof(m->remote_addr));
if (rc != msgb_length(m->encoded)) {
OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "sendto() failed: rc = %d != length %u\n",
rc, msgb_length(m->encoded));
/* pass on the destination address; hope dearly that the PFCP retransmission queue keeps the pointer valid long
* enough (TEMPORARy hACK)*/
*(void**)msg->cb = (void*)&m->remote_addr;
rc = osmo_wqueue_enqueue(&ep->wq, msg);
if (rc) {
OSMO_LOG_PFCP_MSG(m, LOGL_ERROR, "failed to add PFCP message to write queue (rc=%d, wq.len=%u)\n",
rc, ep->wq.current_length);
/* Free msgb: instead, just tag it back to the osmo_pfcp_msg */
m->encoded = msg;
talloc_steal(m, msg);
return -EIO;
}
return 0;
@ -420,50 +440,64 @@ static void osmo_pfcp_endpoint_handle_rx(struct osmo_pfcp_endpoint *ep, struct o
}
/* call-back for PFCP socket file descriptor */
static int osmo_pfcp_fd_cb(struct osmo_fd *ofd, unsigned int what)
static int osmo_pfcp_fd_read_cb(struct osmo_fd *ofd)
{
int rc;
struct osmo_pfcp_endpoint *ep = ofd->data;
struct osmo_sockaddr remote;
socklen_t remote_len = sizeof(remote);
struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
if (!msg)
return -ENOMEM;
if (what & OSMO_FD_READ) {
struct osmo_sockaddr remote;
socklen_t remote_len = sizeof(remote);
struct msgb *msg = msgb_alloc_c(OTC_SELECT, OSMO_PFCP_MSGB_ALLOC_SIZE, "PFCP-rx");
if (!msg)
return -ENOMEM;
msg->l3h = msg->tail;
rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
if (rc <= 0)
return -EIO;
msgb_put(msg, rc);
msg->l3h = msg->tail;
rc = recvfrom(ofd->fd, msg->tail, msgb_tailroom(msg), 0, (struct sockaddr *)&remote, &remote_len);
if (rc <= 0)
return -EIO;
msgb_put(msg, rc);
OSMO_ASSERT(ep->cfg.rx_msg_cb);
OSMO_ASSERT(ep->cfg.rx_msg_cb);
/* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
* through the message bundle. */
msg->l4h = msg->l3h;
while (msgb_l4len(msg)) {
struct osmo_gtlv_load tlv;
struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
m->encoded = msg;
/* This may be a bundle of PFCP messages. Parse and receive each message received, by shifting l4h
* through the message bundle. */
msg->l4h = msg->l3h;
while (msgb_l4len(msg)) {
struct osmo_gtlv_load tlv;
struct osmo_pfcp_msg *m = osmo_pfcp_msg_alloc_rx(OTC_SELECT, &remote);
m->encoded = msg;
rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
if (rc < 0)
break;
msg->l4h += rc;
rc = osmo_pfcp_msg_decode_header(&tlv, m, msg);
if (rc < 0)
break;
msg->l4h += rc;
rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
/* If errors occurred, they have already been logged on DLPFCP. */
if (rc == 0)
osmo_pfcp_endpoint_handle_rx(ep, m);
osmo_pfcp_msg_free(m);
}
msgb_free(msg);
rc = osmo_pfcp_msg_decode_tlv(m, &tlv);
/* If errors occurred, they have already been logged on DLPFCP. */
if (rc == 0)
osmo_pfcp_endpoint_handle_rx(ep, m);
osmo_pfcp_msg_free(m);
}
msgb_free(msg);
return 0;
}
static int osmo_pfcp_fd_write_cb(struct osmo_fd *bfd, struct msgb *msg)
{
int rc;
const int limit = 42;
struct sockaddr *dest_addr = *(void**)msg->cb;
rc = sendto(bfd->fd, msgb_data(msg), msgb_length(msg), 0,
dest_addr, sizeof(*dest_addr));
if (rc != msg->len)
LOGP(DLPFCP, LOGL_ERROR, "Failed to write to PFCP fd: %s: %d='%s'; msg: len=%u %s%s\n",
osmo_sock_get_name2(bfd->fd), errno, strerror(errno),
msg->len,
osmo_hexdump_nospc(msg->data, OSMO_MIN(limit, msg->len)),
limit < msg->len ? "..." : "");
return rc;
}
/*! bind a PFCP endpoint to its configured address (ep->cfg.local_addr).
* \return 0 on success, negative on error. */
int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
@ -478,9 +512,12 @@ int osmo_pfcp_endpoint_bind(struct osmo_pfcp_endpoint *ep)
}
/* create the new socket, binding to configured local address */
ep->pfcp_fd.cb = osmo_pfcp_fd_cb;
ep->pfcp_fd.data = ep;
rc = osmo_sock_init_osa_ofd(&ep->pfcp_fd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
osmo_wqueue_init(&ep->wq, 10000);
ep->wq.read_cb = osmo_pfcp_fd_read_cb;
ep->wq.write_cb = osmo_pfcp_fd_write_cb;
osmo_fd_setup(&ep->wq.bfd, -1, OSMO_FD_READ, osmo_wqueue_bfd_cb, ep, 0);
rc = osmo_sock_init_osa_ofd(&ep->wq.bfd, SOCK_DGRAM, IPPROTO_UDP, &ep->cfg.local_addr, NULL, OSMO_SOCK_F_BIND);
if (rc < 0)
return rc;
return 0;
@ -494,10 +531,10 @@ void osmo_pfcp_endpoint_close(struct osmo_pfcp_endpoint *ep)
while ((qe = llist_first_entry_or_null(&ep->sent_responses, struct osmo_pfcp_queue_entry, entry)))
osmo_pfcp_queue_del(qe);
if (ep->pfcp_fd.fd != -1) {
osmo_fd_unregister(&ep->pfcp_fd);
close(ep->pfcp_fd.fd);
ep->pfcp_fd.fd = -1;
if (ep->wq.bfd.fd != -1) {
osmo_fd_unregister(&ep->wq.bfd);
close(ep->wq.bfd.fd);
ep->wq.bfd.fd = -1;
}
}