- code cleaned up

This commit is contained in:
Jan Hutter 2005-12-06 16:36:42 +00:00
parent f104664836
commit 3febcf15f0
11 changed files with 164 additions and 131 deletions

View File

@ -67,55 +67,55 @@ struct netlink_algo_t {
typedef struct private_kernel_interface_t private_kernel_interface_t; typedef struct private_kernel_interface_t private_kernel_interface_t;
/** /**
* @brief Private Variables and Functions of kernel_interface class * @brief Private Variables and Functions of kernel_interface class.
* *
*/ */
struct private_kernel_interface_t { struct private_kernel_interface_t {
/** /**
* Public part of the kernel_interface_t object * Public part of the kernel_interface_t object.
*/ */
kernel_interface_t public; kernel_interface_t public;
/** /**
* netlink communication socket * Netlink communication socket.
*/ */
int socket; int socket;
int bc_socket; int bc_socket;
pid_t pid; pid_t pid;
/** /**
* sequence number for messages * Sequence number for messages.
*/ */
u_int32_t seq; u_int32_t seq;
/** /**
* list of responded messages * List of responded messages.
*/ */
linked_list_t *responses; linked_list_t *responses;
/** /**
* thread which receives messages * Thread which receives messages.
*/ */
pthread_t thread; pthread_t thread;
/** /**
* mutex locks access to replies list * Mutex locks access to replies list.
*/ */
pthread_mutex_t mutex; pthread_mutex_t mutex;
/** /**
* Condvar allows signaling of threads waiting for a reply * Condvar allows signaling of threads waiting for a reply.
*/ */
pthread_cond_t condvar; pthread_cond_t condvar;
logger_t *logger; logger_t *logger;
/** /**
* Function for the thread, receives messages * Function for the thread, receives messages.
*/ */
void (*receive_messages) (private_kernel_interface_t *this); void (*receive_messages) (private_kernel_interface_t *this);
/** /**
* Sends a netlink_message_t down to the kernel * Sends a netlink_message_t down to the kernel.
*/ */
status_t (*send_message) (private_kernel_interface_t *this, netlink_message_t *request, netlink_message_t **response); status_t (*send_message) (private_kernel_interface_t *this, netlink_message_t *request, netlink_message_t **response);
}; };
@ -396,7 +396,7 @@ static void receive_messages(private_kernel_interface_t *this)
/** /**
* implements kernel_interface_t.destroy * Implementation of kernel_interface_t.destroy.
*/ */
static void destroy(private_kernel_interface_t *this) static void destroy(private_kernel_interface_t *this)
{ {
@ -408,7 +408,7 @@ static void destroy(private_kernel_interface_t *this)
} }
/* /*
* Documented in header * Described in header.
*/ */
kernel_interface_t *kernel_interface_create() kernel_interface_t *kernel_interface_create()
{ {

View File

@ -30,14 +30,23 @@
typedef struct kernel_interface_t kernel_interface_t; typedef struct kernel_interface_t kernel_interface_t;
/** /**
* @brief * @brief Interface to the kernel.
*
* @b Constructors:
* - kernel_interface_create()
* *
* @ingroup threads * @ingroup threads
*/ */
struct kernel_interface_t { struct kernel_interface_t {
/**
* Not yet fully implemented!
*/
status_t (*get_spi) (kernel_interface_t *this, host_t *src, host_t *dest, protocol_id_t protocol, bool tunnel_mode, u_int32_t *spi); status_t (*get_spi) (kernel_interface_t *this, host_t *src, host_t *dest, protocol_id_t protocol, bool tunnel_mode, u_int32_t *spi);
/**
* Not yet fully implemented!
*/
status_t (*add_sa)(kernel_interface_t *this, status_t (*add_sa)(kernel_interface_t *this,
host_t *me, host_t *me,
host_t *other, host_t *other,
@ -56,15 +65,13 @@ struct kernel_interface_t {
/** /**
* @brief Destroys a kernel_interface object. * @brief Destroys a kernel_interface object.
* *
* Stopps the prime thread and destroys the pool.
*
* @param kernel_interface_t calling object * @param kernel_interface_t calling object
*/ */
void (*destroy) (kernel_interface_t *kernel_interface); void (*destroy) (kernel_interface_t *kernel_interface);
}; };
/** /**
* @brief * @brief Creates an object of type kernel_interface_t.
* *
* @ingroup threads * @ingroup threads
*/ */

View File

@ -33,21 +33,21 @@
typedef struct prime_list_t prime_list_t; typedef struct prime_list_t prime_list_t;
/** /**
* A prime_list_t contains prime for a specific size. * A prime_list_t contains prime values of a specific size.
*/ */
struct prime_list_t { struct prime_list_t {
/** /**
* Size of the stored primes * Size of the stored primes .
*/ */
size_t prime_size; size_t prime_size;
/** /**
* is this much used prime_size ? * Is this much used prime_size ?
*/ */
u_int32_t usage; u_int32_t usage;
/** /**
* list of primes * List of primes.
*/ */
linked_list_t *primes; linked_list_t *primes;
}; };
@ -55,61 +55,60 @@ struct prime_list_t {
typedef struct private_prime_pool_t private_prime_pool_t; typedef struct private_prime_pool_t private_prime_pool_t;
/** /**
* @brief Private Variables and Functions of prime_pool class * @brief Private data of prime_pool_t.
*
*/ */
struct private_prime_pool_t { struct private_prime_pool_t {
/** /**
* Public part of the prime_pool_t object * Public part of the prime_pool_t object.
*/ */
prime_pool_t public; prime_pool_t public;
/** /**
* A list which contains a set of prime_list_t's * A list which contains a set of prime_list_t's.
*/ */
linked_list_t *prime_lists; linked_list_t *prime_lists;
/** /**
* prime generation is stopped if more than * prime generation is stopped if more than
* that primes of a kind are already generated * that primes of a kind are already generated.
*/ */
int generation_limit; int generation_limit;
/** /**
* access to prime_lists is locked through this mutex * Access to prime_lists is locked through this mutex.
*/ */
pthread_mutex_t mutex; pthread_mutex_t mutex;
/** /**
* If the queue is empty a thread has to wait * If the queue is empty a thread has to wait
* This condvar is used to wake up such a thread * This condvar is used to wake up such a thread.
*/ */
pthread_cond_t condvar; pthread_cond_t condvar;
/** /**
* prime generation thread * Prime generation thread.
*/ */
pthread_t thread; pthread_t thread;
/** /**
* Logger instance for the prime_pool * Logger instance for the prime_pool.
*/ */
logger_t *logger; logger_t *logger;
/** /**
* Function for the prime thread, generate primes * Function for the prime thread, generate primes.
*/ */
void (*generate_primes) (private_prime_pool_t *this); void (*generate_primes) (private_prime_pool_t *this);
/** /**
* calculate a prime of requested size * Calculate a prime of requested size.
*/ */
void (*compute_prime) (private_prime_pool_t *this, size_t prime_size, mpz_t *prime); void (*compute_prime) (private_prime_pool_t *this, size_t prime_size, mpz_t *prime);
}; };
/** /**
* implements prime_pool_t.get_count * Implementation of prime_pool_t.get_count.
*/ */
static int get_count(private_prime_pool_t *this, size_t prime_size) static int get_count(private_prime_pool_t *this, size_t prime_size)
{ {
@ -136,7 +135,7 @@ static int get_count(private_prime_pool_t *this, size_t prime_size)
} }
/** /**
* implements prime_pool_t.get_prime * Implementation of prime_pool_t.get_prime.
*/ */
static void get_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prime) static void get_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prime)
{ {
@ -161,7 +160,7 @@ static void get_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prim
prime_list->usage += this->prime_lists->get_count(this->prime_lists); prime_list->usage += this->prime_lists->get_count(this->prime_lists);
if (prime_list->primes->remove_first(prime_list->primes, (void*)&removed_prime) == SUCCESS) if (prime_list->primes->remove_first(prime_list->primes, (void*)&removed_prime) == SUCCESS)
{ {
this->logger->log(this->logger, CONTROL|MOST, "thread removed a prime with size %d", prime_size); this->logger->log(this->logger, CONTROL|MOST, "Thread removed a prime with size %d", prime_size);
mpz_init_set(*prime, *removed_prime); mpz_init_set(*prime, *removed_prime);
mpz_clear(*removed_prime); mpz_clear(*removed_prime);
allocator_free(removed_prime); allocator_free(removed_prime);
@ -175,7 +174,7 @@ static void get_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prim
if (create_new_list) if (create_new_list)
{ {
this->logger->log(this->logger, CONTROL|MORE, "creating a new list for primes with size %d", prime_size); this->logger->log(this->logger, CONTROL|MORE, "Creating a new list for primes with size %d", prime_size);
/* there is no list for this prime size, create one */ /* there is no list for this prime size, create one */
prime_list_t *prime_list; prime_list_t *prime_list;
prime_list = allocator_alloc_thing(prime_list_t); prime_list = allocator_alloc_thing(prime_list_t);
@ -192,13 +191,13 @@ static void get_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prim
if (!prime_found) if (!prime_found)
{ {
/* no prime found, create one ourself */ /* no prime found, create one ourself */
this->logger->log(this->logger, CONTROL|MOST, "caller didn't find a prime, generates on it's own."); this->logger->log(this->logger, CONTROL|MOST, "Caller didn't find a prime, generates on it's own.");
this->compute_prime(this, prime_size, prime); this->compute_prime(this, prime_size, prime);
} }
} }
/** /**
* implements private_prime_pool_t.compute_prime * Implementation of private_prime_pool_t.compute_prime.
*/ */
void compute_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prime) void compute_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prime)
{ {
@ -231,7 +230,7 @@ void compute_prime(private_prime_pool_t *this, size_t prime_size, mpz_t *prime)
} }
/** /**
* implements private_prime_pool_t.generate_primes * Implementation of private_prime_pool_t.generate_primes.
*/ */
void generate_primes(private_prime_pool_t *this) void generate_primes(private_prime_pool_t *this)
{ {
@ -246,7 +245,7 @@ void generate_primes(private_prime_pool_t *this)
mpz_t *prime; mpz_t *prime;
this->logger->log(this->logger, CONTROL|MOST, "finding most important prime size..."); this->logger->log(this->logger, CONTROL|MOST, "Finding most important prime size...");
pthread_mutex_lock(&(this->mutex)); pthread_mutex_lock(&(this->mutex));
@ -256,7 +255,7 @@ void generate_primes(private_prime_pool_t *this)
{ {
prime_list_t *prime_list; prime_list_t *prime_list;
iterator->current(iterator, (void*)&prime_list); iterator->current(iterator, (void*)&prime_list);
this->logger->log(this->logger, CONTROL|MOST, "primes with size %d have usage %d, %d in list", this->logger->log(this->logger, CONTROL|MOST, "Primes with size %d have usage %d, %d in list",
prime_list->prime_size, prime_list->usage, prime_list->prime_size, prime_list->usage,
prime_list->primes->get_count(prime_list->primes)); prime_list->primes->get_count(prime_list->primes));
/* get the prime_size with the highest usage factor */ /* get the prime_size with the highest usage factor */
@ -274,7 +273,7 @@ void generate_primes(private_prime_pool_t *this)
if (selected_prime_list == NULL) if (selected_prime_list == NULL)
{ {
this->logger->log(this->logger, CONTROL|MORE, "nothing to do, goint to sleep"); this->logger->log(this->logger, CONTROL|MORE, "Nothing to do, goint to sleep");
/* nothing to do. wait, while able to cancel */ /* nothing to do. wait, while able to cancel */
pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex)); pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&(this->mutex));
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
@ -289,14 +288,14 @@ void generate_primes(private_prime_pool_t *this)
if (selected_prime_list != NULL) if (selected_prime_list != NULL)
{ {
this->logger->log(this->logger, CONTROL|MORE, "going to generate a prime with size %d", this->logger->log(this->logger, CONTROL|MORE, "Going to generate a prime with size %d",
selected_prime_list->prime_size); selected_prime_list->prime_size);
/* generate the prime of requested size */ /* generate the prime of requested size */
prime = allocator_alloc_thing(mpz_t); prime = allocator_alloc_thing(mpz_t);
compute_prime(this, selected_prime_list->prime_size, prime); compute_prime(this, selected_prime_list->prime_size, prime);
/* insert prime */ /* insert prime */
this->logger->log(this->logger, CONTROL|MOST, "prime generated, inserting in list"); this->logger->log(this->logger, CONTROL|MOST, "Prime generated, inserting in list");
pthread_mutex_lock(&(this->mutex)); pthread_mutex_lock(&(this->mutex));
selected_prime_list->primes->insert_last(selected_prime_list->primes, (void*)prime); selected_prime_list->primes->insert_last(selected_prime_list->primes, (void*)prime);
pthread_mutex_unlock(&(this->mutex)); pthread_mutex_unlock(&(this->mutex));
@ -309,7 +308,7 @@ void generate_primes(private_prime_pool_t *this)
} }
/** /**
* implements prime_pool_t.destroy * Implementation of prime_pool_t.destroy.
*/ */
static void destroy (private_prime_pool_t *this) static void destroy (private_prime_pool_t *this)
{ {
@ -348,7 +347,7 @@ static void destroy (private_prime_pool_t *this)
} }
/* /*
* Documented in header * Documented in header,
*/ */
prime_pool_t *prime_pool_create(int generation_limit) prime_pool_t *prime_pool_create(int generation_limit)
{ {
@ -375,7 +374,7 @@ prime_pool_t *prime_pool_create(int generation_limit)
if (pthread_create(&(this->thread), NULL, (void*(*)(void*))this->generate_primes, this) != 0) if (pthread_create(&(this->thread), NULL, (void*(*)(void*))this->generate_primes, this) != 0)
{ {
/* failed. we live with that problem, since getting primes is still possible */ /* failed. we live with that problem, since getting primes is still possible */
this->logger->log(this->logger, ERROR, "thread creation failed, working without thread!"); this->logger->log(this->logger, ERROR, "Thread creation failed, working without thread!");
} }
/* set priority */ /* set priority */
else else
@ -389,13 +388,13 @@ prime_pool_t *prime_pool_create(int generation_limit)
if (pthread_setschedparam(this->thread, policy, &param) != 0) if (pthread_setschedparam(this->thread, policy, &param) != 0)
{ {
/* failed to set priority */ /* failed to set priority */
this->logger->log(this->logger, ERROR, "could not reduce priority of thread, running in default priority!"); this->logger->log(this->logger, ERROR, "Could not reduce priority of thread, running in default priority!");
} }
} }
else else
{ {
/* failed to get priority */ /* failed to get priority */
this->logger->log(this->logger, ERROR, "could not reduce priority of thread, running in default priority!"); this->logger->log(this->logger, ERROR, "Could not reduce priority of thread, running in default priority!");
} }
} }
} }

View File

@ -39,6 +39,11 @@ typedef struct prime_pool_t prime_pool_t;
* This increases responsibility, since prime generation * This increases responsibility, since prime generation
* is the most time-consuming task. * is the most time-consuming task.
* *
* @b Constructors:
* - prime_pool_create()
*
* @todo Store and load prime values
*
* @ingroup threads * @ingroup threads
*/ */
struct prime_pool_t { struct prime_pool_t {
@ -46,8 +51,8 @@ struct prime_pool_t {
/** /**
* @brief Get the number of available primes for the given prime size. * @brief Get the number of available primes for the given prime size.
* *
* @param prime_pool_t calling object * @param prime_pool calling object
* @param size of the prime * @param prime_size size of the prime
* @returns number of primes * @returns number of primes
*/ */
int (*get_count) (prime_pool_t *prime_pool, size_t prime_size); int (*get_count) (prime_pool_t *prime_pool, size_t prime_size);
@ -59,7 +64,9 @@ struct prime_pool_t {
* Supplied mpz will be initialized to a prime and must be cleared * Supplied mpz will be initialized to a prime and must be cleared
* after usage. * after usage.
* *
* @param prime_pool_t calling object * @param prime_pool calling object
* @param prime_size size of the prime to return
* @param prime the prime value will be written into pointed mpz_t value.
* @return chunk containing the prime * @return chunk containing the prime
*/ */
void (*get_prime) (prime_pool_t *prime_pool, size_t prime_size, mpz_t *prime); void (*get_prime) (prime_pool_t *prime_pool, size_t prime_size, mpz_t *prime);
@ -69,7 +76,7 @@ struct prime_pool_t {
* *
* Stopps the prime thread and destroys the pool. * Stopps the prime thread and destroys the pool.
* *
* @param prime_pool_t calling object * @param prime_pool calling object
*/ */
void (*destroy) (prime_pool_t *prime_pool); void (*destroy) (prime_pool_t *prime_pool);
}; };
@ -83,7 +90,7 @@ struct prime_pool_t {
* the get_prime-calling thread. * the get_prime-calling thread.
* *
* @param generation_limit generation limit to use * @param generation_limit generation limit to use
* @return created prime pool * @return prime_pool_t object
* *
* @ingroup threads * @ingroup threads
*/ */

View File

@ -34,36 +34,38 @@
#include <utils/allocator.h> #include <utils/allocator.h>
#include <utils/logger_manager.h> #include <utils/logger_manager.h>
typedef struct private_receiver_t private_receiver_t; typedef struct private_receiver_t private_receiver_t;
/** /**
* Private data of a receiver object * Private data of a receiver_t object.
*/ */
struct private_receiver_t { struct private_receiver_t {
/** /**
* Public part of a receiver object * Public part of a receiver_t object.
*/ */
receiver_t public; receiver_t public;
/** /**
* @brief Thread function started at creation of the receiver object. * @brief Thread function started at creation of the receiver object.
* *
* @param this assigned receiver object * @param this calling object
*/ */
void (*receive_packets) (private_receiver_t *this); void (*receive_packets) (private_receiver_t *this);
/** /**
* Assigned thread to the receiver_t object * Assigned thread.
*/ */
pthread_t assigned_thread; pthread_t assigned_thread;
/** /**
* logger for the receiver * A logger for the receiver_t object.
*/ */
logger_t *logger; logger_t *logger;
}; };
/** /**
* implements private_receiver_t.receive_packets * Implementation of receiver_t.receive_packets.
*/ */
static void receive_packets(private_receiver_t * this) static void receive_packets(private_receiver_t * this)
{ {
@ -73,25 +75,25 @@ static void receive_packets(private_receiver_t * this)
/* cancellation disabled by default */ /* cancellation disabled by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
this->logger->log(this->logger, CONTROL, "receiver thread running, thread_id %u", (int)pthread_self()); this->logger->log(this->logger, CONTROL, "Receiver thread running, thread_id %u", (int)pthread_self());
while (1) while (1)
{ {
while (charon->socket->receive(charon->socket,&current_packet) == SUCCESS) while (charon->socket->receive(charon->socket,&current_packet) == SUCCESS)
{ {
this->logger->log(this->logger, CONTROL, "creating job from packet"); this->logger->log(this->logger, CONTROL | MORE, "Creating job from packet");
current_job = (job_t *) incoming_packet_job_create(current_packet); current_job = (job_t *) incoming_packet_job_create(current_packet);
charon->job_queue->add(charon->job_queue,current_job); charon->job_queue->add(charon->job_queue,current_job);
} }
/* bad bad, rebuild the socket ? */ /* bad bad, rebuild the socket ? */
this->logger->log(this->logger, ERROR, "receiving from socket failed!"); this->logger->log(this->logger, ERROR, "Receiving from socket failed!");
} }
} }
/** /**
* Implementation of receiver_t's destroy function * Implementation of receiver_t.destroy.
*/ */
static void destroy(private_receiver_t *this) static void destroy(private_receiver_t *this)
{ {
@ -107,7 +109,7 @@ static void destroy(private_receiver_t *this)
} }
/* /*
* see header * Described in header.
*/ */
receiver_t * receiver_create() receiver_t * receiver_create()
{ {

View File

@ -35,12 +35,15 @@ typedef struct receiver_t receiver_t;
* data is available, a packet_t object is created , wrapped * data is available, a packet_t object is created , wrapped
* in an incoming_packet_job_t and added to the job queue. * in an incoming_packet_job_t and added to the job queue.
* *
* @b Constructors:
* - receiver_create()
*
* @ingroup threads * @ingroup threads
*/ */
struct receiver_t { struct receiver_t {
/** /**
* @brief Destroys a receiver_t * @brief Destroys a receiver_t object.
* *
* @param receiver receiver object * @param receiver receiver object
*/ */
@ -48,13 +51,13 @@ struct receiver_t {
}; };
/** /**
* @brief Create a receiver. * @brief Create a receiver_t object.
* *
* The receiver thread will start working, get data * The receiver thread will start working, get data
* from the socket and add those packets to the job queue. * from the socket and add those packets to the job queue.
* *
* @return * @return
* - created receiver_t, or * - receiver_t object
* - NULL of thread could not be started * - NULL of thread could not be started
* *
* @ingroup threads * @ingroup threads

View File

@ -31,14 +31,15 @@
#include <utils/logger_manager.h> #include <utils/logger_manager.h>
#include <queues/job_queue.h> #include <queues/job_queue.h>
/**
* Private data of a scheduler object
*/
typedef struct private_scheduler_t private_scheduler_t; typedef struct private_scheduler_t private_scheduler_t;
/**
* Private data of a scheduler_t object.
*/
struct private_scheduler_t { struct private_scheduler_t {
/** /**
* Public part of a scheduler object * Public part of a scheduler_t object.
*/ */
scheduler_t public; scheduler_t public;
@ -47,23 +48,23 @@ struct private_scheduler_t {
* *
* Thread function started at creation of the scheduler object. * Thread function started at creation of the scheduler object.
* *
* @param this assigned scheduler object * @param this calling object
*/ */
void (*get_events) (private_scheduler_t *this); void (*get_events) (private_scheduler_t *this);
/** /**
* Assigned thread to the scheduler_t object * Assigned thread.
*/ */
pthread_t assigned_thread; pthread_t assigned_thread;
/** /**
* logger for this scheduler * A logger.
*/ */
logger_t *logger; logger_t *logger;
}; };
/** /**
* implements private_scheduler_t.get_events * Implementation of private_scheduler_t.get_events.
*/ */
static void get_events(private_scheduler_t * this) static void get_events(private_scheduler_t * this)
{ {
@ -72,22 +73,22 @@ static void get_events(private_scheduler_t * this)
/* cancellation disabled by default */ /* cancellation disabled by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
this->logger->log(this->logger, CONTROL, "scheduler thread running, thread_id %u", (int)pthread_self()); this->logger->log(this->logger, CONTROL, "Scheduler thread running, thread_id %u", (int)pthread_self());
for (;;) for (;;)
{ {
this->logger->log(this->logger, CONTROL|MORE, "waiting for next event..."); this->logger->log(this->logger, CONTROL|MOST, "Waiting for next event...");
/* get a job, this block until one is available */ /* get a job, this block until one is available */
current_job = charon->event_queue->get(charon->event_queue); current_job = charon->event_queue->get(charon->event_queue);
/* queue the job in the job queue, workers will eat them */ /* queue the job in the job queue, workers will eat them */
charon->job_queue->add(charon->job_queue, current_job); charon->job_queue->add(charon->job_queue, current_job);
this->logger->log(this->logger, CONTROL, "got event, added job %s to job-queue.", this->logger->log(this->logger, CONTROL | MORE, "Got event, added job %s to job-queue.",
mapping_find(job_type_m, current_job->get_type(current_job))); mapping_find(job_type_m, current_job->get_type(current_job)));
} }
} }
/** /**
* Implementation of scheduler_t's destroy function * Implementation of scheduler_t.destroy.
*/ */
static void destroy(private_scheduler_t *this) static void destroy(private_scheduler_t *this)
{ {
@ -102,7 +103,9 @@ static void destroy(private_scheduler_t *this)
allocator_free(this); allocator_free(this);
} }
/*
* Described in header.
*/
scheduler_t * scheduler_create() scheduler_t * scheduler_create()
{ {
private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t); private_scheduler_t *this = allocator_alloc_thing(private_scheduler_t);

View File

@ -28,11 +28,16 @@
typedef struct scheduler_t scheduler_t; typedef struct scheduler_t scheduler_t;
/** /**
* @brief The scheduler, looks for timed events in event-queue and adds them * @brief The scheduler thread is responsible for timed events.
*
* The scheduler thread takes out jobs from the event-queue and adds them
* to the job-queue. * to the job-queue.
* *
* Starts a thread which does the work, since event-queue is blocking. * Starts a thread which does the work, since event-queue is blocking.
* *
* @b Constructors:
* - scheduler_create()
*
* @ingroup threads * @ingroup threads
*/ */
struct scheduler_t { struct scheduler_t {
@ -40,19 +45,19 @@ struct scheduler_t {
/** /**
* @brief Destroys a scheduler object. * @brief Destroys a scheduler object.
* *
* @param scheduler scheduler object * @param scheduler calling object
*/ */
void (*destroy) (scheduler_t *scheduler); void (*destroy) (scheduler_t *scheduler);
}; };
/** /**
* @brief Create a scheduler with its thread. * @brief Create a scheduler with its associated thread.
* *
* The thread will start to get jobs form the event queue * The thread will start to get jobs form the event queue
* and adds them to the job queue. * and adds them to the job queue.
* *
* @return * @return
* - the created scheduler_t instance, or * - scheduler_t object
* - NULL if thread could not be started * - NULL if thread could not be started
* *
* @ingroup threads * @ingroup threads

View File

@ -74,16 +74,16 @@ static void send_packets(private_sender_t * this)
/* cancellation disabled by default */ /* cancellation disabled by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
this->logger->log(this->logger, CONTROL, "sender thread running, thread_id %u", (int)pthread_self()); this->logger->log(this->logger, CONTROL, "Sender thread running, thread_id %u", (int)pthread_self());
while (1) while (1)
{ {
current_packet = charon->send_queue->get(charon->send_queue); current_packet = charon->send_queue->get(charon->send_queue);
this->logger->log(this->logger, CONTROL|MORE, "got a packet, sending it"); this->logger->log(this->logger, CONTROL|MORE, "Got a packet, sending it");
status = charon->socket->send(charon->socket,current_packet); status = charon->socket->send(charon->socket,current_packet);
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->logger->log(this->logger, ERROR, "sending failed, socket returned %s", this->logger->log(this->logger, ERROR, "Sending failed, socket returned %s",
mapping_find(status_m, status)); mapping_find(status_m, status));
} }
current_packet->destroy(current_packet); current_packet->destroy(current_packet);

View File

@ -38,31 +38,32 @@
#include <utils/allocator.h> #include <utils/allocator.h>
#include <utils/logger.h> #include <utils/logger.h>
typedef struct private_thread_pool_t private_thread_pool_t; typedef struct private_thread_pool_t private_thread_pool_t;
/** /**
* @brief Structure with private members for thread_pool_t. * @brief Private data of thread_pool_t class.
*/ */
struct private_thread_pool_t { struct private_thread_pool_t {
/** /**
* inclusion of public members * Public thread_pool_t interface.
*/ */
thread_pool_t public; thread_pool_t public;
/** /**
* @brief Main processing functino for worker threads. * @brief Main processing function for worker threads.
* *
* Gets a job from the job queue and calls corresponding * Gets a job from the job queue and calls corresponding
* function for processing. * function for processing.
* *
* @param this private_thread_pool_t-Object * @param this calling object
*/ */
void (*process_jobs) (private_thread_pool_t *this); void (*process_jobs) (private_thread_pool_t *this);
/** /**
* @brief Process a INCOMING_PACKET job. * @brief Process a INCOMING_PACKET job.
* *
* @param this private_thread_pool_t object * @param this calling object
* @param job incoming_packet_job_t object * @param job incoming_packet_job_t object
*/ */
void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job); void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
@ -70,7 +71,7 @@ struct private_thread_pool_t {
/** /**
* @brief Process a INITIATE_IKE_SA job. * @brief Process a INITIATE_IKE_SA job.
* *
* @param this private_thread_pool_t object * @param this calling object
* @param job initiate_ike_sa_job_t object * @param job initiate_ike_sa_job_t object
*/ */
void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job); void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
@ -78,7 +79,7 @@ struct private_thread_pool_t {
/** /**
* @brief Process a DELETE_HALF_OPEN_IKE_SA job. * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
* *
* @param this private_thread_pool_t object * @param this calling object
* @param job delete__half_open_ike_sa_job_t object * @param job delete__half_open_ike_sa_job_t object
*/ */
void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job); void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
@ -86,7 +87,7 @@ struct private_thread_pool_t {
/** /**
* @brief Process a DELETE_ESTABLISHED_IKE_SA job. * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
* *
* @param this private_thread_pool_t object * @param this calling object
* @param job delete_established_ike_sa_job_t object * @param job delete_established_ike_sa_job_t object
*/ */
void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job); void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
@ -94,7 +95,7 @@ struct private_thread_pool_t {
/** /**
* @brief Process a RETRANSMIT_REQUEST job. * @brief Process a RETRANSMIT_REQUEST job.
* *
* @param this private_thread_pool_t object * @param this calling object
* @param job retransmit_request_job_t object * @param job retransmit_request_job_t object
*/ */
void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job); void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
@ -111,22 +112,22 @@ struct private_thread_pool_t {
void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay); void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
/** /**
* number of running threads * Number of running threads.
*/ */
size_t pool_size; size_t pool_size;
/** /**
* array of thread ids * Array of thread ids.
*/ */
pthread_t *threads; pthread_t *threads;
/** /**
* logger of the threadpool * Logger of the thread pool.
*/ */
logger_t *pool_logger; logger_t *pool_logger;
/** /**
* logger of the worker threads * Logger of the worker threads.
*/ */
logger_t *worker_logger; logger_t *worker_logger;
} ; } ;
@ -144,13 +145,13 @@ static void process_jobs(private_thread_pool_t *this)
/* cancellation disabled by default */ /* cancellation disabled by default */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, thread_id: %u", (int)pthread_self()); this->worker_logger->log(this->worker_logger, CONTROL, "Worker thread running, thread_id: %u", (int)pthread_self());
for (;;) { for (;;) {
job = charon->job_queue->get(charon->job_queue); job = charon->job_queue->get(charon->job_queue);
job_type = job->get_type(job); job_type = job->get_type(job);
this->worker_logger->log(this->worker_logger, CONTROL|MORE, "Process job of type %s", this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Process job of type %s",
mapping_find(job_type_m,job_type)); mapping_find(job_type_m,job_type));
gettimeofday(&start_time,NULL); gettimeofday(&start_time,NULL);
switch (job_type) switch (job_type)
@ -186,7 +187,7 @@ static void process_jobs(private_thread_pool_t *this)
} }
default: default:
{ {
this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!", this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
mapping_find(job_type_m,job_type)); mapping_find(job_type_m,job_type));
job->destroy(job); job->destroy(job);
break; break;
@ -194,7 +195,7 @@ static void process_jobs(private_thread_pool_t *this)
} }
gettimeofday(&end_time,NULL); gettimeofday(&end_time,NULL);
this->worker_logger->log(this->worker_logger, CONTROL, "Processed job of type %s in %d us", this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Processed job of type %s in %d us",
mapping_find(job_type_m,job_type), mapping_find(job_type_m,job_type),
(((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec))); (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
@ -221,12 +222,12 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa
status = message->parse_header(message); status = message->parse_header(message);
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!"); this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
message->destroy(message); message->destroy(message);
return; return;
} }
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s", this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Message is a %s %s",
mapping_find(exchange_type_m, message->get_exchange_type(message)), mapping_find(exchange_type_m, message->get_exchange_type(message)),
message->get_request(message) ? "request" : "reply"); message->get_request(message) ? "request" : "reply");
@ -234,7 +235,7 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa
(message->get_minor_version(message) != IKE_MINOR_VERSION)) (message->get_minor_version(message) != IKE_MINOR_VERSION))
{ {
this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported", this->worker_logger->log(this->worker_logger, ERROR | MOST, "IKE version %d.%d not supported",
message->get_major_version(message), message->get_major_version(message),
message->get_minor_version(message)); message->get_minor_version(message));
/* /*
@ -275,7 +276,7 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa
ike_sa_id->switch_initiator(ike_sa_id); ike_sa_id->switch_initiator(ike_sa_id);
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", this->worker_logger->log(this->worker_logger, CONTROL|ALL, "Checking out IKE SA %lld:%lld, role %s",
ike_sa_id->get_initiator_spi(ike_sa_id), ike_sa_id->get_initiator_spi(ike_sa_id),
ike_sa_id->get_responder_spi(ike_sa_id), ike_sa_id->get_responder_spi(ike_sa_id),
ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
@ -296,17 +297,17 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa
if (status == CREATED) if (status == CREATED)
{ {
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Create Job to delete half open IKE_SA."); this->worker_logger->log(this->worker_logger, CONTROL|ALL, "Create Job to delete half open IKE_SA.");
this->create_delete_half_open_ike_sa_job(this,ike_sa_id,charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager)); this->create_delete_half_open_ike_sa_job(this,ike_sa_id,charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager));
} }
status = ike_sa->process_message(ike_sa, message); status = ike_sa->process_message(ike_sa, message);
if ((status != SUCCESS) && (status != DELETE_ME)) if ((status != SUCCESS) && (status != DELETE_ME))
{ {
this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA"); this->worker_logger->log(this->worker_logger, ERROR, "Message could not be processed by IKE SA");
} }
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "%s IKE SA %lld:%lld, role %s", this->worker_logger->log(this->worker_logger, CONTROL|ALL, "%s IKE SA %lld:%lld, role %s",
(status == DELETE_ME) ? "Checkin and delete" : "Checkin", (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
ike_sa_id->get_initiator_spi(ike_sa_id), ike_sa_id->get_initiator_spi(ike_sa_id),
ike_sa_id->get_responder_spi(ike_sa_id), ike_sa_id->get_responder_spi(ike_sa_id),
@ -324,7 +325,7 @@ static void process_incoming_packet_job(private_thread_pool_t *this, incoming_pa
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed!"); this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
} }
message->destroy(message); message->destroy(message);
} }
@ -344,29 +345,29 @@ static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ik
status_t status; status_t status;
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA"); this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Create and checking out IKE SA");
charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa); charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"", this->worker_logger->log(this->worker_logger, CONTROL, "Initializing connection \"%s\"",
job->get_configuration_name(job)); job->get_configuration_name(job));
status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job)); status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, going to delete IKE_SA.", this->worker_logger->log(this->worker_logger, ERROR, "%s: By initialize_conection, going to delete IKE_SA.",
mapping_find(status_m, status)); mapping_find(status_m, status));
charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa); charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
return; return;
} }
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Create Job to delete half open IKE_SA."); this->worker_logger->log(this->worker_logger, CONTROL|ALL, "Create Job to delete half open IKE_SA.");
this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager)); this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),charon->configuration_manager->get_half_open_ike_sa_timeout(charon->configuration_manager));
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA"); this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Checking in IKE SA");
status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.", this->worker_logger->log(this->worker_logger, ERROR, "%s: Could not checkin IKE_SA.",
mapping_find(status_m, status)); mapping_find(status_m, status));
} }
} }
@ -471,7 +472,7 @@ static void process_retransmit_request_job(private_thread_pool_t *this, retransm
ike_sa_t *ike_sa; ike_sa_t *ike_sa;
status_t status; status_t status;
this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s", this->worker_logger->log(this->worker_logger, CONTROL|MOST, "Checking out IKE SA %lld:%lld, role %s",
ike_sa_id->get_initiator_spi(ike_sa_id), ike_sa_id->get_initiator_spi(ike_sa_id),
ike_sa_id->get_responder_spi(ike_sa_id), ike_sa_id->get_responder_spi(ike_sa_id),
ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder"); ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
@ -488,7 +489,7 @@ static void process_retransmit_request_job(private_thread_pool_t *this, retransm
if (status != SUCCESS) if (status != SUCCESS)
{ {
this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Message doesn't have to be retransmitted"); this->worker_logger->log(this->worker_logger, CONTROL | ALL, "Message doesn't have to be retransmitted");
stop_retransmitting = TRUE; stop_retransmitting = TRUE;
} }
@ -532,7 +533,7 @@ static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_s
{ {
job_t *delete_job; job_t *delete_job;
this->worker_logger->log(this->worker_logger, CONTROL | MORE, "Going to create job to delete half open IKE_SA in %d ms", delay); this->worker_logger->log(this->worker_logger, CONTROL | MOST, "Going to create job to delete half open IKE_SA in %d ms", delay);
delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id); delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
charon->event_queue->add_relative(charon->event_queue,delete_job, delay); charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
@ -606,14 +607,14 @@ thread_pool_t *thread_pool_create(size_t pool_size)
{ {
if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0) if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
{ {
this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1); this->pool_logger->log(this->pool_logger, CONTROL, "Created worker thread #%d", current+1);
} }
else else
{ {
/* creation failed, is it the first one? */ /* creation failed, is it the first one? */
if (current == 0) if (current == 0)
{ {
this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread"); this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger); charon->logger_manager->destroy_logger(charon->logger_manager, this->pool_logger);
charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger); charon->logger_manager->destroy_logger(charon->logger_manager, this->worker_logger);
allocator_free(this->threads); allocator_free(this->threads);
@ -621,7 +622,7 @@ thread_pool_t *thread_pool_create(size_t pool_size)
return NULL; return NULL;
} }
/* not all threads could be created, but at least one :-/ */ /* not all threads could be created, but at least one :-/ */
this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size); this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
this->pool_size = current; this->pool_size = current;
return (thread_pool_t*)this; return (thread_pool_t*)this;

View File

@ -31,27 +31,33 @@
typedef struct thread_pool_t thread_pool_t; typedef struct thread_pool_t thread_pool_t;
/** /**
* @brief A thread_pool contains a pool of threads processing the job queue. * @brief A thread_pool consists of a pool of threads processing jobs from the job queue.
* *
* Current implementation uses as many threads as specified in constructor. * Current implementation uses as many threads as specified in constructor.
* A more improved version would dynamically increase thread count if necessary. * A more improved version would dynamically increase thread count if necessary.
* *
* @b Constructors:
* - thread_pool_create()
*
* @todo Add support for dynamic thread handling
*
* @ingroup threads * @ingroup threads
*/ */
struct thread_pool_t { struct thread_pool_t {
/** /**
* @brief Return currently instanciated threads. * @brief Return currently instanciated thread count.
* *
* @param thread_pool thread_pool_t object * @param thread_pool calling object
* @return size of thread pool * @return size of thread pool
*/ */
size_t (*get_pool_size) (thread_pool_t *thread_pool); size_t (*get_pool_size) (thread_pool_t *thread_pool);
/** /**
* @brief Destroy a thread_pool_t. * @brief Destroy a thread_pool_t object.
* *
* sends cancellation request to all threads and AWAITS their termination. * Sends cancellation request to all threads and AWAITS their termination.
* *
* @param thread_pool thread_pool_t object * @param thread_pool calling object
*/ */
void (*destroy) (thread_pool_t *thread_pool); void (*destroy) (thread_pool_t *thread_pool);
}; };
@ -61,7 +67,7 @@ struct thread_pool_t {
* *
* @param pool_size desired pool size * @param pool_size desired pool size
* @return * @return
* - thread_pool_t if one ore more threads could be started, or * - thread_pool_t object if one ore more threads could be started, or
* - NULL if no threads could be created * - NULL if no threads could be created
* *
* @ingroup threads * @ingroup threads