dect
/
asterisk
Archived
13
0
Fork 0

Add coordination between AMI and AGI applications, with an asyncagi method

Feature proposed and patched by: moy
(Closes issue #11282)


git-svn-id: http://svn.digium.com/svn/asterisk/trunk@96174 f38db490-d61c-443f-a65b-d21fe96a405b
This commit is contained in:
tilghman 2008-01-03 06:16:48 +00:00
parent 2790a8dc3e
commit 3929e42198
1 changed files with 391 additions and 3 deletions

View File

@ -52,6 +52,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/lock.h"
#include "asterisk/strings.h"
#include "asterisk/agi.h"
#include "asterisk/manager.h"
#include "asterisk/version.h"
#include "asterisk/speech.h"
#include "asterisk/manager.h"
@ -101,6 +102,7 @@ static int agidebug = 0;
enum agi_result {
AGI_RESULT_SUCCESS,
AGI_RESULT_SUCCESS_FAST,
AGI_RESULT_SUCCESS_ASYNC,
AGI_RESULT_FAILURE,
AGI_RESULT_NOTFOUND,
AGI_RESULT_HANGUP,
@ -140,6 +142,373 @@ int ast_agi_fdprintf(struct ast_channel *chan, int fd, char *fmt, ...)
return ast_carefulwrite(fd, buf->str, buf->used, 100);
}
/* linked list of AGI commands ready to be executed by Async AGI */
struct agi_cmd {
char *cmd_buffer;
char *cmd_id;
AST_LIST_ENTRY(agi_cmd) entry;
};
static void free_agi_cmd(struct agi_cmd *cmd)
{
ast_free(cmd->cmd_buffer);
ast_free(cmd->cmd_id);
ast_free(cmd);
}
/* AGI datastore destructor */
static void agi_destroy_commands_cb(void *data)
{
struct agi_cmd *cmd;
AST_LIST_HEAD(, agi_cmd) *chan_cmds = data;
AST_LIST_LOCK(chan_cmds);
while ( (cmd = AST_LIST_REMOVE_HEAD(chan_cmds, entry)) ) {
free_agi_cmd(cmd);
}
AST_LIST_UNLOCK(chan_cmds);
AST_LIST_HEAD_DESTROY(chan_cmds);
ast_free(chan_cmds);
}
/* channel datastore to keep the queue of AGI commands in the channel */
static const struct ast_datastore_info agi_commands_datastore_info = {
.type = "AsyncAGI",
.destroy = agi_destroy_commands_cb
};
static const char mandescr_asyncagi[] =
"Description: Add an AGI command to the execute queue of the channel in Async AGI\n"
"Variables:\n"
" *Channel: Channel that is currently in Async AGI\n"
" *Command: Application to execute\n"
" CommandID: comand id. This will be sent back in CommandID header of AsyncAGI exec event notification\n"
"\n";
static struct agi_cmd *get_agi_cmd(struct ast_channel *chan)
{
struct ast_datastore *store;
struct agi_cmd *cmd;
AST_LIST_HEAD(, agi_cmd) *agi_commands;
ast_channel_lock(chan);
store = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL);
ast_channel_unlock(chan);
if (!store) {
ast_log(LOG_ERROR, "Hu? datastore disappeared at Async AGI on Channel %s!\n", chan->name);
return NULL;
}
agi_commands = store->data;
AST_LIST_LOCK(agi_commands);
cmd = AST_LIST_REMOVE_HEAD(agi_commands, entry);
AST_LIST_UNLOCK(agi_commands);
return cmd;
}
/* channel is locked when calling this one either from the CLI or manager thread */
static int add_agi_cmd(struct ast_channel *chan, const char *cmd_buff, const char *cmd_id)
{
struct ast_datastore *store;
struct agi_cmd *cmd;
AST_LIST_HEAD(, agi_cmd) *agi_commands;
store = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL);
if (!store) {
ast_log(LOG_WARNING, "Channel %s is not at Async AGI.\n", chan->name);
return -1;
}
agi_commands = store->data;
cmd = ast_calloc(1, sizeof(*cmd));
if (!cmd) {
return -1;
}
cmd->cmd_buffer = ast_strdup(cmd_buff);
if (!cmd->cmd_buffer) {
ast_free(cmd);
return -1;
}
cmd->cmd_id = ast_strdup(cmd_id);
if (!cmd->cmd_id) {
ast_free(cmd->cmd_buffer);
ast_free(cmd);
return -1;
}
AST_LIST_LOCK(agi_commands);
AST_LIST_INSERT_TAIL(agi_commands, cmd, entry);
AST_LIST_UNLOCK(agi_commands);
return 0;
}
static int add_to_agi(struct ast_channel *chan)
{
struct ast_datastore *datastore;
AST_LIST_HEAD(, agi_cmd) *agi_cmds_list;
/* check if already on AGI */
ast_channel_lock(chan);
datastore = ast_channel_datastore_find(chan, &agi_commands_datastore_info, NULL);
ast_channel_unlock(chan);
if (datastore) {
/* we already have an AGI datastore, let's just
return success */
return 0;
}
/* the channel has never been on Async AGI,
let's allocate it's datastore */
datastore = ast_channel_datastore_alloc(&agi_commands_datastore_info, "AGI");
if (!datastore) {
return -1;
}
agi_cmds_list = ast_calloc(1, sizeof(*agi_cmds_list));
if (!agi_cmds_list) {
ast_log(LOG_ERROR, "Unable to allocate Async AGI commands list.\n");
ast_channel_datastore_free(datastore);
return -1;
}
datastore->data = agi_cmds_list;
AST_LIST_HEAD_INIT(agi_cmds_list);
ast_channel_lock(chan);
ast_channel_datastore_add(chan, datastore);
ast_channel_unlock(chan);
return 0;
}
/*!
* \brief CLI command to add applications to execute in Async AGI
* \param e
* \param cmd
* \param a
*
* \retval CLI_SUCCESS on success
* \retval NULL when init or tab completion is used
*/
static char *handle_cli_agi_add_cmd(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct ast_channel *chan;
switch (cmd) {
case CLI_INIT:
e->command = "agi exec";
e->usage = "Usage: agi exec <channel name> <app and arguments> [id]\n"
" Add AGI command to the execute queue of the specified channel in Async AGI\n";
return NULL;
case CLI_GENERATE:
if (a->pos == 2)
return ast_complete_channels(a->line, a->word, a->pos, a->n, 2);
return NULL;
}
if (a->argc < 4)
return CLI_SHOWUSAGE;
chan = ast_get_channel_by_name_locked(a->argv[2]);
if (!chan) {
ast_log(LOG_WARNING, "Channel %s does not exists or cannot lock it\n", a->argv[2]);
return CLI_FAILURE;
}
if (add_agi_cmd(chan, a->argv[3], (a->argc > 4 ? a->argv[4] : ""))) {
ast_log(LOG_WARNING, "failed to add AGI command to queue of channel %s\n", chan->name);
ast_channel_unlock(chan);
return CLI_FAILURE;
}
ast_log(LOG_DEBUG, "Added AGI command to channel %s queue\n", chan->name);
ast_channel_unlock(chan);
return CLI_SUCCESS;
}
/*!
* \brief Add a new command to execute by the Async AGI application
* \param s
* \param m
*
* It will append the application to the specified channel's queue
* if the channel is not inside Async AGI application it will return an error
* \retval 0 on success or incorrect use
* \retval 1 on failure to add the command ( most likely because the channel
* is not in Async AGI loop )
*/
static int action_add_agi_cmd(struct mansession *s, const struct message *m)
{
const char *channel = astman_get_header(m, "Channel");
const char *cmdbuff = astman_get_header(m, "Command");
const char *cmdid = astman_get_header(m, "CommandID");
struct ast_channel *chan;
char buf[256];
if (ast_strlen_zero(channel) || ast_strlen_zero(cmdbuff)) {
astman_send_error(s, m, "Both, Channel and Command are *required*");
return 0;
}
chan = ast_get_channel_by_name_locked(channel);
if (!chan) {
snprintf(buf, sizeof(buf), "Channel %s does not exists or cannot get its lock", channel);
astman_send_error(s, m, buf);
return 1;
}
if (add_agi_cmd(chan, cmdbuff, cmdid)) {
snprintf(buf, sizeof(buf), "Failed to add AGI command to channel %s queue", chan->name);
astman_send_error(s, m, buf);
ast_channel_unlock(chan);
return 1;
}
astman_send_ack(s, m, "Added AGI command to queue");
ast_channel_unlock(chan);
return 0;
}
static int agi_handle_command(struct ast_channel *chan, AGI *agi, char *buf, int dead);
static void setup_env(struct ast_channel *chan, char *request, int fd, int enhanced, int argc, char *argv[]);
static enum agi_result launch_asyncagi(struct ast_channel *chan, char *argv[], int *efd)
{
/* This buffer sizes might cause truncation if the AGI command writes more data
than AGI_BUF_SIZE as result. But let's be serious, is there an AGI command
that writes a response larger than 1024 bytes?, I don't think so, most of
them are just result=blah stuff. However probably if GET VARIABLE is called
and the variable has large amount of data, that could be a problem. We could
make this buffers dynamic, but let's leave that as a second step.
AMI_BUF_SIZE is twice AGI_BUF_SIZE just for the sake of choosing a safe
number. Some characters of AGI buf will be url encoded to be sent to manager
clients. An URL encoded character will take 3 bytes, but again, to cause
truncation more than about 70% of the AGI buffer should be URL encoded for
that to happen. Not likely at all.
On the other hand. I wonder if read() could eventually return less data than
the amount already available in the pipe? If so, how to deal with that?
So far, my tests on Linux have not had any problems.
*/
#define AGI_BUF_SIZE 1024
#define AMI_BUF_SIZE 2048
struct ast_frame *f;
struct agi_cmd *cmd;
int res, fds[2];
int timeout = 100;
char agi_buffer[AGI_BUF_SIZE + 1];
char ami_buffer[AMI_BUF_SIZE];
enum agi_result returnstatus = AGI_RESULT_SUCCESS_ASYNC;
AGI async_agi;
if (efd) {
ast_log(LOG_WARNING, "Async AGI does not support Enhanced AGI yet\n");
return AGI_RESULT_FAILURE;
}
/* add AsyncAGI datastore to the channel */
if (add_to_agi(chan)) {
ast_log(LOG_ERROR, "failed to start Async AGI on channel %s\n", chan->name);
return AGI_RESULT_FAILURE;
}
/* this pipe allows us to create a "fake" AGI struct to use
the AGI commands */
res = pipe(fds);
if (res) {
ast_log(LOG_ERROR, "failed to create Async AGI pipe\n");
/* intentionally do not remove datastore, added with
add_to_agi(), from channel. It will be removed when
the channel is hung up anyways */
return AGI_RESULT_FAILURE;
}
/* handlers will get the pipe write fd and we read the AGI responses
from the pipe read fd */
async_agi.fd = fds[1];
async_agi.ctrl = fds[1];
async_agi.audio = -1; /* no audio support */
async_agi.fast = 0;
/* notify possible manager users of a new channel ready to
receive commands */
setup_env(chan, "async", fds[1], 0, 0, NULL);
/* read the environment */
res = read(fds[0], agi_buffer, AGI_BUF_SIZE);
if (!res) {
ast_log(LOG_ERROR, "failed to read from Async AGI pipe on channel %s\n", chan->name);
returnstatus = AGI_RESULT_FAILURE;
goto quit;
}
agi_buffer[res] = '\0';
/* encode it and send it thru the manager so whoever is going to take
care of AGI commands on this channel can decide which AGI commands
to execute based on the setup info */
ast_uri_encode(agi_buffer, ami_buffer, AMI_BUF_SIZE, 1);
manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Start\r\nChannel: %s\r\nEnv: %s\r\n", chan->name, ami_buffer);
while (1) {
/* bail out if we need to hangup */
if (ast_check_hangup(chan)) {
ast_log(LOG_DEBUG, "ast_check_hangup returned true on chan %s\n", chan->name);
break;
}
/* retrieve a command
(commands are added via the manager or the cli threads) */
cmd = get_agi_cmd(chan);
if (cmd) {
/* OK, we have a command, let's call the
command handler. */
res = agi_handle_command(chan, &async_agi, cmd->cmd_buffer, 0);
if ((res < 0) || (res == AST_PBX_KEEPALIVE)) {
free_agi_cmd(cmd);
break;
}
/* the command handler must have written to our fake
AGI struct fd (the pipe), let's read the response */
res = read(fds[0], agi_buffer, AGI_BUF_SIZE);
if (!res) {
returnstatus = AGI_RESULT_FAILURE;
ast_log(LOG_ERROR, "failed to read from AsyncAGI pipe on channel %s\n", chan->name);
free_agi_cmd(cmd);
break;
}
/* we have a response, let's send the response thru the
manager. Include the CommandID if it was specified
when the command was added */
agi_buffer[res] = '\0';
ast_uri_encode(agi_buffer, ami_buffer, AMI_BUF_SIZE, 1);
if (ast_strlen_zero(cmd->cmd_id))
manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Exec\r\nChannel: %s\r\nResult: %s\r\n", chan->name, ami_buffer);
else
manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: Exec\r\nChannel: %s\r\nCommandID: %s\r\nResult: %s\r\n", chan->name, cmd->cmd_id, ami_buffer);
free_agi_cmd(cmd);
} else {
/* no command so far, wait a bit for a frame to read */
res = ast_waitfor(chan, timeout);
if (res < 0) {
ast_log(LOG_DEBUG, "ast_waitfor returned <= 0 on chan %s\n", chan->name);
break;
}
if (res == 0)
continue;
f = ast_read(chan);
if (!f) {
ast_log(LOG_DEBUG, "No frame read on channel %s, going out ...\n", chan->name);
returnstatus = AGI_RESULT_HANGUP;
break;
}
/* is there any other frame we should care about
besides AST_CONTROL_HANGUP? */
if (f->frametype == AST_FRAME_CONTROL && f->subclass == AST_CONTROL_HANGUP) {
ast_log(LOG_DEBUG, "Got HANGUP frame on channel %s, going out ...\n", chan->name);
ast_frfree(f);
break;
}
ast_frfree(f);
}
}
quit:
/* notify manager users this channel cannot be
controlled anymore by Async AGI */
manager_event(EVENT_FLAG_CALL, "AsyncAGI", "SubEvent: End\r\nChannel: %s\r\n", chan->name);
/* close the pipe */
close(fds[0]);
close(fds[1]);
/* intentionally don't get rid of the datastore. So commands can be
still in the queue in case AsyncAGI gets called again.
Datastore destructor will be called on channel destroy anyway */
return returnstatus;
#undef AGI_BUF_SIZE
#undef AMI_BUF_SIZE
}
/* launch_netscript: The fastagi handler.
FastAGI defaults to port 4573 */
static enum agi_result launch_netscript(char *agiurl, char *argv[], int *fds, int *efd, int *opid)
@ -230,7 +599,7 @@ static enum agi_result launch_netscript(char *agiurl, char *argv[], int *fds, in
return AGI_RESULT_SUCCESS_FAST;
}
static enum agi_result launch_script(char *script, char *argv[], int *fds, int *efd, int *opid)
static enum agi_result launch_script(struct ast_channel *chan, char *script, char *argv[], int *fds, int *efd, int *opid)
{
char tmp[256];
int pid, toast[2], fromast[2], audio[2], x, res;
@ -239,6 +608,8 @@ static enum agi_result launch_script(char *script, char *argv[], int *fds, int *
if (!strncasecmp(script, "agi://", 6))
return launch_netscript(script, argv, fds, efd, opid);
if (!strncasecmp(script, "agi:async", sizeof("agi:async")-1))
return launch_asyncagi(chan, argv, efd);
if (script[0] != '/') {
snprintf(tmp, sizeof(tmp), "%s/%s", ast_config_AST_AGI_DIR, script);
@ -1628,6 +1999,12 @@ static int handle_speechrecognize(struct ast_channel *chan, AGI *agi, int argc,
return RESULT_SUCCESS;
}
static int handle_asyncagi_break(struct ast_channel *chan, AGI *agi, int argc, char *argv[])
{
ast_agi_fdprintf(chan, agi->fd, "200 result=0\n");
return AST_PBX_KEEPALIVE;
}
static char usage_setmusic[] =
" Usage: SET MUSIC ON <on|off> <class>\n"
" Enables/Disables the music on hold generator. If <class> is\n"
@ -1871,6 +2248,10 @@ static char usage_autohangup[] =
" future. Of course it can be hungup before then as well. Setting to 0 will\n"
" cause the autohangup feature to be disabled on this channel.\n";
static char usage_break_aagi[] =
" Usage: ASYNCAGI BREAK\n"
" Break the Async AGI loop.\n";
static char usage_noop[] =
" Usage: NoOp\n"
" Does nothing.\n";
@ -1956,6 +2337,7 @@ static struct agi_command commands[] = {
{ { "speech", "activate", "grammar", NULL }, handle_speechactivategrammar, "Activates a grammar", usage_speechactivategrammar, 0 },
{ { "speech", "deactivate", "grammar", NULL }, handle_speechdeactivategrammar, "Deactivates a grammar", usage_speechdeactivategrammar, 0 },
{ { "speech", "recognize", NULL }, handle_speechrecognize, "Recognizes speech", usage_speechrecognize, 0 },
{ { "asyncagi", "break", NULL }, handle_asyncagi_break, "Break AsyncAGI loop", usage_break_aagi, 0 },
};
static AST_RWLIST_HEAD_STATIC(agi_commands, agi_command);
@ -2500,7 +2882,9 @@ static int agi_exec_full(struct ast_channel *chan, void *data, int enhanced, int
}
}
#endif
res = launch_script(args.argv[0], args.argv, fds, enhanced ? &efd : NULL, &pid);
res = launch_script(chan, args.argv[0], args.argv, fds, enhanced ? &efd : NULL, &pid);
/* Async AGI do not require run_agi(), so just proceed if normal AGI
or Fast AGI are setup with success. */
if (res == AGI_RESULT_SUCCESS || res == AGI_RESULT_SUCCESS_FAST) {
int status = 0;
agi.fd = fds[1];
@ -2516,12 +2900,13 @@ static int agi_exec_full(struct ast_channel *chan, void *data, int enhanced, int
if (efd > -1)
close(efd);
ast_unreplace_sigchld();
}
}
ast_module_user_remove(u);
switch (res) {
case AGI_RESULT_SUCCESS:
case AGI_RESULT_SUCCESS_FAST:
case AGI_RESULT_SUCCESS_ASYNC:
pbx_builtin_setvar_helper(chan, "AGISTATUS", "SUCCESS");
break;
case AGI_RESULT_FAILURE:
@ -2575,6 +2960,7 @@ static int deadagi_exec(struct ast_channel *chan, void *data)
}
static struct ast_cli_entry cli_agi[] = {
AST_CLI_DEFINE(handle_cli_agi_add_cmd, "Add AGI command to a channel in Async AGI"),
AST_CLI_DEFINE(handle_cli_agi_debug, "Enable/Disable AGI debugging"),
AST_CLI_DEFINE(handle_cli_agi_show, "List AGI commands or specific help"),
AST_CLI_DEFINE(handle_cli_agi_dumphtml, "Dumps a list of AGI commands in HTML format")
@ -2586,6 +2972,7 @@ static int unload_module(void)
ast_agi_unregister_multiple(ast_module_info->self, commands, sizeof(commands) / sizeof(struct agi_command));
ast_unregister_application(eapp);
ast_unregister_application(deadapp);
ast_manager_unregister("AGI");
return ast_unregister_application(app);
}
@ -2595,6 +2982,7 @@ static int load_module(void)
ast_agi_register_multiple(ast_module_info->self, commands, sizeof(commands) / sizeof(struct agi_command));
ast_register_application(deadapp, deadagi_exec, deadsynopsis, descrip);
ast_register_application(eapp, eagi_exec, esynopsis, descrip);
ast_manager_register2("AGI", EVENT_FLAG_CALL, action_add_agi_cmd, "Add an AGI command to execute by Async AGI", mandescr_asyncagi);
return ast_register_application(app, agi_exec, synopsis, descrip);
}