/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.
AtomicReference;
import rx.*;
import rx.
Observable;
import rx.
Observer;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.operators.*;
import rx.internal.util.*;
import rx.subscriptions.
Subscriptions;
/**
* {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
* useful for testing and demo purposes, but is generally inappropriate for production applications (if you
* think you need to use a {@code BlockingObservable} this is usually a sign that you should rethink your
* design).
* <p>
* You construct a {@code BlockingObservable} from an {@code Observable} with {@link #from(Observable)} or
* {@link Observable#toBlocking()}.
* <p>
* The documentation for this interface makes use of a form of marble diagram that has been modified to
* illustrate blocking operators. The following legend explains these marble diagrams:
* <p>
* <img width="640" height="301" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.legend.png" alt="">
*
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">RxJava wiki: Blocking
* Observable Operators</a>
* @param <T>
* the type of item emitted by the {@code BlockingObservable}
*/
public final class
BlockingObservable<T> {
private final
Observable<? extends T>
o;
/** Constant to indicate the onStart method should be called. */
static final
Object ON_START = new
Object();
/** Constant indicating the setProducer method should be called. */
static final
Object SET_PRODUCER = new
Object();
/** Indicates an unsubscription happened */
static final
Object UNSUBSCRIBE = new
Object();
private
BlockingObservable(
Observable<? extends T>
o) {
this.
o =
o;
}
/**
* Converts an {@link Observable} into a {@code BlockingObservable}.
*
* @param <T> the observed value type
* @param o
* the {@link Observable} you want to convert
* @return a {@code BlockingObservable} version of {@code o}
*/
public static <T>
BlockingObservable<T>
from(final
Observable<? extends T>
o) {
return new
BlockingObservable<T>(
o);
}
/**
* Invokes a method on each item emitted by this {@code BlockingObservable} and blocks until the Observable
* completes.
* <p>
* <em>Note:</em> This will block even if the underlying Observable is asynchronous.
* <p>
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
* <p>
* This is similar to {@link Observable#subscribe(Subscriber)}, but it blocks. Because it blocks it does not
* need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods. If the
* underlying Observable terminates with an error, rather than calling {@code onError}, this method will
* throw an exception.
*
* <p>The difference between this method and {@link #subscribe(Action1)} is that the {@code onNext} action
* is executed on the emission thread instead of the current thread.
*
* @param onNext
* the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
* @throws RuntimeException
* if an error occurs
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
* @see #subscribe(Action1)
*/
public void
forEach(final
Action1<? super T>
onNext) {
final
CountDownLatch latch = new
CountDownLatch(1);
final
AtomicReference<
Throwable>
exceptionFromOnError = new
AtomicReference<
Throwable>();
/*
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
* (see http://reactivex.io/documentation/contract.html) as this is the final subscribe in the chain.
*/
@
SuppressWarnings("unchecked")
Subscription subscription = ((
Observable<T>)
o).
subscribe(new
Subscriber<T>() {
@
Override
public void
onCompleted() {
latch.
countDown();
}
@
Override
public void
onError(
Throwable e) {
/*
* If we receive an onError event we set the reference on the
* outer thread so we can git it and throw after the
* latch.await().
*
* We do this instead of throwing directly since this may be on
* a different thread and the latch is still waiting.
*/
exceptionFromOnError.
set(
e);
latch.
countDown();
}
@
Override
public void
onNext(T
args) {
onNext.
call(
args);
}
});
BlockingUtils.
awaitForComplete(
latch,
subscription);
if (
exceptionFromOnError.
get() != null) {
Exceptions.
propagate(
exceptionFromOnError.
get());
}
}
/**
* Returns an {@link Iterator} that iterates over all items emitted by this {@code BlockingObservable}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt="">
*
* @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
@
SuppressWarnings({ "unchecked", "cast" })
public
Iterator<T>
getIterator() {
return
BlockingOperatorToIterator.
toIterator((
Observable<T>)
o);
}
/**
* Returns the first item emitted by this {@code BlockingObservable}, or throws
* {@code NoSuchElementException} if it emits no items.
*
* @return the first item emitted by this {@code BlockingObservable}
* @throws NoSuchElementException
* if this {@code BlockingObservable} emits no items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
first() {
return
blockForSingle(
o.
first());
}
/**
* Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or throws
* {@code NoSuchElementException} if it emits no such item.
*
* @param predicate
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
* @return the first item emitted by this {@code BlockingObservable} that matches the predicate
* @throws NoSuchElementException
* if this {@code BlockingObservable} emits no such items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
first(
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
first(
predicate));
}
/**
* Returns the first item emitted by this {@code BlockingObservable}, or a default value if it emits no
* items.
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no items
* @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no
* items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
firstOrDefault(T
defaultValue) {
return
blockForSingle(
o.
map(
UtilityFunctions.<T>
identity()).
firstOrDefault(
defaultValue));
}
/**
* Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or a default
* value if it emits no such items.
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no matching items
* @param predicate
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
* @return the first item emitted by this {@code BlockingObservable} that matches the predicate, or the
* default value if this {@code BlockingObservable} emits no matching items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
firstOrDefault(T
defaultValue,
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
filter(
predicate).
map(
UtilityFunctions.<T>
identity()).
firstOrDefault(
defaultValue));
}
/**
* Returns the last item emitted by this {@code BlockingObservable}, or throws
* {@code NoSuchElementException} if this {@code BlockingObservable} emits no items.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.png" alt="">
*
* @return the last item emitted by this {@code BlockingObservable}
* @throws NoSuchElementException
* if this {@code BlockingObservable} emits no items
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
*/
public T
last() {
return
blockForSingle(
o.
last());
}
/**
* Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or throws
* {@code NoSuchElementException} if it emits no such items.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.p.png" alt="">
*
* @param predicate
* a predicate function to evaluate items emitted by the {@code BlockingObservable}
* @return the last item emitted by the {@code BlockingObservable} that matches the predicate
* @throws NoSuchElementException
* if this {@code BlockingObservable} emits no items
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
*/
public T
last(final
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
last(
predicate));
}
/**
* Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
* items.
* <p>
* <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no items
* @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
* items
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
*/
public T
lastOrDefault(T
defaultValue) {
return
blockForSingle(
o.
map(
UtilityFunctions.<T>
identity()).
lastOrDefault(
defaultValue));
}
/**
* Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
* value if it emits no such items.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no matching items
* @param predicate
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
* @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
* default value if it emits no matching items
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
*/
public T
lastOrDefault(T
defaultValue,
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
filter(
predicate).
map(
UtilityFunctions.<T>
identity()).
lastOrDefault(
defaultValue));
}
/**
* Returns an {@link Iterable} that always returns the item most recently emitted by this
* {@code BlockingObservable}.
* <p>
* <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
*
* @param initialValue
* the initial value that the {@link Iterable} sequence will yield if this
* {@code BlockingObservable} has not yet emitted an item
* @return an {@link Iterable} that on each iteration returns the item that this {@code BlockingObservable}
* has most recently emitted
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public
Iterable<T>
mostRecent(T
initialValue) {
return
BlockingOperatorMostRecent.
mostRecent(
o,
initialValue);
}
/**
* Returns an {@link Iterable} that blocks until this {@code BlockingObservable} emits another item, then
* returns that item.
* <p>
* <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt="">
*
* @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits
* a new item, whereupon the Iterable returns that item
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX documentation: TakeLast</a>
*/
@
SuppressWarnings({ "cast", "unchecked" })
public
Iterable<T>
next() {
return
BlockingOperatorNext.
next((
Observable<T>)
o);
}
/**
* Returns an {@link Iterable} that returns the latest item emitted by this {@code BlockingObservable},
* waiting if necessary for one to become available.
* <p>
* If this {@code BlockingObservable} produces items faster than {@code Iterator.next} takes them,
* {@code onNext} events might be skipped, but {@code onError} or {@code onCompleted} events are not.
* <p>
* Note also that an {@code onNext} directly followed by {@code onCompleted} might hide the {@code onNext}
* event.
*
* @return an Iterable that always returns the latest item emitted by this {@code BlockingObservable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
@
SuppressWarnings({ "cast", "unchecked" })
public
Iterable<T>
latest() {
return
BlockingOperatorLatest.
latest((
Observable<T>)
o);
}
/**
* If this {@code BlockingObservable} completes after emitting a single item, return that item, otherwise
* throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
*
* @return the single item emitted by this {@code BlockingObservable}
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
single() {
return
blockForSingle(
o.
single());
}
/**
* If this {@code BlockingObservable} completes after emitting a single item that matches a given predicate,
* return that item, otherwise throw a {@code NoSuchElementException}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.p.png" alt="">
*
* @param predicate
* a predicate function to evaluate items emitted by this {@link BlockingObservable}
* @return the single item emitted by this {@code BlockingObservable} that matches the predicate
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
single(
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
single(
predicate));
}
/**
* If this {@code BlockingObservable} completes after emitting a single item, return that item; if it emits
* more than one item, throw an {@code IllegalArgumentException}; if it emits no items, return a default
* value.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no items
* @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no
* items
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
singleOrDefault(T
defaultValue) {
return
blockForSingle(
o.
map(
UtilityFunctions.<T>
identity()).
singleOrDefault(
defaultValue));
}
/**
* If this {@code BlockingObservable} completes after emitting a single item that matches a predicate,
* return that item; if it emits more than one such item, throw an {@code IllegalArgumentException}; if it
* emits no items, return a default value.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png" alt="">
*
* @param defaultValue
* a default value to return if this {@code BlockingObservable} emits no matching items
* @param predicate
* a predicate function to evaluate items emitted by this {@code BlockingObservable}
* @return the single item emitted by the {@code BlockingObservable} that matches the predicate, or the
* default value if no such items are emitted
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
*/
public T
singleOrDefault(T
defaultValue,
Func1<? super T,
Boolean>
predicate) {
return
blockForSingle(
o.
filter(
predicate).
map(
UtilityFunctions.<T>
identity()).
singleOrDefault(
defaultValue));
}
/**
* Returns a {@link Future} representing the single value emitted by this {@code BlockingObservable}.
* <p>
* If {@link BlockingObservable} emits more than one item, {@link java.util.concurrent.Future} will receive an
* {@link java.lang.IllegalArgumentException}. If {@link BlockingObservable} is empty, {@link java.util.concurrent.Future}
* will receive an {@link java.util.NoSuchElementException}.
* <p>
* If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}.
* <p>
* <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
*
* @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
@
SuppressWarnings({ "cast", "unchecked" })
public
Future<T>
toFuture() {
return
BlockingOperatorToFuture.
toFuture((
Observable<T>)
o);
}
/**
* Converts this {@code BlockingObservable} into an {@link Iterable}.
* <p>
* <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt="">
*
* @return an {@link Iterable} version of this {@code BlockingObservable}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
public
Iterable<T>
toIterable() {
return new
Iterable<T>() {
@
Override
public
Iterator<T>
iterator() {
return
getIterator();
}
};
}
/**
* Helper method which handles the actual blocking for a single response.
* <p>
* If the {@link Observable} errors, it will be thrown right away.
*
* @return the actual item
*/
private T
blockForSingle(final
Observable<? extends T>
observable) {
final
AtomicReference<T>
returnItem = new
AtomicReference<T>();
final
AtomicReference<
Throwable>
returnException = new
AtomicReference<
Throwable>();
final
CountDownLatch latch = new
CountDownLatch(1);
@
SuppressWarnings("unchecked")
Subscription subscription = ((
Observable<T>)
observable).
subscribe(new
Subscriber<T>() {
@
Override
public void
onCompleted() {
latch.
countDown();
}
@
Override
public void
onError(final
Throwable e) {
returnException.
set(
e);
latch.
countDown();
}
@
Override
public void
onNext(final T
item) {
returnItem.
set(
item);
}
});
BlockingUtils.
awaitForComplete(
latch,
subscription);
if (
returnException.
get() != null) {
Exceptions.
propagate(
returnException.
get());
}
return
returnItem.
get();
}
/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* @since 1.3
*/
public void
subscribe() {
final
CountDownLatch cdl = new
CountDownLatch(1);
final
Throwable[]
error = { null };
@
SuppressWarnings("unchecked")
Subscription s = ((
Observable<T>)
o).
subscribe(new
Subscriber<T>() {
@
Override
public void
onNext(T
t) {
// deliberately ignored
}
@
Override
public void
onError(
Throwable e) {
error[0] =
e;
cdl.
countDown();
}
@
Override
public void
onCompleted() {
cdl.
countDown();
}
});
BlockingUtils.
awaitForComplete(
cdl,
s);
Throwable e =
error[0];
if (
e != null) {
Exceptions.
propagate(
e);
}
}
/**
* Subscribes to the source and calls back the Observer methods on the current thread.
* @param observer the observer to call event methods on
* @since 1.3
*/
public void
subscribe(
Observer<? super T>
observer) {
final
BlockingQueue<
Object>
queue = new
LinkedBlockingQueue<
Object>();
@
SuppressWarnings("unchecked")
Subscription s = ((
Observable<T>)
o).
subscribe(new
Subscriber<T>() {
@
Override
public void
onNext(T
t) {
queue.
offer(
NotificationLite.
next(
t));
}
@
Override
public void
onError(
Throwable e) {
queue.
offer(
NotificationLite.
error(
e));
}
@
Override
public void
onCompleted() {
queue.
offer(
NotificationLite.
completed());
}
});
try {
for (;;) {
Object o =
queue.
poll();
if (
o == null) {
o =
queue.
take();
}
if (
NotificationLite.
accept(
observer,
o)) {
return;
}
}
} catch (
InterruptedException e) {
Thread.
currentThread().
interrupt();
observer.
onError(
e);
} finally {
s.
unsubscribe();
}
}
/**
* Subscribes to the source and calls the Subscriber methods on the current thread.
* <p>
* The unsubscription and backpressure is composed through.
* @param subscriber the subscriber to forward events and calls to in the current thread
* @since 1.3
*/
@
SuppressWarnings("unchecked")
public void
subscribe(
Subscriber<? super T>
subscriber) {
final
BlockingQueue<
Object>
queue = new
LinkedBlockingQueue<
Object>();
final
Producer[]
theProducer = { null };
Subscriber<T>
s = new
Subscriber<T>() {
@
Override
public void
onNext(T
t) {
queue.
offer(
NotificationLite.
next(
t));
}
@
Override
public void
onError(
Throwable e) {
queue.
offer(
NotificationLite.
error(
e));
}
@
Override
public void
onCompleted() {
queue.
offer(
NotificationLite.
completed());
}
@
Override
public void
setProducer(
Producer p) {
theProducer[0] =
p;
queue.
offer(
SET_PRODUCER);
}
@
Override
public void
onStart() {
queue.
offer(
ON_START);
}
};
subscriber.
add(
s);
subscriber.
add(
Subscriptions.
create(new
Action0() {
@
Override
public void
call() {
queue.
offer(
UNSUBSCRIBE);
}
}));
((
Observable<T>)
o).
subscribe(
s);
try {
for (;;) {
if (
subscriber.
isUnsubscribed()) {
break;
}
Object o =
queue.
poll();
if (
o == null) {
o =
queue.
take();
}
if (
subscriber.
isUnsubscribed() ||
o ==
UNSUBSCRIBE) {
break;
}
if (
o ==
ON_START) {
subscriber.
onStart();
} else
if (
o ==
SET_PRODUCER) {
subscriber.
setProducer(
theProducer[0]);
} else
if (
NotificationLite.
accept(
subscriber,
o)) {
return;
}
}
} catch (
InterruptedException e) {
Thread.
currentThread().
interrupt();
subscriber.
onError(
e);
} finally {
s.
unsubscribe();
}
}
/**
* Subscribes to the source and calls the given action on the current thread and rethrows any exception wrapped
* into OnErrorNotImplementedException.
*
* <p>The difference between this method and {@link #forEach(Action1)} is that the
* action is always executed on the current thread.
*
* @param onNext the callback action for each source value
* @see #forEach(Action1)
* @since 1.3
*/
public void
subscribe(final
Action1<? super T>
onNext) {
subscribe(
onNext, new
Action1<
Throwable>() {
@
Override
public void
call(
Throwable t) {
throw new
OnErrorNotImplementedException(
t);
}
},
Actions.
empty());
}
/**
* Subscribes to the source and calls the given actions on the current thread.
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @since 1.3
*/
public void
subscribe(final
Action1<? super T>
onNext, final
Action1<? super
Throwable>
onError) {
subscribe(
onNext,
onError,
Actions.
empty());
}
/**
* Subscribes to the source and calls the given actions on the current thread.
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onCompleted the callback action for the completion event.
* @since 1.3
*/
public void
subscribe(final
Action1<? super T>
onNext, final
Action1<? super
Throwable>
onError, final
Action0 onCompleted) {
subscribe(new
Observer<T>() {
@
Override
public void
onNext(T
t) {
onNext.
call(
t);
}
@
Override
public void
onError(
Throwable e) {
onError.
call(
e);
}
@
Override
public void
onCompleted() {
onCompleted.
call();
}
});
}
}