Skip to content

Commit 13d18a9

Browse files
madgnomeRobWin
authored andcommitted
ReactiveX#35 Make sure the Reactor operators can be used together on a Flux (ReactiveX#211)
1 parent d26c832 commit 13d18a9

File tree

9 files changed

+165
-148
lines changed

9 files changed

+165
-148
lines changed

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.Subscriber;
2222
import org.reactivestreams.Subscription;
2323
import reactor.core.CoreSubscriber;
24-
import reactor.core.publisher.Operators;
24+
import reactor.core.publisher.BaseSubscriber;
2525

2626
import java.util.concurrent.atomic.AtomicReference;
2727

@@ -32,71 +32,53 @@
3232
*
3333
* @param <T> the value type of the upstream and downstream
3434
*/
35-
class BulkheadSubscriber<T> extends Operators.MonoSubscriber<T, T> {
35+
class BulkheadSubscriber<T> extends BaseSubscriber<T> {
3636

37+
private final CoreSubscriber<? super T> actual;
3738
private final Bulkhead bulkhead;
3839
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
3940

40-
private Subscription subscription;
41-
4241
public BulkheadSubscriber(Bulkhead bulkhead,
4342
CoreSubscriber<? super T> actual) {
44-
super(actual);
43+
this.actual = actual;
4544
this.bulkhead = requireNonNull(bulkhead);
4645
}
4746

4847
@Override
49-
public void onSubscribe(Subscription subscription) {
50-
if (Operators.validate(this.subscription, subscription)) {
51-
this.subscription = subscription;
52-
if (acquireCallPermit()) {
53-
actual.onSubscribe(this);
54-
} else {
55-
cancel();
56-
actual.onSubscribe(this);
57-
actual.onError(new BulkheadFullException(
58-
String.format("Bulkhead '%s' is full", bulkhead.getName())));
59-
}
48+
public void hookOnSubscribe(Subscription subscription) {
49+
if (acquireCallPermit()) {
50+
actual.onSubscribe(this);
51+
} else {
52+
cancel();
53+
actual.onSubscribe(this);
54+
actual.onError(new BulkheadFullException(
55+
String.format("Bulkhead '%s' is full", bulkhead.getName())));
6056
}
6157
}
6258

6359
@Override
64-
public void onNext(T t) {
65-
requireNonNull(t);
66-
67-
if (isInvocationPermitted()) {
60+
public void hookOnNext(T t) {
61+
if (notCancelled() && wasCallPermitted()) {
6862
actual.onNext(t);
6963
}
7064
}
7165

7266
@Override
73-
public void onError(Throwable t) {
74-
requireNonNull(t);
75-
76-
if (isInvocationPermitted()) {
67+
public void hookOnError(Throwable t) {
68+
if (wasCallPermitted()) {
7769
bulkhead.onComplete();
7870
actual.onError(t);
7971
}
8072
}
8173

8274
@Override
83-
public void onComplete() {
84-
if (isInvocationPermitted()) {
75+
public void hookOnComplete() {
76+
if (wasCallPermitted()) {
8577
releaseBulkhead();
8678
actual.onComplete();
8779
}
8880
}
8981

90-
@Override
91-
public void request(long n) {
92-
subscription.request(n);
93-
}
94-
95-
@Override
96-
public void cancel() {
97-
super.cancel();
98-
}
99-
10082
private boolean acquireCallPermit() {
10183
boolean callPermitted = false;
10284
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
@@ -108,12 +90,8 @@ private boolean acquireCallPermit() {
10890
return callPermitted;
10991
}
11092

111-
private boolean isInvocationPermitted() {
112-
return notCancelled() && wasCallPermitted();
113-
}
114-
11593
private boolean notCancelled() {
116-
return !this.isCancelled();
94+
return !this.isDisposed();
11795
}
11896

11997
private boolean wasCallPermitted() {

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java

Lines changed: 28 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import org.reactivestreams.Subscription;
2424
import reactor.core.CoreSubscriber;
25-
import reactor.core.publisher.Operators;
25+
import reactor.core.publisher.BaseSubscriber;
2626

2727
import java.util.concurrent.atomic.AtomicReference;
2828

@@ -33,65 +33,54 @@
3333
*
3434
* @param <T> the value type of the upstream and downstream
3535
*/
36-
class CircuitBreakerSubscriber<T> extends Operators.MonoSubscriber<T, T> {
36+
class CircuitBreakerSubscriber<T> extends BaseSubscriber<T> {
3737

38+
private final CoreSubscriber<? super T> actual;
3839
private final CircuitBreaker circuitBreaker;
3940
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
4041
private StopWatch stopWatch;
41-
private Subscription subscription;
4242

4343
public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
4444
CoreSubscriber<? super T> actual) {
45-
super(actual);
45+
this.actual = actual;
4646
this.circuitBreaker = requireNonNull(circuitBreaker);
4747
}
4848

4949
@Override
50-
public void onSubscribe(Subscription subscription) {
51-
if (Operators.validate(this.subscription, subscription)) {
52-
this.subscription = subscription;
53-
54-
if (acquireCallPermit()) {
55-
actual.onSubscribe(this);
56-
} else {
57-
cancel();
58-
actual.onSubscribe(this);
59-
actual.onError(new CircuitBreakerOpenException(
60-
String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())));
61-
}
62-
}
63-
}
64-
65-
@Override
66-
public void onNext(T t) {
67-
requireNonNull(t);
68-
69-
if (isInvocationPermitted()) {
70-
actual.onNext(t);
50+
protected void hookOnSubscribe(Subscription subscription) {
51+
if (acquireCallPermit()) {
52+
actual.onSubscribe(this);
53+
} else {
54+
cancel();
55+
actual.onSubscribe(this);
56+
actual.onError(new CircuitBreakerOpenException(
57+
String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())));
7158
}
7259
}
7360

7461
@Override
75-
public void onError(Throwable t) {
76-
requireNonNull(t);
77-
78-
markFailure(t);
79-
if (isInvocationPermitted()) {
80-
actual.onError(t);
62+
protected void hookOnNext(T value) {
63+
if (notCancelled() && wasCallPermitted()) {
64+
actual.onNext(value);
8165
}
8266
}
8367

8468
@Override
85-
public void onComplete() {
69+
protected void hookOnComplete() {
8670
markSuccess();
87-
if (isInvocationPermitted()) {
71+
if (wasCallPermitted()) {
8872
actual.onComplete();
8973
}
9074
}
9175

9276
@Override
93-
public void request(long n) {
94-
subscription.request(n);
77+
protected void hookOnError(Throwable t) {
78+
requireNonNull(t);
79+
80+
markFailure(t);
81+
if (wasCallPermitted()) {
82+
actual.onError(t);
83+
}
9584
}
9685

9786
private boolean acquireCallPermit() {
@@ -107,10 +96,6 @@ private boolean acquireCallPermit() {
10796
return callPermitted;
10897
}
10998

110-
private boolean isInvocationPermitted() {
111-
return !this.isCancelled() && wasCallPermitted();
112-
}
113-
11499
private void markFailure(Throwable e) {
115100
if (wasCallPermitted()) {
116101
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
@@ -123,6 +108,10 @@ private void markSuccess() {
123108
}
124109
}
125110

111+
private boolean notCancelled() {
112+
return !this.isDisposed();
113+
}
114+
126115
private boolean wasCallPermitted() {
127116
return permitted.get() == Permit.ACQUIRED;
128117
}

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.reactivestreams.Subscriber;
2222
import org.reactivestreams.Subscription;
2323
import reactor.core.CoreSubscriber;
24-
import reactor.core.publisher.Operators;
24+
import reactor.core.publisher.BaseSubscriber;
2525

2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicReference;
@@ -33,39 +33,33 @@
3333
*
3434
* @param <T> the value type of the upstream and downstream
3535
*/
36-
class RateLimiterSubscriber<T> extends Operators.MonoSubscriber<T, T> {
36+
class RateLimiterSubscriber<T> extends BaseSubscriber<T> {
3737

38+
private final CoreSubscriber<? super T> actual;
3839
private final RateLimiter rateLimiter;
3940
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
4041
private final AtomicBoolean firstEvent = new AtomicBoolean(true);
4142

42-
private Subscription subscription;
43-
4443
public RateLimiterSubscriber(RateLimiter rateLimiter,
4544
CoreSubscriber<? super T> actual) {
46-
super(actual);
45+
this.actual = actual;
4746
this.rateLimiter = requireNonNull(rateLimiter);
4847
}
4948

5049
@Override
51-
public void onSubscribe(Subscription subscription) {
52-
if (Operators.validate(this.subscription, subscription)) {
53-
this.subscription = subscription;
54-
if (acquireCallPermit()) {
55-
actual.onSubscribe(this);
56-
} else {
57-
cancel();
58-
actual.onSubscribe(this);
59-
actual.onError(rateLimitExceededException());
60-
}
50+
public void hookOnSubscribe(Subscription subscription) {
51+
if (acquireCallPermit()) {
52+
actual.onSubscribe(this);
53+
} else {
54+
cancel();
55+
actual.onSubscribe(this);
56+
actual.onError(rateLimitExceededException());
6157
}
6258
}
6359

6460
@Override
65-
public void onNext(T t) {
66-
requireNonNull(t);
67-
68-
if (isInvocationPermitted()) {
61+
public void hookOnNext(T t) {
62+
if (notCancelled() && wasCallPermitted()) {
6963
if (firstEvent.getAndSet(false) || rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
7064
actual.onNext(t);
7165
} else {
@@ -76,31 +70,19 @@ public void onNext(T t) {
7670
}
7771

7872
@Override
79-
public void onError(Throwable t) {
80-
requireNonNull(t);
81-
82-
if (isInvocationPermitted()) {
73+
public void hookOnError(Throwable t) {
74+
if (wasCallPermitted()) {
8375
actual.onError(t);
8476
}
8577
}
8678

8779
@Override
88-
public void onComplete() {
89-
if (isInvocationPermitted()) {
80+
public void hookOnComplete() {
81+
if (wasCallPermitted()) {
9082
actual.onComplete();
9183
}
9284
}
9385

94-
@Override
95-
public void request(long n) {
96-
subscription.request(n);
97-
}
98-
99-
@Override
100-
public void cancel() {
101-
super.cancel();
102-
}
103-
10486
private boolean acquireCallPermit() {
10587
boolean callPermitted = false;
10688
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
@@ -112,12 +94,8 @@ private boolean acquireCallPermit() {
11294
return callPermitted;
11395
}
11496

115-
private boolean isInvocationPermitted() {
116-
return notCancelled() && wasCallPermitted();
117-
}
118-
11997
private boolean notCancelled() {
120-
return !this.isCancelled();
98+
return !this.isDisposed();
12199
}
122100

123101
private boolean wasCallPermitted() {

0 commit comments

Comments
 (0)