Skip to content

Commit feed7b6

Browse files
committed
DATAREDIS-999 - Use UsingWhen to apply functions to reactive Redis connections.
We now use Flux.usingWhen() to apply functions on reactive Redis connections and to release connections calling non-blocking methods.
1 parent 3c9c4ac commit feed7b6

File tree

3 files changed

+90
-40
lines changed

3 files changed

+90
-40
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.function.Function;
3333

3434
import org.reactivestreams.Publisher;
35+
3536
import org.springframework.dao.DataAccessException;
3637
import org.springframework.dao.InvalidDataAccessResourceUsageException;
3738
import org.springframework.data.redis.connection.*;
@@ -237,7 +238,7 @@ public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
237238
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#closeLater()
238239
*/
239240
public Mono<Void> closeLater() {
240-
return Mono.fromRunnable(dedicatedConnection::close);
241+
return Flux.merge(dedicatedConnection.close(), pubSubConnection.close()).then();
241242
}
242243

243244
protected Mono<? extends StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
@@ -363,7 +364,7 @@ static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>
363364
this.connectionProvider = connectionProvider;
364365

365366
Mono<StatefulConnection> defer = Mono
366-
.defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
367+
.fromCompletionStage(() -> connectionProvider.getConnectionAsync(connectionType));
367368

