/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;
import java.util.
Arrays;
import rx.
Subscriber;
import rx.exceptions.*;
import rx.plugins.*;
/**
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
* complies with <a href="http://reactivex.io/documentation/contract.html">the Observable contract</a>.
* <p>
* The following is taken from <a href="http://go.microsoft.com/fwlink/?LinkID=205219">the Rx Design Guidelines
* document</a>:
* <blockquote><p>
* Messages sent to instances of the {@code IObserver} interface follow the following grammar:
* </p><blockquote><p> {@code OnNext* (OnCompleted | OnError)?} </p></blockquote><p>
* This grammar allows observable sequences to send any amount (0 or more) of {@code OnNext} messages to the
* subscriber, optionally followed by a single success ({@code OnCompleted}) or failure ({@code OnError})
* message.
* </p><p>
* The single message indicating that an observable sequence has finished ensures that consumers of the
* observable sequence can deterministically establish that it is safe to perform cleanup operations.
* </p><p>
* A single failure further ensures that abort semantics can be maintained for operators that work on
* multiple observable sequences (see paragraph 6.6).
* </p></blockquote>
* <p>
* This wrapper does the following:
* <ul>
* <li>Allows only single execution of either {@code onError} or {@code onCompleted}.</li>
* <li>Ensures that once an {@code onCompleted} or {@code onError} is performed, no further calls can be executed.</li>
* <li>If {@code unsubscribe} is called, the upstream {@code Observable} is notified and the event delivery will be stopped in a
* best effort manner (i.e., further onXXX calls may still slip through).</li>
* <li>When {@code onError} or {@code onCompleted} occur, unsubscribes from the {@code Observable} (if executing asynchronously).</li>
* </ul>
* {@code SafeSubscriber} will not synchronize {@code onNext} execution. Use {@link SerializedSubscriber} to do
* that.
*
* @param <T>
* the type of item expected by the {@link Subscriber}
*/
public class SafeSubscriber<T> extends Subscriber<T> {

    /** The wrapped subscriber to which every accepted event is forwarded. */
    private final Subscriber<? super T> actual;

    /**
     * Becomes {@code true} as soon as a terminal event ({@code onCompleted} or {@code onError}) has been
     * accepted. The field is a plain {@code boolean}: it is not {@code volatile} and this class performs
     * no synchronization around it.
     */
    boolean done;

    /**
     * Creates a {@code SafeSubscriber} around the given {@link Subscriber}.
     *
     * @param actual
     *            the subscriber to wrap; it is also handed to the superclass constructor
     */
    public SafeSubscriber(Subscriber<? super T> actual) {
        super(actual);
        this.actual = actual;
    }

