Diva Streaming

This commit is contained in:
MelwareDE 2010-04-08 22:10:54 +00:00
parent d6c8fe4db9
commit 325380b5e9
5 changed files with 242 additions and 1 deletions

View File

@ -4625,6 +4625,11 @@ static void capidev_handle_disconnect_indication(_cmsg *CMSG, unsigned int PLCI,
FRAME_SUBCLASS_INTEGER(fr.subclass) = AST_CONTROL_HANGUP;
#ifdef DIVA_STREAMING
if (i->diva_stream_entry != 0)
capi_DivaStreamingRemove(i);
#endif
capi_sendf(NULL, 0, CAPI_DISCONNECT_RESP, PLCI, HEADER_MSGNUM(CMSG), "");
show_capi_info(i, DISCONNECT_IND_REASON(CMSG));
@ -4852,6 +4857,13 @@ static void capidev_handle_connect_indication(_cmsg *CMSG, unsigned int PLCI, un
/* Handle QSIG informations, if any */
cc_qsig_handle_capiind(CONNECT_IND_FACILITYDATAARRAY(CMSG), i);
#ifdef DIVA_STREAMING
i->diva_stream_entry = 0;
if (capi_controllers[i->controller]->divaStreaming != 0) {
capi_DivaStreamingOn(i);
}
#endif
if (i->immediate) {
if ((i->isdnmode == CAPI_ISDNMODE_MSN) || (!(strlen(i->dnid)))) {
@ -7094,6 +7106,9 @@ static void *capidev_loop(void *data)
lastcall = newtime;
capidev_run_secondly(newtime);
}
#ifdef DIVA_STREAMING
divaStreamingWakeup ();
#endif
} /* for */
/* never reached */
@ -7947,7 +7962,11 @@ static int cc_init_capi(void)
if (privateoptions & 0x04) {
cc_verbose(3, 0, VERBOSE_PREFIX_4 "T.38 is supported (not implemented yet)\n");
}
#ifdef DIVA_STREAMING
/** \todo check CAPI profile */
cc_verbose(3, 0, VERBOSE_PREFIX_4 "Divaa streaming is supported\n");
cp->divaStreaming = 1;
#endif
capi_controllers[controller] = cp;
}
return 0;

View File

@ -49,6 +49,10 @@
#ifndef _PBX_CAPI_H
#define _PBX_CAPI_H
#ifdef DIVA_STREAMING
struct _diva_stream_scheduling_entry;
#endif
#define CAPI_MAX_CONTROLLERS 64
#define CAPI_MAX_B3_BLOCKS 7
@ -533,6 +537,10 @@ struct capi_pvt {
struct capi_pvt *line_plci;
/* Resource PLCI data data if line */
struct capi_pvt *data_plci;
#ifdef DIVA_STREAMING
struct _diva_stream_scheduling_entry* diva_stream_entry;
#endif
/*! Next channel in list */
struct capi_pvt *next;
@ -635,6 +643,9 @@ struct cc_capi_controller {
int divaExtendedFeaturesAvailable;
int ecPath;
int fax_t30_extended;
#ifdef DIVA_STREAMING
int divaStreaming;
#endif
};

View File

@ -25,6 +25,13 @@
#include "chan_capi_rtp.h"
#include "chan_capi_utils.h"
#include "chan_capi_supplementary.h"
#ifdef DIVA_STREAMING
#include "platform.h"
#include "diva_streaming_result.h"
#include "diva_streaming_messages.h"
#include "diva_streaming_vector.h"
#include "diva_streaming_manager.h"
#endif
int capidebug = 0;
char *emptyid = "\0";
@ -46,6 +53,28 @@ static struct peerlink_s {
time_t age;
} peerlinkchannel[CAPI_MAX_PEERLINKCHANNELS];
#ifdef DIVA_STREAMING
typedef enum _diva_stream_state {
DivaStreamCreated = 0,
DivaStreamActive = 1,
DivaStreamDisconnectSent = 2,
DivaStreamDisconnected = 3
} diva_stream_state_t;
typedef struct _diva_stream_scheduling_entry {
diva_entity_link_t link;
struct _diva_stream *diva_stream;
diva_stream_state_t diva_stream_state;
struct capi_pvt *i;
int rx_flow_control;
int tx_flow_control;
char vname[CAPI_MAX_STRING]; /* Cached from capi_pvt */
dword PLCI; /* Cached from capi_pvt */
} diva_stream_scheduling_entry_t;
static diva_entity_queue_t diva_streaming_new; /* protected by capi_put_lock, new streams */
#endif
/*
* helper for <pbx>_verbose
*/
@ -529,7 +558,11 @@ MESSAGE_EXCHANGE_ERROR capidev_check_wait_get_cmsg(_cmsg *CMSG)
struct timeval tv;
tv.tv_sec = 0;
#ifdef DIVA_STREAMING
tv.tv_usec = 5000;
#else
tv.tv_usec = 500000;
#endif
Info = capi20_waitformessage(capi_ApplID, &tv);
@ -1124,6 +1157,177 @@ done:
return error;
}
#ifdef DIVA_STREAMING
static int divaStreamingMessageRx (void* user_context, dword message, dword length, const struct _diva_streaming_vector* v, dword nr_v)
{
diva_stream_scheduling_entry_t* pE = (diva_stream_scheduling_entry_t*)user_context;
dword message_type = (message & 0xff);
if (message_type == 0) { /* IDI message */
#if 0
dword offset = 0;
diva_streaming_vector_t vind[8];
byte Ind = 0;
int vind_nr = 0;
int process_indication;
do {
vind_nr = sizeof(vind)/sizeof(vind[0]);
offset = diva_streaming_get_indication_data (offset, message, length, v, nr_v, &Ind, vind, &vind_nr);
process_indication = ((Ind & 0x0f) == N_UDATA || channel_suspended == 0) && (diva_streaming_idi_supported_ind (Ind, vind_nr != 0, vind_nr != 0 ? (byte*)vind->data : (byte*)"") != 0);
if (likely(process_indication != 0)) {
dword i = 0, k = 0;
byte* indication_length_data;
word data_length;
pE->combined_ind.data[pE->combined_ind.total_length++] = Ind;
pE->combined_ind.data[pE->combined_ind.total_length++] = pE->channels[pE->diva_stream_xdi_channel].xdi_channel;
indication_length_data = &pE->combined_ind.data[pE->combined_ind.total_length];
pE->combined_ind.total_length += 2;
data_length = (word)diva_streaming_read_vector_data (vind, vind_nr, &i, &k,
&pE->combined_ind.data[pE->combined_ind.total_length],
sizeof(pE->combined_ind.data_buffer)-pE->combined_ind.total_length-1);
*indication_length_data++ = (byte)data_length;
*indication_length_data = (byte)(data_length >> 8);
pE->combined_ind.total_length += data_length;
}
} while (offset != 0);
#endif
} else if (message_type == 0xff) { /* System message */
switch ((byte)(message >> 8)) {
case DIVA_STREAM_MESSAGE_INIT: /* Stream active */
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream active (PLCI=%#x)\n", pE->vname, pE->PLCI);
pE->diva_stream_state = DivaStreamActive;
break;
case DIVA_STREAM_MESSAGE_RX_TX_ACK: /* Resolved Tx flow control */
cc_verbose(4, 1, "%s: stream tx ack (PLCI=%#x)\n", pE->vname, pE->PLCI);
break;
case DIVA_STREAM_MESSAGE_SYNC_ACK: /* Sync ack request acknowledge */
cc_verbose(4, 1, "%s: stream sync ack (PLCI=%#x, sequence=%08x)\n", pE->vname, pE->PLCI, length);
break;
case DIVA_STREAM_MESSAGE_RELEASE_ACK: /* Release stream acknowledge */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream released (PLCI=%#x)\n", pE->vname, pE->PLCI);
break;
case DIVA_STREAM_MESSAGE_INIT_ERROR: /* Failed to initialize stream */
pE->diva_stream_state = DivaStreamDisconnected;
pE->diva_stream = 0;
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream init error (PLCI=%#x, error=%d)\n", pE->vname, pE->PLCI, length);
break;
default:
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream unknown system message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
break;
}
} else {
cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: unknown stream message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message);
}
return (0);
}
/*
* Create Diva stream
*
*/
void capi_DivaStreamingOn(struct capi_pvt *i)
{
diva_stream_scheduling_entry_t* pE;
int ret;
char trace_ident[8];
pE = malloc (sizeof(*pE));
if (pE == 0)
return;
snprintf (trace_ident, sizeof(trace_ident), "C%02x", (byte)i->PLCI);
trace_ident[sizeof(trace_ident)-1] = 0;
ret = diva_stream_create (&pE->diva_stream, NULL, 255, divaStreamingMessageRx, pE, trace_ident);
if (ret == 0) {
const byte* description = pE->diva_stream->description (pE->diva_stream);
MESSAGE_EXCHANGE_ERROR error;
cc_mutex_lock(&capi_put_lock);
error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, i->PLCI, get_capi_MessageNumber(),
"dws", _DI_MANU_ID, _DI_STREAM_CTRL, description);
if (error == 0) {
pE->diva_stream_state = DivaStreamCreated;
pE->PLCI = i->PLCI;
pE->i = i;
i->diva_stream_entry = pE;
memcpy (pE->vname, i->vname, MIN(sizeof(pE->vname), sizeof(i->vname)));
pE->vname[sizeof(pE->vname)-1] = 0;
pE->rx_flow_control = 0;
pE->tx_flow_control = 0;
diva_q_add_tail (&diva_streaming_new, &pE->link);
} else {
pE->diva_stream->release (pE->diva_stream);
free (pE);
}
cc_mutex_unlock(&capi_put_lock);
}
}
void capi_DivaStreamingRemove(struct capi_pvt *i)
{
diva_stream_scheduling_entry_t* pE = i->diva_stream_entry;
if (pE != 0) {
if (pE->diva_stream_state == DivaStreamCreated) {
/** \todo Use Diva MANUFACTURER_REQ to cancel stream */
} else if (pE->diva_stream_state == DivaStreamActive) {
pE->diva_stream->release_stream(pE->diva_stream);
}
}
}
/*
This is only one used to access streaming scheduling list routine.
This ensures list can be accessed w/o anly locks
To ensure exclusive access scheduling queue is static function variable
*/
void diva_streaming_wakeup (void) {
static diva_entity_queue_t active_streams;
diva_entity_link_t* link;
cc_mutex_lock(&capi_put_lock);
while ((link = diva_q_get_head (&diva_streaming_new)) != 0) {
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
diva_q_remove (&diva_streaming_new, &pE->link);
diva_q_add_tail (&active_streams, &pE->link);
}
cc_mutex_unlock(&capi_put_lock);
for (link = diva_q_get_head (&active_streams); link != 0;) {
diva_entity_link_t* next = diva_q_get_next(link);
diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link);
pE->diva_stream->wakeup (pE->diva_stream);
if (pE->diva_stream == 0) {
diva_q_remove (&active_streams, &pE->link);
if (pE->i != 0) {
pE->i->diva_stream_entry = 0;
}
free (pE);
}
link = next;
}
}
#endif
/*
* convert a number
*/

View File

@ -59,6 +59,11 @@ extern int capi_create_reader_writer_pipe(struct capi_pvt *i);
extern struct ast_frame *capi_read_pipeframe(struct capi_pvt *i);
extern int capi_write_frame(struct capi_pvt *i, struct ast_frame *f);
extern int capi_verify_resource_plci(const struct capi_pvt *i);
#ifdef DIVA_STREAMING
extern void capi_DivaStreamingOn(struct capi_pvt *i);
extern void capi_DivaStreamingRemove(struct capi_pvt *i);
extern void divaStreamingWakeup (void);
#endif
#define capi_number(data, strip) \
capi_number_func(data, strip, alloca(AST_MAX_EXTENSION))

View File

@ -80,6 +80,8 @@ typedef unsigned long long qword;
void* diva_os_malloc (unsigned long flags, unsigned long size);
void diva_os_free (unsigned long flags, void* ptr);
#define _DI_STREAM_CTRL 0x0014
#include <string.h>
#include "debuglib.h"