Archived
14
0
Fork 0

A new way to try and deal with deadlocks that occur in app_queue at present. Using this approach, we only manipulate the main queue mutexes when we get a dev state change on a device that is actually a member of a queue. Further optimizations are still possible (eg - store and manage pointers to the status integer of the member record that this interface/device has a one-to-one relationship with and then go directly to those pointers to make status modifications rather than the recursive looping that goes on now) BUT first things first. :)

git-svn-id: http://svn.digium.com/svn/asterisk/trunk@30430 f38db490-d61c-443f-a65b-d21fe96a405b
This commit is contained in:
bweschke 2006-05-25 21:47:03 +00:00
parent 8a415b1bf7
commit 6d7eb1ed13

View file

@ -318,6 +318,13 @@ struct member {
struct member *next; /*!< Next member */
};
struct ast_member_interfaces {
char interface[80];
AST_LIST_ENTRY(ast_member_interfaces) list; /*!< Next call queue */
};
static AST_LIST_HEAD_STATIC(interfaces, ast_member_interfaces);
/* values used in multi-bit flags in ast_call_queue */
#define QUEUE_EMPTY_NORMAL 1
#define QUEUE_EMPTY_STRICT 2
@ -482,6 +489,7 @@ static void *changethread(void *data)
struct ast_call_queue *q;
struct statechange *sc = data;
struct member *cur;
struct ast_member_interfaces *curint;
char *loc;
char *technology;
@ -494,36 +502,50 @@ static void *changethread(void *data)
free(sc);
return NULL;
}
if (option_debug)
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
AST_LIST_LOCK(&queues);
AST_LIST_TRAVERSE(&queues, q, list) {
ast_mutex_lock(&q->lock);
cur = q->members;
while(cur) {
if (!strcasecmp(sc->dev, cur->interface)) {
if (cur->status != sc->state) {
cur->status = sc->state;
if (!q->maskmemberstatus) {
manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
"Queue: %s\r\n"
"Location: %s\r\n"
"Membership: %s\r\n"
"Penalty: %d\r\n"
"CallsTaken: %d\r\n"
"LastCall: %d\r\n"
"Status: %d\r\n"
"Paused: %d\r\n",
q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
cur->penalty, cur->calls, (int)cur->lastcall, cur->status, cur->paused);
AST_LIST_LOCK(&interfaces);
AST_LIST_TRAVERSE(&interfaces, curint, list) {
if (!strcasecmp(curint->interface, sc->dev))
break;
}
AST_LIST_UNLOCK(&interfaces);
if (curint) {
if (option_debug)
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state));
AST_LIST_LOCK(&queues);
AST_LIST_TRAVERSE(&queues, q, list) {
ast_mutex_lock(&q->lock);
cur = q->members;
while(cur) {
if (!strcasecmp(sc->dev, cur->interface)) {
if (cur->status != sc->state) {
cur->status = sc->state;
if (!q->maskmemberstatus) {
manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
"Queue: %s\r\n"
"Location: %s\r\n"
"Membership: %s\r\n"
"Penalty: %d\r\n"
"CallsTaken: %d\r\n"
"LastCall: %d\r\n"
"Status: %d\r\n"
"Paused: %d\r\n",
q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
cur->penalty, cur->calls, (int)cur->lastcall, cur->status, cur->paused);
}
}
}
cur = cur->next;
}
cur = cur->next;
ast_mutex_unlock(&q->lock);
}
ast_mutex_unlock(&q->lock);
AST_LIST_UNLOCK(&queues);
} else {
if (option_debug)
ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d' (%s) but we don't care because they're not a member of any queue.\n", technology, loc, sc->state, devstate2str(sc->state));
}
AST_LIST_UNLOCK(&queues);
free(sc);
return NULL;
}
@ -622,6 +644,87 @@ static void clear_queue(struct ast_call_queue *q)
q->wrapuptime = 0;
}
static int add_to_interfaces(char *interface)
{
struct ast_member_interfaces *curint, *newint;
AST_LIST_LOCK(&interfaces);
AST_LIST_TRAVERSE(&interfaces, curint, list) {
if (!strcasecmp(curint->interface, interface))
break;
}
if (!curint) {
if (option_debug)
ast_log(LOG_DEBUG, "Adding %s to the list of interfaces that make up all of our queue members.\n", interface);
if ((newint = ast_calloc(1, sizeof(*newint)))) {
ast_copy_string(newint->interface, interface, sizeof(newint->interface));
AST_LIST_INSERT_HEAD(&interfaces, newint, list);
}
}
AST_LIST_UNLOCK(&interfaces);
return 0;
}
static int interface_exists_global(char *interface)
{
struct ast_call_queue *q;
struct member *mem;
int ret = 0;
AST_LIST_LOCK(&queues);
AST_LIST_TRAVERSE(&queues, q, list) {
ast_mutex_lock(&q->lock);
for (mem = q->members; mem; mem = mem->next)
if (!strcasecmp(interface, mem->interface)) {
ast_mutex_unlock(&q->lock);
ret = 1;
break;
}
ast_mutex_unlock(&q->lock);
}
AST_LIST_UNLOCK(&queues);
return ret;
}
static int remove_from_interfaces(char *interface)
{
struct ast_member_interfaces *curint;
AST_LIST_LOCK(&interfaces);
AST_LIST_TRAVERSE_SAFE_BEGIN(&interfaces, curint, list) {
if (!strcasecmp(curint->interface, interface) && !interface_exists_global(interface)) {
if (option_debug)
ast_log(LOG_DEBUG, "Removing %s from the list of interfaces that make up all of our queue members.\n", interface);
AST_LIST_REMOVE_CURRENT(&interfaces, list);
free(curint);
}
}
AST_LIST_TRAVERSE_SAFE_END;
AST_LIST_UNLOCK(&interfaces);
return 0;
}
static void clear_and_free_interfaces(void)
{
struct ast_member_interfaces *curint;
AST_LIST_LOCK(&interfaces);
AST_LIST_TRAVERSE_SAFE_BEGIN(&interfaces, curint, list) {
AST_LIST_REMOVE_CURRENT(&interfaces, list);
free(curint);
}
AST_LIST_TRAVERSE_SAFE_END;
AST_LIST_UNLOCK(&interfaces);
return;
}
/*! \brief Configure a queue parameter.
\par
For error reporting, line number is passed for .conf static configuration.
@ -802,6 +905,7 @@ static void rt_handle_member_record(struct ast_call_queue *q, char *interface, c
m = create_queue_member(interface, penalty, 0);
if (m) {
m->dead = 0;
add_to_interfaces(interface);
if (prev_m) {
prev_m->next = m;
} else {
@ -826,6 +930,7 @@ static void free_members(struct ast_call_queue *q, int all)
prev->next = next;
else
q->members = next;
remove_from_interfaces(curm->interface);
free(curm);
} else
prev = curm;
@ -948,6 +1053,7 @@ static struct ast_call_queue *find_queue_by_name_rt(const char *queuename, struc
} else {
q->members = next_m;
}
remove_from_interfaces(m->interface);
free(m);
} else {
prev_m = m;
@ -1065,9 +1171,8 @@ static int join_queue(char *queuename, struct queue_ent *qe, enum queue_result *
S_OR(qe->chan->cid.cid_num, "unknown"), /* XXX somewhere else it is <unknown> */
S_OR(qe->chan->cid.cid_name, "unknown"),
q->name, qe->pos, q->count, qe->chan->uniqueid );
#if 0
ast_log(LOG_NOTICE, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos );
#endif
if (option_debug)
ast_log(LOG_DEBUG, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos );
}
ast_mutex_unlock(&q->lock);
AST_LIST_UNLOCK(&queues);
@ -2596,10 +2701,14 @@ static int remove_from_queue(char *queuename, char *interface)
}
ast_mutex_unlock(&q->lock);
}
if (res == RES_OKAY) {
remove_from_interfaces(interface);
}
AST_LIST_UNLOCK(&queues);
return res;
}
static int add_to_queue(char *queuename, char *interface, int penalty, int paused, int dump)
{
struct ast_call_queue *q;
@ -2615,6 +2724,9 @@ static int add_to_queue(char *queuename, char *interface, int penalty, int pause
if (q) {
ast_mutex_lock(&q->lock);
if (interface_exists(q, interface) == NULL) {
add_to_interfaces(interface);
new_member = create_queue_member(interface, penalty, paused);
if (new_member != NULL) {
@ -3183,7 +3295,6 @@ check_turns:
/* Try calling all queue members for 'timeout' seconds */
res = try_calling(&qe, args.options, args.announceoverride, args.url, &go_on, args.agi);
if (res) {
if (res < 0) {
if (!qe.handled) {
@ -3552,6 +3663,8 @@ static void reload_queues(void)
}
free(cur);
} else {
/* Add them to the master int list if necessary */
add_to_interfaces(interface);
newm->next = q->members;
q->members = newm;
}
@ -3575,6 +3688,7 @@ static void reload_queues(void)
q->members = cur->next;
newm = cur;
}
remove_from_interfaces(cur->interface);
}
}
@ -4128,6 +4242,7 @@ static int unload_module(void *mod)
{
int res;
clear_and_free_interfaces();
res = ast_cli_unregister(&cli_show_queue);
res |= ast_cli_unregister(&cli_show_queues);
res |= ast_cli_unregister(&cli_add_queue_member);
@ -4137,7 +4252,6 @@ static int unload_module(void *mod)
res |= ast_manager_unregister("QueueAdd");
res |= ast_manager_unregister("QueueRemove");
res |= ast_manager_unregister("QueuePause");
ast_devstate_del(statechange_queue, NULL);
res |= ast_unregister_application(app_aqm);
res |= ast_unregister_application(app_rqm);
res |= ast_unregister_application(app_pqm);
@ -4162,7 +4276,6 @@ static int load_module(void *mod)
res |= ast_cli_register(&cli_show_queues);
res |= ast_cli_register(&cli_add_queue_member);
res |= ast_cli_register(&cli_remove_queue_member);
res |= ast_devstate_add(statechange_queue, NULL);
res |= ast_manager_register( "Queues", 0, manager_queues_show, "Queues" );
res |= ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" );
res |= ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." );
@ -4176,6 +4289,7 @@ static int load_module(void *mod)
res |= ast_custom_function_register(&queuemembercount_function);
res |= ast_custom_function_register(&queuememberlist_function);
res |= ast_custom_function_register(&queuewaitingcount_function);
res |= ast_devstate_add(statechange_queue, NULL);
if (!res) {
reload_queues();