Skip to content

Merge of Pull #212 for Take/TakeWhile Fixes #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 72 additions & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,29 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationTake;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationWhere;
import rx.operators.OperationZip;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorTakeUntil;
Expand Down Expand Up @@ -728,7 +732,7 @@ public Boolean call(T t1) {
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
*
*
* @param that
* the Observable to filter
* @param predicate
Expand Down Expand Up @@ -783,6 +787,36 @@ public static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @param <T>
* the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
return _create(OperationSubscribeOn.subscribeOn(source, scheduler));
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to notify observers on.
* @param <T>
* the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
return _create(OperationObserveOn.observeOn(source, scheduler));
}

/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
Expand Down Expand Up @@ -1779,7 +1813,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTake.takeWhile(items, predicate));
return create(OperationTakeWhile.takeWhile(items, predicate));
}

/**
Expand Down Expand Up @@ -1811,16 +1845,18 @@ public Boolean call(T t) {
* @return
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTake.takeWhileWithIndex(items, predicate));
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
}

public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
{
@Override
public Boolean call(T t, Integer integer) {
public Boolean call(T t, Integer integer)
{
return (Boolean) _f.call(t, integer);
}
}));
Expand Down Expand Up @@ -2469,7 +2505,7 @@ public Boolean call(T t1) {
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
*
*
* @param predicate
* a function that evaluates the items emitted by the source Observable, returning
* <code>true</code> if they pass the filter
Expand Down Expand Up @@ -2650,6 +2686,28 @@ public Observable<Notification<T>> materialize() {
return materialize(this);
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(this, scheduler);
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param scheduler
* the scheduler to notify observers on.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public Observable<T> observeOn(Scheduler scheduler) {
return observeOn(this, scheduler);
}

/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
Expand All @@ -2660,7 +2718,7 @@ public Observable<Notification<T>> materialize() {
*/
@SuppressWarnings("unchecked")
public <T2> Observable<T2> dematerialize() {
return dematerialize((Observable<Notification<T2>>)this);
return dematerialize((Observable<Notification<T2>>) this);
}

/**
Expand Down Expand Up @@ -3005,7 +3063,9 @@ public Observable<T> scan(final T initialValue, final Object accumulator) {

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
*
* @param predicate
* a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
Expand All @@ -3014,7 +3074,9 @@ public Observable<Boolean> all(Func1<T, Boolean> predicate) {

/**
* Determines whether all elements of an observable sequence satisfies a condition.
* @param predicate a function to test each element for a condition.
*
* @param predicate
* a function to test each element for a condition.
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public Observable<Boolean> all(Object predicate) {
Expand Down
65 changes: 65 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;

/**
* Represents an object that schedules units of work.
*/
public interface Scheduler {

/**
* Schedules a cancelable action to be executed.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action);

/**
* Schedules an action to be executed.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action);

/**
* Schedules an action to be executed in dueTime.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param action action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);

/**
* Returns the scheduler's notion of current time.
*/
long now();

}
58 changes: 58 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.concurrency;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

import java.util.concurrent.TimeUnit;

public abstract class AbstractScheduler implements Scheduler {

@Override
public Subscription schedule(Action0 action) {
return schedule(asFunc0(action));
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return schedule(asFunc0(action), dueTime, unit);
}

@Override
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new SleepingAction(action, this, dueTime, unit));
}

@Override
public long now() {
return System.nanoTime();
}

private static Func0<Subscription> asFunc0(final Action0 action) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
action.call();
return Subscriptions.empty();
}
};
}

}
Loading