/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import rx.*;
import rx.
Observable;
import rx.
Observable.
OnSubscribe;
import rx.
Observer;
import rx.exceptions.
Exceptions;
import rx.functions.*;
import rx.observers.*;
import rx.subjects.*;
import rx.subscriptions.*;
/**
 * Correlates the values of two sequences while their durations overlap and
 * groups the correlated right values into one window per left value.
 * <p>
 * For every left value a window {@code Observable<T2>} is created and handed,
 * together with the left value, to the result selector; the selector's result
 * is what the downstream subscriber receives. Right values are forwarded to
 * every window that is open at the time they arrive, and right values that
 * are still open are replayed to each newly created window.
 *
 * @see <a href="http://msdn.microsoft.com/en-us/library/hh244235.aspx">MSDN: Observable.GroupJoin</a>
 * @param <T1> the left value type
 * @param <T2> the right value type
 * @param <D1> the value type of the left duration
 * @param <D2> the value type of the right duration
 * @param <R> the result value type
 */
public final class OnSubscribeGroupJoin<T1, T2, D1, D2, R> implements OnSubscribe<R> {
    /** The left source; each of its values opens a window. */
    final Observable<T1> left;
    /** The right source; its values are delivered into the open windows. */
    final Observable<T2> right;
    /** Maps a left value to the sequence whose first signal closes that value's window. */
    final Func1<? super T1, ? extends Observable<D1>> leftDuration;
    /** Maps a right value to the sequence whose first signal expires that value. */
    final Func1<? super T2, ? extends Observable<D2>> rightDuration;
    /** Combines a left value and its window into the value emitted downstream. */
    final Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector;

    /**
     * Stores the sources and functions; nothing is subscribed until {@link #call}.
     *
     * @param left the left source
     * @param right the right source
     * @param leftDuration produces the duration sequence for a left value
     * @param rightDuration produces the duration sequence for a right value
     * @param resultSelector turns a left value and its window into a result
     */
    public OnSubscribeGroupJoin(
            Observable<T1> left,
            Observable<T2> right,
            Func1<? super T1, ? extends Observable<D1>> leftDuration,
            Func1<? super T2, ? extends Observable<D2>> rightDuration,
            Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector) {
        this.left = left;
        this.right = right;
        this.leftDuration = leftDuration;
        this.rightDuration = rightDuration;
        this.resultSelector = resultSelector;
    }

    /**
     * Wraps the child in a {@code SerializedSubscriber}, registers a new
     * {@link ResultManager} with the child and starts the manager.
     *
     * @param child the downstream subscriber
     */
    @Override
    public void call(Subscriber<? super R> child) {
        SerializedSubscriber<R> serialized = new SerializedSubscriber<R>(child);
        ResultManager manager = new ResultManager(serialized);
        child.add(manager);
        manager.init();
    }

    /**
     * Manages sub-observers and subscriptions.
     * <p>
     * The inherited {@code HashMap} part of this object is the "left map":
     * it maps a left id to the observer feeding that left value's window.
     */
    final class ResultManager extends HashMap<Integer, Observer<T2>> implements Subscription {
        // HashMap aspect of `this` refers to `leftMap`

        private static final long serialVersionUID = -3035156013812425335L;

        /** Reference-counted subscription wrapping {@link #group}; also handed to every window. */
        final RefCountSubscription cancel;

        /** The downstream subscriber receiving the selector results. */
        final Subscriber<? super R> subscriber;

        /** Holds the source subscribers and the duration subscribers. */
        final CompositeSubscription group;

        /** Guarded by this. */
        int leftIds;

        /** Guarded by this. */
        int rightIds;

        /** Guarded by this. */
        final Map<Integer, T2> rightMap = new HashMap<Integer, T2>(); // NOPMD

        /** Guarded by this. */
        boolean leftDone;

        /** Guarded by this. */
        boolean rightDone;

        /**
         * Creates the manager together with its subscription containers.
         *
         * @param subscriber the downstream subscriber to emit results to
         */
        public ResultManager(Subscriber<? super R> subscriber) {
            super();
            this.subscriber = subscriber;
            this.group = new CompositeSubscription();
            this.cancel = new RefCountSubscription(group);
        }

        /**
         * Creates the left and right observers, adds both to the group and then
         * subscribes them to their sources, left first.
         */
        public void init() {
            Subscriber<T1> leftSubscriber = new LeftObserver();
            Subscriber<T2> rightSubscriber = new RightObserver();

            group.add(leftSubscriber);
            group.add(rightSubscriber);

            left.unsafeSubscribe(leftSubscriber);
            right.unsafeSubscribe(rightSubscriber);
        }

