Propagate segment manipulation to cluster node
This commit is contained in:
parent
3d672d4b0a
commit
37459ea928
|
@ -78,10 +78,10 @@ static job_requeue_t dispatch_fifo(private_ha_sync_ctl_t *this)
|
|||
switch (buf[0])
|
||||
{
|
||||
case '+':
|
||||
this->segments->activate(this->segments, segment);
|
||||
this->segments->activate(this->segments, segment, TRUE);
|
||||
break;
|
||||
case '-':
|
||||
this->segments->deactivate(this->segments, segment);
|
||||
this->segments->deactivate(this->segments, segment, TRUE);
|
||||
break;
|
||||
case '*':
|
||||
this->segments->resync(this->segments, segment);
|
||||
|
|
|
@ -35,6 +35,11 @@ struct private_ha_sync_dispatcher_t {
|
|||
*/
|
||||
ha_sync_socket_t *socket;
|
||||
|
||||
/**
|
||||
* segments to control
|
||||
*/
|
||||
ha_sync_segments_t *segments;
|
||||
|
||||
/**
|
||||
* Dispatcher job
|
||||
*/
|
||||
|
@ -569,6 +574,38 @@ static void process_child_delete(private_ha_sync_dispatcher_t *this,
|
|||
enumerator->destroy(enumerator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process messages of type SEGMENT_TAKE/DROP
|
||||
*/
|
||||
static void process_segment(private_ha_sync_dispatcher_t *this,
|
||||
ha_sync_message_t *message, bool take)
|
||||
{
|
||||
ha_sync_message_attribute_t attribute;
|
||||
ha_sync_message_value_t value;
|
||||
enumerator_t *enumerator;
|
||||
|
||||
enumerator = message->create_attribute_enumerator(message);
|
||||
while (enumerator->enumerate(enumerator, &attribute, &value))
|
||||
{
|
||||
switch (attribute)
|
||||
{
|
||||
case HA_SYNC_SEGMENT:
|
||||
if (take)
|
||||
{
|
||||
this->segments->deactivate(this->segments, value.u16, FALSE);
|
||||
}
|
||||
else
|
||||
{
|
||||
this->segments->activate(this->segments, value.u16, FALSE);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
enumerator->destroy(enumerator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatcher job function
|
||||
*/
|
||||
|
@ -594,6 +631,12 @@ static job_requeue_t dispatch(private_ha_sync_dispatcher_t *this)
|
|||
case HA_SYNC_CHILD_DELETE:
|
||||
process_child_delete(this, message);
|
||||
break;
|
||||
case HA_SYNC_SEGMENT_DROP:
|
||||
process_segment(this, message, FALSE);
|
||||
break;
|
||||
case HA_SYNC_SEGMENT_TAKE:
|
||||
process_segment(this, message, TRUE);
|
||||
break;
|
||||
default:
|
||||
DBG1(DBG_CFG, "received unknown HA sync message type %d",
|
||||
message->get_type(message));
|
||||
|
@ -616,16 +659,19 @@ static void destroy(private_ha_sync_dispatcher_t *this)
|
|||
/**
|
||||
* See header
|
||||
*/
|
||||
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket)
|
||||
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,
|
||||
ha_sync_segments_t *segments)
|
||||
{
|
||||
private_ha_sync_dispatcher_t *this = malloc_thing(private_ha_sync_dispatcher_t);
|
||||
|
||||
this->public.destroy = (void(*)(ha_sync_dispatcher_t*))destroy;
|
||||
|
||||
this->socket = socket;
|
||||
this->segments = segments;
|
||||
this->job = callback_job_create((callback_job_cb_t)dispatch,
|
||||
this, NULL, NULL);
|
||||
charon->processor->queue_job(charon->processor, (job_t*)this->job);
|
||||
|
||||
return &this->public;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#define HA_SYNC_DISPATCHER_H_
|
||||
|
||||
#include "ha_sync_socket.h"
|
||||
#include "ha_sync_segments.h"
|
||||
|
||||
typedef struct ha_sync_dispatcher_t ha_sync_dispatcher_t;
|
||||
|
||||
|
@ -40,8 +41,10 @@ struct ha_sync_dispatcher_t {
|
|||
* Create a ha_sync_dispatcher instance pulling from socket.
|
||||
*
|
||||
* @param socket socket to pull messages from
|
||||
* @param segments segments to control based on received messages
|
||||
* @return dispatcher object
|
||||
*/
|
||||
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket);
|
||||
ha_sync_dispatcher_t *ha_sync_dispatcher_create(ha_sync_socket_t *socket,
|
||||
ha_sync_segments_t *segments);
|
||||
|
||||
#endif /* HA_SYNC_DISPATCHER_ @}*/
|
||||
|
|
|
@ -223,6 +223,7 @@ static void add_attribute(private_ha_sync_message_t *this,
|
|||
case HA_SYNC_ALG_INTEG:
|
||||
case HA_SYNC_INBOUND_CPI:
|
||||
case HA_SYNC_OUTBOUND_CPI:
|
||||
case HA_SYNC_SEGMENT:
|
||||
{
|
||||
u_int16_t val;
|
||||
|
||||
|
@ -438,6 +439,7 @@ static bool attribute_enumerate(attribute_enumerator_t *this,
|
|||
case HA_SYNC_ALG_INTEG:
|
||||
case HA_SYNC_INBOUND_CPI:
|
||||
case HA_SYNC_OUTBOUND_CPI:
|
||||
case HA_SYNC_SEGMENT:
|
||||
{
|
||||
if (this->buf.len < sizeof(u_int16_t))
|
||||
{
|
||||
|
|
|
@ -51,6 +51,10 @@ enum ha_sync_message_type_t {
|
|||
HA_SYNC_CHILD_ADD,
|
||||
/** delete an existing CHILD_SA */
|
||||
HA_SYNC_CHILD_DELETE,
|
||||
/** segments the sending node is giving up */
|
||||
HA_SYNC_SEGMENT_DROP,
|
||||
/** segments the sending node is taking over */
|
||||
HA_SYNC_SEGMENT_TAKE,
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -121,6 +125,8 @@ enum ha_sync_message_attribute_t {
|
|||
HA_SYNC_INITIATE_MID,
|
||||
/** u_int32_t, responding message ID */
|
||||
HA_SYNC_RESPOND_MID,
|
||||
/** u_int16_t, HA sync segment */
|
||||
HA_SYNC_SEGMENT,
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -125,7 +125,7 @@ plugin_t *plugin_create()
|
|||
free(this);
|
||||
return NULL;
|
||||
}
|
||||
this->segments = ha_sync_segments_create();
|
||||
this->segments = ha_sync_segments_create(this->socket);
|
||||
if (secret)
|
||||
{
|
||||
this->tunnel = ha_sync_tunnel_create(secret, local, remote);
|
||||
|
@ -134,7 +134,7 @@ plugin_t *plugin_create()
|
|||
{
|
||||
this->ctl = ha_sync_ctl_create(this->segments);
|
||||
}
|
||||
this->dispatcher = ha_sync_dispatcher_create(this->socket);
|
||||
this->dispatcher = ha_sync_dispatcher_create(this->socket, this->segments);
|
||||
this->ike = ha_sync_ike_create(this->socket, this->tunnel);
|
||||
this->child = ha_sync_child_create(this->socket, this->tunnel);
|
||||
charon->bus->add_listener(charon->bus, &this->ike->listener);
|
||||
|
|
|
@ -37,6 +37,11 @@ struct private_ha_sync_segments_t {
|
|||
*/
|
||||
ha_sync_segments_t public;
|
||||
|
||||
/**
|
||||
* communication socket
|
||||
*/
|
||||
ha_sync_socket_t *socket;
|
||||
|
||||
/**
|
||||
* read/write lock for segment manipulation
|
||||
*/
|
||||
|
@ -175,17 +180,37 @@ static void enable_disable(private_ha_sync_segments_t *this, u_int segment,
|
|||
/**
|
||||
* Implementation of ha_sync_segments_t.activate
|
||||
*/
|
||||
static void activate(private_ha_sync_segments_t *this, u_int segment)
|
||||
static void activate(private_ha_sync_segments_t *this, u_int segment,
|
||||
bool notify)
|
||||
{
|
||||
return enable_disable(this, segment, IKE_PASSIVE, IKE_ESTABLISHED, TRUE);
|
||||
ha_sync_message_t *message;
|
||||
|
||||
enable_disable(this, segment, IKE_PASSIVE, IKE_ESTABLISHED, TRUE);
|
||||
|
||||
if (notify)
|
||||
{
|
||||
message = ha_sync_message_create(HA_SYNC_SEGMENT_TAKE);
|
||||
message->add_attribute(message, HA_SYNC_SEGMENT, segment);
|
||||
this->socket->push(this->socket, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of ha_sync_segments_t.deactivate
|
||||
*/
|
||||
static void deactivate(private_ha_sync_segments_t *this, u_int segment)
|
||||
static void deactivate(private_ha_sync_segments_t *this, u_int segment,
|
||||
bool notify)
|
||||
{
|
||||
return enable_disable(this, segment, IKE_ESTABLISHED, IKE_PASSIVE, FALSE);
|
||||
ha_sync_message_t *message;
|
||||
|
||||
enable_disable(this, segment, IKE_ESTABLISHED, IKE_PASSIVE, FALSE);
|
||||
|
||||
if (notify)
|
||||
{
|
||||
message = ha_sync_message_create(HA_SYNC_SEGMENT_DROP);
|
||||
message->add_attribute(message, HA_SYNC_SEGMENT, segment);
|
||||
this->socket->push(this->socket, message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -284,18 +309,19 @@ static void destroy(private_ha_sync_segments_t *this)
|
|||
/**
|
||||
* See header
|
||||
*/
|
||||
ha_sync_segments_t *ha_sync_segments_create()
|
||||
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket)
|
||||
{
|
||||
private_ha_sync_segments_t *this = malloc_thing(private_ha_sync_segments_t);
|
||||
enumerator_t *enumerator;
|
||||
u_int segment;
|
||||
char *str;
|
||||
|
||||
this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment))activate;
|
||||
this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment))deactivate;
|
||||
this->public.activate = (void(*)(ha_sync_segments_t*, u_int segment,bool))activate;
|
||||
this->public.deactivate = (void(*)(ha_sync_segments_t*, u_int segment,bool))deactivate;
|
||||
this->public.resync = (void(*)(ha_sync_segments_t*, u_int segment))resync;
|
||||
this->public.destroy = (void(*)(ha_sync_segments_t*))destroy;
|
||||
|
||||
this->socket = socket;
|
||||
this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
|
||||
this->initval = 0;
|
||||
this->active = 0;
|
||||
|
|
|
@ -21,32 +21,32 @@
|
|||
#ifndef HA_SYNC_SEGMENTS_H_
|
||||
#define HA_SYNC_SEGMENTS_H_
|
||||
|
||||
#include "ha_sync_socket.h"
|
||||
|
||||
#include <daemon.h>
|
||||
|
||||
typedef struct ha_sync_segments_t ha_sync_segments_t;
|
||||
|
||||
/**
|
||||
* Locally segmentsd HA state synced from other nodes.
|
||||
* Segmentation of peers into active and passive.
|
||||
*/
|
||||
struct ha_sync_segments_t {
|
||||
|
||||
/**
|
||||
* Activate a set of IKE_SAs identified by a segments.
|
||||
*
|
||||
* Activating means do a takeover of SAs as the responsible node has failed.
|
||||
* This involves moving all SAs to the daemons IKE_SA manager and handle
|
||||
* them actively now.
|
||||
* Activate a set of IKE_SAs identified by a segment.
|
||||
*
|
||||
* @param segment numerical segment to takeover, 0 for all
|
||||
* @param notify wheter to notify other nodes about activation
|
||||
*/
|
||||
void (*activate)(ha_sync_segments_t *this, u_int segment);
|
||||
void (*activate)(ha_sync_segments_t *this, u_int segment, bool notify);
|
||||
|
||||
/**
|
||||
* Deactivate a set of IKE_SAs identified by a segments.
|
||||
* Deactivate a set of IKE_SAs identified by a segment.
|
||||
*
|
||||
* @param segment numerical segment to takeover, 0 for all
|
||||
* @param notify wheter to notify other nodes about deactivation
|
||||
*/
|
||||
void (*deactivate)(ha_sync_segments_t *this, u_int segment);
|
||||
void (*deactivate)(ha_sync_segments_t *this, u_int segment, bool notify);
|
||||
|
||||
/**
|
||||
* Resync an active segment.
|
||||
|
@ -68,7 +68,10 @@ struct ha_sync_segments_t {
|
|||
|
||||
/**
|
||||
* Create a ha_sync_segments instance.
|
||||
*
|
||||
* @param socket socket to communicate segment (de-)activation
|
||||
* @return segment object
|
||||
*/
|
||||
ha_sync_segments_t *ha_sync_segments_create();
|
||||
ha_sync_segments_t *ha_sync_segments_create(ha_sync_socket_t *socket);
|
||||
|
||||
#endif /* HA_SYNC_SEGMENTS_ @}*/
|
||||
|
|
Loading…
Reference in New Issue