/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import rx.*;
import rx.
Observable;
import rx.
Observable.
OnSubscribe;
import rx.exceptions.
Exceptions;
import rx.functions.*;
import rx.observers.
SerializedSubscriber;
import rx.subscriptions.*;
/**
 * Correlates the elements of two sequences based on overlapping durations.
 *
 * <p>Every value received from either source is stored under a per-side, monotonically
 * increasing id until the duration {@code Observable} produced for it by the matching duration
 * selector emits an item or completes. When a value arrives on one side it is combined, through
 * {@code resultSelector}, with the values currently stored for the other side whose id is lower
 * than the other side's id counter as sampled at arrival time.
 *
 * <p>The downstream subscriber is completed when
 * <ul>
 * <li>a source completes and either the other source has already completed or no values of the
 * completing side are stored, or</li>
 * <li>a duration expires, removing the last stored value of a side that has already completed.</li>
 * </ul>
 * An error from either source or from any duration {@code Observable} is forwarded downstream,
 * after which the downstream subscriber is unsubscribed.
 *
 * @param <TLeft> the left value type
 * @param <TRight> the right value type
 * @param <TLeftDuration> the left duration value type
 * @param <TRightDuration> the right duration type
 * @param <R> the result type
 */
public final class OnSubscribeJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribe<R> {
    /** The left source sequence. */
    final Observable<TLeft> left;
    /** The right source sequence. */
    final Observable<TRight> right;
    /** Maps a left value to the Observable whose first signal ends that value's duration. */
    final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
    /** Maps a right value to the Observable whose first signal ends that value's duration. */
    final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
    /** Combines a left value and a right value into a value emitted downstream. */
    final Func2<TLeft, TRight, R> resultSelector;

    /**
     * Stores the sources and selector functions; nothing is subscribed until {@link #call}.
     *
     * @param left the left source sequence
     * @param right the right source sequence
     * @param leftDurationSelector returns the duration Observable for a left value
     * @param rightDurationSelector returns the duration Observable for a right value
     * @param resultSelector combines a left and a right value into a result
     */
    public OnSubscribeJoin(
            Observable<TLeft> left,
            Observable<TRight> right,
            Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
            Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
            Func2<TLeft, TRight, R> resultSelector) {
        this.left = left;
        this.right = right;
        this.leftDurationSelector = leftDurationSelector;
        this.rightDurationSelector = rightDurationSelector;
        this.resultSelector = resultSelector;
    }

    /**
     * Wraps the downstream subscriber in a {@code SerializedSubscriber} and starts a new
     * {@link ResultSink} that subscribes to both sources.
     *
     * @param t1 the downstream subscriber
     */
    @Override
    public void call(Subscriber<? super R> t1) {
        ResultSink result = new ResultSink(new SerializedSubscriber<R>(t1));
        result.run();
    }

    /** Manage the left and right sources. */
    final class ResultSink extends HashMap<Integer, TLeft> {
        //HashMap aspect of `this` refers to the `leftMap`

        private static final long serialVersionUID = 3491669543549085380L;

        /** Holds the source subscribers and all currently active duration subscribers. */
        final CompositeSubscription group;
        /** The (serialized) downstream subscriber. */
        final Subscriber<? super R> subscriber;
        /** Guarded by this. */
        boolean leftDone;
        /** Guarded by this. */
        int leftId;
        /** Guarded by this. */
        boolean rightDone;
        /** Guarded by this. */
        int rightId;
        /** Guarded by this. */
        final Map<Integer, TRight> rightMap;

        /**
         * Creates the sink with empty left/right maps and an empty subscription group.
         *
         * @param subscriber the downstream subscriber that receives the joined results
         */
        public ResultSink(Subscriber<? super R> subscriber) {
            super();
            this.subscriber = subscriber;
            this.group = new CompositeSubscription();
            //`leftMap` is `this`
            this.rightMap = new HashMap<Integer, TRight>();
        }

        /**
         * Returns the map of currently stored left values, which is this object itself.
         *
         * @return {@code this}, viewed as the left-value map
         */
        HashMap<Integer, TLeft> leftMap() {
            return this;
        }

