osmo-epdg: gsup_client: rewrite gsup client

Remove the limitation of a single global request towards the server.
This commit is contained in:
Alexander Couzens 2024-02-18 21:15:32 +01:00
parent 18b9377295
commit 3e5cc68e2a
1 changed files with 217 additions and 89 deletions

View File

@ -25,6 +25,7 @@
#include <threading/mutex.h>
#include <threading/thread.h>
#include <threading/condvar.h>
#include <threading/rwlock.h>
#include <osmocom/core/msgb.h>
#include <osmocom/gsm/apn.h>
@ -38,6 +39,27 @@
#include "gsup_client.h"
#include "osmo_epdg_utils.h"
/* A GSUP client for osmocom.
*
* A request will block until it handled by the gsup or timeout.
* So all tx function will block with a timeout of 5 seconds.
*
* To allow multiple request in flight, request will flow through gsup_client:
* - send_auth_request() -> generate a gsup_request_t object
* - it will enqueue()d into the inqueue (a blocking queue).
* - the sender job (a differnet thread) will get a req out of inqueue, transmit it and enqueue it into **pending**.
* - the receveier job (also a different thread?) will receive a PDU and try to find a matching gsup_request_t.
* - if a matching gsup_request_t can be found, the thread of who is blocked in send_auth_request() will be woken and can work with the response.
*
* - if a timeout happen, the gsup_request_t can be at 3 different position.
* - a) still in the *inqueue*. the requester can remove it atomic
* - b) in the *pending* list, the requester can remove it atomic
* - c) neither in *inqueue* and *pending*, it is current in use by the sender/receiver. The requester will use the gsup_request_t->lock() to wait for the completion.
*
* The c) case is the most complex to ensure the request will be cleaned. If not synchronized, the requester look into the *pending* queue, can't find it there and the
* gsup_request_t will never been cleaned.
*/
typedef struct gsup_request_t gsup_request_t;
struct gsup_request_t {
/**
@ -50,8 +72,23 @@ struct gsup_request_t {
*/
condvar_t *condvar;
/**
* Lock the object to allow working with it without an garbage collector.
* When a request time out and at the same time the receiver or sender is using the object,
* the object isn't part of the in-queue nor of the pending-list. Use the lock to synchronize
* this small time.
* After this lock is taken by enqueue(), the rx/tx gsup won't add it anymore to pending list and release it.
*/
rwlock_t *lock;
/**
* refcounter to free the object
*/
refcount_t refcount;
struct msgb *msg;
enum osmo_gsup_message_type msg_type;
char *imsi;
osmo_epdg_gsup_response_t *resp;
};
@ -65,26 +102,25 @@ struct private_osmo_epdg_gsup_client_t {
osmo_epdg_ipa_client_t *ipa;
/**
* List of all pending requests
* List of all inqueue requests
* The list "owns" a references by req->get().
*/
blocking_queue_t *pending;
blocking_queue_t *inqueue;
/**
* Current request which isn't part of linked list
* List of all pending requests (gsup_request_t).
* The list "owns" a references by req->get().
*/
gsup_request_t *current_request;
/**
* Mutex to protect current_request
*/
mutex_t *mutex;
linked_list_t *pending;
mutex_t *pending_mutex;
char *uri;
stream_t *stream;
};
static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type, struct msgb *msg)
/* TODO: move into own class? */
static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type, const char *imsi, struct msgb *msg)
{
gsup_request_t *req = calloc(1, sizeof(gsup_request_t));
if (!req)
@ -92,10 +128,13 @@ static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type,
return NULL;
}
req->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
req->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
req->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
req->msg_type = msg_type;
req->imsi = strdup(imsi);
req->msg = msg;
req->refcount = 1;
return req;
}
@ -117,6 +156,13 @@ static void gsup_request_destroy(gsup_request_t *this)
this->condvar->destroy(this->condvar);
}
DESTROY_IF(this->lock);
if (this->imsi)
{
free(this->imsi);
}
if (this->msg)
{
free(this->msg);
@ -129,6 +175,36 @@ static void gsup_request_destroy(gsup_request_t *this)
free(this);
}
static void gsup_request_get(gsup_request_t *this)
{
ref_get(&this->refcount);
}
static void gsup_request_put(gsup_request_t *this)
{
if (ref_put(&this->refcount))
{
gsup_request_destroy(this);
}
}
#define IMSI_LEN 15
int imsi_copy(void *dest, const char *imsi)
{
if (!imsi)
{
return -EINVAL;
}
if (strlen(imsi) != IMSI_LEN)
{
return -EINVAL;
}
memcpy(dest, imsi, IMSI_LEN);
return 0;
}
static struct msgb *encode_to_msgb(struct osmo_gsup_message *gsup_msg)
{
chunk_t msg_chunk;
@ -163,56 +239,95 @@ free_msg:
return NULL;
}
static bool remove_pending(linked_list_t *list, gsup_request_t *req)
{
enumerator_t *enumerator;
gsup_request_t *ele = NULL;
bool found = false;
enumerator = list->create_enumerator(list);
while (enumerator->enumerate(enumerator, (void **) &ele))
{
if (ele == req)
{
list->remove_at(list, enumerator);
found = true;
goto out;
}
}
out:
enumerator->destroy(enumerator);
return found;
}
static gsup_request_t *get_pending(linked_list_t *list, const char *imsi, enum osmo_gsup_message_type message_type)
{
enumerator_t *enumerator;
gsup_request_t *req = NULL;
message_type = message_type & ~0b11;
enumerator = list->create_enumerator(list);
while (enumerator->enumerate(enumerator, (void **) &req))
{
if (strncmp(imsi, req->imsi, IMSI_LEN) == 0 && req->msg_type == message_type)
{
list->remove_at(list, enumerator);
goto out;
}
}
req = NULL;
out:
enumerator->destroy(enumerator);
return req;
}
/**
* enqueue a message/request to be send out and wait for the response.
*
* when exiting enqueue, it must be guaranteed the req isn't referenced by anything
*
* when exiting enqueue, it must be guaranteed the req isn't referenced by anything.
* The caller must hold a ref to req via get().
* @param timeout_ms A timeout in ms
* @return TRUE is the request timed out.
*/
static bool enqueue(private_osmo_epdg_gsup_client_t *this, gsup_request_t *req, u_int timeout_ms)
{
bool ret = FALSE;
DBG1(DBG_NET, "epdg: gsupc: Enqueuing message. Waiting %d ms for an answer", timeout_ms);
req->mutex->lock(req->mutex);
this->pending->enqueue(this->pending, req);
/* take a ref to have for the in/pending queue */
gsup_request_get(req);
this->inqueue->enqueue(this->inqueue, req);
ret = req->condvar->timed_wait(req->condvar, req->mutex, timeout_ms);
/* take owner ship / allow garbage free release.
* The owner ship isn't giving back. The rx/tx path will fail on try_write_lock() */
req->lock->write_lock(req->lock);
if (ret)
{
void *found = this->pending->remove(this->pending, req);
if (!found)
/* timed out */
DBG1(DBG_NET, "epdg: gsupc: %s/%d Message timedout!", req->imsi, req->msg_type);
void *found = this->inqueue->remove(this->inqueue, req);
if (found)
{
this->mutex->lock(this->mutex);
if (this->current_request == req)
{
this->current_request = NULL;
}
this->mutex->unlock(this->mutex);
/* give back the ref we took for the pending queue */
gsup_request_put(req);
return ret;
}
this->pending_mutex->lock(this->pending_mutex);
bool found2 = remove_pending(this->pending, req);
this->pending_mutex->unlock(this->pending_mutex);
if (found2)
{
/* give back the ref we took for the pending queue */
gsup_request_put(req);
return ret;
}
DBG1(DBG_NET, "epdg: gsupc: Message timedout!");
}
return ret;
}
#define IMSI_LEN 15
int imsi_copy(void *dest, const char *imsi)
{
if (!imsi)
{
return -EINVAL;
}
if (strlen(imsi) != IMSI_LEN)
{
return -EINVAL;
}
memcpy(dest, imsi, IMSI_LEN);
return 0;
}
METHOD(osmo_epdg_gsup_client_t, tunnel_request, osmo_epdg_gsup_response_t*,
private_osmo_epdg_gsup_client_t *this, const char *imsi)
{
@ -237,18 +352,18 @@ METHOD(osmo_epdg_gsup_client_t, tunnel_request, osmo_epdg_gsup_response_t*,
return NULL;
}
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_EPDG_TUNNEL_REQUEST, msg);
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_EPDG_TUNNEL_REQUEST, imsi, msg);
osmo_epdg_gsup_response_t *resp = NULL;
timedout = enqueue(this, req, 5000);
if (timedout)
{
gsup_request_destroy(req);
gsup_request_put(req);
return NULL;
}
resp = req->resp;
req->resp = NULL;
gsup_request_destroy(req);
gsup_request_put(req);
return resp;
}
@ -341,19 +456,19 @@ METHOD(osmo_epdg_gsup_client_t, send_auth_request, osmo_epdg_gsup_response_t*,
return NULL;
}
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_SEND_AUTH_INFO_REQUEST, msg);
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_SEND_AUTH_INFO_REQUEST, imsi, msg);
osmo_epdg_gsup_response_t *resp = NULL;
timedout = enqueue(this, req, 5000);
if (timedout)
{
DBG1(DBG_NET, "epdg: gsupc: Timeout request.");
gsup_request_destroy(req);
gsup_request_put(req);
return NULL;
}
resp = req->resp;
req->resp = NULL;
gsup_request_destroy(req);
gsup_request_put(req);
return resp;
}
@ -396,18 +511,18 @@ METHOD(osmo_epdg_gsup_client_t, update_location, osmo_epdg_gsup_response_t *,
return NULL;
}
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_UPDATE_LOCATION_REQUEST, msg);
gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_UPDATE_LOCATION_REQUEST, imsi, msg);
osmo_epdg_gsup_response_t *resp = NULL;
timedout = enqueue(this, req, 5000);
if (timedout)
{
gsup_request_destroy(req);
gsup_request_put(req);
return NULL;
}
resp = req->resp;
req->resp = NULL;
gsup_request_destroy(req);
gsup_request_put(req);
return resp;
}
@ -464,6 +579,7 @@ static void signal_request(gsup_request_t *req, osmo_epdg_gsup_response_t *resp)
static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb *pdu)
{
private_osmo_epdg_gsup_client_t *this = data;
gsup_request_t *req;
osmo_epdg_gsup_response_t *resp;
int ret;
@ -479,6 +595,10 @@ static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb
goto out;
}
DBG1(DBG_NET, "epdg: gsupc: receive gsup message %s/%d",
resp->gsup.imsi, resp->gsup.message_type);
switch (resp->gsup.message_type)
{
case OSMO_GSUP_MSGT_INSERT_DATA_REQUEST:
@ -490,31 +610,32 @@ static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb
case OSMO_GSUP_MSGT_UPDATE_LOCATION_RESULT:
case OSMO_GSUP_MSGT_EPDG_TUNNEL_ERROR:
case OSMO_GSUP_MSGT_EPDG_TUNNEL_RESULT:
this->mutex->lock(this->mutex);
if (!this->current_request)
this->pending_mutex->lock(this->pending_mutex);
req = get_pending(this->pending, resp->gsup.imsi, resp->gsup.message_type);
if (!req)
{
/* a timeout happened and current_request is NULL */
this->mutex->unlock(this->mutex);
this->pending_mutex->unlock(this->pending_mutex);
DBG1(DBG_NET, "epdg: gsupc: receive gsup message where no matching response could be found. %s/%d",
resp->gsup.imsi, resp->gsup.message_type);
goto out;
}
if ((this->current_request->msg_type & 0xfffffffc) != (resp->gsup.message_type & 0xfffffffc))
if (!req->lock->try_write_lock(req->lock))
{
/* Request, Result, Error, Other are encoded in the last 2 bits */
DBG1(DBG_NET, "epdg: gsupc: received non matching Result. Requested %s but received %s",
osmo_gsup_message_type_name(this->current_request->msg_type),
osmo_gsup_message_type_name(resp->gsup.message_type));
this->mutex->unlock(this->mutex);
/* Race Condition, Response came to late! */
DBG1(DBG_NET, "epdg: gsupc: %s/%d: Can't aquire try_write_lock. Response too late",
resp->gsup.imsi, resp->gsup.message_type);
this->pending_mutex->unlock(this->pending_mutex);
gsup_request_put(req);
goto out;
}
if (!this->current_request)
{
DBG2(DBG_NET, "epdg: gsupc: received response when no request waiting %02x. This might came too late.", resp->gsup.message_type);
this->mutex->unlock(this->mutex);
goto out;
}
signal_request(this->current_request, resp);
this->current_request = NULL;
this->mutex->unlock(this->mutex);
this->pending_mutex->unlock(this->pending_mutex);
DBG1(DBG_NET, "epdg: gsupc: %s/%d: Informing requester. %p",
resp->gsup.imsi, resp->gsup.message_type, req);
signal_request(req, resp);
req->lock->unlock(req->lock);
gsup_request_put(req);
break;
default:
DBG1(DBG_NET, "epdg: gsupc: received unknown message type %02x", resp->gsup.message_type);
@ -535,33 +656,33 @@ static int disconnect_gsup(private_osmo_epdg_gsup_client_t *this)
return 0;
}
static void add_pending(private_osmo_epdg_gsup_client_t *this, gsup_request_t *req)
{
this->pending_mutex->lock(this->pending_mutex);
this->pending->insert_last(this->pending, req);
this->pending_mutex->unlock(this->pending_mutex);
}
/* TODO: worker thread which sends out enqueue'd message ! */
static job_requeue_t queue_worker(private_osmo_epdg_gsup_client_t *this)
{
int ret;
gsup_request_t *req;
this->mutex->lock(this->mutex);
if (this->current_request)
{
/* TODO: should we join the signal? */
this->mutex->unlock(this->mutex);
return JOB_REQUEUE_FAIR;
}
this->mutex->unlock(this->mutex);
/* should we multiple queue it? */
/* TODO: replace pending with a thread safe queue, but non-blocking */
req = this->pending->dequeue(this->pending);
this->mutex->lock(this->mutex);
if (this->current_request)
req = this->inqueue->dequeue(this->inqueue);
if (!req)
{
/* TODO: how could this happen? */
this->mutex->unlock(this->mutex);
signal_request(req, NULL);
return JOB_REQUEUE_FAIR;
return JOB_REQUEUE_NONE;
}
if (!req->lock->try_write_lock(req->lock))
{
/* request is about to be released */
gsup_request_put(req);
return JOB_REQUEUE_NONE;
}
this->current_request = req;
this->mutex->unlock(this->mutex);
ret = this->ipa->send(this->ipa, IPAC_PROTO_EXT_GSUP, req->msg);
req->msg = NULL;
@ -569,8 +690,15 @@ static job_requeue_t queue_worker(private_osmo_epdg_gsup_client_t *this)
{
/* TODO: disconnect & reconnect, but request is lost for now */
/* TODO: wake up request */
req->lock->unlock(req->lock);
signal_request(req, NULL);
gsup_request_put(req);
} else {
/* add to pending */
add_pending(this, req);
req->lock->unlock(req->lock);
}
return JOB_REQUEUE_FAIR;
}
@ -587,9 +715,9 @@ osmo_epdg_gsup_client_t *osmo_epdg_gsup_client_create(char *uri)
.destroy = _destroy,
},
.uri = strdup(uri),
.pending = blocking_queue_create(),
.current_request = NULL,
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.inqueue = blocking_queue_create(),
.pending = linked_list_create(),
.pending_mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.ipa = osmo_epdg_ipa_client_create(uri),
);
this->ipa->on_recv(this->ipa, IPAC_PROTO_EXT_GSUP, on_recv_pdu, this);