From 72d9872fffa2b8f6d534612436b1613ed062e026 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Mon, 25 Mar 2013 19:29:54 +0100 Subject: [PATCH 1/5] updated a test and added another one, trying to get the expected behavior right --- .../rx/operators/OperationCombineLatest.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 725d7781ae..53a11e0b24 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -304,7 +304,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() { /* we should have been called 4 times on the Observer */ InOrder inOrder = inOrder(w); - inOrder.verify(w).onNext("1a2a3a"); + inOrder.verify(w).onNext("1a2b3a"); inOrder.verify(w).onNext("1a2b3b"); inOrder.verify(w).onNext("1a2b3c"); inOrder.verify(w).onNext("1a2b3d"); @@ -348,6 +348,45 @@ public void testCombineLatestDifferentLengthObservableSequences2() { } + @SuppressWarnings("unchecked") + /* mock calls don't do generics */ + @Test + public void testCombineLatestWithInterleavingSequences() { + Observer w = mock(Observer.class); + + TestObservable w1 = new TestObservable(); + TestObservable w2 = new TestObservable(); + TestObservable w3 = new TestObservable(); + + Observable combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction())); + combineLatestW.subscribe(w); + + /* simulate sending data */ + w1.Observer.onNext("1a"); + w2.Observer.onNext("2a"); + w2.Observer.onNext("2b"); + w3.Observer.onNext("3a"); + + w1.Observer.onNext("1b"); + w2.Observer.onNext("2c"); + w2.Observer.onNext("2d"); + w3.Observer.onNext("3b"); + + w1.Observer.onCompleted(); + w2.Observer.onCompleted(); + w3.Observer.onCompleted(); + + /* we should have been called 5 times on the Observer */ + InOrder inOrder = inOrder(w); + inOrder.verify(w).onNext("1a2b3a"); + inOrder.verify(w).onNext("1b2b3a"); + inOrder.verify(w).onNext("1b2c3a"); + inOrder.verify(w).onNext("1b2d3a"); + inOrder.verify(w).onNext("1b2d3b"); + + inOrder.verify(w, times(1)).onCompleted(); + } + /** * Testing internal private logic due to the complexity so I want to use TDD to test as a I build it rather than relying purely on the overall functionality expected by the public methods. */ From bc0a089686b721149a962e6c409b8719a44872a4 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Mon, 25 Mar 2013 19:41:24 +0100 Subject: [PATCH 2/5] made 'next' more typesafe --- .../src/main/java/rx/operators/OperationCombineLatest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 53a11e0b24..d3dc7f3c66 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -94,7 +94,7 @@ public void onError(Exception e) { } @Override - public void onNext(Object args) { + public void onNext(T args) { a.next(this, args); } } @@ -185,7 +185,7 @@ synchronized void error(CombineObserver w, Exception e) { * @param w * @param arg */ - void next(CombineObserver w, Object arg) { + void next(CombineObserver w, T arg) { if (Observer == null) { throw new RuntimeException("This shouldn't be running if a Observer isn't registered"); } @@ -485,7 +485,7 @@ public void testAggregateMultipleTypes() { /* mock the Observable Observers that are 'pushing' data for us */ CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); + CombineObserver r2 = mock(CombineObserver.class); /* pretend we're starting up */ a.addObserver(r1); From 1fabd4252e703c2195ffd1c0bcc00cc356bdf555 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Mon, 25 Mar 2013 20:40:58 +0100 Subject: [PATCH 3/5] An attempt at implementing the correct combineLatest semantics. The tests do pass again now (they had to be adapted too). --- .../rx/operators/OperationCombineLatest.java | 103 ++++++++---------- 1 file changed, 44 insertions(+), 59 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index d3dc7f3c66..3cf2a410c4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -21,9 +21,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; @@ -110,26 +111,22 @@ private static class Aggregator implements Func1, Subscription> { private Observer Observer; private AtomicBoolean running = new AtomicBoolean(true); - /** - * Use LinkedHashMap to retain the order we receive the CombineLatestObserver objects in. - *

- * Note that access to this LinkedList inside MUST BE SYNCHRONIZED - */ - private Map, LinkedList> receivedValuesPerObserver = new LinkedHashMap, LinkedList>(); - /** * store when a Observer completes *

* Note that access to this set MUST BE SYNCHRONIZED * */ - private HashSet> completed = new HashSet>(); + private Set> completed = new HashSet>(); /** * The last value from a Observer *

* Note that access to this set MUST BE SYNCHRONIZED * */ - private HashMap, Object> lastValue = new HashMap, Object>(); + private Map, Object> lastValue = new HashMap, Object>(); + + private Set> hasLastValue = new HashSet>(); + private List> observers = new LinkedList>(); public Aggregator(FuncN combineLatestFunction) { this.combineLatestFunction = combineLatestFunction; @@ -140,9 +137,8 @@ public Aggregator(FuncN combineLatestFunction) { * * @param w */ - synchronized void addObserver(CombineObserver w) { - // initialize this CombineLatestObserver - receivedValuesPerObserver.put(w, new LinkedList()); + synchronized void addObserver(CombineObserver w) { + observers.add(w); } /** @@ -150,11 +146,11 @@ synchronized void addObserver(CombineObserver w) { * * @param w */ - synchronized void complete(CombineObserver w) { - // store that this ZipObserver is completed + synchronized void complete(CombineObserver w) { + // store that this CombineLatestObserver is completed completed.add(w); // if all CombineObservers are completed, we mark the whole thing as completed - if (completed.size() == receivedValuesPerObserver.size()) { + if (completed.size() == observers.size()) { if (running.get()) { // mark ourselves as done Observer.onCompleted(); @@ -169,7 +165,7 @@ synchronized void complete(CombineObserver w) { * * @param w */ - synchronized void error(CombineObserver w, Exception e) { + synchronized void error(CombineObserver w, Exception e) { Observer.onError(e); /* tell ourselves to stop processing onNext events, event if the Observers don't obey the unsubscribe we're about to send */ running.set(false); @@ -196,39 +192,25 @@ void next(CombineObserver w, T arg) { } // define here so the variable is out of the synchronized scope - Object[] argsToCombineLatest = new Object[receivedValuesPerObserver.size()]; + Object[] argsToCombineLatest = new Object[observers.size()]; // we synchronize everything that touches receivedValues and the internal LinkedList objects synchronized (this) { - // add this value to the queue of the CombineLatestObserver for values received - receivedValuesPerObserver.get(w).add(arg); // remember this as the last value for this Observer lastValue.put(w, arg); + hasLastValue.add(w); // if all CombineLatestObservers in 'receivedValues' map have a value, invoke the combineLatestFunction - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { - if (receivedValuesPerObserver.get(rw).peek() == null && !completed.contains(rw)) { - // we have a null (and the Observer isn't completed) meaning the queues aren't all populated so won't do anything + for (CombineObserver rw : observers) { + if (!hasLastValue.contains(rw)) { + // we don't have a value yet for each observer to combine, so we don't have a combined value yet either return; } } - // if we get to here this means all the queues have data (or some are completed) + // if we get to here this means all the queues have data int i = 0; - boolean foundData = false; - for (CombineObserver _w : receivedValuesPerObserver.keySet()) { - LinkedList q = receivedValuesPerObserver.get(_w); - if (q.peek() == null) { - // this is a completed Observer - // we rely on the check above looking at completed.contains to mean that NULL here represents a completed Observer - argsToCombineLatest[i++] = lastValue.get(_w); - } else { - foundData = true; - argsToCombineLatest[i++] = q.remove(); - } - } - if (completed.size() == receivedValuesPerObserver.size() && !foundData) { - // all are completed and queues have run out of data, so return and don't send empty data - return; + for (CombineObserver _w : observers) { + argsToCombineLatest[i++] = lastValue.get(_w); } } // if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args @@ -245,7 +227,7 @@ public Subscription call(Observer Observer) { this.Observer = Observer; /* start the Observers */ - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { + for (CombineObserver rw : observers) { rw.startWatching(); } @@ -263,7 +245,7 @@ private void stop() { /* tell ourselves to stop processing onNext events */ running.set(false); /* propogate to all Observers to unsubscribe */ - for (CombineObserver rw : receivedValuesPerObserver.keySet()) { + for (CombineObserver rw : observers) { if (rw.subscription != null) { rw.subscription.unsubscribe(); } @@ -290,13 +272,13 @@ public void testCombineLatestDifferentLengthObservableSequences1() { /* simulate sending data */ // once for w1 w1.Observer.onNext("1a"); + w2.Observer.onNext("2a"); + w3.Observer.onNext("3a"); w1.Observer.onCompleted(); // twice for w2 - w2.Observer.onNext("2a"); w2.Observer.onNext("2b"); w2.Observer.onCompleted(); // 4 times for w3 - w3.Observer.onNext("3a"); w3.Observer.onNext("3b"); w3.Observer.onNext("3c"); w3.Observer.onNext("3d"); @@ -304,11 +286,12 @@ public void testCombineLatestDifferentLengthObservableSequences1() { /* we should have been called 4 times on the Observer */ InOrder inOrder = inOrder(w); + inOrder.verify(w).onNext("1a2a3a"); inOrder.verify(w).onNext("1a2b3a"); inOrder.verify(w).onNext("1a2b3b"); inOrder.verify(w).onNext("1a2b3c"); inOrder.verify(w).onNext("1a2b3d"); - + inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, times(1)).onCompleted(); } @@ -341,17 +324,16 @@ public void testCombineLatestDifferentLengthObservableSequences2() { /* we should have been called 1 time only on the Observer since we only combine the "latest" we don't go back and loop through others once completed */ InOrder inOrder = inOrder(w); - inOrder.verify(w, times(1)).onNext("1a2a3a"); + inOrder.verify(w, times(1)).onNext("1d2b3a"); inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, times(1)).onCompleted(); } - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ @Test public void testCombineLatestWithInterleavingSequences() { + @SuppressWarnings("unchecked") Observer w = mock(Observer.class); TestObservable w1 = new TestObservable(); @@ -383,7 +365,8 @@ public void testCombineLatestWithInterleavingSequences() { inOrder.verify(w).onNext("1b2c3a"); inOrder.verify(w).onNext("1b2d3a"); inOrder.verify(w).onNext("1b2d3b"); - + + inOrder.verify(w, never()).onNext(anyString()); inOrder.verify(w, times(1)).onCompleted(); } @@ -568,14 +551,14 @@ public void testAggregatorsWithDifferentSizesAndTiming() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("oneA"); + verify(aObserver, times(1)).onNext("threeA"); a.next(r1, "four"); a.complete(r1); a.next(r2, "B"); - verify(aObserver, times(1)).onNext("twoB"); + verify(aObserver, times(1)).onNext("fourB"); a.next(r2, "C"); - verify(aObserver, times(1)).onNext("threeC"); + verify(aObserver, times(1)).onNext("fourC"); a.next(r2, "D"); verify(aObserver, times(1)).onNext("fourD"); a.next(r2, "E"); @@ -688,16 +671,18 @@ public void testAggregatorEarlyCompletion() { a.complete(r1); a.next(r2, "A"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("oneA"); + InOrder inOrder = inOrder(aObserver); + + inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, never()).onCompleted(); + inOrder.verify(aObserver, times(1)).onNext("twoA"); a.complete(r2); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); + inOrder.verify(aObserver, never()).onError(any(Exception.class)); + inOrder.verify(aObserver, times(1)).onCompleted(); // we shouldn't get this since completed is called before any other onNext calls could trigger this - verify(aObserver, never()).onNext("twoA"); + inOrder.verify(aObserver, never()).onNext(anyString()); } @SuppressWarnings("unchecked") @@ -714,7 +699,7 @@ public void testCombineLatest2Types() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("one2"); + verify(aObserver, times(1)).onNext("two2"); verify(aObserver, times(1)).onNext("two3"); verify(aObserver, times(1)).onNext("two4"); } @@ -733,7 +718,7 @@ public void testCombineLatest3TypesA() { verify(aObserver, never()).onError(any(Exception.class)); verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("one2[4, 5, 6]"); + verify(aObserver, times(1)).onNext("two2[4, 5, 6]"); } @SuppressWarnings("unchecked") From e449fb139b70db15ffae182355d5306d90389adb Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Wed, 27 Mar 2013 21:45:27 +0100 Subject: [PATCH 4/5] Fixed javadoc and comments, cleaned up a bit, and tried to fix synchronization. --- .../rx/operators/OperationCombineLatest.java | 139 +++++++++++------- 1 file changed, 82 insertions(+), 57 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 3cf2a410c4..87d0cbe0c9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -43,6 +43,14 @@ public class OperationCombineLatest { + /** + * Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables + * each time an event is received from one of the source observables, where the aggregation is defined by the given function. + * @param w0 The first source observable. + * @param w1 The second source observable. + * @param combineLatestFunction The aggregation function used to combine the source observable values. + * @return A function from an observer to a subscription. This can be used to create an observable from. + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -50,6 +58,9 @@ public static Func1, Subscription> combineLatest(Observa return a; } + /** + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -58,6 +69,9 @@ public static Func1, Subscription> combineLatest(Obs return a; } + /** + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); @@ -91,7 +105,7 @@ public void onCompleted() { @Override public void onError(Exception e) { - a.error(this, e); + a.error(e); } @Override @@ -101,32 +115,46 @@ public void onNext(T args) { } /** - * Receive notifications from each of the Observables we are reducing and execute the combineLatestFunction whenever we have received events from all Observables. - * - * @param + * Receive notifications from each of the observables we are reducing and execute the combineLatestFunction + * whenever we have received an event from one of the observables, as soon as each Observable has received + * at least one event. */ private static class Aggregator implements Func1, Subscription> { + private Observer observer; + private final FuncN combineLatestFunction; - private Observer Observer; - private AtomicBoolean running = new AtomicBoolean(true); + private final AtomicBoolean running = new AtomicBoolean(true); + // used as an internal lock for handling the latest values and the completed state of each observer + private final Object lockObject = new Object(); + /** - * store when a Observer completes + * Store when an observer completes. *

- * Note that access to this set MUST BE SYNCHRONIZED + * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. * */ - private Set> completed = new HashSet>(); + private final Set> completed = new HashSet>(); /** - * The last value from a Observer + * The latest value from each observer *

- * Note that access to this set MUST BE SYNCHRONIZED + * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. * */ - private Map, Object> lastValue = new HashMap, Object>(); + private final Map, Object> latestValue = new HashMap, Object>(); - private Set> hasLastValue = new HashSet>(); - private List> observers = new LinkedList>(); + /** + * Whether each observer has a latest value at all. + *

+ * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. + * */ + private final Set> hasLatestValue = new HashSet>(); + + /** + * Ordered list of observers to combine. + * No synchronization is necessary as these can not be added or changed asynchronously. + */ + private final List> observers = new LinkedList>(); public Aggregator(FuncN combineLatestFunction) { this.combineLatestFunction = combineLatestFunction; @@ -135,55 +163,53 @@ public Aggregator(FuncN combineLatestFunction) { /** * Receive notification of a Observer starting (meaning we should require it for aggregation) * - * @param w + * @param w The observer to add. */ - synchronized void addObserver(CombineObserver w) { - observers.add(w); + void addObserver(CombineObserver w) { + observers.add(w); } /** * Receive notification of a Observer completing its iterations. * - * @param w + * @param w The observer that has completed. */ - synchronized void complete(CombineObserver w) { - // store that this CombineLatestObserver is completed - completed.add(w); - // if all CombineObservers are completed, we mark the whole thing as completed - if (completed.size() == observers.size()) { - if (running.get()) { - // mark ourselves as done - Observer.onCompleted(); - // just to ensure we stop processing in case we receive more onNext/complete/error calls after this - running.set(false); + void complete(CombineObserver w) { + synchronized(lockObject) { + // store that this CombineLatestObserver is completed + completed.add(w); + // if all CombineObservers are completed, we mark the whole thing as completed + if (completed.size() == observers.size()) { + if (running.get()) { + // mark ourselves as done + observer.onCompleted(); + // just to ensure we stop processing in case we receive more onNext/complete/error calls after this + running.set(false); + } } } } /** * Receive error for a Observer. Throw the error up the chain and stop processing. - * - * @param w */ - synchronized void error(CombineObserver w, Exception e) { - Observer.onError(e); - /* tell ourselves to stop processing onNext events, event if the Observers don't obey the unsubscribe we're about to send */ - running.set(false); - /* tell all Observers to unsubscribe since we had an error */ + void error(Exception e) { + observer.onError(e); + /* tell all observers to unsubscribe since we had an error */ stop(); } /** - * Receive the next value from a Observer. + * Receive the next value from an observer. *

- * If we have received values from all Observers, trigger the combineLatest function, otherwise store the value and keep waiting. + * If we have received values from all observers, trigger the combineLatest function, otherwise store the value and keep waiting. * * @param w * @param arg */ void next(CombineObserver w, T arg) { - if (Observer == null) { - throw new RuntimeException("This shouldn't be running if a Observer isn't registered"); + if (observer == null) { + throw new RuntimeException("This shouldn't be running if an Observer isn't registered"); } /* if we've been 'unsubscribed' don't process anything further even if the things we're watching keep sending (likely because they are not responding to the unsubscribe call) */ @@ -194,15 +220,17 @@ void next(CombineObserver w, T arg) { // define here so the variable is out of the synchronized scope Object[] argsToCombineLatest = new Object[observers.size()]; - // we synchronize everything that touches receivedValues and the internal LinkedList objects - synchronized (this) { - // remember this as the last value for this Observer - lastValue.put(w, arg); - hasLastValue.add(w); + // we synchronize everything that touches latest values + synchronized (lockObject) { + // remember this as the latest value for this observer + latestValue.put(w, arg); + + // remember that this observer now has a latest value set + hasLatestValue.add(w); - // if all CombineLatestObservers in 'receivedValues' map have a value, invoke the combineLatestFunction + // if all observers in the 'observers' list have a value, invoke the combineLatestFunction for (CombineObserver rw : observers) { - if (!hasLastValue.contains(rw)) { + if (!hasLatestValue.contains(rw)) { // we don't have a value yet for each observer to combine, so we don't have a combined value yet either return; } @@ -210,48 +238,45 @@ void next(CombineObserver w, T arg) { // if we get to here this means all the queues have data int i = 0; for (CombineObserver _w : observers) { - argsToCombineLatest[i++] = lastValue.get(_w); + argsToCombineLatest[i++] = latestValue.get(_w); } } // if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args // we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling // this 'next' method while another thread finishes calling this combineLatestFunction - Observer.onNext(combineLatestFunction.call(argsToCombineLatest)); + observer.onNext(combineLatestFunction.call(argsToCombineLatest)); } @Override - public Subscription call(Observer Observer) { - if (this.Observer != null) { + public Subscription call(Observer observer) { + if (this.observer != null) { throw new IllegalStateException("Only one Observer can subscribe to this Observable."); } - this.Observer = Observer; + this.observer = observer; - /* start the Observers */ + /* start the observers */ for (CombineObserver rw : observers) { rw.startWatching(); } return new Subscription() { - @Override public void unsubscribe() { stop(); } - }; } private void stop() { /* tell ourselves to stop processing onNext events */ running.set(false); - /* propogate to all Observers to unsubscribe */ + /* propogate to all observers to unsubscribe */ for (CombineObserver rw : observers) { if (rw.subscription != null) { rw.subscription.unsubscribe(); } } } - } public static class UnitTest { @@ -597,7 +622,7 @@ public void testAggregatorError() { verify(aObserver, never()).onCompleted(); verify(aObserver, times(1)).onNext("helloworld"); - a.error(r1, new RuntimeException("")); + a.error(new RuntimeException("")); a.next(r1, "hello"); a.next(r2, "again"); From 420ac77b66f5f5848004403405dad1d351635ae0 Mon Sep 17 00:00:00 2001 From: Joachim Hofer Date: Wed, 27 Mar 2013 21:50:31 +0100 Subject: [PATCH 5/5] Whoops, generics not allowed in @see javadoc... --- .../src/main/java/rx/operators/OperationCombineLatest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 87d0cbe0c9..382f8ba8aa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -59,7 +59,7 @@ public static Func1, Subscription> combineLatest(Observa } /** - * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); @@ -70,7 +70,7 @@ public static Func1, Subscription> combineLatest(Obs } /** - * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) + * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction));