# HG changeset patch # User psandoz # Date 1389892831 -3600 # Node ID bb9c71b84919bcc33ce375d9d65179938471b20d # Parent 81efc55fac99569e8073ba797251fae59f54b866 8029452: Fork/Join task ForEachOps.ForEachOrderedTask clarifications and minor improvements Reviewed-by: mduigou, briangoetz diff -r 81efc55fac99 -r bb9c71b84919 jdk/src/share/classes/java/util/stream/ForEachOps.java --- a/jdk/src/share/classes/java/util/stream/ForEachOps.java Thu Jan 16 10:33:07 2014 -0500 +++ b/jdk/src/share/classes/java/util/stream/ForEachOps.java Thu Jan 16 18:20:31 2014 +0100 @@ -317,12 +317,55 @@ */ @SuppressWarnings("serial") static final class ForEachOrderedTask extends CountedCompleter { + /* + * Our goal is to ensure that the elements associated with a task are + * processed according to an in-order traversal of the computation tree. + * We use completion counts for representing these dependencies, so that + * a task does not complete until all the tasks preceding it in this + * order complete. We use the "completion map" to associate the next + * task in this order for any left child. We increase the pending count + * of any node on the right side of such a mapping by one to indicate + * its dependency, and when a node on the left side of such a mapping + * completes, it decrements the pending count of its corresponding right + * side. As the computation tree is expanded by splitting, we must + * atomically update the mappings to maintain the invariant that the + * completion map maps left children to the next node in the in-order + * traversal. + * + * Take, for example, the following computation tree of tasks: + * + * a + * / \ + * b c + * / \ / \ + * d e f g + * + * The complete map will contain (not necessarily all at the same time) + * the following associations: + * + * d -> e + * b -> f + * f -> g + * + * Tasks e, f, g will have their pending counts increased by 1. + * + * The following relationships hold: + * + * - completion of d "happens-before" e; + * - completion of d and e "happens-before b; + * - completion of b "happens-before" f; and + * - completion of f "happens-before" g + * + * Thus overall the "happens-before" relationship holds for the + * reporting of elements, covered by tasks d, e, f and g, as specified + * by the forEachOrdered operation. + */ + private final PipelineHelper helper; private Spliterator spliterator; private final long targetSize; private final ConcurrentHashMap, ForEachOrderedTask> completionMap; private final Sink action; - private final Object lock; private final ForEachOrderedTask leftPredecessor; private Node node; @@ -333,9 +376,9 @@ this.helper = helper; this.spliterator = spliterator; this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); - this.completionMap = new ConcurrentHashMap<>(); + // Size map to avoid concurrent re-sizes + this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1)); this.action = action; - this.lock = new Object(); this.leftPredecessor = null; } @@ -348,7 +391,6 @@ this.targetSize = parent.targetSize; this.completionMap = parent.completionMap; this.action = parent.action; - this.lock = parent.lock; this.leftPredecessor = leftPredecessor; } @@ -367,16 +409,42 @@ new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); ForEachOrderedTask rightChild = new ForEachOrderedTask<>(task, rightSplit, leftChild); + + // Fork the parent task + // Completion of the left and right children "happens-before" + // completion of the parent + task.addToPendingCount(1); + // Completion of the left child "happens-before" completion of + // the right child + rightChild.addToPendingCount(1); task.completionMap.put(leftChild, rightChild); - task.addToPendingCount(1); // forking - rightChild.addToPendingCount(1); // right pending on left child + + // If task is not on the left spine if (task.leftPredecessor != null) { - leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine - if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) - task.addToPendingCount(-1); // transfer my "right child" count to my left child - else - leftChild.addToPendingCount(-1); // left child is ready to go when ready + /* + * Completion of left-predecessor, or left subtree, + * "happens-before" completion of left-most leaf node of + * right subtree. + * The left child's pending count needs to be updated before + * it is associated in the completion map, otherwise the + * left child can complete prematurely and violate the + * "happens-before" constraint. + */ + leftChild.addToPendingCount(1); + // Update association of left-predecessor to left-most + // leaf node of right subtree + if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) { + // If replaced, adjust the pending count of the parent + // to complete when its children complete + task.addToPendingCount(-1); + } else { + // Left-predecessor has already completed, parent's + // pending count is adjusted by left-predecessor; + // left child is ready to complete + leftChild.addToPendingCount(-1); + } } + ForEachOrderedTask taskToFork; if (forkRight) { forkRight = false; @@ -391,31 +459,47 @@ } taskToFork.fork(); } - if (task.getPendingCount() == 0) { - task.helper.wrapAndCopyInto(task.action, rightSplit); - } - else { + + /* + * Task's pending count is either 0 or 1. If 1 then the completion + * map will contain a value that is task, and two calls to + * tryComplete are required for completion, one below and one + * triggered by the completion of task's left-predecessor in + * onCompletion. Therefore there is no data race within the if + * block. + */ + if (task.getPendingCount() > 0) { + // Cannot complete just yet so buffer elements into a Node + // for use when completion occurs Node.Builder nb = task.helper.makeNodeBuilder( - task.helper.exactOutputSizeIfKnown(rightSplit), - size -> (T[]) new Object[size]); + task.helper.exactOutputSizeIfKnown(rightSplit), + size -> (T[]) new Object[size]); task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); + task.spliterator = null; } task.tryComplete(); } @Override public void onCompletion(CountedCompleter caller) { - spliterator = null; if (node != null) { - // Dump any data from this leaf into the sink - synchronized (lock) { - node.forEach(action); - } + // Dump buffered elements from this leaf into the sink + node.forEach(action); node = null; } - ForEachOrderedTask victim = completionMap.remove(this); - if (victim != null) - victim.tryComplete(); + else if (spliterator != null) { + // Dump elements output from this leaf's pipeline into the sink + helper.wrapAndCopyInto(action, spliterator); + spliterator = null; + } + + // The completion of this task *and* the dumping of elements + // "happens-before" completion of the associated left-most leaf task + // of right subtree (if any, which can be this task's right sibling) + // + ForEachOrderedTask leftDescendant = completionMap.remove(this); + if (leftDescendant != null) + leftDescendant.tryComplete(); } } }