diff -r 836adbf7a2cd -r 3317bb8137f4 jdk/src/java.base/share/classes/java/util/stream/AbstractTask.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/java.base/share/classes/java/util/stream/AbstractTask.java Sun Aug 17 15:54:13 2014 +0100 @@ -0,0 +1,352 @@ +/* + * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.util.stream; + +import java.util.Spliterator; +import java.util.concurrent.CountedCompleter; +import java.util.concurrent.ForkJoinPool; + +/** + * Abstract base class for most fork-join tasks used to implement stream ops. + * Manages splitting logic, tracking of child tasks, and intermediate results. + * Each task is associated with a {@link Spliterator} that describes the portion + * of the input associated with the subtree rooted at this task. + * Tasks may be leaf nodes (which will traverse the elements of + * the {@code Spliterator}) or internal nodes (which split the + * {@code Spliterator} into multiple child tasks). + * + * @implNote + *

This class is based on {@link CountedCompleter}, a form of fork-join task + * where each task has a semaphore-like count of uncompleted children, and the + * task is implicitly completed and notified when its last child completes. + * Internal node tasks will likely override the {@code onCompletion} method from + * {@code CountedCompleter} to merge the results from child tasks into the + * current task's result. + * + *

Splitting and setting up the child task links is done by {@code compute()} + * for internal nodes. At {@code compute()} time for leaf nodes, it is + * guaranteed that the parent's child-related fields (including sibling links + * for the parent's children) will be set up for all children. + * + *

For example, a task that performs a reduce would override {@code doLeaf()} + * to perform a reduction on that leaf node's chunk using the + * {@code Spliterator}, and override {@code onCompletion()} to merge the results + * of the child tasks for internal nodes: + * + *

{@code
+ *     protected S doLeaf() {
+ *         spliterator.forEach(...);
+ *         return localReductionResult;
+ *     }
+ *
+ *     public void onCompletion(CountedCompleter caller) {
+ *         if (!isLeaf()) {
+ *             ReduceTask child = children;
+ *             R result = child.getLocalResult();
+ *             child = child.nextSibling;
+ *             for (; child != null; child = child.nextSibling)
+ *                 result = combine(result, child.getLocalResult());
+ *             setLocalResult(result);
+ *         }
+ *     }
+ * }
+ * + *

Serialization is not supported as there is no intention to serialize + * tasks managed by stream ops. + * + * @param Type of elements input to the pipeline + * @param Type of elements output from the pipeline + * @param Type of intermediate result, which may be different from operation + * result type + * @param Type of parent, child and sibling tasks + * @since 1.8 + */ +@SuppressWarnings("serial") +abstract class AbstractTask> + extends CountedCompleter { + + /** + * Default target factor of leaf tasks for parallel decomposition. + * To allow load balancing, we over-partition, currently to approximately + * four tasks per processor, which enables others to help out + * if leaf tasks are uneven or some processors are otherwise busy. + */ + static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2; + + /** The pipeline helper, common to all tasks in a computation */ + protected final PipelineHelper helper; + + /** + * The spliterator for the portion of the input associated with the subtree + * rooted at this task + */ + protected Spliterator spliterator; + + /** Target leaf size, common to all tasks in a computation */ + protected long targetSize; // may be laziliy initialized + + /** + * The left child. + * null if no children + * if non-null rightChild is non-null + */ + protected K leftChild; + + /** + * The right child. + * null if no children + * if non-null leftChild is non-null + */ + protected K rightChild; + + /** The result of this node, if completed */ + private R localResult; + + /** + * Constructor for root nodes. + * + * @param helper The {@code PipelineHelper} describing the stream pipeline + * up to this operation + * @param spliterator The {@code Spliterator} describing the source for this + * pipeline + */ + protected AbstractTask(PipelineHelper helper, + Spliterator spliterator) { + super(null); + this.helper = helper; + this.spliterator = spliterator; + this.targetSize = 0L; + } + + /** + * Constructor for non-root nodes. + * + * @param parent this node's parent task + * @param spliterator {@code Spliterator} describing the subtree rooted at + * this node, obtained by splitting the parent {@code Spliterator} + */ + protected AbstractTask(K parent, + Spliterator spliterator) { + super(parent); + this.spliterator = spliterator; + this.helper = parent.helper; + this.targetSize = parent.targetSize; + } + + /** + * Constructs a new node of type T whose parent is the receiver; must call + * the AbstractTask(T, Spliterator) constructor with the receiver and the + * provided Spliterator. + * + * @param spliterator {@code Spliterator} describing the subtree rooted at + * this node, obtained by splitting the parent {@code Spliterator} + * @return newly constructed child node + */ + protected abstract K makeChild(Spliterator spliterator); + + /** + * Computes the result associated with a leaf node. Will be called by + * {@code compute()} and the result passed to @{code setLocalResult()} + * + * @return the computed result of a leaf node + */ + protected abstract R doLeaf(); + + /** + * Returns a suggested target leaf size based on the initial size estimate. + * + * @return suggested target leaf size + */ + public static long suggestTargetSize(long sizeEstimate) { + long est = sizeEstimate / LEAF_TARGET; + return est > 0L ? est : 1L; + } + + /** + * Returns the targetSize, initializing it via the supplied + * size estimate if not already initialized. + */ + protected final long getTargetSize(long sizeEstimate) { + long s; + return ((s = targetSize) != 0 ? s : + (targetSize = suggestTargetSize(sizeEstimate))); + } + + /** + * Returns the local result, if any. Subclasses should use + * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage + * results. This returns the local result so that calls from within the + * fork-join framework will return the correct result. + * + * @return local result for this node previously stored with + * {@link #setLocalResult} + */ + @Override + public R getRawResult() { + return localResult; + } + + /** + * Does nothing; instead, subclasses should use + * {@link #setLocalResult(Object)}} to manage results. + * + * @param result must be null, or an exception is thrown (this is a safety + * tripwire to detect when {@code setRawResult()} is being used + * instead of {@code setLocalResult()} + */ + @Override + protected void setRawResult(R result) { + if (result != null) + throw new IllegalStateException(); + } + + /** + * Retrieves a result previously stored with {@link #setLocalResult} + * + * @return local result for this node previously stored with + * {@link #setLocalResult} + */ + protected R getLocalResult() { + return localResult; + } + + /** + * Associates the result with the task, can be retrieved with + * {@link #getLocalResult} + * + * @param localResult local result for this node + */ + protected void setLocalResult(R localResult) { + this.localResult = localResult; + } + + /** + * Indicates whether this task is a leaf node. (Only valid after + * {@link #compute} has been called on this node). If the node is not a + * leaf node, then children will be non-null and numChildren will be + * positive. + * + * @return {@code true} if this task is a leaf node + */ + protected boolean isLeaf() { + return leftChild == null; + } + + /** + * Indicates whether this task is the root node + * + * @return {@code true} if this task is the root node. + */ + protected boolean isRoot() { + return getParent() == null; + } + + /** + * Returns the parent of this task, or null if this task is the root + * + * @return the parent of this task, or null if this task is the root + */ + @SuppressWarnings("unchecked") + protected K getParent() { + return (K) getCompleter(); + } + + /** + * Decides whether or not to split a task further or compute it + * directly. If computing directly, calls {@code doLeaf} and pass + * the result to {@code setRawResult}. Otherwise splits off + * subtasks, forking one and continuing as the other. + * + *

The method is structured to conserve resources across a + * range of uses. The loop continues with one of the child tasks + * when split, to avoid deep recursion. To cope with spliterators + * that may be systematically biased toward left-heavy or + * right-heavy splits, we alternate which child is forked versus + * continued in the loop. + */ + @Override + public void compute() { + Spliterator rs = spliterator, ls; // right, left spliterators + long sizeEstimate = rs.estimateSize(); + long sizeThreshold = getTargetSize(sizeEstimate); + boolean forkRight = false; + @SuppressWarnings("unchecked") K task = (K) this; + while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { + K leftChild, rightChild, taskToFork; + task.leftChild = leftChild = task.makeChild(ls); + task.rightChild = rightChild = task.makeChild(rs); + task.setPendingCount(1); + if (forkRight) { + forkRight = false; + rs = ls; + task = leftChild; + taskToFork = rightChild; + } + else { + forkRight = true; + task = rightChild; + taskToFork = leftChild; + } + taskToFork.fork(); + sizeEstimate = rs.estimateSize(); + } + task.setLocalResult(task.doLeaf()); + task.tryComplete(); + } + + /** + * {@inheritDoc} + * + * @implNote + * Clears spliterator and children fields. Overriders MUST call + * {@code super.onCompletion} as the last thing they do if they want these + * cleared. + */ + @Override + public void onCompletion(CountedCompleter caller) { + spliterator = null; + leftChild = rightChild = null; + } + + /** + * Returns whether this node is a "leftmost" node -- whether the path from + * the root to this node involves only traversing leftmost child links. For + * a leaf node, this means it is the first leaf node in the encounter order. + * + * @return {@code true} if this node is a "leftmost" node + */ + protected boolean isLeftmostNode() { + @SuppressWarnings("unchecked") + K node = (K) this; + while (node != null) { + K parent = node.getParent(); + if (parent != null && parent.leftChild != node) + return false; + node = parent; + } + return true; + } +}