Skip to content

Commit 99ac0be

Browse files
Resolves #2009
The expected KafkaBackoffException's message is being logged at WARN level. Make KafkaBackoffException be logged in DEBUG level Remove coupling between KafkaMessageListenerContainer and KafkaBackoffException
1 parent 86e0cd7 commit 99ac0be

File tree

4 files changed

+69
-10
lines changed

4 files changed

+69
-10
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
24+
import org.springframework.kafka.KafkaException;
25+
import org.springframework.util.backoff.BackOff;
26+
27+
/**
28+
* An error handler prepared to handle a {@link KafkaBackoffException}
29+
* thrown by the listener.
30+
*
31+
* @author Tomaz Fernandes
32+
* @since 2.8.2
33+
*
34+
*/
35+
public class KafkaBackOffAwareErrorHandler extends DefaultErrorHandler {
36+
37+
public KafkaBackOffAwareErrorHandler() {
38+
}
39+
40+
public KafkaBackOffAwareErrorHandler(BackOff backOff) {
41+
super(backOff);
42+
}
43+
44+
public KafkaBackOffAwareErrorHandler(ConsumerRecordRecoverer recoverer) {
45+
super(recoverer);
46+
}
47+
48+
public KafkaBackOffAwareErrorHandler(ConsumerRecordRecoverer recoverer, BackOff backOff) {
49+
super(recoverer, backOff);
50+
}
51+
52+
@Override
53+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
54+
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
55+
getRecoveryStrategy(records, thrownException), this.logger, SeekUtils.isBackoffException(thrownException)
56+
? getKafkaBackOffExceptionLogLevel()
57+
: getLogLevel());
58+
}
59+
60+
protected KafkaException.Level getKafkaBackOffExceptionLogLevel() {
61+
return KafkaException.Level.DEBUG;
62+
}
63+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaExceptionLogLevelAware.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public void setLogLevel(KafkaException.Level logLevel) {
4141
}
4242

4343
/**
44-
* Set the level at which the exception thrown by this handler is logged.
44+
* Get the level at which the exception thrown by this handler is logged.
4545
* @return the level.
4646
*/
4747
protected KafkaException.Level getLogLevel() {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,12 +2495,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
24952495
commitOffsetsIfNeeded(record);
24962496
}
24972497
catch (KafkaException ke) {
2498-
if (ke.contains(KafkaBackoffException.class)) {
2499-
this.logger.debug(ke.getMessage());
2500-
}
2501-
else {
2502-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2503-
}
2498+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
25042499
return ke;
25052500
}
25062501
catch (RuntimeException ee) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.kafka.listener.ContainerProperties;
3636
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
3737
import org.springframework.kafka.listener.DefaultErrorHandler;
38+
import org.springframework.kafka.listener.KafkaBackOffAwareErrorHandler;
3839
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
3940
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
4041
import org.springframework.util.Assert;
@@ -141,7 +142,7 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC
141142
}
142143

143144
private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
144-
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
145+
DefaultErrorHandler errorHandler = new KafkaBackOffAwareErrorHandler(deadLetterPublishingRecoverer,
145146
new FixedBackOff(0, 0));
146147
errorHandler.setCommitRecovered(true);
147148
this.errorHandlerCustomizer.accept(errorHandler);

0 commit comments

Comments
 (0)