Make rescheduling a job more predictable

This avoids race conditions between calls to cancel() and jobs that like
to be rescheduled.  If jobs were able to reschedule themselves it would
theoretically be possible that two worker threads have the same job
assigned (the one currently executing the job and the one executing the
same but rescheduled job if it already is time to execute it), this means
that cancel() could be called twice for that job.

Creating a new job based on the current one and reschedule that is also
OK, but rescheduling itself is more efficient for jobs that need to be
executed often.
This commit is contained in:
Tobias Brunner 2012-06-21 10:10:25 +02:00
parent 26d77eb3e6
commit e0efd7c121
4 changed files with 90 additions and 49 deletions

View File

@ -345,12 +345,7 @@ static job_requeue_t send_status(private_ha_segments_t *this)
message->destroy(message);
/* schedule next invocation */
lib->scheduler->schedule_job_ms(lib->scheduler, (job_t*)
callback_job_create((callback_job_cb_t)
send_status, this, NULL, NULL),
this->heartbeat_delay);
return JOB_REQUEUE_NONE;
return JOB_RESCHEDULE_MS(this->heartbeat_delay);
}
METHOD(ha_segments_t, is_active, bool,

View File

@ -55,7 +55,7 @@ METHOD(job_t, execute, job_requeue_t,
private_inactivity_job_t *this)
{
ike_sa_t *ike_sa;
bool rescheduled = FALSE;
u_int32_t reschedule = 0;
ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
this->reqid, TRUE);
@ -87,9 +87,7 @@ METHOD(job_t, execute, job_requeue_t,
}
else
{
lib->scheduler->schedule_job(lib->scheduler,
&this->public.job_interface, this->timeout - diff);
rescheduled = TRUE;
reschedule = this->timeout - diff;
}
}
children++;
@ -121,9 +119,9 @@ METHOD(job_t, execute, job_requeue_t,
charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
}
}
if (rescheduled)
if (reschedule)
{
return JOB_REQUEUE_SCHEDULED;
return JOB_RESCHEDULE(reschedule);
}
return JOB_REQUEUE_NONE;
}

View File

