/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;
import java.util.*;
import java.util.concurrent.
TimeUnit;
import java.util.concurrent.atomic.
AtomicReference;
import io.reactivex.*;
import io.reactivex.
Observable;
import io.reactivex.
Observer;
import io.reactivex.
Scheduler.
Worker;
import io.reactivex.disposables.
Disposable;
import io.reactivex.internal.disposables.
DisposableHelper;
import io.reactivex.internal.observers.
QueueDrainObserver;
import io.reactivex.internal.queue.
MpscLinkedQueue;
import io.reactivex.internal.util.
NotificationLite;
import io.reactivex.observers.
SerializedObserver;
import io.reactivex.subjects.
UnicastSubject;
public final class
ObservableWindowTimed<T> extends
AbstractObservableWithUpstream<T,
Observable<T>> {
// Window duration; subscribeActual forwards it to whichever window observer it creates.
final long
timespan;
// Compared against timespan in subscribeActual: equal values select one of the
// WindowExact* observers, different values select WindowSkipObserver.
final long
timeskip;
// Time unit passed along with timespan/timeskip to the scheduling calls.
final
TimeUnit unit;
// Source of the timers: handed over as-is to the WindowExact* observers, or turned into a
// Worker (scheduler.createWorker()) for WindowSkipObserver.
final
Scheduler scheduler;
// Per-window item limit; subscribeActual treats Long.MAX_VALUE as "no limit" and picks the
// unbounded observer in that case.
final long
maxSize;
// Forwarded to the window observers, which pass it to UnicastSubject.create(...).
final int
bufferSize;
// Forwarded to WindowExactBoundedObserver, which re-schedules its periodic timer after a
// size-triggered window change when this flag is true.
final boolean
restartTimerOnMaxSize;
/**
 * Stores the windowing configuration. Nothing is scheduled or subscribed here; the values
 * are only read later by {@code subscribeActual}.
 *
 * @param source the upstream source, handed to the superclass constructor
 * @param timespan the window duration, in {@code unit}
 * @param timeskip the period between window starts, in {@code unit}
 * @param unit the time unit of {@code timespan} and {@code timeskip}
 * @param scheduler the scheduler used to obtain timers
 * @param maxSize the per-window item limit ({@code Long.MAX_VALUE} selects the unbounded observer)
 * @param bufferSize the value passed on to the window observers
 * @param restartTimerOnMaxSize forwarded to the size-bounded window observer
 */
public ObservableWindowTimed(
        ObservableSource<T> source,
        long timespan, long timeskip, TimeUnit unit,
        Scheduler scheduler, long maxSize,
        int bufferSize, boolean restartTimerOnMaxSize) {
    super(source);

    // timing
    this.unit = unit;
    this.timespan = timespan;
    this.timeskip = timeskip;
    this.scheduler = scheduler;

    // sizing
    this.maxSize = maxSize;
    this.bufferSize = bufferSize;
    this.restartTimerOnMaxSize = restartTimerOnMaxSize;
}
/**
 * Wraps the downstream observer in a {@code SerializedObserver} and subscribes one of the
 * three window observers to the source:
 * <ul>
 * <li>{@code timespan != timeskip}: {@code WindowSkipObserver}, driven by a freshly created Worker;</li>
 * <li>{@code timespan == timeskip} with a finite {@code maxSize}: {@code WindowExactBoundedObserver};</li>
 * <li>{@code timespan == timeskip} with {@code maxSize == Long.MAX_VALUE}:
 *     {@code WindowExactUnboundedObserver}.</li>
 * </ul>
 *
 * @param t the downstream observer receiving the windows
 */
@Override
public void subscribeActual(Observer<? super Observable<T>> t) {
    final SerializedObserver<Observable<T>> serialized = new SerializedObserver<Observable<T>>(t);

    if (timespan != timeskip) {
        source.subscribe(new WindowSkipObserver<T>(
                serialized, timespan, timeskip, unit,
                scheduler.createWorker(), bufferSize));
    } else if (maxSize != Long.MAX_VALUE) {
        source.subscribe(new WindowExactBoundedObserver<T>(
                serialized, timespan, unit, scheduler,
                bufferSize, maxSize, restartTimerOnMaxSize));
    } else {
        source.subscribe(new WindowExactUnboundedObserver<T>(
                serialized, timespan, unit, scheduler, bufferSize));
    }
}
/**
 * Window observer without a size limit: a periodic task (this object's {@link #run()})
 * offers the {@link #NEXT} marker into the queue, and the drain loop completes the current
 * {@code UnicastSubject} and emits a new one each time it meets that marker.
 *
 * <p>{@code queue}, {@code downstream}, {@code cancelled}, {@code done}, {@code error} and the
 * {@code fastEnter()/enter()/leave()} calls come from {@code QueueDrainObserver}, which is not
 * part of this file; presumably they implement its serialized-drain protocol.
 *
 * @param <T> the element type
 */
static final class WindowExactUnboundedObserver<T>
        extends QueueDrainObserver<T, Object, Observable<T>>
        implements Observer<T>, Disposable, Runnable {

    /** Marker the periodic task puts into the queue to request a window change. */
    static final Object NEXT = new Object();

    final long timespan;
    final TimeUnit unit;
    final Scheduler scheduler;
    final int bufferSize;

    Disposable upstream;

    /** The window currently receiving items; set to null by the drain loop on termination. */
    UnicastSubject<T> window;

    /** Holds the Disposable of the periodic task. */
    final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();

    /** Set by {@link #run()} once it observes {@code cancelled}. */
    volatile boolean terminated;

    WindowExactUnboundedObserver(Observer<? super Observable<T>> actual,
            long timespan, TimeUnit unit, Scheduler scheduler, int bufferSize) {
        super(actual, new MpscLinkedQueue<Object>());
        this.timespan = timespan;
        this.unit = unit;
        this.scheduler = scheduler;
        this.bufferSize = bufferSize;
    }

    /**
     * Creates and emits the first window, then (unless the downstream cancelled in the
     * meantime) starts the periodic task with {@code timespan} as both delay and period.
     */
    @Override
    public void onSubscribe(Disposable d) {
        if (!DisposableHelper.validate(this.upstream, d)) {
            return;
        }
        this.upstream = d;

        window = UnicastSubject.<T>create(bufferSize);

        Observer<? super Observable<T>> observer = downstream;
        observer.onSubscribe(this);
        observer.onNext(window);

        if (cancelled) {
            return;
        }
        Disposable periodic = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
        DisposableHelper.replace(timer, periodic);
    }

    /**
     * Delivers the item straight to the current window when {@code fastEnter()} succeeds,
     * otherwise queues it (wrapped via {@code NotificationLite.next}) for the drain loop.
     */
    @Override
    public void onNext(T t) {
        if (terminated) {
            return;
        }
        if (!fastEnter()) {
            queue.offer(NotificationLite.next(t));
            if (!enter()) {
                return;
            }
        } else {
            window.onNext(t);
            if (leave(-1) == 0) {
                return;
            }
        }
        drainLoop();
    }

    /** Records the error, drains if possible, stops the timer and signals the downstream. */
    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        if (enter()) {
            drainLoop();
        }

        disposeTimer();
        downstream.onError(t);
    }

    /** Marks completion, drains if possible, stops the timer and signals the downstream. */
    @Override
    public void onComplete() {
        done = true;
        if (enter()) {
            drainLoop();
        }

        disposeTimer();
        downstream.onComplete();
    }

    /** Only raises the {@code cancelled} flag; {@link #run()} reacts to it on its next execution. */
    @Override
    public void dispose() {
        cancelled = true;
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    /** Disposes whatever task is currently held in {@link #timer}. */
    void disposeTimer() {
        DisposableHelper.dispose(timer);
    }

    /**
     * Body of the periodic task: notes a pending cancellation (and stops the timer in that
     * case), then always enqueues {@link #NEXT} and tries to drain.
     */
    @Override
    public void run() {
        if (cancelled) {
            terminated = true;
            disposeTimer();
        }
        queue.offer(NEXT);
        if (enter()) {
            drainLoop();
        }
    }

    /**
     * Processes queued entries: plain values go to the current window, {@link #NEXT}
     * closes it and either opens a new one or, when {@code terminated} was seen, disposes
     * the upstream. Once {@code done} is set and the queue is empty or yields
     * {@link #NEXT}, the current window is terminated and the method returns.
     */
    void drainLoop() {
        final MpscLinkedQueue<Object> pending = (MpscLinkedQueue<Object>) queue;
        final Observer<? super Observable<T>> observer = downstream;
        UnicastSubject<T> current = window;

        int missed = 1;
        while (true) {

            while (true) {
                // flags are sampled before polling, in this order
                boolean stopRequested = terminated; // NOPMD
                boolean upstreamDone = done;

                Object item = pending.poll();

                boolean empty = item == null;
                boolean boundary = item == NEXT;

                if (upstreamDone && (empty || boundary)) {
                    window = null;
                    pending.clear();
                    disposeTimer();
                    Throwable failure = error;
                    if (failure == null) {
                        current.onComplete();
                    } else {
                        current.onError(failure);
                    }
                    return;
                }

                if (empty) {
                    break;
                }

                if (!boundary) {
                    current.onNext(NotificationLite.<T>getValue(item));
                    continue;
                }

                // window boundary requested by the periodic task
                current.onComplete();
                if (stopRequested) {
                    upstream.dispose();
                } else {
                    current = UnicastSubject.create(bufferSize);
                    window = current;
                    observer.onNext(current);
                }
            }

            missed = leave(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
static final class
WindowExactBoundedObserver<T>
extends
QueueDrainObserver<T,
Object,
Observable<T>>
implements
Disposable {
final long
timespan;
final
TimeUnit unit;
final
Scheduler scheduler;
final int
bufferSize;
final boolean
restartTimerOnMaxSize;
final long
maxSize;
final
Scheduler.
Worker worker;
long
count;
long
producerIndex;
Disposable upstream;
UnicastSubject<T>
window;
volatile boolean
terminated;
final
AtomicReference<
Disposable>
timer = new
AtomicReference<
Disposable>();
WindowExactBoundedObserver(
Observer<? super
Observable<T>>
actual,
long
timespan,
TimeUnit unit,
Scheduler scheduler,
int
bufferSize, long
maxSize, boolean
restartTimerOnMaxSize) {
super(
actual, new
MpscLinkedQueue<
Object>());
this.
timespan =
timespan;
this.
unit =
unit;
this.
scheduler =
scheduler;
this.
bufferSize =
bufferSize;
this.
maxSize =
maxSize;
this.
restartTimerOnMaxSize =
restartTimerOnMaxSize;
if (
restartTimerOnMaxSize) {
worker =
scheduler.
createWorker();
} else {
worker = null;
}
}
@
Override
public void
onSubscribe(
Disposable d) {
if (
DisposableHelper.
validate(this.
upstream,
d)) {
this.
upstream =
d;
Observer<? super
Observable<T>>
a =
downstream;
a.
onSubscribe(this);
if (
cancelled) {
return;
}
UnicastSubject<T>
w =
UnicastSubject.
create(
bufferSize);
window =
w;
a.
onNext(
w);
Disposable task;
ConsumerIndexHolder consumerIndexHolder = new
ConsumerIndexHolder(
producerIndex, this);
if (
restartTimerOnMaxSize) {
task =
worker.
schedulePeriodically(
consumerIndexHolder,
timespan,
timespan,
unit);
} else {
task =
scheduler.
schedulePeriodicallyDirect(
consumerIndexHolder,
timespan,
timespan,
unit);
}
DisposableHelper.
replace(
timer,
task);
}
}
@
Override
public void
onNext(T
t) {
if (
terminated) {
return;
}
if (
fastEnter()) {
UnicastSubject<T>
w =
window;
w.
onNext(
t);
long
c =
count + 1;
if (
c >=
maxSize) {
producerIndex++;
count = 0;
w.
onComplete();
w =
UnicastSubject.
create(
bufferSize);
window =
w;
downstream.
onNext(
w);
if (
restartTimerOnMaxSize) {
Disposable tm =
timer.
get();
tm.
dispose();
Disposable task =
worker.
schedulePeriodically(
new
ConsumerIndexHolder(
producerIndex, this),
timespan,
timespan,
unit);
DisposableHelper.
replace(
timer,
task);
}
} else {
count =
c;
}
if (
leave(-1) == 0) {
return;
}
} else {
queue.
offer(
NotificationLite.
next(
t));
if (!
enter()) {
return;
}
}
drainLoop();
}
@
Override
public void
onError(
Throwable t) {
error =
t;
done = true;
if (
enter()) {
drainLoop();
}
downstream.
onError(
t);
disposeTimer();
}
@
Override
public void
onComplete() {
done = true;
if (
enter()) {
drainLoop();
}
downstream.
onComplete();
disposeTimer();
}
@
Override
public void
dispose() {
cancelled = true;
}
@
Override
public boolean
isDisposed() {
return
cancelled;
}
void
disposeTimer() {
DisposableHelper.
dispose(
timer);
Worker w =
worker;
if (
w != null) {
w.
dispose();
}
}
void
drainLoop() {
final
MpscLinkedQueue<
Object>
q = (
MpscLinkedQueue<
Object>)
queue;
final
Observer<? super
Observable<T>>
a =
downstream;
UnicastSubject<T>
w =
window;
int
missed = 1;
for (;;) {
for (;;) {
if (
terminated) {
upstream.
dispose();
q.
clear();
disposeTimer();
return;
}
boolean
d =
done;
Object o =
q.
poll();
boolean
empty =
o == null;
boolean
isHolder =
o instanceof
ConsumerIndexHolder;
if (
d && (
empty ||
isHolder)) {
window = null;
q.
clear();
disposeTimer();
Throwable err =
error;
if (
err != null) {
w.
onError(
err);
} else {
w.
onComplete();
}
return;
}
if (
empty) {
break;
}
if (
isHolder) {
ConsumerIndexHolder consumerIndexHolder = (
ConsumerIndexHolder)
o;
if (
restartTimerOnMaxSize ||
producerIndex ==
consumerIndexHolder.
index) {
w.
onComplete();
count = 0;
w =
UnicastSubject.
create(
bufferSize);
window =
w;
a.
onNext(
w);
}
continue;
}
w.
onNext(
NotificationLite.<T>
getValue(
o));
long
c =
count + 1;
if (
c >=
maxSize) {
producerIndex++;
count = 0;
w.
onComplete();
w =
UnicastSubject.
create(
bufferSize);
window =
w;
downstream.
onNext(
w);
if (
restartTimerOnMaxSize) {
Disposable tm =
timer.
get();
tm.
dispose();
Disposable task =
worker.
schedulePeriodically(
new
ConsumerIndexHolder(
producerIndex, this),
timespan,
timespan,
unit);
if (!
timer.
compareAndSet(
tm,
task)) {
task.
dispose();
}
}
} else {
count =
c;
}
}
missed =
leave(-
missed);
if (
missed == 0) {
break;
}
}
}
static final class
ConsumerIndexHolder implements
Runnable {
final long
index;
final
WindowExactBoundedObserver<?>
parent;
ConsumerIndexHolder(long
index,
WindowExactBoundedObserver<?>
parent) {
this.
index =
index;
this.
parent =
parent;
}
@
Override
public void
run() {
WindowExactBoundedObserver<?>
p =
parent;
if (!
p.
cancelled) {
p.
queue.
offer(this);
} else {
p.
terminated = true;
p.
disposeTimer();
}
if (
p.
enter()) {
p.
drainLoop();
}
}
}
}
static final class
WindowSkipObserver<T>
extends
QueueDrainObserver<T,
Object,
Observable<T>>
implements
Disposable,
Runnable {
final long
timespan;
final long
timeskip;
final
TimeUnit unit;
final
Scheduler.
Worker worker;
final int
bufferSize;
final
List<
UnicastSubject<T>>
windows;
Disposable upstream;
volatile boolean
terminated;
WindowSkipObserver(
Observer<? super
Observable<T>>
actual,
long
timespan, long
timeskip,
TimeUnit unit,
Worker worker, int
bufferSize) {
super(
actual, new
MpscLinkedQueue<
Object>());
this.
timespan =
timespan;
this.
timeskip =
timeskip;
this.
unit =
unit;
this.
worker =
worker;
this.
bufferSize =
bufferSize;
this.
windows = new
LinkedList<
UnicastSubject<T>>();
}
@
Override
public void
onSubscribe(
Disposable d) {
if (
DisposableHelper.
validate(this.
upstream,
d)) {
this.
upstream =
d;
downstream.
onSubscribe(this);
if (
cancelled) {
return;
}
final
UnicastSubject<T>
w =
UnicastSubject.
create(
bufferSize);
windows.
add(
w);
downstream.
onNext(
w);
worker.
schedule(new
CompletionTask(
w),
timespan,
unit);
worker.
schedulePeriodically(this,
timeskip,
timeskip,
unit);
}
}
@
Override
public void
onNext(T
t) {
if (
fastEnter()) {
for (
UnicastSubject<T>
w :
windows) {
w.
onNext(
t);
}
if (
leave(-1) == 0) {
return;
}
} else {
queue.
offer(
t);
if (!
enter()) {
return;
}
}
drainLoop();
}
@
Override
public void
onError(
Throwable t) {
error =
t;
done = true;
if (
enter()) {
drainLoop();
}
downstream.
onError(
t);
disposeWorker();
}
@
Override
public void
onComplete() {
done = true;
if (
enter()) {
drainLoop();
}
downstream.
onComplete();
disposeWorker();
}
@
Override
public void
dispose() {
cancelled = true;
}
@
Override
public boolean
isDisposed() {
return
cancelled;
}
void
disposeWorker() {
worker.
dispose();
}
void
complete(
UnicastSubject<T>
w) {
queue.
offer(new
SubjectWork<T>(
w, false));
if (
enter()) {
drainLoop();
}
}
@
SuppressWarnings("unchecked")
void
drainLoop() {
final
MpscLinkedQueue<
Object>
q = (
MpscLinkedQueue<
Object>)
queue;
final
Observer<? super
Observable<T>>
a =
downstream;
final
List<
UnicastSubject<T>>
ws =
windows;
int
missed = 1;
for (;;) {
for (;;) {
if (
terminated) {
upstream.
dispose();
disposeWorker();
q.
clear();
ws.
clear();
return;
}
boolean
d =
done;
Object v =
q.
poll();
boolean
empty =
v == null;
boolean
sw =
v instanceof
SubjectWork;
if (
d && (
empty ||
sw)) {
q.
clear();
Throwable e =
error;
if (
e != null) {
for (
UnicastSubject<T>
w :
ws) {
w.
onError(
e);
}
} else {
for (
UnicastSubject<T>
w :
ws) {
w.
onComplete();
}
}
disposeWorker();
ws.
clear();
return;
}
if (
empty) {
break;
}
if (
sw) {
SubjectWork<T>
work = (
SubjectWork<T>)
v;
if (
work.
open) {
if (
cancelled) {
continue;
}
final
UnicastSubject<T>
w =
UnicastSubject.
create(
bufferSize);
ws.
add(
w);
a.
onNext(
w);
worker.
schedule(new
CompletionTask(
w),
timespan,
unit);
} else {
ws.
remove(
work.
w);
work.
w.
onComplete();
if (
ws.
isEmpty() &&
cancelled) {
terminated = true;
}
}
} else {
for (
UnicastSubject<T>
w :
ws) {
w.
onNext((T)
v);
}
}
}
missed =
leave(-
missed);
if (
missed == 0) {
break;
}
}
}
@
Override
public void
run() {
UnicastSubject<T>
w =
UnicastSubject.
create(
bufferSize);
SubjectWork<T>
sw = new
SubjectWork<T>(
w, true);
if (!
cancelled) {
queue.
offer(
sw);
}
if (
enter()) {
drainLoop();
}
}
static final class
SubjectWork<T> {
final
UnicastSubject<T>
w;
final boolean
open;
SubjectWork(
UnicastSubject<T>
w, boolean
open) {
this.
w =
w;
this.
open =
open;
}
}
final class
CompletionTask implements
Runnable {
private final
UnicastSubject<T>
w;
CompletionTask(
UnicastSubject<T>
w) {
this.
w =
w;
}
@
Override
public void
run() {
complete(
w);
}
}
}
}