Merge branch 'netlink-extensions'

Introduces options to enable concurrent Netlink queries. While this does not
make much sense on vanilla Linux, this can help on third party stacks to
increase throughput if longer latencies are to be expected. Netlink message
retransmission can be optionally enabled if transmission is unreliable.
Non-socket based IKE bypass policies and other tweaks bring better compatibility
to third party stacks using Netlink.
This commit is contained in:
Martin Willi 2014-11-21 11:01:39 +01:00
commit e796b88e86
10 changed files with 1042 additions and 106 deletions

View File

@ -13,6 +13,26 @@ charon.plugins.kernel-netlink.mss = 0
charon.plugins.kernel-netlink.mtu = 0 charon.plugins.kernel-netlink.mtu = 0
MTU to set on installed routes, 0 to disable. MTU to set on installed routes, 0 to disable.
charon.plugins.kernel-netlink.parallel_route = no
Whether to perform concurrent Netlink ROUTE queries on a single socket.
Whether to perform concurrent Netlink ROUTE queries on a single socket.
While parallel queries can improve throughput, they have more overhead. On
vanilla Linux, DUMP queries fail with EBUSY and must be retried, further
decreasing performance.
charon.plugins.kernel-netlink.parallel_xfrm = no
Whether to perform concurrent Netlink XFRM queries on a single socket.
charon.plugins.kernel-netlink.port_bypass = no
Whether to use port or socket based IKE XFRM bypass policies.
Whether to use port or socket based IKE XFRM bypass policies.
IKE bypass policies are used to exempt IKE traffic from XFRM processing.
The default socket based policies are directly tied to the IKE UDP sockets,
port based policies use global XFRM bypass policies for the used IKE UDP
ports.
charon.plugins.kernel-netlink.roam_events = yes charon.plugins.kernel-netlink.roam_events = yes
Whether to trigger roam events when interfaces, addresses or routes change. Whether to trigger roam events when interfaces, addresses or routes change.
@ -25,6 +45,12 @@ charon.plugins.kernel-netlink.set_proto_port_transport_sa = no
traffic, it also prevents the use of a single IPsec SA by more than one traffic, it also prevents the use of a single IPsec SA by more than one
traffic selector. traffic selector.
charon.plugins.kernel-netlink.retries = 0
Number of Netlink message retransmissions to send on timeout.
charon.plugins.kernel-netlink.timeout = 0
Netlink message retransmission timeout, 0 to disable retransmissions.
charon.plugins.kernel-netlink.xfrm_acq_expires = 165 charon.plugins.kernel-netlink.xfrm_acq_expires = 165
Lifetime of XFRM acquire state in kernel. Lifetime of XFRM acquire state in kernel.

View File

@ -0,0 +1 @@
tests

View File

@ -21,3 +21,24 @@ libstrongswan_kernel_netlink_la_SOURCES = \
kernel_netlink_shared.h kernel_netlink_shared.c kernel_netlink_shared.h kernel_netlink_shared.c
libstrongswan_kernel_netlink_la_LDFLAGS = -module -avoid-version libstrongswan_kernel_netlink_la_LDFLAGS = -module -avoid-version
TESTS = tests
check_PROGRAMS = $(TESTS)
tests_SOURCES = \
tests.h tests.c \
suites/test_socket.c \
kernel_netlink_shared.c
tests_CFLAGS = \
-I$(top_srcdir)/src/libstrongswan \
-I$(top_srcdir)/src/libstrongswan/tests \
-DNETLINK_MSG_LOSS_HOOK=netlink_msg_loss \
@COVERAGE_CFLAGS@
tests_LDFLAGS = @COVERAGE_LDFLAGS@
tests_LDADD = \
$(top_builddir)/src/libstrongswan/libstrongswan.la \
$(top_builddir)/src/libstrongswan/tests/libtest.la

View File

