diff --git a/src/libcharon/plugins/osmo_epdg/gsup_client.c b/src/libcharon/plugins/osmo_epdg/gsup_client.c index 73835f530..83ff1e329 100644 --- a/src/libcharon/plugins/osmo_epdg/gsup_client.c +++ b/src/libcharon/plugins/osmo_epdg/gsup_client.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -38,6 +39,27 @@ #include "gsup_client.h" #include "osmo_epdg_utils.h" +/* A GSUP client for osmocom. + * + * A request will block until it handled by the gsup or timeout. + * So all tx function will block with a timeout of 5 seconds. + * + * To allow multiple request in flight, request will flow through gsup_client: + * - send_auth_request() -> generate a gsup_request_t object + * - it will enqueue()d into the inqueue (a blocking queue). + * - the sender job (a differnet thread) will get a req out of inqueue, transmit it and enqueue it into **pending**. + * - the receveier job (also a different thread?) will receive a PDU and try to find a matching gsup_request_t. + * - if a matching gsup_request_t can be found, the thread of who is blocked in send_auth_request() will be woken and can work with the response. + * + * - if a timeout happen, the gsup_request_t can be at 3 different position. + * - a) still in the *inqueue*. the requester can remove it atomic + * - b) in the *pending* list, the requester can remove it atomic + * - c) neither in *inqueue* and *pending*, it is current in use by the sender/receiver. The requester will use the gsup_request_t->lock() to wait for the completion. + * + * The c) case is the most complex to ensure the request will be cleaned. If not synchronized, the requester look into the *pending* queue, can't find it there and the + * gsup_request_t will never been cleaned. + */ + typedef struct gsup_request_t gsup_request_t; struct gsup_request_t { /** @@ -50,8 +72,23 @@ struct gsup_request_t { */ condvar_t *condvar; + /** + * Lock the object to allow working with it without an garbage collector. + * When a request time out and at the same time the receiver or sender is using the object, + * the object isn't part of the in-queue nor of the pending-list. Use the lock to synchronize + * this small time. + * After this lock is taken by enqueue(), the rx/tx gsup won't add it anymore to pending list and release it. + */ + rwlock_t *lock; + + /** + * refcounter to free the object + */ + refcount_t refcount; + struct msgb *msg; enum osmo_gsup_message_type msg_type; + char *imsi; osmo_epdg_gsup_response_t *resp; }; @@ -65,26 +102,25 @@ struct private_osmo_epdg_gsup_client_t { osmo_epdg_ipa_client_t *ipa; /** - * List of all pending requests + * List of all inqueue requests + * The list "owns" a references by req->get(). */ - blocking_queue_t *pending; + blocking_queue_t *inqueue; /** - * Current request which isn't part of linked list + * List of all pending requests (gsup_request_t). + * The list "owns" a references by req->get(). */ - gsup_request_t *current_request; - - /** - * Mutex to protect current_request - */ - mutex_t *mutex; + linked_list_t *pending; + mutex_t *pending_mutex; char *uri; stream_t *stream; }; -static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type, struct msgb *msg) +/* TODO: move into own class? */ +static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type, const char *imsi, struct msgb *msg) { gsup_request_t *req = calloc(1, sizeof(gsup_request_t)); if (!req) @@ -92,10 +128,13 @@ static gsup_request_t *gsup_request_create(enum osmo_gsup_message_type msg_type, return NULL; } + req->lock = rwlock_create(RWLOCK_TYPE_DEFAULT); req->mutex = mutex_create(MUTEX_TYPE_DEFAULT); req->condvar = condvar_create(CONDVAR_TYPE_DEFAULT); req->msg_type = msg_type; + req->imsi = strdup(imsi); req->msg = msg; + req->refcount = 1; return req; } @@ -117,6 +156,13 @@ static void gsup_request_destroy(gsup_request_t *this) this->condvar->destroy(this->condvar); } + DESTROY_IF(this->lock); + + if (this->imsi) + { + free(this->imsi); + } + if (this->msg) { free(this->msg); @@ -129,6 +175,36 @@ static void gsup_request_destroy(gsup_request_t *this) free(this); } +static void gsup_request_get(gsup_request_t *this) +{ + ref_get(&this->refcount); +} + +static void gsup_request_put(gsup_request_t *this) +{ + if (ref_put(&this->refcount)) + { + gsup_request_destroy(this); + } +} + +#define IMSI_LEN 15 +int imsi_copy(void *dest, const char *imsi) +{ + if (!imsi) + { + return -EINVAL; + } + + if (strlen(imsi) != IMSI_LEN) + { + return -EINVAL; + } + memcpy(dest, imsi, IMSI_LEN); + + return 0; +} + static struct msgb *encode_to_msgb(struct osmo_gsup_message *gsup_msg) { chunk_t msg_chunk; @@ -163,56 +239,95 @@ free_msg: return NULL; } +static bool remove_pending(linked_list_t *list, gsup_request_t *req) +{ + enumerator_t *enumerator; + gsup_request_t *ele = NULL; + bool found = false; + + enumerator = list->create_enumerator(list); + while (enumerator->enumerate(enumerator, (void **) &ele)) + { + if (ele == req) + { + list->remove_at(list, enumerator); + found = true; + goto out; + } + } +out: + enumerator->destroy(enumerator); + return found; +} + +static gsup_request_t *get_pending(linked_list_t *list, const char *imsi, enum osmo_gsup_message_type message_type) +{ + enumerator_t *enumerator; + gsup_request_t *req = NULL; + message_type = message_type & ~0b11; + enumerator = list->create_enumerator(list); + while (enumerator->enumerate(enumerator, (void **) &req)) + { + if (strncmp(imsi, req->imsi, IMSI_LEN) == 0 && req->msg_type == message_type) + { + list->remove_at(list, enumerator); + goto out; + } + } + req = NULL; +out: + enumerator->destroy(enumerator); + return req; +} + + /** * enqueue a message/request to be send out and wait for the response. - * - * when exiting enqueue, it must be guaranteed the req isn't referenced by anything + * + * when exiting enqueue, it must be guaranteed the req isn't referenced by anything. + * The caller must hold a ref to req via get(). * @param timeout_ms A timeout in ms * @return TRUE is the request timed out. */ static bool enqueue(private_osmo_epdg_gsup_client_t *this, gsup_request_t *req, u_int timeout_ms) { bool ret = FALSE; - DBG1(DBG_NET, "epdg: gsupc: Enqueuing message. Waiting %d ms for an answer", timeout_ms); + req->mutex->lock(req->mutex); - this->pending->enqueue(this->pending, req); + /* take a ref to have for the in/pending queue */ + gsup_request_get(req); + this->inqueue->enqueue(this->inqueue, req); ret = req->condvar->timed_wait(req->condvar, req->mutex, timeout_ms); + + /* take owner ship / allow garbage free release. + * The owner ship isn't giving back. The rx/tx path will fail on try_write_lock() */ + req->lock->write_lock(req->lock); if (ret) { - void *found = this->pending->remove(this->pending, req); - if (!found) + /* timed out */ + DBG1(DBG_NET, "epdg: gsupc: %s/%d Message timedout!", req->imsi, req->msg_type); + void *found = this->inqueue->remove(this->inqueue, req); + if (found) { - this->mutex->lock(this->mutex); - if (this->current_request == req) - { - this->current_request = NULL; - } - this->mutex->unlock(this->mutex); + /* give back the ref we took for the pending queue */ + gsup_request_put(req); + return ret; + } + this->pending_mutex->lock(this->pending_mutex); + bool found2 = remove_pending(this->pending, req); + this->pending_mutex->unlock(this->pending_mutex); + if (found2) + { + /* give back the ref we took for the pending queue */ + gsup_request_put(req); + return ret; } - DBG1(DBG_NET, "epdg: gsupc: Message timedout!"); } return ret; } -#define IMSI_LEN 15 -int imsi_copy(void *dest, const char *imsi) -{ - if (!imsi) - { - return -EINVAL; - } - - if (strlen(imsi) != IMSI_LEN) - { - return -EINVAL; - } - memcpy(dest, imsi, IMSI_LEN); - - return 0; -} - METHOD(osmo_epdg_gsup_client_t, tunnel_request, osmo_epdg_gsup_response_t*, private_osmo_epdg_gsup_client_t *this, const char *imsi) { @@ -237,18 +352,18 @@ METHOD(osmo_epdg_gsup_client_t, tunnel_request, osmo_epdg_gsup_response_t*, return NULL; } - gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_EPDG_TUNNEL_REQUEST, msg); + gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_EPDG_TUNNEL_REQUEST, imsi, msg); osmo_epdg_gsup_response_t *resp = NULL; timedout = enqueue(this, req, 5000); if (timedout) { - gsup_request_destroy(req); + gsup_request_put(req); return NULL; } resp = req->resp; req->resp = NULL; - gsup_request_destroy(req); + gsup_request_put(req); return resp; } @@ -341,19 +456,19 @@ METHOD(osmo_epdg_gsup_client_t, send_auth_request, osmo_epdg_gsup_response_t*, return NULL; } - gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_SEND_AUTH_INFO_REQUEST, msg); + gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_SEND_AUTH_INFO_REQUEST, imsi, msg); osmo_epdg_gsup_response_t *resp = NULL; timedout = enqueue(this, req, 5000); if (timedout) { DBG1(DBG_NET, "epdg: gsupc: Timeout request."); - gsup_request_destroy(req); + gsup_request_put(req); return NULL; } resp = req->resp; req->resp = NULL; - gsup_request_destroy(req); + gsup_request_put(req); return resp; } @@ -396,18 +511,18 @@ METHOD(osmo_epdg_gsup_client_t, update_location, osmo_epdg_gsup_response_t *, return NULL; } - gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_UPDATE_LOCATION_REQUEST, msg); + gsup_request_t *req = gsup_request_create(OSMO_GSUP_MSGT_UPDATE_LOCATION_REQUEST, imsi, msg); osmo_epdg_gsup_response_t *resp = NULL; timedout = enqueue(this, req, 5000); if (timedout) { - gsup_request_destroy(req); + gsup_request_put(req); return NULL; } resp = req->resp; req->resp = NULL; - gsup_request_destroy(req); + gsup_request_put(req); return resp; } @@ -464,6 +579,7 @@ static void signal_request(gsup_request_t *req, osmo_epdg_gsup_response_t *resp) static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb *pdu) { private_osmo_epdg_gsup_client_t *this = data; + gsup_request_t *req; osmo_epdg_gsup_response_t *resp; int ret; @@ -479,6 +595,10 @@ static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb goto out; } + DBG1(DBG_NET, "epdg: gsupc: receive gsup message %s/%d", + resp->gsup.imsi, resp->gsup.message_type); + + switch (resp->gsup.message_type) { case OSMO_GSUP_MSGT_INSERT_DATA_REQUEST: @@ -490,31 +610,32 @@ static bool on_recv_pdu(void *data, osmo_epdg_ipa_client_t *client, struct msgb case OSMO_GSUP_MSGT_UPDATE_LOCATION_RESULT: case OSMO_GSUP_MSGT_EPDG_TUNNEL_ERROR: case OSMO_GSUP_MSGT_EPDG_TUNNEL_RESULT: - this->mutex->lock(this->mutex); - if (!this->current_request) + this->pending_mutex->lock(this->pending_mutex); + req = get_pending(this->pending, resp->gsup.imsi, resp->gsup.message_type); + if (!req) { - /* a timeout happened and current_request is NULL */ - this->mutex->unlock(this->mutex); + this->pending_mutex->unlock(this->pending_mutex); + DBG1(DBG_NET, "epdg: gsupc: receive gsup message where no matching response could be found. %s/%d", + resp->gsup.imsi, resp->gsup.message_type); goto out; } - if ((this->current_request->msg_type & 0xfffffffc) != (resp->gsup.message_type & 0xfffffffc)) + + if (!req->lock->try_write_lock(req->lock)) { - /* Request, Result, Error, Other are encoded in the last 2 bits */ - DBG1(DBG_NET, "epdg: gsupc: received non matching Result. Requested %s but received %s", - osmo_gsup_message_type_name(this->current_request->msg_type), - osmo_gsup_message_type_name(resp->gsup.message_type)); - this->mutex->unlock(this->mutex); + /* Race Condition, Response came to late! */ + DBG1(DBG_NET, "epdg: gsupc: %s/%d: Can't aquire try_write_lock. Response too late", + resp->gsup.imsi, resp->gsup.message_type); + this->pending_mutex->unlock(this->pending_mutex); + gsup_request_put(req); goto out; } - if (!this->current_request) - { - DBG2(DBG_NET, "epdg: gsupc: received response when no request waiting %02x. This might came too late.", resp->gsup.message_type); - this->mutex->unlock(this->mutex); - goto out; - } - signal_request(this->current_request, resp); - this->current_request = NULL; - this->mutex->unlock(this->mutex); + this->pending_mutex->unlock(this->pending_mutex); + DBG1(DBG_NET, "epdg: gsupc: %s/%d: Informing requester. %p", + resp->gsup.imsi, resp->gsup.message_type, req); + + signal_request(req, resp); + req->lock->unlock(req->lock); + gsup_request_put(req); break; default: DBG1(DBG_NET, "epdg: gsupc: received unknown message type %02x", resp->gsup.message_type); @@ -535,33 +656,33 @@ static int disconnect_gsup(private_osmo_epdg_gsup_client_t *this) return 0; } +static void add_pending(private_osmo_epdg_gsup_client_t *this, gsup_request_t *req) +{ + this->pending_mutex->lock(this->pending_mutex); + this->pending->insert_last(this->pending, req); + this->pending_mutex->unlock(this->pending_mutex); +} + /* TODO: worker thread which sends out enqueue'd message ! */ static job_requeue_t queue_worker(private_osmo_epdg_gsup_client_t *this) { int ret; gsup_request_t *req; - this->mutex->lock(this->mutex); - if (this->current_request) - { - /* TODO: should we join the signal? */ - this->mutex->unlock(this->mutex); - return JOB_REQUEUE_FAIR; - } - this->mutex->unlock(this->mutex); + /* should we multiple queue it? */ /* TODO: replace pending with a thread safe queue, but non-blocking */ - req = this->pending->dequeue(this->pending); - - this->mutex->lock(this->mutex); - if (this->current_request) + req = this->inqueue->dequeue(this->inqueue); + if (!req) { - /* TODO: how could this happen? */ - this->mutex->unlock(this->mutex); - signal_request(req, NULL); - return JOB_REQUEUE_FAIR; + return JOB_REQUEUE_NONE; + } + + if (!req->lock->try_write_lock(req->lock)) + { + /* request is about to be released */ + gsup_request_put(req); + return JOB_REQUEUE_NONE; } - this->current_request = req; - this->mutex->unlock(this->mutex); ret = this->ipa->send(this->ipa, IPAC_PROTO_EXT_GSUP, req->msg); req->msg = NULL; @@ -569,8 +690,15 @@ static job_requeue_t queue_worker(private_osmo_epdg_gsup_client_t *this) { /* TODO: disconnect & reconnect, but request is lost for now */ /* TODO: wake up request */ + req->lock->unlock(req->lock); signal_request(req, NULL); + gsup_request_put(req); + } else { + /* add to pending */ + add_pending(this, req); + req->lock->unlock(req->lock); } + return JOB_REQUEUE_FAIR; } @@ -587,9 +715,9 @@ osmo_epdg_gsup_client_t *osmo_epdg_gsup_client_create(char *uri) .destroy = _destroy, }, .uri = strdup(uri), - .pending = blocking_queue_create(), - .current_request = NULL, - .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .inqueue = blocking_queue_create(), + .pending = linked_list_create(), + .pending_mutex = mutex_create(MUTEX_TYPE_DEFAULT), .ipa = osmo_epdg_ipa_client_create(uri), ); this->ipa->on_recv(this->ipa, IPAC_PROTO_EXT_GSUP, on_recv_pdu, this);