Skip to content

Commit 29630de

Browse files
committed
Fix describe_groups
This was completely broken previously because it didn't lookup the group coordinator of the consumer group. Also added basic error handling/raising.
1 parent ee849dc commit 29630de

File tree

1 file changed

+50
-13
lines changed

1 file changed

+50
-13
lines changed

kafka/admin/kafka.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -562,23 +562,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
562562
# describe delegation_token protocol not yet implemented
563563
# Note: send the request to the least_loaded_node()
564564

565-
def describe_consumer_groups(self, group_ids):
565+
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
566566
"""Describe a set of consumer groups.
567567
568-
:param group_ids: A list of consumer group id names
569-
:return: Appropriate version of DescribeGroupsResponse class
568+
Any errors are immediately raised.
569+
570+
:param group_ids: A list of consumer group IDs. These are typically the
571+
group names as strings.
572+
:param group_coordinator_id: The node_id of the groups' coordinator
573+
broker. If set to None, it will query the cluster for each group to
574+
find that group's coordinator. Explicitly specifying this can be
575+
useful for avoiding extra network round trips if you already know
576+
the group coordinator. This is only useful when all the group_ids
577+
have the same coordinator, otherwise it will error. Default: None.
578+
:return: A list of group descriptions. For now the group descriptions
579+
are the raw results from the DescribeGroupsResponse. Long-term, we
580+
plan to change this to return namedtuples as well as decoding the
581+
partition assignments.
570582
"""
583+
group_descriptions = []
571584
version = self._matching_api_version(DescribeGroupsRequest)
572-
if version <= 1:
573-
request = DescribeGroupsRequest[version](
574-
groups = group_ids
575-
)
576-
else:
577-
raise NotImplementedError(
578-
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
579-
.format(version))
580-
# TODO this is completely broken, as it needs to send to the group coordinator
581-
# return self._send(request)
585+
for group_id in group_ids:
586+
if group_coordinator_id is None:
587+
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
588+
if version <= 1:
589+
# Note: KAFKA-6788 A potential optimization is to group the
590+
# request per coordinator and send one request with a list of
591+
# all consumer groups. Java still hasn't implemented this
592+
# because the error checking is hard to get right when some
593+
# groups error and others don't.
594+
request = DescribeGroupsRequest[version](groups=(group_id,))
595+
response = self._send_request_to_node(this_groups_coordinator_id, request)
596+
assert len(response.groups) == 1
597+
# TODO need to implement converting the response tuple into
598+
# a more accessible interface like a namedtuple and then stop
599+
# hardcoding tuple indices here. Several Java examples,
600+
# including KafkaAdminClient.java
601+
group_description = response.groups[0]
602+
error_code = group_description[0]
603+
error_type = Errors.for_code(error_code)
604+
# Java has the note: KAFKA-6789, we can retry based on the error code
605+
if error_type is not Errors.NoError:
606+
raise error_type(
607+
"Request '{}' failed with response '{}'."
608+
.format(request, response))
609+
# TODO Java checks the group protocol type, and if consumer
610+
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
611+
# the members' partition assignments... that hasn't yet been
612+
# implemented here so just return the raw struct results
613+
group_descriptions.append(group_description)
614+
else:
615+
raise NotImplementedError(
616+
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
617+
.format(version))
618+
return group_descriptions
582619

583620
def list_consumer_groups(self):
584621
"""List all consumer groups known to the cluster.

0 commit comments

Comments
 (0)