Kafka: Limit our decompression size.

Don't assume that the Internet has our best interests at heart when it
gives us the size of our decompression buffer. Assign an arbitrary limit
of 50 MB.
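
In outline, the new check sits at the top of the shared decompress() dispatcher, before any codec-specific buffer is allocated. A condensed sketch, taken from the hunks below (ei_kafka_bad_decompression_length is the expert info item this patch registers):

    #define MAX_DECOMPRESSION_SIZE (50 * 1000 * 1000) // Arbitrary

    static gboolean
    decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length,
               int codec, tvbuff_t **decompressed_tvb, int *decompressed_offset)
    {
        /* Reject implausible, possibly attacker-supplied lengths up front,
         * before any decompression work happens. */
        if (length > MAX_DECOMPRESSION_SIZE) {
            expert_add_info(pinfo, NULL, &ei_kafka_bad_decompression_length);
            return FALSE;
        }
        /* ... dispatch to the per-codec decompression routine ... */
    }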

This fixes #16739 in that it takes care of

** (process:17681): WARNING **: 20:03:07.440: Dissector bug, protocol Kafka, in packet 31: ../epan/proto.c:7043: failed assertion "end >= fi->start"

which is different from the original error output. It looks like *that*
might have been taken care of in one of the other recent Kafka bug fixes.

The decompression routines return a success or failure status. Use
gbooleans instead of ints for that.
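
Call sites treat that status as a simple go/no-go flag. Condensed from the dissect_kafka_message_old hunk below, which also moves the offset bump out of the success branch so the dissector skips past the compressed blob either way:

    tvbuff_t *decompressed_tvb;
    int decompressed_offset;

    if (decompress(tvb, pinfo, offset, length, codec,
                   &decompressed_tvb, &decompressed_offset)) {
        /* Success: dissect the records out of the decompressed buffer. */
        dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset,
                                  tvb_reported_length_remaining(decompressed_tvb, decompressed_offset), codec);
    } else {
        proto_item_append_text(subtree, " [Cannot decompress records]");
    }
    offset += length;  /* advance whether or not decompression succeeded */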
Gerald Combs 2020-12-01 12:00:13 -08:00
parent a9fc769d7b
commit f4374967bb
1 changed file with 32 additions and 24 deletions


@@ -266,6 +266,7 @@ static expert_field ei_kafka_bad_array_length = EI_INIT;
 static expert_field ei_kafka_bad_record_length = EI_INIT;
 static expert_field ei_kafka_bad_varint = EI_INIT;
 static expert_field ei_kafka_bad_message_set_length = EI_INIT;
+static expert_field ei_kafka_bad_decompression_length = EI_INIT;
 static expert_field ei_kafka_unknown_message_magic = EI_INIT;
 static expert_field ei_kafka_pdu_length_mismatch = EI_INIT;
@@ -1592,29 +1593,29 @@ dissect_kafka_record(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, in
     return end_offset;
 }
 
-static int
+static gboolean
 decompress_none(tvbuff_t *tvb, packet_info *pinfo _U_, int offset, guint32 length _U_, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
     *decompressed_tvb = tvb;
     *decompressed_offset = offset;
-    return 1;
+    return TRUE;
 }
 
-static int
+static gboolean
 decompress_gzip(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
     *decompressed_tvb = tvb_child_uncompress(tvb, tvb, offset, length);
     *decompressed_offset = 0;
     if (*decompressed_tvb) {
-        return 1;
+        return TRUE;
     } else {
         col_append_str(pinfo->cinfo, COL_INFO, " [gzip decompression failed] ");
-        return 0;
+        return FALSE;
     }
 }
 
 #ifdef HAVE_LZ4FRAME_H
-static int
+static gboolean
 decompress_lz4(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
     LZ4F_decompressionContext_t lz4_ctxt = NULL;
@@ -1624,7 +1625,7 @@ decompress_lz4(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tv
     guchar *decompressed_buffer = NULL;
     tvbuff_t *composite_tvb = NULL;
-    int ret = 0;
+    gboolean ret = FALSE;
 
     /* Prepare compressed data buffer */
     guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), tvb, offset, length);
@@ -1694,7 +1695,7 @@ decompress_lz4(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tv
         src_offset += src_size; // bump up the offset for the next iteration
     } while (rc > 0);
 
-    ret = 1;
+    ret = TRUE;
 end:
     if (composite_tvb) {
         tvb_composite_finalize(composite_tvb);
@@ -1710,23 +1711,23 @@ end:
     return ret;
 }
 #else
-static int
+static gboolean
 decompress_lz4(tvbuff_t *tvb _U_, packet_info *pinfo, int offset _U_, guint32 length _U_, tvbuff_t **decompressed_tvb _U_, int *decompressed_offset _U_)
 {
     col_append_str(pinfo->cinfo, COL_INFO, " [lz4 decompression unsupported]");
-    return 0;
+    return FALSE;
 }
 #endif /* HAVE_LZ4FRAME_H */
 
 #ifdef HAVE_SNAPPY
-static int
+static gboolean
 decompress_snappy(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
     guint8 *data = (guint8*)tvb_memdup(wmem_packet_scope(), tvb, offset, length);
     size_t uncompressed_size;
     snappy_status rc = SNAPPY_OK;
     tvbuff_t *composite_tvb = NULL;
-    int ret = 0;
+    gboolean ret = FALSE;
 
     if (tvb_memeql(tvb, offset, kafka_xerial_header, sizeof(kafka_xerial_header)) == 0) {
@@ -1791,7 +1792,7 @@ decompress_snappy(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length,
         *decompressed_offset = 0;
     }
 
-    ret = 1;
+    ret = TRUE;
 end:
     if (composite_tvb) {
         tvb_composite_finalize(composite_tvb);
@@ -1800,29 +1801,29 @@ end:
             *decompressed_offset = 0;
         }
     }
-    if (ret == 0) {
+    if (ret == FALSE) {
         col_append_str(pinfo->cinfo, COL_INFO, " [snappy decompression failed]");
     }
     return ret;
 }
 #else
-static int
+static gboolean
 decompress_snappy(tvbuff_t *tvb _U_, packet_info *pinfo, int offset _U_, int length _U_, tvbuff_t **decompressed_tvb _U_, int *decompressed_offset _U_)
 {
     col_append_str(pinfo->cinfo, COL_INFO, " [snappy decompression unsupported]");
-    return 0;
+    return FALSE;
 }
 #endif /* HAVE_SNAPPY */
 
 #ifdef HAVE_ZSTD
-static int
+static gboolean
 decompress_zstd(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
     ZSTD_inBuffer input = { tvb_memdup(wmem_packet_scope(), tvb, offset, length), length, 0 };
     ZSTD_DStream *zds = ZSTD_createDStream();
     size_t rc = 0;
     tvbuff_t *composite_tvb = NULL;
-    int ret = 0;
+    gboolean ret = FALSE;
 
     do {
         ZSTD_outBuffer output = { wmem_alloc(pinfo->pool, ZSTD_DStreamOutSize()), ZSTD_DStreamOutSize(), 0 };
@@ -1839,7 +1840,7 @@ decompress_zstd(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, t
             tvb_new_child_real_data(tvb, (guint8*)output.dst, (guint)output.pos, (gint)output.pos));
         // rc == 0 means there is nothing more to decompress, but there could be still something in the data
     } while (rc > 0);
-    ret = 1;
+    ret = TRUE;
 end:
     if (composite_tvb) {
         tvb_composite_finalize(composite_tvb);
@@ -1855,17 +1856,22 @@ end:
     return ret;
 }
 #else
-static int
+static gboolean
 decompress_zstd(tvbuff_t *tvb _U_, packet_info *pinfo, int offset _U_, guint32 length _U_, tvbuff_t **decompressed_tvb _U_, int *decompressed_offset _U_)
 {
     col_append_str(pinfo->cinfo, COL_INFO, " [zstd compression unsupported]");
-    return 0;
+    return FALSE;
 }
 #endif /* HAVE_ZSTD */
 
-static int
+#define MAX_DECOMPRESSION_SIZE (50 * 1000 * 1000) // Arbitrary
+static gboolean
 decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, int codec, tvbuff_t **decompressed_tvb, int *decompressed_offset)
 {
+    if (length > MAX_DECOMPRESSION_SIZE) {
+        expert_add_info(pinfo, NULL, &ei_kafka_bad_decompression_length);
+        return FALSE;
+    }
     switch (codec) {
         case KAFKA_MESSAGE_CODEC_SNAPPY:
             return decompress_snappy(tvb, pinfo, offset, length, decompressed_tvb, decompressed_offset);
@@ -1879,7 +1885,7 @@ decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, int co
             return decompress_none(tvb, pinfo, offset, length, decompressed_tvb, decompressed_offset);
         default:
             col_append_str(pinfo->cinfo, COL_INFO, " [unsupported compression type]");
-            return 0;
+            return FALSE;
     }
 }
@@ -1966,10 +1972,10 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
             show_compression_reduction(tvb, subtree, length, tvb_captured_length(decompressed_tvb));
             dissect_kafka_message_set(decompressed_tvb, pinfo, subtree, decompressed_offset,
                                       tvb_reported_length_remaining(decompressed_tvb, decompressed_offset), codec);
-            offset += length;
         } else {
             proto_item_append_text(subtree, " [Cannot decompress records]");
         }
+        offset += length;
     }
 
     proto_item_set_end(message_ti, tvb, offset);
@@ -10242,6 +10248,8 @@ proto_register_kafka_expert_module(const int proto) {
             { "kafka.bad_varint", PI_MALFORMED, PI_WARN, "Invalid varint bytes", EXPFILL }},
         { &ei_kafka_bad_message_set_length,
             { "kafka.ei_kafka_bad_message_set_length", PI_MALFORMED, PI_WARN, "Message set size does not match content", EXPFILL }},
+        { &ei_kafka_bad_decompression_length,
+            { "kafka.ei_kafka_bad_decompression_length", PI_MALFORMED, PI_WARN, "Decompression size too large", EXPFILL }},
         { &ei_kafka_unknown_message_magic,
             { "kafka.unknown_message_magic", PI_MALFORMED, PI_WARN, "Invalid message magic field", EXPFILL }},
         { &ei_kafka_pdu_length_mismatch,