RIFO (random in, first out) for IP->E1 direction

In the past, we used a FIFO structure (first in, first out) - which
obviously cannot deal with packet re-ordering on the IP side.

This patch introduces a new "RIFO" as a replacement for the FIFO.

The RIFO is able to reconstruct E1 frame ordering by using the
reduced frame number from the TDMoIP messages, at least as long
as the related frame number range is within the current RIFO depth.

Change-Id: I22256870114cb85e4e10932554478be7061e086b
This commit is contained in:
Harald Welte 2022-03-10 22:20:07 +01:00
parent 322fbc4901
commit 38b1c5d3f0
14 changed files with 462 additions and 12 deletions

8
.gitignore vendored
View File

@ -34,3 +34,11 @@ src/osmo-e1d
src/osmo-e1d-pipe
contrib/e1-prbs-test/e1-prbs-test
contrib/e1-prbs-test/ice40-e1-prbs-check
tests/atconfig
tests/atlocal
tests/package.m4
tests/testsuite
tests/testsuite.log
tests/testsuite.dir
tests/*/*_test

View File

@ -5,6 +5,7 @@ SUBDIRS = \
doc \
src \
include \
tests \
$(NULL)
EXTRA_DIST = \

View File

@ -9,6 +9,7 @@ dnl libtool init
LT_INIT
AM_INIT_AUTOMAKE([foreign dist-bzip2 no-dist-gzip 1.9])
AC_CONFIG_TESTDIR(tests)
CFLAGS="$CFLAGS -std=gnu11"
@ -74,6 +75,26 @@ then
CPPFLAGS="$CPPFLAGS $WERROR_FLAGS"
fi
AC_ARG_ENABLE([external_tests],
AC_HELP_STRING([--enable-external-tests],
[Include the VTY/CTRL tests in make check [default=no]]),
[enable_ext_tests="$enableval"],[enable_ext_tests="no"])
if test "x$enable_ext_tests" = "xyes" ; then
AC_CHECK_PROG(PYTHON3_AVAIL,python3,yes)
if test "x$PYTHON3_AVAIL" != "xyes" ; then
AC_MSG_ERROR([Please install python3 to run the VTY/CTRL tests.])
fi
AC_CHECK_PROG(OSMOTESTEXT_CHECK,osmotestvty.py,yes)
if test "x$OSMOTESTEXT_CHECK" != "xyes" ; then
AC_MSG_ERROR([Please install git://osmocom.org/python/osmo-python-tests to run the VTY/CTRL tests.])
fi
fi
AC_MSG_CHECKING([whether to enable VTY/CTRL tests])
AC_MSG_RESULT([$enable_ext_tests])
AM_CONDITIONAL(ENABLE_EXT_TESTS, test "x$enable_ext_tests" = "xyes")
# https://www.freedesktop.org/software/systemd/man/daemon.html
AC_ARG_WITH([systemdsystemunitdir],
[AS_HELP_STRING([--with-systemdsystemunitdir=DIR], [Directory for systemd service files])],,
@ -99,6 +120,8 @@ AC_OUTPUT(
src/Makefile
src/octoi/Makefile
include/Makefile
tests/Makefile
tests/rifo/Makefile
libosmo-e1d.pc
libosmo-octoi.pc
)

View File

@ -11,6 +11,7 @@ lib_LTLIBRARIES = libosmo-octoi.la
libosmo_octoi_la_SOURCES = \
frame_fifo.c \
frame_rifo.c \
e1oip.c \
octoi.c \
octoi_sock.c \
@ -28,6 +29,7 @@ libosmo_octoi_la_LIBADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOVTY_LIBS)
noinst_HEADERS = \
e1oip.h \
frame_fifo.h \
frame_rifo.h \
octoi.h \
octoi_fsm.h \
octoi_sock.h \

View File

@ -168,11 +168,13 @@ int e1oip_rcvmsg_tdm_data(struct e1oip_line *iline, struct msgb *msg)
struct octoi_peer *peer = iline->peer;
const struct e1oip_tdm_hdr *e1th;
uint16_t frame_nr;
uint32_t fn32;
uint32_t ts_mask;
uint8_t idx2ts[BYTES_PER_FRAME];
unsigned int n_frames;
uint8_t frame_buf[BYTES_PER_FRAME];
unsigned int num_ts;
uint16_t exp_next_seq = iline->e1t.next_fn32 & 0xffff;
struct timespec ts;
/* update the timestamp at which we last received data from this peer */
@ -191,10 +193,33 @@ int e1oip_rcvmsg_tdm_data(struct e1oip_line *iline, struct msgb *msg)
frame_nr = ntohs(e1th->frame_nr);
ts_mask = ntohl(e1th->ts_mask);
if (frame_nr != iline->e1t.next_seq) {
LOGPEER(peer, LOGL_NOTICE, "RxIP: frame_nr=%u, but expected %u: packet loss? "
"or re-ordering?\n", frame_nr, iline->e1t.next_seq);
/* FIXME: locally substitute frames? */
if (frame_nr != exp_next_seq) {
int delta = frame_nr - exp_next_seq;
bool re_ordering;
if (delta > 0 && delta < 5 * iline->cfg.batching_factor) {
/* assume re-ordering */
re_ordering = true;
} else {
/* assume packet loss */
re_ordering = false;
}
LOGPEER(peer, LOGL_NOTICE, "RxIP: frame_nr=%u, but expected %u: delta=%d - assuming %s\n",
frame_nr, exp_next_seq, delta, re_ordering ? "re-ordering" : "packet loss");
fn32 = (iline->e1t.next_fn32 & 0xffff0000) + frame_nr;
if (fn32 > iline->e1t.next_fn32 + 8000) {
/* more than 1s in the future: was this a wrap-around? */
fn32 = ((iline->e1t.next_fn32 & 0xffff0000) - 0x10000) + frame_nr;
if (fn32 < iline->e1t.next_fn32 - 8000) {
/* also no match: give up */
LOGPEER(peer, LOGL_NOTICE, "RxIP: frame_nr=%u at exp_next_fn32=%u; "
"received frame outside +/- 1s window of expected frame\n",
frame_nr, iline->e1t.next_fn32);
}
}
} else {
fn32 = iline->e1t.next_fn32;
}
/* compute E1oIP idx to timeslot table */
@ -223,13 +248,15 @@ int e1oip_rcvmsg_tdm_data(struct e1oip_line *iline, struct msgb *msg)
frame_buf[ts_nr] = e1th->data[i*num_ts + j];
}
/* FIXME: what to do about TS0? */
frame_fifo_in(&iline->e1t.fifo, frame_buf);
frame_rifo_in(&iline->e1t.rifo, frame_buf, fn32+i);
}
/* update local state */
memcpy(iline->e1t.last_frame, frame_buf, BYTES_PER_FRAME);
iline->e1t.next_seq = frame_nr + n_frames;
/* FIXME: only in some cases */
//if (fn32 >= iline->e1t.next_fn32I
iline->e1t.next_fn32 = fn32 + n_frames;
iline_stat_set(iline, LINE_STAT_E1oIP_E1T_FIFO, frame_fifo_frames(&iline->e1t.fifo));
//iline_stat_set(iline, LINE_STAT_E1oIP_E1T_FIFO, frame_fifo_frames(&iline->e1t.fifo));
return 0;
}
@ -258,7 +285,7 @@ struct e1oip_line *e1oip_line_alloc(struct octoi_peer *peer)
frame_fifo_init(&iline->e1o.fifo, iline->cfg.batching_factor, fifo_threshold_cb, iline);
memset(&iline->e1o.last_frame, 0xff, sizeof(iline->e1o.last_frame));
frame_fifo_init(&iline->e1t.fifo, 0, NULL, iline);
frame_rifo_init(&iline->e1t.rifo);
memset(&iline->e1t.last_frame, 0xff, sizeof(iline->e1o.last_frame));
iline->peer = peer;

