
Commit 69c4cb5

Merge branch 'main' into codex/fix-issue-with-kafka-header-extraction

Signed-off-by: Igor Macedo Quintanilha <igor.quintanilha@gmail.com>

2 parents: a4251a9 + 797b96e

File tree: 11 files changed, +309 lines, -41 lines
build.gradle

Lines changed: 4 additions & 4 deletions
@@ -17,7 +17,7 @@ buildscript {
 plugins {
 	id 'base'
 	id 'idea'
-	id 'org.ajoberstar.grgit' version '5.3.0'
+	id 'org.ajoberstar.grgit' version '5.3.2'
 	id 'io.spring.nohttp' version '0.0.11'
 	id 'io.spring.dependency-management' version '1.1.7' apply false
 	id 'io.freefair.aggregate-javadoc' version '8.11'

@@ -55,16 +55,16 @@ ext {
 	awaitilityVersion = '4.3.0'
 	hamcrestVersion = '3.0'
 	hibernateValidationVersion = '8.0.2.Final'
-	jacksonBomVersion = '2.19.0'
+	jacksonBomVersion = '2.19.1'
 	jaywayJsonPathVersion = '2.9.0'
 	junit4Version = '4.13.2'
 	junitJupiterVersion = '5.13.1'
 	kafkaVersion = '4.0.0'
 	kotlinCoroutinesVersion = '1.10.2'
 	log4jVersion = '2.24.3'
 	micrometerDocsVersion = '1.0.4'
-	micrometerVersion = '1.15.0'
-	micrometerTracingVersion = '1.5.0'
+	micrometerVersion = '1.15.1'
+	micrometerTracingVersion = '1.5.1'
 	mockitoVersion = '5.18.0'
 	reactorVersion = '2025.0.0-SNAPSHOT'
 	scalaVersion = '2.13'

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 29 additions & 1 deletion
@@ -22,6 +22,35 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
 
 The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
 
+Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method.
+If the returned interceptor is a `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container has been created and a `RecordInterceptor` has already been configured.
+The following example shows how to do so:
+
+[source, java]
+----
+public void configureRecordInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
+    CompositeRecordInterceptor<Integer, String> compositeInterceptor;
+
+    RecordInterceptor<Integer, String> previousInterceptor = container.getRecordInterceptor();
+    if (previousInterceptor instanceof CompositeRecordInterceptor<Integer, String> interceptor) {
+        compositeInterceptor = interceptor;
+    } else {
+        compositeInterceptor = new CompositeRecordInterceptor<>();
+        container.setRecordInterceptor(compositeInterceptor);
+    }
+
+    if (previousInterceptor != null) {
+        compositeInterceptor.addRecordInterceptor(previousInterceptor);
+    }
+
+    RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor<>() {...};
+    RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor<>() {...};
+
+    compositeInterceptor.addRecordInterceptor(recordInterceptor1);
+    compositeInterceptor.addRecordInterceptor(recordInterceptor2);
+}
+----
+
 By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
 You can set the listener container's `interceptBeforeTx` property to `false` to invoke the interceptor after the transaction has started instead.
 Starting with version 2.9, this will apply to any transaction manager, not just `KafkaAwareTransactionManager`+++s+++.

@@ -265,4 +294,3 @@ The listener containers implement `SmartLifecycle`, and `autoStartup` is `true`
 The containers are started in a late phase (`Integer.MAX_VALUE - 100`).
 Other components that implement `SmartLifecycle`, to handle data from listeners, should be started in an earlier phase.
 The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
-

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 8 additions & 0 deletions
@@ -77,8 +77,14 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
 The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
 More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
 
 [[x40-batch-observability]]
 === Per-Record Observation in Batch Listeners
 
 It is now possible to get an observation for each record when using a batch listener.
 See xref:kafka/micrometer.adoc#batch-listener-obs[Observability for Batch Listeners] for more information.
+
+[[x40-add-record-interceptor]]
+=== Configure additional `RecordInterceptor`
+
+Listener containers now support interceptor customization via `getRecordInterceptor()`.
+See the xref:kafka/receiving-messages/message-listener-container.adoc#message-listener-container[Message Listener Containers] section for details.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 1 deletion
@@ -460,7 +460,12 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
 		this.kafkaAdmin = kafkaAdmin;
 	}
 
-	protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
+	/**
+	 * Get the {@link RecordInterceptor} for modification, if configured.
+	 * @return the {@link RecordInterceptor}, or {@code null} if not configured
+	 * @since 4.0
+	 */
+	public @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
 		return this.recordInterceptor;
 	}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

Lines changed: 10 additions & 0 deletions
@@ -35,6 +35,7 @@
  *
  * @author Artem Bilan
  * @author Gary Russell
+ * @author Sanghyeok An
  * @since 2.3
  *
 */

@@ -92,4 +93,13 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
 		this.delegates.forEach(del -> del.afterRecord(record, consumer));
 	}
 
+	/**
+	 * Add a {@link RecordInterceptor} to the delegates.
+	 * @param recordInterceptor the interceptor.
+	 * @since 4.0
+	 */
+	public void addRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
+		this.delegates.add(recordInterceptor);
+	}
+
 }
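
For context, a minimal usage sketch of the new `addRecordInterceptor` method, in the style of the documentation example above (the method name and the logging lambda are illustrative, not part of this commit; `RecordInterceptor` is a functional interface, so a lambda can serve as a pass-through delegate):

[source, java]
----
public void addLoggingInterceptor(KafkaMessageListenerContainer<Integer, String> container) {
    CompositeRecordInterceptor<Integer, String> composite = new CompositeRecordInterceptor<>();
    // A pass-through delegate: returning the record unchanged leaves processing intact.
    composite.addRecordInterceptor((record, consumer) -> {
        System.out.println("intercepted offset " + record.offset());
        return record;
    });
    container.setRecordInterceptor(composite);
}
----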

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
@@ -1498,7 +1498,7 @@ protected void handleAsyncFailure() {
 		// We will give up on retrying with the remaining copied and failed Records.
 		for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
 			try {
-				invokeErrorHandlerBySingleRecord(copyFailedRecord);
+				copyFailedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
 			}
 			catch (Exception e) {
 				this.logger.warn(() ->

@@ -3453,8 +3453,13 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
 			.values();
 	}
 
+	private Observation getCurrentObservation() {
+		Observation currentObservation = this.observationRegistry.getCurrentObservation();
+		return currentObservation == null ? Observation.NOOP : currentObservation;
+	}
+
 	private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
-		this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
+		this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex, getCurrentObservation()));
 	}
 
 	@Override

@@ -4071,6 +4076,6 @@ private static class StopAfterFenceException extends KafkaException {
 
 	}
 
-	private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
+	private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex, Observation observation) { }
 
 }
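
The shape of these changes: capture whichever observation is current when an async failure is queued (falling back to `Observation.NOOP`), then replay the error handling inside that observation's scope so tracing context matches the failed record. A self-contained Micrometer sketch of the capture-and-replay pattern, assuming a plain registry (class and task names are illustrative):

[source, java]
----
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

public class CaptureAndReplaySketch {

    public static void main(String[] args) {
        ObservationRegistry registry = ObservationRegistry.create();

        // At failure time: capture the current observation, defaulting to NOOP
        // so the stored reference is never null.
        Observation current = registry.getCurrentObservation();
        Observation captured = current == null ? Observation.NOOP : current;

        // Later, on the retry pass: run the error handling inside the captured
        // observation's scope so trace ids and MDC line up with the record.
        captured.scoped(() -> System.out.println("error handler runs in scope"));
    }
}
----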

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 4 additions & 2 deletions
@@ -549,7 +549,7 @@ else if (!(result instanceof CompletableFuture<?>)) {
 		}
 
 		completableFutureResult.whenComplete((r, t) -> {
-			try {
+			try (var scope = observation.openScope()) {
 				if (t == null) {
 					asyncSuccess(r, replyTopic, source, messageReturnType);
 					if (isAsyncReplies()) {

@@ -736,13 +736,15 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment,
 					"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
 		}
 		catch (Throwable ex) {
-			this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
 			acknowledge(acknowledgment);
 			if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
 				@SuppressWarnings("unchecked")
 				ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
 				this.asyncRetryCallback.accept(record, (RuntimeException) ex);
 			}
+			else {
+				this.logger.error(ex, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
+			}
 		}
 	}
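
The `openScope()` change matters because `whenComplete` may run on a thread where the listener's observation is no longer current; opening a scope restores it for the completion callback. A self-contained sketch of that idiom (class and observation names are illustrative):

[source, java]
----
import java.util.concurrent.CompletableFuture;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

public class AsyncScopeSketch {

    public static void main(String[] args) {
        ObservationRegistry registry = ObservationRegistry.create();
        Observation observation = Observation.start("async-listener", registry);

        CompletableFuture.supplyAsync(() -> "reply")
                // The callback may run on a pool thread with no current observation;
                // try-with-resources on openScope() makes it current there.
                .whenComplete((r, t) -> {
                    try (Observation.Scope scope = observation.openScope()) {
                        System.out.println("completed in scope: " + r);
                    }
                    finally {
                        observation.stop();
                    }
                })
                .join();
    }
}
----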

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTypeSerializer.java

Lines changed: 30 additions & 9 deletions
@@ -16,9 +16,10 @@
 
 package org.springframework.kafka.support.serializer;
 
-import java.util.LinkedHashMap;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;

@@ -32,13 +33,29 @@
  * @author Gary Russell
  * @author Artem Bilan
  * @author Wang Zhiyang
+ * @author Mahesh Aravind V
 *
 * @since 2.7.9
 *
 */
 public class DelegatingByTypeSerializer implements Serializer<Object> {
 
-	private final Map<Class<?>, Serializer<?>> delegates = new LinkedHashMap<>();
+	private static final Comparator<Class<?>> DELEGATES_ASSIGNABILITY_COMPARATOR =
+			(class1, class2) -> {
+
+				if (class1 == class2) {
+					return 0; // Classes are the same
+				}
+				if (class1.isAssignableFrom(class2)) {
+					return 1; // class1 is a superclass or superinterface of class2, so class2 should come first
+				}
+				if (class2.isAssignableFrom(class1)) {
+					return -1; // class2 is a superclass or superinterface of class1, so class1 should come first
+				}
+				return class1.getName().compareTo(class2.getName()); // No inheritance relation, so compare by name
+			};
+
+	private final Map<Class<?>, Serializer<?>> delegates = new TreeMap<>(DELEGATES_ASSIGNABILITY_COMPARATOR);
 
 	private final boolean assignable;

@@ -51,17 +68,21 @@ public DelegatingByTypeSerializer(Map<Class<?>, Serializer<?>> delegates) {
 	}
 
 	/**
-	 * Construct an instance with the map of delegates; keys matched exactly or if the
-	 * target object is assignable to the key, depending on the assignable argument.
-	 * If assignable, entries are checked in the natural entry order so an ordered map
-	 * such as a {@link LinkedHashMap} is recommended.
-	 * @param delegates the delegates.
-	 * @param assignable whether the target is assignable to the key.
+	 * Construct an instance with the map of delegates.
+	 * If {@code assignable} is {@code false}, only exact key matches are considered.
+	 * If {@code assignable} is {@code true}, a delegate is selected if its key class
+	 * is assignable from the target object's class. When multiple matches are possible,
+	 * the most specific matching class is selected, that is, the closest match in the
+	 * class hierarchy.
+	 * @param delegates the delegates
+	 * @param assignable true if {@link #findDelegate(Object, Map)} should consider assignability to
+	 * the key rather than an exact match.
 	 * @since 2.8.3
 	 */
 	public DelegatingByTypeSerializer(Map<Class<?>, Serializer<?>> delegates, boolean assignable) {
 		Assert.notNull(delegates, "'delegates' cannot be null");
 		Assert.noNullElements(delegates.values(), "Serializers in delegates map cannot be null");
+
 		this.delegates.putAll(delegates);
 		this.assignable = assignable;
 	}

@@ -101,7 +122,7 @@ public byte[] serialize(String topic, Headers headers, Object data) {
 		return delegate.serialize(topic, headers, data);
 	}
 
-	private <T> Serializer<T> findDelegate(T data) {
+	protected <T> Serializer<T> findDelegate(T data) {
 		return findDelegate(data, this.delegates);
 	}
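
To see why this ordering picks the most specific delegate, here is a self-contained sketch of the same comparator driving a `TreeMap` (the `String` values stand in for real `Serializer` delegates; note the comparator is only a partial order across unrelated hierarchies, which the name-based fallback papers over):

[source, java]
----
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

public class AssignabilityOrderSketch {

    // Subclasses sort before their superclasses, so the first assignable key
    // found in iteration order is the most specific match.
    static final Comparator<Class<?>> MOST_SPECIFIC_FIRST = (c1, c2) -> {
        if (c1 == c2) {
            return 0;
        }
        if (c1.isAssignableFrom(c2)) {
            return 1;  // c1 is the supertype; the more specific c2 sorts first
        }
        if (c2.isAssignableFrom(c1)) {
            return -1; // c2 is the supertype; the more specific c1 sorts first
        }
        return c1.getName().compareTo(c2.getName()); // unrelated types: stable name order
    };

    public static void main(String[] args) {
        Map<Class<?>, String> delegates = new TreeMap<>(MOST_SPECIFIC_FIRST);
        delegates.put(Number.class, "numberSerializer");
        delegates.put(Integer.class, "integerSerializer");

        Object data = 42;
        String chosen = delegates.entrySet().stream()
                .filter(entry -> entry.getKey().isAssignableFrom(data.getClass()))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElseThrow();

        System.out.println(chosen); // prints "integerSerializer"
    }
}
----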

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 40 additions & 11 deletions
@@ -3842,7 +3842,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
 		containerProps.setClientId("clientId");
 
 		CountDownLatch afterLatch = new CountDownLatch(1);
-		RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
+		RecordInterceptor<Integer, String> recordInterceptor1 = spy(new RecordInterceptor<Integer, String>() {
 
 			@Override
 			public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,

@@ -3858,25 +3858,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
 
 		});
 
+		RecordInterceptor<Integer, String> recordInterceptor2 = spy(new RecordInterceptor<Integer, String>() {
+
+			@Override
+			public @NonNull ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
+					Consumer<Integer, String> consumer) {
+
+				return record;
+			}
+
+			@Override
+			public void clearThreadState(Consumer<?, ?> consumer) {
+				afterLatch.countDown();
+			}
+
+		});
+
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
-		container.setRecordInterceptor(recordInterceptor);
+		container.setRecordInterceptor(new CompositeRecordInterceptor<>());
+		if (container.getRecordInterceptor() instanceof CompositeRecordInterceptor<Integer, String> composite) {
+			composite.addRecordInterceptor(recordInterceptor1);
+			composite.addRecordInterceptor(recordInterceptor2);
+		}
+
 		container.start();
 		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
 		assertThat(afterLatch.await(10, TimeUnit.SECONDS)).isTrue();
 
-		InOrder inOrder = inOrder(recordInterceptor, messageListener, consumer);
-		inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
+		InOrder inOrder = inOrder(recordInterceptor1, recordInterceptor2, messageListener, consumer);
+		inOrder.verify(recordInterceptor1).setupThreadState(eq(consumer));
+		inOrder.verify(recordInterceptor2).setupThreadState(eq(consumer));
 		inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
-		inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).intercept(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).intercept(eq(firstRecord), eq(consumer));
 		inOrder.verify(messageListener).onMessage(eq(firstRecord));
-		inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
-		inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
-		inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).success(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).success(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).afterRecord(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).afterRecord(eq(firstRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).intercept(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).intercept(eq(secondRecord), eq(consumer));
 		inOrder.verify(messageListener).onMessage(eq(secondRecord));
-		inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
-		inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
-		inOrder.verify(recordInterceptor).clearThreadState(eq(consumer));
+		inOrder.verify(recordInterceptor1).success(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).success(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).afterRecord(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor2).afterRecord(eq(secondRecord), eq(consumer));
+		inOrder.verify(recordInterceptor1).clearThreadState(eq(consumer));
+		inOrder.verify(recordInterceptor2).clearThreadState(eq(consumer));
 		container.stop();
 	}
