Automatically segment cluster using periodically sent status messages

This commit is contained in:
Martin Willi 2009-09-28 14:31:39 +02:00 committed by Martin Willi
parent b7f15be136
commit 3912fdb1ec
9 changed files with 177 additions and 70 deletions

View File

@ -606,6 +606,34 @@ static void process_segment(private_ha_sync_dispatcher_t *this,
enumerator->destroy(enumerator);
}
/**
* Process messages of type STATUS
*/
static void process_status(private_ha_sync_dispatcher_t *this,
ha_sync_message_t *message)
{
ha_sync_message_attribute_t attribute;
ha_sync_message_value_t value;
enumerator_t *enumerator;
segment_mask_t mask = 0;
enumerator = message->create_attribute_enumerator(message);
while (enumerator->enumerate(enumerator, &attribute, &value))
{
switch (attribute)
{
case HA_SYNC_SEGMENT:
mask |= SEGMENTS_BIT(value.u16);
break;
default:
break;
}
}
enumerator->destroy(enumerator);
this->segments->handle_status(this->segments, mask);
}
/**
* Dispatcher job function
*/
@ -637,6 +665,9 @@ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this)
case HA_SYNC_SEGMENT_TAKE:
process_segment(this, message, TRUE);
break;
case HA_SYNC_STATUS:
process_status(this, message);
break;
default:
DBG1(DBG_CFG, "received unknown HA sync message type %d",
message->get_type(message));

View File

