Skip to content

Commit 05d6494

Browse files
authored
KafkaTestUtils consumer properties API unification (#3938)
* Deprecate `KafkaTestUtils.consumerProps` methods based on the `String` for `autoCommit` * Introduce `KafkaTestUtils.consumerProps` based on the `boolean` for `autoCommit` * Fix respective deprecation warnings Signed-off-by: Mikhail Polivakha <mikhailpolivakha@gmail.com>
1 parent 9674334 commit 05d6494

File tree

49 files changed

+144
-120
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+144
-120
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* @author Hugo Wood
6060
* @author Artem Bilan
6161
* @author Sanghyeok An
62+
* @author Mikhail Polivakha
6263
*/
6364
public final class KafkaTestUtils {
6465

@@ -81,13 +82,30 @@ private KafkaTestUtils() {
8182
* @param autoCommit the auto commit.
8283
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
8384
* @return the properties.
85+
* @deprecated please, use {@link #consumerProps(EmbeddedKafkaBroker, String, boolean)} instead
8486
*/
87+
@Deprecated(forRemoval = true, since = "4.0.0")
8588
public static Map<String, Object> consumerProps(String group, String autoCommit,
8689
EmbeddedKafkaBroker embeddedKafka) {
8790

8891
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
8992
}
9093

94+
/**
95+
* Set up test properties for an {@code <Integer, String>} consumer.
96+
*
97+
* @param group the group id.
98+
* @param autoCommit the auto commit.
99+
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
100+
* @return the properties.
101+
* @since 4.0
102+
*/
103+
public static Map<String, Object> consumerProps(EmbeddedKafkaBroker embeddedKafka, String group,
104+
boolean autoCommit) {
105+
106+
return consumerProps(embeddedKafka.getBrokersAsString(), group, autoCommit);
107+
}
108+
91109
/**
92110
* Set up test properties for an {@code <Integer, String>} consumer.
93111
* @param brokers the bootstrapServers property.
@@ -96,7 +114,7 @@ public static Map<String, Object> consumerProps(String group, String autoCommit,
96114
* @since 3.3
97115
*/
98116
public static Map<String, Object> consumerProps(String brokers, String group) {
99-
return consumerProps(brokers, group, "false");
117+
return consumerProps(brokers, group, false);
100118
}
101119

102120
/**
@@ -114,8 +132,22 @@ public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafk
114132
* @param group the group id.
115133
* @param autoCommit the auto commit.
116134
* @return the properties.
117-
*/
135+
* @deprecated Please, use {@link #consumerProps(String, String, boolean)} instead.
136+
*/
137+
@Deprecated(forRemoval = true, since = "4.0.0")
118138
public static Map<String, Object> consumerProps(String brokers, String group, String autoCommit) {
139+
return consumerProps(brokers, group, Boolean.parseBoolean(autoCommit));
140+
}
141+
142+
/**
143+
* Set up test properties for an {@code <Integer, String>} consumer.
144+
* @param brokers the bootstrapServers property.
145+
* @param group the group id.
146+
* @param autoCommit the auto commit.
147+
* @return the properties.
148+
* @since 4.0
149+
*/
150+
public static Map<String, Object> consumerProps(String brokers, String group, boolean autoCommit) {
119151
Map<String, Object> props = new HashMap<>();
120152
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
121153
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
@@ -235,7 +267,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
235267
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
236268
boolean seekToLast, boolean commit, Duration timeout) {
237269

238-
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
270+
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, false);
239271
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
240272
try (KafkaConsumer consumer = new KafkaConsumer(consumerConfig)) {
241273
TopicPartition topicPart = new TopicPartition(topic, partition);

spring-kafka-test/src/test/java/org/springframework/kafka/test/AddressableEmbeddedBrokerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testKafkaEmbedded() {
6969

7070
@Test
7171
public void testLateStartedConsumer() {
72-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TEST_EMBEDDED, "false", this.broker);
72+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.broker, TEST_EMBEDDED, false);
7373
Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
7474
this.broker.consumeFromAnEmbeddedTopic(consumer, TEST_EMBEDDED);
7575

@@ -78,7 +78,7 @@ public void testLateStartedConsumer() {
7878
producer.close();
7979
KafkaTestUtils.getSingleRecord(consumer, TEST_EMBEDDED);
8080

81-
consumerProps = KafkaTestUtils.consumerProps("another" + TEST_EMBEDDED, "false", this.broker);
81+
consumerProps = KafkaTestUtils.consumerProps(this.broker, "another" + TEST_EMBEDDED, false);
8282
Consumer<Integer, String> consumer2 = new KafkaConsumer<>(consumerProps);
8383
this.broker.consumeFromAnEmbeddedTopic(consumer2, TEST_EMBEDDED);
8484
KafkaTestUtils.getSingleRecord(consumer2, TEST_EMBEDDED);

spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void testConsumeFromEmbeddedWithSeekToEnd() {
5151
Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka);
5252
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
5353
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd"));
54-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka);
54+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(kafka, "seekTest", false);
5555
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
5656
kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic");
5757
producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd"));

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) {
5555
producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
5656
producer.send(new ProducerRecord<>("singleTopic2", 0, 1, "foo"));
5757
producer.close();
58-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests1", "false", broker);
58+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests1", false);
5959
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
6060
broker.consumeFromAllEmbeddedTopics(consumer);
6161
KafkaTestUtils.getSingleRecord(consumer, "singleTopic1");
@@ -72,7 +72,7 @@ void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7272
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
7373
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
7474
producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
75-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests2", "false", broker);
75+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests2", false);
7676
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
7777
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
7878
long t1 = System.currentTimeMillis();
@@ -117,7 +117,7 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) {
117117
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
118118
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
119119
producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo"));
120-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests3", "false", broker);
120+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(broker, "ktuTests3", false);
121121
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
122122
broker.consumeFromAnEmbeddedTopic(consumer, "multiTopic1");
123123
new Thread(() -> {

spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,7 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
157157

158158
@Bean
159159
public Map<String, Object> consumerConfigs() {
160-
Map<String, Object> consumerProps =
161-
KafkaTestUtils.consumerProps("myAliasGroup", "false", embeddedKafka());
162-
return consumerProps;
160+
return KafkaTestUtils.consumerProps(embeddedKafka(), "myAliasGroup", false);
163161
}
164162

165163
@Bean

spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ public DefaultKafkaConsumerFactory<String, String> consumerFactory(
343343

344344
@Bean
345345
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
346-
return KafkaTestUtils.consumerProps("test", "false", embeddedKafka);
346+
return KafkaTestUtils.consumerProps(embeddedKafka, "test", false);
347347
}
348348

349349
@Bean

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversion2Tests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public DefaultKafkaConsumerFactory<Integer, Foo> consumerFactory() {
105105
@Bean
106106
public Map<String, Object> consumerConfigs() {
107107
Map<String, Object> consumerProps =
108-
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka);
108+
KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
109109
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
110110
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
111111
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);

spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public DefaultKafkaConsumerFactory<Integer, Foo> consumerFactory(EmbeddedKafkaBr
199199
@Bean
200200
public Map<String, Object> consumerConfigs(EmbeddedKafkaBroker embeddedKafka) {
201201
Map<String, Object> consumerProps =
202-
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
202+
KafkaTestUtils.consumerProps(embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
203203
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
204204
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1000);
205205
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1555,7 +1555,7 @@ private ConsumerFactory<Integer, String> configuredConsumerFactory(String client
15551555

15561556
@Bean
15571557
public Map<String, Object> consumerConfigs() {
1558-
return KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", this.embeddedKafka);
1558+
return KafkaTestUtils.consumerProps(this.embeddedKafka, DEFAULT_TEST_GROUP_ID, false);
15591559
}
15601560

15611561
@Bean

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ public void testNestedTxProducerIsCached() throws Exception {
367367
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
368368
pfTx.setTransactionIdPrefix("fooTx.");
369369
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
370-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
370+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false);
371371
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
372372
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
373373
cf.addPostProcessor(consumer -> {
@@ -418,7 +418,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
418418
TransactionIdSuffixStrategy suffixStrategy = new DefaultTransactionIdSuffixStrategy(3);
419419
pfTx.setTransactionIdSuffixStrategy(suffixStrategy);
420420
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
421-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1FixedGroup", "false", this.embeddedKafka);
421+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1FixedGroup", false);
422422
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
423423
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
424424
cf.addPostProcessor(consumer -> {
@@ -464,7 +464,7 @@ public void testNestedTxProducerIsFixed() throws Exception {
464464
@ParameterizedTest
465465
@ValueSource(booleans = { true, false })
466466
void listener(boolean closeWithTimeout) {
467-
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
467+
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps(this.embeddedKafka, "txCache1Group", false);
468468
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
469469
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
470470
List<String> adds = new ArrayList<>();
@@ -503,7 +503,7 @@ public void consumerRemoved(String id, Consumer consumer) {
503503
void configDeserializer() {
504504
Deserializer key = mock(Deserializer.class);
505505
Deserializer value = mock(Deserializer.class);
506-
Map<String, Object> config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka);
506+
Map<String, Object> config = KafkaTestUtils.consumerProps(this.embeddedKafka, "mockGroup", false);
507507
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value);
508508
Deserializer keyDeserializer = cf.getKeyDeserializer();
509509
assertThat(keyDeserializer).isSameAs(key);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void producerRemoved(String id, Producer<String, String> producer) {
136136
public static void setUp() {
137137
embeddedKafka = EmbeddedKafkaCondition.getBroker();
138138
Map<String, Object> consumerProps = KafkaTestUtils
139-
.consumerProps("KafkaTemplatetests" + UUID.randomUUID(), "false", embeddedKafka);
139+
.consumerProps(embeddedKafka, "KafkaTemplatetests" + UUID.randomUUID(), false);
140140
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
141141
consumer = cf.createConsumer();
142142
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC);
@@ -165,7 +165,7 @@ void testTemplate() {
165165
template.setDefaultTopic(INT_KEY_TOPIC);
166166

167167
template.setConsumerFactory(
168-
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
168+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false)));
169169
ConsumerRecords<Integer, String> initialRecords =
170170
template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L)));
171171
assertThat(initialRecords).isEmpty();
@@ -475,7 +475,7 @@ void testTemplateDisambiguation() {
475475
pf.setKeySerializer(new StringSerializer());
476476
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
477477
template.setDefaultTopic(STRING_KEY_TOPIC);
478-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTString", "false", embeddedKafka);
478+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testTString", false);
479479
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
480480
cf.setKeyDeserializer(new StringDeserializer());
481481
Consumer<String, String> localConsumer = cf.createConsumer();
@@ -630,7 +630,7 @@ void testReceiveWhenOffsetIsInvalid(Long offset) {
630630
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
631631

632632
template.setConsumerFactory(
633-
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka)));
633+
new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(embeddedKafka, "xx", false)));
634634
TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset);
635635

636636
assertThatExceptionOfType(KafkaException.class)

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void testLocalTransaction() {
115115
pf.setKeySerializer(new StringSerializer());
116116
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
117117
template.setDefaultTopic(STRING_KEY_TOPIC);
118-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
118+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTx", false);
119119
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
120120
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
121121
cf.setKeyDeserializer(new StringDeserializer());
@@ -173,7 +173,7 @@ public void testLocalTransactionIsFixed() {
173173
pf.setTransactionIdSuffixStrategy(suffixStrategy);
174174
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
175175
template.setDefaultTopic(STRING_KEY_TOPIC);
176-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTxFixed", "false", embeddedKafka);
176+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testLocalTxFixed", false);
177177
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
178178
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
179179
cf.setKeyDeserializer(new StringDeserializer());
@@ -232,7 +232,7 @@ public void testGlobalTransaction() {
232232
pf.setTransactionIdPrefix("my.transaction.");
233233
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
234234
template.setDefaultTopic(STRING_KEY_TOPIC);
235-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGlobalTx", "false", embeddedKafka);
235+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "testGlobalTx", false);
236236
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
237237
cf.setKeyDeserializer(new StringDeserializer());
238238
Consumer<String, String> consumer = cf.createConsumer();

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class ReactiveKafkaProducerTemplateIntegrationTests {
9999
@BeforeAll
100100
public static void setUpBeforeClass() {
101101
Map<String, Object> consumerProps = KafkaTestUtils
102-
.consumerProps("reactive_consumer_group", "false", EmbeddedKafkaCondition.getBroker());
102+
.consumerProps(EmbeddedKafkaCondition.getBroker(), "reactive_consumer_group", false);
103103
reactiveKafkaConsumerTemplate =
104104
new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps));
105105
}

spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class ReactiveKafkaProducerTemplateTransactionIntegrationTests {
100100
@BeforeAll
101101
public static void setUpBeforeClass() {
102102
Map<String, Object> consumerProps =
103-
KafkaTestUtils.consumerProps(CONSUMER_GROUP_ID, "false", EmbeddedKafkaCondition.getBroker());
103+
KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), CONSUMER_GROUP_ID, false);
104104
reactiveKafkaConsumerTemplate =
105105
new ReactiveKafkaConsumerTemplate<>(setupReceiverOptionsWithDefaultTopic(consumerProps));
106106
}

spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFa
150150

151151
@Bean
152152
ConsumerFactory<String, String> consumerFactory() {
153-
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker));
153+
return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps(this.broker, "test-group", false));
154154
}
155155

156156
@Bean

spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
110110

111111
@Bean
112112
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
113-
Map<String, Object> props = KafkaTestUtils.consumerProps("asaac.grp", "false", broker);
113+
Map<String, Object> props = KafkaTestUtils.consumerProps(broker, "asaac.grp", false);
114114
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
115115
return new DefaultKafkaConsumerFactory<>(
116116
props);

0 commit comments

Comments
 (0)