Skip to content

Commit d579afe

Browse files
committed
Merge _find_coordinator_id methods
Previously there were two methods: * `_find_coordinator_id()` * `_find_many_coordinator_ids()` But they do basically the same thing internally. And we need the plural two places, but the singular only one place. So merge them, and change the function signature to take a list of `group_ids` and return a dict of `group_id: coordinator_id`s. As a result of this, the `describe_groups()` command should scale better because the `_find_coordinator_ids()` command issues all the requests async, instead of sequentially blocking as the `described_groups()` used to do.
1 parent 16a0b31 commit d579afe

File tree

1 file changed

+27
-42
lines changed

1 file changed

+27
-42
lines changed

kafka/admin/client.py

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -328,43 +328,27 @@ def _find_coordinator_id_process_response(self, response):
328328
.format(response.API_VERSION))
329329
return response.coordinator_id
330330

331-
def _find_coordinator_id(self, group_id):
332-
"""Find the broker node_id of the coordinator of the given group.
333-
334-
Sends a FindCoordinatorRequest message to the cluster. Will block until
335-
the FindCoordinatorResponse is received. Any errors are immediately
336-
raised.
337-
338-
:param group_id: The consumer group ID. This is typically the group
339-
name as a string.
340-
:return: The node_id of the broker that is the coordinator.
341-
"""
342-
future = self._find_coordinator_id_send_request(group_id)
343-
self._wait_for_futures([future])
344-
response = future.value
345-
return self._find_coordinator_id_process_response(response)
346-
347-
def _find_many_coordinator_ids(self, group_ids):
348-
"""Find the broker node_id of the coordinator for each of the given groups.
331+
def _find_coordinator_ids(self, group_ids):
332+
"""Find the broker node_ids of the coordinators of the given groups.
349333
350334
Sends a FindCoordinatorRequest message to the cluster for each group_id.
351335
Will block until the FindCoordinatorResponse is received for all groups.
352336
Any errors are immediately raised.
353337
354338
:param group_ids: A list of consumer group IDs. This is typically the group
355339
name as a string.
356-
:return: A list of tuples (group_id, node_id) where node_id is the id
357-
of the broker that is the coordinator for the corresponding group.
340+
:return: A dict of {group_id: node_id} where node_id is the id of the
341+
broker that is the coordinator for the corresponding group.
358342
"""
359-
futures = {
343+
groups_futures = {
360344
group_id: self._find_coordinator_id_send_request(group_id)
361345
for group_id in group_ids
362346
}
363-
self._wait_for_futures(list(futures.values()))
364-
groups_coordinators = [
365-
(group_id, self._find_coordinator_id_process_response(f.value))
366-
for group_id, f in futures.items()
367-
]
347+
self._wait_for_futures(groups_futures.values())
348+
groups_coordinators = {
349+
group_id: self._find_coordinator_id_process_response(future.value)
350+
for group_id, future in groups_futures.items()
351+
}
368352
return groups_coordinators
369353

370354
def _send_request_to_node(self, node_id, request):
@@ -1095,18 +1079,19 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
10951079
partition assignments.
10961080
"""
10971081
group_descriptions = []
1098-
futures = []
1099-
for group_id in group_ids:
1100-
if group_coordinator_id is not None:
1101-
this_groups_coordinator_id = group_coordinator_id
1102-
else:
1103-
this_groups_coordinator_id = self._find_coordinator_id(group_id)
1104-
f = self._describe_consumer_groups_send_request(
1082+
1083+
if group_coordinator_id is not None:
1084+
groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
1085+
else:
1086+
groups_coordinators = self.groups_coordinators(group_ids)
1087+
1088+
futures = [
1089+
self._describe_consumer_groups_send_request(
11051090
group_id,
1106-
this_groups_coordinator_id,
1091+
coordinator_id,
11071092
include_authorized_operations)
1108-
futures.append(f)
1109-
1093+
for group_id, coordinator_id in groups_coordinators.items()
1094+
]
11101095
self._wait_for_futures(futures)
11111096

11121097
for future in futures:
@@ -1278,7 +1263,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
12781263
explicitly specified.
12791264
"""
12801265
if group_coordinator_id is None:
1281-
group_coordinator_id = self._find_coordinator_id(group_id)
1266+
group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
12821267
future = self._list_consumer_group_offsets_send_request(
12831268
group_id, group_coordinator_id, partitions)
12841269
self._wait_for_futures([future])
@@ -1306,12 +1291,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
13061291
if group_coordinator_id is not None:
13071292
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
13081293
else:
1309-
groups_coordinators = defaultdict(list)
1310-
for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
1311-
groups_coordinators[group_coordinator_id].append(group_id)
1294+
coordinators_groups = defaultdict(list)
1295+
for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
1296+
coordinators_groups[coordinator_id].append(group_id)
13121297
futures = [
1313-
self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
1314-
for group_coordinator_id, group_ids in groups_coordinators.items()
1298+
self._delete_consumer_groups_send_request(group_ids, coordinator_id)
1299+
for coordinator_id, group_ids in coordinators_groups.items()
13151300
]
13161301

13171302
self._wait_for_futures(futures)

0 commit comments

Comments
 (0)