@ -38,6 +38,7 @@
#include <hydra.h> #include <hydra.h>
#include <utils/debug.h> #include <utils/debug.h>
#include <threading/mutex.h> #include <threading/mutex.h>
#include <collections/array.h>
#include <collections/hashtable.h> #include <collections/hashtable.h>
#include <collections/linked_list.h> #include <collections/linked_list.h>
@ -319,6 +320,16 @@ struct private_kernel_netlink_ipsec_t {
* Whether to track the history of a policy * Whether to track the history of a policy
*/ */
bool policy_history; bool policy_history;
/**
* Whether to always use UPDATE to install policies
*/
bool policy_update;
/**
* Installed port based IKE bypass policies, as bypass_t
*/
array_t *bypass;
}; };
typedef struct route_entry_t route_entry_t; typedef struct route_entry_t route_entry_t;
@ -2313,6 +2324,11 @@ METHOD(kernel_ipsec_t, add_policy, status_t,
return SUCCESS; return SUCCESS;
} }
if (this->policy_update)
{
found = TRUE;
}
DBG2(DBG_KNL, "%s policy %R === %R %N (mark %u/0x%08x)", DBG2(DBG_KNL, "%s policy %R === %R %N (mark %u/0x%08x)",
found ? "updating" : "adding", src_ts, dst_ts, found ? "updating" : "adding", src_ts, dst_ts,
policy_dir_names, direction, mark.value, mark.mask); policy_dir_names, direction, mark.value, mark.mask);
@ -2576,9 +2592,11 @@ METHOD(kernel_ipsec_t, flush_policies, status_t,
return SUCCESS; return SUCCESS;
} }
/**
METHOD(kernel_ipsec_t, bypass_socket, bool, * Bypass socket using a per-socket policy
private_kernel_netlink_ipsec_t *this, int fd, int family) */
static bool add_socket_bypass(private_kernel_netlink_ipsec_t *this,
int fd, int family)
{ {
struct xfrm_userpolicy_info policy; struct xfrm_userpolicy_info policy;
u_int sol, ipsec_policy; u_int sol, ipsec_policy;
@ -2618,6 +2636,154 @@ METHOD(kernel_ipsec_t, bypass_socket, bool,
return TRUE; return TRUE;
} }
/**
* Port based IKE bypass policy
*
* Describes the traffic exempted by one port based bypass. manage_bypass()
* copies family and proto into the XFRM selector and matches port as the
* destination port for POLICY_IN and as the source port otherwise.
*/
typedef struct {
/** address family */
int family;
/** layer 4 protocol */
int proto;
/** port number, network order */
u_int16_t port;
} bypass_t;
/**
 * Install or delete a single port based bypass policy in the kernel
 *
 * @param this		kernel interface, its XFRM socket is used for the request
 * @param type		XFRM_MSG_NEWPOLICY to install a policy, any other value is
 *					handled as XFRM_MSG_DELPOLICY
 * @param dir		direction of the policy
 * @param bypass	address family, protocol and port to match
 * @return			TRUE if send_ack() on the XFRM socket returned SUCCESS
 */
static bool manage_bypass(private_kernel_netlink_ipsec_t *this,
						  int type, policy_dir_t dir, bypass_t *bypass)
{
	netlink_buf_t request;
	struct nlmsghdr *hdr = &request.hdr;
	struct xfrm_selector *sel;

	memset(&request, 0, sizeof(request));
	hdr->nlmsg_type = type;
	hdr->nlmsg_flags = NLM_F_REQUEST | NLM_F_ACK;

	if (type == XFRM_MSG_NEWPOLICY)
	{
		struct xfrm_userpolicy_info *info = NLMSG_DATA(hdr);

		hdr->nlmsg_len = NLMSG_LENGTH(sizeof(*info));

		info->dir = dir;
		info->priority = 32;
		info->action = XFRM_POLICY_ALLOW;
		info->share = XFRM_SHARE_ANY;

		/* the policy never expires */
		info->lft.soft_byte_limit = XFRM_INF;
		info->lft.soft_packet_limit = XFRM_INF;
		info->lft.hard_byte_limit = XFRM_INF;
		info->lft.hard_packet_limit = XFRM_INF;

		sel = &info->sel;
	}
	else
	{	/* XFRM_MSG_DELPOLICY */
		struct xfrm_userpolicy_id *id = NLMSG_DATA(hdr);

		hdr->nlmsg_len = NLMSG_LENGTH(sizeof(*id));

		id->dir = dir;

		sel = &id->sel;
	}

	sel->family = bypass->family;
	sel->proto = bypass->proto;
	switch (dir)
	{
		case POLICY_IN:
			/* inbound traffic is addressed to our port */
			sel->dport = bypass->port;
			sel->dport_mask = 0xffff;
			break;
		default:
			/* all other directions originate from our port */
			sel->sport = bypass->port;
			sel->sport_mask = 0xffff;
			break;
	}

	return this->socket_xfrm->send_ack(this->socket_xfrm, hdr) == SUCCESS;
}
/**
* Bypass socket using a port-based bypass policy
*
* Queries the local address of fd to learn its port, then installs an
* inbound and an outbound bypass policy for that port. On success the bypass
* is appended to this->bypass.
*
* @param this kernel interface
* @param fd socket to derive port and protocol from
* @param family AF_INET or AF_INET6, any other value fails
* @return TRUE if both policies got installed
*/
static bool add_port_bypass(private_kernel_netlink_ipsec_t *this,
int fd, int family)
{
union {
struct sockaddr sa;
struct sockaddr_in in;
struct sockaddr_in6 in6;
} saddr;
socklen_t len;
bypass_t bypass = {
.family = family,
};
len = sizeof(saddr);
if (getsockname(fd, &saddr.sa, &len) != 0)
{
return FALSE;
}
/* With SO_PROTOCOL defined, the braced block below is the failure branch of
* getsockopt(); without it, the block runs unconditionally. */
#ifdef SO_PROTOCOL /* since 2.6.32 */
len = sizeof(bypass.proto);
if (getsockopt(fd, SOL_SOCKET, SO_PROTOCOL, &bypass.proto, &len) != 0)
#endif
{ /* assume UDP if SO_PROTOCOL not supported */
bypass.proto = IPPROTO_UDP;
}
/* the port is taken as returned by getsockname(), i.e. in network order */
switch (family)
{
case AF_INET:
bypass.port = saddr.in.sin_port;
break;
case AF_INET6:
bypass.port = saddr.in6.sin6_port;
break;
default:
return FALSE;
}
if (!manage_bypass(this, XFRM_MSG_NEWPOLICY, POLICY_IN, &bypass))
{
return FALSE;
}
if (!manage_bypass(this, XFRM_MSG_NEWPOLICY, POLICY_OUT, &bypass))
{
/* roll back the inbound policy installed above */
manage_bypass(this, XFRM_MSG_DELPOLICY, POLICY_IN, &bypass);
return FALSE;
}
/* remember the bypass so its policies can be deleted again later */
array_insert(this->bypass, ARRAY_TAIL, &bypass);
return TRUE;
}
/**
* Remove installed port based bypass policy
*
* Deletes the outbound and the inbound policy of the given bypass; the
* results of the two requests are ignored. destroy() passes this function,
* cast to array_callback_t, to array_destroy_function().
*
* @param bypass bypass whose policies get deleted
* @param idx not used by this function
* @param this kernel interface the policies were installed with
*/
static void remove_port_bypass(bypass_t *bypass, int idx,
private_kernel_netlink_ipsec_t *this)
{
manage_bypass(this, XFRM_MSG_DELPOLICY, POLICY_OUT, bypass);
manage_bypass(this, XFRM_MSG_DELPOLICY, POLICY_IN, bypass);
}
/**
* Exempt the given socket from XFRM processing.
*
* Uses port based bypass policies if the
* "%s.plugins.kernel-netlink.port_bypass" option is enabled (default FALSE),
* a per-socket policy otherwise. The option is looked up on every call.
*/
METHOD(kernel_ipsec_t, bypass_socket, bool,
private_kernel_netlink_ipsec_t *this, int fd, int family)
{
if (lib->settings->get_bool(lib->settings,
"%s.plugins.kernel-netlink.port_bypass", FALSE, lib->ns))
{
return add_port_bypass(this, fd, family);
}
return add_socket_bypass(this, fd, family);
}
METHOD(kernel_ipsec_t, enable_udp_decap, bool, METHOD(kernel_ipsec_t, enable_udp_decap, bool,
private_kernel_netlink_ipsec_t *this, int fd, int family, u_int16_t port) private_kernel_netlink_ipsec_t *this, int fd, int family, u_int16_t port)
{ {
@ -2637,6 +2803,8 @@ METHOD(kernel_ipsec_t, destroy, void,
enumerator_t *enumerator; enumerator_t *enumerator;
policy_entry_t *policy; policy_entry_t *policy;
array_destroy_function(this->bypass,
(array_callback_t)remove_port_bypass, this);
if (this->socket_xfrm_events > 0) if (this->socket_xfrm_events > 0)
{ {
lib->watcher->remove(lib->watcher, this->socket_xfrm_events); lib->watcher->remove(lib->watcher, this->socket_xfrm_events);
@ -2688,8 +2856,11 @@ kernel_netlink_ipsec_t *kernel_netlink_ipsec_create()
(hashtable_equals_t)policy_equals, 32), (hashtable_equals_t)policy_equals, 32),
.sas = hashtable_create((hashtable_hash_t)ipsec_sa_hash, .sas = hashtable_create((hashtable_hash_t)ipsec_sa_hash,
(hashtable_equals_t)ipsec_sa_equals, 32), (hashtable_equals_t)ipsec_sa_equals, 32),
.bypass = array_create(sizeof(bypass_t), 0),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT), .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.policy_history = TRUE, .policy_history = TRUE,
.policy_update = lib->settings->get_bool(lib->settings,
"%s.plugins.kernel-netlink.policy_update", FALSE, lib->ns),
.install_routes = lib->settings->get_bool(lib->settings, .install_routes = lib->settings->get_bool(lib->settings,
"%s.install_routes", TRUE, lib->ns), "%s.install_routes", TRUE, lib->ns),
.proto_port_transport = lib->settings->get_bool(lib->settings, .proto_port_transport = lib->settings->get_bool(lib->settings,
@ -2711,7 +2882,9 @@ kernel_netlink_ipsec_t *kernel_netlink_ipsec_create()
fclose(f); fclose(f);
} }
this->socket_xfrm = netlink_socket_create(NETLINK_XFRM, xfrm_msg_names); this->socket_xfrm = netlink_socket_create(NETLINK_XFRM, xfrm_msg_names,
lib->settings->get_bool(lib->settings,
"%s.plugins.kernel-netlink.parallel_xfrm", FALSE, lib->ns));
if (!this->socket_xfrm) if (!this->socket_xfrm)
{ {
destroy(this); destroy(this);

View File

@ -1975,6 +1975,8 @@ METHOD(kernel_net_t, add_ip, status_t,
if (iface) if (iface)
{ {
addr_entry_t *addr; addr_entry_t *addr;
char *ifname;
int ifi;
INIT(addr, INIT(addr,
.ip = virtual_ip->clone(virtual_ip), .ip = virtual_ip->clone(virtual_ip),
@ -1983,26 +1985,30 @@ METHOD(kernel_net_t, add_ip, status_t,
); );
iface->addrs->insert_last(iface->addrs, addr); iface->addrs->insert_last(iface->addrs, addr);
addr_map_entry_add(this->vips, addr, iface); addr_map_entry_add(this->vips, addr, iface);
ifi = iface->ifindex;
this->lock->unlock(this->lock);
if (manage_ipaddr(this, RTM_NEWADDR, NLM_F_CREATE | NLM_F_EXCL, if (manage_ipaddr(this, RTM_NEWADDR, NLM_F_CREATE | NLM_F_EXCL,
iface->ifindex, virtual_ip, prefix) == SUCCESS) ifi, virtual_ip, prefix) == SUCCESS)
{ {
this->lock->write_lock(this->lock);
while (!is_vip_installed_or_gone(this, virtual_ip, &entry)) while (!is_vip_installed_or_gone(this, virtual_ip, &entry))
{ /* wait until address appears */ { /* wait until address appears */
this->condvar->wait(this->condvar, this->lock); this->condvar->wait(this->condvar, this->lock);
} }
if (entry) if (entry)
{ /* we fail if the interface got deleted in the meantime */ { /* we fail if the interface got deleted in the meantime */
DBG2(DBG_KNL, "virtual IP %H installed on %s", virtual_ip, ifname = strdup(entry->iface->ifname);
entry->iface->ifname);
this->lock->unlock(this->lock); this->lock->unlock(this->lock);
DBG2(DBG_KNL, "virtual IP %H installed on %s",
virtual_ip, ifname);
/* during IKEv1 reauthentication, children get moved from /* during IKEv1 reauthentication, children get moved from
* old the new SA before the virtual IP is available. This * old the new SA before the virtual IP is available. This
* kills the route for our virtual IP, reinstall. */ * kills the route for our virtual IP, reinstall. */
queue_route_reinstall(this, strdup(entry->iface->ifname)); queue_route_reinstall(this, ifname);
return SUCCESS; return SUCCESS;
} }
this->lock->unlock(this->lock);
} }
this->lock->unlock(this->lock);
DBG1(DBG_KNL, "adding virtual IP %H failed", virtual_ip); DBG1(DBG_KNL, "adding virtual IP %H failed", virtual_ip);
return FAILED; return FAILED;
} }
@ -2048,20 +2054,23 @@ METHOD(kernel_net_t, del_ip, status_t,
if (entry->addr->refcount == 1) if (entry->addr->refcount == 1)
{ {
status_t status; status_t status;
int ifi;
/* we set this flag so that threads calling add_ip will block and wait /* we set this flag so that threads calling add_ip will block and wait
* until the entry is gone, also so we can wait below */ * until the entry is gone, also so we can wait below */
entry->addr->installed = FALSE; entry->addr->installed = FALSE;
status = manage_ipaddr(this, RTM_DELADDR, 0, entry->iface->ifindex, ifi = entry->iface->ifindex;
virtual_ip, prefix); this->lock->unlock(this->lock);
status = manage_ipaddr(this, RTM_DELADDR, 0, ifi, virtual_ip, prefix);
if (status == SUCCESS && wait) if (status == SUCCESS && wait)
{ /* wait until the address is really gone */ { /* wait until the address is really gone */
this->lock->write_lock(this->lock);
while (is_known_vip(this, virtual_ip)) while (is_known_vip(this, virtual_ip))
{ {
this->condvar->wait(this->condvar, this->lock); this->condvar->wait(this->condvar, this->lock);
} }
this->lock->unlock(this->lock);
} }
this->lock->unlock(this->lock);
return status; return status;
} }
else else
@ -2490,7 +2499,9 @@ kernel_netlink_net_t *kernel_netlink_net_create()
.destroy = _destroy, .destroy = _destroy,
}, },
}, },
.socket = netlink_socket_create(NETLINK_ROUTE, rt_msg_names), .socket = netlink_socket_create(NETLINK_ROUTE, rt_msg_names,
lib->settings->get_bool(lib->settings,
"%s.plugins.kernel-netlink.parallel_route", FALSE, lib->ns)),
.rt_exclude = linked_list_create(), .rt_exclude = linked_list_create(),
.routes = hashtable_create((hashtable_hash_t)route_entry_hash, .routes = hashtable_create((hashtable_hash_t)route_entry_hash,
(hashtable_equals_t)route_entry_equals, 16), (hashtable_equals_t)route_entry_equals, 16),

View File

@ -1,4 +1,6 @@
/* /*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
* Copyright (C) 2008 Tobias Brunner * Copyright (C) 2008 Tobias Brunner
* Hochschule fuer Technik Rapperswil * Hochschule fuer Technik Rapperswil
* *
@ -16,6 +18,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <linux/netlink.h> #include <linux/netlink.h>
#include <linux/rtnetlink.h> #include <linux/rtnetlink.h>
#include <linux/xfrm.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -23,6 +26,9 @@
#include <utils/debug.h> #include <utils/debug.h>
#include <threading/mutex.h> #include <threading/mutex.h>
#include <threading/condvar.h>
#include <collections/array.h>
#include <collections/hashtable.h>
typedef struct private_netlink_socket_t private_netlink_socket_t; typedef struct private_netlink_socket_t private_netlink_socket_t;
@ -30,138 +36,445 @@ typedef struct private_netlink_socket_t private_netlink_socket_t;
* Private variables and functions of netlink_socket_t class. * Private variables and functions of netlink_socket_t class.
*/ */
struct private_netlink_socket_t { struct private_netlink_socket_t {
/** /**
* public part of the netlink_socket_t object. * public part of the netlink_socket_t object.
*/ */
netlink_socket_t public; netlink_socket_t public;
/** /**
* mutex to lock access to netlink socket * mutex to lock access entries
*/ */
mutex_t *mutex; mutex_t *mutex;
/** /**
* current sequence number for netlink request * Netlink request entries currently active, uintptr_t seq => entry_t
*/ */
int seq; hashtable_t *entries;
/**
* Current sequence number for Netlink requests
*/
refcount_t seq;
/** /**
* netlink socket * netlink socket
*/ */
int socket; int socket;
/**
* Netlink protocol
*/
int protocol;
/** /**
* Enum names for Netlink messages * Enum names for Netlink messages
*/ */
enum_name_t *names; enum_name_t *names;
/**
* Timeout for Netlink replies, in ms
*/
u_int timeout;
/**
* Number of times to repeat timed out queries
*/
u_int retries;
/**
* Use parallel netlink queries
*/
bool parallel;
/**
* Ignore errors potentially resulting from a retransmission
*/
bool ignore_retransmit_errors;
}; };
/** /**
* Imported from kernel_netlink_ipsec.c * #definable hook to simulate request message loss
*/ */
extern enum_name_t *xfrm_msg_names; #ifdef NETLINK_MSG_LOSS_HOOK
bool NETLINK_MSG_LOSS_HOOK(struct nlmsghdr *msg);
#define msg_loss_hook(msg) NETLINK_MSG_LOSS_HOOK(msg)
#else
#define msg_loss_hook(msg) FALSE
#endif
/**
* Request entry the answer for a waiting thread is collected in
*
* send_once() registers one entry per request under its sequence number.
* queue() appends the matching response messages and signals the condvar
* once the response is complete.
*/
typedef struct {
/** Condition variable the requesting thread is waiting on */
condvar_t *condvar;
/** Array of hdrs in a multi-message response, as struct nlmsghdr*; each
* element is a malloc()ed copy of a received message */
array_t *hdrs;
/** All response messages received? */
bool complete;
} entry_t;
/**
* Clean up a thread waiting entry
*
* Destroys the condvar, frees all response messages still queued in hdrs
* together with the array, and finally frees the entry itself.
*
* @param entry entry to destroy
*/
static void destroy_entry(entry_t *entry)
{
entry->condvar->destroy(entry->condvar);
array_destroy_function(entry->hdrs, (void*)free, NULL);
free(entry);
}
/**
 * Write a Netlink message to socket
 *
 * The message is sent with sendto() to an AF_NETLINK address that has all
 * other fields zeroed. A send reporting EINTR is repeated.
 *
 * @param this		socket to send the message on
 * @param msg		message to send, nlmsg_len bytes
 * @return			TRUE if the complete message was sent, or if the message
 *					loss hook requested to drop it
 */
static bool write_msg(private_netlink_socket_t *this, struct nlmsghdr *msg)
{
	struct sockaddr_nl kernel = {
		.nl_family = AF_NETLINK,
	};
	int sent;

	if (msg_loss_hook(msg))
	{	/* simulated loss: report success without sending anything */
		return TRUE;
	}

	do
	{
		sent = sendto(this->socket, msg, msg->nlmsg_len, 0,
					  (struct sockaddr*)&kernel, sizeof(kernel));
	}
	while (sent != msg->nlmsg_len && errno == EINTR);

	if (sent != msg->nlmsg_len)
	{
		DBG1(DBG_KNL, "netlink write error: %s", strerror(errno));
		return FALSE;
	}
	return TRUE;
}
/**
* Read a single Netlink message from socket, return 0 on error, -1 on timeout
*
* @param this socket to read from
* @param buf buffer to receive the message into
* @param buflen size of buf in bytes
* @param block TRUE to wait for data with select() first, FALSE to read
* with MSG_DONTWAIT
* @return number of bytes received; 0 if recv() failed or the
* buffer got filled completely; -1 if select() returned
* without a readable socket (timeout, but also a select()
* failure)
*/
static ssize_t read_msg(private_netlink_socket_t *this,
char buf[4096], size_t buflen, bool block)
{
ssize_t len;
if (block)
{
fd_set set;
timeval_t tv = {};
FD_ZERO(&set);
FD_SET(this->socket, &set);
/* tv starts out zeroed, so it presumably ends up as a relative timeout of
* this->timeout ms -- TODO confirm timeval_add_ms() semantics */
timeval_add_ms(&tv, this->timeout);
/* a timeout of 0 passes NULL to select(), i.e. waits without limit */
if (select(this->socket + 1, &set, NULL, NULL,
this->timeout ? &tv : NULL) <= 0)
{
return -1;
}
}
len = recv(this->socket, buf, buflen, block ? 0 : MSG_DONTWAIT);
/* NOTE(review): a completely filled buffer is taken as a truncated
* response, so a message of exactly buflen bytes is rejected as well */
if (len == buflen)
{
DBG1(DBG_KNL, "netlink response exceeds buffer size");
return 0;
}
if (len < 0)
{
/* EAGAIN/EWOULDBLOCK/EINTR are not logged, but still reported as 0 */
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
{
DBG1(DBG_KNL, "netlink read error: %s", strerror(errno));
}
return 0;
}
return len;
}
/**
 * Queue received response message
 *
 * Looks up the request entry registered for the sequence number of the
 * message and appends a heap allocated copy of the message to it. If the
 * message is of type NLMSG_DONE or does not carry NLM_F_MULTI, the entry is
 * marked complete and its condvar is signaled.
 *
 * @param this		socket the message was received on
 * @param buf		received message, nlmsg_len bytes of it get copied
 * @return			TRUE if an entry was waiting for this sequence number,
 *					FALSE if the message was ignored
 */
static bool queue(private_netlink_socket_t *this, struct nlmsghdr *buf)
{
	struct nlmsghdr *hdr;
	entry_t *entry;
	uintptr_t seq;

	/* entries are keyed by the sequence number stored as pointer value */
	seq = (uintptr_t)buf->nlmsg_seq;

	this->mutex->lock(this->mutex);
	entry = this->entries->get(this->entries, (void*)seq);
	if (entry)
	{
		hdr = malloc(buf->nlmsg_len);
		memcpy(hdr, buf, buf->nlmsg_len);
		array_insert(entry->hdrs, ARRAY_TAIL, hdr);
		if (hdr->nlmsg_type == NLMSG_DONE || !(hdr->nlmsg_flags & NLM_F_MULTI))
		{
			entry->complete = TRUE;
			entry->condvar->signal(entry->condvar);
		}
	}
	else
	{
		/* seq is pointer sized, while %u consumes an unsigned int from the
		 * variable argument list; convert explicitly */
		DBG1(DBG_KNL, "received unknown netlink seq %u, ignored",
			 (u_int)seq);
	}
	this->mutex->unlock(this->mutex);

	return entry != NULL;
}
/**
 * Read and queue response message, optionally blocking, returns TRUE on timeout
 *
 * A single read may return several Netlink messages; they are queued one by
 * one until a message is not accepted by queue() or the data is used up.
 *
 * @param this		socket to read from
 * @param block		TRUE to wait for a message, FALSE to read non-blocking
 * @return			TRUE if read_msg() returned -1, FALSE in any other case
 */
static bool read_and_queue(private_netlink_socket_t *this, bool block)
{
	union {
		struct nlmsghdr hdr;
		char bytes[4096];
	} data;
	struct nlmsghdr *current;
	ssize_t remaining;

	remaining = read_msg(this, data.bytes, sizeof(data.bytes), block);
	if (remaining == -1)
	{
		return TRUE;
	}
	if (remaining == 0)
	{	/* nothing received */
		return FALSE;
	}
	for (current = &data.hdr; NLMSG_OK(current, remaining);
		 current = NLMSG_NEXT(current, remaining))
	{
		if (!queue(this, current))
		{
			break;
		}
	}
	return FALSE;
}
/**
* Watcher callback registered for the Netlink socket in parallel mode.
*
* On a WATCHER_READ event, pending responses are read without blocking and
* queued to the waiting requests. Always returns TRUE; presumably this keeps
* the callback registered -- TODO confirm against the watcher API.
*/
CALLBACK(watch, bool,
private_netlink_socket_t *this, int fd, watcher_event_t event)
{
if (event == WATCHER_READ)
{
read_and_queue(this, FALSE);
}
return TRUE;
}
/**
* Send a netlink request, try once
*
* Sends the request and waits until the response is complete or a timeout
* occurs.
*
* @param this socket to send the request on
* @param in request; nlmsg_seq and nlmsg_pid get overwritten
* @param seq sequence number to use for the request
* @param out set to the concatenated response messages on SUCCESS
* @param out_len set to the total length of the response on SUCCESS
* @return FAILED if writing the request failed, OUT_OF_RES if
* waiting for the response timed out, SUCCESS otherwise
*/
static status_t send_once(private_netlink_socket_t *this, struct nlmsghdr *in,
uintptr_t seq, struct nlmsghdr **out, size_t *out_len)
{
struct nlmsghdr *hdr;
chunk_t result = {};
entry_t *entry;
in->nlmsg_seq = seq;
in->nlmsg_pid = getpid();
if (this->names)
{
DBG3(DBG_KNL, "sending %N %u: %b", this->names, in->nlmsg_type,
(u_int)seq, in, in->nlmsg_len);
}
/* The mutex is held from sending the request until the entry is registered.
* queue() takes the same mutex, so another thread can not look up this
* sequence number before the entry exists. */
this->mutex->lock(this->mutex);
if (!write_msg(this, in))
{
this->mutex->unlock(this->mutex);
return FAILED;
}
INIT(entry,
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
.hdrs = array_create(0, 0),
);
this->entries->put(this->entries, (void*)seq, entry);
while (!entry->complete)
{
if (this->parallel &&
lib->watcher->get_state(lib->watcher) == WATCHER_RUNNING)
{
/* the watch() callback reads the socket, wait for it to signal us */
if (this->timeout)
{
if (entry->condvar->timed_wait(entry->condvar, this->mutex,
this->timeout))
{
break;
}
}
else
{
entry->condvar->wait(entry->condvar, this->mutex);
}
}
else
{ /* During (de-)initialization, no watcher thread is active.
* collect responses ourselves. */
/* read_and_queue() ends up in queue(), which locks the mutex we already
* hold; the socket creates it as MUTEX_TYPE_RECURSIVE */
if (read_and_queue(this, TRUE))
{
break;
}
}
}
this->entries->remove(this->entries, (void*)seq);
this->mutex->unlock(this->mutex);
if (!entry->complete)
{ /* timeout */
destroy_entry(entry);
return OUT_OF_RES;
}
/* concatenate all queued messages into a single buffer for the caller */
while (array_remove(entry->hdrs, ARRAY_HEAD, &hdr))
{
if (this->names)
{
DBG3(DBG_KNL, "received %N %u: %b", this->names, hdr->nlmsg_type,
hdr->nlmsg_seq, hdr, hdr->nlmsg_len);
}
/* presumably the "mm" mode makes chunk_cat() release both the previous
* result and hdr after copying -- TODO confirm */
result = chunk_cat("mm", result,
chunk_create((char*)hdr, hdr->nlmsg_len));
}
destroy_entry(entry);
*out_len = result.len;
*out = (struct nlmsghdr*)result.ptr;
return SUCCESS;
}
/**
 * Ignore errors for message types that might have completed previously
 *
 * If an earlier transmission of a request has been processed already, a
 * repeated "new" request may fail with EEXIST and a repeated "delete"
 * request with ENOENT. For the XFRM and ROUTE message types listed below,
 * such an error is reset to 0. Any other combination of error, protocol and
 * message type is left untouched.
 *
 * @param this		socket the request was sent on
 * @param err		error message received, modified in-place
 * @param type		message type of the request
 */
static void ignore_retransmit_error(private_netlink_socket_t *this,
									struct nlmsgerr *err, int type)
{
	int proto = this->protocol;

	if (err->error == -EEXIST)
	{	/* object to create exists already */
		if ((proto == NETLINK_XFRM &&
			 (type == XFRM_MSG_NEWPOLICY || type == XFRM_MSG_NEWSA)) ||
			(proto == NETLINK_ROUTE &&
			 (type == RTM_NEWADDR || type == RTM_NEWLINK ||
			  type == RTM_NEWNEIGH || type == RTM_NEWROUTE ||
			  type == RTM_NEWRULE)))
		{
			err->error = 0;
		}
	}
	else if (err->error == -ENOENT)
	{	/* object to delete is gone already */
		if ((proto == NETLINK_XFRM &&
			 (type == XFRM_MSG_DELPOLICY || type == XFRM_MSG_DELSA)) ||
			(proto == NETLINK_ROUTE &&
			 (type == RTM_DELADDR || type == RTM_DELLINK ||
			  type == RTM_DELNEIGH || type == RTM_DELROUTE ||
			  type == RTM_DELRULE)))
		{
			err->error = 0;
		}
	}
}
METHOD(netlink_socket_t, netlink_send, status_t, METHOD(netlink_socket_t, netlink_send, status_t,
private_netlink_socket_t *this, struct nlmsghdr *in, struct nlmsghdr **out, private_netlink_socket_t *this, struct nlmsghdr *in, struct nlmsghdr **out,
size_t *out_len) size_t *out_len)
{ {
union { uintptr_t seq;
struct nlmsghdr hdr; u_int try;
u_char bytes[4096];
} response;
struct sockaddr_nl addr;
chunk_t result = chunk_empty;
int len;
this->mutex->lock(this->mutex); seq = ref_get(&this->seq);
in->nlmsg_seq = ++this->seq; for (try = 0; try <= this->retries; ++try)
in->nlmsg_pid = getpid();
memset(&addr, 0, sizeof(addr));
addr.nl_family = AF_NETLINK;
addr.nl_pid = 0;
addr.nl_groups = 0;
if (this->names)
{ {
DBG3(DBG_KNL, "sending %N: %b", struct nlmsghdr *hdr;
this->names, in->nlmsg_type, in, in->nlmsg_len); status_t status;
} size_t len;
while (TRUE)
{
len = sendto(this->socket, in, in->nlmsg_len, 0,
(struct sockaddr*)&addr, sizeof(addr));
if (len != in->nlmsg_len) if (try > 0)
{ {
if (errno == EINTR) DBG1(DBG_KNL, "retransmitting Netlink request (%u/%u)",
try, this->retries);
}
status = send_once(this, in, seq, &hdr, &len);
switch (status)
{
case SUCCESS:
break;
case OUT_OF_RES:
continue;
default:
return status;
}
if (hdr->nlmsg_type == NLMSG_ERROR)
{
struct nlmsgerr* err;
err = NLMSG_DATA(hdr);
if (err->error == -EBUSY)
{ {
/* interrupted, try again */ free(hdr);
try--;
continue; continue;
} }
this->mutex->unlock(this->mutex); if (this->ignore_retransmit_errors && try > 0)
DBG1(DBG_KNL, "error sending to netlink socket: %s", strerror(errno));
return FAILED;
}
break;
}
while (TRUE)
{
len = recv(this->socket, &response, sizeof(response), 0);
if (len < 0)
{
if (errno == EINTR)
{ {
DBG1(DBG_KNL, "got interrupted"); ignore_retransmit_error(this, err, in->nlmsg_type);
/* interrupted, try again */
continue;
} }
DBG1(DBG_KNL, "error reading from netlink socket: %s", strerror(errno));
this->mutex->unlock(this->mutex);
free(result.ptr);
return FAILED;
} }
if (!NLMSG_OK(&response.hdr, len)) *out = hdr;
{ *out_len = len;
DBG1(DBG_KNL, "received corrupted netlink message"); return SUCCESS;
this->mutex->unlock(this->mutex);
free(result.ptr);
return FAILED;
}
if (response.hdr.nlmsg_seq != this->seq)
{
DBG1(DBG_KNL, "received invalid netlink sequence number");
if (response.hdr.nlmsg_seq < this->seq)
{
continue;
}
this->mutex->unlock(this->mutex);
free(result.ptr);
return FAILED;
}
result = chunk_cat("mc", result, chunk_create(response.bytes, len));
/* NLM_F_MULTI flag does not seem to be set correctly, we use sequence
* numbers to detect multi header messages */
len = recv(this->socket, &response.hdr, sizeof(response.hdr),
MSG_PEEK | MSG_DONTWAIT);
if (len == sizeof(response.hdr) && response.hdr.nlmsg_seq == this->seq)
{
/* seems to be multipart */
continue;
}
break;
} }
DBG1(DBG_KNL, "Netlink request timed out after %u retransmits",
*out_len = result.len; this->retries);
*out = (struct nlmsghdr*)result.ptr; return OUT_OF_RES;
this->mutex->unlock(this->mutex);
return SUCCESS;
} }
METHOD(netlink_socket_t, netlink_send_ack, status_t, METHOD(netlink_socket_t, netlink_send_ack, status_t,
@ -221,8 +534,13 @@ METHOD(netlink_socket_t, destroy, void,
{ {
if (this->socket != -1) if (this->socket != -1)
{ {
if (this->parallel)
{
lib->watcher->remove(lib->watcher, this->socket);
}
close(this->socket); close(this->socket);
} }
this->entries->destroy(this->entries);
this->mutex->destroy(this->mutex); this->mutex->destroy(this->mutex);
free(this); free(this);
} }
@ -230,7 +548,8 @@ METHOD(netlink_socket_t, destroy, void,
/** /**
* Described in header. * Described in header.
*/ */
netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names) netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names,
bool parallel)
{ {
private_netlink_socket_t *this; private_netlink_socket_t *this;
struct sockaddr_nl addr = { struct sockaddr_nl addr = {
@ -244,9 +563,19 @@ netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names)
.destroy = _destroy, .destroy = _destroy,
}, },
.seq = 200, .seq = 200,
.mutex = mutex_create(MUTEX_TYPE_DEFAULT), .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.socket = socket(AF_NETLINK, SOCK_RAW, protocol), .socket = socket(AF_NETLINK, SOCK_RAW, protocol),
.entries = hashtable_create(hashtable_hash_ptr, hashtable_equals_ptr, 4),
.protocol = protocol,
.names = names, .names = names,
.timeout = lib->settings->get_int(lib->settings,
"%s.plugins.kernel-netlink.timeout", 0, lib->ns),
.retries = lib->settings->get_int(lib->settings,
"%s.plugins.kernel-netlink.retries", 0, lib->ns),
.ignore_retransmit_errors = lib->settings->get_bool(lib->settings,
"%s.plugins.kernel-netlink.ignore_retransmit_errors",
FALSE, lib->ns),
.parallel = parallel,
); );
if (this->socket == -1) if (this->socket == -1)
@ -261,6 +590,10 @@ netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names)
destroy(this); destroy(this);
return NULL; return NULL;
} }
if (this->parallel)
{
lib->watcher->add(lib->watcher, this->socket, WATCHER_READ, watch, this);
}
return &this->public; return &this->public;
} }

