major factor of pgsql field handling

This commit is contained in:
Daniel Swarbrick 2011-02-07 22:22:37 +01:00
parent f0a31e1bff
commit bcb2262fdc
2 changed files with 130 additions and 174 deletions

View File

@ -16,10 +16,25 @@
<!-- This is like the info app but after the call is hung up -->
<!--<param name="debug" value="true"/>-->
<param name="default-template" value="example"/>
</settings>
<templates>
<template name="example">"${local_ip_v4}","${caller_id_name}","${caller_id_number}","${destination_number}","${context}","${start_stamp}","${answer_stamp}","${end_stamp}","${duration}","${billsec}","${hangup_cause}","${uuid}","${bleg_uuid}","${accountcode}","${read_codec}","${write_codec}","${sip_hangup_disposition}","${ani}"</template>
</templates>
<schema>
<field var="local_ip_v4"/>
<field var="caller_id_name"/>
<field var="caller_id_number"/>
<field var="destination_number"/>
<field var="context"/>
<field var="start_stamp"/>
<field var="answer_stamp"/>
<field var="end_stamp"/>
<field var="duration" quote="false"/>
<field var="billsec" quote="false"/>
<field var="hangup_cause"/>
<field var="uuid"/>
<field var="bleg_uuid"/>
<field var="accountcode"/>
<field var="read_codec"/>
<field var="write_codec"/>
<!-- <field var="sip_hangup_disposition"/> -->
<!-- <field var="ani"/> -->
</schema>
</configuration>

View File

