Skip to content

Commit b146e4e

Browse files
authored
Apply Nullability to spring-integration-kafka module
Related to: #10083 Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com>
1 parent a41c868 commit b146e4e

27 files changed

+192
-152
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.concurrent.TimeoutException;
2222

23+
import org.jspecify.annotations.Nullable;
24+
2325
import org.springframework.integration.channel.AbstractMessageChannel;
2426
import org.springframework.kafka.core.KafkaOperations;
2527
import org.springframework.kafka.support.KafkaHeaders;
@@ -42,7 +44,7 @@ public abstract class AbstractKafkaChannel extends AbstractMessageChannel {
4244

4345
protected final String topic; // NOSONAR final
4446

45-
private String groupId;
47+
private @Nullable String groupId;
4648

4749
/**
4850
* Construct an instance with the provided {@link KafkaOperations} and topic.
@@ -64,7 +66,7 @@ public void setGroupId(String groupId) {
6466
this.groupId = groupId;
6567
}
6668

67-
protected String getGroupId() {
69+
protected @Nullable String getGroupId() {
6870
return this.groupId;
6971
}
7072

@@ -82,7 +84,8 @@ protected boolean doSend(Message<?> message, long timeout) {
8284
return false;
8385
}
8486
catch (ExecutionException e) {
85-
this.logger.error(e.getCause(), () -> "Interrupted while waiting for send result for: " + message);
87+
Throwable cause = e.getCause() != null ? e.getCause() : e;
88+
this.logger.error(cause, () -> "Interrupted while waiting for send result for: " + message);
8689
return false;
8790
}
8891
catch (TimeoutException e) {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,12 +20,13 @@
2020
import java.util.Deque;
2121
import java.util.List;
2222

23+
import org.jspecify.annotations.Nullable;
24+
2325
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
2426
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
2527
import org.springframework.integration.support.management.metrics.CounterFacade;
2628
import org.springframework.integration.support.management.metrics.MetricsCaptor;
2729
import org.springframework.kafka.core.KafkaOperations;
28-
import org.springframework.lang.Nullable;
2930
import org.springframework.messaging.Message;
3031
import org.springframework.messaging.PollableChannel;
3132
import org.springframework.messaging.support.ChannelInterceptor;
@@ -47,7 +48,7 @@ public class PollableKafkaChannel extends AbstractKafkaChannel
4748

4849
private final KafkaMessageSource<?, ?> source;
4950

50-
private CounterFacade receiveCounter;
51+
private @Nullable CounterFacade receiveCounter;
5152

5253
private volatile int executorInterceptorsSize;
5354

@@ -197,8 +198,8 @@ public boolean hasExecutorInterceptors() {
197198

198199
private static String topic(KafkaMessageSource<?, ?> source) {
199200
Assert.notNull(source, "'source' cannot be null");
200-
String[] topics = source.getConsumerProperties().getTopics();
201-
Assert.isTrue(topics != null && topics.length == 1, "Only one topic is allowed");
201+
@Nullable String @Nullable [] topics = source.getConsumerProperties().getTopics();
202+
Assert.isTrue(topics != null && topics.length == 1 && topics[0] != null, "Only one topic is allowed");
202203
return topics[0];
203204
}
204205

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.integration.dispatcher.MessageDispatcher;
2324
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
@@ -56,8 +57,10 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
5657

5758
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
5859

60+
@SuppressWarnings("NullAway.Init")
5961
private MessageDispatcher dispatcher;
6062

63+
@SuppressWarnings("NullAway.Init")
6164
private MessageListenerContainer container;
6265

6366
private boolean autoStartup = true;
@@ -183,8 +186,8 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis
183186
}
184187

185188
@Override
186-
public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment,
187-
Consumer<?, ?> consumer) {
189+
public void onMessage(ConsumerRecord<Object, Object> record, @Nullable Acknowledgment acknowledgment,
190+
@Nullable Consumer<?, ?> consumer) {
188191

189192
SubscribableKafkaChannel.this.dispatcher.dispatch(toMessagingMessage(record, acknowledgment, consumer));
190193
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes related to message channel implementations for Apache Kafka.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.kafka.channel;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.kafka.config.xml;
1818

19+
import org.jspecify.annotations.Nullable;
1920
import org.w3c.dom.Element;
2021

2122
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
@@ -38,7 +39,7 @@
3839
public class KafkaChannelParser extends AbstractChannelParser {
3940

4041
@Override
41-
protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
42+
protected @Nullable BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
4243
BeanDefinitionBuilder builder;
4344
String factory = element.getAttribute("container-factory");
4445
boolean hasFactory = StringUtils.hasText(factory);
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides parser classes to provide Xml namespace support for the Apache Kafka components.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.kafka.config.xml;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,10 @@
1616

1717
package org.springframework.integration.kafka.dsl;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.integration.dsl.MessageChannelSpec;
2022
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
21-
import org.springframework.lang.Nullable;
2223

2324
/**
2425
*

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2222
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.core.task.AsyncTaskExecutor;
2526
import org.springframework.integration.dsl.IntegrationComponentSpec;
@@ -28,7 +29,6 @@
2829
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
2930
import org.springframework.kafka.listener.ContainerProperties;
3031
import org.springframework.kafka.support.TopicPartitionOffset;
31-
import org.springframework.lang.Nullable;
3232

3333
/**
3434
* A helper class in the Builder pattern style to delegate options to the

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,13 @@
1616

1717
package org.springframework.integration.kafka.dsl;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.integration.dsl.IntegrationComponentSpec;
2022
import org.springframework.kafka.core.KafkaTemplate;
2123
import org.springframework.kafka.core.ProducerFactory;
2224
import org.springframework.kafka.support.ProducerListener;
2325
import org.springframework.kafka.support.converter.RecordMessageConverter;
24-
import org.springframework.lang.Nullable;
2526

2627
/**
2728
* An {@link IntegrationComponentSpec} implementation for the {@link KafkaTemplate}.
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/**
22
* Provides Spring Integration Java DSL Components support for Apache Kafka.
33
*/
4-
@org.springframework.lang.NonNullApi
5-
@org.springframework.lang.NonNullFields
4+
@org.jspecify.annotations.NullMarked
65
package org.springframework.integration.kafka.dsl;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.core.AttributeAccessor;
2324
import org.springframework.kafka.KafkaException;
@@ -63,8 +64,8 @@ public interface KafkaInboundEndpoint {
6364
* @param consumer the consumer.
6465
* @param runnable the runnable.
6566
*/
66-
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
67-
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {
67+
default void doWithRetry(RetryTemplate template, @Nullable RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
68+
@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, Runnable runnable) {
6869

6970
try {
7071
template.execute(context -> {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.common.TopicPartition;
2727
import org.apache.kafka.common.header.Header;
28+
import org.jspecify.annotations.Nullable;
2829

2930
import org.springframework.core.AttributeAccessor;
3031
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -50,7 +51,6 @@
5051
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
5152
import org.springframework.kafka.support.converter.MessagingMessageConverter;
5253
import org.springframework.kafka.support.converter.RecordMessageConverter;
53-
import org.springframework.lang.Nullable;
5454
import org.springframework.messaging.Message;
5555
import org.springframework.messaging.MessageChannel;
5656
import org.springframework.messaging.MessageHeaders;
@@ -82,11 +82,11 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport
8282

8383
private final KafkaTemplate<K, R> kafkaTemplate;
8484

85-
private RetryTemplate retryTemplate;
85+
private @Nullable RetryTemplate retryTemplate;
8686

87-
private RecoveryCallback<?> recoveryCallback;
87+
private @Nullable RecoveryCallback<?> recoveryCallback;
8888

89-
private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
89+
private @Nullable BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
9090

9191
private boolean bindSourceRecord;
9292

@@ -271,7 +271,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message<?> messag
271271
}
272272

273273
@Override
274-
protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
274+
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
275275
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
276276
if (attributes == null) {
277277
return super.getErrorMessageAttributes(message);
@@ -295,7 +295,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
295295
}
296296

297297
@Override
298-
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
298+
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
299+
@Nullable Consumer<?, ?> consumer) {
299300
Message<?> message = null;
300301
try {
301302
message = toMessagingMessage(record, acknowledgment, consumer);
@@ -321,8 +322,8 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
321322
}
322323
}
323324

324-
private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message, Acknowledgment acknowledgment,
325-
Consumer<?, ?> consumer) {
325+
private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message,
326+
@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
326327

327328
RetryTemplate template = KafkaInboundGateway.this.retryTemplate;
328329
if (template != null) {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.clients.consumer.ConsumerRecord;
2929
import org.apache.kafka.common.TopicPartition;
3030
import org.apache.kafka.common.header.Header;
31+
import org.jspecify.annotations.Nullable;
3132

3233
import org.springframework.core.AttributeAccessor;
3334
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -59,7 +60,6 @@
5960
import org.springframework.kafka.support.converter.MessageConverter;
6061
import org.springframework.kafka.support.converter.MessagingMessageConverter;
6162
import org.springframework.kafka.support.converter.RecordMessageConverter;
62-
import org.springframework.lang.Nullable;
6363
import org.springframework.messaging.Message;
6464
import org.springframework.messaging.MessageChannel;
6565
import org.springframework.retry.RecoveryCallback;
@@ -91,17 +91,17 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
9191

9292
private final ListenerMode mode;
9393

94-
private RecordFilterStrategy<K, V> recordFilterStrategy;
94+
private @Nullable RecordFilterStrategy<K, V> recordFilterStrategy;
9595

9696
private boolean ackDiscarded;
9797

98-
private RetryTemplate retryTemplate;
98+
private @Nullable RetryTemplate retryTemplate;
9999

100-
private RecoveryCallback<?> recoveryCallback;
100+
private @Nullable RecoveryCallback<?> recoveryCallback;
101101

102102
private boolean filterInRetry;
103103

104-
private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
104+
private @Nullable BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
105105

106106
private boolean bindSourceRecord;
107107

@@ -378,7 +378,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message<?> messag
378378
}
379379

380380
@Override
381-
protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
381+
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
382382
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
383383
if (attributes == null) {
384384
return super.getErrorMessageAttributes(message);
@@ -436,7 +436,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
436436
}
437437

438438
@Override
439-
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
439+
public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment,
440+
@Nullable Consumer<?, ?> consumer) {
440441
Message<?> message;
441442
try {
442443
message = toMessagingMessage(record, acknowledgment, consumer);
@@ -524,8 +525,8 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
524525
}
525526

526527
@Override
527-
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
528-
Consumer<?, ?> consumer) {
528+
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
529+
@Nullable Consumer<?, ?> consumer) {
529530

530531
Message<?> message = null;
531532
if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) {
@@ -537,8 +538,8 @@ public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowl
537538
}
538539

539540
@Nullable
540-
private Message<?> toMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
541-
Consumer<?, ?> consumer) {
541+
private Message<?> toMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
542+
@Nullable Consumer<?, ?> consumer) {
542543

543544
Message<?> message = null;
544545
try {

0 commit comments

Comments
 (0)