FS-8306 Now command queues can specify the queue to subscribe to. This enables very interesting use cases that would involve single job queue, and multiple consumers.

This commit is contained in:
William King 2015-10-10 16:39:53 -07:00
parent ba63cc4574
commit b5301688d7
2 changed files with 6 additions and 2 deletions

View File

@ -128,6 +128,7 @@ typedef struct {
char *name;
char *exchange;
char *queue;
char *binding_key;
/* Note: The AMQP channel is not reentrant this MUTEX serializes sending events. */

View File

@ -86,7 +86,7 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
switch_xml_t params, param, connections, connection;
switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool;
char *exchange = NULL, *binding_key = NULL;
char *exchange = NULL, *binding_key = NULL, *queue = NULL;
if (switch_core_new_memory_pool(&pool) != SWITCH_STATUS_SUCCESS) {
goto err;
@ -121,6 +121,8 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
}
} else if (!strncmp(var, "exchange-name", 13)) {
exchange = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "queue-name", 10)) {
queue = switch_core_strdup(profile->pool, val);
} else if (!strncmp(var, "binding_key", 11)) {
binding_key = switch_core_strdup(profile->pool, val);
}
@ -129,6 +131,7 @@ switch_status_t mod_amqp_command_create(char *name, switch_xml_t cfg)
/* Handle defaults of string types */
profile->exchange = exchange ? exchange : switch_core_strdup(profile->pool, "TAP.Commands");
profile->queue = queue ? queue : NULL;
profile->binding_key = binding_key ? binding_key : switch_core_strdup(profile->pool, "commandBindingKey");
if ((connections = switch_xml_child(cfg, "connections")) != NULL) {
@ -214,7 +217,7 @@ void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating command queue");
recv_queue = amqp_queue_declare(profile->conn_active->state, // state
1, // channel
amqp_empty_bytes, // queue name
profile->queue ? amqp_cstring_bytes(profile->queue) : amqp_empty_bytes, // queue name
0, 0, // passive, durable
0, 1, // exclusive, auto-delete
amqp_empty_table); // args