Skip to content

Commit a5c28cf

Browse files
Merge branch 'take-test' of git://github.com/johngmyers/RxJava into pull-212-take-merge
2 parents f1c54b5 + 6c1a1ab commit a5c28cf

File tree

4 files changed

+671
-141
lines changed

4 files changed

+671
-141
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import rx.operators.OperationDefer;
4242
import rx.operators.OperationDematerialize;
4343
import rx.operators.OperationFilter;
44+
import rx.operators.OperationTake;
45+
import rx.operators.OperationTakeWhile;
4446
import rx.operators.OperationWhere;
4547
import rx.operators.OperationMap;
4648
import rx.operators.OperationMaterialize;
@@ -54,7 +56,6 @@
5456
import rx.operators.OperationScan;
5557
import rx.operators.OperationSkip;
5658
import rx.operators.OperationSynchronize;
57-
import rx.operators.OperationTake;
5859
import rx.operators.OperationTakeLast;
5960
import rx.operators.OperationToObservableFuture;
6061
import rx.operators.OperationToObservableIterable;
@@ -1779,7 +1780,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
17791780
* @return
17801781
*/
17811782
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
1782-
return create(OperationTake.takeWhile(items, predicate));
1783+
return create(OperationTakeWhile.takeWhile(items, predicate));
17831784
}
17841785

17851786
/**
@@ -1811,16 +1812,18 @@ public Boolean call(T t) {
18111812
* @return
18121813
*/
18131814
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
1814-
return create(OperationTake.takeWhileWithIndex(items, predicate));
1815+
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
18151816
}
18161817

18171818
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
18181819
@SuppressWarnings("rawtypes")
18191820
final FuncN _f = Functions.from(predicate);
18201821

1821-
return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
1822+
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
1823+
{
18221824
@Override
1823-
public Boolean call(T t, Integer integer) {
1825+
public Boolean call(T t, Integer integer)
1826+
{
18241827
return (Boolean) _f.call(t, integer);
18251828
}
18261829
}));

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 97 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,26 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
21-
22-
import java.util.concurrent.atomic.AtomicInteger;
23-
2418
import org.junit.Test;
25-
2619
import rx.Observable;
2720
import rx.Observer;
2821
import rx.Subscription;
22+
import rx.subscriptions.Subscriptions;
2923
import rx.util.AtomicObservableSubscription;
3024
import rx.util.functions.Func1;
31-
import rx.util.functions.Func2;
32-
import rx.subjects.Subject;
25+
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import static org.junit.Assert.assertTrue;
30+
import static org.junit.Assert.fail;
31+
import static org.mockito.Matchers.any;
32+
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.never;
34+
import static org.mockito.Mockito.times;
35+
import static org.mockito.Mockito.verify;
36+
import static rx.testing.TrustedObservableTester.assertTrustedObservable;
37+
3338
/**
3439
* Returns a specified number of contiguous values from the start of an observable sequence.
3540
*/
@@ -43,61 +48,17 @@ public final class OperationTake {
4348
* @return
4449
*/
4550
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
46-
return takeWhileWithIndex(items, OperationTake.<T> numPredicate(num));
47-
}
48-
49-
/**
50-
* Returns a specified number of contiguous values from the start of an observable sequence.
51-
*
52-
* @param items
53-
* @param predicate
54-
* a function to test each source element for a condition
55-
* @return
56-
*/
57-
public static <T> Func1<Observer<T>, Subscription> takeWhile(final Observable<T> items, final Func1<T, Boolean> predicate) {
58-
return takeWhileWithIndex(items, OperationTake.<T> skipIndex(predicate));
59-
}
60-
61-
/**
62-
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
63-
*
64-
* @param items
65-
* @param predicate
66-
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
67-
* @return
68-
*/
69-
public static <T> Func1<Observer<T>, Subscription> takeWhileWithIndex(final Observable<T> items, final Func2<T, Integer, Boolean> predicate) {
7051
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
7152
return new Func1<Observer<T>, Subscription>() {
7253

7354
@Override
7455
public Subscription call(Observer<T> observer) {
75-
return new TakeWhile<T>(items, predicate).call(observer);
56+
return new Take<T>(items, num).call(observer);
7657
}
7758

7859
};
7960
}
8061

