Skip to content

Commit 2cad131

Browse files
Resolves #2009
The expected KafkaBackoffException's message is being logged at WARN level. Make KafkaBackoffException be logged in DEBUG level Remove coupling between KafkaMessageListenerContainer and KafkaBackoffException
1 parent 86e0cd7 commit 2cad131

File tree

2 files changed

+4
-7
lines changed

2 files changed

+4
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,12 +2495,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
24952495
commitOffsetsIfNeeded(record);
24962496
}
24972497
catch (KafkaException ke) {
2498-
if (ke.contains(KafkaBackoffException.class)) {
2499-
this.logger.debug(ke.getMessage());
2500-
}
2501-
else {
2502-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2503-
}
2498+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
25042499
return ke;
25052500
}
25062501
catch (RuntimeException ee) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828

2929
import org.springframework.beans.factory.annotation.Qualifier;
3030
import org.springframework.core.log.LogAccessor;
31+
import org.springframework.kafka.KafkaException;
3132
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
3233
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
3334
import org.springframework.kafka.listener.CommonErrorHandler;
@@ -143,6 +144,7 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
143144
private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
144145
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
145146
new FixedBackOff(0, 0));
147+
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
146148
errorHandler.setCommitRecovered(true);
147149
this.errorHandlerCustomizer.accept(errorHandler);
148150
return errorHandler;

0 commit comments

Comments
 (0)