/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;
import java.util.concurrent.atomic.*;
import io.reactivex.*;
import io.reactivex.disposables.
Disposable;
import io.reactivex.exceptions.
Exceptions;
import io.reactivex.functions.
Function;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.functions.
ObjectHelper;
import io.reactivex.internal.queue.
SpscLinkedArrayQueue;
import io.reactivex.internal.util.
AtomicThrowable;
import io.reactivex.plugins.
RxJavaPlugins;
public final class
ObservableCombineLatest<T, R> extends
Observable<R> {
final
ObservableSource<? extends T>[]
sources;
final
Iterable<? extends
ObservableSource<? extends T>>
sourcesIterable;
final
Function<? super
Object[], ? extends R>
combiner;
final int
bufferSize;
final boolean
delayError;
public
ObservableCombineLatest(
ObservableSource<? extends T>[]
sources,
Iterable<? extends
ObservableSource<? extends T>>
sourcesIterable,
Function<? super
Object[], ? extends R>
combiner, int
bufferSize,
boolean
delayError) {
this.
sources =
sources;
this.
sourcesIterable =
sourcesIterable;
this.
combiner =
combiner;
this.
bufferSize =
bufferSize;
this.
delayError =
delayError;
}
@
Override
@
SuppressWarnings("unchecked")
public void
subscribeActual(
Observer<? super R>
observer) {
ObservableSource<? extends T>[]
sources = this.
sources;
int
count = 0;
if (
sources == null) {
sources = new
Observable[8];
for (
ObservableSource<? extends T>
p :
sourcesIterable) {
if (
count ==
sources.length) {
ObservableSource<? extends T>[]
b = new
ObservableSource[
count + (
count >> 2)];
System.
arraycopy(
sources, 0,
b, 0,
count);
sources =
b;
}
sources[
count++] =
p;
}
} else {
count =
sources.length;
}
if (
count == 0) {
EmptyDisposable.
complete(
observer);
return;
}
LatestCoordinator<T, R>
lc = new
LatestCoordinator<T, R>(
observer,
combiner,
count,
bufferSize,
delayError);
lc.
subscribe(
sources);
}
static final class
LatestCoordinator<T, R> extends
AtomicInteger implements
Disposable {
private static final long
serialVersionUID = 8567835998786448817L;
final
Observer<? super R>
downstream;
final
Function<? super
Object[], ? extends R>
combiner;
final
CombinerObserver<T, R>[]
observers;
Object[]
latest;
final
SpscLinkedArrayQueue<
Object[]>
queue;
final boolean
delayError;
volatile boolean
cancelled;
volatile boolean
done;
final
AtomicThrowable errors = new
AtomicThrowable();
int
active;
int
complete;
@
SuppressWarnings("unchecked")
LatestCoordinator(
Observer<? super R>
actual,
Function<? super
Object[], ? extends R>
combiner,
int
count, int
bufferSize, boolean
delayError) {
this.
downstream =
actual;
this.
combiner =
combiner;
this.
delayError =
delayError;
this.
latest = new
Object[
count];
CombinerObserver<T, R>[]
as = new
CombinerObserver[
count];
for (int
i = 0;
i <
count;
i++) {
as[
i] = new
CombinerObserver<T, R>(this,
i);
}
this.
observers =
as;
this.
queue = new
SpscLinkedArrayQueue<
Object[]>(
bufferSize);
}
public void
subscribe(
ObservableSource<? extends T>[]
sources) {
Observer<T>[]
as =
observers;
int
len =
as.length;
downstream.
onSubscribe(this);
for (int
i = 0;
i <
len;
i++) {
if (
done ||
cancelled) {
return;
}
sources[
i].
subscribe(
as[
i]);
}
}
@
Override
public void
dispose() {
if (!
cancelled) {
cancelled = true;
cancelSources();
if (
getAndIncrement() == 0) {
clear(
queue);
}
}
}
@
Override
public boolean
isDisposed() {
return
cancelled;
}
void
cancelSources() {
for (
CombinerObserver<T, R>
observer :
observers) {
observer.
dispose();
}
}
void
clear(
SpscLinkedArrayQueue<?>
q) {
synchronized (this) {
latest = null;
}
q.
clear();
}
void
drain() {
if (
getAndIncrement() != 0) {
return;
}
final
SpscLinkedArrayQueue<
Object[]>
q =
queue;
final
Observer<? super R>
a =
downstream;
final boolean
delayError = this.
delayError;
int
missed = 1;
for (;;) {
for (;;) {
if (
cancelled) {
clear(
q);
return;
}
if (!
delayError &&
errors.
get() != null) {
cancelSources();
clear(
q);
a.
onError(
errors.
terminate());
return;
}
boolean
d =
done;
Object[]
s =
q.
poll();
boolean
empty =
s == null;
if (
d &&
empty) {
clear(
q);
Throwable ex =
errors.
terminate();
if (
ex == null) {
a.
onComplete();
} else {
a.
onError(
ex);
}
return;
}
if (
empty) {
break;
}
R
v;
try {
v =
ObjectHelper.
requireNonNull(
combiner.
apply(
s), "The combiner returned a null value");
} catch (
Throwable ex) {
Exceptions.
throwIfFatal(
ex);
errors.
addThrowable(
ex);
cancelSources();
clear(
q);
ex =
errors.
terminate();
a.
onError(
ex);
return;
}
a.
onNext(
v);
}
missed =
addAndGet(-
missed);
if (
missed == 0) {
break;
}
}
}
void
innerNext(int
index, T
item) {
boolean
shouldDrain = false;
synchronized (this) {
Object[]
latest = this.
latest;
if (
latest == null) {
return;
}
Object o =
latest[
index];
int
a =
active;
if (
o == null) {
active = ++
a;
}
latest[
index] =
item;
if (
a ==
latest.length) {
queue.
offer(
latest.
clone());
shouldDrain = true;
}
}
if (
shouldDrain) {
drain();
}
}
void
innerError(int
index,
Throwable ex) {
if (
errors.
addThrowable(
ex)) {
boolean
cancelOthers = true;
if (
delayError) {
synchronized (this) {
Object[]
latest = this.
latest;
if (
latest == null) {
return;
}
cancelOthers =
latest[
index] == null;
if (
cancelOthers || ++
complete ==
latest.length) {
done = true;
}
}
}
if (
cancelOthers) {
cancelSources();
}
drain();
} else {
RxJavaPlugins.
onError(
ex);
}
}
void
innerComplete(int
index) {
boolean
cancelOthers = false;
synchronized (this) {
Object[]
latest = this.
latest;
if (
latest == null) {
return;
}
cancelOthers =
latest[
index] == null;
if (
cancelOthers || ++
complete ==
latest.length) {
done = true;
}
}
if (
cancelOthers) {
cancelSources();
}
drain();
}
}
static final class
CombinerObserver<T, R> extends
AtomicReference<
Disposable> implements
Observer<T> {
private static final long
serialVersionUID = -4823716997131257941L;
final
LatestCoordinator<T, R>
parent;
final int
index;
CombinerObserver(
LatestCoordinator<T, R>
parent, int
index) {
this.
parent =
parent;
this.
index =
index;
}
@
Override
public void
onSubscribe(
Disposable d) {
DisposableHelper.
setOnce(this,
d);
}
@
Override
public void
onNext(T
t) {
parent.
innerNext(
index,
t);
}
@
Override
public void
onError(
Throwable t) {
parent.
innerError(
index,
t);
}
@
Override
public void
onComplete() {
parent.
innerComplete(
index);
}
public void
dispose() {
DisposableHelper.
dispose(this);
}
}
}