/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.schedulers;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicBoolean;
import java.util.concurrent.atomic.
AtomicReference;
import rx.
Completable;
import rx.
Completable.
OnSubscribe;
import rx.
CompletableSubscriber;
import rx.
Observable;
import rx.
Observer;
import rx.
Scheduler;
import rx.
Subscription;
import rx.functions.
Action0;
import rx.functions.
Func1;
import rx.internal.operators.
BufferUntilSubscriber;
import rx.observers.
SerializedObserver;
import rx.subjects.
PublishSubject;
import rx.subscriptions.
Subscriptions;
/**
* Allows the use of operators for controlling the timing around when actions
* scheduled on workers are actually done. This makes it possible to layer
* additional behavior on this {@link Scheduler}. The only parameter is a
* function that flattens an {@link Observable} of {@link Observable} of
* {@link Completable}s into just one {@link Completable}. There must be a chain
* of operators connecting the returned value to the source {@link Observable}
* otherwise any work scheduled on the returned {@link Scheduler} will not be
* executed.
* <p>
* When {@link Scheduler#createWorker()} is invoked an {@link Observable} of
* {@link Completable}s is onNext'd to the combinator to be flattened. If the
* inner {@link Observable} is not immediately subscribed to, any calls to
* {@link Worker#schedule} are buffered. Once the {@link Observable} is
* subscribed to, actions are then onNext'd as {@link Completable}s.
* <p>
* Finally the actions are scheduled on the parent {@link Scheduler} when the
* innermost {@link Completable}s are subscribed to.
* <p>
* When the {@link rx.Scheduler.Worker} is unsubscribed its {@link Observable} of
* {@link Completable}s emits an onCompleted and triggers any behavior in the
* flattening operator. The {@link Observable} and all {@link Completable}s given
* to the flattening function never onError.
* <p>
* Limit the amount of concurrency to two at a time without creating a new
* fixed-size thread pool:
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Observable.merge(workers), 2);
* });
* </pre>
* <p>
* This is a slightly different way to limit the concurrency but it has some
* interesting benefits and drawbacks compared to the method above. It works by limiting
* the number of concurrent {@link rx.Scheduler.Worker}s rather than individual actions.
* Generally each {@link Observable} uses its own {@link rx.Scheduler.Worker}. This means
* that this will essentially limit the number of concurrent subscribes. The
* danger comes from using operators like
* {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
* subscribing to the first {@link Observable} could deadlock the subscription
* to the second.
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Observables two at a time
* return Completable.merge(Observable.merge(workers, 2));
* });
* </pre>
*
* Slowing down the rate to no more than 1 a second. This suffers from the
* same problem as the one above. I could not find an {@link Observable} operator
* that limits the rate without dropping the values (aka leaky bucket
* algorithm).
*
* <pre>
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
* }));
* });
* </pre>
* @since 1.3
*/
public class
SchedulerWhen extends
Scheduler implements
Subscription {
/** Scheduler whose workers are used to actually run the scheduled actions. */
private final Scheduler actualScheduler;

/** Serialized observer through which each new worker's stream of {@link Completable}s is emitted. */
private final Observer<Observable<Completable>> workerObserver;

/** Subscription to the {@link Completable} returned by the combinator. */
private final Subscription subscription;

/**
 * Builds a scheduler that routes every worker through the given combinator.
 *
 * @param combine
 *            function that receives the stream of workers (each worker being
 *            a stream of {@link Completable}s) and returns a single
 *            {@link Completable}; the returned value is subscribed to here
 * @param actualScheduler
 *            the scheduler that supplies the workers on which actions run
 */
public SchedulerWhen(
        Func1<Observable<Observable<Completable>>, Completable> combine,
        Scheduler actualScheduler) {
    this.actualScheduler = actualScheduler;

    // Each worker is turned into an Observable of Completables and pushed
    // into this subject; the serialized wrapper is what createWorker() uses.
    PublishSubject<Observable<Completable>> workers =
            PublishSubject.<Observable<Completable>>create();
    this.workerObserver = new SerializedObserver<Observable<Completable>>(workers);

    // Hand the (buffered) worker stream to the caller-supplied combinator,
    // which decides the order and rate at which workers are processed.
    Observable<Observable<Completable>> bufferedWorkers = workers.onBackpressureBuffer();
    Completable combined = combine.call(bufferedWorkers);
    this.subscription = combined.subscribe();
}
/**
 * Unsubscribes from the {@link Completable} that the combinator returned
 * when this scheduler was constructed.
 */
@Override
public void unsubscribe() {
    this.subscription.unsubscribe();
}
/**
 * Reports the state of the subscription to the combinator's
 * {@link Completable}.
 *
 * @return whatever that subscription's {@code isUnsubscribed()} returns
 */
@Override
public boolean isUnsubscribed() {
    final boolean done = this.subscription.isUnsubscribed();
    return done;
}
/**
 * Creates a worker whose {@code schedule} calls are queued as
 * {@link ScheduledAction}s instead of being run directly.
 * <p>
 * A worker is obtained from the actual scheduler, the queued actions are
 * mapped to {@link Completable}s that schedule themselves on that worker
 * when subscribed to, and the resulting stream is emitted to the worker
 * observer so the combinator can process it.
 *
 * @return the queueing worker
 */
@Override
public Worker createWorker() {
    final Worker actualWorker = actualScheduler.createWorker();

    // Holds the actions submitted before anything has subscribed to this
    // worker's stream.
    BufferUntilSubscriber<ScheduledAction> pendingActions =
            BufferUntilSubscriber.<ScheduledAction>create();
    final Observer<ScheduledAction> actionObserver =
            new SerializedObserver<ScheduledAction>(pendingActions);

    // Turns one queued action into a Completable; subscribing to it hands the
    // action to the actual worker.
    Func1<ScheduledAction, Completable> toCompletable =
            new Func1<ScheduledAction, Completable>() {
                @Override
                public Completable call(final ScheduledAction action) {
                    OnSubscribe scheduleOnSubscribe = new OnSubscribe() {
                        @Override
                        public void call(CompletableSubscriber actionCompletable) {
                            // the action itself is the subscription handed downstream
                            actionCompletable.onSubscribe(action);
                            action.call(actualWorker, actionCompletable);
                        }
                    };
                    return Completable.create(scheduleOnSubscribe);
                }
            };
    Observable<Completable> actions = pendingActions.map(toCompletable);

    // The worker handed back to the caller: it only enqueues actions.
    Worker queueingWorker = new Worker() {
        private final AtomicBoolean unsubscribed = new AtomicBoolean();

        @Override
        public void unsubscribe() {
            // only the first call does any work
            if (!unsubscribed.compareAndSet(false, true)) {
                return;
            }
            actualWorker.unsubscribe();
            // completing the action stream lets the combinator move on to
            // the next worker
            actionObserver.onCompleted();
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed.get();
        }

        @Override
        public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
            // enqueue a delayed action; it doubles as the returned subscription
            DelayedAction delayed = new DelayedAction(action, delayTime, unit);
            actionObserver.onNext(delayed);
            return delayed;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            // enqueue an immediate action; it doubles as the returned subscription
            ImmediateAction immediate = new ImmediateAction(action);
            actionObserver.onNext(immediate);
            return immediate;
        }
    };

    // publish this worker's stream of Completables to the combinator
    workerObserver.onNext(actions);

    return queueingWorker;
}
/**
 * Sentinel used as the initial state of a {@link ScheduledAction}: its
 * {@code unsubscribe()} does nothing and it always reports itself as
 * still subscribed. It is compared by identity.
 */
