Kafka: add Snappy decompression support

Change-Id: Ida8d941809a4e0f2fd4d9f142363187a757d0278
Reviewed-on: https://code.wireshark.org/review/18288
Reviewed-by: Pascal Quantin <pascal.quantin@gmail.com>
Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
This commit is contained in:
Pascal Quantin 2016-10-18 22:42:09 +02:00
parent 25122f5ef6
commit dbb391a646
1 changed files with 67 additions and 2 deletions

View File

@ -30,6 +30,9 @@
#include <epan/prefs.h>
#include <epan/prefs-int.h>
#include <epan/proto_data.h>
#ifdef HAVE_SNAPPY
#include <snappy-c.h>
#endif
#include "packet-tcp.h"
void proto_register_kafka(void);
@ -182,6 +185,9 @@ static const value_string kafka_codecs[] = {
{ KAFKA_COMPRESSION_SNAPPY, "Snappy" },
{ 0, NULL }
};
#ifdef HAVE_SNAPPY
static const guint8 kafka_xerial_header[8] = {0x82, 0x53, 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00};
#endif
/* List/range of TCP ports to register */
static range_t *current_kafka_tcp_range = NULL;
@ -346,7 +352,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
switch (codec) {
case KAFKA_COMPRESSION_GZIP:
raw = kafka_get_bytes(tree, tvb, pinfo, offset);
raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
if (raw) {
@ -370,7 +376,66 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
break;
case KAFKA_COMPRESSION_SNAPPY:
/* We can't uncompress snappy yet... */
#ifdef HAVE_SNAPPY
raw = kafka_get_bytes(subtree, tvb, pinfo, offset);
offset += 4;
if (raw) {
guint compressed_size = tvb_reported_length(raw);
guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), raw, 0, compressed_size);
size_t uncompressed_size;
snappy_status ret = SNAPPY_INVALID_INPUT;
if (tvb_memeql(raw, 0, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) {
/* xerial framing format */
guint chunk_size, pos = 16;
payload = tvb_new_composite();
while (pos < compressed_size) {
chunk_size = tvb_get_ntohl(raw, pos);
pos += 4;
ret = snappy_uncompressed_length(&data[pos], chunk_size, &uncompressed_size);
if (ret == SNAPPY_OK) {
guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size);
ret = snappy_uncompress(&data[pos], chunk_size, decompressed_buffer, &uncompressed_size);
if (ret == SNAPPY_OK) {
tvb_composite_append(payload,
tvb_new_child_real_data(tvb, decompressed_buffer,
(guint32)uncompressed_size, (guint32)uncompressed_size));
} else {
wmem_free(pinfo->pool, decompressed_buffer);
break;
}
}
pos += chunk_size;
}
tvb_composite_finalize(payload);
} else {
/* unframed format */
ret = snappy_uncompressed_length(data, compressed_size, &uncompressed_size);
if (ret == SNAPPY_OK) {
guint8 *decompressed_buffer = (guint8*)wmem_alloc(pinfo->pool, uncompressed_size);
ret = snappy_uncompress(data, compressed_size, decompressed_buffer, &uncompressed_size);
if (ret == SNAPPY_OK) {
payload = tvb_new_child_real_data(tvb, decompressed_buffer,
(guint32)uncompressed_size, (guint32)uncompressed_size);
} else {
wmem_free(pinfo->pool, decompressed_buffer);
}
}
}
if (ret == SNAPPY_OK) {
add_new_data_source(pinfo, payload, "Uncompressed Message");
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
} else {
decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
}
offset += tvb_captured_length(raw);
}
break;
#endif
case KAFKA_COMPRESSION_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);