View File

@ -66,8 +66,10 @@ struct netlink_socket_t {
* *
* @param protocol protocol type (e.g. NETLINK_XFRM or NETLINK_ROUTE) * @param protocol protocol type (e.g. NETLINK_XFRM or NETLINK_ROUTE)
* @param names optional enum names for Netlink messages * @param names optional enum names for Netlink messages
* @param parallel support parallel queries on this Netlink socket
*/ */
netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names); netlink_socket_t *netlink_socket_create(int protocol, enum_name_t *names,
bool parallel);
/** /**
* Creates an rtattr and adds it to the given netlink message. * Creates an rtattr and adds it to the given netlink message.

View File

@ -0,0 +1,302 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <test_suite.h>
#include <threading/thread.h>
#include "../kernel_netlink_shared.h"
/**
 * Netlink message drop configuration: if not 0, the last message of every
 * drop_interval messages passed to the hook is dropped.
 */
static int drop_interval = 0;

/**
 * Netlink message drop hook
 *
 * @param hdr		message about to be sent, not inspected
 * @return			TRUE to drop the message, FALSE to send it
 */
bool netlink_msg_loss(struct nlmsghdr *hdr)
{
	static refcount_t count;

	if (!drop_interval)
	{	/* message loss disabled, the counter is not touched */
		return FALSE;
	}
	return ref_get(&count) % drop_interval == drop_interval - 1;
}
/**
* Send a single RTM_GETROUTE request for 127.0.0.1 and expect an
* RTM_NEWROUTE message in response.
*
* _i is presumably the loop index provided by the test framework, so the
* socket is created non-parallel for _i == 0 and parallel otherwise.
*/
START_TEST(test_echo)
{
netlink_socket_t *s;
struct nlmsghdr *out;
struct rtmsg *msg;
char dst[] = {
127,0,0,1
};
size_t len;
netlink_buf_t request = {
.hdr = {
.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtmsg)),
.nlmsg_flags = NLM_F_REQUEST,
.nlmsg_type = RTM_GETROUTE,
},
};
msg = NLMSG_DATA(&request.hdr);
msg->rtm_family = AF_INET;
netlink_add_attribute(&request.hdr, RTA_DST,
chunk_from_thing(dst), sizeof(request));
s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
ck_assert(s->send(s, &request.hdr, &out, &len) == SUCCESS);
ck_assert_int_eq(out->nlmsg_type, RTM_NEWROUTE);
/* the response buffer is owned by the caller */
free(out);
s->destroy(s);
}
END_TEST
/**
* Request a link dump (RTM_GETLINK with NLM_F_MATCH | NLM_F_ROOT) and walk
* the multi-message response: every message must be a valid RTM_NEWLINK
* until an NLMSG_DONE message terminates the dump.
*/
START_TEST(test_echo_dump)
{
netlink_socket_t *s;
struct nlmsghdr *out, *current;
struct rtgenmsg *msg;
size_t len;
netlink_buf_t request = {
.hdr = {
.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtgenmsg)),
.nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH | NLM_F_ROOT,
.nlmsg_type = RTM_GETLINK,
},
};
s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
msg = NLMSG_DATA(&request.hdr);
msg->rtgen_family = AF_UNSPEC;
ck_assert(s->send(s, &request.hdr, &out, &len) == SUCCESS);
current = out;
while (TRUE)
{
/* fails the test if the response ends without NLMSG_DONE */
ck_assert(NLMSG_OK(current, len));
if (current->nlmsg_type == NLMSG_DONE)
{
break;
}
ck_assert_int_eq(current->nlmsg_type, RTM_NEWLINK);
current = NLMSG_NEXT(current, len);
}
free(out);
s->destroy(s);
}
END_TEST
/**
 * Thread function for the stress test: send ten RTM_GETROUTE requests for
 * 127.0.0.1 on the given (shared) socket and expect an RTM_NEWROUTE message
 * in response to each of them.
 */