View File

@ -7,6 +7,7 @@
#include <osmocom/octoi/e1oip_proto.h>
#include "frame_fifo.h"
#include "frame_rifo.h"
#define iline_ctr_add(iline, idx, add) rate_ctr_add(rate_ctr_group_get_ctr((iline)->ctrs, idx), add)
#define iline_stat_set(iline, idx, add) \
@ -47,9 +48,9 @@ struct e1oip_line {
/* E1 terminated side (E1<-IP) */
struct {
struct frame_fifo fifo;
struct frame_rifo rifo;
uint8_t last_frame[BYTES_PER_FRAME]; /* last frame on the E1 side */
uint16_t next_seq; /* next expected sequence nr */
uint32_t next_fn32; /* next expected frame number */
} e1t;
/* TODO: statistics (RTT, frame loss, std deviation, alarms */

View File

@ -1,5 +1,8 @@
#pragma once
#include <stdint.h>
#include <stddef.h>
#define BYTES_PER_FRAME 32
#define FRAMES_PER_FIFO 800

153
src/octoi/frame_rifo.c Normal file
View File

@ -0,0 +1,153 @@
/*
* frame_rifo.c
*
* This is for the IP -> E1 direction, where IP packets may arrive with
* re-ordering. So this "Random [order] In, First Out' is reconstructing
* the original order.
*
* (C) 2022 by Harald Welte <laforge@osmocom.org>
*
* All Rights Reserved
*
* SPDX-License-Identifier: GPL-2.0+
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <errno.h>
#include <unistd.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <osmocom/core/utils.h>
#include "frame_rifo.h"
/***********************************************************************
* Frame RIFO
***********************************************************************/
/* return the absolute bucket number (0.. FRAMES_PER_FIFO-1) for given fn */
static inline uint32_t bucket_for_fn(const struct frame_rifo *rifo, uint32_t fn)
{
uint32_t next_out_bucket = (rifo->next_out - rifo->buf) / BYTES_PER_FRAME;
/* offset in frames compared to next_out */
uint32_t offset = fn - rifo->next_out_fn % FRAMES_PER_FIFO;
return (next_out_bucket + offset) % FRAMES_PER_FIFO;
}
/* set the bucket bit for given bucket number */
static void bucket_bit_set(struct frame_rifo *rifo, uint32_t bucket_nr)
{
uint8_t byte = bucket_nr/8;
uint8_t bit = bucket_nr%8;
OSMO_ASSERT(byte < sizeof(rifo->bitvec));
rifo->bitvec[byte] |= (1 << bit);
}
/* clear the bucket bit for given bucket number */
static void bucket_bit_clear(struct frame_rifo *rifo, uint32_t bucket_nr)
{
uint8_t byte = bucket_nr/8;
uint8_t bit = bucket_nr%8;
OSMO_ASSERT(byte < sizeof(rifo->bitvec));
rifo->bitvec[byte] &= ~(1 << bit);
}
/* is the given bucket bit number set? */
static bool bucket_bit_get(struct frame_rifo *rifo, uint32_t bucket_nr)
{
uint8_t byte = bucket_nr/8;
uint8_t bit = bucket_nr%8;
OSMO_ASSERT(byte < sizeof(rifo->bitvec));
return rifo->bitvec[byte] & (1 << bit);
}
void rifo_dump(struct frame_rifo *rifo)
{
printf("buf=%p, size=%zu, next_out=%lu, next_out_fn=%u\n", rifo->buf, sizeof(rifo->buf),
rifo->next_out - rifo->buf, rifo->next_out_fn);
}
/*! Initialize a frame RIFO.
* \param rifo Caller-allocated memory for RIFO data structure */
void frame_rifo_init(struct frame_rifo *rifo)
{
memset(rifo->buf, 0xff, sizeof(rifo->buf));
rifo->next_out = rifo->buf;
rifo->next_out_fn = 0;
memset(rifo->bitvec, 0, sizeof(rifo->bitvec));
}
#define RIFO_BUF_END(f) ((f)->buf + sizeof((f)->buf))
/*! put one received frame into the RIFO at a given specified frame number.
* \param rifo The RIFO to which we want to put (append) multiple frames
* \param frame Pointer to memory containing the frame data
* \param fn Absolute frame number at which to insert the frame.
* \returns 0 on success; -1 on error (overflow */
int frame_rifo_in(struct frame_rifo *rifo, const uint8_t *frame, uint32_t fn)
{
uint32_t bucket;
uint8_t *dst;
if (fn > frame_rifo_max_in_fn(rifo))
return -ERANGE;
bucket = bucket_for_fn(rifo, fn);
dst = rifo->buf + bucket * BYTES_PER_FRAME;
OSMO_ASSERT(dst + BYTES_PER_FRAME <= RIFO_BUF_END(rifo));
memcpy(dst, frame, BYTES_PER_FRAME);
bucket_bit_set(rifo, bucket);
return 0;
}
/*! pull one frames out of the RIFO.
* \param rifo The RIFO from which we want to pull frames
* \param out Caller-allocated output buffer
* \returns 0 on success; -1 on error (no frame available) */
int frame_rifo_out(struct frame_rifo *rifo, uint8_t *out)
{
uint32_t next_out_bucket = (rifo->next_out - rifo->buf) / BYTES_PER_FRAME;
bool bucket_bit = bucket_bit_get(rifo, next_out_bucket);
int rc = 0;
if (!bucket_bit) {
/* caller is supposed to copy/duplicate previous frame */
rc = -1;
} else {
memcpy(out, rifo->next_out, BYTES_PER_FRAME);
bucket_bit_clear(rifo, next_out_bucket);
}
/* advance by one frame */
rifo->next_out += BYTES_PER_FRAME;
rifo->next_out_fn += 1;
if (rifo->next_out >= RIFO_BUF_END(rifo))
rifo->next_out -= sizeof(rifo->buf);
return rc;
}

43
src/octoi/frame_rifo.h Normal file
View File

@ -0,0 +1,43 @@
#pragma once
#include "frame_fifo.h"
struct frame_rifo {
uint8_t *next_out; /* where to read next output from FIFO */
uint8_t buf[BYTES_PER_FRAME * FRAMES_PER_FIFO];
uint32_t next_out_fn; /* frame number of next output frame */
uint8_t bitvec[FRAMES_PER_FIFO/8];
/* bit-vector of occupied (data received) slots in FIFO,
indexed by physical offset in buf */
};
/* maximum frame number we currently can store in the rifo */
static inline uint32_t frame_rifo_max_in_fn(const struct frame_rifo *ff)
{
return ff->next_out_fn + FRAMES_PER_FIFO - 1;
}
void frame_rifo_init(struct frame_rifo *rifo);
/* number of frames currently available in FIFO */
static inline unsigned int frame_rifo_frames(struct frame_rifo *rifo)
{
unsigned int byte, bit;
unsigned int frame_count = 0;
for (byte = 0; byte < sizeof(rifo->bitvec); byte++) {
for (bit = 0; bit < 8; bit++) {
if (rifo->bitvec[byte] & (1 << bit))
frame_count++;
}
}
return frame_count;
}
/* put a received frame into the FIFO */
int frame_rifo_in(struct frame_rifo *rifo, const uint8_t *frame, uint32_t fn);
/* pull one frame out of the FIFO */
int frame_rifo_out(struct frame_rifo *rifo, uint8_t *out);

View File

@ -120,12 +120,12 @@ void octoi_peer_e1t_out(struct octoi_peer *peer, uint8_t *buf, int fts)
for (int i = 0; i < fts; i++) {
uint8_t *cur = buf + BYTES_PER_FRAME*i;
rc = frame_fifo_out(&iline->e1t.fifo, cur);
rc = frame_rifo_out(&iline->e1t.rifo, cur);
if (rc < 0) {
iline_ctr_add(iline, LINE_CTR_E1oIP_UNDERRUN, 1);
/* substitute with last received frame */
memcpy(cur, iline->e1t.last_frame, BYTES_PER_FRAME);
}
}
iline_stat_set(iline, LINE_STAT_E1oIP_E1T_FIFO, frame_fifo_frames(&iline->e1t.fifo));
//iline_stat_set(iline, LINE_STAT_E1oIP_E1T_FIFO, frame_fifo_frames(&iline->e1t.fifo));
}

