dect
/
asterisk
Archived
13
0
Fork 0

Permit additional CDR columns to be saved in Postgres. Note that these

changes are backward-compatible, so no changes to UPGRADE.txt are
necessary.
(closes issue #9279)
 Reported by: rottenroddy
 Patches: 
       20080125__bug9279.diff.txt uploaded by Corydon76 (license 14)
 Tested by: Corydon76


git-svn-id: http://svn.digium.com/svn/asterisk/trunk@104101 f38db490-d61c-443f-a65b-d21fe96a405b
This commit is contained in:
tilghman 2008-02-25 23:04:20 +00:00
parent e323ca23a7
commit e5122f6dca
2 changed files with 243 additions and 50 deletions

26
CHANGES
View File

@ -450,6 +450,29 @@ Logger changes
and to ensure that the oldest log file gets deleted.
* Added realtime support for the queue log
Call Detail Records
-------------------
* The cdr_manager module has a [mappings] feature, like cdr_custom,
to add fields to the manager event from the CDR variables.
* Added cdr_adaptive_odbc, a new module that adapts to the structure of your
backend database CDR table. Specifically, additional, non-standard
columns are supported, merely by setting the corresponding CDR variable in
your dialplan. In addition, you may alias any column to another name (for
example, if you want the 'src' CDR variable to be column 'ANI' in the DB,
simply "alias src => ANI" in the configuration file). Records may be
posted to more than one backend, simply by specifying multiple categories
in the configuration file. And finally, you may filter which CDRs get
posted to each backend, by specifying a filter (which the record must
match) for the particular category. Filters are additive (meaning all
rules must match to post that CDR).
* The Postgres CDR module now supports some features of the cdr_adaptive_odbc
module. Specifically, you may add additional columns into the table and
they will be set, if you set the corresponding CDR variable name. Also,
if you omit columns in your database table, they will be silently skipped
(but a record will still be inserted, based on what columns remain). Note
that the other two features from cdr_adaptive_odbc (alias and filter) are
not currently supported.
Miscellaneous New Modules
-------------------------
* Added a new CDR module, cdr_sqlite3_custom.
@ -494,8 +517,6 @@ Miscellaneous
* Added maxfiles option to options section of asterisk.conf which allows you to specify
what Asterisk should set as the maximum number of open files when it loads.
* Added the jittertargetextra configuration option.
* The cdr_manager module has a [mappings] feature, like cdr_custom,
to add fields to the manager event from the CDR variables.
* Added support for setting the CoS for VLAN traffic (802.1p). See the sample
configuration files for the IP channel drivers. The new option is "cos".
This information is also documented in doc/qos.tex, or the IP Quality of Service
@ -523,3 +544,4 @@ Miscellaneous
* Added a compiler flag, CHANNEL_TRACE, which permits channel tracing to be
turned on, via the CHANNEL(trace) dialplan function. Could be useful for
dialplan debugging.

View File

@ -50,29 +50,68 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/cdr.h"
#include "asterisk/module.h"
#define DATE_FORMAT "%Y-%m-%d %T"
#define DATE_FORMAT "'%Y-%m-%d %T'"
static char *name = "pgsql";
static char *config = "cdr_pgsql.conf";
static char *pghostname = NULL, *pgdbname = NULL, *pgdbuser = NULL, *pgpassword = NULL, *pgdbport = NULL, *table = NULL;
static int connected = 0;
static int maxsize = 512, maxsize2 = 512;
AST_MUTEX_DEFINE_STATIC(pgsql_lock);
static PGconn *conn = NULL;
struct columns {
char *name;
char *type;
int len;
AST_RWLIST_ENTRY(columns) list;
};
static AST_RWLIST_HEAD_STATIC(psql_columns, columns);
#define LENGTHEN_BUF1(size) \
do { \
/* Lengthen buffer, if necessary */ \
if ((newsize = lensql + (size) + 3) > sizesql) { \
if ((tmp = ast_realloc(sql, (newsize / 512 + 1) * 512))) { \
sql = tmp; \
sizesql = (newsize / 512 + 1) * 512; \
} else { \
ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CDR failed.\n"); \
ast_free(sql); \
ast_free(sql2); \
AST_RWLIST_UNLOCK(&psql_columns); \
return -1; \
} \
} \
} while (0)
#define LENGTHEN_BUF2(size) \
do { \
if ((newsize = lensql2 + (size) + 3) > sizesql2) { \
if ((tmp = ast_realloc(sql2, (newsize / 512 + 1) * 512))) { \
sql2 = tmp; \
sizesql2 = (newsize / 512 + 1) * 512; \
} else { \
ast_log(LOG_ERROR, "Unable to allocate sufficient memory. Insert CDR failed.\n"); \
ast_free(sql); \
ast_free(sql2); \
AST_RWLIST_UNLOCK(&psql_columns); \
return -1; \
} \
} \
} while (0)
static int pgsql_log(struct ast_cdr *cdr)
{
struct ast_tm tm;
char sqlcmd[2048] = "", timestr[128];
char *pgerror;
PGresult *result;
ast_mutex_lock(&pgsql_lock);
ast_localtime(&cdr->start, &tm, NULL);
ast_strftime(timestr, sizeof(timestr), DATE_FORMAT, &tm);
if ((!connected) && pghostname && pgdbuser && pgpassword && pgdbname) {
conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
if (PQstatus(conn) != CONNECTION_BAD) {
@ -87,49 +126,135 @@ static int pgsql_log(struct ast_cdr *cdr)
}
if (connected) {
char *clid=NULL, *dcontext=NULL, *channel=NULL, *dstchannel=NULL, *lastapp=NULL, *lastdata=NULL;
char *src=NULL, *dst=NULL, *uniqueid=NULL, *userfield=NULL;
int pgerr;
struct columns *cur;
int lensql, lensql2, sizesql = maxsize, sizesql2 = maxsize2, newsize;
char *sql = ast_calloc(sizeof(char), sizesql), *sql2 = ast_calloc(sizeof(char), sizesql2), *tmp, *value;
char buf[257], escapebuf[513];
lensql = snprintf(sql, sizesql, "INSERT INTO %s (", table);
lensql2 = snprintf(sql2, sizesql2, " VALUES (");
AST_RWLIST_RDLOCK(&psql_columns);
AST_RWLIST_TRAVERSE(&psql_columns, cur, list) {
/* For fields not set, simply skip them */
ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
if (!value)
continue;
LENGTHEN_BUF1(strlen(cur->name));
lensql += snprintf(sql + lensql, sizesql - lensql, "%s,", cur->name);
/* Maximum space needed would be if all characters needed to be escaped, plus a trailing NULL */
if ((clid = alloca(strlen(cdr->clid) * 2 + 1)) != NULL)
PQescapeStringConn(conn, clid, cdr->clid, strlen(cdr->clid), &pgerr);
if ((dcontext = alloca(strlen(cdr->dcontext) * 2 + 1)) != NULL)
PQescapeStringConn(conn, dcontext, cdr->dcontext, strlen(cdr->dcontext), &pgerr);
if ((channel = alloca(strlen(cdr->channel) * 2 + 1)) != NULL)
PQescapeStringConn(conn, channel, cdr->channel, strlen(cdr->channel), &pgerr);
if ((dstchannel = alloca(strlen(cdr->dstchannel) * 2 + 1)) != NULL)
PQescapeStringConn(conn, dstchannel, cdr->dstchannel, strlen(cdr->dstchannel), &pgerr);
if ((lastapp = alloca(strlen(cdr->lastapp) * 2 + 1)) != NULL)
PQescapeStringConn(conn, lastapp, cdr->lastapp, strlen(cdr->lastapp), &pgerr);
if ((lastdata = alloca(strlen(cdr->lastdata) * 2 + 1)) != NULL)
PQescapeStringConn(conn, lastdata, cdr->lastdata, strlen(cdr->lastdata), &pgerr);
if ((uniqueid = alloca(strlen(cdr->uniqueid) * 2 + 1)) != NULL)
PQescapeStringConn(conn, uniqueid, cdr->uniqueid, strlen(cdr->uniqueid), &pgerr);
if ((userfield = alloca(strlen(cdr->userfield) * 2 + 1)) != NULL)
PQescapeStringConn(conn, userfield, cdr->userfield, strlen(cdr->userfield), &pgerr);
if ((src = alloca(strlen(cdr->src) * 2 + 1)) != NULL)
PQescapeStringConn(conn, src, cdr->src, strlen(cdr->src), &pgerr);
if ((dst = alloca(strlen(cdr->dst) * 2 + 1)) != NULL)
PQescapeStringConn(conn, dst, cdr->dst, strlen(cdr->dst), &pgerr);
/* Check for all alloca failures above at once */
if ((!clid) || (!dcontext) || (!channel) || (!dstchannel) || (!lastapp) || (!lastdata) || (!uniqueid) || (!userfield) || (!src) || (!dst)) {
ast_log(LOG_ERROR, "cdr_pgsql: Out of memory error (insert fails)\n");
ast_mutex_unlock(&pgsql_lock);
return -1;
}
if (strcmp(cur->name, "start") == 0 || strcmp(cur->name, "calldate") == 0) {
if (strncmp(cur->type, "int", 3) == 0) {
LENGTHEN_BUF2(12);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->start.tv_sec);
} else if (strncmp(cur->type, "float", 5) == 0) {
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->start.tv_sec + (double)cdr->start.tv_usec / 1000000.0);
} else {
/* char, hopefully */
LENGTHEN_BUF2(30);
ast_localtime(&cdr->start, &tm, NULL);
lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
}
} else if (strcmp(cur->name, "answer") == 0) {
if (strncmp(cur->type, "int", 3) == 0) {
LENGTHEN_BUF2(12);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->answer.tv_sec);
} else if (strncmp(cur->type, "float", 5) == 0) {
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->answer.tv_sec + (double)cdr->answer.tv_usec / 1000000.0);
} else {
/* char, hopefully */
LENGTHEN_BUF2(30);
ast_localtime(&cdr->start, &tm, NULL);
lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
}
} else if (strcmp(cur->name, "end") == 0) {
if (strncmp(cur->type, "int", 3) == 0) {
LENGTHEN_BUF2(12);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%ld", cdr->end.tv_sec);
} else if (strncmp(cur->type, "float", 5) == 0) {
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec + (double)cdr->end.tv_usec / 1000000.0);
} else {
/* char, hopefully */
LENGTHEN_BUF2(30);
ast_localtime(&cdr->end, &tm, NULL);
lensql2 += ast_strftime(sql2 + lensql2, sizesql2 - lensql2, DATE_FORMAT, &tm);
}
} else if (strcmp(cur->name, "duration") == 0 || strcmp(cur->name, "billsec") == 0) {
if (cur->type[0] == 'i') {
/* Get integer, no need to escape anything */
ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
LENGTHEN_BUF2(12);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
} else if (strncmp(cur->type, "float", 5) == 0) {
struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%f", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
} else {
/* Char field, probably */
struct timeval *tv = cur->name[0] == 'd' ? &cdr->start : &cdr->answer;
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%f'", (double)cdr->end.tv_sec - tv->tv_sec + cdr->end.tv_usec / 1000000.0 - tv->tv_usec / 1000000.0);
}
} else if (strcmp(cur->name, "disposition") == 0 || strcmp(cur->name, "amaflags") == 0) {
if (strncmp(cur->type, "int", 3) == 0) {
/* Integer, no need to escape anything */
ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 1);
LENGTHEN_BUF2(12);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%s", value);
} else {
/* Although this is a char field, there are no special characters in the values for these fields */
ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
LENGTHEN_BUF2(30);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", value);
}
} else {
/* Arbitrary field, could be anything */
ast_cdr_getvar(cdr, cur->name, &value, buf, sizeof(buf), 0, 0);
if (strncmp(cur->type, "int", 3) == 0) {
long long whatever;
if (value && sscanf(value, "%lld", &whatever) == 1) {
LENGTHEN_BUF2(25);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%lld", whatever);
} else {
LENGTHEN_BUF2(1);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
}
} else if (strncmp(cur->type, "float", 5) == 0) {
long double whatever;
if (value && sscanf(value, "%Lf", &whatever) == 1) {
LENGTHEN_BUF2(50);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "%30Lf", whatever);
} else {
LENGTHEN_BUF2(1);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "0");
}
/* XXX Might want to handle dates, times, and other misc fields here XXX */
} else {
if (value)
PQescapeStringConn(conn, escapebuf, value, strlen(value), NULL);
else
escapebuf[0] = '\0';
LENGTHEN_BUF2(strlen(escapebuf) + 2);
lensql2 += snprintf(sql2 + lensql2, sizesql2 - lensql2, "'%s'", escapebuf);
}
}
LENGTHEN_BUF2(1);
strcat(sql2 + lensql2, ",");
lensql2++;
}
AST_RWLIST_UNLOCK(&psql_columns);
LENGTHEN_BUF1(lensql2);
sql[lensql - 1] = ')';
sql2[lensql2 - 1] = ')';
strcat(sql + lensql, sql2);
ast_verb(11, "[%s]\n", sql);
ast_debug(2, "cdr_pgsql: inserting a CDR record.\n");
snprintf(sqlcmd,sizeof(sqlcmd),"INSERT INTO %s (calldate,clid,src,dst,dcontext,channel,dstchannel,"
"lastapp,lastdata,duration,billsec,disposition,amaflags,accountcode,uniqueid,userfield) VALUES"
" ('%s','%s','%s','%s','%s', '%s','%s','%s','%s',%ld,%ld,'%s',%ld,'%s','%s','%s')",
table, timestr, clid, src, dst, dcontext, channel, dstchannel, lastapp, lastdata,
cdr->duration,cdr->billsec,ast_cdr_disp2str(cdr->disposition),cdr->amaflags, cdr->accountcode, uniqueid, userfield);
ast_debug(3, "cdr_pgsql: SQL command executed: %s\n",sqlcmd);
/* Test to be sure we're still connected... */
/* If we're connected, and connection is working, good. */
/* Otherwise, attempt reconnect. If it fails... sorry... */
@ -152,7 +277,7 @@ static int pgsql_log(struct ast_cdr *cdr)
return -1;
}
}
result = PQexec(conn, sqlcmd);
result = PQexec(conn, sql);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR,"cdr_pgsql: Failed to insert call detail record into database!\n");
@ -163,7 +288,7 @@ static int pgsql_log(struct ast_cdr *cdr)
ast_log(LOG_ERROR, "cdr_pgsql: Connection reestablished.\n");
connected = 1;
PQclear(result);
result = PQexec(conn, sqlcmd);
result = PQexec(conn, sql);
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR,"cdr_pgsql: HARD ERROR! Attempted reconnection failed. DROPPING CALL RECORD!\n");
@ -181,8 +306,14 @@ static int pgsql_log(struct ast_cdr *cdr)
}
static int unload_module(void)
{
{
struct columns *cur;
ast_cdr_unregister(name);
/* Give all threads time to finish */
usleep(1);
PQfinish(conn);
if (pghostname)
ast_free(pghostname);
if (pgdbname)
@ -195,7 +326,13 @@ static int unload_module(void)
ast_free(pgdbport);
if (table)
ast_free(table);
ast_cdr_unregister(name);
AST_RWLIST_WRLOCK(&psql_columns);
while ((cur = AST_RWLIST_REMOVE_HEAD(&psql_columns, list))) {
ast_free(cur);
}
AST_RWLIST_UNLOCK(&psql_columns);
return 0;
}
@ -203,6 +340,8 @@ static int config_module(int reload)
{
struct ast_variable *var;
char *pgerror;
struct columns *cur;
PGresult *result;
const char *tmp;
struct ast_config *cfg;
struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
@ -304,8 +443,40 @@ static int config_module(int reload)
conn = PQsetdbLogin(pghostname, pgdbport, NULL, NULL, pgdbname, pgdbuser, pgpassword);
if (PQstatus(conn) != CONNECTION_BAD) {
char sqlcmd[256];
char *fname, *ftype, *flen;
int i, rows;
ast_debug(1, "Successfully connected to PostgreSQL database.\n");
connected = 1;
/* Query the columns */
snprintf(sqlcmd, sizeof(sqlcmd), "select a.attname, t.typname, a.attlen from pg_class c, pg_attribute a, pg_type t where c.oid = a.attrelid and a.atttypid = t.oid and (a.attnum > 0) and c.relname = '%s' order by c.relname, attnum", table);
result = PQexec(conn, sqlcmd);
if (PQresultStatus(result) != PGRES_TUPLES_OK) {
pgerror = PQresultErrorMessage(result);
ast_log(LOG_ERROR, "cdr_pgsql: Failed to query database columns: %s\n", pgerror);
PQclear(result);
unload_module();
return AST_MODULE_LOAD_DECLINE;
}
rows = PQntuples(result);
for (i = 0; i < rows; i++) {
fname = PQgetvalue(result, i, 0);
ftype = PQgetvalue(result, i, 1);
flen = PQgetvalue(result, i, 2);
ast_verb(4, "Found column '%s' of type '%s'\n", fname, ftype);
cur = ast_calloc(1, sizeof(*cur) + strlen(fname) + strlen(ftype) + 2);
if (cur) {
sscanf(flen, "%d", &cur->len);
cur->name = (char *)cur + sizeof(*cur);
cur->type = (char *)cur + sizeof(*cur) + strlen(fname) + 1;
strcpy(cur->name, fname);
strcpy(cur->type, ftype);
AST_RWLIST_INSERT_TAIL(&psql_columns, cur, list);
}
}
PQclear(result);
} else {
pgerror = PQerrorMessage(conn);
ast_log(LOG_ERROR, "cdr_pgsql: Unable to connect to database server %s. CALLS WILL NOT BE LOGGED!!\n", pghostname);