Description
Hi,
Recently I upgraded to spring-data-redis 2.7.1 and noticed an issue. I have a pool that is shared by pub/sub, stream, and "regular" key-value operations. Once I unsubscribe from a topic, the subsequent Redis connection validation (the pool has testOnBorrow enabled) fails because the RedisInputStream or the buffer still holds the result of the previous command, which was an unsubscribe result.
It fails at redis.clients.jedis.Connection#getStatusCodeReply after the PING command was sent, because the first byte is not a + byte but an * byte, since the following content is still in the buffer:
*3
$11
unsubscribe
$8
my-topic
:0
msg
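To illustrate why the validation trips over this leftover data, here is a minimal sketch that classifies a reply by its first byte, which is what a RESP client has to do before decoding it. It does not use Jedis; the class name StaleReplyDemo and the method firstReplyType are hypothetical and exist only for this illustration.

```java
import java.nio.charset.StandardCharsets;

public class StaleReplyDemo {

  /** Classifies the first reply in a buffer by its RESP type byte. */
  static String firstReplyType(byte[] buffered) {
    if (buffered.length == 0) {
      return "empty";
    }
    switch (buffered[0]) {
      case '+': return "status";     // e.g. +PONG, what PING validation expects
      case '-': return "error";
      case ':': return "integer";
      case '$': return "bulk";
      case '*': return "multi-bulk"; // e.g. an unsubscribe confirmation
      default:  return "unknown";
    }
  }

  public static void main(String[] args) {
    // Unread unsubscribe confirmation, as shown in the buffer dump above.
    String stale = "*3\r\n$11\r\nunsubscribe\r\n$8\r\nmy-topic\r\n:0\r\n";
    String pong = "+PONG\r\n";

    // The stale reply sits in front of the PONG, so it is read first.
    byte[] dirty = (stale + pong).getBytes(StandardCharsets.US_ASCII);
    byte[] clean = pong.getBytes(StandardCharsets.US_ASCII);

    System.out.println("stale buffer -> " + firstReplyType(dirty)); // multi-bulk
    System.out.println("clean buffer -> " + firstReplyType(clean)); // status
  }
}
```

With the stale reply in front, the client decodes a list where it expected a status line, which matches the ClassCastException (ArrayList cannot be cast to [B) in the stack trace further down.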
This is caused by the fix for #2331 in JedisConnection:
I think this could be fixed by checking subscription.isAlive() and only closing the subscription if it is still alive. In my case the subscription was already closed by container.removeMessageListener(listener).
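The guard suggested above could look roughly like the following sketch. This is not the actual JedisConnection code: the nested Subscription interface is a hypothetical stand-in exposing only isAlive() and close(), and RecordingSubscription assumes, purely for illustration, that every close() call sends another unsubscribe command whose reply nobody reads.

```java
public class GuardedCloseDemo {

  /** Hypothetical stand-in for a pub/sub subscription (not the Spring Data type). */
  interface Subscription {
    boolean isAlive();
    void close();
  }

  /** Counts how many unsubscribe commands close() would have sent. */
  static final class RecordingSubscription implements Subscription {
    private boolean alive = true;
    int unsubscribeCommandsSent = 0;

    @Override
    public boolean isAlive() {
      return alive;
    }

    @Override
    public void close() {
      // Assumption: closing always issues an unsubscribe, even if already closed.
      unsubscribeCommandsSent++;
      alive = false;
    }
  }

  /** Closes the subscription only if it exists and is still alive. */
  static void closeIfAlive(Subscription subscription) {
    if (subscription != null && subscription.isAlive()) {
      subscription.close();
    }
  }

  public static void main(String[] args) {
    // Without the guard: the listener container closes first, then the connection closes again.
    RecordingSubscription unguarded = new RecordingSubscription();
    unguarded.close();
    unguarded.close();
    System.out.println("unguarded: " + unguarded.unsubscribeCommandsSent); // 2

    // With the guard: the second close is skipped because the subscription is no longer alive.
    RecordingSubscription guarded = new RecordingSubscription();
    guarded.close();
    closeIfAlive(guarded);
    System.out.println("guarded: " + guarded.unsubscribeCommandsSent); // 1
  }
}
```

Under that assumption, the extra unsubscribe in the unguarded path is what would leave an unread reply in the pooled connection's buffer.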
I have created a minimal Spring Boot reproducer:
package com.repro;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration.JedisClientConfigurationBuilder;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import redis.clients.jedis.Jedis;

@SpringBootApplication(scanBasePackages = {"com.repro"})
public class RedisReproApplication {

  public static void main(String[] args) throws InterruptedException {
    final ConfigurableApplicationContext ctx =
        SpringApplication.run(RedisReproApplication.class, args);
    final RedisMessageListenerContainer container =
        ctx.getBean(RedisMessageListenerContainer.class);
    final CountDownLatch receivedLatch = new CountDownLatch(1);
    final MessageListener listener =
        (message, pattern) -> {
          System.out.printf(
              "Message: %s -- %s%n",
              new String(message.getChannel()), new String(message.getBody()));
          receivedLatch.countDown();
        };
    final StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);

    // Subscribe, publish one message, and wait until it has been received.
    container.addMessageListener(listener, new ChannelTopic("my-topic"));
    template.convertAndSend("my-topic", "test-msg");
    if (!receivedLatch.await(5, TimeUnit.SECONDS)) {
      System.out.println("Message was not received within 5 seconds!");
    }

    // Unsubscribe; the next borrow from the pool triggers the failing validation.
    container.removeMessageListener(listener);
    final RedisCallback<List<Object>> callback =
        conn -> {
          conn.multi();
          conn.setNX("foo".getBytes(), "0".getBytes());
          conn.incr("foo".getBytes());
          return conn.exec();
        };
    final List<Object> results = template.execute(callback);
    System.out.println("Multi result: " + results);
  }

  @Bean
  public JedisConnectionFactory connectionFactory() {
    GenericObjectPoolConfig<Jedis> poolConfig = new GenericObjectPoolConfig<>();
    poolConfig.setMaxTotal(5);
    poolConfig.setTestOnBorrow(true);
    final RedisStandaloneConfiguration connectionConfiguration =
        new RedisStandaloneConfiguration("localhost", 6379);
    final JedisClientConfigurationBuilder clientConfigurationBuilder =
        JedisClientConfiguration.builder()
            .usePooling()
            .poolConfig(poolConfig)
            .and()
            .connectTimeout(Duration.ofMillis(5_000));
    return new JedisConnectionFactory(connectionConfiguration, clientConfigurationBuilder.build());
  }

  @Bean
  public StringRedisTemplate template(JedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }

  @Bean
  public RedisMessageListenerContainer container(JedisConnectionFactory connectionFactory) {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    return container;
  }
}
Spring Boot: v2.7.1
Spring Data Redis: v2.7.1
Jedis: v3.8.0
The output with Spring Data Redis 2.7.1 is the following:
2022-07-05 08:37:30.900 INFO 18608 --- [ main] com.repro.RedisReproApplication : Started RedisReproApplication in 6.198 seconds (JVM running for 7.41)
Message: my-topic -- test-msg
2022-07-05 08:37:30.976 ERROR 18608 --- [ main] redis.clients.jedis.JedisFactory : Error while validating pooled Jedis object.
java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class [B (java.util.ArrayList and [B are in module java.base of loader 'bootstrap')
at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:270) ~[jedis-3.8.0.jar:na]
at redis.clients.jedis.BinaryJedis.ping(BinaryJedis.java:384) ~[jedis-3.8.0.jar:na]
at redis.clients.jedis.JedisFactory.validateObject(JedisFactory.java:214) ~[jedis-3.8.0.jar:na]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:343) ~[commons-pool2-2.11.1.jar:2.11.1]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:223) ~[commons-pool2-2.11.1.jar:2.11.1]
at redis.clients.jedis.util.Pool.getResource(Pool.java:75) ~[jedis-3.8.0.jar:na]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:377) ~[jedis-3.8.0.jar:na]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:15) ~[jedis-3.8.0.jar:na]
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:283) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:515) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:191) ~[spring-data-redis-2.7.1.jar:2.7.1]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:178) ~[spring-data-redis-2.7.1.jar:2.7.1]
at com.repro.RedisReproApplication.main(RedisReproApplication.java:55) ~[classes/:na]
Multi result: [false, 23]
Both operations succeeded; however, there was a connection validation failure in between.
With Spring Data Redis 2.7.0 the result is:
2022-07-05 08:55:33.822 INFO 20524 --- [ main] com.repro.RedisReproApplication : Started RedisReproApplication in 6.181 seconds (JVM running for 6.816)
Message: my-topic -- test-msg
Multi result: [false, 24]