CALLBACK(stress, void*,
	netlink_socket_t *s)
{
	struct nlmsghdr *out;
	struct rtmsg *msg;
	char dst[] = {
		127,0,0,1
	};
	size_t len;
	int i;

	for (i = 0; i < 10; i++)
	{
		/* build a fresh request in every round; with a single buffer reused
		 * for all rounds, each iteration would append one more RTA_DST
		 * attribute to an ever growing message */
		netlink_buf_t request = {
			.hdr = {
				.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtmsg)),
				.nlmsg_flags = NLM_F_REQUEST,
				.nlmsg_type = RTM_GETROUTE,
			},
		};

		msg = NLMSG_DATA(&request.hdr);
		msg->rtm_family = AF_INET;
		netlink_add_attribute(&request.hdr, RTA_DST,
							  chunk_from_thing(dst), sizeof(request));

		ck_assert(s->send(s, &request.hdr, &out, &len) == SUCCESS);
		ck_assert_int_eq(out->nlmsg_type, RTM_NEWROUTE);
		free(out);
	}
	return NULL;
}
/**
* Thread function for the dump stress test: request a link dump ten times on
* the given (shared) socket and verify each response consists of RTM_NEWLINK
* messages terminated by NLMSG_DONE.
*/
CALLBACK(stress_dump, void*,
netlink_socket_t *s)
{
struct nlmsghdr *out, *current;
struct rtgenmsg *msg;
size_t len;
int i;
netlink_buf_t request = {
.hdr = {
.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtgenmsg)),
.nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH | NLM_F_ROOT,
.nlmsg_type = RTM_GETLINK,
},
};
/* no attributes get added, so the same request is reused for all rounds */
msg = NLMSG_DATA(&request.hdr);
msg->rtgen_family = AF_UNSPEC;
for (i = 0; i < 10; i++)
{
ck_assert(s->send(s, &request.hdr, &out, &len) == SUCCESS);
current = out;
while (TRUE)
{
ck_assert(NLMSG_OK(current, len));
if (current->nlmsg_type == NLMSG_DONE)
{
break;
}
ck_assert_int_eq(current->nlmsg_type, RTM_NEWLINK);
current = NLMSG_NEXT(current, len);
}
free(out);
}
return NULL;
}
/**
* Run ten threads concurrently, each executing stress() on the same socket,
* and wait for all of them before destroying the socket.
*/
START_TEST(test_stress)
{
thread_t *threads[10];
netlink_socket_t *s;
int i;
s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
for (i = 0; i < countof(threads); i++)
{
threads[i] = thread_create(stress, s);
}
for (i = 0; i < countof(threads); i++)
{
threads[i]->join(threads[i]);
}
s->destroy(s);
}
END_TEST
START_TEST(test_stress_dump)
{
thread_t *threads[10];
netlink_socket_t *s;
int i;
s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
for (i = 0; i < countof(threads); i++)
{
threads[i] = thread_create(stress_dump, s);
}
for (i = 0; i < countof(threads); i++)
{
threads[i]->join(threads[i]);
}
s->destroy(s);
}
END_TEST
/**
 * Send a link dump request with a 100 timeout and a single retry configured
 * while drop_interval is 2, and expect the query to succeed.
 *
 * NOTE(review): drop_interval is defined outside this block; presumably it
 * makes every second message get dropped, so the one retry gets through -
 * confirm against its definition.
 */
