Count number of threads active in each class, and reserve threads only if none active

This commit is contained in:
Martin Willi 2011-05-04 15:32:31 +02:00
parent a694b481ee
commit 877fdcf0b8
2 changed files with 66 additions and 17 deletions

View File

@ -51,9 +51,9 @@ struct private_processor_t {
u_int desired_threads;
/**
* Number of threads waiting for work
* Number of threads currently working, for each priority
*/
u_int idle_threads;
u_int working_threads[JOB_PRIO_MAX];
/**
* All threads managed in the pool (including threads that have been
@ -113,6 +113,29 @@ static void restart(private_processor_t *this)
this->mutex->unlock(this->mutex);
}
/**
* Decrement working thread count of a priority class
*/
static void decrement_working_threads(u_int *working_threads)
{
(*working_threads)--;
}
/**
* Get number of idle threads, non-locking variant
*/
static u_int get_idle_threads_nolock(private_processor_t *this)
{
u_int count, i;
count = this->total_threads;
for (i = 0; i < JOB_PRIO_MAX; i++)
{
count -= this->working_threads[i];
}
return count;
}
/**
* Process queued jobs, called by the worker threads
*/
@ -127,37 +150,43 @@ static void process_jobs(private_processor_t *this)
while (this->desired_threads >= this->total_threads)
{
job_t *job = NULL;
int i, prio_sum = 0;
int i, reserved = 0, idle;
idle = get_idle_threads_nolock(this);
for (i = 0; i < JOB_PRIO_MAX; i++)
{
if (prio_sum && prio_sum >= this->idle_threads)
if (reserved && reserved >= idle)
{
DBG2(DBG_JOB, "delaying %N priority jobs: %d threads idle, "
"but %d reserved for higher priorities",
job_priority_names, i, this->idle_threads, prio_sum);
job_priority_names, i, idle, reserved);
break;
}
prio_sum += this->prio_threads[i];
if (this->working_threads[i] < this->prio_threads[i])
{
reserved += this->prio_threads[i] - this->working_threads[i];
}
if (this->jobs[i]->remove_first(this->jobs[i],
(void**)&job) == SUCCESS)
{
this->working_threads[i]++;
this->mutex->unlock(this->mutex);
thread_cleanup_push((thread_cleanup_t)decrement_working_threads,
&this->working_threads[i]);
/* terminated threads are restarted to get a constant pool */
thread_cleanup_push((thread_cleanup_t)restart, this);
job->execute(job);
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
thread_cleanup_pop(TRUE);
break;
}
}
if (!job)
{
this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
this->idle_threads--;
continue;
}
this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
thread_cleanup_push((thread_cleanup_t)restart, this);
job->execute(job);
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
}
this->total_threads--;
this->thread_terminated->signal(this->thread_terminated);
@ -181,7 +210,7 @@ METHOD(processor_t, get_idle_threads, u_int,
u_int count;
this->mutex->lock(this->mutex);
count = this->idle_threads;
count = get_idle_threads_nolock(this);
this->mutex->unlock(this->mutex);
return count;
}
@ -198,6 +227,17 @@ static job_priority_t sane_prio(job_priority_t prio)
return prio;
}
METHOD(processor_t, get_working_threads, u_int,
private_processor_t *this, job_priority_t prio)
{
u_int count;
this->mutex->lock(this->mutex);
count = this->working_threads[sane_prio(prio)];
this->mutex->unlock(this->mutex);
return count;
}
METHOD(processor_t, get_job_load, u_int,
private_processor_t *this, job_priority_t prio)
{
@ -293,6 +333,7 @@ processor_t *processor_create()
.public = {
.get_total_threads = _get_total_threads,
.get_idle_threads = _get_idle_threads,
.get_working_threads = _get_working_threads,
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,

View File

@ -42,12 +42,20 @@ struct processor_t {
u_int (*get_total_threads) (processor_t *this);
/**
* Get the number of threads currently waiting.
* Get the number of threads currently waiting for work.
*
* @return number of idle threads
*/
u_int (*get_idle_threads) (processor_t *this);
/**
* Get the number of threads currently working, per priority class.
*
* @param prioritiy to check
* @return number of threads in priority working
*/
u_int (*get_working_threads)(processor_t *this, job_priority_t prio);
/**
* Get the number of queued jobs for a specified priority.
*