/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package java.util.stream;
import java.util.
Spliterator;
import java.util.concurrent.
CountedCompleter;
import java.util.function.
IntFunction;
/**
* Factory for instances of short-circuiting stateful intermediate operations
* that produce subsequences of their input stream.
*
* @since 1.8
*/
final class
SliceOps {
// No instances: this class only hosts static factory methods, so the
// constructor is private to prevent instantiation.
private
SliceOps() { }
/**
 * Computes the size of a slice taken from a source of the given size.
 *
 * <p>A negative {@code size} denotes an unknown size and yields {@code -1}.
 * Otherwise the result is the smaller of {@code size - skip} and
 * {@code limit}, never going below {@code -1}.
 *
 * @param size the current size, or a negative value if unknown
 * @param skip the number of elements to skip, assumed to be >= 0
 * @param limit the number of elements to limit, assumed to be >= 0, with
 *        a value of {@code Long.MAX_VALUE} if there is no limit
 * @return the sliced size, or {@code -1} if it cannot be determined
 */
private static long calcSize(long size, long skip, long limit) {
    if (size < 0) {
        // Unknown input size propagates as unknown output size
        return -1;
    }
    long afterSkip = size - skip;
    long sliced = (afterSkip <= limit) ? afterSkip : limit;
    // Skipping more elements than are available is reported as -1
    return (sliced >= -1) ? sliced : -1;
}
/**
 * Computes the slice fence, i.e. the index one past the last element of
 * the slice range.
 *
 * <p>A negative {@code limit} means "no limit" and produces
 * {@code Long.MAX_VALUE}.  The same value is returned when
 * {@code skip + limit} overflows.
 *
 * @param skip the number of elements to skip, assumed to be >= 0
 * @param limit the number of elements to limit, or a negative value if
 *        there is no limit
 * @return the slice fence
 */
private static long calcSliceFence(long skip, long limit) {
    if (limit < 0) {
        return Long.MAX_VALUE;
    }
    long fence = skip + limit;
    // A negative sum of two non-negative values signals overflow
    return (fence < 0) ? Long.MAX_VALUE : fence;
}
/**
 * Creates a slice spliterator whose concrete type is chosen by the given
 * stream shape.  Requires that the underlying Spliterator be SUBSIZED.
 *
 * @param shape the stream shape selecting the spliterator type
 * @param s the spliterator to slice; for primitive shapes it is cast to
 *        the matching primitive spliterator type
 * @param skip the number of leading elements to skip
 * @param limit the maximum number of elements, or a negative value if
 *        there is no limit
 * @return a spliterator covering the range from {@code skip} up to the
 *         slice fence computed from {@code skip} and {@code limit}
 * @throws IllegalStateException if the shape is not one of the handled shapes
 */
@SuppressWarnings("unchecked")
private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
                                                         Spliterator<P_IN> s,
                                                         long skip, long limit) {
    assert s.hasCharacteristics(Spliterator.SUBSIZED);

    final long fence = calcSliceFence(skip, limit);
    final Spliterator<P_IN> sliced;
    switch (shape) {
        case REFERENCE:
            sliced = new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, fence);
            break;
        case INT_VALUE:
            sliced = (Spliterator<P_IN>) new StreamSpliterators.SliceSpliterator.OfInt(
                    (Spliterator.OfInt) s, skip, fence);
            break;
        case LONG_VALUE:
            sliced = (Spliterator<P_IN>) new StreamSpliterators.SliceSpliterator.OfLong(
                    (Spliterator.OfLong) s, skip, fence);
            break;
        case DOUBLE_VALUE:
            sliced = (Spliterator<P_IN>) new StreamSpliterators.SliceSpliterator.OfDouble(
                    (Spliterator.OfDouble) s, skip, fence);
            break;
        default:
            throw new IllegalStateException("Unknown shape " + shape);
    }
    return sliced;
}
/**
 * Returns an array factory that allocates an {@code Object[]} of the
 * requested length and hands it back typed as {@code T[]}.
 *
 * @param <T> the nominal element type of the produced arrays
 * @return a function mapping a length to a new array of that length
 */
