diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 725d7781ae..382f8ba8aa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -21,9 +21,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; @@ -42,6 +43,14 @@ public class OperationCombineLatest { + /** + * Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables + * each time an event is received from one of the source observables, where the aggregation is defined by the given function. + * @param w0 The first source observable. + * @param w1 The second source observable. + * @param combineLatestFunction The aggregation function used to combine the source observable values. + * @return A function from an observer to a subscription. This can be used to create an observable from. + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -49,6 +58,9 @@ public static Func1, Subscription> combineLatest(Observa return a; } + /** + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -57,6 +69,9 @@ public static Func1, Subscription> combineLatest(Obs return a; } + /** + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -90,46 +105,56 @@ public void onCompleted() { @Override public void onError(Exception e) { - a.error(this, e); + a.error(e); } @Override - public void onNext(Object args) { + public void onNext(T args) { a.next(this, args); } } /** - * Receive notifications from each of the Observables we are reducing and execute the combineLatestFunction whenever we have received events from all Observables. - * - * @param + * Receive notifications from each of the observables we are reducing and execute the combineLatestFunction + * whenever we have received an event from one of the observables, as soon as each Observable has received + * at least one event. */ private static class Aggregator implements Func1, Subscription> { + private Observer observer; + private final FuncN combineLatestFunction; - private Observer Observer; - private AtomicBoolean running = new AtomicBoolean(true); + private final AtomicBoolean running = new AtomicBoolean(true); + // used as an internal lock for handling the latest values and the completed state of each observer + private final Object lockObject = new Object(); + /** - * Use LinkedHashMap to retain the order we receive the CombineLatestObserver objects in. + * Store when an observer completes. *

- * Note that access to this LinkedList inside MUST BE SYNCHRONIZED - */ - private Map, LinkedList> receivedValuesPerObserver = new LinkedHashMap, LinkedList>(); + * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. + * */ + private final Set> completed = new HashSet>(); /** - * store when a Observer completes + * The latest value from each observer *

- * Note that access to this set MUST BE SYNCHRONIZED + * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. * */ - private HashSet> completed = new HashSet>(); - + private final Map, Object> latestValue = new HashMap, Object>(); + /** - * The last value from a Observer + * Whether each observer has a latest value at all. *

- * Note that access to this set MUST BE SYNCHRONIZED + * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. * */ - private HashMap, Object> lastValue = new HashMap, Object>(); + private final Set> hasLatestValue = new HashSet>(); + + /** + * Ordered list of observers to combine. + * No synchronization is necessary as these can not be added or changed asynchronously. + */ + private final List> observers = new LinkedList>(); public Aggregator(FuncN combineLatestFunction) { this.combineLatestFunction = combineLatestFunction; @@ -138,56 +163,53 @@ public Aggregator(FuncN combineLatestFunction) { /** * Receive notification of a Observer starting (meaning we should require it for aggregation) * - * @param w + * @param w The observer to add. */ - synchronized void addObserver(CombineObserver w) { - // initialize this CombineLatestObserver - receivedValuesPerObserver.put(w, new LinkedList()); + void addObserver(CombineObserver w) { + observers.add(w); } /** * Receive notification of a Observer completing its iterations. * - * @param w + * @param w The observer that has completed. */ - synchronized void complete(CombineObserver w) { - // store that this ZipObserver is completed - completed.add(w); - // if all CombineObservers are completed, we mark the whole thing as completed - if (completed.size() == receivedValuesPerObserver.size()) { - if (running.get()) { - // mark ourselves as done - Observer.onCompleted(); - // just to ensure we stop processing in case we receive more onNext/complete/error calls after this - running.set(false); + void complete(CombineObserver w) { + synchronized(lockObject) { + // store that this CombineLatestObserver is completed + completed.add(w); + // if all CombineObservers are completed, we mark the whole thing as completed + if (completed.size() == observers.size()) { + if (running.get()) { + // mark ourselves as done + observer.onCompleted(); + // just to ensure we stop processing in case we receive more onNext/complete/error calls after this + running.set(false); + } } } } /** * Receive error for a Observer. Throw the error up the chain and stop processing. - * - * @param w */ - synchronized void error(CombineObserver w, Exception e) { - Observer.onError(e); - /* tell ourselves to stop processing onNext events, event if the Observers don't obey the unsubscribe we're about to send */ - running.set(false); - /* tell all Observers to unsubscribe since we had an error */ + void error(Exception e) { + observer.onError(e); + /* tell all observers to unsubscribe since we had an error */ stop(); } /** - * Receive the next value from a Observer. + * Receive the next value from an observer. *

- * If we have received values from all Observers, trigger the combineLatest function, otherwise store the value and keep waiting. + * If we have received values from all observers, trigger the combineLatest function, otherwise store the value and keep waiting. * * @param w * @param arg */ - void next(CombineObserver w, Object arg) { - if (Observer == null) { - throw new RuntimeException("This shouldn't be running if a Observer isn't registered"); + void next(CombineObserver w, T arg) { + if (observer == null) { + throw new RuntimeException("This shouldn't be running if an Observer isn't registered"); } /* if we've been 'unsubscribed' don't process anything further even if the things we're watching keep sending (likely because they are not responding to the unsubscribe call) */ @@ -196,80 +218,65 @@ void next(CombineObserver w, Object arg) { } // define here so the variable is out of the synchronized scope - Object[] argsToCombineLatest = new Object[receivedValuesPerObserver.size()]; - - // we synchronize everything that touches receivedValues and the internal LinkedList objects - synchronized (this) { - // add this value to the queue of the CombineLatestObserver for values received - receivedValuesPerObserver.get(w).add(arg); - // remember this as the last value for this Observer - lastValue.put(w, arg); - - // if all CombineLatestObservers in 'receivedValues' map have a value, invoke the combineLatestFunction - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { - if (receivedValuesPerObserver.get(rw).peek() == null && !completed.contains(rw)) { - // we have a null (and the Observer isn't completed) meaning the queues aren't all populated so won't do anything + Object[] argsToCombineLatest = new Object[observers.size()]; + + // we synchronize everything that touches latest values + synchronized (lockObject) { + // remember this as the latest value for this observer + latestValue.put(w, arg); + + // remember that this observer now has a latest value set + hasLatestValue.add(w); + + // if all observers in the 'observers' list have a value, invoke the combineLatestFunction + for (CombineObserver rw : observers) { + if (!hasLatestValue.contains(rw)) { + // we don't have a value yet for each observer to combine, so we don't have a combined value yet either return; } } - // if we get to here this means all the queues have data (or some are completed) + // if we get to here this means all the queues have data int i = 0; - boolean foundData = false; - for (CombineObserver _w : receivedValuesPerObserver.keySet()) { - LinkedList q = receivedValuesPerObserver.get(_w); - if (q.peek() == null) { - // this is a completed Observer - // we rely on the check above looking at completed.contains to mean that NULL here represents a completed Observer - argsToCombineLatest[i++] = lastValue.get(_w); - } else { - foundData = true; - argsToCombineLatest[i++] = q.remove(); - } - } - if (completed.size() == receivedValuesPerObserver.size() && !foundData) { - // all are completed and queues have run out of data, so return and don't send empty data - return; + for (CombineObserver _w : observers) { + argsToCombineLatest[i++] = latestValue.get(_w); } } // if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args // we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling // this 'next' method while another thread finishes calling this combineLatestFunction - Observer.onNext(combineLatestFunction.call(argsToCombineLatest)); + observer.onNext(combineLatestFunction.call(argsToCombineLatest)); } @Override - public Subscription call(Observer Observer) { - if (this.Observer != null) { + public Subscription call(Observer observer) { + if (this.observer != null) { throw new IllegalStateException("Only one Observer can subscribe to this Observable."); } - this.Observer = Observer; + this.observer = observer; - /* start the Observers */ - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { + /* start the observers */ + for (CombineObserver rw : observers) { rw.startWatching(); } return new Subscription() { - @Override public void unsubscribe() { stop(); } - }; } private void stop() { /* tell ourselves to stop processing onNext events */ running.set(false); - /* propogate to all Observers to unsubscribe */ - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { + /* propogate to all observers to unsubscribe */ + for (CombineObserver rw : observers) { if (rw.subscription != null) { rw.subscription.unsubscribe(); } } } - } public static class UnitTest { @@ -290,13 +297,13 @@ public void testCombineLatestDifferentLengthObservableSequences1() { /* simulate sending data */ // once for w1 w1.Observer.onNext("1a"); + w2.Observer.onNext("2a"); + w3.Observer.onNext("3a"); w1.Observer.onCompleted(); // twice for w2 - w2.Observer.onNext("2a"); w2.Observer.onNext("2b"); w2.Observer.onCompleted(); // 4 times for w3 - w3.Observer.onNext("3a"); w3.Observer.onNext("3b"); w3.Observer.onNext("3c"); w3.Observer.onNext("3d"); @@ -305,10 +312,11 @@ public void testCombineLatestDifferentLengthObservableSequences1() { /* we should have been called 4 times on the Observer */ InOrder inOrder = inOrder(w); inOrder.verify(w).onNext("1a2a3a"); + inOrder.verify(w).onNext("1a2b3a"); inOrder.verify(w).onNext("1a2b3b"); inOrder.verify(w).onNext("1a2b3c"); inOrder.verify(w).onNext("1a2b3d"); - + inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, times(1)).onCompleted(); } @@ -341,13 +349,52 @@ public void testCombineLatestDifferentLengthObservableSequences2() { /* we should have been called 1 time only on the Observer since we only combine the "latest" we don't go back and loop through others once completed */ InOrder inOrder = inOrder(w); - inOrder.verify(w, times(1)).onNext("1a2a3a"); + inOrder.verify(w, times(1)).onNext("1d2b3a"); inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, times(1)).onCompleted(); } + @Test + public void testCombineLatestWithInterleavingSequences() { + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + + TestObservable w1 = new TestObservable(); + TestObservable w2 = new TestObservable(); + TestObservable w3 = new TestObservable(); + + Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction())); + combineLatestW.subscribe(w); + + /* simulate sending data */ + w1.Observer.onNext("1a"); + w2.Observer.onNext("2a"); + w2.Observer.onNext("2b"); + w3.Observer.onNext("3a"); + + w1.Observer.onNext("1b"); + w2.Observer.onNext("2c"); + w2.Observer.onNext("2d"); + w3.Observer.onNext("3b"); + + w1.Observer.onCompleted(); + w2.Observer.onCompleted(); + w3.Observer.onCompleted(); + + /* we should have been called 5 times on the Observer */ + InOrder inOrder = inOrder(w); + inOrder.verify(w).onNext("1a2b3a"); + inOrder.verify(w).onNext("1b2b3a"); + inOrder.verify(w).onNext("1b2c3a"); + inOrder.verify(w).onNext("1b2d3a"); + inOrder.verify(w).onNext("1b2d3b"); + + inOrder.verify(w, never()).onNext(anyString()); + inOrder.verify(w, times(1)).onCompleted(); + } + /** * Testing internal private logic due to the complexity so I want to use TDD to test as a I build it rather than relying purely on the overall functionality expected by the public methods. */ @@ -446,7 +493,7 @@ public void testAggregateMultipleTypes() { /* mock the Observable Observers that are 'pushing' data for us */ CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); + CombineObserver r2 = mock(CombineObserver.class); /* pretend we're starting up */ a.addObserver(r1); @@ -529,14 +576,14 @@ public void testAggregatorsWithDifferentSizesAndTiming() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("oneA"); + verify(aObserver, times(1)).onNext("threeA"); a.next(r1, "four"); a.complete(r1); a.next(r2, "B"); - verify(aObserver, times(1)).onNext("twoB"); + verify(aObserver, times(1)).onNext("fourB"); a.next(r2, "C"); - verify(aObserver, times(1)).onNext("threeC"); + verify(aObserver, times(1)).onNext("fourC"); a.next(r2, "D"); verify(aObserver, times(1)).onNext("fourD"); a.next(r2, "E"); @@ -575,7 +622,7 @@ public void testAggregatorError() { verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); - a.error(r1, new RuntimeException("")); + a.error(new RuntimeException("")); a.next(r1, "hello"); a.next(r2, "again"); @@ -649,16 +696,18 @@ public void testAggregatorEarlyCompletion() { a.complete(r1); a.next(r2, "A"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("oneA"); + InOrder inOrder = inOrder(aObserver); + + inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onCompleted(); + inOrder.verify(aObserver, times(1)).onNext("twoA"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, times(1)).onCompleted(); // we shouldn't get this since completed is called before any other onNext calls could trigger this - verify(aObserver, never()).onNext("twoA"); + inOrder.verify(aObserver, never()).onNext(anyString()); } @SuppressWarnings("unchecked") @@ -675,7 +724,7 @@ public void testCombineLatest2Types() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("one2"); + verify(aObserver, times(1)).onNext("two2"); verify(aObserver, times(1)).onNext("two3"); verify(aObserver, times(1)).onNext("two4"); } @@ -694,7 +743,7 @@ public void testCombineLatest3TypesA() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("one2[4, 5, 6]"); + verify(aObserver, times(1)).onNext("two2[4, 5, 6]"); } @SuppressWarnings("unchecked")