Processor job scheduling respects job priority classes

This commit is contained in:
Martin Willi 2011-05-02 11:28:04 +02:00
parent f77203bbfb
commit c73d4f53f5
3 changed files with 55 additions and 17 deletions

View File

@ -407,7 +407,7 @@ METHOD(stroke_list_t, status, void,
host_t *host;
u_int32_t dpd;
time_t since, now;
u_int size, online, offline;
u_int size, online, offline, i;
now = time_monotonic(NULL);
since = time(NULL) - (now - this->uptime);
@ -424,9 +424,13 @@ METHOD(stroke_list_t, status, void,
fprintf(out, " worker threads: %d idle of %d,",
lib->processor->get_idle_threads(lib->processor),
lib->processor->get_total_threads(lib->processor));
fprintf(out, " job queue load: %d,",
lib->processor->get_job_load(lib->processor));
fprintf(out, " scheduled events: %d\n",
fprintf(out, " job queue load: ");
for (i = 0; i < JOB_PRIO_MAX; i++)
{
fprintf(out, "%s%d", i == 0 ? "" : "/",
lib->processor->get_job_load(lib->processor, i));
}
fprintf(out, ", scheduled events: %d\n",
lib->scheduler->get_job_load(lib->scheduler));
fprintf(out, " loaded plugins: ");
enumerator = lib->plugins->create_plugin_enumerator(lib->plugins);

View File

@ -34,6 +34,7 @@ typedef struct private_processor_t private_processor_t;
* Private data of processor_t class.
*/
struct private_processor_t {
/**
* Public processor_t interface.
*/
@ -61,12 +62,12 @@ struct private_processor_t {
linked_list_t *threads;
/**
* The jobs are stored in a linked list
* A list of queued jobs for each priority
*/
linked_list_t *list;
linked_list_t *jobs[JOB_PRIO_MAX];
/**
* access to linked_list is locked through this mutex
* access to job lists is locked through this mutex
*/
mutex_t *mutex;
@ -120,16 +121,24 @@ static void process_jobs(private_processor_t *this)
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
job_t *job;
job_t *job = NULL;
int i;
if (this->list->get_count(this->list) == 0)
for (i = 0; i < JOB_PRIO_MAX; i++)
{
if (this->jobs[i]->remove_first(this->jobs[i],
(void**)&job) == SUCCESS)
{
break;
}
}
if (!job)
{
this->idle_threads++;
this->job_added->wait(this->job_added, this->mutex);
this->idle_threads--;
continue;
}
this->list->remove_first(this->list, (void**)&job);
this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
thread_cleanup_push((thread_cleanup_t)restart, this);
@ -164,13 +173,26 @@ METHOD(processor_t, get_idle_threads, u_int,
return count;
}
/**
* Check priority bounds
*/
static job_priority_t sane_prio(job_priority_t prio)
{
if (prio < 0 || prio >= JOB_PRIO_MAX)
{
return JOB_PRIO_MAX - 1;
}
return prio;
}
METHOD(processor_t, get_job_load, u_int,
private_processor_t *this)
private_processor_t *this, job_priority_t prio)
{
u_int load;
prio = sane_prio(prio);
this->mutex->lock(this->mutex);
load = this->list->get_count(this->list);
load = this->jobs[prio]->get_count(this->jobs[prio]);
this->mutex->unlock(this->mutex);
return load;
}
@ -178,8 +200,11 @@ METHOD(processor_t, get_job_load, u_int,
METHOD(processor_t, queue_job, void,
private_processor_t *this, job_t *job)
{
job_priority_t prio;
prio = sane_prio(job->get_priority(job));
this->mutex->lock(this->mutex);
this->list->insert_last(this->list, job);
this->jobs[prio]->insert_last(this->jobs[prio], job);
this->job_added->signal(this->job_added);
this->mutex->unlock(this->mutex);
}
@ -217,6 +242,7 @@ METHOD(processor_t, destroy, void,
private_processor_t *this)
{
thread_t *current;
int i;
set_threads(this, 0);
this->mutex->lock(this->mutex);
@ -234,7 +260,10 @@ METHOD(processor_t, destroy, void,
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
for (i = 0; i < JOB_PRIO_MAX; i++)
{
this->jobs[i]->destroy_offset(this->jobs[i], offsetof(job_t, destroy));
}
this->threads->destroy(this->threads);
free(this);
}
@ -245,6 +274,7 @@ METHOD(processor_t, destroy, void,
processor_t *processor_create()
{
private_processor_t *this;
int i;
INIT(this,
.public = {
@ -255,12 +285,15 @@ processor_t *processor_create()
.set_threads = _set_threads,
.destroy = _destroy,
},
.list = linked_list_create(),
.threads = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.job_added = condvar_create(CONDVAR_TYPE_DEFAULT),
.thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT),
);
for (i = 0; i < JOB_PRIO_MAX; i++)
{
this->jobs[i] = linked_list_create();
}
return &this->public;
}

View File

@ -49,11 +49,12 @@ struct processor_t {
u_int (*get_idle_threads) (processor_t *this);
/**
* Get the number of queued jobs.
* Get the number of queued jobs for a specified priority.
*
* @param prio priority class to get job load for
* @return number of items in queue
*/
u_int (*get_job_load) (processor_t *this);
u_int (*get_job_load) (processor_t *this, job_priority_t prio);
/**
* Adds a job to the queue.