FS-7526: add enable_fallback_format_fields for mod_amqp producer profiles if the profile param is set.

This commit is contained in:
William King 2015-05-03 11:23:30 -07:00
parent d8d3db284c
commit d3bac671d2
3 changed files with 50 additions and 11 deletions

View File

@ -25,10 +25,16 @@
<param name="circuit_breaker_ms" value="10000"/>
<param name="reconnect_interval_ms" value="1000"/>
<param name="send_queue_size" value="5000"/>
<param name="enable_fallback_format_fields" value="1"/>
<!-- The routing key is made from the format string, using the header values in the event specified in the format_fields.-->
<!-- Fields that are prefixed with a # are treated as literals rather than doing a header lookup -->
<param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname,Event-Name,Event-Subclass,Unique-ID"/>
<!-- If enable_fallback_format_fields is enabled, then you can | separate event headers, and if the first does not exist
then the system will check additional configured header values.
-->
<!-- <param name="format_fields" value="#FreeSWITCH,FreeSWITCH-Hostname|#Unknown,Event-Name,Event-Subclass,Unique-ID"/> -->
<!-- <param name="eventFilter" value="SWITCH_EVENT_ALL"/> -->
<param name="event_filter" value="SWITCH_EVENT_CHANNEL_CREATE,SWITCH_EVENT_CHANNEL_DESTROY,SWITCH_EVENT_HEARTBEAT,SWITCH_EVENT_DTMF"/>

View File

@ -104,6 +104,7 @@ typedef struct {
int reconnect_interval_ms;
int circuit_breaker_ms;
switch_time_t circuit_breaker_reset_time;
switch_bool_t enable_fallback_format_fields;
switch_bool_t running;
switch_memory_pool_t *pool;
@ -156,7 +157,8 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
/* producer */
void mod_amqp_producer_event_handler(switch_event_t* evt);
switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],switch_event_t* evt, char* routingKeyEventHeaderNames[]);
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
switch_event_t* evt, char* routingKeyEventHeaderNames[]);
switch_status_t mod_amqp_producer_destroy(mod_amqp_producer_profile_t **profile);
switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg);
void * SWITCH_THREAD_FUNC mod_amqp_producer_thread(switch_thread_t *thread, void *data);

View File

@ -45,7 +45,8 @@ void mod_amqp_producer_msg_destroy(mod_amqp_message_t **msg)
switch_safe_free(*msg);
}
switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH], switch_event_t* evt, char* routingKeyEventHeaderNames[])
switch_status_t mod_amqp_producer_routing_key(mod_amqp_producer_profile_t *profile, char routingKey[MAX_AMQP_ROUTING_KEY_LENGTH],
switch_event_t* evt, char* routingKeyEventHeaderNames[])
{
int i = 0, idx = 0;
@ -54,16 +55,41 @@ switch_status_t mod_amqp_producer_routing_key(char routingKey[MAX_AMQP_ROUTING_K
if (idx) {
routingKey[idx++] = '.';
}
if (routingKeyEventHeaderNames[i][0] == '#') {
strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
} else {
char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
if ( profile->enable_fallback_format_fields) {
int count = 0, x = 0;
char *argv[10];
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
count = switch_separate_string(routingKeyEventHeaderNames[i], '|', argv, (sizeof(argv) / sizeof(argv[0])));
for( x = 0; x < count; x++) {
if (argv[x][0] == '#') {
strncpy(routingKey + idx, argv[x] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
break;
} else {
char *value = switch_event_get_header(evt, argv[x]);
if (!value) {
continue;
}
strncpy(routingKey + idx, value, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
}
}
idx += strlen(routingKey + idx);
} else {
if (routingKeyEventHeaderNames[i][0] == '#') {
strncpy(routingKey + idx, routingKeyEventHeaderNames[i] + 1, MAX_AMQP_ROUTING_KEY_LENGTH - idx);
} else {
char *value = switch_event_get_header(evt, routingKeyEventHeaderNames[i]);
strncpy(routingKey + idx, value ? value : "", MAX_AMQP_ROUTING_KEY_LENGTH - idx);
/* Replace dots with underscores so that the routing key does not get corrupted */
switch_replace_char(routingKey + idx, '.', '_', 0);
}
idx += strlen(routingKey + idx);
}
idx += strlen(routingKey + idx);
}
}
return SWITCH_STATUS_SUCCESS;
@ -97,7 +123,7 @@ void mod_amqp_producer_event_handler(switch_event_t* evt)
switch_malloc(amqp_message, sizeof(mod_amqp_message_t));
switch_event_serialize_json(evt, &amqp_message->pjson);
mod_amqp_producer_routing_key(amqp_message->routing_key, evt, profile->format_fields);
mod_amqp_producer_routing_key(profile, amqp_message->routing_key, evt, profile->format_fields);
/* Queue the message to be sent by the worker thread, errors are reported only once per circuit breaker interval */
if (switch_queue_trypush(profile->send_queue, amqp_message) != SWITCH_STATUS_SUCCESS) {
@ -221,6 +247,11 @@ switch_status_t mod_amqp_producer_create(char *name, switch_xml_t cfg)
if ( interval && interval > 0 ) {
profile->send_queue_size = interval;
}
} else if (!strncmp(var, "enable_fallback_format_fields", 29)) {
int interval = atoi(val);
if ( interval && interval > 0 ) {
profile->enable_fallback_format_fields = 1;
}
} else if (!strncmp(var, "exchange", 8)) {
exchange = switch_core_strdup(profile->pool, "TAP.Events");
} else if (!strncmp(var, "format_fields", 13)) {