xua + ipa: Add support for I/O in OSMO_IO mode

This switches osmo_stream_{cli,srv} over to using the OSMO_IO
mode instead of the classic OSMO_FD mode.  The difference is that
we no longer read/write directly to a file descriptor, but we pass
message buffers to/from the library.

This in turn allows the library to use more efficient I/O mechanisms
as osmo_io backend, for example the Linux kernel io_uring.

Change-Id: I7d02037990f4af405839309510dc6c04e36c3369
Depends: libosmo-netif.git I6cf5bad5f618e71c80017960c38009b089dbd6a1
Depends: libosmocore.git I89eb519b22d21011d61a7855b2364bc3c295df82
Closes: OS#5752
This commit is contained in:
Harald Welte 2024-03-04 13:10:10 +01:00
parent 542027f051
commit d2d3e3a517
4 changed files with 74 additions and 226 deletions

View File

@ -17,3 +17,4 @@ libosmo-sigtran ABI change struct osmo_xua_server: new field(s) at the end
libosmo-sigtran API added osmo_ss7_asp_get_trans_proto()
libosmo-sigtran API added osmo_ss7_asp_{find2,find_or_create2}()
libosmo-sigtran API added osmo_ss7_xua_server_{find2,create2}()
libosmo-netif >1.4.0 osmo_io SCTP support

View File

