Dissect more of the Kafka protocol.

svn path=/trunk/; revision=51030
This commit is contained in:
Evan Huus 2013-07-30 17:13:57 +00:00
parent 338701e9fd
commit 1a28f66086
1 changed file with 151 additions and 0 deletions

View File

@ -53,6 +53,10 @@ static int hf_kafka_partition_id = -1;
static int hf_kafka_message_set_size = -1;
static int hf_kafka_request_frame = -1;
static int hf_kafka_response_frame = -1;
static int hf_kafka_consumer_group = -1;
static int hf_kafka_offset = -1;
static int hf_kafka_metadata = -1;
static int hf_kafka_error = -1;
static gint ett_kafka = -1;
static gint ett_kafka_request_topic = -1;
@ -82,6 +86,24 @@ static const value_string kafka_apis[] = {
{ 0, NULL }
};
/* Kafka error codes as carried in the 2-byte error field (hf_kafka_error).
 * -1 is the broker's catch-all for unexpected failures; 0 means success.
 * The trailing { 0, NULL } pair is the value_string terminator, not an
 * entry (lookups stop at the NULL string, so the earlier { 0, "No Error" }
 * still matches). */
static const value_string kafka_errors[] = {
    { -1, "Unexpected Server Error" },
    { 0, "No Error" },
    { 1, "Offset Out Of Range" },
    { 2, "Invalid Message" },
    { 3, "Unknown Topic or Partition" },
    { 4, "Invalid Message Size" },
    { 5, "Leader Not Available" },
    { 6, "Not Leader For Partition" },
    { 7, "Request Timed Out" },
    { 8, "Broker Not Available" },
    { 9, "Replica Not Available" },
    { 10, "Message Size Too Large" },
    { 11, "Stale Controller Epoch Code" },
    { 12, "Offset Metadata Too Large" },
    { 0, NULL }
};
typedef struct _kafka_query_response_t {
gint16 api_key;
guint32 request_frame;
@ -89,6 +111,8 @@ typedef struct _kafka_query_response_t {
gboolean response_found;
} kafka_query_response_t;
/* HELPERS */
static guint
get_kafka_pdu_len(packet_info *pinfo _U_, tvbuff_t *tvb, int offset)
{
@ -134,6 +158,103 @@ dissect_kafka_string(proto_tree *tree, int hf_item, tvbuff_t *tvb, packet_info *
return offset;
}
/* OFFSET FETCH REQUEST */
/* Dissects one partition entry of an offset fetch request. On the wire this
 * is nothing more than a 4-byte big-endian partition id. Returns the offset
 * just past the dissected bytes. */
static int
dissect_kafka_offset_fetch_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
{
    proto_tree_add_item(tree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
    return offset + 4;
}
/* Dissects one topic entry of an offset fetch request: the topic name
 * followed by an array of partition ids. A subtree is created for the
 * entry and its length patched once the end offset is known. Returns the
 * offset just past the dissected bytes. */
static int
dissect_kafka_offset_fetch_request_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
{
    int         offset = start_offset;
    proto_item *topic_ti;
    proto_tree *topic_tree;

    topic_ti   = proto_tree_add_text(tree, tvb, offset, -1, "Offset Fetch Request Topic");
    topic_tree = proto_item_add_subtree(topic_ti, ett_kafka_request_topic);

    offset = dissect_kafka_string(topic_tree, hf_kafka_topic_name, tvb, pinfo, offset);
    offset = dissect_kafka_array(topic_tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_partition);

    /* Length of the entry is only known after dissecting its contents. */
    proto_item_set_len(topic_ti, offset - start_offset);

    return offset;
}
/* Dissects the body of an offset fetch request: a consumer-group string
 * followed by an array of topic entries. Returns the offset just past the
 * dissected bytes. */
static int
dissect_kafka_offset_fetch_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
{
    offset = dissect_kafka_string(tree, hf_kafka_consumer_group, tvb, pinfo, offset);
    return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_request_topic);
}
/* OFFSET FETCH RESPONSE */
/* Dissects one partition entry of an offset fetch response:
 * partition id (4 bytes), offset (8 bytes), metadata (Kafka string),
 * error code (2 bytes). Returns the offset just past the dissected bytes. */
static int
dissect_kafka_offset_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int start_offset)
{
    proto_item *ti;
    proto_tree *subtree;
    int offset = start_offset;

    ti = proto_tree_add_text(tree, tvb, offset, -1, "Offset Fetch Response Partition");
    subtree = proto_item_add_subtree(ti, ett_kafka_request_partition);

    proto_tree_add_item(subtree, hf_kafka_partition_id, tvb, offset, 4, ENC_BIG_ENDIAN);
    offset += 4;

    proto_tree_add_item(subtree, hf_kafka_offset, tvb, offset, 8, ENC_BIG_ENDIAN);
    offset += 8;

    /* Fix: the metadata string was previously added to the parent tree
     * instead of the partition subtree, unlike every other field here. */
    offset = dissect_kafka_string(subtree, hf_kafka_metadata, tvb, pinfo, offset);

    proto_tree_add_item(subtree, hf_kafka_error, tvb, offset, 2, ENC_BIG_ENDIAN);
    offset += 2;

    /* Length of the entry is only known after dissecting its contents. */
    proto_item_set_len(ti, offset - start_offset);

    return offset;
}
/* Dissects one topic entry of an offset fetch response: the topic name
 * followed by an array of partition results. Returns the offset just past
 * the dissected bytes. */
static int
dissect_kafka_offset_fetch_response_topic(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int start_offset)
{
    proto_item *ti;
    proto_tree *subtree;
    int offset = start_offset;

    /* Fix: title-case the subtree label for consistency with the other
     * subtree labels ("Offset Fetch Request Topic", "Offset Fetch Response
     * Partition"). */
    ti = proto_tree_add_text(tree, tvb, offset, -1, "Offset Fetch Response Topic");
    subtree = proto_item_add_subtree(ti, ett_kafka_response_topic);

    offset = dissect_kafka_string(subtree, hf_kafka_topic_name, tvb, pinfo, offset);
    offset = dissect_kafka_array(subtree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_partition);

    /* Length of the entry is only known after dissecting its contents. */
    proto_item_set_len(ti, offset - start_offset);

    return offset;
}
/* Dissects the body of an offset fetch response: a leading string followed
 * by an array of topic entries.
 * NOTE(review): the leading string is dissected as hf_kafka_client_id, but
 * per the Kafka wire protocol an OffsetFetchResponse does not carry a
 * client-id (the request's consumer group does not round-trip either) --
 * confirm this field against the protocol spec / real captures. */
static int
dissect_kafka_offset_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
{
    offset = dissect_kafka_string(tree, hf_kafka_client_id, tvb, pinfo, offset);
    offset = dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_offset_fetch_response_topic);
    return offset;
}
/* PRODUCE REQUEST */
static int
dissect_kafka_produce_request_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
{
@ -184,6 +305,8 @@ dissect_kafka_produce_request(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tre
return offset;
}
/* PRODUCE RESPONSE */
static int
dissect_kafka_produce_response_partition(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree, int offset)
{
@ -225,6 +348,8 @@ dissect_kafka_produce_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tr
return dissect_kafka_array(tree, tvb, pinfo, offset, &dissect_kafka_produce_response_topic);
}
/* MAIN */
static void
dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
{
@ -309,6 +434,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
}
offset = dissect_kafka_produce_request(tvb, pinfo, kafka_tree, offset);
break;
case KAFKA_OFFSET_FETCH:
offset = dissect_kafka_offset_fetch_request(tvb, pinfo, kafka_tree, offset);
break;
}
}
else {
@ -348,6 +476,9 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree)
case KAFKA_PRODUCE:
offset = dissect_kafka_produce_response(tvb, pinfo, kafka_tree, offset);
break;
case KAFKA_OFFSET_FETCH:
offset = dissect_kafka_offset_fetch_response(tvb, pinfo, kafka_tree, offset);
break;
}
}
@ -375,6 +506,21 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
"The length of this Kafka packet.", HFILL }
},
{ &hf_kafka_offset,
{ "Offset", "kafka.offset",
FT_INT64, BASE_DEC, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_metadata,
{ "Metadata", "kafka.metadata",
FT_STRING, BASE_NONE, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_error,
{ "Error", "kafka.error",
FT_INT16, BASE_DEC, VALS(kafka_errors), 0,
NULL, HFILL }
},
{ &hf_kafka_request_api_key,
{ "API Key", "kafka.request_key",
FT_INT16, BASE_DEC, VALS(kafka_apis), 0,
@ -435,6 +581,11 @@ proto_register_kafka(void)
FT_INT32, BASE_DEC, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_consumer_group,
{ "Consumer Group", "kafka.consumer_group",
FT_STRING, BASE_NONE, 0, 0,
NULL, HFILL }
},
{ &hf_kafka_request_frame,
{ "Request Frame", "kafka.request_frame",
FT_FRAMENUM, BASE_NONE, 0, 0,