lookip: use stream service with async I/O dispatching

Now uses SOCK_STREAM, as SOCK_SEQPACKET is not available over TCP. To have
network transparency, the message now uses network byte order.
This commit is contained in:
Martin Willi 2013-07-01 12:47:45 +02:00
parent c2a6fdf286
commit 091d0afa21
5 changed files with 299 additions and 261 deletions

View File

@ -20,51 +20,112 @@
#include <unistd.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <getopt.h>
#include <arpa/inet.h>
/**
* Connect to the daemon, return FD
*/
static int make_connection()
{
struct sockaddr_un addr;
int fd;
union {
struct sockaddr_un un;
struct sockaddr_in in;
struct sockaddr sa;
} addr;
int fd, len;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, LOOKIP_SOCKET);
if (getenv("TCP_PORT"))
{
addr.in.sin_family = AF_INET;
addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
len = sizeof(addr.in);
}
else
{
addr.un.sun_family = AF_UNIX;
strcpy(addr.un.sun_path, LOOKIP_SOCKET);
fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
}
fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
if (fd < 0)
{
fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
return -1;
}
if (connect(fd, (struct sockaddr *)&addr,
offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
if (connect(fd, &addr.sa, len) < 0)
{
fprintf(stderr, "connecting to %s failed: %s\n",
LOOKIP_SOCKET, strerror(errno));
fprintf(stderr, "connecting failed: %s\n", strerror(errno));
close(fd);
return -1;
}
return fd;
}
static int read_all(int fd, void *buf, size_t len, int flags)
{
ssize_t ret, done = 0;
while (done < len)
{
ret = recv(fd, buf, len - done, flags);
if (ret == -1 && errno == EINTR)
{ /* interrupted, try again */
continue;
}
if (ret == 0)
{
return 0;
}
if (ret < 0)
{
return -1;
}
done += ret;
buf += ret;
}
return len;
}
static int write_all(int fd, void *buf, size_t len)
{
ssize_t ret, done = 0;
while (done < len)
{
ret = write(fd, buf, len - done);
if (ret == -1 && errno == EINTR)
{ /* interrupted, try again */
continue;
}
if (ret < 0)
{
return -1;
}
done += ret;
buf += ret;
}
return len;
}
/**
* Send a request message
*/
static int send_request(int fd, int type, char *vip)
{
lookip_request_t req = {
.type = type,
.type = htonl(type),
};
if (vip)
{
snprintf(req.vip, sizeof(req.vip), "%s", vip);
}
if (send(fd, &req, sizeof(req), 0) != sizeof(req))
if (write_all(fd, &req, sizeof(req)) != sizeof(req))
{
fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
return 2;
@ -83,7 +144,7 @@ static int receive(int fd, int block, int loop)
do
{
res = recv(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
res = read_all(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
if (res == 0)
{ /* closed by server */
return 0;
@ -97,7 +158,7 @@ static int receive(int fd, int block, int loop)
fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
return 1;
}
switch (resp.type)
switch (ntohl(resp.type))
{
case LOOKIP_ENTRY:
label = "lookup:";
@ -120,7 +181,7 @@ static int receive(int fd, int block, int loop)
resp.id[sizeof(resp.id) - 1] = '\0';
resp.name[sizeof(resp.name) - 1] = '\0';
snprintf(name, sizeof(name), "%s[%u]", resp.name, resp.unique_id);
snprintf(name, sizeof(name), "%s[%u]", resp.name, ntohl(resp.unique_id));
printf("%-12s %16s %16s %20s %s\n",
label, resp.vip, resp.ip, name, resp.id);
}

View File

@ -290,6 +290,26 @@ METHOD(lookip_listener_t, add_listener, void,
this->lock->unlock(this->lock);
}
METHOD(lookip_listener_t, remove_listener, void,
private_lookip_listener_t *this, void *user)
{
listener_entry_t *listener;
enumerator_t *enumerator;
this->lock->write_lock(this->lock);
enumerator = this->listeners->create_enumerator(this->listeners);
while (enumerator->enumerate(enumerator, &listener))
{
if (listener->user == user)
{
this->listeners->remove_at(this->listeners, enumerator);
free(listener);
}
}
enumerator->destroy(enumerator);
this->lock->unlock(this->lock);
}
METHOD(lookip_listener_t, destroy, void,
private_lookip_listener_t *this)
{
@ -315,6 +335,7 @@ lookip_listener_t *lookip_listener_create()
},
.lookup = _lookup,
.add_listener = _add_listener,
.remove_listener = _remove_listener,
.destroy = _destroy,
},
.lock = rwlock_create(RWLOCK_TYPE_DEFAULT),

View File

@ -74,6 +74,13 @@ struct lookip_listener_t {
void (*add_listener)(lookip_listener_t *this,
lookip_callback_t cb, void *user);
/**
* Unregister a listener by the user data.
*
* @param user user data, as passed during add_listener()
*/
void (*remove_listener)(lookip_listener_t *this, void *user);
/**
* Destroy a lookip_listener_t.
*/

View File

@ -69,7 +69,7 @@ struct lookip_request_t {
int type;
/** null terminated string representation of virtual IP */
char vip[40];
};
} __attribute__((packed));
/**
* Response message sent to client.
@ -91,6 +91,6 @@ struct lookip_response_t {
char name[40];
/** unique connection id */
unsigned int unique_id;
};
} __attribute__((packed));
#endif /** LOOKIP_MSG_H_ @}*/

View File

@ -48,17 +48,12 @@ struct private_lookip_socket_t {
lookip_listener_t *listener;
/**
* lookip unix socket file descriptor
* stream service accepting connections
*/
int socket;
stream_service_t *service;
/**
* List of registered listeners, as entry_t
*/
linked_list_t *registered;
/**
* List of connected clients, as uintptr_t FD
* List of connected clients, as entry_t
*/
linked_list_t *connected;
@ -69,88 +64,80 @@ struct private_lookip_socket_t {
};
/**
* Open lookip unix socket
*/
static bool open_socket(private_lookip_socket_t *this)
{
struct sockaddr_un addr;
mode_t old;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, LOOKIP_SOCKET);
this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (this->socket == -1)
{
DBG1(DBG_CFG, "creating lookip socket failed");
return FALSE;
}
unlink(addr.sun_path);
old = umask(~(S_IRWXU | S_IRWXG));
if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
{
DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno));
close(this->socket);
return FALSE;
}
umask(old);
if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
lib->caps->get_gid(lib->caps)) != 0)
{
DBG1(DBG_CFG, "changing lookip socket permissions failed: %s",
strerror(errno));
}
if (listen(this->socket, 10) < 0)
{
DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno));
close(this->socket);
unlink(addr.sun_path);
return FALSE;
}
return TRUE;
}
/**
* Listener callback entry
* List entry for a connected stream
*/
typedef struct {
/* FD to write to */
int fd;
/* message type to send */
int type;
/* back pointer to socket, only for subscriptions */
/* stream to write to */
stream_t *stream;
/* registered for up events? */
bool up;
/* registered for down events? */
bool down;
/** backref to this for unregistration */
private_lookip_socket_t *this;
} entry_t;
/**
* Destroy entry
* Clean up a connection entry
*/
static void entry_destroy(entry_t *this)
static void entry_destroy(entry_t *entry)
{
close(this->fd);
free(this);
entry->stream->destroy(entry->stream);
free(entry);
}
/**
* Callback function for listener
* Disconnect a stream, remove connection entry
*/
static bool listener_cb(entry_t *entry, bool up, host_t *vip,
host_t *other, identification_t *id,
char *name, u_int unique_id)
static void disconnect(private_lookip_socket_t *this, stream_t *stream)
{
enumerator_t *enumerator;
entry_t *entry;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &entry))
{
if (entry->stream == stream)
{
this->connected->remove_at(this->connected, enumerator);
if (entry->up || entry->down)
{
this->listener->remove_listener(this->listener, entry);
}
entry_destroy(entry);
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
}
/**
* Callback function for listener up/down events
*/
static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other,
identification_t *id, char *name, u_int unique_id)
{
lookip_response_t resp = {
.type = entry->type,
.unique_id = unique_id,
.unique_id = htonl(unique_id),
};
/* filter events */
if (up && entry->type == LOOKIP_NOTIFY_DOWN)
if (up)
{
return TRUE;
if (!entry->up)
{
return TRUE;
}
resp.type = htonl(LOOKIP_NOTIFY_UP);
}
if (!up && entry->type == LOOKIP_NOTIFY_UP)
else
{
return TRUE;
if (!entry->down)
{
return TRUE;
}
resp.type = htonl(LOOKIP_NOTIFY_DOWN);
}
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
@ -158,37 +145,66 @@ static bool listener_cb(entry_t *entry, bool up, host_t *vip,
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
switch (send(entry->fd, &resp, sizeof(resp), 0))
if (entry->stream->write_all(entry->stream, &resp, sizeof(resp)))
{
case sizeof(resp):
return TRUE;
case 0:
return TRUE;
}
switch (errno)
{
case ECONNRESET:
case EPIPE:
/* client disconnected, adios */
break;
default:
DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno));
break;
}
/* don't unregister, as we return FALSE */
entry->up = entry->down = FALSE;
disconnect(entry->this, entry->stream);
return FALSE;
}
/**
* Callback function for queries
*/
static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other,
identification_t *id, char *name, u_int unique_id)
{
lookip_response_t resp = {
.type = htonl(LOOKIP_ENTRY),
.unique_id = htonl(unique_id),
};
snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
snprintf(resp.ip, sizeof(resp.ip), "%H", other);
snprintf(resp.id, sizeof(resp.id), "%Y", id);
snprintf(resp.name, sizeof(resp.name), "%s", name);
if (stream->write_all(stream, &resp, sizeof(resp)))
{
return TRUE;
}
switch (errno)
{
case ECONNRESET:
case EPIPE:
/* client disconnected, adios */
break;
default:
DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
break;
}
if (entry->this)
{ /* unregister listener */
entry->this->mutex->lock(entry->this->mutex);
entry->this->registered->remove(entry->this->registered, entry, NULL);
entry->this->mutex->unlock(entry->this->mutex);
entry_destroy(entry);
}
return FALSE;
}
/**
* Perform a entry lookup
* Perform a lookup
*/
static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
static void query(private_lookip_socket_t *this, stream_t *stream,
lookip_request_t *req)
{
entry_t entry = {
.fd = fd,
.type = LOOKIP_ENTRY,
};
host_t *vip = NULL;
int matches = 0;
@ -199,17 +215,17 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
if (vip)
{
matches = this->listener->lookup(this->listener, vip,
(void*)listener_cb, &entry);
(void*)query_cb, stream);
vip->destroy(vip);
}
if (matches == 0)
{
lookip_response_t resp = {
.type = LOOKIP_NOT_FOUND,
.type = htonl(LOOKIP_NOT_FOUND),
};
snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip);
if (send(fd, &resp, sizeof(resp), 0) < 0)
if (!stream->write_all(stream, &resp, sizeof(resp)))
{
DBG1(DBG_CFG, "sending lookip not-found failed: %s",
strerror(errno));
@ -219,46 +235,59 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
else
{ /* dump */
this->listener->lookup(this->listener, NULL,
(void*)listener_cb, &entry);
(void*)query_cb, stream);
}
}
/**
* Subscribe to virtual IP events
*/
static void subscribe(private_lookip_socket_t *this, int fd, int type)
static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up)
{
enumerator_t *enumerator;
entry_t *entry;
INIT(entry,
.fd = fd,
.type = type,
.this = this,
);
this->mutex->lock(this->mutex);
this->registered->insert_last(this->registered, entry);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &entry))
{
if (entry->stream == stream)
{
if (!entry->up && !entry->down)
{ /* newly registered */
this->listener->add_listener(this->listener,
(void*)event_cb, entry);
}
if (up)
{
entry->up = TRUE;
}
else
{
entry->down = TRUE;
}
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
this->listener->add_listener(this->listener, (void*)listener_cb, entry);
}
/**
* Check if a client is subscribed for notifications
*/
static bool subscribed(private_lookip_socket_t *this, int fd)
static bool subscribed(private_lookip_socket_t *this, stream_t *stream)
{
enumerator_t *enumerator;
bool subscribed = FALSE;
entry_t *entry;
this->mutex->lock(this->mutex);
enumerator = this->registered->create_enumerator(this->registered);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &entry))
{
if (entry->fd == fd)
if (entry->stream == stream)
{
subscribed = TRUE;
subscribed = entry->up || entry->down;
break;
}
}
@ -269,164 +298,80 @@ static bool subscribed(private_lookip_socket_t *this, int fd)
}
/**
* Create a fd_set from all bound sockets
* Dispatch from a socket, on-read callback
*/
static int build_fds(private_lookip_socket_t *this, fd_set *fds)
{
enumerator_t *enumerator;
uintptr_t fd;
int maxfd;
FD_ZERO(fds);
FD_SET(this->socket, fds);
maxfd = this->socket;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &fd))
{
FD_SET(fd, fds);
maxfd = max(maxfd, fd);
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
return maxfd + 1;
}
/**
* Find the socket select()ed
*/
static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
{
enumerator_t *enumerator;
uintptr_t fd;
int selected = -1;
this->mutex->lock(this->mutex);
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &fd))
{
if (FD_ISSET(fd, fds))
{
selected = fd;
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
return selected;
}
/**
* Dispatch from a socket, return TRUE to end communication
*/
static bool dispatch(private_lookip_socket_t *this, int fd)
static bool on_read(private_lookip_socket_t *this, stream_t *stream)
{
lookip_request_t req;
int len;
len = recv(fd, &req, sizeof(req), 0);
if (len != sizeof(req))
if (stream->read_all(stream, &req, sizeof(req)))
{
if (len != 0)
switch (ntohl(req.type))
{
case LOOKIP_LOOKUP:
query(this, stream, &req);
return TRUE;
case LOOKIP_DUMP:
query(this, stream, NULL);
return TRUE;
case LOOKIP_REGISTER_UP:
subscribe(this, stream, TRUE);
return TRUE;
case LOOKIP_REGISTER_DOWN:
subscribe(this, stream, FALSE);
return TRUE;
case LOOKIP_END:
break;
default:
DBG1(DBG_CFG, "received unknown lookip command");
break;
}
}
else
{
if (errno != ECONNRESET)
{
DBG1(DBG_CFG, "receiving lookip request failed: %s",
strerror(errno));
}
disconnect(this, stream);
return FALSE;
}
if (subscribed(this, stream))
{
return TRUE;
}
switch (req.type)
{
case LOOKIP_LOOKUP:
query(this, fd, &req);
return FALSE;
case LOOKIP_DUMP:
query(this, fd, NULL);
return FALSE;
case LOOKIP_REGISTER_UP:
subscribe(this, fd, LOOKIP_NOTIFY_UP);
return FALSE;
case LOOKIP_REGISTER_DOWN:
subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
return FALSE;
case LOOKIP_END:
return TRUE;
default:
DBG1(DBG_CFG, "received unknown lookip command");
return TRUE;
}
disconnect(this, stream);
return FALSE;
}
/**
* Accept client connections, dispatch
*/
static job_requeue_t receive(private_lookip_socket_t *this)
static bool on_accept(private_lookip_socket_t *this, stream_t *stream)
{
struct sockaddr_un addr;
int fd, maxfd, len;
bool oldstate;
fd_set fds;
entry_t *entry;
while (TRUE)
{
maxfd = build_fds(this, &fds);
oldstate = thread_cancelability(TRUE);
if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
{
thread_cancelability(oldstate);
DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
strerror(errno));
break;
}
thread_cancelability(oldstate);
INIT(entry,
.stream = stream,
.this = this,
);
if (FD_ISSET(this->socket, &fds))
{ /* new connection, accept() */
len = sizeof(addr);
fd = accept(this->socket, (struct sockaddr*)&addr, &len);
if (fd != -1)
{
this->mutex->lock(this->mutex);
this->connected->insert_last(this->connected,
(void*)(uintptr_t)fd);
this->mutex->unlock(this->mutex);
}
else
{
DBG1(DBG_CFG, "accepting lookip connection failed: %s",
strerror(errno));
}
continue;
}
this->mutex->lock(this->mutex);
this->connected->insert_last(this->connected, entry);
this->mutex->unlock(this->mutex);
fd = scan_fds(this, &fds);
if (fd == -1)
{
continue;
}
if (dispatch(this, fd))
{
this->mutex->lock(this->mutex);
this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
this->mutex->unlock(this->mutex);
if (!subscribed(this, fd))
{
close(fd);
}
}
}
return JOB_REQUEUE_FAIR;
stream->on_read(stream, (void*)on_read, this);
return TRUE;
}
METHOD(lookip_socket_t, destroy, void,
private_lookip_socket_t *this)
{
this->registered->destroy_function(this->registered, (void*)entry_destroy);
this->connected->destroy(this->connected);
DESTROY_IF(this->service);
this->connected->destroy_function(this->connected, (void*)entry_destroy);
this->mutex->destroy(this->mutex);
close(this->socket);
free(this);
}
@ -436,26 +381,30 @@ METHOD(lookip_socket_t, destroy, void,
lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
{
private_lookip_socket_t *this;
char *uri;
INIT(this,
.public = {
.destroy = _destroy,
},
.listener = listener,
.registered = linked_list_create(),
.connected = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
);
if (!open_socket(this))
uri = lib->settings->get_str(lib->settings,
"%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET,
charon->name);
this->service = lib->streams->create_service(lib->streams, uri, 10);
if (!this->service)
{
free(this);
DBG1(DBG_CFG, "creating lookip socket failed");
destroy(this);
return NULL;
}
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
this, JOB_PRIO_CRITICAL, 1);
return &this->public;
}