From 8112b8d7a83536193c75a7fd1e51d3af8b1c8c01 Mon Sep 17 00:00:00 2001
From: Pascal Quantin <pascal.quantin@gmail.com>
Date: Tue, 9 May 2017 21:21:04 +0100
Subject: [PATCH] Kafka: add LZ4 decompression

Change-Id: Idf2f63782c8751778ad88f46a7f65fe7d5d49f3b
Reviewed-on: https://code.wireshark.org/review/21577
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Petri-Dish: Pascal Quantin <pascal.quantin@gmail.com>
Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org>
Reviewed-by: Michael Mann <mmann78@netscape.net>
---
 epan/dissectors/packet-kafka.c | 212 ++++++++++++++++++++++++++++++++-
 1 file changed, 211 insertions(+), 1 deletion(-)

diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index bcaeceb344..9079ee552b 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -33,6 +33,9 @@
 #ifdef HAVE_SNAPPY
 #include <snappy-c.h>
 #endif
+#ifdef HAVE_LZ4
+#include <lz4frame.h>
+#endif
 #include "packet-tcp.h"
 
 void proto_register_kafka(void);
@@ -351,6 +354,121 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
 
 /* HELPERS */
 
+#ifdef HAVE_LZ4
+/* Local copy of XXH32() algorithm as found in https://github.com/lz4/lz4/blob/v1.7.5/lib/xxhash.c
+   as some packagers are not providing xxhash.h in liblz4 */
+typedef struct {
+    guint32 total_len_32;
+    guint32 large_len;
+    guint32 v1;
+    guint32 v2;
+    guint32 v3;
+    guint32 v4;
+    guint32 mem32[4];   /* buffer defined as U32 for alignment */
+    guint32 memsize;
+    guint32 reserved;   /* never read nor write, will be removed in a future version */
+} XXH32_state_t;
+
+typedef enum {
+    XXH_bigEndian=0,
+    XXH_littleEndian=1
+} XXH_endianess;
+
+/* Run-time endianness probe: reads the first byte of an int set to 1. */
+static const int g_one = 1;
+#define XXH_CPU_LITTLE_ENDIAN (*(const char*)(&g_one))
+
+static const guint32 PRIME32_1 = 2654435761U;
+static const guint32 PRIME32_2 = 2246822519U;
+static const guint32 PRIME32_3 = 3266489917U;
+static const guint32 PRIME32_4 =  668265263U;
+static const guint32 PRIME32_5 =  374761393U;
+
+#define XXH_rotl32(x,r) ((x << r) | (x >> (32 - r)))
+
+/* Unaligned-safe 32-bit load (memcpy avoids strict-aliasing/alignment UB). */
+static guint32 XXH_read32(const void* memPtr)
+{
+    guint32 val;
+    memcpy(&val, memPtr, sizeof(val));
+    return val;
+}
+
+static guint32 XXH_swap32(guint32 x)
+{
+    return ((x << 24) & 0xff000000 ) |
+           ((x <<  8) & 0x00ff0000 ) |
+           ((x >>  8) & 0x0000ff00 ) |
+           ((x >> 24) & 0x000000ff );
+}
+
+#define XXH_readLE32(ptr, endian) (endian==XXH_littleEndian ? XXH_read32(ptr) : XXH_swap32(XXH_read32(ptr)))
+
+static guint32 XXH32_round(guint32 seed, guint32 input)
+{
+    seed += input * PRIME32_2;
+    seed  = XXH_rotl32(seed, 13);
+    seed *= PRIME32_1;
+    return seed;
+}
+
+static guint32 XXH32_endian(const void* input, size_t len, guint32 seed, XXH_endianess endian)
+{
+    /* NOTE(review): the extracted text declared p/bEnd as gint8*. XXH32 must
+       consume the input as *unsigned* bytes: the single-byte tail loop below
+       would sign-extend values >= 0x80 through a signed pointer and compute a
+       wrong hash. guint8 matches the xxhash reference implementation. */
+    const guint8* p = (const guint8*)input;
+    const guint8* bEnd = p + len;
+    guint32 h32;
+#define XXH_get32bits(p) XXH_readLE32(p, endian)
+
+    if (len>=16) {
+        const guint8* const limit = bEnd - 16;
+        guint32 v1 = seed + PRIME32_1 + PRIME32_2;
+        guint32 v2 = seed + PRIME32_2;
+        guint32 v3 = seed + 0;
+        guint32 v4 = seed - PRIME32_1;
+
+        do {
+            v1 = XXH32_round(v1, XXH_get32bits(p)); p+=4;
+            v2 = XXH32_round(v2, XXH_get32bits(p)); p+=4;
+            v3 = XXH32_round(v3, XXH_get32bits(p)); p+=4;
+            v4 = XXH32_round(v4, XXH_get32bits(p)); p+=4;
+        } while (p<=limit);
+
+        h32 = XXH_rotl32(v1, 1) + XXH_rotl32(v2, 7) + XXH_rotl32(v3, 12) + XXH_rotl32(v4, 18);
+    } else {
+        h32 = seed + PRIME32_5;
+    }
+
+    h32 += (guint32) len;
+
+    while (p+4<=bEnd) {
+        h32 += XXH_get32bits(p) * PRIME32_3;
+        h32  = XXH_rotl32(h32, 17) * PRIME32_4 ;
+        p+=4;
+    }
+
+    /* NOTE(review): the span from "while (p<bEnd)" through "h32 ^= h32 >" was
+       lost to angle-bracket stripping in the extracted patch ("<b..." was
+       swallowed as an HTML tag, leaving "while (p> 15;"); restored verbatim
+       from lib/xxhash.c of lz4 v1.7.5. */
+    while (p<bEnd) {
+        h32 += (*p) * PRIME32_5;
+        h32  = XXH_rotl32(h32, 11) * PRIME32_1 ;
+        p++;
+    }
+
+    h32 ^= h32 >> 15;
+    h32 *= PRIME32_2;
+    h32 ^= h32 >> 13;
+    h32 *= PRIME32_3;
+    h32 ^= h32 >> 16;
+
+    return h32;
+}
+
+/* Public entry point: dispatch on detected endianness. */
+static guint XXH32(const void* input, size_t len, guint seed)
+{
+    XXH_endianess endian_detected = (XXH_endianess)XXH_CPU_LITTLE_ENDIAN;
+    if (endian_detected==XXH_littleEndian)
+        return XXH32_endian(input, len, seed, XXH_littleEndian);
+    else
+        return XXH32_endian(input, len, seed, XXH_bigEndian);
+}
+#endif
+
 static const char *
 kafka_error_to_str(kafka_error_t error)
 {
@@ -710,7 +828,99 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int
s break; #endif case KAFKA_MESSAGE_CODEC_LZ4: - /* TODO: */ +#ifdef HAVE_LZ4 + raw = kafka_get_bytes(subtree, tvb, pinfo, offset); + offset += 4; + if (raw) { + LZ4F_decompressionContext_t lz4_ctxt; + LZ4F_frameInfo_t lz4_info; + LZ4F_errorCode_t ret; + LZ4F_decompressOptions_t dec_opts = {0}; + size_t src_offset, src_size, dst_size; + guchar *decompressed_buffer = NULL; + + /* Prepare compressed data buffer */ + guint compressed_size = tvb_reported_length(raw); + guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size); + /* Override header checksum to workaround buggy Kafka implementations */ + if (compressed_size > 7) { + guint hdr_end = 6; + if (data[4] & 0x08) { + hdr_end += 8; + } + if (hdr_end < compressed_size) { + data[hdr_end] = (XXH32(&data[4], hdr_end - 4, 0) >> 8) & 0xff; + } + } + + /* Show raw compressed data */ + proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA); + + /* Allocate output buffer */ + ret = LZ4F_createDecompressionContext(&lz4_ctxt, LZ4F_VERSION); + if (LZ4F_isError(ret)) { + goto fail; + } + src_offset = compressed_size; + ret = LZ4F_getFrameInfo(lz4_ctxt, &lz4_info, data, &src_offset); + if (LZ4F_isError(ret)) { + LZ4F_freeDecompressionContext(lz4_ctxt); + goto fail; + } + switch (lz4_info.blockSizeID) { + case LZ4F_max64KB: + dst_size = 1 << 16; + break; + case LZ4F_max256KB: + dst_size = 1 << 18; + break; + case LZ4F_max1MB: + dst_size = 1 << 20; + break; + case LZ4F_max4MB: + dst_size = 1 << 22; + break; + default: + LZ4F_freeDecompressionContext(lz4_ctxt); + goto fail; + } + if (lz4_info.contentSize && lz4_info.contentSize < dst_size) { + dst_size = (size_t)lz4_info.contentSize; + } + decompressed_buffer = (guchar*)wmem_alloc(pinfo->pool, dst_size); + + /* Attempt the decompression. 
*/ + src_size = compressed_size - src_offset; + ret = LZ4F_decompress(lz4_ctxt, decompressed_buffer, &dst_size, + &data[src_offset], &src_size, &dec_opts); + LZ4F_freeDecompressionContext(lz4_ctxt); + if (ret == 0) { + size_t uncompressed_size = dst_size; + + show_compression_reduction(tvb, subtree, compressed_size, (guint)uncompressed_size); + + /* Add as separate data tab */ + payload = tvb_new_child_real_data(tvb, decompressed_buffer, + (guint32)uncompressed_size, (guint32)uncompressed_size); + add_new_data_source(pinfo, payload, "Uncompressed Message"); + + /* Dissect as a message set */ + dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec); + + /* Add to summary */ + col_append_fstr(pinfo->cinfo, COL_INFO, " [LZ4-compressed message set]"); + proto_item_append_text(message_ti, " (LZ4-compressed message set)"); + } else { + fail: + /* Error */ + decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA); + expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress); + } + offset += compressed_size; + } + break; +#endif + case KAFKA_MESSAGE_CODEC_NONE: default: offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);