vici: Support thread cancellation in command callbacks
This commit is contained in:
parent
045bdf5283
commit
ecc4b51048
|
@ -20,6 +20,7 @@
|
||||||
#include <bio/bio_writer.h>
|
#include <bio/bio_writer.h>
|
||||||
#include <threading/mutex.h>
|
#include <threading/mutex.h>
|
||||||
#include <threading/condvar.h>
|
#include <threading/condvar.h>
|
||||||
|
#include <threading/thread.h>
|
||||||
#include <collections/array.h>
|
#include <collections/array.h>
|
||||||
#include <collections/hashtable.h>
|
#include <collections/hashtable.h>
|
||||||
|
|
||||||
|
@ -184,40 +185,69 @@ static void unregister_event(private_vici_dispatcher_t *this, char *name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data to release on thread cancellation
|
||||||
|
*/
|
||||||
|
typedef struct {
|
||||||
|
private_vici_dispatcher_t *this;
|
||||||
|
command_t *cmd;
|
||||||
|
vici_message_t *request;
|
||||||
|
} release_data_t;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release command after execution/cancellation
|
||||||
|
*/
|
||||||
|
CALLBACK(release_command, void,
|
||||||
|
release_data_t *release)
|
||||||
|
{
|
||||||
|
release->request->destroy(release->request);
|
||||||
|
|
||||||
|
release->this->mutex->lock(release->this->mutex);
|
||||||
|
if (--release->cmd->uses == 0)
|
||||||
|
{
|
||||||
|
release->this->cond->broadcast(release->this->cond);
|
||||||
|
}
|
||||||
|
release->this->mutex->unlock(release->this->mutex);
|
||||||
|
|
||||||
|
free(release);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a request message
|
* Process a request message
|
||||||
*/
|
*/
|
||||||
void process_request(private_vici_dispatcher_t *this, char *name, u_int id,
|
void process_request(private_vici_dispatcher_t *this, char *name, u_int id,
|
||||||
chunk_t data)
|
chunk_t data)
|
||||||
{
|
{
|
||||||
command_t *cmd;
|
vici_message_t *response = NULL;
|
||||||
vici_message_t *request, *response = NULL;
|
release_data_t *release;
|
||||||
|
|
||||||
|
INIT(release,
|
||||||
|
.this = this,
|
||||||
|
);
|
||||||
|
|
||||||
this->mutex->lock(this->mutex);
|
this->mutex->lock(this->mutex);
|
||||||
cmd = this->cmds->get(this->cmds, name);
|
release->cmd = this->cmds->get(this->cmds, name);
|
||||||
if (cmd)
|
if (release->cmd)
|
||||||
{
|
{
|
||||||
cmd->uses++;
|
release->cmd->uses++;
|
||||||
}
|
}
|
||||||
this->mutex->unlock(this->mutex);
|
this->mutex->unlock(this->mutex);
|
||||||
|
|
||||||
if (cmd)
|
if (release->cmd)
|
||||||
{
|
{
|
||||||
request = vici_message_create_from_data(data, FALSE);
|
thread_cleanup_push(release_command, release);
|
||||||
response = cmd->cb(cmd->user, cmd->name, id, request);
|
|
||||||
request->destroy(request);
|
|
||||||
|
|
||||||
this->mutex->lock(this->mutex);
|
release->request = vici_message_create_from_data(data, FALSE);
|
||||||
if (--cmd->uses == 0)
|
response = release->cmd->cb(release->cmd->user, release->cmd->name,
|
||||||
|
id, release->request);
|
||||||
|
|
||||||
|
thread_cleanup_pop(TRUE);
|
||||||
|
|
||||||
|
if (response)
|
||||||
{
|
{
|
||||||
this->cond->broadcast(this->cond);
|
send_op(this, id, VICI_CMD_RESPONSE, NULL, response);
|
||||||
|
response->destroy(response);
|
||||||
}
|
}
|
||||||
this->mutex->unlock(this->mutex);
|
|
||||||
}
|
|
||||||
if (response)
|
|
||||||
{
|
|
||||||
send_op(this, id, VICI_CMD_RESPONSE, NULL, response);
|
|
||||||
response->destroy(response);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -264,7 +294,9 @@ CALLBACK(inbound, void,
|
||||||
if (reader->read_data8(reader, &chunk) &&
|
if (reader->read_data8(reader, &chunk) &&
|
||||||
vici_stringify(chunk, name, sizeof(name)))
|
vici_stringify(chunk, name, sizeof(name)))
|
||||||
{
|
{
|
||||||
|
thread_cleanup_push((void*)reader->destroy, reader);
|
||||||
process_request(this, name, id, reader->peek(reader));
|
process_request(this, name, id, reader->peek(reader));
|
||||||
|
thread_cleanup_pop(FALSE);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include <daemon.h>
|
#include <daemon.h>
|
||||||
#include <threading/mutex.h>
|
#include <threading/mutex.h>
|
||||||
#include <threading/rwlock.h>
|
#include <threading/rwlock.h>
|
||||||
|
#include <threading/thread.h>
|
||||||
#include <collections/array.h>
|
#include <collections/array.h>
|
||||||
#include <collections/linked_list.h>
|
#include <collections/linked_list.h>
|
||||||
#include <processing/jobs/callback_job.h>
|
#include <processing/jobs/callback_job.h>
|
||||||
|
@ -374,8 +375,9 @@ CALLBACK(on_read, bool,
|
||||||
|
|
||||||
if (data.len)
|
if (data.len)
|
||||||
{
|
{
|
||||||
|
thread_cleanup_push(free, data.ptr);
|
||||||
entry->this->inbound(entry->this->user, entry->id, data);
|
entry->this->inbound(entry->this->user, entry->id, data);
|
||||||
chunk_clear(&data);
|
thread_cleanup_pop(TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
Loading…
Reference in New Issue