/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package java.util.stream;
import java.util.
Objects;
import java.util.
Optional;
import java.util.
OptionalDouble;
import java.util.
OptionalInt;
import java.util.
OptionalLong;
import java.util.
Spliterator;
import java.util.concurrent.
CountedCompleter;
import java.util.function.
BiConsumer;
import java.util.function.
BiFunction;
import java.util.function.
BinaryOperator;
import java.util.function.
DoubleBinaryOperator;
import java.util.function.
IntBinaryOperator;
import java.util.function.
LongBinaryOperator;
import java.util.function.
ObjDoubleConsumer;
import java.util.function.
ObjIntConsumer;
import java.util.function.
ObjLongConsumer;
import java.util.function.
Supplier;
/**
* Factory for creating instances of {@code TerminalOp} that implement
* reductions.
*
* @since 1.8
*/
final class
ReduceOps {
private
ReduceOps() { }
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* reference values.
*
* @param <T> the type of the input elements
* @param <U> the type of the result
* @param seed the identity element for the reduction
* @param reducer the accumulating function that incorporates an additional
* input element into the result
* @param combiner the combining function that combines two intermediate
* results
* @return a {@code TerminalOp} implementing the reduction
*/
public static <T, U>
TerminalOp<T, U>
makeRef(U
seed,
BiFunction<U, ? super T, U>
reducer,
BinaryOperator<U>
combiner) {
Objects.
requireNonNull(
reducer);
Objects.
requireNonNull(
combiner);
class
ReducingSink extends
Box<U> implements
AccumulatingSink<T, U,
ReducingSink> {
@
Override
public void
begin(long
size) {
state =
seed;
}
@
Override
public void
accept(T
t) {
state =
reducer.
apply(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
state =
combiner.
apply(
state,
other.
state);
}
}
return new
ReduceOp<T, U,
ReducingSink>(
StreamShape.
REFERENCE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* reference values producing an optional reference result.
*
* @param <T> The type of the input elements, and the type of the result
* @param operator The reducing function
* @return A {@code TerminalOp} implementing the reduction
*/
public static <T>
TerminalOp<T,
Optional<T>>
makeRef(
BinaryOperator<T>
operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<T,
Optional<T>,
ReducingSink> {
private boolean
empty;
private T
state;
public void
begin(long
size) {
empty = true;
state = null;
}
@
Override
public void
accept(T
t) {
if (
empty) {
empty = false;
state =
t;
} else {
state =
operator.
apply(
state,
t);
}
}
@
Override
public
Optional<T>
get() {
return
empty ?
Optional.
empty() :
Optional.
of(
state);
}
@
Override
public void
combine(
ReducingSink other) {
if (!
other.
empty)
accept(
other.
state);
}
}
return new
ReduceOp<T,
Optional<T>,
ReducingSink>(
StreamShape.
REFERENCE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a mutable reduce on
* reference values.
*
* @param <T> the type of the input elements
* @param <I> the type of the intermediate reduction result
* @param collector a {@code Collector} defining the reduction
* @return a {@code ReduceOp} implementing the reduction
*/
public static <T, I>
TerminalOp<T, I>
makeRef(
Collector<? super T, I, ?>
collector) {
Supplier<I>
supplier =
Objects.
requireNonNull(
collector).
supplier();
BiConsumer<I, ? super T>
accumulator =
collector.
accumulator();
BinaryOperator<I>
combiner =
collector.
combiner();
class
ReducingSink extends
Box<I>
implements
AccumulatingSink<T, I,
ReducingSink> {
@
Override
public void
begin(long
size) {
state =
supplier.
get();
}
@
Override
public void
accept(T
t) {
accumulator.
accept(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
state =
combiner.
apply(
state,
other.
state);
}
}
return new
ReduceOp<T, I,
ReducingSink>(
StreamShape.
REFERENCE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
@
Override
public int
getOpFlags() {
return
collector.
characteristics().
contains(
Collector.
Characteristics.
UNORDERED)
?
StreamOpFlag.
NOT_ORDERED
: 0;
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a mutable reduce on
* reference values.
*
* @param <T> the type of the input elements
* @param <R> the type of the result
* @param seedFactory a factory to produce a new base accumulator
* @param accumulator a function to incorporate an element into an
* accumulator
* @param reducer a function to combine an accumulator into another
* @return a {@code TerminalOp} implementing the reduction
*/
public static <T, R>
TerminalOp<T, R>
makeRef(
Supplier<R>
seedFactory,
BiConsumer<R, ? super T>
accumulator,
BiConsumer<R,R>
reducer) {
Objects.
requireNonNull(
seedFactory);
Objects.
requireNonNull(
accumulator);
Objects.
requireNonNull(
reducer);
class
ReducingSink extends
Box<R>
implements
AccumulatingSink<T, R,
ReducingSink> {
@
Override
public void
begin(long
size) {
state =
seedFactory.
get();
}
@
Override
public void
accept(T
t) {
accumulator.
accept(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
reducer.
accept(
state,
other.
state);
}
}
return new
ReduceOp<T, R,
ReducingSink>(
StreamShape.
REFERENCE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code int} values.
*
* @param identity the identity for the combining function
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Integer,
Integer>
makeInt(int
identity,
IntBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Integer,
Integer,
ReducingSink>,
Sink.
OfInt {
private int
state;
@
Override
public void
begin(long
size) {
state =
identity;
}
@
Override
public void
accept(int
t) {
state =
operator.
applyAsInt(
state,
t);
}
@
Override
public
Integer get() {
return
state;
}
@
Override
public void
combine(
ReducingSink other) {
accept(
other.
state);
}
}
return new
ReduceOp<
Integer,
Integer,
ReducingSink>(
StreamShape.
INT_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code int} values, producing an optional integer result.
*
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Integer,
OptionalInt>
makeInt(
IntBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Integer,
OptionalInt,
ReducingSink>,
Sink.
OfInt {
private boolean
empty;
private int
state;
public void
begin(long
size) {
empty = true;
state = 0;
}
@
Override
public void
accept(int
t) {
if (
empty) {
empty = false;
state =
t;
}
else {
state =
operator.
applyAsInt(
state,
t);
}
}
@
Override
public
OptionalInt get() {
return
empty ?
OptionalInt.
empty() :
OptionalInt.
of(
state);
}
@
Override
public void
combine(
ReducingSink other) {
if (!
other.
empty)
accept(
other.
state);
}
}
return new
ReduceOp<
Integer,
OptionalInt,
ReducingSink>(
StreamShape.
INT_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a mutable reduce on
* {@code int} values.
*
* @param <R> The type of the result
* @param supplier a factory to produce a new accumulator of the result type
* @param accumulator a function to incorporate an int into an
* accumulator
* @param combiner a function to combine an accumulator into another
* @return A {@code ReduceOp} implementing the reduction
*/
public static <R>
TerminalOp<
Integer, R>
makeInt(
Supplier<R>
supplier,
ObjIntConsumer<R>
accumulator,
BinaryOperator<R>
combiner) {
Objects.
requireNonNull(
supplier);
Objects.
requireNonNull(
accumulator);
Objects.
requireNonNull(
combiner);
class
ReducingSink extends
Box<R>
implements
AccumulatingSink<
Integer, R,
ReducingSink>,
Sink.
OfInt {
@
Override
public void
begin(long
size) {
state =
supplier.
get();
}
@
Override
public void
accept(int
t) {
accumulator.
accept(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
state =
combiner.
apply(
state,
other.
state);
}
}
return new
ReduceOp<
Integer, R,
ReducingSink>(
StreamShape.
INT_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code long} values.
*
* @param identity the identity for the combining function
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Long,
Long>
makeLong(long
identity,
LongBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Long,
Long,
ReducingSink>,
Sink.
OfLong {
private long
state;
@
Override
public void
begin(long
size) {
state =
identity;
}
@
Override
public void
accept(long
t) {
state =
operator.
applyAsLong(
state,
t);
}
@
Override
public
Long get() {
return
state;
}
@
Override
public void
combine(
ReducingSink other) {
accept(
other.
state);
}
}
return new
ReduceOp<
Long,
Long,
ReducingSink>(
StreamShape.
LONG_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code long} values, producing an optional long result.
*
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Long,
OptionalLong>
makeLong(
LongBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Long,
OptionalLong,
ReducingSink>,
Sink.
OfLong {
private boolean
empty;
private long
state;
public void
begin(long
size) {
empty = true;
state = 0;
}
@
Override
public void
accept(long
t) {
if (
empty) {
empty = false;
state =
t;
}
else {
state =
operator.
applyAsLong(
state,
t);
}
}
@
Override
public
OptionalLong get() {
return
empty ?
OptionalLong.
empty() :
OptionalLong.
of(
state);
}
@
Override
public void
combine(
ReducingSink other) {
if (!
other.
empty)
accept(
other.
state);
}
}
return new
ReduceOp<
Long,
OptionalLong,
ReducingSink>(
StreamShape.
LONG_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a mutable reduce on
* {@code long} values.
*
* @param <R> the type of the result
* @param supplier a factory to produce a new accumulator of the result type
* @param accumulator a function to incorporate an int into an
* accumulator
* @param combiner a function to combine an accumulator into another
* @return a {@code TerminalOp} implementing the reduction
*/
public static <R>
TerminalOp<
Long, R>
makeLong(
Supplier<R>
supplier,
ObjLongConsumer<R>
accumulator,
BinaryOperator<R>
combiner) {
Objects.
requireNonNull(
supplier);
Objects.
requireNonNull(
accumulator);
Objects.
requireNonNull(
combiner);
class
ReducingSink extends
Box<R>
implements
AccumulatingSink<
Long, R,
ReducingSink>,
Sink.
OfLong {
@
Override
public void
begin(long
size) {
state =
supplier.
get();
}
@
Override
public void
accept(long
t) {
accumulator.
accept(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
state =
combiner.
apply(
state,
other.
state);
}
}
return new
ReduceOp<
Long, R,
ReducingSink>(
StreamShape.
LONG_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code double} values.
*
* @param identity the identity for the combining function
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Double,
Double>
makeDouble(double
identity,
DoubleBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Double,
Double,
ReducingSink>,
Sink.
OfDouble {
private double
state;
@
Override
public void
begin(long
size) {
state =
identity;
}
@
Override
public void
accept(double
t) {
state =
operator.
applyAsDouble(
state,
t);
}
@
Override
public
Double get() {
return
state;
}
@
Override
public void
combine(
ReducingSink other) {
accept(
other.
state);
}
}
return new
ReduceOp<
Double,
Double,
ReducingSink>(
StreamShape.
DOUBLE_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a functional reduce on
* {@code double} values, producing an optional double result.
*
* @param operator the combining function
* @return a {@code TerminalOp} implementing the reduction
*/
public static
TerminalOp<
Double,
OptionalDouble>
makeDouble(
DoubleBinaryOperator operator) {
Objects.
requireNonNull(
operator);
class
ReducingSink
implements
AccumulatingSink<
Double,
OptionalDouble,
ReducingSink>,
Sink.
OfDouble {
private boolean
empty;
private double
state;
public void
begin(long
size) {
empty = true;
state = 0;
}
@
Override
public void
accept(double
t) {
if (
empty) {
empty = false;
state =
t;
}
else {
state =
operator.
applyAsDouble(
state,
t);
}
}
@
Override
public
OptionalDouble get() {
return
empty ?
OptionalDouble.
empty() :
OptionalDouble.
of(
state);
}
@
Override
public void
combine(
ReducingSink other) {
if (!
other.
empty)
accept(
other.
state);
}
}
return new
ReduceOp<
Double,
OptionalDouble,
ReducingSink>(
StreamShape.
DOUBLE_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* Constructs a {@code TerminalOp} that implements a mutable reduce on
* {@code double} values.
*
* @param <R> the type of the result
* @param supplier a factory to produce a new accumulator of the result type
* @param accumulator a function to incorporate an int into an
* accumulator
* @param combiner a function to combine an accumulator into another
* @return a {@code TerminalOp} implementing the reduction
*/
public static <R>
TerminalOp<
Double, R>
makeDouble(
Supplier<R>
supplier,
ObjDoubleConsumer<R>
accumulator,
BinaryOperator<R>
combiner) {
Objects.
requireNonNull(
supplier);
Objects.
requireNonNull(
accumulator);
Objects.
requireNonNull(
combiner);
class
ReducingSink extends
Box<R>
implements
AccumulatingSink<
Double, R,
ReducingSink>,
Sink.
OfDouble {
@
Override
public void
begin(long
size) {
state =
supplier.
get();
}
@
Override
public void
accept(double
t) {
accumulator.
accept(
state,
t);
}
@
Override
public void
combine(
ReducingSink other) {
state =
combiner.
apply(
state,
other.
state);
}
}
return new
ReduceOp<
Double, R,
ReducingSink>(
StreamShape.
DOUBLE_VALUE) {
@
Override
public
ReducingSink makeSink() {
return new
ReducingSink();
}
};
}
/**
* A type of {@code TerminalSink} that implements an associative reducing
* operation on elements of type {@code T} and producing a result of type
* {@code R}.
*
* @param <T> the type of input element to the combining operation
* @param <R> the result type
* @param <K> the type of the {@code AccumulatingSink}.
*/
private interface
AccumulatingSink<T, R, K extends
AccumulatingSink<T, R, K>>
extends
TerminalSink<T, R> {
public void
combine(K
other);
}
/**
* State box for a single state element, used as a base class for
* {@code AccumulatingSink} instances
*
* @param <U> The type of the state element
*/
private static abstract class
Box<U> {
U
state;
Box() {} // Avoid creation of special accessor
public U
get() {
return
state;
}
}
/**
* A {@code TerminalOp} that evaluates a stream pipeline and sends the
* output into an {@code AccumulatingSink}, which performs a reduce
* operation. The {@code AccumulatingSink} must represent an associative
* reducing operation.
*
* @param <T> the output type of the stream pipeline
* @param <R> the result type of the reducing operation
* @param <S> the type of the {@code AccumulatingSink}
*/
private static abstract class
ReduceOp<T, R, S extends
AccumulatingSink<T, R, S>>
implements
TerminalOp<T, R> {
private final
StreamShape inputShape;
/**
* Create a {@code ReduceOp} of the specified stream shape which uses
* the specified {@code Supplier} to create accumulating sinks.
*
* @param shape The shape of the stream pipeline
*/
ReduceOp(
StreamShape shape) {
inputShape =
shape;
}
public abstract S
makeSink();
@
Override
public
StreamShape inputShape() {
return
inputShape;
}
@
Override
public <P_IN> R
evaluateSequential(
PipelineHelper<T>
helper,
Spliterator<P_IN>
spliterator) {
return
helper.
wrapAndCopyInto(
makeSink(),
spliterator).
get();
}
@
Override
public <P_IN> R
evaluateParallel(
PipelineHelper<T>
helper,
Spliterator<P_IN>
spliterator) {
return new
ReduceTask<>(this,
helper,
spliterator).
invoke().
get();
}
}
/**
* A {@code ForkJoinTask} for performing a parallel reduce operation.
*/
@
SuppressWarnings("serial")
private static final class
ReduceTask<P_IN, P_OUT, R,
S extends
AccumulatingSink<P_OUT, R, S>>
extends
AbstractTask<P_IN, P_OUT, S,
ReduceTask<P_IN, P_OUT, R, S>> {
private final
ReduceOp<P_OUT, R, S>
op;
ReduceTask(
ReduceOp<P_OUT, R, S>
op,
PipelineHelper<P_OUT>
helper,
Spliterator<P_IN>
spliterator) {
super(
helper,
spliterator);
this.
op =
op;
}
ReduceTask(
ReduceTask<P_IN, P_OUT, R, S>
parent,
Spliterator<P_IN>
spliterator) {
super(
parent,
spliterator);
this.
op =
parent.
op;
}
@
Override
protected
ReduceTask<P_IN, P_OUT, R, S>
makeChild(
Spliterator<P_IN>
spliterator) {
return new
ReduceTask<>(this,
spliterator);
}
@
Override
protected S
doLeaf() {
return
helper.
wrapAndCopyInto(
op.
makeSink(),
spliterator);
}
@
Override
public void
onCompletion(
CountedCompleter<?>
caller) {
if (!
isLeaf()) {
S
leftResult =
leftChild.
getLocalResult();
leftResult.
combine(
rightChild.
getLocalResult());
setLocalResult(
leftResult);
}
// GC spliterator, left and right child
super.onCompletion(
caller);
}
}
}