Skip to content

Commit da1de92

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-988 - Discard transactions when releasing pooled connections.
We now check on connection release whether a connection is in MULTI state. If so, then we discard (rollback) the transaction to reset the connection to a fresh state. Original Pull Request: #453
1 parent dfa366d commit da1de92

File tree

3 files changed

+93
-2
lines changed

3 files changed

+93
-2
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,13 @@ private class LettucePoolConnectionProvider implements LettuceConnectionProvider
12811281
public void release(StatefulConnection<?, ?> connection) {
12821282

12831283
if (connection.isOpen()) {
1284+
1285+
if (connection instanceof StatefulRedisConnection) {
1286+
StatefulRedisConnection<?, ?> redisConnection = (StatefulRedisConnection<?, ?>) connection;
1287+
if (redisConnection.isMulti()) {
1288+
redisConnection.async().discard();
1289+
}
1290+
}
12841291
pool.returnResource((StatefulConnection<byte[], byte[]>) connection);
12851292
} else {
12861293
pool.returnBrokenResource((StatefulConnection<byte[], byte[]>) connection);

src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.lettuce.core.AbstractRedisClient;
1919
import io.lettuce.core.api.StatefulConnection;
20+
import io.lettuce.core.api.StatefulRedisConnection;
2021
import io.lettuce.core.support.AsyncConnectionPoolSupport;
2122
import io.lettuce.core.support.AsyncPool;
2223
import io.lettuce.core.support.BoundedPoolConfig;
@@ -109,7 +110,7 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
109110
}
110111
}
111112

112-
/*
113+
/*
113114
* (non-Javadoc)
114115
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnectionAsync(java.lang.Class)
115116
*/
@@ -170,14 +171,27 @@ public void release(StatefulConnection<?, ?> connection) {
170171
+ " was either previously returned or does not belong to this connection provider");
171172
}
172173

174+
discardIfNecessary(connection);
173175
asyncPool.release(connection).join();
174176
return;
175177
}
176178

179+
discardIfNecessary(connection);
177180
pool.returnObject(connection);
178181
}
179182

180-
/*
183+
private void discardIfNecessary(StatefulConnection<?, ?> connection) {
184+
185+
if (connection instanceof StatefulRedisConnection) {
186+
187+
StatefulRedisConnection<?, ?> redisConnection = (StatefulRedisConnection<?, ?>) connection;
188+
if (redisConnection.isMulti()) {
189+
redisConnection.async().discard();
190+
}
191+
}
192+
}
193+
194+
/*
181195
* (non-Javadoc)
182196
* @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#releaseAsync(io.lettuce.core.api.StatefulConnection)
183197
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.connection.lettuce;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import io.lettuce.core.api.StatefulRedisConnection;
21+
import io.lettuce.core.api.async.RedisAsyncCommands;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.junit.runner.RunWith;
26+
import org.mockito.Mock;
27+
import org.mockito.junit.MockitoJUnitRunner;
28+
29+
/**
30+
* Unit tests for {@link LettucePoolingConnectionProvider}.
31+
*
32+
* @author Mark Paluch
33+
*/
34+
@RunWith(MockitoJUnitRunner.class)
35+
public class LettucePoolingConnectionProviderUnitTests {
36+
37+
@Mock LettuceConnectionProvider connectionProviderMock;
38+
@Mock StatefulRedisConnection<byte[], byte[]> connectionMock;
39+
@Mock RedisAsyncCommands<byte[], byte[]> commandsMock;
40+
41+
LettucePoolingClientConfiguration config = LettucePoolingClientConfiguration.defaultConfiguration();
42+
43+
@Before
44+
public void before() {
45+
46+
when(connectionMock.async()).thenReturn(commandsMock);
47+
when(connectionProviderMock.getConnection(any())).thenReturn(connectionMock);
48+
}
49+
50+
@Test // DATAREDIS-988
51+
public void shouldReturnConnectionOnRelease() {
52+
53+
LettucePoolingConnectionProvider provider = new LettucePoolingConnectionProvider(connectionProviderMock, config);
54+
55+
provider.release(provider.getConnection(StatefulRedisConnection.class));
56+
57+
verifyZeroInteractions(commandsMock);
58+
}
59+
60+
@Test // DATAREDIS-988
61+
public void shouldDiscardTransactionOnReleaseOnActiveTransaction() {
62+
63+
LettucePoolingConnectionProvider provider = new LettucePoolingConnectionProvider(connectionProviderMock, config);
64+
when(connectionMock.isMulti()).thenReturn(true);
65+
66+
provider.release(provider.getConnection(StatefulRedisConnection.class));
67+
68+
verify(commandsMock).discard();
69+
}
70+
}

0 commit comments

Comments
 (0)