forked from osmocom/wireshark
Kafka: show decompressed messages more clearly
Change-Id: I8edf251d9ab26f93bf54d8032706ac7b9e4f6dd1 Reviewed-on: https://code.wireshark.org/review/21538 Reviewed-by: Evan Huus <eapache@gmail.com> Petri-Dish: Evan Huus <eapache@gmail.com> Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org> Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
This commit is contained in:
parent
6e925a9e7d
commit
d89b83a9e4
|
@ -67,6 +67,8 @@ static int hf_kafka_message_timestamp_type = -1;
|
|||
static int hf_kafka_message_timestamp = -1;
|
||||
static int hf_kafka_message_key = -1;
|
||||
static int hf_kafka_message_value = -1;
|
||||
static int hf_kafka_message_value_compressed = -1;
|
||||
static int hf_kafka_message_compression_reduction = -1;
|
||||
static int hf_kafka_request_frame = -1;
|
||||
static int hf_kafka_response_frame = -1;
|
||||
static int hf_kafka_consumer_group = -1;
|
||||
|
@ -344,7 +346,7 @@ typedef struct kafka_packet_values_t {
|
|||
|
||||
/* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
|
||||
static int
|
||||
dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field);
|
||||
dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec);
|
||||
|
||||
|
||||
/* HELPERS */
|
||||
|
@ -555,10 +557,22 @@ dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
|
|||
return offset;
|
||||
}
|
||||
|
||||
/* Calculate and show the reduction in transmitted size due to compression */
|
||||
static void show_compression_reduction(tvbuff_t *tvb, proto_tree *tree, guint compressed_size, guint uncompressed_size)
|
||||
{
|
||||
proto_item *ti;
|
||||
/* Not really expecting a message to compress down to nothing, but defend against dividing by 0 anyway */
|
||||
if (uncompressed_size != 0) {
|
||||
ti = proto_tree_add_float(tree, hf_kafka_message_compression_reduction, tvb, 0, 0,
|
||||
(float)compressed_size / (float)uncompressed_size);
|
||||
PROTO_ITEM_SET_GENERATED(ti);
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
|
||||
{
|
||||
proto_item *ti, *decrypt_item;
|
||||
proto_item *message_ti, *decrypt_item;
|
||||
proto_tree *subtree;
|
||||
tvbuff_t *raw, *payload;
|
||||
int offset = start_offset;
|
||||
|
@ -567,7 +581,7 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
|
|||
guint bytes_length = 0;
|
||||
|
||||
|
||||
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &ti, "Message");
|
||||
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message, &message_ti, "Message");
|
||||
|
||||
/* CRC */
|
||||
proto_tree_add_item(subtree, hf_kafka_message_crc, tvb, offset, 4, ENC_BIG_ENDIAN);
|
||||
|
@ -599,20 +613,27 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
|
|||
offset += 4;
|
||||
|
||||
if (raw) {
|
||||
guint compressed_size = tvb_captured_length(raw);
|
||||
|
||||
/* Raw compressed data */
|
||||
proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
|
||||
|
||||
/* Unzip message and add payload to new data tab */
|
||||
payload = tvb_child_uncompress(tvb, raw, 0, tvb_captured_length(raw));
|
||||
payload = tvb_child_uncompress(tvb, raw, 0, compressed_size);
|
||||
if (payload) {
|
||||
show_compression_reduction(tvb, subtree, compressed_size, tvb_captured_length(payload));
|
||||
|
||||
add_new_data_source(pinfo, payload, "Uncompressed Message");
|
||||
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
|
||||
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
|
||||
} else {
|
||||
decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
|
||||
expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
|
||||
}
|
||||
offset += tvb_captured_length(raw);
|
||||
offset += compressed_size;
|
||||
|
||||
/* Add to summary */
|
||||
col_append_fstr(pinfo->cinfo, COL_INFO, " [GZIPd message set]");
|
||||
proto_item_append_text(ti, " (GZIPd message set)");
|
||||
proto_item_append_text(message_ti, " (GZIPd message set)");
|
||||
}
|
||||
else {
|
||||
proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL);
|
||||
|
@ -628,6 +649,9 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
|
|||
size_t uncompressed_size;
|
||||
snappy_status ret = SNAPPY_INVALID_INPUT;
|
||||
|
||||
/* Raw compressed data */
|
||||
proto_tree_add_item(subtree, hf_kafka_message_value_compressed, tvb, offset, compressed_size, ENC_NA);
|
||||
|
||||
if (tvb_memeql(raw, 0, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) {
|
||||
/* xerial framing format */
|
||||
guint chunk_size, pos = 16;
|
||||
|
@ -669,8 +693,14 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
|
|||
}
|
||||
}
|
||||
if (ret == SNAPPY_OK) {
|
||||
show_compression_reduction(tvb, subtree, compressed_size, uncompressed_size);
|
||||
|
||||
add_new_data_source(pinfo, payload, "Uncompressed Message");
|
||||
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE);
|
||||
dissect_kafka_message_set(payload, pinfo, subtree, 0, FALSE, codec);
|
||||
|
||||
/* Add to summary */
|
||||
col_append_fstr(pinfo->cinfo, COL_INFO, " [Snappy-compressed message set]");
|
||||
proto_item_append_text(message_ti, " (Snappy-compressed message set)");
|
||||
} else {
|
||||
decrypt_item = proto_tree_add_item(subtree, hf_kafka_message_value, raw, 0, -1, ENC_NA);
|
||||
expert_add_info(pinfo, decrypt_item, &ei_kafka_message_decompress);
|
||||
|
@ -680,22 +710,23 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
|
|||
break;
|
||||
#endif
|
||||
case KAFKA_MESSAGE_CODEC_LZ4:
|
||||
/* TODO: */
|
||||
case KAFKA_MESSAGE_CODEC_NONE:
|
||||
default:
|
||||
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset, NULL, &bytes_length);
|
||||
|
||||
/* Add to summary */
|
||||
col_append_fstr(pinfo->cinfo, COL_INFO, " [%u bytes]", bytes_length);
|
||||
proto_item_append_text(ti, " (%u bytes)", bytes_length);
|
||||
proto_item_append_text(message_ti, " (%u bytes)", bytes_length);
|
||||
}
|
||||
|
||||
proto_item_set_len(ti, offset - start_offset);
|
||||
proto_item_set_len(message_ti, offset - start_offset);
|
||||
|
||||
return offset;
|
||||
}
|
||||
|
||||
static int
|
||||
dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field)
|
||||
dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec)
|
||||
{
|
||||
proto_item *ti;
|
||||
proto_tree *subtree;
|
||||
|
@ -718,6 +749,10 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
|
|||
}
|
||||
|
||||
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message_set, &ti, "Message Set");
|
||||
/* If set came from a compressed message, make it obvious in tree root */
|
||||
if (codec != KAFKA_MESSAGE_CODEC_NONE) {
|
||||
proto_item_append_text(subtree, " [from compressed %s message]", val_to_str_const(codec, kafka_message_codecs, "Unknown codec"));
|
||||
}
|
||||
|
||||
while (offset - start_offset < len) {
|
||||
proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
|
||||
|
@ -1466,7 +1501,7 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
|
|||
|
||||
offset = dissect_kafka_offset_get_value(tvb, pinfo, subtree, offset, &packet_values);
|
||||
|
||||
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
|
||||
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
|
||||
|
||||
proto_item_set_len(ti, offset - start_offset);
|
||||
|
||||
|
@ -1526,7 +1561,7 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
|
|||
|
||||
offset = dissect_kafka_partition_id_get_value(tvb, pinfo, subtree, offset, &packet_values);
|
||||
|
||||
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE);
|
||||
offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
|
||||
|
||||
proto_item_append_text(ti, " (Partition-ID=%u)", packet_values.partition_id);
|
||||
|
||||
|
@ -3587,6 +3622,16 @@ proto_register_kafka(void)
|
|||
FT_BYTES, BASE_NONE, 0, 0,
|
||||
NULL, HFILL }
|
||||
},
|
||||
{ &hf_kafka_message_value_compressed,
|
||||
{ "Compressed Value", "kafka.message_value_compressed",
|
||||
FT_BYTES, BASE_NONE, 0, 0,
|
||||
NULL, HFILL }
|
||||
},
|
||||
{ &hf_kafka_message_compression_reduction,
|
||||
{ "Compression Reduction (compressed/uncompressed)", "kafka.message_compression_reduction",
|
||||
FT_FLOAT, BASE_NONE, 0, 0,
|
||||
NULL, HFILL }
|
||||
},
|
||||
{ &hf_kafka_consumer_group,
|
||||
{ "Consumer Group", "kafka.consumer_group",
|
||||
FT_STRING, BASE_NONE, 0, 0,
|
||||
|
|
Loading…
Reference in New Issue