From 24712ec0733ffba581f44a333349dd21bc4427a7 Mon Sep 17 00:00:00 2001 From: Piotr Smolinski Date: Wed, 10 Jun 2020 22:56:34 +0200 Subject: [PATCH] Kafka: fix the FETCH response alignment issue There was a problem in FETCH response parsing when the server had more data than the requested maximal return size. In such case the server checks if the first chunk of data fits into buffer. If it does not, the first chunk is returned as a whole to the requestor. Otherwise it is assumed that the client is capable of discarding invalid content and the server pushes maximum available block. It makes sense, because the default block is 10MB and pushing it opaque leverages zero-copy IO from the file system to the network. In the existing implementation it was assumed that the last batch is aligned with the end of the buffer. Actually, if there is some data more, the last part is delivered truncated. This patch: * fixes the last part alignment handling * adds opaque field for truncated content * moves preferred replica field to the proper context Bug: 16623 Change-Id: Iee6d513ce6711091e5561646a3fd563501eabdda Reviewed-on: https://code.wireshark.org/review/37446 Petri-Dish: Alexis La Goutte Tested-by: Petri Dish Buildbot Reviewed-by: Anders Broman --- epan/dissectors/packet-kafka.c | 48 +++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c index ba9932c4da..daab237e65 100644 --- a/epan/dissectors/packet-kafka.c +++ b/epan/dissectors/packet-kafka.c @@ -85,6 +85,7 @@ static int hf_kafka_batch_size = -1; static int hf_kafka_message_key = -1; static int hf_kafka_message_value = -1; static int hf_kafka_message_compression_reduction = -1; +static int hf_kafka_truncated_content = -1; static int hf_kafka_request_frame = -1; static int hf_kafka_response_frame = -1; static int hf_kafka_consumer_group = -1; @@ -1572,11 +1573,12 @@ decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, int co * tree: protocol information tree to append the item * hf_item: protocol information item descriptor index * offset: pointer to the message + * end_offset: last possible offset in this batch * * returns: pointer to the next message/batch */ static int -dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_) { proto_item *message_ti; proto_tree *subtree; @@ -1657,11 +1659,12 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i * tree: protocol information tree to append the item * hf_item: protocol information item descriptor index * offset: pointer to the message + * end_offset: last possible offset in this batch * * returns: pointer to the next message/batch */ static int -dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_) { proto_item *batch_ti; proto_tree *subtree; @@ -1749,15 +1752,30 @@ dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i } static int -dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset) +dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset) { gint8 magic_byte; + guint32 message_size; - magic_byte = tvb_get_guint8(tvb, offset+16); + if (offset + 12 > end_offset) { + // in this case we deal with truncated message, where the size part may be also truncated + // actually we may add truncated info + proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA); + return end_offset; + } + message_size = tvb_get_guint32(tvb, offset + 8, ENC_BIG_ENDIAN); + if (offset + 12 + message_size > (guint32)end_offset) { + // in this case we deal with truncated message, where the truncation point falls somewhere + // in the message body + proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA); + return end_offset; + } + + magic_byte = tvb_get_guint8(tvb, offset + 16); if (magic_byte < 2) { - return dissect_kafka_message_old(tvb, pinfo, tree, offset); + return dissect_kafka_message_old(tvb, pinfo, tree, offset, end_offset); } else { - return dissect_kafka_message_new(tvb, pinfo, tree, offset); + return dissect_kafka_message_new(tvb, pinfo, tree, offset, end_offset); } } @@ -1776,7 +1794,7 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, g } while (offset < end_offset) { - offset = dissect_kafka_message(tvb, pinfo, subtree, offset); + offset = dissect_kafka_message(tvb, pinfo, subtree, offset, end_offset); messages += 1; } @@ -1784,7 +1802,6 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, g expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length); } - proto_item_append_text(ti, " (%d Messages)", messages); proto_item_set_end(ti, tvb, offset); return offset; @@ -2873,6 +2890,11 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_ offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version); } + if (api_version >= 11) { + proto_tree_add_item(subtree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); + offset += 4; + } + len = tvb_get_ntohl(tvb, offset); offset += 4; @@ -2925,11 +2947,6 @@ dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree offset += 4; } - if (api_version >= 11) { - proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN); - offset += 4; - } - return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_response_topic); } @@ -8425,6 +8442,11 @@ proto_register_kafka(void) FT_FLOAT, BASE_NONE, 0, 0, NULL, HFILL } }, + { &hf_kafka_truncated_content, + { "Truncated Content", "kafka.truncated_content", + FT_BYTES, BASE_NONE, 0, 0, + NULL, HFILL } + }, { &hf_kafka_consumer_group, { "Consumer Group", "kafka.consumer_group", FT_STRING, STR_ASCII, 0, 0,