WIP: use inter-thread queue between REST and main

This way we serialize access to core data structures and also allow
future per-message FSMs in the main thread to first communicate to the
RAN peers before permitting the REST/HTTP worker thread to return
any response back to the external REST interface user.

Change-Id: Ia9f656733cf16412926a6d6dc9e1ff4243d941af
This commit is contained in:
Harald Welte 2019-08-29 21:54:39 +02:00 committed by Harald Welte
parent c8ea464939
commit 41634872dc
7 changed files with 190 additions and 17 deletions

View File

@ -5,12 +5,12 @@ AM_CFLAGS=-Wall -g $(LIBOSMOCORE_CFLAGS) $(LIBOSMOGSM_CFLAGS) $(LIBOSMOVTY_CFLAG
$(COVERAGE_CFLAGS)
AM_LDFLAGS=$(COVERAGE_LDFLAGS)
EXTRA_DIST = cbc_data.h cbsp_server.h charset.h internal.h
EXTRA_DIST = cbc_data.h cbsp_server.h charset.h internal.h rest_it_op.h
bin_PROGRAMS = osmo-cbc
osmo_cbc_SOURCES = cbc_main.c cbc_data.c cbc_vty.c cbsp_server.c cbsp_server_fsm.c \
rest_api.c charset.c message_handling.c
rest_api.c charset.c message_handling.c rest_it_op.c
osmo_cbc_LDADD = $(LIBOSMOCORE_LIBS) $(LIBOSMOGSM_LIBS) $(LIBOSMOVTY_LIBS) \
$(LIBOSMONETIF_LIBS) \
$(ULFIUS_LIBS) $(JANSSON_LIBS) $(ORCANIA_LIBS)

View File

@ -2,6 +2,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/it_q.h>
#include <osmocom/gsm/protocol/gsm_48_049.h>
struct osmo_cbsp_cbc_client;
@ -110,6 +111,9 @@ struct cbc {
struct llist_head messages; /* cbc_message.list */
struct llist_head peers; /* cbc_peer.list */
struct {
struct osmo_it_q *rest2main;
} it_q;
};
extern struct cbc *g_cbc;

View File

@ -197,6 +197,9 @@ int main(int argc, char **argv)
rest_api_init(tall_rest_ctx, 12345);
g_cbc->it_q.rest2main = osmo_it_q_alloc(g_cbc, "rest2main", 10, rest2main_read_cb, NULL);
OSMO_ASSERT(g_cbc->it_q.rest2main);
signal(SIGUSR1, &signal_handler);
signal(SIGUSR2, &signal_handler);
osmo_init_ignore_signals();

View File

@ -33,3 +33,6 @@ void cbc_vty_init(void);
/* message_handling.c */
int cbc_message_new(struct cbc_message *cbcmsg);
struct cbc_message *cbc_message_by_id(uint16_t message_id);
/* rest_it_op.c */
void rest2main_read_cb(struct osmo_it_q *q, void *item);

View File

@ -39,6 +39,7 @@
#include "internal.h"
#include "charset.h"
#include "cbc_data.h"
#include "rest_it_op.h"
/* get an integer value for field "key" in object "parent" */
static int json_get_integer(int *out, json_t *parent, const char *key)
@ -425,30 +426,37 @@ static int json2cbc_message(struct cbc_message *out, json_t *in)
static int api_cb_message_post(const struct _u_request *req, struct _u_response *resp, void *user_data)
{
struct cbc_message *cbc_msg = talloc_zero(g_cbc, struct cbc_message);
struct rest_it_op *riop = talloc_zero(g_cbc, struct rest_it_op);
json_error_t json_err;
json_t *json_req = NULL;
int rc;
if (!riop)
return -ENOMEM;
riop->operation = REST_IT_OP_MSG_CREATE;
json_req = ulfius_get_json_body_request(req, &json_err);
if (!json_req) {
LOGP(DREST, LOGL_ERROR, "REST: No JSON Body\n");
goto err;
}
rc = json2cbc_message(cbc_msg, json_req);
rc = json2cbc_message(&riop->u.create.cbc_msg, json_req);
if (rc < 0)
goto err;
/* actually add the message */
cbc_message_new(cbc_msg);
/* request message to be added by main thread */
rc = rest_it_op_send_and_wait(riop, 10);
if (rc < 0)
goto err;
json_decref(json_req);
ulfius_set_empty_body_response(resp, 200);
return U_CALLBACK_COMPLETE;
err:
json_decref(json_req);
talloc_free(cbc_msg);
talloc_free(riop);
ulfius_set_empty_body_response(resp, 400);
return U_CALLBACK_COMPLETE;
}
@ -456,9 +464,10 @@ err:
static int api_cb_message_del(const struct _u_request *req, struct _u_response *resp, void *user_data)
{
const char *message_id_str = u_map_get(req->map_url, "message_id");
struct cbc_message *cbc_msg;
struct rest_it_op *riop = talloc_zero(g_cbc, struct rest_it_op);
uint16_t message_id;
int status = 404;
int rc;
if (!message_id_str) {
status = 400;
@ -470,19 +479,24 @@ static int api_cb_message_del(const struct _u_request *req, struct _u_response *
goto err;
}
cbc_msg = cbc_message_by_id(message_id);
if (cbc_msg) {
status = 200;
/* FIXME: delete from all peers */
/* should we postpone this and rather translate into another state until we get
* all the KILL ACK from the peers [and hence can update statistics] ? */
llist_del(&cbc_msg->list);
talloc_free(cbc_msg);
if (!riop) {
status = 999; /* FIXME */
goto err;
}
ulfius_set_empty_body_response(resp, status);
riop->operation = REST_IT_OP_MSG_DELETE;
riop->u.del.msg_id = message_id;
/* request message to be deleted by main thread */
rc = rest_it_op_send_and_wait(riop, 10);
if (rc < 0)
goto err;
talloc_free(riop);
ulfius_set_empty_body_response(resp, 200);
return U_CALLBACK_COMPLETE;
err:
talloc_free(riop);
ulfius_set_empty_body_response(resp, status);
return U_CALLBACK_COMPLETE;
}

104
src/rest_it_op.c Normal file
View File

@ -0,0 +1,104 @@
/* Osmocom CBC (Cell Broacast Centre) */
/* (C) 2019 by Harald Welte <laforge@gnumonks.org>
* All Rights Reserved
*
* SPDX-License-Identifier: AGPL-3.0+
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <errno.h>
#include <pthread.h>
#include <osmocom/core/it_q.h>
#include "rest_it_op.h"
#include "internal.h"
/***********************************************************************
* HTTP THREAD
***********************************************************************/
struct rest_it_op *rest_it_op_alloc(void *ctx)
{
struct rest_it_op *op = talloc_zero(ctx, struct rest_it_op);
if (!op)
return NULL;
pthread_mutex_init(&op->mutex, NULL);
pthread_cond_init(&op->cond, NULL);
return op;
}
/* enqueue an inter-thread operation in REST->main direction and wait for its completion */
int rest_it_op_send_and_wait(struct rest_it_op *op, unsigned int wait_sec)
{
struct timespec ts;
int rc = 0;
rc = osmo_it_q_enqueue(g_cbc->it_q.rest2main, op, list);
if (rc < 0)
return rc;
/* grab mutex before pthread_cond_timedwait() */
pthread_mutex_lock(&op->mutex);
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += wait_sec;
while (rc == 0)
rc = pthread_cond_timedwait(&op->cond, &op->mutex, &ts);
if (rc == 0)
pthread_mutex_unlock(&op->mutex);
/* 'op' is implicitly owned by the caller again now, who needs to take care
* of releasing its memory */
return rc;
}
/***********************************************************************
* MAIN THREAD
***********************************************************************/
void rest2main_read_cb(struct osmo_it_q *q, void *item)
{
struct rest_it_op *op = item;
struct cbc_message *cbc_msg;
/* FIXME: look up related message and dispatch to message FSM,
* which will eventually call pthread_cond_signal(&op->cond) */
switch (op->operation) {
case REST_IT_OP_MSG_CREATE:
/* FIXME: send to message FSM who can addd it on RAN */
cbc_message_new(&op->u.create.cbc_msg);
break;
case REST_IT_OP_MSG_DELETE:
/* FIXME: send to message FSM who can remove it from RAN */
cbc_msg = cbc_message_by_id(op->u.del.msg_id);
if (cbc_msg) {
llist_del(&cbc_msg->list);
talloc_free(cbc_msg);
}
break;
default:
break;
}
pthread_cond_signal(&op->cond); // HACK
}

45
src/rest_it_op.h Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include <stdint.h>
#include <pthread.h>
#include <osmocom/core/fsm.h>
#include <osmocom/core/linuxlist.h>
#include <osmocom/core/it_q.h>
#include "cbc_data.h"
enum rest_it_operation {
REST_IT_OP_NONE,
REST_IT_OP_MSG_CREATE,
REST_IT_OP_MSG_DELETE,
_NUM_REST_IT_OP
};
/* create a new SMSCB message */
struct rest_it_op_create {
struct cbc_message cbc_msg;
};
/* delete a SMSCB message from our state and all peers */
struct rest_it_op_delete {
uint16_t msg_id;
};
/* Inter-Thread operation from REST thread to main thread */
struct rest_it_op {
struct llist_head list;
/* condition variable for REST thread to pthread_cond_wait on */
pthread_cond_t cond;
/* mutex required around cond */
pthread_mutex_t mutex;
enum rest_it_operation operation;
union {
struct rest_it_op_create create;
struct rest_it_op_delete del;
} u;
};
int rest_it_op_send_and_wait(struct rest_it_op *op, unsigned int wait_sec);