procqueue: use linuxlist to store queue items

The usage of linuxlist is more flexible than having a limited
array of pointers. This approach allows to have as much items
in a processing queue as required.
This commit is contained in:
Vadim Yanitskiy 2017-09-01 16:19:40 +07:00
parent 1cf7f34444
commit 539af39e29
2 changed files with 64 additions and 46 deletions

View File

@ -22,6 +22,8 @@
#include <stdint.h>
#include <stdio.h> /* for FILE */
#include <osmocom/core/linuxlist.h>
struct osmo_gapk_pq_item {
/*! input frame size (in bytes). '0' in case of variable frames */
unsigned int len_in;
@ -39,14 +41,16 @@ struct osmo_gapk_pq_item {
* \returns number of output bytes written to \a out; negative on error */
int (*proc)(void *state, uint8_t *out, const uint8_t *in, unsigned int in_len);
void (*exit)(void *state);
/*! \brief link to a processing queue */
struct llist_head list;
};
#define VAR_BUF_SIZE 320
#define MAX_PQ_ITEMS 8
struct osmo_gapk_pq {
struct llist_head items;
unsigned n_items;
struct osmo_gapk_pq_item *items[MAX_PQ_ITEMS];
};
/* Processing queue management */

View File

@ -21,13 +21,23 @@
#include <stdint.h>
#include <stdlib.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/gapk/procqueue.h>
/* crate a new (empty) processing queue */
struct osmo_gapk_pq *
osmo_gapk_pq_create(void)
{
return (struct osmo_gapk_pq *) calloc(1, sizeof(struct osmo_gapk_pq));
struct osmo_gapk_pq *pq;
/* Allocate memory for a new processing queue */
pq = (struct osmo_gapk_pq *) calloc(1, sizeof(struct osmo_gapk_pq));
/* Init its list of items */
INIT_LLIST_HEAD(&pq->items);
return pq;
}
/*! destroy a processing queue, calls exit() callback of each item
@ -35,19 +45,23 @@ osmo_gapk_pq_create(void)
void
osmo_gapk_pq_destroy(struct osmo_gapk_pq *pq)
{
int i;
struct osmo_gapk_pq_item *item, *item_next;
if (!pq)
return;
for (i=0; i<pq->n_items; i++) {
if (!pq->items[i])
continue;
if (pq->items[i]->exit)
pq->items[i]->exit(pq->items[i]->state);
/* Iterate over all items in queue */
llist_for_each_entry_safe(item, item_next, &pq->items, list) {
/* Free output buffer memory */
free(item->buf);
free(pq->items[i]->buf);
free(pq->items[i]);
/* Call exit handler if preset */
if (item->exit)
item->exit(item->state);
/* Delete an item from list */
llist_del(&item->list);
free(item);
}
free(pq);
@ -61,17 +75,16 @@ osmo_gapk_pq_add_item(struct osmo_gapk_pq *pq)
{
struct osmo_gapk_pq_item *item;
if (pq->n_items == MAX_PQ_ITEMS) {
fprintf(stderr, "[!] Processing Queue cannot handle more than %u items\n",
MAX_PQ_ITEMS);
return NULL;
}
/* Allocate memory for a new item */
item = calloc(1, sizeof(struct osmo_gapk_pq_item));
if (!item)
return NULL;
pq->items[pq->n_items++] = item;
/* Add one to the end of a queue */
llist_add_tail(&item->list, &pq->items);
/* Increase the items count */
pq->n_items++;
return item;
}
@ -82,35 +95,40 @@ osmo_gapk_pq_add_item(struct osmo_gapk_pq *pq)
int
osmo_gapk_pq_prepare(struct osmo_gapk_pq *pq)
{
int i;
unsigned int len_prev;
len_prev = 0;
for (i=0; i<pq->n_items; i++) {
struct osmo_gapk_pq_item *item = pq->items[i];
struct osmo_gapk_pq_item *item;
unsigned int len_prev = 0;
/* Iterate over all items in queue */
llist_for_each_entry(item, &pq->items, list) {
/* Make sure I/O data lengths are equal */
if (item->len_in && item->len_in != len_prev) {
fprintf(stderr, "[!] PQ item requires input size %u, but previous output is %u\n",
item->len_in, len_prev);
fprintf(stderr, "[!] PQ item requires input size %u, "
"but previous output is %u\n", item->len_in, len_prev);
return -EINVAL;
}
if (i < (pq->n_items-1)) {
/* The sink item doesn't require an output buffer */
if (item->list.next != &pq->items) {
unsigned int buf_size = item->len_out;
/* variable-length codec output, use maximum
* known buffer size */
/**
* Use maximum known buffer size
* for variable-length codec output
*/
if (!buf_size)
buf_size = VAR_BUF_SIZE;
/* Allocate memory for an output buffer */
item->buf = malloc(buf_size);
if (!item->buf)
return -ENOMEM;
} else{
} else {
/* Make sure the last item is a sink */
if (item->len_out)
return -EINVAL;
}
/* Store output length for further comparation */
len_prev = item->len_out;
}
@ -123,26 +141,22 @@ osmo_gapk_pq_prepare(struct osmo_gapk_pq *pq)
int
osmo_gapk_pq_execute(struct osmo_gapk_pq *pq)
{
int i;
uint8_t *buf_prev, *buf;
unsigned int len_prev;
struct osmo_gapk_pq_item *item;
unsigned int len_prev = 0;
uint8_t *buf_prev = NULL;
int rv;
buf_prev = NULL;
len_prev = 0;
for (i=0; i<pq->n_items; i++) {
int rv;
struct osmo_gapk_pq_item *item = pq->items[i];
buf = i < (pq->n_items-1) ? item->buf : NULL;
rv = item->proc(item->state, buf, buf_prev, len_prev);
/* Iterate over all items in queue */
llist_for_each_entry(item, &pq->items, list) {
/* Call item's processing handler */
rv = item->proc(item->state, item->buf, buf_prev, len_prev);
if (rv < 0) {
fprintf(stderr, "[!] osmo_gapk_pq_execute(): abort, item returned %d\n", rv);
fprintf(stderr, "[!] osmo_gapk_pq_execute(): "
"abort, item returned %d\n", rv);
return rv;
}
buf_prev = buf;
buf_prev = item->buf;
len_prev = rv;
}