Skip to content

Commit e94bd4f

Browse files
committed
Add throttle warnings and top-level error handling for new offset commit/fetch handling
1 parent 2de3c34 commit e94bd4f

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

kafka/coordinator/consumer.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,8 @@ def _send_offset_commit_request(self, offsets):
666666

667667
def _handle_offset_commit_response(self, offsets, future, send_time, response):
668668
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
669-
log.warning()
669+
log.warning("OffsetCommitRequest throttled by broker (%d ms)", response.throttle_time_ms)
670+
670671
# TODO look at adding request_latency_ms to response (like java kafka)
671672
self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
672673
unauthorized_topics = set()
@@ -785,11 +786,25 @@ def _send_offset_fetch_request(self, partitions):
785786

786787
def _handle_offset_fetch_response(self, future, response):
787788
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
788-
log.warning()
789+
log.warning("OffsetFetchRequest throttled by broker (%d ms)", response.throttle_time_ms)
789790

790791
if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
791792
error_type = Errors.for_code(response.error_code)
792-
# TODO: handle...
793+
log.debug("Offset fetch failed: %s", error_type.__name__)
794+
error = error_type()
795+
if error_type is Errors.GroupLoadInProgressError:
796+
# Retry
797+
future.failure(error)
798+
elif error_type is Errors.NotCoordinatorForGroupError:
799+
# re-discover the coordinator and retry
800+
self.coordinator_dead(error)
801+
future.failure(error)
802+
elif error_type is Errors.GroupAuthorizationFailedError:
803+
future.failure(error)
804+
else:
805+
log.error("Unknown error fetching offsets for %s: %s", tp, error)
806+
future.failure(error)
807+
return
793808

794809
offsets = {}
795810
for topic, partitions in response.topics:
@@ -812,7 +827,7 @@ def _handle_offset_fetch_response(self, future, response):
812827
future.failure(error)
813828
elif error_type is Errors.NotCoordinatorForGroupError:
814829
# re-discover the coordinator and retry
815-
self.coordinator_dead(error_type())
830+
self.coordinator_dead(error)
816831
future.failure(error)
817832
elif error_type is Errors.UnknownTopicOrPartitionError:
818833
log.warning("OffsetFetchRequest -- unknown topic %s"

0 commit comments

Comments
 (0)