568 lines
12 KiB
C
568 lines
12 KiB
C
/*
|
|
* Copyright (C) 2014 Martin Willi
|
|
* Copyright (C) 2014 revosec AG
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify it
|
|
* under the terms of the GNU General Public License as published by the
|
|
* Free Software Foundation; either version 2 of the License, or (at your
|
|
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
* for more details.
|
|
*/
|
|
|
|
/*
|
|
* Copyright (C) 2014 Timo Teräs <timo.teras@iki.fi>
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to deal
|
|
* in the Software without restriction, including without limitation the rights
|
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
* copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
* THE SOFTWARE.
|
|
*/
|
|
|
|
#include "vici_dispatcher.h"
|
|
#include "vici_socket.h"
|
|
|
|
#include <bio/bio_reader.h>
|
|
#include <bio/bio_writer.h>
|
|
#include <threading/mutex.h>
|
|
#include <threading/condvar.h>
|
|
#include <threading/thread.h>
|
|
#include <collections/array.h>
|
|
#include <collections/hashtable.h>
|
|
|
|
typedef struct private_vici_dispatcher_t private_vici_dispatcher_t;
|
|
|
|
/**
|
|
* Private data of an vici_dispatcher_t object.
|
|
*/
|
|
struct private_vici_dispatcher_t {
|
|
|
|
/**
|
|
* Public vici_dispatcher_t interface.
|
|
*/
|
|
vici_dispatcher_t public;
|
|
|
|
/**
|
|
* Socket to send/receive messages
|
|
*/
|
|
vici_socket_t *socket;
|
|
|
|
/**
|
|
* List of registered commands (char* => command_t*)
|
|
*/
|
|
hashtable_t *cmds;
|
|
|
|
/**
|
|
* List of known events, and registered clients (char* => event_t*)
|
|
*/
|
|
hashtable_t *events;
|
|
|
|
/**
|
|
* Mutex to lock hashtables
|
|
*/
|
|
mutex_t *mutex;
|
|
|
|
/**
|
|
* Condvar to signal command termination
|
|
*/
|
|
condvar_t *cond;
|
|
};
|
|
|
|
/**
|
|
* Registered command
|
|
*/
|
|
typedef struct {
|
|
/** command name */
|
|
char *name;
|
|
/** callback for command */
|
|
vici_command_cb_t cb;
|
|
/** user data to pass to callback */
|
|
void *user;
|
|
/** command currently in use? */
|
|
u_int uses;
|
|
} command_t;
|
|
|
|
/**
|
|
* Registered event
|
|
*/
|
|
typedef struct {
|
|
/** event name */
|
|
char *name;
|
|
/** registered clients, as u_int */
|
|
array_t *clients;
|
|
/** event currently in use? */
|
|
u_int uses;
|
|
} event_t;
|
|
|
|
/**
|
|
* Send a operation code, optionally with name and message
|
|
*/
|
|
static void send_op(private_vici_dispatcher_t *this, u_int id,
|
|
vici_operation_t op, char *name, vici_message_t *message)
|
|
{
|
|
bio_writer_t *writer;
|
|
u_int len;
|
|
|
|
len = sizeof(uint8_t);
|
|
if (name)
|
|
{
|
|
len += sizeof(uint8_t) + strlen(name);
|
|
}
|
|
if (message)
|
|
{
|
|
len += message->get_encoding(message).len;
|
|
}
|
|
writer = bio_writer_create(len);
|
|
writer->write_uint8(writer, op);
|
|
if (name)
|
|
{
|
|
writer->write_data8(writer, chunk_from_str(name));
|
|
}
|
|
if (message)
|
|
{
|
|
writer->write_data(writer, message->get_encoding(message));
|
|
}
|
|
this->socket->send(this->socket, id, writer->extract_buf(writer));
|
|
writer->destroy(writer);
|
|
}
|
|
|
|
/**
|
|
* Register client for event
|
|
*/
|
|
static void register_event(private_vici_dispatcher_t *this, char *name,
|
|
u_int id)
|
|
{
|
|
event_t *event;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
while (TRUE)
|
|
{
|
|
event = this->events->get(this->events, name);
|
|
if (!event)
|
|
{
|
|
break;
|
|
}
|
|
if (!event->uses)
|
|
{
|
|
array_insert(event->clients, ARRAY_TAIL, &id);
|
|
break;
|
|
}
|
|
this->cond->wait(this->cond, this->mutex);
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
if (event)
|
|
{
|
|
DBG2(DBG_CFG, "vici client %u registered for: %s", id, name);
|
|
send_op(this, id, VICI_EVENT_CONFIRM, NULL, NULL);
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "vici client %u invalid registration: %s", id, name);
|
|
send_op(this, id, VICI_EVENT_UNKNOWN, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Unregister client for event
|
|
*/
|
|
static void unregister_event(private_vici_dispatcher_t *this, char *name,
|
|
u_int id)
|
|
{
|
|
enumerator_t *enumerator;
|
|
event_t *event;
|
|
u_int *current;
|
|
bool found = FALSE;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
while (TRUE)
|
|
{
|
|
event = this->events->get(this->events, name);
|
|
if (!event)
|
|
{
|
|
break;
|
|
}
|
|
if (!event->uses)
|
|
{
|
|
enumerator = array_create_enumerator(event->clients);
|
|
while (enumerator->enumerate(enumerator, ¤t))
|
|
{
|
|
if (*current == id)
|
|
{
|
|
array_remove_at(event->clients, enumerator);
|
|
found = TRUE;
|
|
break;
|
|
}
|
|
}
|
|
enumerator->destroy(enumerator);
|
|
break;
|
|
}
|
|
this->cond->wait(this->cond, this->mutex);
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
DBG2(DBG_CFG, "vici client %u unregistered for: %s", id, name);
|
|
|
|
if (found)
|
|
{
|
|
send_op(this, id, VICI_EVENT_CONFIRM, NULL, NULL);
|
|
}
|
|
else
|
|
{
|
|
send_op(this, id, VICI_EVENT_UNKNOWN, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Data to release on thread cancellation
|
|
*/
|
|
typedef struct {
|
|
private_vici_dispatcher_t *this;
|
|
command_t *cmd;
|
|
vici_message_t *request;
|
|
} release_data_t;
|
|
|
|
/**
|
|
* Release command after execution/cancellation
|
|
*/
|
|
CALLBACK(release_command, void,
|
|
release_data_t *release)
|
|
{
|
|
release->request->destroy(release->request);
|
|
|
|
release->this->mutex->lock(release->this->mutex);
|
|
if (--release->cmd->uses == 0)
|
|
{
|
|
release->this->cond->broadcast(release->this->cond);
|
|
}
|
|
release->this->mutex->unlock(release->this->mutex);
|
|
|
|
free(release);
|
|
}
|
|
|
|
/**
|
|
* Process a request message
|
|
*/
|
|
void process_request(private_vici_dispatcher_t *this, char *name, u_int id,
|
|
chunk_t data)
|
|
{
|
|
vici_message_t *response = NULL;
|
|
release_data_t *release;
|
|
command_t *cmd;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
cmd = this->cmds->get(this->cmds, name);
|
|
if (cmd)
|
|
{
|
|
cmd->uses++;
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
if (cmd)
|
|
{
|
|
INIT(release,
|
|
.this = this,
|
|
.cmd = cmd,
|
|
);
|
|
|
|
DBG2(DBG_CFG, "vici client %u requests: %s", id, name);
|
|
|
|
thread_cleanup_push(release_command, release);
|
|
|
|
release->request = vici_message_create_from_data(data, FALSE);
|
|
response = release->cmd->cb(cmd->user, cmd->name, id, release->request);
|
|
|
|
thread_cleanup_pop(TRUE);
|
|
|
|
if (response)
|
|
{
|
|
send_op(this, id, VICI_CMD_RESPONSE, NULL, response);
|
|
response->destroy(response);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "vici client %u invalid request: %s", id, name);
|
|
send_op(this, id, VICI_CMD_UNKNOWN, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
CALLBACK(inbound, void,
|
|
private_vici_dispatcher_t *this, u_int id, chunk_t data)
|
|
{
|
|
bio_reader_t *reader;
|
|
chunk_t chunk;
|
|
uint8_t type;
|
|
char name[257];
|
|
|
|
reader = bio_reader_create(data);
|
|
if (reader->read_uint8(reader, &type))
|
|
{
|
|
switch (type)
|
|
{
|
|
case VICI_EVENT_REGISTER:
|
|
if (reader->read_data8(reader, &chunk) &&
|
|
vici_stringify(chunk, name, sizeof(name)))
|
|
{
|
|
register_event(this, name, id);
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "invalid vici register message");
|
|
}
|
|
break;
|
|
case VICI_EVENT_UNREGISTER:
|
|
if (reader->read_data8(reader, &chunk) &&
|
|
vici_stringify(chunk, name, sizeof(name)))
|
|
{
|
|
unregister_event(this, name, id);
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "invalid vici unregister message");
|
|
}
|
|
break;
|
|
case VICI_CMD_REQUEST:
|
|
if (reader->read_data8(reader, &chunk) &&
|
|
vici_stringify(chunk, name, sizeof(name)))
|
|
{
|
|
thread_cleanup_push((void*)reader->destroy, reader);
|
|
process_request(this, name, id, reader->peek(reader));
|
|
thread_cleanup_pop(FALSE);
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "invalid vici request message");
|
|
}
|
|
break;
|
|
case VICI_CMD_RESPONSE:
|
|
case VICI_EVENT_CONFIRM:
|
|
case VICI_EVENT_UNKNOWN:
|
|
case VICI_EVENT:
|
|
default:
|
|
DBG1(DBG_CFG, "unsupported vici operation: %u", type);
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
DBG1(DBG_CFG, "invalid vici message");
|
|
}
|
|
reader->destroy(reader);
|
|
}
|
|
|
|
CALLBACK(connect_, void,
|
|
private_vici_dispatcher_t *this, u_int id)
|
|
{
|
|
DBG2(DBG_CFG, "vici client %u connected", id);
|
|
}
|
|
|
|
CALLBACK(disconnect, void,
|
|
private_vici_dispatcher_t *this, u_int id)
|
|
{
|
|
enumerator_t *events, *ids;
|
|
event_t *event;
|
|
u_int *current;
|
|
|
|
/* deregister client from all events */
|
|
this->mutex->lock(this->mutex);
|
|
events = this->events->create_enumerator(this->events);
|
|
while (events->enumerate(events, NULL, &event))
|
|
{
|
|
while (event->uses)
|
|
{
|
|
this->cond->wait(this->cond, this->mutex);
|
|
}
|
|
ids = array_create_enumerator(event->clients);
|
|
while (ids->enumerate(ids, ¤t))
|
|
{
|
|
if (id == *current)
|
|
{
|
|
array_remove_at(event->clients, ids);
|
|
}
|
|
}
|
|
ids->destroy(ids);
|
|
}
|
|
events->destroy(events);
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
DBG2(DBG_CFG, "vici client %u disconnected", id);
|
|
}
|
|
|
|
METHOD(vici_dispatcher_t, manage_command, void,
|
|
private_vici_dispatcher_t *this, char *name,
|
|
vici_command_cb_t cb, void *user)
|
|
{
|
|
command_t *cmd;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
if (cb)
|
|
{
|
|
INIT(cmd,
|
|
.name = strdup(name),
|
|
.cb = cb,
|
|
.user = user,
|
|
);
|
|
cmd = this->cmds->put(this->cmds, cmd->name, cmd);
|
|
}
|
|
else
|
|
{
|
|
cmd = this->cmds->remove(this->cmds, name);
|
|
}
|
|
if (cmd)
|
|
{
|
|
while (cmd->uses)
|
|
{
|
|
this->cond->wait(this->cond, this->mutex);
|
|
}
|
|
free(cmd->name);
|
|
free(cmd);
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
}
|
|
|
|
METHOD(vici_dispatcher_t, manage_event, void,
|
|
private_vici_dispatcher_t *this, char *name, bool reg)
|
|
{
|
|
event_t *event;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
if (reg)
|
|
{
|
|
INIT(event,
|
|
.name = strdup(name),
|
|
.clients = array_create(sizeof(u_int), 0),
|
|
);
|
|
event = this->events->put(this->events, event->name, event);
|
|
}
|
|
else
|
|
{
|
|
event = this->events->remove(this->events, name);
|
|
}
|
|
if (event)
|
|
{
|
|
while (event->uses)
|
|
{
|
|
this->cond->wait(this->cond, this->mutex);
|
|
}
|
|
array_destroy(event->clients);
|
|
free(event->name);
|
|
free(event);
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
}
|
|
|
|
METHOD(vici_dispatcher_t, has_event_listeners, bool,
|
|
private_vici_dispatcher_t *this, char *name)
|
|
{
|
|
event_t *event;
|
|
bool retval = FALSE;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
event = this->events->get(this->events, name);
|
|
if (event)
|
|
{
|
|
/* the entry might be getting destroyed, but returning
|
|
* false positive is not a problem as a later raise_event
|
|
* will check things again. */
|
|
retval = array_count(event->clients);
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
return retval;
|
|
}
|
|
|
|
METHOD(vici_dispatcher_t, raise_event, void,
|
|
private_vici_dispatcher_t *this, char *name, u_int id,
|
|
vici_message_t *message)
|
|
{
|
|
enumerator_t *enumerator;
|
|
event_t *event;
|
|
u_int *current;
|
|
|
|
this->mutex->lock(this->mutex);
|
|
event = this->events->get(this->events, name);
|
|
if (event)
|
|
{
|
|
event->uses++;
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
enumerator = array_create_enumerator(event->clients);
|
|
while (enumerator->enumerate(enumerator, ¤t))
|
|
{
|
|
if (id == 0 || id == *current)
|
|
{
|
|
send_op(this, *current, VICI_EVENT, name, message);
|
|
}
|
|
}
|
|
enumerator->destroy(enumerator);
|
|
|
|
this->mutex->lock(this->mutex);
|
|
if (--event->uses == 0)
|
|
{
|
|
this->cond->broadcast(this->cond);
|
|
}
|
|
}
|
|
this->mutex->unlock(this->mutex);
|
|
|
|
message->destroy(message);
|
|
}
|
|
|
|
METHOD(vici_dispatcher_t, destroy, void,
|
|
private_vici_dispatcher_t *this)
|
|
{
|
|
DESTROY_IF(this->socket);
|
|
this->mutex->destroy(this->mutex);
|
|
this->cond->destroy(this->cond);
|
|
this->cmds->destroy(this->cmds);
|
|
this->events->destroy(this->events);
|
|
free(this);
|
|
}
|
|
|
|
/**
|
|
* See header
|
|
*/
|
|
vici_dispatcher_t *vici_dispatcher_create(char *uri)
|
|
{
|
|
private_vici_dispatcher_t *this;
|
|
|
|
INIT(this,
|
|
.public = {
|
|
.manage_command = _manage_command,
|
|
.manage_event = _manage_event,
|
|
.has_event_listeners = _has_event_listeners,
|
|
.raise_event = _raise_event,
|
|
.destroy = _destroy,
|
|
},
|
|
.cmds = hashtable_create(hashtable_hash_str, hashtable_equals_str, 1),
|
|
.events = hashtable_create(hashtable_hash_str, hashtable_equals_str, 1),
|
|
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
|
|
.cond = condvar_create(CONDVAR_TYPE_DEFAULT),
|
|
);
|
|
|
|
this->socket = vici_socket_create(uri, inbound, connect_, disconnect, this);
|
|
if (!this->socket)
|
|
{
|
|
destroy(this);
|
|
return NULL;
|
|
}
|
|
|
|
return &this->public;
|
|
}
|