@ -59,28 +59,34 @@ typedef struct {
switch_mutex_t *mutex;
} cdr_fd_t;
const char *default_template =
"\"${local_ip_v4}\",\"${caller_id_name}\",\"${caller_id_number}\",\"${destination_number}\",\"${context}\",\"${start_stamp}\","
"\"${answer_stamp}\",\"${end_stamp}\",\"${duration}\",\"${billsec}\",\"${hangup_cause}\",\"${uuid}\",\"${bleg_uuid}\",\"${accountcode}\","
"\"${read_codec}\",\"${write_codec}\"";
typedef struct {
char *col_name;
char *var_name;
switch_bool_t quote;
switch_bool_t not_null;
} cdr_field_t;
typedef struct {
char *columns;
cdr_field_t fields[1];
} db_schema_t;
static struct {
switch_memory_pool_t *pool;
switch_hash_t *fd_hash;
switch_hash_t *template_hash;
int shutdown;
char *db_info;
char *db_table;
db_schema_t *db_schema;
PGconn *db_connection;
switch_mutex_t *db_mutex;
int db_online;
cdr_leg_t legs;
char *spool_dir;
spool_format_t spool_format;
int rotate;
int debug;
char *default_template;
switch_mutex_t *db_mutex;
} globals = { 0 };
} globals;
static switch_xml_config_enum_item_t config_opt_cdr_leg_enum[] = {
{"a", CDR_LEG_A},
@ -110,7 +116,6 @@ static switch_xml_config_item_t config_settings[] = {
/* key, type, flags, ptr, default_value, data, syntax, helptext */
SWITCH_CONFIG_ITEM_STRING_STRDUP("db-info", CONFIG_RELOADABLE, &globals.db_info, "dbname=cdr", NULL, NULL),
SWITCH_CONFIG_ITEM_STRING_STRDUP("db-table", CONFIG_RELOADABLE, &globals.db_table, "cdr", NULL, NULL),
SWITCH_CONFIG_ITEM_STRING_STRDUP("default-template", CONFIG_RELOADABLE, &globals.default_template, "default", NULL, NULL),
SWITCH_CONFIG_ITEM("legs", SWITCH_CONFIG_ENUM, CONFIG_RELOADABLE, &globals.legs, (void *) CDR_LEG_A, &config_opt_cdr_leg_enum, "a|b|ab", NULL),
SWITCH_CONFIG_ITEM("spool-format", SWITCH_CONFIG_ENUM, CONFIG_RELOADABLE, &globals.spool_format, (void *) SPOOL_FORMAT_CSV, &config_opt_spool_format_enum, "csv|sql", "Disk spool format to use if SQL insert fails."),
SWITCH_CONFIG_ITEM("rotate-on-hup", SWITCH_CONFIG_BOOL, CONFIG_RELOADABLE, &globals.rotate, SWITCH_FALSE, NULL, NULL, NULL),
@ -239,135 +244,13 @@ static void spool_cdr(const char *path, const char *log_line)
switch_safe_free(log_line_lf);
}
static switch_status_t insert_cdr(const char * const template, const char * const cdr)
static switch_status_t insert_cdr(const char *values)
{
char *columns, *values;
char *p, *q;
unsigned vlen;
char *nullValues, *temp, *tp;
int nullCounter = 0, charCounter = 0;
char *sql = NULL, *path = NULL;
PGresult *res;
if (!template || !*template || !cdr || !*cdr) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Bad parameter\n");
return SWITCH_STATUS_FALSE;
}
/* Build comma-separated list of field names by dropping $ { } ; chars */
switch_strdup(columns, template);
for (p = columns, q = columns; *p; ++p) {
switch (*p) {
case '$': case '"': case '{': case '}': case ';':
break;
default:
*q++ = *p;
}
}
*q = '\0';
/*
* In the expanded vars, replace double quotes (") with single quotes (')
* for correct PostgreSQL syntax, and replace semi-colon with space to
* prevent SQL injection attacks
*/
switch_strdup(values, cdr);
for (p = values; *p; ++p) {
switch(*p) {
case '"':
*p = '\'';
break;
case ';':
*p = ' ';
break;
}
}
vlen = p - values;
/*
* Patch for changing empty strings ('') in the expanded variables to
* PostgreSQL null
*/
for (p = values; *p; ++p) {
if (*p == ',') {
if (charCounter == 0) {
nullCounter++;
}
charCounter = 0;
} else if (*p != ' ' && *p != '\'') {
charCounter++;
}
}
if (charCounter == 0) {
nullCounter++;
}
nullCounter *= 4;
vlen += nullCounter;
switch_zmalloc(nullValues, strlen(values) + nullCounter + 1);
charCounter = 0;
temp = nullValues;
tp = nullValues;
for (p = values; *p; ++tp, ++p) {
if (*p == ',') {
if (charCounter == 0) {
temp++;
*temp = 'n';
temp++;
if (temp == tp) tp++;
*temp = 'u';
temp++;
if (temp == tp) tp++;
*temp = 'l';
temp++;
if (temp == tp) tp++;
*temp = 'l';
temp++;
while (temp != tp) {
*temp = ' ';
temp++;
}
}
charCounter = 0;
temp = tp;
} else if (*p != ' ' && *p != '\'') {
charCounter++;
}
*tp = *p;
}
if (charCounter == 0) {
temp++;
*temp = 'n';
temp++;
if (temp == tp) tp++;
*temp = 'u';
temp++;
if (temp == tp) tp++;
*temp = 'l';
temp++;
if (temp == tp) tp++;
*temp = 'l';
temp++;
while (temp != tp) {
*temp = ' ';
temp++;
}
}
charCounter = 0;
temp = tp;
*tp = 0;
tp = values;
values = nullValues;
switch_safe_free(tp);
sql = switch_mprintf("INSERT INTO %s (%s) VALUES (%s);", globals.db_table, columns, values);
sql = switch_mprintf("INSERT INTO %s (%s) VALUES (%s);", globals.db_table, globals.db_schema->columns, values);
assert(sql);
switch_safe_free(columns);
switch_safe_free(values);
if (globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Query: \"%s\"\n", sql);
@ -415,7 +298,7 @@ static switch_status_t insert_cdr(const char * const template, const char * cons
} else {
path = switch_mprintf("%s%scdr-spool.csv", globals.spool_dir, SWITCH_PATH_SEPARATOR);
assert(path);
spool_cdr(path, cdr);
spool_cdr(path, values);
}
switch_safe_free(path);
@ -428,8 +311,10 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
{
switch_channel_t *channel = switch_core_session_get_channel(session);
switch_status_t status = SWITCH_STATUS_SUCCESS;
const char *template_str = NULL;
char *expanded_vars = NULL;
char *values = NULL, *tmp = NULL, *pq_var = NULL;
const char *var = NULL;
cdr_field_t *cdr_field = NULL;
switch_size_t len, offset;
if (globals.shutdown) {
return SWITCH_STATUS_SUCCESS;
@ -465,24 +350,40 @@ static switch_status_t my_on_reporting(switch_core_session_t *session)
}
}
template_str = (const char *) switch_core_hash_find(globals.template_hash, globals.default_template);
switch_zmalloc(values, 1);
offset = 0;
if (!template_str) {
template_str = default_template;
for (cdr_field = globals.db_schema->fields; cdr_field->var_name; cdr_field++) {
if ((var = switch_channel_get_variable(channel, cdr_field->var_name))) {
/* Allocate sufficient buffer for PQescapeString */
len = strlen(var);
tmp = switch_core_session_alloc(session, len * 2 + 1);
PQescapeString(tmp, var, len);
var = tmp;
}
if (cdr_field->quote) {
if ((cdr_field->not_null == SWITCH_FALSE) && zstr(var)) {
pq_var = switch_mprintf("null,", var);
} else {
pq_var = switch_mprintf("'%s',", var);
}
} else {
pq_var = switch_mprintf("%s,", var);
}
/* Resize values buffer to accomodate next var */
len = strlen(pq_var);
tmp = realloc(values, offset + len);
values = tmp;
memcpy(values + offset, pq_var, len);
switch_safe_free(pq_var);
offset += len;
}
*(values + --offset) = '\0';
expanded_vars = switch_channel_expand_variables(channel, template_str);
if (!expanded_vars) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error expanding CDR variables.\n");
return SWITCH_STATUS_FALSE;
}
insert_cdr(template_str, expanded_vars);
if (expanded_vars != template_str) {
switch_safe_free(expanded_vars);
}
insert_cdr(values);
switch_safe_free(values);
return status;
}
@ -532,9 +433,13 @@ static switch_state_handler_table_t state_handlers = {
static switch_status_t load_config(switch_memory_pool_t *pool)
{
char *cf = "cdr_pg_csv.conf";
switch_xml_t cfg, xml, settings, param;
switch_status_t status = SWITCH_STATUS_SUCCESS;
char *cf = "cdr_pg_csv.conf", *ptr;
switch_xml_t cfg, xml, schema, field;
const char *attr;
int num_fields = 0;
switch_size_t len = 0;
cdr_field_t *cdr_field;
if (globals.db_online) {
PQfinish(globals.db_connection);
@ -544,32 +449,69 @@ static switch_status_t load_config(switch_memory_pool_t *pool)
memset(&globals, 0, sizeof(globals));
switch_core_hash_init(&globals.fd_hash, pool);
switch_core_hash_init(&globals.template_hash, pool);
switch_mutex_init(&globals.db_mutex, SWITCH_MUTEX_NESTED, pool);
globals.pool = pool;
switch_core_hash_insert(globals.template_hash, "default", default_template);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding default template.\n");
globals.legs = CDR_LEG_A;
if (switch_xml_config_parse_module_settings("cdr_pg_csv.conf", SWITCH_FALSE, config_settings) != SWITCH_STATUS_SUCCESS) {
if (switch_xml_config_parse_module_settings(cf, SWITCH_FALSE, config_settings) != SWITCH_STATUS_SUCCESS) {
return SWITCH_STATUS_FALSE;
}
if ((xml = switch_xml_open_cfg(cf, &cfg, NULL))) {
if ((settings = switch_xml_child(cfg, "templates"))) {
for (param = switch_xml_child(settings, "template"); param; param = param->next) {
char *var = (char *) switch_xml_attr(param, "name");
if (var) {
char *tpl;
tpl = switch_core_strdup(pool, param->txt);
switch_core_hash_insert(globals.template_hash, var, tpl);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Adding template %s.\n", var);
if ((schema = switch_xml_child(cfg, "schema"))) {
/* Count fields in schema so we can calculate required buffer size */
for (field = switch_xml_child(schema, "field"); field; field = field->next) {
if (switch_xml_attr(field, "var")) {
num_fields++;
}
}
globals.db_schema = switch_core_alloc(pool, (num_fields + 1) * sizeof(cdr_field_t));
cdr_field = globals.db_schema->fields;
for (field = switch_xml_child(schema, "field"); field; field = field->next) {
if ((attr = switch_xml_attr(field, "var"))) {
cdr_field->var_name = switch_core_strdup(pool, attr);
/* Assume SQL column name is the same as FreeSWITCH channel var name, unless specified otherwise */
if ((attr = switch_xml_attr(field, "column"))) {
cdr_field->col_name = switch_core_strdup(pool, attr);
} else {
cdr_field->col_name = switch_core_strdup(pool, cdr_field->var_name);
}
/* Assume all fields should be quoted (treated as strings), unless specified otherwise */
if ((attr = switch_xml_attr(field, "quote")) && !strncmp(attr, "false", 5)) {
cdr_field->quote = SWITCH_FALSE;
} else {
cdr_field->quote = SWITCH_TRUE;
}
/* Assume all fields allow SQL nulls, unless specified otherwise */
if ((attr = switch_xml_attr(field, "not-null")) && !strncmp(attr, "true", 4)) {
cdr_field->not_null = SWITCH_TRUE;
} else {
cdr_field->not_null = SWITCH_FALSE;
}
len += strlen(cdr_field->col_name) + 1;
cdr_field++;
}
}
cdr_field->var_name = 0;
globals.db_schema->columns = switch_core_alloc(pool, len);
ptr = globals.db_schema->columns;
for (cdr_field = globals.db_schema->fields; cdr_field->col_name; cdr_field++) {
len = strlen(cdr_field->col_name);
memcpy(ptr, cdr_field->col_name, len);
ptr += len;
*ptr = ',';
ptr++;
}
*--ptr = '\0';
}
switch_xml_free(xml);
}
@ -596,7 +538,6 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_cdr_pg_csv_load)
switch_core_add_state_handler(&state_handlers);
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
return status;
}