Skip to content

DATAREDIS-999 - Use UsingWhen to apply functions to reactive Redis connections #461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.2.0.BUILD-SNAPSHOT</version>
<version>2.2.0.DATAREDIS-999-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.function.Function;

import org.reactivestreams.Publisher;

import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.*;
Expand Down Expand Up @@ -237,7 +238,7 @@ public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#closeLater()
*/
public Mono<Void> closeLater() {
return Mono.fromRunnable(dedicatedConnection::close);
return Flux.mergeDelayError(2, dedicatedConnection.close(), pubSubConnection.close()).then();
}

protected Mono<? extends StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
Expand Down Expand Up @@ -363,7 +364,7 @@ static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>
this.connectionProvider = connectionProvider;

Mono<StatefulConnection> defer = Mono
.defer(() -> Mono.fromCompletionStage(connectionProvider.getConnectionAsync(connectionType)));
.fromCompletionStage(() -> connectionProvider.getConnectionAsync(connectionType));

this.connectionPublisher = defer.doOnNext(it -> {

Expand Down Expand Up @@ -403,21 +404,25 @@ Mono<T> getConnection() {
}

/**
* Close connection (blocking call).
* Close connection.
*/
void close() {
Mono<Void> close() {

if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
return Mono.defer(() -> {

StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
this.connection = null;
if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {

if (connection != null) {
LettuceFutureUtils.join(connectionProvider.releaseAsync(connection));
}
StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
this.connection = null;

state.set(State.CLOSED);
}
state.set(State.CLOSED);
if (connection != null) {
return Mono.fromCompletionStage(connectionProvider.releaseAsync(connection));
}

}
return Mono.empty();
});
}

private static boolean isClosing(State state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package org.springframework.data.redis.core;

import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.lang.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand All @@ -31,6 +28,7 @@
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;

import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
Expand All @@ -39,11 +37,14 @@
import org.springframework.data.redis.core.script.DefaultReactiveScriptExecutor;
import org.springframework.data.redis.core.script.ReactiveScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisElementWriter;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

Expand Down Expand Up @@ -131,22 +132,7 @@ public <T> Flux<T> execute(ReactiveRedisCallback<T> action) {
public <T> Flux<T> execute(ReactiveRedisCallback<T> action, boolean exposeConnection) {

Assert.notNull(action, "Callback object must not be null");

ReactiveRedisConnectionFactory factory = getConnectionFactory();
ReactiveRedisConnection conn = factory.getReactiveConnection();

try {

ReactiveRedisConnection connToUse = preProcessConnection(conn, false);

ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
Publisher<T> result = action.doInRedis(connToExpose);

return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signalType -> conn.close());
} catch (RuntimeException e) {
conn.close();
throw e;
}
return Flux.from(doInConnection(action, exposeConnection));
}

/**
Expand All @@ -160,7 +146,7 @@ public <T> Flux<T> createFlux(ReactiveRedisCallback<T> callback) {

Assert.notNull(callback, "ReactiveRedisCallback must not be null!");

return Flux.defer(() -> doInConnection(callback, exposeConnection));
return Flux.from(doInConnection(callback, exposeConnection));
}

/**
Expand All @@ -170,11 +156,11 @@ public <T> Flux<T> createFlux(ReactiveRedisCallback<T> callback) {
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveRedisCallback}.
*/
public <T> Mono<T> createMono(final ReactiveRedisCallback<T> callback) {
public <T> Mono<T> createMono(ReactiveRedisCallback<T> callback) {

Assert.notNull(callback, "ReactiveRedisCallback must not be null!");

return Mono.defer(() -> Mono.from(doInConnection(callback, exposeConnection)));
return Mono.from(doInConnection(callback, exposeConnection));
}

/**
Expand All @@ -190,15 +176,19 @@ private <T> Publisher<T> doInConnection(ReactiveRedisCallback<T> action, boolean

Assert.notNull(action, "Callback object must not be null");

ReactiveRedisConnectionFactory factory = getConnectionFactory();
ReactiveRedisConnection conn = factory.getReactiveConnection();
return Flux.usingWhen(Mono.fromSupplier(() -> {

ReactiveRedisConnectionFactory factory = getConnectionFactory();
ReactiveRedisConnection conn = factory.getReactiveConnection();
ReactiveRedisConnection connToUse = preProcessConnection(conn, false);

ReactiveRedisConnection connToUse = preProcessConnection(conn, false);
return (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
}), conn -> {
Publisher<T> result = action.doInRedis(conn);

ReactiveRedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
Publisher<T> result = action.doInRedis(connToExpose);
return postProcessResult(result, conn, false);

return Flux.from(postProcessResult(result, connToUse, false)).doFinally(signal -> conn.close());
}, ReactiveRedisConnection::closeLater, ReactiveRedisConnection::closeLater);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really nonblocking call?
I see that ReactiveRedisConnection::closeLater returns Mono, but inside LettuceReactiveRedisConnection it uses call marked as blocking


There are situations when Subscription could be canceled in non blocking thread and this code will block nonblocking thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. That's fixed now.

}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.core;

import static org.mockito.Mockito.*;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.junit.Test;

import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializationContext;

/**
* Unit tests for {@link ReactiveRedisTemplate}.
*
* @author Mark Paluch
*/
public class ReactiveRedisTemplateUnitTests {

ReactiveRedisConnectionFactory connectionFactoryMock = mock(ReactiveRedisConnectionFactory.class);
ReactiveRedisConnection connectionMock = mock(ReactiveRedisConnection.class);

@Test // DATAREDIS-999
public void closeShouldUseAsyncRelease() {

when(connectionFactoryMock.getReactiveConnection()).thenReturn(connectionMock);
when(connectionMock.closeLater()).thenReturn(Mono.empty());

ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(connectionFactoryMock,
RedisSerializationContext.string());

template.execute(connection -> Mono.empty()) //
.as(StepVerifier::create) //
.verifyComplete();

verify(connectionMock).closeLater();
verifyNoMoreInteractions(connectionMock);
}
}