migrate mgcp_client from osmo_wqueue to osmo_io

The new osmo_io framework means that we can [optionally] make use
of the io_uring backend, which greatly reduces the syscall load
compared to the legacy osmo_wqueue + osmo_select_main + read/write.

We only use features already present in the intiial osmo_io support
of libosmocore 1.9.0, so no entry in TODO-RELEASE is needed.

Closes: OS#5754
Related: OS#5755
Change-Id: I766224da4691695c023d4d08d042a4bbeba05e47
This commit is contained in:
Harald Welte 2024-03-02 11:09:58 +01:00 committed by Harald Welte
parent 17b5701f19
commit 28fd236044
3 changed files with 57 additions and 55 deletions

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <osmocom/core/write_queue.h> #include <osmocom/core/osmo_io.h>
#include <osmocom/core/timer.h> #include <osmocom/core/timer.h>
#define MSGB_CB_MGCP_TRANS_ID 0 #define MSGB_CB_MGCP_TRANS_ID 0
@ -13,7 +13,7 @@ struct reset_ep {
struct mgcp_client { struct mgcp_client {
struct mgcp_client_conf actual; struct mgcp_client_conf actual;
struct osmo_wqueue wq; struct osmo_io_fd *iofd;
mgcp_trans_id_t next_trans_id; mgcp_trans_id_t next_trans_id;
struct llist_head responses_pending; struct llist_head responses_pending;
struct mgcp_client_pool_member *pool_member; struct mgcp_client_pool_member *pool_member;

View File

@ -728,7 +728,7 @@ static struct mgcp_response_pending *mgcp_client_response_pending_get(
/* Feed an MGCP message into the receive processing. /* Feed an MGCP message into the receive processing.
* Parse the head and call any callback registered for the transaction id found * Parse the head and call any callback registered for the transaction id found
* in the MGCP message. This is normally called directly from the internal * in the MGCP message. This is normally called directly from the internal
* mgcp_do_read that reads from the socket connected to the MGCP gateway. This * mgcp_read_cb that reads from the socket connected to the MGCP gateway. This
* function is published mainly to be able to feed data from the test suite. * function is published mainly to be able to feed data from the test suite.
*/ */
int mgcp_client_rx(struct mgcp_client *mgcp, struct msgb *msg) int mgcp_client_rx(struct mgcp_client *mgcp, struct msgb *msg)
@ -781,55 +781,54 @@ error:
return rc; return rc;
} }
static int mgcp_do_read(struct osmo_fd *fd) static void mgcp_read_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
{ {
struct mgcp_client *mgcp = fd->data; struct mgcp_client *mgcp = osmo_iofd_get_data(iofd);
struct msgb *msg;
int ret;
msg = msgb_alloc_headroom(4096, 128, "mgcp_from_gw"); if (res <= 0) {
if (!msg) {
LOGPMGW(mgcp, LOGL_ERROR, "Failed to allocate MGCP message.\n");
return -1;
}
/* msgb_tailroom() is basically (4096 - 128); -1 is for '\0' */
ret = read(fd->fd, msg->data, msgb_tailroom(msg) - 1);
if (ret <= 0) {
LOGPMGW(mgcp, LOGL_ERROR, "Failed to read: %s: %d='%s'\n", LOGPMGW(mgcp, LOGL_ERROR, "Failed to read: %s: %d='%s'\n",
osmo_sock_get_name2(fd->fd), errno, strerror(errno)); osmo_iofd_get_name(iofd), res, strerror(res));
msgb_free(msg); msgb_free(msg);
return -1; return;
} }
msg->l2h = msgb_put(msg, ret); msg->l2h = msg->head;
ret = mgcp_client_rx(mgcp, msg); mgcp_client_rx(mgcp, msg);
talloc_free(msg); talloc_free(msg);
return ret;
} }
static int mgcp_do_write(struct osmo_fd *fd, struct msgb *msg) static int mgcp_do_write(struct mgcp_client *mgcp, struct msgb *msg)
{ {
int ret; int ret;
struct mgcp_client *mgcp = fd->data;
LOGPMGW(mgcp, LOGL_DEBUG, "Tx MGCP: %s: len=%u '%s'...\n", LOGPMGW(mgcp, LOGL_DEBUG, "Tx MGCP: %s: len=%u '%s'...\n",
osmo_sock_get_name2(fd->fd), msg->len, osmo_iofd_get_name(mgcp->iofd), msg->len,
osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len))); osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
ret = write(fd->fd, msg->data, msg->len); ret = osmo_iofd_write_msgb(mgcp->iofd, msg);
if (OSMO_UNLIKELY(ret != msg->len)) if (ret < 0)
LOGPMGW(mgcp, LOGL_ERROR, "Failed to Tx MGCP: %s: %d='%s'; msg: len=%u '%s'...\n", msgb_free(msg);
osmo_sock_get_name2(fd->fd), errno, strerror(errno),
msg->len, osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
/* Re-arm the keepalive Tx timer: */ /* Re-arm the keepalive Tx timer: */
if (mgcp->actual.keepalive.req_interval_sec > 0) if (mgcp->actual.keepalive.req_interval_sec > 0)
osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0); osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0);
return ret; return ret;
} }
static void mgcp_write_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg)
{
struct mgcp_client *mgcp = osmo_iofd_get_data(iofd);
if (OSMO_UNLIKELY(res != msg->len)) {
LOGPMGW(mgcp, LOGL_ERROR, "Failed to Tx MGCP: %s: %d='%s'; msg: len=%u '%s'...\n",
osmo_iofd_get_name(mgcp->iofd), res, strerror(res),
msg->len, osmo_escape_str((const char *)msg->data, OSMO_MIN(42, msg->len)));
}
}
static const char *_mgcp_client_name_append_domain(const struct mgcp_client *mgcp, const char *name) static const char *_mgcp_client_name_append_domain(const struct mgcp_client *mgcp, const char *name)
{ {
static char endpoint[MGCP_ENDPOINT_MAXLEN]; static char endpoint[MGCP_ENDPOINT_MAXLEN];
@ -941,11 +940,6 @@ struct mgcp_client *mgcp_client_init(void *ctx,
if (conf->description) if (conf->description)
mgcp->actual.description = talloc_strdup(mgcp, conf->description); mgcp->actual.description = talloc_strdup(mgcp, conf->description);
osmo_wqueue_init(&mgcp->wq, 1024);
mgcp->wq.read_cb = mgcp_do_read;
mgcp->wq.write_cb = mgcp_do_write;
osmo_fd_setup(&mgcp->wq.bfd, -1, OSMO_FD_READ, osmo_wqueue_bfd_cb, mgcp, 0);
memcpy(&mgcp->actual.keepalive, &conf->keepalive, sizeof(conf->keepalive)); memcpy(&mgcp->actual.keepalive, &conf->keepalive, sizeof(conf->keepalive));
osmo_timer_setup(&mgcp->keepalive_tx_timer, mgcp_client_keepalive_tx_timer_cb, mgcp); osmo_timer_setup(&mgcp->keepalive_tx_timer, mgcp_client_keepalive_tx_timer_cb, mgcp);
osmo_timer_setup(&mgcp->keepalive_rx_timer, mgcp_client_keepalive_rx_timer_cb, mgcp); osmo_timer_setup(&mgcp->keepalive_rx_timer, mgcp_client_keepalive_rx_timer_cb, mgcp);
@ -953,6 +947,11 @@ struct mgcp_client *mgcp_client_init(void *ctx,
return mgcp; return mgcp;
} }
static const struct osmo_io_ops mgcp_clnt_ioops = {
.read_cb = mgcp_read_cb,
.write_cb = mgcp_write_cb,
};
/*! Initialize client connection (opens socket) /*! Initialize client connection (opens socket)
* \param[in,out] mgcp MGCP client descriptor. * \param[in,out] mgcp MGCP client descriptor.
* \returns 0 on success, -EINVAL on error. */ * \returns 0 on success, -EINVAL on error. */
@ -968,19 +967,28 @@ int mgcp_client_connect(struct mgcp_client *mgcp)
return -EINVAL; return -EINVAL;
} }
rc = osmo_sock_init2_ofd(&mgcp->wq.bfd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, mgcp->actual.local_addr, rc = osmo_sock_init2(AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP, mgcp->actual.local_addr,
mgcp->actual.local_port, mgcp->actual.remote_addr, mgcp->actual.remote_port, mgcp->actual.local_port, mgcp->actual.remote_addr, mgcp->actual.remote_port,
OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT); OSMO_SOCK_F_BIND | OSMO_SOCK_F_CONNECT);
if (rc < 0) { if (rc < 0) {
LOGPMGW(mgcp, LOGL_FATAL, LOGPMGW(mgcp, LOGL_FATAL,
"Failed to initialize socket %s:%u -> %s:%u for MGW: %s\n", "Failed to initialize socket %s:%u -> %s:%u for MGW: %s\n",
mgcp->actual.local_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.local_port, mgcp->actual.local_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.local_port,
mgcp->actual.remote_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.remote_port, mgcp->actual.remote_addr ? mgcp->actual.local_addr : "(any)", mgcp->actual.remote_port,
strerror(errno)); strerror(errno));
goto error_close_fd; goto error_free;
} }
LOGPMGW(mgcp, LOGL_INFO, "MGW connection: %s\n", osmo_sock_get_name2(mgcp->wq.bfd.fd)); mgcp->iofd = osmo_iofd_setup(mgcp, rc, osmo_sock_get_name2(rc), OSMO_IO_FD_MODE_READ_WRITE,
&mgcp_clnt_ioops, mgcp);
if (!mgcp->iofd)
goto error_close_fd;
LOGPMGW(mgcp, LOGL_INFO, "MGW connection: %s\n", osmo_iofd_get_name(mgcp->iofd));
osmo_iofd_register(mgcp->iofd, -1);
osmo_iofd_set_alloc_info(mgcp->iofd, 4096, 128);
osmo_iofd_set_txqueue_max_length(mgcp->iofd, 1024);
/* If configured, send a DLCX message to the endpoints that are configured to /* If configured, send a DLCX message to the endpoints that are configured to
* be reset on startup. Usually this is a wildcarded endpoint. */ * be reset on startup. Usually this is a wildcarded endpoint. */
@ -1006,9 +1014,10 @@ int mgcp_client_connect(struct mgcp_client *mgcp)
osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0); osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0);
return 0; return 0;
error_close_fd: error_close_fd:
close(mgcp->wq.bfd.fd); close(rc);
mgcp->wq.bfd.fd = -1; error_free:
return rc; return rc;
} }
@ -1025,8 +1034,6 @@ int mgcp_client_connect2(struct mgcp_client *mgcp, unsigned int retry_n_ports)
* \returns 0 on success, -EINVAL on error. */ * \returns 0 on success, -EINVAL on error. */
void mgcp_client_disconnect(struct mgcp_client *mgcp) void mgcp_client_disconnect(struct mgcp_client *mgcp)
{ {
struct osmo_wqueue *wq;
if (!mgcp) { if (!mgcp) {
LOGP(DLMGCP, LOGL_FATAL, "MGCP client not initialized properly\n"); LOGP(DLMGCP, LOGL_FATAL, "MGCP client not initialized properly\n");
return; return;
@ -1037,13 +1044,9 @@ void mgcp_client_disconnect(struct mgcp_client *mgcp)
osmo_timer_del(&mgcp->keepalive_tx_timer); osmo_timer_del(&mgcp->keepalive_tx_timer);
mgcp->conn_up = false; mgcp->conn_up = false;
wq = &mgcp->wq; osmo_iofd_txqueue_clear(mgcp->iofd);
osmo_wqueue_clear(wq); LOGPMGW(mgcp, LOGL_INFO, "MGCP association: %s -- closed!\n", osmo_iofd_get_name(mgcp->iofd));
LOGPMGW(mgcp, LOGL_INFO, "MGCP association: %s -- closed!\n", osmo_sock_get_name2(wq->bfd.fd)); osmo_iofd_free(mgcp->iofd);
if (osmo_fd_is_registered(&wq->bfd))
osmo_fd_unregister(&wq->bfd);
close(wq->bfd.fd);
wq->bfd.fd = -1;
} }
/*! Get the IP-Aaddress of the associated MGW as string. /*! Get the IP-Aaddress of the associated MGW as string.
@ -1197,10 +1200,9 @@ int mgcp_client_tx(struct mgcp_client *mgcp, struct msgb *msg,
goto mgcp_tx_error; goto mgcp_tx_error;
} }
rc = osmo_wqueue_enqueue(&mgcp->wq, msg); rc = mgcp_do_write(mgcp, msg);
if (rc) { if (rc) {
LOGPMGW(mgcp, LOGL_FATAL, "Could not queue message to MGW\n"); LOGPMGW(mgcp, LOGL_FATAL, "Could not queue message to MGW\n");
msgb_free(msg);
goto mgcp_tx_error; goto mgcp_tx_error;
} else } else
LOGPMGW(mgcp, LOGL_DEBUG, "Queued %u bytes for MGW\n", LOGPMGW(mgcp, LOGL_DEBUG, "Queued %u bytes for MGW\n",

View File

@ -322,7 +322,7 @@ DEFUN(cfg_mgw_mgw_keepalive_req_interval,
/* If client already exists, apply the change immediately if possible: */ /* If client already exists, apply the change immediately if possible: */
mgcp->actual.keepalive.req_interval_sec = atoi(argv[0]); mgcp->actual.keepalive.req_interval_sec = atoi(argv[0]);
if (mgcp->wq.bfd.fd != -1) { /* UDP MGCP socket connected */ if (mgcp->iofd) { /* UDP MGCP socket connected */
if (mgcp->actual.keepalive.req_interval_sec > 0) { if (mgcp->actual.keepalive.req_interval_sec > 0) {
/* Re-schedule: */ /* Re-schedule: */
osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0); osmo_timer_schedule(&mgcp->keepalive_tx_timer, mgcp->actual.keepalive.req_interval_sec, 0);
@ -375,7 +375,7 @@ DEFUN(cfg_mgw_mgw_keepalive_timeout,
/* If client already exists, apply the change immediately if possible: */ /* If client already exists, apply the change immediately if possible: */
mgcp->actual.keepalive.timeout_sec = atoi(argv[0]); mgcp->actual.keepalive.timeout_sec = atoi(argv[0]);
if (mgcp->wq.bfd.fd != -1) { /* UDP MGCP socket connected */ if (mgcp->iofd) { /* UDP MGCP socket connected */
if (mgcp->actual.keepalive.timeout_sec > 0) { if (mgcp->actual.keepalive.timeout_sec > 0) {
/* Re-schedule: */ /* Re-schedule: */
osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0); osmo_timer_schedule(&mgcp->keepalive_rx_timer, mgcp->actual.keepalive.timeout_sec, 0);
@ -680,7 +680,7 @@ DEFUN(mgw_show, mgw_show_cmd, "show mgw-pool", SHOW_STR "Display information abo
const struct mgcp_client *cli = pool_member->client; const struct mgcp_client *cli = pool_member->client;
vty_out(vty, "%% MGW %s%s", mgcp_client_pool_member_name(pool_member), VTY_NEWLINE); vty_out(vty, "%% MGW %s%s", mgcp_client_pool_member_name(pool_member), VTY_NEWLINE);
vty_out(vty, "%% MGCP link: %s,%s%s", vty_out(vty, "%% MGCP link: %s,%s%s",
cli && cli->wq.bfd.fd != -1 ? "connected" : "disconnected", cli && cli->iofd ? "connected" : "disconnected",
cli && cli->conn_up ? cli && cli->conn_up ?
((cli->actual.keepalive.timeout_sec > 0) ? "UP" : "MAYBE") : ((cli->actual.keepalive.timeout_sec > 0) ? "UP" : "MAYBE") :
"DOWN", "DOWN",