diff --git a/src/libcharon/plugins/load_tester/load_tester_control.c b/src/libcharon/plugins/load_tester/load_tester_control.c index 114233d57..51069e36a 100644 --- a/src/libcharon/plugins/load_tester/load_tester_control.c +++ b/src/libcharon/plugins/load_tester/load_tester_control.c @@ -23,10 +23,14 @@ #include #include +#include #include +#include +#include #include typedef struct private_load_tester_control_t private_load_tester_control_t; +typedef struct init_listener_t init_listener_t; /** * Private data of an load_tester_control_t object. @@ -44,6 +48,42 @@ struct private_load_tester_control_t { int socket; }; +/** + * Listener to follow initiation progress + */ +struct init_listener_t { + + /** + * implements listener_t + */ + listener_t listener; + + /** + * Output stream to log to + */ + FILE *stream; + + /** + * IKE_SAs we have started to initiate + */ + hashtable_t *initiated; + + /** + * IKE_SAs we have completed to initate (success or failure) + */ + hashtable_t *completed; + + /** + * Mutex to lock IKE_SA tables + */ + mutex_t *mutex; + + /** + * Condvar to wait for completion + */ + condvar_t *condvar; +}; + /** * Open load-tester listening socket */ @@ -86,11 +126,75 @@ static bool open_socket(private_load_tester_control_t *this) return TRUE; } +/** + * Hashtable hash function + */ +static u_int hash(uintptr_t id) +{ + return id; +} + +/** + * Hashtable hash function + */ +static bool equals(uintptr_t a, uintptr_t b) +{ + return a == b; +} + +METHOD(listener_t, ike_state_change, bool, + init_listener_t *this, ike_sa_t *ike_sa, ike_sa_state_t state) +{ + if (state == IKE_ESTABLISHED || state == IKE_DESTROYING) + { + uintptr_t id; + bool match = FALSE; + + id = ike_sa->get_unique_id(ike_sa); + this->mutex->lock(this->mutex); + if (this->initiated->get(this->initiated, (void*)id)) + { + match = !this->completed->put(this->completed, (void*)id, (void*)id); + } + this->mutex->unlock(this->mutex); + + if (match) + { + this->condvar->signal(this->condvar); + fprintf(this->stream, state == IKE_ESTABLISHED ? "+" : "-"); + fflush(this->stream); + } + } + return TRUE; +} + +/** + * Logging callback function used during initiate + */ +static bool initiate_cb(init_listener_t *this, debug_t group, level_t level, + ike_sa_t *ike_sa, const char *message) +{ + uintptr_t id; + + if (ike_sa) + { + id = ike_sa->get_unique_id(ike_sa); + this->mutex->lock(this->mutex); + this->initiated->put(this->initiated, (void*)id, (void*)id); + this->mutex->unlock(this->mutex); + + return FALSE; + } + + return TRUE; +} + /** * Initiate load-test, write progress to stream */ static job_requeue_t initiate(FILE *stream) { + init_listener_t *listener; enumerator_t *enumerator; peer_cfg_t *peer_cfg; child_cfg_t *child_cfg; @@ -121,21 +225,53 @@ static job_requeue_t initiate(FILE *stream) } enumerator->destroy(enumerator); + INIT(listener, + .listener = { + .ike_state_change = _ike_state_change, + }, + .stream = stream, + .initiated = hashtable_create((void*)hash, (void*)equals, count), + .completed = hashtable_create((void*)hash, (void*)equals, count), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + ); + + charon->bus->add_listener(charon->bus, &listener->listener); + for (i = 0; i < count; i++) { - if (charon->controller->initiate(charon->controller, - peer_cfg->get_ref(peer_cfg), - child_cfg->get_ref(child_cfg), - controller_cb_empty, NULL, 0) == SUCCESS) + switch (charon->controller->initiate(charon->controller, + peer_cfg->get_ref(peer_cfg), child_cfg->get_ref(child_cfg), + (void*)initiate_cb, listener, 0)) { - fprintf(stream, "."); - } - else - { - fprintf(stream, "!"); + case NEED_MORE: + /* Callback returns FALSE once it got track of this IKE_SA. + * FALL */ + case SUCCESS: + fprintf(stream, "."); + break; + default: + fprintf(stream, "!"); + break; } fflush(stream); } + + listener->mutex->lock(listener->mutex); + while (listener->completed->get_count(listener->completed) < count) + { + listener->condvar->wait(listener->condvar, listener->mutex); + } + listener->mutex->unlock(listener->mutex); + + charon->bus->remove_listener(charon->bus, &listener->listener); + + listener->initiated->destroy(listener->initiated); + listener->completed->destroy(listener->completed); + listener->mutex->destroy(listener->mutex); + listener->condvar->destroy(listener->condvar); + free(listener); + peer_cfg->destroy(peer_cfg); fprintf(stream, "\n");