        /**
         * Registers the subscription group with the downstream subscriber and subscribes to the
         * left source first, then to the right source.
         */
        public void run() {
            subscriber.add(group);

            Subscriber<TLeft> s1 = new LeftSubscriber();
            Subscriber<TRight> s2 = new RightSubscriber();

            // Both subscribers are registered before either source is subscribed.
            group.add(s1);
            group.add(s2);

            left.unsafeSubscribe(s1);
            right.unsafeSubscribe(s2);
        }

        /** Observes the left values. */
        final class LeftSubscriber extends Subscriber<TLeft> {

            /**
             * Ends the duration of the left value stored under {@code id}.
             *
             * <p>If that removal empties the left map and the left source has already completed,
             * the downstream subscriber is completed and unsubscribed; otherwise only
             * {@code resource} is removed from the subscription group.
             *
             * @param id the id under which the left value was stored
             * @param resource the duration subscriber that triggered the expiration
             */
            protected void expire(int id, Subscription resource) {
                boolean complete = false;
                synchronized (ResultSink.this) {
                    // Presence is checked with containsKey instead of testing the result of
                    // remove() against null: a stored value may itself be null, and such an
                    // entry must still count as removed for the completion check.
                    boolean wasPresent = leftMap().containsKey(id);
                    leftMap().remove(id);
                    if (wasPresent && leftMap().isEmpty() && leftDone) {
                        complete = true;
                    }
                }
                // Downstream is signalled outside of the lock.
                if (complete) {
                    subscriber.onCompleted();
                    subscriber.unsubscribe();
                } else {
                    group.remove(resource);
                }
            }

            /**
             * Stores the left value, subscribes to its duration Observable and emits the
             * combination of the value with every eligible stored right value.
             *
             * @param args the left value
             */
            @Override
            public void onNext(TLeft args) {
                int id;
                int highRightId;

                synchronized (ResultSink.this) {
                    id = leftId++;
                    leftMap().put(id, args);
                    // Snapshot of the right id counter: only right values stored with a lower
                    // id are paired with this left value below.
                    highRightId = rightId;
                }

                try {
                    Observable<TLeftDuration> duration = leftDurationSelector.call(args);

                    Subscriber<TLeftDuration> d1 = new LeftDurationSubscriber(id);
                    group.add(d1);

                    duration.unsafeSubscribe(d1);

                    // Copy the matching right values under the lock; the result selector and
                    // the downstream are invoked after the lock has been released.
                    List<TRight> rightValues = new ArrayList<TRight>();
                    synchronized (ResultSink.this) {
                        for (Map.Entry<Integer, TRight> entry : rightMap.entrySet()) {
                            if (entry.getKey() < highRightId) {
                                rightValues.add(entry.getValue());
                            }
                        }
                    }
                    for (TRight r : rightValues) {
                        R result = resultSelector.call(args, r);
                        subscriber.onNext(result);
                    }
                } catch (Throwable t) {
                    Exceptions.throwOrReport(t, this);
                }
            }

            /**
             * Forwards the error downstream and unsubscribes the downstream subscriber.
             *
             * @param e the error to forward
             */
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
                subscriber.unsubscribe();
            }

            /**
             * Marks the left source as done. Completes the downstream if the right source is
             * already done or no left values are stored; otherwise only removes this subscriber
             * from the subscription group.
             */
            @Override
            public void onCompleted() {
                boolean complete = false;
                synchronized (ResultSink.this) {
                    leftDone = true;
                    if (rightDone || leftMap().isEmpty()) {
                        complete = true;
                    }
                }
                if (complete) {
                    subscriber.onCompleted();
                    subscriber.unsubscribe();
                } else {
                    group.remove(this);
                }
            }

            /** Observes the left duration. */
            final class LeftDurationSubscriber extends Subscriber<TLeftDuration> {
                /** The id of the left value whose duration this subscriber observes. */
                final int id;
                /** True until the first item or completion has triggered the expiration. */
                boolean once = true;

                /**
                 * @param id the id of the left value whose duration is observed
                 */
                public LeftDurationSubscriber(int id) {
                    this.id = id;
                }

                /** Any item ends the duration; handled the same way as completion. */
                @Override
                public void onNext(TLeftDuration args) {
                    onCompleted();
                }

                /** Routes a duration error through the enclosing left subscriber. */
                @Override
                public void onError(Throwable e) {
                    LeftSubscriber.this.onError(e);
                }

                /** Expires the associated left value; only the first call has an effect. */
                @Override
                public void onCompleted() {
                    if (once) {
                        once = false;
                        expire(id, this);
                    }
                }
            }
        }

