/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;
import java.util.*;
import rx.
Observable;
import rx.
Observable.
Operator;
import rx.
Observer;
import rx.
Subscriber;
import rx.exceptions.
Exceptions;
import rx.functions.
Func1;
import rx.observers.
SerializedSubscriber;
import rx.subscriptions.
CompositeSubscription;
/**
* This operation takes
* values from the specified {@link Observable} source and stores them in the currently active chunks.
* Initially there are no chunks active.
* <p>
* Chunks can be created by pushing a {@code TOpening} value to the "bufferOpenings"
* {@link Observable}. This creates a new buffer which will then start recording values which are produced
* by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an
* {@link Observable} which can produce values. When it does so it will close this (and only this) newly
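* created buffer. When the source {@link Observable} completes, all chunks that are still active are
* emitted and the completion is then propagated to the subscribed {@link Observer}. When the source
* produces an error, the active chunks are discarded and only the error is propagated.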
* </p><p>
* Note that when using this operation <strong>multiple overlapping chunks</strong> could be active at any
* one point.
* </p>
*
* @param <T> the buffered value type
* @param <TOpening> the value type of the Observable opening buffers
* @param <TClosing> the value type of the Observable closing buffers
*/
public final class OperatorBufferWithStartEndObservable<T, TOpening, TClosing> implements Operator<List<T>, T> {

    /** Source of the values that each open a new chunk. */
    final Observable<? extends TOpening> bufferOpening;

    /** Maps an opening value to the Observable whose first event closes the chunk that value opened. */
    final Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosing;

    /**
     * Creates the operator from the opening sequence and the closing selector.
     *
     * @param bufferOpenings
     *            an {@link Observable} whose every {@code TOpening} value starts a new chunk that
     *            records the values arriving from the source from then on
     * @param bufferClosingSelector
     *            a {@link Func1} called once per opening value; the {@link Observable} it returns
     *            closes (and emits) the chunk opened by that value as soon as it produces an item or
     *            completes
     */
    public OperatorBufferWithStartEndObservable(
            Observable<? extends TOpening> bufferOpenings,
            Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) {
        this.bufferOpening = bufferOpenings;
        this.bufferClosing = bufferClosingSelector;
    }

    /**
     * Builds the subscriber that receives the source values and wires up the subscription to the
     * opening Observable.
     *
     * @param child
     *            the downstream subscriber that receives the finished chunks
     * @return the subscriber to be subscribed to the source
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
        // Downstream calls can originate from the source, the opening Observable and every closing
        // Observable, so the child is wrapped in a SerializedSubscriber before being handed over.
        final SerializedSubscriber<List<T>> serializedChild = new SerializedSubscriber<List<T>>(child);
        final BufferingSubscriber buffering = new BufferingSubscriber(serializedChild);

        final Subscriber<TOpening> openings = new Subscriber<TOpening>() {

            @Override
            public void onNext(TOpening t) {
                buffering.startBuffer(t);
            }

            @Override
            public void onError(Throwable e) {
                buffering.onError(e);
            }

            @Override
            public void onCompleted() {
                // Completion of the opening sequence terminates the whole operator.
                buffering.onCompleted();
            }
        };

        // Both subscribers are registered with the (unwrapped) child subscriber.
        child.add(openings);
        child.add(buffering);

        bufferOpening.unsafeSubscribe(openings);

        return buffering;
    }

    /**
     * Receives the source values, appends them to every active chunk and emits chunks to the child
     * when they are closed or when the sequence completes.
     */
    final class BufferingSubscriber extends Subscriber<T> {

        /** Downstream receiver of the finished chunks. */
        final Subscriber<? super List<T>> child;

        /** Guarded by this. */
        final List<List<T>> chunks;

        /** Guarded by this. */
        boolean done;

        /** Holds the subscribers of the closing Observables that are currently registered. */
        final CompositeSubscription closingSubscriptions;

        /**
         * @param child
         *            the subscriber that receives the emitted chunks and the terminal event
         */
        public BufferingSubscriber(Subscriber<? super List<T>> child) {
            this.child = child;
            this.chunks = new LinkedList<List<T>>();
            this.closingSubscriptions = new CompositeSubscription();
            add(this.closingSubscriptions);
        }

        /**
         * Appends the value to every chunk that is active at this moment.
         *
         * @param t
         *            the value received from the source
         */
        @Override
        public void onNext(T t) {
            synchronized (this) {
                final Iterator<List<T>> active = chunks.iterator();
                while (active.hasNext()) {
                    active.next().add(t);
                }
            }
        }

        /**
         * Terminates with an error: drops all active chunks, forwards the error to the child and
         * unsubscribes. Does nothing if a terminal event was already processed.
         *
         * @param e
         *            the error to forward
         */
        @Override
        public void onError(Throwable e) {
            final boolean alreadyTerminated;
            synchronized (this) {
                alreadyTerminated = done;
                if (!alreadyTerminated) {
                    done = true;
                    chunks.clear();
                }
            }
            if (alreadyTerminated) {
                return;
            }
            // The child is notified outside of the monitor.
            child.onError(e);
            unsubscribe();
        }

        /**
         * Terminates normally: emits every chunk that is still active, then completes the child and
         * unsubscribes. Does nothing if a terminal event was already processed.
         */
        @Override
        public void onCompleted() {
            try {
                final List<List<T>> pending;
                synchronized (this) {
                    if (done) {
                        return;
                    }
                    done = true;
                    // Snapshot the active chunks so they can be emitted without holding the monitor.
                    pending = new LinkedList<List<T>>(chunks);
                    chunks.clear();
                }
                for (List<T> remaining : pending) {
                    child.onNext(remaining);
                }
            } catch (Throwable ex) {
                Exceptions.throwOrReport(ex, child);
                return;
            }
            child.onCompleted();
            unsubscribe();
        }

        /**
         * Opens a new chunk and subscribes to the closing Observable selected for the opening value.
         *
         * @param v
         *            the opening value passed to the closing selector
         */
        void startBuffer(TOpening v) {
            final List<T> opened = new ArrayList<T>();
            synchronized (this) {
                if (done) {
                    return;
                }
                chunks.add(opened);
            }

            final Observable<? extends TClosing> closing;
            try {
                closing = bufferClosing.call(v);
            } catch (Throwable ex) {
                // A failing selector is reported through this subscriber's own onError path.
                Exceptions.throwOrReport(ex, this);
                return;
            }

            final Subscriber<TClosing> closer = new Subscriber<TClosing>() {

                @Override
                public void onNext(TClosing t) {
                    closeChunk();
                }

                @Override
                public void onError(Throwable e) {
                    BufferingSubscriber.this.onError(e);
                }

                @Override
                public void onCompleted() {
                    closeChunk();
                }

                /** Deregisters this closing subscriber and ends the chunk it is responsible for. */
                void closeChunk() {
                    closingSubscriptions.remove(this);
                    endBuffer(opened);
                }
            };

            // Register before subscribing so the closer is already tracked when events arrive.
            closingSubscriptions.add(closer);
            closing.unsafeSubscribe(closer);
        }

        /**
         * Removes the given chunk from the active set and, if it was still active, emits it to the
         * child. Does nothing once a terminal event was processed.
         *
         * @param toEnd
         *            the chunk instance to close
         */
        void endBuffer(List<T> toEnd) {
            boolean removed = false;
            synchronized (this) {
                if (done) {
                    return;
                }
                for (Iterator<List<T>> it = chunks.iterator(); it.hasNext();) {
                    // Compared by identity: distinct chunks may hold equal contents.
                    if (it.next() == toEnd) {
                        it.remove();
                        removed = true;
                        break;
                    }
                }
            }
            // Emission happens outside of the monitor.
            if (removed) {
                child.onNext(toEnd);
            }
        }
    }
}