@ -25,8 +25,9 @@
typedef struct job_t job_t;
typedef enum job_priority_t job_priority_t;
typedef enum job_requeue_t job_requeue_t;
typedef enum job_status_t job_status_t;
typedef enum job_requeue_type_t job_requeue_type_t;
typedef struct job_requeue_t job_requeue_t;
#include <library.h>
@ -50,23 +51,6 @@ enum job_priority_t {
*/
extern enum_name_t *job_priority_names;
/**
* Job requeueing policy.
*
* The job requeueing policy defines how a job is handled after it has been
* executed.
*/
enum job_requeue_t {
/** Do not requeue job, destroy it */
JOB_REQUEUE_NONE = 0,
/** Requeue the job fairly, i.e. it is inserted at the end of the queue */
JOB_REQUEUE_FAIR,
/** Reexecute the job directly, without the need of requeueing it */
JOB_REQUEUE_DIRECT,
/** For jobs that rescheduled themselves via scheduler_t */
JOB_REQUEUE_SCHEDULED,
};
/**
* Job status
*/
@ -81,6 +65,54 @@ enum job_status_t {
JOB_STATUS_DONE,
};
/**
* How a job is handled after is has been executed.
*/
enum job_requeue_type_t {
/** Do not requeue job, destroy it */
JOB_REQUEUE_TYPE_NONE = 0,
/** Requeue the job fairly, i.e. it is inserted at the end of the queue */
JOB_REQUEUE_TYPE_FAIR,
/** Reexecute the job directly, without the need of requeueing it */
JOB_REQUEUE_TYPE_DIRECT,
/** Rescheduled the job via scheduler_t */
JOB_REQUEUE_TYPE_SCHEDULE,
};
/**
* Job requeueing policy.
*
* The job requeueing policy defines how a job is handled after it has been
* executed.
*/
struct job_requeue_t {
/** How to handle the job after executing it */
job_requeue_type_t type;
/** How to reschedule the job, if so */
enum {
JOB_SCHEDULE,
JOB_SCHEDULE_MS,
JOB_SCHEDULE_TV,
} schedule;
/** Time to reschedule the job */
union {
u_int32_t rel;
timeval_t abs;
} time;
};
/**
* Helper macros to easily define requeueing policies.
*/
#define __JOB_REQUEUE(t) (job_requeue_t){ .type = t }
#define JOB_REQUEUE_NONE __JOB_REQUEUE(JOB_REQUEUE_TYPE_NONE)
#define JOB_REQUEUE_FAIR __JOB_REQUEUE(JOB_REQUEUE_TYPE_FAIR)
#define JOB_REQUEUE_DIRECT __JOB_REQUEUE(JOB_REQUEUE_TYPE_DIRECT)
#define __JOB_RESCHEDULE(t, ...) (job_requeue_t){ .type = JOB_REQUEUE_TYPE_SCHEDULE, .schedule = t, { __VA_ARGS__ } }
#define JOB_RESCHEDULE(s) __JOB_RESCHEDULE(JOB_SCHEDULE, .rel = s)
#define JOB_RESCHEDULE_MS(ms) __JOB_RESCHEDULE(JOB_SCHEDULE_MS, .rel = ms)
#define JOB_RESCHEDULE_TV(tv) __JOB_RESCHEDULE(JOB_SCHEDULE_TV, .abs = tv)
/**
* Job interface as it is stored in the job queue.
*/

View File

@ -217,13 +217,13 @@ static void process_jobs(worker_thread_t *worker)
while (TRUE)
{
requeue = worker->job->execute(worker->job);
if (requeue != JOB_REQUEUE_DIRECT)
if (requeue.type != JOB_REQUEUE_TYPE_DIRECT)
{
break;
}
else if (!worker->job->cancel)
{ /* only allow cancelable jobs to requeue directly */
requeue = JOB_REQUEUE_FAIR;
requeue.type = JOB_REQUEUE_TYPE_FAIR;
break;
}
}
@ -234,25 +234,41 @@ static void process_jobs(worker_thread_t *worker)
{ /* job was canceled via a custom cancel() method or did not
* use JOB_REQUEUE_TYPE_DIRECT */
worker->job->destroy(worker->job);
break;
}
else
switch (requeue.type)
{
switch (requeue)
{
case JOB_REQUEUE_NONE:
worker->job->status = JOB_STATUS_DONE;
worker->job->destroy(worker->job);
break;
case JOB_REQUEUE_FAIR:
worker->job->status = JOB_STATUS_QUEUED;
this->jobs[i]->insert_last(this->jobs[i],
worker->job);
this->job_added->signal(this->job_added);
break;
case JOB_REQUEUE_SCHEDULED:
default:
break;
}
case JOB_REQUEUE_TYPE_NONE:
worker->job->status = JOB_STATUS_DONE;
worker->job->destroy(worker->job);
break;
case JOB_REQUEUE_TYPE_FAIR:
worker->job->status = JOB_STATUS_QUEUED;
this->jobs[i]->insert_last(this->jobs[i],
worker->job);
this->job_added->signal(this->job_added);
break;
case JOB_REQUEUE_TYPE_SCHEDULE:
/* scheduler_t does not hold its lock when queeuing jobs
* so this should be safe without unlocking our mutex */
switch (requeue.schedule)
{
case JOB_SCHEDULE:
lib->scheduler->schedule_job(lib->scheduler,
worker->job, requeue.time.rel);
break;
case JOB_SCHEDULE_MS:
lib->scheduler->schedule_job_ms(lib->scheduler,
worker->job, requeue.time.rel);
break;
case JOB_SCHEDULE_TV:
lib->scheduler->schedule_job_tv(lib->scheduler,
worker->job, requeue.time.abs);
break;
}
break;
default:
break;
}
break;
}