osmux: store internal batching information in struct osmux_in_handle

The layout is not provided, as it is internal.

Thus, we don't allocate the internal batching information in BSS
anymore.
This commit is contained in:
Pablo Neira Ayuso 2012-08-06 18:41:49 +02:00 committed by Pablo Neira Ayuso
parent 6d72b4a729
commit f366c924e2
3 changed files with 49 additions and 31 deletions

View File

@ -43,6 +43,7 @@ struct osmux_hdr {
struct osmux_in_handle {
uint8_t osmux_seq;
void (*deliver)(struct msgb *msg);
char *data; /* internal data */
};
/* one per OSmux circuit_id, ie. one per RTP flow. */
@ -57,7 +58,8 @@ static inline uint8_t *osmux_get_payload(struct osmux_hdr *osmuxh)
}
void osmux_xfrm_input_init(struct osmux_in_handle *h);
int osmux_xfrm_input(struct msgb *msg);
int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg);
void osmux_xfrm_input_deliver(struct osmux_in_handle *h);
int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, struct llist_head *list);

View File

@ -131,19 +131,21 @@ int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h,
return i;
}
static struct osmux_batch {
struct osmux_batch {
struct osmo_timer_list timer;
struct osmux_hdr *osmuxh;
struct llist_head msgb_list;
unsigned int remaining_bytes;
uint8_t seq;
} batch;
};
static int
osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph,
osmux_batch_add(struct osmux_in_handle *h, struct msgb *out_msg,
struct msgb *msg, struct rtp_hdr *rtph,
struct amr_hdr *amrh, uint32_t amr_payload_len,
uint8_t circuit_id, int add_osmux_header)
{
struct osmux_batch *batch = (struct osmux_batch *)h->data;
struct osmux_hdr *osmuxh;
if (add_osmux_header) {
@ -152,22 +154,22 @@ osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph,
osmuxh->ctr = 0;
osmuxh->amr_f = amrh->f;
osmuxh->amr_q= amrh->q;
osmuxh->seq = batch.seq++;
osmuxh->seq = batch->seq++;
osmuxh->circuit_id = circuit_id;
osmuxh->amr_cmr = amrh->cmr;
osmuxh->amr_ft = amrh->ft;
msgb_put(out_msg, sizeof(struct osmux_hdr));
/* annotate current osmux header */
batch.osmuxh = osmuxh;
batch->osmuxh = osmuxh;
} else {
if (batch.osmuxh->ctr == 0x7) {
if (batch->osmuxh->ctr == 0x7) {
LOGP(DOSMUX, LOGL_ERROR, "cannot add msg=%p, "
"too many messages for this RTP ssrc=%u\n",
msg, rtph->ssrc);
return 0;
}
batch.osmuxh->ctr++;
batch->osmuxh->ctr++;
}
memcpy(out_msg->tail, osmo_amr_get_payload(amrh), amr_payload_len);
@ -177,7 +179,8 @@ osmux_batch_add(struct msgb *out_msg, struct msgb *msg, struct rtp_hdr *rtph,
}
static int
osmux_xfrm_encode_amr(struct msgb *out_msg,
osmux_xfrm_encode_amr(struct osmux_in_handle *h,
struct msgb *out_msg,
struct rtp_hdr *rtph, struct msgb *msg,
int add_osmux_header)
{
@ -191,18 +194,19 @@ osmux_xfrm_encode_amr(struct msgb *out_msg,
amr_payload_len = amr_len - sizeof(struct amr_hdr);
if (osmux_batch_add(out_msg, msg, rtph, amrh, amr_payload_len, 0,
add_osmux_header) < 0)
if (osmux_batch_add(h, out_msg, msg, rtph, amrh, amr_payload_len,
0, add_osmux_header) < 0)
return -1;
return 0;
}
static struct msgb *osmux_build_batch(void)
static struct msgb *osmux_build_batch(struct osmux_in_handle *h)
{
struct msgb *cur, *tmp, *batch_msg;
uint32_t last_rtp_ssrc;
int last_rtp_ssrc_set = 0, add_osmux_hdr = 1;
struct osmux_batch *batch = (struct osmux_batch *)h->data;
batch_msg = msgb_alloc(OSMUX_BATCH_MAX, "OSMUX");
if (batch_msg == NULL) {
@ -212,7 +216,7 @@ static struct msgb *osmux_build_batch(void)
LOGP(DOSMUX, LOGL_DEBUG, "Now building batch\n");
llist_for_each_entry_safe(cur, tmp, &batch.msgb_list, list) {
llist_for_each_entry_safe(cur, tmp, &batch->msgb_list, list) {
struct rtp_hdr *rtph;
char buf[4096];
@ -230,7 +234,7 @@ static struct msgb *osmux_build_batch(void)
LOGP(DOSMUX, LOGL_DEBUG, "%s\n", buf);
osmux_xfrm_encode_amr(batch_msg, rtph, cur, add_osmux_hdr);
osmux_xfrm_encode_amr(h, batch_msg, rtph, cur, add_osmux_hdr);
last_rtp_ssrc_set = 1;
last_rtp_ssrc = rtph->ssrc;
@ -244,13 +248,14 @@ static struct msgb *osmux_build_batch(void)
void osmux_xfrm_input_deliver(struct osmux_in_handle *h)
{
struct msgb *batch_msg;
struct osmux_batch *batch = (struct osmux_batch *)h->data;
LOGP(DOSMUX, LOGL_DEBUG, "invoking delivery function\n");
batch_msg = osmux_build_batch();
batch_msg = osmux_build_batch(h);
h->deliver(batch_msg);
msgb_free(batch_msg);
osmo_timer_del(&batch.timer);
batch.remaining_bytes = OSMUX_BATCH_MAX;
osmo_timer_del(&batch->timer);
batch->remaining_bytes = OSMUX_BATCH_MAX;
}
static void osmux_batch_timer_expired(void *data)
@ -273,7 +278,8 @@ static int osmux_rtp_amr_payload_len(struct msgb *msg, struct rtp_hdr *rtph)
return amr_len - sizeof(struct amr_hdr);
}
static int osmux_msgb_batch_queue_add(struct msgb *msg)
static int
osmux_msgb_batch_queue_add(struct osmux_batch *batch, struct msgb *msg)
{
struct rtp_hdr *rtph;
struct msgb *cur;
@ -284,7 +290,7 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg)
if (rtph == NULL)
return -1;
llist_for_each_entry(cur, &batch.msgb_list, list) {
llist_for_each_entry(cur, &batch->msgb_list, list) {
struct rtp_hdr *rtph2;
rtph2 = osmo_rtp_get_hdr(cur);
@ -297,7 +303,7 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg)
if (rtph->ssrc > rtph2->ssrc)
break;
}
if (cur->list.next == batch.msgb_list.next)
if (cur->list.next == batch->msgb_list.next)
list = cur->list.next;
else
list = cur->list.prev;
@ -312,10 +318,10 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg)
/* Still room in this batch for this message? if there is not
* then deliver current batch.
*/
if (bytes > batch.remaining_bytes)
if (bytes > batch->remaining_bytes)
return 1;
batch.remaining_bytes -= bytes;
batch->remaining_bytes -= bytes;
llist_add_tail(&msg->list, list);
LOGP(DOSMUX, LOGL_DEBUG, "adding to batch (%p)\n", msg);
@ -331,10 +337,11 @@ static int osmux_msgb_batch_queue_add(struct msgb *msg)
* that the message has been batched. If 1 is returned, you have to
* invoke osmux_xfrm_input_deliver and try again.
*/
int osmux_xfrm_input(struct msgb *msg)
int osmux_xfrm_input(struct osmux_in_handle *h, struct msgb *msg)
{
int ret;
struct rtp_hdr *rtph;
struct osmux_batch *batch = (struct osmux_batch *)h->data;
rtph = osmo_rtp_get_hdr(msg);
if (rtph == NULL)
@ -347,14 +354,14 @@ int osmux_xfrm_input(struct msgb *msg)
/* This is the first message in the batch, start the
* batch timer to deliver it.
*/
if (llist_empty(&batch.msgb_list)) {
if (llist_empty(&batch->msgb_list)) {
LOGP(DOSMUX, LOGL_DEBUG,
"osmux start timer batch\n");
osmo_timer_schedule(&batch.timer, 0,
osmo_timer_schedule(&batch->timer, 0,
OSMUX_BATCH_FACTOR * DELTA_RTP_MSG);
}
ret = osmux_msgb_batch_queue_add(msg);
ret = osmux_msgb_batch_queue_add(batch, msg);
break;
default:
/* Only AMR supported so far, sorry. */
@ -366,11 +373,20 @@ int osmux_xfrm_input(struct msgb *msg)
void osmux_xfrm_input_init(struct osmux_in_handle *h)
{
struct osmux_batch *batch;
LOGP(DOSMUX, LOGL_DEBUG, "initialized osmux input converter\n");
INIT_LLIST_HEAD(&batch.msgb_list);
batch.remaining_bytes = OSMUX_BATCH_MAX;
batch.timer.cb = osmux_batch_timer_expired;
batch.timer.data = h;
batch = talloc(NULL, struct osmux_batch);
if (batch == NULL)
return;
INIT_LLIST_HEAD(&batch->msgb_list);
batch->remaining_bytes = OSMUX_BATCH_MAX;
batch->timer.cb = osmux_batch_timer_expired;
batch->timer.data = h;
h->data = (void *)batch;
}
struct osmux_tx_handle {

View File

@ -77,7 +77,7 @@ static int pcap_test_run(struct msgb *msg)
{
int ret;
while ((ret = osmux_xfrm_input(msg)) > 1) {
while ((ret = osmux_xfrm_input(&h_input, msg)) > 1) {
/* batch full, deliver it */
osmux_xfrm_input_deliver(&h_input);
}