
Commit a090dc3

MINOR: Cleanup Core Module- Scala Modules (4/n) (#19805)
Now that Kafka brokers support Java 17, this PR makes some changes in the core module. The changes in this PR are limited to some Scala files in the core module's tests. The changes mostly include:

- Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of()
- Collections.singleton() is replaced with Set.of()

To be clear, the directories targeted in this PR from the unit.kafka module are:

- log
- network
- security
- tools
- utils

Reviewers: TengYao Chi <frankvicky@apache.org>
1 parent f866594 commit a090dc3

15 files changed: +169 -159 lines changed
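The pattern applied across all four files below is the swap from the legacy java.util.Collections and java.util.Arrays helpers to the Java 9+ immutable collection factories. A minimal Scala sketch of the before/after (the names are illustrative, not taken from the diff); the factories return unmodifiable, null-hostile collections, which is safe in these tests because the values are never mutated:

    import java.util

    object FactoryMigration {
      // Before: legacy helpers; Arrays.asList is a fixed-size, array-backed view
      val policiesOld: util.List[String] = util.Arrays.asList("compact", "delete")
      val overridesOld: util.Map[String, String] = util.Collections.singletonMap("k", "v")
      val partitionsOld: util.Set[String] = util.Collections.singleton("topic-0")

      // After: Java 9+ factories; truly immutable, and they reject null elements
      val policies: util.List[String] = util.List.of("compact", "delete")
      val overrides: util.Map[String, String] = util.Map.of("k", "v")
      val partitions: util.Set[String] = util.Set.of("topic-0")
    }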

core/src/test/scala/unit/kafka/KafkaConfigTest.scala

Lines changed: 2 additions & 2 deletions
@@ -116,12 +116,12 @@ class KafkaConfigTest {
     // We should be also able to set completely new property
     val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
     assertEquals(1, config3.nodeId)
-    assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
+    assertEquals(util.List.of("compact"), config3.logCleanupPolicy)

     // We should be also able to set several properties
     val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
     assertEquals(2, config4.nodeId)
-    assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
+    assertEquals(util.List.of("compact","delete"), config4.logCleanupPolicy)
   }

   @Test

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala

Lines changed: 28 additions & 17 deletions
@@ -37,8 +37,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
 import java.lang.{Long => JLong}
 import java.util
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
+import java.util.stream.Collectors
 import scala.jdk.OptionConverters.RichOptional

 /**
@@ -61,13 +60,13 @@ class LogCleanerManagerTest extends Logging {
   val offset = 999
   val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)

-  val cleanerCheckpoints: mutable.Map[TopicPartition, JLong] = mutable.Map[TopicPartition, JLong]()
+  val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new util.HashMap[TopicPartition, JLong]()

   class LogCleanerManagerMock(logDirs: util.List[File],
                               logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
                               logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
     override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
-      cleanerCheckpoints.toMap.asJava
+      cleanerCheckpoints
     }

     override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Optional[util.Map.Entry[TopicPartition, JLong]],
@@ -382,7 +381,11 @@ class LogCleanerManagerTest extends Logging {
     assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")

     // log cleanup finished, and log can be picked up for compaction
-    cleanerManager.resumeCleaning(deletableLog.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      deletableLog.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )
     val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
     assertEquals(1, cleanable2.size, "should have 1 logs ready to be compacted")

@@ -396,7 +399,7 @@
     assertEquals(0, deletableLog2.size, "should have 0 logs ready to be deleted")

     // compaction done, should have 1 log eligible for log cleanup
-    cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition).asJava)
+    cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
     val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
     assertEquals(1, deletableLog3.size, "should have 1 logs ready to be deleted")
   }
@@ -501,9 +504,13 @@
     val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
     // Log truncation happens due to unclean leader election
     cleanerManager.abortAndPauseCleaning(log.topicPartition)
-    cleanerManager.resumeCleaning(Set(log.topicPartition).asJava)
+    cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
     // log cleanup finishes and pausedPartitions are resumed
-    cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      pausedPartitions.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )

     assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
   }
@@ -522,7 +529,11 @@
     // Broker processes StopReplicaRequest with delete=true
     cleanerManager.abortCleaning(log.topicPartition)
     // log cleanup finishes and pausedPartitions are resumed
-    cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+    cleanerManager.resumeCleaning(
+      pausedPartitions.stream()
+        .map[TopicPartition](entry => entry.getKey)
+        .collect(Collectors.toSet[TopicPartition]())
+    )

     assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
   }
@@ -743,17 +754,17 @@
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
     val tp = new TopicPartition("log", 0)

-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
+    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))

     cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
-    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
+    assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))

     cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
-    cleanerManager.doneDeleting(Seq(tp).asJava)
+    cleanerManager.doneDeleting(util.List.of(tp))
     assertTrue(cleanerManager.cleaningState(tp).isEmpty)

     cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
-    cleanerManager.doneDeleting(Seq(tp).asJava)
+    cleanerManager.doneDeleting(util.List.of(tp))
     assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get)
   }

@@ -771,7 +782,7 @@

     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
     assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning")
-    assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have checkpoint offset updated")
+    assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have checkpoint offset updated")
   }

   /**
@@ -793,17 +804,17 @@

     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
     assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
-    assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have checkpoint offset updated")
+    assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have checkpoint offset updated")
   }

   private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
     val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
     logs.put(topicPartition, log)
-    new LogCleanerManager(Seq(logDir, logDir2).asJava, logs, null)
+    new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
   }

   private def createCleanerManagerMock(pool: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
-    new LogCleanerManagerMock(Seq(logDir).asJava, pool, null)
+    new LogCleanerManagerMock(util.List.of(logDir), pool, null)
   }

   private def createLog(segmentSize: Int,
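The recurring rewrite in this file replaces the Scala round-trip conversion (asScala, map over the entries, toSet, asJava) with a Java streams pipeline, which is what let the scala.jdk.CollectionConverters._ import be dropped. A sketch of the two equivalent forms; the Entries alias and method names here are hypothetical, standing in for the deletableLog/pausedPartitions values used in the tests:

    import java.util
    import java.util.stream.Collectors
    import org.apache.kafka.common.TopicPartition
    import scala.jdk.CollectionConverters._

    object EntryKeySets {
      type Entries = util.List[util.Map.Entry[TopicPartition, java.lang.Long]]

      // Old form: Java list -> Scala set -> Java set; needs CollectionConverters
      def keysViaScala(entries: Entries): util.Set[TopicPartition] =
        entries.asScala.map(_.getKey).toSet.asJava

      // New form: stays in Java collections end to end; the explicit type
      // parameters keep Scala's inference happy with the overloaded stream API
      def keysViaStreams(entries: Entries): util.Set[TopicPartition] =
        entries.stream()
          .map[TopicPartition](entry => entry.getKey)
          .collect(Collectors.toSet[TopicPartition]())
    }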

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

Lines changed: 6 additions & 6 deletions
@@ -913,7 +913,7 @@

     // clean the log
     val stats = new CleanerStats(Time.SYSTEM)
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -926,7 +926,7 @@
     val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)

     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+    cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
   }
@@ -945,7 +945,7 @@

     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     )
   }

@@ -962,7 +962,7 @@

     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
     assertThrows(classOf[CorruptRecordException], () =>
-      cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+      cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
     )
   }

@@ -1636,7 +1636,7 @@

     // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
     assertThrows(classOf[LogCleaningAbortedException], () =>
-      cleaner.cleanSegments(log, Seq(segmentWithOverflow).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM),
+      cleaner.cleanSegments(log, util.List.of(segmentWithOverflow), offsetMap, 0L, new CleanerStats(Time.SYSTEM),
         new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
     )
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
@@ -1646,7 +1646,7 @@
     // Clean each segment now that split is complete.
     val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
     for (segmentToClean <- log.logSegments.asScala)
-      cleaner.cleanSegments(log, List(segmentToClean).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM),
+      cleaner.cleanSegments(log, util.List.of(segmentToClean), offsetMap, 0L, new CleanerStats(Time.SYSTEM),
         new CleanedTransactionMetadata, -1, upperBoundOffset)
     assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
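A side note on the substitution above: Seq(x).asJava first builds a Scala Seq and then wraps it in a java.util.List view, while util.List.of(x) allocates a single immutable Java list with no Scala interop at all. A small sketch with illustrative string elements:

    import java.util
    import scala.jdk.CollectionConverters._

    object SegmentLists {
      // Old: a Scala collection plus a converter wrapper
      val viaWrapper: util.List[String] = Seq("segment-0").asJava
      // New: one immutable Java list; no CollectionConverters import required
      val viaFactory: util.List[String] = util.List.of("segment-0")
    }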

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

Lines changed: 24 additions & 23 deletions
@@ -27,7 +27,8 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test

-import java.util.{Collections, Properties}
+import java.util
+import java.util.Properties
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
@@ -122,7 +123,7 @@ class LogConfigTest {
   /* Sanity check that toHtml produces one of the expected configs */
   @Test
   def testToHtml(): Unit = {
-    val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + key, Collections.emptyMap())
+    val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + key, util.Map.of)
     val expectedConfig = "<h4><a id=\"file.delete.delay.ms\"></a><a id=\"prefix_file.delete.delay.ms\" href=\"#prefix_file.delete.delay.ms\">file.delete.delay.ms</a></h4>"
     assertTrue(html.contains(expectedConfig), s"Could not find `$expectedConfig` in:\n $html")
   }
@@ -273,7 +274,7 @@
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+      () => LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
   }

   @Test
@@ -283,7 +284,7 @@
     val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
     val logProps = new Properties()
     def validateCleanupPolicy(): Unit = {
-      LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -310,10 +311,10 @@
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
     }
   }
@@ -329,21 +330,21 @@
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
     if (wasRemoteStorageEnabled) {
       val message = assertThrows(classOf[InvalidConfigurationException],
-        () => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
-          logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+          logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
         "If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
         "If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))


       // It should be able to disable the remote log storage when delete on disable is set to true
       logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
-      LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
-        logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+        logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-      LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
-        kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
+      LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
+        kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }

@@ -362,12 +363,12 @@
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }

@@ -386,12 +387,12 @@
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+        kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }

@@ -406,10 +407,10 @@

     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+        () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
       assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     }
   }
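Two stylistic details recur in this file's hunks: the zero-argument util.Map.of is written without parentheses, and the empty parentheses on the Java accessor isRemoteStorageSystemEnabled are dropped in the same pass, since Scala allows omitting parentheses when calling a nullary Java method. A brief sketch (the value names are illustrative):

    import java.util

    object MapFactories {
      // The zero-arg overload is resolved from the expected type; parentheses are optional
      val none: util.Map[String, String] = util.Map.of
      // Map.of has direct overloads for up to ten key/value pairs
      val single: util.Map[String, String] = util.Map.of("remote.storage.enable", "true")
    }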