81-
private static <T> Func2<T, Integer, Boolean> numPredicate(final int num) {
82-
return new Func2<T, Integer, Boolean>() {
83-
84-
@Override
85-
public Boolean call(T input, Integer index) {
86-
return index < num;
87-
}
88-
89-
};
90-
}
91-
92-
private static <T> Func2<T, Integer, Boolean> skipIndex(final Func1<T, Boolean> underlying) {
93-
return new Func2<T, Integer, Boolean>() {
94-
@Override
95-
public Boolean call(T input, Integer index) {
96-
return underlying.call(input);
97-
}
98-
};
99-
}
100-
10162
/**
10263
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
10364
* <p>
@@ -109,19 +70,41 @@ public Boolean call(T input, Integer index) {
10970
*
11071
* @param <T>
11172
*/
112-
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
73+
private static class Take<T> implements Func1<Observer<T>, Subscription> {
11374
private final AtomicInteger counter = new AtomicInteger();
11475
private final Observable<T> items;
115-
private final Func2<T, Integer, Boolean> predicate;
76+
private final int num;
11677
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
11778

118-
private TakeWhile(Observable<T> items, Func2<T, Integer, Boolean> predicate) {
79+
private Take(Observable<T> items, int num) {
11980
this.items = items;
120-
this.predicate = predicate;
81+
this.num = num;
12182
}
12283

12384
@Override
12485
public Subscription call(Observer<T> observer) {
86+
if (num < 1) {
87+
items.subscribe(new Observer<T>()
88+
{
89+
@Override
90+
public void onCompleted()
91+
{
92+
}
93+
94+
@Override
95+
public void onError(Exception e)
96+
{
97+
}
98+
99+
@Override
100+
public void onNext(T args)
101+
{
102+
}
103+
}).unsubscribe();
104+
observer.onCompleted();
105+
return Subscriptions.empty();
106+
}
107+
125108
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
126109
}
127110

@@ -134,20 +117,28 @@ public ItemObserver(Observer<T> observer) {
134117

135118
@Override
136119
public void onCompleted() {
137-
observer.onCompleted();
120+
if (counter.getAndSet(num) < num) {
121+
observer.onCompleted();
122+
}
138123
}
139124

140125
@Override
141126
public void onError(Exception e) {
142-
observer.onError(e);
127+
if (counter.getAndSet(num) < num) {
128+
observer.onError(e);
129+
}
143130
}
144131

145132
@Override
146133
public void onNext(T args) {
147-
if (predicate.call(args, counter.getAndIncrement())) {
134+
final int count = counter.incrementAndGet();
135+
if (count <= num) {
148136
observer.onNext(args);
149-
} else {
150-
observer.onCompleted();
137+
if (count == num) {
138+
observer.onCompleted();
139+
}
140+
}
141+
if (count >= num) {
151142
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
152143
subscription.unsubscribe();
153144
}
@@ -160,65 +151,9 @@ public void onNext(T args) {
160151
public static class UnitTest {
161152

162153
@Test
163-
public void testTakeWhile1() {
164-
Observable<Integer> w = Observable.toObservable(1, 2, 3);
165-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
166-
@Override
167-
public Boolean call(Integer input) {
168-
return input < 3;
169-
}
170-
}));
171-
172-
@SuppressWarnings("unchecked")
173-
Observer<Integer> aObserver = mock(Observer.class);
174-
take.subscribe(aObserver);
175-
verify(aObserver, times(1)).onNext(1);
176-
verify(aObserver, times(1)).onNext(2);
177-
verify(aObserver, never()).onNext(3);
178-
verify(aObserver, never()).onError(any(Exception.class));
179-
verify(aObserver, times(1)).onCompleted();
180-
}
181-
182-
@Test
183-
public void testTakeWhileOnSubject1() {
184-
Subject<Integer> s = Subject.create();
185-
Observable<Integer> w = (Observable<Integer>)s;
186-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
187-
@Override
188-
public Boolean call(Integer input) {
189-
return input < 3;
190-
}
191-
}));
192-
193-
@SuppressWarnings("unchecked")
194-
Observer<Integer> aObserver = mock(Observer.class);
195-
take.subscribe(aObserver);
196-
197-
s.onNext(1);
198-
s.onNext(2);
199-
s.onNext(3);
200-
s.onNext(4);
201-
s.onNext(5);
202-
s.onCompleted();
203-
204-
verify(aObserver, times(1)).onNext(1);
205-
verify(aObserver, times(1)).onNext(2);
206-
verify(aObserver, never()).onNext(3);
207-
verify(aObserver, never()).onNext(4);
208-
verify(aObserver, never()).onNext(5);
209-
verify(aObserver, never()).onError(any(Exception.class));
210-
verify(aObserver, times(1)).onCompleted();
211-
}
212-
213-
@Test
214-
public void testTakeWhile2() {
154+
public void testTake1() {
215155
Observable<String> w = Observable.toObservable("one", "two", "three");
216-
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
217-
@Override
218-
public Boolean call(String input, Integer index) {
219-
return index < 2;
220-
}
221-
}));
156+
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 2)));
222157

