/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.subjects;
import java.lang.reflect.
Array;
import java.util.*;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.
Observer;
import rx.exceptions.
Exceptions;
import rx.internal.operators.
BackpressureUtils;
import rx.plugins.
RxJavaHooks;
import rx.schedulers.
Schedulers;
/**
* Subject that buffers all items it observes and replays them to any {@link Observer} that subscribes.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.ReplaySubject.png" alt="">
* <p>
* Example usage:
* <p>
* <pre> {@code
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();
// both of the following will get the onNext/onCompleted calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
} </pre>
*
* @param <T>
* the type of items observed and emitted by the Subject
*/
public final class
ReplaySubject<T> extends
Subject<T, T> {
/** The state storing the history and the references. */
final
ReplayState<T>
state;
/** An empty array to trigger getValues() to return a new array. */
private static final
Object[]
EMPTY_ARRAY = new
Object[0];
/**
* Creates an unbounded replay subject.
* <p>
* The internal buffer is backed by an {@link ArrayList} and starts with an initial capacity of 16. Once the
* number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the
* number of items grows, this causes frequent array reallocation and copying, and may hurt performance
* and latency. This can be avoided with the {@link #create(int)} overload which takes an initial capacity
* parameter and can be tuned to reduce the array reallocation frequency as needed.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @return the created subject
*/
public static <T>
ReplaySubject<T>
create() {
return
create(16);
}
/**
* Creates an unbounded replay subject with the specified initial buffer capacity.
* <p>
* Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new
* items. For example, if you know that the buffer will hold 32k items, you can ask the
* {@code ReplaySubject} to preallocate its internal array with a capacity to hold that many items. Once
* the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead
* due to frequent array-copying.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @param capacity
* the initial buffer capacity
* @return the created subject
*/
public static <T>
ReplaySubject<T>
create(int
capacity) {
if (
capacity <= 0) {
throw new
IllegalArgumentException("capacity > 0 required but it was " +
capacity);
}
ReplayBuffer<T>
buffer = new
ReplayUnboundedBuffer<T>(
capacity);
ReplayState<T>
state = new
ReplayState<T>(
buffer);
return new
ReplaySubject<T>(
state);
}
/**
* Creates an unbounded replay subject with the bounded-implementation for testing purposes.
* <p>
* This variant behaves like the regular unbounded {@code ReplaySubject} created via {@link #create()} but
* uses the structures of the bounded-implementation. This is by no means intended for the replacement of
* the original, array-backed and unbounded {@code ReplaySubject} due to the additional overhead of the
* linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior
* of the bounded implementations without the interference of the eviction policies.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @return the created subject
*/
/* public */ static <T>
ReplaySubject<T>
createUnbounded() {
ReplayBuffer<T>
buffer = new
ReplaySizeBoundBuffer<T>(
Integer.
MAX_VALUE);
ReplayState<T>
state = new
ReplayState<T>(
buffer);
return new
ReplaySubject<T>(
state);
}
/**
* Creates an unbounded replay subject with the time-bounded-implementation for testing purposes.
* <p>
* This variant behaves like the regular unbounded {@code ReplaySubject} created via {@link #create()} but
* uses the structures of the bounded-implementation. This is by no means intended for the replacement of
* the original, array-backed and unbounded {@code ReplaySubject} due to the additional overhead of the
* linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior
* of the bounded implementations without the interference of the eviction policies.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @return the created subject
*/
/* public */ static <T>
ReplaySubject<T>
createUnboundedTime() {
ReplayBuffer<T>
buffer = new
ReplaySizeAndTimeBoundBuffer<T>(
Integer.
MAX_VALUE,
Long.
MAX_VALUE,
Schedulers.
immediate());
ReplayState<T>
state = new
ReplayState<T>(
buffer);
return new
ReplaySubject<T>(
state);
}
/**
* Creates a size-bounded replay subject.
* <p>
* In this setting, the {@code ReplaySubject} holds at most {@code size} items in its internal buffer and
* discards the oldest item.
* <p>
* When observers subscribe to a terminated {@code ReplaySubject}, they are guaranteed to see at most
* {@code size} {@code onNext} events followed by a termination event.
* <p>
* If an observer subscribes while the {@code ReplaySubject} is active, it will observe all items in the
* buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to
* the size constraint in the mean time. In other words, once an Observer subscribes, it will receive items
* without gaps in the sequence.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @param size
* the maximum number of buffered items
* @return the created subject
*/
public static <T>
ReplaySubject<T>
createWithSize(int
size) {
ReplayBuffer<T>
buffer = new
ReplaySizeBoundBuffer<T>(
size);
ReplayState<T>
state = new
ReplayState<T>(
buffer);
return new
ReplaySubject<T>(
state);
}
/**
* Creates a time-bounded replay subject.
* <p>
* In this setting, the {@code ReplaySubject} internally tags each observed item with a timestamp value
* supplied by the {@link Scheduler} and keeps only those whose age is less than the supplied time value
* converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5
* this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.
* <p>
* Once the subject is terminated, observers subscribing to it will receive items that remained in the
* buffer after the terminal event, regardless of their age.
* <p>
* If an observer subscribes while the {@code ReplaySubject} is active, it will observe only those items
* from within the buffer that have an age less than the specified time, and each item observed thereafter,
* even if the buffer evicts items due to the time constraint in the mean time. In other words, once an
* observer subscribes, it observes items without gaps in the sequence except for any outdated items at the
* beginning of the sequence.
* <p>
* Note that terminal notifications ({@code onError} and {@code onCompleted}) trigger eviction as well. For
* example, with a max age of 5, the first item is observed at T=0, then an {@code onCompleted} notification
* arrives at T=10. If an observer subscribes at T=11, it will find an empty {@code ReplaySubject} with just
* an {@code onCompleted} notification.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @param time
* the maximum age of the contained items
* @param unit
* the time unit of {@code time}
* @param scheduler
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
public static <T>
ReplaySubject<T>
createWithTime(long
time,
TimeUnit unit, final
Scheduler scheduler) {
return
createWithTimeAndSize(
time,
unit,
Integer.
MAX_VALUE,
scheduler);
}
/**
* Creates a time- and size-bounded replay subject.
* <p>
* In this setting, the {@code ReplaySubject} internally tags each received item with a timestamp value
* supplied by the {@link Scheduler} and holds at most {@code size} items in its internal buffer. It evicts
* items from the start of the buffer if their age becomes less-than or equal to the supplied age in
* milliseconds or the buffer reaches its {@code size} limit.
* <p>
* When observers subscribe to a terminated {@code ReplaySubject}, they observe the items that remained in
* the buffer after the terminal notification, regardless of their age, but at most {@code size} items.
* <p>
* If an observer subscribes while the {@code ReplaySubject} is active, it will observe only those items
* from within the buffer that have age less than the specified time and each subsequent item, even if the
* buffer evicts items due to the time constraint in the mean time. In other words, once an observer
* subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning
* of the sequence.
* <p>
* Note that terminal notifications ({@code onError} and {@code onCompleted}) trigger eviction as well. For
* example, with a max age of 5, the first item is observed at T=0, then an {@code onCompleted} notification
* arrives at T=10. If an observer subscribes at T=11, it will find an empty {@code ReplaySubject} with just
* an {@code onCompleted} notification.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @param time
* the maximum age of the contained items
* @param unit
* the time unit of {@code time}
* @param size
* the maximum number of buffered items
* @param scheduler
* the {@link Scheduler} that provides the current time
* @return the created subject
*/
public static <T>
ReplaySubject<T>
createWithTimeAndSize(long
time,
TimeUnit unit, int
size, final
Scheduler scheduler) {
ReplayBuffer<T>
buffer = new
ReplaySizeAndTimeBoundBuffer<T>(
size,
unit.
toMillis(
time),
scheduler);
ReplayState<T>
state = new
ReplayState<T>(
buffer);
return new
ReplaySubject<T>(
state);
}
ReplaySubject(
ReplayState<T>
state) {
super(
state);
this.
state =
state;
}
@
Override
public void
onNext(T
t) {
state.
onNext(
t);
}
@
Override
public void
onError(final
Throwable e) {
state.
onError(
e);
}
@
Override
public void
onCompleted() {
state.
onCompleted();
}
/**
* @return Returns the number of subscribers.
*/
/* Support test. */int
subscriberCount() {
return
state.
get().length;
}
@
Override
public boolean
hasObservers() {
return
state.
get().length != 0;
}
/**
* Check if the Subject has terminated with an exception.
* @return true if the subject has received a throwable through {@code onError}.
* @since 1.2
*/
public boolean
hasThrowable() {
return
state.
isTerminated() &&
state.
buffer.
error() != null;
}
/**
* Check if the Subject has terminated normally.
* @return true if the subject completed normally via {@code onCompleted}
* @since 1.2
*/
public boolean
hasCompleted() {
return
state.
isTerminated() &&
state.
buffer.
error() == null;
}
/**
* Returns the Throwable that terminated the Subject.
* @return the Throwable that terminated the Subject or {@code null} if the
* subject hasn't terminated yet or it terminated normally.
* @since 1.2
*/
public
Throwable getThrowable() {
if (
state.
isTerminated()) {
return
state.
buffer.
error();
}
return null;
}
/**
* Returns the current number of items (non-terminal events) available for replay.
* @return the number of items available
* @since 1.2
*/
public int
size() {
return
state.
buffer.
size();
}
/**
* @return true if the Subject holds at least one non-terminal event available for replay
* @since 1.2
*/
public boolean
hasAnyValue() {
return !
state.
buffer.
isEmpty();
}
/**
* @return true if the Subject holds at least one non-terminal event available for replay
* @since 1.2
*/
public boolean
hasValue() {
return
hasAnyValue();
}
/**
* Returns a snapshot of the currently buffered non-terminal events into
* the provided {@code a} array or creates a new array if it has not enough capacity.
* @param a the array to fill in
* @return the array {@code a} if it had enough capacity or a new array containing the available values
* @since 1.2
*/
public T[]
getValues(T[]
a) {
return
state.
buffer.
toArray(
a);
}
/**
* Returns a snapshot of the currently buffered non-terminal events.
* <p>The operation is thread-safe.
*
* @return a snapshot of the currently buffered non-terminal events.
* @since 1.2
*/
@
SuppressWarnings("unchecked")
public
Object[]
getValues() {
T[]
r =
getValues((T[])
EMPTY_ARRAY);
if (
r ==
EMPTY_ARRAY) {
return new
Object[0]; // don't leak the default empty array.
}
return
r;
}
/**
* @return the latest value available
* @since 1.2
*/
public T
getValue() {
return
state.
buffer.
last();
}
/**
* Holds onto the array of Subscriber-wrapping ReplayProducers and
* the buffer that holds values to be replayed; it manages
* subscription and signal dispatching.
*
* @param <T> the value type
*/
static final class
ReplayState<T>
extends
AtomicReference<
ReplayProducer<T>[]>
implements
OnSubscribe<T>,
Observer<T> {
/** */
private static final long
serialVersionUID = 5952362471246910544L;
final
ReplayBuffer<T>
buffer;
@
SuppressWarnings("rawtypes")
static final
ReplayProducer[]
EMPTY = new
ReplayProducer[0];
@
SuppressWarnings("rawtypes")
static final
ReplayProducer[]
TERMINATED = new
ReplayProducer[0];
@
SuppressWarnings("unchecked")
public
ReplayState(
ReplayBuffer<T>
buffer) {
this.
buffer =
buffer;
lazySet(
EMPTY);
}
@
Override
public void
call(
Subscriber<? super T>
t) {
ReplayProducer<T>
rp = new
ReplayProducer<T>(
t, this);
t.
add(
rp);
t.
setProducer(
rp);
if (
add(
rp)) {
if (
rp.
isUnsubscribed()) {
remove(
rp);
return;
}
}
buffer.
drain(
rp);
}
boolean
add(
ReplayProducer<T>
rp) {
for (;;) {
ReplayProducer<T>[]
a =
get();
if (
a ==
TERMINATED) {
return false;
}
int
n =
a.length;
@
SuppressWarnings("unchecked")
ReplayProducer<T>[]
b = new
ReplayProducer[
n + 1];
System.
arraycopy(
a, 0,
b, 0,
n);
b[
n] =
rp;
if (
compareAndSet(
a,
b)) {
return true;
}
}
}
@
SuppressWarnings("unchecked")
void
remove(
ReplayProducer<T>
rp) {
for (;;) {
ReplayProducer<T>[]
a =
get();
if (
a ==
TERMINATED ||
a ==
EMPTY) {
return;
}
int
n =
a.length;
int
j = -1;
for (int
i = 0;
i <
n;
i++) {
if (
a[
i] ==
rp) {
j =
i;
break;
}
}
if (
j < 0) {
return;
}
ReplayProducer<T>[]
b;
if (
n == 1) {
b =
EMPTY;
} else {
b = new
ReplayProducer[
n - 1];
System.
arraycopy(
a, 0,
b, 0,
j);
System.
arraycopy(
a,
j + 1,
b,
j,
n -
j - 1);
}
if (
compareAndSet(
a,
b)) {
return;
}
}
}
@
Override
public void
onNext(T
t) {
ReplayBuffer<T>
b =
buffer;
b.
next(
t);
for (
ReplayProducer<T>
rp :
get()) {
b.
drain(
rp);
}
}
@
SuppressWarnings("unchecked")
@
Override
public void
onError(
Throwable e) {
ReplayBuffer<T>
b =
buffer;
b.
error(
e);
List<
Throwable>
errors = null;
for (
ReplayProducer<T>
rp :
getAndSet(
TERMINATED)) {
try {
b.
drain(
rp);
} catch (
Throwable ex) {
if (
errors == null) {
errors = new
ArrayList<
Throwable>();
}
errors.
add(
ex);
}
}
Exceptions.
throwIfAny(
errors);
}
@
SuppressWarnings("unchecked")
@
Override
public void
onCompleted() {
ReplayBuffer<T>
b =
buffer;
b.
complete();
for (
ReplayProducer<T>
rp :
getAndSet(
TERMINATED)) {
b.
drain(
rp);
}
}
boolean
isTerminated() {
return
get() ==
TERMINATED;
}
}
/**
* The base interface for buffering signals to be replayed to individual
* Subscribers.
*
* @param <T> the value type
*/
interface
ReplayBuffer<T> {
void
next(T
t);
void
error(
Throwable e);
void
complete();
void
drain(
ReplayProducer<T>
rp);
boolean
isComplete();
Throwable error();
T
last();
int
size();
boolean
isEmpty();
T[]
toArray(T[]
a);
}
/**
* An unbounded ReplayBuffer implementation that uses linked-arrays
* to avoid copy-on-grow situation with ArrayList.
*
* @param <T> the value type
*/
static final class
ReplayUnboundedBuffer<T> implements
ReplayBuffer<T> {
final int
capacity;
volatile int
size;
final
Object[]
head;
Object[]
tail;
int
tailIndex;
volatile boolean
done;
Throwable error;
public
ReplayUnboundedBuffer(int
capacity) {
this.
capacity =
capacity;
this.
tail = this.
head = new
Object[
capacity + 1];
}
@
Override
public void
next(T
t) {
if (
done) {
return;
}
int
i =
tailIndex;
Object[]
a =
tail;
if (
i ==
a.length - 1) {
Object[]
b = new
Object[
a.length];
b[0] =
t;
tailIndex = 1;
a[
i] =
b;
tail =
b;
} else {
a[
i] =
t;
tailIndex =
i + 1;
}
size++;
}
@
Override
public void
error(
Throwable e) {
if (
done) {
RxJavaHooks.
onError(
e);
return;
}
error =
e;
done = true;
}
@
Override
public void
complete() {
done = true;
}
@
Override
public void
drain(
ReplayProducer<T>
rp) {
if (
rp.
getAndIncrement() != 0) {
return;
}
int
missed = 1;
final
Subscriber<? super T>
a =
rp.
actual;
final int
n =
capacity;
for (;;) {
long
r =
rp.
requested.
get();
long
e = 0L;
Object[]
node = (
Object[])
rp.
node;
if (
node == null) {
node =
head;
}
int
tailIndex =
rp.
tailIndex;
int
index =
rp.
index;
while (
e !=
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
boolean
empty =
index ==
size;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
if (
empty) {
break;
}
if (
tailIndex ==
n) {
node = (
Object[])
node[
tailIndex];
tailIndex = 0;
}
@
SuppressWarnings("unchecked")
T
v = (T)
node[
tailIndex];
a.
onNext(
v);
e++;
tailIndex++;
index++;
}
if (
e ==
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
boolean
empty =
index ==
size;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
}
if (
e != 0L) {
if (
r !=
Long.
MAX_VALUE) {
BackpressureUtils.
produced(
rp.
requested,
e);
}
}
rp.
index =
index;
rp.
tailIndex =
tailIndex;
rp.
node =
node;
missed =
rp.
addAndGet(-
missed);
if (
missed == 0) {
return;
}
}
}
@
Override
public boolean
isComplete() {
return
done;
}
@
Override
public
Throwable error() {
return
error;
}
@
SuppressWarnings("unchecked")
@
Override
public T
last() {
// we don't have a volatile read on tail and tailIndex
// so we have to traverse the linked structure up until
// we read size / capacity nodes and index into the array
// via size % capacity
int
s =
size;
if (
s == 0) {
return null;
}
Object[]
h =
head;
int
n =
capacity;
while (
s >=
n) {
h = (
Object[])
h[
n];
s -=
n;
}
return (T)
h[
s - 1];
}
@
Override
public int
size() {
return
size;
}
@
Override
public boolean
isEmpty() {
return
size == 0;
}
@
SuppressWarnings("unchecked")
@
Override
public T[]
toArray(T[]
a) {
int
s =
size;
if (
a.length <
s) {
a = (T[])
Array.
newInstance(
a.
getClass().
getComponentType(),
s);
}
Object[]
h =
head;
int
n =
capacity;
int
j = 0;
while (
j +
n <
s) {
System.
arraycopy(
h, 0,
a,
j,
n);
j +=
n;
h = (
Object[])
h[
n];
}
System.
arraycopy(
h, 0,
a,
j,
s -
j);
if (
a.length >
s) {
a[
s] = null;
}
return
a;
}
}
static final class
ReplaySizeBoundBuffer<T> implements
ReplayBuffer<T> {
final int
limit;
volatile
Node<T>
head;
Node<T>
tail;
int
size;
volatile boolean
done;
Throwable error;
public
ReplaySizeBoundBuffer(int
limit) {
this.
limit =
limit;
Node<T>
n = new
Node<T>(null);
this.
tail =
n;
this.
head =
n;
}
@
Override
public void
next(T
value) {
Node<T>
n = new
Node<T>(
value);
tail.
set(
n);
tail =
n;
int
s =
size;
if (
s ==
limit) {
head =
head.
get();
} else {
size =
s + 1;
}
}
@
Override
public void
error(
Throwable ex) {
error =
ex;
done = true;
}
@
Override
public void
complete() {
done = true;
}
@
Override
public void
drain(
ReplayProducer<T>
rp) {
if (
rp.
getAndIncrement() != 0) {
return;
}
final
Subscriber<? super T>
a =
rp.
actual;
int
missed = 1;
for (;;) {
long
r =
rp.
requested.
get();
long
e = 0L;
@
SuppressWarnings("unchecked")
Node<T>
node = (
Node<T>)
rp.
node;
if (
node == null) {
node =
head;
}
while (
e !=
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
Node<T>
next =
node.
get();
boolean
empty =
next == null;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
if (
empty) {
break;
}
a.
onNext(
next.
value);
e++;
node =
next;
}
if (
e ==
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
boolean
empty =
node.
get() == null;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
}
if (
e != 0L) {
if (
r !=
Long.
MAX_VALUE) {
BackpressureUtils.
produced(
rp.
requested,
e);
}
}
rp.
node =
node;
missed =
rp.
addAndGet(-
missed);
if (
missed == 0) {
return;
}
}
}
static final class
Node<T> extends
AtomicReference<
Node<T>> {
/** */
private static final long
serialVersionUID = 3713592843205853725L;
final T
value;
public
Node(T
value) {
this.
value =
value;
}
}
@
Override
public boolean
isComplete() {
return
done;
}
@
Override
public
Throwable error() {
return
error;
}
@
Override
public T
last() {
Node<T>
h =
head;
Node<T>
n;
while ((
n =
h.
get()) != null) {
h =
n;
}
return
h.
value;
}
@
Override
public int
size() {
int
s = 0;
Node<T>
n =
head.
get();
while (
n != null &&
s !=
Integer.
MAX_VALUE) {
n =
n.
get();
s++;
}
return
s;
}
@
Override
public boolean
isEmpty() {
return
head.
get() == null;
}
@
Override
public T[]
toArray(T[]
a) {
List<T>
list = new
ArrayList<T>();
Node<T>
n =
head.
get();
while (
n != null) {
list.
add(
n.
value);
n =
n.
get();
}
return
list.
toArray(
a);
}
}
static final class
ReplaySizeAndTimeBoundBuffer<T> implements
ReplayBuffer<T> {
final int
limit;
final long
maxAgeMillis;
final
Scheduler scheduler;
volatile
TimedNode<T>
head;
TimedNode<T>
tail;
int
size;
volatile boolean
done;
Throwable error;
public
ReplaySizeAndTimeBoundBuffer(int
limit, long
maxAgeMillis,
Scheduler scheduler) {
this.
limit =
limit;
TimedNode<T>
n = new
TimedNode<T>(null, 0L);
this.
tail =
n;
this.
head =
n;
this.
maxAgeMillis =
maxAgeMillis;
this.
scheduler =
scheduler;
}
@
Override
public void
next(T
value) {
long
now =
scheduler.
now();
TimedNode<T>
n = new
TimedNode<T>(
value,
now);
tail.
set(
n);
tail =
n;
now -=
maxAgeMillis;
int
s =
size;
TimedNode<T>
h0 =
head;
TimedNode<T>
h =
h0;
if (
s ==
limit) {
h =
h.
get();
} else {
s++;
}
while ((
n =
h.
get()) != null) {
if (
n.
timestamp >
now) {
break;
}
h =
n;
s--;
}
size =
s;
if (
h !=
h0) {
head =
h;
}
}
@
Override
public void
error(
Throwable ex) {
evictFinal();
error =
ex;
done = true;
}
@
Override
public void
complete() {
evictFinal();
done = true;
}
void
evictFinal() {
long
now =
scheduler.
now() -
maxAgeMillis;
TimedNode<T>
h0 =
head;
TimedNode<T>
h =
h0;
TimedNode<T>
n;
while ((
n =
h.
get()) != null) {
if (
n.
timestamp >
now) {
break;
}
h =
n;
}
if (
h0 !=
h) {
head =
h;
}
}
TimedNode<T>
latestHead() {
long
now =
scheduler.
now() -
maxAgeMillis;
TimedNode<T>
h =
head;
TimedNode<T>
n;
while ((
n =
h.
get()) != null) {
if (
n.
timestamp >
now) {
break;
}
h =
n;
}
return
h;
}
@
Override
public void
drain(
ReplayProducer<T>
rp) {
if (
rp.
getAndIncrement() != 0) {
return;
}
final
Subscriber<? super T>
a =
rp.
actual;
int
missed = 1;
for (;;) {
long
r =
rp.
requested.
get();
long
e = 0L;
@
SuppressWarnings("unchecked")
TimedNode<T>
node = (
TimedNode<T>)
rp.
node;
if (
node == null) {
node =
latestHead();
}
while (
e !=
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
TimedNode<T>
next =
node.
get();
boolean
empty =
next == null;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
if (
empty) {
break;
}
a.
onNext(
next.
value);
e++;
node =
next;
}
if (
e ==
r) {
if (
a.
isUnsubscribed()) {
rp.
node = null;
return;
}
boolean
d =
done;
boolean
empty =
node.
get() == null;
if (
d &&
empty) {
rp.
node = null;
Throwable ex =
error;
if (
ex != null) {
a.
onError(
ex);
} else {
a.
onCompleted();
}
return;
}
}
if (
e != 0L) {
if (
r !=
Long.
MAX_VALUE) {
BackpressureUtils.
produced(
rp.
requested,
e);
}
}
rp.
node =
node;
missed =
rp.
addAndGet(-
missed);
if (
missed == 0) {
return;
}
}
}
static final class
TimedNode<T> extends
AtomicReference<
TimedNode<T>> {
/** */
private static final long
serialVersionUID = 3713592843205853725L;
final T
value;
final long
timestamp;
public
TimedNode(T
value, long
timestamp) {
this.
value =
value;
this.
timestamp =
timestamp;
}
}
@
Override
public boolean
isComplete() {
return
done;
}
@
Override
public
Throwable error() {
return
error;
}
@
Override
public T
last() {
TimedNode<T>
h =
latestHead();
TimedNode<T>
n;
while ((
n =
h.
get()) != null) {
h =
n;
}
return
h.
value;
}
@
Override
public int
size() {
int
s = 0;
TimedNode<T>
n =
latestHead().
get();
while (
n != null &&
s !=
Integer.
MAX_VALUE) {
n =
n.
get();
s++;
}
return
s;
}
@
Override
public boolean
isEmpty() {
return
latestHead().
get() == null;
}
@
Override
public T[]
toArray(T[]
a) {
List<T>
list = new
ArrayList<T>();
TimedNode<T>
n =
latestHead().
get();
while (
n != null) {
list.
add(
n.
value);
n =
n.
get();
}
return
list.
toArray(
a);
}
}
/**
* A producer and subscription implementation that tracks the current
* replay position of a particular subscriber.
* <p>
* The this holds the current work-in-progress indicator used by serializing
* replays.
*
* @param <T> the value type
*/
static final class
ReplayProducer<T>
extends
AtomicInteger
implements
Producer,
Subscription {
/** */
private static final long
serialVersionUID = -5006209596735204567L;
/** The wrapped Subscriber instance. */
final
Subscriber<? super T>
actual;
/** Holds the current requested amount. */
final
AtomicLong requested;
/** Holds the back-reference to the replay state object. */
final
ReplayState<T>
state;
/**
* Unbounded buffer.drain() uses this field to remember the absolute index of
* values replayed to this Subscriber.
*/
int
index;
/**
* Unbounded buffer.drain() uses this index within its current node to indicate
* how many items were replayed from that particular node so far.
*/
int
tailIndex;
/**
* Stores the current replay node of the buffer to be used by buffer.drain().
*/
Object node;
public
ReplayProducer(
Subscriber<? super T>
actual,
ReplayState<T>
state) {
this.
actual =
actual;
this.
requested = new
AtomicLong();
this.
state =
state;
}
@
Override
public void
unsubscribe() {
state.
remove(this);
}
@
Override
public boolean
isUnsubscribed() {
return
actual.
isUnsubscribed();
}
@
Override
public void
request(long
n) {
if (
n > 0L) {
BackpressureUtils.
getAndAddRequest(
requested,
n);
state.
buffer.
drain(this);
} else if (
n < 0L) {
throw new
IllegalArgumentException("n >= required but it was " +
n);
}
}
}
}