8020040: Improve and generalize the F/J tasks to handle right or left-balanced trees
authorpsandoz
Wed, 10 Jul 2013 10:24:38 +0200
changeset 18795 25d68f4a1b38
parent 18794 17d9c2ec5e47
child 18796 486b43748d9b
8020040: Improve and generalize the F/J tasks to handle right or left-balanced trees Reviewed-by: briangoetz Contributed-by: doug lea <dl@cs.oswego.edu>, paul sandoz <paul.sandoz@oracle.com>
jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java
jdk/src/share/classes/java/util/stream/AbstractTask.java
jdk/src/share/classes/java/util/stream/ForEachOps.java
jdk/src/share/classes/java/util/stream/Nodes.java
--- a/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java	Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/AbstractShortCircuitTask.java	Wed Jul 10 10:24:38 2013 +0200
@@ -92,22 +92,51 @@
      */
     protected abstract R getEmptyResult();
 
+    /**
+     * Overrides AbstractTask version to include checks for early
+     * exits while splitting or computing.
+     */
     @Override
-    protected boolean canCompute() {
-        // Have we already found an answer?
-        if (sharedResult.get() != null) {
-            tryComplete();
-            return false;
-        } else if (taskCanceled()) {
-            setLocalResult(getEmptyResult());
-            tryComplete();
-            return false;
+    public void compute() {
+        Spliterator<P_IN> rs = spliterator, ls;
+        long sizeEstimate = rs.estimateSize();
+        long sizeThreshold = getTargetSize(sizeEstimate);
+        boolean forkRight = false;
+        @SuppressWarnings("unchecked") K task = (K) this;
+        AtomicReference<R> sr = sharedResult;
+        R result;
+        while ((result = sr.get()) == null) {
+            if (task.taskCanceled()) {
+                result = task.getEmptyResult();
+                break;
+            }
+            if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
+                result = task.doLeaf();
+                break;
+            }
+            K leftChild, rightChild, taskToFork;
+            task.leftChild  = leftChild = task.makeChild(ls);
+            task.rightChild = rightChild = task.makeChild(rs);
+            task.setPendingCount(1);
+            if (forkRight) {
+                forkRight = false;
+                rs = ls;
+                task = leftChild;
+                taskToFork = rightChild;
+            }
+            else {
+                forkRight = true;
+                task = rightChild;
+                taskToFork = leftChild;
+            }
+            taskToFork.fork();
+            sizeEstimate = rs.estimateSize();
         }
-        else {
-            return true;
-        }
+        task.setLocalResult(result);
+        task.tryComplete();
     }
 
+
     /**
      * Declares that a globally valid result has been found.  If another task has
      * not already found the answer, the result is installed in
--- a/jdk/src/share/classes/java/util/stream/AbstractTask.java	Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/AbstractTask.java	Wed Jul 10 10:24:38 2013 +0200
@@ -102,7 +102,7 @@
     protected Spliterator<P_IN> spliterator;
 
     /** Target leaf size, common to all tasks in a computation */
-    protected final long targetSize;
+    protected long targetSize; // may be laziliy initialized
 
     /**
      * The left child.
@@ -134,7 +134,7 @@
         super(null);
         this.helper = helper;
         this.spliterator = spliterator;
-        this.targetSize = suggestTargetSize(spliterator.estimateSize());
+        this.targetSize = 0L;
     }
 
     /**
@@ -182,27 +182,13 @@
     }
 
     /**
-     * Returns a suggestion whether it is advisable to split the provided
-     * spliterator based on target size and other considerations, such as pool
-     * state.
-      *
-     * @return {@code true} if a split is advised otherwise {@code false}
+     * Returns the targetSize, initializing it via the supplied
+     * size estimate if not already initialized.
      */
-    public static boolean suggestSplit(Spliterator spliterator,
-                                       long targetSize) {
-        long remaining = spliterator.estimateSize();
-        return (remaining > targetSize);
-        // @@@ May additionally want to fold in pool characteristics such as surplus task count
-    }
-
-    /**
-     * Returns a suggestion whether it is adviseable to split this task based on
-     * target size and other considerations.
-      *
-     *  @return {@code true} if a split is advised otherwise {@code false}
-     */
-    public boolean suggestSplit() {
-        return suggestSplit(spliterator, targetSize);
+    protected final long getTargetSize(long sizeEstimate) {
+        long s;
+        return ((s = targetSize) != 0 ? s :
+                (targetSize = suggestTargetSize(sizeEstimate)));
     }
 
     /**
@@ -285,43 +271,46 @@
     }
 
     /**
-     * Decides whether or not to split a task further or compute it directly. If
-     * computing directly, call {@code doLeaf} and pass the result to
-     * {@code setRawResult}.  If splitting, set up the child-related fields,
-     * create the child tasks, fork the leftmost (prefix) child tasks, and
-     * compute the rightmost (remaining) child tasks.
+     * Decides whether or not to split a task further or compute it
+     * directly. If computing directly, calls {@code doLeaf} and pass
+     * the result to {@code setRawResult}. Otherwise splits off
+     * subtasks, forking one and continuing as the other.
      *
-     * <p>
-     * Computing will continue for rightmost tasks while a task can be computed
-     * as determined by {@link #canCompute()} and that task should and can be
-     * split into left and right tasks.
-     *
-     * <p>
-     * The rightmost tasks are computed in a loop rather than recursively to
-     * avoid potential stack overflows when computing with a right-balanced
-     * tree, such as that produced when splitting with a {@link Spliterator}
-     * created from an {@link java.util.Iterator}.
+     * <p> The method is structured to conserve resources across a
+     * range of uses.  The loop continues with one of the child tasks
+     * when split, to avoid deep recursion. To cope with spliterators
+     * that may be systematically biased toward left-heavy or
+     * right-heavy splits, we alternate which child is forked versus
+     * continued in the loop.
      */
     @Override
-    public final void compute() {
-        @SuppressWarnings("unchecked")
-        K task = (K) this;
-        while (task.canCompute()) {
-            Spliterator<P_IN> split;
-            if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
-                task.setLocalResult(task.doLeaf());
-                task.tryComplete();
-                return;
+    public void compute() {
+        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
+        long sizeEstimate = rs.estimateSize();
+        long sizeThreshold = getTargetSize(sizeEstimate);
+        boolean forkRight = false;
+        @SuppressWarnings("unchecked") K task = (K) this;
+        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
+            K leftChild, rightChild, taskToFork;
+            task.leftChild  = leftChild = task.makeChild(ls);
+            task.rightChild = rightChild = task.makeChild(rs);
+            task.setPendingCount(1);
+            if (forkRight) {
+                forkRight = false;
+                rs = ls;
+                task = leftChild;
+                taskToFork = rightChild;
             }
             else {
-                K l = task.leftChild = task.makeChild(split);
-                K r = task.rightChild = task.makeChild(task.spliterator);
-                task.spliterator = null;
-                task.setPendingCount(1);
-                l.fork();
-                task = r;
+                forkRight = true;
+                task = rightChild;
+                taskToFork = leftChild;
             }
+            taskToFork.fork();
+            sizeEstimate = rs.estimateSize();
         }
+        task.setLocalResult(task.doLeaf());
+        task.tryComplete();
     }
 
     /**
@@ -339,21 +328,6 @@
     }
 
     /**
-     * Determines if the task can be computed.
-     *
-     * @implSpec The default always returns true
-     *
-     * @return {@code true} if this task can be computed to either calculate the
-     *         leaf via {@link #doLeaf()} or split, otherwise false if this task
-     *         cannot be computed, for example if this task has been canceled
-     *         and/or a result for the computation has been found by another
-     *         task.
-     */
-    protected boolean canCompute() {
-        return true;
-    }
-
-    /**
      * Returns whether this node is a "leftmost" node -- whether the path from
      * the root to this node involves only traversing leftmost child links.  For
      * a leaf node, this means it is the first leaf node in the encounter order.
--- a/jdk/src/share/classes/java/util/stream/ForEachOps.java	Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java	Wed Jul 10 10:24:38 2013 +0200
@@ -28,6 +28,7 @@
 import java.util.Spliterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinTask;
 import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
 import java.util.function.IntConsumer;
@@ -128,7 +129,7 @@
      *
      * @param <T> the output type of the stream pipeline
      */
-    private static abstract class ForEachOp<T>
+    static abstract class ForEachOp<T>
             implements TerminalOp<T, Void>, TerminalSink<T, Void> {
         private final boolean ordered;
 
@@ -169,7 +170,7 @@
         // Implementations
 
         /** Implementation class for reference streams */
-        private static class OfRef<T> extends ForEachOp<T> {
+        static final class OfRef<T> extends ForEachOp<T> {
             final Consumer<? super T> consumer;
 
             OfRef(Consumer<? super T> consumer, boolean ordered) {
@@ -184,7 +185,7 @@
         }
 
         /** Implementation class for {@code IntStream} */
-        private static class OfInt extends ForEachOp<Integer>
+        static final class OfInt extends ForEachOp<Integer>
                 implements Sink.OfInt {
             final IntConsumer consumer;
 
@@ -205,7 +206,7 @@
         }
 
         /** Implementation class for {@code LongStream} */
-        private static class OfLong extends ForEachOp<Long>
+        static final class OfLong extends ForEachOp<Long>
                 implements Sink.OfLong {
             final LongConsumer consumer;
 
@@ -226,7 +227,7 @@
         }
 
         /** Implementation class for {@code DoubleStream} */
-        private static class OfDouble extends ForEachOp<Double>
+        static final class OfDouble extends ForEachOp<Double>
                 implements Sink.OfDouble {
             final DoubleConsumer consumer;
 
@@ -248,20 +249,20 @@
     }
 
     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
-    private static class ForEachTask<S, T> extends CountedCompleter<Void> {
+    static final class ForEachTask<S, T> extends CountedCompleter<Void> {
         private Spliterator<S> spliterator;
         private final Sink<S> sink;
         private final PipelineHelper<T> helper;
-        private final long targetSize;
+        private long targetSize;
 
         ForEachTask(PipelineHelper<T> helper,
                     Spliterator<S> spliterator,
                     Sink<S> sink) {
             super(null);
+            this.sink = sink;
+            this.helper = helper;
             this.spliterator = spliterator;
-            this.sink = sink;
-            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
-            this.helper = helper;
+            this.targetSize = 0L;
         }
 
         ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
@@ -272,28 +273,40 @@
             this.helper = parent.helper;
         }
 
+        // Similar to AbstractTask but doesn't need to track child tasks
         public void compute() {
+            Spliterator<S> rightSplit = spliterator, leftSplit;
+            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
+            if ((sizeThreshold = targetSize) == 0L)
+                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
             boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
-            while (true) {
-                if (isShortCircuit && sink.cancellationRequested()) {
-                    propagateCompletion();
-                    spliterator = null;
-                    return;
+            boolean forkRight = false;
+            Sink<S> taskSink = sink;
+            ForEachTask<S, T> task = this;
+            while (!isShortCircuit || !taskSink.cancellationRequested()) {
+                if (sizeEstimate <= sizeThreshold ||
+                    (leftSplit = rightSplit.trySplit()) == null) {
+                    task.helper.copyInto(taskSink, rightSplit);
+                    break;
                 }
-
-                Spliterator<S> split;
-                if (!AbstractTask.suggestSplit(spliterator, targetSize)
-                    || (split = spliterator.trySplit()) == null) {
-                    helper.copyInto(sink, spliterator);
-                    propagateCompletion();
-                    spliterator = null;
-                    return;
+                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
+                task.addToPendingCount(1);
+                ForEachTask<S, T> taskToFork;
+                if (forkRight) {
+                    forkRight = false;
+                    rightSplit = leftSplit;
+                    taskToFork = task;
+                    task = leftTask;
                 }
                 else {
-                    addToPendingCount(1);
-                    new ForEachTask<>(this, split).fork();
+                    forkRight = true;
+                    taskToFork = leftTask;
                 }
+                taskToFork.fork();
+                sizeEstimate = rightSplit.estimateSize();
             }
+            task.spliterator = null;
+            task.propagateCompletion();
         }
     }
 
@@ -301,7 +314,7 @@
      * A {@code ForkJoinTask} for performing a parallel for-each operation
      * which visits the elements in encounter order
      */
-    private static class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
+    static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
         private final PipelineHelper<T> helper;
         private Spliterator<S> spliterator;
         private final long targetSize;
@@ -343,39 +356,49 @@
         }
 
         private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
-            while (true) {
-                Spliterator<S> split;
-                if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
-                    || (split = task.spliterator.trySplit()) == null) {
-                    if (task.getPendingCount() == 0) {
-                        task.helper.wrapAndCopyInto(task.action, task.spliterator);
-                    }
-                    else {
-                        Node.Builder<T> nb = task.helper.makeNodeBuilder(
-                                task.helper.exactOutputSizeIfKnown(task.spliterator),
-                                size -> (T[]) new Object[size]);
-                        task.node = task.helper.wrapAndCopyInto(nb, task.spliterator).build();
-                    }
-                    task.tryComplete();
-                    return;
+            Spliterator<S> rightSplit = task.spliterator, leftSplit;
+            long sizeThreshold = task.targetSize;
+            boolean forkRight = false;
+            while (rightSplit.estimateSize() > sizeThreshold &&
+                   (leftSplit = rightSplit.trySplit()) != null) {
+                ForEachOrderedTask<S, T> leftChild =
+                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
+                ForEachOrderedTask<S, T> rightChild =
+                    new ForEachOrderedTask<>(task, rightSplit, leftChild);
+                task.completionMap.put(leftChild, rightChild);
+                task.addToPendingCount(1); // forking
+                rightChild.addToPendingCount(1); // right pending on left child
+                if (task.leftPredecessor != null) {
+                    leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
+                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
+                        task.addToPendingCount(-1); // transfer my "right child" count to my left child
+                    else
+                        leftChild.addToPendingCount(-1); // left child is ready to go when ready
+                }
+                ForEachOrderedTask<S, T> taskToFork;
+                if (forkRight) {
+                    forkRight = false;
+                    rightSplit = leftSplit;
+                    task = leftChild;
+                    taskToFork = rightChild;
                 }
                 else {
-                    ForEachOrderedTask<S, T> leftChild = new ForEachOrderedTask<>(task, split, task.leftPredecessor);
-                    ForEachOrderedTask<S, T> rightChild = new ForEachOrderedTask<>(task, task.spliterator, leftChild);
-                    task.completionMap.put(leftChild, rightChild);
-                    task.addToPendingCount(1); // forking
-                    rightChild.addToPendingCount(1); // right pending on left child
-                    if (task.leftPredecessor != null) {
-                        leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
-                        if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
-                            task.addToPendingCount(-1);      // transfer my "right child" count to my left child
-                        else
-                            leftChild.addToPendingCount(-1); // left child is ready to go when ready
-                    }
-                    leftChild.fork();
+                    forkRight = true;
                     task = rightChild;
+                    taskToFork = leftChild;
                 }
+                taskToFork.fork();
             }
+            if (task.getPendingCount() == 0) {
+                task.helper.wrapAndCopyInto(task.action, rightSplit);
+            }
+            else {
+                Node.Builder<T> nb = task.helper.makeNodeBuilder(
+                  task.helper.exactOutputSizeIfKnown(rightSplit),
+                  size -> (T[]) new Object[size]);
+                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
+            }
+            task.tryComplete();
         }
 
         @Override
--- a/jdk/src/share/classes/java/util/stream/Nodes.java	Wed Jul 10 09:52:02 2013 +0200
+++ b/jdk/src/share/classes/java/util/stream/Nodes.java	Wed Jul 10 10:24:38 2013 +0200
@@ -1829,25 +1829,20 @@
         @Override
         public void compute() {
             SizedCollectorTask<P_IN, P_OUT, T_SINK, K> task = this;
-            while (true) {
-                Spliterator<P_IN> leftSplit;
-                if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
-                    || ((leftSplit = task.spliterator.trySplit()) == null)) {
-                    if (task.offset + task.length >= MAX_ARRAY_SIZE)
-                        throw new IllegalArgumentException("Stream size exceeds max array size");
-                    T_SINK sink = (T_SINK) task;
-                    task.helper.wrapAndCopyInto(sink, task.spliterator);
-                    task.propagateCompletion();
-                    return;
-                }
-                else {
-                    task.setPendingCount(1);
-                    long leftSplitSize = leftSplit.estimateSize();
-                    task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
-                    task = task.makeChild(task.spliterator, task.offset + leftSplitSize,
-                                          task.length - leftSplitSize);
-                }
+            Spliterator<P_IN> rightSplit = spliterator, leftSplit;
+            while (rightSplit.estimateSize() > task.targetSize &&
+                   (leftSplit = rightSplit.trySplit()) != null) {
+                task.setPendingCount(1);
+                long leftSplitSize = leftSplit.estimateSize();
+                task.makeChild(leftSplit, task.offset, leftSplitSize).fork();
+                task = task.makeChild(rightSplit, task.offset + leftSplitSize,
+                                      task.length - leftSplitSize);
             }
+            if (task.offset + task.length >= MAX_ARRAY_SIZE)
+                throw new IllegalArgumentException("Stream size exceeds max array size");
+            T_SINK sink = (T_SINK) task;
+            task.helper.wrapAndCopyInto(sink, rightSplit);
+            task.propagateCompletion();
         }
 
         abstract K makeChild(Spliterator<P_IN> spliterator, long offset, long size);