rtp-load-gen: Add CTRL interface

Change-Id: I1122e5c125f03b615d587f734fc1b7eadf827a27
This commit is contained in:
Harald Welte 2020-10-19 13:14:41 +02:00
parent 004ea1398a
commit b7a0e043b5
4 changed files with 302 additions and 28 deletions

View File

@ -1,8 +1,11 @@
CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmo-netif liburing)
LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmo-netif liburing)
CFLAGS=-g -Wall $(shell pkg-config --cflags libosmocore libosmoctrl libosmo-netif liburing)
LIBS=-lpthread $(shell pkg-config --libs libosmocore libosmoctrl libosmo-netif liburing)
rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o
rtp-load-gen: rtp-load-gen.o rtp_provider.o rtp_provider_static.o ctrl_if.o
$(CC) -o $@ $^ $(LIBS)
%.o: %.c
$(CC) $(CFLAGS) -o $@ -c $^
clean:
@rm rtp-load-gen *.o

View File

@ -0,0 +1,214 @@
/* CTRL interface of rtpsource program
*
* (C) 2020 by Harald Welte <laforge@gnumonks.org>
*
* All Rights Reserved
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <osmocom/ctrl/control_cmd.h>
#include "internal.h"
#include "rtp_provider.h"
static struct rtpsim_connection *find_connection_by_cname(const char *cname)
{
struct rtpsim_connection *rtpc;
struct rtpsim_instance *ri;
pthread_rwlock_rdlock(&g_rtpsim->rwlock);
llist_for_each_entry(ri, &g_rtpsim->instances, list) {
rtpc = rtpsim_conn_find(ri, cname);
if (rtpc) {
pthread_rwlock_unlock(&g_rtpsim->rwlock);
return rtpc;
}
}
pthread_rwlock_unlock(&g_rtpsim->rwlock);
return NULL;
}
static struct rtpsim_connection *create_connection(const char *cname, enum codec_type codec)
{
struct rtpsim_connection *rtpc;
struct rtpsim_instance *ri;
pthread_rwlock_rdlock(&g_rtpsim->rwlock);
llist_for_each_entry(ri, &g_rtpsim->instances, list) {
rtpc = rtpsim_conn_reserve(ri, cname, codec);
if (rtpc) {
pthread_rwlock_unlock(&g_rtpsim->rwlock);
return rtpc;
}
}
pthread_rwlock_unlock(&g_rtpsim->rwlock);
return NULL;
}
static int connect_connection(struct rtpsim_connection *rtpc, const char *remote_host,
uint16_t remote_port, uint8_t pt)
{
int rc;
osmo_sockaddr_str_from_str(&rtpc->cfg.remote, remote_host, remote_port);
rtpc->cfg.pt = pt;
rc = rtpsim_conn_connect(rtpc);
if (rc < 0)
return rc;
rc = rtpsim_conn_start(rtpc);
return rc;
}
static int delete_connection(struct rtpsim_connection *rtpc)
{
rtpsim_conn_stop(rtpc);
rtpsim_conn_unreserve(rtpc);
return 0;
}
CTRL_CMD_DEFINE_WO_NOVRF(rtp_create, "rtp_create");
static int set_rtp_create(struct ctrl_cmd *cmd, void *data)
{
struct rtpsim_connection *conn;
const char *cname, *codec_str;
char *tmp, *saveptr;
enum codec_type codec;
tmp = talloc_strdup(cmd, cmd->value);
OSMO_ASSERT(tmp);
cname = strtok_r(tmp, ",", &saveptr);
codec_str = strtok_r(NULL, ",", &saveptr);
if (!cname || !codec_str) {
cmd->reply = "Format is cname,codec";
goto error;
}
if (find_connection_by_cname(cname)) {
cmd->reply = "Connection already exists for cname";
goto error;
}
codec = get_string_value(codec_type_names, codec_str);
if (codec < 0) {
cmd->reply = "Invalid codec name (try GSM_FR, GSM_EFR etc.)";
goto error;
}
conn = create_connection(cname, codec);
if (!conn) {
cmd->reply = "Error creating RTP connection";
goto error;
}
/* Respond */
cmd->reply = talloc_asprintf(cmd, "%s,%s,%d", conn->cname, conn->cfg.local.ip, conn->cfg.local.port);
talloc_free(tmp);
return CTRL_CMD_REPLY;
error:
talloc_free(tmp);
return CTRL_CMD_ERROR;
}
CTRL_CMD_DEFINE_WO_NOVRF(rtp_connect, "rtp_connect");
static int set_rtp_connect(struct ctrl_cmd *cmd, void *data)
{
struct rtpsim_connection *conn;
const char *cname, *remote_host, *remote_port, *pt;
char *tmp, *saveptr;
int rc;
tmp = talloc_strdup(cmd, cmd->value);
OSMO_ASSERT(tmp);
/* FIXME: parse command */
cname = strtok_r(tmp, ",", &saveptr);
remote_host = strtok_r(NULL, ",", &saveptr);
remote_port = strtok_r(NULL, ",", &saveptr);
pt = strtok_r(NULL, ",", &saveptr);
if (!cname || !remote_host || !remote_port || !pt) {
cmd->reply = "Format is cname,remote_host,remote_port,pt";
talloc_free(tmp);
return CTRL_CMD_ERROR;
}
conn = find_connection_by_cname(cname);
if (!conn) {
cmd->reply = "Error finding RTP connection for connect";
talloc_free(tmp);
return CTRL_CMD_ERROR;
}
rc = connect_connection(conn, remote_host, atoi(remote_port), atoi(pt));
if (rc < 0) {
cmd->reply = "Error binding RTP connection";
talloc_free(tmp);
return CTRL_CMD_ERROR;
}
/* Respond */
talloc_free(tmp);
cmd->reply = talloc_asprintf(cmd, "%s,%s,%d,%d", conn->cname, conn->cfg.remote.ip,
conn->cfg.remote.port, conn->cfg.pt);
return CTRL_CMD_REPLY;
}
CTRL_CMD_DEFINE_WO_NOVRF(rtp_delete, "rtp_delete");
static int set_rtp_delete(struct ctrl_cmd *cmd, void *data)
{
struct rtpsim_connection *conn;
const char *cname = cmd->value;
conn = find_connection_by_cname(cname);
if (!conn) {
cmd->reply = "Error finding RTP connection for delete";
return CTRL_CMD_ERROR;
}
cmd->reply = talloc_asprintf(cmd, "%s", conn->cname);
delete_connection(conn);
/* Respond */
return CTRL_CMD_REPLY;
}
int rtpsource_ctrl_cmds_install(void)
{
int rc;
rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_create);
if (rc)
goto end;
rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_connect);
if (rc)
goto end;
rc = ctrl_cmd_install(CTRL_NODE_ROOT, &cmd_rtp_delete);
if (rc)
goto end;
end:
return rc;
}

