--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.base/share/classes/java/util/stream/AbstractTask.java Sun Aug 17 15:54:13 2014 +0100
@@ -0,0 +1,352 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util.stream;
+
+import java.util.Spliterator;
+import java.util.concurrent.CountedCompleter;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Abstract base class for most fork-join tasks used to implement stream ops.
+ * Manages splitting logic, tracking of child tasks, and intermediate results.
+ * Each task is associated with a {@link Spliterator} that describes the portion
+ * of the input associated with the subtree rooted at this task.
+ * Tasks may be leaf nodes (which will traverse the elements of
+ * the {@code Spliterator}) or internal nodes (which split the
+ * {@code Spliterator} into multiple child tasks).
+ *
+ * @implNote
+ * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
+ * where each task has a semaphore-like count of uncompleted children, and the
+ * task is implicitly completed and notified when its last child completes.
+ * Internal node tasks will likely override the {@code onCompletion} method from
+ * {@code CountedCompleter} to merge the results from child tasks into the
+ * current task's result.
+ *
+ * <p>Splitting and setting up the child task links is done by {@code compute()}
+ * for internal nodes. At {@code compute()} time for leaf nodes, it is
+ * guaranteed that the parent's child-related fields (including sibling links
+ * for the parent's children) will be set up for all children.
+ *
+ * <p>For example, a task that performs a reduce would override {@code doLeaf()}
+ * to perform a reduction on that leaf node's chunk using the
+ * {@code Spliterator}, and override {@code onCompletion()} to merge the results
+ * of the child tasks for internal nodes:
+ *
+ * <pre>{@code
+ * protected S doLeaf() {
+ * spliterator.forEach(...);
+ * return localReductionResult;
+ * }
+ *
+ * public void onCompletion(CountedCompleter caller) {
+ * if (!isLeaf()) {
+ * ReduceTask<P_IN, P_OUT, T, R> child = children;
+ * R result = child.getLocalResult();
+ * child = child.nextSibling;
+ * for (; child != null; child = child.nextSibling)
+ * result = combine(result, child.getLocalResult());
+ * setLocalResult(result);
+ * }
+ * }
+ * }</pre>
+ *
+ * <p>Serialization is not supported as there is no intention to serialize
+ * tasks managed by stream ops.
+ *
+ * @param <P_IN> Type of elements input to the pipeline
+ * @param <P_OUT> Type of elements output from the pipeline
+ * @param <R> Type of intermediate result, which may be different from operation
+ * result type
+ * @param <K> Type of parent, child and sibling tasks
+ * @since 1.8
+ */
+@SuppressWarnings("serial")
+abstract class AbstractTask<P_IN, P_OUT, R,
+ K extends AbstractTask<P_IN, P_OUT, R, K>>
+ extends CountedCompleter<R> {
+
+ /**
+ * Default target factor of leaf tasks for parallel decomposition.
+ * To allow load balancing, we over-partition, currently to approximately
+ * four tasks per processor, which enables others to help out
+ * if leaf tasks are uneven or some processors are otherwise busy.
+ */
+ static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
+
+ /** The pipeline helper, common to all tasks in a computation */
+ protected final PipelineHelper<P_OUT> helper;
+
+ /**
+ * The spliterator for the portion of the input associated with the subtree
+ * rooted at this task
+ */
+ protected Spliterator<P_IN> spliterator;
+
+ /** Target leaf size, common to all tasks in a computation */
+ protected long targetSize; // may be laziliy initialized
+
+ /**
+ * The left child.
+ * null if no children
+ * if non-null rightChild is non-null
+ */
+ protected K leftChild;
+
+ /**
+ * The right child.
+ * null if no children
+ * if non-null leftChild is non-null
+ */
+ protected K rightChild;
+
+ /** The result of this node, if completed */
+ private R localResult;
+
+ /**
+ * Constructor for root nodes.
+ *
+ * @param helper The {@code PipelineHelper} describing the stream pipeline
+ * up to this operation
+ * @param spliterator The {@code Spliterator} describing the source for this
+ * pipeline
+ */
+ protected AbstractTask(PipelineHelper<P_OUT> helper,
+ Spliterator<P_IN> spliterator) {
+ super(null);
+ this.helper = helper;
+ this.spliterator = spliterator;
+ this.targetSize = 0L;
+ }
+
+ /**
+ * Constructor for non-root nodes.
+ *
+ * @param parent this node's parent task
+ * @param spliterator {@code Spliterator} describing the subtree rooted at
+ * this node, obtained by splitting the parent {@code Spliterator}
+ */
+ protected AbstractTask(K parent,
+ Spliterator<P_IN> spliterator) {
+ super(parent);
+ this.spliterator = spliterator;
+ this.helper = parent.helper;
+ this.targetSize = parent.targetSize;
+ }
+
+ /**
+ * Constructs a new node of type T whose parent is the receiver; must call
+ * the AbstractTask(T, Spliterator) constructor with the receiver and the
+ * provided Spliterator.
+ *
+ * @param spliterator {@code Spliterator} describing the subtree rooted at
+ * this node, obtained by splitting the parent {@code Spliterator}
+ * @return newly constructed child node
+ */
+ protected abstract K makeChild(Spliterator<P_IN> spliterator);
+
+ /**
+ * Computes the result associated with a leaf node. Will be called by
+ * {@code compute()} and the result passed to @{code setLocalResult()}
+ *
+ * @return the computed result of a leaf node
+ */
+ protected abstract R doLeaf();
+
+ /**
+ * Returns a suggested target leaf size based on the initial size estimate.
+ *
+ * @return suggested target leaf size
+ */
+ public static long suggestTargetSize(long sizeEstimate) {
+ long est = sizeEstimate / LEAF_TARGET;
+ return est > 0L ? est : 1L;
+ }
+
+ /**
+ * Returns the targetSize, initializing it via the supplied
+ * size estimate if not already initialized.
+ */
+ protected final long getTargetSize(long sizeEstimate) {
+ long s;
+ return ((s = targetSize) != 0 ? s :
+ (targetSize = suggestTargetSize(sizeEstimate)));
+ }
+
+ /**
+ * Returns the local result, if any. Subclasses should use
+ * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
+ * results. This returns the local result so that calls from within the
+ * fork-join framework will return the correct result.
+ *
+ * @return local result for this node previously stored with
+ * {@link #setLocalResult}
+ */
+ @Override
+ public R getRawResult() {
+ return localResult;
+ }
+
+ /**
+ * Does nothing; instead, subclasses should use
+ * {@link #setLocalResult(Object)}} to manage results.
+ *
+ * @param result must be null, or an exception is thrown (this is a safety
+ * tripwire to detect when {@code setRawResult()} is being used
+ * instead of {@code setLocalResult()}
+ */
+ @Override
+ protected void setRawResult(R result) {
+ if (result != null)
+ throw new IllegalStateException();
+ }
+
+ /**
+ * Retrieves a result previously stored with {@link #setLocalResult}
+ *
+ * @return local result for this node previously stored with
+ * {@link #setLocalResult}
+ */
+ protected R getLocalResult() {
+ return localResult;
+ }
+
+ /**
+ * Associates the result with the task, can be retrieved with
+ * {@link #getLocalResult}
+ *
+ * @param localResult local result for this node
+ */
+ protected void setLocalResult(R localResult) {
+ this.localResult = localResult;
+ }
+
+ /**
+ * Indicates whether this task is a leaf node. (Only valid after
+ * {@link #compute} has been called on this node). If the node is not a
+ * leaf node, then children will be non-null and numChildren will be
+ * positive.
+ *
+ * @return {@code true} if this task is a leaf node
+ */
+ protected boolean isLeaf() {
+ return leftChild == null;
+ }
+
+ /**
+ * Indicates whether this task is the root node
+ *
+ * @return {@code true} if this task is the root node.
+ */
+ protected boolean isRoot() {
+ return getParent() == null;
+ }
+
+ /**
+ * Returns the parent of this task, or null if this task is the root
+ *
+ * @return the parent of this task, or null if this task is the root
+ */
+ @SuppressWarnings("unchecked")
+ protected K getParent() {
+ return (K) getCompleter();
+ }
+
+ /**
+ * Decides whether or not to split a task further or compute it
+ * directly. If computing directly, calls {@code doLeaf} and pass
+ * the result to {@code setRawResult}. Otherwise splits off
+ * subtasks, forking one and continuing as the other.
+ *
+ * <p> The method is structured to conserve resources across a
+ * range of uses. The loop continues with one of the child tasks
+ * when split, to avoid deep recursion. To cope with spliterators
+ * that may be systematically biased toward left-heavy or
+ * right-heavy splits, we alternate which child is forked versus
+ * continued in the loop.
+ */
+ @Override
+ public void compute() {
+ Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
+ long sizeEstimate = rs.estimateSize();
+ long sizeThreshold = getTargetSize(sizeEstimate);
+ boolean forkRight = false;
+ @SuppressWarnings("unchecked") K task = (K) this;
+ while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
+ K leftChild, rightChild, taskToFork;
+ task.leftChild = leftChild = task.makeChild(ls);
+ task.rightChild = rightChild = task.makeChild(rs);
+ task.setPendingCount(1);
+ if (forkRight) {
+ forkRight = false;
+ rs = ls;
+ task = leftChild;
+ taskToFork = rightChild;
+ }
+ else {
+ forkRight = true;
+ task = rightChild;
+ taskToFork = leftChild;
+ }
+ taskToFork.fork();
+ sizeEstimate = rs.estimateSize();
+ }
+ task.setLocalResult(task.doLeaf());
+ task.tryComplete();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @implNote
+ * Clears spliterator and children fields. Overriders MUST call
+ * {@code super.onCompletion} as the last thing they do if they want these
+ * cleared.
+ */
+ @Override
+ public void onCompletion(CountedCompleter<?> caller) {
+ spliterator = null;
+ leftChild = rightChild = null;
+ }
+
+ /**
+ * Returns whether this node is a "leftmost" node -- whether the path from
+ * the root to this node involves only traversing leftmost child links. For
+ * a leaf node, this means it is the first leaf node in the encounter order.
+ *
+ * @return {@code true} if this node is a "leftmost" node
+ */
+ protected boolean isLeftmostNode() {
+ @SuppressWarnings("unchecked")
+ K node = (K) this;
+ while (node != null) {
+ K parent = node.getParent();
+ if (parent != null && parent.leftChild != node)
+ return false;
+ node = parent;
+ }
+ return true;
+ }
+}