/**
* Copyright 2014 Netflix, Inc.
* <p/>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.util;
import rx.*;
import rx.
Scheduler.
Worker;
import rx.functions.*;
import rx.internal.schedulers.
EventLoopsScheduler;
public final class
ScalarSynchronousSingle<T> extends
Single<T> {
final T
value;
public static <T>
ScalarSynchronousSingle<T>
create(T
t) {
return new
ScalarSynchronousSingle<T>(
t);
}
protected
ScalarSynchronousSingle(final T
t) {
super(new
OnSubscribe<T>() {
@
Override
public void
call(
SingleSubscriber<? super T>
te) {
te.
onSuccess(
t);
}
});
this.
value =
t;
}
public T
get() {
return
value;
}
/**
* Customized observeOn/subscribeOn implementation which emits the scalar
* value directly or with less overhead on the specified scheduler.
*
* @param scheduler the target scheduler
* @return the new observable
*/
public
Single<T>
scalarScheduleOn(
Scheduler scheduler) {
if (
scheduler instanceof
EventLoopsScheduler) {
EventLoopsScheduler es = (
EventLoopsScheduler)
scheduler;
return
create(new
DirectScheduledEmission<T>(
es,
value));
}
return
create(new
NormalScheduledEmission<T>(
scheduler,
value));
}
/**
* Optimized observeOn for scalar value observed on the EventLoopsScheduler.
*/
static final class
DirectScheduledEmission<T> implements
OnSubscribe<T> {
private final
EventLoopsScheduler es;
private final T
value;
DirectScheduledEmission(
EventLoopsScheduler es, T
value) {
this.
es =
es;
this.
value =
value;
}
@
Override
public void
call(
SingleSubscriber<? super T>
singleSubscriber) {
singleSubscriber.
add(
es.
scheduleDirect(new
ScalarSynchronousSingleAction<T>(
singleSubscriber,
value)));
}
}
/**
* Emits a scalar value on a general scheduler.
*/
static final class
NormalScheduledEmission<T> implements
OnSubscribe<T> {
private final
Scheduler scheduler;
private final T
value;
NormalScheduledEmission(
Scheduler scheduler, T
value) {
this.
scheduler =
scheduler;
this.
value =
value;
}
@
Override
public void
call(
SingleSubscriber<? super T>
singleSubscriber) {
Worker worker =
scheduler.
createWorker();
singleSubscriber.
add(
worker);
worker.
schedule(new
ScalarSynchronousSingleAction<T>(
singleSubscriber,
value));
}
}
/**
* Action that emits a single value when called.
*/
static final class
ScalarSynchronousSingleAction<T> implements
Action0 {
private final
SingleSubscriber<? super T>
subscriber;
private final T
value;
ScalarSynchronousSingleAction(
SingleSubscriber<? super T>
subscriber,
T
value) {
this.
subscriber =
subscriber;
this.
value =
value;
}
@
Override
public void
call() {
try {
subscriber.
onSuccess(
value);
} catch (
Throwable t) {
subscriber.
onError(
t);
}
}
}
public <R>
Single<R>
scalarFlatMap(final
Func1<? super T, ? extends
Single<? extends R>>
func) {
return
create(new
OnSubscribe<R>() {
@
Override
public void
call(final
SingleSubscriber<? super R>
child) {
Single<? extends R>
o =
func.
call(
value);
if (
o instanceof
ScalarSynchronousSingle) {
child.
onSuccess(((
ScalarSynchronousSingle<? extends R>)
o).
value);
} else {
SingleSubscriber<R>
subscriber = new
SingleSubscriber<R>() {
@
Override
public void
onError(
Throwable e) {
child.
onError(
e);
}
@
Override
public void
onSuccess(R
r) {
child.
onSuccess(
r);
}
};
child.
add(
subscriber);
o.
subscribe(
subscriber);
}
}
});
}
}