diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc index 308f1a64df..5423054b46 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc @@ -179,3 +179,40 @@ MessagingMessageConverter converter() { If using Spring Boot, it will auto configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template. +[[multi-value-header]] +== Support multi-value header mapping + +Starting with 4.0, multi-value header mapping is supported, where the same logical header key appears more than once in a Kafka record. + +By default, the `HeaderMapper` does **not** create multiple Kafka headers with the same name. +Instead, when it encounters a collection value (e.g., a `List`), it serializes the entire collection into **one** Kafka header whose value is a JSON array. + +* **Producer side:** `DefaultKafkaHeaderMapper` writes the JSON bytes, while `SimpleKafkaHeaderMapper` ignore it. +* **Consumer side:** the mapper exposes the header as a single value—the **last occurrence wins**; earlier duplicates are silently discarded. + +Preserving each individual header requires explicit registration of patterns that designate the header as multi‑valued. + +`DefaultKafkaHeaderMapper#setMultiValueHeaderPatterns(String... patterns)` accepts a list of patterns, which can be either wildcard expressions or exact header names. + +[source, java] +---- +DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + +// Explicit header names +mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2"); + +// Wildcard patterns for test-multi-value1, test-multi-value2 +mapper.setMultiValueHeaderPatterns("test-multi-*"); +---- + +Any header whose name matches one of the supplied patterns is + +* **Producer side:** written as separate Kafka headers, one per element. +* **Consumer side:** collected into a `List` that contains the individual header values; each element is returned to the application **after the usual deserialization or type conversion performed by the configured `HeaderMapper`.** + +NOTE: Regular expressions are *not* supported; only the +*+ wildcard is allowed in simple patterns—supporting direct equality and forms such as: +xxx*+, +*xxx+, +*xxx*+, +xxx*yyy+. + +[IMPORTANT] +==== +On the *Producer Side*, When `DefaultKafkaHeaderMapper` serializes a multi-value header, every element in that collection must be of a single Java type—mixing, for example, `String` and `byte[]` values under a single header key will lead to a conversion error. +==== diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 189fe5200f..cf5ad0a932 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -70,3 +70,9 @@ Several deprecated items have been removed: Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs]. + +[[x40-multi-value-header]] +=== Support multi-value header + +The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records. +More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index ddad12080e..6404e29f83 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List matchers = new ArrayList<>(); + private final List multiValueHeaderMatchers = new ArrayList<>(); + private final Map rawMappedHeaders = new HashMap<>(); { @@ -191,6 +193,18 @@ public void addRawMappedHeader(String name, boolean toString) { this.rawMappedHeaders.put(name, toString); } + /** + * Add patterns for matching multi-value headers under the same key. + * @param patterns the patterns for header. + * @since 4.0 + */ + public void setMultiValueHeaderPatterns(String ... patterns) { + this.multiValueHeaderMatchers.addAll(Arrays + .stream(patterns) + .map(SimplePatternBasedHeaderMatcher::new) + .toList()); + } + protected boolean matches(String header, Object value) { if (matches(header)) { if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL)) @@ -251,6 +265,40 @@ protected Object headerValueToAddOut(String key, Object value) { return valueToAdd; } + /** + * Check whether the header value should be mapped to multiple values. + * @param headerName the header name. + * @return True for multiple values at the same key. + * @since 4.0 + */ + protected boolean doesMatchMultiValueHeader(String headerName) { + for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) { + if (headerMatcher.matchHeader(headerName)) { + return true; + } + } + return false; + } + + /** + * Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}. + * @param headerName the header name. + * @param header the header instance. + * @param headers the target headers. + * @since 4.0 + */ + protected void fromUserHeader(String headerName, Header header, final Map headers) { + if (!doesMatchMultiValueHeader(headerName)) { + headers.put(headerName, headerValueToAddIn(header)); + } + else { + @SuppressWarnings("unchecked") + List headerValues = (List) + headers.computeIfAbsent(headerName, key -> new ArrayList<>()); + headerValues.add(headerValueToAddIn(header)); + } + } + @SuppressWarnings("NullAway") // Dataflow analysis limitation @Nullable private byte[] mapRawOut(String header, Object value) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 45e6c1eed2..fa4f0c7002 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -48,6 +48,7 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Sanghyoek An * * @since 1.3 * @@ -266,31 +267,17 @@ public void fromHeaders(MessageHeaders headers, Headers target) { final ObjectMapper headerObjectMapper = getObjectMapper(); headers.forEach((key, rawValue) -> { if (matches(key, rawValue)) { - Object valueToAdd = headerValueToAddOut(key, rawValue); - if (valueToAdd instanceof byte[]) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); - } - else { - try { - String className = valueToAdd.getClass().getName(); - boolean encodeToJson = this.encodeStrings; - if (this.toStringClasses.contains(className)) { - valueToAdd = valueToAdd.toString(); - className = JAVA_LANG_STRING; - encodeToJson = true; - } - if (!encodeToJson && valueToAdd instanceof String) { - target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset()))); - } - else { - target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd))); - } - jsonHeaders.put(key, className); + if (doesMatchMultiValueHeader(key)) { + if (rawValue instanceof Iterable valuesToMap) { + valuesToMap.forEach(o -> fromHeader(key, o, jsonHeaders, headerObjectMapper, target)); } - catch (Exception e) { - logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); + else { + fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target); } } + else { + fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target); + } } }); if (!jsonHeaders.isEmpty()) { @@ -324,12 +311,44 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { populateJsonValueHeader(header, requestedType, headers); } else { - headers.put(headerName, headerValueToAddIn(header)); + fromUserHeader(headerName, header, headers); } } }); } + private void fromHeader(String key, Object rawValue, Map jsonHeaders, + ObjectMapper headerObjectMapper, Headers target) { + + Object valueToAdd = headerValueToAddOut(key, rawValue); + if (valueToAdd instanceof byte[]) { + target.add(new RecordHeader(key, (byte[]) valueToAdd)); + } + else { + try { + String className = valueToAdd.getClass().getName(); + boolean encodeToJson = this.encodeStrings; + if (this.toStringClasses.contains(className)) { + valueToAdd = valueToAdd.toString(); + className = JAVA_LANG_STRING; + encodeToJson = true; + } + final byte[] calculatedValue; + if (!encodeToJson && valueToAdd instanceof String) { + calculatedValue = ((String) valueToAdd).getBytes(getCharset()); + } + else { + calculatedValue = headerObjectMapper.writeValueAsBytes(valueToAdd); + } + target.add(new RecordHeader(key, calculatedValue)); + jsonHeaders.putIfAbsent(key, className); + } + catch (Exception e) { + logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName()); + } + } + } + private void populateJsonValueHeader(Header header, String requestedType, Map headers) { Class type = Object.class; boolean trusted = false; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java index 30effa38e0..3c07593312 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/SimpleKafkaHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ * The exceptions are correlation and reply headers for request/reply * * @author Gary Russell + * @author Sanghyeok An * @since 2.1.3 * */ @@ -94,27 +95,40 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte public void fromHeaders(MessageHeaders headers, Headers target) { headers.forEach((key, value) -> { if (!NEVER.contains(key)) { - Object valueToAdd = headerValueToAddOut(key, value); - if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) { - target.add(new RecordHeader(key, (byte[]) valueToAdd)); + if (doesMatchMultiValueHeader(key)) { + if (value instanceof Iterable valuesToMap) { + valuesToMap.forEach(o -> fromHeader(key, o, target)); + } + else { + fromHeader(key, value, target); + } + } + else { + fromHeader(key, value, target); } } }); } @Override - public void toHeaders(Headers source, Map target) { + public void toHeaders(Headers source, Map headers) { source.forEach(header -> { String headerName = header.key(); if (matchesForInbound(headerName)) { if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) { - target.put(headerName, ByteBuffer.wrap(header.value()).getInt()); + headers.put(headerName, ByteBuffer.wrap(header.value()).getInt()); } else { - target.put(headerName, headerValueToAddIn(header)); + fromUserHeader(headerName, header, headers); } } }); } + private void fromHeader(String key, Object value, Headers target) { + if (headerValueToAddOut(key, value) instanceof byte[] valueToAdd && matches(key, valueToAdd)) { + target.add(new RecordHeader(key, valueToAdd)); + } + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index 1d799061cc..1151e6cd3d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType; import org.springframework.kafka.support.serializer.DeserializationException; import org.springframework.kafka.support.serializer.SerializationTestUtils; @@ -56,6 +57,7 @@ * @author Gary Russell * @author Artem Bilan * @author Soby Chacko + * @author Sanghyeok An * * @since 1.3 * @@ -332,6 +334,151 @@ void inboundJson() { .containsKey("baz"); } + @Test + void multiValueHeaderToTest() { + // GIVEN + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + byte[] multiValueHeader1Value3 = { 0, 0, 0, 2 }; + byte[] multiValueHeader1Value4 = { 0, 0, 0, 3 }; + + String multiValueHeader2 = "test-multi-value2"; + byte[] multiValueHeader2Value1 = { 0, 0, 0, 4 }; + byte[] multiValueHeader2Value2 = { 0, 0, 0, 5 }; + + String multiValueWildCardHeader1 = "test-wildcard-value1"; + byte[] multiValueWildCardHeader1Value1 = { 0, 0, 0, 6 }; + byte[] multiValueWildCardHeader1Value2 = { 0, 0, 0, 7 }; + + String multiValueWildCardHeader2 = "test-wildcard-value2"; + byte[] multiValueWildCardHeader2Value1 = { 0, 0, 0, 8 }; + byte[] multiValueWildCardHeader2Value2 = { 0, 0, 0, 9 }; + + String singleValueHeader = "test-single-value1"; + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2, "test-wildcard-*"); + + Headers rawHeaders = new RecordHeaders(); + + byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 }; + byte[] originalOffsetHeaderValue = { 0, 0, 0, 1 }; + byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 }; + + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues); + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + + rawHeaders.add(multiValueHeader1, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value2); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value3); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value4); + + rawHeaders.add(multiValueHeader2, multiValueHeader2Value1); + rawHeaders.add(multiValueHeader2, multiValueHeader2Value2); + + rawHeaders.add(multiValueWildCardHeader1, multiValueWildCardHeader1Value1); + rawHeaders.add(multiValueWildCardHeader1, multiValueWildCardHeader1Value2); + rawHeaders.add(multiValueWildCardHeader2, multiValueWildCardHeader2Value1); + rawHeaders.add(multiValueWildCardHeader2, multiValueWildCardHeader2Value2); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues); + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2, + multiValueHeader1Value3, multiValueHeader1Value4); + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader2, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader2Value1, multiValueHeader2Value2); + + assertThat(mappedHeaders) + .extractingByKey(multiValueWildCardHeader1, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueWildCardHeader1Value1, multiValueWildCardHeader1Value2); + + assertThat(mappedHeaders) + .extractingByKey(multiValueWildCardHeader2, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueWildCardHeader2Value1, multiValueWildCardHeader2Value2); + } + + @Test + void multiValueHeaderFromTest() { + // GIVEN + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 1 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 2 }; + + String multiValueHeader2 = "test-multi-value2"; + byte[] multiValueHeader2Value1 = { 0, 0, 0, 3 }; + byte[] multiValueHeader2Value2 = { 0, 0, 0, 4 }; + + String multiValueHeader3 = "test-other-multi-value1"; + byte[] multiValueHeader3Value1 = { 0, 0, 0, 9 }; + byte[] multiValueHeader3Value2 = { 0, 0, 0, 10 }; + + String multiValueHeader4 = "test-prefix-match-multi"; + byte[] multiValueHeader4Value1 = { 0, 0, 0, 11 }; + byte[] multiValueHeader4Value2 = { 0, 0, 0, 12 }; + + String singleValueHeader = "test-single-value1"; + byte[] singleValueHeaderValue1 = { 0, 0, 0, 5 }; + + Message message = MessageBuilder + .withPayload("test-multi-value-header") + .setHeader(multiValueHeader1, List.of(multiValueHeader1Value1, + multiValueHeader1Value2)) + .setHeader(multiValueHeader2, List.of(multiValueHeader2Value1, + multiValueHeader2Value2)) + .setHeader(multiValueHeader3, List.of(multiValueHeader3Value1, + multiValueHeader3Value2)) + .setHeader(multiValueHeader4, List.of(multiValueHeader4Value1, + multiValueHeader4Value2)) + .setHeader(singleValueHeader, singleValueHeaderValue1) + .build(); + + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns("test-multi-*", + multiValueHeader3, + "*-prefix-match-multi*"); + + // WHEN + Headers results = new RecordHeaders(); + mapper.fromHeaders(message.getHeaders(), results); + + // THEN + assertThat(results.headers(multiValueHeader1)) + .extracting(Header::value) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + + assertThat(results.headers(multiValueHeader2)) + .extracting(Header::value) + .containsExactly(multiValueHeader2Value1, multiValueHeader2Value2); + + assertThat(results.headers(multiValueHeader3)) + .extracting(Header::value) + .containsExactly(multiValueHeader3Value1, multiValueHeader3Value2); + + assertThat(results.headers(multiValueHeader4)) + .extracting(Header::value) + .containsExactly(multiValueHeader4Value1, multiValueHeader4Value2); + + assertThat(results.headers(singleValueHeader)) + .extracting(Header::value) + .containsExactly(singleValueHeaderValue1); + } + @Test void deserializationExceptionHeadersAreMappedAsNonByteArray() { DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java index 9b19bfda6b..fbbfd1c8c2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/SimpleKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,15 +18,20 @@ import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import org.springframework.kafka.retrytopic.RetryTopicHeaders; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; @@ -34,6 +39,7 @@ /** * @author Gary Russell + * @author Sanghyeok An * @since 2.2.5 * */ @@ -186,4 +192,103 @@ void inboundMappingWithPatterns() { .containsKey(KafkaHeaders.DELIVERY_ATTEMPT); } + @Test + void multiValueHeaderToTest() { + // GIVEN + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + byte[] multiValueHeader1Value3 = { 0, 0, 0, 2 }; + byte[] multiValueHeader1Value4 = { 0, 0, 0, 3 }; + + String multiValueHeader2 = "test-multi-value2"; + byte[] multiValueHeader2Value1 = { 0, 0, 0, 4 }; + byte[] multiValueHeader2Value2 = { 0, 0, 0, 5 }; + + String singleValueHeader = "test-single-value1"; + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + + byte[] deliveryAttemptValue = { 0, 0, 0, 1 }; + byte[] originalOffset = { 0, 0, 0, 1 }; + byte[] defaultHeaderAttempts = { 0, 0, 0, 5 }; + + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2); + + Headers rawHeaders = new RecordHeaders(); + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffset); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttempts); + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + + rawHeaders.add(multiValueHeader1, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value2); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value3); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value4); + + rawHeaders.add(multiValueHeader2, multiValueHeader2Value1); + rawHeaders.add(multiValueHeader2, multiValueHeader2Value2); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffset); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttempts); + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2, + multiValueHeader1Value3, multiValueHeader1Value4); + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader2, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader2Value1, multiValueHeader2Value2); + } + + @Test + void multiValueHeaderFromTest() { + // GIVEN + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 1 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 2 }; + + String multiValueHeader2 = "test-multi-value2"; + byte[] multiValueHeader2Value1 = { 0, 0, 0, 3 }; + byte[] multiValueHeader2Value2 = { 0, 0, 0, 4 }; + + String singleValueHeader = "test-single-value1"; + byte[] singleValueHeaderValue = { 0, 0, 0, 5 }; + + Message message = MessageBuilder + .withPayload("test-multi-value-header") + .setHeader(multiValueHeader1, List.of(multiValueHeader1Value1, multiValueHeader1Value2)) + .setHeader(multiValueHeader2, List.of(multiValueHeader2Value1, multiValueHeader2Value2)) + .setHeader(singleValueHeader, singleValueHeaderValue) + .build(); + + SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2); + + // WHEN + Headers results = new RecordHeaders(); + mapper.fromHeaders(message.getHeaders(), results); + + // THEN + assertThat(results.headers(multiValueHeader1)) + .extracting(Header::value) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + + assertThat(results.headers(multiValueHeader2)) + .extracting(Header::value) + .containsExactly(multiValueHeader2Value1, multiValueHeader2Value2); + + assertThat(results.headers(singleValueHeader)) + .extracting(Header::value) + .containsExactly(singleValueHeaderValue); + } + }