/* * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * Copyright (C) 2005-2012, Anthony Minessale II * * Version: MPL 1.1 * * The contents of this file are subject to the Mozilla Public License Version * 1.1 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application * * The Initial Developer of the Original Code is * Anthony Minessale II * Portions created by the Initial Developer are Copyright (C) * the Initial Developer. All Rights Reserved. * * Based on mod_skel by * Anthony Minessale II * * Contributor(s): * * Daniel Bryars * Tim Brown * Anthony Minessale II * William King * Mike Jerris * * mod_amqp.c -- Sends FreeSWITCH events to an AMQP broker * */ #include "mod_amqp.h" switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, mod_amqp_keypart_t routingKeyEventHeaderNames[]) { int i = 0, idx = 0, x = 0; char keybuffer[MAX_AMQP_ROUTING_KEY_LENGTH]; for (i = 0; i < MAX_ROUTING_KEY_FORMAT_FIELDS && idx < MAX_AMQP_ROUTING_KEY_LENGTH; i++) { if (routingKeyEventHeaderNames[i].size) { if (idx) { routingKey[idx++] = '.'; } for( x = 0; x < routingKeyEventHeaderNames[i].size; x++) { if (routingKeyEventHeaderNames[i].name[x][0] == '#') { strncpy(routingKey + idx, routingKeyEventHeaderNames[i].name[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx); break; } else { char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i].name[x]); if (value) { amqp_util_encode(value, keybuffer); strncpy(routingKey + idx, keybuffer, MAX_AMQP_ROUTING_KEY_LENGTH - idx); break; } } } idx += strlen(routingKey + idx); } } return SWITCH_STATUS_SUCCESS; } void mod_amqp_producer_event_handler(switch_event_t* evt) { mod_amqp_message_t *amqp_message; mod_amqp_producer_profile_t *profile = (mod_amqp_producer_profile_t *)evt->bind_user_data; switch_time_t now = switch_time_now(); switch_time_t reset_time; if (!profile) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Event without a profile %p %p\n", (void *)evt, (void *)evt->event_user_data); return; } /* If the mod is disabled ignore the event */ if (!profile->running) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] not running\n", profile->name); return; } /* If the circuit breaker is active, ignore the event */ reset_time = profile->circuit_breaker_reset_time; if (now < reset_time) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] circuit breaker hit[%d] (%d)\n", profile->name, (int) now, (int) reset_time); return; } switch_malloc(amqp_message, sizeof(mod_amqp_message_t)); switch_event_serialize_json(evt, &amqp_message->pjson); mod_amqp_producer_routing_key(profile, amqp_message->routing_key, evt, profile->format_fields); /* Queue the message to be sent by the worker thread, errors are reported only once per circuit breaker interval */ if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) { unsigned int queue_size = switch_queue_size(profile->send_queue); /* Trip the circuit breaker for a short period to stop recurring error messages (time is measured in uS) */ profile->circuit_breaker_reset_time = now + profile->circuit_breaker_ms * 1000; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AMQP message queue full. Messages will be dropped for %.1fs! (Queue capacity %d)", profile->circuit_breaker_ms / 1000.0, queue_size); mod_amqp_util_msg_destroy(&amqp_message); } } switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **prof) { mod_amqp_message_t *msg = NULL; switch_status_t status = SWITCH_STATUS_SUCCESS; mod_amqp_connection_t *conn = NULL, *conn_next = NULL; switch_memory_pool_t *pool; mod_amqp_producer_profile_t *profile; if (!prof || !*prof) { return SWITCH_STATUS_SUCCESS; } profile = *prof; pool = profile->pool; if (profile->name) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] shutting down...\n", profile->name); switch_core_hash_delete(mod_amqp_globals.producer_hash, profile->name); } profile->running = 0; if (profile->producer_thread) { switch_thread_join(&status, profile->producer_thread); } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Profile[%s] closing AMQP socket...\n", profile->name); for (conn = profile->conn_root; conn; conn = conn_next) { conn_next = conn->next; mod_amqp_connection_destroy(&conn); } profile->conn_active = NULL; profile->conn_root = NULL; while (profile->send_queue && switch_queue_trypop(profile->send_queue, (void**)&msg) == SWITCH_STATUS_SUCCESS) { mod_amqp_util_msg_destroy(&msg); } if (pool) { switch_core_destroy_memory_pool(&pool); } *prof = NULL; return SWITCH_STATUS_SUCCESS; } switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg) { mod_amqp_producer_profile_t *profile = NULL; int arg = 0, i = 0; char *argv[SWITCH_EVENT_ALL]; switch_xml_t params, param, connections, connection; switch_threadattr_t *thd_attr = NULL; char *exchange = NULL, *exchange_type = NULL, *content_type = NULL; int exchange_durable = 1; /* durable */ int delivery_mode = -1; int delivery_timestamp = 1; switch_memory_pool_t *pool; char *format_fields[MAX_ROUTING_KEY_FORMAT_FIELDS+1]; int format_fields_size = 0; memset(format_fields, 0, MAX_ROUTING_KEY_FORMAT_FIELDS + 1); if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) { goto err; } profile = switch_core_alloc(pool, sizeof(mod_amqp_producer_profile_t)); profile->pool = pool; profile->name = switch_core_strdup(profile->pool, name); profile->running = 1; memset(profile->format_fields, 0, (MAX_ROUTING_KEY_FORMAT_FIELDS + 1) * sizeof(mod_amqp_keypart_t)); profile->event_ids[0] = SWITCH_EVENT_ALL; profile->event_subscriptions = 1; profile->conn_root = NULL; profile->conn_active = NULL; /* Set reasonable defaults which may change if more reasonable defaults are found */ /* Handle defaults of non string types */ profile->circuit_breaker_ms = 10000; profile->reconnect_interval_ms = 1000; profile->send_queue_size = 5000; if ((params = switch_xml_child(cfg, "params")) != NULL) { for (param = switch_xml_child(params, "param"); param; param = param->next) { char *var = (char *) switch_xml_attr_soft(param, "name"); char *val = (char *) switch_xml_attr_soft(param, "value"); if (!var) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param missing 'name' attribute\n", profile->name); continue; } if (!val) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] param[%s] missing 'value' attribute\n", profile->name, var); continue; } if (!strncmp(var, "reconnect_interval_ms", 21)) { int interval = atoi(val); if ( interval && interval > 0 ) { profile->reconnect_interval_ms = interval; } } else if (!strncmp(var, "circuit_breaker_ms", 18)) { int interval = atoi(val); if ( interval && interval > 0 ) { profile->circuit_breaker_ms = interval; } } else if (!strncmp(var, "send_queue_size", 15)) { int interval = atoi(val); if ( interval && interval > 0 ) { profile->send_queue_size = interval; } } else if (!strncmp(var, "enable_fallback_format_fields", 29)) { int interval = atoi(val); if ( interval && interval > 0 ) { profile->enable_fallback_format_fields = 1; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp fallback format fields enabled\n"); } } else if (!strncmp(var, "exchange-type", 13)) { exchange_type = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "exchange-name", 13)) { exchange = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "exchange-durable", 16)) { exchange_durable = switch_true(val); } else if (!strncmp(var, "delivery-mode", 13)) { delivery_mode = atoi(val); } else if (!strncmp(var, "delivery-timestamp", 18)) { delivery_timestamp = switch_true(val); } else if (!strncmp(var, "exchange_type", 13)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange_type parameter. please change to exchange-type\n"); } else if (!strncmp(var, "exchange", 8)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Found exchange parameter. please change to exchange-name\n"); } else if (!strncmp(var, "content-type", 12)) { content_type = switch_core_strdup(profile->pool, val); } else if (!strncmp(var, "format_fields", 13)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp format fields : %s\n", val); if ((format_fields_size = mod_amqp_count_chars(val, ',')) >= MAX_ROUTING_KEY_FORMAT_FIELDS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "You can have only %d routing fields in the routing key.\n", MAX_ROUTING_KEY_FORMAT_FIELDS); goto err; } /* increment size because the count returned the number of separators, not number of fields */ format_fields_size++; switch_separate_string(val, ',', format_fields, MAX_ROUTING_KEY_FORMAT_FIELDS); format_fields[format_fields_size] = NULL; } else if (!strncmp(var, "event_filter", 12)) { /* Parse new events */ profile->event_subscriptions = switch_separate_string(val, ',', argv, (sizeof(argv) / sizeof(argv[0]))); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Found %d subscriptions\n", profile->event_subscriptions); for (arg = 0; arg < profile->event_subscriptions; arg++) { if (switch_name_event(argv[arg], &(profile->event_ids[arg])) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "The switch event %s was not recognised.\n", argv[arg]); } } } } /* params for loop */ } /* Handle defaults of string types */ profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Events"); profile->exchange_type = exchange_type ? exchange_type : switch_core_strdup(profile->pool, "topic"); profile->exchange_durable = exchange_durable; profile->delivery_mode = delivery_mode; profile->delivery_timestamp = delivery_timestamp; profile->content_type = content_type ? content_type : switch_core_strdup(profile->pool, MOD_AMQP_DEFAULT_CONTENT_TYPE); for(i = 0; i < format_fields_size; i++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : %s\n", i, format_fields[i]); if(profile->enable_fallback_format_fields) { profile->format_fields[i].size = switch_separate_string(format_fields[i], '|', profile->format_fields[i].name, MAX_ROUTING_KEY_FORMAT_FALLBACK_FIELDS); if(profile->format_fields[i].size > 1) { for(arg = 0; arg < profile->format_fields[i].size; arg++) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "amqp routing key %d : sub key %d : %s\n", i, arg, profile->format_fields[i].name[arg]); } } } else { profile->format_fields[i].name[0] = format_fields[i]; profile->format_fields[i].size = 1; } } if ((connections = switch_xml_child(cfg, "connections")) != NULL) { for (connection = switch_xml_child(connections, "connection"); connection; connection = connection->next) { if ( ! profile->conn_root ) { /* Handle first root node */ if (mod_amqp_connection_create(&(profile->conn_root), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { /* Handle connection create failure */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); continue; } profile->conn_active = profile->conn_root; } else { if (mod_amqp_connection_create(&(profile->conn_active->next), connection, profile->pool) != SWITCH_STATUS_SUCCESS) { /* Handle connection create failure */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to create connection\n", profile->name); continue; } profile->conn_active = profile->conn_active->next; } } } profile->conn_active = NULL; if ( mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] was unable to connect to any connection\n", profile->name); goto err; } #if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR >= 6 amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), 0, /* passive */ profile->exchange_durable, profile->exchange_auto_delete, 0, amqp_empty_table); #else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), 0, /* passive */ profile->exchange_durable, amqp_empty_table); #endif if (mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Profile[%s] failed to create exchange\n", profile->name); goto err; } /* Create a bounded FIFO queue for sending messages */ if (switch_queue_create(&(profile->send_queue), profile->send_queue_size, profile->pool) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create send queue of size %d!\n", profile->send_queue_size); goto err; } /* Start the event send thread. This will set up the initial connection */ switch_threadattr_create(&thd_attr, profile->pool); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); if (switch_thread_create(&profile->producer_thread, thd_attr, mod_amqp_producer_thread, profile, profile->pool)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot create 'amqp event sender' thread!\n"); goto err; } /* Subscribe events */ for (i = 0; i < profile->event_subscriptions; i++) { if (switch_event_bind_removable("AMQP", profile->event_ids[i], SWITCH_EVENT_SUBCLASS_ANY, mod_amqp_producer_event_handler, profile, &(profile->event_nodes[i])) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot bind to event handler %d!\n",(int)profile->event_ids[i]); goto err; } } if ( switch_core_hash_insert(mod_amqp_globals.producer_hash, name, (void *) profile) != SWITCH_STATUS_SUCCESS) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Failed to insert new profile [%s] into mod_amqp profile hash\n", name); goto err; } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Profile[%s] Successfully started\n", profile->name); return SWITCH_STATUS_SUCCESS; err: /* Cleanup */ mod_amqp_producer_destroy(&profile); return SWITCH_STATUS_GENERR; } /* This should only be called in a single threaded context from the producer profile send thread */ switch_status_t mod_amqp_producer_send(mod_amqp_producer_profile_t *profile, mod_amqp_message_t *msg) { amqp_table_entry_t messageTableEntries[2]; amqp_basic_properties_t props; int status; uint64_t timestamp; if (! profile->conn_active) { /* No connection, so we can not send the message. */ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name); return SWITCH_STATUS_NOT_INITALIZED; } memset(&props, 0, sizeof(amqp_basic_properties_t)); props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG; props.content_type = amqp_cstring_bytes(profile->content_type); if(profile->delivery_mode > 0) { props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG; props.delivery_mode = profile->delivery_mode; } if(profile->delivery_timestamp) { props._flags |= AMQP_BASIC_TIMESTAMP_FLAG | AMQP_BASIC_HEADERS_FLAG; props.timestamp = (uint64_t)time(NULL); props.headers.num_entries = 1; props.headers.entries = messageTableEntries; timestamp = (uint64_t)switch_micro_time_now(); messageTableEntries[0].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStamp"); messageTableEntries[0].value.kind = AMQP_FIELD_KIND_TIMESTAMP; messageTableEntries[0].value.value.u64 = (uint64_t)(timestamp / 1000000); messageTableEntries[1].key = amqp_cstring_bytes("x_Liquid_MessageSentTimeStampMicro"); messageTableEntries[1].value.kind = AMQP_FIELD_KIND_U64; messageTableEntries[1].value.value.u64 = timestamp; } status = amqp_basic_publish( profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(msg->routing_key), 0, 0, &props, amqp_cstring_bytes(msg->pjson)); if (status < 0) { const char *errstr = amqp_error_string2(-status); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n", profile->name, profile->conn_active->name, errstr); /* This is bad, we couldn't send the message. Clear up any connection */ mod_amqp_connection_close(profile->conn_active); profile->conn_active = NULL; return SWITCH_STATUS_SOCKERR; } return SWITCH_STATUS_SUCCESS; } void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data) { mod_amqp_message_t *msg = NULL; switch_status_t status = SWITCH_STATUS_SUCCESS; mod_amqp_producer_profile_t *profile = (mod_amqp_producer_profile_t *)data; amqp_boolean_t passive = 0; amqp_boolean_t durable = 1; while (profile->running) { if (!profile->conn_active) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Amqp no connection- reconnecting...\n"); status = mod_amqp_connection_open(profile->conn_root, &(profile->conn_active), profile->name, profile->custom_attr); if ( status == SWITCH_STATUS_SUCCESS ) { // Ensure that the exchange exists, and is of the correct type #if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR >= 6 amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), passive, durable, profile->exchange_auto_delete, 0, amqp_empty_table); #else amqp_exchange_declare(profile->conn_active->state, 1, amqp_cstring_bytes(profile->exchange), amqp_cstring_bytes(profile->exchange_type), passive, durable, amqp_empty_table); #endif if (!mod_amqp_log_if_amqp_error(amqp_get_rpc_reply(profile->conn_active->state), "Declaring exchange")) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Amqp reconnect successful- connected\n"); continue; } } switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Profile[%s] failed to connect with code(%d), sleeping for %dms\n", profile->name, status, profile->reconnect_interval_ms); switch_sleep(profile->reconnect_interval_ms * 1000); continue; } if (!msg && switch_queue_pop_timeout(profile->send_queue, (void**)&msg, 1000000) != SWITCH_STATUS_SUCCESS) { continue; } if (msg) { #ifdef MOD_AMQP_DEBUG_TIMING long times[TIME_STATS_TO_AGGREGATE]; static unsigned int thistime = 0; switch_time_t start = switch_time_now(); #endif switch (mod_amqp_producer_send(profile, msg)) { case SWITCH_STATUS_SUCCESS: /* Success: prepare for next message */ mod_amqp_util_msg_destroy(&msg); break; case SWITCH_STATUS_NOT_INITALIZED: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'not initialised'\n"); break; case SWITCH_STATUS_SOCKERR: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with 'socket error'\n"); break; default: switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Send failed with a generic error\n"); /* Send failed and closed the connection; reconnect will happen at the beginning of the loop * NB: do we need a delay here to prevent a fast reconnect-send-fail loop? */ break; } #ifdef MOD_AMQP_DEBUG_TIMING times[thistime++] = switch_time_now() - start; if (thistime >= TIME_STATS_TO_AGGREGATE) { int i; long min_time, max_time, avg_time; /* Calculate aggregate times */ min_time = max_time = avg_time = times[0]; for (i = 1; i < TIME_STATS_TO_AGGREGATE; ++i) { avg_time += times[i]; if (times[i] < min_time) min_time = times[i]; if (times[i] > max_time) max_time = times[i]; } avg_time /= TIME_STATS_TO_AGGREGATE; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Microseconds to send last %d messages: Min %ld Max %ld Avg %ld\n", TIME_STATS_TO_AGGREGATE, min_time, max_time, avg_time); thistime = 0; } #endif } } /* Abort the current message */ mod_amqp_util_msg_destroy(&msg); // Terminate the thread switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Event sender thread stopped\n"); switch_thread_exit(thread, SWITCH_STATUS_SUCCESS); return NULL; } /* For Emacs: * Local Variables: * mode:c * indent-tabs-mode:t * tab-width:4 * c-basic-offset:4 * End: * For VIM: * vim:set softtabstop=4 shiftwidth=4 tabstop=4 */