@ -42,6 +42,7 @@ struct ha_sync_dispatcher_t {
*
* @param socket socket to pull messages from
* @param segments segments to control based on received messages
* @param manager distributed management logic for segment control
* @return dispatcher object
*/
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,

View File

@ -46,7 +46,7 @@ struct private_ha_sync_kernel_t {
/**
* Total number of ClusterIP segments
*/
u_int segment_count;
u_int count;
/**
* List of virtual addresses, as host_t*
@ -68,7 +68,7 @@ static bool in_segment(private_ha_sync_kernel_t *this,
addr = *(u_int32_t*)host->get_address(host).ptr;
hash = jhash_1word(ntohl(addr), this->initval);
if ((((u_int64_t)hash * this->segment_count) >> 32) + 1 == segment)
if ((((u_int64_t)hash * this->count) >> 32) + 1 == segment)
{
return TRUE;
}
@ -128,8 +128,7 @@ static void deactivate(private_ha_sync_kernel_t *this, u_int segment)
/**
* Mangle IPtable rules for virtual addresses
*/
static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
segment_mask_t active)
static bool mangle_rules(private_ha_sync_kernel_t *this, bool add)
{
enumerator_t *enumerator;
host_t *host;
@ -148,13 +147,12 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
host->destroy(host);
continue;
}
/* iptables insists of a local node specification. We add '1' but drop
* it afterwards. */
/* iptables insists of a local node specification, enable node 1 */
snprintf(buf, sizeof(buf),
"/sbin/iptables -%c INPUT -i %s -d %H -j CLUSTERIP --new "
"--hashmode sourceip --clustermac 01:00:5e:00:00:%2x "
"--total-nodes %d --local-node 1",
add ? 'A' : 'D', iface, host, mac++, this->segment_count);
add ? 'A' : 'D', iface, host, mac++, this->count);
free(iface);
if (system(buf) != 0)
{
@ -165,13 +163,9 @@ static bool mangle_rules(private_ha_sync_kernel_t *this, bool add,
if (add)
{
deactivate(this, 1);
for (i = 0; i < SEGMENTS_MAX; i++)
for (i = 2; i <= this->count; i++)
{
if (active & SEGMENTS_BIT(i))
{
activate(this, i);
}
activate(this, i);
}
}
return TRUE;
@ -207,7 +201,7 @@ static void parse_virtuals(private_ha_sync_kernel_t *this, char *virtual)
*/
static void destroy(private_ha_sync_kernel_t *this)
{
mangle_rules(this, FALSE, 0);
mangle_rules(this, FALSE);
this->virtuals->destroy_offset(this->virtuals, offsetof(host_t, destroy));
free(this);
}
@ -215,10 +209,11 @@ static void destroy(private_ha_sync_kernel_t *this)
/**
* See header
*/
ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
char *virtuals)
ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals)
{
private_ha_sync_kernel_t *this = malloc_thing(private_ha_sync_kernel_t);
segment_mask_t active;
int i;
this->public.in_segment = (bool(*)(ha_sync_kernel_t*, host_t *host, u_int segment))in_segment;
this->public.activate = (void(*)(ha_sync_kernel_t*, u_int segment))activate;
@ -226,12 +221,12 @@ ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
this->public.destroy = (void(*)(ha_sync_kernel_t*))destroy;
this->initval = 0;
this->segment_count = count;
this->count = count;
this->virtuals = linked_list_create();
parse_virtuals(this, virtuals);
if (!mangle_rules(this, TRUE, active))
if (!mangle_rules(this, TRUE))
{
destroy(this);
return NULL;

View File

@ -66,7 +66,6 @@ struct ha_sync_kernel_t {
* @param active bitmask of initially active segments
* @param virtuals comma separated list of virtual cluster addresses
*/
ha_sync_kernel_t *ha_sync_kernel_create(u_int count, segment_mask_t active,
char *virtuals);
ha_sync_kernel_t *ha_sync_kernel_create(u_int count, char *virtuals);
#endif /* HA_SYNC_KERNEL_ @}*/

View File

@ -55,6 +55,8 @@ enum ha_sync_message_type_t {
HA_SYNC_SEGMENT_DROP,
/** segments the sending node is taking over */
HA_SYNC_SEGMENT_TAKE,
/** status with the segments the sending node is currently serving */
HA_SYNC_STATUS,
};
/**

View File

@ -97,29 +97,6 @@ static void destroy(private_ha_sync_plugin_t *this)
free(this);
}
/**
* Convert segment string to mask
*/
static segment_mask_t parse_active(char *active)
{
enumerator_t *enumerator;
u_int segment;
segment_mask_t mask = 0;
enumerator = enumerator_create_token(active, ",", " ");
while (enumerator->enumerate(enumerator, &active))
{
segment = atoi(active);
if (segment > 0 && segment < SEGMENTS_MAX)
{
mask |= SEGMENTS_BIT(segment);
}
}
enumerator->destroy(enumerator);
return mask;
}
/*
* see header file
*/
@ -127,7 +104,6 @@ plugin_t *plugin_create()
{
private_ha_sync_plugin_t *this;
char *local, *remote, *secret, *virtuals;
segment_mask_t active;
u_int count;
bool fifo;
@ -143,8 +119,6 @@ plugin_t *plugin_create()
"charon.plugins.ha_sync.fifo_interface", FALSE);
count = min(SEGMENTS_MAX, lib->settings->get_int(lib->settings,
"charon.plugins.ha_sync.segment_count", 1));
active = parse_active(lib->settings->get_str(lib->settings,
"charon.plugins.ha_sync.active_segments", "1"));
if (!local || !remote)
{
DBG1(DBG_CFG, "HA sync config misses local/remote address");
@ -163,7 +137,7 @@ plugin_t *plugin_create()
free(this);
return NULL;
}
this->kernel = ha_sync_kernel_create(count, active, virtuals);
this->kernel = ha_sync_kernel_create(count, virtuals);
if (!this->kernel)
{
this->socket->destroy(this->socket);
@ -176,7 +150,7 @@ plugin_t *plugin_create()
this->tunnel = ha_sync_tunnel_create(local, remote, secret);
}
this->segments = ha_sync_segments_create(this->socket, this->kernel,
this->tunnel, count, active);
this->tunnel, local, remote, count);
if (fifo)
{
this->ctl = ha_sync_ctl_create(this->segments);

View File

@ -17,6 +17,7 @@
#include <utils/mutex.h>
#include <utils/linked_list.h>
#include <processing/jobs/callback_job.h>
typedef struct private_ha_sync_segments_t private_ha_sync_segments_t;
@ -53,12 +54,17 @@ struct private_ha_sync_segments_t {
/**
* Total number of ClusterIP segments
*/
u_int segment_count;
u_int count;
/**
* mask of active segments
*/
segment_mask_t active;
/**
* Are we the master node handling segment assignement?
*/
bool master;
};
/**
@ -71,9 +77,9 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
int i;
bool first = TRUE;
for (i = 0; i < this->segment_count; i++)
for (i = 1; i <= this->count; i++)
{
if (this->active & 0x01 << i)
if (this->active & SEGMENTS_BIT(i))
{
if (first)
{
@ -83,7 +89,7 @@ static void log_segments(private_ha_sync_segments_t *this, bool activated,
{
pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
}
pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i+1);
pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
}
}
DBG1(DBG_CFG, "HA sync segment %d %sactivated, now active: %s",
@ -98,19 +104,20 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
{
ike_sa_t *ike_sa;
enumerator_t *enumerator;
u_int i, limit;
u_int i, from, to;
this->lock->write_lock(this->lock);
if (segment == 0 || segment <= this->segment_count)
if (segment == 0 || segment <= this->count)
{
if (segment)
{ /* loop once for single segment ... */
limit = segment + 1;
from = to = segment;
}
else
{ /* or segment_count times for all segments */
limit = this->segment_count;
{ /* or count times for all segments */
from = 1;
to = this->count;
}
enumerator = charon->ike_sa_manager->create_enumerator(charon->ike_sa_manager);
while (enumerator->enumerate(enumerator, &ike_sa))
@ -123,7 +130,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
{
continue;
}
for (i = segment; i < limit; i++)
for (i = from; i <= to; i++)
{
if (this->kernel->in_segment(this->kernel,
ike_sa->get_other_host(ike_sa), i))
@ -133,7 +140,7 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
}
}
enumerator->destroy(enumerator);
for (i = segment; i < limit; i++)
for (i = from; i <= to; i++)
{
if (enable)
{
@ -152,7 +159,6 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
}
}
}
log_segments(this, enable, segment);
}
@ -233,7 +239,7 @@ static void resync(private_ha_sync_segments_t *this, u_int segment)
list = linked_list_create();
this->lock->read_lock(this->lock);
if (segment > 0 && segment <= this->segment_count && (this->active & mask))
if (segment > 0 && segment <= this->count && (this->active & mask))
{
this->active &= ~mask;
@ -290,7 +296,7 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
{
int i;
for (i = 0; i < SEGMENTS_MAX; i++)
for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
{
@ -301,6 +307,88 @@ static bool alert_hook(private_ha_sync_segments_t *this, ike_sa_t *ike_sa,
return TRUE;
}
/**
* Implementation of ha_sync_segments_t.handle_status
*/
static void handle_status(private_ha_sync_segments_t *this, segment_mask_t mask)
{
segment_mask_t missing, overlap;
int i, active = 0;
this->lock->read_lock(this->lock);
missing = ~(this->active | mask);
overlap = this->active & mask;
for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
{
active++;
}
}
this->lock->unlock(this->lock);
/* Activate any missing segment. The master will disable overlapping
* segments if both nodes activate the missing segments simultaneously. */
for (i = 1; i <= this->count; i++)
{
if (missing & SEGMENTS_BIT(i))
{
DBG1(DBG_CFG, "HA segment %d was not handled", i);
activate(this, i, TRUE);
}
}
if (this->master && overlap)
{
/* Disable overlapping segment on one node, controlled by master */
for (i = 1; i <= this->count; i++)
{
if (overlap & SEGMENTS_BIT(i))
{
DBG1(DBG_CFG, "HA segment %d handled twice", i);
if (active > this->count)
{
deactivate(this, i, TRUE);
active--;
}
else
{
activate(this, i, TRUE);
active++;
}
}
}
}
}
/**
* Send a status message with our active segments
*/
static job_requeue_t send_status(private_ha_sync_segments_t *this)
{
ha_sync_message_t *message;
int i;
message = ha_sync_message_create(HA_SYNC_STATUS);
for (i = 1; i <= this->count; i++)
{
if (this->active & SEGMENTS_BIT(i))
{
message->add_attribute(message, HA_SYNC_SEGMENT, i);
}
}
this->socket->push(this->socket, message);
/* schedule next invocation */
charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
callback_job_create((callback_job_cb_t)
send_status, this, NULL, NULL),
1000);
return JOB_REQUEUE_NONE;
}
/**
* Implementation of ha_sync_segments_t.destroy.
*/
@ -314,25 +402,35 @@ static void destroy(private_ha_sync_segments_t *this)
* See header
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
ha_sync_kernel_t *kernel,
ha_sync_tunnel_t *tunnel,
u_int count, segment_mask_t active)
ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
char *local, char *remote, u_int count)
{
private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t);
int i;
memset(&this->public.listener, 0, sizeof(listener_t));
this->public.listener.alert = (bool(*)(listener_t*, ike_sa_t *, alert_t, va_list))alert_hook;
this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate;
this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate;
this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync;
this->public.handle_status = (void(*)(ha_sync_segments_t*, segment_mask_t mask))handle_status;
this->public.destroy = (void(*)(ha_sync_segments_t*))destroy;
this->socket = socket;
this->tunnel = tunnel;
this->kernel = kernel;
this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
this->active = active;
this->segment_count = count;
this->count = count;
this->master = strcmp(local, remote) > 0;
/* initially all segments are active */
this->active = 0;
for (i = 1; i <= count; i++)
{
this->active |= SEGMENTS_BIT(i);
}
send_status(this);
return &this->public;
}

View File

@ -79,6 +79,13 @@ struct ha_sync_segments_t {
*/
void (*resync)(ha_sync_segments_t *this, u_int segment);
/**
* Handle a status message from the remote node.
*
* @param mask segments the remote node is serving actively
*/
void (*handle_status)(ha_sync_segments_t *this, segment_mask_t mask);
/**
* Destroy a ha_sync_segments_t.
*/
@ -95,8 +102,7 @@ struct ha_sync_segments_t {
* @return segment object
*/
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket,
ha_sync_kernel_t *kernel,
ha_sync_tunnel_t *tunnel,
u_int count, segment_mask_t active);
ha_sync_kernel_t *kernel, ha_sync_tunnel_t *tunnel,
char *local, char *remote, u_int count);
#endif /* HA_SYNC_SEGMENTS_ @}*/

View File

@ -152,6 +152,7 @@ static void push(private_ha_sync_socket_t *this, ha_sync_message_t *message)
job = callback_job_create((callback_job_cb_t)send_message,
data, (void*)job_data_destroy, NULL);
charon->processor->queue_job(charon->processor, (job_t*)job);
sched_yield();
}
/**