        /** Observes the right values. */
        final class RightSubscriber extends Subscriber<TRight> {

            /**
             * Ends the duration of the right value stored under {@code id}.
             *
             * <p>If that removal empties the right map and the right source has already
             * completed, the downstream subscriber is completed and unsubscribed; otherwise only
             * {@code resource} is removed from the subscription group.
             *
             * @param id the id under which the right value was stored
             * @param resource the duration subscriber that triggered the expiration
             */
            void expire(int id, Subscription resource) {
                boolean complete = false;
                synchronized (ResultSink.this) {
                    // Presence is checked with containsKey instead of testing the result of
                    // remove() against null: a stored value may itself be null, and such an
                    // entry must still count as removed for the completion check.
                    boolean wasPresent = rightMap.containsKey(id);
                    rightMap.remove(id);
                    if (wasPresent && rightMap.isEmpty() && rightDone) {
                        complete = true;
                    }
                }
                // Downstream is signalled outside of the lock.
                if (complete) {
                    subscriber.onCompleted();
                    subscriber.unsubscribe();
                } else {
                    group.remove(resource);
                }
            }

            /**
             * Stores the right value, subscribes to its duration Observable and emits the
             * combination of every eligible stored left value with the value.
             *
             * @param args the right value
             */
            @Override
            public void onNext(TRight args) {
                int id;
                int highLeftId;

                synchronized (ResultSink.this) {
                    id = rightId++;
                    rightMap.put(id, args);
                    // Snapshot of the left id counter: only left values stored with a lower
                    // id are paired with this right value below.
                    highLeftId = leftId;
                }

                // No additional per-value subscription is registered here: the duration
                // subscriber below is the only resource tied to this value, and expire()
                // removes it from the group again.
                try {
                    Observable<TRightDuration> duration = rightDurationSelector.call(args);

                    Subscriber<TRightDuration> d2 = new RightDurationSubscriber(id);
                    group.add(d2);

                    duration.unsafeSubscribe(d2);

                    // Copy the matching left values under the lock; the result selector and
                    // the downstream are invoked after the lock has been released.
                    List<TLeft> leftValues = new ArrayList<TLeft>();
                    synchronized (ResultSink.this) {
                        for (Map.Entry<Integer, TLeft> entry : leftMap().entrySet()) {
                            if (entry.getKey() < highLeftId) {
                                leftValues.add(entry.getValue());
                            }
                        }
                    }
                    for (TLeft lv : leftValues) {
                        R result = resultSelector.call(lv, args);
                        subscriber.onNext(result);
                    }
                } catch (Throwable t) {
                    Exceptions.throwOrReport(t, this);
                }
            }

            /**
             * Forwards the error downstream and unsubscribes the downstream subscriber.
             *
             * @param e the error to forward
             */
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
                subscriber.unsubscribe();
            }

            /**
             * Marks the right source as done. Completes the downstream if the left source is
             * already done or no right values are stored; otherwise only removes this subscriber
             * from the subscription group.
             */
            @Override
            public void onCompleted() {
                boolean complete = false;
                synchronized (ResultSink.this) {
                    rightDone = true;
                    if (leftDone || rightMap.isEmpty()) {
                        complete = true;
                    }
                }
                if (complete) {
                    subscriber.onCompleted();
                    subscriber.unsubscribe();
                } else {
                    group.remove(this);
                }
            }

            /** Observe the right duration. */
            final class RightDurationSubscriber extends Subscriber<TRightDuration> {
                /** The id of the right value whose duration this subscriber observes. */
                final int id;
                /** True until the first item or completion has triggered the expiration. */
                boolean once = true;

                /**
                 * @param id the id of the right value whose duration is observed
                 */
                public RightDurationSubscriber(int id) {
                    this.id = id;
                }

                /** Any item ends the duration; handled the same way as completion. */
                @Override
                public void onNext(TRightDuration args) {
                    onCompleted();
                }

                /** Routes a duration error through the enclosing right subscriber. */
                @Override
                public void onError(Throwable e) {
                    RightSubscriber.this.onError(e);
                }

                /** Expires the associated right value; only the first call has an effect. */
                @Override
                public void onCompleted() {
                    if (once) {
                        once = false;
                        expire(id, this);
                    }
                }
            }
        }
    }
}