FS-8692 Added functionality to mod_amqp command listener to be able to send the api response back as an event to an exchange defined in the api message headers

This commit is contained in:
William King 2015-12-28 15:13:07 -08:00
parent c643405f49
commit 7575adcab8
1 changed files with 81 additions and 3 deletions

View File

@ -178,6 +178,60 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
return SWITCH_STATUS_GENERR;
}
static void mod_amqp_command_response(mod_amqp_command_profile_t *profile, char *command, switch_stream_handle_t stream,
char *fs_resp_exchange, char *fs_resp_key, switch_status_t status)
{
char *json_output = NULL;
amqp_basic_properties_t props;
cJSON *message = NULL;
if (! profile->conn_active) {
/* No connection, so we can not send the message. */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
return;
}
/* Construct the api response */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Preparing api command response: [%s]\n", (char *)stream.data);
message = cJSON_CreateObject();
cJSON_AddItemToObject(message, "output", cJSON_CreateString((const char *) stream.data));
cJSON_AddItemToObject(message, "command", cJSON_CreateString(command));
cJSON_AddItemToObject(message, "status", cJSON_CreateNumber((double) status));
json_output = cJSON_Print(message);
cJSON_Delete(message);
memset(&props, 0, sizeof(amqp_basic_properties_t));
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes("text/json");
status = amqp_basic_publish(
profile->conn_active->state,
1,
amqp_cstring_bytes(fs_resp_exchange),
amqp_cstring_bytes(fs_resp_key),
0,
0,
&props,
amqp_cstring_bytes(json_output));
switch_safe_free(json_output);
if (status < 0) {
const char *errstr = amqp_error_string2(-status);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n",
profile->name, profile->conn_active->name, errstr);
/* This is bad, we couldn't send the message. Clear up any connection */
mod_amqp_connection_close(profile->conn_active);
profile->conn_active = NULL;
return;
}
return;
}
void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data)
{
@ -286,6 +340,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
COMMAND_FORMAT_UNKNOWN,
COMMAND_FORMAT_PLAINTEXT
} commandFormat = COMMAND_FORMAT_PLAINTEXT;
char *fs_resp_exchange = NULL, *fs_resp_key = NULL;
amqp_maybe_release_buffers(profile->conn_active->state);
@ -342,6 +397,24 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
}
}
if (envelope.message.properties.headers.num_entries) {
int x = 0;
for ( x = 0; x < envelope.message.properties.headers.num_entries; x++) {
char *header_key = (char *)envelope.message.properties.headers.entries[x].key.bytes;
char *header_value = (char *)envelope.message.properties.headers.entries[x].value.value.bytes.bytes;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AMQP message custom header key[%s] value[%s]\n", header_key, header_value);
if ( !strncmp(header_key, "x-fs-api-resp-exchange", 22)) {
fs_resp_exchange = header_value;
} else if (!strncmp(header_key, "x-fs-api-resp-key", 17)) {
fs_resp_key = header_value;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Ignoring unrecognized event header [%s]\n", header_key);
}
}
}
if (commandFormat == COMMAND_FORMAT_PLAINTEXT) {
switch_stream_handle_t stream = { 0 }; /* Collects the command output */
@ -353,10 +426,15 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
SWITCH_STANDARD_STREAM(stream);
if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data);
if ( fs_resp_exchange && fs_resp_key ) {
switch_status_t status = switch_console_execute(command, 0, &stream);
mod_amqp_command_response(profile, command, stream, fs_resp_exchange, fs_resp_key, status);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data);
if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data);
}
}
switch_safe_free(stream.data);
}