diff --git a/include/osmocom/mgcp/mgcp.h b/include/osmocom/mgcp/mgcp.h index e61ba890c..4dff4d086 100644 --- a/include/osmocom/mgcp/mgcp.h +++ b/include/osmocom/mgcp/mgcp.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -205,4 +206,4 @@ int mgcp_send_reset_all(struct mgcp_config *cfg); int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t prio); -int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len); +int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len); diff --git a/include/osmocom/mgcp/mgcp_network.h b/include/osmocom/mgcp/mgcp_network.h index 1ec897936..8f6505c11 100644 --- a/include/osmocom/mgcp/mgcp_network.h +++ b/include/osmocom/mgcp/mgcp_network.h @@ -4,6 +4,7 @@ #include #include +#include #include @@ -120,8 +121,8 @@ struct mgcp_rtp_end { bool rfc5993_hr_convert; /* Each end has a separate socket for RTP and RTCP */ - struct osmo_fd rtp; - struct osmo_fd rtcp; + struct osmo_io_fd *rtp; + struct osmo_io_fd *rtcp; /* local UDP port number of the RTP socket; RTCP is +1 */ int local_port; @@ -179,7 +180,7 @@ void rtpconn_rate_ctr_add(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint * int id, int inc); void rtpconn_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, struct mgcp_endpoint *endp, int id); -void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg); +void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg); uint32_t mgcp_get_current_ts(unsigned codec_rate); int amr_oa_bwe_convert(struct mgcp_endpoint *endp, struct msgb *msg, bool target_is_oa); diff --git a/src/libosmo-mgcp/mgcp_conn.c b/src/libosmo-mgcp/mgcp_conn.c index d9bc57371..5eb48971c 100644 --- a/src/libosmo-mgcp/mgcp_conn.c +++ b/src/libosmo-mgcp/mgcp_conn.c @@ -106,8 +106,8 @@ static int mgcp_rtp_conn_init(struct mgcp_conn_rtp *conn_rtp, struct mgcp_conn * /* backpointer to the generic part of the connection */ conn->u.rtp.conn = conn; - end->rtp.fd = -1; - end->rtcp.fd = -1; + end->rtp = NULL; + end->rtcp = NULL; memset(&end->addr, 0, sizeof(end->addr)); end->rtcp_port = 0; diff --git a/src/libosmo-mgcp/mgcp_iuup.c b/src/libosmo-mgcp/mgcp_iuup.c index 3818d3e23..7531e42ce 100644 --- a/src/libosmo-mgcp/mgcp_iuup.c +++ b/src/libosmo-mgcp/mgcp_iuup.c @@ -512,10 +512,9 @@ static int mgcp_send_iuup(struct mgcp_endpoint *endp, struct msgb *msg, osmo_sockaddr_port(&rtp_end->addr.u.sa), ntohs(rtp_end->rtcp_port)); /* Forward a copy of the RTP data to a debug ip/port */ - forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out, - msg); + forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg); - len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr, (char *)hdr, buflen); + len = mgcp_udp_send(rtp_end->rtp, &rtp_end->addr, (char *)hdr, buflen); if (len <= 0) return len; diff --git a/src/libosmo-mgcp/mgcp_network.c b/src/libosmo-mgcp/mgcp_network.c index bdf516e70..1fc2c56cd 100644 --- a/src/libosmo-mgcp/mgcp_network.c +++ b/src/libosmo-mgcp/mgcp_network.c @@ -4,6 +4,7 @@ /* * (C) 2009-2012 by Holger Hans Peter Freyther * (C) 2009-2012 by On-Waves + * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH * All Rights Reserved * * This program is free software; you can redistribute it and/or modify @@ -794,16 +795,18 @@ static int amr_oa_check(char *data, int len) /* Forward data to a debug tap. This is debug function that is intended for * debugging the voice traffic with tools like gstreamer */ -void forward_data_tap(int fd, struct mgcp_rtp_tap *tap, struct msgb *msg) +void forward_data_tap(struct osmo_io_fd *iofd, struct mgcp_rtp_tap *tap, struct msgb *msg) { int rc; if (!tap->enabled) return; - rc = sendto(fd, msgb_data(msg), msgb_length(msg), 0, (struct sockaddr *)&tap->forward, - sizeof(tap->forward)); + struct msgb *msg2 = msgb_copy(msg, "RTP TAP Tx"); + if (!msg2) + return; + rc = osmo_iofd_sendto_msgb(iofd, msg2, 0, &tap->forward); if (rc < 0) LOGP(DRTP, LOGL_ERROR, "Forwarding tapped (debug) voice data failed.\n"); @@ -1039,29 +1042,36 @@ static int mgcp_conn_rtp_dispatch_rtp(struct mgcp_conn_rtp *conn_dst, struct msg return -1; } -/*! send udp packet. - * \param[in] fd associated file descriptor. +/*! send message buffer via udp socket. If it succeeds, it takes ownership of the msgb and internally calls + * msgb_free() after the aynchronous sendto() completes. In case of error, the msgb is still owned by the + * caller and must be free'd accordingly. + * \param[in] iofd associated file descriptor. + * \param[in] addr destination ip-address. + * \param[in] msg message buffer that holds the data to be send. + * \returns 0 in case of success (takes msgb ownership), -1 on error (doesn't take msgb ownership). */ +static int mgcp_udp_send_msg(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, struct msgb *msg) +{ + LOGP(DRTP, LOGL_DEBUG, "sending %d bytes length packet to %s ...\n", msgb_length(msg), + osmo_sockaddr_to_str(addr)); + + return osmo_iofd_sendto_msgb(iofd, msg, 0, addr); +} + +/*! send udp packet from raw buffer/length. + * \param[in] iofd associated file descriptor. * \param[in] addr destination ip-address. * \param[in] buf buffer that holds the data to be send. * \param[in] len length of the data to be sent. * \returns bytes sent, -1 on error. */ -int mgcp_udp_send(int fd, const struct osmo_sockaddr *addr, const char *buf, int len) +int mgcp_udp_send(struct osmo_io_fd *iofd, const struct osmo_sockaddr *addr, const char *buf, int len) { - char ipbuf[INET6_ADDRSTRLEN]; - size_t addr_len; + struct msgb *msg = msgb_alloc_c(iofd, len, "mgcp_udp_send"); + if (!msg) + return -ENOMEM; + memcpy(msg->tail, buf, len); + msgb_put(msg, len); - LOGP(DRTP, LOGL_DEBUG, - "sending %d bytes length packet to %s:%u ...\n", len, - osmo_sockaddr_ntop(&addr->u.sa, ipbuf), - osmo_sockaddr_port(&addr->u.sa)); - - if (addr->u.sa.sa_family == AF_INET6) { - addr_len = sizeof(addr->u.sin6); - } else { - addr_len = sizeof(addr->u.sin); - } - - return sendto(fd, buf, len, 0, &addr->u.sa, addr_len); + return mgcp_udp_send_msg(iofd, addr, msg); } /*! send RTP dummy packet (to keep NAT connection open). @@ -1089,8 +1099,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn) if (mgcp_conn_rtp_is_iuup(conn)) rc = mgcp_conn_iuup_send_dummy(conn); else - rc = mgcp_udp_send(conn->end.rtp.fd, &conn->end.addr, - rtp_dummy_payload, sizeof(rtp_dummy_payload)); + rc = mgcp_udp_send(conn->end.rtp, &conn->end.addr, rtp_dummy_payload, sizeof(rtp_dummy_payload)); if (rc == -1) goto failed; @@ -1101,7 +1110,7 @@ int mgcp_send_dummy(struct mgcp_endpoint *endp, struct mgcp_conn_rtp *conn) was_rtcp = 1; rtcp_addr = conn->end.addr; osmo_sockaddr_set_port(&rtcp_addr.u.sa, ntohs(conn->end.rtcp_port)); - rc = mgcp_udp_send(conn->end.rtcp.fd, &rtcp_addr, + rc = mgcp_udp_send(conn->end.rtcp, &rtcp_addr, rtp_dummy_payload, sizeof(rtp_dummy_payload)); if (rc >= 0) @@ -1225,22 +1234,21 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr ); /* Forward a copy of the RTP data to a debug ip/port */ - forward_data_tap(rtp_end->rtp.fd, &conn_src->tap_out, - msg); + forward_data_tap(rtp_end->rtp, &conn_src->tap_out, msg); - len = mgcp_udp_send(rtp_end->rtp.fd, &rtp_end->addr, - (char *)msgb_data(msg), msgb_length(msg)); - if (len <= 0) { + len = msgb_length(msg); + + rc = mgcp_udp_send_msg(rtp_end->rtp, &rtp_end->addr, msg); + if (rc < 0) { msgb_free(msg); - return len; + return rc; } rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR); rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len); rtp_state->alt_rtp_tx_sequence++; - msgb_free(msg); - return len; + return 0; } else if (!trunk->omit_rtcp) { struct osmo_sockaddr rtcp_addr = rtp_end->addr; osmo_sockaddr_set_port(&rtcp_addr.u.sa, rtp_end->rtcp_port); @@ -1251,15 +1259,19 @@ int mgcp_send(struct mgcp_endpoint *endp, int is_rtp, struct osmo_sockaddr *addr osmo_sockaddr_port(&rtcp_addr.u.sa) ); - len = mgcp_udp_send(rtp_end->rtcp.fd, &rtcp_addr, - (char *)msgb_data(msg), msgb_length(msg)); + len = msgb_length(msg); + + rc = mgcp_udp_send_msg(rtp_end->rtcp, &rtcp_addr, msg); + if (rc < 0) { + msgb_free(msg); + return rc; + } rtpconn_rate_ctr_inc(conn_dst, endp, RTP_PACKETS_TX_CTR); rtpconn_rate_ctr_add(conn_dst, endp, RTP_OCTETS_TX_CTR, len); rtp_state->alt_rtp_tx_sequence++; - msgb_free(msg); - return len; + return 0; } msgb_free(msg); @@ -1461,7 +1473,7 @@ void mgcp_cleanup_e1_bridge_cb(struct mgcp_endpoint *endp, struct mgcp_conn *con } /* Handle incoming RTP data from NET */ -static int rtp_data_net(struct osmo_fd *fd, unsigned int what) +static void rtp_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *saddr) { /* NOTE: This is a generic implementation. RTP data is received. In * case of loopback the data is just sent back to its origin. All @@ -1472,49 +1484,34 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what) struct mgcp_conn_rtp *conn_src; struct mgcp_endpoint *endp; - struct osmo_sockaddr addr; - socklen_t slen = sizeof(addr); - char ipbuf[INET6_ADDRSTRLEN]; - int ret; enum rtp_proto proto; struct osmo_rtp_msg_ctx *mc; - struct msgb *msg; - int rc; - conn_src = (struct mgcp_conn_rtp *)fd->data; + conn_src = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd); OSMO_ASSERT(conn_src); endp = conn_src->conn->endp; OSMO_ASSERT(endp); - msg = msgb_alloc_c(endp->trunk, RTP_BUF_SIZE, "RTP-rx"); - proto = (fd == &conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP; + proto = (iofd == conn_src->end.rtp) ? MGCP_PROTO_RTP : MGCP_PROTO_RTCP; - ret = recvfrom(fd->fd, msgb_data(msg), msg->data_len, 0, (struct sockaddr *)&addr.u.sa, &slen); - - if (ret <= 0) { - LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(errno)); - rc = -1; - goto out; + if (res <= 0) { + LOG_CONN_RTP(conn_src, LOGL_ERROR, "recvfrom error: %s\n", strerror(-res)); + goto out_free; } - msgb_put(msg, ret); - - LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s:%u\n", + LOG_CONN_RTP(conn_src, LOGL_DEBUG, "%s: rx %u bytes from %s\n", proto == MGCP_PROTO_RTP ? "RTP" : "RTCP", - msgb_length(msg), osmo_sockaddr_ntop(&addr.u.sa, ipbuf), - osmo_sockaddr_port(&addr.u.sa)); + msgb_length(msg), osmo_sockaddr_to_str(saddr)); if ((proto == MGCP_PROTO_RTP && check_rtp(conn_src, msg)) || (proto == MGCP_PROTO_RTCP && check_rtcp(conn_src, msg))) { /* Logging happened in the two check_ functions */ - rc = -1; - goto out; + goto out_free; } if (mgcp_is_rtp_dummy_payload(msg)) { LOG_CONN_RTP(conn_src, LOGL_DEBUG, "rx dummy packet (dropped)\n"); - rc = 0; - goto out; + goto out_free; } /* Since the msgb remains owned and freed by this function, the msg ctx data struct can just be on the stack and @@ -1523,7 +1520,7 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what) *mc = (struct osmo_rtp_msg_ctx){ .proto = proto, .conn_src = conn_src, - .from_addr = &addr, + .from_addr = (struct osmo_sockaddr *) saddr, }; LOG_CONN_RTP(conn_src, LOGL_DEBUG, "msg ctx: %d %p %s\n", mc->proto, mc->conn_src, @@ -1538,12 +1535,13 @@ static int rtp_data_net(struct osmo_fd *fd, unsigned int what) /* FIXME: count RTP and RTCP separately, also count IuUP payload-less separately */ /* Forward a copy of the RTP data to a debug ip/port */ - forward_data_tap(fd->fd, &conn_src->tap_in, msg); + forward_data_tap(iofd, &conn_src->tap_in, msg); - rc = rx_rtp(msg); + rx_rtp(msg); + return; -out: - return rc; +out_free: + msgb_free(msg); } /* Note: This function is able to handle RTP and RTCP. msgb ownership is transferred, so this function or its @@ -1590,6 +1588,24 @@ out_free: return -1; } +static void rtp_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *daddr) +{ + /* nothing; osmo_io takes care of msgb_free */ + if (res < 0) { + struct mgcp_conn_rtp *conn_rtp = (struct mgcp_conn_rtp *) osmo_iofd_get_data(iofd); + int priv_nr = osmo_iofd_get_priv_nr(iofd); + char errbuf[129]; + strerror_r(-res, errbuf, sizeof(errbuf)); + LOG_CONN_RTP(conn_rtp, LOGL_ERROR, "%s sendto(%s) failed: %s\n", priv_nr ? "RTCP" : "RTP", + osmo_sockaddr_to_str(daddr), errbuf); + } +} + +static const struct osmo_io_ops rtp_ioops = { + .recvfrom_cb = rtp_recvfrom_cb, + .sendto_cb = rtp_sendto_cb, +}; + /*! bind RTP port to osmo_fd. * \param[in] source_addr source (local) address to bind on. * \param[in] port to bind on. @@ -1617,7 +1633,7 @@ int mgcp_create_bind(const char *source_addr, int port, uint8_t dscp, uint8_t pr static int bind_rtp(struct mgcp_config *cfg, const char *source_addr, struct mgcp_rtp_end *rtp_end, struct mgcp_endpoint *endp) { - int rc; + int rc, rtp_fd, rtcp_fd; /* NOTE: The port that is used for RTCP is the RTP port incremented by one * (e.g. RTP-Port = 16000 ==> RTCP-Port = 16001) */ @@ -1629,7 +1645,7 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr, source_addr, rtp_end->local_port); goto cleanup0; } - rtp_end->rtp.fd = rc; + rtp_fd = rc; rc = mgcp_create_bind(source_addr, rtp_end->local_port + 1, cfg->endp_dscp, cfg->endp_priority); if (rc < 0) { @@ -1638,16 +1654,16 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr, source_addr, rtp_end->local_port + 1); goto cleanup1; } - rtp_end->rtcp.fd = rc; + rtcp_fd = rc; - if (osmo_fd_register(&rtp_end->rtp) != 0) { + if (osmo_iofd_register(rtp_end->rtp, rtp_fd) < 0) { LOGPENDP(endp, DRTP, LOGL_ERROR, "failed to register RTP port %d\n", rtp_end->local_port); goto cleanup2; } - if (osmo_fd_register(&rtp_end->rtcp) != 0) { + if (osmo_iofd_register(rtp_end->rtcp, rtcp_fd) != 0) { LOGPENDP(endp, DRTP, LOGL_ERROR, "failed to register RTCP port %d\n", rtp_end->local_port + 1); @@ -1657,13 +1673,11 @@ static int bind_rtp(struct mgcp_config *cfg, const char *source_addr, return 0; cleanup3: - osmo_fd_unregister(&rtp_end->rtp); + osmo_iofd_unregister(rtp_end->rtp); cleanup2: - close(rtp_end->rtcp.fd); - rtp_end->rtcp.fd = -1; + close(rtcp_fd); cleanup1: - close(rtp_end->rtp.fd); - rtp_end->rtp.fd = -1; + close(rtp_fd); cleanup0: return -1; } @@ -1682,7 +1696,8 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port, snprintf(name, sizeof(name), "%s-%s", conn->conn->name, conn->conn->id); end = &conn->end; - if (end->rtp.fd != -1 || end->rtcp.fd != -1) { + if ((end->rtp && osmo_iofd_get_fd(end->rtp) != -1) || + (end->rtcp && osmo_iofd_get_fd(end->rtcp) != -1)) { LOGPENDP(endp, DRTP, LOGL_ERROR, "%u was already bound on conn:%s\n", rtp_port, mgcp_conn_dump(conn->conn)); @@ -1695,8 +1710,18 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port, } end->local_port = rtp_port; - osmo_fd_setup(&end->rtp, -1, OSMO_FD_READ, rtp_data_net, conn, 0); - osmo_fd_setup(&end->rtcp, -1, OSMO_FD_READ, rtp_data_net, conn, 0); + end->rtp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn); + if (!end->rtp) + return -EIO; + osmo_iofd_set_alloc_info(end->rtp, RTP_BUF_SIZE, 0); + end->rtcp = osmo_iofd_setup(conn->conn, -1, name, OSMO_IO_FD_MODE_RECVFROM_SENDTO, &rtp_ioops, conn); + if (!end->rtcp) { + osmo_iofd_free(end->rtp); + end->rtp = NULL; + return -EIO; + } + osmo_iofd_set_alloc_info(end->rtcp, RTP_BUF_SIZE, 0); + osmo_iofd_set_priv_nr(end->rtcp, 1); /* we use priv_nr as identifier for RTCP */ return bind_rtp(endp->trunk->cfg, conn->end.local_addr, end, endp); } @@ -1705,15 +1730,13 @@ int mgcp_bind_net_rtp_port(struct mgcp_endpoint *endp, int rtp_port, * \param[in] end RTP end */ void mgcp_free_rtp_port(struct mgcp_rtp_end *end) { - if (end->rtp.fd != -1) { - osmo_fd_unregister(&end->rtp); - close(end->rtp.fd); - end->rtp.fd = -1; + if (end->rtp) { + osmo_iofd_free(end->rtp); + end->rtp = NULL; } - if (end->rtcp.fd != -1) { - osmo_fd_unregister(&end->rtcp); - close(end->rtcp.fd); - end->rtcp.fd = -1; + if (end->rtcp) { + osmo_iofd_free(end->rtcp); + end->rtcp = NULL; } } diff --git a/src/libosmo-mgcp/mgcp_osmux.c b/src/libosmo-mgcp/mgcp_osmux.c index 3df99724e..df91dbc0b 100644 --- a/src/libosmo-mgcp/mgcp_osmux.c +++ b/src/libosmo-mgcp/mgcp_osmux.c @@ -1,6 +1,7 @@ /* * (C) 2012-2013 by Pablo Neira Ayuso * (C) 2012-2013 by On Waves ehf + * (C) 2013-2024 by sysmocom - s.f.m.c. GmbH * All rights not specifically granted under this license are reserved. * * This program is free software; you can redistribute it and/or modify it @@ -13,9 +14,11 @@ #include /* for memcpy */ #include /* for abs */ #include /* for PRIu64 */ +#include /* for PRIu64 */ #include #include #include +#include #include #include @@ -30,8 +33,8 @@ #include #include -static struct osmo_fd osmux_fd_v4; -static struct osmo_fd osmux_fd_v6; +static struct osmo_io_fd *osmux_fd_v4; +static struct osmo_io_fd *osmux_fd_v6; static LLIST_HEAD(osmux_handle_list); @@ -76,34 +79,31 @@ static void rtpconn_osmux_rate_ctr_inc(struct mgcp_conn_rtp *conn_rtp, int id) static void osmux_deliver_cb(struct msgb *batch_msg, void *data) { struct osmux_handle *handle = data; - socklen_t dest_len; - int rc, fd; - struct mgcp_trunk *trunk = (struct mgcp_trunk *)osmux_fd_v4.data; + int rc; + struct osmo_io_fd *iofd; + struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(osmux_fd_v4); struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats; switch (handle->rem_addr.u.sa.sa_family) { case AF_INET6: - dest_len = sizeof(handle->rem_addr.u.sin6); - fd = osmux_fd_v6.fd; + iofd = osmux_fd_v6; break; case AF_INET: default: - dest_len = sizeof(handle->rem_addr.u.sin); - fd = osmux_fd_v4.fd; + iofd = osmux_fd_v4; break; } - rc = sendto(fd, batch_msg->data, batch_msg->len, 0, - (struct sockaddr *)&handle->rem_addr.u.sa, dest_len); + rc = osmo_iofd_sendto_msgb(iofd, batch_msg, 0, &handle->rem_addr); if (rc < 0) { char errbuf[129]; - strerror_r(errno, errbuf, sizeof(errbuf)); + strerror_r(-rc, errbuf, sizeof(errbuf)); LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n", osmo_sockaddr_to_str(&handle->rem_addr), errbuf); rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR)); + msgb_free(batch_msg); } else { rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_PACKETS_TX_CTR)); } - msgb_free(batch_msg); } /* Lookup existing OSMUX handle for specified destination address. */ @@ -325,28 +325,6 @@ static void scheduled_from_osmux_tx_rtp_cb(struct msgb *msg, void *data) /* dispatch_rtp_cb() has taken ownership of the msgb */ } -static struct msgb *osmux_recv(struct osmo_fd *ofd, struct osmo_sockaddr *addr) -{ - struct msgb *msg; - socklen_t slen = sizeof(addr->u.sas); - int ret; - - msg = msgb_alloc(4096, "OSMUX"); - if (!msg) { - LOGP(DOSMUX, LOGL_ERROR, "cannot allocate message\n"); - return NULL; - } - ret = recvfrom(ofd->fd, msg->data, msg->data_len, 0, &addr->u.sa, &slen); - if (ret <= 0) { - msgb_free(msg); - LOGP(DOSMUX, LOGL_ERROR, "cannot receive message\n"); - return NULL; - } - msgb_put(msg, ret); - - return msg; -} - /* To be called every time some AMR data is received on a connection * returns: 0 if conn can process data, negative if an error ocurred and data should not be further processed */ static int conn_osmux_event_data_received(struct mgcp_conn_rtp *conn, const struct osmo_sockaddr *rem_addr) @@ -442,22 +420,16 @@ out: } #define osmux_chunk_length(msg, rem) ((rem) - (msg)->len) -static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what) +static void osmux_recvfrom_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr) { - struct msgb *msg; struct osmux_hdr *osmuxh; - struct osmo_sockaddr rem_addr; - uint32_t rem; - struct mgcp_trunk *trunk = ofd->data; + struct mgcp_trunk *trunk = osmo_iofd_get_data(iofd); struct rate_ctr_group *all_rtp_stats = trunk->ratectr.all_osmux_conn_stats; + uint32_t rem; char addr_str[64]; - msg = osmux_recv(ofd, &rem_addr); - if (!msg) - return -1; - rate_ctr_inc(rate_ctr_group_get_ctr(all_rtp_stats, OSMUX_PACKETS_RX_CTR)); - osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), &rem_addr); + osmo_sockaddr_to_str_buf(addr_str, sizeof(addr_str), rem_addr); if (trunk->cfg->osmux.usage == OSMUX_USAGE_OFF) { LOGP(DOSMUX, LOGL_ERROR, @@ -467,14 +439,16 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what) } /* Catch legacy dummy message and process them separately: */ - if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD) - return osmux_handle_legacy_dummy(trunk, &rem_addr, msg); + if (msg->len == 2 && msg->data[0] == MGCP_DUMMY_LOAD) { + osmux_handle_legacy_dummy(trunk, rem_addr, msg); + return; + } rem = msg->len; while((osmuxh = osmux_xfrm_output_pull(msg)) != NULL) { struct mgcp_conn_rtp *conn_src; conn_src = osmux_conn_lookup(trunk, osmuxh->circuit_id, - &rem_addr); + rem_addr); if (!conn_src) { LOGP(DOSMUX, LOGL_DEBUG, "Cannot find a src conn for %s CID=%d\n", @@ -482,7 +456,7 @@ static int osmux_read_fd_cb(struct osmo_fd *ofd, unsigned int what) goto next; } - if (conn_osmux_event_data_received(conn_src, &rem_addr) < 0) + if (conn_osmux_event_data_received(conn_src, rem_addr) < 0) goto next; mgcp_conn_watchdog_kick(conn_src->conn); @@ -496,19 +470,38 @@ next: } out: msgb_free(msg); - return 0; } +static void osmux_sendto_cb(struct osmo_io_fd *iofd, int res, struct msgb *msg, const struct osmo_sockaddr *rem_addr) +{ + /* nothing; osmo_io takes care of msgb_free */ + if (res < 0) { + struct mgcp_trunk *trunk = (struct mgcp_trunk *) osmo_iofd_get_data(iofd); + struct rate_ctr_group *all_osmux_stats = trunk->ratectr.all_osmux_conn_stats; + char errbuf[129]; + strerror_r(-res, errbuf, sizeof(errbuf)); + LOGP(DOSMUX, LOGL_NOTICE, "osmux sendto(%s) failed: %s\n", osmo_sockaddr_to_str(rem_addr), errbuf); + rate_ctr_inc(rate_ctr_group_get_ctr(all_osmux_stats, OSMUX_DROPPED_PACKETS_CTR)); + } +} + +static const struct osmo_io_ops osmux_ioops = { + .recvfrom_cb = osmux_recvfrom_cb, + .sendto_cb = osmux_sendto_cb, +}; + int osmux_init(struct mgcp_trunk *trunk) { - int ret; + int ret, fd; struct mgcp_config *cfg = trunk->cfg; /* So far we only support running on one trunk: */ OSMO_ASSERT(trunk == mgcp_trunk_by_num(cfg, MGCP_TRUNK_VIRTUAL, MGCP_VIRT_TRUNK_ID)); - osmo_fd_setup(&osmux_fd_v4, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0); - osmo_fd_setup(&osmux_fd_v6, -1, OSMO_FD_READ, osmux_read_fd_cb, trunk, 0); + osmux_fd_v4 = osmo_iofd_setup(trunk, -1, "osmux_fd_v4", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk); + if (!osmux_fd_v4) + goto out; + osmo_iofd_set_alloc_info(osmux_fd_v4, 4096, 0); if (cfg->osmux.local_addr_v4) { ret = mgcp_create_bind(cfg->osmux.local_addr_v4, cfg->osmux.local_port, @@ -516,40 +509,55 @@ int osmux_init(struct mgcp_trunk *trunk) if (ret < 0) { LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv4 socket to %s:%u\n", cfg->osmux.local_addr_v4, cfg->osmux.local_port); - return ret; + goto out_free_v4; } - osmux_fd_v4.fd = ret; + fd = ret; - ret = osmo_fd_register(&osmux_fd_v4); + ret = osmo_iofd_register(osmux_fd_v4, fd); if (ret < 0) { - LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n", - osmo_sock_get_name2(osmux_fd_v4.fd)); - return ret; + LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv4 socket %s\n", osmo_sock_get_name2(fd)); + close(fd); + goto out_free_v4; } - LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n", - osmo_sock_get_name2(osmux_fd_v4.fd)); + LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv4 socket listening on %s\n", osmo_sock_get_name2(fd)); } + + osmux_fd_v6 = osmo_iofd_setup(trunk, -1, "osmux_fd_v6", OSMO_IO_FD_MODE_RECVFROM_SENDTO, &osmux_ioops, trunk); + if (!osmux_fd_v6) + goto out_free_v4; + osmo_iofd_set_alloc_info(osmux_fd_v6, 4096, 0); + if (cfg->osmux.local_addr_v6) { ret = mgcp_create_bind(cfg->osmux.local_addr_v6, cfg->osmux.local_port, cfg->endp_dscp, cfg->endp_priority); if (ret < 0) { LOGP(DOSMUX, LOGL_ERROR, "Cannot bind OSMUX IPv6 socket to [%s]:%u\n", cfg->osmux.local_addr_v6, cfg->osmux.local_port); - return ret; + goto out_free_v6; } - osmux_fd_v6.fd = ret; + fd = ret; - ret = osmo_fd_register(&osmux_fd_v6); + ret = osmo_iofd_register(osmux_fd_v6, fd); if (ret < 0) { - LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n", - osmo_sock_get_name2(osmux_fd_v6.fd)); - return ret; + LOGP(DOSMUX, LOGL_ERROR, "Cannot register OSMUX IPv6 socket %s\n", osmo_sock_get_name2(fd)); + close(fd); + goto out_free_v6; } - LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n", - osmo_sock_get_name2(osmux_fd_v6.fd)); + LOGP(DOSMUX, LOGL_INFO, "OSMUX IPv6 socket listening on %s\n", osmo_sock_get_name2(fd)); } cfg->osmux.initialized = true; return 0; + +out_free_v6: + /* osmo_iofd_free performs unregister + close */ + osmo_iofd_free(osmux_fd_v6); + osmux_fd_v6 = NULL; +out_free_v4: + /* osmo_iofd_free performs unregister + close */ + osmo_iofd_free(osmux_fd_v4); + osmux_fd_v4 = NULL; +out: + return -1; } /*! relase OSXMUX cid, that had been allocated to this connection. @@ -715,7 +723,7 @@ int osmux_send_dummy(struct mgcp_conn_rtp *conn) osmo_sockaddr_ntop(&conn->end.addr.u.sa, ipbuf), osmo_sockaddr_port(&conn->end.addr.u.sa), conn->osmux.remote_cid); - return mgcp_udp_send(osmux_fd_v4.fd, &conn->end.addr, (char *)osmuxh, buf_len); + return mgcp_udp_send(osmux_fd_v4, &conn->end.addr, (char *)osmuxh, buf_len); } /* Keeps track of locally allocated Osmux circuit ID. +7 to round up to 8 bit boundary. */ diff --git a/tests/mgcp/mgcp_test.c b/tests/mgcp/mgcp_test.c index c76bd9d83..ffc8a202e 100644 --- a/tests/mgcp/mgcp_test.c +++ b/tests/mgcp/mgcp_test.c @@ -653,12 +653,13 @@ static struct msgb *create_msg(const char *str, const char *conn_id) static int dummy_packets = 0; /* override and forward */ -ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, - const struct sockaddr *dest_addr, socklen_t addrlen) +int osmo_iofd_sendto_msgb(struct osmo_io_fd *iofd, struct msgb *msg, int flags, const struct osmo_sockaddr *addr) { uint32_t dest_host = - htonl(((struct sockaddr_in *)dest_addr)->sin_addr.s_addr); - int dest_port = htons(((struct sockaddr_in *)dest_addr)->sin_port); + htonl(((struct sockaddr_in *)addr)->sin_addr.s_addr); + int dest_port = htons(((struct sockaddr_in *)addr)->sin_port); + const uint8_t *buf = msgb_data(msg); + size_t len = msgb_length(msg); if (len == sizeof(rtp_dummy_payload) && memcmp(buf, rtp_dummy_payload, sizeof(rtp_dummy_payload)) == 0) { @@ -672,6 +673,8 @@ ssize_t sendto(int sockfd, const void *buf, size_t len, int flags, OSMO_ASSERT(dest_host); OSMO_ASSERT(dest_port); + msgb_free(msg); + return len; }