START_TEST(test_retransmit_success)
{
	netlink_buf_t request = {
		.hdr = {
			.nlmsg_type = RTM_GETLINK,
			.nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH | NLM_F_ROOT,
			.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtgenmsg)),
		},
	};
	struct rtgenmsg *msg = NLMSG_DATA(&request.hdr);
	struct nlmsghdr *out;
	netlink_socket_t *s;
	size_t len;

	msg->rtgen_family = AF_UNSPEC;

	/* retransmission settings have to be in place before the socket exists */
	lib->settings->set_int(lib->settings,
						   "%s.plugins.kernel-netlink.timeout", 100, lib->ns);
	lib->settings->set_int(lib->settings,
						   "%s.plugins.kernel-netlink.retries", 1, lib->ns);
	drop_interval = 2;

	s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
	ck_assert(s->send(s, &request.hdr, &out, &len) == SUCCESS);
	free(out);
	s->destroy(s);

	/* restore the default, so later tests are not affected */
	drop_interval = 0;
}
END_TEST
/**
 * Send a link dump request with a 50 timeout and three retries configured
 * while drop_interval is 1, and expect the query to fail with OUT_OF_RES.
 *
 * NOTE(review): drop_interval is defined outside this block; presumably a
 * value of 1 makes every message get dropped, so all retries are exhausted -
 * confirm against its definition.
 */