223158
@SuppressWarnings("unchecked")
224159
Observer<String> aObserver = mock(Observer.class);
@@ -231,33 +166,59 @@ public Boolean call(String input, Integer index) {
231166
}
232167

233168
@Test
234-
public void testTake1() {
169+
public void testTake2() {
235170
Observable<String> w = Observable.toObservable("one", "two", "three");
236-
Observable<String> take = Observable.create(take(w, 2));
171+
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));
237172

238173
@SuppressWarnings("unchecked")
239174
Observer<String> aObserver = mock(Observer.class);
240175
take.subscribe(aObserver);
241176
verify(aObserver, times(1)).onNext("one");
242-
verify(aObserver, times(1)).onNext("two");
177+
verify(aObserver, never()).onNext("two");
243178
verify(aObserver, never()).onNext("three");
244179
verify(aObserver, never()).onError(any(Exception.class));
245180
verify(aObserver, times(1)).onCompleted();
246181
}
247182

248183
@Test
249-
public void testTake2() {
250-
Observable<String> w = Observable.toObservable("one", "two", "three");
251-
Observable<String> take = Observable.create(take(w, 1));
184+
public void testTakeDoesntLeakErrors() {
185+
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
186+
{
187+
@Override
188+
public Subscription call(Observer<String> observer)
189+
{
190+
observer.onNext("one");
191+
observer.onError(new Exception("test failed"));
192+
return Subscriptions.empty();
193+
}
194+
});
195+
Observable.create(assertTrustedObservable(take(source, 1))).last();
196+
}
252197

253-
@SuppressWarnings("unchecked")
254-
Observer<String> aObserver = mock(Observer.class);
255-
take.subscribe(aObserver);
256-
verify(aObserver, times(1)).onNext("one");
257-
verify(aObserver, never()).onNext("two");
258-
verify(aObserver, never()).onNext("three");
259-
verify(aObserver, never()).onError(any(Exception.class));
260-
verify(aObserver, times(1)).onCompleted();
198+
@Test
199+
public void testTakeZeroDoesntLeakError() {
200+
final AtomicBoolean subscribed = new AtomicBoolean(false);
201+
final AtomicBoolean unSubscribed = new AtomicBoolean(false);
202+
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
203+
{
204+
@Override
205+
public Subscription call(Observer<String> observer)
206+
{
207+
subscribed.set(true);
208+
observer.onError(new Exception("test failed"));
209+
return new Subscription()
210+
{
211+
@Override
212+
public void unsubscribe()
213+
{
214+
unSubscribed.set(true);
215+
}
216+
};
217+
}
218+
});
219+
Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
220+
assertTrue("source subscribed", subscribed.get());
221+
assertTrue("source unsubscribed", unSubscribed.get());
261222
}
262223

263224
@Test
@@ -267,7 +228,7 @@ public void testUnsubscribeAfterTake() {
267228

268229
@SuppressWarnings("unchecked")
269230
Observer<String> aObserver = mock(Observer.class);
270-
Observable<String> take = Observable.create(take(w, 1));
231+
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));
271232
take.subscribe(aObserver);
272233

273234
// wait for the Observable to complete

0 commit comments

Comments
 (0)