Kafka: fixed OffsetForLeaderEpoch dissection

Bug: 16023
Change-Id: I78e1354ac5509707c818d7968c7067583fb469ba
Reviewed-on: https://code.wireshark.org/review/34379
Petri-Dish: Michael Mann <mmann78@netscape.net>
Tested-by: Petri Dish Buildbot
Reviewed-by: Michael Mann <mmann78@netscape.net>
Author:    Piotr Smolinski
Date:      2019-08-26 22:07:23 +02:00
Committer: Michael Mann
Parent:    a1532e7b16
Commit:    05b6a9ad0b

epan/dissectors/packet-kafka.c

@@ -66,7 +66,6 @@ static int hf_kafka_log_start_offset = -1;
 static int hf_kafka_first_offset = -1;
 static int hf_kafka_producer_id = -1;
 static int hf_kafka_producer_epoch = -1;
-static int hf_kafka_message_set_size = -1;
 static int hf_kafka_message_size = -1;
 static int hf_kafka_message_crc = -1;
 static int hf_kafka_message_magic = -1;
@@ -145,8 +144,6 @@ static int hf_kafka_fetch_session_id = -1;
 static int hf_kafka_fetch_session_epoch = -1;
 static int hf_kafka_record_header_key = -1;
 static int hf_kafka_record_header_value = -1;
-static int hf_kafka_record_headers_count = -1;
-static int hf_kafka_record_length = -1;
 static int hf_kafka_record_attributes = -1;
 static int hf_kafka_allow_auto_topic_creation = -1;
 static int hf_kafka_validate_only = -1;
@@ -254,6 +251,10 @@ static expert_field ei_kafka_unknown_api_key = EI_INIT;
 static expert_field ei_kafka_unsupported_api_version = EI_INIT;
 static expert_field ei_kafka_bad_string_length = EI_INIT;
 static expert_field ei_kafka_bad_bytes_length = EI_INIT;
+static expert_field ei_kafka_bad_array_length = EI_INIT;
+static expert_field ei_kafka_bad_record_length = EI_INIT;
+static expert_field ei_kafka_bad_varint = EI_INIT;
+static expert_field ei_kafka_bad_message_set_length = EI_INIT;
 static expert_field ei_kafka_unknown_message_magic = EI_INIT;
 
 typedef gint16 kafka_api_key_t;
@@ -692,7 +693,7 @@ typedef struct kafka_packet_values_t {
 
 /* Forward declaration (dissect_kafka_message_set() and dissect_kafka_message() call each other...) */
 static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec);
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec);
 
 /* HELPERS */
@@ -897,8 +898,13 @@ dissect_kafka_array_ref(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int
     count = (gint32) tvb_get_ntohl(tvb, offset);
     offset += 4;
 
-    for (i=0; i<count; i++) {
-        offset = func(tvb, pinfo, tree, offset, api_version);
-    }
+    if (count < -1) { // -1 means null array
+        expert_add_info(pinfo, proto_tree_get_parent(tree), &ei_kafka_bad_array_length);
+    }
+    else {
+        for (i=0; i<count; i++) {
+            offset = func(tvb, pinfo, tree, offset, api_version);
+        }
+    }
 
     if (p_count != NULL) {
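For context on the check this hunk adds: the classic (non-compact) Kafka protocol encodes an ARRAY as a 32-bit big-endian element count followed by the elements, and reserves -1 for a null array. Any count below -1 is malformed by definition, which is what ei_kafka_bad_array_length now reports; the old loop simply skipped iteration on any negative count, hiding the error. A short summary of the framing (field names follow the Kafka protocol guide, not the dissector):

    /* ARRAY framing as read by dissect_kafka_array_ref():
     *
     *   count     int32   -1 = null array, 0 = empty array, N > 0 = N elements
     *   elements  'count' repetitions of the element encoding
     */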
@@ -916,7 +922,6 @@ dissect_kafka_array(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo, int off
     return dissect_kafka_array_ref(tree, tvb, pinfo, offset, api_version, func, NULL);
 }
 
-
 static int
 dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *pinfo, int offset,
                      int *p_string_offset, int *p_string_len)
@@ -1018,17 +1023,17 @@ dissect_kafka_timestamp(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
  *
  * tvb: actual data buffer
  * offset: offset in the buffer where the string length is to be found
- * p_val: pointer to a variable to store the returned value
  * p_len: pointer to a variable to store the length of the variable
+ * p_overflow: pointer to a variable to store information that the value exceeds gint32 capacity
  *
- * returns: pointer to the next field in the message or -1
+ * returns: decoded value of 32-bit signed integer
  */
 static gint32
-tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
+tvb_read_kafka_varint32(tvbuff_t *tvb, gint offset, guint *p_len, gboolean *p_overflow)
 {
     gint32 v = 0;
     guint8 p = 0;
-    int i = 0;
+    guint i = 0;
 
     do {
         p = tvb_get_guint8(tvb, offset+i);
@@ -1036,21 +1041,17 @@ tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
         i += 1;
     } while ((p&0x80)!=0 && i<5);
 
-    // 32-bit integer in zig-zag can take up to 5 octets
-    // the last octet can take at most 4 bits
-    // either continuation bit is set or there are more than 32 bits
-    DISSECTOR_ASSERT((p&0x80) == 0);
-    DISSECTOR_ASSERT(i < 5 || (p&0x70) == 0);
-
-    if (p_val != NULL) {
-        *p_val = (v>>1) ^ ((v & 1) ? -1 : 0);
-    }
-
     if (p_len != NULL) {
         *p_len = i;
     }
 
+    // 32-bit integer in zig-zag can take up to 5 octets
+    // the last octet can take at most 4 bits
+    // either continuation bit is set or there are more than 32 bits
+    if (p_overflow != NULL) {
+        *p_overflow = ((p&0x80) != 0) || (i >= 5 && (p&0x70) != 0);
+    }
 
-    return offset+i;
+    return (v>>1) ^ ((v & 1) ? -1 : 0);
 }
 
 /*
@@ -1061,18 +1062,17 @@ tvb_read_kafka_varint32(tvbuff_t *tvb, int offset, gint32 *p_val, int *p_len)
  *
  * tvb: actual data buffer
  * offset: offset in the buffer where the string length is to be found
- * p_val: pointer to a variable to store the returned value
 * p_len: pointer to a variable to store the length of the variable
+ * p_overflow: pointer to a variable to store information that the value exceeds gint64 capacity
 *
- * returns: pointer to the next field in the message or -1
+ * returns: decoded value of 64-bit signed integer
 */
 static gint64
-tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len)
+tvb_read_kafka_varint64(tvbuff_t *tvb, gint offset, guint *p_len, gboolean *p_overflow)
 {
     gint64 v = 0;
     guint8 p = 0;
-    int i = 0;
+    guint i = 0;
 
     do {
         p = tvb_get_guint8(tvb, offset+i);
@@ -1080,48 +1080,39 @@ tvb_read_kafka_varint64(tvbuff_t *tvb, int offset, gint64 *p_val, int *p_len)
         i += 1;
     } while ((p&0x80)!=0 && i<10);
 
-    // 64-bit integer in zig-zag can take up to 10 octets
-    // the last octet can take at most 1 bit
-    // either continuation bit is set or there are more than 64 bits
-    DISSECTOR_ASSERT((p&0x80) == 0);
-    DISSECTOR_ASSERT(i < 10 || (p&0x7e) == 0);
-
-    if (p_val != NULL) {
-        *p_val = (v>>1) ^ ((v & 1) ? -1 : 0);
-    }
     if (p_len != NULL) {
         *p_len = i;
    }
 
+    // 64-bit integer in zig-zag can take up to 10 octets
+    // the last octet can take at most 1 bit
+    // either continuation bit is set or there are more than 64 bits
+    if (p_overflow != NULL) {
+        *p_overflow = ((p&0x80) != 0) || (i >= 10 && (p&0x7e) != 0);
+    }
 
-    return offset+i;
-}
-
-static int
-dissect_kafka_varint(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset)
-{
-    gint64 val;
-    int len;
-
-    tvb_read_kafka_varint64(tvb, offset, &val, &len);
-    proto_tree_add_int64(tree, hf_item, tvb, offset, len, val);
-
-    return offset+len;
+    return (v>>1) ^ ((v & 1) ? -1 : 0);
 }
 
 static int
 dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 first_timestamp)
 {
-    nstime_t nstime;
-    guint64 milliseconds;
-    guint64 val;
-    int len;
+    nstime_t nstime;
+    guint64 milliseconds;
+    guint64 val;
+    int len;
+    gboolean overflow;
+    proto_item *pi;
 
-    tvb_read_kafka_varint64(tvb, offset, &val, &len);
+    val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow);
 
     milliseconds = first_timestamp + val;
     nstime.secs = (time_t) (milliseconds / 1000);
     nstime.nsecs = (int) ((milliseconds % 1000) * 1000000);
 
-    proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime);
+    pi = proto_tree_add_time(tree, hf_item, tvb, offset, len, &nstime);
+    if (overflow) {
+        expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+    }
 
     return offset+len;
 }
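For readers unfamiliar with the encoding these two helpers decode: Kafka record fields use Protocol Buffers-style zig-zag varints. Each octet contributes 7 payload bits (least-significant group first) and its top bit flags continuation; the final (v>>1) ^ ((v & 1) ? -1 : 0) step folds the unsigned result back into a signed value. A self-contained sketch of the 32-bit decode outside the tvb API (illustrative names, not dissector code; the dissector additionally reports stray continuation or overflow bits via ei_kafka_bad_varint):

    #include <stdint.h>
    #include <stdio.h>

    /* Decode a zig-zag varint32 from buf, storing the octet count in *len.
     * Same loop and zig-zag fold as tvb_read_kafka_varint32() above. */
    static int32_t zigzag_decode32(const uint8_t *buf, unsigned *len)
    {
        uint32_t v = 0;
        uint8_t  p;
        unsigned i = 0;
        do {
            p  = buf[i];
            v |= (uint32_t)(p & 0x7f) << (7*i);  /* 7 payload bits per octet */
            i += 1;
        } while ((p & 0x80) != 0 && i < 5);      /* stop when continuation bit clears */
        *len = i;
        return (int32_t)(v >> 1) ^ ((v & 1) ? -1 : 0);  /* 0,1,2,3 -> 0,-1,1,-2 */
    }

    int main(void)
    {
        const uint8_t enc[] = { 0xac, 0x02 };    /* raw varint 300 -> zig-zag value 150 */
        unsigned len;
        printf("%d (%u octets)\n", zigzag_decode32(enc, &len), len);  /* 150 (2 octets) */
        return 0;
    }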
@@ -1129,10 +1120,11 @@ dissect_kafka_timestamp_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree
 static int
 dissect_kafka_offset_delta(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, guint64 base_offset)
 {
-    gint64 val;
-    int len;
+    gint64 val;
+    int len;
+    gboolean overflow;
 
-    tvb_read_kafka_varint64(tvb, offset, &val, &len);
+    val = tvb_read_kafka_varint64(tvb, offset, &len, &overflow);
 
     proto_tree_add_int64(tree, hf_item, tvb, offset, len, base_offset+val);
@@ -1162,12 +1154,30 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree
 {
     gint val;
     gint len;
+    gboolean overflow;
+    proto_item *pi;
 
-    tvb_read_kafka_varint32(tvb, offset, &val, &len);
-    DISSECTOR_ASSERT(val >= 0);
+    val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
 
-    proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_UTF_8|ENC_NA);
+    if (overflow) {
+        pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+        expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+        val = 0;
+    } else if (val > 0) {
+        // there is payload available, possibly with 0 octets
+        pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA | ENC_UTF_8);
+    } else if (val == 0) {
+        // there is empty payload (0 octets)
+        pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
+    } else if (val == -1) {
+        // there is no payload (null)
+        pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+        val = 0;
+    } else {
+        pi = proto_tree_add_string_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+        expert_add_info(pinfo, pi, &ei_kafka_bad_string_length);
+        val = 0;
+    }
 
     if (p_string_offset != NULL) {
         *p_string_offset = offset+len;
@@ -1200,24 +1210,33 @@ dissect_kafka_string_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree
 static int
 dissect_kafka_bytes_new(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int hf_item, int offset, int *p_bytes_offset, int *p_bytes_length)
 {
-    gint val;
-    gint len;
+    gint val;
+    gint len;
+    gboolean overflow;
+    proto_item *pi;
 
-    tvb_read_kafka_varint32(tvb, offset, &val, &len);
-    DISSECTOR_ASSERT(val >= -1);
+    val = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
 
-    if (val > 0) {
+    if (overflow) {
+        pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+        expert_add_info(pinfo, pi, &ei_kafka_bad_varint);
+        val = 0;
+    } else if (val > 0) {
         // there is payload available, possibly with 0 octets
-        proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA);
+        pi = proto_tree_add_item(tree, hf_item, tvb, offset+len, val, ENC_NA);
     } else if (val == 0) {
         // there is empty payload (0 octets)
-        proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
-    } else {
+        pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<EMPTY>");
+    } else if (val == -1) {
         // there is no payload (null)
-        proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+        pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<NULL>");
+        val = 0;
+    } else {
+        pi = proto_tree_add_bytes_format_value(tree, hf_item, tvb, offset+len, 0, NULL, "<INVALID>");
+        expert_add_info(pinfo, pi, &ei_kafka_bad_bytes_length);
+        val = 0;
     }
+
     if (p_bytes_offset != NULL) {
         *p_bytes_offset = offset+len;
     }
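dissect_kafka_string_new() and dissect_kafka_bytes_new() now share the same interpretation of the varint length prefix used by the record encoding (message format v2): a positive length is followed by that many payload octets, 0 is an empty value, -1 is null, and everything else, including a malformed varint, is an error. The decision table, restated as a sketch (the enum and helper are hypothetical, for illustration only):

    typedef enum { FIELD_PAYLOAD, FIELD_EMPTY, FIELD_NULL, FIELD_INVALID } field_kind_t;

    /* How the two helpers above classify a decoded length prefix. */
    static field_kind_t classify_length(int32_t val, int overflow)
    {
        if (overflow)  return FIELD_INVALID;  /* stray bits -> ei_kafka_bad_varint */
        if (val > 0)   return FIELD_PAYLOAD;  /* 'val' payload octets follow */
        if (val == 0)  return FIELD_EMPTY;    /* rendered as <EMPTY> */
        if (val == -1) return FIELD_NULL;     /* rendered as <NULL> */
        return FIELD_INVALID;                 /* below -1 -> bad string/bytes length */
    }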
@@ -1265,17 +1284,21 @@ dissect_kafka_record_headers(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *
 {
     proto_item *record_headers_ti;
     proto_tree *subtree;
-    gint32 count;
-    int i;
+    gint32 count;
+    gint len;
+    gboolean overflow;
+    int i;
 
     subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_record_headers, &record_headers_ti, "Headers");
 
-    tvb_read_kafka_varint32(tvb, offset, &count, NULL);
-    offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_headers_count, offset);
+    count = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
+    if (overflow) {
+        expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_varint);
+    } else if (count < -1) { // -1 means null array
+        expert_add_info(pinfo, record_headers_ti, &ei_kafka_bad_array_length);
+    }
 
-    // null array is marked by -1 as opposed to empty array marked by 0
-    DISSECTOR_ASSERT(count >= -1);
+    offset += len;
 
     for (i=0;i<count;i++) {
         offset = dissect_kafka_record_headers_header(tvb, pinfo, subtree, offset);
     }
@@ -1291,8 +1314,9 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
     proto_item *record_ti;
     proto_tree *subtree;
-    gint32 val;
-    int len;
+    gint32 size;
+    gint len;
+    gboolean overflow;
 
     int offset, end_offset;
@@ -1300,11 +1324,17 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
     subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_record, &record_ti, "Record");
 
-    tvb_read_kafka_varint32(tvb, offset, &val, &len);
-    DISSECTOR_ASSERT(val >= 0);
-    end_offset = offset + len + val;
+    size = tvb_read_kafka_varint32(tvb, offset, &len, &overflow);
+    if (overflow) {
+        expert_add_info(pinfo, record_ti, &ei_kafka_bad_varint);
+        return offset + len;
+    } else if (size < 6) {
+        expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length);
+        return offset + len;
+    }
 
-    offset = dissect_kafka_varint(tvb, pinfo, subtree, hf_kafka_record_length, offset);
+    end_offset = offset + len + size;
+    offset += len;
 
     proto_tree_add_item(subtree, hf_kafka_record_attributes, tvb, offset, 1, ENC_BIG_ENDIAN);
     offset += 1;
@@ -1317,9 +1347,9 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
     offset = dissect_kafka_record_headers(tvb, pinfo, subtree, offset);
 
-    DISSECTOR_ASSERT(offset == end_offset);
-
-    proto_item_set_end(record_ti, tvb, offset);
+    if (offset != end_offset) {
+        expert_add_info(pinfo, record_ti, &ei_kafka_bad_record_length);
+    }
 
     return end_offset;
 }
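The new size < 6 lower bound follows from the message format v2 record layout: even a record with a null key, a null value, and no headers spends at least one octet on each field after the length prefix. For reference (field names per the Kafka protocol specification):

    /* Record body in message format v2, after the varint length prefix:
     *
     *   attributes      int8                        1 octet
     *   timestampDelta  zig-zag varint              >= 1 octet
     *   offsetDelta     zig-zag varint              >= 1 octet
     *   key             varint length (+ payload)   >= 1 octet (-1 = null)
     *   value           varint length (+ payload)   >= 1 octet (-1 = null)
     *   headers         varint count (+ entries)    >= 1 octet
     *
     * Minimum 6 octets, so a smaller declared size is reported via
     * ei_kafka_bad_record_length.
     */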
@@ -1660,13 +1690,15 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
         if (decompress(tvb, pinfo, offset, length, codec, &decompressed_tvb, &decompressed_offset)==1) {
             add_new_data_source(pinfo, decompressed_tvb, "Decompressed content");
             show_compression_reduction(tvb, subtree, length, tvb_captured_length(decompressed_tvb));
-            dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset, FALSE, codec);
+            dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset,
+                                      tvb_reported_length_remaining(decompressed_tvb, decompressed_offset), codec);
         } else {
             proto_item_append_text(subtree, " [Cannot decompress records]");
         }
 
         offset += length;
     }
 
-    proto_item_set_len(message_ti, offset - start_offset);
+    proto_item_set_end(message_ti, tvb, offset);
 
     return offset;
 }
@@ -1787,27 +1819,12 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int o
 }
 
 static int
-dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset, gboolean has_length_field, guint8 codec)
+dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, gint offset, guint len, guint8 codec)
 {
     proto_item *ti;
     proto_tree *subtree;
-    gint len, message_size;
-    int offset = start_offset;
-    int messages = 0;
-
-    if (has_length_field) {
-        proto_tree_add_item(tree, hf_kafka_message_set_size, tvb, offset, 4, ENC_BIG_ENDIAN);
-        len = (gint)tvb_get_ntohl(tvb, offset);
-        offset += 4;
-        start_offset += 4;
-    }
-    else {
-        len = tvb_reported_length_remaining(tvb, offset);
-    }
-
-    if (len <= 0) {
-        return offset;
-    }
+    gint end_offset = offset + len;
+    guint messages = 0;
 
     subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_message_set, &ti, "Message Set");
 
     /* If set came from a compressed message, make it obvious in tree root */
@@ -1815,19 +1832,17 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
         proto_item_append_text(subtree, " [from compressed %s message]", val_to_str_const(codec, kafka_message_codecs, "Unknown"));
     }
 
-    while (offset - start_offset < len) {
-        message_size = tvb_get_guint32(tvb, offset+8, ENC_BIG_ENDIAN);
-        dissect_kafka_message(tvb, pinfo, subtree, offset);
-        offset += 12 + message_size;
+    while (offset < end_offset) {
+        offset = dissect_kafka_message(tvb, pinfo, subtree, offset);
         messages += 1;
     }
 
+    if (offset != end_offset) {
+        expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length);
+    }
+
     proto_item_append_text(ti, " (%d Messages)", messages);
-    proto_item_set_len(ti, offset - start_offset);
+    proto_item_set_end(ti, tvb, offset);
 
     return offset;
 }
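The rewritten loop relies on dissect_kafka_message() returning the offset just past the message it consumed, instead of re-reading the 4-byte size at offset+8 and hard-coding the 12-octet entry header; a set whose declared size disagrees with its content now yields expert info rather than a dissector assertion. For reference, a hypothetical standalone walker over the same pre-v2 framing (not dissector code):

    #include <stddef.h>
    #include <stdint.h>

    /* Count entries in a pre-v2 message set: each entry is an 8-octet offset,
     * a 4-octet big-endian message size, then that many octets of message body. */
    static size_t count_messages(const uint8_t *buf, size_t len)
    {
        size_t pos = 0, n = 0;
        while (pos + 12 <= len) {
            uint32_t size = (uint32_t)buf[pos+8] << 24 | (uint32_t)buf[pos+9] << 16
                          | (uint32_t)buf[pos+10] << 8 | (uint32_t)buf[pos+11];
            pos += 12 + size;  /* 8 (offset) + 4 (size) + body */
            n   += 1;
        }
        return n;
    }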
@@ -2892,7 +2907,8 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
 {
     proto_item *ti;
     proto_tree *subtree;
-    int offset = start_offset;
+    int offset = start_offset;
+    guint len;
     kafka_packet_values_t packet_values;
     memset(&packet_values, 0, sizeof(packet_values));
@@ -2918,7 +2934,12 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
         offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version);
     }
 
-    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
+    len = tvb_get_ntohl(tvb, offset);
+    offset += 4;
+
+    if (len > 0) {
+        offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE);
+    }
 
     proto_item_set_len(ti, offset - start_offset);
@@ -2983,6 +3004,7 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
 {
     proto_item *ti;
     proto_tree *subtree;
+    guint len;
     kafka_packet_values_t packet_values;
     memset(&packet_values, 0, sizeof(packet_values));
@@ -2990,7 +3012,12 @@ dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo, proto
     offset = dissect_kafka_partition_id_ret(tvb, pinfo, subtree, offset, &packet_values.partition_id);
 
-    offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, TRUE, KAFKA_MESSAGE_CODEC_NONE);
+    len = tvb_get_ntohl(tvb, offset);
+    offset += 4;
+
+    if (len > 0) {
+        offset = dissect_kafka_message_set(tvb, pinfo, subtree, offset, len, KAFKA_MESSAGE_CODEC_NONE);
+    }
 
     proto_item_append_text(ti, " (ID=%u)", packet_values.partition_id);
     proto_item_set_end(ti, tvb, offset);
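Both call sites above (fetch response and produce request) replace the old has_length_field mode of dissect_kafka_message_set() with an explicit read of the RECORDS field's 32-bit size prefix, skipping dissection entirely when the set is empty. The framing they now consume inline:

    /* RECORDS field:
     *
     *   size     int32   octet count of the message set / record batch data
     *   payload  'size' octets, handed to dissect_kafka_message_set() when size > 0
     */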
@@ -5099,12 +5126,15 @@ dissect_kafka_offset_for_leader_epoch_request_topic_partition(tvbuff_t *tvb, pac
     partition_id = tvb_get_ntohl(tvb, offset);
     proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
     offset += 4;
 
+    if (api_version >= 2) {
+        proto_tree_add_item(subtree, hf_kafka_current_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
+        offset += 4;
+    }
+
     proto_tree_add_item(subtree, hf_kafka_leader_epoch, tvb, offset, 4, ENC_BIG_ENDIAN);
     offset += 4;
 
     proto_item_set_end(subti, tvb, offset);
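This hunk is the fix the commit title refers to (bug 16023): API version 2 of OffsetForLeaderEpoch, introduced with KIP-320, inserts a current_leader_epoch field before leader_epoch, so without it every later field in a v2+ request was read four octets too early. The per-partition layout the dissector now follows:

    /* OffsetForLeaderEpoch request, per-partition entry:
     *
     *   partition_id          int32   all versions
     *   current_leader_epoch  int32   v2 and later (KIP-320)
     *   leader_epoch          int32   all versions
     */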
@@ -5263,7 +5293,7 @@ dissect_kafka_add_partitions_to_txn_request_topic_partition(tvbuff_t *tvb, packe
 {
     proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
 
-    return offset;
+    return offset+4;
 }
 
 static int
@@ -8354,11 +8384,6 @@ proto_register_kafka(void)
                FT_INT32, BASE_DEC, 0, 0,
                NULL, HFILL }
         },
-        { &hf_kafka_message_set_size,
-            { "Message Set Size", "kafka.message_set_size",
-               FT_INT32, BASE_DEC, 0, 0,
-               NULL, HFILL }
-        },
         { &hf_kafka_message_size,
             { "Message Size", "kafka.message_size",
                FT_INT32, BASE_DEC, 0, 0,
@@ -8734,16 +8759,6 @@ proto_register_kafka(void)
                FT_BYTES, BASE_SHOW_ASCII_PRINTABLE, 0, 0,
                NULL, HFILL }
         },
-        { &hf_kafka_record_headers_count,
-            { "Headers Count", "kafka.headers_count",
-               FT_INT64, BASE_DEC, 0, 0,
-               NULL, HFILL }
-        },
-        { &hf_kafka_record_length,
-            { "Record Length", "kafka.record_length",
-               FT_INT64, BASE_DEC, 0, 0,
-               NULL, HFILL }
-        },
         { &hf_kafka_record_attributes,
             { "Record Attributes (reserved)", "kafka.record_attributes",
                FT_INT8, BASE_DEC, 0, 0,
@@ -9027,6 +9042,14 @@ proto_register_kafka(void)
           { "kafka.bad_string_length", PI_MALFORMED, PI_WARN, "Invalid string length field", EXPFILL }},
         { &ei_kafka_bad_bytes_length,
           { "kafka.bad_bytes_length", PI_MALFORMED, PI_WARN, "Invalid byte length field", EXPFILL }},
+        { &ei_kafka_bad_array_length,
+          { "kafka.bad_array_length", PI_MALFORMED, PI_WARN, "Invalid array length field", EXPFILL }},
+        { &ei_kafka_bad_record_length,
+          { "kafka.bad_record_length", PI_MALFORMED, PI_WARN, "Invalid record length field", EXPFILL }},
+        { &ei_kafka_bad_varint,
+          { "kafka.bad_varint", PI_MALFORMED, PI_WARN, "Invalid varint bytes", EXPFILL }},
+        { &ei_kafka_bad_message_set_length,
+          { "kafka.ei_kafka_bad_message_set_length", PI_MALFORMED, PI_WARN, "Message set size does not match content", EXPFILL }},
         { &ei_kafka_unknown_message_magic,
           { "kafka.unknown_message_magic", PI_MALFORMED, PI_WARN, "Invalid message magic field", EXPFILL }},
     };