/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.
Observable;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.
OpenHashSet;
import rx.observables.
ConnectableObservable;
import rx.schedulers.
Timestamped;
import rx.subscriptions.
Subscriptions;
public final class
OperatorReplay<T> extends
ConnectableObservable<T> implements
Subscription {
/** The source observable. */
final
Observable<? extends T>
source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final
AtomicReference<
ReplaySubscriber<T>>
current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final
Func0<? extends
ReplayBuffer<T>>
bufferFactory;
@
SuppressWarnings("rawtypes")
static final
Func0 DEFAULT_UNBOUNDED_FACTORY = new
Func0() {
@
Override
public
Object call() {
return new
UnboundedReplayBuffer<
Object>(16);
}
};
/**
* Given a connectable observable factory, it multicasts over the generated
* ConnectableObservable via a selector function.
* @param <T> the upstream's value type
* @param <U> the intermediate value type of the ConnectableObservable
* @param <R> the final value type provided by the selector function
* @param connectableFactory the factory that returns a ConnectableObservable instance
* @param selector the function applied on the ConnectableObservable and returns the Observable
* the downstream will subscribe to.
* @return the Observable multicasting over a transformation of a ConnectableObservable
*/
public static <T, U, R>
Observable<R>
multicastSelector(
final
Func0<? extends
ConnectableObservable<U>>
connectableFactory,
final
Func1<? super
Observable<U>, ? extends
Observable<R>>
selector) {
return
Observable.
unsafeCreate(new
OnSubscribe<R>() {
@
Override
public void
call(final
Subscriber<? super R>
child) {
ConnectableObservable<U>
co;
Observable<R>
observable;
try {
co =
connectableFactory.
call();
observable =
selector.
call(
co);
} catch (
Throwable e) {
Exceptions.
throwOrReport(
e,
child);
return;
}
observable.
subscribe(
child);
co.
connect(new
Action1<
Subscription>() {
@
Override
public void
call(
Subscription t) {
child.
add(
t);
}
});
}
});
}
/**
* Child Subscribers will observe the events of the ConnectableObservable on the
* specified scheduler.
* @param <T> the value type
* @param co the ConnectableObservable to schedule on the specified scheduler
* @param scheduler the target Scheduler instance
* @return the ConnectableObservable instance that is observed on the specified scheduler
*/
public static <T>
ConnectableObservable<T>
observeOn(final
ConnectableObservable<T>
co, final
Scheduler scheduler) {
final
Observable<T>
observable =
co.
observeOn(
scheduler);
OnSubscribe<T>
onSubscribe = new
OnSubscribe<T>() {
@
Override
public void
call(final
Subscriber<? super T>
child) {
// apply observeOn and prevent calling onStart() again
observable.
unsafeSubscribe(new
Subscriber<T>(
child) {
@
Override
public void
onNext(T
t) {
child.
onNext(
t);
}
@
Override
public void
onError(
Throwable e) {
child.
onError(
e);
}
@
Override
public void
onCompleted() {
child.
onCompleted();
}
});
}
};
return new
ConnectableObservable<T>(
onSubscribe) {
@
Override
public void
connect(
Action1<? super
Subscription>
connection) {
co.
connect(
connection);
}
};
}
/**
* Creates a replaying ConnectableObservable with an unbounded buffer.
* @param <T> the value type
* @param source the source Observable
* @return the replaying ConnectableObservable
*/
@
SuppressWarnings("unchecked")
public static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source) {
return
create(
source,
DEFAULT_UNBOUNDED_FACTORY);
}
/**
* Creates a replaying ConnectableObservable with a size bound buffer.
* @param <T> the value type
* @param source the source Observable
* @param bufferSize the maximum number of elements buffered
* @return the replaying ConnectableObservable
*/
@
SuppressWarnings("cast")
public static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source,
final int
bufferSize) {
if (
bufferSize ==
Integer.
MAX_VALUE) {
return (
ConnectableObservable<T>)
create(
source);
}
return
create(
source, new
Func0<
ReplayBuffer<T>>() {
@
Override
public
ReplayBuffer<T>
call() {
return new
SizeBoundReplayBuffer<T>(
bufferSize);
}
});
}
/**
* Creates a replaying ConnectableObservable with a time bound buffer.
* @param <T> the value type
* @param source the source Observable
* @param maxAge the maximum age (exclusive) of each item when timestamped with the given scheduler
* @param unit the time unit of the maximum age
* @param scheduler the scheduler providing the notion of current time
* @return the replaying ConnectableObservable
*/
@
SuppressWarnings("cast")
public static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source,
long
maxAge,
TimeUnit unit,
Scheduler scheduler) {
return (
ConnectableObservable<T>)
create(
source,
maxAge,
unit,
scheduler,
Integer.
MAX_VALUE);
}
/**
* Creates a replaying ConnectableObservable with a size and time bound buffer.
* @param <T> the value type
* @param source the source Observable
* @param maxAge the maximum age (exclusive) of each item when timestamped with the given scheduler
* @param unit the time unit of the maximum age
* @param scheduler the scheduler providing the notion of current time
* @param bufferSize the maximum number of elements buffered
* @return the replaying ConnectableObservable
*/
public static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source,
long
maxAge,
TimeUnit unit, final
Scheduler scheduler, final int
bufferSize) {
final long
maxAgeInMillis =
unit.
toMillis(
maxAge);
return
create(
source, new
Func0<
ReplayBuffer<T>>() {
@
Override
public
ReplayBuffer<T>
call() {
return new
SizeAndTimeBoundReplayBuffer<T>(
bufferSize,
maxAgeInMillis,
scheduler);
}
});
}
/**
* Creates a OperatorReplay instance to replay values of the given source observable.
* @param source the source observable
* @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active
* @return the connectable observable
*/
static <T>
ConnectableObservable<T>
create(
Observable<? extends T>
source,
final
Func0<? extends
ReplayBuffer<T>>
bufferFactory) {
// the current connection to source needs to be shared between the operator and its onSubscribe call
final
AtomicReference<
ReplaySubscriber<T>>
curr = new
AtomicReference<
ReplaySubscriber<T>>();
OnSubscribe<T>
onSubscribe = new
OnSubscribe<T>() {
@
Override
public void
call(
Subscriber<? super T>
child) {
// concurrent connection/disconnection may change the state,
// we loop to be atomic while the child subscribes
for (;;) {
// get the current subscriber-to-source
ReplaySubscriber<T>
r =
curr.
get();
// if there isn't one
if (
r == null) {
// create a new subscriber to source
ReplaySubscriber<T>
u = new
ReplaySubscriber<T>(
bufferFactory.
call());
// perform extra initialization to avoid 'this' to escape during construction
u.
init();
// let's try setting it as the current subscriber-to-source
if (!
curr.
compareAndSet(
r,
u)) {
// didn't work, maybe someone else did it or the current subscriber
// to source has just finished
continue;
}
// we won, let's use it going onwards
r =
u;
}
// create the backpressure-managing producer for this child
InnerProducer<T>
inner = new
InnerProducer<T>(
r,
child);
// we try to add it to the array of producers
// if it fails, no worries because we will still have its buffer
// so it is going to replay it for us
r.
add(
inner);
// the producer has been registered with the current subscriber-to-source so
// at least it will receive the next terminal event
child.
add(
inner);
// pin the head of the buffer here, shouldn't affect anything else
r.
buffer.
replay(
inner);
// setting the producer will trigger the first request to be considered by
// the subscriber-to-source.
child.
setProducer(
inner);
break; // NOPMD
}
}
};
return new
OperatorReplay<T>(
onSubscribe,
source,
curr,
bufferFactory);
}
private
OperatorReplay(
OnSubscribe<T>
onSubscribe,
Observable<? extends T>
source,
final
AtomicReference<
ReplaySubscriber<T>>
current,
final
Func0<? extends
ReplayBuffer<T>>
bufferFactory) {
super(
onSubscribe);
this.
source =
source;
this.
current =
current;
this.
bufferFactory =
bufferFactory;
}
@
Override
public void
unsubscribe() {
current.
lazySet(null);
}
@
Override
public boolean
isUnsubscribed() {
ReplaySubscriber<T>
ps =
current.
get();
return
ps == null ||
ps.
isUnsubscribed();
}
@
Override
public void
connect(
Action1<? super
Subscription>
connection) {
boolean
doConnect;
ReplaySubscriber<T>
ps;
// we loop because concurrent connect/disconnect and termination may change the state
for (;;) {
// retrieve the current subscriber-to-source instance
ps =
current.
get();
// if there is none yet or the current has unsubscribed
if (
ps == null ||
ps.
isUnsubscribed()) {
// create a new subscriber-to-source
ReplaySubscriber<T>
u = new
ReplaySubscriber<T>(
bufferFactory.
call());
// initialize out the constructor to avoid 'this' to escape
u.
init();
// try setting it as the current subscriber-to-source
if (!
current.
compareAndSet(
ps,
u)) {
// did not work, perhaps a new subscriber arrived
// and created a new subscriber-to-source as well, retry
continue;
}
ps =
u;
}
// if connect() was called concurrently, only one of them should actually
// connect to the source
doConnect = !
ps.
shouldConnect.
get() &&
ps.
shouldConnect.
compareAndSet(false, true);
break; // NOPMD
}
/*
* Notify the callback that we have a (new) connection which it can unsubscribe
* but since ps is unique to a connection, multiple calls to connect() will return the
* same Subscription and even if there was a connect-disconnect-connect pair, the older
* references won't disconnect the newer connection.
* Synchronous source consumers have the opportunity to disconnect via unsubscribe on the
* Subscription as unsafeSubscribe may never return in its own.
*
* Note however, that asynchronously disconnecting a running source might leave
* child-subscribers without any terminal event; ReplaySubject does not have this
* issue because the unsubscription was always triggered by the child-subscribers
* themselves.
*/
connection.
call(
ps);
if (
doConnect) {
source.
unsafeSubscribe(
ps);
}
}
@
SuppressWarnings("rawtypes")
static final class
ReplaySubscriber<T> extends
Subscriber<T> implements
Subscription {
/** Holds notifications from upstream. */
final
ReplayBuffer<T>
buffer;
/** Contains either an onCompleted or an onError token from upstream. */
boolean
done;
/** Indicates an empty array of inner producers. */
static final
InnerProducer[]
EMPTY = new
InnerProducer[0];
/** Indicates a terminated ReplaySubscriber. */
static final
InnerProducer[]
TERMINATED = new
InnerProducer[0];
/** Indicates no further InnerProducers are accepted. */
volatile boolean
terminated;
/** Tracks the subscribed producers. Guarded by itself. */
final
OpenHashSet<
InnerProducer<T>>
producers;
/** Contains a copy of the producers. Modified only from the source side. */
InnerProducer<T>[]
producersCache;
/** Contains number of modifications to the producers set.*/
volatile long
producersVersion;
/** Contains the number of modifications that the producersCache holds. */
long
producersCacheVersion;
/**
* Atomically changed from false to true by connect to make sure the
* connection is only performed by one thread.
*/
final
AtomicBoolean shouldConnect;
/** Guarded by this. */
boolean
emitting;
/** Guarded by this. */
boolean
missed;
/** Contains the maximum element index the child Subscribers requested so far. Accessed while emitting is true. */
long
maxChildRequested;
/** Counts the outstanding upstream requests until the producer arrives. */
long
maxUpstreamRequested;
/** The upstream producer. */
volatile
Producer producer;
/** The queue that holds producers with request changes that need to be coordinated. */
List<
InnerProducer<T>>
coordinationQueue;
/** Indicate that all request amounts should be considered. */
boolean
coordinateAll;
@
SuppressWarnings("unchecked")
public
ReplaySubscriber(
ReplayBuffer<T>
buffer) {
this.
buffer =
buffer;
this.
producers = new
OpenHashSet<
InnerProducer<T>>();
this.
producersCache =
EMPTY;
this.
shouldConnect = new
AtomicBoolean();
// make sure the source doesn't produce values until the child subscribers
// expressed their request amounts
this.
request(0);
}
/** Should be called after the constructor finished to setup nulling-out the current reference. */
void
init() {
add(
Subscriptions.
create(new
Action0() {
@
Override
public void
call() {
if (!
terminated) {
synchronized (
producers) {
if (!
terminated) {
producers.
terminate();
producersVersion++;
terminated = true;
}
}
}
// unlike OperatorPublish, we can't null out the terminated so
// late subscribers can still get replay
// current.compareAndSet(ReplaySubscriber.this, null);
// we don't care if it fails because it means the current has
// been replaced in the meantime
}
}));
}
/**
* Atomically try adding a new InnerProducer to this Subscriber or return false if this
* Subscriber was terminated.
* @param producer the producer to add
* @return true if succeeded, false otherwise
*/
boolean
add(
InnerProducer<T>
producer) {
if (
producer == null) {
throw new
NullPointerException();
}
if (
terminated) {
return false;
}
synchronized (
producers) {
if (
terminated) {
return false;
}
producers.
add(
producer);
producersVersion++;
}
return true;
}
/**
* Atomically removes the given producer from the producers array.
* @param producer the producer to remove
*/
@
SuppressWarnings("unchecked")
void
remove(
InnerProducer<T>
producer) {
if (
terminated) {
return;
}
synchronized (
producers) {
if (
terminated) {
return;
}
producers.
remove(
producer);
if (
producers.
isEmpty()) {
producersCache =
EMPTY;
}
producersVersion++;
}
}
@
Override
public void
setProducer(
Producer p) {
Producer p0 =
producer;
if (
p0 != null) {
throw new
IllegalStateException("Only a single producer can be set on a Subscriber.");
}
producer =
p;
manageRequests(null);
replay();
}
@
Override
public void
onNext(T
t) {
if (!
done) {
buffer.
next(
t);
replay();
}
}
@
Override
public void
onError(
Throwable e) {
// The observer front is accessed serially as required by spec so
// no need to CAS in the terminal value
if (!
done) {
done = true;
try {
buffer.
error(
e);
replay();
} finally {
unsubscribe(); // expectation of testIssue2191
}
}
}
@
Override
public void
onCompleted() {
// The observer front is accessed serially as required by spec so
// no need to CAS in the terminal value
if (!
done) {
done = true;
try {
buffer.
complete();
replay();
} finally {
unsubscribe();
}
}
}
/**
* Coordinates the request amounts of various child Subscribers.
*/
void
manageRequests(
InnerProducer<T>
inner) {
// if the upstream has completed, no more requesting is possible
if (
isUnsubscribed()) {
return;
}
synchronized (this) {
if (
emitting) {
if (
inner != null) {
List<
InnerProducer<T>>
q =
coordinationQueue;
if (
q == null) {
q = new
ArrayList<
InnerProducer<T>>();
coordinationQueue =
q;
}
q.
add(
inner);
} else {
coordinateAll = true;
}
missed = true;
return;
}
emitting = true;
}
long
ri =
maxChildRequested;
long
maxTotalRequested;
if (
inner != null) {
maxTotalRequested =
Math.
max(
ri,
inner.
totalRequested.
get());
} else {
maxTotalRequested =
ri;
InnerProducer<T>[]
a =
copyProducers();
for (
InnerProducer<T>
rp :
a) {
if (
rp != null) {
maxTotalRequested =
Math.
max(
maxTotalRequested,
rp.
totalRequested.
get());
}
}
}
makeRequest(
maxTotalRequested,
ri);
for (;;) {
// if the upstream has completed, no more requesting is possible
if (
isUnsubscribed()) {
return;
}
List<
InnerProducer<T>>
q;
boolean
all;
synchronized (this) {
if (!
missed) {
emitting = false;
return;
}
missed = false;
q =
coordinationQueue;
coordinationQueue = null;
all =
coordinateAll;
coordinateAll = false;
}
ri =
maxChildRequested;
maxTotalRequested =
ri;
if (
q != null) {
for (
InnerProducer<T>
rp :
q) {
maxTotalRequested =
Math.
max(
maxTotalRequested,
rp.
totalRequested.
get());
}
}
if (
all) {
InnerProducer<T>[]
a =
copyProducers();
for (
InnerProducer<T>
rp :
a) {
if (
rp != null) {
maxTotalRequested =
Math.
max(
maxTotalRequested,
rp.
totalRequested.
get());
}
}
}
makeRequest(
maxTotalRequested,
ri);
}
}
InnerProducer<T>[]
copyProducers() {
synchronized (
producers) {
Object[]
a =
producers.
values();
int
n =
a.length;
@
SuppressWarnings("unchecked")
InnerProducer<T>[]
result = new
InnerProducer[
n];
System.
arraycopy(
a, 0,
result, 0,
n);
return
result;
}
}
void
makeRequest(long
maxTotalRequests, long
previousTotalRequests) {
long
ur =
maxUpstreamRequested;
Producer p =
producer;
long
diff =
maxTotalRequests -
previousTotalRequests;
if (
diff != 0) {
maxChildRequested =
maxTotalRequests;
if (
p != null) {
if (
ur != 0L) {
maxUpstreamRequested = 0L;
p.
request(
ur +
diff);
} else {
p.
request(
diff);
}
} else {
// collect upstream request amounts until there is a producer for them
long
u =
ur +
diff;
if (
u < 0) {
u =
Long.
MAX_VALUE;
}
maxUpstreamRequested =
u;
}
} else
// if there were outstanding upstream requests and we have a producer
if (
ur != 0L &&
p != null) {
maxUpstreamRequested = 0L;
// fire the accumulated requests
p.
request(
ur);
}
}
/**
* Tries to replay the buffer contents to all known subscribers.
*/
@
SuppressWarnings("unchecked")
void
replay() {
InnerProducer<T>[]
pc =
producersCache;
if (
producersCacheVersion !=
producersVersion) {
synchronized (
producers) {
pc =
producersCache;
// if the producers hasn't changed do nothing
// otherwise make a copy of the current set of producers
Object[]
a =
producers.
values();
int
n =
a.length;
if (
pc.length !=
n) {
pc = new
InnerProducer[
n];
producersCache =
pc;
}
System.
arraycopy(
a, 0,
pc, 0,
n);
producersCacheVersion =
producersVersion;
}
}
ReplayBuffer<T>
b =
buffer;
for (
InnerProducer<T>
rp :
pc) {
if (
rp != null) {
b.
replay(
rp);
}
}
}
}
/**
* A Producer and Subscription that manages the request and unsubscription state of a
* child subscriber in thread-safe manner.
* We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also
* save the overhead of the AtomicIntegerFieldUpdater.
* @param <T> the value type
*/
static final class
InnerProducer<T> extends
AtomicLong implements
Producer,
Subscription {
/** */
private static final long
serialVersionUID = -4453897557930727610L;
/**
* The parent subscriber-to-source used to allow removing the child in case of
* child unsubscription.
*/
final
ReplaySubscriber<T>
parent;
/** The actual child subscriber. */
Subscriber<? super T>
child;
/**
* Holds an object that represents the current location in the buffer.
* Guarded by the emitter loop.
*/
Object index;
/**
* Keeps the sum of all requested amounts.
*/
final
AtomicLong totalRequested;
/** Indicates an emission state. Guarded by this. */
boolean
emitting;
/** Indicates a missed update. Guarded by this. */
boolean
missed;
/**
* Indicates this child has been unsubscribed: the state is swapped in atomically and
* will prevent the dispatch() to emit (too many) values to a terminated child subscriber.
*/
static final long
UNSUBSCRIBED =
Long.
MIN_VALUE;
public
InnerProducer(
ReplaySubscriber<T>
parent,
Subscriber<? super T>
child) {
this.
parent =
parent;
this.
child =
child;
this.
totalRequested = new
AtomicLong();
}
@
Override
public void
request(long
n) {
// ignore negative requests
if (
n < 0) {
return;
}
// In general, RxJava doesn't prevent concurrent requests (with each other or with
// an unsubscribe) so we need a CAS-loop, but we need to handle
// request overflow and unsubscribed/not requested state as well.
for (;;) {
// get the current request amount
long
r =
get();
// if child called unsubscribe() do nothing
if (
r ==
UNSUBSCRIBED) {
return;
}
// ignore zero requests except any first that sets in zero
if (
r >= 0L &&
n == 0) {
return;
}
// otherwise, increase the request count
long
u =
r +
n;
// and check for long overflow
if (
u < 0) {
// cap at max value, which is essentially unlimited
u =
Long.
MAX_VALUE;
}
// try setting the new request value
if (
compareAndSet(
r,
u)) {
// increment the total request counter
addTotalRequested(
n);
// if successful, notify the parent dispatcher this child can receive more
// elements
parent.
manageRequests(this);
parent.
buffer.
replay(this);
return;
}
// otherwise, someone else changed the state (perhaps a concurrent
// request or unsubscription so retry
}
}
/**
* Increments the total requested amount.
* @param n the additional request amount
*/
void
addTotalRequested(long
n) {
for (;;) {
long
r =
totalRequested.
get();
long
u =
r +
n;
if (
u < 0) {
u =
Long.
MAX_VALUE;
}
if (
totalRequested.
compareAndSet(
r,
u)) {
return;
}
}
}
/**
* Indicate that values have been emitted to this child subscriber by the dispatch() method.
* @param n the number of items emitted
* @return the updated request value (may indicate how much can be produced or a terminal state)
*/
public long
produced(long
n) {
// we don't allow producing zero or less: it would be a bug in the operator
if (
n <= 0) {
throw new
IllegalArgumentException("Cant produce zero or less");
}
for (;;) {
// get the current request value
long
r =
get();
// if the child has unsubscribed, simply return and indicate this
if (
r ==
UNSUBSCRIBED) {
return
UNSUBSCRIBED;
}
// reduce the requested amount
long
u =
r -
n;
// if the new amount is less than zero, we have a bug in this operator
if (
u < 0) {
throw new
IllegalStateException("More produced (" +
n + ") than requested (" +
r + ")");
}
// try updating the request value
if (
compareAndSet(
r,
u)) {
// and return the updated value
return
u;
}
// otherwise, some concurrent activity happened and we need to retry
}
}
@
Override
public boolean
isUnsubscribed() {
return
get() ==
UNSUBSCRIBED;
}
@
Override
public void
unsubscribe() {
long
r =
get();
// let's see if we are unsubscribed
if (
r !=
UNSUBSCRIBED) {
// if not, swap in the terminal state, this is idempotent
// because other methods using CAS won't overwrite this value,
// concurrent calls to unsubscribe will atomically swap in the same
// terminal value
r =
getAndSet(
UNSUBSCRIBED);
// and only one of them will see a non-terminated value before the swap
if (
r !=
UNSUBSCRIBED) {
// remove this from the parent
parent.
remove(this);
// After removal, we might have unblocked the other child subscribers:
// let's assume this child had 0 requested before the unsubscription while
// the others had non-zero. By removing this 'blocking' child, the others
// are now free to receive events
parent.
manageRequests(this);
// break the reference
child = null;
}
}
}
/**
* Convenience method to auto-cast the index object.
* @return the associated index object or null
*/
@
SuppressWarnings("unchecked")
<U> U
index() {
return (U)
index;
}
}
/**
* The interface for interacting with various buffering logic.
*
* @param <T> the value type
*/
interface
ReplayBuffer<T> {
/**
* Adds a regular value to the buffer.
* @param value the value to buffer
*/
void
next(T
value);
/**
* Adds a terminal exception to the buffer
* @param e the Throwable to buffer
*/
void
error(
Throwable e);
/**
* Adds a completion event to the buffer
*/
void
complete();
/**
* Tries to replay the buffered values to the
* subscriber inside the output if there
* is new value and requests available at the
* same time.
* @param output the producer of the downstream consumer
*/
void
replay(
InnerProducer<T>
output);
}
/**
* Holds an unbounded list of events.
*
* @param <T> the value type
*/
static final class
UnboundedReplayBuffer<T> extends
ArrayList<
Object> implements
ReplayBuffer<T> {
/** */
private static final long
serialVersionUID = 7063189396499112664L;
/** The total number of events in the buffer. */
volatile int
size;
public
UnboundedReplayBuffer(int
capacityHint) {
super(
capacityHint);
}
@
Override
public void
next(T
value) {
add(
NotificationLite.
next(
value));
size++;
}
@
Override
public void
error(
Throwable e) {
add(
NotificationLite.
error(
e));
size++;
}
@
Override
public void
complete() {
add(
NotificationLite.
completed());
size++;
}
@
Override
public void
replay(
InnerProducer<T>
output) {
synchronized (
output) {
if (
output.
emitting) {
output.
missed = true;
return;
}
output.
emitting = true;
}
for (;;) {
if (
output.
isUnsubscribed()) {
return;
}
int
sourceIndex =
size;
Integer destinationIndexObject =
output.
index();
int
destinationIndex =
destinationIndexObject != null ?
destinationIndexObject : 0;
Subscriber<? super T>
child =
output.
child;
if (
child == null) {
return;
}
long
r =
output.
get();
long
e = 0L;
while (
e !=
r &&
destinationIndex <
sourceIndex) {
Object o =
get(
destinationIndex);
try {
if (
NotificationLite.
accept(
child,
o)) {
return;
}
} catch (
Throwable err) {
Exceptions.
throwIfFatal(
err);
output.
unsubscribe();
if (!
NotificationLite.
isError(
o) && !
NotificationLite.
isCompleted(
o)) {
child.
onError(
OnErrorThrowable.
addValueAsLastCause(
err,
NotificationLite.
getValue(
o)));
}
return;
}
if (
output.
isUnsubscribed()) {
return;
}
destinationIndex++;
e++;
}
if (
e != 0L) {
output.
index =
destinationIndex;
if (
r !=
Long.
MAX_VALUE) {
output.
produced(
e);
}
}
synchronized (
output) {
if (!
output.
missed) {
output.
emitting = false;
return;
}
output.
missed = false;
}
}
}
}
/**
* Represents a node in a bounded replay buffer's linked list.
*/
static final class
Node extends
AtomicReference<
Node> {
/** */
private static final long
serialVersionUID = 245354315435971818L;
/** The contained value. */
final
Object value;
/** The absolute index of the value. */
final long
index;
public
Node(
Object value, long
index) {
this.
value =
value;
this.
index =
index;
}
}
/**
* Base class for bounded buffering with options to specify an
* enter and leave transforms and custom truncation behavior.
*
* @param <T> the value type
*/
static class
BoundedReplayBuffer<T> extends
AtomicReference<
Node> implements
ReplayBuffer<T> {
/** */
private static final long
serialVersionUID = 2346567790059478686L;
Node tail;
int
size;
/** The total number of received values so far. */
long
index;
public
BoundedReplayBuffer() {
Node n = new
Node(null, 0);
tail =
n;
set(
n);
}
/**
* Add a new node to the linked list.
* @param n the node to add as last
*/
final void
addLast(
Node n) {
tail.
set(
n);
tail =
n;
size++;
}
/**
* Remove the first node from the linked list.
*/
final void
removeFirst() {
Node head =
get();
Node next =
head.
get();
if (
next == null) {
throw new
IllegalStateException("Empty list!");
}
size--;
// can't just move the head because it would retain the very first value
// can't null out the head's value because of late replayers would see null
setFirst(
next);
}
/* test */ final void
removeSome(int
n) {
Node head =
get();
while (
n > 0) {
head =
head.
get();
n--;
size--;
}
setFirst(
head);
}
/**
* Arranges the given node is the new head from now on.
* @param n the node to set as first
*/
final void
setFirst(
Node n) {
set(
n);
}
/**
* Returns the current head for initializing the replay location
* for a new subscriber.
* Override it to consider linked but outdated elements.
* @return the current head
*/
Node getInitialHead() {
return
get();
}
@
Override
public final void
next(T
value) {
Object o =
enterTransform(
NotificationLite.
next(
value));
Node n = new
Node(
o, ++
index);
addLast(
n);
truncate();
}
@
Override
public final void
error(
Throwable e) {
Object o =
enterTransform(
NotificationLite.
error(
e));
Node n = new
Node(
o, ++
index);
addLast(
n);
truncateFinal();
}
@
Override
public final void
complete() {
Object o =
enterTransform(
NotificationLite.
completed());
Node n = new
Node(
o, ++
index);
addLast(
n);
truncateFinal();
}
@
Override
public final void
replay(
InnerProducer<T>
output) {
synchronized (
output) {
if (
output.
emitting) {
output.
missed = true;
return;
}
output.
emitting = true;
}
for (;;) {
if (
output.
isUnsubscribed()) {
return;
}
Node node =
output.
index();
if (
node == null) {
node =
getInitialHead();
output.
index =
node;
/*
* Since this is a latecomer, fix its total requested amount
* as if it got all the values up to the node.index
*/
output.
addTotalRequested(
node.
index);
}
if (
output.
isUnsubscribed()) {
return;
}
Subscriber<? super T>
child =
output.
child;
if (
child == null) {
return;
}
long
r =
output.
get();
long
e = 0L;
while (
e !=
r) {
Node v =
node.
get();
if (
v != null) {
Object o =
leaveTransform(
v.
value);
try {
if (
NotificationLite.
accept(
child,
o)) {
output.
index = null;
return;
}
} catch (
Throwable err) {
output.
index = null;
Exceptions.
throwIfFatal(
err);
output.
unsubscribe();
if (!
NotificationLite.
isError(
o) && !
NotificationLite.
isCompleted(
o)) {
child.
onError(
OnErrorThrowable.
addValueAsLastCause(
err,
NotificationLite.
getValue(
o)));
}
return;
}
e++;
node =
v;
} else {
break;
}
if (
output.
isUnsubscribed()) {
return;
}
}
if (
e != 0L) {
output.
index =
node;
if (
r !=
Long.
MAX_VALUE) {
output.
produced(
e);
}
}
synchronized (
output) {
if (!
output.
missed) {
output.
emitting = false;
return;
}
output.
missed = false;
}
}
}
/**
* Override this to wrap the NotificationLite object into a
* container to be used later by truncate.
* @param value the value to transform into the internal representation
* @return the internal representation of the value
*/
Object enterTransform(
Object value) {
return
value;
}
/**
* Override this to unwrap the transformed value into a
* NotificationLite object.
* @param value the value to transform back to external representation from
* the internal representation
* @return the external representation of the value
*/
Object leaveTransform(
Object value) {
return
value;
}
/**
* Override this method to truncate a non-terminated buffer
* based on its current properties.
*/
void
truncate() {
// no op by default
}
/**
* Override this method to truncate a terminated buffer
* based on its properties (i.e., truncate but the very last node).
*/
void
truncateFinal() {
// no op by default
}
/* test */ final void
collect(
Collection<? super T>
output) {
Node n =
getInitialHead();
for (;;) {
Node next =
n.
get();
if (
next != null) {
Object o =
next.
value;
Object v =
leaveTransform(
o);
if (
NotificationLite.
isCompleted(
v) ||
NotificationLite.
isError(
v)) {
break;
}
output.
add(
NotificationLite.<T>
getValue(
v));
n =
next;
} else {
break;
}
}
}
/* test */ boolean
hasError() {
return
tail.
value != null &&
NotificationLite.
isError(
leaveTransform(
tail.
value));
}
/* test */ boolean
hasCompleted() {
return
tail.
value != null &&
NotificationLite.
isCompleted(
leaveTransform(
tail.
value));
}
}
/**
* A bounded replay buffer implementation with size limit only.
*
* @param <T> the value type
*/
static final class
SizeBoundReplayBuffer<T> extends
BoundedReplayBuffer<T> {
/** */
private static final long
serialVersionUID = -5898283885385201806L;
final int
limit;
public
SizeBoundReplayBuffer(int
limit) {
this.
limit =
limit;
}
@
Override
void
truncate() {
// overflow can be at most one element
if (
size >
limit) {
removeFirst();
}
}
// no need for final truncation because values are truncated one by one
}
/**
* Size and time bound replay buffer.
*
* @param <T> the buffered value type
*/
static final class
SizeAndTimeBoundReplayBuffer<T> extends
BoundedReplayBuffer<T> {
/** */
private static final long
serialVersionUID = 3457957419649567404L;
final
Scheduler scheduler;
final long
maxAgeInMillis;
final int
limit;
public
SizeAndTimeBoundReplayBuffer(int
limit, long
maxAgeInMillis,
Scheduler scheduler) {
this.
scheduler =
scheduler;
this.
limit =
limit;
this.
maxAgeInMillis =
maxAgeInMillis;
}
@
Override
Object enterTransform(
Object value) {
return new
Timestamped<
Object>(
scheduler.
now(),
value);
}
@
Override
Object leaveTransform(
Object value) {
return ((
Timestamped<?>)
value).
getValue();
}
@
Override
Node getInitialHead() {
long
timeLimit =
scheduler.
now() -
maxAgeInMillis;
Node prev =
get();
Node next =
prev.
get();
while (
next != null) {
Object o =
next.
value;
Object v =
leaveTransform(
o);
if (
NotificationLite.
isCompleted(
v) ||
NotificationLite.
isError(
v)) {
break;
}
if (((
Timestamped<?>)
o).
getTimestampMillis() <=
timeLimit) {
prev =
next;
next =
next.
get();
} else {
break;
}
}
return
prev;
}
@
Override
void
truncate() {
long
timeLimit =
scheduler.
now() -
maxAgeInMillis;
Node prev =
get();
Node next =
prev.
get();
int
e = 0;
for (;;) {
if (
next != null) {
if (
size >
limit) {
e++;
size--;
prev =
next;
next =
next.
get();
} else {
Timestamped<?>
v = (
Timestamped<?>)
next.
value;
if (
v.
getTimestampMillis() <=
timeLimit) {
e++;
size--;
prev =
next;
next =
next.
get();
} else {
break;
}
}
} else {
break;
}
}
if (
e != 0) {
setFirst(
prev);
}
}
@
Override
void
truncateFinal() {
long
timeLimit =
scheduler.
now() -
maxAgeInMillis;
Node prev =
get();
Node next =
prev.
get();
int
e = 0;
for (;;) {
if (
next != null &&
size > 1) {
Timestamped<?>
v = (
Timestamped<?>)
next.
value;
if (
v.
getTimestampMillis() <=
timeLimit) {
e++;
size--;
prev =
next;
next =
next.
get();
} else {
break;
}
} else {
break;
}
}
if (
e != 0) {
setFirst(
prev);
}
}
}
}