368369
this.connectionPublisher = defer.doOnNext(it -> {
369370

@@ -403,21 +404,25 @@ Mono<T> getConnection() {
403404
}
404405

405406
/**
406-
* Close connection (blocking call).
407+
* Close connection.
407408
*/
408-
void close() {
409+
Mono<Void> close() {
409410

410-
if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
411+
return Mono.defer(() -> {
411412

412-
StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
413-
this.connection = null;
413+
if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
414414

415-
if (connection != null) {
416-
LettuceFutureUtils.join(connectionProvider.releaseAsync(connection));
417-
}
415+
StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
416+
this.connection = null;
418417

419-
state.set(State.CLOSED);
420-
}
418+
state.set(State.CLOSED);
419+
if (connection != null) {
420+
return Mono.fromCompletionStage(connectionProvider.releaseAsync(connection));
421+
}
422+
423+
}
424+
return Mono.empty();
425+
});
421426
}
422427

423428
private static boolean isClosing(State state) {

src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18-
import org.springframework.data.redis.hash.HashMapper;
19-
import org.springframework.data.redis.hash.ObjectHashMapper;
20-
import org.springframework.lang.Nullable;
2118
import reactor.core.publisher.Flux;
2219
import reactor.core.publisher.Mono;
2320
import reactor.core.scheduler.Schedulers;
@@ -31,6 +28,7 @@
3128
import java.util.stream.Collectors;
3229

3330
import org.reactivestreams.Publisher;
31+
3432
import org.springframework.data.redis.connection.DataType;
3533
import org.springframework.data.redis.connection.ReactiveRedisConnection;
3634
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -39,11 +37,14 @@
3937
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
4038
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
4139
import org.springframework.data.redis.core.script.RedisScript;
40+
import org.springframework.data.redis.hash.HashMapper;
41+
import org.springframework.data.redis.hash.ObjectHashMapper;
4242
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
4343
import org.springframework.data.redis.listener.Topic;
4444
import org.springframework.data.redis.serializer.RedisElementReader;
4545
import org.springframework.data.redis.serializer.RedisElementWriter;
4646
import org.springframework.data.redis.serializer.RedisSerializationContext;
47+
import org.springframework.lang.Nullable;
4748
import org.springframework.util.Assert;
4849
import org.springframework.util.ClassUtils;
4950

@@ -131,22 +132,7 @@ public <T> Flux<T> execute(ReactiveRedisCallback<T> action) {
131132
public <T> Flux<T> execute(ReactiveRedisCallback<T> action, boolean exposeConnection) {
132133

133134
Assert.notNull(action, "Callback object must not be null");
134-
135-
ReactiveRedisConnectionFactory factory = getConnectionFactory();
136-
ReactiveRedisConnection conn = factory.getReactiveConnection();
137-
138-
try {
139-
140-
ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
141-
142-
ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
143-
Publisher<T> result = action.doInRedis(connToExpose);
144-
145-
return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signalType -> conn.close());
146-
} catch (RuntimeException e) {
147-
conn.close();
148-
throw e;
149-
}
135+
return Flux.from(doInConnection(action, exposeConnection));
150136
}
151137

152138
/**
@@ -160,7 +146,7 @@ public <T> Flux<T> createFlux(ReactiveRedisCallback<T> callback) {
160146

161147
Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
162148

163-
return Flux.defer(() -> doInConnection(callback, exposeConnection));
149+
return Flux.from(doInConnection(callback, exposeConnection));
164150
}
165151

166152
/**
@@ -170,11 +156,11 @@ public <T> Flux<T> createFlux(ReactiveRedisCallback<T> callback) {
170156
* @param callback must not be {@literal null}
171157
* @return a {@link Mono} wrapping the {@link ReactiveRedisCallback}.
172158
*/
173-
public <T> Mono<T> createMono(final ReactiveRedisCallback<T> callback) {
159+
public <T> Mono<T> createMono(ReactiveRedisCallback<T> callback) {
174160

175161
Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
176162

177-
return Mono.defer(() -> Mono.from(doInConnection(callback, exposeConnection)));
163+
return Mono.from(doInConnection(callback, exposeConnection));
178164
}
179165

180166
/**
@@ -190,15 +176,19 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean
190176

191177
Assert.notNull(action, "Callback object must not be null");
192178

193-
ReactiveRedisConnectionFactory factory = getConnectionFactory();
194-
ReactiveRedisConnection conn = factory.getReactiveConnection();
179+
return Flux.usingWhen(Mono.fromSupplier(() -> {
180+
181+
ReactiveRedisConnectionFactory factory = getConnectionFactory();
182+
ReactiveRedisConnection conn = factory.getReactiveConnection();
183+
ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
195184

196-
ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
185+
return (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
186+
}), conn -> {
187+
Publisher<T> result = action.doInRedis(conn);
197188

198-
ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
199-
Publisher<T> result = action.doInRedis(connToExpose);
189+
return postProcessResult(result, conn, false);
200190

201-
return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signal -> conn.close());
191+
}, ReactiveRedisConnection::closeLater, ReactiveRedisConnection::closeLater);
202192
}
203193

204194
/*
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.redis.core;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import reactor.core.publisher.Mono;
21+
import reactor.test.StepVerifier;
22+
23+
import org.junit.Test;
24+
25+
import org.springframework.data.redis.connection.ReactiveRedisConnection;
26+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
27+
import org.springframework.data.redis.serializer.RedisSerializationContext;
28+
29+
/**
30+
* Unit tests for {@link ReactiveRedisTemplate}.
31+
*
32+
* @author Mark Paluch
33+
*/
34+
public class ReactiveRedisTemplateUnitTests {
35+
36+
ReactiveRedisConnectionFactory connectionFactoryMock = mock(ReactiveRedisConnectionFactory.class);
37+
ReactiveRedisConnection connectionMock = mock(ReactiveRedisConnection.class);
38+
39+
@Test // DATAREDIS-999
40+
public void closeShouldUseAsyncRelease() {
41+
42+
when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock);
43+
when(connectionMock.closeLater()).thenReturn(Mono.empty());
44+
45+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(connectionFactoryMock,
46+
RedisSerializationContext.string());
47+
48+
template.execute(connection -> Mono.empty()) //
49+
.as(StepVerifier::create) //
50+
.verifyComplete();
51+
52+
verify(connectionMock).closeLater();
53+
verifyNoMoreInteractions(connectionMock);
54+
}
55+
}

0 commit comments

Comments
 (0)