Use a separate list and mutex for loggers.

This avoids deadlocks caused by extensive listener_t implementations
which might want to acquire a lock which is currently held by another
thread wanting to log messages. Since the latter requires that thread
to acquire the same lock the initial thread currently holds this
previously resulted in a deadlock.

With this change logging messages does not require threads to acquire
the main lock in bus_t and thus avoids the deadlock.
This commit is contained in:
Tobias Brunner 2011-12-29 19:13:08 +01:00
parent ecb5abd7fa
commit f9f867899a
3 changed files with 168 additions and 73 deletions

View File

@ -39,10 +39,20 @@ struct private_bus_t {
linked_list_t *listeners;
/**
* mutex to synchronize active listeners, recursively
* List of registered listeners implementing listener_t.log() as entry_t
*/
linked_list_t *loggers;
/**
* mutex for the list of listeners, recursively
*/
mutex_t *mutex;
/**
* mutex for the list of loggers
*/
mutex_t *log_mutex;
/**
* Thread local storage the threads IKE_SA
*/
@ -52,7 +62,7 @@ struct private_bus_t {
typedef struct entry_t entry_t;
/**
* a listener entry, either active or passive
* a listener entry
*/
struct entry_t {
@ -65,6 +75,16 @@ struct entry_t {
* are we currently calling this listener
*/
int calling;
/**
* are we currently logging on this listener
*/
int logging;
/**
* TRUE if this listener is active (we use this for the loggers)
*/
bool enabled;
};
/**
@ -76,6 +96,7 @@ static entry_t *entry_create(listener_t *listener)
INIT(this,
.listener = listener,
.enabled = TRUE,
);
return this;
}
@ -91,16 +112,25 @@ static void entry_destroy(entry_t *entry)
METHOD(bus_t, add_listener, void,
private_bus_t *this, listener_t *listener)
{
entry_t *entry = entry_create(listener);
this->mutex->lock(this->mutex);
this->listeners->insert_last(this->listeners, entry_create(listener));
this->listeners->insert_last(this->listeners, entry);
this->mutex->unlock(this->mutex);
if (listener->log)
{
this->log_mutex->lock(this->log_mutex);
this->loggers->insert_last(this->loggers, entry);
this->log_mutex->unlock(this->log_mutex);
}
}
METHOD(bus_t, remove_listener, void,
private_bus_t *this, listener_t *listener)
{
enumerator_t *enumerator;
entry_t *entry;
entry_t *entry, *found = NULL;
this->mutex->lock(this->mutex);
enumerator = this->listeners->create_enumerator(this->listeners);
@ -109,12 +139,20 @@ METHOD(bus_t, remove_listener, void,
if (entry->listener == listener)
{
this->listeners->remove_at(this->listeners, enumerator);
entry_destroy(entry);
found = entry;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
if (found)
{
this->log_mutex->lock(this->log_mutex);
this->loggers->remove(this->loggers, found, NULL);
this->log_mutex->unlock(this->log_mutex);
entry_destroy(found);
}
}
typedef struct cleanup_data_t cleanup_data_t;
@ -166,21 +204,22 @@ static bool log_cb(entry_t *entry, log_data_t *data)
{
va_list args;
if (entry->calling || !entry->listener->log)
if (entry->logging)
{ /* avoid recursive calls */
return FALSE;
}
entry->calling++;
entry->logging = TRUE;
va_copy(args, data->args);
if (!entry->listener->log(entry->listener, data->group, data->level,
data->thread, data->ike_sa, data->format, args))
{
entry_destroy(entry);
entry->enabled = FALSE;
va_end(args);
return TRUE;
}
va_end(args);
entry->calling--;
entry->logging = FALSE;
return FALSE;
}
@ -197,11 +236,11 @@ METHOD(bus_t, vlog, void,
data.format = format;
va_copy(data.args, args);
this->mutex->lock(this->mutex);
this->log_mutex->lock(this->log_mutex);
/* We use the remove() method to invoke all listeners. This is cheap and
* does not require an allocation for this performance critical function. */
this->listeners->remove(this->listeners, &data, (void*)log_cb);
this->mutex->unlock(this->mutex);
this->loggers->remove(this->loggers, &data, (void*)log_cb);
this->log_mutex->unlock(this->log_mutex);
va_end(data.args);
}
@ -223,6 +262,9 @@ static void unregister_listener(private_bus_t *this, entry_t *entry,
enumerator_t *enumerator)
{
this->listeners->remove_at(this->listeners, enumerator);
this->log_mutex->lock(this->log_mutex);
this->loggers->remove(this->loggers, entry, NULL);
this->log_mutex->unlock(this->log_mutex);
entry_destroy(entry);
}
@ -245,12 +287,15 @@ METHOD(bus_t, alert, void,
{
continue;
}
entry->calling++;
va_start(args, alert);
keep = entry->listener->alert(entry->listener, ike_sa, alert, args);
va_end(args);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
va_start(args, alert);
keep = entry->listener->alert(entry->listener, ike_sa, alert, args);
va_end(args);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -274,10 +319,13 @@ METHOD(bus_t, ike_state_change, void,
{
continue;
}
entry->calling++;
keep = entry->listener->ike_state_change(entry->listener, ike_sa, state);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->ike_state_change(entry->listener, ike_sa, state);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -304,11 +352,14 @@ METHOD(bus_t, child_state_change, void,
{
continue;
}
entry->calling++;
keep = entry->listener->child_state_change(entry->listener, ike_sa,
child_sa, state);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->child_state_change(entry->listener, ike_sa,
child_sa, state);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -335,11 +386,14 @@ METHOD(bus_t, message, void,
{
continue;
}
entry->calling++;
keep = entry->listener->message(entry->listener, ike_sa,
message, incoming, plain);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->message(entry->listener, ike_sa,
message, incoming, plain);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -365,11 +419,14 @@ METHOD(bus_t, ike_keys, void,
{
continue;
}
entry->calling++;
keep = entry->listener->ike_keys(entry->listener, ike_sa, dh, dh_other,
nonce_i, nonce_r, rekey, shared);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->ike_keys(entry->listener, ike_sa, dh, dh_other,
nonce_i, nonce_r, rekey, shared);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -397,11 +454,14 @@ METHOD(bus_t, child_keys, void,
{
continue;
}
entry->calling++;
keep = entry->listener->child_keys(entry->listener, ike_sa, child_sa,
initiator, dh, nonce_i, nonce_r);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->child_keys(entry->listener, ike_sa,
child_sa, initiator, dh, nonce_i, nonce_r);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -428,11 +488,14 @@ METHOD(bus_t, child_updown, void,
{
continue;
}
entry->calling++;
keep = entry->listener->child_updown(entry->listener,
ike_sa, child_sa, up);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->child_updown(entry->listener,
ike_sa, child_sa, up);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -459,10 +522,14 @@ METHOD(bus_t, child_rekey, void,
{
continue;
}
entry->calling++;
keep = entry->listener->child_rekey(entry->listener, ike_sa, old, new);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->child_rekey(entry->listener, ike_sa,
old, new);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -486,10 +553,13 @@ METHOD(bus_t, ike_updown, void,
{
continue;
}
entry->calling++;
keep = entry->listener->ike_updown(entry->listener, ike_sa, up);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->ike_updown(entry->listener, ike_sa, up);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -527,10 +597,13 @@ METHOD(bus_t, ike_rekey, void,
{
continue;
}
entry->calling++;
keep = entry->listener->ike_rekey(entry->listener, old, new);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->ike_rekey(entry->listener, old, new);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -557,11 +630,14 @@ METHOD(bus_t, authorize, bool,
{
continue;
}
entry->calling++;
keep = entry->listener->authorize(entry->listener, ike_sa,
final, &success);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->authorize(entry->listener, ike_sa,
final, &success);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -594,11 +670,14 @@ METHOD(bus_t, narrow, void,
{
continue;
}
entry->calling++;
keep = entry->listener->narrow(entry->listener, ike_sa, child_sa,
type, local, remote);
entry->calling--;
if (!keep)
if (entry->enabled)
{
entry->calling++;
keep = entry->listener->narrow(entry->listener, ike_sa, child_sa,
type, local, remote);
entry->calling--;
}
if (!entry->enabled || !keep)
{
unregister_listener(this, entry, enumerator);
}
@ -611,8 +690,10 @@ METHOD(bus_t, destroy, void,
private_bus_t *this)
{
this->thread_sa->destroy(this->thread_sa);
this->log_mutex->destroy(this->log_mutex);
this->mutex->destroy(this->mutex);
this->listeners->destroy_function(this->listeners, (void*)entry_destroy);
this->loggers->destroy(this->loggers);
free(this);
}
@ -646,7 +727,9 @@ bus_t *bus_create()
.destroy = _destroy,
},
.listeners = linked_list_create(),
.loggers = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.log_mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.thread_sa = thread_value_create(NULL),
);

View File

@ -118,8 +118,12 @@ enum narrow_hook_t {
/**
* The bus receives events and sends them to all registered listeners.
*
* Any events sent to are delivered to all registered listeners. Threads
* may wait actively to events using the blocking listen() call.
* Calls to bus_t.log() are handled seperately from calls to other event
* functions. This means that listeners have to be aware that calls to
* listener_t.log() can happen concurrently with calls to one of the other
* callbacks. Due to this unregistering from the log() callback is not fully
* in sync with the other callbacks, thus, one of these might be called before
* the listener is finally unregistered.
*/
struct bus_t {

View File

@ -35,8 +35,16 @@ struct listener_t {
*
* The implementing signal function returns TRUE to stay registered
* to the bus, or FALSE to unregister itself.
* Calling bus_t.log() inside of a registered listener is possible,
* but the bus does not invoke listeners recursively.
*
* Calling bus_t.log() inside of a registered listener is possible
* from all listener_t callbacks, but recursive calls from log() itself
* are ignored.
*
* Note that calls to bus_t.log() are handled seperately from calls to
* other functions, thus this callback may be called concurrently with
* some of the others. Because of this unregistering from this callback
* does not happen in sync with the other callbacks, thus, one of the other
* callbacks might be called before the listener is finally unregistered.
*
* @param group kind of the signal (up, down, rekeyed, ...)
* @param level verbosity level of the signal