Allow L1 forward proxy to provide all 4 queues to seperate applications

Different applications can now connect to L1 forward proxy or access DSP
directly, if they use different message queues.
This commit is contained in:
Andreas Eversberg 2012-07-19 20:33:37 +02:00 committed by Harald Welte
parent 08fce19cfc
commit d40d4d6071
5 changed files with 113 additions and 125 deletions

View File

@ -55,11 +55,15 @@
static const uint16_t fwd_udp_ports[_NUM_MQ_WRITE] = {
[MQ_SYS_READ] = L1FWD_SYS_PORT,
[MQ_L1_READ] = L1FWD_L1_PORT,
#ifndef HW_SYSMOBTS_V1
[MQ_TCH_READ] = L1FWD_TCH_PORT,
[MQ_PDTCH_READ] = L1FWD_PDTCH_PORT,
#endif
};
struct l1fwd_hdl {
struct sockaddr_storage remote_sa;
socklen_t remote_sa_len;
struct sockaddr_storage remote_sa[_NUM_MQ_WRITE];
socklen_t remote_sa_len[_NUM_MQ_WRITE];
struct osmo_wqueue udp_wq[_NUM_MQ_WRITE];
@ -68,12 +72,12 @@ struct l1fwd_hdl {
/* callback when there's a new L1 primitive coming in from the HW */
int l1if_handle_l1prim(struct femtol1_hdl *fl1h, struct msgb *msg)
int l1if_handle_l1prim(int wq, struct femtol1_hdl *fl1h, struct msgb *msg)
{
struct l1fwd_hdl *l1fh = fl1h->priv;
/* Enqueue message to UDP socket */
return osmo_wqueue_enqueue(&l1fh->udp_wq[MQ_L1_WRITE], msg);
return osmo_wqueue_enqueue(&l1fh->udp_wq[wq], msg);
}
/* callback when there's a new SYS primitive coming in from the HW */
@ -99,9 +103,9 @@ static int udp_read_cb(struct osmo_fd *ofd)
msg->l1h = msg->data;
l1fh->remote_sa_len = sizeof(l1fh->remote_sa);
l1fh->remote_sa_len[ofd->priv_nr] = sizeof(l1fh->remote_sa[ofd->priv_nr]);
rc = recvfrom(ofd->fd, msg->l1h, msgb_tailroom(msg), 0,
(struct sockaddr *) &l1fh->remote_sa, &l1fh->remote_sa_len);
(struct sockaddr *) &l1fh->remote_sa[ofd->priv_nr], &l1fh->remote_sa_len[ofd->priv_nr]);
if (rc < 0) {
perror("read from udp");
msgb_free(msg);
@ -113,14 +117,11 @@ static int udp_read_cb(struct osmo_fd *ofd)
}
msgb_put(msg, rc);
DEBUGP(DL1C, "UDP: Received %u bytes for %s queue\n", rc,
ofd->priv_nr == MQ_SYS_WRITE ? "SYS" : "L1");
DEBUGP(DL1C, "UDP: Received %u bytes for queue %d\n", rc,
ofd->priv_nr);
/* put the message into the right queue */
if (ofd->priv_nr == MQ_SYS_WRITE)
rc = osmo_wqueue_enqueue(&fl1h->write_q[MQ_SYS_WRITE], msg);
else
rc = osmo_wqueue_enqueue(&fl1h->write_q[MQ_L1_WRITE], msg);
rc = osmo_wqueue_enqueue(&fl1h->write_q[ofd->priv_nr], msg);
return rc;
}
@ -131,11 +132,11 @@ static int udp_write_cb(struct osmo_fd *ofd, struct msgb *msg)
int rc;
struct l1fwd_hdl *l1fh = ofd->data;
DEBUGP(DL1C, "UDP: Writing %u bytes for %s queue\n", msgb_l1len(msg),
ofd->priv_nr == MQ_SYS_WRITE ? "SYS" : "L1");
DEBUGP(DL1C, "UDP: Writing %u bytes for queue %d\n", msgb_l1len(msg),
ofd->priv_nr);
rc = sendto(ofd->fd, msg->l1h, msgb_l1len(msg), 0,
(const struct sockaddr *)&l1fh->remote_sa, l1fh->remote_sa_len);
(const struct sockaddr *)&l1fh->remote_sa[ofd->priv_nr], l1fh->remote_sa_len[ofd->priv_nr]);
if (rc < 0) {
LOGP(DL1C, LOGL_ERROR, "error writing to L1 msg_queue: %s\n",
strerror(errno));
@ -165,9 +166,11 @@ int main(int argc, char **argv)
INIT_LLIST_HEAD(&fl1h->wlc_list);
/* open the actual hardware transport */
rc = l1if_transport_open(fl1h);
if (rc < 0)
exit(1);
for (i = 0; i < ARRAY_SIZE(fl1h->write_q); i++) {
rc = l1if_transport_open(i, fl1h);
if (rc < 0)
exit(1);
}
/* create our fwd handle */
l1fh = talloc_zero(NULL, struct l1fwd_hdl);
@ -176,7 +179,7 @@ int main(int argc, char **argv)
fl1h->priv = l1fh;
/* Open UDP */
for (i = 0; i < 2; i++) {
for (i = 0; i < ARRAY_SIZE(l1fh->udp_wq); i++) {
struct osmo_wqueue *wq = &l1fh->udp_wq[i];
osmo_wqueue_init(wq, 10);

View File

@ -821,7 +821,7 @@ static int l1if_handle_ind(struct femtol1_hdl *fl1, struct msgb *msg)
return rc;
}
int l1if_handle_l1prim(struct femtol1_hdl *fl1h, struct msgb *msg)
int l1if_handle_l1prim(int wq, struct femtol1_hdl *fl1h, struct msgb *msg)
{
GsmL1_Prim_t *l1p = msgb_l1prim(msg);
struct wait_l1_conf *wlc;
@ -832,8 +832,8 @@ int l1if_handle_l1prim(struct femtol1_hdl *fl1h, struct msgb *msg)
/* silent, don't clog the log file */
break;
default:
LOGP(DL1P, LOGL_DEBUG, "Rx L1 prim %s\n",
get_value_string(femtobts_l1prim_names, l1p->id));
LOGP(DL1P, LOGL_DEBUG, "Rx L1 prim %s on queue %d\n",
get_value_string(femtobts_l1prim_names, l1p->id), wq);
}
/* check if this is a resposne to a sync-waiting request */
@ -1140,12 +1140,19 @@ struct femtol1_hdl *l1if_open(void *priv)
/* default clock source: OCXO */
fl1h->clk_src = SuperFemto_ClkSrcId_Ocxo;
rc = l1if_transport_open(fl1h);
rc = l1if_transport_open(MQ_SYS_WRITE, fl1h);
if (rc < 0) {
talloc_free(fl1h);
return NULL;
}
rc = l1if_transport_open(MQ_L1_WRITE, fl1h);
if (rc < 0) {
l1if_transport_close(MQ_SYS_WRITE, fl1h);
talloc_free(fl1h);
return NULL;
}
fl1h->gsmtap = gsmtap_source_init("localhost", GSMTAP_UDP_PORT, 1);
if (fl1h->gsmtap)
gsmtap_source_add_sink(fl1h->gsmtap);
@ -1155,5 +1162,7 @@ struct femtol1_hdl *l1if_open(void *priv)
int l1if_close(struct femtol1_hdl *fl1h)
{
return l1if_transport_close(fl1h);
l1if_transport_close(MQ_L1_WRITE, fl1h);
l1if_transport_close(MQ_SYS_WRITE, fl1h);
return 0;
}

View File

@ -4,13 +4,11 @@
#include <osmocom/core/msgb.h>
/* functions a transport calls on arrival of primitive from BTS */
int l1if_handle_l1prim(struct femtol1_hdl *fl1h, struct msgb *msg);
int l1if_handle_l1prim(int wq, struct femtol1_hdl *fl1h, struct msgb *msg);
int l1if_handle_sysprim(struct femtol1_hdl *fl1h, struct msgb *msg);
int l1if_handle_tchprim(struct femtol1_hdl *fl1h, struct msgb *msg);
int l1if_handle_pdtchprim(struct femtol1_hdl *fl1h, struct msgb *msg);
/* functions exported by a transport */
int l1if_transport_open(struct femtol1_hdl *fl1h);
int l1if_transport_close(struct femtol1_hdl *fl1h);
int l1if_transport_open(int q, struct femtol1_hdl *fl1h);
int l1if_transport_close(int q, struct femtol1_hdl *fl1h);
#endif /* _FEMTOL1_TRANSP_H */

View File

@ -87,7 +87,7 @@ static int fwd_read_cb(struct osmo_fd *ofd)
if (ofd->priv_nr == MQ_SYS_WRITE)
rc = l1if_handle_sysprim(fl1h, msg);
else
rc = l1if_handle_l1prim(fl1h, msg);
rc = l1if_handle_l1prim(ofd->priv_nr, fl1h, msg);
return rc;
}
@ -98,9 +98,9 @@ static int prim_write_cb(struct osmo_fd *ofd, struct msgb *msg)
return write(ofd->fd, msg->head, msg->len);
}
int l1if_transport_open(struct femtol1_hdl *fl1h)
int l1if_transport_open(int q, struct femtol1_hdl *fl1h)
{
int rc, i;
int rc;
char *bts_host = getenv("L1FWD_BTS_HOST");
printf("sizeof(GsmL1_Prim_t) = %zu\n", sizeof(GsmL1_Prim_t));
@ -111,42 +111,34 @@ int l1if_transport_open(struct femtol1_hdl *fl1h)
exit(2);
}
for (i = 0; i < ARRAY_SIZE(fl1h->write_q); i++) {
struct osmo_wqueue *wq = &fl1h->write_q[i];
struct osmo_fd *ofd = &wq->bfd;
struct osmo_wqueue *wq = &fl1h->write_q[q];
struct osmo_fd *ofd = &wq->bfd;
osmo_wqueue_init(wq, 10);
wq->write_cb = prim_write_cb;
wq->read_cb = fwd_read_cb;
osmo_wqueue_init(wq, 10);
wq->write_cb = prim_write_cb;
wq->read_cb = fwd_read_cb;
ofd->data = fl1h;
ofd->priv_nr = i;
ofd->when |= BSC_FD_READ;
ofd->data = fl1h;
ofd->priv_nr = q;
ofd->when |= BSC_FD_READ;
rc = osmo_sock_init_ofd(ofd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
bts_host, fwd_udp_ports[i],
OSMO_SOCK_F_CONNECT);
if (rc < 0) {
talloc_free(fl1h);
return rc;
}
}
rc = osmo_sock_init_ofd(ofd, AF_UNSPEC, SOCK_DGRAM, IPPROTO_UDP,
bts_host, fwd_udp_ports[q],
OSMO_SOCK_F_CONNECT);
if (rc < 0)
return rc;
return 0;
}
int l1if_transport_close(struct femtol1_hdl *fl1h)
int l1if_transport_close(int q, struct femtol1_hdl *fl1h)
{
int i;
struct osmo_wqueue *wq = &fl1h->write_q[q];
struct osmo_fd *ofd = &wq->bfd;
for (i = 0; i < ARRAY_SIZE(fl1h->write_q); i++) {
struct osmo_wqueue *wq = &fl1h->write_q[i];
struct osmo_fd *ofd = &wq->bfd;
osmo_wqueue_clear(wq);
osmo_fd_unregister(ofd);
close(ofd->fd);
}
osmo_wqueue_clear(wq);
osmo_fd_unregister(ofd);
close(ofd->fd);
return 0;
}

