From 11d03864adc34851e4bfc1cfde973533e160d18a Mon Sep 17 00:00:00 2001 From: Anthony Minessale Date: Wed, 19 Mar 2008 16:03:51 +0000 Subject: [PATCH] update git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@7929 d0543943-73ff-0310-b7d9-9358b9ac24b2 --- src/mod/applications/mod_fifo/mod_fifo.c | 135 +++++++++++++++++------ 1 file changed, 102 insertions(+), 33 deletions(-) diff --git a/src/mod/applications/mod_fifo/mod_fifo.c b/src/mod/applications/mod_fifo/mod_fifo.c index 4b6a44efba..9553e1b584 100644 --- a/src/mod/applications/mod_fifo/mod_fifo.c +++ b/src/mod/applications/mod_fifo/mod_fifo.c @@ -36,6 +36,20 @@ SWITCH_MODULE_DEFINITION(mod_fifo, mod_fifo_load, mod_fifo_shutdown, NULL); #define FIFO_EVENT "fifo::info" +#define MAX_PRI 10 + +struct fifo_node { + char *name; + switch_mutex_t *mutex; + switch_queue_t *fifo_list[MAX_PRI]; + switch_hash_t *caller_hash; + switch_hash_t *consumer_hash; + int caller_count; + int waiting_count; + int consumer_count; +}; + +typedef struct fifo_node fifo_node_t; static switch_status_t on_dtmf(switch_core_session_t *session, void *input, switch_input_type_t itype, void *buf, unsigned int buflen) @@ -66,10 +80,17 @@ static switch_status_t on_dtmf(switch_core_session_t *session, void *input, swit static switch_status_t read_frame_callback(switch_core_session_t *session, switch_frame_t *frame, void *user_data) { - switch_queue_t *fifo = (switch_queue_t *) user_data; - if (switch_queue_size(fifo)) { + fifo_node_t *node = (fifo_node_t *) user_data; + int x = 0, total = 0; + + for (x = 0; x < MAX_PRI; x++) { + total += switch_queue_size(node->fifo_list[x]); + } + + if (total) { return SWITCH_STATUS_FALSE; } + return SWITCH_STATUS_SUCCESS; } @@ -81,35 +102,24 @@ static struct { } globals; -struct fifo_node { - char *name; - switch_mutex_t *mutex; - switch_queue_t *fifo; - switch_hash_t *caller_hash; - switch_hash_t *consumer_hash; - int caller_count; - int waiting_count; - int consumer_count; -}; - -typedef struct fifo_node fifo_node_t; - - static fifo_node_t *create_node(const char *name) { fifo_node_t *node; - + int x = 0; + if (!globals.running) { return NULL; } node = switch_core_alloc(globals.pool, sizeof(*node)); node->name = switch_core_strdup(globals.pool, name); - - switch_queue_create(&node->fifo, SWITCH_CORE_QUEUE_LEN, globals.pool); + for (x = 0; x < MAX_PRI; x++) { + switch_queue_create(&node->fifo_list[x], SWITCH_CORE_QUEUE_LEN, globals.pool); + switch_assert(node->fifo_list[x]); + } switch_core_hash_init(&node->caller_hash, globals.pool); switch_core_hash_init(&node->consumer_hash, globals.pool); - switch_assert(node->fifo); + switch_mutex_init(&node->mutex, SWITCH_MUTEX_NESTED, globals.pool); switch_core_hash_insert(globals.fifo_hash, name, node); @@ -228,6 +238,8 @@ SWITCH_STANDARD_APP(fifo_function) if (!strcasecmp(argv[1], "in")) { const char *uuid = strdup(switch_core_session_get_uuid(session)); + const char *pri; + int p = 0; switch_channel_answer(channel); @@ -253,7 +265,17 @@ SWITCH_STANDARD_APP(fifo_function) node->waiting_count++; send_presence(node); switch_core_hash_insert(node->caller_hash, uuid, session); - switch_queue_push(node->fifo, (void *)uuid); + + if ((pri = switch_channel_get_variable(channel, "fifo_priority"))) { + p = atoi(pri); + } + + if (p >= MAX_PRI) { + p = MAX_PRI - 1; + } + + switch_queue_push(node->fifo_list[p], (void *)uuid); + switch_mutex_unlock(node->mutex); ts = switch_timestamp_now(); @@ -311,7 +333,11 @@ SWITCH_STANDARD_APP(fifo_function) int done = 0; switch_core_session_t *other_session; switch_input_args_t args = { 0 }; - + const char *pop_order = NULL; + int custom_pop = 0; + int pop_array[MAX_PRI] = { 0 }; + char *pop_list[MAX_PRI] = { 0 }; + if (argc > 3) { announce = argv[3]; } @@ -344,27 +370,58 @@ SWITCH_STANDARD_APP(fifo_function) switch_channel_set_variable(channel, "fifo_status", "WAITING"); switch_channel_set_variable(channel, "fifo_timestamp", date); + if ((pop_order = switch_channel_get_variable(channel, "fifo_pop_order"))) { + char *tmp = switch_core_session_strdup(session, pop_order); + int x; + custom_pop = switch_separate_string(tmp, ',', pop_list, (sizeof(pop_list) / sizeof(pop_list[0]))); + if (custom_pop >= MAX_PRI) { + custom_pop = MAX_PRI -1; + } + + for (x = 0; x < custom_pop; x++) { + int tmp = atoi(pop_list[x]); + if (tmp > -1 && tmp < MAX_PRI) { + pop_array[x] = tmp; + } + } + } + while(switch_channel_ready(channel)) { + int x = 0 ; if (moh) { args.read_frame_callback = read_frame_callback; - args.user_data = node->fifo; + args.user_data = node; switch_ivr_play_file(session, NULL, moh, &args); } + + if (custom_pop) { + for(x = 0; x < MAX_PRI; x++) { + if (switch_queue_trypop(node->fifo_list[pop_array[x]], &pop) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } else { + for(x = 0; x < MAX_PRI; x++) { + if (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS && pop) { + break; + } + } + } - if (switch_queue_trypop(node->fifo, &pop) != SWITCH_STATUS_SUCCESS) { + if (!pop) { if (nowait) { break; } + status = switch_core_session_read_frame(session, &read_frame, -1, 0); + if (!SWITCH_READ_ACCEPTABLE(status)) { break; } + continue; } - if (!pop) { - break; - } - + uuid = (char *) pop; if ((other_session = switch_core_session_locate(uuid))) { @@ -615,9 +672,13 @@ SWITCH_STANDARD_API(fifo_api_function) } else if (!strcasecmp(argv[0], "count")) { if (argc < 2) { for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { + int x = 0; switch_hash_this(hi, &var, NULL, &val); node = (fifo_node_t *) val; - len = switch_queue_size(node->fifo); + len = 0; + for (x = 0 ;x < MAX_PRI; x++) { + len += switch_queue_size(node->fifo_list[x]); + } switch_mutex_lock(node->mutex); stream->write_function(stream, "%s:%d:%d:%d\n", (char *)var, node->consumer_count, node->caller_count, len); switch_mutex_unlock(node->mutex); @@ -629,7 +690,12 @@ SWITCH_STANDARD_API(fifo_api_function) } } else { if ((node = switch_core_hash_find(globals.fifo_hash, argv[1]))) { - len = switch_queue_size(node->fifo); + int x = 0; + len = 0; + for (x = 0 ;x < MAX_PRI; x++) { + len += switch_queue_size(node->fifo_list[x]); + } + } switch_mutex_lock(node->mutex); stream->write_function(stream, "%s:%d:%d:%d\n", argv[1], node->consumer_count, node->caller_count, len); @@ -689,11 +755,14 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_fifo_shutdown) globals.running = 0; /* Cleanup*/ for (hi = switch_hash_first(NULL, globals.fifo_hash); hi; hi = switch_hash_next(hi)) { + int x = 0 ; switch_hash_this(hi, NULL, NULL, &val); node = (fifo_node_t *) val; - while (switch_queue_trypop(node->fifo, &pop) == SWITCH_STATUS_SUCCESS) { - free(pop); - } + for (x = 0; x < MAX_PRI; x++) { + while (switch_queue_trypop(node->fifo_list[x], &pop) == SWITCH_STATUS_SUCCESS) { + free(pop); + } + } switch_core_hash_destroy(&node->caller_hash); switch_core_hash_destroy(&node->consumer_hash); }