75
tests/Makefile.am Normal file
View File

@ -0,0 +1,75 @@
SUBDIRS = \
rifo \
$(NULL)
# The `:;' works around a Bash 3.2 bug when the output is not writeable.
$(srcdir)/package.m4: $(top_srcdir)/configure.ac
:;{ \
echo '# Signature of the current package.' && \
echo 'm4_define([AT_PACKAGE_NAME],' && \
echo ' [$(PACKAGE_NAME)])' && \
echo 'm4_define([AT_PACKAGE_TARNAME],' && \
echo ' [$(PACKAGE_TARNAME)])' && \
echo 'm4_define([AT_PACKAGE_VERSION],' && \
echo ' [$(PACKAGE_VERSION)])' && \
echo 'm4_define([AT_PACKAGE_STRING],' && \
echo ' [$(PACKAGE_STRING)])' && \
echo 'm4_define([AT_PACKAGE_BUGREPORT],' && \
echo ' [$(PACKAGE_BUGREPORT)])'; \
echo 'm4_define([AT_PACKAGE_URL],' && \
echo ' [$(PACKAGE_URL)])'; \
} >'$(srcdir)/package.m4'
EXTRA_DIST = \
testsuite.at \
$(srcdir)/package.m4 \
$(TESTSUITE) \
$(NULL)
TESTSUITE = $(srcdir)/testsuite
DISTCLEANFILES = \
atconfig \
$(NULL)
if ENABLE_EXT_TESTS
python-tests: $(BUILT_SOURCES)
$(MAKE) vty-test
osmotestvty.py -p $(abs_top_srcdir) -w $(abs_top_builddir) -v
osmotestconfig.py -p $(abs_top_srcdir) -w $(abs_top_builddir) -v
$(srcdir)/vty_test_runner.py -w $(abs_top_builddir) -v
$(srcdir)/ctrl_test_runner.py -w $(abs_top_builddir) -v
else
python-tests: $(BUILT_SOURCES)
echo "Not running python-based tests (determined at configure-time)"
endif
# Run a specific test with: 'make vty-test VTY_TEST=osmo-e1d.vty'
VTY_TEST ?= *.vty
# To update the VTY script from current application behavior,
# pass -u to vty_script_runner.py by doing:
# make vty-test U=-u
vty-test:
osmo_verify_transcript_vty.py -v \
-n osmo-e1d -p 4239 \
-r "$(top_builddir)/src/osmo-e1d/osmo-e1d -c $(top_srcdir)/doc/examples/osmo-e1d/osmo-e1d-vpair.cfg" \
$(U) $(srcdir)/$(VTY_TEST)
check-local: atconfig $(TESTSUITE)
$(SHELL) '$(TESTSUITE)' $(TESTSUITEFLAGS)
$(MAKE) $(AM_MAKEFLAGS) python-tests
installcheck-local: atconfig $(TESTSUITE)
$(SHELL) '$(TESTSUITE)' AUTOTEST_PATH='$(bindir)' \
$(TESTSUITEFLAGS)
clean-local:
test ! -f '$(TESTSUITE)' || \
$(SHELL) '$(TESTSUITE)' --clean
AUTOM4TE = $(SHELL) $(top_srcdir)/missing --run autom4te
AUTOTEST = $(AUTOM4TE) --language=autotest
$(TESTSUITE): $(srcdir)/testsuite.at $(srcdir)/package.m4
$(AUTOTEST) -I '$(srcdir)' -o $@.tmp $@.at
mv $@.tmp $@