View File

@ -115,7 +115,7 @@ static int l1if_fd_cb(struct osmo_fd *ofd, unsigned int what)
if (rc != sizeof(GsmL1_Prim_t))
LOGP(DL1C, LOGL_NOTICE, "%u != "
"sizeof(GsmL1_Prim_t)\n", rc);
return l1if_handle_l1prim(fl1h, msg);
return l1if_handle_l1prim(ofd->priv_nr, fl1h, msg);
default:
/* The compiler can't know that priv_nr is an enum. Assist. */
LOGP(DL1C, LOGL_FATAL, "writing on a wrong queue: %d\n",
@ -144,87 +144,73 @@ static int l1fd_write_cb(struct osmo_fd *ofd, struct msgb *msg)
return 0;
}
int l1if_transport_open(struct femtol1_hdl *hdl)
int l1if_transport_open(int q, struct femtol1_hdl *hdl)
{
int rc, i;
int rc;
/* Step 1: Open all msg_queue file descriptors */
for (i = 0; i < ARRAY_SIZE(hdl->read_ofd); i++) {
struct osmo_fd *ofd = &hdl->read_ofd[i];
struct osmo_fd *read_ofd = &hdl->read_ofd[q];
struct osmo_wqueue *wq = &hdl->write_q[q];
struct osmo_fd *write_ofd = &hdl->write_q[q].bfd;
rc = open(rd_devnames[i], O_RDONLY);
if (rc < 0) {
LOGP(DL1C, LOGL_FATAL, "unable to open msg_queue: %s\n",
strerror(errno));
return rc;
}
ofd->fd = rc;
ofd->priv_nr = i;
ofd->data = hdl;
ofd->cb = l1if_fd_cb;
ofd->when = BSC_FD_READ;
rc = osmo_fd_register(ofd);
if (rc < 0) {
close(ofd->fd);
ofd->fd = -1;
return rc;
}
rc = open(rd_devnames[q], O_RDONLY);
if (rc < 0) {
LOGP(DL1C, LOGL_FATAL, "unable to open msg_queue: %s\n",
strerror(errno));
return rc;
}
for (i = 0; i < ARRAY_SIZE(hdl->write_q); i++) {
struct osmo_wqueue *wq = &hdl->write_q[i];
struct osmo_fd *ofd = &hdl->write_q[i].bfd;
rc = open(wr_devnames[i], O_WRONLY);
if (rc < 0) {
LOGP(DL1C, LOGL_FATAL, "unable to open msg_queue: %s\n",
strerror(errno));
goto out_read;
}
osmo_wqueue_init(wq, 10);
wq->write_cb = l1fd_write_cb;
ofd->fd = rc;
ofd->priv_nr = i;
ofd->data = hdl;
ofd->when = BSC_FD_WRITE;
rc = osmo_fd_register(ofd);
if (rc < 0) {
close(ofd->fd);
ofd->fd = -1;
goto out_read;
}
read_ofd->fd = rc;
read_ofd->priv_nr = q;
read_ofd->data = hdl;
read_ofd->cb = l1if_fd_cb;
read_ofd->when = BSC_FD_READ;
rc = osmo_fd_register(read_ofd);
if (rc < 0) {
close(read_ofd->fd);
read_ofd->fd = -1;
return rc;
}
rc = open(wr_devnames[q], O_WRONLY);
if (rc < 0) {
LOGP(DL1C, LOGL_FATAL, "unable to open msg_queue: %s\n",
strerror(errno));
goto out_read;
}
osmo_wqueue_init(wq, 10);
wq->write_cb = l1fd_write_cb;
write_ofd->fd = rc;
write_ofd->priv_nr = q;
write_ofd->data = hdl;
write_ofd->when = BSC_FD_WRITE;
rc = osmo_fd_register(write_ofd);
if (rc < 0) {
close(write_ofd->fd);
write_ofd->fd = -1;
goto out_read;
}
return 0;
out_read:
for (i = 0; i < ARRAY_SIZE(hdl->read_ofd); i++) {
close(hdl->read_ofd[i].fd);
osmo_fd_unregister(&hdl->read_ofd[i]);
}
close(hdl->read_ofd[q].fd);
osmo_fd_unregister(&hdl->read_ofd[q]);
return rc;
}
int l1if_transport_close(struct femtol1_hdl *hdl)
int l1if_transport_close(int q, struct femtol1_hdl *hdl)
{
int i;
struct osmo_fd *read_ofd = &hdl->read_ofd[q];
struct osmo_fd *write_ofd = &hdl->write_q[q].bfd;
for (i = 0; i < ARRAY_SIZE(hdl->read_ofd); i++) {
struct osmo_fd *ofd = &hdl->read_ofd[i];
osmo_fd_unregister(read_ofd);
close(read_ofd->fd);
read_ofd->fd = -1;
osmo_fd_unregister(ofd);
close(ofd->fd);
ofd->fd = -1;
}
osmo_fd_unregister(write_ofd);
close(write_ofd->fd);
write_ofd->fd = -1;
for (i = 0; i < ARRAY_SIZE(hdl->write_q); i++) {
struct osmo_fd *ofd = &hdl->write_q[i].bfd;
osmo_fd_unregister(ofd);
close(ofd->fd);
ofd->fd = -1;
}
return 0;
}