/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package java.util.stream;
import java.util.
Optional;
import java.util.
OptionalDouble;
import java.util.
OptionalInt;
import java.util.
OptionalLong;
import java.util.
Spliterator;
import java.util.concurrent.
CountedCompleter;
import java.util.function.
Predicate;
import java.util.function.
Supplier;
/**
* Factory for instances of a short-circuiting {@code TerminalOp} that searches
* for an element in a stream pipeline, and terminates when it finds one.
* Supported variants include find-first (find the first element in the
* encounter order) and find-any (find any element, may not be the first in
* encounter order.)
*
* @since 1.8
*/
final class
FindOps {
private
FindOps() { }
/**
* Constructs a {@code TerminalOp} for streams of objects.
*
* @param <T> the type of elements of the stream
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static <T>
TerminalOp<T,
Optional<T>>
makeRef(boolean
mustFindFirst) {
return new
FindOp<>(
mustFindFirst,
StreamShape.
REFERENCE,
Optional.
empty(),
Optional::isPresent,
FindSink.
OfRef::new);
}
/**
* Constructs a {@code TerminalOp} for streams of ints.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static
TerminalOp<
Integer,
OptionalInt>
makeInt(boolean
mustFindFirst) {
return new
FindOp<>(
mustFindFirst,
StreamShape.
INT_VALUE,
OptionalInt.
empty(),
OptionalInt::isPresent,
FindSink.
OfInt::new);
}
/**
* Constructs a {@code TerminalOp} for streams of longs.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static
TerminalOp<
Long,
OptionalLong>
makeLong(boolean
mustFindFirst) {
return new
FindOp<>(
mustFindFirst,
StreamShape.
LONG_VALUE,
OptionalLong.
empty(),
OptionalLong::isPresent,
FindSink.
OfLong::new);
}
/**
* Constructs a {@code FindOp} for streams of doubles.
*
* @param mustFindFirst whether the {@code TerminalOp} must produce the
* first element in the encounter order
* @return a {@code TerminalOp} implementing the find operation
*/
public static
TerminalOp<
Double,
OptionalDouble>
makeDouble(boolean
mustFindFirst) {
return new
FindOp<>(
mustFindFirst,
StreamShape.
DOUBLE_VALUE,
OptionalDouble.
empty(),
OptionalDouble::isPresent,
FindSink.
OfDouble::new);
}
/**
* A short-circuiting {@code TerminalOp} that searches for an element in a
* stream pipeline, and terminates when it finds one. Implements both
* find-first (find the first element in the encounter order) and find-any
* (find any element, may not be the first in encounter order.)
*
* @param <T> the output type of the stream pipeline
* @param <O> the result type of the find operation, typically an optional
* type
*/
private static final class
FindOp<T, O> implements
TerminalOp<T, O> {
private final
StreamShape shape;
final boolean
mustFindFirst;
final O
emptyValue;
final
Predicate<O>
presentPredicate;
final
Supplier<
TerminalSink<T, O>>
sinkSupplier;
/**
* Constructs a {@code FindOp}.
*
* @param mustFindFirst if true, must find the first element in
* encounter order, otherwise can find any element
* @param shape stream shape of elements to search
* @param emptyValue result value corresponding to "found nothing"
* @param presentPredicate {@code Predicate} on result value
* corresponding to "found something"
* @param sinkSupplier supplier for a {@code TerminalSink} implementing
* the matching functionality
*/
FindOp(boolean
mustFindFirst,
StreamShape shape,
O
emptyValue,
Predicate<O>
presentPredicate,
Supplier<
TerminalSink<T, O>>
sinkSupplier) {
this.
mustFindFirst =
mustFindFirst;
this.
shape =
shape;
this.
emptyValue =
emptyValue;
this.
presentPredicate =
presentPredicate;
this.
sinkSupplier =
sinkSupplier;
}
@
Override
public int
getOpFlags() {
return
StreamOpFlag.
IS_SHORT_CIRCUIT | (
mustFindFirst ? 0 :
StreamOpFlag.
NOT_ORDERED);
}
@
Override
public
StreamShape inputShape() {
return
shape;
}
@
Override
public <S> O
evaluateSequential(
PipelineHelper<T>
helper,
Spliterator<S>
spliterator) {
O
result =
helper.
wrapAndCopyInto(
sinkSupplier.
get(),
spliterator).
get();
return
result != null ?
result :
emptyValue;
}
@
Override
public <P_IN> O
evaluateParallel(
PipelineHelper<T>
helper,
Spliterator<P_IN>
spliterator) {
return new
FindTask<>(this,
helper,
spliterator).
invoke();
}
}
/**
* Implementation of @{code TerminalSink} that implements the find
* functionality, requesting cancellation when something has been found
*
* @param <T> The type of input element
* @param <O> The result type, typically an optional type
*/
private static abstract class
FindSink<T, O> implements
TerminalSink<T, O> {
boolean
hasValue;
T
value;
FindSink() {} // Avoid creation of special accessor
@
Override
public void
accept(T
value) {
if (!
hasValue) {
hasValue = true;
this.
value =
value;
}
}
@
Override
public boolean
cancellationRequested() {
return
hasValue;
}
/** Specialization of {@code FindSink} for reference streams */
static final class
OfRef<T> extends
FindSink<T,
Optional<T>> {
@
Override
public
Optional<T>
get() {
return
hasValue ?
Optional.
of(
value) : null;
}
}
/** Specialization of {@code FindSink} for int streams */
static final class
OfInt extends
FindSink<
Integer,
OptionalInt>
implements
Sink.
OfInt {
@
Override
public void
accept(int
value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((
Integer)
value);
}
@
Override
public
OptionalInt get() {
return
hasValue ?
OptionalInt.
of(
value) : null;
}
}
/** Specialization of {@code FindSink} for long streams */
static final class
OfLong extends
FindSink<
Long,
OptionalLong>
implements
Sink.
OfLong {
@
Override
public void
accept(long
value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((
Long)
value);
}
@
Override
public
OptionalLong get() {
return
hasValue ?
OptionalLong.
of(
value) : null;
}
}
/** Specialization of {@code FindSink} for double streams */
static final class
OfDouble extends
FindSink<
Double,
OptionalDouble>
implements
Sink.
OfDouble {
@
Override
public void
accept(double
value) {
// Boxing is OK here, since few values will actually flow into the sink
accept((
Double)
value);
}
@
Override
public
OptionalDouble get() {
return
hasValue ?
OptionalDouble.
of(
value) : null;
}
}
}
/**
* {@code ForkJoinTask} implementing parallel short-circuiting search
* @param <P_IN> Input element type to the stream pipeline
* @param <P_OUT> Output element type from the stream pipeline
* @param <O> Result type from the find operation
*/
@
SuppressWarnings("serial")
private static final class
FindTask<P_IN, P_OUT, O>
extends
AbstractShortCircuitTask<P_IN, P_OUT, O,
FindTask<P_IN, P_OUT, O>> {
private final
FindOp<P_OUT, O>
op;
FindTask(
FindOp<P_OUT, O>
op,
PipelineHelper<P_OUT>
helper,
Spliterator<P_IN>
spliterator) {
super(
helper,
spliterator);
this.
op =
op;
}
FindTask(
FindTask<P_IN, P_OUT, O>
parent,
Spliterator<P_IN>
spliterator) {
super(
parent,
spliterator);
this.
op =
parent.
op;
}
@
Override
protected
FindTask<P_IN, P_OUT, O>
makeChild(
Spliterator<P_IN>
spliterator) {
return new
FindTask<>(this,
spliterator);
}
@
Override
protected O
getEmptyResult() {
return
op.
emptyValue;
}
private void
foundResult(O
answer) {
if (
isLeftmostNode())
shortCircuit(
answer);
else
cancelLaterNodes();
}
@
Override
protected O
doLeaf() {
O
result =
helper.
wrapAndCopyInto(
op.
sinkSupplier.
get(),
spliterator).
get();
if (!
op.
mustFindFirst) {
if (
result != null)
shortCircuit(
result);
return null;
}
else {
if (
result != null) {
foundResult(
result);
return
result;
}
else
return null;
}
}
@
Override
public void
onCompletion(
CountedCompleter<?>
caller) {
if (
op.
mustFindFirst) {
for (
FindTask<P_IN, P_OUT, O>
child =
leftChild,
p = null;
child !=
p;
p =
child,
child =
rightChild) {
O
result =
child.
getLocalResult();
if (
result != null &&
op.
presentPredicate.
test(
result)) {
setLocalResult(
result);
foundResult(
result);
break;
}
}
}
super.onCompletion(
caller);
}
}
}