inter session communication goodies

git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@309 d0543943-73ff-0310-b7d9-9358b9ac24b2
This commit is contained in:
Anthony Minessale 2006-01-09 19:48:20 +00:00
parent 73b4b25b15
commit 061c1ecb1b
6 changed files with 140 additions and 10 deletions

View File

@ -47,6 +47,37 @@ extern "C" {
#define SWITCH_MAX_CORE_THREAD_SESSION_OBJS 128
#define SWITCH_MAX_STREAMS 128
/*! \brief A message object designed to allow unlike technologies to exchange data */
struct switch_core_session_message {
/*! uuid of the sender (for replies)*/
char *from;
/*! enumeration of the type of message */
switch_core_session_message_t message_id;
/*! optional numeric arg*/
int numeric_arg;
/*! optional string arg*/
char *string_arg;
/*! optional string arg*/
size_t string_arg_size;
/*! optional arbitrary pointer arg */
void *pointer_arg;
/*! optional arbitrary pointer arg's size */
size_t pointer_arg_size;
/*! optional numeric reply*/
int numeric_reply;
/*! optional string reply*/
char *string_reply;
/*! optional string reply*/
size_t string_reply_size;
/*! optional arbitrary pointer reply */
void *pointer_reply;
/*! optional arbitrary pointer reply's size */
size_t pointer_reply_size;
};
/*! \brief A generic object to pass as a thread's session object to allow mutiple arguements and a pool */
struct switch_core_thread_session {
/*! status of the thread */
@ -213,6 +244,14 @@ SWITCH_DECLARE(void) switch_core_session_signal_state_change(switch_core_session
*/
SWITCH_DECLARE(char *) switch_core_session_get_uuid(switch_core_session *session);
/*!
\brief Send a message to another session using it's uuid
\param uuid_str the unique id of the session you want to send a message to
\param message the switch_core_session_message object to send
\return the status returned by the message handler
*/
SWITCH_DECLARE (switch_status) switch_core_session_message_send(char *uuid_str, switch_core_session_message *message);
/*!
\brief Retrieve private user data from a session
\param session the session to retrieve from
@ -293,6 +332,13 @@ SWITCH_DECLARE(switch_status) switch_core_session_outgoing_channel(switch_core_s
*/
SWITCH_DECLARE(switch_status) switch_core_session_answer_channel(switch_core_session *session);
/*!
\brief Receive a message on a given session
\param session the session to receive the message from
\return the status returned by the message handler
*/
SWITCH_DECLARE(switch_status) switch_core_session_receive_message(switch_core_session *session, switch_core_session_message *message);
/*!
\brief Read a frame from a session
\param session the session to read from

View File

@ -76,6 +76,13 @@ struct switch_io_event_hook_answer_channel {
struct switch_io_event_hook_answer_channel *next;
};
/*! \brief Node in which to store custom receive message callback hooks */
struct switch_io_event_hook_receive_message {
/*! the answer channel callback hook*/
switch_receive_message_hook receive_message;
struct switch_io_event_hook_receive_message *next;
};
/*! \brief Node in which to store custom read frame channel callback hooks */
struct switch_io_event_hook_read_frame {
/*! the read frame channel callback hook*/
@ -124,6 +131,8 @@ struct switch_io_event_hooks {
struct switch_io_event_hook_outgoing_channel *outgoing_channel;
/*! a list of answer channel hooks */
struct switch_io_event_hook_answer_channel *answer_channel;
/*! a list of receive message hooks */
struct switch_io_event_hook_receive_message *receive_message;
/*! a list of read frame hooks */
struct switch_io_event_hook_read_frame *read_frame;
/*! a list of write frame hooks */
@ -156,6 +165,8 @@ struct switch_io_routines {
switch_status (*waitfor_write)(switch_core_session *, int, int);
/*! send a string of DTMF digits to a session's channel */
switch_status (*send_dtmf)(switch_core_session *, char *);
/*! receive a message from another session*/
switch_status (*receive_message)(switch_core_session *, switch_core_session_message *);
};
/*! \brief Abstraction of an module endpoint interface

View File

@ -44,6 +44,21 @@ extern "C" {
#define SWITCH_GLOBAL_VERSION "1"
#define SWITCH_MAX_CODECS 30
/*!
\enum switch_core_session_message_t
\brief Possible types of messages for inter-session communication
<pre>
SWITCH_MESSAGE_REDIRECT_AUDIO - Indication to redirect audio to another location if possible
SWITCH_MESSAGE_TRANSMIT_TEXT - A text message
</pre>
*/
typedef enum {
SWITCH_MESSAGE_REDIRECT_AUDIO,
SWITCH_MESSAGE_TRANSMIT_TEXT
} switch_core_session_message_t;
/*!
\enum switch_stack_t
\brief Expression of how to stack a list
@ -140,7 +155,7 @@ typedef enum {
<pre>
CF_SEND_AUDIO = (1 << 0) - Channel will send audio
CF_RECV_AUDIO = (1 << 1) - Channel will recieve audio
CF_RECV_AUDIO = (1 << 1) - Channel will receive audio
CF_ANSWERED = (1 << 2) - Channel is answered
CF_OUTBOUND = (1 << 3) - Channel is an outbound channel
</pre>
@ -283,6 +298,7 @@ typedef enum {
} switch_event_t;
typedef struct switch_core_session_message switch_core_session_message;
typedef struct switch_audio_resampler switch_audio_resampler;
typedef struct switch_event_header switch_event_header;
typedef struct switch_event switch_event;
@ -312,6 +328,7 @@ typedef struct switch_core_thread_session switch_core_thread_session;
typedef struct switch_codec_implementation switch_codec_implementation;
typedef struct switch_io_event_hook_outgoing_channel switch_io_event_hook_outgoing_channel;
typedef struct switch_io_event_hook_answer_channel switch_io_event_hook_answer_channel;
typedef struct switch_io_event_hook_receive_message switch_io_event_hook_receive_message;
typedef struct switch_io_event_hook_read_frame switch_io_event_hook_read_frame;
typedef struct switch_io_event_hook_write_frame switch_io_event_hook_write_frame;
typedef struct switch_io_event_hook_kill_channel switch_io_event_hook_kill_channel;
@ -329,6 +346,7 @@ typedef switch_caller_extension *(*switch_dialplan_hunt_function)(switch_core_se
typedef switch_status (*switch_event_handler)(switch_core_session *);
typedef switch_status (*switch_outgoing_channel_hook)(switch_core_session *, switch_caller_profile *, switch_core_session *);
typedef switch_status (*switch_answer_channel_hook)(switch_core_session *);
typedef switch_status (*switch_receive_message_hook)(switch_core_session *, switch_core_session_message *);
typedef switch_status (*switch_read_frame_hook)(switch_core_session *, switch_frame **, int, switch_io_flag, int);
typedef switch_status (*switch_write_frame_hook)(switch_core_session *, switch_frame *, int, switch_io_flag, int);
typedef switch_status (*switch_kill_channel_hook)(switch_core_session *, int);

View File

@ -46,6 +46,8 @@ struct audio_bridge_data {
static void *audio_bridge_thread(switch_thread *thread, void *obj)
{
struct switch_core_thread_session *data = obj;
int *stream_id_p;
int stream_id = *stream_id_p;
switch_channel *chan_a, *chan_b;
switch_frame *read_frame;
@ -53,6 +55,8 @@ static void *audio_bridge_thread(switch_thread *thread, void *obj)
session_a = data->objs[0];
session_b = data->objs[1];
stream_id = data->objs[2];
chan_a = switch_core_session_get_channel(session_a);
chan_b = switch_core_session_get_channel(session_b);
@ -74,8 +78,9 @@ default:
switch_channel_dequeue_dtmf(chan_a, dtmf, sizeof(dtmf));
switch_core_session_send_dtmf(session_b, dtmf);
}
if (switch_core_session_read_frame(session_a, &read_frame, -1) == SWITCH_STATUS_SUCCESS && read_frame->datalen) {
if (switch_core_session_write_frame(session_b, read_frame, -1) != SWITCH_STATUS_SUCCESS) {
if (switch_core_session_read_frame(session_a, &read_frame, -1, stream_id) == SWITCH_STATUS_SUCCESS && read_frame->datalen) {
if (switch_core_session_write_frame(session_b, read_frame, -1, stream_id) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "write: Bad Frame.... %d Bubye!\n", read_frame->datalen);
data->running = -1;
}
@ -83,6 +88,7 @@ default:
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "read: Bad Frame.... %d Bubye!\n", read_frame->datalen);
data->running = -1;
}
}
switch_channel_hangup(chan_b);
@ -188,16 +194,19 @@ static void audio_bridge_function(switch_core_session *session, char *data)
} else {
struct switch_core_thread_session this_audio_thread, other_audio_thread;
time_t start;
int stream_id = 0;
peer_channel = switch_core_session_get_channel(peer_session);
memset(&other_audio_thread, 0, sizeof(other_audio_thread));
memset(&this_audio_thread, 0, sizeof(this_audio_thread));
other_audio_thread.objs[0] = session;
other_audio_thread.objs[1] = peer_session;
other_audio_thread.objs[2] = &stream_id;
other_audio_thread.running = 5;
this_audio_thread.objs[0] = peer_session;
this_audio_thread.objs[1] = session;
this_audio_thread.objs[2] = &stream_id;
this_audio_thread.running = 2;

View File

@ -53,6 +53,7 @@ void playback_function(switch_core_session *session, char *data)
switch_file_handle fh;
char *codec_name;
int x;
int stream_id;
channel = switch_core_session_get_channel(session);
assert(channel != NULL);
@ -107,7 +108,9 @@ void playback_function(switch_core_session *session, char *data)
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "setup timer success %d bytes per %d ms!\n", len, interval);
/* start a thread to absorb incoming audio */
switch_core_service_session(session, &thread_session);
for(stream_id = 0; stream_id < switch_core_session_get_stream_count(session); stream_id++) {
switch_core_service_session(session, &thread_session, stream_id);
}
ilen = samples;
while(switch_channel_get_state(channel) == CS_EXECUTE) {
int done = 0;
@ -140,9 +143,17 @@ void playback_function(switch_core_session *session, char *data)
#ifdef SWAP_LINEAR
switch_swap_linear(write_frame.data, (int)write_frame.datalen / 2);
#endif
if (switch_core_session_write_frame(session, &write_frame, -1) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Bad Write\n");
break;
for(stream_id = 0; stream_id < switch_core_session_get_stream_count(session); stream_id++) {
if (switch_core_session_write_frame(session, &write_frame, -1, stream_id) != SWITCH_STATUS_SUCCESS) {
switch_console_printf(SWITCH_CHANNEL_CONSOLE, "Bad Write\n");
done = 1;
break;
}
if (done) {
break;
}
}
if ((x = switch_core_timer_next(&timer)) < 0) {

View File

@ -186,6 +186,18 @@ SWITCH_DECLARE(switch_status) switch_core_do_perl(char *txt)
}
#endif
SWITCH_DECLARE (switch_status) switch_core_session_message_send(char *uuid_str, switch_core_session_message *message)
{
switch_core_session *session = NULL;
if ((session = switch_core_hash_find(runtime.session_table, uuid_str))) {
if (switch_channel_get_state(session->channel) < CS_HANGUP) {
return switch_core_session_receive_message(session, message);
}
}
return SWITCH_STATUS_FALSE;
}
SWITCH_DECLARE(char *) switch_core_session_get_uuid(switch_core_session *session)
{
@ -472,9 +484,10 @@ static void *switch_core_service_thread(switch_thread *thread, void *obj)
{
switch_core_thread_session *data = obj;
switch_core_session *session = data->objs[0];
int *stream_id = data->objs[1];
int *stream_id_p = data->objs[1];
switch_channel *channel;
switch_frame *read_frame;
int stream_id = *stream_id_p;
assert(session != NULL);
channel = switch_core_session_get_channel(session);
@ -482,7 +495,7 @@ static void *switch_core_service_thread(switch_thread *thread, void *obj)
while(data->running > 0) {
switch(switch_core_session_read_frame(session, &read_frame, -1, *stream_id)) {
switch(switch_core_session_read_frame(session, &read_frame, -1, stream_id)) {
case SWITCH_STATUS_SUCCESS:
break;
case SWITCH_STATUS_TIMEOUT:
@ -712,6 +725,27 @@ SWITCH_DECLARE(switch_status) switch_core_session_answer_channel(switch_core_ses
return status;
}
SWITCH_DECLARE(switch_status) switch_core_session_receive_message(switch_core_session *session, switch_core_session_message *message)
{
struct switch_io_event_hook_receive_message *ptr;
switch_status status = SWITCH_STATUS_FALSE;
assert(session != NULL);
if (session->endpoint_interface->io_routines->receive_message) {
if ((status = session->endpoint_interface->io_routines->receive_message(session, message)) == SWITCH_STATUS_SUCCESS) {
for (ptr = session->event_hooks.receive_message; ptr ; ptr = ptr->next) {
if ((status = ptr->receive_message(session, message)) != SWITCH_STATUS_SUCCESS) {
break;
}
}
}
} else {
status = SWITCH_STATUS_SUCCESS;
}
return status;
}
SWITCH_DECLARE(switch_status) switch_core_session_read_frame(switch_core_session *session, switch_frame **frame, int timeout, int stream_id)
{
struct switch_io_event_hook_read_frame *ptr;
@ -1726,8 +1760,9 @@ static void * SWITCH_THREAD_FUNC switch_core_session_thread(switch_thread *threa
snprintf(session->name, sizeof(session->name), "%ld", session->id);
switch_core_hash_insert(runtime.session_table, session->name, session);
switch_core_hash_insert(runtime.session_table, session->uuid_str, session);
switch_core_session_run(session);
switch_core_hash_delete(runtime.session_table, session->uuid_str);
switch_core_session_destroy(&session);