START_TEST(test_retransmit_fail)
{
	netlink_buf_t request = {
		.hdr = {
			.nlmsg_type = RTM_GETLINK,
			.nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH | NLM_F_ROOT,
			.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtgenmsg)),
		},
	};
	struct rtgenmsg *msg = NLMSG_DATA(&request.hdr);
	struct nlmsghdr *out;
	netlink_socket_t *s;
	size_t len;

	msg->rtgen_family = AF_UNSPEC;

	/* retransmission settings have to be in place before the socket exists */
	lib->settings->set_int(lib->settings,
						   "%s.plugins.kernel-netlink.timeout", 50, lib->ns);
	lib->settings->set_int(lib->settings,
						   "%s.plugins.kernel-netlink.retries", 3, lib->ns);
	drop_interval = 1;

	s = netlink_socket_create(NETLINK_ROUTE, NULL, _i != 0);
	/* no response buffer to release, as the request is expected to fail */
	ck_assert(s->send(s, &request.hdr, &out, &len) == OUT_OF_RES);
	s->destroy(s);

	/* restore the default, so later tests are not affected */
	drop_interval = 0;
}
END_TEST
/**
 * Build the "netlink socket" test suite.
 *
 * Every test is registered as a loop test over the range 0..2, i.e. it runs
 * once with _i == 0 and once with _i == 1.
 *
 * @return		suite containing the echo, stress and retransmit test cases
 */