30
tests/rifo/Makefile.am Normal file
View File

@ -0,0 +1,30 @@
AM_CPPFLAGS = \
$(all_includes) \
-I$(top_srcdir)/include \
-I$(top_srcdir)/src \
-I$(top_srcdir)/src/octoi \
$(NULL)
AM_CFLAGS = \
-Wall \
-ggdb3 \
$(LIBOSMOCORE_CFLAGS) \
$(NULL)
EXTRA_DIST = \
rifo_test.ok \
$(NULL)
noinst_PROGRAMS = \
rifo_test \
$(NULL)
rifo_test_SOURCES = \
rifo_test.c \
$(NULL)
rifo_test_LDADD = \
$(top_builddir)/src/octoi/frame_rifo.o \
$(top_builddir)/src/log.o \
$(LIBOSMOCORE_LIBS) \
$(NULL)

76
tests/rifo/rifo_test.c Normal file
View File

@ -0,0 +1,76 @@
#include <stdint.h>
#include <string.h>
#include <osmocom/core/application.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/utils.h>
#include "log.h"
#include "frame_rifo.h"
static void *g_e1d_ctx;
static void rifo_in(struct frame_rifo *rifo, uint8_t *frame, uint32_t fn)
{
printf("RIFO_IN(%s, %u)\n", osmo_hexdump_nospc(frame, BYTES_PER_FRAME), fn);
OSMO_ASSERT(frame_rifo_in(rifo, frame, fn) == 0);
}
static int rifo_out(struct frame_rifo *rifo, uint8_t *out)
{
int rc = frame_rifo_out(rifo, out);
printf("RIFO_OUT(%s)=%d\n", osmo_hexdump_nospc(out, BYTES_PER_FRAME), rc);
return rc;
}
static void missing_frames(void)
{
struct frame_rifo rifo;
frame_rifo_init(&rifo);
printf("\nTEST: %s\n", __func__);
for (int i = 0; i < 10; i++) {
uint8_t frame[32];
memset(frame, i, sizeof(frame));
rifo_in(&rifo, frame, 2*i);
}
for (int i = 0; i < 10; i++) {
uint8_t frame[32];
memset(frame, 0xff, sizeof(frame));
rifo_out(&rifo, frame);
}
}
static void reordered_in(void)
{
struct frame_rifo rifo;
frame_rifo_init(&rifo);
printf("\nTEST: %s\n", __func__);
const uint8_t in[] = { 0, 1, 4, 3, 5, 2, 6, 7, 8, 9 };
for (int i = 0; i < sizeof(in); i++) {
uint8_t frame[32];
memset(frame, in[i], sizeof(frame));
rifo_in(&rifo, frame, in[i]);
}
for (int i = 0; i < 10; i++) {
uint8_t frame[32];
memset(frame, 0xff, sizeof(frame));
rifo_out(&rifo, frame);
}
}
int main(int argc, char **argv)
{
g_e1d_ctx = talloc_named_const(NULL, 0, "osmo-e1d");
osmo_init_logging2(g_e1d_ctx, &log_info);
missing_frames();
reordered_in();
}

8
tests/testsuite.at Normal file
View File

@ -0,0 +1,8 @@
AT_INIT
AT_BANNER([Regression tests.])
AT_SETUP([rifo])
AT_KEYWORDS([rifo])
cat $abs_srcdir/rifo/rifo_test.ok > expout
AT_CHECK([$abs_top_builddir/tests/rifo/rifo_test], [], [expout], [ignore])
AT_CLEANUP