/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;
import java.util.
Queue;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.
RxRingBuffer;
import rx.internal.util.atomic.
SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.observables.
ConnectableObservable;
import rx.subscriptions.
Subscriptions;
/**
* A connectable observable which shares an underlying source and dispatches source values to subscribers in a backpressure-aware
* manner.
* @param <T> the value type
*/
public final class
OperatorPublish<T> extends
ConnectableObservable<T> {
/** The source observable. */
final
Observable<? extends T>
source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final
AtomicReference<
PublishSubscriber<T>>
current;
/**
* Creates a OperatorPublish instance to publish values of the given source observable.
* @param <T> the value type
* @param source the source observable
* @return the connectable observable
*/
public static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source) {
// the current connection to source needs to be shared between the operator and its onSubscribe call
final
AtomicReference<
PublishSubscriber<T>>
curr = new
AtomicReference<
PublishSubscriber<T>>();
OnSubscribe<T>
onSubscribe = new
OnSubscribe<T>() {
@
Override
public void
call(
Subscriber<? super T>
child) {
// concurrent connection/disconnection may change the state,
// we loop to be atomic while the child subscribes
for (;;) {
// get the current subscriber-to-source
PublishSubscriber<T>
r =
curr.
get();
// if there isn't one or it is unsubscribed
if (
r == null ||
r.
isUnsubscribed()) {
// create a new subscriber to source
PublishSubscriber<T>
u = new
PublishSubscriber<T>(
curr);
// perform extra initialization to avoid 'this' to escape during construction
u.
init();
// let's try setting it as the current subscriber-to-source
if (!
curr.
compareAndSet(
r,
u)) {
// didn't work, maybe someone else did it or the current subscriber
// to source has just finished
continue;
}
// we won, let's use it going onwards
r =
u;
}
// create the backpressure-managing producer for this child
InnerProducer<T>
inner = new
InnerProducer<T>(
r,
child);
/*
* Try adding it to the current subscriber-to-source, add is atomic in respect
* to other adds and the termination of the subscriber-to-source.
*/
if (
r.
add(
inner)) {
// the producer has been registered with the current subscriber-to-source so
// at least it will receive the next terminal event
child.
add(
inner);
// setting the producer will trigger the first request to be considered by
// the subscriber-to-source.
child.
setProducer(
inner);
break; // NOPMD
}
/*
* The current PublishSubscriber has been terminated, try with a newer one.
*/
/*
* Note: although technically correct, concurrent disconnects can cause
* unexpected behavior such as child subscribers never receiving anything
* (unless connected again). An alternative approach, similar to
* PublishSubject would be to immediately terminate such child
* subscribers as well:
*
* Object term = r.terminalEvent;
* if (NotificationLite.isCompleted(term)) {
* child.onCompleted();
* } else {
* child.onError(NotificationLite.getError(term));
* }
* return;
*
* The original concurrent behavior was non-deterministic in this regard as well.
* Allowing this behavior, however, may introduce another unexpected behavior:
* after disconnecting a previous connection, one might not be able to prepare
* a new connection right after a previous termination by subscribing new child
* subscribers asynchronously before a connect call.
*/
}
}
};
return new
OperatorPublish<T>(
onSubscribe,
source,
curr);
}
public static <T, R>
Observable<R>
create(final
Observable<? extends T>
source,
final
Func1<? super
Observable<T>, ? extends
Observable<R>>
selector) {
return
create(
source,
selector, false);
}
public static <T, R>
Observable<R>
create(final
Observable<? extends T>
source,
final
Func1<? super
Observable<T>, ? extends
Observable<R>>
selector, final boolean
delayError) {
return
unsafeCreate(new
OnSubscribe<R>() {
@
Override
public void
call(final
Subscriber<? super R>
child) {
final
OnSubscribePublishMulticast<T>
op = new
OnSubscribePublishMulticast<T>(
RxRingBuffer.
SIZE,
delayError);
Subscriber<R>
subscriber = new
Subscriber<R>() {
@
Override
public void
onNext(R
t) {
child.
onNext(
t);
}
@
Override
public void
onError(
Throwable e) {
op.
unsubscribe();
child.
onError(
e);
}
@
Override
public void
onCompleted() {
op.
unsubscribe();
child.
onCompleted();
}
@
Override
public void
setProducer(
Producer p) {
child.
setProducer(
p);
}
};
child.
add(
op);
child.
add(
subscriber);
selector.
call(
Observable.
unsafeCreate(
op)).
unsafeSubscribe(
subscriber);
source.
unsafeSubscribe(
op.
subscriber());
}
});
}
private
OperatorPublish(
OnSubscribe<T>
onSubscribe,
Observable<? extends T>
source,
final
AtomicReference<
PublishSubscriber<T>>
current) {
super(
onSubscribe);
this.
source =
source;
this.
current =
current;
}
@
Override
public void
connect(
Action1<? super
Subscription>
connection) {
boolean
doConnect;
PublishSubscriber<T>
ps;
// we loop because concurrent connect/disconnect and termination may change the state
for (;;) {
// retrieve the current subscriber-to-source instance
ps =
current.
get();
// if there is none yet or the current has unsubscribed
if (
ps == null ||
ps.
isUnsubscribed()) {
// create a new subscriber-to-source
PublishSubscriber<T>
u = new
PublishSubscriber<T>(
current);
// initialize out the constructor to avoid 'this' to escape
u.
init();
// try setting it as the current subscriber-to-source
if (!
current.
compareAndSet(
ps,
u)) {
// did not work, perhaps a new subscriber arrived
// and created a new subscriber-to-source as well, retry
continue;
}
ps =
u;
}
// if connect() was called concurrently, only one of them should actually
// connect to the source
doConnect = !
ps.
shouldConnect.
get() &&
ps.
shouldConnect.
compareAndSet(false, true);
break; // NOPMD
}
/*
* Notify the callback that we have a (new) connection which it can unsubscribe
* but since ps is unique to a connection, multiple calls to connect() will return the
* same Subscription and even if there was a connect-disconnect-connect pair, the older
* references won't disconnect the newer connection.
* Synchronous source consumers have the opportunity to disconnect via unsubscribe on the
* Subscription as unsafeSubscribe may never return in its own.
*
* Note however, that asynchronously disconnecting a running source might leave
* child-subscribers without any terminal event; PublishSubject does not have this
* issue because the unsubscription was always triggered by the child-subscribers
* themselves.
*/
connection.
call(
ps);
if (
doConnect) {
source.
unsafeSubscribe(
ps);
}
}
@
SuppressWarnings("rawtypes")
static final class
PublishSubscriber<T> extends
Subscriber<T> implements
Subscription {
/** Holds notifications from upstream. */
final
Queue<
Object>
queue;
/** Holds onto the current connected PublishSubscriber. */
final
AtomicReference<
PublishSubscriber<T>>
current;
/** Contains either an onCompleted or an onError token from upstream. */
volatile
Object terminalEvent;
/** Indicates an empty array of inner producers. */
static final
InnerProducer[]
EMPTY = new
InnerProducer[0];
/** Indicates a terminated PublishSubscriber. */
static final
InnerProducer[]
TERMINATED = new
InnerProducer[0];
/** Tracks the subscribed producers. */
final
AtomicReference<
InnerProducer[]>
producers;
/**
* Atomically changed from false to true by connect to make sure the
* connection is only performed by one thread.
*/
final
AtomicBoolean shouldConnect;
/** Guarded by this. */
boolean
emitting;
/** Guarded by this. */
boolean
missed;
public
PublishSubscriber(
AtomicReference<
PublishSubscriber<T>>
current) {
this.
queue =
UnsafeAccess.
isUnsafeAvailable()
? new
SpscArrayQueue<
Object>(
RxRingBuffer.
SIZE)
: new
SpscAtomicArrayQueue<
Object>(
RxRingBuffer.
SIZE);
this.
producers = new
AtomicReference<
InnerProducer[]>(
EMPTY);
this.
current =
current;
this.
shouldConnect = new
AtomicBoolean();
}
/** Should be called after the constructor finished to setup nulling-out the current reference. */
void
init() {
add(
Subscriptions.
create(new
Action0() {
@
Override
public void
call() {
PublishSubscriber.this.
producers.
getAndSet(
TERMINATED);
current.
compareAndSet(
PublishSubscriber.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
}
}));
}
@
Override
public void
onStart() {
// since subscribers may have different amount of requests, we try to
// optimize by buffering values up-front and replaying it on individual demand
request(
RxRingBuffer.
SIZE);
}
@
Override
public void
onNext(T
t) {
// we expect upstream to honor backpressure requests
// nl is required because JCTools queue doesn't accept nulls.
if (!
queue.
offer(
NotificationLite.
next(
t))) {
onError(new
MissingBackpressureException());
} else {
// since many things can happen concurrently, we have a common dispatch
// loop to act on the current state serially
dispatch();
}
}
@
Override
public void
onError(
Throwable e) {
// The observer front is accessed serially as required by spec so
// no need to CAS in the terminal value
if (
terminalEvent == null) {
terminalEvent =
NotificationLite.
error(
e);
// since many things can happen concurrently, we have a common dispatch
// loop to act on the current state serially
dispatch();
}
}
@
Override
public void
onCompleted() {
// The observer front is accessed serially as required by spec so
// no need to CAS in the terminal value
if (
terminalEvent == null) {
terminalEvent =
NotificationLite.
completed();
// since many things can happen concurrently, we have a common dispatch loop
// to act on the current state serially
dispatch();
}
}
/**
* Atomically try adding a new InnerProducer to this Subscriber or return false if this
* Subscriber was terminated.
* @param producer the producer to add
* @return true if succeeded, false otherwise
*/
boolean
add(
InnerProducer<T>
producer) {
if (
producer == null) {
throw new
NullPointerException();
}
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// get the current producer array
InnerProducer[]
c =
producers.
get();
// if this subscriber-to-source reached a terminal state by receiving
// an onError or onCompleted, just refuse to add the new producer
if (
c ==
TERMINATED) {
return false;
}
// we perform a copy-on-write logic
int
len =
c.length;
InnerProducer[]
u = new
InnerProducer[
len + 1];
System.
arraycopy(
c, 0,
u, 0,
len);
u[
len] =
producer;
// try setting the producers array
if (
producers.
compareAndSet(
c,
u)) {
return true;
}
// if failed, some other operation succeeded (another add, remove or termination)
// so retry
}
}
/**
* Atomically removes the given producer from the producers array.
* @param producer the producer to remove
*/
void
remove(
InnerProducer<T>
producer) {
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// let's read the current producers array
InnerProducer[]
c =
producers.
get();
// if it is either empty or terminated, there is nothing to remove so we quit
if (
c ==
EMPTY ||
c ==
TERMINATED) {
return;
}
// let's find the supplied producer in the array
// although this is O(n), we don't expect too many child subscribers in general
int
j = -1;
int
len =
c.length;
for (int
i = 0;
i <
len;
i++) {
if (
c[
i].
equals(
producer)) {
j =
i;
break;
}
}
// we didn't find it so just quit
if (
j < 0) {
return;
}
// we do copy-on-write logic here
InnerProducer[]
u;
// we don't create a new empty array if producer was the single inhabitant
// but rather reuse an empty array
if (
len == 1) {
u =
EMPTY;
} else {
// otherwise, create a new array one less in size
u = new
InnerProducer[
len - 1];
// copy elements being before the given producer
System.
arraycopy(
c, 0,
u, 0,
j);
// copy elements being after the given producer
System.
arraycopy(
c,
j + 1,
u,
j,
len -
j - 1);
}
// try setting this new array as
if (
producers.
compareAndSet(
c,
u)) {
return;
}
// if we failed, it means something else happened
// (a concurrent add/remove or termination), we need to retry
}
}
/**
* Perform termination actions in case the source has terminated in some way and
* the queue has also become empty.
* @param term the terminal event (a NotificationLite.error or completed)
* @param empty set to true if the queue is empty
* @return true if there is indeed a terminal condition
*/
boolean
checkTerminated(
Object term, boolean
empty) {
// first of all, check if there is actually a terminal event
if (
term != null) {
// is it a completion event (impl. note, this is much cheaper than checking for isError)
if (
NotificationLite.
isCompleted(
term)) {
// but we also need to have an empty queue
if (
empty) {
// this will prevent OnSubscribe spinning on a terminated but
// not yet unsubscribed PublishSubscriber
current.
compareAndSet(this, null);
try {
/*
* This will swap in a terminated array so add() in OnSubscribe will reject
* child subscribers to associate themselves with a terminated and thus
* never again emitting chain.
*
* Since we atomically change the contents of 'producers' only one
* operation wins at a time. If an add() wins before this getAndSet,
* its value will be part of the returned array by getAndSet and thus
* will receive the terminal notification. Otherwise, if getAndSet wins,
* add() will refuse to add the child producer and will trigger the
* creation of subscriber-to-source.
*/
for (
InnerProducer<?>
ip :
producers.
getAndSet(
TERMINATED)) {
ip.
child.
onCompleted();
}
} finally {
// we explicitly unsubscribe/disconnect from the upstream
// after we sent out the terminal event to child subscribers
unsubscribe();
}
// indicate we reached the terminal state
return true;
}
} else {
Throwable t =
NotificationLite.
getError(
term);
// this will prevent OnSubscribe spinning on a terminated
// but not yet unsubscribed PublishSubscriber
current.
compareAndSet(this, null);
try {
// this will swap in a terminated array so add() in OnSubscribe will reject
// child subscribers to associate themselves with a terminated and thus
// never again emitting chain
for (
InnerProducer<?>
ip :
producers.
getAndSet(
TERMINATED)) {
ip.
child.
onError(
t);
}
} finally {
// we explicitly unsubscribe/disconnect from the upstream
// after we sent out the terminal event to child subscribers
unsubscribe();
}
// indicate we reached the terminal state
return true;
}
}
// there is still work to be done
return false;
}
/**
* The common serialization point of events arriving from upstream and child-subscribers
* requesting more.
*/
void
dispatch() {
// standard construct of emitter loop (blocking)
// if there is an emission going on, indicate that more work needs to be done
// the exact nature of this work needs to be determined from other data structures
synchronized (this) {
if (
emitting) {
missed = true;
return;
}
// there was no emission going on, we won and will start emitting
emitting = true;
missed = false;
}
/*
* In case an exception is thrown in the loop, we need to set emitting back to false
* on the way out (the exception will propagate up) so if it bounces back and
* onError is called, its dispatch() call will have the opportunity to emit it.
* However, if we want to exit regularly, we will set the emitting to false (+ other operations)
* atomically so we want to prevent the finally part to accidentally unlock some other
* emissions happening between the two synchronized blocks.
*/
boolean
skipFinal = false;
try {
for (;;) {
/*
* We need to read terminalEvent before checking the queue for emptiness because
* all enqueue happens before setting the terminal event.
* If it were the other way around, when the emission is paused between
* checking isEmpty and checking terminalEvent, some other thread might
* have produced elements and set the terminalEvent and we'd quit emitting
* prematurely.
*/
Object term =
terminalEvent;
/*
* See if the queue is empty; since we need this information multiple
* times later on, we read it one.
* Although the queue can become non-empty in the mean time, we will
* detect it through the missing flag and will do another iteration.
*/
boolean
empty =
queue.
isEmpty();
// if the queue is empty and the terminal event was received, quit
// and don't bother restoring emitting to false: no further activity is
// possible at this point
if (
checkTerminated(
term,
empty)) {
skipFinal = true;
return;
}
// We have elements queued. Note that due to the serialization nature of dispatch()
// this loop is the only one which can turn a non-empty queue into an empty one
// and as such, no need to ask the queue itself again for that.
if (!
empty) {
// We take a snapshot of the current child-subscribers.
// Concurrent subscribers may miss this iteration, but it is to be expected
@
SuppressWarnings("unchecked")
InnerProducer<T>[]
ps =
producers.
get();
int
len =
ps.length;
// Let's assume everyone requested the maximum value.
long
maxRequested =
Long.
MAX_VALUE;
// count how many have triggered unsubscription
int
unsubscribed = 0;
// Now find the minimum amount each child-subscriber requested
// since we can only emit that much to all of them without violating
// backpressure constraints
for (
InnerProducer<T>
ip :
ps) {
long
r =
ip.
get();
// if there is one child subscriber that hasn't requested yet
// we can't emit anything to anyone
if (
r >= 0L) {
maxRequested =
Math.
min(
maxRequested,
r);
} else
// unsubscription is indicated by a special value
if (
r ==
InnerProducer.
UNSUBSCRIBED) {
unsubscribed++;
}
// we ignore those with NOT_REQUESTED as if they aren't even there
}
// it may happen everyone has unsubscribed between here and producers.get()
// or we have no subscribers at all to begin with
if (
len ==
unsubscribed) {
term =
terminalEvent;
// so let's consume a value from the queue
Object v =
queue.
poll();
// or terminate if there was a terminal event and the queue is empty
if (
checkTerminated(
term,
v == null)) {
skipFinal = true;
return;
}
// otherwise, just ask for a new value
request(1);
// and retry emitting to potential new child-subscribers
continue;
}
// if we get here, it means there are non-unsubscribed child-subscribers
// and we count the number of emitted values because the queue
// may contain less than requested
int
d = 0;
while (
d <
maxRequested) {
term =
terminalEvent;
Object v =
queue.
poll();
empty =
v == null;
// let's check if there is a terminal event and the queue became empty just now
if (
checkTerminated(
term,
empty)) {
skipFinal = true;
return;
}
// the queue is empty but we aren't terminated yet, finish this emission loop
if (
empty) {
break;
}
// we need to unwrap potential nulls
T
value =
NotificationLite.
getValue(
v);
// let's emit this value to all child subscribers
for (
InnerProducer<T>
ip :
ps) {
// if ip.get() is negative, the child has either unsubscribed in the
// meantime or hasn't requested anything yet
// this eager behavior will skip unsubscribed children in case
// multiple values are available in the queue
if (
ip.
get() > 0L) {
try {
ip.
child.
onNext(
value);
} catch (
Throwable t) {
// we bounce back exceptions and kick out the child subscriber
ip.
unsubscribe();
Exceptions.
throwOrReport(
t,
ip.
child,
value);
continue;
}
// indicate this child has received 1 element
ip.
produced(1);
}
}
// indicate we emitted one element
d++;
}
// if we did emit at least one element, request more to replenish the queue
if (
d > 0) {
request(
d);
}
// if we have requests but not an empty queue after emission
// let's try again to see if more requests/child-subscribers are
// ready to receive more
if (
maxRequested != 0L && !
empty) {
continue;
}
}
// we did what we could: either the queue is empty or child subscribers
// haven't requested more (or both), let's try to finish dispatching
synchronized (this) {
// since missed is changed atomically, if we see it as true
// it means some state has changed and we need to loop again
// and handle that case
if (!
missed) {
// but if no missed dispatch happened, let's stop emitting
emitting = false;
// and skip the emitting = false in the finally block as well
skipFinal = true;
return;
}
// we acknowledge the missed changes so far
missed = false;
}
}
} finally {
// unless returned cleanly (i.e., some method above threw)
if (!
skipFinal) {
// we stop emitting so the error can propagate back down through onError
synchronized (this) {
emitting = false;
}
}
}
}
}
/**
* A Producer and Subscription that manages the request and unsubscription state of a
* child subscriber in thread-safe manner.
* We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also
* save the overhead of the AtomicIntegerFieldUpdater.
* @param <T> the value type
*/
static final class
InnerProducer<T> extends
AtomicLong implements
Producer,
Subscription {
/** */
private static final long
serialVersionUID = -4453897557930727610L;
/**
* The parent subscriber-to-source used to allow removing the child in case of
* child unsubscription.
*/
final
PublishSubscriber<T>
parent;
/** The actual child subscriber. */
final
Subscriber<? super T>
child;
/**
* Indicates this child has been unsubscribed: the state is swapped in atomically and
* will prevent the dispatch() to emit (too many) values to a terminated child subscriber.
*/
static final long
UNSUBSCRIBED =
Long.
MIN_VALUE;
/**
* Indicates this child has not yet requested any value. We pretend we don't
* see such child subscribers in dispatch() to allow other child subscribers who
* have requested to make progress. In a concurrent subscription scenario,
* one can't be sure when a subscription happens exactly so this virtual shift
* should not cause any problems.
*/
static final long
NOT_REQUESTED =
Long.
MIN_VALUE / 2;
public
InnerProducer(
PublishSubscriber<T>
parent,
Subscriber<? super T>
child) {
this.
parent =
parent;
this.
child =
child;
this.
lazySet(
NOT_REQUESTED);
}
@
Override
public void
request(long
n) {
// ignore negative requests
if (
n < 0) {
return;
}
// In general, RxJava doesn't prevent concurrent requests (with each other or with
// an unsubscribe) so we need a CAS-loop, but we need to handle
// request overflow and unsubscribed/not requested state as well.
for (;;) {
// get the current request amount
long
r =
get();
// if child called unsubscribe() do nothing
if (
r ==
UNSUBSCRIBED) {
return;
}
// ignore zero requests except any first that sets in zero
if (
r >= 0L &&
n == 0) {
return;
}
long
u;
// if this child has not requested yet
if (
r ==
NOT_REQUESTED) {
// let the new request value this (no overflow check needed)
u =
n;
} else {
// otherwise, increase the request count
u =
r +
n;
// and check for long overflow
if (
u < 0) {
// cap at max value, which is essentially unlimited
u =
Long.
MAX_VALUE;
}
}
// try setting the new request value
if (
compareAndSet(
r,
u)) {
// if successful, notify the parent dispatcher this child can receive more
// elements
parent.
dispatch();
return;
}
// otherwise, someone else changed the state (perhaps a concurrent
// request or unsubscription so retry
}
}
/**
* Indicate that values have been emitted to this child subscriber by the dispatch() method.
* @param n the number of items emitted
* @return the updated request value (may indicate how much can be produced or a terminal state)
*/
public long
produced(long
n) {
// we don't allow producing zero or less: it would be a bug in the operator
if (
n <= 0) {
throw new
IllegalArgumentException("Cant produce zero or less");
}
for (;;) {
// get the current request value
long
r =
get();
// if no request has been made yet, we shouldn't have emitted to this child
// subscriber so there is a bug in this operator
if (
r ==
NOT_REQUESTED) {
throw new
IllegalStateException("Produced without request");
}
// if the child has unsubscribed, simply return and indicate this
if (
r ==
UNSUBSCRIBED) {
return
UNSUBSCRIBED;
}
// reduce the requested amount
long
u =
r -
n;
// if the new amount is less than zero, we have a bug in this operator
if (
u < 0) {
throw new
IllegalStateException("More produced (" +
n + ") than requested (" +
r + ")");
}
// try updating the request value
if (
compareAndSet(
r,
u)) {
// and return the updated value
return
u;
}
// otherwise, some concurrent activity happened and we need to retry
}
}
@
Override
public boolean
isUnsubscribed() {
return
get() ==
UNSUBSCRIBED;
}
@
Override
public void
unsubscribe() {
long
r =
get();
// let's see if we are unsubscribed
if (
r !=
UNSUBSCRIBED) {
// if not, swap in the terminal state, this is idempotent
// because other methods using CAS won't overwrite this value,
// concurrent calls to unsubscribe will atomically swap in the same
// terminal value
r =
getAndSet(
UNSUBSCRIBED);
// and only one of them will see a non-terminated value before the swap
if (
r !=
UNSUBSCRIBED) {
// remove this from the parent
parent.
remove(this);
// After removal, we might have unblocked the other child subscribers:
// let's assume this child had 0 requested before the unsubscription while
// the others had non-zero. By removing this 'blocking' child, the others
// are now free to receive events
parent.
dispatch();
}
}
}
}
}