Suite *socket_suite_create()
{
	Suite *suite = suite_create("netlink socket");
	TCase *tcase;

	/* single request/response exchanges */
	tcase = tcase_create("echo");
	tcase_add_loop_test(tcase, test_echo, 0, 2);
	tcase_add_loop_test(tcase, test_echo_dump, 0, 2);
	suite_add_tcase(suite, tcase);

	/* concurrent queries from multiple threads */
	tcase = tcase_create("stress");
	tcase_add_loop_test(tcase, test_stress, 0, 2);
	tcase_add_loop_test(tcase, test_stress_dump, 0, 2);
	suite_add_tcase(suite, tcase);

	/* behavior with timeout and retries configured */
	tcase = tcase_create("retransmit");
	tcase_add_loop_test(tcase, test_retransmit_success, 0, 2);
	tcase_add_loop_test(tcase, test_retransmit_fail, 0, 2);
	suite_add_tcase(suite, tcase);

	return suite;
}

View File

@ -0,0 +1,51 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <test_runner.h>
#include <hydra.h>
/* declare test suite constructors */
/* First pass over tests.h: every TEST_SUITE(x) line in it expands to a
 * prototype for the suite constructor x. */
#define TEST_SUITE(x) test_suite_t* x();
#include "tests.h"
#undef TEST_SUITE
/* Table of all test suites, passed to test_runner_run() in main().
 * Second pass over tests.h: TEST_SUITE is redefined, so that every
 * TEST_SUITE(x) line now expands to a table entry referencing constructor x.
 * The last entry has a NULL .suite; presumably that is the end marker the
 * runner looks for - confirm against test_runner_run(). */
