Skip to content

RedisMessageListenerContainer.removeMessageListener(…) unsubscribes twice #2355

Closed
@atisvagyok

Description

@atisvagyok

Hi,

Recently I upgraded to spring-data-redis 2.7.1 and I noticed an issue. I have a pool which is used by pub/sub, stream and "regular" key-value operations. I noticed once I unsubscribed from a topic the subsequent Redis connection validation (the pool has testOnBorrow enabled) fails because the RedisInputStream or the buffer still has the result of the previous command, which was and unsubscribe result.
It fails at redis.clients.jedis.Connection#getStatusCodeReply after the PING command was sent, because the first byte is not a + byte but an * byte, since the following content is still in the buffer:

*3
$11
unsubscribe
$8
my-topic
:0
msg

This is caused by the fix of #2331 in JedisConnection:

I think this could be fixed by checking subscription.isAlive() and only close the subscription if it is still alive. In my case the subscription was already closed by container.removeMessageListener(listener).

I have created a minimal SpringBoot reproducer:

package com.repro;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration.JedisClientConfigurationBuilder;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.Jedis;

@SpringBootApplication(scanBasePackages = {"com.repro"})
public class RedisReproApplication {

  public static void main(String[] args) throws InterruptedException {
    final ConfigurableApplicationContext ctx =
        SpringApplication.run(RedisReproApplication.class, args);
    final RedisMessageListenerContainer container =
        ctx.getBean(RedisMessageListenerContainer.class);
    final CountDownLatch receivedLatch = new CountDownLatch(1);
    final MessageListener listener =
        (message, pattern) -> {
          System.out.printf(
              "Message: %s -- %s%n",
              new String(message.getChannel()), new String(message.getBody()));
          receivedLatch.countDown();
        };
    final StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);

    container.addMessageListener(listener, new ChannelTopic("my-topic"));
    template.convertAndSend("my-topic", "test-msg");
    if (!receivedLatch.await(5, TimeUnit.SECONDS)) {
      System.out.println("Message was not received within 5 seconds!");
    }
    container.removeMessageListener(listener);

    final RedisCallback<List<Object>> callback =
        conn -> {
          conn.multi();
          conn.setNX("foo".getBytes(), "0".getBytes());
          conn.incr("foo".getBytes());
          return conn.exec();
        };
    final List<Object> results = template.execute(callback);
    System.out.println("Multi result: " + results);
  }

  @Bean
  public JedisConnectionFactory connectionFactory() {
    GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
    poolConfig.setMaxTotal(5);
    poolConfig.setTestOnBorrow(true);

    final RedisStandaloneConfiguration connectionConfiguration =
        new RedisStandaloneConfiguration("localhost", 6379);

    final JedisClientConfigurationBuilder clientConfigurationBuilder =
        JedisClientConfiguration.builder()
            .usePooling()
            .poolConfig(poolConfig)
            .and()
            .connectTimeout(Duration.ofMillis(5_000));

    return new JedisConnectionFactory(connectionConfiguration, clientConfigurationBuilder.build());
  }

  @Bean
  public StringRedisTemplate template(JedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }

  @Bean
  public RedisMessageListenerContainer container(JedisConnectionFactory connectionFactory) {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    return container;
  }
}

Spring Boot: v2.7.1
Spring Data Redis: v2.7.1
Jedis: v3.8.0

The output with Spring Data Redis 2.7.1 is the following:

2022-07-05 08:37:30.900  INFO 18608 --- [           main] com.repro.RedisReproApplication          : Started RedisReproApplication in 6.198 seconds (JVM running for 7.41)
Message: my-topic -- test-msg
2022-07-05 08:37:30.976 ERROR 18608 --- [           main] redis.clients.jedis.JedisFactory         : Error while validating pooled Jedis object.

java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class [B (java.util.ArrayList and [B are in module java.base of loader 'bootstrap')
	at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:270) ~[jedis-3.8.0.jar:na]
	at redis.clients.jedis.BinaryJedis.ping(BinaryJedis.java:384) ~[jedis-3.8.0.jar:na]
	at redis.clients.jedis.JedisFactory.validateObject(JedisFactory.java:214) ~[jedis-3.8.0.jar:na]
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:343) ~[commons-pool2-2.11.1.jar:2.11.1]
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:223) ~[commons-pool2-2.11.1.jar:2.11.1]
	at redis.clients.jedis.util.Pool.getResource(Pool.java:75) ~[jedis-3.8.0.jar:na]
	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:377) ~[jedis-3.8.0.jar:na]
	at redis.clients.jedis.JedisPool.getResource(JedisPool.java:15) ~[jedis-3.8.0.jar:na]
	at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:283) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:515) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:191) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:178) ~[spring-data-redis-2.7.1.jar:2.7.1]
	at com.repro.RedisReproApplication.main(RedisReproApplication.java:55) ~[classes/:na]

Multi result: [false, 23]

Both operations succeeded, however there was connection validation failure in the middle.

With Spring Data Redis 2.7.0 the result is:

2022-07-05 08:55:33.822  INFO 20524 --- [           main] com.repro.RedisReproApplication          : Started RedisReproApplication in 6.181 seconds (JVM running for 6.816)
Message: my-topic -- test-msg
Multi result: [false, 24]

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions