diff --git a/kafka/cluster.py b/kafka/cluster.py index 28b71c9d1..a50e88a59 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -65,6 +65,7 @@ def __init__(self, **configs): self.config[key] = configs[key] self._bootstrap_brokers = self._generate_bootstrap_brokers() + self._coordinator_brokers = {} def _generate_bootstrap_brokers(self): # collect_hosts does not perform DNS, so we should be fine to re-use @@ -96,7 +97,11 @@ def broker_metadata(self, broker_id): Returns: BrokerMetadata or None if not found """ - return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id) + return ( + self._brokers.get(broker_id) or + self._bootstrap_brokers.get(broker_id) or + self._coordinator_brokers.get(broker_id) + ) def partitions_for_topic(self, topic): """Return set of all partitions for topic (whether available or not) @@ -341,41 +346,28 @@ def add_group_coordinator(self, group, response): response (GroupCoordinatorResponse): broker response Returns: - bool: True if metadata is updated, False on error + string: coordinator node_id if metadata is updated, None on error """ log.debug("Updating coordinator for %s: %s", group, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: log.error("GroupCoordinatorResponse error: %s", error_type) self._groups[group] = -1 - return False + return - node_id = response.coordinator_id + # Use a coordinator-specific node id so that group requests + # get a dedicated connection + node_id = 'coordinator-{}'.format(response.coordinator_id) coordinator = BrokerMetadata( - response.coordinator_id, + node_id, response.host, response.port, None) - # Assume that group coordinators are just brokers - # (this is true now, but could diverge in future) - if node_id not in self._brokers: - self._brokers[node_id] = coordinator - - # If this happens, either brokers have moved without - # changing IDs, or our assumption above is wrong - else: - node = self._brokers[node_id] - if coordinator.host != node.host or coordinator.port != node.port: - log.error("GroupCoordinator metadata conflicts with existing" - " broker metadata. Coordinator: %s, Broker: %s", - coordinator, node) - self._groups[group] = node_id - return False - log.info("Group coordinator for %s is %s", group, coordinator) + self._coordinator_brokers[node_id] = coordinator self._groups[group] = node_id - return True + return node_id def with_partitions(self, partitions_to_add): """Returns a copy of cluster metadata with partitions added""" diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e538fda33..421360eab 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -676,14 +676,14 @@ def _handle_group_coordinator_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: with self._client._lock, self._lock: - ok = self._client.cluster.add_group_coordinator(self.group_id, response) - if not ok: + coordinator_id = self._client.cluster.add_group_coordinator(self.group_id, response) + if not coordinator_id: # This could happen if coordinator metadata is different # than broker metadata future.failure(Errors.IllegalStateError()) return - self.coordinator_id = response.coordinator_id + self.coordinator_id = coordinator_id log.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) self._client.maybe_connect(self.coordinator_id)