static test_configuration_t tests[] = {
#define TEST_SUITE(x) \
{ .suite = x, },
#include "tests.h"
{ .suite = NULL, }
};
/**
 * Initialization/cleanup hook handed to test_runner_run().
 *
 * @param init		TRUE to set up the processor, FALSE to shut it down
 * @return			always TRUE
 */
static bool test_runner_init(bool init)
{
	if (!init)
	{
		/* teardown: stop handing out work, then cancel the processor */
		lib->processor->set_threads(lib->processor, 0);
		lib->processor->cancel(lib->processor);
		return TRUE;
	}

	/* setup: the debug level is lowered to 0 while the worker threads get
	 * configured and raised to 1 afterwards, presumably to keep thread
	 * startup messages out of the test output */
	dbg_default_set_level(0);
	lib->processor->set_threads(lib->processor, 8);
	dbg_default_set_level(1);
	return TRUE;
}
/**
 * Entry point of the kernel-netlink test runner; the command line arguments
 * are not used.
 *
 * @return		whatever test_runner_run() returns
 */
int main(int argc, char *argv[])
{
	int result;

	result = test_runner_run("kernel-netlink", tests, test_runner_init);
	return result;
}

View File

@ -0,0 +1,16 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
/* Test suite list entry: TEST_SUITE() has to be defined by the including file
 * before this list is included. */
TEST_SUITE(socket_suite_create)