use uncompress_zstd in Kafka

This commit is contained in:
Kevin Albertson 2022-12-19 20:45:24 -05:00 committed by Alexis La Goutte
parent f5c400c6e7
commit eca4655a48
1 changed files with 7 additions and 47 deletions

View File

@ -26,9 +26,6 @@
#include <lz4.h>
#include <lz4frame.h>
#endif
#ifdef HAVE_ZSTD
#include <zstd.h>
#endif
#include "packet-tcp.h"
#include "packet-tls.h"
@ -1832,51 +1829,14 @@ decompress_snappy(tvbuff_t *tvb _U_, packet_info *pinfo, int offset _U_, int len
static gboolean
decompress_zstd(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tvbuff_t **decompressed_tvb, int *decompressed_offset)
{
ZSTD_inBuffer input = { tvb_memdup(pinfo->pool, tvb, offset, length), length, 0 };
ZSTD_DStream *zds = ZSTD_createDStream();
size_t rc = 0;
tvbuff_t *composite_tvb = NULL;
gboolean ret = FALSE;
ZSTD_outBuffer output = { wmem_alloc(pinfo->pool, ZSTD_DStreamOutSize()), ZSTD_DStreamOutSize(), 0 };
int count = 0;
while (input.pos < input.size && count < MAX_LOOP_ITERATIONS) {
rc = ZSTD_decompressStream(zds, &output, &input);
// rc is 0 when a frame is completely decoded and fully flushed,
// or an error code, which can be tested using ZSTD_isError(),
// or any other value > 0, which means there is still some decoding or flushing to do to complete current frame :
// the return value is a suggested next input size (just a hint for better latency)
// that will never request more than the remaining frame size.
if (ZSTD_isError(rc)) {
goto end;
}
if (!composite_tvb) {
composite_tvb = tvb_new_composite();
}
tvb_composite_append(composite_tvb,
tvb_new_child_real_data(tvb, (guint8*)output.dst, (guint)output.pos, (gint)output.pos));
// Reset output pos to zero to clear the output buffer.
output.pos = 0;
count++;
DISSECTOR_ASSERT_HINT(count < MAX_LOOP_ITERATIONS, "MAX_LOOP_ITERATIONS exceeded");
*decompressed_tvb = tvb_child_uncompress_zstd(tvb, tvb, offset, length);
*decompressed_offset = 0;
if (*decompressed_tvb) {
return TRUE;
} else {
col_append_str(pinfo->cinfo, COL_INFO, " [zstd decompression failed] ");
return FALSE;
}
ret = TRUE;
end:
if (composite_tvb) {
tvb_composite_finalize(composite_tvb);
}
ZSTD_freeDStream(zds);
if (ret == 1) {
*decompressed_tvb = composite_tvb;
*decompressed_offset = 0;
}
else {
col_append_str(pinfo->cinfo, COL_INFO, " [zstd decompression failed]");
}
return ret;
}
#else
static gboolean