osmux: major rework to reduce batch message size (add counter field)

This patch adds the counter field to the osmux header, so we can
reduce the size of the batch even further, eg.

osmuxhdr (ctr=3)
speech
speech
speech
osmuxhdr (ctr=2)
speech
speech
...

The new header is the following:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| FT  | CTR |F|Q|    SeqNR      |  Circuit ID   |AMR-FT |AMR-CMR|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

The counter field is 3 bits long, thus, we can batch up to 8
RTP speech frames into one single batch per circuit ID.

I have also removed the RTP marker, since it can be reconstructed
from the AMR information.

Moreover, the entire workflow has been also reworked. Whenever a
packet arrives, we introduce it into the batch list. This batch
list contains a list of RTP messages ordered by RTP SSRC. Then,
once the batch timer expires or the it gets full, we build the
batch from the list of RTP messages.

Note that this allows us to put several speech frame into one
single osmux header without actually worrying about the amount
of messages that we'll receive.

The functions that reconstruct the RTP messages has been also
adjusted. Now, it returns a list of RTP messages per RTP SSRC
that has been extracted from the batch.
This commit is contained in:
Pablo Neira Ayuso 2012-08-02 20:24:57 +02:00
parent b03de23120
commit ffd20f3f1c
3 changed files with 228 additions and 83 deletions

View File

@ -18,26 +18,24 @@
struct osmux_hdr {
#if __BYTE_ORDER == __BIG_ENDIAN
uint8_t ft:4,
uint8_t ft:3,
ctr:3,
amr_f:1,
amr_q:1;
#elif __BYTE_ORDER == __LITTLE_ENDIAN
uint8_t amr_q:1,
amr_f:1,
ctr:3,
ft:3;
#endif
uint8_t seq;
uint8_t circuit_id;
#if __BYTE_ORDER == __BIG_ENDIAN
uint8_t amr_ft:4,
amr_cmr:4;
#elif __BYTE_ORDER == __LITTLE_ENDIAN
uint8_t amr_cmr:4,
ft:4;
#endif
uint8_t circuit_id;
uint8_t seq;
#if __BYTE_ORDER == __BIG_ENDIAN
uint8_t amr_f:1,
amr_ft:4,
amr_q:1,
rtp_marker:1,
pad:1;
#elif __BYTE_ORDER == __LITTLE_ENDIAN
uint8_t pad:1,
rtp_marker:1,
amr_q:1,
amr_ft:4,
amr_f:1;
amr_ft:4;
#endif
} __attribute__((packed));

View File

