diff --git a/chan_capi.c b/chan_capi.c index b7d5d0c..46785d0 100644 --- a/chan_capi.c +++ b/chan_capi.c @@ -4625,6 +4625,11 @@ static void capidev_handle_disconnect_indication(_cmsg *CMSG, unsigned int PLCI, FRAME_SUBCLASS_INTEGER(fr.subclass) = AST_CONTROL_HANGUP; +#ifdef DIVA_STREAMING + if (i->diva_stream_entry != 0) + capi_DivaStreamingRemove(i); +#endif + capi_sendf(NULL, 0, CAPI_DISCONNECT_RESP, PLCI, HEADER_MSGNUM(CMSG), ""); show_capi_info(i, DISCONNECT_IND_REASON(CMSG)); @@ -4852,6 +4857,13 @@ static void capidev_handle_connect_indication(_cmsg *CMSG, unsigned int PLCI, un /* Handle QSIG informations, if any */ cc_qsig_handle_capiind(CONNECT_IND_FACILITYDATAARRAY(CMSG), i); + +#ifdef DIVA_STREAMING + i->diva_stream_entry = 0; + if (capi_controllers[i->controller]->divaStreaming != 0) { + capi_DivaStreamingOn(i); + } +#endif if (i->immediate) { if ((i->isdnmode == CAPI_ISDNMODE_MSN) || (!(strlen(i->dnid)))) { @@ -7094,6 +7106,9 @@ static void *capidev_loop(void *data) lastcall = newtime; capidev_run_secondly(newtime); } +#ifdef DIVA_STREAMING + divaStreamingWakeup (); +#endif } /* for */ /* never reached */ @@ -7947,7 +7962,11 @@ static int cc_init_capi(void) if (privateoptions & 0x04) { cc_verbose(3, 0, VERBOSE_PREFIX_4 "T.38 is supported (not implemented yet)\n"); } - +#ifdef DIVA_STREAMING + /** \todo check CAPI profile */ + cc_verbose(3, 0, VERBOSE_PREFIX_4 "Divaa streaming is supported\n"); + cp->divaStreaming = 1; +#endif capi_controllers[controller] = cp; } return 0; diff --git a/chan_capi.h b/chan_capi.h index de42d1d..ff5cdd6 100644 --- a/chan_capi.h +++ b/chan_capi.h @@ -49,6 +49,10 @@ #ifndef _PBX_CAPI_H #define _PBX_CAPI_H +#ifdef DIVA_STREAMING +struct _diva_stream_scheduling_entry; +#endif + #define CAPI_MAX_CONTROLLERS 64 #define CAPI_MAX_B3_BLOCKS 7 @@ -533,6 +537,10 @@ struct capi_pvt { struct capi_pvt *line_plci; /* Resource PLCI data data if line */ struct capi_pvt *data_plci; + +#ifdef DIVA_STREAMING + struct _diva_stream_scheduling_entry* diva_stream_entry; +#endif /*! Next channel in list */ struct capi_pvt *next; @@ -635,6 +643,9 @@ struct cc_capi_controller { int divaExtendedFeaturesAvailable; int ecPath; int fax_t30_extended; +#ifdef DIVA_STREAMING + int divaStreaming; +#endif }; diff --git a/chan_capi_utils.c b/chan_capi_utils.c index 00f2152..6adf02c 100644 --- a/chan_capi_utils.c +++ b/chan_capi_utils.c @@ -25,6 +25,13 @@ #include "chan_capi_rtp.h" #include "chan_capi_utils.h" #include "chan_capi_supplementary.h" +#ifdef DIVA_STREAMING +#include "platform.h" +#include "diva_streaming_result.h" +#include "diva_streaming_messages.h" +#include "diva_streaming_vector.h" +#include "diva_streaming_manager.h" +#endif int capidebug = 0; char *emptyid = "\0"; @@ -46,6 +53,28 @@ static struct peerlink_s { time_t age; } peerlinkchannel[CAPI_MAX_PEERLINKCHANNELS]; +#ifdef DIVA_STREAMING +typedef enum _diva_stream_state { + DivaStreamCreated = 0, + DivaStreamActive = 1, + DivaStreamDisconnectSent = 2, + DivaStreamDisconnected = 3 +} diva_stream_state_t; + +typedef struct _diva_stream_scheduling_entry { + diva_entity_link_t link; + struct _diva_stream *diva_stream; + diva_stream_state_t diva_stream_state; + struct capi_pvt *i; + int rx_flow_control; + int tx_flow_control; + char vname[CAPI_MAX_STRING]; /* Cached from capi_pvt */ + dword PLCI; /* Cached from capi_pvt */ +} diva_stream_scheduling_entry_t; + +static diva_entity_queue_t diva_streaming_new; /* protected by capi_put_lock, new streams */ +#endif + /* * helper for _verbose */ @@ -529,7 +558,11 @@ MESSAGE_EXCHANGE_ERROR capidev_check_wait_get_cmsg(_cmsg *CMSG) struct timeval tv; tv.tv_sec = 0; +#ifdef DIVA_STREAMING + tv.tv_usec = 5000; +#else tv.tv_usec = 500000; +#endif Info = capi20_waitformessage(capi_ApplID, &tv); @@ -1124,6 +1157,177 @@ done: return error; } +#ifdef DIVA_STREAMING +static int divaStreamingMessageRx (void* user_context, dword message, dword length, const struct _diva_streaming_vector* v, dword nr_v) +{ + diva_stream_scheduling_entry_t* pE = (diva_stream_scheduling_entry_t*)user_context; + dword message_type = (message & 0xff); + + if (message_type == 0) { /* IDI message */ +#if 0 + dword offset = 0; + diva_streaming_vector_t vind[8]; + byte Ind = 0; + int vind_nr = 0; + int process_indication; + + do { + vind_nr = sizeof(vind)/sizeof(vind[0]); + offset = diva_streaming_get_indication_data (offset, message, length, v, nr_v, &Ind, vind, &vind_nr); + + process_indication = ((Ind & 0x0f) == N_UDATA || channel_suspended == 0) && (diva_streaming_idi_supported_ind (Ind, vind_nr != 0, vind_nr != 0 ? (byte*)vind->data : (byte*)"") != 0); + + if (likely(process_indication != 0)) { + dword i = 0, k = 0; + byte* indication_length_data; + word data_length; + + pE->combined_ind.data[pE->combined_ind.total_length++] = Ind; + pE->combined_ind.data[pE->combined_ind.total_length++] = pE->channels[pE->diva_stream_xdi_channel].xdi_channel; + indication_length_data = &pE->combined_ind.data[pE->combined_ind.total_length]; + pE->combined_ind.total_length += 2; + + data_length = (word)diva_streaming_read_vector_data (vind, vind_nr, &i, &k, + &pE->combined_ind.data[pE->combined_ind.total_length], + sizeof(pE->combined_ind.data_buffer)-pE->combined_ind.total_length-1); + *indication_length_data++ = (byte)data_length; + *indication_length_data = (byte)(data_length >> 8); + pE->combined_ind.total_length += data_length; + } + } while (offset != 0); +#endif + } else if (message_type == 0xff) { /* System message */ + switch ((byte)(message >> 8)) { + case DIVA_STREAM_MESSAGE_INIT: /* Stream active */ + cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream active (PLCI=%#x)\n", pE->vname, pE->PLCI); + pE->diva_stream_state = DivaStreamActive; + break; + + case DIVA_STREAM_MESSAGE_RX_TX_ACK: /* Resolved Tx flow control */ + cc_verbose(4, 1, "%s: stream tx ack (PLCI=%#x)\n", pE->vname, pE->PLCI); + break; + + case DIVA_STREAM_MESSAGE_SYNC_ACK: /* Sync ack request acknowledge */ + cc_verbose(4, 1, "%s: stream sync ack (PLCI=%#x, sequence=%08x)\n", pE->vname, pE->PLCI, length); + break; + + case DIVA_STREAM_MESSAGE_RELEASE_ACK: /* Release stream acknowledge */ + pE->diva_stream_state = DivaStreamDisconnected; + pE->diva_stream = 0; + cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream released (PLCI=%#x)\n", pE->vname, pE->PLCI); + break; + + case DIVA_STREAM_MESSAGE_INIT_ERROR: /* Failed to initialize stream */ + pE->diva_stream_state = DivaStreamDisconnected; + pE->diva_stream = 0; + cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream init error (PLCI=%#x, error=%d)\n", pE->vname, pE->PLCI, length); + break; + + default: + cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: stream unknown system message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message); + break; + } + } else { + cc_verbose(3, 0, VERBOSE_PREFIX_2 "%s: unknown stream message (PLCI=%#x, message=%08x)\n", pE->vname, pE->PLCI, message); + } + + return (0); +} + +/* + * Create Diva stream + * + */ +void capi_DivaStreamingOn(struct capi_pvt *i) +{ + diva_stream_scheduling_entry_t* pE; + int ret; + char trace_ident[8]; + + pE = malloc (sizeof(*pE)); + if (pE == 0) + return; + + snprintf (trace_ident, sizeof(trace_ident), "C%02x", (byte)i->PLCI); + trace_ident[sizeof(trace_ident)-1] = 0; + + ret = diva_stream_create (&pE->diva_stream, NULL, 255, divaStreamingMessageRx, pE, trace_ident); + + if (ret == 0) { + const byte* description = pE->diva_stream->description (pE->diva_stream); + MESSAGE_EXCHANGE_ERROR error; + + cc_mutex_lock(&capi_put_lock); + error = capi_sendf (NULL, 0, CAPI_MANUFACTURER_REQ, i->PLCI, get_capi_MessageNumber(), + "dws", _DI_MANU_ID, _DI_STREAM_CTRL, description); + if (error == 0) { + pE->diva_stream_state = DivaStreamCreated; + pE->PLCI = i->PLCI; + pE->i = i; + i->diva_stream_entry = pE; + memcpy (pE->vname, i->vname, MIN(sizeof(pE->vname), sizeof(i->vname))); + pE->vname[sizeof(pE->vname)-1] = 0; + pE->rx_flow_control = 0; + pE->tx_flow_control = 0; + diva_q_add_tail (&diva_streaming_new, &pE->link); + } else { + pE->diva_stream->release (pE->diva_stream); + free (pE); + } + cc_mutex_unlock(&capi_put_lock); + } +} + +void capi_DivaStreamingRemove(struct capi_pvt *i) +{ + diva_stream_scheduling_entry_t* pE = i->diva_stream_entry; + + if (pE != 0) { + if (pE->diva_stream_state == DivaStreamCreated) { + /** \todo Use Diva MANUFACTURER_REQ to cancel stream */ + } else if (pE->diva_stream_state == DivaStreamActive) { + pE->diva_stream->release_stream(pE->diva_stream); + } + } +} + +/* + This is only one used to access streaming scheduling list routine. + This ensures list can be accessed w/o anly locks + + To ensure exclusive access scheduling queue is static function variable + */ +void diva_streaming_wakeup (void) { + static diva_entity_queue_t active_streams; + diva_entity_link_t* link; + + cc_mutex_lock(&capi_put_lock); + while ((link = diva_q_get_head (&diva_streaming_new)) != 0) { + diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link); + diva_q_remove (&diva_streaming_new, &pE->link); + diva_q_add_tail (&active_streams, &pE->link); + } + cc_mutex_unlock(&capi_put_lock); + + for (link = diva_q_get_head (&active_streams); link != 0;) { + diva_entity_link_t* next = diva_q_get_next(link); + diva_stream_scheduling_entry_t* pE = DIVAS_CONTAINING_RECORD(link, diva_stream_scheduling_entry_t, link); + + pE->diva_stream->wakeup (pE->diva_stream); + + if (pE->diva_stream == 0) { + diva_q_remove (&active_streams, &pE->link); + if (pE->i != 0) { + pE->i->diva_stream_entry = 0; + } + free (pE); + } + + link = next; + } +} +#endif + /* * convert a number */ diff --git a/chan_capi_utils.h b/chan_capi_utils.h index 8dbc7d5..316b1ea 100644 --- a/chan_capi_utils.h +++ b/chan_capi_utils.h @@ -59,6 +59,11 @@ extern int capi_create_reader_writer_pipe(struct capi_pvt *i); extern struct ast_frame *capi_read_pipeframe(struct capi_pvt *i); extern int capi_write_frame(struct capi_pvt *i, struct ast_frame *f); extern int capi_verify_resource_plci(const struct capi_pvt *i); +#ifdef DIVA_STREAMING +extern void capi_DivaStreamingOn(struct capi_pvt *i); +extern void capi_DivaStreamingRemove(struct capi_pvt *i); +extern void divaStreamingWakeup (void); +#endif #define capi_number(data, strip) \ capi_number_func(data, strip, alloca(AST_MAX_EXTENSION)) diff --git a/divastreaming/platform.h b/divastreaming/platform.h index 90ebd58..4f0869b 100644 --- a/divastreaming/platform.h +++ b/divastreaming/platform.h @@ -80,6 +80,8 @@ typedef unsigned long long qword; void* diva_os_malloc (unsigned long flags, unsigned long size); void diva_os_free (unsigned long flags, void* ptr); +#define _DI_STREAM_CTRL 0x0014 + #include #include "debuglib.h"