        @Override
        public void unsubscribe() {
            cancel.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return cancel.isUnsubscribed();
        }

        /**
         * Gives the map view of this manager.
         *
         * @return this instance, typed as the left-id to window-observer map
         */
        Map<Integer, Observer<T2>> leftMap() {
            return this;
        }

        /**
         * Copies the currently registered window observers and empties both
         * the left and the right map. Every caller holds the lock on this
         * manager while invoking it.
         *
         * @return the window observers that were registered before clearing
         */
        private List<Observer<T2>> drainWindows() {
            List<Observer<T2>> windows = new ArrayList<Observer<T2>>(leftMap().values());
            leftMap().clear();
            rightMap.clear();
            return windows;
        }

        /**
         * Notify everyone and cleanup: every open window gets the error first,
         * then the main subscriber, and finally the manager is unsubscribed.
         *
         * @param e the exception
         */
        void errorAll(Throwable e) {
            List<Observer<T2>> windows;
            synchronized (this) {
                windows = drainWindows();
            }
            // Signals are sent after the lock has been released.
            for (Observer<T2> window : windows) {
                window.onError(e);
            }
            subscriber.onError(e);
            cancel.unsubscribe();
        }

        /**
         * Notify only the main subscriber and cleanup. The maps are cleared
         * without sending any signal to the window observers they held.
         *
         * @param e the exception
         */
        void errorMain(Throwable e) {
            synchronized (this) {
                leftMap().clear();
                rightMap.clear();
            }
            subscriber.onError(e);
            cancel.unsubscribe();
        }

        /**
         * Completes the given windows, then the main subscriber, and
         * unsubscribes the manager.
         *
         * @param list the windows to complete, or {@code null} when the other
         *             source has not finished yet, in which case nothing happens
         */
        void complete(List<Observer<T2>> list) {
            if (list == null) {
                return;
            }
            for (Observer<T2> window : list) {
                window.onCompleted();
            }
            subscriber.onCompleted();
            cancel.unsubscribe();
        }

        /** Observe the left source. */
        final class LeftObserver extends Subscriber<T1> {
            /**
             * Opens a window for the left value, subscribes to its duration,
             * emits the selector result downstream and then replays the right
             * values that were open when the snapshot was taken.
             */
            @Override
            public void onNext(T1 args) {
                try {
                    Subject<T2, T2> windowSubject = PublishSubject.create();
                    Observer<T2> serializedWindow = new SerializedObserver<T2>(windowSubject);

                    int windowId;
                    synchronized (ResultManager.this) {
                        windowId = leftIds++;
                        leftMap().put(windowId, serializedWindow);
                    }

                    Observable<T2> window = Observable.unsafeCreate(
                            new WindowObservableFunc<T2>(windowSubject, cancel));

                    Observable<D1> duration = leftDuration.call(args);

                    Subscriber<D1> durationSubscriber = new LeftDurationObserver(windowId);
                    group.add(durationSubscriber);
                    duration.unsafeSubscribe(durationSubscriber);

                    R result = resultSelector.call(args, window);

                    // Snapshot under the lock, emit outside of it.
                    List<T2> openRightValues;
                    synchronized (ResultManager.this) {
                        openRightValues = new ArrayList<T2>(rightMap.values());
                    }

                    subscriber.onNext(result);
                    for (T2 rightValue : openRightValues) {
                        serializedWindow.onNext(rightValue);
                    }
                } catch (Throwable t) {
                    Exceptions.throwOrReport(t, this);
                }
            }

            /**
             * Marks the left side as done; when the right side is done as well,
             * drains the windows and completes everything.
             */
            @Override
            public void onCompleted() {
                List<Observer<T2>> windows;
                synchronized (ResultManager.this) {
                    leftDone = true;
                    windows = rightDone ? drainWindows() : null;
                }
                complete(windows);
            }

            @Override
            public void onError(Throwable e) {
                errorAll(e);
            }
        }

        /** Observe the right source. */
        final class RightObserver extends Subscriber<T2> {
            /**
             * Registers the right value, subscribes to its duration and then
             * forwards the value to the windows open at snapshot time.
             */
            @Override
            public void onNext(T2 args) {
                try {
                    int valueId;
                    synchronized (ResultManager.this) {
                        valueId = rightIds++;
                        rightMap.put(valueId, args);
                    }

                    Observable<D2> duration = rightDuration.call(args);

                    Subscriber<D2> durationSubscriber = new RightDurationObserver(valueId);
                    group.add(durationSubscriber);
                    duration.unsafeSubscribe(durationSubscriber);

                    // Snapshot under the lock, emit outside of it.
                    List<Observer<T2>> openWindows;
                    synchronized (ResultManager.this) {
                        openWindows = new ArrayList<Observer<T2>>(leftMap().values());
                    }

                    for (Observer<T2> window : openWindows) {
                        window.onNext(args);
                    }
                } catch (Throwable t) {
                    Exceptions.throwOrReport(t, this);
                }
            }

