8029452: Fork/Join task ForEachOps.ForEachOrderedTask clarifications and minor improvements
authorpsandoz
Thu, 16 Jan 2014 18:20:31 +0100
changeset 22289 bb9c71b84919
parent 22288 81efc55fac99
child 22290 6d4551cf4b3e
child 22291 6106c1f013f1
8029452: Fork/Join task ForEachOps.ForEachOrderedTask clarifications and minor improvements Reviewed-by: mduigou, briangoetz
jdk/src/share/classes/java/util/stream/ForEachOps.java
--- a/jdk/src/share/classes/java/util/stream/ForEachOps.java	Thu Jan 16 10:33:07 2014 -0500
+++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java	Thu Jan 16 18:20:31 2014 +0100
@@ -317,12 +317,55 @@
      */
     @SuppressWarnings("serial")
     static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
+        /*
+         * Our goal is to ensure that the elements associated with a task are
+         * processed according to an in-order traversal of the computation tree.
+         * We use completion counts for representing these dependencies, so that
+         * a task does not complete until all the tasks preceding it in this
+         * order complete.  We use the "completion map" to associate the next
+         * task in this order for any left child.  We increase the pending count
+         * of any node on the right side of such a mapping by one to indicate
+         * its dependency, and when a node on the left side of such a mapping
+         * completes, it decrements the pending count of its corresponding right
+         * side.  As the computation tree is expanded by splitting, we must
+         * atomically update the mappings to maintain the invariant that the
+         * completion map maps left children to the next node in the in-order
+         * traversal.
+         *
+         * Take, for example, the following computation tree of tasks:
+         *
+         *       a
+         *      / \
+         *     b   c
+         *    / \ / \
+         *   d  e f  g
+         *
+         * The complete map will contain (not necessarily all at the same time)
+         * the following associations:
+         *
+         *   d -> e
+         *   b -> f
+         *   f -> g
+         *
+         * Tasks e, f, g will have their pending counts increased by 1.
+         *
+         * The following relationships hold:
+         *
+         *   - completion of d "happens-before" e;
+         *   - completion of d and e "happens-before b;
+         *   - completion of b "happens-before" f; and
+         *   - completion of f "happens-before" g
+         *
+         * Thus overall the "happens-before" relationship holds for the
+         * reporting of elements, covered by tasks d, e, f and g, as specified
+         * by the forEachOrdered operation.
+         */
+
         private final PipelineHelper<T> helper;
         private Spliterator<S> spliterator;
         private final long targetSize;
         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
         private final Sink<T> action;
-        private final Object lock;
         private final ForEachOrderedTask<S, T> leftPredecessor;
         private Node<T> node;
 
@@ -333,9 +376,9 @@
             this.helper = helper;
             this.spliterator = spliterator;
             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
-            this.completionMap = new ConcurrentHashMap<>();
+            // Size map to avoid concurrent re-sizes
+            this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
             this.action = action;
-            this.lock = new Object();
             this.leftPredecessor = null;
         }
 
@@ -348,7 +391,6 @@
             this.targetSize = parent.targetSize;
             this.completionMap = parent.completionMap;
             this.action = parent.action;
-            this.lock = parent.lock;
             this.leftPredecessor = leftPredecessor;
         }
 
@@ -367,16 +409,42 @@
                     new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
                 ForEachOrderedTask<S, T> rightChild =
                     new ForEachOrderedTask<>(task, rightSplit, leftChild);
+
+                // Fork the parent task
+                // Completion of the left and right children "happens-before"
+                // completion of the parent
+                task.addToPendingCount(1);
+                // Completion of the left child "happens-before" completion of
+                // the right child
+                rightChild.addToPendingCount(1);
                 task.completionMap.put(leftChild, rightChild);
-                task.addToPendingCount(1); // forking
-                rightChild.addToPendingCount(1); // right pending on left child
+
+                // If task is not on the left spine
                 if (task.leftPredecessor != null) {
-                    leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
-                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
-                        task.addToPendingCount(-1); // transfer my "right child" count to my left child
-                    else
-                        leftChild.addToPendingCount(-1); // left child is ready to go when ready
+                    /*
+                     * Completion of left-predecessor, or left subtree,
+                     * "happens-before" completion of left-most leaf node of
+                     * right subtree.
+                     * The left child's pending count needs to be updated before
+                     * it is associated in the completion map, otherwise the
+                     * left child can complete prematurely and violate the
+                     * "happens-before" constraint.
+                     */
+                    leftChild.addToPendingCount(1);
+                    // Update association of left-predecessor to left-most
+                    // leaf node of right subtree
+                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
+                        // If replaced, adjust the pending count of the parent
+                        // to complete when its children complete
+                        task.addToPendingCount(-1);
+                    } else {
+                        // Left-predecessor has already completed, parent's
+                        // pending count is adjusted by left-predecessor;
+                        // left child is ready to complete
+                        leftChild.addToPendingCount(-1);
+                    }
                 }
+
                 ForEachOrderedTask<S, T> taskToFork;
                 if (forkRight) {
                     forkRight = false;
@@ -391,31 +459,47 @@
                 }
                 taskToFork.fork();
             }
-            if (task.getPendingCount() == 0) {
-                task.helper.wrapAndCopyInto(task.action, rightSplit);
-            }
-            else {
+
+            /*
+             * Task's pending count is either 0 or 1.  If 1 then the completion
+             * map will contain a value that is task, and two calls to
+             * tryComplete are required for completion, one below and one
+             * triggered by the completion of task's left-predecessor in
+             * onCompletion.  Therefore there is no data race within the if
+             * block.
+             */
+            if (task.getPendingCount() > 0) {
+                // Cannot complete just yet so buffer elements into a Node
+                // for use when completion occurs
                 Node.Builder<T> nb = task.helper.makeNodeBuilder(
-                  task.helper.exactOutputSizeIfKnown(rightSplit),
-                  size -> (T[]) new Object[size]);
+                        task.helper.exactOutputSizeIfKnown(rightSplit),
+                        size -> (T[]) new Object[size]);
                 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
+                task.spliterator = null;
             }
             task.tryComplete();
         }
 
         @Override
         public void onCompletion(CountedCompleter<?> caller) {
-            spliterator = null;
             if (node != null) {
-                // Dump any data from this leaf into the sink
-                synchronized (lock) {
-                    node.forEach(action);
-                }
+                // Dump buffered elements from this leaf into the sink
+                node.forEach(action);
                 node = null;
             }
-            ForEachOrderedTask<S, T> victim = completionMap.remove(this);
-            if (victim != null)
-                victim.tryComplete();
+            else if (spliterator != null) {
+                // Dump elements output from this leaf's pipeline into the sink
+                helper.wrapAndCopyInto(action, spliterator);
+                spliterator = null;
+            }
+
+            // The completion of this task *and* the dumping of elements
+            // "happens-before" completion of the associated left-most leaf task
+            // of right subtree (if any, which can be this task's right sibling)
+            //
+            ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
+            if (leftDescendant != null)
+                leftDescendant.tryComplete();
         }
     }
 }