jdk/src/share/classes/java/util/stream/AbstractTask.java
changeset 17163 6a5e9b4f27d2
child 18572 53b8b8c30086
equal deleted inserted replaced
17162:6b3dc8e20c04 17163:6a5e9b4f27d2
       
     1 /*
       
     2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package java.util.stream;
       
    26 
       
    27 import java.util.Spliterator;
       
    28 import java.util.concurrent.CountedCompleter;
       
    29 import java.util.concurrent.ForkJoinPool;
       
    30 
       
    31 /**
       
    32  * Abstract base class for most fork-join tasks used to implement stream ops.
       
    33  * Manages splitting logic, tracking of child tasks, and intermediate results.
       
    34  * Each task is associated with a {@link Spliterator} that describes the portion
       
    35  * of the input associated with the subtree rooted at this task.
       
    36  * Tasks may be leaf nodes (which will traverse the elements of
       
    37  * the {@code Spliterator}) or internal nodes (which split the
       
    38  * {@code Spliterator} into multiple child tasks).
       
    39  *
       
    40  * @implNote
       
    41  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
       
    42  * where each task has a semaphore-like count of uncompleted children, and the
       
    43  * task is implicitly completed and notified when its last child completes.
       
    44  * Internal node tasks will likely override the {@code onCompletion} method from
       
    45  * {@code CountedCompleter} to merge the results from child tasks into the
       
    46  * current task's result.
       
    47  *
       
    48  * <p>Splitting and setting up the child task links is done by {@code compute()}
       
    49  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
       
    50  * guaranteed that the parent's child-related fields ({@code leftChild} and
       
    51  * {@code rightChild}) will already be set up.
       
    52  *
       
    53  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
       
    54  * to perform a reduction on that leaf node's chunk using the
       
    55  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
       
    56  * of the child tasks for internal nodes:
       
    57  *
       
    58  * <pre>{@code
       
    59  *     protected S doLeaf() {
       
    60  *         spliterator.forEach(...);
       
    61  *         return localReductionResult;
       
    62  *     }
       
    63  *
       
    64  *     public void onCompletion(CountedCompleter<?> caller) {
       
    65  *         if (!isLeaf()) {
       
    66  *             // An internal node has exactly two children; combine
       
    67  *             // their results in left-to-right order
       
    68  *             R leftResult = leftChild.getLocalResult();
       
    69  *             R rightResult = rightChild.getLocalResult();
       
    70  *             R result = combine(leftResult, rightResult);
       
    71  *             setLocalResult(result);
       
    72  *         }
       
    73  *     }
       
    74  * }</pre>
       
    75  *
       
    76  * @param <P_IN> Type of elements input to the pipeline
       
    77  * @param <P_OUT> Type of elements output from the pipeline
       
    78  * @param <R> Type of intermediate result, which may be different from operation
       
    79  *        result type
       
    80  * @param <K> Type of parent, child and sibling tasks
       
    81  * @since 1.8
       
    82  */
       
    83 abstract class AbstractTask<P_IN, P_OUT, R,
       
    84                             K extends AbstractTask<P_IN, P_OUT, R, K>>
       
    85         extends CountedCompleter<R> {
       
    86 
       
    87     /**
       
    88      * Default target factor of leaf tasks for parallel decomposition.
       
    89      * To allow load balancing, we over-partition, currently to approximately
       
    90      * four tasks per processor, which enables others to help out
       
    91      * if leaf tasks are uneven or some processors are otherwise busy.
       
            *
       
            * <p>The value is the common pool parallelism multiplied by four
       
            * ({@code << 2}), evaluated once when this class is initialized.
       
    92      */
       
    93     static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
       
    94 
       
    95     /** The pipeline helper, common to all tasks in a computation */
       
    96     protected final PipelineHelper<P_OUT> helper;
       
    97 
       
    98     /**
       
    99      * The spliterator for the portion of the input associated with the subtree
       
   100      * rooted at this task; reset to {@code null} by {@code onCompletion()}
       
   101      */
       
   102     protected Spliterator<P_IN> spliterator;
       
   103 
       
   104     /** Target leaf size, common to all tasks in a computation */
       
   105     protected final long targetSize;
       
   106 
       
   107     /**
       
   108      * The left child, or {@code null} if this task has no children.
       
   109      * Invariant: if non-null, {@code rightChild} is also non-null.
       
   110      * Set by {@code compute()} and cleared by {@code onCompletion()}.
       
   111      */
       
   112     protected K leftChild;
       
   113 
       
   114     /**
       
   115      * The right child, or {@code null} if this task has no children.
       
   116      * Invariant: if non-null, {@code leftChild} is also non-null.
       
   117      * Set by {@code compute()} and cleared by {@code onCompletion()}.
       
   118      */
       
   119     protected K rightChild;
       
   120 
       
   121     /** The result of this node, as stored by {@code setLocalResult()} */
       
   122     private R localResult;
       
   123 
       
   124     /**
       
   125      * Constructor for root nodes.
       
   126      *
       
            * <p>The task is created with a {@code null} completer, and its target
       
            * leaf size is derived from the spliterator's size estimate, taken at
       
            * construction time, via {@link #suggestTargetSize(long)}.
       
            *
       
   127      * @param helper The {@code PipelineHelper} describing the stream pipeline
       
   128      *               up to this operation
       
   129      * @param spliterator The {@code Spliterator} describing the source for this
       
   130      *                    pipeline
       
   131      */
       
   132     protected AbstractTask(PipelineHelper<P_OUT> helper,
       
   133                            Spliterator<P_IN> spliterator) {
       
   134         super(null);
       
   135         this.helper = helper;
       
   136         this.spliterator = spliterator;
       
   137         this.targetSize = suggestTargetSize(spliterator.estimateSize());
       
   138     }
       
   139 
       
   140     /**
       
   141      * Constructor for non-root nodes.
       
   142      *
       
            * <p>The pipeline helper and the target leaf size are copied from the
       
            * parent rather than recomputed.
       
            *
       
   143      * @param parent this node's parent task
       
   144      * @param spliterator {@code Spliterator} describing the subtree rooted at
       
   145      *        this node, obtained by splitting the parent {@code Spliterator}
       
   146      */
       
   147     protected AbstractTask(K parent,
       
   148                            Spliterator<P_IN> spliterator) {
       
   149         super(parent);
       
   150         this.spliterator = spliterator;
       
   151         this.helper = parent.helper;
       
   152         this.targetSize = parent.targetSize;
       
   153     }
       
   154 
       
   155     /**
       
   156      * Constructs a new node of type {@code K} whose parent is the receiver;
       
   157      * must call the {@code AbstractTask(K, Spliterator)} constructor with the
       
   158      * receiver and the provided {@code Spliterator}.
       
   159      *
       
   160      * @param spliterator {@code Spliterator} describing the subtree rooted at
       
   161      *        this node, obtained by splitting the parent {@code Spliterator}
       
   162      * @return newly constructed child node
       
   163      */
       
   164     protected abstract K makeChild(Spliterator<P_IN> spliterator);
       
   165 
       
   166     /**
       
   167      * Computes the result associated with a leaf node.  Will be called by
       
   168      * {@code compute()} and the result passed to {@code setLocalResult()}.
       
   169      *
       
   170      * @return the computed result of a leaf node
       
   171      */
       
   172     protected abstract R doLeaf();
       
   173 
       
   174     /**
       
   175      * Returns a suggested target leaf size based on the initial size estimate.
       
            * The estimate is divided by {@link #LEAF_TARGET}; if the quotient is not
       
            * positive, 1 is returned instead.
       
   176      *
       
            * @param sizeEstimate the initial size estimate of the source
       
   177      * @return suggested target leaf size, always at least 1
       
   178      */
       
   179     public static long suggestTargetSize(long sizeEstimate) {
       
   180         long est = sizeEstimate / LEAF_TARGET;
       
   181         return est > 0L ? est : 1L;
       
   182     }
       
   183 
       
   184     /**
       
   185      * Returns a suggestion whether it is advisable to split the provided
       
   186      * spliterator.  Currently a split is advised exactly when the spliterator's
       
   187      * estimated size exceeds the target size; pool state is not considered.
       
   188      *
       
            * @param spliterator the spliterator that would be split
       
            * @param targetSize the target leaf size to compare the estimate against
       
   189      * @return {@code true} if a split is advised otherwise {@code false}
       
   190      */
       
   191     public static boolean suggestSplit(Spliterator spliterator,
       
   192                                        long targetSize) {
       
   193         long remaining = spliterator.estimateSize();
       
   194         return (remaining > targetSize);
       
   195         // @@@ May additionally want to fold in pool characteristics such as surplus task count
       
   196     }
       
   197 
       
   198     /**
       
   199      * Returns a suggestion whether it is advisable to split this task, by
       
   200      * applying the static {@code suggestSplit} to this task's spliterator and target size.
       
   201      *
       
   202      * @return {@code true} if a split is advised otherwise {@code false}
       
   203      */
       
   204     public boolean suggestSplit() {
       
   205         return suggestSplit(spliterator, targetSize);
       
   206     }
       
   207 
       
   208     /**
       
   209      * Returns the local result, if any. Subclasses should use
       
   210      * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
       
   211      * results.  This returns the local result so that calls from within the
       
   212      * fork-join framework will return the correct result.
       
   213      *
       
   214      * @return local result for this node previously stored with
       
   215      * {@link #setLocalResult}, or {@code null} if none has been stored
       
   216      */
       
   217     @Override
       
   218     public R getRawResult() {
       
   219         return localResult;
       
   220     }
       
   221 
       
   222     /**
       
   223      * Does not store anything; instead, subclasses should use
       
   224      * {@link #setLocalResult(Object)} to manage results.
       
   225      *
       
   226      * @param result must be null, or an exception is thrown (this is a safety
       
   227      *        tripwire to detect when {@code setRawResult()} is being used
       
   228      *        instead of {@code setLocalResult()})
       
            * @throws IllegalStateException if {@code result} is non-null
       
   229      */
       
   230     @Override
       
   231     protected void setRawResult(R result) {
       
   232         if (result != null)
       
   233             throw new IllegalStateException();
       
   234     }
       
   235 
       
   236     /**
       
   237      * Retrieves a result previously stored with {@link #setLocalResult}.
       
   238      *
       
   239      * @return local result for this node previously stored with
       
   240      * {@link #setLocalResult}, or {@code null} if none has been stored
       
   241      */
       
   242     protected R getLocalResult() {
       
   243         return localResult;
       
   244     }
       
   245 
       
   246     /**
       
   247      * Associates the result with the task; it can later be retrieved with
       
   248      * {@link #getLocalResult}.
       
   249      *
       
   250      * @param localResult local result for this node
       
   251      */
       
   252     protected void setLocalResult(R localResult) {
       
   253         this.localResult = localResult;
       
   254     }
       
   255 
       
   256     /**
       
   257      * Indicates whether this task is a leaf node.  (Only valid after
       
   258      * {@link #compute} has been called on this node and before
       
   259      * {@link #onCompletion} has cleared the child links).  The test is simply
       
   260      * whether {@code leftChild} is {@code null}.
       
   261      *
       
   262      * @return {@code true} if this task is a leaf node
       
   263      */
       
   264     protected boolean isLeaf() {
       
   265         return leftChild == null;
       
   266     }
       
   267 
       
   268     /**
       
   269      * Indicates whether this task is the root node, i.e. has no parent.
       
   270      *
       
   271      * @return {@code true} if this task is the root node.
       
   272      */
       
   273     protected boolean isRoot() {
       
   274         return getParent() == null;
       
   275     }
       
   276 
       
   277     /**
       
   278      * Returns the parent of this task, or null if this task is the root.
       
            * The parent is this task's completer; the unchecked cast relies on both
       
            * constructors passing either {@code null} or a {@code K} to the superclass.
       
   279      *
       
   280      * @return the parent of this task, or null if this task is the root
       
   281      */
       
   282     @SuppressWarnings("unchecked")
       
   283     protected K getParent() {
       
   284         return (K) getCompleter();
       
   285     }
       
   286 
       
   287     /**
       
   288      * Decides whether or not to split a task further or compute it directly. If
       
   289      * computing directly, call {@code doLeaf} and pass the result to
       
   290      * {@code setLocalResult}.  If splitting, set up the child-related fields,
       
   291      * create the child tasks, fork the leftmost (prefix) child tasks, and
       
   292      * compute the rightmost (remaining) child tasks.
       
   293      *
       
   294      * <p>
       
   295      * Computing will continue for rightmost tasks while a task can be computed
       
   296      * as determined by {@link #canCompute()} and that task should and can be
       
   297      * split into left and right tasks.  If {@code canCompute()} returns
       
            * {@code false}, the loop ends without storing a result and without
       
            * calling {@code tryComplete()} on that task.
       
   298      *
       
   299      * <p>
       
   300      * The rightmost tasks are computed in a loop rather than recursively to
       
   301      * avoid potential stack overflows when computing with a right-balanced
       
   302      * tree, such as that produced when splitting with a {@link Spliterator}
       
   303      * created from an {@link java.util.Iterator}.
       
   304      */
       
   305     @Override
       
   306     public final void compute() {
       
   307         @SuppressWarnings("unchecked")
       
   308         K task = (K) this;
       
   309         while (task.canCompute()) {
       
   310             Spliterator<P_IN> split;
       
   311             if (!task.suggestSplit() || (split = task.spliterator.trySplit()) == null) {
       
                       // Leaf: no split is advised, or trySplit() returned null
       
   312                 task.setLocalResult(task.doLeaf());
       
   313                 task.tryComplete();
       
   314                 return;
       
   315             }
       
   316             else {
       
                       // Left child takes the split-off part; right child takes what
       
                       // remains in this task's own spliterator
       
   317                 K l = task.leftChild = task.makeChild(split);
       
   318                 K r = task.rightChild = task.makeChild(task.spliterator);
       
                       // Two children will each try to complete this task; a pending
       
                       // count of 1 makes the second attempt the one that completes it
       
   319                 task.setPendingCount(1);
       
   320                 l.fork();
       
                       // Keep working on the right child in this loop instead of recursing
       
   321                 task = r;
       
   322             }
       
   323         }
       
   324     }
       
   325 
       
   326     /**
       
   327      * {@inheritDoc}
       
   328      *
       
   329      * @implNote
       
   330      * Clears the spliterator and child fields.  Overriders MUST call
       
   331      * {@code super.onCompletion} as the last thing they do if they want these
       
   332      * cleared.
       
   333      */
       
   334     @Override
       
   335     public void onCompletion(CountedCompleter<?> caller) {
       
               // Drop this task's references to its spliterator and to both children
       
   336         spliterator = null;
       
   337         leftChild = rightChild = null;
       
   338     }
       
   339 
       
   340     /**
       
   341      * Determines if the task can be computed.  Consulted by {@code compute()}
       
   342      * before each split-or-leaf decision.
       
   343      * @implSpec The default implementation always returns {@code true}.
       
   344      *
       
   345      * @return {@code true} if this task can be computed to either calculate the
       
   346      *         leaf via {@link #doLeaf()} or split, otherwise false if this task
       
   347      *         cannot be computed, for example if this task has been canceled
       
   348      *         and/or a result for the computation has been found by another
       
   349      *         task.
       
   350      */
       
   351     protected boolean canCompute() {
       
   352         return true;
       
   353     }
       
   354 
       
   355     /**
       
   356      * Returns whether this node is a "leftmost" node -- whether the path from
       
   357      * the root to this node involves only traversing leftmost child links.  For
       
   358      * a leaf node, this means it is the first leaf node in the encounter order.
       
            * The check reads the {@code leftChild} links of this node's ancestors, so
       
            * it is only meaningful while those links are set (they are cleared by
       
            * {@link #onCompletion}).
       
   359      *
       
   360      * @return {@code true} if this node is a "leftmost" node
       
   361      */
       
   362     protected boolean isLeftmostNode() {
       
   363         @SuppressWarnings("unchecked")
       
   364         K node = (K) this;
       
               // Walk up to the root; fail at the first node that is not its parent's left child
       
   365         while (node != null) {
       
   366             K parent = node.getParent();
       
   367             if (parent != null && parent.leftChild != node)
       
   368                 return false;
       
   369             node = parent;
       
   370         }
       
   371         return true;
       
   372     }
       
   373 }