@ -601,9 +601,9 @@ void osmo_ss7_asp_destroy(struct osmo_ss7_asp *asp)
talloc_free(asp);
}
static int xua_cli_read_cb(struct osmo_stream_cli *conn);
static int ipa_cli_read_cb(struct osmo_stream_cli *conn);
static int m3ua_tcp_cli_read_cb(struct osmo_stream_cli *conn);
static int xua_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg);
static int ipa_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg);
static int m3ua_tcp_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg);
static int xua_cli_connect_cb(struct osmo_stream_cli *cli);
int osmo_ss7_asp_restart(struct osmo_ss7_asp *asp)
@ -645,19 +645,22 @@ int osmo_ss7_asp_restart(struct osmo_ss7_asp *asp)
switch (asp->cfg.proto) {
case OSMO_SS7_ASP_PROT_IPA:
OSMO_ASSERT(asp->cfg.trans_proto == IPPROTO_TCP);
osmo_stream_cli_set_read_cb(asp->client, ipa_cli_read_cb);
osmo_stream_cli_set_read_cb2(asp->client, ipa_cli_read_cb);
osmo_stream_cli_set_segmentation_cb(asp->client, osmo_ipa_segmentation_cb);
break;
case OSMO_SS7_ASP_PROT_M3UA:
if (asp->cfg.trans_proto == IPPROTO_SCTP)
osmo_stream_cli_set_read_cb(asp->client, xua_cli_read_cb);
else if (asp->cfg.trans_proto == IPPROTO_TCP)
osmo_stream_cli_set_read_cb(asp->client, m3ua_tcp_cli_read_cb);
else
if (asp->cfg.trans_proto == IPPROTO_SCTP) {
osmo_stream_cli_set_read_cb2(asp->client, xua_cli_read_cb);
osmo_stream_cli_set_segmentation_cb(asp->client, NULL);
} else if (asp->cfg.trans_proto == IPPROTO_TCP) {
osmo_stream_cli_set_read_cb2(asp->client, m3ua_tcp_cli_read_cb);
osmo_stream_cli_set_segmentation_cb(asp->client, xua_segmentation_cb);
} else
OSMO_ASSERT(0);
break;
default:
OSMO_ASSERT(asp->cfg.trans_proto == IPPROTO_SCTP);
osmo_stream_cli_set_read_cb(asp->client, xua_cli_read_cb);
osmo_stream_cli_set_read_cb2(asp->client, xua_cli_read_cb);
break;
}
osmo_stream_cli_set_data(asp->client, asp);
@ -789,33 +792,11 @@ static void log_sctp_notification(struct osmo_ss7_asp *asp, const char *pfx,
}
/* netif code tells us we can read something from the socket */
int ss7_asp_ipa_srv_conn_cb(struct osmo_stream_srv *conn)
int ss7_asp_ipa_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg)
{
int fd = osmo_stream_srv_get_fd(conn);
struct osmo_ss7_asp *asp = osmo_stream_srv_get_data(conn);
struct msgb *msg = NULL;
int rc;
OSMO_ASSERT(fd >= 0);
/* read IPA message from socket and process it */
rc = ipa_msg_recv_buffered(fd, &msg, &asp->pending_msg);
LOGPASP(asp, DLSS7, LOGL_DEBUG, "%s(): ipa_msg_recv_buffered() returned %d\n",
__func__, rc);
if (rc <= 0) {
if (rc == -EAGAIN) {
/* more data needed */
return 0;
}
osmo_stream_srv_destroy(conn);
return rc;
}
if (osmo_ipa_process_msg(msg) < 0) {
LOGPASP(asp, DLSS7, LOGL_ERROR, "Bad IPA message\n");
osmo_stream_srv_destroy(conn);
msgb_free(msg);
return -1;
}
msg->dst = asp;
rate_ctr_inc2(asp->ctrg, SS7_ASP_CTR_PKT_RX_TOTAL);
/* we can use the 'fd' return value of osmo_stream_srv_get_fd() here unverified as all we do
@ -824,19 +805,14 @@ int ss7_asp_ipa_srv_conn_cb(struct osmo_stream_srv *conn)
}
/* netif code tells us we can read something from the socket */
int ss7_asp_xua_srv_conn_cb(struct osmo_stream_srv *conn)
int ss7_asp_xua_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg)
{
struct osmo_ss7_asp *asp = osmo_stream_srv_get_data(conn);
struct msgb *msg = m3ua_msgb_alloc("xUA Server Rx");
unsigned int ppid;
int flags;
int rc;
int rc = 0;
if (!msg)
return -ENOMEM;
/* read xUA message from socket and process it */
rc = osmo_stream_srv_recv(conn, msg);
/* process the received xUA message */
flags = msgb_sctp_msg_flags(msg);
LOGPASP(asp, DLSS7, LOGL_DEBUG, "%s(): sctp_recvmsg() returned %d (flags=0x%x)\n",
@ -855,22 +831,6 @@ int ss7_asp_xua_srv_conn_cb(struct osmo_stream_srv *conn)
default:
break;
}
if (rc == 0) {
osmo_stream_srv_destroy(conn);
rc = -EBADF;
} else {
rc = 0;
}
goto out;
}
if (rc < 0) {
osmo_stream_srv_destroy(conn);
rc = -EBADF;
goto out;
} else if (rc == 0) {
osmo_stream_srv_destroy(conn);
rc = -EBADF;
goto out;
}
@ -890,73 +850,38 @@ out:
return rc;
}
/* netif code tells us we can read something from the socket */
int ss7_asp_m3ua_tcp_srv_conn_cb(struct osmo_stream_srv *conn)
int xua_segmentation_cb(struct msgb *msg)
{
struct osmo_ss7_asp *asp = osmo_stream_srv_get_data(conn);
int fd = osmo_stream_srv_get_fd(conn);
struct msgb *msg = asp->pending_msg;
const struct xua_common_hdr *hdr;
size_t msg_length;
int rc;
OSMO_ASSERT(fd >= 0);
if (msgb_length(msg) < sizeof(*hdr))
return -EAGAIN;
if (msg == NULL) {
msg = m3ua_msgb_alloc(__func__);
asp->pending_msg = msg;
}
/* read message header first */
if (msg->len < sizeof(*hdr)) {
errno = 0;
rc = recv(fd, msg->tail, sizeof(*hdr) - msg->len, 0);
if (rc <= 0) {
if (errno == EAGAIN || errno == EINTR)
return 0; /* need more data */
osmo_stream_srv_destroy(conn);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
}
msgb_put(msg, rc);
if (msg->len < sizeof(*hdr))
return 0; /* need more data */
}
hdr = (const struct xua_common_hdr *)msg->data;
hdr = (const struct xua_common_hdr *) msg->data;
msg_length = ntohl(hdr->msg_length); /* includes sizeof(*hdr) */
/* read the rest of the message */
if (msg->len < msg_length) {
errno = 0;
rc = recv(fd, msg->tail, msg_length - msg->len, 0);
if (rc <= 0) {
if (errno == EAGAIN || errno == EINTR)
return 0; /* need more data */
osmo_stream_srv_destroy(conn);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
}
return msg_length;
}
msgb_put(msg, rc);
if (msg->len < msg_length)
return 0; /* need more data */
}
/* netif code tells us we can read something from the socket */
int ss7_asp_m3ua_tcp_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg)
{
struct osmo_ss7_asp *asp = osmo_stream_srv_get_data(conn);
const struct xua_common_hdr *hdr;
int rc;
msg->dst = asp;
rate_ctr_inc2(asp->ctrg, SS7_ASP_CTR_PKT_RX_TOTAL);
/* spoof SCTP Stream ID */
hdr = (const struct xua_common_hdr *)msg->data;
if (hdr->msg_class == M3UA_MSGC_XFER)
msgb_sctp_stream(msg) = 1;
else
msgb_sctp_stream(msg) = 0;
rc = m3ua_rx_msg(asp, msg);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
@ -1016,33 +941,17 @@ static void xua_cli_close_and_reconnect(struct osmo_stream_cli *cli)
}
/* read call-back for IPA/SCCPlite socket */
static int ipa_cli_read_cb(struct osmo_stream_cli *conn)
static int ipa_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
{
int fd = osmo_stream_cli_get_fd(conn);
struct osmo_ss7_asp *asp = osmo_stream_cli_get_data(conn);
struct msgb *msg = NULL;
int rc;
OSMO_ASSERT(fd >= 0);
/* read IPA message from socket and process it */
rc = ipa_msg_recv_buffered(fd, &msg, &asp->pending_msg);
LOGPASP(asp, DLSS7, LOGL_DEBUG, "%s(): ipa_msg_recv_buffered() returned %d\n",
__func__, rc);
if (rc <= 0) {
if (rc == -EAGAIN) {
/* more data needed */
return 0;
}
xua_cli_close_and_reconnect(conn);
return rc;
}
if (osmo_ipa_process_msg(msg) < 0) {
LOGPASP(asp, DLSS7, LOGL_ERROR, "Bad IPA message\n");
xua_cli_close_and_reconnect(conn);
if (msgb_length(msg) == 0) {
xua_cli_close_and_reconnect(asp->client);
msgb_free(msg);
return -1;
return 0;
}
msg->dst = asp;
rate_ctr_inc2(asp->ctrg, SS7_ASP_CTR_PKT_RX_TOTAL);
/* we can use the 'fd' return value of osmo_stream_srv_get_fd() here unverified as all we do
@ -1051,94 +960,34 @@ static int ipa_cli_read_cb(struct osmo_stream_cli *conn)
}
/* read call-back for M3UA-over-TCP socket */
static int m3ua_tcp_cli_read_cb(struct osmo_stream_cli *conn)
static int m3ua_tcp_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
{
struct osmo_ss7_asp *asp = osmo_stream_cli_get_data(conn);
int fd = osmo_stream_cli_get_fd(conn);
struct msgb *msg = asp->pending_msg;
const struct xua_common_hdr *hdr;
size_t msg_length;
int rc;
OSMO_ASSERT(fd >= 0);
if (msg == NULL) {
msg = m3ua_msgb_alloc(__func__);
asp->pending_msg = msg;
}
/* read message header first */
if (msg->len < sizeof(*hdr)) {
errno = 0;
rc = recv(fd, msg->tail, sizeof(*hdr) - msg->len, 0);
if (rc <= 0) {
if (errno == EAGAIN || errno == EINTR)
return 0; /* need more data */
xua_cli_close_and_reconnect(conn);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
}
msgb_put(msg, rc);
if (msg->len < sizeof(*hdr))
return 0; /* need more data */
}
hdr = (const struct xua_common_hdr *)msg->data;
msg_length = ntohl(hdr->msg_length); /* includes sizeof(*hdr) */
/* read the rest of the message */
if (msg->len < msg_length) {
errno = 0;
rc = recv(fd, msg->tail, msg_length - msg->len, 0);
if (rc <= 0) {
if (errno == EAGAIN || errno == EINTR)
return 0; /* need more data */
xua_cli_close_and_reconnect(conn);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
}
msgb_put(msg, rc);
if (msg->len < msg_length)
return 0; /* need more data */
}
msg->dst = asp;
rate_ctr_inc2(asp->ctrg, SS7_ASP_CTR_PKT_RX_TOTAL);
/* spoof SCTP PPID */
msgb_sctp_ppid(msg) = M3UA_PPID;
/* spoof SCTP Stream ID */
hdr = (const struct xua_common_hdr *) msg->data;
if (hdr->msg_class == M3UA_MSGC_XFER)
msgb_sctp_stream(msg) = 1;
else
msgb_sctp_stream(msg) = 0;
rc = m3ua_rx_msg(asp, msg);
asp->pending_msg = NULL;
msgb_free(msg);
return rc;
return xua_cli_read_cb(conn, msg);
}
static int xua_cli_read_cb(struct osmo_stream_cli *conn)
static int xua_cli_read_cb(struct osmo_stream_cli *conn, struct msgb *msg)
{
struct osmo_ss7_asp *asp = osmo_stream_cli_get_data(conn);
struct msgb *msg = m3ua_msgb_alloc("xUA Client Rx");
unsigned int ppid;
int flags;
int rc;
if (!msg)
return -ENOMEM;
/* read xUA message from socket and process it */
rc = osmo_stream_cli_recv(conn, msg);
flags = msgb_sctp_msg_flags(msg);
LOGPASP(asp, DLSS7, LOGL_DEBUG, "%s(): sctp_recvmsg() returned %d (flags=0x%x)\n",
__func__, rc, flags);
__func__, msgb_length(msg), flags);
if (flags & OSMO_STREAM_SCTP_MSG_FLAGS_NOTIFICATION) {
union sctp_notification *notif = (union sctp_notification *) msgb_data(msg);
@ -1153,17 +1002,12 @@ static int xua_cli_read_cb(struct osmo_stream_cli *conn)
default:
break;
}
if (rc == 0)
xua_cli_close_and_reconnect(conn);
rc = 0;
goto out;
}
if (rc < 0) {
xua_cli_close_and_reconnect(conn);
goto out;
} else if (rc == 0) {
if (msgb_length(msg) == 0) {
xua_cli_close_and_reconnect(conn);
rc = 0;
goto out;
}

View File

@ -67,32 +67,11 @@ static int xua_accept_cb(struct osmo_stream_srv_link *link, int fd)
struct osmo_ss7_asp *asp;
char *sock_name = osmo_sock_get_name(link, fd);
const char *proto_name = get_value_string(osmo_ss7_asp_protocol_vals, oxs->cfg.proto);
int (*read_cb)(struct osmo_stream_srv *conn) = NULL;
int rc = 0;
LOGP(DLSS7, LOGL_INFO, "%s: New %s connection accepted\n", sock_name, proto_name);
switch (oxs->cfg.proto) {
case OSMO_SS7_ASP_PROT_IPA:
OSMO_ASSERT(oxs->cfg.trans_proto == IPPROTO_TCP);
read_cb = &ss7_asp_ipa_srv_conn_cb;
break;
case OSMO_SS7_ASP_PROT_M3UA:
if (oxs->cfg.trans_proto == IPPROTO_SCTP)
read_cb = &ss7_asp_xua_srv_conn_cb;
else if (oxs->cfg.trans_proto == IPPROTO_TCP)
read_cb = &ss7_asp_m3ua_tcp_srv_conn_cb;
else
OSMO_ASSERT(0);
break;
default:
OSMO_ASSERT(oxs->cfg.trans_proto == IPPROTO_SCTP);
read_cb = &ss7_asp_xua_srv_conn_cb;
break;
}
srv = osmo_stream_srv_create(oxs, link, fd, read_cb,
&ss7_asp_xua_srv_conn_closed_cb, NULL);
srv = osmo_stream_srv_create2(oxs, link, fd, NULL);
if (!srv) {
LOGP(DLSS7, LOGL_ERROR, "%s: Unable to create stream server "
"for connection\n", sock_name);
@ -101,6 +80,28 @@ static int xua_accept_cb(struct osmo_stream_srv_link *link, int fd)
return -1;
}
switch (oxs->cfg.proto) {
case OSMO_SS7_ASP_PROT_IPA:
osmo_stream_srv_set_read_cb(srv, ss7_asp_ipa_srv_conn_cb);
osmo_stream_srv_set_segmentation_cb(srv, osmo_ipa_segmentation_cb);
break;
case OSMO_SS7_ASP_PROT_M3UA:
if (oxs->cfg.trans_proto == IPPROTO_SCTP)
osmo_stream_srv_set_read_cb(srv, &ss7_asp_xua_srv_conn_cb);
else if (oxs->cfg.trans_proto == IPPROTO_TCP) {
osmo_stream_srv_set_read_cb(srv, &ss7_asp_m3ua_tcp_srv_conn_cb);
osmo_stream_srv_set_segmentation_cb(srv, xua_segmentation_cb);
} else
OSMO_ASSERT(0);
break;
default:
OSMO_ASSERT(oxs->cfg.trans_proto == IPPROTO_SCTP);
osmo_stream_srv_set_read_cb(srv, &ss7_asp_xua_srv_conn_cb);
osmo_stream_srv_set_segmentation_cb(srv, NULL);
break;
}
osmo_stream_srv_set_closed_cb(srv, ss7_asp_xua_srv_conn_closed_cb);
asp = ss7_asp_find_by_socket_addr(fd, oxs->cfg.trans_proto);
if (asp) {
LOGP(DLSS7, LOGL_INFO, "%s: matched connection to ASP %s\n",

View File

@ -24,9 +24,9 @@ struct osmo_ss7_asp *ss7_asp_find_by_socket_addr(int fd, int trans_proto);
bool ss7_asp_protocol_check_trans_proto(enum osmo_ss7_asp_protocol proto, int trans_proto);
int ss7_default_trans_proto_for_asp_proto(enum osmo_ss7_asp_protocol proto);
int ss7_asp_ipa_srv_conn_cb(struct osmo_stream_srv *conn);
int ss7_asp_xua_srv_conn_cb(struct osmo_stream_srv *conn);
int ss7_asp_m3ua_tcp_srv_conn_cb(struct osmo_stream_srv *conn);
int ss7_asp_ipa_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg);
int ss7_asp_xua_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg);
int ss7_asp_m3ua_tcp_srv_conn_cb(struct osmo_stream_srv *conn, struct msgb *msg);
int ss7_asp_xua_srv_conn_closed_cb(struct osmo_stream_srv *srv);
int ss7_asp_apply_peer_primary_address(const struct osmo_ss7_asp *asp);
int ss7_asp_apply_primary_address(const struct osmo_ss7_asp *asp);
@ -38,6 +38,8 @@ int ss7_asp_peer_find_host(const struct osmo_ss7_asp_peer *peer, const char *hos
bool ss7_xua_server_set_default_local_hosts(struct osmo_xua_server *oxs);
int xua_segmentation_cb(struct msgb *msg);
enum ss7_as_ctr {
SS7_AS_CTR_RX_MSU_TOTAL,
SS7_AS_CTR_TX_MSU_TOTAL,