diff --git a/conf/vanilla/autoload_configs/amqp.conf.xml b/conf/vanilla/autoload_configs/amqp.conf.xml index d665d1104b..d6c24f4ff7 100644 --- a/conf/vanilla/autoload_configs/amqp.conf.xml +++ b/conf/vanilla/autoload_configs/amqp.conf.xml @@ -60,4 +60,24 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/src/mod/event_handlers/mod_amqp/Makefile.am b/src/mod/event_handlers/mod_amqp/Makefile.am index 3a7ffd0f30..7bb93927b8 100644 --- a/src/mod/event_handlers/mod_amqp/Makefile.am +++ b/src/mod/event_handlers/mod_amqp/Makefile.am @@ -4,7 +4,7 @@ MODNAME=mod_amqp if HAVE_AMQP mod_LTLIBRARIES = mod_amqp.la -mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp.c +mod_amqp_la_SOURCES = mod_amqp_utils.c mod_amqp_connection.c mod_amqp_producer.c mod_amqp_command.c mod_amqp_logging.c mod_amqp.c mod_amqp_la_CFLAGS = $(AM_CFLAGS) $(AMQP_CFLAGS) mod_amqp_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_amqp_la_LDFLAGS = -avoid-version -module -no-undefined -shared $(AMQP_LIBS) $(SWITCH_AM_LDFLAGS) diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.c b/src/mod/event_handlers/mod_amqp/mod_amqp.c index a481064975..ce82d739e8 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.c @@ -62,6 +62,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load) globals.pool = pool; switch_core_hash_init(&(globals.producer_hash)); switch_core_hash_init(&(globals.command_hash)); + switch_core_hash_init(&(globals.logging_hash)); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_apqp loading: Version %s\n", switch_version_full()); @@ -72,6 +73,8 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_amqp_load) SWITCH_ADD_API(api_interface, "amqp", "amqp API", amqp_reload, "syntax"); + switch_log_bind_logger(mod_amqp_logging_recv, SWITCH_LOG_DEBUG, SWITCH_FALSE); + return SWITCH_STATUS_SUCCESS; } @@ -84,6 +87,7 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown) switch_hash_index_t *hi; mod_amqp_producer_profile_t *producer; mod_amqp_command_profile_t *command; + mod_amqp_logging_profile_t *logging; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod starting shutting down\n"); switch_event_unbind_callback(mod_amqp_producer_event_handler); @@ -98,6 +102,13 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_amqp_shutdown) mod_amqp_command_destroy(&command); } + switch_log_unbind_logger(mod_amqp_logging_recv); + + while ((hi = switch_core_hash_first(globals.logging_hash))) { + switch_core_hash_this(hi, NULL, NULL, (void **)&logging); + mod_amqp_logging_destroy(&logging); + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Mod finished shutting down\n"); return SWITCH_STATUS_SUCCESS; } diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp.h b/src/mod/event_handlers/mod_amqp/mod_amqp.h index f82a8c5a9c..238236a3b4 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp.h +++ b/src/mod/event_handlers/mod_amqp/mod_amqp.h @@ -146,17 +146,46 @@ typedef struct { char *custom_attr; } mod_amqp_command_profile_t; +typedef struct { + char *name; + + char *exchange; + char *exchange_type; + int exchange_durable; + int exchange_auto_delete; + + uint32_t log_level_mask; + + /* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */ + mod_amqp_connection_t *conn_root; + mod_amqp_connection_t *conn_active; + + int reconnect_interval_ms; + + /* Logging thread */ + switch_thread_t *logging_thread; + switch_queue_t *send_queue; + unsigned int send_queue_size; + + switch_mutex_t *mutex; + switch_bool_t running; + char *custom_attr; + switch_memory_pool_t *pool; +} mod_amqp_logging_profile_t; + struct { switch_memory_pool_t *pool; switch_hash_t *producer_hash; switch_hash_t *command_hash; + switch_hash_t *logging_hash; } globals; /* utils */ switch_status_t mod_amqp_do_config(switch_bool_t reload); int mod_amqp_log_if_amqp_error(amqp_rpc_reply_t x, char const *context); int mod_amqp_count_chars(const char* string, char ch); +void mod_amqp_util_msg_destroy(mod_amqp_message_t **msg); /* connection */ switch_status_t mod_amqp_connection_create(mod_amqp_connection_t **conn, switch_xml_t cfg, switch_memory_pool_t *pool); @@ -179,5 +208,11 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void char *amqp_util_encode(char *key, char *dest); +/* logging */ +switch_status_t mod_amqp_logging_recv(const switch_log_node_t *node, switch_log_level_t level); +switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg); +switch_status_t mod_amqp_logging_destroy(mod_amqp_logging_profile_t **prof); +void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void *data); + #endif /* MOD_AMQP_H */ diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c b/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c new file mode 100644 index 0000000000..d6a304f0d9 --- /dev/null +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_logging.c @@ -0,0 +1,416 @@ +/* +* FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application +* Copyright (C) 2005-2012, Anthony Minessale II +* +* Version: MPL 1.1 +* +* The contents of this file are subject to the Mozilla Public License Version +* 1.1 (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* http://www.mozilla.org/MPL/ +* +* Software distributed under the License is distributed on an "AS IS" basis, +* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +* for the specific language governing rights and limitations under the +* License. +* +* The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application +* +* The Initial Developer of the Original Code is +* Anthony Minessale II +* Portions created by the Initial Developer are Copyright (C) +* the Initial Developer. All Rights Reserved. +* +* Based on mod_skel by +* Anthony Minessale II +* +* Contributor(s): +* +* Daniel Bryars +* Tim Brown +* Anthony Minessale II +* William King +* Mike Jerris +* +* mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker +* +*/ + +#include "mod_amqp.h" + +switch_status_t mod_amqp_logging_recv(const switch_log_node_t *node, switch_log_level_t level) +{ + switch_hash_index_t *hi = NULL; + mod_amqp_message_t *msg = NULL; + mod_amqp_logging_profile_t *logging = NULL; + char *json = NULL; + + if (!strcmp(node->file, "mod_amqp_logging.c")) { + return SWITCH_STATUS_SUCCESS; + } + + /* + 1. Loop through logging hash of profiles. Check for a profile that accepts this logging level, and file regex. + 2. If event not already parsed/created, then create it now + 3. Queue copy of event into logging profile send queue + 4. Destroy local event copy + */ + for (hi = switch_core_hash_first(globals.logging_hash); hi; hi = switch_core_hash_next(&hi)) { + switch_core_hash_this(hi, NULL, NULL, (void **)&logging); + + if ( logging && switch_log_check_mask(logging->log_level_mask, level) ) { + char file[128] = {0}; + if ( !json ) { + cJSON *body = NULL; + char date[80] = ""; + switch_time_exp_t tm; + + switch_time_exp_lt(&tm, node->timestamp); + switch_snprintf(date, sizeof(date), "%0.4d-%0.2d-%0.2d %0.2d:%0.2d:%0.2d.%0.6d", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, tm.tm_usec); + + /* Create cJSON body */ + body = cJSON_CreateObject(); + + cJSON_AddItemToObject(body, "file", cJSON_CreateString((const char *) node->file)); + cJSON_AddItemToObject(body, "function", cJSON_CreateString((const char *) node->func)); + cJSON_AddItemToObject(body, "line", cJSON_CreateNumber((double) node->line)); + cJSON_AddItemToObject(body, "level", cJSON_CreateString(switch_log_level2str(node->level))); + cJSON_AddItemToObject(body, "timestamp", cJSON_CreateString((const char *)date)); + cJSON_AddItemToObject(body, "timestamp_epoch", cJSON_CreateNumber((double) node->timestamp / 1000000)); + + cJSON_AddItemToObject(body, "content", cJSON_CreateString(node->content )); + + json = cJSON_Print(body); + cJSON_Delete(body); + } + + /* Create message */ + switch_malloc(msg, sizeof(mod_amqp_message_t)); + msg->pjson = strdup(json); + strcpy(file, node->file); + switch_replace_char(file, '.', '_', 0); + + snprintf(msg->routing_key, sizeof(msg->routing_key), "%s.%s.%s.%s", switch_core_get_hostname(), node->userdata, switch_log_level2str(node->level), file); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "AMQP Created message with routing key[%s]\n", msg->routing_key); + + if (switch_queue_trypush(logging->send_queue, msg) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP logging message queue full. Messages will be dropped!\n"); + return SWITCH_STATUS_SUCCESS; + } + } + } + + + switch_safe_free(json); + return SWITCH_STATUS_SUCCESS; + +} + +switch_status_t mod_amqp_logging_destroy(mod_amqp_logging_profile_t **prof) +{ + mod_amqp_message_t *msg = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + mod_amqp_connection_t *conn = NULL, *conn_next = NULL; + switch_memory_pool_t *pool; + mod_amqp_logging_profile_t *profile; + + if (!prof || !*prof) { + return SWITCH_STATUS_SUCCESS; + } + + profile = *prof; + pool = profile->pool; + + if (profile->name) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] shutting down...\n", profile->name); + switch_core_hash_delete(globals.logging_hash, profile->name); + } + + profile->running = 0; + + if (profile->logging_thread) { + switch_thread_join(&status, profile->logging_thread); + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name); + + for (conn = profile->conn_root; conn; conn = conn_next) { + conn_next = conn->next; + mod_amqp_connection_destroy(&conn); + } + + profile->conn_active = NULL; + profile->conn_root = NULL; + + while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) { + mod_amqp_util_msg_destroy(&msg); + } + + if (pool) { + switch_core_destroy_memory_pool(&pool); + } + + *prof = NULL; + + return SWITCH_STATUS_SUCCESS; +} + +switch_status_t mod_amqp_logging_create(char *name, switch_xml_t cfg) +{ + mod_amqp_logging_profile_t *profile = NULL; + switch_xml_t params, param, connections, connection; + switch_threadattr_t *thd_attr = NULL; + char *exchange = NULL, *exchange_type = NULL; + int exchange_durable = 1; /* durable */ + switch_memory_pool_t *pool; + + if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { + goto err; + } + + profile = switch_core_alloc(pool, sizeof(mod_amqp_logging_profile_t)); + profile->pool = pool; + profile->name = switch_core_strdup(profile->pool, name); + profile->running = 1; + profile->conn_root = NULL; + profile->conn_active = NULL; + profile->log_level_mask = 0; + profile->send_queue_size = 5000; + + if ((params = switch_xml_child(cfg, "params")) != NULL) { + for (param = switch_xml_child(params, "param"); param; param = param->next) { + char *var = (char *) switch_xml_attr_soft(param, "name"); + char *val = (char *) switch_xml_attr_soft(param, "value"); + + if (!var) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name); + continue; + } + + if (!val) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var); + continue; + } + + if (!strncmp(var, "reconnect_interval_ms", 21)) { + int interval = atoi(val); + if ( interval && interval > 0 ) { + profile->reconnect_interval_ms = interval; + } + } else if (!strncmp(var, "send_queue_size", 15)) { + int interval = atoi(val); + if ( interval && interval > 0 ) { + profile->send_queue_size = interval; + } + } else if (!strncmp(var, "exchange-type", 13)) { + exchange_type = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "exchange-name", 13)) { + exchange = switch_core_strdup(profile->pool, val); + } else if (!strncmp(var, "exchange-durable", 16)) { + exchange_durable = switch_true(val); + } else if (!strncmp(var, "log-levels", 10)) { + profile->log_level_mask = switch_log_str2mask(val); + } + } /* params for loop */ + } + + /* Handle defaults of string types */ + profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events"); + profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic"); + profile->exchange_durable = exchange_durable; + + if ((connections = switch_xml_child(cfg, "connections")) != NULL) { + for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { + if ( ! profile->conn_root ) { /* Handle first root node */ + if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_root; + } else { + if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { + /* Handle connection create failure */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); + continue; + } + profile->conn_active = profile->conn_active->next; + } + } + } + profile->conn_active = NULL; + + if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); + goto err; + } + + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + 0, /* passive */ + profile->exchange_durable, + amqp_empty_table); + + if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); + goto err; + } + + /* Create a bounded FIFO queue for sending messages */ + if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size); + goto err; + } + + /* Start the event send thread. This will set up the initial connection */ + switch_threadattr_create(&thd_attr, profile->pool); + switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); + if (switch_thread_create(&profile->logging_thread, thd_attr, mod_amqp_logging_thread, profile, profile->pool)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n"); + goto err; + } + + if ( switch_core_hash_insert(globals.logging_hash, name, (void *) profile) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to insert new profile [%s] into mod_amqp profile hash\n", name); + goto err; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] Successfully started\n", profile->name); + return SWITCH_STATUS_SUCCESS; + + err: + /* Cleanup */ + mod_amqp_logging_destroy(&profile); + return SWITCH_STATUS_GENERR; + +} + +/* This should only be called in a single threaded context from the logging profile send thread */ +switch_status_t mod_amqp_logging_send(mod_amqp_logging_profile_t *profile, mod_amqp_message_t *msg) +{ + amqp_basic_properties_t props; + int status; + + if (! profile->conn_active) { + /* No connection, so we can not send the message. */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); + return SWITCH_STATUS_NOT_INITALIZED; + } + memset(&props, 0, sizeof(amqp_basic_properties_t)); + + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes("application/json"); + + status = amqp_basic_publish( + profile->conn_active->state, + 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(msg->routing_key), + 0, + 0, + &props, + amqp_cstring_bytes(msg->pjson)); + + if (status < 0) { + const char *errstr = amqp_error_string2(-status); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n", + profile->name, profile->conn_active->name, errstr); + + /* This is bad, we couldn't send the message. Clear up any connection */ + mod_amqp_connection_close(profile->conn_active); + profile->conn_active = NULL; + return SWITCH_STATUS_SOCKERR; + } + + return SWITCH_STATUS_SUCCESS; +} + + + +void * SWITCH_THREAD_FUNC mod_amqp_logging_thread(switch_thread_t *thread, void *data) +{ + mod_amqp_message_t *msg = NULL; + switch_status_t status = SWITCH_STATUS_SUCCESS; + mod_amqp_logging_profile_t *profile = (mod_amqp_logging_profile_t *)data; + amqp_boolean_t passive = 0; + amqp_boolean_t durable = 1; + + while (profile->running) { + if (!profile->conn_active) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n"); + + status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); + if ( status == SWITCH_STATUS_SUCCESS ) { + // Ensure that the exchange exists, and is of the correct type + amqp_exchange_declare(profile->conn_active->state, 1, + amqp_cstring_bytes(profile->exchange), + amqp_cstring_bytes(profile->exchange_type), + passive, + durable, + amqp_empty_table); + + if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); + continue; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n", + profile->name, status, profile->reconnect_interval_ms); + switch_sleep(profile->reconnect_interval_ms * 1000); + continue; + } + + if (!msg && switch_queue_pop_timeout(profile->send_queue, (void**)&msg, 1000000) != SWITCH_STATUS_SUCCESS) { + continue; + } + + if (msg) { + switch (mod_amqp_logging_send(profile, msg)) { + case SWITCH_STATUS_SUCCESS: + /* Success: prepare for next message */ + mod_amqp_util_msg_destroy(&msg); + break; + + case SWITCH_STATUS_NOT_INITALIZED: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'not initialised'\n"); + break; + + case SWITCH_STATUS_SOCKERR: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'socket error'\n"); + break; + + default: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with a generic error\n"); + + /* Send failed and closed the connection; reconnect will happen at the beginning of the loop + * NB: do we need a delay here to prevent a fast reconnect-send-fail loop? */ + break; + } + } + } + + /* Abort the current message */ + mod_amqp_util_msg_destroy(&msg); + + // Terminate the thread + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n"); + switch_thread_exit(thread, SWITCH_STATUS_SUCCESS); + return NULL; +} + + + +/* For Emacs: + * Local Variables: + * mode:c + * indent-tabs-mode:t + * tab-width:4 + * c-basic-offset:4 + * End: + * For VIM: + * vim:set softtabstop=4 shiftwidth=4 tabstop=4 + */ diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c index 980cad8a05..39e6d5e5e1 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_producer.c @@ -38,13 +38,6 @@ #include "mod_amqp.h" -void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg) -{ - if (!msg || !*msg) return; - switch_safe_free((*msg)->pjson); - switch_safe_free(*msg); -} - switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]) { @@ -115,7 +108,7 @@ void mod_amqp_producer_event_handler(switch_event_t* evt) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)", profile->circuit_breaker_ms / 1000.0, queue_size); - mod_amqp_producer_msg_destroy(&amqp_message); + mod_amqp_util_msg_destroy(&amqp_message); } } @@ -155,7 +148,7 @@ switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **prof) { profile->conn_root = NULL; while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) { - mod_amqp_producer_msg_destroy(&msg); + mod_amqp_util_msg_destroy(&msg); } if (pool) { @@ -497,7 +490,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void switch (mod_amqp_producer_send(profile, msg)) { case SWITCH_STATUS_SUCCESS: /* Success: prepare for next message */ - mod_amqp_producer_msg_destroy(&msg); + mod_amqp_util_msg_destroy(&msg); break; case SWITCH_STATUS_NOT_INITALIZED: @@ -541,7 +534,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void } /* Abort the current message */ - mod_amqp_producer_msg_destroy(&msg); + mod_amqp_util_msg_destroy(&msg); // Terminate the thread switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n"); diff --git a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c index 5c59cba1f7..eda879d310 100644 --- a/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c +++ b/src/mod/event_handlers/mod_amqp/mod_amqp_utils.c @@ -144,6 +144,30 @@ switch_status_t mod_amqp_do_config(switch_bool_t reload) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate commands section for mod_amqp\n" ); } + if ((profiles = switch_xml_child(cfg, "logging"))) { + if ((profile = switch_xml_child(profiles, "profile"))) { + for (; profile; profile = profile->next) { + char *name = (char *) switch_xml_attr_soft(profile, "name"); + + if (zstr(name)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile. Check configs missing name attr\n"); + continue; + } + name = switch_core_strdup(globals.pool, name); + + if ( mod_amqp_logging_create(name, profile) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to load mod_amqp profile [%s]. Check configs\n", name); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Loaded mod_amqp profile [%s] successfully\n", name); + } + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Unable to locate a profile for mod_amqp\n" ); + } + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unable to locate logging section for mod_amqp\n" ); + } + return SWITCH_STATUS_SUCCESS; } @@ -183,6 +207,14 @@ char *amqp_util_encode(char *key, char *dest) { return dest; } +void mod_amqp_util_msg_destroy(mod_amqp_message_t **msg) +{ + if (!msg || !*msg) return; + switch_safe_free((*msg)->pjson); + switch_safe_free(*msg); +} + + /* For Emacs: * Local Variables: