maxmind: Move response processing to a thread.

Move response processing to a thread. Blind attempt at fixing bug 14701.

Bug: 14701
Change-Id: I2b7e6a0669c4784c7c169e659fa37ea2e62c96a3
Reviewed-on: https://code.wireshark.org/review/30837
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Anders Broman <a.broman58@gmail.com>
This commit is contained in:
Gerald Combs 2018-11-29 15:59:25 -08:00 committed by Anders Broman
parent af6aa3f76a
commit 1bab83de53
1 changed files with 152 additions and 100 deletions

View File

@ -41,14 +41,22 @@ static mmdb_lookup_t mmdb_not_found;
// - Switch to a different format? I was going to use g_key_file_* to parse
// the mmdbresolve output, but it was easier to just parse it directly.
static GThread *mmdbr_thread;
static GThread *write_mmdbr_stdin_thread;
static GAsyncQueue *mmdbr_request_q; // g_allocated char *
static GMutex mmdbr_pipe_mtx;
static GRWLock mmdbr_pipe_mtx;
// Hashes of mmdb_lookup_t
typedef struct _mmdbr_response_t {
gboolean is_ipv4;
ws_in4_addr ipv4_addr;
ws_in6_addr ipv6_addr;
mmdb_lookup_t mmdb_val;
} mmdb_response_t;
static wmem_map_t *mmdb_ipv4_map;
static wmem_map_t *mmdb_ipv6_map;
static gboolean new_entries;
static GAsyncQueue *mmdbr_response_q; // g_allocated mmdbr_response_t *
static GThread *read_mmdbr_stdout_thread;
// Interned strings
static wmem_map_t *mmdb_str_chunk;
@ -56,7 +64,6 @@ static wmem_map_t *mmdb_ipv6_chunk;
/* Child mmdbresolve process */
static ws_pipe_t mmdbr_pipe; // Requires mutex
static FILE *mmdbr_stdout; // Requires mutex
/* UAT definitions. Copied from oids.c */
typedef struct _maxmind_db_path_t {
@ -91,7 +98,7 @@ static GPtrArray *mmdb_file_arr; // .mmdb files
#define MMDB_DEBUG(...)
#endif
static void mmdb_resolve_stop(gboolean lock_mutex);
static void mmdb_resolve_stop(void);
// Hopefully scanning a few lines asynchronously has less overhead than
// reading in a child thread.
@ -135,20 +142,21 @@ static void init_lookup(mmdb_lookup_t *lookup) {
}
static gboolean mmdbr_pipe_valid(void) {
g_mutex_lock(&mmdbr_pipe_mtx);
g_rw_lock_reader_lock(&mmdbr_pipe_mtx);
gboolean pipe_valid = ws_pipe_valid(&mmdbr_pipe);
g_mutex_unlock(&mmdbr_pipe_mtx);
g_rw_lock_reader_unlock(&mmdbr_pipe_mtx);
return pipe_valid;
}
// Writing to mmdbr_pipe.stdin_fd can block. Do so in a separate thread.
#define MMDB_WAIT_TIME (150 * 1000) // microseconds
static gpointer
write_mmdbr_stdin_worker(gpointer data _U_) {
write_mmdbr_stdin_worker(gpointer sifd_data) {
int stdin_fd = GPOINTER_TO_INT(sifd_data);
while (1) {
if (!mmdbr_pipe_valid()) {
// Should be due to mmdb_resolve_stop.
MMDB_DEBUG("invalid mmdbr pipe. exiting thread.");
MMDB_DEBUG("invalid mmdbr stdin pipe. exiting thread.");
return NULL;
}
@ -158,9 +166,7 @@ write_mmdbr_stdin_worker(gpointer data _U_) {
}
MMDB_DEBUG("write %s ql %d", request, g_async_queue_length(mmdbr_request_q));
g_mutex_lock(&mmdbr_pipe_mtx);
ssize_t req_status = ws_write(mmdbr_pipe.stdin_fd, request, (unsigned int)strlen(request));
g_mutex_unlock(&mmdbr_pipe_mtx);
ssize_t req_status = ws_write(stdin_fd, request, (unsigned int)strlen(request));
if (req_status < 0) {
MMDB_DEBUG("write error %s. exiting thread.", g_strerror(errno));
return NULL;
@ -170,33 +176,38 @@ write_mmdbr_stdin_worker(gpointer data _U_) {
return NULL;
}
static void
read_mmdbr_stdout(void) {
static char cur_addr[WS_INET6_ADDRSTRLEN];
static mmdb_lookup_t cur_lookup;
g_mutex_lock(&mmdbr_pipe_mtx);
if (!ws_pipe_valid(&mmdbr_pipe)) {
g_mutex_unlock(&mmdbr_pipe_mtx);
return;
}
MMDB_DEBUG("read mmdbr %d", ws_pipe_data_available(mmdbr_pipe.stdout_fd));
static gpointer
read_mmdbr_stdout_worker(gpointer sofd_data) {
mmdb_response_t *response = g_new0(mmdb_response_t, 1);
int stdout_fd = GPOINTER_TO_INT(sofd_data);
FILE *stdout_fp = ws_fdopen(stdout_fd, "r");
GString *country_iso = g_string_new("");
GString *country = g_string_new("");
GString *city = g_string_new("");
GString *as_org = g_string_new("");
int read_buf_size = 2048;
char *read_buf = (char *) g_malloc(read_buf_size);
while (ws_pipe_data_available(mmdbr_pipe.stdout_fd)) {
while (1) {
char cur_addr[WS_INET6_ADDRSTRLEN];
if (!mmdbr_pipe_valid()) {
// Should be due to mmdb_resolve_stop.
MMDB_DEBUG("invalid mmdbr stdout pipe. exiting thread.");
break;
}
read_buf[0] = '\0';
char *line = fgets(read_buf, read_buf_size, mmdbr_stdout);
if (!line || ferror(mmdbr_stdout)) {
char *line = fgets(read_buf, read_buf_size, stdout_fp);
if (!line || ferror(stdout_fp)) {
MMDB_DEBUG("read error %s", g_strerror(errno));
mmdb_resolve_stop(FALSE);
break;
}
line = g_strstrip(line);
size_t line_len = strlen(line);
MMDB_DEBUG("read %zd bytes, feof %d: %s", line_len, feof(mmdbr_stdout), line);
MMDB_DEBUG("read %zd bytes, feof %d: %s", line_len, feof(stdout_fp), line);
if (line_len < 1) continue;
char *val_start = strchr(line, ':');
@ -206,67 +217,82 @@ read_mmdbr_stdout(void) {
// [init] or resolved address in square brackets.
line[line_len - 1] = '\0';
g_strlcpy(cur_addr, line + 1, WS_INET6_ADDRSTRLEN);
init_lookup(&cur_lookup);
init_lookup(&response->mmdb_val);
} else if (strcmp(line, RES_STATUS_ERROR) == 0) {
// Error during init.
cur_addr[0] = '\0';
init_lookup(&cur_lookup);
mmdb_resolve_stop(FALSE);
init_lookup(&response->mmdb_val);
break;
} else if (val_start && g_str_has_prefix(line, RES_COUNTRY_ISO_CODE)) {
cur_lookup.found = TRUE;
cur_lookup.country_iso = chunkify_string(val_start);
response->mmdb_val.found = TRUE;
g_string_printf(country_iso, "%s", val_start);
} else if (val_start && g_str_has_prefix(line, RES_COUNTRY_NAMES_EN)) {
cur_lookup.found = TRUE;
cur_lookup.country = chunkify_string(val_start);
response->mmdb_val.found = TRUE;
g_string_printf(country, "%s", val_start);
} else if (val_start && g_str_has_prefix(line, RES_CITY_NAMES_EN)) {
cur_lookup.found = TRUE;
cur_lookup.city = chunkify_string(val_start);
response->mmdb_val.found = TRUE;
g_string_printf(city, "%s", val_start);
} else if (val_start && g_str_has_prefix(line, RES_ASN_ORG)) {
cur_lookup.found = TRUE;
cur_lookup.as_org = chunkify_string(val_start);
response->mmdb_val.found = TRUE;
g_string_printf(as_org, "%s", val_start);
} else if (val_start && g_str_has_prefix(line, RES_ASN_NUMBER)) {
if (ws_strtou32(val_start, NULL, &cur_lookup.as_number)) {
cur_lookup.found = TRUE;
if (ws_strtou32(val_start, NULL, &response->mmdb_val.as_number)) {
response->mmdb_val.found = TRUE;
} else {
MMDB_DEBUG("Invalid as number: %s", val_start);
}
} else if (val_start && g_str_has_prefix(line, RES_LOCATION_LATITUDE)) {
cur_lookup.found = TRUE;
cur_lookup.latitude = g_ascii_strtod(val_start, NULL);
response->mmdb_val.found = TRUE;
response->mmdb_val.latitude = g_ascii_strtod(val_start, NULL);
} else if (val_start && g_str_has_prefix(line, RES_LOCATION_LONGITUDE)) {
cur_lookup.found = TRUE;
cur_lookup.longitude = g_ascii_strtod(val_start, NULL);
response->mmdb_val.found = TRUE;
response->mmdb_val.longitude = g_ascii_strtod(val_start, NULL);
} else if (g_str_has_prefix(line, RES_END)) {
if (cur_lookup.found) {
mmdb_lookup_t *mmdb_val = (mmdb_lookup_t *) wmem_memdup(wmem_epan_scope(), &cur_lookup, sizeof(cur_lookup));
if (strstr(cur_addr, ".")) {
MMDB_DEBUG("inserting v4 %p %s: city %s country %s", (void *) mmdb_val, cur_addr, mmdb_val->city, mmdb_val->country);
guint32 addr;
ws_inet_pton4(cur_addr, &addr);
wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), mmdb_val);
new_entries = TRUE;
} else if (strstr(cur_addr, ":")) {
MMDB_DEBUG("inserting v6 %p %s: city %s country %s", (void *) mmdb_val, cur_addr, mmdb_val->city, mmdb_val->country);
ws_in6_addr addr;
ws_inet_pton6(cur_addr, &addr);
wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(&addr), mmdb_val);
new_entries = TRUE;
if (response->mmdb_val.found) {
if (country_iso->len) {
response->mmdb_val.country_iso = g_strdup(country_iso->str);
}
if (country->len) {
response->mmdb_val.country = g_strdup(country->str);
}
if (city->len) {
response->mmdb_val.city = g_strdup(city->str);
}
if (as_org->len) {
response->mmdb_val.as_org = g_strdup(as_org->str);
}
if (strstr(cur_addr, ".")) {
ws_inet_pton4(cur_addr, &response->ipv4_addr);
response->is_ipv4 = TRUE;
MMDB_DEBUG("queued v4 %s: city %s country %s", cur_addr, response->mmdb_val.city, response->mmdb_val.country);
} else if (strstr(cur_addr, ":")) {
ws_inet_pton6(cur_addr, &response->ipv6_addr);
MMDB_DEBUG("queued v6 %s: city %s country %s", cur_addr, response->mmdb_val.city, response->mmdb_val.country);
}
g_async_queue_push(mmdbr_response_q, response); // Will be freed by maxmind_db_lookup_process.
response = g_new0(mmdb_response_t, 1);
}
cur_addr[0] = '\0';
init_lookup(&cur_lookup);
init_lookup(&response->mmdb_val);
}
}
g_mutex_unlock(&mmdbr_pipe_mtx);
g_string_free(country_iso, TRUE);
g_string_free(country, TRUE);
g_string_free(city, TRUE);
g_string_free(as_org, TRUE);
g_free(read_buf);
g_free(response);
return NULL;
}
/**
* Stop our mmdbresolve process.
* Can be called from any thread.
*/
static void mmdb_resolve_stop(gboolean lock_mutex) {
static void mmdb_resolve_stop(void) {
char *request;
mmdb_response_t *response;
while (mmdbr_request_q && (request = (char *) g_async_queue_try_pop(mmdbr_request_q)) != NULL) {
g_free(request);
@ -277,18 +303,26 @@ static void mmdb_resolve_stop(gboolean lock_mutex) {
return;
}
if (lock_mutex)
g_mutex_lock(&mmdbr_pipe_mtx);
g_rw_lock_writer_lock(&mmdbr_pipe_mtx);
ws_close(mmdbr_pipe.stdin_fd);
fclose(mmdbr_stdout);
ws_close(mmdbr_pipe.stdout_fd);
MMDB_DEBUG("closing pid %d", mmdbr_pipe.pid);
ws_pipe_close(&mmdbr_pipe);
mmdbr_stdout = NULL;
if (lock_mutex)
g_mutex_unlock(&mmdbr_pipe_mtx);
g_rw_lock_writer_unlock(&mmdbr_pipe_mtx);
g_thread_join(mmdbr_thread);
mmdbr_thread = NULL;
g_thread_join(write_mmdbr_stdin_thread);
write_mmdbr_stdin_thread = NULL;
g_thread_join(read_mmdbr_stdout_thread);
read_mmdbr_stdout_thread = NULL;
while (mmdbr_response_q && (response = (mmdb_response_t *) g_async_queue_try_pop(mmdbr_response_q)) != NULL) {
g_free((char *) response->mmdb_val.country_iso);
g_free((char *) response->mmdb_val.country);
g_free((char *) response->mmdb_val.city);
g_free((char *) response->mmdb_val.as_org);
g_free(response);
}
}
/**
@ -299,6 +333,10 @@ static void mmdb_resolve_start(void) {
mmdbr_request_q = g_async_queue_new();
}
if (!mmdbr_response_q) {
mmdbr_response_q = g_async_queue_new();
}
if (!mmdb_ipv4_map) {
mmdb_ipv4_map = wmem_map_new(wmem_epan_scope(), g_direct_hash, g_direct_equal);
}
@ -320,7 +358,7 @@ static void mmdb_resolve_start(void) {
return;
}
mmdb_resolve_stop(TRUE);
mmdb_resolve_stop();
if (mmdb_file_arr->len == 0) {
MMDB_DEBUG("no GeoIP databases found");
@ -337,7 +375,6 @@ static void mmdb_resolve_start(void) {
g_ptr_array_add(args, NULL);
ws_pipe_init(&mmdbr_pipe);
mmdbr_stdout = NULL;
GPid pipe_pid = ws_pipe_spawn_async(&mmdbr_pipe, args);
MMDB_DEBUG("spawned %s pid %d", mmdbresolve, pipe_pid);
@ -353,11 +390,8 @@ static void mmdb_resolve_start(void) {
return;
}
// XXX Should we set O_NONBLOCK similar to dumpcap?
mmdbr_stdout = ws_fdopen(mmdbr_pipe.stdout_fd, "r");
setvbuf(mmdbr_stdout, NULL, _IONBF, 0);
mmdbr_thread = g_thread_new("write_mmdbr_stdin_worker", write_mmdbr_stdin_worker, NULL);
write_mmdbr_stdin_thread = g_thread_new("write_mmdbr_stdin_worker", write_mmdbr_stdin_worker, GINT_TO_POINTER(mmdbr_pipe.stdin_fd));
read_mmdbr_stdout_thread = g_thread_new("read_mmdbr_stdout_worker", read_mmdbr_stdout_worker, GINT_TO_POINTER(mmdbr_pipe.stdout_fd));
}
/**
@ -404,7 +438,7 @@ static void maxmind_db_path_free_cb(void* p) {
static void maxmind_db_cleanup(void) {
guint i;
mmdb_resolve_stop(TRUE);
mmdb_resolve_stop();
/* If we have old data, clear out the whole thing
* and start again. TODO: Just update the ones that
@ -481,7 +515,7 @@ maxmind_db_pref_init(module_t *nameres)
void maxmind_db_pref_cleanup(void)
{
mmdb_resolve_stop(TRUE);
mmdb_resolve_stop();
}
/**
@ -490,13 +524,43 @@ void maxmind_db_pref_cleanup(void)
gboolean maxmind_db_lookup_process(void)
{
if (mmdbr_pipe_valid()) {
read_mmdbr_stdout();
gboolean new_entries = FALSE;
mmdb_response_t *response;
while (mmdbr_response_q && (response = (mmdb_response_t *) g_async_queue_try_pop(mmdbr_response_q)) != NULL) {
mmdb_lookup_t *mmdb_val = (mmdb_lookup_t *) g_memdup(&response->mmdb_val, sizeof(mmdb_lookup_t));
if (response->mmdb_val.country_iso) {
char *country_iso = (char *) response->mmdb_val.country_iso;
mmdb_val->country_iso = chunkify_string(country_iso);
g_free(country_iso);
}
if (response->mmdb_val.country) {
char *country = (char *) response->mmdb_val.country;
mmdb_val->country = chunkify_string(country);
g_free(country);
}
if (response->mmdb_val.city) {
char *city = (char *) response->mmdb_val.city;
mmdb_val->city = chunkify_string(city);
g_free(city);
}
if (response->mmdb_val.as_org) {
char *as_org = (char *) response->mmdb_val.as_org;
mmdb_val->as_org = chunkify_string(as_org);
g_free(as_org);
}
MMDB_DEBUG("popped response v4 %d city %s country %s", response->is_ipv4, mmdb_val->city, mmdb_val->country);
if (response->is_ipv4) {
wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(response->ipv4_addr), mmdb_val);
} else {
wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(&response->ipv6_addr), mmdb_val);
}
new_entries = TRUE;
g_free(response);
}
gboolean prev_ne = new_entries;
new_entries = FALSE;
return prev_ne;
return new_entries;
}
const mmdb_lookup_t *
@ -504,21 +568,15 @@ maxmind_db_lookup_ipv4(guint32 addr) {
mmdb_lookup_t *result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv4_map, GUINT_TO_POINTER(addr));
if (!result) {
// Try again, mainly so that we empty our pipe buffers.
read_mmdbr_stdout();
result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv4_map, GUINT_TO_POINTER(addr));
}
result = &mmdb_not_found;
wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), result);
if (!result) {
if (mmdbr_pipe_valid()) {
char addr_str[WS_INET_ADDRSTRLEN];
ws_inet_ntop4(&addr, addr_str, WS_INET_ADDRSTRLEN);
MMDB_DEBUG("looking up %s", addr_str);
g_async_queue_push(mmdbr_request_q, g_strdup_printf("%s\n", addr_str));
}
result = &mmdb_not_found;
wmem_map_insert(mmdb_ipv4_map, GUINT_TO_POINTER(addr), result);
}
return result;
@ -529,21 +587,15 @@ maxmind_db_lookup_ipv6(const ws_in6_addr *addr) {
mmdb_lookup_t * result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv6_map, addr->bytes);
if (!result) {
// Try again, mainly so that we empty our pipe buffers.
read_mmdbr_stdout();
result = (mmdb_lookup_t *) wmem_map_lookup(mmdb_ipv6_map, addr->bytes);
}
result = &mmdb_not_found;
wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(addr), result);
if (!result) {
if (mmdbr_pipe_valid()) {
char addr_str[WS_INET6_ADDRSTRLEN];
ws_inet_ntop6(addr, addr_str, WS_INET6_ADDRSTRLEN);
MMDB_DEBUG("looking up %s", addr_str);
g_async_queue_push(mmdbr_request_q, g_strdup_printf("%s\n", addr_str));
}
result = &mmdb_not_found;
wmem_map_insert(mmdb_ipv6_map, chunkify_v6_addr(addr), result);
}
return result;