@SuppressWarnings("unchecked")
private static <T> IntFunction<T[]> castingArray() {
    return length -> {
        Object[] backing = new Object[length];
        return (T[]) backing;
    };
}
/**
 * Appends a "slice" operation to the provided stream.  The slice operation
 * may be skip-only, limit-only, or skip-and-limit.
 *
 * @param <T> the type of both input and output elements
 * @param upstream a reference stream with element type T
 * @param skip the number of elements to skip.  Must be >= 0.
 * @param limit the maximum size of the resulting stream, or -1 if no limit
 *        is to be imposed
 * @return a stream with the slice operation appended to {@code upstream}
 * @throws IllegalArgumentException if {@code skip} is negative
 */
public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                    long skip, long limit) {
    if (skip < 0) {
        throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    }

    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, flags(limit)) {

        // Builds the unordered slice spliterator, folding the skip count
        // into the limit when the pipeline size makes that possible.
        Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> source,
                                                     long toSkip, long maxSize,
                                                     long knownSize) {
            if (toSkip <= knownSize) {
                // Use just the limit if the number of elements
                // to skip is <= the known pipeline size
                long available = knownSize - toSkip;
                maxSize = (maxSize >= 0) ? Math.min(maxSize, available) : available;
                toSkip = 0;
            }
            return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(source, toSkip, maxSize);
        }

        @Override
        <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                Spliterator<T> wrapped = helper.wrapSpliterator(spliterator);
                return new StreamSpliterators.SliceSpliterator.OfRef<>(
                        wrapped, skip, calcSliceFence(skip, limit));
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator<T> wrapped = helper.wrapSpliterator(spliterator);
                return unorderedSkipLimitSpliterator(wrapped, skip, limit, size);
            }

            // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
            //     regardless of the value of n
            //     Need to adjust the target size of splitting for the
            //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
            //     This will limit the size of the buffers created at the leaf nodes
            //     cancellation will be more aggressive cancelling later tasks
            //     if the target slice size has been reached from a given task,
            //     cancellation should also clear local results if any
            SliceTask<P_IN, T> task =
                    new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit);
            return task.invoke().spliterator();
        }

        @Override
        <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<T[]> generator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                // Because the pipeline is SIZED the slice spliterator
                // can be created from the source, this requires matching
                // to shape of the source, and is potentially more efficient
                // than creating the slice spliterator from the pipeline
                // wrapping spliterator
                Spliterator<P_IN> sliced =
                        sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                return Nodes.collect(helper, sliced, true, generator);
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator<T> unordered = unorderedSkipLimitSpliterator(
                        helper.wrapSpliterator(spliterator), skip, limit, size);
                // Collect using this pipeline, which is empty and therefore
                // can be used with the pipeline wrapping spliterator
                // Note that we cannot create a slice spliterator from
                // the source spliterator if the pipeline is not SIZED
                return Nodes.collect(this, unordered, true, generator);
            }

            SliceTask<P_IN, T> task =
                    new SliceTask<>(this, helper, spliterator, generator, skip, limit);
            return task.invoke();
        }

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            return new Sink.ChainedReference<T, T>(sink) {
                // Elements still to be dropped before any are forwarded
                long pendingSkips = skip;
                // Elements that may still be forwarded downstream
                long remaining = (limit >= 0) ? limit : Long.MAX_VALUE;

                @Override
                public void begin(long size) {
                    downstream.begin(calcSize(size, skip, remaining));
                }

                @Override
                public void accept(T t) {
                    if (pendingSkips != 0) {
                        pendingSkips--;
                        return;
                    }
                    if (remaining > 0) {
                        remaining--;
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return remaining == 0 || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "slice" operation to the provided IntStream.  The slice
 * operation may be skip-only, limit-only, or skip-and-limit.
 *
 * @param upstream An IntStream
 * @param skip The number of elements to skip.  Must be >= 0.
 * @param limit The maximum size of the resulting stream, or -1 if no limit
 *        is to be imposed
 * @return an IntStream with the slice operation appended to {@code upstream}
 * @throws IllegalArgumentException if {@code skip} is negative
 */
public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
                                long skip, long limit) {
    if (skip < 0) {
        throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    }

    return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, flags(limit)) {

        // Builds the unordered slice spliterator, folding the skip count
        // into the limit when the pipeline size makes that possible.
        Spliterator.OfInt unorderedSkipLimitSpliterator(Spliterator.OfInt source,
                                                        long toSkip, long maxSize,
                                                        long knownSize) {
            if (toSkip <= knownSize) {
                // Use just the limit if the number of elements
                // to skip is <= the known pipeline size
                long available = knownSize - toSkip;
                maxSize = (maxSize >= 0) ? Math.min(maxSize, available) : available;
                toSkip = 0;
            }
            return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(source, toSkip, maxSize);
        }

        @Override
        <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
                                                           Spliterator<P_IN> spliterator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                Spliterator.OfInt wrapped =
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator);
                return new StreamSpliterators.SliceSpliterator.OfInt(
                        wrapped, skip, calcSliceFence(skip, limit));
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfInt wrapped =
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator);
                return unorderedSkipLimitSpliterator(wrapped, skip, limit, size);
            }

            SliceTask<P_IN, Integer> task =
                    new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit);
            return task.invoke().spliterator();
        }

        @Override
        <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator,
                                                IntFunction<Integer[]> generator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                // Because the pipeline is SIZED the slice spliterator
                // can be created from the source, this requires matching
                // to shape of the source, and is potentially more efficient
                // than creating the slice spliterator from the pipeline
                // wrapping spliterator
                Spliterator<P_IN> sliced =
                        sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                return Nodes.collectInt(helper, sliced, true);
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfInt unordered = unorderedSkipLimitSpliterator(
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
                        skip, limit, size);
                // Collect using this pipeline, which is empty and therefore
                // can be used with the pipeline wrapping spliterator
                // Note that we cannot create a slice spliterator from
                // the source spliterator if the pipeline is not SIZED
                return Nodes.collectInt(this, unordered, true);
            }

            SliceTask<P_IN, Integer> task =
                    new SliceTask<>(this, helper, spliterator, generator, skip, limit);
            return task.invoke();
        }

        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                // Elements still to be dropped before any are forwarded
                long pendingSkips = skip;
                // Elements that may still be forwarded downstream
                long remaining = (limit >= 0) ? limit : Long.MAX_VALUE;

                @Override
                public void begin(long size) {
                    downstream.begin(calcSize(size, skip, remaining));
                }

                @Override
                public void accept(int t) {
                    if (pendingSkips != 0) {
                        pendingSkips--;
                        return;
                    }
                    if (remaining > 0) {
                        remaining--;
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return remaining == 0 || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "slice" operation to the provided LongStream.  The slice
 * operation may be skip-only, limit-only, or skip-and-limit.
 *
 * @param upstream A LongStream
 * @param skip The number of elements to skip.  Must be >= 0.
 * @param limit The maximum size of the resulting stream, or -1 if no limit
 *        is to be imposed
 * @return a LongStream with the slice operation appended to {@code upstream}
 * @throws IllegalArgumentException if {@code skip} is negative
 */
public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
                                  long skip, long limit) {
    if (skip < 0) {
        throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    }

    return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, flags(limit)) {

        // Builds the unordered slice spliterator, folding the skip count
        // into the limit when the pipeline size makes that possible.
        Spliterator.OfLong unorderedSkipLimitSpliterator(Spliterator.OfLong source,
                                                         long toSkip, long maxSize,
                                                         long knownSize) {
            if (toSkip <= knownSize) {
                // Use just the limit if the number of elements
                // to skip is <= the known pipeline size
                long available = knownSize - toSkip;
                maxSize = (maxSize >= 0) ? Math.min(maxSize, available) : available;
                toSkip = 0;
            }
            return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(source, toSkip, maxSize);
        }

        @Override
        <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
                                                        Spliterator<P_IN> spliterator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                Spliterator.OfLong wrapped =
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator);
                return new StreamSpliterators.SliceSpliterator.OfLong(
                        wrapped, skip, calcSliceFence(skip, limit));
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfLong wrapped =
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator);
                return unorderedSkipLimitSpliterator(wrapped, skip, limit, size);
            }

            SliceTask<P_IN, Long> task =
                    new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit);
            return task.invoke().spliterator();
        }

        @Override
        <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                             Spliterator<P_IN> spliterator,
                                             IntFunction<Long[]> generator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                // Because the pipeline is SIZED the slice spliterator
                // can be created from the source, this requires matching
                // to shape of the source, and is potentially more efficient
                // than creating the slice spliterator from the pipeline
                // wrapping spliterator
                Spliterator<P_IN> sliced =
                        sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                return Nodes.collectLong(helper, sliced, true);
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfLong unordered = unorderedSkipLimitSpliterator(
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
                        skip, limit, size);
                // Collect using this pipeline, which is empty and therefore
                // can be used with the pipeline wrapping spliterator
                // Note that we cannot create a slice spliterator from
                // the source spliterator if the pipeline is not SIZED
                return Nodes.collectLong(this, unordered, true);
            }

            SliceTask<P_IN, Long> task =
                    new SliceTask<>(this, helper, spliterator, generator, skip, limit);
            return task.invoke();
        }

        @Override
        Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
            return new Sink.ChainedLong<Long>(sink) {
                // Elements still to be dropped before any are forwarded
                long pendingSkips = skip;
                // Elements that may still be forwarded downstream
                long remaining = (limit >= 0) ? limit : Long.MAX_VALUE;

                @Override
                public void begin(long size) {
                    downstream.begin(calcSize(size, skip, remaining));
                }

                @Override
                public void accept(long t) {
                    if (pendingSkips != 0) {
                        pendingSkips--;
                        return;
                    }
                    if (remaining > 0) {
                        remaining--;
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return remaining == 0 || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Appends a "slice" operation to the provided DoubleStream.  The slice
 * operation may be skip-only, limit-only, or skip-and-limit.
 *
 * @param upstream A DoubleStream
 * @param skip The number of elements to skip.  Must be >= 0.
 * @param limit The maximum size of the resulting stream, or -1 if no limit
 *        is to be imposed
 * @return a DoubleStream with the slice operation appended to {@code upstream}
 * @throws IllegalArgumentException if {@code skip} is negative
 */
public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
                                      long skip, long limit) {
    if (skip < 0) {
        throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    }

    return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, flags(limit)) {

        // Builds the unordered slice spliterator, folding the skip count
        // into the limit when the pipeline size makes that possible.
        Spliterator.OfDouble unorderedSkipLimitSpliterator(Spliterator.OfDouble source,
                                                           long toSkip, long maxSize,
                                                           long knownSize) {
            if (toSkip <= knownSize) {
                // Use just the limit if the number of elements
                // to skip is <= the known pipeline size
                long available = knownSize - toSkip;
                maxSize = (maxSize >= 0) ? Math.min(maxSize, available) : available;
                toSkip = 0;
            }
            return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(source, toSkip, maxSize);
        }

        @Override
        <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
                                                          Spliterator<P_IN> spliterator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                Spliterator.OfDouble wrapped =
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator);
                return new StreamSpliterators.SliceSpliterator.OfDouble(
                        wrapped, skip, calcSliceFence(skip, limit));
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfDouble wrapped =
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator);
                return unorderedSkipLimitSpliterator(wrapped, skip, limit, size);
            }

            SliceTask<P_IN, Double> task =
                    new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit);
            return task.invoke().spliterator();
        }

        @Override
        <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Double[]> generator) {
            long size = helper.exactOutputSizeIfKnown(spliterator);

            if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
                // Because the pipeline is SIZED the slice spliterator
                // can be created from the source, this requires matching
                // to shape of the source, and is potentially more efficient
                // than creating the slice spliterator from the pipeline
                // wrapping spliterator
                Spliterator<P_IN> sliced =
                        sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
                return Nodes.collectDouble(helper, sliced, true);
            }

            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                Spliterator.OfDouble unordered = unorderedSkipLimitSpliterator(
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
                        skip, limit, size);
                // Collect using this pipeline, which is empty and therefore
                // can be used with the pipeline wrapping spliterator
                // Note that we cannot create a slice spliterator from
                // the source spliterator if the pipeline is not SIZED
                return Nodes.collectDouble(this, unordered, true);
            }

            SliceTask<P_IN, Double> task =
                    new SliceTask<>(this, helper, spliterator, generator, skip, limit);
            return task.invoke();
        }

        @Override
        Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
            return new Sink.ChainedDouble<Double>(sink) {
                // Elements still to be dropped before any are forwarded
                long pendingSkips = skip;
                // Elements that may still be forwarded downstream
                long remaining = (limit >= 0) ? limit : Long.MAX_VALUE;

                @Override
                public void begin(long size) {
                    downstream.begin(calcSize(size, skip, remaining));
                }

                @Override
                public void accept(double t) {
                    if (pendingSkips != 0) {
                        pendingSkips--;
                        return;
                    }
                    if (remaining > 0) {
                        remaining--;
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return remaining == 0 || downstream.cancellationRequested();
                }
            };
        }
    };
}
/**
 * Computes the operation flags for a slice operation.
 *
 * @param limit the limit passed to the slice factory, where -1 means that
 *        no limit is imposed
 * @return {@code NOT_SIZED}, combined with {@code IS_SHORT_CIRCUIT} when
 *         {@code limit} is anything other than -1
 */
