From 8808edfb4c0d8761ba7db9508823cf7004684707 Mon Sep 17 00:00:00 2001 From: Tobias Brunner Date: Fri, 30 Jul 2010 11:51:15 +0200 Subject: [PATCH] pluto: Added a generic event queue. This allows to easily execute arbitrary callbacks in the context of the pluto main thread (e.g. in order to synchronize with threads from the thread-pool). --- src/pluto/Makefile.am | 1 + src/pluto/event_queue.c | 195 ++++++++++++++++++++++++++++++++++++++++ src/pluto/event_queue.h | 69 ++++++++++++++ src/pluto/pluto.c | 2 + src/pluto/pluto.h | 7 ++ src/pluto/server.c | 19 +++- 6 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 src/pluto/event_queue.c create mode 100644 src/pluto/event_queue.h diff --git a/src/pluto/Makefile.am b/src/pluto/Makefile.am index 04835f3a4..ba4bde5a4 100644 --- a/src/pluto/Makefile.am +++ b/src/pluto/Makefile.am @@ -18,6 +18,7 @@ db_ops.c db_ops.h \ defs.c defs.h \ demux.c demux.h \ dnskey.c dnskey.h \ +event_queue.c event_queue.h \ fetch.c fetch.h \ foodgroups.c foodgroups.h \ ike_alg.c ike_alg.h \ diff --git a/src/pluto/event_queue.c b/src/pluto/event_queue.c new file mode 100644 index 000000000..55d064f26 --- /dev/null +++ b/src/pluto/event_queue.c @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2010 Tobias Brunner + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include +#include + +#include "event_queue.h" + +#include +#include +#include + +typedef struct private_event_queue_t private_event_queue_t; + +/** + * Private data of event_queue_t class. + */ +struct private_event_queue_t { + /** + * Public event_queue_t interface. + */ + event_queue_t public; + + /** + * List of queued events (event_t*). + */ + linked_list_t *events; + + /** + * Mutex for event list. + */ + mutex_t *mutex; + + /** + * Read end of the notification pipe. + */ + int read_fd; + + /** + * Write end of the notification pipe. + */ + int write_fd; + +}; + +typedef struct event_t event_t; + +struct event_t { + /** + * Callback function. + */ + void (*callback)(void *data); + + /** + * Data to supply to the callback. + */ + void *data; + + /** + * Cleanup function. + */ + void (*cleanup)(void *data); +}; + +static event_t *event_create(void (*callback)(void *data), void *data, + void (*cleanup)(void *data)) +{ + event_t *this; + INIT(this, + .callback = callback, + .data = data, + .cleanup = cleanup, + ); + return this; +} + +static void event_destroy(event_t *this) +{ + if (this->cleanup) + { + this->cleanup(this->data); + } + free(this); +} + +METHOD(event_queue_t, get_event_fd, int, + private_event_queue_t *this) +{ + return this->read_fd; +} + +METHOD(event_queue_t, handle, void, + private_event_queue_t *this) +{ + char buf[10]; + linked_list_t *events; + event_t *event; + this->mutex->lock(this->mutex); + /* flush pipe */ + while (read(this->read_fd, &buf, sizeof(buf)) == sizeof(buf)); + /* replace the list, so we can unlock the mutex while executing the jobs */ + events = this->events; + this->events = linked_list_create(); + this->mutex->unlock(this->mutex); + + while (events->remove_first(events, (void**)&event) == SUCCESS) + { + event->callback(event->data); + event_destroy(event); + } + events->destroy(events); +} + +METHOD(event_queue_t, queue, void, + private_event_queue_t *this, void (*callback)(void *data), void *data, + void (*cleanup)(void *data)) +{ + event_t *event = event_create(callback, data, cleanup); + char c = 0; + this->mutex->lock(this->mutex); + this->events->insert_last(this->events, event); + ignore_result(write(this->write_fd, &c, 1)); + this->mutex->unlock(this->mutex); +} + +METHOD(event_queue_t, destroy, void, + private_event_queue_t *this) +{ + this->mutex->lock(this->mutex); + this->events->destroy_function(this->events, (void*)event_destroy); + this->mutex->unlock(this->mutex); + this->mutex->destroy(this->mutex); + close(this->read_fd); + close(this->write_fd); + free(this); +} + +bool set_nonblock(int socket) +{ + int flags = fcntl(socket, F_GETFL); + return flags != -1 && fcntl(socket, F_SETFL, flags | O_NONBLOCK) != -1; +} + +bool set_cloexec(int socket) +{ + int flags = fcntl(socket, F_GETFD); + return flags != -1 && fcntl(socket, F_SETFD, flags | FD_CLOEXEC) != -1; +} + +/* + * Described in header. + */ +event_queue_t *event_queue_create() +{ + private_event_queue_t *this; + int fd[2]; + + INIT(this, + .public = { + .get_event_fd = _get_event_fd, + .handle = _handle, + .queue = _queue, + .destroy = _destroy, + }, + .events = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + ); + + if (pipe(fd) == -1 || + !set_nonblock(fd[0]) || !set_cloexec(fd[0]) || + !set_nonblock(fd[1]) || !set_cloexec(fd[1])) + { + DBG1(DBG_JOB, "failed to create pipe for job queue"); + _destroy(this); + return NULL; + } + + this->read_fd = fd[0]; + this->write_fd = fd[1]; + + return &this->public; +} + diff --git a/src/pluto/event_queue.h b/src/pluto/event_queue.h new file mode 100644 index 000000000..343729e25 --- /dev/null +++ b/src/pluto/event_queue.h @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2010 Tobias Brunner + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup event_queue event_queue + * @{ @ingroup pluto + */ + +#ifndef EVENT_QUEUE_H_ +#define EVENT_QUEUE_H_ + +typedef struct event_queue_t event_queue_t; + +/** + * The event queue facility can be used to synchronize thread-pool threads + * with the pluto main thread. That is, all queued callbacks are executed + * asynchronously by the pluto main thread. + */ +struct event_queue_t { + + /** + * Returns the file descriptor used to notify the main thread. + * + * @return fd to use in the main thread + */ + int (*get_event_fd) (event_queue_t *this); + + /** + * Handle all queued events. + */ + void (*handle) (event_queue_t *this); + + /** + * Add an event to the queue. + * + * @param callback callback function to add to the queue + * @param data data supplied to the callback function + * @param cleanup optional cleanup function + */ + void (*queue) (event_queue_t *this, void (*callback)(void *data), + void *data, void (*cleanup)(void *data)); + + /** + * Destroy this instance. + */ + void (*destroy) (event_queue_t *this); + +}; + +/** + * Create the event queue. + * + * @return created object + */ +event_queue_t *event_queue_create(); + +#endif /** EVENT_QUEUE_H_ @}*/ diff --git a/src/pluto/pluto.c b/src/pluto/pluto.c index e9c7c316b..66fdb30b9 100644 --- a/src/pluto/pluto.c +++ b/src/pluto/pluto.c @@ -41,6 +41,7 @@ pluto_t *pluto; void pluto_deinit() { private_pluto_t *this = (private_pluto_t*)pluto; + this->public.events->destroy(this->public.events); this->public.xauth->destroy(this->public.xauth); free(this); pluto = NULL; @@ -55,6 +56,7 @@ bool pluto_init(char *file) INIT(this, .public = { + .events = event_queue_create(), .xauth = xauth_manager_create(), }, ); diff --git a/src/pluto/pluto.h b/src/pluto/pluto.h index 37e6e3f33..2440093ca 100644 --- a/src/pluto/pluto.h +++ b/src/pluto/pluto.h @@ -31,6 +31,7 @@ typedef struct pluto_t pluto_t; +#include #include #include @@ -40,10 +41,16 @@ typedef struct pluto_t pluto_t; */ struct pluto_t { + /** + * event queue (callbacks, executed by the pluto main thread) + */ + event_queue_t *events; + /** * manager for payload attributes */ xauth_manager_t *xauth; + }; /** diff --git a/src/pluto/server.c b/src/pluto/server.c index 21f65f4f8..6ad49640d 100644 --- a/src/pluto/server.c +++ b/src/pluto/server.c @@ -56,6 +56,7 @@ #include "adns.h" /* needs */ #include "dnskey.h" /* needs keys.h and adns.h */ #include "whack.h" /* for RC_LOG_SERIOUS */ +#include "pluto.h" #include #include @@ -811,7 +812,7 @@ call_server(void) { fd_set readfds; fd_set writefds; - int ndes; + int ndes, events_fd; /* wait for next interesting thing */ @@ -853,6 +854,11 @@ call_server(void) FD_SET(adns_afd, &readfds); } + events_fd = pluto->events->get_event_fd(pluto->events); + if (maxfd < events_fd) + maxfd = events_fd; + FD_SET(events_fd, &readfds); + #ifdef KLIPS if (!no_klips) { @@ -947,6 +953,17 @@ call_server(void) ndes--; } + if (FD_ISSET(events_fd, &readfds)) + { + passert(ndes > 0); + DBG(DBG_CONTROL, + DBG_log(BLANK_FORMAT); + DBG_log("*handling asynchronous events")); + pluto->events->handle(pluto->events); + passert(GLOBALS_ARE_RESET()); + ndes--; + } + #ifdef KLIPS if (!no_klips && FD_ISSET(*kernel_ops->async_fdp, &readfds)) {