diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java index d9b872f87db..c4e9bc3c5d3 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.support.KafkaHeaders; @@ -42,7 +44,7 @@ public abstract class AbstractKafkaChannel extends AbstractMessageChannel { protected final String topic; // NOSONAR final - private String groupId; + private @Nullable String groupId; /** * Construct an instance with the provided {@link KafkaOperations} and topic. @@ -64,7 +66,7 @@ public void setGroupId(String groupId) { this.groupId = groupId; } - protected String getGroupId() { + protected @Nullable String getGroupId() { return this.groupId; } @@ -82,7 +84,8 @@ protected boolean doSend(Message message, long timeout) { return false; } catch (ExecutionException e) { - this.logger.error(e.getCause(), () -> "Interrupted while waiting for send result for: " + message); + Throwable cause = e.getCause() != null ? e.getCause() : e; + this.logger.error(cause, () -> "Interrupted while waiting for send result for: " + message); return false; } catch (TimeoutException e) { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java index 2c8cc23f6fb..a5e46bbe0de 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,13 @@ import java.util.Deque; import java.util.List; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.channel.ExecutorChannelInterceptorAware; import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.support.management.metrics.CounterFacade; import org.springframework.integration.support.management.metrics.MetricsCaptor; import org.springframework.kafka.core.KafkaOperations; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.ChannelInterceptor; @@ -47,7 +48,7 @@ public class PollableKafkaChannel extends AbstractKafkaChannel private final KafkaMessageSource source; - private CounterFacade receiveCounter; + private @Nullable CounterFacade receiveCounter; private volatile int executorInterceptorsSize; @@ -197,8 +198,8 @@ public boolean hasExecutorInterceptors() { private static String topic(KafkaMessageSource source) { Assert.notNull(source, "'source' cannot be null"); - String[] topics = source.getConsumerProperties().getTopics(); - Assert.isTrue(topics != null && topics.length == 1, "Only one topic is allowed"); + @Nullable String @Nullable [] topics = source.getConsumerProperties().getTopics(); + Assert.isTrue(topics != null && topics.length == 1 && topics[0] != null, "Only one topic is allowed"); return topics[0]; } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java index d83ea42b7d5..f966b3e3f49 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; import org.springframework.integration.dispatcher.MessageDispatcher; import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy; @@ -56,8 +57,10 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener(); + @SuppressWarnings("NullAway.Init") private MessageDispatcher dispatcher; + @SuppressWarnings("NullAway.Init") private MessageListenerContainer container; private boolean autoStartup = true; @@ -183,8 +186,8 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis } @Override - public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, - Consumer consumer) { + public void onMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, + @Nullable Consumer consumer) { SubscribableKafkaChannel.this.dispatcher.dispatch(toMessagingMessage(record, acknowledgment, consumer)); } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/package-info.java index 3816bbfa8f2..47754acfc8b 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes related to message channel implementations for Apache Kafka. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.channel; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java index ba241223388..7681dd5d2bd 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.kafka.config.xml; +import org.jspecify.annotations.Nullable; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; @@ -38,7 +39,7 @@ public class KafkaChannelParser extends AbstractChannelParser { @Override - protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { + protected @Nullable BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { BeanDefinitionBuilder builder; String factory = element.getAttribute("container-factory"); boolean hasFactory = StringUtils.hasText(factory); diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/package-info.java index d102e5e5ab0..110c9f3eb40 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/package-info.java @@ -1,4 +1,5 @@ /** * Provides parser classes to provide Xml namespace support for the Apache Kafka components. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.config.xml; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java index c22f000bb3b..f420986054c 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,10 @@ package org.springframework.integration.kafka.dsl; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.dsl.MessageChannelSpec; import org.springframework.integration.kafka.channel.AbstractKafkaChannel; -import org.springframework.lang.Nullable; /** * diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java index 072a78b5fb4..edb185d897c 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.jspecify.annotations.Nullable; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.integration.dsl.IntegrationComponentSpec; @@ -28,7 +29,6 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.TopicPartitionOffset; -import org.springframework.lang.Nullable; /** * A helper class in the Builder pattern style to delegate options to the diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java index 2ec42fbc900..e655b05f6be 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,13 @@ package org.springframework.integration.kafka.dsl; +import org.jspecify.annotations.Nullable; + import org.springframework.integration.dsl.IntegrationComponentSpec; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.lang.Nullable; /** * An {@link IntegrationComponentSpec} implementation for the {@link KafkaTemplate}. diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/package-info.java index 53f5ad837bc..5a2f136f533 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/package-info.java @@ -1,6 +1,5 @@ /** * Provides Spring Integration Java DSL Components support for Apache Kafka. */ -@org.springframework.lang.NonNullApi -@org.springframework.lang.NonNullFields +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.dsl; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java index 3f2090322e8..23319beb60d 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; import org.springframework.core.AttributeAccessor; import org.springframework.kafka.KafkaException; @@ -63,8 +64,8 @@ public interface KafkaInboundEndpoint { * @param consumer the consumer. * @param runnable the runnable. */ - default void doWithRetry(RetryTemplate template, RecoveryCallback callback, ConsumerRecord record, - Acknowledgment acknowledgment, Consumer consumer, Runnable runnable) { + default void doWithRetry(RetryTemplate template, @Nullable RecoveryCallback callback, ConsumerRecord record, + @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, Runnable runnable) { try { template.execute(context -> { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java index 67a5a833927..ede846a15b3 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; +import org.jspecify.annotations.Nullable; import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationMessageHeaderAccessor; @@ -50,7 +51,6 @@ import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; @@ -82,11 +82,11 @@ public class KafkaInboundGateway extends MessagingGatewaySupport private final KafkaTemplate kafkaTemplate; - private RetryTemplate retryTemplate; + private @Nullable RetryTemplate retryTemplate; - private RecoveryCallback recoveryCallback; + private @Nullable RecoveryCallback recoveryCallback; - private BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; + private @Nullable BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; private boolean bindSourceRecord; @@ -271,7 +271,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message messag } @Override - protected AttributeAccessor getErrorMessageAttributes(Message message) { + protected AttributeAccessor getErrorMessageAttributes(@Nullable Message message) { AttributeAccessor attributes = ATTRIBUTES_HOLDER.get(); if (attributes == null) { return super.getErrorMessageAttributes(message); @@ -295,7 +295,8 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { + public void onMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, + @Nullable Consumer consumer) { Message message = null; try { message = toMessagingMessage(record, acknowledgment, consumer); @@ -321,8 +322,8 @@ public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment } } - private void sendAndReceive(ConsumerRecord record, Message message, Acknowledgment acknowledgment, - Consumer consumer) { + private void sendAndReceive(ConsumerRecord record, Message message, + @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer) { RetryTemplate template = KafkaInboundGateway.this.retryTemplate; if (template != null) { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index 20006e21f27..bf941440c8f 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; +import org.jspecify.annotations.Nullable; import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationMessageHeaderAccessor; @@ -59,7 +60,6 @@ import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.retry.RecoveryCallback; @@ -91,17 +91,17 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo private final ListenerMode mode; - private RecordFilterStrategy recordFilterStrategy; + private @Nullable RecordFilterStrategy recordFilterStrategy; private boolean ackDiscarded; - private RetryTemplate retryTemplate; + private @Nullable RetryTemplate retryTemplate; - private RecoveryCallback recoveryCallback; + private @Nullable RecoveryCallback recoveryCallback; private boolean filterInRetry; - private BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; + private @Nullable BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; private boolean bindSourceRecord; @@ -378,7 +378,7 @@ private void setAttributesIfNecessary(Object record, @Nullable Message messag } @Override - protected AttributeAccessor getErrorMessageAttributes(Message message) { + protected AttributeAccessor getErrorMessageAttributes(@Nullable Message message) { AttributeAccessor attributes = ATTRIBUTES_HOLDER.get(); if (attributes == null) { return super.getErrorMessageAttributes(message); @@ -436,7 +436,8 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { + public void onMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, + @Nullable Consumer consumer) { Message message; try { message = toMessagingMessage(record, acknowledgment, consumer); @@ -524,8 +525,8 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onMessage(List> records, Acknowledgment acknowledgment, - Consumer consumer) { + public void onMessage(List> records, @Nullable Acknowledgment acknowledgment, + @Nullable Consumer consumer) { Message message = null; if (!KafkaMessageDrivenChannelAdapter.this.filterInRetry) { @@ -537,8 +538,8 @@ public void onMessage(List> records, Acknowledgment acknowl } @Nullable - private Message toMessage(List> records, Acknowledgment acknowledgment, - Consumer consumer) { + private Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, + @Nullable Consumer consumer) { Message message = null; try { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java index 607994cbc9e..5da24375084 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java @@ -26,6 +26,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.TreeSet; @@ -47,6 +48,7 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.jspecify.annotations.Nullable; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.core.log.LogAccessor; @@ -75,7 +77,6 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.SerializationUtils; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -138,7 +139,7 @@ public class KafkaMessageSource extends AbstractMessageSource private final Collection assignedPartitions = new LinkedHashSet<>(); - private final Duration commitTimeout; + private final @Nullable Duration commitTimeout; private final Duration assignTimeout; @@ -154,7 +155,7 @@ public class KafkaMessageSource extends AbstractMessageSource private RecordMessageConverter messageConverter = new MessagingMessageConverter(); - private Class payloadType; + private @Nullable Class payloadType; private boolean rawMessageHeader; @@ -164,13 +165,13 @@ public class KafkaMessageSource extends AbstractMessageSource private boolean checkNullValueForExceptions; - private volatile Consumer consumer; + private volatile @Nullable Consumer consumer; - private volatile Iterator> recordsIterator; + private volatile @Nullable Iterator> recordsIterator; public volatile boolean newAssignment; // NOSONAR - direct access from inner - private ClassLoader classLoader; + private @Nullable ClassLoader classLoader; /** * Construct an instance with the supplied parameters. Fetching multiple @@ -307,7 +308,7 @@ public ConsumerProperties getConsumerProperties() { return this.consumerProperties; } - protected String getGroupId() { + protected @Nullable String getGroupId() { return this.consumerProperties.getGroupId(); } @@ -332,7 +333,7 @@ public void setMessageConverter(RecordMessageConverter messageConverter) { this.messageConverter = messageConverter; } - protected Class getPayloadType() { + protected @Nullable Class getPayloadType() { return this.payloadType; } @@ -345,7 +346,7 @@ public void setPayloadType(Class payloadType) { this.payloadType = payloadType; } - protected ConsumerRebalanceListener getRebalanceListener() { + protected @Nullable ConsumerRebalanceListener getRebalanceListener() { return this.consumerProperties.getConsumerRebalanceListener(); } @@ -369,7 +370,7 @@ public void setRawMessageHeader(boolean rawMessageHeader) { this.rawMessageHeader = rawMessageHeader; } - protected Duration getCommitTimeout() { + protected @Nullable Duration getCommitTimeout() { return this.commitTimeout; } @@ -450,7 +451,7 @@ public boolean isPaused() { } @Override // NOSONAR - not so complex - protected Object doReceive() { + protected @Nullable Object doReceive() { this.receiveLock.lock(); try { @@ -461,6 +462,7 @@ protected Object doReceive() { if (this.consumer == null) { createConsumer(); } + Assert.state(this.consumer != null, "'consumer' must not be null"); if (this.pausing.get() && !this.paused.get() && !this.assignedPartitions.isEmpty()) { this.consumer.pause(this.assignedPartitions); this.paused.set(true); @@ -492,7 +494,7 @@ protected void createConsumer() { new IntegrationConsumerRebalanceListener(this.consumerProperties.getConsumerRebalanceListener()); Pattern topicPattern = this.consumerProperties.getTopicPattern(); - TopicPartitionOffset[] partitions = this.consumerProperties.getTopicPartitions(); + @Nullable TopicPartitionOffset @Nullable [] partitions = this.consumerProperties.getTopicPartitions(); if (topicPattern != null) { this.consumer.subscribe(topicPattern, rebalanceCallback); } @@ -509,15 +511,18 @@ else if (partitions != null) { } } - private void assignAndSeekPartitions(TopicPartitionOffset[] partitions) { + private void assignAndSeekPartitions(@Nullable TopicPartitionOffset[] partitions) { List topicPartitionsToAssign = Arrays.stream(partitions) + .filter(Objects::nonNull) .map(TopicPartitionOffset::getTopicPartition) .collect(Collectors.toList()); + Assert.state(this.consumer != null, "'consumer' must not be null"); this.consumer.assign(topicPartitionsToAssign); this.assignedPartitions.addAll(topicPartitionsToAssign); for (TopicPartitionOffset partition : partitions) { + Assert.state(partition != null, "'partition' must not be null"); if (TopicPartitionOffset.SeekPosition.BEGINNING.equals(partition.getPosition())) { this.consumer.seekToBeginning(Collections.singleton(partition.getTopicPartition())); } @@ -548,8 +553,9 @@ else if (partition.isRelativeToCurrent()) { this.consumer.seek(topicPartition, newOffset); } catch (Exception ex) { + var consumer = this.consumer; this.logger.error(ex, () -> "Failed to set initial offset for " + topicPartition - + " at " + newOffset + ". Position is " + this.consumer + + " at " + newOffset + ". Position is " + consumer .position(topicPartition)); } } @@ -566,6 +572,7 @@ private ConsumerRecord pollRecord() { this.consumerLock.lock(); try { try { + Assert.state(this.consumer != null, "'consumer' must not be null"); ConsumerRecords records = this.consumer .poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout); this.logger.debug(() -> records == null @@ -595,6 +602,7 @@ private ConsumerRecord pollRecord() { private ConsumerRecord nextRecord() { ConsumerRecord record; + Assert.state(this.recordsIterator != null, "'recordsIterator' must not be null"); record = this.recordsIterator.next(); if (!this.recordsIterator.hasNext()) { this.recordsIterator = null; @@ -689,11 +697,11 @@ private static ConsumerFactory fixConsumerFactory(ConsumerFactory partitions) { KafkaMessageSource.this.logger.info(() -> "Partitions revoked: " + partitions); if (this.providedRebalanceListener != null) { if (this.isConsumerAware) { + Assert.state(KafkaMessageSource.this.consumer != null, "'consumer' must not be null"); ((ConsumerAwareRebalanceListener) this.providedRebalanceListener) .onPartitionsRevokedAfterCommit(KafkaMessageSource.this.consumer, partitions); } @@ -725,6 +734,7 @@ public void onPartitionsLost(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { KafkaMessageSource.this.assignedPartitions.addAll(partitions); + Assert.state(KafkaMessageSource.this.consumer != null, "'consumer' must not be null"); if (KafkaMessageSource.this.paused.get()) { KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions); KafkaMessageSource.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; " @@ -777,7 +787,7 @@ public static class KafkaAckCallback implements AcknowledgmentCallback, Ac private final KafkaAckInfo ackInfo; - private final Duration commitTimeout; + private final @Nullable Duration commitTimeout; private final OffsetCommitCallback commitCallback; @@ -830,15 +840,19 @@ public void acknowledge(Status status) { finally { this.acknowledged = true; if (!this.ackInfo.isAckDeferred()) { - this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()).remove(this.ackInfo); + var inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()); + Assert.state(inflight != null, "'inflight' must not be null"); + inflight.remove(this.ackInfo); } } } } private void rollback(ConsumerRecord record) { + Assert.state(this.ackInfo.getConsumer() != null, "'consumer' must not be null"); this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), record.offset()); Set> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()); + Assert.state(inflight != null, "'infligt' must not be null"); synchronized (inflight) { if (inflight.size() > 1) { List rewound = @@ -866,6 +880,7 @@ private void commitIfPossible(ConsumerRecord record) { // NOSONAR } else { Set> candidates = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()); + Assert.state(candidates != null, "'candidates' must not be null"); KafkaAckInfo ackInformation = null; synchronized (candidates) { if (candidates.iterator().next().equals(this.ackInfo)) { @@ -902,6 +917,7 @@ private void commitIfPossible(ConsumerRecord record) { // NOSONAR Map offset = Collections.singletonMap(ackInformation.getTopicPartition(), new OffsetAndMetadata(ackInformation.getRecord().offset() + 1)); + Assert.state(ackInformation.getConsumer() != null, "'consumer' must not be null"); if (this.isSyncCommits) { if (this.commitTimeout == null) { ackInformation.getConsumer().commitSync(offset); @@ -967,12 +983,12 @@ public Object getConsumerMonitor() { } @Override - public String getGroupId() { + public @Nullable String getGroupId() { return KafkaMessageSource.this.getGroupId(); } @Override - public Consumer getConsumer() { + public @Nullable Consumer getConsumer() { return KafkaMessageSource.this.consumer; } @@ -1035,9 +1051,9 @@ public interface KafkaAckInfo extends Comparable> { Object getConsumerMonitor(); - String getGroupId(); + @Nullable String getGroupId(); - Consumer getConsumer(); + @Nullable Consumer getConsumer(); ConsumerRecord getRecord(); diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/package-info.java index e75fc2ac772..a5ed7e6b7f2 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/package-info.java @@ -1,4 +1,5 @@ /** * Provides Spring Integration inbound components for Apache Kafka. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.inbound; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java index 3498526d7c1..6089953b1f4 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java @@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.jspecify.annotations.Nullable; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; @@ -59,7 +60,6 @@ import org.springframework.kafka.support.converter.KafkaMessageHeaders; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandlingException; @@ -118,38 +118,40 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private final long deliveryTimeoutMsProperty; + @SuppressWarnings("NullAway.Init") private EvaluationContext evaluationContext; - private Expression topicExpression; + private @Nullable Expression topicExpression; - private Expression messageKeyExpression; + private @Nullable Expression messageKeyExpression; - private Expression partitionIdExpression; + private @Nullable Expression partitionIdExpression; - private Expression timestampExpression; + private @Nullable Expression timestampExpression; private Expression flushExpression = new FunctionExpression>(message -> Boolean.TRUE.equals(message.getHeaders().get(KafkaIntegrationHeaders.FLUSH))); private boolean sync; + @SuppressWarnings("NullAway.Init") private Expression sendTimeoutExpression; - private KafkaHeaderMapper headerMapper; + private @Nullable KafkaHeaderMapper headerMapper; private RecordMessageConverter replyMessageConverter = new MessagingMessageConverter(); - private MessageChannel sendFailureChannel; + private @Nullable MessageChannel sendFailureChannel; - private String sendFailureChannelName; + private @Nullable String sendFailureChannelName; - private MessageChannel sendSuccessChannel; + private @Nullable MessageChannel sendSuccessChannel; - private String sendSuccessChannelName; + private @Nullable String sendSuccessChannelName; - private MessageChannel futuresChannel; + private @Nullable MessageChannel futuresChannel; - private String futuresChannelName; + private @Nullable String futuresChannelName; private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); @@ -165,8 +167,6 @@ public class KafkaProducerMessageHandler extends AbstractReplyProducingMes private Duration assignmentDuration = DEFAULT_ASSIGNMENT_TIMEOUT; - private volatile byte[] singleReplyTopic; - @SuppressWarnings("this-escape") public KafkaProducerMessageHandler(final KafkaTemplate kafkaTemplate) { Assert.notNull(kafkaTemplate, "kafkaTemplate cannot be null"); @@ -190,9 +190,9 @@ public KafkaProducerMessageHandler(final KafkaTemplate kafkaTemplate) { + "configured to read uncommitted records"); } determineSendTimeout(); - this.deliveryTimeoutMsProperty = - this.sendTimeoutExpression.getValue(Long.class) // NOSONAR - never null after determineSendTimeout() - - this.timeoutBuffer; + Long sendTimeout = this.sendTimeoutExpression.getValue(Long.class); // never null after determineSendTimeout() + Assert.state(sendTimeout != null, "'sendTimeout' must not be null"); + this.deliveryTimeoutMsProperty = sendTimeout - this.timeoutBuffer; } private void determineSendTimeout() { @@ -254,7 +254,7 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { this.headerMapper = headerMapper; } - public KafkaHeaderMapper getHeaderMapper() { + public @Nullable KafkaHeaderMapper getHeaderMapper() { return this.headerMapper; } @@ -446,7 +446,7 @@ else if (this.sendFailureChannelName != null) { return null; } - protected MessageChannel getSendSuccessChannel() { + protected @Nullable MessageChannel getSendSuccessChannel() { if (this.sendSuccessChannel != null) { return this.sendSuccessChannel; } @@ -457,7 +457,7 @@ else if (this.sendSuccessChannelName != null) { return null; } - protected MessageChannel getFuturesChannel() { + protected @Nullable MessageChannel getFuturesChannel() { if (this.futuresChannel != null) { return this.futuresChannel; } @@ -492,7 +492,7 @@ public boolean isRunning() { @SuppressWarnings("unchecked") // NOSONAR - complexity @Override - protected Object handleRequestMessage(final Message message) { + protected @Nullable Object handleRequestMessage(final Message message) { final ProducerRecord producerRecord; boolean flush = Boolean.TRUE.equals(this.flushExpression.getValue(this.evaluationContext, message, Boolean.class)); @@ -544,7 +544,8 @@ protected Object handleRequestMessage(final Message message) { throw new MessageHandlingException(message, e); } catch (ExecutionException e) { - throw new MessageHandlingException(message, e.getCause()); // NOSONAR + Throwable cause = e.getCause() != null ? e.getCause() : e; + throw new MessageHandlingException(message, cause); } return processReplyFuture(gatewayFuture); } @@ -600,7 +601,6 @@ private void waitForAssignment() { } } - @Nullable private void addReplyTopicIfAny(MessageHeaders messageHeaders, Headers headers) { if (this.isGateway) { Object replyHeader = messageHeaders.get(KafkaHeaders.REPLY_TOPIC); @@ -616,9 +616,10 @@ else if (replyHeader != null) { } } - private void sendFutureIfRequested(CompletableFuture> sendFuture, Object futureToken) { + private void sendFutureIfRequested(@Nullable CompletableFuture> sendFuture, + @Nullable Object futureToken) { - if (futureToken != null) { + if (sendFuture != null && futureToken != null) { MessageChannel futures = getFuturesChannel(); if (futures != null) { try { @@ -635,9 +636,13 @@ private void sendFutureIfRequested(CompletableFuture> sendFutur } public void processSendResult(final Message message, final ProducerRecord producerRecord, - CompletableFuture> future, MessageChannel metadataChannel) + @Nullable CompletableFuture> future, @Nullable MessageChannel metadataChannel) throws InterruptedException, ExecutionException { + if (future == null) { + this.logger.warn("send future is null, skip processing send result."); + return; + } final MessageChannel failureChannel = getSendFailureChannel(); if (failureChannel != null || metadataChannel != null) { future.whenComplete((sendResult, exception) -> { @@ -689,7 +694,7 @@ private void sendFailure(final Message message, final ProducerRecord pr } } - private Future processReplyFuture(@Nullable RequestReplyFuture future) { + private @Nullable Future processReplyFuture(@Nullable RequestReplyFuture future) { if (future == null) { return null; } @@ -763,8 +768,8 @@ public interface ProducerRecordCreator { * @param headers the headers. * @return the record. */ - ProducerRecord create(Message message, String topic, Integer partition, Long timestamp, K key, V value, - Headers headers); + ProducerRecord create(Message message, @Nullable String topic, @Nullable Integer partition, + @Nullable Long timestamp, @Nullable K key, @Nullable V value, Headers headers); } diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/package-info.java index 8f27e06393d..021e99c5893 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/package-info.java @@ -1,4 +1,5 @@ /** * Provides Spring Integration outbound components for Apache Kafka. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.outbound; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/package-info.java index d77f8072e7e..e175b2f23a4 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/package-info.java @@ -1,4 +1,5 @@ /** * Root package of the Apache Kafka Module. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java index 552fb6ba5fa..a60ce740b98 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,13 @@ import java.util.HashMap; import java.util.Map; +import org.jspecify.annotations.Nullable; + import org.springframework.core.AttributeAccessor; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/package-info.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/package-info.java index 0b8fe48568d..72a98346d47 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/package-info.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/support/package-info.java @@ -1,4 +1,5 @@ /** * Provides support classes. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.kafka.support; diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java index fd4115459f0..8bd7bc64657 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/channnel/ChannelTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,7 +129,7 @@ public ProducerFactory pf() { @Bean public ConsumerFactory cf() { Map consumerProps = - KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "channelTests", "false"); + KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "channelTests", false); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(consumerProps); } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java index 930f50da364..5ed6f539f29 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -280,7 +280,7 @@ public static class ContextConfiguration { @Bean public ConsumerFactory consumerFactory() { - Map props = KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "dsl-group", "false"); + Map props = KafkaTestUtils.consumerProps(this.embeddedKafkaBrokers, "dsl-group", false); return new DefaultKafkaConsumerFactory<>(props); } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java index 0af6037c030..3edf76515a8 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java @@ -102,13 +102,13 @@ class InboundGatewayTests { @Test void testInbound(EmbeddedKafkaBroker embeddedKafka) throws Exception { Map consumerProps = - KafkaTestUtils.consumerProps("replyHandler1", "false", embeddedKafka); + KafkaTestUtils.consumerProps(embeddedKafka, "replyHandler1", false); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf2.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic2); - Map props = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test1", false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); @@ -195,13 +195,13 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Test void testInboundErrorRecover(EmbeddedKafkaBroker embeddedKafka) { - Map consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "replyHandler2", false); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf2.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic4); - Map props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); @@ -276,13 +276,13 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Test void testInboundRetryErrorRecover(EmbeddedKafkaBroker embeddedKafka) { - Map consumerProps = KafkaTestUtils.consumerProps("replyHandler3", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "replyHandler3", false); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf2.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic6); - Map props = KafkaTestUtils.consumerProps("test3", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test3", false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic5); @@ -362,13 +362,13 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Test void testInboundRetryErrorRecoverWithoutRecoveryCallback(EmbeddedKafkaBroker embeddedKafka) throws Exception { - Map consumerProps = KafkaTestUtils.consumerProps("replyHandler4", "false", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "replyHandler4", false); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf2.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic7); - Map props = KafkaTestUtils.consumerProps("test4", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps(embeddedKafka, "test4", false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic7); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java index a1be77c5e4c..269f41d494c 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java @@ -133,7 +133,7 @@ static void setup() { @Test void testInboundRecord() { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test1", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test1", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic1); @@ -222,7 +222,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Test void testInboundRecordRetryRecover() { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test4", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test4", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic4); @@ -289,7 +289,7 @@ protected boolean doSend(Message message, long timeout) { */ @Test void testInboundRecordRetryRecoverWithoutRecoveryCallback() throws Exception { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test6", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test6", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic6); @@ -340,7 +340,7 @@ public void onError(RetryContext context, RetryCallback @Test void testInboundRecordNoRetryRecover() { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test5", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test5", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic5); @@ -396,7 +396,7 @@ protected boolean doSend(Message message, long timeout) { @Test void testInboundBatch() throws Exception { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test2", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test2", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic2); @@ -486,7 +486,7 @@ public Message toMessage(List> records, Acknowledgment a @Test void testInboundJson() { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test3", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test3", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); @@ -529,7 +529,7 @@ void testInboundJson() { @Test void testInboundJsonWithPayload() { - Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test7", "true"); + Map props = KafkaTestUtils.consumerProps(EMBEDDED_BROKERS, "test7", true); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic6); @@ -585,8 +585,8 @@ void testPauseResume() throws Exception { records.put(new TopicPartition("foo", 0), Arrays.asList( new ConsumerRecord<>("foo", 0, 0L, 1, "foo"), new ConsumerRecord<>("foo", 0, 1L, 1, "bar"))); - ConsumerRecords consumerRecords = new ConsumerRecords<>(records); - ConsumerRecords emptyRecords = new ConsumerRecords<>(Collections.emptyMap()); + ConsumerRecords consumerRecords = new ConsumerRecords<>(records, Map.of()); + ConsumerRecords emptyRecords = new ConsumerRecords<>(Collections.emptyMap(), Map.of()); AtomicBoolean first = new AtomicBoolean(true); given(consumer.poll(any(Duration.class))).willAnswer(i -> { Thread.sleep(50); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java index c1bf2cadd2a..ad991d3b129 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ static void setup() { @Test void testSource() throws Exception { - Map consumerProps = KafkaTestUtils.consumerProps(brokers, "testSource", "false"); + Map consumerProps = KafkaTestUtils.consumerProps(brokers, "testSource", false); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 42); @@ -140,7 +140,7 @@ public void onPartitionsAssigned(Collection partitions) { @Test void deserializationErrorIsThrownFromSource() { - Map consumerProps = KafkaTestUtils.consumerProps(brokers, "testErrorChannelSource", "false"); + Map consumerProps = KafkaTestUtils.consumerProps(brokers, "testErrorChannelSource", false); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, FailingDeserializer.class); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java index cfe99822c35..7a537f734ab 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java @@ -46,7 +46,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; @@ -267,11 +267,11 @@ private void testAckCommon(boolean sync, boolean timeout) { records4.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 3L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records1); - ConsumerRecords cr2 = new ConsumerRecords(records2); - ConsumerRecords cr3 = new ConsumerRecords(records3); - ConsumerRecords cr4 = new ConsumerRecords(records4); - ConsumerRecords cr5 = new ConsumerRecords(Collections.emptyMap()); + ConsumerRecords cr1 = new ConsumerRecords(records1, Map.of()); + ConsumerRecords cr2 = new ConsumerRecords(records2, Map.of()); + ConsumerRecords cr3 = new ConsumerRecords(records3, Map.of()); + ConsumerRecords cr4 = new ConsumerRecords(records4, Map.of()); + ConsumerRecords cr5 = new ConsumerRecords(Collections.emptyMap(), Map.of()); given(consumer.poll(any(Duration.class))).willReturn(cr1, cr2, cr3, cr4, cr5); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)).given(consumerFactory) @@ -404,13 +404,13 @@ void testAckOutOfOrder() { records6.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 5L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records1); - ConsumerRecords cr2 = new ConsumerRecords(records2); - ConsumerRecords cr3 = new ConsumerRecords(records3); - ConsumerRecords cr4 = new ConsumerRecords(records4); - ConsumerRecords cr5 = new ConsumerRecords(records5); - ConsumerRecords cr6 = new ConsumerRecords(records6); - ConsumerRecords cr7 = new ConsumerRecords(Collections.emptyMap()); + ConsumerRecords cr1 = new ConsumerRecords(records1, Map.of()); + ConsumerRecords cr2 = new ConsumerRecords(records2, Map.of()); + ConsumerRecords cr3 = new ConsumerRecords(records3, Map.of()); + ConsumerRecords cr4 = new ConsumerRecords(records4, Map.of()); + ConsumerRecords cr5 = new ConsumerRecords(records5, Map.of()); + ConsumerRecords cr6 = new ConsumerRecords(records6, Map.of()); + ConsumerRecords cr7 = new ConsumerRecords(Collections.emptyMap(), Map.of()); given(consumer.poll(any(Duration.class))).willReturn(cr1, cr2, cr3, cr4, cr5, cr6, cr7); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)).given(consumerFactory) @@ -480,13 +480,13 @@ void testNack() { records1.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records1); + ConsumerRecords cr1 = new ConsumerRecords(records1, Map.of()); Map> records2 = new LinkedHashMap<>(); records2.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr2 = new ConsumerRecords(records2); - ConsumerRecords cr3 = new ConsumerRecords(Collections.emptyMap()); + ConsumerRecords cr2 = new ConsumerRecords(records2, Map.of()); + ConsumerRecords cr3 = new ConsumerRecords(Collections.emptyMap(), Map.of()); given(consumer.poll(any(Duration.class))).willReturn(cr1, cr1, cr2, cr2, cr3); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)).given(consumerFactory) @@ -551,13 +551,13 @@ void testNackWithLaterInflight() { records1.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records1); + ConsumerRecords cr1 = new ConsumerRecords(records1, Map.of()); Map> records2 = new LinkedHashMap<>(); records2.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr2 = new ConsumerRecords(records2); - ConsumerRecords cr3 = new ConsumerRecords(Collections.emptyMap()); + ConsumerRecords cr2 = new ConsumerRecords(records2, Map.of()); + ConsumerRecords cr3 = new ConsumerRecords(Collections.emptyMap(), Map.of()); given(consumer.poll(any(Duration.class))).willReturn(cr1, cr2, cr1, cr2, cr3); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)).given(consumerFactory) @@ -656,13 +656,13 @@ void testPollTimeouts() { records1.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records1); + ConsumerRecords cr1 = new ConsumerRecords(records1, Map.of()); given(consumer.poll(Duration.of(20 * 5000, ChronoUnit.MILLIS))).willReturn(cr1, ConsumerRecords.EMPTY); Map> records2 = new LinkedHashMap<>(); records2.put(topicPartition, Collections.singletonList( new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr2 = new ConsumerRecords(records2); + ConsumerRecords cr2 = new ConsumerRecords(records2, Map.of()); given(consumer.poll(Duration.of(5000, ChronoUnit.MILLIS))).willReturn(cr2, ConsumerRecords.EMPTY); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)).given(consumerFactory) @@ -727,8 +727,8 @@ void testAllowMulti() { new RecordHeaders(), Optional.empty()), new ConsumerRecord("foo", 0, 3L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", new RecordHeaders(), Optional.empty()))); - ConsumerRecords cr1 = new ConsumerRecords(records); - ConsumerRecords cr2 = new ConsumerRecords(Collections.emptyMap()); + ConsumerRecords cr1 = new ConsumerRecords(records, Map.of()); + ConsumerRecords cr2 = new ConsumerRecords(Collections.emptyMap(), Map.of()); given(consumer.poll(any(Duration.class))).willReturn(cr1, cr2); ConsumerFactory consumerFactory = mock(ConsumerFactory.class); willReturn(Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 4)).given(consumerFactory) @@ -775,7 +775,7 @@ void testAllowMulti() { @SuppressWarnings("unchecked") @Test void testTopicPatternBasedMessageSource() { - MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()); TopicPartition topicPartition1 = new TopicPartition("abc_foo", 0); TopicPartition topicPartition2 = new TopicPartition("abc_foo", 1); TopicPartition topicPartition3 = new TopicPartition("def_foo", 0); @@ -836,7 +836,7 @@ void testTopicPatternBasedMessageSource() { @SuppressWarnings({"rawtypes", "unchecked"}) @Test void testStaticPartitionAssignment() { - MockConsumer consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST)); + MockConsumer consumer = spy(new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name())); TopicPartition beginning = new TopicPartition("foo", 0); TopicPartition end = new TopicPartition("foo", 1); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java index 2382dd31dd8..3d038bb3ebb 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -79,7 +80,6 @@ import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaTransactionManager; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessagingException; @@ -141,7 +141,7 @@ class KafkaProducerMessageHandlerTests { static void setup() { embeddedKafka = new EmbeddedKafkaKraftBroker(1, 2, topic1, topic2, topic3, topic4, topic5, topic6); embeddedKafka.afterPropertiesSet(); - Map consumerProps = KafkaTestUtils.consumerProps("testOut", "true", embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testOut", true); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); consumer = cf.createConsumer(); @@ -408,7 +408,7 @@ void testOutboundGatewayPrPayload() throws Exception { private void testOutboundGatewayGuts(ProducerRecord payload) throws Exception { ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>( - KafkaTestUtils.consumerProps(topic5, "false", embeddedKafka)); + KafkaTestUtils.consumerProps(embeddedKafka, topic5, false)); ContainerProperties containerProperties = new ContainerProperties(topic6); final CountDownLatch assigned = new CountDownLatch(1); containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() { @@ -507,7 +507,7 @@ void testConsumeAndProduceTransaction() throws Exception { return null; }).given(mockConsumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition, - Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")))); + Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))), Map.of()); final AtomicBoolean done = new AtomicBoolean(); willAnswer(i -> { if (done.compareAndSet(false, true)) { @@ -642,7 +642,7 @@ void testConsumeAndProduceTransactionTxIdOverride() throws Exception { return null; }).given(mockConsumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition, - Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value")))); + Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))), Map.of()); final AtomicBoolean done = new AtomicBoolean(); willAnswer(i -> { if (done.compareAndSet(false, true)) {