Skip to content

Commit d3236e2

Browse files
madgnomeRobWin
authored andcommitted
ReactiveX#212 reactor.core.Exceptions$BubblingException thrown if exception is… (ReactiveX#213)
* ReactiveX#212 reactor.core.Exceptions$BubblingException thrown if exception is thrown in subscribe * ReactiveX#35 Fix Codacity issues
1 parent 3d792f4 commit d3236e2

File tree

11 files changed

+426
-123
lines changed

11 files changed

+426
-123
lines changed
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright 2018 Julien Hoarau
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.resilience4j.reactor;
17+
18+
import org.reactivestreams.Subscription;
19+
import reactor.core.CoreSubscriber;
20+
import reactor.core.Disposable;
21+
import reactor.core.Exceptions;
22+
import reactor.core.publisher.Operators;
23+
import reactor.core.publisher.SignalType;
24+
import reactor.util.context.Context;
25+
26+
import java.util.Objects;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
29+
30+
/**
31+
* Heavily inspired by {@link reactor.core.publisher.BaseSubscriber}
32+
*
33+
* @param <T>
34+
*/
35+
public abstract class ResilienceBaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
36+
Disposable {
37+
38+
protected final CoreSubscriber<? super T> actual;
39+
40+
private volatile Subscription subscription;
41+
42+
private static final AtomicReferenceFieldUpdater<ResilienceBaseSubscriber, Subscription> S =
43+
AtomicReferenceFieldUpdater.newUpdater(ResilienceBaseSubscriber.class, Subscription.class, "subscription");
44+
45+
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
46+
47+
protected ResilienceBaseSubscriber(CoreSubscriber<? super T> actual) {
48+
this.actual = actual;
49+
}
50+
51+
/**
52+
* Return current {@link Subscription}
53+
* @return current {@link Subscription}
54+
*/
55+
protected Subscription upstream() {
56+
return subscription;
57+
}
58+
59+
@Override
60+
public boolean isDisposed() {
61+
return subscription == Operators.cancelledSubscription();
62+
}
63+
64+
/**
65+
* {@link Disposable#dispose() Dispose} the {@link Subscription} by
66+
* {@link Subscription#cancel() cancelling} it.
67+
*/
68+
@Override
69+
public void dispose() {
70+
cancel();
71+
}
72+
73+
@Override
74+
public Context currentContext() {
75+
return actual.currentContext();
76+
}
77+
78+
protected boolean notCancelled() {
79+
return !this.isDisposed();
80+
}
81+
82+
/**
83+
* Hook for further processing of onSubscribe's Subscription.
84+
*
85+
* @param subscription the subscription to optionally process
86+
*/
87+
protected void hookOnSubscribe(Subscription subscription){
88+
// NO-OP
89+
}
90+
91+
/**
92+
* Hook for processing of onNext values. You can call {@link #request(long)} here
93+
* to further request data from the source {@link org.reactivestreams.Publisher} if
94+
* the {@link #hookOnSubscribe(Subscription) initial request} wasn't unbounded.
95+
* <p>Defaults to doing nothing.
96+
*
97+
* @param value the emitted value to process
98+
*/
99+
protected void hookOnNext(T value){
100+
// NO-OP
101+
}
102+
103+
/**
104+
* Optional hook for completion processing. Defaults to doing nothing.
105+
*/
106+
protected void hookOnComplete() {
107+
// NO-OP
108+
}
109+
110+
/**
111+
* Optional hook for error processing. Default is to call
112+
* {@link Exceptions#errorCallbackNotImplemented(Throwable)}.
113+
*
114+
* @param throwable the error to process
115+
*/
116+
protected void hookOnError(Throwable throwable) {
117+
throw Exceptions.errorCallbackNotImplemented(throwable);
118+
}
119+
120+
/**
121+
* Optional hook executed when the subscription is cancelled by calling this
122+
* Subscriber's {@link #cancel()} method. Defaults to doing nothing.
123+
*/
124+
protected void hookOnCancel() {
125+
//NO-OP
126+
}
127+
128+
/**
129+
* Optional hook executed after any of the termination events (onError, onComplete,
130+
* cancel). The hook is executed in addition to and after {@link #hookOnError(Throwable)},
131+
* {@link #hookOnComplete()} and {@link #hookOnCancel()} hooks, even if these callbacks
132+
* fail. Defaults to doing nothing. A failure of the callback will be caught by
133+
* {@link Operators#onErrorDropped(Throwable, reactor.util.context.Context)}.
134+
*
135+
* @param type the type of termination event that triggered the hook
136+
* ({@link SignalType#ON_ERROR}, {@link SignalType#ON_COMPLETE} or
137+
* {@link SignalType#CANCEL})
138+
*/
139+
protected void hookFinally(SignalType type) {
140+
//NO-OP
141+
}
142+
143+
/**
144+
* Optional hook executed when permit call is acquired.
145+
*/
146+
protected void hookOnPermitAcquired() {
147+
//NO-OP
148+
}
149+
150+
/**
151+
* @return true if call is permitted, false otherwise
152+
*/
153+
protected abstract boolean isCallPermitted();
154+
155+
protected boolean acquireCallPermit() {
156+
boolean callPermitted = false;
157+
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
158+
callPermitted = isCallPermitted();
159+
if (!callPermitted) {
160+
permitted.set(Permit.REJECTED);
161+
} else {
162+
hookOnPermitAcquired();
163+
}
164+
}
165+
return callPermitted;
166+
}
167+
168+
protected boolean wasCallPermitted() {
169+
return permitted.get() == Permit.ACQUIRED;
170+
}
171+
172+
protected abstract Throwable getThrowable();
173+
174+
@Override
175+
public final void onSubscribe(Subscription s) {
176+
if (Operators.setOnce(S, this, s)) {
177+
try {
178+
hookOnSubscribe(s);
179+
if (acquireCallPermit()) {
180+
actual.onSubscribe(this);
181+
} else {
182+
cancel();
183+
actual.onSubscribe(this);
184+
actual.onError(getThrowable());
185+
}
186+
}
187+
catch (Throwable throwable) {
188+
onError(Operators.onOperatorError(s, throwable, currentContext()));
189+
}
190+
}
191+
}
192+
193+
@Override
194+
public final void onNext(T value) {
195+
Objects.requireNonNull(value, "onNext");
196+
try {
197+
hookOnNext(value);
198+
}
199+
catch (Throwable throwable) {
200+
onError(Operators.onOperatorError(subscription, throwable, value, currentContext()));
201+
}
202+
}
203+
204+
@Override
205+
public final void onError(Throwable t) {
206+
Objects.requireNonNull(t, "onError");
207+
208+
if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators
209+
.cancelledSubscription()) {
210+
211+
// already cancelled concurrently...
212+
if (permitted.get() == Permit.REJECTED) {
213+
// Ignore if the call was rejected
214+
return;
215+
}
216+
217+
// signal on error dropped otherwise
218+
Operators.onErrorDropped(t, currentContext());
219+
return;
220+
}
221+
222+
223+
try {
224+
hookOnError(t);
225+
}
226+
catch (Throwable e) {
227+
e = Exceptions.addSuppressed(e, t);
228+
Operators.onErrorDropped(e, currentContext());
229+
}
230+
finally {
231+
safeHookFinally(SignalType.ON_ERROR);
232+
}
233+
}
234+
235+
@Override
236+
public final void onComplete() {
237+
if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators
238+
.cancelledSubscription()) {
239+
//we're sure it has not been concurrently cancelled
240+
try {
241+
hookOnComplete();
242+
}
243+
catch (Throwable throwable) {
244+
//onError itself will short-circuit due to the CancelledSubscription being push above
245+
hookOnError(Operators.onOperatorError(throwable, currentContext()));
246+
}
247+
finally {
248+
safeHookFinally(SignalType.ON_COMPLETE);
249+
}
250+
}
251+
}
252+
253+
@Override
254+
public final void request(long n) {
255+
if (Operators.validate(n)) {
256+
Subscription s = this.subscription;
257+
if (s != null) {
258+
s.request(n);
259+
}
260+
}
261+
}
262+
263+
/**
264+
* {@link #request(long) Request} an unbounded amount.
265+
*/
266+
public final void requestUnbounded() {
267+
request(Long.MAX_VALUE);
268+
}
269+
270+
@Override
271+
public final void cancel() {
272+
if (Operators.terminate(S, this)) {
273+
try {
274+
hookOnCancel();
275+
}
276+
catch (Throwable throwable) {
277+
hookOnError(Operators.onOperatorError(subscription, throwable, currentContext()));
278+
}
279+
finally {
280+
safeHookFinally(SignalType.CANCEL);
281+
}
282+
}
283+
}
284+
285+
private void safeHookFinally(SignalType type) {
286+
try {
287+
hookFinally(type);
288+
}
289+
catch (Throwable finallyFailure) {
290+
Operators.onErrorDropped(finallyFailure, currentContext());
291+
}
292+
}
293+
294+
@Override
295+
public String toString() {
296+
return getClass().getSimpleName();
297+
}
298+
}

resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,9 @@
1717

1818
import io.github.resilience4j.bulkhead.Bulkhead;
1919
import io.github.resilience4j.bulkhead.BulkheadFullException;
20-
import io.github.resilience4j.reactor.Permit;
20+
import io.github.resilience4j.reactor.ResilienceBaseSubscriber;
2121
import org.reactivestreams.Subscriber;
22-
import org.reactivestreams.Subscription;
2322
import reactor.core.CoreSubscriber;
24-
import reactor.core.publisher.BaseSubscriber;
25-
26-
import java.util.concurrent.atomic.AtomicReference;
2723

2824
import static java.util.Objects.requireNonNull;
2925

@@ -32,30 +28,16 @@
3228
*
3329
* @param <T> the value type of the upstream and downstream
3430
*/
35-
class BulkheadSubscriber<T> extends BaseSubscriber<T> {
31+
class BulkheadSubscriber<T> extends ResilienceBaseSubscriber<T> {
3632

37-
private final CoreSubscriber<? super T> actual;
3833
private final Bulkhead bulkhead;
39-
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
4034

4135
public BulkheadSubscriber(Bulkhead bulkhead,
4236
CoreSubscriber<? super T> actual) {
43-
this.actual = actual;
37+
super(actual);
4438
this.bulkhead = requireNonNull(bulkhead);
4539
}
4640

47-
@Override
48-
public void hookOnSubscribe(Subscription subscription) {
49-
if (acquireCallPermit()) {
50-
actual.onSubscribe(this);
51-
} else {
52-
cancel();
53-
actual.onSubscribe(this);
54-
actual.onError(new BulkheadFullException(
55-
String.format("Bulkhead '%s' is full", bulkhead.getName())));
56-
}
57-
}
58-
5941
@Override
6042
public void hookOnNext(T t) {
6143
if (notCancelled() && wasCallPermitted()) {
@@ -71,6 +53,16 @@ public void hookOnError(Throwable t) {
7153
}
7254
}
7355

56+
@Override
57+
protected boolean isCallPermitted() {
58+
return bulkhead.isCallPermitted();
59+
}
60+
61+
@Override
62+
protected Throwable getThrowable() {
63+
return new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
64+
}
65+
7466
@Override
7567
public void hookOnComplete() {
7668
if (wasCallPermitted()) {
@@ -79,25 +71,6 @@ public void hookOnComplete() {
7971
}
8072
}
8173

82-
private boolean acquireCallPermit() {
83-
boolean callPermitted = false;
84-
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
85-
callPermitted = bulkhead.isCallPermitted();
86-
if (!callPermitted) {
87-
permitted.set(Permit.REJECTED);
88-
}
89-
}
90-
return callPermitted;
91-
}
92-
93-
private boolean notCancelled() {
94-
return !this.isDisposed();
95-
}
96-
97-
private boolean wasCallPermitted() {
98-
return permitted.get() == Permit.ACQUIRED;
99-
}
100-
10174
private void releaseBulkhead() {
10275
if (wasCallPermitted()) {
10376
bulkhead.onComplete();

0 commit comments

Comments
 (0)