Decompress gzipped Kafka messages.

svn path=/trunk/; revision=51726
This commit is contained in:
Evan Huus 2013-09-03 16:43:08 +00:00
parent dc002f2524
commit feb39701c5
1 changed files with 49 additions and 3 deletions

View File

@ -181,7 +181,7 @@ dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *
proto_tree_add_string(tree, hf_item, tvb, offset, 0, NULL);
}
else {
proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_BIG_ENDIAN);
proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_NA|ENC_ASCII);
offset += len;
}
@ -204,19 +204,43 @@ dissect_kafka_bytes(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *p
proto_tree_add_bytes(tree, hf_item, tvb, offset, 0, NULL);
}
else {
proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_BIG_ENDIAN);
proto_tree_add_item(tree, hf_item, tvb, offset, len, ENC_NA);
offset += len;
}
return offset;
}
static tvbuff_t *
kafka_get_bytes(proto_tree *tree, tvbuff_t *tvb, packet_info *pinfo _U_, int offset)
{
gint32 len;
len = (gint32) tvb_get_ntohl(tvb, offset);
proto_tree_add_item(tree, hf_kafka_bytes_len, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
if (len < -1) {
/* TODO expert info */
return NULL;
}
else if (len == -1) {
return NULL;
}
else {
return tvb_new_subset(tvb, offset, len, len);
}
}
static int
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
{
proto_item *ti;
proto_tree *subtree;
tvbuff_t *raw, *payload;
int offset = start_offset;
guint8 codec;
ti = proto_tree_add_text(tree, tvb, offset, -1, "Message");
subtree = proto_item_add_subtree(ti, ett_kafka_message);
@ -228,10 +252,32 @@ dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int s
offset += 1;
proto_tree_add_item(subtree, hf_kafka_message_codec, tvb, offset, 1, ENC_BIG_ENDIAN);
codec = tvb_get_guint8(tvb, offset) & 0x07;
offset += 1;
offset = dissect_kafka_bytes(subtree, hf_kafka_message_key, tvb, pinfo, offset);
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset);
switch (codec) {
case KAFKA_COMPRESSION_GZIP:
raw = kafka_get_bytes(tree, tvb, pinfo, offset);
offset += 4;
if (raw) {
payload = tvb_child_uncompress(tvb, raw, 0, tvb_length(raw));
add_new_data_source(pinfo, payload, "Uncompressed Message");
proto_tree_add_item(subtree, hf_kafka_message_value, payload, 0, -1, ENC_NA);
offset += tvb_length(raw);
}
else {
proto_tree_add_bytes(subtree, hf_kafka_message_value, tvb, offset, 0, NULL);
}
break;
case KAFKA_COMPRESSION_SNAPPY:
/* We can't uncompress snappy yet... */
case KAFKA_COMPRESSION_NONE:
default:
offset = dissect_kafka_bytes(subtree, hf_kafka_message_value, tvb, pinfo, offset);
}
proto_item_set_len(ti, offset - start_offset);