Handle client subscriptions in lookip plugin

This commit is contained in:
Martin Willi 2012-10-04 16:14:10 +02:00
parent 7650dd9a4f
commit bae50c7393
1 changed files with 89 additions and 12 deletions

View File

@ -24,6 +24,8 @@
#include <daemon.h>
#include <threading/thread.h>
#include <threading/mutex.h>
#include <utils/linked_list.h>
#include <processing/jobs/callback_job.h>
#include "lookip_msg.h"
@ -49,6 +51,16 @@ struct private_lookip_socket_t {
* lookip unix socket file descriptor
*/
int socket;
/**
* List of registered listeners, as entry_t
*/
linked_list_t *clients;
/**
* Mutex to lock clients list
*/
mutex_t *mutex;
};
/**
@ -94,41 +106,71 @@ static bool open_socket(private_lookip_socket_t *this)
}
/**
* Listener callback data
* Listener callback entry
*/
typedef struct {
/* FD to write to */
int fd;
/* message type to send */
int type;
} cb_data_t;
/* back pointer to socket, only for subscriptions */
private_lookip_socket_t *this;
} entry_t;
/**
* Destroy entry
*/
static void entry_destroy(entry_t *this)
{
close(this->fd);
free(this);
}
/**
* Callback function for listener
*/
static bool listener_cb(cb_data_t *data, bool up, host_t *vip,
static bool listener_cb(entry_t *entry, bool up, host_t *vip,
host_t *other, identification_t *id, char *name)
{
lookip_response_t resp = {
.type = data->type,
.type = entry->type,
};
/* filter events */
if (up && entry->type == LOOKIP_NOTIFY_DOWN)
{
return TRUE;
}
if (!up && entry->type == LOOKIP_NOTIFY_UP)
{
return TRUE;
}
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
snprintf(resp.ip, sizeof(resp.ip), "%H", other);
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
switch (send(data->fd, &resp, sizeof(resp), 0))
switch (send(entry->fd, &resp, sizeof(resp), 0))
{
case sizeof(resp):
return TRUE;
case 0:
/* client disconnected, adios */
return FALSE;
break;
default:
DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
return FALSE;
break;
}
if (entry->this)
{ /* unregister listener */
entry->this->mutex->lock(entry->this->mutex);
entry->this->clients->remove(entry->this->clients, entry, NULL);
entry->this->mutex->unlock(entry->this->mutex);
entry_destroy(entry);
}
return FALSE;
}
/**
@ -136,7 +178,7 @@ static bool listener_cb(cb_data_t *data, bool up, host_t *vip,
*/
static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
{
cb_data_t data = {
entry_t entry = {
.fd = fd,
.type = LOOKIP_ENTRY,
};
@ -149,17 +191,37 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
if (vip)
{
this->listener->lookup(this->listener, vip,
(void*)listener_cb, &data);
(void*)listener_cb, &entry);
vip->destroy(vip);
}
}
else
{ /* dump */
this->listener->lookup(this->listener, NULL,
(void*)listener_cb, &data);
(void*)listener_cb, &entry);
}
}
/**
* Subscribe to virtual IP events
*/
static void subscribe(private_lookip_socket_t *this, int fd, int type)
{
entry_t *entry;
INIT(entry,
.fd = fd,
.type = type,
.this = this,
);
this->mutex->lock(this->mutex);
this->clients->insert_last(this->clients, entry);
this->mutex->unlock(this->mutex);
this->listener->add_listener(this->listener, (void*)listener_cb, entry);
}
/**
* Accept client connections, dispatch
*/
@ -168,7 +230,7 @@ static job_requeue_t receive(private_lookip_socket_t *this)
struct sockaddr_un addr;
int fd, len = sizeof(addr);
lookip_request_t req;
bool oldstate;
bool oldstate, subscribed = FALSE;
oldstate = thread_cancelability(TRUE);
fd = accept(this->socket, (struct sockaddr*)&addr, &len);
@ -192,6 +254,14 @@ static job_requeue_t receive(private_lookip_socket_t *this)
case LOOKIP_DUMP:
query(this, fd, NULL);
continue;
case LOOKIP_REGISTER_UP:
subscribe(this, fd, LOOKIP_NOTIFY_UP);
subscribed = TRUE;
continue;
case LOOKIP_REGISTER_DOWN:
subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
subscribed = TRUE;
continue;
case LOOKIP_END:
break;
default:
@ -210,7 +280,10 @@ static job_requeue_t receive(private_lookip_socket_t *this)
}
break;
}
close(fd);
if (!subscribed)
{ /* don't close if we queued the fd */
close(fd);
}
}
else
{
@ -223,6 +296,8 @@ static job_requeue_t receive(private_lookip_socket_t *this)
METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this)
{
this->clients->destroy_function(this->clients, (void*)entry_destroy);
this->mutex->destroy(this->mutex);
close(this->socket);
free(this);
}
@ -239,6 +314,8 @@ lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
.destroy = _destroy,
},
.listener = listener,
.clients = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
if (!open_socket(this))