8020040: Improve and generalize the F/J tasks to handle right or left-balanced trees
Reviewed-by: briangoetz
Contributed-by: doug lea <dl@cs.oswego.edu>, paul sandoz <paul.sandoz@oracle.com>
--- a/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java Wed Jul 10 10:24:38 2013 +0200
@@ -92,22 +92,51 @@
*/
protected abstract R getEmptyResult();
+ /**
+ * Overrides AbstractTask version to include checks for early
+ * exits while splitting or computing.
+ */
@Override
- protected boolean canCompute() {
- // Have we already found an answer?
- if (sharedResult.get() != null) {
- tryComplete();
- return false;
- } else if (taskCanceled()) {
- setLocalResult(getEmptyResult());
- tryComplete();
- return false;
+ public void compute() {
+ Spliterator<P_IN> rs = spliterator, ls;
+ long sizeEstimate = rs.estimateSize();
+ long sizeThreshold = getTargetSize(sizeEstimate);
+ boolean forkRight = false;
+ @SuppressWarnings("unchecked") K task = (K) this;
+ AtomicReference<R> sr = sharedResult;
+ R result;
+ while ((result = sr.get()) == null) {
+ if (task.taskCanceled()) {
+ result = task.getEmptyResult();
+ break;
+ }
+ if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
+ result = task.doLeaf();
+ break;
+ }
+ K leftChild, rightChild, taskToFork;
+ task.leftChild = leftChild = task.makeChild(ls);
+ task.rightChild = rightChild = task.makeChild(rs);
+ task.setPendingCount(1);
+ if (forkRight) {
+ forkRight = false;
+ rs = ls;
+ task = leftChild;
+ taskToFork = rightChild;
+ }
+ else {
+ forkRight = true;
+ task = rightChild;
+ taskToFork = leftChild;
+ }
+ taskToFork.fork();
+ sizeEstimate = rs.estimateSize();
}
- else {
- return true;
- }
+ task.setLocalResult(result);
+ task.tryComplete();
}
+
/**
* Declares that a globally valid result has been found. If another task has
* not already found the answer, the result is installed in
--- a/jdk/src/share/classes/java/util/stream/AbstractTask.java Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/AbstractTask.java Wed Jul 10 10:24:38 2013 +0200
@@ -102,7 +102,7 @@
protected Spliterator<P_IN> spliterator;
/** Target leaf size, common to all tasks in a computation */
- protected final long targetSize;
+ protected long targetSize; // may be laziliy initialized
/**
* The left child.
@@ -134,7 +134,7 @@
super(null);
this.helper = helper;
this.spliterator = spliterator;
- this.targetSize = suggestTargetSize(spliterator.estimateSize());
+ this.targetSize = 0L;
}
/**
@@ -182,27 +182,13 @@
}
/**
- * Returns a suggestion whether it is advisable to split the provided
- * spliterator based on target size and other considerations, such as pool
- * state.
- *
- * @return {@code true} if a split is advised otherwise {@code false}
+ * Returns the targetSize, initializing it via the supplied
+ * size estimate if not already initialized.
*/
- public static boolean suggestSplit(Spliterator spliterator,
- long targetSize) {
- long remaining = spliterator.estimateSize();
- return (remaining > targetSize);
- // @@@ May additionally want to fold in pool characteristics such as surplus task count
- }
-
- /**
- * Returns a suggestion whether it is adviseable to split this task based on
- * target size and other considerations.
- *
- * @return {@code true} if a split is advised otherwise {@code false}
- */
- public boolean suggestSplit() {
- return suggestSplit(spliterator, targetSize);
+ protected final long getTargetSize(long sizeEstimate) {
+ long s;
+ return ((s = targetSize) != 0 ? s :
+ (targetSize = suggestTargetSize(sizeEstimate)));
}
/**
@@ -285,43 +271,46 @@
}
/**
- * Decides whether or not to split a task further or compute it directly. If
- * computing directly, call {@code doLeaf} and pass the result to
- * {@code setRawResult}. If splitting, set up the child-related fields,
- * create the child tasks, fork the leftmost (prefix) child tasks, and
- * compute the rightmost (remaining) child tasks.
+ * Decides whether or not to split a task further or compute it
+ * directly. If computing directly, calls {@code doLeaf} and pass
+ * the result to {@code setRawResult}. Otherwise splits off
+ * subtasks, forking one and continuing as the other.
*
- * <p>
- * Computing will continue for rightmost tasks while a task can be computed
- * as determined by {@link #canCompute()} and that task should and can be
- * split into left and right tasks.
- *
- * <p>
- * The rightmost tasks are computed in a loop rather than recursively to
- * avoid potential stack overflows when computing with a right-balanced
- * tree, such as that produced when splitting with a {@link Spliterator}
- * created from an {@link java.util.Iterator}.
+ * <p> The method is structured to conserve resources across a
+ * range of uses. The loop continues with one of the child tasks
+ * when split, to avoid deep recursion. To cope with spliterators
+ * that may be systematically biased toward left-heavy or
+ * right-heavy splits, we alternate which child is forked versus
+ * continued in the loop.
*/
@Override
- public final void compute() {
- @SuppressWarnings("unchecked")
- K task = (K) this;
- while (task.canCompute()) {
- Spliterator<P_IN> split;
- if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
- task.setLocalResult(task.doLeaf());
- task.tryComplete();
- return;
+ public void compute() {
+ Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
+ long sizeEstimate = rs.estimateSize();
+ long sizeThreshold = getTargetSize(sizeEstimate);
+ boolean forkRight = false;
+ @SuppressWarnings("unchecked") K task = (K) this;
+ while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
+ K leftChild, rightChild, taskToFork;
+ task.leftChild = leftChild = task.makeChild(ls);
+ task.rightChild = rightChild = task.makeChild(rs);
+ task.setPendingCount(1);
+ if (forkRight) {
+ forkRight = false;
+ rs = ls;
+ task = leftChild;
+ taskToFork = rightChild;
}
else {
- K l = task.leftChild = task.makeChild(split);
- K r = task.rightChild = task.makeChild(task.spliterator);
- task.spliterator = null;
- task.setPendingCount(1);
- l.fork();
- task = r;
+ forkRight = true;
+ task = rightChild;
+ taskToFork = leftChild;
}
+ taskToFork.fork();
+ sizeEstimate = rs.estimateSize();
}
+ task.setLocalResult(task.doLeaf());
+ task.tryComplete();
}
/**
@@ -339,21 +328,6 @@
}
/**
- * Determines if the task can be computed.
- *
- * @implSpec The default always returns true
- *
- * @return {@code true} if this task can be computed to either calculate the
- * leaf via {@link #doLeaf()} or split, otherwise false if this task
- * cannot be computed, for example if this task has been canceled
- * and/or a result for the computation has been found by another
- * task.
- */
- protected boolean canCompute() {
- return true;
- }
-
- /**
* Returns whether this node is a "leftmost" node -- whether the path from
* the root to this node involves only traversing leftmost child links. For
* a leaf node, this means it is the first leaf node in the encounter order.
--- a/jdk/src/share/classes/java/util/stream/ForEachOps.java Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java Wed Jul 10 10:24:38 2013 +0200
@@ -28,6 +28,7 @@
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinTask;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
@@ -128,7 +129,7 @@
*
* @param <T> the output type of the stream pipeline
*/
- private static abstract class ForEachOp<T>
+ static abstract class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
private final boolean ordered;
@@ -169,7 +170,7 @@
// Implementations
/** Implementation class for reference streams */
- private static class OfRef<T> extends ForEachOp<T> {
+ static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
@@ -184,7 +185,7 @@
}
/** Implementation class for {@code IntStream} */
- private static class OfInt extends ForEachOp<Integer>
+ static final class OfInt extends ForEachOp<Integer>
implements Sink.OfInt {
final IntConsumer consumer;
@@ -205,7 +206,7 @@
}
/** Implementation class for {@code LongStream} */
- private static class OfLong extends ForEachOp<Long>
+ static final class OfLong extends ForEachOp<Long>
implements Sink.OfLong {
final LongConsumer consumer;
@@ -226,7 +227,7 @@
}
/** Implementation class for {@code DoubleStream} */
- private static class OfDouble extends ForEachOp<Double>
+ static final class OfDouble extends ForEachOp<Double>
implements Sink.OfDouble {
final DoubleConsumer consumer;
@@ -248,20 +249,20 @@
}
/** A {@code ForkJoinTask} for performing a parallel for-each operation */
- private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+ static final class ForEachTask<S, T> extends CountedCompleter<Void> {
private Spliterator<S> spliterator;
private final Sink<S> sink;
private final PipelineHelper<T> helper;
- private final long targetSize;
+ private long targetSize;
ForEachTask(PipelineHelper<T> helper,
Spliterator<S> spliterator,
Sink<S> sink) {
super(null);
+ this.sink = sink;
+ this.helper = helper;
this.spliterator = spliterator;
- this.sink = sink;
- this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
- this.helper = helper;
+ this.targetSize = 0L;
}
ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
@@ -272,28 +273,40 @@
this.helper = parent.helper;
}
+ // Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
+ Spliterator<S> rightSplit = spliterator, leftSplit;
+ long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
+ if ((sizeThreshold = targetSize) == 0L)
+ targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
- while (true) {
- if (isShortCircuit && sink.cancellationRequested()) {
- propagateCompletion();
- spliterator = null;
- return;
+ boolean forkRight = false;
+ Sink<S> taskSink = sink;
+ ForEachTask<S, T> task = this;
+ while (!isShortCircuit || !taskSink.cancellationRequested()) {
+ if (sizeEstimate <= sizeThreshold ||
+ (leftSplit = rightSplit.trySplit()) == null) {
+ task.helper.copyInto(taskSink, rightSplit);
+ break;
}
-
- Spliterator<S> split;
- if (!AbstractTask.suggestSplit(spliterator, targetSize)
- || (split = spliterator.trySplit()) == null) {
- helper.copyInto(sink, spliterator);
- propagateCompletion();
- spliterator = null;
- return;
+ ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
+ task.addToPendingCount(1);
+ ForEachTask<S, T> taskToFork;
+ if (forkRight) {
+ forkRight = false;
+ rightSplit = leftSplit;
+ taskToFork = task;
+ task = leftTask;
}
else {
- addToPendingCount(1);
- new ForEachTask<>(this, split).fork();
+ forkRight = true;
+ taskToFork = leftTask;
}
+ taskToFork.fork();
+ sizeEstimate = rightSplit.estimateSize();
}
+ task.spliterator = null;
+ task.propagateCompletion();
}
}
@@ -301,7 +314,7 @@
* A {@code ForkJoinTask} for performing a parallel for-each operation
* which visits the elements in encounter order
*/
- private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
+ static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
private final PipelineHelper<T> helper;
private Spliterator<S> spliterator;
private final long targetSize;
@@ -343,39 +356,49 @@
}
private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
- while (true) {
- Spliterator<S> split;
- if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
- || (split = task.spliterator.trySplit()) == null) {
- if (task.getPendingCount() == 0) {
- task.helper.wrapAndCopyInto(task.action, task.spliterator);
- }
- else {
- Node.Builder<T> nb = task.helper.makeNodeBuilder(
- task.helper.exactOutputSizeIfKnown(task.spliterator),
- size -> (T[]) new Object[size]);
- task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
- }
- task.tryComplete();
- return;
+ Spliterator<S> rightSplit = task.spliterator, leftSplit;
+ long sizeThreshold = task.targetSize;
+ boolean forkRight = false;
+ while (rightSplit.estimateSize() > sizeThreshold &&
+ (leftSplit = rightSplit.trySplit()) != null) {
+ ForEachOrderedTask<S, T> leftChild =
+ new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
+ ForEachOrderedTask<S, T> rightChild =
+ new ForEachOrderedTask<>(task, rightSplit, leftChild);
+ task.completionMap.put(leftChild, rightChild);
+ task.addToPendingCount(1); // forking
+ rightChild.addToPendingCount(1); // right pending on left child
+ if (task.leftPredecessor != null) {
+ leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
+ if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
+ task.addToPendingCount(-1); // transfer my "right child" count to my left child
+ else
+ leftChild.addToPendingCount(-1); // left child is ready to go when ready
+ }
+ ForEachOrderedTask<S, T> taskToFork;
+ if (forkRight) {
+ forkRight = false;
+ rightSplit = leftSplit;
+ task = leftChild;
+ taskToFork = rightChild;
}
else {
- ForEachOrderedTask<S, T> leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
- ForEachOrderedTask<S, T> rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
- task.completionMap.put(leftChild, rightChild);
- task.addToPendingCount(1); // forking
- rightChild.addToPendingCount(1); // right pending on left child
- if (task.leftPredecessor != null) {
- leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
- if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
- task.addToPendingCount(-1); // transfer my "right child" count to my left child
- else
- leftChild.addToPendingCount(-1); // left child is ready to go when ready
- }
- leftChild.fork();
+ forkRight = true;
task = rightChild;
+ taskToFork = leftChild;
}
+ taskToFork.fork();
}
+ if (task.getPendingCount() == 0) {
+ task.helper.wrapAndCopyInto(task.action, rightSplit);
+ }
+ else {
+ Node.Builder<T> nb = task.helper.makeNodeBuilder(
+ task.helper.exactOutputSizeIfKnown(rightSplit),
+ size -> (T[]) new Object[size]);
+ task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
+ }
+ task.tryComplete();
}
@Override
--- a/jdk/src/share/classes/java/util/stream/Nodes.java Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/Nodes.java Wed Jul 10 10:24:38 2013 +0200
@@ -1829,25 +1829,20 @@
@Override
public void compute() {
SizedCollectorTask<P_IN, P_OUT, T_SINK, K> task = this;
- while (true) {
- Spliterator<P_IN> leftSplit;
- if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
- || ((leftSplit = task.spliterator.trySplit()) == null)) {
- if (task.offset + task.length >= MAX_ARRAY_SIZE)
- throw new IllegalArgumentException("Stream size exceeds max array size");
- T_SINK sink = (T_SINK) task;
- task.helper.wrapAndCopyInto(sink, task.spliterator);
- task.propagateCompletion();
- return;
- }
- else {
- task.setPendingCount(1);
- long leftSplitSize = leftSplit.estimateSize();
- task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
- task = task.makeChild(task.spliterator, task.offset + leftSplitSize,
- task.length - leftSplitSize);
- }
+ Spliterator<P_IN> rightSplit = spliterator, leftSplit;
+ while (rightSplit.estimateSize() > task.targetSize &&
+ (leftSplit = rightSplit.trySplit()) != null) {
+ task.setPendingCount(1);
+ long leftSplitSize = leftSplit.estimateSize();
+ task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
+ task = task.makeChild(rightSplit, task.offset + leftSplitSize,
+ task.length - leftSplitSize);
}
+ if (task.offset + task.length >= MAX_ARRAY_SIZE)
+ throw new IllegalArgumentException("Stream size exceeds max array size");
+ T_SINK sink = (T_SINK) task;
+ task.helper.wrapAndCopyInto(sink, rightSplit);
+ task.propagateCompletion();
}
abstract K makeChild(Spliterator<P_IN> spliterator, long offset, long size);