diff --git a/include/osmocom/netif/osmux.h b/include/osmocom/netif/osmux.h index 47fbd95..702da58 100644 --- a/include/osmocom/netif/osmux.h +++ b/include/osmocom/netif/osmux.h @@ -43,6 +43,7 @@ struct osmux_hdr { struct osmux_in_handle { uint8_t osmux_seq; void (*deliver)(struct msgb *msg); + char *data; /* internal data */ }; /* one per OSmux circuit_id, ie. one per RTP flow. */ @@ -57,7 +58,8 @@ static inline uint8_t *osmux_get_payload(struct osmux_hdr *osmuxh) } void osmux_xfrm_input_init(struct osmux_in_handle *h); -int osmux_xfrm_input(struct msgb *msg); + +int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg); void osmux_xfrm_input_deliver(struct osmux_in_handle *h); int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, struct llist_head *list); diff --git a/src/osmux.c b/src/osmux.c index add73ae..6cfce57 100644 --- a/src/osmux.c +++ b/src/osmux.c @@ -131,19 +131,21 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, return i; } -static struct osmux_batch { +struct osmux_batch { struct osmo_timer_list timer; struct osmux_hdr *osmuxh; struct llist_head msgb_list; unsigned int remaining_bytes; uint8_t seq; -} batch; +}; static int -osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph, +osmux_batch_add(struct osmux_in_handle *h, struct msgb *out_msg, + struct msgb *msg, struct rtp_hdr *rtph, struct amr_hdr *amrh, uint32_t amr_payload_len, uint8_t circuit_id, int add_osmux_header) { + struct osmux_batch *batch = (struct osmux_batch *)h->data; struct osmux_hdr *osmuxh; if (add_osmux_header) { @@ -152,22 +154,22 @@ osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph, osmuxh->ctr = 0; osmuxh->amr_f = amrh->f; osmuxh->amr_q= amrh->q; - osmuxh->seq = batch.seq++; + osmuxh->seq = batch->seq++; osmuxh->circuit_id = circuit_id; osmuxh->amr_cmr = amrh->cmr; osmuxh->amr_ft = amrh->ft; msgb_put(out_msg, sizeof(struct osmux_hdr)); /* annotate current osmux header */ - batch.osmuxh = osmuxh; + batch->osmuxh = osmuxh; } else { - if (batch.osmuxh->ctr == 0x7) { + if (batch->osmuxh->ctr == 0x7) { LOGP(DOSMUX, LOGL_ERROR, "cannot add msg=%p, " "too many messages for this RTP ssrc=%u\n", msg, rtph->ssrc); return 0; } - batch.osmuxh->ctr++; + batch->osmuxh->ctr++; } memcpy(out_msg->tail, osmo_amr_get_payload(amrh), amr_payload_len); @@ -177,7 +179,8 @@ osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph, } static int -osmux_xfrm_encode_amr(struct msgb *out_msg, +osmux_xfrm_encode_amr(struct osmux_in_handle *h, + struct msgb *out_msg, struct rtp_hdr *rtph, struct msgb *msg, int add_osmux_header) { @@ -191,18 +194,19 @@ osmux_xfrm_encode_amr(struct msgb *out_msg, amr_payload_len = amr_len - sizeof(struct amr_hdr); - if (osmux_batch_add(out_msg, msg, rtph, amrh, amr_payload_len, 0, - add_osmux_header) < 0) + if (osmux_batch_add(h, out_msg, msg, rtph, amrh, amr_payload_len, + 0, add_osmux_header) < 0) return -1; return 0; } -static struct msgb *osmux_build_batch(void) +static struct msgb *osmux_build_batch(struct osmux_in_handle *h) { struct msgb *cur, *tmp, *batch_msg; uint32_t last_rtp_ssrc; int last_rtp_ssrc_set = 0, add_osmux_hdr = 1; + struct osmux_batch *batch = (struct osmux_batch *)h->data; batch_msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX"); if (batch_msg == NULL) { @@ -212,7 +216,7 @@ static struct msgb *osmux_build_batch(void) LOGP(DOSMUX, LOGL_DEBUG, "Now building batch\n"); - llist_for_each_entry_safe(cur, tmp, &batch.msgb_list, list) { + llist_for_each_entry_safe(cur, tmp, &batch->msgb_list, list) { struct rtp_hdr *rtph; char buf[4096]; @@ -230,7 +234,7 @@ static struct msgb *osmux_build_batch(void) LOGP(DOSMUX, LOGL_DEBUG, "%s\n", buf); - osmux_xfrm_encode_amr(batch_msg, rtph, cur, add_osmux_hdr); + osmux_xfrm_encode_amr(h, batch_msg, rtph, cur, add_osmux_hdr); last_rtp_ssrc_set = 1; last_rtp_ssrc = rtph->ssrc; @@ -244,13 +248,14 @@ static struct msgb *osmux_build_batch(void) void osmux_xfrm_input_deliver(struct osmux_in_handle *h) { struct msgb *batch_msg; + struct osmux_batch *batch = (struct osmux_batch *)h->data; LOGP(DOSMUX, LOGL_DEBUG, "invoking delivery function\n"); - batch_msg = osmux_build_batch(); + batch_msg = osmux_build_batch(h); h->deliver(batch_msg); msgb_free(batch_msg); - osmo_timer_del(&batch.timer); - batch.remaining_bytes = OSMUX_BATCH_MAX; + osmo_timer_del(&batch->timer); + batch->remaining_bytes = OSMUX_BATCH_MAX; } static void osmux_batch_timer_expired(void *data) @@ -273,7 +278,8 @@ static int osmux_rtp_amr_payload_len(struct msgb *msg, struct rtp_hdr *rtph) return amr_len - sizeof(struct amr_hdr); } -static int osmux_msgb_batch_queue_add(struct msgb *msg) +static int +osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg) { struct rtp_hdr *rtph; struct msgb *cur; @@ -284,7 +290,7 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg) if (rtph == NULL) return -1; - llist_for_each_entry(cur, &batch.msgb_list, list) { + llist_for_each_entry(cur, &batch->msgb_list, list) { struct rtp_hdr *rtph2; rtph2 = osmo_rtp_get_hdr(cur); @@ -297,7 +303,7 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg) if (rtph->ssrc > rtph2->ssrc) break; } - if (cur->list.next == batch.msgb_list.next) + if (cur->list.next == batch->msgb_list.next) list = cur->list.next; else list = cur->list.prev; @@ -312,10 +318,10 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg) /* Still room in this batch for this message? if there is not * then deliver current batch. */ - if (bytes > batch.remaining_bytes) + if (bytes > batch->remaining_bytes) return 1; - batch.remaining_bytes -= bytes; + batch->remaining_bytes -= bytes; llist_add_tail(&msg->list, list); LOGP(DOSMUX, LOGL_DEBUG, "adding to batch (%p)\n", msg); @@ -331,10 +337,11 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg) * that the message has been batched. If 1 is returned, you have to * invoke osmux_xfrm_input_deliver and try again. */ -int osmux_xfrm_input(struct msgb *msg) +int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg) { int ret; struct rtp_hdr *rtph; + struct osmux_batch *batch = (struct osmux_batch *)h->data; rtph = osmo_rtp_get_hdr(msg); if (rtph == NULL) @@ -347,14 +354,14 @@ int osmux_xfrm_input(struct msgb *msg) /* This is the first message in the batch, start the * batch timer to deliver it. */ - if (llist_empty(&batch.msgb_list)) { + if (llist_empty(&batch->msgb_list)) { LOGP(DOSMUX, LOGL_DEBUG, "osmux start timer batch\n"); - osmo_timer_schedule(&batch.timer, 0, + osmo_timer_schedule(&batch->timer, 0, OSMUX_BATCH_FACTOR * DELTA_RTP_MSG); } - ret = osmux_msgb_batch_queue_add(msg); + ret = osmux_msgb_batch_queue_add(batch, msg); break; default: /* Only AMR supported so far, sorry. */ @@ -366,11 +373,20 @@ int osmux_xfrm_input(struct msgb *msg) void osmux_xfrm_input_init(struct osmux_in_handle *h) { + struct osmux_batch *batch; + LOGP(DOSMUX, LOGL_DEBUG, "initialized osmux input converter\n"); - INIT_LLIST_HEAD(&batch.msgb_list); - batch.remaining_bytes = OSMUX_BATCH_MAX; - batch.timer.cb = osmux_batch_timer_expired; - batch.timer.data = h; + + batch = talloc(NULL, struct osmux_batch); + if (batch == NULL) + return; + + INIT_LLIST_HEAD(&batch->msgb_list); + batch->remaining_bytes = OSMUX_BATCH_MAX; + batch->timer.cb = osmux_batch_timer_expired; + batch->timer.data = h; + + h->data = (void *)batch; } struct osmux_tx_handle { diff --git a/tests/osmo-pcap-test/osmux_test.c b/tests/osmo-pcap-test/osmux_test.c index 1c936b7..8d62ffa 100644 --- a/tests/osmo-pcap-test/osmux_test.c +++ b/tests/osmo-pcap-test/osmux_test.c @@ -77,7 +77,7 @@ static int pcap_test_run(struct msgb *msg) { int ret; - while ((ret = osmux_xfrm_input(msg)) > 1) { + while ((ret = osmux_xfrm_input(&h_input, msg)) > 1) { /* batch full, deliver it */ osmux_xfrm_input_deliver(&h_input); }