From d89b83a9e4f9a974a05943bca4447371003bd725 Mon Sep 17 00:00:00 2001
From: Martin Mathieson
Date: Sat, 6 May 2017 12:35:13 +0100
Subject: [PATCH] Kafka: show decompressed messages more clearly

Change-Id: I8edf251d9ab26f93bf54d8032706ac7b9e4f6dd1
Reviewed-on: https://code.wireshark.org/review/21538
Reviewed-by: Evan Huus
Petri-Dish: Evan Huus
Tested-by: Petri Dish Buildbot
Reviewed-by: Martin Mathieson
---
 epan/dissectors/packet-kafka.c | 71 +++++++++++++++++++++++++++-------
 1 file changed, 58 insertions(+), 13 deletions(-)

diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index f6e58e6a54..7ed7388614 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -67,6 +67,8 @@ static int hf_kafka_message_timestamp_type = -1;
 static int hf_kafka_message_timestamp = -1;
 static int hf_kafka_message_key = -1;
 static int hf_kafka_message_value = -1;
+static int hf_kafka_message_value_compressed = -1;
+static int hf_kafka_message_compression_reduction = -1;
 static int hf_kafka_request_frame = -1;
 static int hf_kafka_response_frame = -1;
 static int hf_kafka_consumer_group = -1;
@@ -344,7 +346,7 @@ typedef struct kafka_packet_values_t {
 
 /* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
 static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field);
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec);
 
 /* HELPERS */
 
@@ -555,10 +557,22 @@ dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
     return offset;
 }
 
+/* Calculate and show the reduction in transmitted size due to compression */
+static void show_compression_reduction(tvbuff_t *tvb, proto_tree *tree, guint compressed_size, guint uncompressed_size)
+{
+    proto_item *ti;
+    /* Not really expecting a message to compress down to nothing, but defend against dividing by 0 anyway */
+    if (uncompressed_size != 0) {
+        ti = proto_tree_add_float(tree, hf_kafka_message_compression_reduction, tvb, 0, 0,
+                                  (float)compressed_size / (float)uncompressed_size);
+        PROTO_ITEM_SET_GENERATED(ti);
+    }
+}
+
 static int
 dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
 {
-    proto_item *ti, *decrypt_item;
+    proto_item *message_ti, *decrypt_item;
     proto_tree *subtree;
     tvbuff_t   *raw, *payload;
     int         offset = start_offset;
@@ -567,7 +581,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
     guint       bytes_length = 0;
 
-    subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &ti, "Message");
+    subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &message_ti, "Message");
 
     /* CRC */
     proto_tree_add_item(subtree, hf_kafka_message_crc, tvb, offset, 4, ENC_BIG_ENDIAN);
     offset += 4;
@@ -599,20 +613,27 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
             offset += 4;
 
             if (raw) {
+                guint compressed_size = tvb_captured_length(raw);
+
+                /* Raw compressed data */
+                proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
+
                 /* Unzip message and add payload to new data tab */
-                payload = tvb_child_uncompress(tvb, raw, 0, tvb_captured_length(raw));
+                payload = tvb_child_uncompress(tvb, raw, 0, compressed_size);
                 if (payload) {
+                    show_compression_reduction(tvb, subtree, compressed_size, tvb_captured_length(payload));
+
                     add_new_data_source(pinfo, payload, "Uncompressed Message");
-                    dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
+                    dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
                 } else {
                     decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
                     expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
                 }
-                offset += tvb_captured_length(raw);
+                offset += compressed_size;
 
                 /* Add to summary */
                 col_append_fstr(pinfo->cinfo, COL_INFO, " [GZIPd message set]");
-                proto_item_append_text(ti, " (GZIPd message set)");
+                proto_item_append_text(message_ti, " (GZIPd message set)");
             }
             else {
                 proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL);
@@ -628,6 +649,9 @@
             size_t uncompressed_size;
             snappy_status ret = SNAPPY_INVALID_INPUT;
 
+            /* Raw compressed data */
+            proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
+
             if (tvb_memeql(raw, 0, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) {
                 /* xerial framing format */
                 guint chunk_size, pos = 16;
@@ -669,8 +693,14 @@
                 }
             }
             if (ret == SNAPPY_OK) {
+                show_compression_reduction(tvb, subtree, compressed_size, uncompressed_size);
+
                 add_new_data_source(pinfo, payload, "Uncompressed Message");
-                dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
+                dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
+
+                /* Add to summary */
+                col_append_fstr(pinfo->cinfo, COL_INFO, " [Snappy-compressed message set]");
+                proto_item_append_text(message_ti, " (Snappy-compressed message set)");
             } else {
                 decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
                 expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
@@ -680,22 +710,23 @@
             break;
 #endif
         case KAFKA_MESSAGE_CODEC_LZ4:
+            /* TODO: */
        case KAFKA_MESSAGE_CODEC_NONE:
        default:
            offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);
 
            /* Add to summary */
            col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes]", bytes_length);
-            proto_item_append_text(ti, " (%u bytes)", bytes_length);
+            proto_item_append_text(message_ti, " (%u bytes)", bytes_length);
     }
 
-    proto_item_set_len(ti, offset - start_offset);
+    proto_item_set_len(message_ti, offset - start_offset);
 
     return offset;
 }
 
 static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field)
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec)
 {
     proto_item *ti;
     proto_tree *subtree;
@@ -718,6 +749,10 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
     }
 
     subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message_set, &ti, "Message Set");
+    /* If set came from a compressed message, make it obvious in tree root */
+    if (codec != KAFKA_MESSAGE_CODEC_NONE) {
+        proto_item_append_text(subtree, " [from compressed %s message]", val_to_str_const(codec, kafka_message_codecs, "Unknown codec"));
+    }
 
     while (offset - start_offset < len) {
         proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
@@ -1466,7 +1501,7 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
 
     offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
 
-    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
+    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
 
     proto_item_set_len(ti, offset - start_offset);
 
@@ -1526,7 +1561,7 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
 
     offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
 
-    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
+    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
 
     proto_item_append_text(ti, " (Partition-ID=%u)", packet_values.partition_id);
 
@@ -3587,6 +3622,16 @@ proto_register_kafka(void)
             FT_BYTES, BASE_NONE, 0, 0,
             NULL, HFILL }
         },
+        { &hf_kafka_message_value_compressed,
+            { "Compressed Value", "kafka.message_value_compressed",
+               FT_BYTES, BASE_NONE, 0, 0,
+               NULL, HFILL }
+        },
+        { &hf_kafka_message_compression_reduction,
+            { "Compression Reduction (compressed/uncompressed)", "kafka.message_compression_reduction",
+               FT_FLOAT, BASE_NONE, 0, 0,
+               NULL, HFILL }
+        },
         { &hf_kafka_consumer_group,
             { "Consumer Group", "kafka.consumer_group",
                FT_STRING, BASE_NONE, 0, 0,
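
Note (not part of the patch above): for readers unfamiliar with the new generated field, the following is a minimal standalone sketch of the arithmetic that show_compression_reduction() performs, with the Wireshark tvb/tree plumbing omitted and hypothetical sizes substituted. In the dissector itself the two sizes come from tvb_captured_length() of the compressed and uncompressed tvbs, and the resulting float item is marked with PROTO_ITEM_SET_GENERATED() because it is computed rather than carried on the wire.

#include <stdio.h>

/* Same ratio and divide-by-zero guard as the patch's
 * show_compression_reduction(), minus the proto-tree plumbing. */
static float compression_reduction(unsigned int compressed_size,
                                   unsigned int uncompressed_size)
{
    if (uncompressed_size == 0) {
        return 0.0f;  /* degenerate payload; the dissector simply adds no item */
    }
    return (float)compressed_size / (float)uncompressed_size;
}

int main(void)
{
    /* Hypothetical sizes: a 400-byte compressed body that inflates to
     * 1000 bytes gives 0.40, i.e. the wire carried 40% of the payload. */
    printf("Compression Reduction (compressed/uncompressed): %.2f\n",
           compression_reduction(400, 1000));
    return 0;
}

A ratio below 1.0 means compression saved bytes on the wire; values near or above 1.0 indicate the message set was incompressible.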