@ -31,6 +31,9 @@
/* XXX: MTU - iphdr (20 bytes) - udphdr (8 bytes) */
#define OSMUX_BATCH_MAX 1472
/* XXX: make this configurable */
#define OSMUX_BATCH_FACTOR 4
struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
{
struct osmux_hdr *osmuxh = NULL;
@ -39,7 +42,7 @@ struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
osmuxh = (struct osmux_hdr *)msg->data;
msgb_pull(msg, sizeof(struct osmux_hdr) +
osmo_amr_bytes(osmuxh->amr_cmr));
(osmo_amr_bytes(osmuxh->amr_cmr) * (osmuxh->ctr+1)));
} else if (msg->len > 0) {
LOGP(DOSMUX, LOGL_ERROR,
"remaining %d bytes, broken osmuxhdr?\n", msg->len);
@ -48,8 +51,9 @@ struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
return osmuxh;
}
struct msgb *
osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
static struct msgb *
osmux_rebuild_rtp(struct osmux_out_handle *h,
struct osmux_hdr *osmuxh, void *payload, int payload_len)
{
struct msgb *out_msg;
struct rtp_hdr *rtph;
@ -69,7 +73,6 @@ osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
rtph->extension = 0;
rtph->version = RTP_VERSION;
rtph->payload_type = 98;
rtph->marker = osmuxh->rtp_marker;
/* ... emulate timestamp and ssrc */
rtph->timestamp = htonl(h->rtp_timestamp);
rtph->sequence = htons(h->rtp_seq);
@ -87,9 +90,8 @@ osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
msgb_put(out_msg, sizeof(struct amr_hdr));
/* add AMR speech data */
memcpy(out_msg->tail, osmux_get_payload(osmuxh),
osmo_amr_bytes(osmuxh->amr_cmr));
msgb_put(out_msg, osmo_amr_bytes(osmuxh->amr_cmr));
memcpy(out_msg->tail, payload, payload_len);
msgb_put(out_msg, payload_len);
/* bump last RTP sequence number and timestamp that has been used */
h->rtp_seq++;
@ -98,24 +100,136 @@ osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
return out_msg;
}
struct msgb *
osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
{
struct msgb *first, *next;
int i;
first = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh),
osmo_amr_bytes(osmuxh->amr_cmr));
INIT_LLIST_HEAD(&first->list);
for (i=0; i<osmuxh->ctr; i++) {
next = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh) +
((i+1) * osmo_amr_bytes(osmuxh->amr_cmr)),
osmo_amr_bytes(osmuxh->amr_cmr));
llist_add_tail(&next->list, &first->list);
}
return first;
}
static struct osmux_batch {
struct osmo_timer_list timer;
struct msgb *msg;
uint8_t seq;
struct osmo_timer_list timer;
struct osmux_hdr *osmuxh;
struct llist_head msgb_list;
unsigned int remaining_bytes;
uint8_t seq;
} batch;
static int osmux_batch_has_room(int msg_len)
static int
osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph,
struct amr_hdr *amrh, uint32_t amr_payload_len,
uint8_t circuit_id, int add_osmux_header)
{
return batch.msg == NULL ? 1 : msg_len < msgb_tailroom(batch.msg);
struct osmux_hdr *osmuxh;
if (add_osmux_header) {
osmuxh = (struct osmux_hdr *)out_msg->tail;
osmuxh->ft = OSMUX_FT_VOICE_AMR;
osmuxh->ctr = 0;
osmuxh->amr_f = amrh->f;
osmuxh->amr_q= amrh->q;
osmuxh->seq = batch.seq++;
osmuxh->circuit_id = circuit_id;
osmuxh->amr_cmr = amrh->cmr;
osmuxh->amr_ft = amrh->ft;
msgb_put(out_msg, sizeof(struct osmux_hdr));
/* annotate current osmux header */
batch.osmuxh = osmuxh;
} else
batch.osmuxh->ctr++;
memcpy(out_msg->tail, osmo_amr_get_payload(amrh), amr_payload_len);
msgb_put(out_msg, amr_payload_len);
return 0;
}
static int
osmux_xfrm_encode_amr(struct msgb *out_msg,
struct rtp_hdr *rtph, struct msgb *msg,
int add_osmux_header)
{
struct amr_hdr *amrh;
uint32_t amr_len;
uint32_t amr_payload_len;
amrh = osmo_rtp_get_payload(rtph, msg, &amr_len);
if (amrh == NULL)
return -1;
amr_payload_len = amr_len - sizeof(struct amr_hdr);
if (osmux_batch_add(out_msg, msg, rtph, amrh, amr_payload_len, 0,
add_osmux_header) < 0)
return -1;
return 0;
}
static struct msgb *osmux_build_batch(void)
{
struct msgb *cur, *tmp, *batch_msg;
uint32_t last_rtp_ssrc;
int last_rtp_ssrc_set = 0, add_osmux_hdr = 1;
int i=0;
batch_msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX");
if (batch_msg == NULL) {
LOGP(DOSMUX, LOGL_ERROR, "Not enough memory\n");
return NULL;
}
LOGP(DOSMUX, LOGL_DEBUG, "Now building batch\n");
llist_for_each_entry_safe(cur, tmp, &batch.msgb_list, list) {
struct rtp_hdr *rtph;
LOGP(DOSMUX, LOGL_DEBUG,
"building message (%p) into batch (%d)\n", cur, ++i);
rtph = osmo_rtp_get_hdr(cur);
if (rtph == NULL)
return NULL;
if (last_rtp_ssrc_set)
add_osmux_hdr = (last_rtp_ssrc == rtph->ssrc);
osmux_xfrm_encode_amr(batch_msg, rtph, cur, add_osmux_hdr);
last_rtp_ssrc_set = 1;
last_rtp_ssrc = rtph->ssrc;
llist_del(&cur->list);
msgb_free(cur);
}
return batch_msg;
}
void osmux_xfrm_input_deliver(struct osmux_in_handle *h)
{
struct msgb *batch_msg;
LOGP(DOSMUX, LOGL_DEBUG, "invoking delivery function\n");
h->deliver(batch.msg);
msgb_free(batch.msg);
batch.msg = NULL;
batch_msg = osmux_build_batch();
h->deliver(batch_msg);
msgb_free(batch_msg);
osmo_timer_del(&batch.timer);
batch.remaining_bytes = OSMUX_BATCH_MAX;
}
static void osmux_batch_timer_expired(void *data)
@ -126,67 +240,77 @@ static void osmux_batch_timer_expired(void *data)
osmux_xfrm_input_deliver(h);
}
static struct msgb *osmux_batch_get(void)
{
if (batch.msg == NULL) {
batch.msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX");
if (batch.msg == NULL) {
LOGP(DOSMUX, LOGL_ERROR, "Not enough memory\n");
return NULL;
}
osmo_timer_schedule(&batch.timer, 0, 160000); /* XXX */
}
return batch.msg;
}
static int
osmux_batch_add(struct msgb *msg, struct rtp_hdr *rtph, struct amr_hdr *amrh,
uint32_t amr_payload_len, uint8_t circuit_id, uint8_t seq)
{
struct osmux_hdr *osmuxh;
osmuxh = (struct osmux_hdr *)batch.msg->tail;
osmuxh->ft = OSMUX_FT_VOICE_AMR;
osmuxh->circuit_id = circuit_id;
osmuxh->seq = seq;
osmuxh->amr_cmr = amrh->cmr;
osmuxh->amr_f = amrh->f;
osmuxh->amr_ft = amrh->ft;
osmuxh->amr_q = amrh->q;
osmuxh->rtp_marker = rtph->marker;
msgb_put(batch.msg, sizeof(struct osmux_hdr));
memcpy(batch.msg->tail, osmo_amr_get_payload(amrh), amr_payload_len);
msgb_put(batch.msg, amr_payload_len);
return 0;
}
static int osmux_xfrm_encore_amr(struct rtp_hdr *rtph, struct msgb *msg)
static int osmux_rtp_amr_payload_len(struct msgb *msg, struct rtp_hdr *rtph)
{
struct amr_hdr *amrh;
struct msgb *out_msg;
uint32_t amr_len;
uint32_t amr_payload_len;
unsigned int amr_len;
amrh = osmo_rtp_get_payload(rtph, msg, &amr_len);
if (amrh == NULL)
return -1;
amr_payload_len = amr_len - sizeof(struct amr_hdr);
return amr_len - sizeof(struct amr_hdr);
}
if (!osmux_batch_has_room(sizeof(struct osmux_hdr) + amr_payload_len))
static int osmux_msgb_batch_queue_add(struct msgb *msg)
{
struct rtp_hdr *rtph;
struct msgb *cur;
int found = 0, bytes = 0;
rtph = osmo_rtp_get_hdr(msg);
if (rtph == NULL)
return -1;
llist_for_each_entry(cur, &batch.msgb_list, list) {
struct rtp_hdr *rtph2;
rtph2 = osmo_rtp_get_hdr(msg);
if (rtph2 == NULL)
return -1;
/* inset messages in order based on the RTP SSRC */
if (rtph->ssrc < rtph2->ssrc)
continue;
if (rtph->ssrc == rtph2->ssrc) {
found = 1;
continue;
}
bytes += osmux_rtp_amr_payload_len(msg, rtph);
if (!found)
bytes += sizeof(struct osmux_hdr);
/* Still room in this batch for this message? if there is not
* then deliver current batch.
*/
if (bytes > batch.remaining_bytes)
return 1;
batch.remaining_bytes -= bytes;
llist_add(&msg->list, &cur->list);
LOGP(DOSMUX, LOGL_DEBUG, "adding to batch (%p)\n", msg);
return 0;
}
/*
* ... adding to the tail or empty list case.
*/
bytes += osmux_rtp_amr_payload_len(msg, rtph);
if (!found)
bytes += sizeof(struct osmux_hdr);
/* Still room in this batch for this message? if there is not
* then deliver current batch.
*/
if (bytes > batch.remaining_bytes)
return 1;
out_msg = osmux_batch_get();
if (out_msg == NULL)
return -1;
batch.remaining_bytes -= bytes;
llist_add_tail(&msg->list, &batch.msgb_list);
if (osmux_batch_add(out_msg, rtph, amrh, amr_payload_len, 0,
batch.seq++) < 0)
return -1;
LOGP(DOSMUX, LOGL_DEBUG, "adding to batch (%p)\n", msg);
return 0;
}
@ -212,7 +336,17 @@ int osmux_xfrm_input(struct msgb *msg)
case RTP_PT_RTCP:
return 0;
case RTP_PT_AMR:
ret = osmux_xfrm_encore_amr(rtph, msg);
/* This is the first message in the batch, start the
* batch timer to deliver it.
*/
if (llist_empty(&batch.msgb_list)) {
LOGP(DOSMUX, LOGL_DEBUG,
"osmux start timer batch\n");
osmo_timer_schedule(&batch.timer, 0,
OSMUX_BATCH_FACTOR * 20000);
}
ret = osmux_msgb_batch_queue_add(msg);
break;
default:
/* Only AMR supported so far, sorry. */
@ -224,6 +358,9 @@ int osmux_xfrm_input(struct msgb *msg)
void osmux_xfrm_input_init(struct osmux_in_handle *h)
{
LOGP(DOSMUX, LOGL_DEBUG, "initialized osmux input converter\n");
INIT_LLIST_HEAD(&batch.msgb_list);
batch.remaining_bytes = OSMUX_BATCH_MAX;
batch.timer.cb = osmux_batch_timer_expired;
batch.timer.data = h;
}

View File

@ -63,12 +63,22 @@ static void deliver(struct msgb *batch_msg)
/* This code below belongs to the osmux receiver */
while((osmuxh = osmux_xfrm_output_pull(batch_msg)) != NULL) {
struct msgb *next;
msg = osmux_xfrm_output(osmuxh, &h_output);
printf("scheduled transmision in %lu.%6lu seconds, "
"msg=%p (%d in batch)\n",
tv.tv_sec, tv.tv_usec, msg, ++i);
osmux_tx_sched(msg, &tv, tx_cb, NULL);
timeradd(&tv, &delta, &tv);
llist_for_each_entry(next, &msg->list, list) {
printf("scheduled transmision in %lu.%6lu seconds, "
"msg=%p (%d in batch)\n",
tv.tv_sec, tv.tv_usec, next, ++i);
osmux_tx_sched(next, &tv, tx_cb, NULL);
timeradd(&tv, &delta, &tv);
}
}
}