From 3ee1afd469bad0848e00a36e9832b9af7913d4fa Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 3 May 2025 19:39:02 -0700 Subject: [PATCH 1/2] Do not reset_generation after RebalanceInProgressError; improve CommitFailed error messages --- kafka/coordinator/consumer.py | 28 +++++++++++++++++++--------- kafka/errors.py | 24 ++++++++++++------------ 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 4361b3dc3..50a8b7212 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -615,17 +615,18 @@ def _send_offset_commit_request(self, offsets): offset_data[tp.topic][tp.partition] = offset if self._subscription.partitions_auto_assigned(): - generation = self.generation() or Generation.NO_GENERATION + generation = self.generation() else: generation = Generation.NO_GENERATION # if the generation is None, we are not part of an active group # (and we expect to be). The only thing we can do is fail the commit # and let the user rejoin the group in poll() - if self.config['api_version'] >= (0, 9) and generation is None: - return Future().failure(Errors.CommitFailedError()) - version = self._client.api_version(OffsetCommitRequest, max_version=6) + if version > 0 and generation is None: + log.info("Failing OffsetCommit request since the consumer is not part of an active group") + return Future().failure(Errors.CommitFailedError('Group rebalance in progress')) + if version == 0: request = OffsetCommitRequest[version]( self.group_id, @@ -747,13 +748,22 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): self.coordinator_dead(error_type()) future.failure(error_type(self.group_id)) return + elif error_type is Errors.RebalanceInProgressError: + # Consumer never tries to commit offset in between join-group and sync-group, + # and hence on broker-side it is not expected to see a commit offset request + # during CompletingRebalance phase; if it ever 
happens then broker would return + # this error. In this case we should just treat it as a fatal CommitFailed exception. + # However, we do not need to reset generations and just request re-join, such that + # if the caller decides to proceed and poll, it would still try to proceed and re-join normally. + self.request_rejoin() + future.failure(Errors.CommitFailedError('Group rebalance in progress')) + return elif error_type in (Errors.UnknownMemberIdError, - Errors.IllegalGenerationError, - Errors.RebalanceInProgressError): - # need to re-join group + Errors.IllegalGenerationError): + # need to reset generation and re-join group error = error_type(self.group_id) - log.debug("OffsetCommit for group %s failed: %s", - self.group_id, error) + log.warning("OffsetCommit for group %s failed: %s", + self.group_id, error) self.reset_generation() future.failure(Errors.CommitFailedError()) return diff --git a/kafka/errors.py b/kafka/errors.py index dfdc75015..898582615 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -21,18 +21,18 @@ class Cancelled(KafkaError): class CommitFailedError(KafkaError): - def __init__(self, *args, **kwargs): - super(CommitFailedError, self).__init__( - """Commit cannot be completed since the group has already - rebalanced and assigned the partitions to another member. - This means that the time between subsequent calls to poll() - was longer than the configured max_poll_interval_ms, which - typically implies that the poll loop is spending too much - time message processing. You can address this either by - increasing the rebalance timeout with max_poll_interval_ms, - or by reducing the maximum size of batches returned in poll() - with max_poll_records. - """, *args, **kwargs) + def __init__(self, *args): + if not args: + args = ("Commit cannot be completed since the group has already" + " rebalanced and assigned the partitions to another member."
+ " This means that the time between subsequent calls to poll()" + " was longer than the configured max_poll_interval_ms, which" + " typically implies that the poll loop is spending too much" + " time message processing. You can address this either by" + " increasing the rebalance timeout with max_poll_interval_ms," + " or by reducing the maximum size of batches returned in poll()" + " with max_poll_records.",) + super(CommitFailedError, self).__init__(*args) class IllegalArgumentError(KafkaError): From 4e95a5bcd0cad2644051561b027c36b5aa6d4320 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 3 May 2025 19:52:58 -0700 Subject: [PATCH 2/2] Fixup 0.8.2.2 generation --- kafka/coordinator/consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 50a8b7212..3db00d72c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -614,7 +614,8 @@ def _send_offset_commit_request(self, offsets): for tp, offset in six.iteritems(offsets): offset_data[tp.topic][tp.partition] = offset - if self._subscription.partitions_auto_assigned(): + version = self._client.api_version(OffsetCommitRequest, max_version=6) + if version > 1 and self._subscription.partitions_auto_assigned(): generation = self.generation() else: generation = Generation.NO_GENERATION @@ -622,8 +623,7 @@ def _send_offset_commit_request(self, offsets): # if the generation is None, we are not part of an active group # (and we expect to be). The only thing we can do is fail the commit # and let the user rejoin the group in poll() - version = self._client.api_version(OffsetCommitRequest, max_version=6) - if version > 0 and generation is None: + if generation is None: log.info("Failing OffsetCommit request since the consumer is not part of an active group") return Future().failure(Errors.CommitFailedError('Group rebalance in progress'))