Kafka: add LZ4 decompression

Change-Id: Idf2f63782c8751778ad88f46a7f65fe7d5d49f3b
Reviewed-on: https://code.wireshark.org/review/21577
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Petri-Dish: Pascal Quantin <pascal.quantin@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Michael Mann <mmann78@netscape.net>
This commit is contained in:
Pascal Quantin 2017-05-09 21:21:04 +01:00 committed by Michael Mann
parent 2c9b07a8b6
commit 8112b8d7a8
1 changed file with 211 additions and 1 deletion

View File

@@ -33,6 +33,9 @@
#ifdef HAVE_SNAPPY
#include <snappy-c.h>
#endif
#ifdef HAVE_LZ4
#include <lz4frame.h>
#endif
#include "packet-tcp.h"
void proto_register_kafka(void);
@@ -351,6 +354,121 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
/* HELPERS */
#ifdef HAVE_LZ4
/* Local copy of XXH32() algorithm as found in https://github.com/lz4/lz4/blob/v1.7.5/lib/xxhash.c
as some packagers are not providing xxhash.h in liblz4 */
typedef struct {
guint32 total_len_32;
guint32 large_len;
guint32 v1;
guint32 v2;
guint32 v3;
guint32 v4;
guint32 mem32[4]; /* buffer defined as U32 for alignment */
guint32 memsize;
guint32 reserved; /* never read nor write, will be removed in a future version */
} XXH32_state_t;
typedef enum {
XXH_bigEndian=0,
XXH_littleEndian=1
} XXH_endianess;
static const int g_one = 1;
#define XXH_CPU_LITTLE_ENDIAN (*(const char*)(&g_one))
static const guint32 PRIME32_1 = 2654435761U;
static const guint32 PRIME32_2 = 2246822519U;
static const guint32 PRIME32_3 = 3266489917U;
static const guint32 PRIME32_4 = 668265263U;
static const guint32 PRIME32_5 = 374761393U;
#define XXH_rotl32(x,r) ((x << r) | (x >> (32 - r)))
static guint32 XXH_read32(const void* memPtr)
{
guint32 val;
memcpy(&val, memPtr, sizeof(val));
return val;
}
static guint32 XXH_swap32(guint32 x)
{
return ((x << 24) & 0xff000000 ) |
((x << 8) & 0x00ff0000 ) |
((x >> 8) & 0x0000ff00 ) |
((x >> 24) & 0x000000ff );
}
#define XXH_readLE32(ptr, endian) (endian==XXH_littleEndian ? XXH_read32(ptr) : XXH_swap32(XXH_read32(ptr)))
static guint32 XXH32_round(guint32 seed, guint32 input)
{
seed += input * PRIME32_2;
seed = XXH_rotl32(seed, 13);
seed *= PRIME32_1;
return seed;
}
static guint32 XXH32_endian(const void* input, size_t len, guint32 seed, XXH_endianess endian)
{
const gint8* p = (const gint8*)input;
const gint8* bEnd = p + len;
guint32 h32;
#define XXH_get32bits(p) XXH_readLE32(p, endian)
if (len>=16) {
const gint8* const limit = bEnd - 16;
guint32 v1 = seed + PRIME32_1 + PRIME32_2;
guint32 v2 = seed + PRIME32_2;
guint32 v3 = seed + 0;
guint32 v4 = seed - PRIME32_1;
do {
v1 = XXH32_round(v1, XXH_get32bits(p)); p+=4;
v2 = XXH32_round(v2, XXH_get32bits(p)); p+=4;
v3 = XXH32_round(v3, XXH_get32bits(p)); p+=4;
v4 = XXH32_round(v4, XXH_get32bits(p)); p+=4;
} while (p<=limit);
h32 = XXH_rotl32(v1, 1) + XXH_rotl32(v2, 7) + XXH_rotl32(v3, 12) + XXH_rotl32(v4, 18);
} else {
h32 = seed + PRIME32_5;
}
h32 += (guint32) len;
while (p+4<=bEnd) {
h32 += XXH_get32bits(p) * PRIME32_3;
h32 = XXH_rotl32(h32, 17) * PRIME32_4 ;
p+=4;
}
while (p<bEnd) {
h32 += (*p) * PRIME32_5;
h32 = XXH_rotl32(h32, 11) * PRIME32_1 ;
p++;
}
h32 ^= h32 >> 15;
h32 *= PRIME32_2;
h32 ^= h32 >> 13;
h32 *= PRIME32_3;
h32 ^= h32 >> 16;
return h32;
}
static guint XXH32(const void* input, size_t len, guint seed)
{
XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;
if (endian_detected==XXH_littleEndian)
return XXH32_endian(input, len, seed, XXH_littleEndian);
else
return XXH32_endian(input, len, seed, XXH_bigEndian);
}
#endif
static const char *
kafka_error_to_str(kafka_error_t error)
{
@@ -710,7 +828,99 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
break;
#endif
case KAFKA_MESSAGE_CODEC_LZ4:
/* TODO: */
#ifdef HAVE_LZ4
raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
if (raw) {
LZ4F_decompressionContext_t lz4_ctxt;
LZ4F_frameInfo_t lz4_info;
LZ4F_errorCode_t ret;
LZ4F_decompressOptions_t dec_opts = {0};
size_t src_offset, src_size, dst_size;
guchar *decompressed_buffer = NULL;
/* Prepare compressed data buffer */
guint compressed_size = tvb_reported_length(raw);
guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
/* Override header checksum to workaround buggy Kafka implementations */
if (compressed_size > 7) {
guint hdr_end = 6;
if (data[4] & 0x08) {
hdr_end += 8;
}
if (hdr_end < compressed_size) {
data[hdr_end] = (XXH32(&data[4], hdr_end - 4, 0) >> 8) & 0xff;
}
}
/* Show raw compressed data */
proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
/* Allocate output buffer */
ret = LZ4F_createDecompressionContext(&lz4_ctxt, LZ4F_VERSION);
if (LZ4F_isError(ret)) {
goto fail;
}
src_offset = compressed_size;
ret = LZ4F_getFrameInfo(lz4_ctxt, &lz4_info, data, &src_offset);
if (LZ4F_isError(ret)) {
LZ4F_freeDecompressionContext(lz4_ctxt);
goto fail;
}
switch (lz4_info.blockSizeID) {
case LZ4F_max64KB:
dst_size = 1 << 16;
break;
case LZ4F_max256KB:
dst_size = 1 << 18;
break;
case LZ4F_max1MB:
dst_size = 1 << 20;
break;
case LZ4F_max4MB:
dst_size = 1 << 22;
break;
default:
LZ4F_freeDecompressionContext(lz4_ctxt);
goto fail;
}
if (lz4_info.contentSize && lz4_info.contentSize < dst_size) {
dst_size = (size_t)lz4_info.contentSize;
}
decompressed_buffer = (guchar*)wmem_alloc(pinfo->pool, dst_size);
/* Attempt the decompression. */
src_size = compressed_size - src_offset;
ret = LZ4F_decompress(lz4_ctxt, decompressed_buffer, &dst_size,
&data[src_offset], &src_size, &dec_opts);
LZ4F_freeDecompressionContext(lz4_ctxt);
if (ret == 0) {
size_t uncompressed_size = dst_size;
show_compression_reduction(tvb, subtree, compressed_size, (guint)uncompressed_size);
/* Add as separate data tab */
payload = tvb_new_child_real_data(tvb, decompressed_buffer,
(guint32)uncompressed_size, (guint32)uncompressed_size);
add_new_data_source(pinfo, payload, "Uncompressed Message");
/* Dissect as a message set */
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
/* Add to summary */
col_append_fstr(pinfo->cinfo, COL_INFO, " [LZ4-compressed message set]");
proto_item_append_text(message_ti, " (LZ4-compressed message set)");
} else {
fail:
/* Error */
decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
}
offset += compressed_size;
}
break;
#endif
case KAFKA_MESSAGE_CODEC_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);