Clan up implementation of Diva streaming

This commit is contained in:
MelwareDE 2010-06-30 08:33:41 +00:00
parent dcfeb51b01
commit 2aeb53c779
6 changed files with 370 additions and 336 deletions

View File

@ -130,6 +130,7 @@ OBJECTS += divastreaming/diva_streaming_idi_host_ifc_impl.o \
divastreaming/diva_streaming_manager.o \
divastreaming/diva_streaming_messages.o \
divastreaming/segment_alloc.o \
divastreaming/chan_capi_divastreaming_utils.o \
divastreaming/runtime.o
endif

View File

@ -46,6 +46,7 @@ struct _diva_streaming_vector* vind;
#include "diva_streaming_messages.h"
#include "diva_streaming_vector.h"
#include "diva_streaming_manager.h"
#include "chan_capi_divastreaming_utils.h"
#endif
/* #define CC_VERSION "x.y.z" */

View File

@ -25,12 +25,14 @@
#include "chan_capi_rtp.h"
#include "chan_capi_utils.h"
#include "chan_capi_supplementary.h"
#ifdef DIVA_STREAMING
#include "platform.h"
#include "diva_streaming_result.h"
#include "diva_streaming_messages.h"
#include "diva_streaming_vector.h"
#include "diva_streaming_manager.h"
#include "chan_capi_divastreaming_utils.h"
#endif
int capidebug = 0;
@ -53,32 +55,6 @@ static struct peerlink_s {
time_t age;
} peerlinkchannel[CAPI_MAX_PEERLINKCHANNELS];
#ifdef DIVA_STREAMING
AST_MUTEX_DEFINE_STATIC(stream_write_lock);
typedef enum _diva_stream_state {
DivaStreamCreated = 0,
DivaStreamActive = 1,
DivaStreamCancelSent = 2,
DivaStreamDisconnectSent = 3,
DivaStreamDisconnected = 4
} diva_stream_state_t;
typedef struct _diva_stream_scheduling_entry {
diva_entity_link_t link;
struct _diva_stream *diva_stream;
diva_stream_state_t diva_stream_state;
struct capi_pvt *i;
int rx_flow_control;
int tx_flow_control;
char vname[CAPI_MAX_STRING]; /* Cached from capi_pvt */
dword PLCI; /* Cached from capi_pvt */
time_t cancel_start;
} diva_stream_scheduling_entry_t;
static diva_entity_queue_t diva_streaming_new; /* protected by stream_write_lock, new streams */
#endif
/*
* helper for <pbx>_verbose
*/
@ -1161,304 +1137,6 @@ done:
return error;
}
#ifdef DIVA_STREAMING
int capi_DivaStreamingSupported (unsigned controller)
{
MESSAGE_EXCHANGE_ERROR error;
int waitcount = 50;
unsigned char manbuf[CAPI_MANUFACTURER_LEN];
_cmsg CMSG;
int ret = 0;
if (capi20_get_manufacturer(controller, manbuf) == NULL) {
goto done;
}
if ((strstr((char *)manbuf, "Eicon") == 0) &&
(strstr((char *)manbuf, "Dialogic") == 0)) {
goto done;
}
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, controller, get_capi_MessageNumber(),
"dw(bs)", _DI_MANU_ID, _DI_STREAM_CTRL, 2, "");
if (error)
goto done;
while (waitcount) {
error = capidev_check_wait_get_cmsg(&CMSG);
if (IS_MANUFACTURER_CONF(&CMSG) && (CMSG.ManuID == _DI_MANU_ID) &&
((CMSG.Class & 0xffff) == _DI_STREAM_CTRL)) {
error = (MESSAGE_EXCHANGE_ERROR)(CMSG.Class >> 16);
ret = (error == 0);
break;
}
usleep(30000);
waitcount--;
}
done:
return ret;
}
static int divaStreamingMessageRx (void* user_context, dword message, dword length, const struct _diva_streaming_vector* v, dword nr_v)
{
diva_stream_scheduling_entry_t* pE = (diva_stream_scheduling_entry_t*)user_context;
dword message_type = (message & 0xff);
if (message_type == 0) { /* IDI message */
dword offset = 0;
diva_streaming_vector_t vind[8];
byte Ind = 0;
int vind_nr = 0;
int process_indication;
do {
vind_nr = sizeof(vind)/sizeof(vind[0]);
offset = diva_streaming_get_indication_data (offset, message, length, v, nr_v, &Ind, vind, &vind_nr);
process_indication = (diva_streaming_idi_supported_ind (Ind, vind_nr != 0, vind_nr != 0 ? (byte*)vind->data : (byte*)"") != 0);
if (likely(process_indication != 0)) {
if (likely(Ind == 8)) {
if (likely(pE->i != 0 && pE->i->NCCI != 0))
capidev_handle_data_b3_indication_vector (pE->i, vind, vind_nr);
} else {
dword i = 0, k = 0;
word data_length;
byte ind_data_buffer[2048+512];
data_length = (word)diva_streaming_read_vector_data (vind, vind_nr, &i, &k, ind_data_buffer, sizeof(ind_data_buffer));
DBG_TRC(("Ind: %02x length:%u", Ind, data_length))
}
}
} while (offset != 0);
} else if (message_type == 0xff) { /* System message */
switch ((byte)(message >> 8)) {
case DIVA_STREAM_MESSAGE_INIT: /* Stream active */
if (pE->PLCI == 0 && pE->i != 0) {
pE->PLCI = pE->i->PLCI;
}
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream active (PLCI=%#x)\n", pE->vname, pE->PLCI);
if (pE->diva_stream_state == DivaStreamCreated) {
pE->diva_stream_state = DivaStreamActive;
} else if (pE->diva_stream_state == DivaStreamCancelSent) {
pE->diva_stream->release_stream(pE->diva_stream);
pE->i = 0;
pE->diva_stream_state = DivaStreamDisconnectSent;
}
break;
case DIVA_STREAM_MESSAGE_RX_TX_ACK: /* Resolved Tx flow control */
/* cc_verbose(4, 1, "%s: stream tx ack (PLCI=%#x)\n", pE->vname, pE->PLCI); */
break;
case DIVA_STREAM_MESSAGE_SYNC_ACK: /* Sync ack request acknowledge */
cc_verbose(4, 1, "%s: stream sync ack (PLCI=%#x, sequence=%08x)\n", pE->vname, pE->PLCI, length);
break;
case DIVA_STREAM_MESSAGE_RELEASE_ACK: /* Release stream acknowledge */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream released (PLCI=%#x)\n", pE->vname, pE->PLCI);
break;
case DIVA_STREAM_MESSAGE_INIT_ERROR: /* Failed to initialize stream */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream init error (PLCI=%#x, error=%d)\n", pE->vname, pE->PLCI, length);
break;
default:
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream unknown system message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
break;
}
} else {
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: unknown stream message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
}
return (0);
}
/*
* Create Diva stream
*
*/
void capi_DivaStreamingOn(struct capi_pvt *i, byte streamCommand, _cword messageNumber)
{
diva_stream_scheduling_entry_t* pE;
int ret;
char trace_ident[8];
unsigned int effectivePLCI;
pE = malloc (sizeof(*pE));
if (pE == 0)
return;
snprintf (trace_ident, sizeof(trace_ident), "C%02x", (byte)i->PLCI);
trace_ident[sizeof(trace_ident)-1] = 0;
cc_mutex_lock(&stream_write_lock);
ret = diva_stream_create (&pE->diva_stream, NULL, 255, divaStreamingMessageRx, pE, trace_ident);
if (ret == 0) {
byte* description = (byte*)pE->diva_stream->description (pE->diva_stream);
MESSAGE_EXCHANGE_ERROR error;
description[1] = streamCommand;
description[3] |= 0x01;
if (streamCommand == 0) {
messageNumber = get_capi_MessageNumber();
effectivePLCI = i->PLCI;
} else {
/*
PLCI still not assigned. Send to controller and tag with message number
where command receives effective
*/
effectivePLCI = i->controller;
}
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, effectivePLCI, messageNumber,
"dws", _DI_MANU_ID, _DI_STREAM_CTRL, description);
if (error == 0) {
pE->diva_stream_state = DivaStreamCreated;
pE->PLCI = i->PLCI;
pE->i = i;
i->diva_stream_entry = pE;
memcpy (pE->vname, i->vname, MIN(sizeof(pE->vname), sizeof(i->vname)));
pE->vname[sizeof(pE->vname)-1] = 0;
pE->rx_flow_control = 0;
pE->tx_flow_control = 0;
diva_q_add_tail (&diva_streaming_new, &pE->link);
} else {
pE->diva_stream->release (pE->diva_stream);
free (pE);
}
}
cc_mutex_unlock(&stream_write_lock);
}
/*
* Remove stream info
*
* To remove stream from one active connection:
* remove stream info
* disconnect B3
* remove stream
* select_b
*/
void capi_DivaStreamingRemoveInfo(struct capi_pvt *i)
{
byte description[] = { 2, 0, 0 };
MESSAGE_EXCHANGE_ERROR error;
int send;
cc_mutex_lock(&stream_write_lock);
send = i->diva_stream_entry != 0;
cc_mutex_unlock(&stream_write_lock);
if (send != 0)
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, i->PLCI, get_capi_MessageNumber(),
"dws", _DI_MANU_ID, _DI_STREAM_CTRL, description);
}
void capi_DivaStreamingRemove(struct capi_pvt *i)
{
diva_stream_scheduling_entry_t* pE = i->diva_stream_entry;
int send_cancel = 0;
cc_mutex_lock(&stream_write_lock);
pE = i->diva_stream_entry;
if (pE != 0) {
i->diva_stream_entry = 0;
pE->i = 0;
if (pE->diva_stream_state == DivaStreamCreated) {
if (i->NCCI != 0) {
/*
If NCCI is not sen then this is no possibility to send cancel request
to queued in the IDI L2 streaming info. But in user mode this is OK,
if removing PLCI CAPI removes networking entity and this operation
causes cancellation of create streaming request.
Timeout is only for the rare case where create streaming request was newer
sent to hardware.
*/
send_cancel = 1;
}
pE->diva_stream_state = DivaStreamCancelSent;
pE->cancel_start = time(NULL) + 5;
DBG_LOG(("stream cancelled [%p]", pE->diva_stream))
} else if (pE->diva_stream_state == DivaStreamActive) {
pE->diva_stream->release_stream(pE->diva_stream);
pE->diva_stream_state = DivaStreamDisconnectSent;
}
}
cc_mutex_unlock(&stream_write_lock);
if (send_cancel != 0) {
static byte data[] = { 0x8 /* CONTROL */, 0x01 /* CANCEL */};
MESSAGE_EXCHANGE_ERROR error;
error = capi_sendf(NULL, 0, CAPI_DATA_B3_REQ, i->NCCI, get_capi_MessageNumber(),
"dwww", data, sizeof(data), 0, 1U << 4);
if (likely(error == 0)) {
cc_mutex_lock(&i->lock);
i->B3count++;
cc_mutex_unlock(&i->lock);
}
}
}
/*
This is only one used to access streaming scheduling list routine.
This ensures list can be accessed w/o anly locks
To ensure exclusive access scheduling queue is static function variable
*/
void divaStreamingWakeup (void) {
static diva_entity_queue_t active_streams;
diva_entity_link_t* link;
time_t current_time = time (NULL);
cc_mutex_lock(&stream_write_lock);
while ((link = diva_q_get_head (&diva_streaming_new)) != 0) {
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
diva_q_remove (&diva_streaming_new, &pE->link);
diva_q_add_tail (&active_streams, &pE->link);
}
cc_mutex_unlock(&stream_write_lock);
for (link = diva_q_get_head (&active_streams); likely(link != 0);) {
diva_entity_link_t* next = diva_q_get_next(link);
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
cc_mutex_lock(&stream_write_lock);
pE->diva_stream->wakeup (pE->diva_stream);
if (unlikely(pE->diva_stream_state == DivaStreamCancelSent && pE->cancel_start < current_time)) {
DBG_LOG(("stream reclaimed [%p]", pE->diva_stream))
pE->diva_stream->release (pE->diva_stream);
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
}
if (unlikely(pE->diva_stream == 0)) {
diva_q_remove (&active_streams, &pE->link);
if (pE->i != 0) {
pE->i->diva_stream_entry = 0;
}
free (pE);
}
cc_mutex_unlock(&stream_write_lock);
link = next;
}
}
#endif
/*
* convert a number
*/
@ -1740,7 +1418,7 @@ int capi_write_frame(struct capi_pvt *i, struct ast_frame *f)
if (i->bproto == CC_BPROTO_VOCODER) {
#ifdef DIVA_STREAMING
cc_mutex_lock(&stream_write_lock);
capi_DivaStreamLock();
if (i->diva_stream_entry != 0) {
int written = 0, ready = 0;
@ -1750,7 +1428,7 @@ int capi_write_frame(struct capi_pvt *i, struct ast_frame *f)
written = i->diva_stream_entry->diva_stream->write (i->diva_stream_entry->diva_stream, 8U << 8 | DIVA_STREAM_MESSAGE_TX_IDI_REQUEST, f->FRAME_DATA_PTR, f->datalen);
i->diva_stream_entry->diva_stream->flush_stream(i->diva_stream_entry->diva_stream);
}
cc_mutex_unlock(&stream_write_lock);
capi_DivaStreamUnLock ();
error = written != f->datalen;
if (unlikely(error != 0)) {
@ -1760,7 +1438,7 @@ int capi_write_frame(struct capi_pvt *i, struct ast_frame *f)
#endif
{
#ifdef DIVA_STREAMING
cc_mutex_unlock(&stream_write_lock);
capi_DivaStreamUnLock ();
#endif
buf = &(i->send_buffer[(i->send_buffer_handle % CAPI_MAX_B3_BLOCKS) *
(CAPI_MAX_B3_BLOCK_SIZE + AST_FRIENDLY_OFFSET)]);
@ -1828,13 +1506,13 @@ int capi_write_frame(struct capi_pvt *i, struct ast_frame *f)
int written = 0, ready = 0;
B3Blocks = 0;
cc_mutex_lock(&stream_write_lock);
capi_DivaStreamLock();
if ((ready = (i->diva_stream_entry->diva_stream_state == DivaStreamActive)) &&
(i->diva_stream_entry->diva_stream->get_tx_free (i->diva_stream_entry->diva_stream) > 2*CAPI_MAX_B3_BLOCK_SIZE+128)) {
written = i->diva_stream_entry->diva_stream->write (i->diva_stream_entry->diva_stream, 8U << 8 | DIVA_STREAM_MESSAGE_TX_IDI_REQUEST, buf, fsmooth->datalen);
i->diva_stream_entry->diva_stream->flush_stream(i->diva_stream_entry->diva_stream);
}
cc_mutex_unlock(&stream_write_lock);
capi_DivaStreamUnLock ();
error = written != fsmooth->datalen;
if (unlikely(error != 0)) {

View File

@ -59,13 +59,6 @@ extern int capi_create_reader_writer_pipe(struct capi_pvt *i);
extern struct ast_frame *capi_read_pipeframe(struct capi_pvt *i);
extern int capi_write_frame(struct capi_pvt *i, struct ast_frame *f);
extern int capi_verify_resource_plci(const struct capi_pvt *i);
#ifdef DIVA_STREAMING
extern int capi_DivaStreamingSupported (unsigned controller);
extern void capi_DivaStreamingOn(struct capi_pvt *i, unsigned char streamCommand, _cword messageNumber);
extern void capi_DivaStreamingRemoveInfo(struct capi_pvt *i);
extern void capi_DivaStreamingRemove(struct capi_pvt *i);
extern void divaStreamingWakeup (void);
#endif
#define capi_number(data, strip) \
capi_number_func(data, strip, alloca(AST_MAX_EXTENSION))

View File

@ -0,0 +1,328 @@
#include "chan_capi_platform.h"
#include "chan_capi20.h"
#include "chan_capi.h"
#include "chan_capi_utils.h"
#include "platform.h"
#include "diva_streaming_result.h"
#include "diva_streaming_messages.h"
#include "diva_streaming_vector.h"
#include "diva_streaming_manager.h"
#include "chan_capi_divastreaming_utils.h"
/*
LOCALS
*/
AST_MUTEX_DEFINE_STATIC(stream_write_lock);
static diva_entity_queue_t diva_streaming_new; /* protected by stream_write_lock, new streams */
int capi_DivaStreamingSupported (unsigned controller)
{
MESSAGE_EXCHANGE_ERROR error;
int waitcount = 50;
unsigned char manbuf[CAPI_MANUFACTURER_LEN];
_cmsg CMSG;
int ret = 0;
if (capi20_get_manufacturer(controller, manbuf) == NULL) {
goto done;
}
if ((strstr((char *)manbuf, "Eicon") == 0) &&
(strstr((char *)manbuf, "Dialogic") == 0)) {
goto done;
}
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, controller, get_capi_MessageNumber(),
"dw(bs)", _DI_MANU_ID, _DI_STREAM_CTRL, 2, "");
if (error)
goto done;
while (waitcount) {
error = capidev_check_wait_get_cmsg(&CMSG);
if (IS_MANUFACTURER_CONF(&CMSG) && (CMSG.ManuID == _DI_MANU_ID) &&
((CMSG.Class & 0xffff) == _DI_STREAM_CTRL)) {
error = (MESSAGE_EXCHANGE_ERROR)(CMSG.Class >> 16);
ret = (error == 0);
break;
}
usleep(30000);
waitcount--;
}
done:
return ret;
}
static int divaStreamingMessageRx (void* user_context, dword message, dword length, const struct _diva_streaming_vector* v, dword nr_v)
{
diva_stream_scheduling_entry_t* pE = (diva_stream_scheduling_entry_t*)user_context;
dword message_type = (message & 0xff);
if (message_type == 0) { /* IDI message */
dword offset = 0;
diva_streaming_vector_t vind[8];
byte Ind = 0;
int vind_nr = 0;
int process_indication;
do {
vind_nr = sizeof(vind)/sizeof(vind[0]);
offset = diva_streaming_get_indication_data (offset, message, length, v, nr_v, &Ind, vind, &vind_nr);
process_indication = (diva_streaming_idi_supported_ind (Ind, vind_nr != 0, vind_nr != 0 ? (byte*)vind->data : (byte*)"") != 0);
if (likely(process_indication != 0)) {
if (likely(Ind == 8)) {
if (likely(pE->i != 0 && pE->i->NCCI != 0))
capidev_handle_data_b3_indication_vector (pE->i, vind, vind_nr);
} else {
dword i = 0, k = 0;
word data_length;
byte ind_data_buffer[2048+512];
data_length = (word)diva_streaming_read_vector_data (vind, vind_nr, &i, &k, ind_data_buffer, sizeof(ind_data_buffer));
DBG_TRC(("Ind: %02x length:%u", Ind, data_length))
}
}
} while (offset != 0);
} else if (message_type == 0xff) { /* System message */
switch ((byte)(message >> 8)) {
case DIVA_STREAM_MESSAGE_INIT: /* Stream active */
if (pE->PLCI == 0 && pE->i != 0) {
pE->PLCI = pE->i->PLCI;
}
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream active (PLCI=%#x)\n", pE->vname, pE->PLCI);
if (pE->diva_stream_state == DivaStreamCreated) {
pE->diva_stream_state = DivaStreamActive;
} else if (pE->diva_stream_state == DivaStreamCancelSent) {
pE->diva_stream->release_stream(pE->diva_stream);
pE->i = 0;
pE->diva_stream_state = DivaStreamDisconnectSent;
}
break;
case DIVA_STREAM_MESSAGE_RX_TX_ACK: /* Resolved Tx flow control */
/* cc_verbose(4, 1, "%s: stream tx ack (PLCI=%#x)\n", pE->vname, pE->PLCI); */
break;
case DIVA_STREAM_MESSAGE_SYNC_ACK: /* Sync ack request acknowledge */
cc_verbose(4, 1, "%s: stream sync ack (PLCI=%#x, sequence=%08x)\n", pE->vname, pE->PLCI, length);
break;
case DIVA_STREAM_MESSAGE_RELEASE_ACK: /* Release stream acknowledge */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream released (PLCI=%#x)\n", pE->vname, pE->PLCI);
break;
case DIVA_STREAM_MESSAGE_INIT_ERROR: /* Failed to initialize stream */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream init error (PLCI=%#x, error=%d)\n", pE->vname, pE->PLCI, length);
break;
default:
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream unknown system message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
break;
}
} else {
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: unknown stream message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
}
return (0);
}
/*
* Create Diva stream
*
*/
void capi_DivaStreamingOn(struct capi_pvt *i, byte streamCommand, _cword messageNumber)
{
diva_stream_scheduling_entry_t* pE;
int ret;
char trace_ident[8];
unsigned int effectivePLCI;
pE = malloc (sizeof(*pE));
if (pE == 0)
return;
snprintf (trace_ident, sizeof(trace_ident), "C%02x", (byte)i->PLCI);
trace_ident[sizeof(trace_ident)-1] = 0;
cc_mutex_lock(&stream_write_lock);
ret = diva_stream_create (&pE->diva_stream, NULL, 255, divaStreamingMessageRx, pE, trace_ident);
if (ret == 0) {
byte* description = (byte*)pE->diva_stream->description (pE->diva_stream);
MESSAGE_EXCHANGE_ERROR error;
description[1] = streamCommand;
description[3] |= 0x01;
if (streamCommand == 0) {
messageNumber = get_capi_MessageNumber();
effectivePLCI = i->PLCI;
} else {
/*
PLCI still not assigned. Send to controller and tag with message number
where command receives effective
*/
effectivePLCI = i->controller;
}
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, effectivePLCI, messageNumber,
"dws", _DI_MANU_ID, _DI_STREAM_CTRL, description);
if (error == 0) {
pE->diva_stream_state = DivaStreamCreated;
pE->PLCI = i->PLCI;
pE->i = i;
i->diva_stream_entry = pE;
memcpy (pE->vname, i->vname, MIN(sizeof(pE->vname), sizeof(i->vname)));
pE->vname[sizeof(pE->vname)-1] = 0;
pE->rx_flow_control = 0;
pE->tx_flow_control = 0;
diva_q_add_tail (&diva_streaming_new, &pE->link);
} else {
pE->diva_stream->release (pE->diva_stream);
free (pE);
}
}
cc_mutex_unlock(&stream_write_lock);
}
/*
* Remove stream info
*
* To remove stream from one active connection:
* remove stream info
* disconnect B3
* remove stream
* select_b
*/
void capi_DivaStreamingRemoveInfo(struct capi_pvt *i)
{
byte description[] = { 2, 0, 0 };
MESSAGE_EXCHANGE_ERROR error;
int send;
cc_mutex_lock(&stream_write_lock);
send = i->diva_stream_entry != 0;
cc_mutex_unlock(&stream_write_lock);
if (send != 0)
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, i->PLCI, get_capi_MessageNumber(),
"dws", _DI_MANU_ID, _DI_STREAM_CTRL, description);
}
void capi_DivaStreamingRemove(struct capi_pvt *i)
{
diva_stream_scheduling_entry_t* pE = i->diva_stream_entry;
int send_cancel = 0;
cc_mutex_lock(&stream_write_lock);
pE = i->diva_stream_entry;
if (pE != 0) {
i->diva_stream_entry = 0;
pE->i = 0;
if (pE->diva_stream_state == DivaStreamCreated) {
if (i->NCCI != 0) {
/*
If NCCI is not sen then this is no possibility to send cancel request
to queued in the IDI L2 streaming info. But in user mode this is OK,
if removing PLCI CAPI removes networking entity and this operation
causes cancellation of create streaming request.
Timeout is only for the rare case where create streaming request was newer
sent to hardware.
*/
send_cancel = 1;
}
pE->diva_stream_state = DivaStreamCancelSent;
pE->cancel_start = time(NULL) + 5;
DBG_LOG(("stream cancelled [%p]", pE->diva_stream))
} else if (pE->diva_stream_state == DivaStreamActive) {
pE->diva_stream->release_stream(pE->diva_stream);
pE->diva_stream_state = DivaStreamDisconnectSent;
}
}
cc_mutex_unlock(&stream_write_lock);
if (send_cancel != 0) {
static byte data[] = { 0x8 /* CONTROL */, 0x01 /* CANCEL */};
MESSAGE_EXCHANGE_ERROR error;
error = capi_sendf(NULL, 0, CAPI_DATA_B3_REQ, i->NCCI, get_capi_MessageNumber(),
"dwww", data, sizeof(data), 0, 1U << 4);
if (likely(error == 0)) {
cc_mutex_lock(&i->lock);
i->B3count++;
cc_mutex_unlock(&i->lock);
}
}
}
/*
This is only one used to access streaming scheduling list routine.
This ensures list can be accessed w/o anly locks
To ensure exclusive access scheduling queue is static function variable
*/
void divaStreamingWakeup (void)
{
static diva_entity_queue_t active_streams;
diva_entity_link_t* link;
time_t current_time = time (NULL);
cc_mutex_lock(&stream_write_lock);
while ((link = diva_q_get_head (&diva_streaming_new)) != 0) {
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
diva_q_remove (&diva_streaming_new, &pE->link);
diva_q_add_tail (&active_streams, &pE->link);
}
cc_mutex_unlock(&stream_write_lock);
for (link = diva_q_get_head (&active_streams); likely(link != 0);) {
diva_entity_link_t* next = diva_q_get_next(link);
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
cc_mutex_lock(&stream_write_lock);
pE->diva_stream->wakeup (pE->diva_stream);
if (unlikely(pE->diva_stream_state == DivaStreamCancelSent && pE->cancel_start < current_time)) {
DBG_LOG(("stream reclaimed [%p]", pE->diva_stream))
pE->diva_stream->release (pE->diva_stream);
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
}
if (unlikely(pE->diva_stream == 0)) {
diva_q_remove (&active_streams, &pE->link);
if (pE->i != 0) {
pE->i->diva_stream_entry = 0;
}
free (pE);
}
cc_mutex_unlock(&stream_write_lock);
link = next;
}
}
void capi_DivaStreamLock (void)
{
cc_mutex_lock(&stream_write_lock);
}
void capi_DivaStreamUnLock (void)
{
cc_mutex_unlock(&stream_write_lock);
}

View File

@ -0,0 +1,33 @@
#ifndef __DIVA_CAPI_STREAMING_UTILS_H__
#define __DIVA_CAPI_STREAMING_UTILS_H__
extern int capi_DivaStreamingSupported(unsigned controller);
extern void capi_DivaStreamingOn(struct capi_pvt *i, unsigned char streamCommand, _cword messageNumber);
extern void capi_DivaStreamingRemoveInfo(struct capi_pvt *i);
extern void capi_DivaStreamingRemove(struct capi_pvt *i);
extern void divaStreamingWakeup(void);
extern void capi_DivaStreamLock(void);
extern void capi_DivaStreamUnLock (void);
typedef enum _diva_stream_state {
DivaStreamCreated = 0,
DivaStreamActive = 1,
DivaStreamCancelSent = 2,
DivaStreamDisconnectSent = 3,
DivaStreamDisconnected = 4
} diva_stream_state_t;
typedef struct _diva_stream_scheduling_entry {
diva_entity_link_t link;
struct _diva_stream *diva_stream;
diva_stream_state_t diva_stream_state;
struct capi_pvt *i;
int rx_flow_control;
int tx_flow_control;
char vname[CAPI_MAX_STRING]; /* Cached from capi_pvt */
dword PLCI; /* Cached from capi_pvt */
time_t cancel_start;
} diva_stream_scheduling_entry_t;
#endif