diff --git a/include/osmo-bts/bts.h b/include/osmo-bts/bts.h index 62017d787..dcd845935 100644 --- a/include/osmo-bts/bts.h +++ b/include/osmo-bts/bts.h @@ -137,6 +137,8 @@ struct bsc_oml_host { char *addr; }; +#define BTS_PCU_SOCK_WQUEUE_LEN_DEFAULT 10 + /* One BTS */ struct gsm_bts { /* list header in g_bts_sm->bts_list */ diff --git a/src/common/pcu_sock.c b/src/common/pcu_sock.c index 16048e2a8..33b6740d0 100644 --- a/src/common/pcu_sock.c +++ b/src/common/pcu_sock.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -962,15 +963,17 @@ static int pcu_rx(uint8_t msg_type, struct gsm_pcu_if *pcu_prim, size_t prim_len struct pcu_sock_state { struct osmo_fd listen_bfd; /* fd for listen socket */ - struct osmo_fd conn_bfd; /* fd for connection to lcr */ - struct llist_head upqueue; /* queue for sending messages */ + struct osmo_wqueue upqueue; /* For sending messages; has fd for conn. to PCU */ }; +static void pcu_sock_close(struct pcu_sock_state *state); + int pcu_sock_send(struct msgb *msg) { struct pcu_sock_state *state = g_bts_sm->gprs.pcu_state; struct osmo_fd *conn_bfd; struct gsm_pcu_if *pcu_prim = (struct gsm_pcu_if *) msg->data; + int rc; if (!state) { if (pcu_prim->msg_type != PCU_IF_MSG_TIME_IND && @@ -980,7 +983,7 @@ int pcu_sock_send(struct msgb *msg) msgb_free(msg); return -EINVAL; } - conn_bfd = &state->conn_bfd; + conn_bfd = &state->upqueue.bfd; if (conn_bfd->fd <= 0) { if (pcu_prim->msg_type != PCU_IF_MSG_TIME_IND && pcu_prim->msg_type != PCU_IF_MSG_INTERF_IND) @@ -989,15 +992,22 @@ int pcu_sock_send(struct msgb *msg) msgb_free(msg); return -EIO; } - msgb_enqueue(&state->upqueue, msg); - osmo_fd_write_enable(conn_bfd); + rc = osmo_wqueue_enqueue(&state->upqueue, msg); + if (rc < 0) { + if (rc == -ENOSPC) + LOGP(DPCU, LOGL_NOTICE, "PCU not reacting (more than %u messages waiting). Closing connection\n", + state->upqueue.max_length); + pcu_sock_close(state); + msgb_free(msg); + return rc; + } return 0; } static void pcu_sock_close(struct pcu_sock_state *state) { - struct osmo_fd *bfd = &state->conn_bfd; + struct osmo_fd *bfd = &state->upqueue.bfd; struct gsm_bts *bts; struct gsm_bts_trx *trx; unsigned int tn; @@ -1043,11 +1053,7 @@ static void pcu_sock_close(struct pcu_sock_state *state) } } - /* flush the queue */ - while (!llist_empty(&state->upqueue)) { - struct msgb *msg = msgb_dequeue(&state->upqueue); - msgb_free(msg); - } + osmo_wqueue_clear(&state->upqueue); } static int pcu_sock_read(struct osmo_fd *bfd) @@ -1096,59 +1102,34 @@ close: return -1; } -static int pcu_sock_write(struct osmo_fd *bfd) +static int pcu_sock_write(struct osmo_fd *bfd, struct msgb *msg) { struct pcu_sock_state *state = bfd->data; - struct msgb *msg; int rc; - while ((msg = msgb_dequeue(&state->upqueue))) { - /* bug hunter 8-): maybe someone forgot msgb_put(...) ? */ - OSMO_ASSERT(msgb_length(msg) > 0); - - /* try to send it over the socket */ - rc = write(bfd->fd, msgb_data(msg), msgb_length(msg)); - if (OSMO_UNLIKELY(rc == 0)) - goto close; - if (OSMO_UNLIKELY(rc < 0)) { - if (errno == EAGAIN) { - /* Re-insert at the start of the queue, skip disabling fd WRITE */ - llist_add(&msg->list, &state->upqueue); - return 0; - } - goto close; - } - msgb_free(msg); + /* bug hunter 8-): maybe someone forgot msgb_put(...) ? */ + OSMO_ASSERT(msgb_length(msg) > 0); + /* try to send it over the socket */ + rc = write(bfd->fd, msgb_data(msg), msgb_length(msg)); + if (OSMO_UNLIKELY(rc == 0)) + goto close; + if (OSMO_UNLIKELY(rc < 0)) { + if (errno == EAGAIN) + return -EAGAIN; + return -1; } - osmo_fd_write_disable(bfd); return 0; close: - msgb_free(msg); pcu_sock_close(state); return -1; } -static int pcu_sock_cb(struct osmo_fd *bfd, unsigned int flags) -{ - int rc = 0; - - if (flags & OSMO_FD_READ) - rc = pcu_sock_read(bfd); - if (rc < 0) - return rc; - - if (flags & OSMO_FD_WRITE) - rc = pcu_sock_write(bfd); - - return rc; -} - /* accept connection coming from PCU */ static int pcu_sock_accept(struct osmo_fd *bfd, unsigned int flags) { struct pcu_sock_state *state = (struct pcu_sock_state *)bfd->data; - struct osmo_fd *conn_bfd = &state->conn_bfd; + struct osmo_fd *conn_bfd = &state->upqueue.bfd; struct sockaddr_un un_addr; socklen_t len; int fd; @@ -1168,7 +1149,7 @@ static int pcu_sock_accept(struct osmo_fd *bfd, unsigned int flags) return 0; } - osmo_fd_setup(conn_bfd, fd, OSMO_FD_READ, pcu_sock_cb, state, 0); + osmo_fd_setup(conn_bfd, fd, OSMO_FD_READ, osmo_wqueue_bfd_cb, state, 0); if (osmo_fd_register(conn_bfd) != 0) { LOGP(DPCU, LOGL_ERROR, "Failed to register new connection fd\n"); @@ -1195,8 +1176,10 @@ int pcu_sock_init(const char *path) if (!state) return -ENOMEM; - INIT_LLIST_HEAD(&state->upqueue); - state->conn_bfd.fd = -1; + osmo_wqueue_init(&state->upqueue, BTS_PCU_SOCK_WQUEUE_LEN_DEFAULT); + state->upqueue.read_cb = pcu_sock_read; + state->upqueue.write_cb = pcu_sock_write; + state->upqueue.bfd.fd = -1; bfd = &state->listen_bfd; @@ -1237,7 +1220,7 @@ void pcu_sock_exit(void) return; osmo_signal_unregister_handler(SS_GLOBAL, pcu_if_signal_cb, NULL); - conn_bfd = &state->conn_bfd; + conn_bfd = &state->upqueue.bfd; if (conn_bfd->fd > 0) pcu_sock_close(state); bfd = &state->listen_bfd; @@ -1252,7 +1235,7 @@ bool pcu_connected(void) { if (!state) return false; - if (state->conn_bfd.fd <= 0) + if (state->upqueue.bfd.fd <= 0) return false; return true; }