merge: Allow unlimited number of files

merging is embarrassingly parallel, so long as we preserve ordering
when merging intermediate results, so we can process files in
batches if we hit RLIMIT_NOFILE and have the same output[1] we would
otherwise.

[1] One minor exception is in the SHB comments, but currently those aren't
written properly anyway. Another minor exception is that the ordering
of addresses within the NRBs can vary (be reversed), but both of these
are underlying issues, not having to do with the algorithm of batch
processing and merging.

Wireshark currently supports merging lots of files at once through
drag and drop, but not through the File->Merge dialog; we might
want to change the latter.

Fix #17598
This commit is contained in:
John Thacker 2023-09-21 00:00:36 -04:00
parent a6c406d718
commit c207041da6
1 changed files with 176 additions and 100 deletions

View File

@ -17,6 +17,7 @@
#define WS_LOG_DOMAIN LOG_DOMAIN_WIRETAP
#include <stdlib.h>
#include <errno.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
@ -29,11 +30,11 @@
#include <wsutil/filesystem.h>
#include "wsutil/os_version_info.h"
#include <wsutil/report_message.h>
#include <wsutil/wslog.h>
#include <wsutil/ws_assert.h>
static const char* idb_merge_mode_strings[] = {
/* IDB_MERGE_MODE_NONE */
"none",
@ -106,9 +107,10 @@ add_idb_index_map(merge_in_file_t *in_file, const guint orig_index _U_, const gu
* @param err wiretap error, if failed
* @param err_info wiretap error string, if failed
* @param err_fileno file on which open failed, if failed
* @return TRUE if all files could be opened, FALSE otherwise
* @return The number of input files opened, which can be less than
* the number requested if the limit of open file descriptors is reached.
*/
static gboolean
static guint
merge_open_in_files(guint in_file_count, const char *const *in_file_names,
merge_in_file_t **out_files, merge_progress_callback_t* cb,
int *err, gchar **err_info, guint *err_fileno)
@ -129,12 +131,30 @@ merge_open_in_files(guint in_file_count, const char *const *in_file_names,
files[i].packet_num = 0;
if (!files[i].wth) {
/* Close the files we've already opened. */
for (j = 0; j < i; j++)
cleanup_in_file(&files[j]);
g_free(files);
*err_fileno = i;
return FALSE;
if (*err == EMFILE && i > 2) {
/* We need at least two opened files to merge things if we
* are batch processing. (If there was only one file to open
* then we can "merge" a single file so long as we don't get
* EMFILE, even though that's pointless.)
*/
#ifdef _WIN32
report_warning("Requested opening %u files but could only open %u: %s\nUsing temporary files to batch process.", in_file_count, i, g_strerror(*err));
#else
report_warning("Requested opening %u files but could only open %u: %s\nUsing temporary files to batch process (try ulimit -n to adjust the limit).", in_file_count, i, g_strerror(*err));
#endif
in_file_count = i;
files_size = in_file_count * sizeof(merge_in_file_t);
files = (merge_in_file_t *)g_realloc(files, files_size);
*err = 0;
break;
} else {
/* Close the files we've already opened. */
for (j = 0; j < i; j++)
cleanup_in_file(&files[j]);
g_free(files);
*err_fileno = i;
return 0;
}
}
size = wtap_file_size(files[i].wth, err);
if (size == -1) {
@ -142,7 +162,7 @@ merge_open_in_files(guint in_file_count, const char *const *in_file_names,
cleanup_in_file(&files[j]);
g_free(files);
*err_fileno = i;
return FALSE;
return 0;
}
wtap_rec_init(&files[i].rec);
ws_buffer_init(&files[i].frame_buffer, 1514);
@ -154,7 +174,7 @@ merge_open_in_files(guint in_file_count, const char *const *in_file_names,
cb->callback_func(MERGE_EVENT_INPUT_FILES_OPENED, 0, files, in_file_count, cb->data);
*out_files = files;
return TRUE;
return in_file_count;
}
/** Close the input files again.
@ -1112,6 +1132,13 @@ merge_process_packets(wtap_dumper *pdh, const int file_type,
return status;
}
static void
tempfile_free(gpointer data) {
char *filename = (char*)data;
ws_unlink(filename);
g_free(filename);
}
static merge_result
merge_files_common(const gchar* out_filename, /* filename in normal output mode,
optional tempdir in tempfile mode (NULL for OS default) */
@ -1125,12 +1152,15 @@ merge_files_common(const gchar* out_filename, /* filename in normal output mode,
{
merge_in_file_t *in_files = NULL;
int frame_type = WTAP_ENCAP_PER_PACKET;
unsigned open_file_count;
merge_result status = MERGE_OK;
wtap_dumper *pdh;
GArray *shb_hdrs = NULL;
wtapng_iface_descriptions_t *idb_inf = NULL;
GArray *nrb_combined = NULL;
GArray *dsb_combined = NULL;
GPtrArray *temp_files = NULL;
int dup_fd;
ws_assert(in_file_count > 0);
ws_assert(in_filenames != NULL);
@ -1144,112 +1174,158 @@ merge_files_common(const gchar* out_filename, /* filename in normal output mode,
ws_debug("merge_files: begin");
/* open the input files */
if (!merge_open_in_files(in_file_count, in_filenames, &in_files, cb,
err, err_info, err_fileno)) {
ws_debug("merge_open_in_files() failed with err=%d", *err);
*err_framenum = 0;
return MERGE_ERR_CANT_OPEN_INFILE;
}
for (unsigned total_file_count = 0; total_file_count < in_file_count && status == MERGE_OK; total_file_count += open_file_count) {
if (snaplen == 0) {
/* Snapshot length not specified - default to the maximum. */
snaplen = WTAP_MAX_PACKET_SIZE_STANDARD;
}
/* Reserve a file descriptor for the output; if we run out of file
* descriptors we will end up writing to a temp file instead of the
* file or stdout originally requested, but this simplifies EMFILE
* handling.
*/
dup_fd = ws_dup(1);
if (dup_fd == -1) {
return MERGE_ERR_CANT_OPEN_OUTFILE;
}
/*
* This doesn't tell us that much. It tells us what to set the outfile's
* encap type to, but that's all - for example, it does *not* tell us
* whether the input files had the same number of IDBs, for the same exact
* interfaces, and only one IDB each, so it doesn't actually tell us
* whether we can merge IDBs into one or not.
*
* XXX: If an input file is WTAP_ENCAP_PER_PACKET, just because the
* output file format (e.g. pcapng) can write WTAP_ENCAP_PER_PACKET,
* that doesn't mean that the format can actually write all the IDBs.
*/
frame_type = merge_select_frame_type(file_type, in_file_count, in_files);
ws_debug("got frame_type=%d", frame_type);
/* open the input files */
open_file_count = merge_open_in_files(in_file_count - total_file_count, &in_filenames[total_file_count], &in_files, cb, err, err_info, err_fileno);
if (open_file_count == 0) {
ws_debug("merge_open_in_files() failed with err=%d", *err);
*err_framenum = 0;
return MERGE_ERR_CANT_OPEN_INFILE;
}
if (cb)
cb->callback_func(MERGE_EVENT_FRAME_TYPE_SELECTED, frame_type, in_files, in_file_count, cb->data);
if (snaplen == 0) {
/* Snapshot length not specified - default to the maximum. */
snaplen = WTAP_MAX_PACKET_SIZE_STANDARD;
}
/* prepare the outfile */
wtap_dump_params params = WTAP_DUMP_PARAMS_INIT;
params.encap = frame_type;
params.snaplen = snaplen;
/*
* Does this file type support identifying the interfaces on
* which packets arrive?
*
* That mean that the abstract interface provided by libwiretap
* involves WTAP_BLOCK_IF_ID_AND_INFO blocks.
*/
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_IF_ID_AND_INFO) != BLOCK_NOT_SUPPORTED) {
shb_hdrs = create_shb_header(in_files, in_file_count, app_name);
ws_debug("SHB created");
/*
* This doesn't tell us that much. It tells us what to set the outfile's
* encap type to, but that's all - for example, it does *not* tell us
* whether the input files had the same number of IDBs, for the same exact
* interfaces, and only one IDB each, so it doesn't actually tell us
* whether we can merge IDBs into one or not.
*
* XXX: If an input file is WTAP_ENCAP_PER_PACKET, just because the
* output file format (e.g. pcapng) can write WTAP_ENCAP_PER_PACKET,
* that doesn't mean that the format can actually write all the IDBs.
*/
frame_type = merge_select_frame_type(file_type, open_file_count, in_files);
ws_debug("got frame_type=%d", frame_type);
idb_inf = generate_merged_idbs(in_files, in_file_count, &mode);
ws_debug("IDB merge operation complete, got %u IDBs", idb_inf ? idb_inf->interface_data->len : 0);
if (cb)
cb->callback_func(MERGE_EVENT_FRAME_TYPE_SELECTED, frame_type, in_files, open_file_count, cb->data);
/* prepare the outfile */
wtap_dump_params params = WTAP_DUMP_PARAMS_INIT;
params.encap = frame_type;
params.snaplen = snaplen;
/*
* Does this file type support identifying the interfaces on
* which packets arrive?
*
* That mean that the abstract interface provided by libwiretap
* involves WTAP_BLOCK_IF_ID_AND_INFO blocks.
*/
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_IF_ID_AND_INFO) != BLOCK_NOT_SUPPORTED) {
shb_hdrs = create_shb_header(in_files, open_file_count, app_name);
ws_debug("SHB created");
idb_inf = generate_merged_idbs(in_files, open_file_count, &mode);
ws_debug("IDB merge operation complete, got %u IDBs", idb_inf ? idb_inf->interface_data->len : 0);
/* XXX other blocks like ISB are now discarded. */
params.shb_hdrs = shb_hdrs;
params.idb_inf = idb_inf;
}
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_NAME_RESOLUTION) != BLOCK_NOT_SUPPORTED) {
nrb_combined = g_array_new(FALSE, FALSE, sizeof(wtap_block_t));
params.nrbs_growing = nrb_combined;
}
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_DECRYPTION_SECRETS) != BLOCK_NOT_SUPPORTED) {
dsb_combined = g_array_new(FALSE, FALSE, sizeof(wtap_block_t));
params.dsbs_growing = dsb_combined;
}
ws_close(dup_fd);
if (open_file_count < in_file_count) {
if (temp_files == NULL) {
temp_files = g_ptr_array_new_with_free_func(tempfile_free);
}
char* temp_filename;
/* If out_filenamep is not null, then out_filename is the
* desired tempdir, so let's use that.
*/
pdh = wtap_dump_open_tempfile(out_filenamep ? out_filename : NULL,
&temp_filename,
pfx ? pfx : "mergecap", file_type,
WTAP_UNCOMPRESSED, &params, err,
err_info);
if (pdh) {
g_ptr_array_add(temp_files, temp_filename);
}
} else if (out_filenamep) {
pdh = wtap_dump_open_tempfile(out_filename, out_filenamep, pfx, file_type,
WTAP_UNCOMPRESSED, &params, err,
err_info);
} else if (out_filename) {
pdh = wtap_dump_open(out_filename, file_type, WTAP_UNCOMPRESSED,
&params, err, err_info);
} else {
pdh = wtap_dump_open_stdout(file_type, WTAP_UNCOMPRESSED, &params, err,
err_info);
}
if (pdh == NULL) {
merge_close_in_files(open_file_count, in_files);
g_free(in_files);
wtap_block_array_free(shb_hdrs);
wtap_free_idb_info(idb_inf);
if (nrb_combined) {
g_array_free(nrb_combined, TRUE);
}
if (dsb_combined) {
g_array_free(dsb_combined, TRUE);
}
if (temp_files) {
g_ptr_array_free(temp_files, TRUE);
}
*err_framenum = 0;
return MERGE_ERR_CANT_OPEN_OUTFILE;
}
if (cb)
cb->callback_func(MERGE_EVENT_READY_TO_MERGE, 0, in_files, open_file_count, cb->data);
status = merge_process_packets(pdh, file_type, in_files, open_file_count,
do_append, mode, snaplen, cb,
idb_inf, nrb_combined, dsb_combined,
err, err_info,
err_fileno, err_framenum);
/* XXX other blocks like ISB are now discarded. */
params.shb_hdrs = shb_hdrs;
params.idb_inf = idb_inf;
}
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_NAME_RESOLUTION) != BLOCK_NOT_SUPPORTED) {
nrb_combined = g_array_new(FALSE, FALSE, sizeof(wtap_block_t));
params.nrbs_growing = nrb_combined;
}
if (wtap_file_type_subtype_supports_block(file_type,
WTAP_BLOCK_DECRYPTION_SECRETS) != BLOCK_NOT_SUPPORTED) {
dsb_combined = g_array_new(FALSE, FALSE, sizeof(wtap_block_t));
params.dsbs_growing = dsb_combined;
}
if (out_filenamep) {
pdh = wtap_dump_open_tempfile(out_filename, out_filenamep, pfx, file_type,
WTAP_UNCOMPRESSED, &params, err,
err_info);
} else if (out_filename) {
pdh = wtap_dump_open(out_filename, file_type, WTAP_UNCOMPRESSED,
&params, err, err_info);
} else {
pdh = wtap_dump_open_stdout(file_type, WTAP_UNCOMPRESSED, &params, err,
err_info);
}
if (pdh == NULL) {
merge_close_in_files(in_file_count, in_files);
g_free(in_files);
wtap_block_array_free(shb_hdrs);
wtap_free_idb_info(idb_inf);
if (nrb_combined) {
g_array_free(nrb_combined, TRUE);
nrb_combined = NULL;
}
if (dsb_combined) {
g_array_free(dsb_combined, TRUE);
dsb_combined = NULL;
}
*err_framenum = 0;
return MERGE_ERR_CANT_OPEN_OUTFILE;
}
if (cb)
cb->callback_func(MERGE_EVENT_READY_TO_MERGE, 0, in_files, in_file_count, cb->data);
status = merge_process_packets(pdh, file_type, in_files, in_file_count,
do_append, mode, snaplen, cb,
idb_inf, nrb_combined, dsb_combined,
err, err_info,
err_fileno, err_framenum);
g_free(in_files);
wtap_block_array_free(shb_hdrs);
wtap_free_idb_info(idb_inf);
if (nrb_combined) {
g_array_free(nrb_combined, TRUE);
}
if (dsb_combined) {
g_array_free(dsb_combined, TRUE);
if (temp_files != NULL) {
if (status == MERGE_OK) {
status = merge_files_common(out_filename, out_filenamep, pfx,
file_type, (const char**)temp_files->pdata,
temp_files->len, do_append, mode, snaplen, app_name,
cb, err, err_info, err_fileno, err_framenum);
}
g_ptr_array_free(temp_files, TRUE);
}
return status;