thread locking for sender and processor optimized

This commit is contained in:
Tobias Brunner 2008-04-03 09:19:12 +00:00
parent 6af29ccf33
commit 8e91a36314
3 changed files with 43 additions and 22 deletions

View File

@ -53,9 +53,14 @@ struct private_sender_t {
pthread_mutex_t mutex;
/**
* condvar to signal for packets in list
* condvar to signal for packets added to list
*/
pthread_cond_t condvar;
pthread_cond_t gotone;
/**
* condvar to signal for packets sent
*/
pthread_cond_t sentone;
};
/**
@ -71,8 +76,8 @@ static void send_(private_sender_t *this, packet_t *packet)
pthread_mutex_lock(&this->mutex);
this->list->insert_last(this->list, packet);
pthread_cond_signal(&this->gotone);
pthread_mutex_unlock(&this->mutex);
pthread_cond_signal(&this->condvar);
}
/**
@ -90,12 +95,13 @@ static job_requeue_t send_packets(private_sender_t * this)
pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
pthread_cond_wait(&this->condvar, &this->mutex);
pthread_cond_wait(&this->gotone, &this->mutex);
pthread_setcancelstate(oldstate, NULL);
pthread_cleanup_pop(0);
}
this->list->remove_first(this->list, (void**)&packet);
pthread_cond_signal(&this->sentone);
pthread_mutex_unlock(&this->mutex);
charon->socket->send(charon->socket, packet);
@ -109,10 +115,13 @@ static job_requeue_t send_packets(private_sender_t * this)
static void destroy(private_sender_t *this)
{
/* send all packets in the queue */
pthread_mutex_lock(&this->mutex);
while (this->list->get_count(this->list))
{
sched_yield();
pthread_cond_wait(&this->sentone, &this->mutex);
}
pthread_mutex_unlock(&this->mutex);
pthread_mutex_destroy(&this->mutex);
this->job->cancel(this->job);
this->list->destroy(this->list);
free(this);
@ -130,7 +139,8 @@ sender_t * sender_create()
this->list = linked_list_create();
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
pthread_cond_init(&this->gotone, NULL);
pthread_cond_init(&this->sentone, NULL);
this->job = callback_job_create((callback_job_cb_t)send_packets,
this, NULL, NULL);

View File

@ -55,11 +55,6 @@ struct private_callback_job_t {
*/
pthread_mutex_t mutex;
/**
* condvar to synchronize thread startup/cancellation
*/
pthread_cond_t condvar;
/**
* list of asociated child jobs
*/
@ -140,7 +135,6 @@ static void execute(private_callback_job_t *this)
pthread_mutex_lock(&this->mutex);
this->thread = pthread_self();
pthread_cond_signal(&this->condvar);
pthread_mutex_unlock(&this->mutex);
pthread_cleanup_push((void*)destroy, this);
@ -187,7 +181,6 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
/* private variables */
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
this->callback = cb;
this->data = data;
this->cleanup = cleanup;

View File

@ -66,7 +66,12 @@ struct private_processor_t {
/**
* Condvar to wait for new jobs
*/
pthread_cond_t condvar;
pthread_cond_t jobadded;
/**
* Condvar to wait for terminated threads
*/
pthread_cond_t threadterminated;
};
static void process_jobs(private_processor_t *this);
@ -80,7 +85,10 @@ static void restart(private_processor_t *this)
if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
{
pthread_mutex_lock(&this->mutex);
this->total_threads--;
pthread_cond_broadcast(&this->threadterminated);
pthread_mutex_unlock(&this->mutex);
}
}
@ -103,7 +111,7 @@ static void process_jobs(private_processor_t *this)
if (this->list->get_count(this->list) == 0)
{
this->idle_threads++;
pthread_cond_wait(&this->condvar, &this->mutex);
pthread_cond_wait(&this->jobadded, &this->mutex);
this->idle_threads--;
continue;
}
@ -116,7 +124,7 @@ static void process_jobs(private_processor_t *this)
pthread_mutex_lock(&this->mutex);
}
this->total_threads--;
pthread_cond_broadcast(&this->condvar);
pthread_cond_signal(&this->threadterminated);
pthread_mutex_unlock(&this->mutex);
}
@ -125,7 +133,11 @@ static void process_jobs(private_processor_t *this)
*/
static u_int get_total_threads(private_processor_t *this)
{
return this->total_threads;
u_int count;
pthread_mutex_lock(&this->mutex);
count = this->total_threads;
pthread_mutex_unlock(&this->mutex);
return count;
}
/**
@ -133,7 +145,11 @@ static u_int get_total_threads(private_processor_t *this)
*/
static u_int get_idle_threads(private_processor_t *this)
{
return this->idle_threads;
u_int count;
pthread_mutex_lock(&this->mutex);
count = this->idle_threads;
pthread_mutex_unlock(&this->mutex);
return count;
}
/**
@ -155,8 +171,8 @@ static void queue_job(private_processor_t *this, job_t *job)
{
pthread_mutex_lock(&this->mutex);
this->list->insert_last(this->list, job);
pthread_cond_signal(&this->jobadded);
pthread_mutex_unlock(&this->mutex);
pthread_cond_signal(&this->condvar);
}
/**
@ -184,6 +200,7 @@ static void set_threads(private_processor_t *this, u_int count)
{ /* decrease thread count */
this->desired_threads = count;
}
pthread_cond_broadcast(&this->jobadded);
pthread_mutex_unlock(&this->mutex);
}
@ -196,8 +213,8 @@ static void destroy(private_processor_t *this)
pthread_mutex_lock(&this->mutex);
while (this->total_threads > 0)
{
pthread_cond_broadcast(&this->condvar);
pthread_cond_wait(&this->condvar, &this->mutex);
pthread_cond_broadcast(&this->jobadded);
pthread_cond_wait(&this->threadterminated, &this->mutex);
}
pthread_mutex_unlock(&this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
@ -220,7 +237,8 @@ processor_t *processor_create(size_t pool_size)
this->list = linked_list_create();
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
pthread_cond_init(&this->jobadded, NULL);
pthread_cond_init(&this->threadterminated, NULL);
this->total_threads = 0;
this->desired_threads = 0;
this->idle_threads = 0;