Take over all segments if heartbeat becomes silent
This commit is contained in:
parent
d87489661c
commit
e262f4e543
|
@ -15,10 +15,15 @@
|
|||
|
||||
#include "ha_segments.h"
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include <utils/mutex.h>
|
||||
#include <utils/linked_list.h>
|
||||
#include <processing/jobs/callback_job.h>
|
||||
|
||||
/* Delay handed to schedule_job_ms() when send_status() queues its next run,
 * in milliseconds. */
#define HEARTBEAT_DELAY 1000
|
||||
/* Wait limit passed to condvar->timed_wait() in watchdog(); if it expires,
 * all segments are taken over. Presumably milliseconds as well, like
 * HEARTBEAT_DELAY -- TODO confirm against the condvar_t interface. */
#define HEARTBEAT_TIMEOUT 2100
|
||||
|
||||
typedef struct private_ha_segments_t private_ha_segments_t;
|
||||
|
||||
/**
|
||||
|
@ -47,9 +52,19 @@ struct private_ha_segments_t {
|
|||
ha_kernel_t *kernel;
|
||||
|
||||
/**
|
||||
* read/write lock for segment manipulation
|
||||
* Mutex to lock segment manipulation
|
||||
*/
|
||||
rwlock_t *lock;
|
||||
mutex_t *mutex;
|
||||
|
||||
/**
|
||||
* Condvar to wait for heartbeats
|
||||
*/
|
||||
condvar_t *condvar;
|
||||
|
||||
/**
|
||||
* Job checking for heartbeats
|
||||
*/
|
||||
callback_job_t *job;
|
||||
|
||||
/**
|
||||
* Total number of ClusterIP segments
|
||||
|
@ -178,7 +193,7 @@ static void enable_disable_all(private_ha_segments_t *this, u_int segment,
|
|||
{
|
||||
int i;
|
||||
|
||||
this->lock->write_lock(this->lock);
|
||||
this->mutex->lock(this->mutex);
|
||||
if (segment == 0)
|
||||
{
|
||||
for (i = 1; i <= this->count; i++)
|
||||
|
@ -190,7 +205,7 @@ static void enable_disable_all(private_ha_segments_t *this, u_int segment,
|
|||
{
|
||||
enable_disable(this, segment, enable, notify);
|
||||
}
|
||||
this->lock->unlock(this->lock);
|
||||
this->mutex->unlock(this->mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -245,7 +260,7 @@ static void resync(private_ha_segments_t *this, u_int segment)
|
|||
u_int16_t mask = SEGMENTS_BIT(segment);
|
||||
|
||||
list = linked_list_create();
|
||||
this->lock->read_lock(this->lock);
|
||||
this->mutex->lock(this->mutex);
|
||||
|
||||
if (segment > 0 && segment <= this->count && (this->active & mask))
|
||||
{
|
||||
|
@ -269,7 +284,7 @@ static void resync(private_ha_segments_t *this, u_int segment)
|
|||
}
|
||||
enumerator->destroy(enumerator);
|
||||
}
|
||||
this->lock->unlock(this->lock);
|
||||
this->mutex->unlock(this->mutex);
|
||||
|
||||
while (list->remove_last(list, (void**)&id) == SUCCESS)
|
||||
{
|
||||
|
@ -315,7 +330,7 @@ static void handle_status(private_ha_segments_t *this, segment_mask_t mask)
|
|||
segment_mask_t missing, overlap;
|
||||
int i, active = 0;
|
||||
|
||||
this->lock->write_lock(this->lock);
|
||||
this->mutex->lock(this->mutex);
|
||||
|
||||
missing = ~(this->active | mask);
|
||||
overlap = this->active & mask;
|
||||
|
@ -358,7 +373,8 @@ static void handle_status(private_ha_segments_t *this, segment_mask_t mask)
|
|||
}
|
||||
}
|
||||
}
|
||||
this->lock->unlock(this->lock);
|
||||
this->mutex->unlock(this->mutex);
|
||||
this->condvar->signal(this->condvar);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -385,17 +401,41 @@ static job_requeue_t send_status(private_ha_segments_t *this)
|
|||
charon->scheduler->schedule_job_ms(charon->scheduler, (job_t*)
|
||||
callback_job_create((callback_job_cb_t)
|
||||
send_status, this, NULL, NULL),
|
||||
1000);
|
||||
HEARTBEAT_DELAY);
|
||||
|
||||
return JOB_REQUEUE_NONE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Monitor heartbeat activity of remote node
|
||||
*/
|
||||
static job_requeue_t watchdog(private_ha_segments_t *this)
|
||||
{
|
||||
int oldstate;
|
||||
bool timeout;
|
||||
|
||||
this->mutex->lock(this->mutex);
|
||||
pthread_cleanup_push((void*)this->mutex->unlock, this->mutex);
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
|
||||
timeout = this->condvar->timed_wait(this->condvar, this->mutex,
|
||||
HEARTBEAT_TIMEOUT);
|
||||
pthread_setcancelstate(oldstate, NULL);
|
||||
pthread_cleanup_pop(TRUE);
|
||||
if (timeout)
|
||||
{ /* didn't get a heartbeat, take all segments */
|
||||
activate(this, 0, TRUE);
|
||||
}
|
||||
return JOB_REQUEUE_DIRECT;
|
||||
}
|
||||
|
||||
/**
 * Implementation of ha_segments_t.destroy.
 *
 * Releases everything owned by the instance in this order: the lock, the
 * heartbeat watchdog job (cancelled), the mutex, the condvar, and finally
 * the object itself.
 *
 * NOTE(review): both this->lock and this->mutex show up in this text; the
 * rwlock looks like the pre-change member that the mutex replaces. Confirm
 * whether the lock->destroy() call below is still live code.
 */
|
||||
static void destroy(private_ha_segments_t *this)
|
||||
{
|
||||
this->lock->destroy(this->lock);
|
||||
/* watchdog() blocks on this->mutex / this->condvar, so the job is cancelled
 * before those two are destroyed; presumably cancel() stops the running
 * job -- verify against callback_job_t */
this->job->cancel(this->job);
|
||||
this->mutex->destroy(this->mutex);
|
||||
this->condvar->destroy(this->condvar);
|
||||
free(this);
|
||||
}
|
||||
|
||||
|
@ -419,7 +459,8 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
|
|||
this->socket = socket;
|
||||
this->tunnel = tunnel;
|
||||
this->kernel = kernel;
|
||||
this->lock = rwlock_create(RWLOCK_TYPE_DEFAULT);
|
||||
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
|
||||
this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT);
|
||||
this->count = count;
|
||||
this->master = strcmp(local, remote) > 0;
|
||||
|
||||
|
@ -432,6 +473,11 @@ ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
|
|||
|
||||
send_status(this);
|
||||
|
||||
/* start heartbeat detection thread */
|
||||
this->job = callback_job_create((callback_job_cb_t)watchdog,
|
||||
this, NULL, NULL);
|
||||
charon->processor->queue_job(charon->processor, (job_t*)this->job);
|
||||
|
||||
return &this->public;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue