libosmo-netif/tests/stream/stream_test.c

905 lines
30 KiB
C

/*
* (C) 2023 by sysmocom - s.f.m.c. GmbH.
* Authors: Max Suraev
* Alexander Rehbein
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*/
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <osmocom/core/byteswap.h>
#include <osmocom/core/select.h>
#include <osmocom/core/talloc.h>
#include <osmocom/core/msgb.h>
#include <osmocom/core/logging.h>
#include <osmocom/core/application.h>
#include <osmocom/core/timer.h>
#include <osmocom/gsm/protocol/ipaccess.h>
#include <osmocom/netif/ipa.h>
#include <osmocom/netif/stream.h>
#define RECONNECT_TIMEOUT_SECS 9
#define DSTREAMTEST 0
struct log_info_cat osmo_stream_test_cat[] = {
[DSTREAMTEST] = {
.name = "DSTREAMTEST",
.description = "STREAM test",
.enabled = 1, .loglevel = LOGL_INFO,
},
};
const struct log_info osmo_stream_test_log_info = {
.filter_fn = NULL,
.cat = osmo_stream_test_cat,
.num_cat = ARRAY_SIZE(osmo_stream_test_cat),
};
static struct msgb *make_msgb(const char *m)
{
struct msgb *msg = msgb_alloc(512, "STREAM test");
if (!msg) {
printf("Unable to allocate message\n");
return NULL;
}
if (m)
msgb_printf(msg, "%s", m);
return msg;
}
#define ASTR(rec) ((rec) ? "autoreconnecting" : "non-reconnecting")
/* client defs */
#define LOGCLI(cli, fmt, args...) do { \
struct timeval tv; \
osmo_gettimeofday(&tv, NULL); \
printf("{%lu.%06lu} [%s] Client's %s(): " fmt, tv.tv_sec, tv.tv_usec, \
osmo_stream_cli_get_data(cli) ? "OK" : "NA", __func__, ##args); \
} while (0)
#define CLI_SND(cli, m) do { \
struct msgb *msg = make_msgb(m); \
LOGCLI(cli, "sent %d bytes message: %s\n", \
msg->len, msgb_hexdump(msg)); \
osmo_stream_cli_send(cli, msg); \
} while(0)
/* client callbacks */
static int connect_cb_cli(struct osmo_stream_cli *cli)
{
void *recon = osmo_stream_cli_get_data(cli);
LOGCLI(cli, "callback triggered <%s>\n", recon ? "reconnected" : "initial");
if (recon) {
LOGCLI(cli, "closing connection\n");
osmo_stream_cli_close(cli);
} else
CLI_SND(cli, "Hi! from connect callback :-P");
return 0;
}
static int read_cb_cli(struct osmo_stream_cli *cli)
{
int bytes;
void *cli_data = osmo_stream_cli_get_data(cli);
struct msgb *msg = make_msgb(NULL);
if (!msg)
return -ENOMEM;
LOGCLI(cli, "callback triggered\n");
bytes = osmo_stream_cli_recv(cli, msg);
if (bytes < 0) {
LOGCLI(cli, "unable to receive message\n");
return -EINVAL;
}
if (bytes)
LOGCLI(cli, "received %d(%d) bytes: %s\n", bytes, msg->len, msgb_hexdump(msg));
else {
/* N. B: normally receiving 0 bytes means that we should close the connection and re-establish it
but to test autoreconnection logic we ignore it in here to let the test run till completion */
LOGCLI(cli, "0-byte read, auto-reconnect will be triggered if enabled\n");
osmo_gettimeofday_override_add(RECONNECT_TIMEOUT_SECS, 0);
}
if (!cli_data) {
LOGCLI(cli, "initial read, contacting server\n");
osmo_stream_cli_set_data(cli, msg);
CLI_SND(cli, "Doh, responding to server :-D");
}
msgb_free(msg);
return 0;
}
/* client helpers */
static struct osmo_stream_cli *init_client_reconnection(struct osmo_stream_cli *cli, bool autoreconnect)
{
/* setting negative timeout ensures that we disable reconnection logic */
osmo_stream_cli_set_reconnect_timeout(cli, autoreconnect ? RECONNECT_TIMEOUT_SECS : -1);
if (osmo_stream_cli_open(cli) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
/* Without explicit timeout set with osmo_stream_cli_set_reconnect_timeout() default value is used.
static struct osmo_stream_cli *init_client_reconnection_broken1(struct osmo_stream_cli *cli, bool autoreconnect)
{
if (osmo_stream_cli_open2(cli, autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
That's why those those functions result in exact the same output despite inverse use of autoreconnect parameter.
static struct osmo_stream_cli *init_client_reconnection_broken2(struct osmo_stream_cli *cli, bool autoreconnect)
{
if (osmo_stream_cli_open2(cli, !autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
Variant below are also equivalent to each other.
static struct osmo_stream_cli *init_client_reconnection_broken1(struct osmo_stream_cli *cli, bool autoreconnect)
{
osmo_stream_cli_set_reconnect_timeout(cli, (!autoreconnect) ? 2 : -1);
if (osmo_stream_cli_open2(cli, autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
static struct osmo_stream_cli *init_client_reconnection_broken2(struct osmo_stream_cli *cli, bool autoreconnect)
{
osmo_stream_cli_set_reconnect_timeout(cli, (!autoreconnect) ? 2 : -1);
if (osmo_stream_cli_open2(cli, !autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
Note: the result differs from normal init_client_reconnection()
*/
/* Setting reconnection value explicitly as follows is equivalent to normal init_client_reconnection()
static struct osmo_stream_cli *init_client_reconnection_broken1(struct osmo_stream_cli *cli, bool autoreconnect)
{
osmo_stream_cli_set_reconnect_timeout(cli, autoreconnect ? 2 : -1);
if (osmo_stream_cli_open2(cli, autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
static struct osmo_stream_cli *init_client_reconnection_broken2(struct osmo_stream_cli *cli, bool autoreconnect)
{
osmo_stream_cli_set_reconnect_timeout(cli, autoreconnect ? 2 : -1);
if (osmo_stream_cli_open2(cli, !autoreconnect) < 0) {
LOGCLI(cli, "unable to open client\n");
return NULL;
}
return cli;
}
*/
static struct osmo_stream_cli *make_client(void *ctx, const char *host, unsigned port, bool autoreconnect)
{
struct osmo_stream_cli *cli = osmo_stream_cli_create(ctx);
if (!cli) {
printf("Unable to create client\n");
return NULL;
}
printf("Prepare %s stream client...\n", ASTR(autoreconnect));
osmo_stream_cli_set_local_port(cli, 8976);
osmo_stream_cli_set_name(cli, "cli_test");
osmo_stream_cli_set_addr(cli, host);
osmo_stream_cli_set_port(cli, port);
osmo_stream_cli_set_connect_cb(cli, connect_cb_cli);
osmo_stream_cli_set_read_cb(cli, read_cb_cli);
/* using
return init_client_reconnection_broken1(cli, autoreconnect);
or
return init_client_reconnection_broken2(cli, autoreconnect);
will result in exactly the same output which might or might not be the same as with
init_client_reconnection() - see preceeding notes */
return init_client_reconnection(cli, autoreconnect);
}
/* server defs */
#define LOGLNK(lnk, fmt, args...) \
printf("[%s] Server's %s(): " fmt, osmo_stream_srv_link_get_data(lnk) ? "OK" : "NA", __func__, ##args)
#define LOGSRV(srv, fmt, args...) do { \
struct timeval tv; \
osmo_gettimeofday(&tv, NULL); \
printf("{%lu.%06lu} [%s|%s] Server's %s(): " fmt, tv.tv_sec, tv.tv_usec, \
osmo_stream_srv_get_data(srv) ? "OK" : "NA", \
osmo_stream_srv_link_get_data(osmo_stream_srv_get_master(srv)) ? "OK" : "NA", \
__func__, ##args); \
} while (0)
#define SRV_SND(srv, m) do { \
struct msgb *msg = make_msgb(m); \
LOGSRV(srv, "sent %d bytes message: %s\n", \
msg->len, msgb_hexdump(msg)); \
osmo_stream_srv_send(srv, msg); \
} while(0)
/* server helpers */
static bool subsequent_read(struct osmo_stream_srv *srv)
{
if (osmo_stream_srv_get_data(srv))
return true;
osmo_stream_srv_set_data(srv, srv);
return false;
}
static void request_test_stop(struct osmo_stream_srv *srv)
{
osmo_stream_srv_link_set_data(osmo_stream_srv_get_master(srv), NULL);
}
static bool test_stop_requested(struct osmo_stream_srv_link *lnk)
{
if (osmo_stream_srv_link_get_data(lnk))
return false;
return true;
}
/* server callbacks */
int read_cb_srv(struct osmo_stream_srv *srv)
{
int bytes;
struct msgb *msg = make_msgb(NULL);
if (!msg)
return -ENOMEM;
LOGSRV(srv, "callback triggered\n");
bytes = osmo_stream_srv_recv(srv, msg);
if (bytes <= 0) {
if (bytes < 0)
LOGSRV(srv, "unable to receive message: %s\n", strerror(-bytes));
else {
LOGSRV(srv, "client have already closed connection\n");
/* if client have already closed the connection,
than it must be subsequent (after reconnect) call */
request_test_stop(srv);
}
msgb_free(msg);
osmo_stream_srv_destroy(srv);
return -EINVAL;
} else {
LOGSRV(srv, "received %d(%d) bytes: %s\n", bytes, msg->len, msgb_hexdump(msg));
SRV_SND(srv, __func__);
}
msgb_free(msg);
if (subsequent_read(srv)) {
LOGSRV(srv, "force client disconnect on subsequent call\n");
osmo_stream_srv_destroy(srv);
} else
LOGSRV(srv, "keep initial client connection\n");
return 0;
}
static int close_cb_srv(struct osmo_stream_srv *ignored)
{
return 0;
}
static int accept_cb_srv(struct osmo_stream_srv_link *lnk, int fd)
{
struct osmo_stream_srv *srv = osmo_stream_srv_create(osmo_stream_srv_link_get_data(lnk), lnk, fd,
read_cb_srv, close_cb_srv, NULL);
if (!srv) {
LOGLNK(lnk, "error while creating connection\n");
return -EINVAL;
}
osmo_stream_srv_set_name(srv, "srv_test");
return 0;
}
static void test_recon(void *ctx, const char *host, unsigned port, unsigned steps, struct osmo_stream_srv_link *lnk,
bool autoreconnect)
{
struct timeval tv;
struct osmo_stream_cli *cli;
if (osmo_stream_srv_link_open(lnk) < 0) {
printf("Unable to open server\n");
osmo_stream_srv_link_destroy(lnk);
return;
}
cli = make_client(ctx, host, port, autoreconnect);
if (!cli)
return;
printf("=======================================\n");
printf("Client/Server entering %s event loop...\n", ASTR(autoreconnect));
printf("=======================================\n");
osmo_stream_srv_link_set_data(lnk, ctx);
while(steps--) {
osmo_gettimeofday_override_add(0, 1); /* small increment to easily spot iterations */
osmo_select_main(0);
osmo_gettimeofday(&tv, NULL);
fprintf(stderr, "\n{%lu.%06lu} %s test step %u [client %s, server %s], FD reg %u\n",
tv.tv_sec, tv.tv_usec, ASTR(autoreconnect), steps,
osmo_stream_cli_get_data(cli) ? "OK" : "NA",
osmo_stream_srv_link_get_data(lnk) ? "OK" : "NA",
osmo_fd_is_registered(osmo_stream_cli_get_ofd(cli)));
if (test_stop_requested(lnk)) {
printf("{%lu.%06lu} Server requested test termination\n",
tv.tv_sec, tv.tv_usec);
steps = 0;
}
}
osmo_stream_cli_destroy(cli);
osmo_stream_srv_link_close(lnk);
printf("{%lu.%06lu} %s test complete.\n\n", tv.tv_sec, tv.tv_usec, ASTR(autoreconnect));
}
/* Segmentation test code (using IPA) */
#define IPAC_MSG_PING_LEN 0x01
static const uint8_t ipac_msg_ping[] = {
0x00, IPAC_MSG_PING_LEN,
IPAC_PROTO_IPACCESS,
IPAC_MSGT_PING
};
#define IPAC_MSG_PONG_LEN 0x01
static const uint8_t ipac_msg_pong[] = {
0x00, IPAC_MSG_PONG_LEN,
IPAC_PROTO_IPACCESS,
IPAC_MSGT_PONG
};
#define IPAC_MSG_IDREQ_PAYLOAD_INITIALIZER \
IPAC_MSGT_ID_GET, \
0x01, IPAC_IDTAG_UNITNAME
static const uint8_t ipac_msg_idreq_payload[] = {
IPAC_MSG_IDREQ_PAYLOAD_INITIALIZER
};
#define IPAC_MSG_ID_REQ_LEN 0x03
static const uint8_t ipac_msg_idreq[] = {
0x00, IPAC_MSG_ID_REQ_LEN,
IPAC_PROTO_IPACCESS,
IPAC_MSG_IDREQ_PAYLOAD_INITIALIZER
};
#define ipac_msg_idreq_third (sizeof(ipac_msg_idreq)/3)
#define ipac_msg_idreq_last_third (sizeof(ipac_msg_idreq) - 2 * ipac_msg_idreq_third)
#define IPAC_MSG_ID_RESP_LEN 0x07
static const uint8_t ipac_msg_idresp[] = {
0x00, IPAC_MSG_ID_RESP_LEN,
IPAC_PROTO_IPACCESS,
IPAC_MSGT_ID_RESP,
0x01, IPAC_IDTAG_UNITNAME, 0xde, 0xad, 0xbe, 0xef
};
#define put_ipa_msg(unsigned_char_ptr, struct_msgb_ptr, byte_array) do {\
(unsigned_char_ptr) = msgb_put(struct_msgb_ptr, sizeof(byte_array));\
memcpy(unsigned_char_ptr, byte_array, sizeof(byte_array));\
} while (0)
/* Array indices correspond to enum values stringified on the right */
static const char * const IPAC_MSG_TYPES[] = {
[0] = "IPAC_MSGT_PING",
[1] = "IPAC_MSGT_PONG",
[2] = "UNEXPECTED VALUE",
[3] = "UNEXPECTED VALUE",
[4] = "IPAC_MSGT_ID_GET",
[5] = "IPAC_MSGT_ID_RESP",
};
#define IPAC_MSGT_OFFSET 3
/* Append a message to UCHAR_PTR_DST. SRC_IPAC_MSG_BUF is expected to be a
* buffer containing an IPA message of type IPAC_PROTO_ACCESS that is
* syntactically correct up to offset 3 (IPAC_MSGT_OFFSET).
* Uses a counter so that appended messages can be distinguished easily in the logs */
#define CLI_APPEND_MSG(OSMO_STREAM_CLI_PTR, UCHAR_PTR_DST, STRUCT_MSGB_PTR, SRC_IPAC_MSG_BUF) do {\
LOGCLI(OSMO_STREAM_CLI_PTR, "[%u-cli] Appending msg of type %s into buffer\n",\
++test_segm_ipa_stream_srv_msglognum_cli, IPAC_MSG_TYPES[SRC_IPAC_MSG_BUF[IPAC_MSGT_OFFSET]]);\
LOGCLI(OSMO_STREAM_CLI_PTR, "\t(msg dump: %s)\n", osmo_hexdump(SRC_IPAC_MSG_BUF,\
sizeof(SRC_IPAC_MSG_BUF)));\
put_ipa_msg(UCHAR_PTR_DST, STRUCT_MSGB_PTR, SRC_IPAC_MSG_BUF);\
} while (0)
static unsigned test_segm_ipa_stream_srv_msglognum_cli = 0;
static int test_segm_ipa_stream_srv_cli_connect_cb(struct osmo_stream_cli *cli)
{
unsigned char *data;
struct msgb *m = msgb_alloc_headroom(128, 0, "IPA messages");
if (m == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
/* Send 4 and 1/3 messages */
/* Append 4 */
CLI_APPEND_MSG(cli, data, m, ipac_msg_ping);
CLI_APPEND_MSG(cli, data, m, ipac_msg_pong);
CLI_APPEND_MSG(cli, data, m, ipac_msg_ping);
CLI_APPEND_MSG(cli, data, m, ipac_msg_idresp);
/* Append 1/3 */
LOGCLI(cli, "[(0%u + 1/3)-cli] Appending 1st third of msg of type %s into buffer\n",
test_segm_ipa_stream_srv_msglognum_cli, IPAC_MSG_TYPES[ipac_msg_idreq[3]]);
LOGCLI(cli, "\t(dump: %s)\n", osmo_hexdump(ipac_msg_idreq, ipac_msg_idreq_third));
data = msgb_put(m, ipac_msg_idreq_third);
memcpy(data, ipac_msg_idreq, ipac_msg_idreq_third);
LOGCLI(cli, "Sending 4 + 1/3 messages as one:\n");
LOGCLI(cli, "\t(msg dump: %s)\n\n", osmo_hexdump(m->data, m->len));
osmo_stream_cli_send(cli, m);
return 0;
}
static bool test_segm_ipa_stream_srv_all_msgs_processed = false;
static void send_last_third(void *osmo_stream_cli_arg)
{
struct osmo_stream_cli *osc = osmo_stream_cli_arg;
unsigned char *data;
struct msgb *reply = msgb_alloc_headroom(128, 0, "IPA delayed reply");
LOGCLI(osc, "Delay for sending last third of message is over\n");
if (reply == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return;
}
LOGCLI(osc, "[%u-cli] Appending: Last third of IPAC_MSGT_ID_GET\n",
++test_segm_ipa_stream_srv_msglognum_cli);
data = msgb_put(reply, ipac_msg_idreq_last_third);
memcpy(data, ipac_msg_idreq + 2 * ipac_msg_idreq_third,
ipac_msg_idreq_last_third);
/* Append two entire messages */
CLI_APPEND_MSG(osc, data, reply, ipac_msg_pong);
CLI_APPEND_MSG(osc, data, reply, ipac_msg_pong);
LOGCLI(osc, "\tSending:"
"[ Last third of IPAC_MSGT_ID_GET | IPAC_MSGT_PONG | IPAC_MSGT_PONG ]\n");
LOGCLI(osc, "\t(msg dump: %s)\n\n", osmo_hexdump(reply->data, reply->len));
osmo_stream_cli_send(osc, reply);
}
static struct osmo_timer_list fragmented_send_tl_cli;
static int test_segm_ipa_stream_srv_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *msg)
{
unsigned char *data;
struct ipa_head *h = (struct ipa_head *) msg->l1h;
uint8_t ipac_msg_type = *msg->data;
struct msgb *reply;
LOGCLI(osc, "Received message from stream (payload len = %" PRIu16 ")\n", msgb_length(msg));
if (ipac_msg_type < 0 || 5 < ipac_msg_type) {
fprintf(stderr, "Received unexpected IPAC message type %"PRIu8"\n", ipac_msg_type);
msgb_free(msg);
return -ENOMSG;
}
LOGCLI(osc, "\tType: %s\n", IPAC_MSG_TYPES[ipac_msg_type]);
if (ipac_msg_type == IPAC_MSGT_ID_GET) {
LOGCLI(osc, "Got IPAC_MSGT_ID_GET from server\n");
LOGCLI(osc, "[(%u + 2/3) -cli] Appending: Second third of IPAC_MSGT_ID_GET\n",
test_segm_ipa_stream_srv_msglognum_cli);
reply = msgb_alloc_headroom(128, 0, "IPA reply");
if (reply == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
data = msgb_put(reply, ipac_msg_idreq_third);
memcpy(data, ipac_msg_idreq + ipac_msg_idreq_third,
ipac_msg_idreq_third);
LOGCLI(osc, "\tSending: Second third of IPAC_MSGT_ID_GET\n");
LOGCLI(osc, "\t(msg dump: %s)\n", osmo_hexdump(reply->data, reply->len));
osmo_stream_cli_send(osc, reply);
osmo_timer_setup(&fragmented_send_tl_cli, send_last_third, osc);
osmo_timer_add(&fragmented_send_tl_cli);
osmo_timer_schedule(&fragmented_send_tl_cli, 0, 500000);
} else if (ipac_msg_type == IPAC_MSGT_ID_RESP) {
LOGCLI(osc, "\tresult= %s\n",
osmo_hexdump((const unsigned char *)h, sizeof(*h) + h->len));
LOGCLI(osc, "\texpected=%s\n",
osmo_hexdump(ipac_msg_idresp, sizeof(ipac_msg_idresp)));
}
msgb_free(msg);
printf("\n");
return 0;
}
struct osmo_stream_cli *test_segm_ipa_stream_srv_run_client(void *ctx)
{
struct osmo_stream_cli *osc = osmo_stream_cli_create(ctx);
if (osc == NULL) {
fprintf(stderr, "osmo_stream_cli_create_iofd()\n");
return NULL;
}
osmo_stream_cli_set_addr(osc, "127.0.0.11");
osmo_stream_cli_set_local_port(osc, 8977);
osmo_stream_cli_set_port(osc, 1111);
osmo_stream_cli_set_connect_cb(osc, test_segm_ipa_stream_srv_cli_connect_cb);
osmo_stream_cli_set_read_cb2(osc, test_segm_ipa_stream_srv_cli_read_cb);
osmo_stream_cli_set_nodelay(osc, true);
if (osmo_stream_cli_open(osc) < 0) {
fprintf(stderr, "Cannot open stream client\n");
return NULL;
}
osmo_stream_cli_set_segmentation_cb(osc, osmo_ipa_segmentation_cb);
return osc;
}
int test_segm_ipa_stream_srv_srv_read_cb(struct osmo_stream_srv *conn, struct msgb *msg)
{
static unsigned msgnum_srv = 0;
struct ipa_head *ih = (struct ipa_head *)msg->l1h;
unsigned char *data;
struct msgb *m;
uint8_t *msgt = msg->l2h; /* Octet right after IPA header */
LOGSRV(conn, "[%u-srv] Received IPA message from stream (payload len = %" PRIu16 ")\n",
++msgnum_srv, msgb_length(msg));
LOGSRV(conn, "\tmsg buff data (including stripped headers): %s\n",
osmo_hexdump((unsigned char *)ih, osmo_ntohs(ih->len) + sizeof(*ih)));
LOGSRV(conn, "\tIPA payload: %s\n", osmo_hexdump(ih->data, osmo_ntohs(ih->len)));
LOGSRV(conn, "\tType: %s\n", IPAC_MSG_TYPES[*msgt]);
LOGSRV(conn, "\t(msg dump: %s)\n", osmo_hexdump(msg->l1h, msg->len + sizeof(struct ipa_head)));
if (*msgt == IPAC_MSGT_ID_RESP) { /* */
LOGSRV(conn, "Send IPAC_MSGT_ID_GET to trigger client to send next third\n\n");
m = osmo_ipa_msg_alloc(128);
if (m == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
put_ipa_msg(data, m, ipac_msg_idreq_payload);
osmo_ipa_msg_push_headers(m, IPAC_PROTO_IPACCESS, -1);
osmo_stream_srv_send(conn, m);
} else if (msgnum_srv == 7 && *msgt == IPAC_MSGT_PONG) {
test_segm_ipa_stream_srv_all_msgs_processed = true;
osmo_stream_srv_destroy(conn);
}
return 0;
}
static int test_segm_ipa_stream_srv_srv_accept_cb(struct osmo_stream_srv_link *srv, int fd)
{
void *ctx = osmo_stream_srv_link_get_data(srv);
struct osmo_stream_srv *oss =
osmo_stream_srv_create2(ctx, srv, fd, NULL);
if (oss == NULL) {
fprintf(stderr, "Error while creating connection\n");
return -1;
}
osmo_stream_srv_set_segmentation_cb(oss, osmo_ipa_segmentation_cb);
osmo_stream_srv_set_read_cb(oss, test_segm_ipa_stream_srv_srv_read_cb);
return 0;
}
static void test_segm_ipa_stream_srv_run(void *ctx, const char *host, unsigned port,
struct osmo_stream_srv_link *srv)
{
struct osmo_stream_cli *osc;
const char *testname = "test_segm_ipa_stream_srv";
osmo_stream_srv_link_set_accept_cb(srv,
test_segm_ipa_stream_srv_srv_accept_cb);
if (osmo_stream_srv_link_open(srv) < 0) {
printf("Unable to open server\n");
exit(1);
}
osc = test_segm_ipa_stream_srv_run_client(ctx);
printf("______________________________________Running test %s______________________________________\n", testname);
alarm(2);
while (!test_segm_ipa_stream_srv_all_msgs_processed) {
osmo_gettimeofday_override_add(0, 1); /* small increment to easily spot iterations */
osmo_select_main(1);
}
alarm(0);
printf("==================================Test %s complete========================================\n\n", testname);
if (osc)
osmo_stream_cli_destroy(osc);
osmo_stream_srv_link_close(srv);
}
static void sigalarm_handler(int _foo)
{
printf("FAIL: test did not run successfully\n");
exit(EXIT_FAILURE);
}
static struct osmo_timer_list fragmented_send_tl_srv;
static struct osmo_timer_list fragmented_send_tl_srv_destroy;
static unsigned test_segm_ipa_stream_cli_srv_msglognum = 0;
/* Like CLI_APPEND_MSG, but for server side */
#define SRV_APPEND_MSG(OSMO_STREAM_SRV_PTR, UCHAR_PTR_DST, STRUCT_MSGB_PTR, SRC_IPAC_MSG_BUF) do {\
LOGSRV(OSMO_STREAM_SRV_PTR, "[%u-srv] Appending msg of type %s into buffer\n",\
++test_segm_ipa_stream_cli_srv_msglognum, IPAC_MSG_TYPES[SRC_IPAC_MSG_BUF[IPAC_MSGT_OFFSET]]);\
LOGSRV(OSMO_STREAM_SRV_PTR, "\t(msg dump: %s)\n", osmo_hexdump(SRC_IPAC_MSG_BUF,\
sizeof(SRC_IPAC_MSG_BUF)));\
put_ipa_msg(UCHAR_PTR_DST, STRUCT_MSGB_PTR, SRC_IPAC_MSG_BUF);\
} while (0)
static void destroy_conn(void *osmo_stream_srv_arg)
{
osmo_stream_srv_destroy(osmo_stream_srv_arg);
}
static void send_last_third_srv(void *osmo_stream_srv_arg)
{
struct osmo_stream_srv *oss = osmo_stream_srv_arg;
unsigned char *data;
struct msgb *reply = msgb_alloc_headroom(128, 0, "IPA delayed reply");
LOGSRV(oss, "Delay for sending last third of message is over\n");
if (reply == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return;
}
LOGSRV(oss, "[%u-srv] Appending: Last third of IPAC_MSGT_ID_GET\n",
++test_segm_ipa_stream_cli_srv_msglognum);
data = msgb_put(reply, ipac_msg_idreq_last_third);
memcpy(data, ipac_msg_idreq + 2 * ipac_msg_idreq_third,
ipac_msg_idreq_last_third);
/* Append two entire messages */
SRV_APPEND_MSG(oss, data, reply, ipac_msg_pong);
SRV_APPEND_MSG(oss, data, reply, ipac_msg_pong);
LOGSRV(oss, "\tSending:"
"[ Last third of IPAC_MSGT_ID_GET | IPAC_MSGT_PONG | IPAC_MSGT_PONG ]\n");
LOGSRV(oss, "\t(msg dump: %s)\n\n", osmo_hexdump(reply->data, reply->len));
osmo_stream_srv_send(oss, reply);
osmo_timer_setup(&fragmented_send_tl_srv_destroy, destroy_conn, oss);
osmo_timer_add(&fragmented_send_tl_srv_destroy);
/* 2 select loop iterations needed, timing only 1 will leave the client side hanging while waiting
* to receive the last messages */
osmo_timer_schedule(&fragmented_send_tl_srv_destroy, 0, 2);
}
int test_segm_ipa_stream_cli_srv_read_cb(struct osmo_stream_srv *conn, struct msgb *msg)
{
unsigned char *data;
struct ipa_head *h = (struct ipa_head *) msg->l1h;
uint8_t ipa_msg_type = ((uint8_t *)h)[sizeof(struct ipa_head)];
struct msgb *reply = msgb_alloc_headroom(128, 0, "IPA reply");
if (reply == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
LOGSRV(conn, "Received message from stream (total len including stripped headers = %lu)\n",
osmo_ntohs(h->len) + sizeof(*h));
if (5 < ipa_msg_type) {
fprintf(stderr, "Received unexpected IPAC message type %"PRIu8"\n", ipa_msg_type);
return -ENOMSG;
}
LOGSRV(conn, "\tType: %s\n", IPAC_MSG_TYPES[ipa_msg_type]);
if (ipa_msg_type == IPAC_MSGT_ID_GET) {
LOGSRV(conn, "Got IPAC_MSGT_ID_GET from client\n");
LOGSRV(conn, "[(%u + 2/3) -srv] Appending: Second third of IPAC_MSGT_ID_GET\n",
test_segm_ipa_stream_cli_srv_msglognum);
data = msgb_put(reply, ipac_msg_idreq_third);
memcpy(data, ipac_msg_idreq + ipac_msg_idreq_third,
ipac_msg_idreq_third);
LOGSRV(conn, "\tSending: Second third of IPAC_MSGT_ID_GET\n");
LOGSRV(conn, "\t(msg dump: %s)\n", osmo_hexdump(reply->data, reply->len));
osmo_stream_srv_send(conn, reply);
osmo_timer_setup(&fragmented_send_tl_srv, send_last_third_srv, conn);
osmo_timer_add(&fragmented_send_tl_srv);
osmo_timer_schedule(&fragmented_send_tl_srv, 0, 125000);
} else if (ipa_msg_type == IPAC_MSGT_ID_RESP) {
LOGSRV(conn, "\tresult= %s\n",
osmo_hexdump((const unsigned char *)h, sizeof(*h) + h->len));
LOGSRV(conn, "\texpected=%s\n",
osmo_hexdump(ipac_msg_idresp, sizeof(ipac_msg_idresp)));
}
printf("\n");
return 0;
}
static int test_segm_ipa_stream_cli_srv_accept_cb(struct osmo_stream_srv_link *srv, int fd)
{
void *ctx = osmo_stream_srv_link_get_data(srv);
struct osmo_stream_srv *oss =
osmo_stream_srv_create2(ctx, srv, fd, NULL);
unsigned char *data;
struct msgb *m = msgb_alloc_headroom(128, 0, "IPA messages");
if (oss == NULL) {
fprintf(stderr, "Error while creating connection\n");
return -1;
}
if (m == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
osmo_stream_srv_set_segmentation_cb(oss, osmo_ipa_segmentation_cb);
osmo_stream_srv_set_read_cb(oss, test_segm_ipa_stream_cli_srv_read_cb);
/* Send 4 and 1/3 messages, as done analogously in test_segm_ipa_stream_srv_cli_connect_cb() */
/* Append 4 */
SRV_APPEND_MSG(oss, data, m, ipac_msg_ping);
SRV_APPEND_MSG(oss, data, m, ipac_msg_pong);
SRV_APPEND_MSG(oss, data, m, ipac_msg_ping);
SRV_APPEND_MSG(oss, data, m, ipac_msg_idresp);
/* Append 1/3 */
LOGSRV(oss, "[(0%u + 1/3)-srv] Appending 1st third of msg of type %s into buffer\n",
test_segm_ipa_stream_cli_srv_msglognum, IPAC_MSG_TYPES[ipac_msg_idreq[3]]);
LOGSRV(oss, "\t(dump: %s)\n", osmo_hexdump(ipac_msg_idreq, ipac_msg_idreq_third));
data = msgb_put(m, ipac_msg_idreq_third);
memcpy(data, ipac_msg_idreq, ipac_msg_idreq_third);
LOGSRV(oss, "Sending 4 + 1/3 messages as one:\n");
LOGSRV(oss, "\t(msg dump: %s)\n\n", osmo_hexdump(m->data, m->len));
osmo_stream_srv_send(oss, m);
return 0;
}
static bool test_segm_ipa_stream_cli_all_msgs_processed = false;
static int test_segm_ipa_stream_cli_cli_read_cb(struct osmo_stream_cli *osc, struct msgb *msg)
{
static unsigned msgnum_cli = 0;
unsigned char *data;
struct msgb *m;
uint8_t *msgt = msg->data;
LOGCLI(osc, "[%u-cli] Received message from stream (len = %" PRIu16 ")\n",
++msgnum_cli, msgb_length(msg));
LOGCLI(osc, "\tmsg buff data: %s\n", osmo_hexdump(msg->data, msg->len));
LOGCLI(osc, "\tIPA payload: %s\n", osmo_hexdump(msg->data, msg->len));
LOGCLI(osc, "\tType: %s\n", IPAC_MSG_TYPES[*msgt]);
LOGCLI(osc, "\t(msg dump (including stripped headers): %s)\n",
osmo_hexdump(msg->l1h, sizeof(struct ipa_head) + msg->len));
if (*msgt == IPAC_MSGT_ID_RESP) {
LOGCLI(osc, "Send IPAC_MSGT_ID_GET to trigger server to send next third\n\n");
m = msgb_alloc_headroom(128, sizeof(struct ipa_head) +
sizeof(struct ipa_head_ext), "IPA messages");
if (m == NULL) {
fprintf(stderr, "Cannot allocate message\n");
return -ENOMEM;
}
put_ipa_msg(data, m, ipac_msg_idreq_payload);
osmo_ipa_msg_push_headers(m, IPAC_PROTO_IPACCESS, -1);
osmo_stream_cli_send(osc, m);
} else if (msgnum_cli == 7 && *msgt == IPAC_MSGT_PONG) {
test_segm_ipa_stream_cli_all_msgs_processed = true;
}
return 0;
}
static struct osmo_stream_cli *test_segm_ipa_stream_cli_run_client(void *ctx)
{
struct osmo_stream_cli *osc = osmo_stream_cli_create(ctx);
if (osc == NULL) {
fprintf(stderr, "osmo_stream_cli_create_iofd()\n");
return NULL;
}
osmo_stream_cli_set_addr(osc, "127.0.0.11");
osmo_stream_cli_set_local_port(osc, 8977);
osmo_stream_cli_set_port(osc, 1112);
osmo_stream_cli_set_read_cb2(osc, test_segm_ipa_stream_cli_cli_read_cb);
osmo_stream_cli_set_nodelay(osc, true);
osmo_stream_cli_set_segmentation_cb(osc, osmo_ipa_segmentation_cb);
if (osmo_stream_cli_open(osc) < 0) {
fprintf(stderr, "Cannot open stream client\n");
return NULL;
}
return osc;
}
static void test_segm_ipa_stream_cli_run(void *ctx, const char *host, unsigned port,
struct osmo_stream_srv_link *srv)
{
const char *testname = "test_segm_ipa_stream_cli";
struct osmo_stream_cli *osc = NULL;
osmo_stream_srv_link_set_accept_cb(srv,
test_segm_ipa_stream_cli_srv_accept_cb);
osmo_stream_srv_link_set_port(srv, 1112);
if (osmo_stream_srv_link_open(srv) < 0) {
printf("Unable to open server\n");
exit(1);
}
osc = test_segm_ipa_stream_cli_run_client(ctx);
printf("______________________________________Running test %s______________________________________\n", testname);
alarm(2);
while (!test_segm_ipa_stream_cli_all_msgs_processed) {
osmo_gettimeofday_override_add(0, 1); /* small increment to easily spot iterations */
osmo_select_main(1);
}
alarm(0);
printf("==================================Test %s complete========================================\n\n", testname);
if (osc)
osmo_stream_cli_destroy(osc);
osmo_stream_srv_link_close(srv);
}
int main(void)
{
if (signal(SIGALRM, sigalarm_handler) == SIG_ERR) {
perror("signal");
exit(EXIT_FAILURE);
}
struct osmo_stream_srv_link *srv;
char *host = "127.0.0.11";
unsigned port = 1111;
void *tall_test = talloc_named_const(NULL, 1, "osmo_stream_test");
osmo_gettimeofday_override = true;
osmo_gettimeofday_override_time.tv_sec = 2;
osmo_gettimeofday_override_time.tv_usec = 0;
osmo_init_logging2(tall_test, &osmo_stream_test_log_info);
log_set_log_level(osmo_stderr_target, LOGL_INFO);
log_set_use_color(osmo_stderr_target, 0);
log_set_print_category_hex(osmo_stderr_target, 0);
log_set_print_category(osmo_stderr_target, 0);
log_set_print_filename2(osmo_stderr_target, LOG_FILENAME_NONE);
printf("Preparing stream server...\n");
srv = osmo_stream_srv_link_create(tall_test);
if (!srv) {
printf("Unable to create server\n");
return EXIT_FAILURE;
}
osmo_stream_srv_link_set_name(srv, "srv_link_test");
osmo_stream_srv_link_set_addr(srv, host);
osmo_stream_srv_link_set_port(srv, port);
osmo_stream_srv_link_set_accept_cb(srv, accept_cb_srv);
osmo_stream_srv_link_set_nodelay(srv, true);
test_recon(tall_test, host, port, 12, srv, true);
test_recon(tall_test, host, port, 8, srv, false);
osmo_stream_srv_link_set_data(srv, tall_test);
test_segm_ipa_stream_srv_run(tall_test, host, port, srv);
test_segm_ipa_stream_cli_run(tall_test, host, port, srv);
printf("Stream tests completed\n");
osmo_stream_srv_link_destroy(srv);
log_fini();
talloc_free(tall_test);
return EXIT_SUCCESS;
}