watcher: Avoid allocations due to enumerators

Since the FD set could get rebuilt quite often this change avoids having
to allocate memory just to enumerate the registered FDs.
This commit is contained in:
Tobias Brunner 2016-07-14 18:28:42 +02:00
parent dcee481910
commit b27663399b
1 changed files with 83 additions and 37 deletions

View File

@ -1,4 +1,7 @@
/*
* Copyright (C) 2016 Tobias Brunner
* HSR Hochschule fuer Technik Rapperswil
*
* Copyright (C) 2013 Martin Willi
* Copyright (C) 2013 revosec AG
*
@ -27,6 +30,7 @@
#include <fcntl.h>
typedef struct private_watcher_t private_watcher_t;
typedef struct entry_t entry_t;
/**
* Private data of an watcher_t object.
@ -39,9 +43,19 @@ struct private_watcher_t {
watcher_t public;
/**
* List of registered FDs, as entry_t
* List of registered FDs
*/
linked_list_t *fds;
entry_t *fds;
/**
* Last registered FD
*/
entry_t *last;
/**
* Number of registered FDs
*/
u_int count;
/**
* Pending update of FD list?
@ -77,7 +91,7 @@ struct private_watcher_t {
/**
* Entry for a registered file descriptor
*/
typedef struct {
struct entry_t {
/** file descriptor */
int fd;
/** events to watch */
@ -88,7 +102,53 @@ typedef struct {
void *data;
/** callback(s) currently active? */
int in_callback;
} entry_t;
/** next registered fd */
entry_t *next;
};
/**
* Adds the given entry at the end of the list
*/
static void add_entry(private_watcher_t *this, entry_t *entry)
{
if (this->last)
{
this->last->next = entry;
this->last = entry;
}
else
{
this->fds = this->last = entry;
}
this->count++;
}
/**
* Removes and frees the given entry
*
* Updates the previous entry and returns the next entry in the list, if any.
*/
static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
entry_t *prev)
{
entry_t *next = entry->next;
if (prev)
{
prev->next = next;
}
else
{
this->fds = next;
}
if (this->last == entry)
{
this->last = prev;
}
this->count--;
free(entry);
return next;
}
/**
* Data we pass on for an async notification
@ -153,13 +213,11 @@ static job_requeue_t notify_async(notify_data_t *data)
static void notify_end(notify_data_t *data)
{
private_watcher_t *this = data->this;
enumerator_t *enumerator;
entry_t *entry;
entry_t *entry, *prev = NULL;
/* reactivate the disabled entry */
this->mutex->lock(this->mutex);
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
for (entry = this->fds; entry; prev = entry, entry = entry->next)
{
if (entry->fd == data->fd)
{
@ -168,8 +226,7 @@ static void notify_end(notify_data_t *data)
entry->events &= ~data->event;
if (!entry->events)
{
this->fds->remove_at(this->fds, enumerator);
free(entry);
remove_entry(this, entry, prev);
break;
}
}
@ -177,8 +234,6 @@ static void notify_end(notify_data_t *data)
break;
}
}
enumerator->destroy(enumerator);
update(this);
this->condvar->broadcast(this->condvar);
this->mutex->unlock(this->mutex);
@ -219,19 +274,16 @@ static void notify(private_watcher_t *this, entry_t *entry,
*/
static void activate_all(private_watcher_t *this)
{
enumerator_t *enumerator;
entry_t *entry;
/* When the watcher thread gets cancelled, we have to reactivate any entry
* and signal threads in remove() to go on. */
this->mutex->lock(this->mutex);
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
for (entry = this->fds; entry; entry = entry->next)
{
entry->in_callback = 0;
}
enumerator->destroy(enumerator);
this->state = WATCHER_STOPPED;
this->condvar->broadcast(this->condvar);
this->mutex->unlock(this->mutex);
@ -240,7 +292,7 @@ static void activate_all(private_watcher_t *this)
/**
* Find flagged revents in a pollfd set by fd
*/
static int find_revents(struct pollfd *pfd, int count, int fd)
static inline int find_revents(struct pollfd *pfd, int count, int fd)
{
int i;
@ -257,7 +309,8 @@ static int find_revents(struct pollfd *pfd, int count, int fd)
/**
* Check if entry is waiting for a specific event, and if it got signaled
*/
static bool entry_ready(entry_t *entry, watcher_event_t event, int revents)
static inline bool entry_ready(entry_t *entry, watcher_event_t event,
int revents)
{
if (entry->events & event)
{
@ -279,7 +332,6 @@ static bool entry_ready(entry_t *entry, watcher_event_t event, int revents)
*/
static job_requeue_t watch(private_watcher_t *this)
{
enumerator_t *enumerator;
entry_t *entry;
struct pollfd *pfd;
int count = 0, res;
@ -287,8 +339,8 @@ static job_requeue_t watch(private_watcher_t *this)
this->mutex->lock(this->mutex);
count = this->fds->get_count(this->fds);
if (count == 0)
count = this->count;
if (!count)
{
this->state = WATCHER_STOPPED;
this->mutex->unlock(this->mutex);
@ -304,8 +356,7 @@ static job_requeue_t watch(private_watcher_t *this)
pfd[0].events = POLLIN;
count = 1;
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
for (entry = this->fds; entry; entry = entry->next)
{
if (!entry->in_callback)
{
@ -329,7 +380,6 @@ static job_requeue_t watch(private_watcher_t *this)
count++;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
while (!rebuild)
@ -378,8 +428,7 @@ static job_requeue_t watch(private_watcher_t *this)
}
this->mutex->lock(this->mutex);
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
for (entry = this->fds; entry; entry = entry->next)
{
if (entry->in_callback)
{
@ -406,7 +455,6 @@ static job_requeue_t watch(private_watcher_t *this)
}
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
if (this->jobs->get_count(this->jobs))
@ -446,7 +494,7 @@ METHOD(watcher_t, add, void,
);
this->mutex->lock(this->mutex);
this->fds->insert_last(this->fds, entry);
add_entry(this, entry);
if (this->state == WATCHER_STOPPED)
{
this->state = WATCHER_QUEUED;
@ -464,16 +512,15 @@ METHOD(watcher_t, add, void,
METHOD(watcher_t, remove_, void,
private_watcher_t *this, int fd)
{
enumerator_t *enumerator;
entry_t *entry;
entry_t *entry, *prev = NULL;
this->mutex->lock(this->mutex);
while (TRUE)
{
bool is_in_callback = FALSE;
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
entry = this->fds;
while (entry)
{
if (entry->fd == fd)
{
@ -482,11 +529,12 @@ METHOD(watcher_t, remove_, void,
is_in_callback = TRUE;
break;
}
this->fds->remove_at(this->fds, enumerator);
free(entry);
entry = remove_entry(this, entry, prev);
continue;
}
prev = entry;
entry = entry->next;
}
enumerator->destroy(enumerator);
if (!is_in_callback)
{
break;
@ -515,7 +563,6 @@ METHOD(watcher_t, destroy, void,
{
this->mutex->destroy(this->mutex);
this->condvar->destroy(this->condvar);
this->fds->destroy(this->fds);
if (this->notify[0] != -1)
{
close(this->notify[0]);
@ -590,7 +637,6 @@ watcher_t *watcher_create()
.get_state = _get_state,
.destroy = _destroy,
},
.fds = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
.jobs = linked_list_create(),