Skip to content

Add assertions to fail early on absent values using StreamMessageListenerContainer #2472

Closed
@survivant

Description

@survivant

I have a junit that publish and listen for messages on a Redis Stream. Something when I start the code, I get a NPE exception. I found that it's the thread that start receiving messages when the listener wasn't yet added. I found this thread that had the same error then me, and after I ran my code in debug mode and found out the issue. redisson/redisson#4135

image

here my code and configs

I have the same issue but with lettuce. Here the code that I have.

logs

07:52:55.111 [SimpleAsyncTaskExecutor-1] ERROR o.s.d.r.s.DefaultStreamMessageListenerContainer$LoggingErrorHandler - Unexpected error occurred in scheduled task.
java.lang.NullPointerException: null
	at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:177)
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148)
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132)
	at java.base/java.lang.Thread.run(Thread.java:829)
07:52:55.109 [SimpleAsyncTaskExecutor-2] INFO  c.e.demoredis.controller.StreamTest - Message received in JUNIT for group-consumer [group-a-consumer-a] [DemoMessage(id=0, message=message1, createdBy=user1, modifyBy=null)]
07:52:55.117 [pool-2-thread-2] INFO  c.e.demoredis.controller.StreamTest - Messages received
 @Test
    public void testStreamListener() {
        publishDemoMessage(1);

        var future = CompletableFuture.supplyAsync(() -> waitNotification(countDownLatch), executor)
                .thenAcceptAsync(release -> LOGGER.info("Messages received"), executor);

        // wait for the future complete
        waitForFutureToComplete(future);

        assertEquals(0, countDownLatch.getCount());
}
private void publishDemoMessage(int count) {
        for (var i = 0; i<count; i++) {
            var message = DemoMessage.builder()
                    .id(String.valueOf(movieID++))
                    .message("message1")
                    .createdBy("user1")
                    .build();

            var record = StreamRecords.newRecord()
                    .ofObject(message)
                    .withStreamKey(KEY);

            var recordId = redisTemplate.opsForStream().add(record).block();
        }
    }
@Configuration
@RequiredArgsConstructor
public class StreamRedisConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamRedisConfiguration.class);

    public static String STREAM_CHANNEL;
    public static String STREAM_GROUP;
    public static String STREAM_CONSUMER;

    private StreamListener<String, ObjectRecord<String, DemoMessage>> streamListener;

    @Value("${spring.redis.stream.channel}")
    private void setStreamChannel(String channel){
        STREAM_CHANNEL = channel;
    }

    @Value("${spring.redis.stream.group}")
    private void setStreamGroup(String group){
        STREAM_GROUP = group;
    }

    @Value("${spring.redis.stream.consumer}")
    private void setChannelConsumer(String consumer){
        STREAM_CONSUMER = consumer;
    }

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, DemoMessage>> listenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var options = StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(DemoMessage.class)
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        return listenerContainer;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, ReactiveRedisTemplate<String, DemoMessage> reactiveRedisTemplate) {

        try {
            redisConnectionFactory.getConnection()
                    .xGroupCreate(STREAM_CHANNEL.getBytes(), STREAM_GROUP, ReadOffset.from("0-0"), true);
        } catch (RedisSystemException exception) {
            LOGGER.warn(exception.getCause().getMessage());
        }

        var listenerContainer = listenerContainer(redisConnectionFactory);

        var subscription = listenerContainer
                .receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                StreamOffset.create(STREAM_CHANNEL, ReadOffset.lastConsumed())
                , streamListener);
        listenerContainer.start();
        return subscription;
    }
}
@Service
@RequiredArgsConstructor
public class RedisStreamMessageConsumer implements StreamListener<String, ObjectRecord<String, DemoMessage>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamMessageConsumer.class);

	private final ReactiveRedisTemplate<String, String> redisTemplate;

	@Override
	public void onMessage(ObjectRecord<String, DemoMessage> record) {
        LOGGER.info("Stream Subscriber >> [{}]", record.getValue());
	}

}

my pom.xml

in my parent

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.comact.demo</groupId>
        <artifactId>redis-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo-redis-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-watcher</name>
    <description>service-watcher</description>
    <properties>
        <java.version>11</java.version>
        <springdoc.version>1.6.12</springdoc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>demo-redis-pojo</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-webflux-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.1.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.playtika.testcontainers/embedded-redis -->
        <dependency>
            <groupId>com.playtika.testcontainers</groupId>
            <artifactId>embedded-redis</artifactId>
            <version>2.2.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springdoc</groupId>
                <artifactId>springdoc-openapi-maven-plugin</artifactId>
                <version>1.4</version>
            </plugin>
        </plugins>
    </build>

</project>

and my config

spring:
  redis:
    host: ${embedded.redis.host}
    port: ${embedded.redis.port}
    password: ${embedded.redis.password}
    pubSub:
      channel: 'JUNIT-MESSAGES_CHANNEL'
    stream:
      channel: 'JUNIT-STREAM_CHANNEL'
      group: 'group'
      consumer: 'consumer'

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions