osmux: cleanup tx path

This patch cleans up the transmission path for osmux, this involves
the functions that extract the messages from the batch and the one
that reconstruct the timing.

They now take a list that contains the reconstructed RTP messages:

 osmux_xfrm_output(osmuxh, &h_output, &list);
 osmux_tx_sched(&list, &tv, tx_cb, NULL);
This commit is contained in:
Pablo Neira Ayuso 2012-08-04 19:56:47 +02:00
parent fdc6538986
commit fe9fccd412
3 changed files with 52 additions and 42 deletions

View File

@ -60,9 +60,9 @@ void osmux_xfrm_input_init(struct osmux_in_handle *h);
int osmux_xfrm_input(struct msgb *msg);
void osmux_xfrm_input_deliver(struct osmux_in_handle *h);
struct msgb *osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h);
int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h, struct llist_head *list);
struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg);
void osmux_tx_sched(struct msgb *msg, struct timeval *when, void (*tx_cb)(struct msgb *msg, void *data), void *data);
void osmux_tx_sched(struct llist_head *list, struct timeval *when, void (*tx_cb)(struct msgb *msg, void *data), void *data);
#endif

View File

@ -34,6 +34,9 @@
/* XXX: make this configurable */
#define OSMUX_BATCH_FACTOR 4
/* delta time between two RTP messages */
#define DELTA_RTP_MSG 20000
struct osmux_hdr *osmux_xfrm_output_pull(struct msgb *msg)
{
struct osmux_hdr *osmuxh = NULL;
@ -100,25 +103,28 @@ osmux_rebuild_rtp(struct osmux_out_handle *h,
return out_msg;
}
struct msgb *
osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h)
int osmux_xfrm_output(struct osmux_hdr *osmuxh, struct osmux_out_handle *h,
struct llist_head *list)
{
struct msgb *first, *next;
struct msgb *msg;
int i;
first = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh),
osmo_amr_bytes(osmuxh->amr_cmr));
INIT_LLIST_HEAD(&first->list);
INIT_LLIST_HEAD(list);
for (i=0; i<osmuxh->ctr; i++) {
next = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh) +
((i+1) * osmo_amr_bytes(osmuxh->amr_cmr)),
osmo_amr_bytes(osmuxh->amr_cmr));
llist_add_tail(&next->list, &first->list);
for (i=0; i<osmuxh->ctr+1; i++) {
msg = osmux_rebuild_rtp(h, osmuxh,
osmux_get_payload(osmuxh) +
i * osmo_amr_bytes(osmuxh->amr_cmr),
osmo_amr_bytes(osmuxh->amr_cmr));
if (msg == NULL)
break;
LOGP(DOSMUX, LOGL_DEBUG, "extracted RTP message from batch "
"msg=%p\n", msg);
llist_add_tail(&msg->list, list);
}
return first;
return i;
}
static struct osmux_batch {
@ -344,7 +350,7 @@ int osmux_xfrm_input(struct msgb *msg)
"osmux start timer batch\n");
osmo_timer_schedule(&batch.timer, 0,
OSMUX_BATCH_FACTOR * 20000);
OSMUX_BATCH_FACTOR * DELTA_RTP_MSG);
}
ret = osmux_msgb_batch_queue_add(msg);
break;
@ -366,10 +372,10 @@ void osmux_xfrm_input_init(struct osmux_in_handle *h)
}
struct osmux_tx_handle {
struct osmo_timer_list timer;
struct msgb *msg;
void (*tx_cb)(struct msgb *msg, void *data);
void *data;
struct osmo_timer_list timer;
struct msgb *msg;
void (*tx_cb)(struct msgb *msg, void *data);
void *data;
#ifdef DEBUG_TIMING
struct timeval start;
#endif
@ -392,8 +398,9 @@ static void osmux_tx_cb(void *data)
talloc_free(h);
}
void osmux_tx_sched(struct msgb *msg, struct timeval *when,
void (*tx_cb)(struct msgb *msg, void *data), void *data)
static void
osmux_tx(struct msgb *msg, struct timeval *when,
void (*tx_cb)(struct msgb *msg, void *data), void *data)
{
struct osmux_tx_handle *h;
@ -417,3 +424,21 @@ void osmux_tx_sched(struct msgb *msg, struct timeval *when,
}
osmo_timer_schedule(&h->timer, when->tv_sec, when->tv_usec);
}
void
osmux_tx_sched(struct llist_head *list, struct timeval *when,
void (*tx_cb)(struct msgb *msg, void *data), void *data)
{
struct msgb *cur, *tmp;
struct timeval delta = { .tv_sec = 0, .tv_usec = DELTA_RTP_MSG };
llist_for_each_entry_safe(cur, tmp, list, list) {
LOGP(DOSMUX, LOGL_DEBUG, "scheduled transmision in %lu.%6lu "
"seconds, msg=%p\n", when->tv_sec, when->tv_usec, cur);
osmux_tx(cur, when, tx_cb, NULL);
timeradd(when, &delta, when);
llist_del(&cur->list);
}
}

View File

@ -52,10 +52,8 @@ static void tx_cb(struct msgb *msg, void *data)
static void deliver(struct msgb *batch_msg)
{
struct osmux_hdr *osmuxh;
struct msgb *msg;
int i = 0;
struct timeval tv = { .tv_sec = 0, .tv_usec = 0 };
struct timeval delta = { .tv_sec = 0, .tv_usec = 20000 };
struct timeval tv;
struct llist_head list;
timerclear(&tv);
@ -63,22 +61,9 @@ static void deliver(struct msgb *batch_msg)
/* This code below belongs to the osmux receiver */
while((osmuxh = osmux_xfrm_output_pull(batch_msg)) != NULL) {
struct msgb *next;
msg = osmux_xfrm_output(osmuxh, &h_output);
printf("scheduled transmision in %lu.%6lu seconds, "
"msg=%p (%d in batch)\n",
tv.tv_sec, tv.tv_usec, msg, ++i);
osmux_tx_sched(msg, &tv, tx_cb, NULL);
timeradd(&tv, &delta, &tv);
llist_for_each_entry(next, &msg->list, list) {
printf("scheduled transmision in %lu.%6lu seconds, "
"msg=%p (%d in batch)\n",
tv.tv_sec, tv.tv_usec, next, ++i);
osmux_tx_sched(next, &tv, tx_cb, NULL);
timeradd(&tv, &delta, &tv);
}
osmux_xfrm_output(osmuxh, &h_output, &list);
osmux_tx_sched(&list, &tv, tx_cb, NULL);
}
}