private static int flags(long limit) {
    int opFlags = StreamOpFlag.NOT_SIZED;
    if (limit != -1) {
        opFlags |= StreamOpFlag.IS_SHORT_CIRCUIT;
    }
    return opFlags;
}
/**
 * {@code ForkJoinTask} implementing slice computation.
 *
 * <p>The root task, when it is itself a leaf, pushes elements through the
 * slice operation's own sink.  Otherwise every leaf collects all of its
 * elements into a node, internal nodes concatenate the results of their
 * children, and the root truncates the combined node to the requested
 * range.
 *
 * @param <P_IN> Input element type to the stream pipeline
 * @param <P_OUT> Output element type from the stream pipeline
 */
@SuppressWarnings("serial")
private static final class SliceTask<P_IN, P_OUT>
        extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
    /** The slice operation; supplies the output shape, node builders and the slicing sink. */
    private final AbstractPipeline<P_OUT, P_OUT, ?> op;
    /** Array factory handed to node builders and to truncation. */
    private final IntFunction<P_OUT[]> generator;
    /**
     * Index of the first element to keep and the maximum number of elements
     * to keep; a negative {@code targetSize} means there is no maximum.
     */
    private final long targetOffset, targetSize;
    /** Number of elements accounted for by this node once it has completed. */
    private long thisNodeSize;

    /** Set once this node's result and {@code thisNodeSize} have been assigned. */
    private volatile boolean completed;

    /**
     * Creates the root task.
     *
     * @param op the slice operation
     * @param helper the pipeline helper passed to the superclass
     * @param spliterator the spliterator passed to the superclass
     * @param generator the array factory for result nodes
     * @param offset the index of the first element to keep
     * @param size the maximum number of elements to keep, or a negative
     *        value if there is no maximum
     */
    SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
              PipelineHelper<P_OUT> helper,
              Spliterator<P_IN> spliterator,
              IntFunction<P_OUT[]> generator,
              long offset, long size) {
        super(helper, spliterator);
        this.op = op;
        this.generator = generator;
        this.targetOffset = offset;
        this.targetSize = size;
    }

    /**
     * Creates a child task that copies its configuration from the parent.
     *
     * @param parent the parent task
     * @param spliterator the spliterator covering the child's portion
     */
    SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
        super(parent, spliterator);
        this.op = parent.op;
        this.generator = parent.generator;
        this.targetOffset = parent.targetOffset;
        this.targetSize = parent.targetSize;
    }

    @Override
    protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
        return new SliceTask<>(this, spliterator);
    }

    @Override
    protected final Node<P_OUT> getEmptyResult() {
        return Nodes.emptyNode(op.getOutputShape());
    }

    /**
     * Computes the result of a leaf.  A root leaf evaluates through the
     * slice operation's sink; any other leaf collects all of its elements
     * and records their count.
     */
    @Override
    protected final Node<P_OUT> doLeaf() {
        if (isRoot()) {
            long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
                               ? op.exactOutputSizeIfKnown(spliterator)
                               : -1;
            final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
            Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
            helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
            // There is no need to truncate since the op performs the
            // skipping and limiting of elements
            return nb.build();
        }
        else {
            Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
                                                      spliterator).build();
            thisNodeSize = node.count();
            completed = true;
            spliterator = null;
            return node;
        }
    }

    /**
     * Combines the children's results for an internal node, truncating at
     * the root, and then cancels later nodes once enough elements have been
     * completed at and to the left of this node.
     */
    @Override
    public final void onCompletion(CountedCompleter<?> caller) {
        if (!isLeaf()) {
            Node<P_OUT> result;
            thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
            if (canceled) {
                thisNodeSize = 0;
                result = getEmptyResult();
            }
            else if (thisNodeSize == 0)
                result = getEmptyResult();
            else if (leftChild.thisNodeSize == 0)
                result = rightChild.getLocalResult();
            else {
                result = Nodes.conc(op.getOutputShape(),
                                    leftChild.getLocalResult(), rightChild.getLocalResult());
            }
            setLocalResult(isRoot() ? doTruncate(result) : result);
            completed = true;
        }
        // The fence is computed with overflow protection; an overflowed
        // (negative) target would otherwise make isLeftCompleted succeed
        // immediately and cancel nodes whose elements are still required.
        if (targetSize >= 0
            && !isRoot()
            && isLeftCompleted(sliceFence()))
                cancelLaterNodes();

        super.onCompletion(caller);
    }

    /**
     * Cancels this task and, if it had already completed, replaces its
     * stored result with the empty result.
     */
    @Override
    protected void cancel() {
        super.cancel();
        if (completed)
            setLocalResult(getEmptyResult());
    }

    /**
     * Truncates the combined result to the target range.
     *
     * @param input the combined node
     * @return the node truncated to start at {@code targetOffset} and to end
     *         at the slice fence, or at {@code thisNodeSize} when there is
     *         no size limit
     */
    private Node<P_OUT> doTruncate(Node<P_OUT> input) {
        long to = targetSize >= 0 ? Math.min(input.count(), sliceFence()) : thisNodeSize;
        return input.truncate(targetOffset, to, generator);
    }

    /**
     * Returns {@code targetOffset + targetSize}, saturating to
     * {@code Long.MAX_VALUE} if the sum overflows.  Only meaningful when
     * {@code targetSize >= 0}.
     *
     * @return the index one past the last element of the slice
     */
    private long sliceFence() {
        long fence = targetOffset + targetSize;
        // A negative sum of two non-negative values signals overflow
        return fence >= 0 ? fence : Long.MAX_VALUE;
    }

    /**
     * Determine if the number of completed elements in this node and nodes
     * to the left of this node is greater than or equal to the target size.
     *
     * @param target the target size
     * @return true if the number of elements is greater than or equal to
     *         the target size, otherwise false.
     */
    private boolean isLeftCompleted(long target) {
        long size = completed ? thisNodeSize : completedSize(target);
        if (size >= target)
            return true;
        // Walk towards the root, adding the completed size of every left
        // sibling found along the way
        for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
             parent != null;
             node = parent, parent = parent.getParent()) {
            if (node == parent.rightChild) {
                SliceTask<P_IN, P_OUT> left = parent.leftChild;
                if (left != null) {
                    size += left.completedSize(target);
                    if (size >= target)
                        return true;
                }
            }
        }
        return size >= target;
    }

    /**
     * Compute the number of completed elements in this node.
     * <p>
     * Computation terminates if all nodes have been processed or the
     * number of completed elements is greater than or equal to the target
     * size.
     *
     * @param target the target size
     * @return return the number of completed elements
     */
    private long completedSize(long target) {
        if (completed)
            return thisNodeSize;
        else {
            SliceTask<P_IN, P_OUT> left = leftChild;
            SliceTask<P_IN, P_OUT> right = rightChild;
            if (left == null || right == null) {
                // must be completed
                return thisNodeSize;
            }
            else {
                long leftSize = left.completedSize(target);
                return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
            }
        }
    }
}
}