diff --git a/pom.xml b/pom.xml
index 9d60cefa32..4d48ce42c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-redis
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAREDIS-999-SNAPSHOT
Spring Data Redis
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java
index c05560ac50..9de2d508fe 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java
@@ -32,6 +32,7 @@
import java.util.function.Function;
import org.reactivestreams.Publisher;
+
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.*;
@@ -237,7 +238,7 @@ public Flux executeDedicated(LettuceReactiveCallback callback) {
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#closeLater()
*/
public Mono closeLater() {
- return Mono.fromRunnable(dedicatedConnection::close);
+ return Flux.mergeDelayError(2, dedicatedConnection.close(), pubSubConnection.close()).then();
}
protected Mono extends StatefulConnection> getConnection() {
@@ -363,7 +364,7 @@ static class AsyncConnect
this.connectionProvider = connectionProvider;
Mono defer = Mono
- .defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
+ .fromCompletionStage(() -> connectionProvider.getConnectionAsync(connectionType));
this.connectionPublisher = defer.doOnNext(it -> {
@@ -403,21 +404,25 @@ Mono getConnection() {
}
/**
- * Close connection (blocking call).
+ * Close connection.
*/
- void close() {
+ Mono close() {
- if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
+ return Mono.defer(() -> {
- StatefulConnection connection = this.connection;
- this.connection = null;
+ if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
- if (connection != null) {
- LettuceFutureUtils.join(connectionProvider.releaseAsync(connection));
- }
+ StatefulConnection connection = this.connection;
+ this.connection = null;
- state.set(State.CLOSED);
- }
+ state.set(State.CLOSED);
+ if (connection != null) {
+ return Mono.fromCompletionStage(connectionProvider.releaseAsync(connection));
+ }
+
+ }
+ return Mono.empty();
+ });
}
private static boolean isClosing(State state) {
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
index 926ecda131..db578787a9 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java
@@ -15,9 +15,6 @@
*/
package org.springframework.data.redis.core;
-import org.springframework.data.redis.hash.HashMapper;
-import org.springframework.data.redis.hash.ObjectHashMapper;
-import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -31,6 +28,7 @@
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
+
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -39,11 +37,14 @@
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
+import org.springframework.data.redis.hash.HashMapper;
+import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisElementWriter;
import org.springframework.data.redis.serializer.RedisSerializationContext;
+import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@@ -131,22 +132,7 @@ public Flux execute(ReactiveRedisCallback action) {
public Flux execute(ReactiveRedisCallback action, boolean exposeConnection) {
Assert.notNull(action, "Callback object must not be null");
-
- ReactiveRedisConnectionFactory factory = getConnectionFactory();
- ReactiveRedisConnection conn = factory.getReactiveConnection();
-
- try {
-
- ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
-
- ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
- Publisher result = action.doInRedis(connToExpose);
-
- return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signalType -> conn.close());
- } catch (RuntimeException e) {
- conn.close();
- throw e;
- }
+ return Flux.from(doInConnection(action, exposeConnection));
}
/**
@@ -160,7 +146,7 @@ public Flux createFlux(ReactiveRedisCallback callback) {
Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
- return Flux.defer(() -> doInConnection(callback, exposeConnection));
+ return Flux.from(doInConnection(callback, exposeConnection));
}
/**
@@ -170,11 +156,11 @@ public Flux createFlux(ReactiveRedisCallback callback) {
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveRedisCallback}.
*/
- public Mono createMono(final ReactiveRedisCallback callback) {
+ public Mono createMono(ReactiveRedisCallback callback) {
Assert.notNull(callback, "ReactiveRedisCallback must not be null!");
- return Mono.defer(() -> Mono.from(doInConnection(callback, exposeConnection)));
+ return Mono.from(doInConnection(callback, exposeConnection));
}
/**
@@ -190,15 +176,19 @@ private Publisher doInConnection(ReactiveRedisCallback action, boolean
Assert.notNull(action, "Callback object must not be null");
- ReactiveRedisConnectionFactory factory = getConnectionFactory();
- ReactiveRedisConnection conn = factory.getReactiveConnection();
+ return Flux.usingWhen(Mono.fromSupplier(() -> {
+
+ ReactiveRedisConnectionFactory factory = getConnectionFactory();
+ ReactiveRedisConnection conn = factory.getReactiveConnection();
+ ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
- ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
+ return (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
+ }), conn -> {
+ Publisher result = action.doInRedis(conn);
- ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
- Publisher result = action.doInRedis(connToExpose);
+ return postProcessResult(result, conn, false);
- return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signal -> conn.close());
+ }, ReactiveRedisConnection::closeLater, ReactiveRedisConnection::closeLater);
}
/*
diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
new file mode 100644
index 0000000000..9b958ab466
--- /dev/null
+++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.core;
+
+import static org.mockito.Mockito.*;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import org.junit.Test;
+
+import org.springframework.data.redis.connection.ReactiveRedisConnection;
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
+import org.springframework.data.redis.serializer.RedisSerializationContext;
+
+/**
+ * Unit tests for {@link ReactiveRedisTemplate}.
+ *
+ * @author Mark Paluch
+ */
+public class ReactiveRedisTemplateUnitTests {
+
+ ReactiveRedisConnectionFactory connectionFactoryMock = mock(ReactiveRedisConnectionFactory.class);
+ ReactiveRedisConnection connectionMock = mock(ReactiveRedisConnection.class);
+
+ @Test // DATAREDIS-999
+ public void closeShouldUseAsyncRelease() {
+
+ when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock);
+ when(connectionMock.closeLater()).thenReturn(Mono.empty());
+
+ ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactoryMock,
+ RedisSerializationContext.string());
+
+ template.execute(connection -> Mono.empty()) //
+ .as(StepVerifier::create) //
+ .verifyComplete();
+
+ verify(connectionMock).closeLater();
+ verifyNoMoreInteractions(connectionMock);
+ }
+}