View File

@ -1,10 +1,13 @@
#pragma once
#include <stdint.h>
#include <pthread.h>
#include <liburing.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/sockaddr_str.h>
#include <osmocom/core/rate_ctr.h>
#include <osmocom/core/select.h>
#include <osmocom/ctrl/control_if.h>
struct rtp_provider_instance;
@ -61,7 +64,6 @@ struct rtpsim_connection {
struct rtpsim_instance_cfg {
int num;
void *ctx;
uint16_t base_port;
unsigned int num_flows;
};
@ -83,7 +85,28 @@ struct rtpsim_instance {
unsigned int connections_size;
};
struct rtpsim_global {
/* global list of instances */
struct llist_head instances;
pthread_rwlock_t rwlock;
struct ctrl_handle *ctrl;
};
enum {
DMAIN,
};
enum codec_type;
extern struct rtpsim_global *g_rtpsim;
struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const char *cname);
struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname, enum codec_type codec);
int rtpsim_conn_start(struct rtpsim_connection *rtpc);
void rtpsim_conn_stop(struct rtpsim_connection *rtpc);
void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc);
int rtpsim_conn_connect(struct rtpsim_connection *rtpc);
int rtpsource_ctrl_cmds_install(void);

View File

@ -11,6 +11,7 @@
#include <osmocom/core/socket.h>
#include <osmocom/core/rate_ctr.h>
#include <osmocom/core/application.h>
#include <osmocom/core/stats.h>
#include <osmocom/netif/rtp.h>
#include "internal.h"
@ -40,6 +41,8 @@
#define TX_BUF_IDX 0
#define RX_BUF_IDX 1
struct rtpsim_global *g_rtpsim;
enum rtpsim_conn_ctr {
RTP_CONN_CTR_TX_PKTS,
RTP_CONN_CTR_TX_BYTES,
@ -79,9 +82,9 @@ static const struct rate_ctr_group_desc rtpsim_ctrg_desc = {
.ctr_desc = rtpsim_ctrs,
};
struct rtpsim_instance *rtpsim_instance_init(const struct rtpsim_instance_cfg *rmp)
struct rtpsim_instance *rtpsim_instance_init(void *ctx, const struct rtpsim_instance_cfg *rmp)
{
struct rtpsim_instance *ri = talloc_zero(rmp->ctx, struct rtpsim_instance);
struct rtpsim_instance *ri = talloc_zero(ctx, struct rtpsim_instance);
int rc;
if (!ri)
@ -173,9 +176,14 @@ struct rtpsim_connection *rtpsim_conn_find(struct rtpsim_instance *ri, const cha
}
/* reserve a connection; associates cname with it */
struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname)
struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const char *cname,
enum codec_type codec)
{
struct rtpsim_connection *rtpc;
const struct rtp_provider *rtp_prov;
rtp_prov = rtp_provider_find("static"); // TODO: configurable */
OSMO_ASSERT(rtp_prov);
rtpc = rtpsim_conn_find(ri, NULL);
if (!rtpc)
@ -183,37 +191,34 @@ struct rtpsim_connection *rtpsim_conn_reserve(struct rtpsim_instance *ri, const
/* this is called from main thread, we cannot use per-thread talloc contexts
* such as ri or rtpc */
rtpc->cname = talloc_strdup(NULL, cname);
rtpc->cname = talloc_strdup(g_rtpsim, cname);
rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(g_rtpsim, rtp_prov, codec);
OSMO_ASSERT(rtpc->tx.rtp_prov_inst);
/* re-start from zero transmit sequence number */
rtpc->tx.seq = 0;
return rtpc;
}
int rtpsim_conn_start(struct rtpsim_connection *rtpc, enum codec_type codec)
int rtpsim_conn_start(struct rtpsim_connection *rtpc)
{
const struct rtp_provider *rtp_prov;
rtp_prov = rtp_provider_find("static"); // TODO: configurable */
OSMO_ASSERT(rtp_prov);
/* this is called from main thread, we cannot use per-thread talloc contexts
* such as ri or rtpc */
rtpc->tx.rtp_prov_inst = rtp_provider_instance_alloc(NULL, rtp_prov, codec);
OSMO_ASSERT(rtpc->tx.rtp_prov_inst);
rtpc->tx.enabled = true;
rtpc->rx.enabled = true;
return 0;
}
/* unreserve a connection; stops all rx/tx and removes cname */
void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc)
void rtpsim_conn_stop(struct rtpsim_connection *rtpc)
{
/* disable Rx and Tx */
rtpc->tx.enabled = false;
rtpc->rx.enabled = false;
/* re-start from zero transmit sequence number */
rtpc->tx.seq = 0;
}
/* unreserve a connection; stops all rx/tx and removes cname */
void rtpsim_conn_unreserve(struct rtpsim_connection *rtpc)
{
rtp_provider_instance_free(rtpc->tx.rtp_prov_inst);
rtpc->tx.rtp_prov_inst = NULL;
@ -335,15 +340,25 @@ static void *reap_completion(void *_ri)
}
#endif
extern int osmo_ctx_init(const char *id);
static void rtpsim_main(const struct rtpsim_instance_cfg *rmp)
{
struct rtpsim_instance *ri;
struct rtpsim_connection *rtpc;
int i, rc;
ri = rtpsim_instance_init(rmp);
char namebuf[32];
snprintf(namebuf, sizeof(namebuf), "rtpsim_worker%d", rmp->num);
osmo_ctx_init(namebuf);
ri = rtpsim_instance_init(OTC_GLOBAL, rmp);
OSMO_ASSERT(ri);
pthread_rwlock_wrlock(&g_rtpsim->rwlock);
llist_add_tail(&ri->list, &g_rtpsim->instances);
pthread_rwlock_unlock(&g_rtpsim->rwlock);
/* create desired number of sockets */
printf("binding sockets\n");
for (i = 0; i < rmp->num_flows; i++) {
@ -367,16 +382,18 @@ static void rtpsim_main(const struct rtpsim_instance_cfg *rmp)
OSMO_ASSERT(rtpc);
}
#if 1
/* HACK */
printf("connecting sockets\n");
for (i = 0; i < rmp->num_flows; i++) {
char namebuf[32];
snprintf(namebuf, sizeof(namebuf), "conn%d", i);
struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf);
struct rtpsim_connection *rtpc = rtpsim_conn_reserve(ri, namebuf, CODEC_GSM_FR);
OSMO_ASSERT(rtpc);
OSMO_ASSERT(rtpsim_conn_connect(rtpc) == 0);
OSMO_ASSERT(rtpsim_conn_start(rtpc, CODEC_GSM_FR) == 0);
OSMO_ASSERT(rtpsim_conn_start(rtpc) == 0);
}
#endif
#ifdef USE_REGISTERED_FILES
/* register all our file descriptors; seems to fail on 5.8.x ? */
@ -506,13 +523,19 @@ int main(int argc, char **argv)
{
pthread_t worker[NR_WORKERS];
struct rtpsim_instance_cfg rmp[NR_WORKERS];
int i;
int i, rc;
osmo_init_logging2(NULL, NULL);
g_rtpsim = talloc_zero(NULL, struct rtpsim_global);
OSMO_ASSERT(g_rtpsim);
INIT_LLIST_HEAD(&g_rtpsim->instances);
pthread_rwlock_init(&g_rtpsim->rwlock, NULL);
osmo_init_logging2(g_rtpsim, NULL);
// osmo_stats_init(g_rtpsim);
/* Create worker threads */
for (i = 0; i < NR_WORKERS; i++) {
int rc;
rmp[i].ctx = talloc_named(NULL, 0, "rtpsim%d", i);
rmp[i].num = i;
rmp[i].num_flows = NUM_FLOWS_PER_WORKER;
rmp[i].base_port = 10000 + i * (2 * rmp[i].num_flows);
@ -520,6 +543,17 @@ int main(int argc, char **argv)
OSMO_ASSERT(rc >= 0);
}
/* CTRL interface */
//g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rss, ctrl_vty_get_bind_addr(), 11111, NULL);
g_rtpsim->ctrl = ctrl_interface_setup_dynip(g_rtpsim, "127.0.0.1", 11111, NULL);
OSMO_ASSERT(g_rtpsim->ctrl);
rc = rtpsource_ctrl_cmds_install();
OSMO_ASSERT(rc == 0);
while (1) {
osmo_select_main(0);
}
for (i = 0; i < NR_WORKERS; i++) {
pthread_join(worker[i], NULL);
}