Kafka: fix the FETCH response alignment issue

FETCH response parsing was broken when the server had more data available than the requested
maximum return size. In that case the server checks whether the first chunk of data fits into the buffer.
If it does not, the first chunk is returned whole to the requestor. Otherwise the server pushes the
maximum block that fits and assumes the client is capable of discarding the truncated trailing content.
This makes sense, because the default block is 10 MB and pushing it as an opaque blob leverages
zero-copy I/O from the file system to the network.

The existing implementation assumed that the last batch is always aligned with the end of the buffer.
In fact, if more data is available than was requested, the last batch is delivered truncated.
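
For illustration, here is a minimal, self-contained sketch of the boundary check this change
introduces. The names are hypothetical and independent of the Wireshark API; only the framing is
taken from the Kafka protocol, where each entry begins with an 8-byte base offset followed by a
4-byte big-endian length:

#include <stdint.h>
#include <stddef.h>

#define KAFKA_ENTRY_HEADER_LEN 12  /* 8-byte base offset + 4-byte length */

typedef enum { BATCH_COMPLETE, BATCH_TRUNCATED } batch_status;

/* Decide whether the batch starting at offset fits before end_offset. */
static batch_status
check_batch_bounds(const uint8_t *buf, size_t offset, size_t end_offset)
{
    uint32_t batch_length;

    /* Not even the offset+length header fits: the size field itself
     * may be cut, so everything left is opaque truncated content. */
    if (offset + KAFKA_ENTRY_HEADER_LEN > end_offset)
        return BATCH_TRUNCATED;

    /* The length is a big-endian 32-bit value at offset + 8. */
    batch_length = ((uint32_t)buf[offset + 8]  << 24) |
                   ((uint32_t)buf[offset + 9]  << 16) |
                   ((uint32_t)buf[offset + 10] <<  8) |
                    (uint32_t)buf[offset + 11];

    /* The declared body extends past the buffer: truncated mid-message. */
    if (offset + KAFKA_ENTRY_HEADER_LEN + batch_length > end_offset)
        return BATCH_TRUNCATED;

    return BATCH_COMPLETE;
}

A dissector that sees BATCH_TRUNCATED should consume everything up to end_offset as opaque
bytes instead of parsing the partial batch; that is the role of the new hf_kafka_truncated_content
field below.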

This patch:
* fixes the alignment handling of the last batch
* adds an opaque field for truncated content
* moves the preferred replica field to the proper context

Bug: 16623
Change-Id: Iee6d513ce6711091e5561646a3fd563501eabdda
Reviewed-on: https://code.wireshark.org/review/37446
Petri-Dish: Alexis La Goutte <alexis.lagoutte@gmail.com>
Tested-by: Petri Dish Buildbot
Reviewed-by: Anders Broman <a.broman58@gmail.com>
Author: Piotr Smolinski
Date: 2020-06-10 22:56:34 +02:00
Committer: Anders Broman
Commit: 24712ec073 (parent: c97076b7d7)
1 changed file with 35 additions and 13 deletions


@@ -85,6 +85,7 @@ static int hf_kafka_batch_size = -1;
static int hf_kafka_message_key = -1;
static int hf_kafka_message_value = -1;
static int hf_kafka_message_compression_reduction = -1;
static int hf_kafka_truncated_content = -1;
static int hf_kafka_request_frame = -1;
static int hf_kafka_response_frame = -1;
static int hf_kafka_consumer_group = -1;
@@ -1572,11 +1573,12 @@ decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, int co
* tree: protocol information tree to append the item
* hf_item: protocol information item descriptor index
* offset: pointer to the message
* end_offset: last possible offset in this batch
*
* returns: pointer to the next message/batch
*/
static int
dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_)
{
proto_item *message_ti;
proto_tree *subtree;
@@ -1657,11 +1659,12 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
* tree: protocol information tree to append the item
* hf_item: protocol information item descriptor index
* offset: pointer to the message
* end_offset: last possible offset in this batch
*
* returns: pointer to the next message/batch
*/
static int
dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_)
{
proto_item *batch_ti;
proto_tree *subtree;
@@ -1749,15 +1752,30 @@ dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
}
static int
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset)
{
gint8 magic_byte;
guint32 message_size;
magic_byte = tvb_get_guint8(tvb, offset+16);
if (offset + 12 > end_offset) {
// not enough bytes left even for the offset and size fields; the size itself may be truncated
// mark everything that remains as opaque truncated content
proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA);
return end_offset;
}
message_size = tvb_get_guint32(tvb, offset + 8, ENC_BIG_ENDIAN);
if (offset + 12 + message_size > (guint32)end_offset) {
// the declared message size extends past the end of the buffer; the truncation
// point falls somewhere in the message body
proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA);
return end_offset;
}
magic_byte = tvb_get_guint8(tvb, offset + 16);
if (magic_byte < 2) {
return dissect_kafka_message_old(tvb, pinfo, tree, offset);
return dissect_kafka_message_old(tvb, pinfo, tree, offset, end_offset);
} else {
return dissect_kafka_message_new(tvb, pinfo, tree, offset);
return dissect_kafka_message_new(tvb, pinfo, tree, offset, end_offset);
}
}
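
A note on the magic-byte probe in the hunk above: both the legacy message format (magic 0 or 1)
and the v2 record-batch format place the magic byte 16 bytes into the entry, after
offset(8) + size(4) + CRC(4) in the old framing and after offset(8) + length(4) + partition leader
epoch(4) in the new one. A single byte read can therefore select the dissector before the framing
is known. A sketch with hypothetical names, valid only after the bounds check has confirmed the
entry is complete:

#include <stdint.h>
#include <stddef.h>

enum kafka_entry_format { KAFKA_MESSAGE_V0_V1, KAFKA_RECORD_BATCH_V2 };

/* Both framings keep the magic byte at entry start + 16; call this only
 * after the bounds check has shown the entry to be complete. */
static enum kafka_entry_format
classify_kafka_entry(const uint8_t *buf, size_t offset)
{
    return (buf[offset + 16] < 2) ? KAFKA_MESSAGE_V0_V1
                                  : KAFKA_RECORD_BATCH_V2;
}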
@@ -1776,7 +1794,7 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, g
}
while (offset < end_offset) {
offset = dissect_kafka_message(tvb, pinfo, subtree, offset);
offset = dissect_kafka_message(tvb, pinfo, subtree, offset, end_offset);
messages += 1;
}
@@ -1784,7 +1802,6 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, g
expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length);
}
proto_item_append_text(ti, " (%d Messages)", messages);
proto_item_set_end(ti, tvb, offset);
return offset;
@@ -2873,6 +2890,11 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version);
}
if (api_version >= 11) {
proto_tree_add_item(subtree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
}
len = tvb_get_ntohl(tvb, offset);
offset += 4;
@@ -2925,11 +2947,6 @@ dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
offset += 4;
}
if (api_version >= 11) {
proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
}
return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_response_topic);
}
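
The two hunks above implement the third bullet of the commit message: in FETCH v11 the
preferred read replica is a per-partition field sitting between the aborted-transactions array and
the record set, not a top-level response field. As a reading aid, a rough sketch of the v11 partition
response layout (field names follow the Kafka protocol specification; this is not code from the
dissector, and a real parser must not rely on C struct packing):

#include <stdint.h>

struct kafka_fetch_v11_partition_response {
    int32_t partition;               /* partition index */
    int16_t error_code;
    int64_t high_watermark;
    int64_t last_stable_offset;      /* v4+ */
    int64_t log_start_offset;        /* v5+ */
    /* aborted_transactions array (v4+) ... */
    int32_t preferred_read_replica;  /* v11+: per partition, not top-level */
    /* int32_t record_set_size, followed by the record set bytes */
};

This matches the code path above: the field is dissected right after
dissect_kafka_aborted_transactions and immediately before the record set size is read with
tvb_get_ntohl.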
@@ -8425,6 +8442,11 @@ proto_register_kafka(void)
FT_FLOAT, BASE_NONE, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_truncated_content,
{ "Truncated Content", "kafka.truncated_content",
FT_BYTES, BASE_NONE, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_consumer_group,
{ "Consumer Group", "kafka.consumer_group",
FT_STRING, STR_ASCII, 0, 0,