From 7575adcab8757d56f07ec1ee2b53f0c8e271d5b3 Mon Sep 17 00:00:00 2001 From: William King Date: Mon, 28 Dec 2015 15:13:07 -0800 Subject: [PATCH] FS-8692 Added functionality to mod_amqp command listener to be able to send the api response back as an event to an exchange defined in the api message headers --- .../mod_amqp/mod_amqp_command.c | 84 ++++++++++++++++++- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c index ba12ffcba6..430257c532 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_command.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_command.c @@ -178,6 +178,60 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg) return SWITCH_STATUS_GENERR; } +static void mod_amqp_command_response(mod_amqp_command_profile_t *profile, char *command, switch_stream_handle_t stream, + char *fs_resp_exchange, char *fs_resp_key, switch_status_t status) +{ + char *json_output = NULL; + amqp_basic_properties_t props; + cJSON *message = NULL; + + if (! profile->conn_active) { + /* No connection, so we can not send the message. */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); + return; + } + + /* Construct the api response */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Preparing api command response: [%s]\n", (char *)stream.data); + message = cJSON_CreateObject(); + + cJSON_AddItemToObject(message, "output", cJSON_CreateString((const char *) stream.data)); + cJSON_AddItemToObject(message, "command", cJSON_CreateString(command)); + cJSON_AddItemToObject(message, "status", cJSON_CreateNumber((double) status)); + + json_output = cJSON_Print(message); + cJSON_Delete(message); + + memset(&props, 0, sizeof(amqp_basic_properties_t)); + + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes("text/json"); + + status = amqp_basic_publish( + profile->conn_active->state, + 1, + amqp_cstring_bytes(fs_resp_exchange), + amqp_cstring_bytes(fs_resp_key), + 0, + 0, + &props, + amqp_cstring_bytes(json_output)); + + switch_safe_free(json_output); + + if (status < 0) { + const char *errstr = amqp_error_string2(-status); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n", + profile->name, profile->conn_active->name, errstr); + + /* This is bad, we couldn't send the message. Clear up any connection */ + mod_amqp_connection_close(profile->conn_active); + profile->conn_active = NULL; + return; + } + + return; +} void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data) { @@ -286,6 +340,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void COMMAND_FORMAT_UNKNOWN, COMMAND_FORMAT_PLAINTEXT } commandFormat = COMMAND_FORMAT_PLAINTEXT; + char *fs_resp_exchange = NULL, *fs_resp_key = NULL; amqp_maybe_release_buffers(profile->conn_active->state); @@ -342,6 +397,24 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void } } + if (envelope.message.properties.headers.num_entries) { + int x = 0; + + for ( x = 0; x < envelope.message.properties.headers.num_entries; x++) { + char *header_key = (char *)envelope.message.properties.headers.entries[x].key.bytes; + char *header_value = (char *)envelope.message.properties.headers.entries[x].value.value.bytes.bytes; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AMQP message custom header key[%s] value[%s]\n", header_key, header_value); + + if ( !strncmp(header_key, "x-fs-api-resp-exchange", 22)) { + fs_resp_exchange = header_value; + } else if (!strncmp(header_key, "x-fs-api-resp-key", 17)) { + fs_resp_key = header_value; + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Ignoring unrecognized event header [%s]\n", header_key); + } + } + } + if (commandFormat == COMMAND_FORMAT_PLAINTEXT) { switch_stream_handle_t stream = { 0 }; /* Collects the command output */ @@ -353,10 +426,15 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void SWITCH_STANDARD_STREAM(stream); - if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data); + if ( fs_resp_exchange && fs_resp_key ) { + switch_status_t status = switch_console_execute(command, 0, &stream); + mod_amqp_command_response(profile, command, stream, fs_resp_exchange, fs_resp_key, status); } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data); + if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data); + } } switch_safe_free(stream.data); }