pluto: Added a generic event queue.

This allows to easily execute arbitrary callbacks in the context of the pluto
main thread (e.g. in order to synchronize with threads from the thread-pool).
This commit is contained in:
Tobias Brunner 2010-07-30 11:51:15 +02:00
parent 4cf459a054
commit 8808edfb4c
6 changed files with 292 additions and 1 deletions

View File

@ -18,6 +18,7 @@ db_ops.c db_ops.h \
defs.c defs.h \
demux.c demux.h \
dnskey.c dnskey.h \
event_queue.c event_queue.h \
fetch.c fetch.h \
foodgroups.c foodgroups.h \
ike_alg.c ike_alg.h \

195
src/pluto/event_queue.c Normal file
View File

@ -0,0 +1,195 @@
/*
* Copyright (C) 2010 Tobias Brunner
* Hochschule fuer Technik Rapperswil
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <unistd.h>
#include <fcntl.h>
#include "event_queue.h"
#include <debug.h>
#include <threading/mutex.h>
#include <utils/linked_list.h>
typedef struct private_event_queue_t private_event_queue_t;
/**
* Private data of event_queue_t class.
*/
struct private_event_queue_t {
/**
* Public event_queue_t interface.
*/
event_queue_t public;
/**
* List of queued events (event_t*).
*/
linked_list_t *events;
/**
* Mutex for event list.
*/
mutex_t *mutex;
/**
* Read end of the notification pipe.
*/
int read_fd;
/**
* Write end of the notification pipe.
*/
int write_fd;
};
typedef struct event_t event_t;
struct event_t {
/**
* Callback function.
*/
void (*callback)(void *data);
/**
* Data to supply to the callback.
*/
void *data;
/**
* Cleanup function.
*/
void (*cleanup)(void *data);
};
static event_t *event_create(void (*callback)(void *data), void *data,
void (*cleanup)(void *data))
{
event_t *this;
INIT(this,
.callback = callback,
.data = data,
.cleanup = cleanup,
);
return this;
}
static void event_destroy(event_t *this)
{
if (this->cleanup)
{
this->cleanup(this->data);
}
free(this);
}
METHOD(event_queue_t, get_event_fd, int,
private_event_queue_t *this)
{
return this->read_fd;
}
METHOD(event_queue_t, handle, void,
private_event_queue_t *this)
{
char buf[10];
linked_list_t *events;
event_t *event;
this->mutex->lock(this->mutex);
/* flush pipe */
while (read(this->read_fd, &buf, sizeof(buf)) == sizeof(buf));
/* replace the list, so we can unlock the mutex while executing the jobs */
events = this->events;
this->events = linked_list_create();
this->mutex->unlock(this->mutex);
while (events->remove_first(events, (void**)&event) == SUCCESS)
{
event->callback(event->data);
event_destroy(event);
}
events->destroy(events);
}
METHOD(event_queue_t, queue, void,
private_event_queue_t *this, void (*callback)(void *data), void *data,
void (*cleanup)(void *data))
{
event_t *event = event_create(callback, data, cleanup);
char c = 0;
this->mutex->lock(this->mutex);
this->events->insert_last(this->events, event);
ignore_result(write(this->write_fd, &c, 1));
this->mutex->unlock(this->mutex);
}
METHOD(event_queue_t, destroy, void,
private_event_queue_t *this)
{
this->mutex->lock(this->mutex);
this->events->destroy_function(this->events, (void*)event_destroy);
this->mutex->unlock(this->mutex);
this->mutex->destroy(this->mutex);
close(this->read_fd);
close(this->write_fd);
free(this);
}
bool set_nonblock(int socket)
{
int flags = fcntl(socket, F_GETFL);
return flags != -1 && fcntl(socket, F_SETFL, flags | O_NONBLOCK) != -1;
}
bool set_cloexec(int socket)
{
int flags = fcntl(socket, F_GETFD);
return flags != -1 && fcntl(socket, F_SETFD, flags | FD_CLOEXEC) != -1;
}
/*
* Described in header.
*/
event_queue_t *event_queue_create()
{
private_event_queue_t *this;
int fd[2];
INIT(this,
.public = {
.get_event_fd = _get_event_fd,
.handle = _handle,
.queue = _queue,
.destroy = _destroy,
},
.events = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
if (pipe(fd) == -1 ||
!set_nonblock(fd[0]) || !set_cloexec(fd[0]) ||
!set_nonblock(fd[1]) || !set_cloexec(fd[1]))
{
DBG1(DBG_JOB, "failed to create pipe for job queue");
_destroy(this);
return NULL;
}
this->read_fd = fd[0];
this->write_fd = fd[1];
return &this->public;
}

69
src/pluto/event_queue.h Normal file
View File

@ -0,0 +1,69 @@
/*
* Copyright (C) 2010 Tobias Brunner
* Hochschule fuer Technik Rapperswil
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
/**
* @defgroup event_queue event_queue
* @{ @ingroup pluto
*/
#ifndef EVENT_QUEUE_H_
#define EVENT_QUEUE_H_
typedef struct event_queue_t event_queue_t;
/**
* The event queue facility can be used to synchronize thread-pool threads
* with the pluto main thread. That is, all queued callbacks are executed
* asynchronously by the pluto main thread.
*/
struct event_queue_t {
/**
* Returns the file descriptor used to notify the main thread.
*
* @return fd to use in the main thread
*/
int (*get_event_fd) (event_queue_t *this);
/**
* Handle all queued events.
*/
void (*handle) (event_queue_t *this);
/**
* Add an event to the queue.
*
* @param callback callback function to add to the queue
* @param data data supplied to the callback function
* @param cleanup optional cleanup function
*/
void (*queue) (event_queue_t *this, void (*callback)(void *data),
void *data, void (*cleanup)(void *data));
/**
* Destroy this instance.
*/
void (*destroy) (event_queue_t *this);
};
/**
* Create the event queue.
*
* @return created object
*/
event_queue_t *event_queue_create();
#endif /** EVENT_QUEUE_H_ @}*/

View File

@ -41,6 +41,7 @@ pluto_t *pluto;
void pluto_deinit()
{
private_pluto_t *this = (private_pluto_t*)pluto;
this->public.events->destroy(this->public.events);
this->public.xauth->destroy(this->public.xauth);
free(this);
pluto = NULL;
@ -55,6 +56,7 @@ bool pluto_init(char *file)
INIT(this,
.public = {
.events = event_queue_create(),
.xauth = xauth_manager_create(),
},
);

View File

@ -31,6 +31,7 @@
typedef struct pluto_t pluto_t;
#include <event_queue.h>
#include <xauth/xauth_manager.h>
#include <library.h>
@ -40,10 +41,16 @@ typedef struct pluto_t pluto_t;
*/
struct pluto_t {
/**
* event queue (callbacks, executed by the pluto main thread)
*/
event_queue_t *events;
/**
* manager for payload attributes
*/
xauth_manager_t *xauth;
};
/**

View File

@ -56,6 +56,7 @@
#include "adns.h" /* needs <resolv.h> */
#include "dnskey.h" /* needs keys.h and adns.h */
#include "whack.h" /* for RC_LOG_SERIOUS */
#include "pluto.h"
#include <pfkeyv2.h>
#include <pfkey.h>
@ -811,7 +812,7 @@ call_server(void)
{
fd_set readfds;
fd_set writefds;
int ndes;
int ndes, events_fd;
/* wait for next interesting thing */
@ -853,6 +854,11 @@ call_server(void)
FD_SET(adns_afd, &readfds);
}
events_fd = pluto->events->get_event_fd(pluto->events);
if (maxfd < events_fd)
maxfd = events_fd;
FD_SET(events_fd, &readfds);
#ifdef KLIPS
if (!no_klips)
{
@ -947,6 +953,17 @@ call_server(void)
ndes--;
}
if (FD_ISSET(events_fd, &readfds))
{
passert(ndes > 0);
DBG(DBG_CONTROL,
DBG_log(BLANK_FORMAT);
DBG_log("*handling asynchronous events"));
pluto->events->handle(pluto->events);
passert(GLOBALS_ARE_RESET());
ndes--;
}
#ifdef KLIPS
if (!no_klips && FD_ISSET(*kernel_ops->async_fdp, &readfds))
{