    /**
     * Signals successful completion to the wrapped {@link Subscriber}, at most once.
     * <p>
     * The call is ignored if a terminal event was already accepted. Otherwise the wrapped subscriber's
     * {@code onCompleted} is invoked and {@code unsubscribe()} is always called afterwards, whether or not
     * the delegate threw.
     *
     * @throws OnCompletedFailedException
     *             if the wrapped subscriber's {@code onCompleted} throws a non-fatal exception
     * @throws UnsubscribeFailedException
     *             if {@code unsubscribe()} throws
     */
    @Override
    public void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        try {
            actual.onCompleted();
        } catch (Throwable completionFailure) {
            // Deliberately handled inline rather than in a helper method: extra stack frames
            // could prevent this code from coping with a StackOverflowError.
            Exceptions.throwIfFatal(completionFailure);
            RxJavaHooks.onError(completionFailure);
            throw new OnCompletedFailedException(completionFailure.getMessage(), completionFailure);
        } finally { // NOPMD
            try {
                // As with onError: a failure while unsubscribing breaks the Rx contract,
                // so it is reported and surfaced as an UnsubscribeFailedException.
                unsubscribe();
            } catch (Throwable unsubscribeFailure) {
                RxJavaHooks.onError(unsubscribeFailure);
                throw new UnsubscribeFailedException(unsubscribeFailure.getMessage(), unsubscribeFailure);
            }
        }
    }

    /**
     * Signals an error to the wrapped {@link Subscriber}, at most once.
     * <p>
     * Fatal throwables are rethrown first via {@link Exceptions#throwIfFatal}, regardless of the
     * {@code done} state. After that, the error is delivered only if no terminal event was accepted
     * before.
     *
     * @param e
     *            the exception encountered by the Observable
     */
    @Override
    public void onError(Throwable e) {
        // Deliberately handled inline rather than in a helper method: extra stack frames
        // could prevent this code from coping with a StackOverflowError.
        Exceptions.throwIfFatal(e);
        if (done) {
            return;
        }
        done = true;
        _onError(e);
    }

    /**
     * Forwards an item to the wrapped {@link Subscriber} unless a terminal event was already accepted.
     * <p>
     * Anything thrown by the wrapped subscriber's {@code onNext} is passed to
     * {@link Exceptions#throwOrReport(Throwable, rx.Observer)} together with this subscriber.
     *
     * @param t
     *            the item emitted by the Observable
     */
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        try {
            actual.onNext(t);
        } catch (Throwable deliveryFailure) {
            // Deliberately handled inline rather than in a helper method: extra stack frames
            // could prevent this code from coping with a StackOverflowError.
            Exceptions.throwOrReport(deliveryFailure, this);
        }
    }

    /**
     * The error-delivery logic without the {@code done} check; within this class it is invoked from
     * {@link #onError} after the flag has been set.
     * <p>
     * The error is first handed to the plugin error handler, then to the wrapped subscriber's
     * {@code onError}. On every path {@code unsubscribe()} is called exactly once; a failure in either
     * step is reported through {@link RxJavaHooks#onError} and rethrown wrapped as described below.
     *
     * @param e
     *            the exception to deliver to the wrapped subscriber
     * @throws OnErrorNotImplementedException
     *             if the wrapped subscriber throws it (rethrown as is, or a new one wrapping a
     *             {@link CompositeException} if unsubscribing also fails)
     * @throws OnErrorFailedException
     *             if the wrapped subscriber's {@code onError} throws anything else, or if the final
     *             {@code unsubscribe()} throws
     * @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
     */
    @SuppressWarnings("deprecation")
    protected void _onError(Throwable e) { // NOPMD
        RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
        try {
            actual.onError(e);
        } catch (OnErrorNotImplementedException notImplemented) { // NOPMD
            /*
             * onError isn't implemented so throw
             *
             * https://github.com/ReactiveX/RxJava/issues/198
             *
             * Rx Design Guidelines 5.2
             *
             * "when calling the Subscribe method that only has an onNext argument, the OnError behavior
             * will be to rethrow the exception on the thread that the message comes out from the observable
             * sequence. The OnCompleted behavior in this case is to do nothing."
             */
            try {
                unsubscribe();
            } catch (Throwable unsubscribeFailure) {
                RxJavaHooks.onError(unsubscribeFailure);
                // Combines the original error with the unsubscribe failure.
                final CompositeException combined =
                        new CompositeException(Arrays.asList(e, unsubscribeFailure));
                throw new OnErrorNotImplementedException(
                        "Observer.onError not implemented and error while unsubscribing.",
                        combined); // NOPMD
            }
            throw notImplemented;
        } catch (Throwable onErrorFailure) {
            /*
             * throw since the Rx contract is broken if onError failed
             *
             * https://github.com/ReactiveX/RxJava/issues/198
             */
            RxJavaHooks.onError(onErrorFailure);
            try {
                unsubscribe();
            } catch (Throwable unsubscribeFailure) {
                RxJavaHooks.onError(unsubscribeFailure);
                final CompositeException combined =
                        new CompositeException(Arrays.asList(e, onErrorFailure, unsubscribeFailure));
                throw new OnErrorFailedException(
                        "Error occurred when trying to propagate error to Observer.onError and during unsubscription.",
                        combined);
            }
            final CompositeException combined =
                    new CompositeException(Arrays.asList(e, onErrorFailure));
            throw new OnErrorFailedException(
                    "Error occurred when trying to propagate error to Observer.onError",
                    combined);
        }

        // Reached only when the delegate's onError returned normally; both catch branches above
        // perform their own unsubscribe before throwing.
        try {
            unsubscribe();
        } catch (Throwable unsubscribeFailure) {
            RxJavaHooks.onError(unsubscribeFailure);
            throw new OnErrorFailedException(unsubscribeFailure);
        }
    }

    /**
     * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
     *
     * @return the {@link Subscriber} that was passed to the constructor of this {@code SafeSubscriber}
     */
    public Subscriber<? super T> getActual() {
        return actual;
    }
}