/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.
Objects;
import java.util.
Queue;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.
AtomicLongFieldUpdater;
import java.util.concurrent.atomic.
AtomicReferenceFieldUpdater;
import java.util.stream.
Stream;
import org.reactivestreams.
Subscriber;
import org.reactivestreams.
Subscription;
import reactor.core.
CoreSubscriber;
import reactor.core.
Disposable;
import reactor.core.
Disposables;
import reactor.core.
Exceptions;
import reactor.core.
Scannable;
import reactor.core.scheduler.
Scheduler;
import reactor.util.concurrent.
Queues;
/**
* @author David Karnok
*/
final class
FluxWindowTimeout<T> extends
FluxOperator<T,
Flux<T>> {
final int
maxSize;
final long
timespan;
final
Scheduler timer;
FluxWindowTimeout(
Flux<T>
source, int
maxSize, long
timespan,
Scheduler timer) {
super(
source);
if (
timespan <= 0) {
throw new
IllegalArgumentException("Timeout period must be strictly positive");
}
if (
maxSize <= 0) {
throw new
IllegalArgumentException("maxSize must be strictly positive");
}
this.
timer =
Objects.
requireNonNull(
timer, "Timer");
this.
timespan =
timespan;
this.
maxSize =
maxSize;
}
@
Override
public void
subscribe(
CoreSubscriber<? super
Flux<T>>
actual) {
source.
subscribe(new
WindowTimeoutSubscriber<>(
actual,
maxSize,
timespan,
timer));
}
@
Override
public
Object scanUnsafe(
Attr key) {
if (
key ==
Attr.
RUN_ON) return
timer;
return super.scanUnsafe(
key);
}
static final class
WindowTimeoutSubscriber<T> implements
InnerOperator<T,
Flux<T>> {
final
CoreSubscriber<? super
Flux<T>>
actual;
final long
timespan;
final
Scheduler scheduler;
final int
maxSize;
final
Scheduler.
Worker worker;
final
Queue<
Object>
queue;
Throwable error;
volatile boolean
done;
volatile boolean
cancelled;
volatile long
requested;
@
SuppressWarnings("rawtypes")
static final
AtomicLongFieldUpdater<
WindowTimeoutSubscriber>
REQUESTED =
AtomicLongFieldUpdater.
newUpdater(
WindowTimeoutSubscriber.class,
"requested");
volatile int
wip;
@
SuppressWarnings("rawtypes")
static final
AtomicIntegerFieldUpdater<
WindowTimeoutSubscriber>
WIP =
AtomicIntegerFieldUpdater.
newUpdater(
WindowTimeoutSubscriber.class,
"wip");
int
count;
long
producerIndex;
Subscription s;
UnicastProcessor<T>
window;
volatile boolean
terminated;
volatile
Disposable timer;
@
SuppressWarnings("rawtypes")
static final
AtomicReferenceFieldUpdater<
WindowTimeoutSubscriber,
Disposable>
TIMER =
AtomicReferenceFieldUpdater.
newUpdater(
WindowTimeoutSubscriber.class,
Disposable.class, "timer");
WindowTimeoutSubscriber(
CoreSubscriber<? super
Flux<T>>
actual,
int
maxSize,
long
timespan,
Scheduler scheduler) {
this.
actual =
actual;
this.
queue =
Queues.
unboundedMultiproducer().
get();
this.
timespan =
timespan;
this.
scheduler =
scheduler;
this.
maxSize =
maxSize;
this.
worker =
scheduler.
createWorker();
}
@
Override
public
CoreSubscriber<? super
Flux<T>>
actual() {
return
actual;
}
@
Override
public
Stream<? extends
Scannable>
inners() {
UnicastProcessor<T>
w =
window;
return
w == null ?
Stream.
empty() :
Stream.
of(
w);
}
@
Override
public
Object scanUnsafe(
Attr key) {
if (
key ==
Attr.
PARENT) return
s;
if (
key ==
Attr.
CANCELLED) return
cancelled;
if (
key ==
Attr.
TERMINATED) return
done;
if (
key ==
Attr.
REQUESTED_FROM_DOWNSTREAM) return
requested;
if (
key ==
Attr.
CAPACITY) return
maxSize;
if (
key ==
Attr.
BUFFERED) return
queue.
size();
if (
key ==
Attr.
RUN_ON) return
worker;
return
InnerOperator.super.scanUnsafe(
key);
}
@
Override
public void
onSubscribe(
Subscription s) {
if (
Operators.
validate(this.
s,
s)) {
this.
s =
s;
Subscriber<? super
Flux<T>>
a =
actual;
a.
onSubscribe(this);
if (
cancelled) {
return;
}
UnicastProcessor<T>
w =
UnicastProcessor.
create();
window =
w;
long
r =
requested;
if (
r != 0L) {
a.
onNext(
w);
if (
r !=
Long.
MAX_VALUE) {
REQUESTED.
decrementAndGet(this);
}
}
else {
a.
onError(
Operators.
onOperatorError(
s,
Exceptions.
failWithOverflow(),
actual.
currentContext()));
return;
}
if (
OperatorDisposables.
replace(
TIMER, this,
newPeriod())) {
s.
request(
Long.
MAX_VALUE);
}
}
}
Disposable newPeriod() {
try {
return
worker.
schedulePeriodically(new
ConsumerIndexHolder(
producerIndex,
this),
timespan,
timespan,
TimeUnit.
MILLISECONDS);
}
catch (
Exception e) {
actual.
onError(
Operators.
onRejectedExecution(
e,
s, null, null,
actual.
currentContext()));
return
Disposables.
disposed();
}
}
@
Override
public void
onNext(T
t) {
if (
terminated) {
return;
}
if (
WIP.
get(this) == 0 &&
WIP.
compareAndSet(this, 0, 1)) {
UnicastProcessor<T>
w =
window;
w.
onNext(
t);
int
c =
count + 1;
if (
c >=
maxSize) {
producerIndex++;
count = 0;
w.
onComplete();
long
r =
requested;
if (
r != 0L) {
w =
UnicastProcessor.
create();
window =
w;
actual.
onNext(
w);
if (
r !=
Long.
MAX_VALUE) {
REQUESTED.
decrementAndGet(this);
}
Disposable tm =
timer;
tm.
dispose();
Disposable task =
newPeriod();
if (!
TIMER.
compareAndSet(this,
tm,
task)) {
task.
dispose();
}
}
else {
window = null;
actual.
onError(
Operators.
onOperatorError(
s,
Exceptions.
failWithOverflow(),
t,
actual
.
currentContext()));
timer.
dispose();
worker.
dispose();
return;
}
}
else {
count =
c;
}
if (
WIP.
decrementAndGet(this) == 0) {
return;
}
}
else {
queue.
offer(
t);
if (!
enter()) {
return;
}
}
drainLoop();
}
@
Override
public void
onError(
Throwable t) {
error =
t;
done = true;
if (
enter()) {
drainLoop();
}
actual.
onError(
t);
timer.
dispose();
worker.
dispose();
}
@
Override
public void
onComplete() {
done = true;
if (
enter()) {
drainLoop();
}
actual.
onComplete();
timer.
dispose();
worker.
dispose();
}
@
Override
public void
request(long
n) {
if(
Operators.
validate(
n)) {
Operators.
addCap(
REQUESTED, this,
n);
}
}
@
Override
public void
cancel() {
cancelled = true;
}
@
SuppressWarnings("unchecked")
void
drainLoop() {
final
Queue<
Object>
q =
queue;
final
Subscriber<? super
Flux<T>>
a =
actual;
UnicastProcessor<T>
w =
window;
int
missed = 1;
for (; ; ) {
for (; ; ) {
if (
terminated) {
s.
cancel();
q.
clear();
timer.
dispose();
worker.
dispose();
return;
}
boolean
d =
done;
Object o =
q.
poll();
boolean
empty =
o == null;
boolean
isHolder =
o instanceof
ConsumerIndexHolder;
if (
d && (
empty ||
isHolder)) {
window = null;
q.
clear();
Throwable err =
error;
if (
err != null) {
w.
onError(
err);
}
else {
w.
onComplete();
}
timer.
dispose();
worker.
dispose();
return;
}
if (
empty) {
break;
}
if (
isHolder) {
w.
onComplete();
count = 0;
w =
UnicastProcessor.
create();
window =
w;
long
r =
requested;
if (
r != 0L) {
a.
onNext(
w);
if (
r !=
Long.
MAX_VALUE) {
REQUESTED.
decrementAndGet(this);
}
}
else {
window = null;
queue.
clear();
a.
onError(
Operators.
onOperatorError(
s,
Exceptions.
failWithOverflow(),
actual.
currentContext()));
timer.
dispose();
worker.
dispose();
return;
}
continue;
}
w.
onNext((T)
o);
int
c =
count + 1;
if (
c >=
maxSize) {
producerIndex++;
count = 0;
w.
onComplete();
long
r =
requested;
if (
r != 0L) {
w =
UnicastProcessor.
create();
window =
w;
actual.
onNext(
w);
if (
r !=
Long.
MAX_VALUE) {
REQUESTED.
decrementAndGet(this);
}
Disposable tm =
timer;
tm.
dispose();
Disposable task =
newPeriod();
if (!
TIMER.
compareAndSet(this,
tm,
task)) {
task.
dispose();
}
}
else {
window = null;
a.
onError(
Operators.
onOperatorError(
s,
Exceptions.
failWithOverflow(),
o,
actual
.
currentContext()));
timer.
dispose();
worker.
dispose();
return;
}
}
else {
count =
c;
}
}
missed =
WIP.
addAndGet(this, -
missed);
if (
missed == 0) {
break;
}
}
}
boolean
enter() {
return
WIP.
getAndIncrement(this) == 0;
}
static final class
ConsumerIndexHolder implements
Runnable {
final long
index;
final
WindowTimeoutSubscriber<?>
parent;
ConsumerIndexHolder(long
index,
WindowTimeoutSubscriber<?>
parent) {
this.
index =
index;
this.
parent =
parent;
}
@
Override
public void
run() {
WindowTimeoutSubscriber<?>
p =
parent;
if (!
p.
cancelled) {
p.
queue.
offer(this);
}
else {
p.
terminated = true;
p.
timer.
dispose();
p.
worker.
dispose();
}
if (
p.
enter()) {
p.
drainLoop();
}
}
}
}
}