28
28
import reactor .core .publisher .Mono ;
29
29
30
30
import java .nio .ByteBuffer ;
31
- import java .util .concurrent .atomic .AtomicReference ;
31
+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
32
32
import java .util .function .Function ;
33
33
34
34
import org .reactivestreams .Publisher ;
35
-
36
35
import org .springframework .dao .DataAccessException ;
37
36
import org .springframework .dao .InvalidDataAccessResourceUsageException ;
38
37
import org .springframework .data .redis .connection .*;
@@ -48,10 +47,12 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
48
47
49
48
static final RedisCodec <ByteBuffer , ByteBuffer > CODEC = ByteBufferCodec .INSTANCE ;
50
49
50
+ private final Object mutex = new Object ();
51
+
51
52
private final AsyncConnect <StatefulConnection <ByteBuffer , ByteBuffer >> dedicatedConnection ;
52
53
private final AsyncConnect <StatefulRedisPubSubConnection <ByteBuffer , ByteBuffer >> pubSubConnection ;
53
54
54
- private final LettuceReactivePubSubCommands pubSub = new LettuceReactivePubSubCommands ( this ) ;
55
+ private volatile LettuceReactivePubSubCommands pubSub ;
55
56
56
57
private @ Nullable Mono <StatefulConnection <ByteBuffer , ByteBuffer >> sharedConnection ;
57
58
@@ -139,7 +140,13 @@ public ReactiveHyperLogLogCommands hyperLogLogCommands() {
139
140
140
141
@ Override
141
142
public ReactivePubSubCommands pubSubCommands () {
142
- return pubSub ;
143
+
144
+ synchronized (mutex ) {
145
+ if (this .pubSub == null ) {
146
+ this .pubSub = new LettuceReactivePubSubCommands (this );
147
+ }
148
+ return pubSub ;
149
+ }
143
150
}
144
151
145
152
@ Override
@@ -291,10 +298,13 @@ public ByteBuffer encodeValue(ByteBuffer value) {
291
298
*/
292
299
static class AsyncConnect <T extends io .lettuce .core .api .StatefulConnection <?, ?>> {
293
300
301
+ static AtomicReferenceFieldUpdater <AsyncConnect , State > STATE = AtomicReferenceFieldUpdater
302
+ .newUpdater (AsyncConnect .class , State .class , "state" );
303
+
294
304
private final Mono <T > connectionPublisher ;
295
305
private final LettuceConnectionProvider connectionProvider ;
296
306
297
- private AtomicReference < State > state = new AtomicReference <>( State .INITIAL ) ;
307
+ private volatile State state = State .INITIAL ;
298
308
private volatile @ Nullable StatefulConnection <ByteBuffer , ByteBuffer > connection ;
299
309
300
310
@ SuppressWarnings ("unchecked" )
@@ -310,7 +320,7 @@ static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>
310
320
311
321
this .connectionPublisher = defer .doOnNext (it -> {
312
322
313
- if (isClosing (this . state . get ())) {
323
+ if (isClosing (STATE . get (this ))) {
314
324
it .closeAsync ();
315
325
} else {
316
326
connection = it ;
@@ -319,7 +329,7 @@ static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>
319
329
.cache () //
320
330
.handle ((connection , sink ) -> {
321
331
322
- if (isClosing (this . state . get ())) {
332
+ if (isClosing (STATE . get (this ))) {
323
333
sink .error (new IllegalStateException ("Unable to connect; Connection is closed" ));
324
334
} else {
325
335
sink .next ((T ) connection );
@@ -335,12 +345,12 @@ static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>
335
345
*/
336
346
Mono <T > getConnection () {
337
347
338
- State state = this . state . get ();
348
+ State state = STATE . get (this );
339
349
if (isClosing (state )) {
340
350
return Mono .error (new IllegalStateException ("Unable to connect; Connection is closed" ));
341
351
}
342
352
343
- this . state . compareAndSet (State .INITIAL , State .CONNECTION_REQUESTED );
353
+ STATE . compareAndSet (this , State .INITIAL , State .CONNECTION_REQUESTED );
344
354
345
355
return connectionPublisher ;
346
356
}
@@ -352,12 +362,13 @@ Mono<Void> close() {
352
362
353
363
return Mono .defer (() -> {
354
364
355
- if (state .compareAndSet (State .INITIAL , CLOSING ) || state .compareAndSet (State .CONNECTION_REQUESTED , CLOSING )) {
365
+ if (STATE .compareAndSet (this , State .INITIAL , CLOSING )
366
+ || STATE .compareAndSet (this , State .CONNECTION_REQUESTED , CLOSING )) {
356
367
357
368
StatefulConnection <ByteBuffer , ByteBuffer > connection = this .connection ;
358
369
this .connection = null ;
359
370
360
- state .set (State .CLOSED );
371
+ STATE .set (this , State .CLOSED );
361
372
if (connection != null ) {
362
373
return Mono .fromCompletionStage (connectionProvider .releaseAsync (connection ));
363
374
}
0 commit comments