static final Subscription SUBSCRIBED = new Subscription() {
    @Override
    public void unsubscribe() {
        // intentionally empty: there is nothing to cancel yet
    }

    @Override
    public boolean isUnsubscribed() {
        return false;
    }
};
/**
 * Sentinel stored in a {@link ScheduledAction} once it has been
 * unsubscribed. It is compared by identity.
 */
static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
/**
 * A queued action that is also the {@link Subscription} returned to the
 * caller of {@code schedule}. The atomic reference holds the current state:
 * {@link #SUBSCRIBED} until the action is scheduled, then the subscription
 * returned by {@link #callActual}, or {@link #UNSUBSCRIBED} once cancelled.
 */
@SuppressWarnings("serial")
static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {

    /** Starts in the {@link #SUBSCRIBED} state. */
    public ScheduledAction() {
        super(SUBSCRIBED);
    }

    /**
     * Schedules this action on the given worker unless it has already been
     * unsubscribed or scheduled.
     *
     * @param actualWorker
     *            worker passed on to {@link #callActual}
     * @param actionCompletable
     *            subscriber passed on to {@link #callActual}
     */
    private void call(Worker actualWorker, CompletableSubscriber actionCompletable) {
        // Any state other than SUBSCRIBED means there is nothing to do:
        // UNSUBSCRIBED -> cancelled before scheduling; anything else -> the
        // action was already scheduled and must not be scheduled again.
        if (get() != SUBSCRIBED) {
            return;
        }

        Subscription scheduled = callActual(actualWorker, actionCompletable);

        if (compareAndSet(SUBSCRIBED, scheduled)) {
            return;
        }
        // The state changed while callActual ran (a concurrent unsubscribe()
        // or a concurrent call of this method), so the subscription just
        // obtained lost the race and is cancelled.
        scheduled.unsubscribe();
    }

    /**
     * Performs the actual scheduling on the worker.
     *
     * @param actualWorker
     *            the worker to schedule on
     * @param actionCompletable
     *            the subscriber associated with this action
     * @return the subscription for the scheduled work
     */
    protected abstract Subscription callActual(Worker actualWorker, CompletableSubscriber actionCompletable);

    /**
     * @return the result of {@code isUnsubscribed()} on the current state
     */
    @Override
    public boolean isUnsubscribed() {
        Subscription current = get();
        return current.isUnsubscribed();
    }

    /**
     * Moves to the {@link #UNSUBSCRIBED} state and, if the action had already
     * been scheduled, unsubscribes the subscription that was held.
     */
    @Override
    public void unsubscribe() {
        for (;;) {
            Subscription current = get();
            if (current == UNSUBSCRIBED) {
                // already unsubscribed
                return;
            }
            if (compareAndSet(current, UNSUBSCRIBED)) {
                if (current != SUBSCRIBED) {
                    // the action had been scheduled; stop it
                    current.unsubscribe();
                }
                return;
            }
            // lost a race with another state change; re-read and retry
        }
    }
}
/**
 * A {@link ScheduledAction} that is scheduled on the worker without a delay.
 */
