Description
When batch listeners are enabled in the Kafka consumer configuration by setting factory.setBatchListener(true), traces are no longer recorded or propagated for the consumed messages.
Current Configuration
In KafkaConsumerConfig.java, we have:

factory.getContainerProperties().setObservationEnabled(true);
factory.setBatchListener(true);

The full configuration class:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "birk");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true); // tracing/observation on
        factory.setBatchListener(true); // enabling this is what breaks tracing
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(5000);
        factory.setConcurrency(1);
        return factory;
    }
}
Expected Behavior
- Traces should be properly propagated and visible in the tracing system (Tempo/Grafana)
Actual Behavior
- No traces are being recorded when batch processing is enabled
Workaround
Currently, the only workaround is to disable batch processing by removing factory.setBatchListener(true), but this comes at the cost of reduced performance.
Environment
- Spring Boot 3.4.4
- Spring Kafka
- OpenTelemetry with Tempo/Grafana backend
- Java 17
Related Components
- OpenTelemetry configuration
@Bean
public SpanExporter primaryOtlpGrpcSpanExporter() {
    return OtlpGrpcSpanExporter.builder()
            .setEndpoint("https://tempo-.grafana.net:443")
            .addHeader("Authorization", "Basic abc")
            .build();
}
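For completeness, this exporter would be wired into a tracer provider roughly as follows. This wiring is illustrative only; the actual application may instead rely on Spring Boot / Micrometer auto-configuration to pick up the SpanExporter bean:

import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import org.springframework.context.annotation.Bean;

@Bean
public SdkTracerProvider sdkTracerProvider(SpanExporter primaryOtlpGrpcSpanExporter) {
    // Illustrative wiring: batch spans before exporting them over OTLP/gRPC
    return SdkTracerProvider.builder()
            .addSpanProcessor(BatchSpanProcessor.builder(primaryOtlpGrpcSpanExporter).build())
            .build();
}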
- MyService.java: message processing service
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Autowired
    private GreetingRepository greetingRepository;

    // Batch listener: receives the whole poll at once; no traces are recorded here
    @KafkaListener(id = "batch", topics = "mytopicnameone", groupId = "foo",
            batch = "true", containerFactory = "kafkaListenerContainerFactory")
    public void listenGroupFooA(ConsumerRecords<String, String> records) {
        System.out.println("Received Message in group foo: " + records.count());
        List<Greeting> greetings = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            greetings.add(new Greeting(record.value()));
        }
        greetingRepository.saveAll(greetings);
    }

    // Single-record listener: traces are recorded as expected
    @KafkaListener(topics = "mytopicnameone", groupId = "foo")
    public void listenGroupFooB(String message) {
        System.out.println("Received Message in group foo: " + message);
        greetingRepository.save(new Greeting(message.toUpperCase()));
    }
}
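As a quick sanity check that the trace context actually reaches the batch listener at all, the propagation headers on each record can be dumped. This is a diagnostic sketch only, and it assumes the producer propagates context via the standard W3C traceparent header:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;

class TraceHeaderDump {

    // Diagnostic only: print the trace context carried by each record in the batch.
    static void dump(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            Header traceparent = record.headers().lastHeader("traceparent");
            System.out.println("offset=" + record.offset() + " traceparent="
                    + (traceparent == null ? "<missing>"
                            : new String(traceparent.value(), StandardCharsets.UTF_8)));
        }
    }
}

If the headers print for every record, that would suggest the context arrives intact and it is the consumer-side observation that is never started.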
To add details: if the setBatchListener call is removed:

factory.getContainerProperties().setObservationEnabled(true);
// factory.setBatchListener(true);
Without changing anything else, tracing then works: if there are 20 messages, we are able to see the 20 traces. Of course, we are then not able to batch-process the messages and have to run 20 single-record processings instead, but in terms of tracing, all traces are there.
Once we enable batch mode, however, all traces are lost.
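As a possible interim workaround (a sketch only, not a confirmed fix), the trace context could be restored manually per record inside the batch listener via the OpenTelemetry propagation API. The class name, tracer name, and span name below are illustrative, and it is an assumption that GlobalOpenTelemetry points at the same SDK the application exports from:

import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;

public class BatchTraceRestorer {

    // Reads propagation headers (e.g. traceparent) from a Kafka record
    private static final TextMapGetter<ConsumerRecord<String, String>> GETTER =
            new TextMapGetter<>() {
                @Override
                public Iterable<String> keys(ConsumerRecord<String, String> carrier) {
                    return StreamSupport.stream(carrier.headers().spliterator(), false)
                            .map(Header::key)
                            .toList();
                }

                @Override
                public String get(ConsumerRecord<String, String> carrier, String key) {
                    Header header = carrier.headers().lastHeader(key);
                    return header == null ? null
                            : new String(header.value(), StandardCharsets.UTF_8);
                }
            };

    public void process(ConsumerRecords<String, String> records) {
        Tracer tracer = GlobalOpenTelemetry.getTracer("batch-listener");
        for (ConsumerRecord<String, String> record : records) {
            // Restore the producer's trace context from the record headers
            Context parent = GlobalOpenTelemetry.getPropagators()
                    .getTextMapPropagator()
                    .extract(Context.current(), record, GETTER);
            Span span = tracer.spanBuilder("process record")
                    .setParent(parent)
                    .setSpanKind(SpanKind.CONSUMER)
                    .startSpan();
            try (Scope scope = span.makeCurrent()) {
                // ... per-record processing goes here ...
            } finally {
                span.end();
            }
        }
    }
}

This trades the automatic observation for one manually managed consumer span per record, which is roughly what the non-batch path produces.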
Why do I think it might be a Spring Kafka issue?
First of all, I am not blaming Spring Kafka here; this is just where the engineering investigation points:
The API involved appears to be Spring Kafka.
I have the same producer, which produces messages with trace headers, feeding both consumers.
On one hand, purely as a point of comparison, I have a Spark Streaming application whose batch size matches that of the Spring Kafka consumer. On the other hand, I have the Spring Kafka consumer with batch mode enabled.
The Spark consumer is able to consume and then process the messages while retaining the traces, even in the Spark Streaming micro-batch scenario. That is, if there are 20 messages in ONE batch, all 20 messages in that batch have their own headers containing the trace context: in that one Spark micro-batch there are 20 values and also 20 valid headers, and no traces are lost.
The Spring Kafka app, on the other hand, does not seem to be able to continue the trace.
Again, the issue appears only when batch mode is set; when it is not set, it works.
In terms of design, a batch, no matter its size, should be able to retain the traces as long as every message in the batch carries a valid header with trace context. Here, however, the traces do not seem to be retained once batch mode is enabled.
On your side, please feel free to compare one consumer without batch mode and one consumer with batch mode.
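For such a side-by-side comparison, two container factories that are identical except for the batch flag could be used; the configuration class and bean names here are illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
class TraceComparisonConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> singleRecordFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        // batch mode NOT enabled: per-record traces show up
        return factory;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setBatchListener(true); // batch mode enabled: traces are lost
        return factory;
    }
}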
Thank you