From 3c9c4ac69c94f06a0f09d14b07ae5e2908a7006f Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 28 Jun 2019 11:32:10 +0200 Subject: [PATCH 1/2] DATAREDIS-999 - Prepare issue branch. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9d60cefa32..4d48ce42c7 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAREDIS-999-SNAPSHOT Spring Data Redis From 766bd8b482729c6d24ef2ad6c02eda422ac269d9 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 28 Jun 2019 11:57:11 +0200 Subject: [PATCH 2/2] DATAREDIS-999 - Use UsingWhen to apply functions to reactive Redis connections. We now use Flux.usingWhen() to apply functions on reactive Redis connections and to release connections calling non-blocking methods. --- .../LettuceReactiveRedisConnection.java | 29 ++++++---- .../redis/core/ReactiveRedisTemplate.java | 46 ++++++---------- .../core/ReactiveRedisTemplateUnitTests.java | 55 +++++++++++++++++++ 3 files changed, 90 insertions(+), 40 deletions(-) create mode 100644 src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java index c05560ac50..9de2d508fe 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveRedisConnection.java @@ -32,6 +32,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; + import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.*; @@ -237,7 +238,7 @@ public Flux executeDedicated(LettuceReactiveCallback callback) { * @see org.springframework.data.redis.connection.ReactiveRedisConnection#closeLater() */ public Mono closeLater() { - return Mono.fromRunnable(dedicatedConnection::close); + return Flux.mergeDelayError(2, dedicatedConnection.close(), pubSubConnection.close()).then(); } protected Mono> getConnection() { @@ -363,7 +364,7 @@ static class AsyncConnect this.connectionProvider = connectionProvider; Mono defer = Mono - .defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType))); + .fromCompletionStage(() -> connectionProvider.getConnectionAsync(connectionType)); this.connectionPublisher = defer.doOnNext(it -> { @@ -403,21 +404,25 @@ Mono getConnection() { } /** - * Close connection (blocking call). + * Close connection. */ - void close() { + Mono close() { - if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) { + return Mono.defer(() -> { - StatefulConnection connection = this.connection; - this.connection = null; + if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) { - if (connection != null) { - LettuceFutureUtils.join(connectionProvider.releaseAsync(connection)); - } + StatefulConnection connection = this.connection; + this.connection = null; - state.set(State.CLOSED); - } + state.set(State.CLOSED); + if (connection != null) { + return Mono.fromCompletionStage(connectionProvider.releaseAsync(connection)); + } + + } + return Mono.empty(); + }); } private static boolean isClosing(State state) { diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java index 926ecda131..db578787a9 100644 --- a/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java +++ b/src/main/java/org/springframework/data/redis/core/ReactiveRedisTemplate.java @@ -15,9 +15,6 @@ */ package org.springframework.data.redis.core; -import org.springframework.data.redis.hash.HashMapper; -import org.springframework.data.redis.hash.ObjectHashMapper; -import org.springframework.lang.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -31,6 +28,7 @@ import java.util.stream.Collectors; import org.reactivestreams.Publisher; + import org.springframework.data.redis.connection.DataType; import org.springframework.data.redis.connection.ReactiveRedisConnection; import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse; @@ -39,11 +37,14 @@ import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor; import org.springframework.data.redis.core.script.ReactiveScriptExecutor; import org.springframework.data.redis.core.script.RedisScript; +import org.springframework.data.redis.hash.HashMapper; +import org.springframework.data.redis.hash.ObjectHashMapper; import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer; import org.springframework.data.redis.listener.Topic; import org.springframework.data.redis.serializer.RedisElementReader; import org.springframework.data.redis.serializer.RedisElementWriter; import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -131,22 +132,7 @@ public Flux execute(ReactiveRedisCallback action) { public Flux execute(ReactiveRedisCallback action, boolean exposeConnection) { Assert.notNull(action, "Callback object must not be null"); - - ReactiveRedisConnectionFactory factory = getConnectionFactory(); - ReactiveRedisConnection conn = factory.getReactiveConnection(); - - try { - - ReactiveRedisConnection connToUse = preProcessConnection(conn, false); - - ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); - Publisher result = action.doInRedis(connToExpose); - - return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signalType -> conn.close()); - } catch (RuntimeException e) { - conn.close(); - throw e; - } + return Flux.from(doInConnection(action, exposeConnection)); } /** @@ -160,7 +146,7 @@ public Flux createFlux(ReactiveRedisCallback callback) { Assert.notNull(callback, "ReactiveRedisCallback must not be null!"); - return Flux.defer(() -> doInConnection(callback, exposeConnection)); + return Flux.from(doInConnection(callback, exposeConnection)); } /** @@ -170,11 +156,11 @@ public Flux createFlux(ReactiveRedisCallback callback) { * @param callback must not be {@literal null} * @return a {@link Mono} wrapping the {@link ReactiveRedisCallback}. */ - public Mono createMono(final ReactiveRedisCallback callback) { + public Mono createMono(ReactiveRedisCallback callback) { Assert.notNull(callback, "ReactiveRedisCallback must not be null!"); - return Mono.defer(() -> Mono.from(doInConnection(callback, exposeConnection))); + return Mono.from(doInConnection(callback, exposeConnection)); } /** @@ -190,15 +176,19 @@ private Publisher doInConnection(ReactiveRedisCallback action, boolean Assert.notNull(action, "Callback object must not be null"); - ReactiveRedisConnectionFactory factory = getConnectionFactory(); - ReactiveRedisConnection conn = factory.getReactiveConnection(); + return Flux.usingWhen(Mono.fromSupplier(() -> { + + ReactiveRedisConnectionFactory factory = getConnectionFactory(); + ReactiveRedisConnection conn = factory.getReactiveConnection(); + ReactiveRedisConnection connToUse = preProcessConnection(conn, false); - ReactiveRedisConnection connToUse = preProcessConnection(conn, false); + return (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); + }), conn -> { + Publisher result = action.doInRedis(conn); - ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); - Publisher result = action.doInRedis(connToExpose); + return postProcessResult(result, conn, false); - return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signal -> conn.close()); + }, ReactiveRedisConnection::closeLater, ReactiveRedisConnection::closeLater); } /* diff --git a/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java new file mode 100644 index 0000000000..9b958ab466 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/core/ReactiveRedisTemplateUnitTests.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import static org.mockito.Mockito.*; + +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.junit.Test; + +import org.springframework.data.redis.connection.ReactiveRedisConnection; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializationContext; + +/** + * Unit tests for {@link ReactiveRedisTemplate}. + * + * @author Mark Paluch + */ +public class ReactiveRedisTemplateUnitTests { + + ReactiveRedisConnectionFactory connectionFactoryMock = mock(ReactiveRedisConnectionFactory.class); + ReactiveRedisConnection connectionMock = mock(ReactiveRedisConnection.class); + + @Test // DATAREDIS-999 + public void closeShouldUseAsyncRelease() { + + when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock); + when(connectionMock.closeLater()).thenReturn(Mono.empty()); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(connectionFactoryMock, + RedisSerializationContext.string()); + + template.execute(connection -> Mono.empty()) // + .as(StepVerifier::create) // + .verifyComplete(); + + verify(connectionMock).closeLater(); + verifyNoMoreInteractions(connectionMock); + } +}