@SuppressWarnings("serial")
static class ImmediateAction extends ScheduledAction {
    private final Action0 action;

    /**
     * @param action
     *            the work to run
     */
    public ImmediateAction(Action0 action) {
        this.action = action;
    }

    /**
     * Wraps the action in an {@link OnCompletedAction} and schedules it on
     * the worker with no delay.
     */
    @Override
    protected Subscription callActual(Worker actualWorker, CompletableSubscriber actionCompletable) {
        Action0 wrapped = new OnCompletedAction(action, actionCompletable);
        return actualWorker.schedule(wrapped);
    }
}
/**
 * A {@link ScheduledAction} that is scheduled on the worker with the delay
 * given at construction.
 */
@SuppressWarnings("serial")
static class DelayedAction extends ScheduledAction {
    private final Action0 action;
    private final long delayTime;
    private final TimeUnit unit;

    /**
     * @param action
     *            the work to run
     * @param delayTime
     *            the delay passed to the worker's {@code schedule}
     * @param unit
     *            the unit of {@code delayTime}
     */
    public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
        this.action = action;
        this.delayTime = delayTime;
        this.unit = unit;
    }

    /**
     * Wraps the action in an {@link OnCompletedAction} and schedules it on
     * the worker with the stored delay and unit.
     */
    @Override
    protected Subscription callActual(Worker actualWorker, CompletableSubscriber actionCompletable) {
        Action0 wrapped = new OnCompletedAction(action, actionCompletable);
        return actualWorker.schedule(wrapped, delayTime, unit);
    }
}
/**
 * Wraps an {@link Action0} so that the associated
 * {@link CompletableSubscriber} receives {@code onCompleted} after the action
 * has run, whether the action returned normally or threw.
 */
static class OnCompletedAction implements Action0 {
    // Both references are assigned exactly once in the constructor and the
    // instance is handed to a worker for later execution, so they are final.
    private final CompletableSubscriber actionCompletable;
    private final Action0 action;

    /**
     * @param action
     *            the work to run
     * @param actionCompletable
     *            the subscriber to signal once the work has run
     */
    public OnCompletedAction(Action0 action, CompletableSubscriber actionCompletable) {
        this.action = action;
        this.actionCompletable = actionCompletable;
    }

    /**
     * Runs the wrapped action and then signals completion. The completion
     * signal is sent from a {@code finally} block, so an exception thrown by
     * the action still results in {@code onCompleted} before it propagates.
     */
    @Override
    public void call() {
        try {
            action.call();
        } finally {
            actionCompletable.onCompleted();
        }
    }
}
}