            /**
             * Marks the right side as done; when the left side is done as well,
             * drains the windows and completes everything.
             */
            @Override
            public void onCompleted() {
                List<Observer<T2>> windows;
                synchronized (ResultManager.this) {
                    rightDone = true;
                    windows = leftDone ? drainWindows() : null;
                }
                complete(windows);
            }

            @Override
            public void onError(Throwable e) {
                errorAll(e);
            }
        }

        /** Observe left duration and apply termination. */
        final class LeftDurationObserver extends Subscriber<D1> {
            /** The id of the window this duration belongs to. */
            final int id;

            /** True until the first termination signal has been processed. */
            boolean once = true;

            /**
             * @param id the left id whose window this duration closes
             */
            public LeftDurationObserver(int id) {
                this.id = id;
            }

            /**
             * On the first call only: removes the window from the left map,
             * completes it if it was still registered, and removes this
             * observer from the group.
             */
            @Override
            public void onCompleted() {
                if (!once) {
                    return;
                }
                once = false;

                Observer<T2> window;
                synchronized (ResultManager.this) {
                    window = leftMap().remove(id);
                }
                if (window != null) {
                    window.onCompleted();
                }
                group.remove(this);
            }

            @Override
            public void onError(Throwable e) {
                errorMain(e);
            }

            /** Any duration value is treated the same as completion. */
            @Override
            public void onNext(D1 args) {
                onCompleted();
            }
        }

        /** Observe right duration and apply termination. */
        final class RightDurationObserver extends Subscriber<D2> {
            /** The id of the right value this duration belongs to. */
            final int id;

            /** True until the first termination signal has been processed. */
            boolean once = true;

            /**
             * @param id the right id whose value this duration expires
             */
            public RightDurationObserver(int id) {
                this.id = id;
            }

            /**
             * On the first call only: removes the right value from the right
             * map and removes this observer from the group.
             */
            @Override
            public void onCompleted() {
                if (!once) {
                    return;
                }
                once = false;

                synchronized (ResultManager.this) {
                    rightMap.remove(id);
                }
                group.remove(this);
            }

            @Override
            public void onError(Throwable e) {
                errorMain(e);
            }

            /** Any duration value is treated the same as completion. */
            @Override
            public void onNext(D2 args) {
                onCompleted();
            }
        }
    }

    /**
     * The reference-counted window observable.
     * Each subscription obtains its own reference from the shared
     * {@code RefCountSubscription} before subscribing to the underlying
     * Observable, and releases that reference when the window terminates.
     *
     * @param <T> the value type of the window
     */
    static final class WindowObservableFunc<T> implements OnSubscribe<T> {
        /** The shared reference-counted subscription references are taken from. */
        final RefCountSubscription refCount;

        /** The sequence the window subscribers are connected to. */
        final Observable<T> underlying;

        /**
         * @param underlying the sequence to relay to window subscribers
         * @param refCount the subscription to take a reference from per subscriber
         */
        public WindowObservableFunc(Observable<T> underlying, RefCountSubscription refCount) {
            this.refCount = refCount;
            this.underlying = underlying;
        }

        /**
         * Takes a reference, wraps the subscriber so that the reference is tied
         * to it, and subscribes the wrapper to the underlying sequence.
         */
        @Override
        public void call(Subscriber<? super T> t1) {
            Subscription ref = refCount.get();
            WindowSubscriber relay = new WindowSubscriber(t1, ref);
            relay.add(ref);

            underlying.unsafeSubscribe(relay);
        }

        /** Observe activities on the window. */
        final class WindowSubscriber extends Subscriber<T> {
            /** The actual window consumer. */
            final Subscriber<? super T> subscriber;

            /** The reference released once the window terminates. */
            private final Subscription ref;

            /**
             * @param subscriber the consumer to relay events to
             * @param ref the reference to unsubscribe on termination
             */
            public WindowSubscriber(Subscriber<? super T> subscriber, Subscription ref) {
                super(subscriber);
                this.subscriber = subscriber;
                this.ref = ref;
            }

            @Override
            public void onNext(T args) {
                subscriber.onNext(args);
            }

            /** Relays the error, then releases the reference. */
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
                ref.unsubscribe();
            }

            /** Relays the completion, then releases the reference. */
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
                ref.unsubscribe();
            }
        }
    }
}