added generic interrupt support

git-svn-id: http://svn.openzap.org/svn/openzap/branches/sangoma_boost@1057 a93c3328-9c30-0410-af19-c9cd2b2d52af
This commit is contained in:
Moises Silva 2010-03-12 18:27:24 +00:00
parent 617020ea0c
commit 5305611be3
11 changed files with 372 additions and 233 deletions

View File

@ -5,3 +5,5 @@
then ftdm_event_t would be renamed to ftdm_oob_event_t and the enum_id renamed to type, then ftdm_span_next_event()
will only return OOB events
- query span hw status (connected/disconnected) on startup

View File

@ -49,6 +49,8 @@
#include "ftdm_pika.h"
#endif
#define SPAN_PENDING_CHANS_QUEUE_SIZE 1000
static int time_is_init = 0;
static void time_init(void)
@ -410,6 +412,7 @@ static ftdm_status_t ftdm_span_destroy(ftdm_span_t *span)
}
/* destroy final basic resources of the span data structure */
ftdm_queue_destroy(&span->pendingchans);
ftdm_mutex_unlock(span->mutex);
ftdm_mutex_destroy(&span->mutex);
ftdm_safe_free(span->signal_data);
@ -504,16 +507,19 @@ FT_DECLARE(ftdm_status_t) ftdm_span_create(ftdm_io_interface_t *fio, ftdm_span_t
ftdm_span_t *new_span = NULL;
ftdm_status_t status = FTDM_FAIL;
assert(fio != NULL);
ftdm_assert(fio != NULL, "No IO provided\n");
ftdm_mutex_lock(globals.mutex);
if (globals.span_index < FTDM_MAX_SPANS_INTERFACE) {
new_span = ftdm_malloc(sizeof(*new_span));
assert(new_span);
memset(new_span, 0, sizeof(*new_span));
new_span = ftdm_calloc(sizeof(*new_span), 1);
ftdm_assert(new_span, "allocating span failed\n");
status = ftdm_mutex_create(&new_span->mutex);
assert(status == FTDM_SUCCESS);
ftdm_assert(status == FTDM_SUCCESS, "mutex creation failed\n");
status = ftdm_queue_create(&new_span->pendingchans, SPAN_PENDING_CHANS_QUEUE_SIZE);
ftdm_assert(status == FTDM_SUCCESS, "span chans queue creation failed\n");
ftdm_set_flag(new_span, FTDM_SPAN_CONFIGURED);
new_span->span_id = ++globals.span_index;
@ -1154,7 +1160,12 @@ FT_DECLARE(ftdm_status_t) ftdm_channel_set_state(ftdm_channel_t *ftdmchan, ftdm_
if (ok) {
ftdm_set_flag(ftdmchan, FTDM_CHANNEL_STATE_CHANGE);
ftdm_set_flag_locked(ftdmchan->span, FTDM_SPAN_STATE_CHANGE);
ftdm_mutex_lock(ftdmchan->span->mutex);
ftdm_set_flag(ftdmchan->span, FTDM_SPAN_STATE_CHANGE);
ftdm_queue_enqueue(ftdmchan->span->pendingchans, ftdmchan);
ftdm_mutex_unlock(ftdmchan->span->mutex);
ftdmchan->last_state = ftdmchan->state;
ftdmchan->state = state;
}
@ -3155,44 +3166,85 @@ FT_DECLARE(int) ftdm_load_modules(void)
FT_DECLARE(ftdm_status_t) ftdm_unload_modules(void)
{
ftdm_hash_iterator_t *i;
ftdm_dso_lib_t lib;
ftdm_hash_iterator_t *i = NULL;
ftdm_dso_lib_t lib = NULL;
char modpath[255] = { 0 };
/* stop signaling interfaces first as signaling depends on I/O and not the other way around */
for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) {
const void *key;
void *val;
const void *key = NULL;
void *val = NULL;
ftdm_module_t *mod = NULL;
hashtable_this(i, &key, NULL, &val);
if (key && val) {
ftdm_module_t *mod = (ftdm_module_t *) val;
if (!mod) {
continue;
}
if (mod->io_unload) {
if (mod->io_unload() == FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_INFO, "Unloading IO %s\n", mod->name);
} else {
ftdm_log(FTDM_LOG_ERROR, "Error unloading IO %s\n", mod->name);
}
}
if (mod->sig_unload) {
if (mod->sig_unload() == FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_INFO, "Unloading SIG %s\n", mod->name);
} else {
ftdm_log(FTDM_LOG_ERROR, "Error unloading SIG %s\n", mod->name);
}
}
ftdm_log(FTDM_LOG_INFO, "Unloading %s\n", mod->path);
lib = mod->lib;
ftdm_dso_destroy(&lib);
if (!key || !val) {
continue;
}
mod = (ftdm_module_t *) val;
if (!mod->sig_unload) {
continue;
}
ftdm_log(FTDM_LOG_INFO, "Unloading signaling interface %s\n", mod->name);
if (mod->sig_unload() != FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_ERROR, "Error unloading signaling interface %s\n", mod->name);
continue;
}
ftdm_log(FTDM_LOG_INFO, "Unloaded signaling interface %s\n", mod->name);
}
/* Now go ahead with I/O interfaces */
for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) {
const void *key = NULL;
void *val = NULL;
ftdm_module_t *mod = NULL;
hashtable_this(i, &key, NULL, &val);
if (!key || !val) {
continue;
}
mod = (ftdm_module_t *) val;
if (!mod->io_unload) {
continue;
}
ftdm_log(FTDM_LOG_INFO, "Unloading I/O interface %s\n", mod->name);
if (mod->io_unload() != FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_ERROR, "Error unloading I/O interface %s\n", mod->name);
continue;
}
ftdm_log(FTDM_LOG_INFO, "Unloaded I/O interface %s\n", mod->name);
}
/* Now unload the actual shared object/dll */
for (i = hashtable_first(globals.module_hash); i; i = hashtable_next(i)) {
ftdm_module_t *mod = NULL;
const void *key = NULL;
void *val = NULL;
hashtable_this(i, &key, NULL, &val);
if (!key || !val) {
continue;
}
mod = (ftdm_module_t *) val;
lib = mod->lib;
snprintf(modpath, sizeof(modpath), "%s", mod->path);
ftdm_log(FTDM_LOG_INFO, "Unloading module %s\n", modpath);
ftdm_dso_destroy(&lib);
ftdm_log(FTDM_LOG_INFO, "Unloaded module %s\n", modpath);
}
return FTDM_SUCCESS;
@ -3506,8 +3558,12 @@ FT_DECLARE(ftdm_status_t) ftdm_global_destroy(void)
time_end();
globals.running = 0;
globals.span_index = 0;
ftdm_span_close_all();
ftdm_sleep(1000);
ftdm_unload_modules();
ftdm_mutex_lock(globals.span_mutex);
for (sp = globals.spans; sp;) {
@ -3529,10 +3585,6 @@ FT_DECLARE(ftdm_status_t) ftdm_global_destroy(void)
globals.spans = NULL;
ftdm_mutex_unlock(globals.span_mutex);
globals.span_index = 0;
ftdm_unload_modules();
ftdm_mutex_lock(globals.mutex);
hashtable_destroy(globals.interface_hash);
hashtable_destroy(globals.module_hash);

View File

@ -38,11 +38,12 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t
static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj);
static void *ftdm_std_queue_dequeue(ftdm_queue_t *queue);
static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms);
static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt);
static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue);
struct ftdm_queue {
ftdm_mutex_t *mutex;
ftdm_condition_t *condition;
ftdm_interrupt_t *interrupt;
ftdm_size_t capacity;
ftdm_size_t size;
unsigned rindex;
@ -56,6 +57,7 @@ FT_DECLARE_DATA ftdm_queue_handler_t g_ftdm_queue_handler =
/*.enqueue = */ ftdm_std_queue_enqueue,
/*.dequeue = */ ftdm_std_queue_dequeue,
/*.wait = */ ftdm_std_queue_wait,
/*.get_interrupt = */ ftdm_std_queue_get_interrupt,
/*.destroy = */ ftdm_std_queue_destroy
};
@ -66,6 +68,7 @@ FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *ha
!handler->enqueue ||
!handler->dequeue ||
!handler->wait ||
!handler->get_interrupt ||
!handler->destroy) {
return FTDM_FAIL;
}
@ -95,7 +98,7 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t
goto failed;
}
if (ftdm_condition_create(&queue->condition, queue->mutex) != FTDM_SUCCESS) {
if (ftdm_interrupt_create(&queue->interrupt, FTDM_INVALID_SOCKET) != FTDM_SUCCESS) {
goto failed;
}
@ -104,8 +107,8 @@ static ftdm_status_t ftdm_std_queue_create(ftdm_queue_t **outqueue, ftdm_size_t
failed:
if (queue) {
if (queue->condition) {
ftdm_condition_destroy(&queue->condition);
if (queue->interrupt) {
ftdm_interrupt_destroy(&queue->interrupt);
}
if (queue->mutex) {
ftdm_mutex_destroy(&queue->mutex);
@ -139,7 +142,7 @@ static ftdm_status_t ftdm_std_queue_enqueue(ftdm_queue_t *queue, void *obj)
status = FTDM_SUCCESS;
/* wake up queue reader */
ftdm_condition_signal(queue->condition);
ftdm_interrupt_signal(queue->interrupt);
done:
@ -188,7 +191,7 @@ static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms)
}
/* no elements on the queue, wait for someone to write an element */
ret = ftdm_condition_wait(queue->condition, ms);
ret = ftdm_interrupt_wait(queue->interrupt, ms);
/* got an element or timeout, bail out */
ftdm_mutex_unlock(queue->mutex);
@ -196,14 +199,22 @@ static ftdm_status_t ftdm_std_queue_wait(ftdm_queue_t *queue, int ms)
return ret;
}
static ftdm_status_t ftdm_std_queue_get_interrupt(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt)
{
ftdm_assert_return(queue != NULL, FTDM_FAIL, "Queue is null!\n");
ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Queue is null!\n");
*interrupt = queue->interrupt;
return FTDM_SUCCESS;
}
static ftdm_status_t ftdm_std_queue_destroy(ftdm_queue_t **inqueue)
{
ftdm_queue_t *queue = NULL;
ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!");
ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!");
ftdm_assert_return(inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
ftdm_assert_return(*inqueue != NULL, FTDM_FAIL, "Queue is null!\n");
queue = *inqueue;
ftdm_condition_destroy(&queue->condition);
ftdm_interrupt_destroy(&queue->interrupt);
ftdm_mutex_destroy(&queue->mutex);
ftdm_safe_free(queue->elements);
ftdm_safe_free(queue);

View File

@ -40,6 +40,7 @@ struct ftdm_mutex {
#else
#include <pthread.h>
#include <poll.h>
#define FTDM_THREAD_CALLING_CONVENTION
@ -49,16 +50,18 @@ struct ftdm_mutex {
#endif
struct ftdm_condition {
struct ftdm_interrupt {
ftdm_socket_t device;
#ifdef WIN32
HANDLE condition;
/* for generic interruption */
HANDLE event;
#else
pthread_cond_t condition;
/* for generic interruption */
int readfd;
int writefd;
#endif
ftdm_mutex_t *mutex;
};
struct ftdm_thread {
#ifdef WIN32
void *handle;
@ -238,123 +241,207 @@ FT_DECLARE(ftdm_status_t) _ftdm_mutex_unlock(ftdm_mutex_t *mutex)
}
FT_DECLARE(ftdm_status_t) ftdm_condition_create(ftdm_condition_t **incondition, ftdm_mutex_t *mutex)
FT_DECLARE(ftdm_status_t) ftdm_interrupt_create(ftdm_interrupt_t **ininterrupt, ftdm_socket_t device)
{
ftdm_condition_t *condition = NULL;
ftdm_interrupt_t *interrupt = NULL;
#ifndef WIN32
int fds[2];
#endif
ftdm_assert_return(incondition != NULL, FTDM_FAIL, "Condition double pointer is null!\n");
ftdm_assert_return(mutex != NULL, FTDM_FAIL, "Mutex for condition must not be null!\n");
ftdm_assert_return(ininterrupt != NULL, FTDM_FAIL, "interrupt double pointer is null!\n");
condition = ftdm_calloc(1, sizeof(*condition));
if (!condition) {
interrupt = ftdm_calloc(1, sizeof(*interrupt));
if (!interrupt) {
ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt memory\n");
return FTDM_FAIL;
}
condition->mutex = mutex;
interrupt->device = device;
#ifdef WIN32
condition->condition = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!condition->condition) {
interrupt->interrupt = CreateEvent(NULL, FALSE, FALSE, NULL);
if (!interrupt->interrupt) {
ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt event\n");
goto failed;
}
#else
if (pthread_cond_init(&condition->condition, NULL)) {
if (pipe(fds)) {
ftdm_log(FTDM_LOG_ERROR, "Failed to allocate interrupt pipe: %s\n", strerror(errno));
goto failed;
}
interrupt->readfd = fds[0];
interrupt->writefd = fds[1];
#endif
*incondition = condition;
*ininterrupt = interrupt;
return FTDM_SUCCESS;
failed:
if (condition) {
ftdm_safe_free(condition);
if (interrupt) {
#ifndef WIN32
if (interrupt->readfd) {
close(interrupt->readfd);
close(interrupt->writefd);
interrupt->readfd = -1;
interrupt->writefd = -1;
}
#endif
ftdm_safe_free(interrupt);
}
return FTDM_FAIL;
}
#define ONE_BILLION 1000000000
FT_DECLARE(ftdm_status_t) ftdm_condition_wait(ftdm_condition_t *condition, int ms)
FT_DECLARE(ftdm_status_t) ftdm_interrupt_wait(ftdm_interrupt_t *interrupt, int ms)
{
int num = 1;
#ifdef WIN32
DWORD res = 0;
HANDLE ints[2];
#else
int res = 0;
struct pollfd ints[2];
char pipebuf[255];
#endif
ftdm_assert_return(condition != NULL, FTDM_FAIL, "Condition is null!\n");
ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Condition is null!\n");
/* start implementation */
#ifdef WIN32
ftdm_mutex_unlock(condition->mutex);
res = WaitForSingleObject(condition->condition, ms > 0 ? ms : INFINITE);
ftdm_mutex_lock(condition->mutex);
ints[0] = interrupt->event;
if (interrupt->device != FTDM_INVALID_SOCKET) {
num++;
ints[1] = interrupt->device;
}
res = WaitForMultipleObjects(num, &ints, FALSE, ms >= 0 ? ms : INFINITE);
switch (res) {
case WAIT_ABANDONED:
case WAIT_TIMEOUT:
return FTDM_TIMEOUT;
case WAIT_FAILED:
case WAIT_ABANDONED: /* is it right to fail with abandoned? */
return FTDM_FAIL;
case WAIT_OBJECT_0:
return FTDM_SUCCESS;
default:
ftdm_log(FTDM_LOG_ERROR, "Error waiting for freetdm condition event (WaitForSingleObject returned %d)\n", res);
return FTDM_FAIL;
if (res >= (sizeof(ints)/sizeof(ints[0]))) {
ftdm_log(FTDM_LOG_ERROR, "Error waiting for freetdm interrupt event (WaitForSingleObject returned %d)\n", res);
return FTDM_FAIL;
}
return FTDM_SUCCESS;
}
#else
int res = 0;
if (ms > 0) {
struct timeval t;
struct timespec waitms;
gettimeofday(&t, NULL);
waitms.tv_sec = t.tv_sec + ( ms / 1000 );
waitms.tv_nsec = 1000*(t.tv_usec + (1000 * ( ms % 1000 )));
if (waitms.tv_nsec >= ONE_BILLION) {
waitms.tv_sec++;
waitms.tv_nsec -= ONE_BILLION;
}
res = pthread_cond_timedwait(&condition->condition, &condition->mutex->mutex, &waitms);
} else {
res = pthread_cond_wait(&condition->condition, &condition->mutex->mutex);
}
if (res != 0) {
if (res == ETIMEDOUT) {
return FTDM_TIMEOUT;
}
ints[0].fd = interrupt->readfd;
ints[0].events = POLLIN;
ints[0].revents = 0;
ftdm_log(FTDM_LOG_CRIT,"pthread_cond_timedwait failed (%d)\n", res);
if (interrupt->device != FTDM_INVALID_SOCKET) {
num++;
ints[1].fd = interrupt->device;
ints[1].events = POLLIN;
ints[1].revents = 0;
}
res = poll(ints, num, ms);
if (res == -1) {
ftdm_log(FTDM_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno));
return FTDM_FAIL;
}
if (res == 0) {
return FTDM_TIMEOUT;
}
if (ints[0].revents & POLLIN) {
res = read(ints[0].fd, pipebuf, sizeof(pipebuf));
if (res == -1) {
ftdm_log(FTDM_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno));
}
}
return FTDM_SUCCESS;
#endif
}
FT_DECLARE(ftdm_status_t) ftdm_condition_signal(ftdm_condition_t *condition)
FT_DECLARE(ftdm_status_t) ftdm_interrupt_signal(ftdm_interrupt_t *interrupt)
{
ftdm_assert_return(condition != NULL, FTDM_FAIL, "Condition is null!\n");
ftdm_assert_return(interrupt != NULL, FTDM_FAIL, "Interrupt is null!\n");
#ifdef WIN32
if (!SetEvent(condition->condition)) {
if (!SetEvent(interrupt->interrupt)) {
ftdm_log(FTDM_LOG_ERROR, "Failed to signal interrupt\n");
return FTDM_FAIL;
}
#else
int err;
if ((err = pthread_cond_signal(&condition->condition))) {
ftdm_log(FTDM_LOG_ERROR, "Failed to signal condition %d:%s\n", err, strerror(err));
if ((err = write(interrupt->writefd, "w", 1)) != 1) {
ftdm_log(FTDM_LOG_ERROR, "Failed to signal interrupt: %s\n", errno, strerror(errno));
return FTDM_FAIL;
}
#endif
return FTDM_SUCCESS;
}
FT_DECLARE(ftdm_status_t) ftdm_condition_destroy(ftdm_condition_t **incondition)
FT_DECLARE(ftdm_status_t) ftdm_interrupt_destroy(ftdm_interrupt_t **ininterrupt)
{
ftdm_condition_t *condition = NULL;
ftdm_assert_return(incondition != NULL, FTDM_FAIL, "Condition null when destroying!\n");
condition = *incondition;
ftdm_interrupt_t *interrupt = NULL;
ftdm_assert_return(ininterrupt != NULL, FTDM_FAIL, "Interrupt null when destroying!\n");
interrupt = *ininterrupt;
#ifdef WIN32
CloseHandle(condition->condition);
CloseHandle(interrupt->interrupt);
#else
if (pthread_cond_destroy(&condition->condition)) {
close(interrupt->readfd);
close(interrupt->writefd);
interrupt->readfd = -1;
interrupt->writefd = -1;
#endif
ftdm_safe_free(interrupt);
*ininterrupt = NULL;
return FTDM_SUCCESS;
}
FT_DECLARE(ftdm_status_t) ftdm_interrupt_multiple_wait(ftdm_interrupt_t *interrupts[], ftdm_size_t size, int ms)
{
#ifndef WIN32
int i;
int res = 0;
int numdevices = 0;
char pipebuf[255];
struct pollfd ints[size*2];
memset(&ints, 0, sizeof(ints));
for (i = 0; i < size; i++) {
ints[i].events = POLLIN;
ints[i].revents = 0;
ints[i].fd = interrupts[i]->readfd;
if (interrupts[i]->device != FTDM_INVALID_SOCKET) {
ints[i+numdevices].events = POLLIN;
ints[i+numdevices].revents = 0;
ints[i+numdevices].fd = interrupts[i]->device;
numdevices++;
}
}
res = poll(ints, size + numdevices, ms);
if (res == -1) {
ftdm_log(FTDM_LOG_CRIT, "interrupt poll failed (%s)\n", strerror(errno));
return FTDM_FAIL;
}
if (res == 0) {
return FTDM_TIMEOUT;
}
for (i = size; i < ftdm_array_len(ints); i++) {
if (ints[i].revents & POLLIN) {
res = read(ints[0].fd, pipebuf, sizeof(pipebuf));
if (res == -1) {
ftdm_log(FTDM_LOG_CRIT, "reading interrupt descriptor failed (%s)\n", strerror(errno));
}
}
}
#endif
ftdm_safe_free(condition);
*incondition = NULL;
return FTDM_SUCCESS;
}

View File

@ -46,8 +46,6 @@ typedef enum {
typedef struct ftdm_sangoma_boost_data {
sangomabc_connection_t mcon;
sangomabc_connection_t pcon;
fd_set rfds;
fd_set efds;
int iteration;
uint32_t flags;
boost_sigmod_interface_t *sigmod;

View File

@ -39,7 +39,7 @@
*/
/* NOTE:
On WIN32 platform this code works with sigmod ONLY, don't try to make sense of any socket code for win32
On __WINDOWS__ platform this code works with sigmod ONLY, don't try to make sense of any socket code for win
I basically ifdef out everything that the compiler complained about
*/
@ -1016,11 +1016,8 @@ static void handle_heartbeat(sangomabc_connection_t *mcon, sangomabc_short_event
err = sangomabc_connection_writep(mcon, (sangomabc_event_t*)event);
if (err <= 0) {
ftdm_log(FTDM_LOG_CRIT, "Failed to tx on ISUP socket [%s]: %s\n", strerror(errno));
ftdm_log(FTDM_LOG_CRIT, "Failed to tx on boost connection [%s]: %s\n", strerror(errno));
}
mcon->hb_elapsed = 0;
return;
}
@ -1050,7 +1047,6 @@ static void handle_restart(sangomabc_connection_t *mcon, ftdm_span_t *span, sang
ftdm_set_flag_locked(span, FTDM_SPAN_SUSPENDED);
ftdm_set_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RESTARTING);
mcon->hb_elapsed = 0;
}
/**
@ -1452,6 +1448,7 @@ static __inline__ void init_outgoing_array(void)
*/
static __inline__ void check_state(ftdm_span_t *span)
{
ftdm_channel_t *ftdmchan = NULL;
ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
int susp = ftdm_test_flag(span, FTDM_SPAN_SUSPENDED);
@ -1462,17 +1459,23 @@ static __inline__ void check_state(ftdm_span_t *span)
if (ftdm_test_flag(span, FTDM_SPAN_STATE_CHANGE) || susp) {
uint32_t j;
ftdm_clear_flag_locked(span, FTDM_SPAN_STATE_CHANGE);
for(j = 1; j <= span->chan_count; j++) {
if (ftdm_test_flag((span->channels[j]), FTDM_CHANNEL_STATE_CHANGE) || susp) {
if (susp) {
for (j = 0; j <= span->chan_count; j++) {
ftdm_mutex_lock(span->channels[j]->mutex);
ftdm_clear_flag((span->channels[j]), FTDM_CHANNEL_STATE_CHANGE);
if (susp && span->channels[j]->state != FTDM_CHANNEL_STATE_DOWN) {
ftdm_channel_set_state(span->channels[j], FTDM_CHANNEL_STATE_RESTART, 0);
}
ftdm_channel_set_state(span->channels[j], FTDM_CHANNEL_STATE_RESTART, 0);
state_advance(span->channels[j]);
ftdm_channel_complete_state(span->channels[j]);
ftdm_mutex_unlock(span->channels[j]->mutex);
}
} else {
while ((ftdmchan = ftdm_queue_dequeue(span->pendingchans))) {
ftdm_mutex_lock(ftdmchan->mutex);
ftdm_clear_flag(ftdmchan, FTDM_CHANNEL_STATE_CHANGE);
state_advance(ftdmchan);
ftdm_channel_complete_state(ftdmchan);
ftdm_mutex_unlock(ftdmchan->mutex);
}
}
}
@ -1487,7 +1490,6 @@ static __inline__ void check_state(ftdm_span_t *span)
ftdm_clear_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RESTARTING);
ftdm_clear_flag_locked(span, FTDM_SPAN_SUSPENDED);
ftdm_clear_flag((&sangoma_boost_data->mcon), MSU_FLAG_DOWN);
sangoma_boost_data->mcon.hb_elapsed = 0;
init_outgoing_array();
}
}
@ -1596,6 +1598,18 @@ static ftdm_status_t ftdm_boost_connection_open(ftdm_span_t *span)
ftdm_log(FTDM_LOG_ERROR, "Error: Opening PCON Socket [%d] %s\n", sangoma_boost_data->pcon.socket, strerror(errno));
return FTDM_FAIL;
}
/* try to create the boost sockets interrupt objects */
if (ftdm_interrupt_create(&sangoma_boost_data->pcon.sock_interrupt, sangoma_boost_data->pcon.socket) != FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_ERROR, "Span %s could not create its boost msock interrupt!\n", span->name);
return FTDM_FAIL;
}
if (ftdm_interrupt_create(&sangoma_boost_data->mcon.sock_interrupt, sangoma_boost_data->mcon.socket) != FTDM_SUCCESS) {
ftdm_log(FTDM_LOG_ERROR, "Span %s could not create its boost psock interrupt!\n", span->name);
return FTDM_FAIL;
}
return FTDM_SUCCESS;
}
@ -1603,49 +1617,34 @@ static ftdm_status_t ftdm_boost_connection_open(ftdm_span_t *span)
\brief wait for a boost event
\return -1 on error, 0 on timeout, 1 when there are events
*/
static int ftdm_boost_wait_event(ftdm_span_t *span, int ms)
static int ftdm_boost_wait_event(ftdm_span_t *span)
{
#ifndef WIN32
struct timeval tv = { 0, ms * 1000 };
sangomabc_connection_t *mcon, *pcon;
int max, activity;
#endif
ftdm_status_t res;
ftdm_interrupt_t *ints[3];
int numints;
ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
ftdm_queue_get_interrupt(span->pendingchans, &ints[0]);
numints = 1;
/* if in queue mode wait for both the pendingchans queue and the boost msg queue */
if (sangoma_boost_data->sigmod) {
ftdm_status_t res;
res = ftdm_queue_wait(sangoma_boost_data->boost_queue, ms);
if (FTDM_TIMEOUT == res) {
return 0;
}
if (FTDM_SUCCESS != res) {
return -1;
}
return 1;
ftdm_queue_get_interrupt(sangoma_boost_data->boost_queue, &ints[1]);
numints = 2;
}
#ifndef __WINDOWS__
else {
/* socket mode ... */
ints[1] = sangoma_boost_data->mcon.sock_interrupt;
ints[2] = sangoma_boost_data->pcon.sock_interrupt;
numints = 3;
sangoma_boost_data->iteration = 0;
}
#ifndef WIN32
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
FD_ZERO(&sangoma_boost_data->rfds);
FD_ZERO(&sangoma_boost_data->efds);
FD_SET(mcon->socket, &sangoma_boost_data->rfds);
FD_SET(mcon->socket, &sangoma_boost_data->efds);
FD_SET(pcon->socket, &sangoma_boost_data->rfds);
FD_SET(pcon->socket, &sangoma_boost_data->efds);
sangoma_boost_data->iteration = 0;
max = ((pcon->socket > mcon->socket) ? pcon->socket : mcon->socket) + 1;
if ((activity = select(max, &sangoma_boost_data->rfds, NULL, &sangoma_boost_data->efds, &tv)) < 0) {
return -1;
}
if (FD_ISSET(pcon->socket, &sangoma_boost_data->efds) || FD_ISSET(mcon->socket, &sangoma_boost_data->efds)) {
return -1;
}
return 1;
#endif
res = ftdm_interrupt_multiple_wait(ints, numints, -1);
if (FTDM_SUCCESS != res) {
ftdm_log(FTDM_LOG_CRIT, "Unexpected return value from interrupt waiting: %d\n", res);
return -1;
}
return 0;
}
@ -1659,19 +1658,13 @@ static sangomabc_event_t *ftdm_boost_read_event(ftdm_span_t *span)
mcon = &sangoma_boost_data->mcon;
pcon = &sangoma_boost_data->pcon;
if (sangoma_boost_data->sigmod
#ifndef WIN32
|| FD_ISSET(pcon->socket, &sangoma_boost_data->rfds)
#endif
) {
event = sangomabc_connection_readp(pcon, sangoma_boost_data->iteration);
}
#ifndef WIN32
event = sangomabc_connection_readp(pcon, sangoma_boost_data->iteration);
/* if there is no event and this is not a sigmod-driven span it's time to try the other connection for events */
if (!event && !sangoma_boost_data->sigmod && FD_ISSET(mcon->socket, &sangoma_boost_data->rfds)) {
event = sangomabc_connection_readp(mcon, sangoma_boost_data->iteration);
if (!event && !sangoma_boost_data->sigmod) {
event = sangomabc_connection_read(mcon, sangoma_boost_data->iteration);
}
#endif
return event;
}
@ -1684,7 +1677,6 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj)
{
ftdm_span_t *span = (ftdm_span_t *) obj;
sangomabc_connection_t *mcon, *pcon;
uint32_t ms = 10;
ftdm_sangoma_boost_data_t *sangoma_boost_data = span->signal_data;
mcon = &sangoma_boost_data->mcon;
@ -1719,7 +1711,6 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj)
while (ftdm_test_flag(sangoma_boost_data, FTDM_SANGOMA_BOOST_RUNNING)) {
sangomabc_event_t *event = NULL;
int activity = 0;
if (!ftdm_running()) {
if (!sangoma_boost_data->sigmod) {
@ -1735,27 +1726,17 @@ static void *ftdm_sangoma_boost_run(ftdm_thread_t *me, void *obj)
break;
}
if ((activity = ftdm_boost_wait_event(span, ms)) < 0) {
if (ftdm_boost_wait_event(span) < 0) {
ftdm_log(FTDM_LOG_ERROR, "ftdm_boost_wait_event failed\n");
goto error;
}
if (activity) {
while ((event = ftdm_boost_read_event(span))) {
parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event);
sangoma_boost_data->iteration++;
}
while ((event = ftdm_boost_read_event(span))) {
parse_sangoma_event(span, pcon, (sangomabc_short_event_t*)event);
sangoma_boost_data->iteration++;
}
pcon->hb_elapsed += ms;
if (ftdm_test_flag(span, FTDM_SPAN_SUSPENDED) || ftdm_test_flag(mcon, MSU_FLAG_DOWN)) {
pcon->hb_elapsed = 0;
}
if (ftdm_running()) {
check_state(span);
}
check_state(span);
}
goto end;
@ -2235,6 +2216,7 @@ static FIO_CONFIGURE_SPAN_SIGNALING_FUNCTION(ftdm_sangoma_boost_configure_span)
sangoma_boost_data->sigmod = sigmod_iface;
sigmod_iface->configure_span(span, ftdm_parameters);
} else {
ftdm_log(FTDM_LOG_NOTICE, "Span %s will use boost socket mode\n", span->name);
ftdm_set_string(sangoma_boost_data->mcon.cfg.local_ip, local_ip);
sangoma_boost_data->mcon.cfg.local_port = local_port;
ftdm_set_string(sangoma_boost_data->mcon.cfg.remote_ip, remote_ip);

View File

@ -311,11 +311,7 @@ sangomabc_event_t *__sangomabc_connection_read(sangomabc_connection_t *mcon, int
ftdm_log(FTDM_LOG_CRIT, "Invalid Boost Version %i Expecting %i\n",mcon->event.version, SIGBOOST_VERSION);
}
/* Must check for < 0 cannot rely on bytes > MIN_SIZE_... compiler issue */
if (bytes < 0) {
msg_ok=0;
} else if ((bytes >= MIN_SIZE_CALLSTART_MSG) && boost_full_event(mcon->event.event_id)) {
if ((bytes >= MIN_SIZE_CALLSTART_MSG) && boost_full_event(mcon->event.event_id)) {
msg_ok=1;
} else if (bytes == sizeof(sangomabc_short_event_t)) {

View File

@ -105,10 +105,10 @@ struct sangomabc_connection {
unsigned int txwindow;
unsigned int rxseq_reset;
sangomabc_ip_cfg_t cfg;
uint32_t hb_elapsed;
/* boost signaling mod interface pointer (if not working in TCP mode) */
boost_sigmod_interface_t *sigmod;
ftdm_queue_t *boost_queue;
ftdm_interrupt_t *sock_interrupt;
ftdm_span_t *span;
};

View File

@ -381,6 +381,42 @@ struct ftdm_event {
void *data;
};
typedef struct ftdm_queue ftdm_queue_t;
typedef ftdm_status_t (*ftdm_queue_create_func_t)(ftdm_queue_t **queue, ftdm_size_t capacity);
typedef ftdm_status_t (*ftdm_queue_enqueue_func_t)(ftdm_queue_t *queue, void *obj);
typedef void *(*ftdm_queue_dequeue_func_t)(ftdm_queue_t *queue);
typedef ftdm_status_t (*ftdm_queue_wait_func_t)(ftdm_queue_t *queue, int ms);
typedef ftdm_status_t (*ftdm_queue_get_interrupt_func_t)(ftdm_queue_t *queue, ftdm_interrupt_t **interrupt);
typedef ftdm_status_t (*ftdm_queue_destroy_func_t)(ftdm_queue_t **queue);
typedef struct ftdm_queue_handler {
ftdm_queue_create_func_t create;
ftdm_queue_enqueue_func_t enqueue;
ftdm_queue_dequeue_func_t dequeue;
ftdm_queue_wait_func_t wait;
ftdm_queue_get_interrupt_func_t get_interrupt;
ftdm_queue_destroy_func_t destroy;
} ftdm_queue_handler_t;
FT_DECLARE_DATA extern ftdm_queue_handler_t g_ftdm_queue_handler;
/*! brief create a new queue */
#define ftdm_queue_create(queue, capacity) g_ftdm_queue_handler.create(queue, capacity)
/*! Enqueue an object */
#define ftdm_queue_enqueue(queue, obj) g_ftdm_queue_handler.enqueue(queue, obj)
/*! dequeue an object from the queue */
#define ftdm_queue_dequeue(queue) g_ftdm_queue_handler.dequeue(queue)
/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */
#define ftdm_queue_wait(queue, ms) g_ftdm_queue_handler.wait(queue, ms)
/*! get the internal interrupt object (to wait for elements to be added from the outside bypassing ftdm_queue_wait) */
#define ftdm_queue_get_interrupt(queue, ms) g_ftdm_queue_handler.get_interrupt(queue, ms)
/*! destroy the queue */
#define ftdm_queue_destroy(queue) g_ftdm_queue_handler.destroy(queue)
#define FTDM_TOKEN_STRLEN 128
#define FTDM_MAX_TOKENS 10
@ -621,6 +657,7 @@ struct ftdm_span {
int suggest_chan_id;
ftdm_state_map_t *state_map;
ftdm_caller_data_t default_caller_data;
ftdm_queue_t *pendingchans;
struct ftdm_span *next;
};
@ -673,36 +710,6 @@ struct ftdm_io_interface {
fio_api_t api;
};
typedef struct ftdm_queue ftdm_queue_t;
typedef ftdm_status_t (*ftdm_queue_create_func_t)(ftdm_queue_t **queue, ftdm_size_t capacity);
typedef ftdm_status_t (*ftdm_queue_enqueue_func_t)(ftdm_queue_t *queue, void *obj);
typedef void *(*ftdm_queue_dequeue_func_t)(ftdm_queue_t *queue);
typedef ftdm_status_t (*ftdm_queue_wait_func_t)(ftdm_queue_t *queue, int ms);
typedef ftdm_status_t (*ftdm_queue_destroy_func_t)(ftdm_queue_t **queue);
typedef struct ftdm_queue_handler {
ftdm_queue_create_func_t create;
ftdm_queue_enqueue_func_t enqueue;
ftdm_queue_dequeue_func_t dequeue;
ftdm_queue_wait_func_t wait;
ftdm_queue_destroy_func_t destroy;
} ftdm_queue_handler_t;
FT_DECLARE_DATA extern ftdm_queue_handler_t g_ftdm_queue_handler;
/*! brief create a new queue */
#define ftdm_queue_create(queue, capacity) g_ftdm_queue_handler.create(queue, capacity)
/*! Enqueue an object */
#define ftdm_queue_enqueue(queue, obj) g_ftdm_queue_handler.enqueue(queue, obj)
/*! dequeue an object from the queue */
#define ftdm_queue_dequeue(queue) g_ftdm_queue_handler.dequeue(queue)
/*! wait ms milliseconds for a queue to have available objects, -1 to wait forever */
#define ftdm_queue_wait(queue, ms) g_ftdm_queue_handler.wait(queue, ms)
/*! destroy the queue */
#define ftdm_queue_destroy(queue) g_ftdm_queue_handler.destroy(queue)
/*! \brief Override the default queue handler */
FT_DECLARE(ftdm_status_t) ftdm_global_set_queue_handler(ftdm_queue_handler_t *handler);
@ -890,6 +897,8 @@ FIO_CODEC_FUNCTION(fio_alaw2ulaw);
*/
#define ftdm_socket_close(it) if (it > -1) { close(it); it = -1;}
#define ftdm_array_len(array) sizeof(array)/sizeof(array[0])
static __inline__ void ftdm_abort(void)
{
#ifdef __cplusplus

View File

@ -32,7 +32,7 @@ extern "C" {
#endif
typedef struct ftdm_mutex ftdm_mutex_t;
typedef struct ftdm_thread ftdm_thread_t;
typedef struct ftdm_condition ftdm_condition_t;
typedef struct ftdm_interrupt ftdm_interrupt_t;
typedef void *(*ftdm_thread_function_t) (ftdm_thread_t *, void *);
FT_DECLARE(ftdm_status_t) ftdm_thread_create_detached(ftdm_thread_function_t func, void *data);
@ -43,10 +43,11 @@ FT_DECLARE(ftdm_status_t) ftdm_mutex_destroy(ftdm_mutex_t **mutex);
FT_DECLARE(ftdm_status_t) _ftdm_mutex_lock(ftdm_mutex_t *mutex);
FT_DECLARE(ftdm_status_t) _ftdm_mutex_trylock(ftdm_mutex_t *mutex);
FT_DECLARE(ftdm_status_t) _ftdm_mutex_unlock(ftdm_mutex_t *mutex);
FT_DECLARE(ftdm_status_t) ftdm_condition_create(ftdm_condition_t **cond, ftdm_mutex_t *mutex);
FT_DECLARE(ftdm_status_t) ftdm_condition_destroy(ftdm_condition_t **cond);
FT_DECLARE(ftdm_status_t) ftdm_condition_signal(ftdm_condition_t *cond);
FT_DECLARE(ftdm_status_t) ftdm_condition_wait(ftdm_condition_t *cond, int ms);
FT_DECLARE(ftdm_status_t) ftdm_interrupt_create(ftdm_interrupt_t **cond, ftdm_socket_t device);
FT_DECLARE(ftdm_status_t) ftdm_interrupt_destroy(ftdm_interrupt_t **cond);
FT_DECLARE(ftdm_status_t) ftdm_interrupt_signal(ftdm_interrupt_t *cond);
FT_DECLARE(ftdm_status_t) ftdm_interrupt_wait(ftdm_interrupt_t *cond, int ms);
FT_DECLARE(ftdm_status_t) ftdm_interrupt_multiple_wait(ftdm_interrupt_t *interrupts[], ftdm_size_t size, int ms);
#ifdef __cplusplus
}

View File

@ -40,6 +40,7 @@
#define FTDM_TYPES_H
#include "fsk.h"
#define FTDM_INVALID_SOCKET -1
#ifdef WIN32
#include <windows.h>
typedef HANDLE ftdm_socket_t;