Callback job refactored and fixed.
This commit is contained in:
parent
89ec5bef08
commit
b97cc0ab3f
|
@ -1,4 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2009 Tobias Brunner
|
||||
* Copyright (C) 2007 Martin Willi
|
||||
* Hochschule fuer Technik Rapperswil
|
||||
*
|
||||
|
@ -15,9 +16,11 @@
|
|||
|
||||
#include "callback_job.h"
|
||||
|
||||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
#include <daemon.h>
|
||||
#include <threading/thread.h>
|
||||
#include <threading/condvar.h>
|
||||
#include <threading/mutex.h>
|
||||
|
||||
typedef struct private_callback_job_t private_callback_job_t;
|
||||
|
@ -47,9 +50,9 @@ struct private_callback_job_t {
|
|||
callback_job_cleanup_t cleanup;
|
||||
|
||||
/**
|
||||
* thread ID of the job, if running
|
||||
* thread of the job, if running
|
||||
*/
|
||||
pthread_t thread;
|
||||
thread_t *thread;
|
||||
|
||||
/**
|
||||
* mutex to access jobs interna
|
||||
|
@ -65,69 +68,113 @@ struct private_callback_job_t {
|
|||
* parent of this job, or NULL
|
||||
*/
|
||||
private_callback_job_t *parent;
|
||||
|
||||
/**
|
||||
* TRUE if the job got cancelled
|
||||
*/
|
||||
bool cancelled;
|
||||
|
||||
/**
|
||||
* condvar to synchronize the cancellation/destruction of the job
|
||||
*/
|
||||
condvar_t *destroyable;
|
||||
|
||||
/**
|
||||
* semaphore to synchronize the termination of the assigned thread.
|
||||
*
|
||||
* separately allocated during cancellation, so that we can wait on it
|
||||
* without risking that it gets freed too early during destruction.
|
||||
*/
|
||||
sem_t *terminated;
|
||||
};
|
||||
|
||||
/**
|
||||
* unregister a child from its parent, if any.
|
||||
* note: this->mutex has to be locked
|
||||
*/
|
||||
static void unregister(private_callback_job_t *this)
|
||||
{
|
||||
if (this->parent)
|
||||
{
|
||||
this->parent->mutex->lock(this->parent->mutex);
|
||||
if (this->parent->cancelled && !this->cancelled)
|
||||
{
|
||||
/* if the parent has been cancelled but we have not yet, we do not
|
||||
* unregister until we got cancelled by the parent. */
|
||||
this->parent->mutex->unlock(this->parent->mutex);
|
||||
this->destroyable->wait(this->destroyable, this->mutex);
|
||||
this->parent->mutex->lock(this->parent->mutex);
|
||||
}
|
||||
this->parent->children->remove(this->parent->children, this, NULL);
|
||||
this->parent->mutex->unlock(this->parent->mutex);
|
||||
this->parent = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements job_t.destroy.
|
||||
*/
|
||||
static void destroy(private_callback_job_t *this)
|
||||
{
|
||||
this->mutex->lock(this->mutex);
|
||||
unregister(this);
|
||||
if (this->cleanup)
|
||||
{
|
||||
this->cleanup(this->data);
|
||||
}
|
||||
if (this->terminated)
|
||||
{
|
||||
sem_post(this->terminated);
|
||||
}
|
||||
this->children->destroy(this->children);
|
||||
this->destroyable->destroy(this->destroyable);
|
||||
this->mutex->unlock(this->mutex);
|
||||
this->mutex->destroy(this->mutex);
|
||||
free(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* unregister a child from its parent, if any.
|
||||
*/
|
||||
static void unregister(private_callback_job_t *this)
|
||||
{
|
||||
if (this->parent)
|
||||
{
|
||||
iterator_t *iterator;
|
||||
private_callback_job_t *child;
|
||||
|
||||
this->parent->mutex->lock(this->parent->mutex);
|
||||
iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
|
||||
while (iterator->iterate(iterator, (void**)&child))
|
||||
{
|
||||
if (child == this)
|
||||
{
|
||||
iterator->remove(iterator);
|
||||
break;
|
||||
}
|
||||
}
|
||||
iterator->destroy(iterator);
|
||||
this->parent->mutex->unlock(this->parent->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of callback_job_t.cancel.
|
||||
*/
|
||||
static void cancel(private_callback_job_t *this)
|
||||
{
|
||||
pthread_t thread;
|
||||
callback_job_t *child;
|
||||
sem_t *terminated = NULL;
|
||||
|
||||
this->mutex->lock(this->mutex);
|
||||
thread = this->thread;
|
||||
|
||||
/* terminate its children */
|
||||
this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel));
|
||||
this->cancelled = TRUE;
|
||||
/* terminate children */
|
||||
while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
|
||||
{
|
||||
this->mutex->unlock(this->mutex);
|
||||
child->cancel(child);
|
||||
this->mutex->lock(this->mutex);
|
||||
}
|
||||
if (this->thread)
|
||||
{
|
||||
/* terminate the thread, if there is currently one executing the job.
|
||||
* we wait for its termination using a semaphore */
|
||||
this->thread->cancel(this->thread);
|
||||
terminated = this->terminated = malloc_thing(sem_t);
|
||||
sem_init(terminated, 0, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* if the job is currently queued, it gets terminated later.
|
||||
* we can't wait, because it might not get executed at all.
|
||||
* we also unregister the queued job manually from its parent (the
|
||||
* others get unregistered during destruction) */
|
||||
unregister(this);
|
||||
}
|
||||
this->destroyable->signal(this->destroyable);
|
||||
this->mutex->unlock(this->mutex);
|
||||
|
||||
/* terminate thread */
|
||||
if (thread)
|
||||
if (terminated)
|
||||
{
|
||||
pthread_cancel(thread);
|
||||
pthread_join(thread, NULL);
|
||||
sem_wait(terminated);
|
||||
sem_destroy(terminated);
|
||||
free(terminated);
|
||||
}
|
||||
/* avoid later execution of a cancelled job */
|
||||
this->callback = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -137,18 +184,22 @@ static void execute(private_callback_job_t *this)
|
|||
{
|
||||
bool cleanup = FALSE;
|
||||
|
||||
thread_cleanup_push((thread_cleanup_t)destroy, this);
|
||||
|
||||
this->mutex->lock(this->mutex);
|
||||
this->thread = pthread_self();
|
||||
this->thread = thread_current();
|
||||
this->mutex->unlock(this->mutex);
|
||||
|
||||
pthread_cleanup_push((void*)destroy, this);
|
||||
while (TRUE)
|
||||
{
|
||||
if (this->callback == NULL)
|
||||
this->mutex->lock(this->mutex);
|
||||
if (this->cancelled)
|
||||
{
|
||||
this->mutex->unlock(this->mutex);
|
||||
cleanup = TRUE;
|
||||
break;
|
||||
}
|
||||
this->mutex->unlock(this->mutex);
|
||||
switch (this->callback(this->data))
|
||||
{
|
||||
case JOB_REQUEUE_DIRECT:
|
||||
|
@ -168,9 +219,13 @@ static void execute(private_callback_job_t *this)
|
|||
}
|
||||
break;
|
||||
}
|
||||
this->thread = 0;
|
||||
unregister(this);
|
||||
pthread_cleanup_pop(cleanup);
|
||||
this->mutex->lock(this->mutex);
|
||||
this->thread = NULL;
|
||||
this->mutex->unlock(this->mutex);
|
||||
/* manually create a cancellation point to avoid that a cancelled thread
|
||||
* goes back into the thread pool */
|
||||
thread_cancellation_point();
|
||||
thread_cleanup_pop(cleanup);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -195,6 +250,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
|
|||
this->thread = 0;
|
||||
this->children = linked_list_create();
|
||||
this->parent = (private_callback_job_t*)parent;
|
||||
this->cancelled = FALSE;
|
||||
this->destroyable = condvar_create(CONDVAR_TYPE_DEFAULT);
|
||||
this->terminated = NULL;
|
||||
|
||||
/* register us at parent */
|
||||
if (parent)
|
||||
|
|
|
@ -90,8 +90,9 @@ struct callback_job_t {
|
|||
job_t job_interface;
|
||||
|
||||
/**
|
||||
* Cancel the jobs thread and wait for its termination.
|
||||
*/
|
||||
* Cancel the job's thread and wait for its termination. This only works
|
||||
* reliably for jobs that always use JOB_REQUEUE_FAIR or JOB_REQUEUE_DIRECT,
|
||||
* otherwise the job may already be destroyed when cancel is called. */
|
||||
void (*cancel)(callback_job_t *this);
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue