Closed
Description
I have a JUnit test that publishes and listens for messages on a Redis Stream. Sometimes when I start the code, I get a NullPointerException. I found that the polling thread starts receiving messages before the listener has been added. I found this thread, which reports the same error as mine, and after running my code in debug mode I tracked down the issue: redisson/redisson#4135
Here are my code and configs.
I have the same issue, but with Lettuce. Here is the code that I have.
logs
07:52:55.111 [SimpleAsyncTaskExecutor-1] ERROR o.s.d.r.s.DefaultStreamMessageListenerContainer$LoggingErrorHandler - Unexpected error occurred in scheduled task.
java.lang.NullPointerException: null
at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:177)
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148)
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132)
at java.base/java.lang.Thread.run(Thread.java:829)
07:52:55.109 [SimpleAsyncTaskExecutor-2] INFO c.e.demoredis.controller.StreamTest - Message received in JUNIT for group-consumer [group-a-consumer-a] [DemoMessage(id=0, message=message1, createdBy=user1, modifyBy=null)]
07:52:55.117 [pool-2-thread-2] INFO c.e.demoredis.controller.StreamTest - Messages received
/**
 * Publishes a single demo message and verifies that {@code countDownLatch}
 * has reached zero once the asynchronous wait has finished.
 */
@Test
public void testStreamListener() {
    publishDemoMessage(1);

    // Stage 1: run the latch wait on the test executor.
    var latchWait = CompletableFuture.supplyAsync(() -> waitNotification(countDownLatch), executor);
    // Stage 2: log once stage 1 has produced its result.
    var completion = latchWait.thenAcceptAsync(released -> LOGGER.info("Messages received"), executor);

    // Wait for both stages before asserting on the latch.
    waitForFutureToComplete(completion);
    assertEquals(0, countDownLatch.getCount());
}
/**
 * Publishes {@code count} demo messages to the stream identified by {@code KEY}.
 *
 * <p>Each message gets the next value of {@code movieID} as its id; the
 * message text and author are fixed.
 *
 * @param count number of messages to publish; nothing is published when it is
 *              zero or negative
 */
private void publishDemoMessage(int count) {
    for (var i = 0; i < count; i++) {
        var message = DemoMessage.builder()
                .id(String.valueOf(movieID++))
                .message("message1")
                .createdBy("user1")
                .build();
        var record = StreamRecords.newRecord()
                .ofObject(message)
                .withStreamKey(KEY);
        // block() waits for the add to finish before the next iteration; the
        // returned record id is not needed here, so it is not stored.
        redisTemplate.opsForStream().add(record).block();
    }
}
/**
 * Spring configuration that creates the Redis Stream listener container and
 * subscribes the application's {@link StreamListener} to the configured
 * stream as a member of a consumer group.
 *
 * <p>The stream name, group and consumer are read from the
 * {@code spring.redis.stream.*} properties and exposed through the public
 * static fields below.
 */
@Configuration
@RequiredArgsConstructor
public class StreamRedisConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamRedisConfiguration.class);

    // Populated by the @Value setters below when Spring initializes this bean.
    public static String STREAM_CHANNEL;
    public static String STREAM_GROUP;
    public static String STREAM_CONSUMER;

    // Must be final: Lombok's @RequiredArgsConstructor only generates constructor
    // parameters for final (or @NonNull) fields. As a plain non-final field it was
    // never injected, so a null listener was registered with the container and the
    // poll task failed with a NullPointerException as soon as a record arrived.
    private final StreamListener<String, ObjectRecord<String, DemoMessage>> streamListener;

    @Value("${spring.redis.stream.channel}")
    private void setStreamChannel(String channel) {
        STREAM_CHANNEL = channel;
    }

    @Value("${spring.redis.stream.group}")
    private void setStreamGroup(String group) {
        STREAM_GROUP = group;
    }

    @Value("${spring.redis.stream.consumer}")
    private void setChannelConsumer(String consumer) {
        STREAM_CONSUMER = consumer;
    }

    /**
     * Creates the listener container that polls the stream with a one second
     * timeout and deserializes record values to {@link DemoMessage}.
     *
     * @param redisConnectionFactory connection factory the container polls with
     * @return the container; it is started by {@link #subscription}
     */
    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, DemoMessage>> listenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var options = StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(DemoMessage.class)
                .build();
        return StreamMessageListenerContainer.create(redisConnectionFactory, options);
    }

    /**
     * Ensures the consumer group exists, registers the stream listener for it
     * and starts the container.
     *
     * @param redisConnectionFactory connection factory used to create the group
     * @param reactiveRedisTemplate  not used by this method; kept so the bean
     *                               method signature stays unchanged
     * @return the subscription created for the listener
     */
    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, ReactiveRedisTemplate<String, DemoMessage> reactiveRedisTemplate) {
        // Close the connection once the group has been created; it was previously leaked.
        try (var connection = redisConnectionFactory.getConnection()) {
            connection.xGroupCreate(
                    STREAM_CHANNEL.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    STREAM_GROUP,
                    ReadOffset.from("0-0"),
                    true);
        } catch (RedisSystemException exception) {
            // Best effort: presumably the group already exists. The cause may be
            // absent, so fall back to the exception's own message instead of
            // dereferencing a null cause.
            var cause = exception.getCause();
            LOGGER.warn(cause != null ? cause.getMessage() : exception.getMessage());
        }

        // Goes through the @Bean method so the container bean defined above is used.
        var listenerContainer = listenerContainer(redisConnectionFactory);
        // Register the listener before starting so no record is polled without one.
        var subscription = listenerContainer.receive(
                Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                StreamOffset.create(STREAM_CHANNEL, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }
}
/**
 * {@link StreamListener} implementation that logs the value of every
 * {@link DemoMessage} record it receives.
 */
@Service
@RequiredArgsConstructor
public class RedisStreamMessageConsumer implements StreamListener<String, ObjectRecord<String, DemoMessage>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamMessageConsumer.class);

    // Constructor-injected; not read by onMessage.
    private final ReactiveRedisTemplate<String, String> redisTemplate;

    /**
     * Logs the payload of the received record at INFO level.
     *
     * @param record the stream record delivered to this listener
     */
    @Override
    public void onMessage(ObjectRecord<String, DemoMessage> record) {
        final DemoMessage payload = record.getValue();
        LOGGER.info("Stream Subscriber >> [{}]", payload);
    }
}
My pom.xml
In my parent POM:
<!-- The root project inherits from the Spring Boot starter parent, version 2.7.6. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Module POM for com.example:demo-redis-service. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the redis-demo project located one directory up. -->
<parent>
<groupId>com.comact.demo</groupId>
<artifactId>redis-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.example</groupId>
<artifactId>demo-redis-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>service-watcher</name>
<description>service-watcher</description>
<properties>
<!-- Java 11; springdoc.version is shared by both springdoc dependencies below. -->
<java.version>11</java.version>
<springdoc.version>1.6.12</springdoc.version>
</properties>
<dependencies>
<!-- Project-internal POJO module. -->
<dependency>
<groupId>com.example</groupId>
<artifactId>demo-redis-pojo</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Redis access (reactive starter). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- NOTE(review): both the web and the webflux starters are declared; confirm that having both is intended. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- NOTE(review): both springdoc UI artifacts (plain and webflux) are declared; confirm that having both is intended. -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-webflux-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- Lombok is optional here and is also excluded from the Spring Boot plugin configuration in the build section. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.1.8</version>
</dependency>
<!-- Test-scoped dependencies start here. -->
<!-- No version is declared for the next dependency; presumably it is managed by a parent POM. Verify. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.playtika.testcontainers/embedded-redis -->
<dependency>
<groupId>com.playtika.testcontainers</groupId>
<artifactId>embedded-redis</artifactId>
<version>2.2.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): junit:junit is declared in addition to spring-boot-starter-test; confirm which JUnit version the tests are meant to run on. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot plugin, configured to exclude Lombok. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<!-- springdoc Maven plugin; its version (1.4) is set independently of springdoc.version. -->
<plugin>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-maven-plugin</artifactId>
<version>1.4</version>
</plugin>
</plugins>
</build>
</project>
and my config
# Redis connection settings are taken from the embedded.redis.* placeholders.
spring:
  redis:
    host: ${embedded.redis.host}
    port: ${embedded.redis.port}
    password: ${embedded.redis.password}
    # NOTE(review): pubSub is assumed to sit under spring.redis, next to stream; confirm against the code that reads it.
    pubSub:
      channel: 'JUNIT-MESSAGES_CHANNEL'
    # Read by StreamRedisConfiguration as spring.redis.stream.channel / .group / .consumer.
    stream:
      channel: 'JUNIT-STREAM_CHANNEL'
      group: 'group'
      consumer: 'consumer'