From dfc784126f259361f01f1927f44f5d1aa4e49a43 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Thu, 14 Mar 2013 14:53:33 +0200 Subject: [PATCH 01/17] Naive schedulers implementation --- rxjava-core/src/main/java/rx/Observable.java | 43 ++++------ rxjava-core/src/main/java/rx/Scheduler.java | 20 +++++ .../rx/concurrency/AbstractScheduler.java | 43 ++++++++++ .../concurrency/CurrentThreadScheduler.java | 83 +++++++++++++++++++ .../java/rx/concurrency/DelayedAction.java | 35 ++++++++ .../rx/concurrency/DiscardableAction.java | 35 ++++++++ .../rx/concurrency/ExecutorScheduler.java | 29 +++++++ .../rx/concurrency/ImmediateScheduler.java | 22 +++++ .../rx/concurrency/NewThreadScheduler.java | 30 +++++++ .../main/java/rx/concurrency/Schedulers.java | 27 ++++++ .../rx/observables/ScheduledObserver.java | 45 ++++++++++ .../java/rx/operators/OperationObserveOn.java | 71 ++++++++++++++++ .../rx/operators/OperationSubscribeOn.java | 56 +++++++++++++ 13 files changed, 513 insertions(+), 26 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/Scheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/DelayedAction.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/Schedulers.java create mode 100644 rxjava-core/src/main/java/rx/observables/ScheduledObserver.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperationObserveOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e9015b0379..8a4a254366 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,32 +36,7 @@ import org.mockito.MockitoAnnotations; import rx.observables.GroupedObservable; -import rx.operators.OperationConcat; -import rx.operators.OperationDefer; -import rx.operators.OperationDematerialize; -import rx.operators.OperationFilter; -import rx.operators.OperationMap; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMerge; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMostRecent; -import rx.operators.OperationNext; -import rx.operators.OperationOnErrorResumeNextViaFunction; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationScan; -import rx.operators.OperationSkip; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; -import rx.operators.OperationTakeLast; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationToObservableIterable; -import rx.operators.OperationToObservableList; -import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationZip; -import rx.operators.OperatorGroupBy; -import rx.operators.OperatorTakeUntil; -import rx.operators.OperatorToIterator; +import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; @@ -766,6 +741,14 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + public static Observable subscribeOn(Observable source, Scheduler scheduler) { + return _create(OperationSubscribeOn.subscribeOn(source, scheduler)); + } + + public static Observable observeOn(Observable source, Scheduler scheduler) { + return _create(OperationObserveOn.observeOn(source, scheduler)); + } + /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer @@ -2589,6 +2572,14 @@ public Observable> materialize() { return materialize(this); } + public Observable subscribeOn(Scheduler scheduler) { + return subscribeOn(this, scheduler); + } + + public Observable observeOn(Scheduler scheduler) { + return observeOn(this, scheduler); + } + /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java new file mode 100644 index 0000000000..ea6a234dca --- /dev/null +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -0,0 +1,20 @@ +package rx; + +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public interface Scheduler { + + Subscription schedule(Action0 action); + + Subscription schedule(Func0 action); + + Subscription schedule(Action0 action, long timespan, TimeUnit unit); + + Subscription schedule(Func0 action, long timespan, TimeUnit unit); + + long now(); + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java new file mode 100644 index 0000000000..d4620cb482 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -0,0 +1,43 @@ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public abstract class AbstractScheduler implements Scheduler { + + @Override + public Subscription schedule(Action0 action) { + return schedule(asFunc0(action)); + } + + @Override + public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { + return schedule(asFunc0(action), timespan, unit); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + return schedule(new DelayedAction(action, this, timespan, unit)); + } + + @Override + public long now() { + return System.nanoTime(); + } + + private static Func0 asFunc0(final Action0 action) { + return new Func0() { + @Override + public Subscription call() { + action.call(); + return Subscriptions.empty(); + } + }; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java new file mode 100644 index 0000000000..e15290e536 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -0,0 +1,83 @@ +package rx.concurrency; + +import org.junit.Test; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +public class CurrentThreadScheduler extends AbstractScheduler { + private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + public static CurrentThreadScheduler getInstance() { + return INSTANCE; + } + + private static final ThreadLocal> QUEUE = new ThreadLocal>(); + + private CurrentThreadScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + DiscardableAction discardableAction = new DiscardableAction(action); + enqueue(discardableAction); + return discardableAction; + } + + private void enqueue(DiscardableAction action) { + Queue queue = QUEUE.get(); + boolean exec = false; + + if (queue == null) { + queue = new LinkedList(); + QUEUE.set(queue); + exec = true; + } + + queue.add(action); + + while (exec && !queue.isEmpty()) { + queue.poll().call(); + } + } + + public static class UnitTest { + + @Test + public void testScheduler() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + System.out.println("First action start"); + System.out.println("First action end"); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + System.out.println("Second action start"); + scheduler.schedule(firstAction); + System.out.println("Second action end"); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + System.out.println("Third action start"); + scheduler.schedule(secondAction); + System.out.println("Third action end"); + } + }; + + scheduler.schedule(thirdAction); + } + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java b/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java new file mode 100644 index 0000000000..d83004ae34 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java @@ -0,0 +1,35 @@ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class DelayedAction implements Func0 { + private final Func0 underlying; + private final Scheduler scheduler; + private final long execTime; + + public DelayedAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + this.underlying = underlying; + this.scheduler = scheduler; + this.execTime = scheduler.now() + timeUnit.toMillis(timespan); + } + + @Override + public Subscription call() { + if (execTime < scheduler.now()) { + try { + Thread.sleep(scheduler.now() - execTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + return underlying.call(); + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java new file mode 100644 index 0000000000..e9d4ee8379 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -0,0 +1,35 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func0; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class DiscardableAction implements Func0, Subscription { + private final Func0 underlying; + + private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); + private final AtomicBoolean ready = new AtomicBoolean(true); + + public DiscardableAction(Func0 underlying) { + this.underlying = underlying; + } + + @Override + public Subscription call() { + if (ready.compareAndSet(true, false)) { + Subscription subscription = underlying.call(); + wrapper.wrap(subscription); + return subscription; + } + return wrapper; + } + + @Override + public void unsubscribe() { + ready.set(false); + wrapper.unsubscribe(); + } +} + diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java new file mode 100644 index 0000000000..e27d6fea81 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -0,0 +1,29 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.Executor; + +public class ExecutorScheduler extends AbstractScheduler { + private final Executor executor; + + public ExecutorScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + executor.execute(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + return discardableAction; + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java new file mode 100644 index 0000000000..209bf964fd --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -0,0 +1,22 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +public final class ImmediateScheduler extends AbstractScheduler { + private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + private ImmediateScheduler() { + + } + + public static ImmediateScheduler getInstance() { + return INSTANCE; + } + + @Override + public Subscription schedule(Func0 action) { + return new DiscardableAction(action); + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java new file mode 100644 index 0000000000..deba7dec1e --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -0,0 +1,30 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +public class NewThreadScheduler extends AbstractScheduler { + private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + public static NewThreadScheduler getInstance() { + return INSTANCE; + } + + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + t.start(); + + return discardableAction; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java new file mode 100644 index 0000000000..dee9327c04 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -0,0 +1,27 @@ +package rx.concurrency; + +import rx.Scheduler; + +import java.util.concurrent.Executor; + +public class Schedulers { + private Schedulers() { + + } + + public static Scheduler immediate() { + return ImmediateScheduler.getInstance(); + } + + public static Scheduler currentThread() { + return CurrentThreadScheduler.getInstance(); + } + + public static Scheduler newThread() { + return NewThreadScheduler.getInstance(); + } + + public static Scheduler executor(Executor executor) { + return new ExecutorScheduler(executor); + } +} diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java new file mode 100644 index 0000000000..90025d6db4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java @@ -0,0 +1,45 @@ +package rx.observables; + +import rx.Observer; +import rx.Scheduler; +import rx.util.functions.Action0; + +public class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onNext(T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java new file mode 100644 index 0000000000..7ec96bc3f1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -0,0 +1,71 @@ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationObserveOn { + + public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { + return new ObserveOn(source, scheduler); + } + + private static class ObserveOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public ObserveOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } + } + + private static class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onNext(T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java new file mode 100644 index 0000000000..b16c0263d0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -0,0 +1,56 @@ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationSubscribeOn { + + public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { + return new SubscribeOn(source, scheduler); + } + + private static class SubscribeOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public SubscribeOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + return new ScheduledSubscription(source.subscribe(observer), scheduler); + } + }); + } + } + + private static class ScheduledSubscription implements Subscription { + private final Subscription underlying; + private final Scheduler scheduler; + + private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void unsubscribe() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.unsubscribe(); + } + }); + } + } +} From 0aa6ca2f4d21eefdcffa17a4da0449744bb14742 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:01:06 +0200 Subject: [PATCH 02/17] Added ScheduledExecutorServiceScheduler --- .../ScheduledExecutorServiceScheduler.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java new file mode 100644 index 0000000000..1f788e9c8d --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -0,0 +1,34 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +// TODO [@mairbek] silly name +public class ScheduledExecutorServiceScheduler extends AbstractScheduler { + private final ScheduledExecutorService executorService; + + public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public Subscription schedule(Func0 action) { + return schedule(action, 0, TimeUnit.MILLISECONDS); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(action); + executorService.schedule(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, timespan, unit); + return discardableAction; + } + +} From 1896da37a91e4d1c1451847decfa9f3b48cb12bc Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:02:23 +0200 Subject: [PATCH 03/17] Added to Schedulers --- rxjava-core/src/main/java/rx/concurrency/Schedulers.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index dee9327c04..d198d8cc98 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -3,6 +3,7 @@ import rx.Scheduler; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; public class Schedulers { private Schedulers() { @@ -24,4 +25,8 @@ public static Scheduler newThread() { public static Scheduler executor(Executor executor) { return new ExecutorScheduler(executor); } + + public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { + return new ScheduledExecutorServiceScheduler(executor); + } } From 9eb111e1c7bc65c994768a6ed8afd7b9842662bd Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:57:49 +0200 Subject: [PATCH 04/17] More tests --- .../rx/concurrency/AbstractScheduler.java | 2 +- .../concurrency/CurrentThreadScheduler.java | 66 +++++++++++++---- .../rx/concurrency/ImmediateScheduler.java | 73 +++++++++++++++++-- ...DelayedAction.java => SleepingAction.java} | 4 +- 4 files changed, 123 insertions(+), 22 deletions(-) rename rxjava-core/src/main/java/rx/concurrency/{DelayedAction.java => SleepingAction.java} (82%) diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index d4620cb482..34ad11ab0e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -22,7 +22,7 @@ public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { @Override public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return schedule(new DelayedAction(action, this, timespan, unit)); + return schedule(new SleepingAction(action, this, timespan, unit)); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index e15290e536..be2659d060 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,17 +1,19 @@ package rx.concurrency; import org.junit.Test; -import rx.Scheduler; +import org.mockito.InOrder; import rx.Subscription; import rx.util.functions.Action0; import rx.util.functions.Func0; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; public class CurrentThreadScheduler extends AbstractScheduler { private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + public static CurrentThreadScheduler getInstance() { return INSTANCE; } @@ -30,53 +32,89 @@ public Subscription schedule(Func0 action) { private void enqueue(DiscardableAction action) { Queue queue = QUEUE.get(); - boolean exec = false; + boolean exec = queue == null; - if (queue == null) { + if (exec) { queue = new LinkedList(); QUEUE.set(queue); - exec = true; } queue.add(action); - while (exec && !queue.isEmpty()) { - queue.poll().call(); + if (exec) { + while (!queue.isEmpty()) { + queue.poll().call(); + } + + QUEUE.set(null); } } public static class UnitTest { @Test - public void testScheduler() { + public void testOrdering() { final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + final Action0 firstAction = new Action0() { @Override public void call() { - System.out.println("First action start"); - System.out.println("First action end"); + firstStepStart.call(); + firstStepEnd.call(); } }; final Action0 secondAction = new Action0() { @Override public void call() { - System.out.println("Second action start"); + secondStepStart.call(); scheduler.schedule(firstAction); - System.out.println("Second action end"); + secondStepEnd.call(); } }; final Action0 thirdAction = new Action0() { @Override public void call() { - System.out.println("Third action start"); + thirdStepStart.call(); scheduler.schedule(secondAction); - System.out.println("Third action end"); + thirdStepEnd.call(); } }; + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + @Test + public void testSequenceOfActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + scheduler.schedule(first); + scheduler.schedule(second); + + verify(first, times(1)).call(); + verify(second, times(1)).call(); + } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 209bf964fd..b49f6b3352 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,22 +1,85 @@ package rx.concurrency; +import org.junit.Test; +import org.mockito.InOrder; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + public final class ImmediateScheduler extends AbstractScheduler { private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); - private ImmediateScheduler() { - - } - public static ImmediateScheduler getInstance() { return INSTANCE; } + private ImmediateScheduler() { + } + @Override public Subscription schedule(Func0 action) { - return new DiscardableAction(action); + action.call(); + return Subscriptions.empty(); } + public static class UnitTest { + + @Test + public void testOrdering() { + final ImmediateScheduler scheduler = new ImmediateScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + } + + } + + } diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java similarity index 82% rename from rxjava-core/src/main/java/rx/concurrency/DelayedAction.java rename to rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index d83004ae34..ce10fb1903 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -7,12 +7,12 @@ import java.util.concurrent.TimeUnit; -public class DelayedAction implements Func0 { +public class SleepingAction implements Func0 { private final Func0 underlying; private final Scheduler scheduler; private final long execTime; - public DelayedAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { this.underlying = underlying; this.scheduler = scheduler; this.execTime = scheduler.now() + timeUnit.toMillis(timespan); From 86a750c76d98eafb14d8f03734bf719bbb0d95b9 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 18:31:57 +0200 Subject: [PATCH 05/17] Headers --- rxjava-core/src/main/java/rx/Scheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/AbstractScheduler.java | 15 +++++++++++++++ .../rx/concurrency/CurrentThreadScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/DiscardableAction.java | 15 +++++++++++++++ .../java/rx/concurrency/ExecutorScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/ImmediateScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/NewThreadScheduler.java | 15 +++++++++++++++ .../ScheduledExecutorServiceScheduler.java | 15 +++++++++++++++ .../src/main/java/rx/concurrency/Schedulers.java | 15 +++++++++++++++ .../main/java/rx/concurrency/SleepingAction.java | 15 +++++++++++++++ .../java/rx/operators/OperationObserveOn.java | 15 +++++++++++++++ .../java/rx/operators/OperationSubscribeOn.java | 15 +++++++++++++++ 12 files changed, 180 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index ea6a234dca..06b69fe2ff 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 34ad11ab0e..63ed7a6a60 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index be2659d060..b91a22ab53 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import org.junit.Test; diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index e9d4ee8379..632ec69b1a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index e27d6fea81..88aabc46eb 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index b49f6b3352..59908e4e0c 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import org.junit.Test; diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index deba7dec1e..a78c9633f1 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index 1f788e9c8d..c8a61d7292 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index d198d8cc98..61b51d070d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index ce10fb1903..02823436d4 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 7ec96bc3f1..85228f2cff 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import rx.Observable; diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index b16c0263d0..b59dd5f37d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import rx.Observable; From df09fcb6a53b70a828698fbad71e681edbcdc7f4 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 18:52:25 +0200 Subject: [PATCH 06/17] ObserveOn/SubscribeOn unit tests --- .../rx/concurrency/ForwardingScheduler.java | 56 +++++++++++++++++++ .../rx/concurrency/ImmediateScheduler.java | 5 +- .../main/java/rx/concurrency/Schedulers.java | 4 ++ .../java/rx/operators/OperationObserveOn.java | 39 +++++++++++-- .../rx/operators/OperationSubscribeOn.java | 33 ++++++++++- 5 files changed, 129 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java new file mode 100644 index 0000000000..2714808766 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -0,0 +1,56 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class ForwardingScheduler implements Scheduler { + private final Scheduler underlying; + + public ForwardingScheduler(Scheduler underlying) { + this.underlying = underlying; + } + + @Override + public Subscription schedule(Action0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Func0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { + return underlying.schedule(action, timespan, unit); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + return underlying.schedule(action, timespan, unit); + } + + @Override + public long now() { + return underlying.now(); + } +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 59908e4e0c..e54d178ae0 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -38,8 +38,9 @@ private ImmediateScheduler() { @Override public Subscription schedule(Func0 action) { - action.call(); - return Subscriptions.empty(); + DiscardableAction discardableAction = new DiscardableAction(action); + discardableAction.call(); + return discardableAction; } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 61b51d070d..9f5ff2065d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -44,4 +44,8 @@ public static Scheduler executor(Executor executor) { public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { return new ScheduledExecutorServiceScheduler(executor); } + + public static Scheduler forwardingScheduler(Scheduler underlying) { + return new ForwardingScheduler(underlying); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 85228f2cff..4e6e14bb85 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -15,14 +15,20 @@ */ package rx.operators; +import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; -import rx.util.functions.Func0; import rx.util.functions.Func1; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class OperationObserveOn { public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { @@ -64,23 +70,46 @@ public void call() { } @Override - public void onError(Exception e) { + public void onError(final Exception e) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onError(e); } }); } @Override - public void onNext(T args) { + public void onNext(final T args) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onNext(args); } }); } } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testObserveOn() { + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); + + verify(scheduler, times(4)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index b59dd5f37d..104a134657 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -15,14 +15,19 @@ */ package rx.operators; +import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + public class OperationSubscribeOn { public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { @@ -68,4 +73,30 @@ public void call() { }); } } -} + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testSubscribeOn() { + Observable w = Observable.toObservable(1, 2, 3); + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); + + verify(scheduler, times(1)).schedule(any(Func0.class)); + subscription.unsubscribe(); + verify(scheduler, times(1)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} \ No newline at end of file From 2d1c45da58b09063ce19863bffcf7866e5fb4300 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 19:41:25 +0200 Subject: [PATCH 07/17] Some documentation --- rxjava-core/src/main/java/rx/Observable.java | 28 +++++++++++++++++++ .../ScheduledExecutorServiceScheduler.java | 1 - 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 8a4a254366..bbbbd10d02 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -741,10 +741,26 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @param the type of observable. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ public static Observable subscribeOn(Observable source, Scheduler scheduler) { return _create(OperationSubscribeOn.subscribeOn(source, scheduler)); } + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to notify observers on. + * @param the type of observable. + * @return the source sequence whose observations happen on the specified scheduler. + */ public static Observable observeOn(Observable source, Scheduler scheduler) { return _create(OperationObserveOn.observeOn(source, scheduler)); } @@ -2572,10 +2588,22 @@ public Observable> materialize() { return materialize(this); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ public Observable subscribeOn(Scheduler scheduler) { return subscribeOn(this, scheduler); } + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param scheduler the scheduler to notify observers on. + * @return the source sequence whose observations happen on the specified scheduler. + */ public Observable observeOn(Scheduler scheduler) { return observeOn(this, scheduler); } diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index c8a61d7292..a37f51edac 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -// TODO [@mairbek] silly name public class ScheduledExecutorServiceScheduler extends AbstractScheduler { private final ScheduledExecutorService executorService; From db9f9a60afdfe3b3a4f8a9e2d0e085b5e9629847 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:09:36 +0200 Subject: [PATCH 08/17] Documenting code --- rxjava-core/src/main/java/rx/Scheduler.java | 38 +++++++++++++++++-- .../rx/concurrency/AbstractScheduler.java | 8 ++-- .../rx/concurrency/ForwardingScheduler.java | 8 ++-- .../ScheduledExecutorServiceScheduler.java | 4 +- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 06b69fe2ff..b4d2c5d471 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -20,16 +20,46 @@ import java.util.concurrent.TimeUnit; +/** + * Represents an object that schedules units of work. + */ public interface Scheduler { - Subscription schedule(Action0 action); - + /** + * Schedules a cancelable action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ Subscription schedule(Func0 action); - Subscription schedule(Action0 action, long timespan, TimeUnit unit); + /** + * Schedules an action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action); + + /** + * Schedules an action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action, long dueTime, TimeUnit unit); - Subscription schedule(Func0 action, long timespan, TimeUnit unit); + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + /** + * Returns the scheduler's notion of current time. + */ long now(); } diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 63ed7a6a60..8ad8b436a0 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -31,13 +31,13 @@ public Subscription schedule(Action0 action) { } @Override - public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { - return schedule(asFunc0(action), timespan, unit); + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return schedule(asFunc0(action), dueTime, unit); } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return schedule(new SleepingAction(action, this, timespan, unit)); + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java index 2714808766..a32f72aa9f 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -40,13 +40,13 @@ public Subscription schedule(Func0 action) { } @Override - public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { - return underlying.schedule(action, timespan, unit); + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return underlying.schedule(action, timespan, unit); + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index a37f51edac..0558d6fa26 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -34,14 +34,14 @@ public Subscription schedule(Func0 action) { } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(action); executorService.schedule(new Runnable() { @Override public void run() { discardableAction.call(); } - }, timespan, unit); + }, dueTime, unit); return discardableAction; } From eaa0316231ac0b8233d49f77a7ba4f85ea5d9537 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:10:36 +0200 Subject: [PATCH 09/17] renamed tests --- .../src/main/java/rx/concurrency/CurrentThreadScheduler.java | 2 +- .../src/main/java/rx/concurrency/ImmediateScheduler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index b91a22ab53..3b60f631ae 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -68,7 +68,7 @@ private void enqueue(DiscardableAction action) { public static class UnitTest { @Test - public void testOrdering() { + public void testNestedActions() { final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); final Action0 firstStepStart = mock(Action0.class); diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index e54d178ae0..59e52078f3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -46,7 +46,7 @@ public Subscription schedule(Func0 action) { public static class UnitTest { @Test - public void testOrdering() { + public void testNestedActions() { final ImmediateScheduler scheduler = new ImmediateScheduler(); final Action0 firstStepStart = mock(Action0.class); From 81ee35dcf7cbbd06c8b5c932543f3d23f407b857 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:21:24 +0200 Subject: [PATCH 10/17] Extracted ScheduledObserver as a separate class --- .../rx/observables/ScheduledObserver.java | 8 ++-- .../java/rx/operators/OperationObserveOn.java | 41 +------------------ 2 files changed, 5 insertions(+), 44 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java index 90025d6db4..ec38b90d0e 100644 --- a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java @@ -24,21 +24,21 @@ public void call() { } @Override - public void onError(Exception e) { + public void onError(final Exception e) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onError(e); } }); } @Override - public void onNext(T args) { + public void onNext(final T args) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onNext(args); } }); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 4e6e14bb85..16eed89270 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -21,6 +21,7 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; +import rx.observables.ScheduledObserver; import rx.util.functions.Action0; import rx.util.functions.Func1; @@ -50,46 +51,6 @@ public Subscription call(final Observer observer) { } } - private static class ScheduledObserver implements Observer { - private final Observer underlying; - private final Scheduler scheduler; - - public ScheduledObserver(Observer underlying, Scheduler scheduler) { - this.underlying = underlying; - this.scheduler = scheduler; - } - - @Override - public void onCompleted() { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onCompleted(); - } - }); - } - - @Override - public void onError(final Exception e) { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onError(e); - } - }); - } - - @Override - public void onNext(final T args) { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onNext(args); - } - }); - } - } - public static class UnitTest { @Test From fb22a73a7df3ccada20d3f2e7b0da2db806bcb2e Mon Sep 17 00:00:00 2001 From: John Myers Date: Wed, 27 Mar 2013 21:48:41 -0700 Subject: [PATCH 11/17] Add tests to demonstrate bugs --- .../src/main/java/rx/operators/OperationTake.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e86263e562..fc4372d5ae 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -260,6 +260,18 @@ public void testTake2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeDoesntLeakErrors() { + Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); + Observable.create(take(source, 1)).last(); + } + + @Test + public void testTakeZeroDoesntLeakError() { + Observable source = Observable.error(new Exception("test failed")); + Observable.create(take(source, 0)).lastOrDefault("ok"); + } + @Test public void testUnsubscribeAfterTake() { Subscription s = mock(Subscription.class); From 4919472dd12f0230b9116544182d0e706e44db06 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 21:24:26 -0700 Subject: [PATCH 12/17] Split Take and TakeWhile --- rxjava-core/src/main/java/rx/Observable.java | 13 +- .../main/java/rx/operators/OperationTake.java | 160 ++------- .../java/rx/operators/OperationTakeWhile.java | 313 ++++++++++++++++++ 3 files changed, 349 insertions(+), 137 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c..e332098ba2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,8 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationTake; +import rx.operators.OperationTakeWhile; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -54,7 +56,6 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1779,7 +1780,7 @@ public static Observable takeLast(final Observable items, final int co * @return */ public static Observable takeWhile(final Observable items, Func1 predicate) { - return create(OperationTake.takeWhile(items, predicate)); + return create(OperationTakeWhile.takeWhile(items, predicate)); } /** @@ -1811,16 +1812,18 @@ public Boolean call(T t) { * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTake.takeWhileWithIndex(items, predicate)); + return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); - return create(OperationTake.takeWhileWithIndex(items, new Func2() { + return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2() + { @Override - public Boolean call(T t, Integer integer) { + public Boolean call(T t, Integer integer) + { return (Boolean) _f.call(t, integer); } })); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index fc4372d5ae..1b853d5bba 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,21 +15,23 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.subjects.Subject; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -43,61 +45,17 @@ public final class OperationTake { * @return */ public static Func1, Subscription> take(final Observable items, final int num) { - return takeWhileWithIndex(items, OperationTake. numPredicate(num)); - } - - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param items - * @param predicate - * a function to test each source element for a condition - * @return - */ - public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTake. skipIndex(predicate)); - } - - /** - * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. - * - * @param items - * @param predicate - * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return - */ - public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new TakeWhile(items, predicate).call(observer); - } - - }; - } - - private static Func2 numPredicate(final int num) { - return new Func2() { - - @Override - public Boolean call(T input, Integer index) { - return index < num; + return new Take(items, num).call(observer); } }; } - private static Func2 skipIndex(final Func1 underlying) { - return new Func2() { - @Override - public Boolean call(T input, Integer index) { - return underlying.call(input); - } - }; - } - /** * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. *

@@ -109,19 +67,24 @@ public Boolean call(T input, Integer index) { * * @param */ - private static class TakeWhile implements Func1, Subscription> { + private static class Take implements Func1, Subscription> { private final AtomicInteger counter = new AtomicInteger(); private final Observable items; - private final Func2 predicate; + private final int num; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - private TakeWhile(Observable items, Func2 predicate) { + private Take(Observable items, int num) { this.items = items; - this.predicate = predicate; + this.num = num; } @Override public Subscription call(Observer observer) { + if (num < 1) { + observer.onCompleted(); + return Subscriptions.empty(); + } + return subscription.wrap(items.subscribe(new ItemObserver(observer))); } @@ -144,10 +107,14 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + final int count = counter.incrementAndGet(); + if (count <= num) { observer.onNext(args); - } else { - observer.onCompleted(); + if (count == num) { + observer.onCompleted(); + } + } + if (count >= num) { // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } @@ -159,77 +126,6 @@ public void onNext(T args) { public static class UnitTest { - @Test - public void testTakeWhile1() { - Observable w = Observable.toObservable(1, 2, 3); - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - - s.onNext(1); - s.onNext(2); - s.onNext(3); - s.onNext(4); - s.onNext(5); - s.onCompleted(); - - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onNext(4); - verify(aObserver, never()).onNext(5); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhile2() { - Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { - @Override - public Boolean call(String input, Integer index) { - return index < 2; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java new file mode 100644 index 0000000000..f45efabc92 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -0,0 +1,313 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.subjects.Subject; +/** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + */ +public final class OperationTakeWhile { + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate + * a function to test each source element for a condition + * @return + */ + public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { + return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate + * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeWhile(items, predicate).call(observer); + } + + }; + } + + private static Func2 skipIndex(final Func1 underlying) { + return new Func2() { + @Override + public Boolean call(T input, Integer index) { + return underlying.call(input); + } + }; + } + + /** + * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. + *

+ * It IS thread-safe from within it while receiving onNext events from multiple threads. + *

+ * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. + *

+ * Note how the takeWhileWithIndex() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. + * + * @param + */ + private static class TakeWhile implements Func1, Subscription> { + private final AtomicInteger counter = new AtomicInteger(); + private final Observable items; + private final Func2 predicate; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + private TakeWhile(Observable items, Func2 predicate) { + this.items = items; + this.predicate = predicate; + } + + @Override + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + if (predicate.call(args, counter.getAndIncrement())) { + observer.onNext(args); + } else { + observer.onCompleted(); + // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable + subscription.unsubscribe(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeWhile1() { + Observable w = Observable.toObservable(1, 2, 3); + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhile2() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String input, Integer index) + { + return index < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileDoesntLeakErrors() { + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + + Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + return false; + } + })).last(); + } + + @Test + public void testUnsubscribeAfterTake() { + Subscription s = mock(Subscription.class); + TestObservable w = new TestObservable(s, "one", "two", "three"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String s, Integer index) + { + return index < 1; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + w.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + System.out.println("TestObservable thread finished"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(s, times(1)).unsubscribe(); + } + + private static class TestObservable extends Observable { + + final Subscription s; + final String[] values; + Thread t = null; + + public TestObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + } + + @Override + public Subscription subscribe(final Observer observer) { + System.out.println("TestObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestObservable thread"); + for (String s : values) { + System.out.println("TestObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestObservable thread"); + t.start(); + System.out.println("done starting TestObservable thread"); + return s; + } + + } + } + +} From b2697cb4553e77df6091c7dfcc048585e3a5ca02 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:08:24 -0700 Subject: [PATCH 13/17] Implement TrustedObservableTester.assertTrustedObservable() --- .../rx/testing/TrustedObservableTester.java | 253 ++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java new file mode 100644 index 0000000000..f48be8d3f5 --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java @@ -0,0 +1,253 @@ +package rx.testing; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class TrustedObservableTester +{ + private TrustedObservableTester() {} + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) + { + return new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } + + public static class TestingObserver implements Observer { + + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); + + public TestingObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } + + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } + + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } + + } + + public static class UnitTest { + @Test(expected = AssertionError.class) + public void testDoubleCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + + } + + @Test(expected = AssertionError.class) + public void testCompletedError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testCompletedNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testErrorCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testDoubleError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + + @Test(expected = AssertionError.class) + public void testErrorNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testNextCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testConcurrentNextNext() { + final List threads = new ArrayList(); + final AtomicReference threadFailure = new AtomicReference(); + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(final Observer observer) + { + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("one"); + } + })); + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("two"); + } + })); + return Subscriptions.empty(); + } + })).subscribe(new SlowObserver()); + for (Thread thread : threads) { + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread thread, Throwable throwable) + { + threadFailure.set(throwable); + } + }); + thread.start(); + } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + // Junit seems pretty bad about exposing test failures inside of created threads. + assertNotNull("exception thrown by thread", threadFailure.get()); + assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass()); + } + + private static class SlowObserver implements Observer + { + @Override + public void onCompleted() + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onError(Exception e) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onNext(String args) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } + } +} From 550005cf260188a06660a3652c65fda594ee8f85 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:11:36 -0700 Subject: [PATCH 14/17] Fix violations of the Observer contract. --- .../main/java/rx/operators/OperationTake.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 1b853d5bba..ebefc09c65 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.testing.TrustedObservableTester.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. @@ -97,12 +98,16 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - observer.onCompleted(); + if (counter.getAndSet(num) < num) { + observer.onCompleted(); + } } @Override public void onError(Exception e) { - observer.onError(e); + if (counter.getAndSet(num) < num) { + observer.onError(e); + } } @Override @@ -129,7 +134,7 @@ public static class UnitTest { @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 2)); + Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -144,7 +149,7 @@ public void testTake1() { @Test public void testTake2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -158,14 +163,23 @@ public void testTake2() { @Test public void testTakeDoesntLeakErrors() { - Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); - Observable.create(take(source, 1)).last(); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + Observable.create(assertTrustedObservable(take(source, 1))).last(); } @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); - Observable.create(take(source, 0)).lastOrDefault("ok"); + Observable source = Observable.error(new Exception("test failed")); + Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); } @Test @@ -175,7 +189,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); take.subscribe(aObserver); // wait for the Observable to complete From 6c1a1abe0002018d282fb366b94b00ebb366de11 Mon Sep 17 00:00:00 2001 From: John Myers Date: Fri, 29 Mar 2013 20:49:31 -0700 Subject: [PATCH 15/17] take(0) subscribes to its source --- .../main/java/rx/operators/OperationTake.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index ebefc09c65..c4335b71d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -23,8 +23,10 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -82,6 +84,23 @@ private Take(Observable items, int num) { @Override public Subscription call(Observer observer) { if (num < 1) { + items.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + }).unsubscribe(); observer.onCompleted(); return Subscriptions.empty(); } @@ -178,8 +197,28 @@ public Subscription call(Observer observer) @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); + final AtomicBoolean subscribed = new AtomicBoolean(false); + final AtomicBoolean unSubscribed = new AtomicBoolean(false); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + subscribed.set(true); + observer.onError(new Exception("test failed")); + return new Subscription() + { + @Override + public void unsubscribe() + { + unSubscribed.set(true); + } + }; + } + }); Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + assertTrue("source subscribed", subscribed.get()); + assertTrue("source unsubscribed", unSubscribed.get()); } @Test From c10fdd6214e79fb7f7f3074d850abb5e8010f2f2 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 30 Mar 2013 10:49:41 -0700 Subject: [PATCH 16/17] comments while reviewing - DO NOT PUSH --- .../concurrency/CurrentThreadScheduler.java | 21 ++++++++++++++++--- .../rx/concurrency/ExecutorScheduler.java | 15 +++++++++++++ .../rx/concurrency/ImmediateScheduler.java | 4 ++++ .../ScheduledExecutorServiceScheduler.java | 4 ++++ .../main/java/rx/concurrency/Schedulers.java | 3 +++ .../java/rx/concurrency/SleepingAction.java | 10 +++++---- 6 files changed, 50 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index 3b60f631ae..c1a8eed455 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,6 +35,13 @@ public static CurrentThreadScheduler getInstance() { private static final ThreadLocal> QUEUE = new ThreadLocal>(); +// private static final ThreadLocal> QUEUE = new ThreadLocal>() { +// @Override +// protected Queue initialValue() { +// return new LinkedList(); +// } +//}; + private CurrentThreadScheduler() { } @@ -54,10 +61,18 @@ private void enqueue(DiscardableAction action) { QUEUE.set(queue); } + // this means enqueue will always synchronously execute everything on the current thread ... + // so how would there ever be more than one item in the queue? + // SleepingAction is also blocking so it wouldn't skip those, it would block until it finishes sleeping + // and if it did skip something what would ever trigger it eventually being executed unless something else + // is enqueued? + queue.add(action); if (exec) { + System.out.println("exec"); while (!queue.isEmpty()) { + System.out.println("call in queue"); queue.poll().call(); } diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 88aabc46eb..55427c874e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -16,9 +16,11 @@ package rx.concurrency; import rx.Subscription; +import rx.util.functions.Action0; import rx.util.functions.Func0; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class ExecutorScheduler extends AbstractScheduler { private final Executor executor; @@ -27,10 +29,23 @@ public ExecutorScheduler(Executor executor) { this.executor = executor; } + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + + // this should delegate to ScheduledExecutorServiceScheduler + // and that should be an implemenation detail I think ... not be a choice someone needs to make + + return super.schedule(action, dueTime, unit); + } + @Override public Subscription schedule(Func0 action) { final DiscardableAction discardableAction = new DiscardableAction(action); + // if it's a delayed Action (has a TimeUnit) then we should use a timer + // otherwise it will tie up a thread and sleep + // ... see the method above ... + executor.execute(new Runnable() { @Override public void run() { diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 59e52078f3..22c49b6437 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -38,6 +38,10 @@ private ImmediateScheduler() { @Override public Subscription schedule(Func0 action) { + // do we need to wrap this when we're executing it directly? + // the Func0 already returns a subscription so does this buy us anything? + // it will only ever return if the Func0 is actually async anyways and if it's async its + // subscription would do what's needed DiscardableAction discardableAction = new DiscardableAction(action); discardableAction.call(); return discardableAction; diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index 0558d6fa26..a219dd5f61 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -24,6 +24,10 @@ public class ScheduledExecutorServiceScheduler extends AbstractScheduler { private final ScheduledExecutorService executorService; + + // this should probably just become an implementation detail of ExecutorScheduler + + public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) { this.executorService = executorService; } diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 9f5ff2065d..9748c660d1 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -41,6 +41,9 @@ public static Scheduler executor(Executor executor) { return new ExecutorScheduler(executor); } + // do we need this one? + // since the Scheduler interface allows both scheduled and non-scheduled it seems awkward to make someone choose what scheduler to use + // because the wrong choice will make the Scheduler not work correctly if a TimeUnit is given public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { return new ScheduledExecutorServiceScheduler(executor); } diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index 02823436d4..6f845aee8e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -15,13 +15,12 @@ */ package rx.concurrency; +import java.util.concurrent.TimeUnit; + import rx.Scheduler; import rx.Subscription; -import rx.util.functions.Action0; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - public class SleepingAction implements Func0 { private final Func0 underlying; private final Scheduler scheduler; @@ -37,6 +36,10 @@ public SleepingAction(Func0 underlying, Scheduler scheduler, long public Subscription call() { if (execTime < scheduler.now()) { try { + // this will block the current thread ... which doesn't seem to work well with CurrentThreadScheduler + // shouldn't CurrentThreadScheduler be capable of doing other things while this is sleeping? + // In fact, this will block any of the concurrent systems -- it will take up a thread in a threadpool and make it sleep + // whereas I would think it should schedule itself on a timer Thread.sleep(scheduler.now() - execTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -45,6 +48,5 @@ public Subscription call() { } return underlying.call(); - } } From a7a1f50d4c651a65fa5fe703167a0d4e5de9cf25 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 30 Mar 2013 19:43:16 -0700 Subject: [PATCH 17/17] Small reorganization of code for OperationTake and TrustedObservableTester - removed rx.testing package (if that's going to exist that means it's bleeding into something that should live in /src/test and beyond what works well for inner class testing) - made TrustedObservableTester part of rx.operation package and an inner UnitTest class so it doesn't become part of the public API --- .../AbstractOperation.java} | 107 +++++++++--------- .../main/java/rx/operators/OperationTake.java | 21 ++-- 2 files changed, 64 insertions(+), 64 deletions(-) rename rxjava-core/src/main/java/rx/{testing/TrustedObservableTester.java => operators/AbstractOperation.java} (77%) diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java similarity index 77% rename from rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java rename to rxjava-core/src/main/java/rx/operators/AbstractOperation.java index f48be8d3f5..c76deec109 100644 --- a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java +++ b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java @@ -1,11 +1,6 @@ -package rx.testing; +package rx.operators; -import org.junit.Test; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func1; +import static org.junit.Assert.*; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -13,63 +8,72 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; +import org.junit.Test; -public class TrustedObservableTester +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Common utility functions for operator implementations and tests. + */ +/* package */class AbstractOperation { - private TrustedObservableTester() {} + private AbstractOperation() { + } - public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) - { - return new Func1, Subscription>() + public static class UnitTest { + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) { - @Override - public Subscription call(Observer observer) + return new Func1, Subscription>() { - return source.call(new TestingObserver(observer)); - } - }; - } + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } - public static class TestingObserver implements Observer { + public static class TestingObserver implements Observer { - private final Observer actual; - private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final AtomicBoolean isInCallback = new AtomicBoolean(false); + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); - public TestingObserver(Observer actual) { - this.actual = actual; - } + public TestingObserver(Observer actual) { + this.actual = actual; + } - @Override - public void onCompleted() { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onCompleted(); - isInCallback.set(false); - } + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } - @Override - public void onError(Exception e) { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onError(e); - isInCallback.set(false); - } + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } - @Override - public void onNext(T args) { - assertFalse("previous call to onCompleted() or onError()", isFinished.get()); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onNext(args); - isInCallback.set(false); - } + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } - } + } - public static class UnitTest { @Test(expected = AssertionError.class) public void testDoubleCompleted() { Observable.create(assertTrustedObservable(new Func1, Subscription>() @@ -141,7 +145,6 @@ public Subscription call(Observer observer) })).lastOrDefault("end"); } - @Test(expected = AssertionError.class) public void testErrorNext() { Observable.create(assertTrustedObservable(new Func1, Subscription>() diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index c4335b71d8..5ea6b627e4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,7 +15,16 @@ */ package rx.operators; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.AbstractOperation.UnitTest.*; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -23,18 +32,6 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static rx.testing.TrustedObservableTester.assertTrustedObservable; - /** * Returns a specified number of contiguous values from the start of an observable sequence. */