jdk/src/share/classes/java/util/stream/ForEachOps.java
changeset 22289 bb9c71b84919
parent 19218 8e7212b90b81
child 22297 1c62c67d9dd2
equal deleted inserted replaced
22288:81efc55fac99 22289:bb9c71b84919
   315      * A {@code ForkJoinTask} for performing a parallel for-each operation
   315      * A {@code ForkJoinTask} for performing a parallel for-each operation
   316      * which visits the elements in encounter order
   316      * which visits the elements in encounter order
   317      */
   317      */
   318     @SuppressWarnings("serial")
   318     @SuppressWarnings("serial")
   319     static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
   319     static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
       
   320         /*
       
   321          * Our goal is to ensure that the elements associated with a task are
       
   322          * processed according to an in-order traversal of the computation tree.
       
   323          * We use completion counts for representing these dependencies, so that
       
   324          * a task does not complete until all the tasks preceding it in this
       
   325          * order complete.  We use the "completion map" to associate the next
       
   326          * task in this order for any left child.  We increase the pending count
       
   327          * of any node on the right side of such a mapping by one to indicate
       
   328          * its dependency, and when a node on the left side of such a mapping
       
   329          * completes, it decrements the pending count of its corresponding right
       
   330          * side.  As the computation tree is expanded by splitting, we must
       
   331          * atomically update the mappings to maintain the invariant that the
       
   332          * completion map maps left children to the next node in the in-order
       
   333          * traversal.
       
   334          *
       
   335          * Take, for example, the following computation tree of tasks:
       
   336          *
       
   337          *       a
       
   338          *      / \
       
   339          *     b   c
       
   340          *    / \ / \
       
   341          *   d  e f  g
       
   342          *
       
   343          * The complete map will contain (not necessarily all at the same time)
       
   344          * the following associations:
       
   345          *
       
   346          *   d -> e
       
   347          *   b -> f
       
   348          *   f -> g
       
   349          *
       
   350          * Tasks e, f, g will have their pending counts increased by 1.
       
   351          *
       
   352          * The following relationships hold:
       
   353          *
       
   354          *   - completion of d "happens-before" e;
       
   355          *   - completion of d and e "happens-before b;
       
   356          *   - completion of b "happens-before" f; and
       
   357          *   - completion of f "happens-before" g
       
   358          *
       
   359          * Thus overall the "happens-before" relationship holds for the
       
   360          * reporting of elements, covered by tasks d, e, f and g, as specified
       
   361          * by the forEachOrdered operation.
       
   362          */
       
   363 
   320         private final PipelineHelper<T> helper;
   364         private final PipelineHelper<T> helper;
   321         private Spliterator<S> spliterator;
   365         private Spliterator<S> spliterator;
   322         private final long targetSize;
   366         private final long targetSize;
   323         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
   367         private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
   324         private final Sink<T> action;
   368         private final Sink<T> action;
   325         private final Object lock;
       
   326         private final ForEachOrderedTask<S, T> leftPredecessor;
   369         private final ForEachOrderedTask<S, T> leftPredecessor;
   327         private Node<T> node;
   370         private Node<T> node;
   328 
   371 
   329         protected ForEachOrderedTask(PipelineHelper<T> helper,
   372         protected ForEachOrderedTask(PipelineHelper<T> helper,
   330                                      Spliterator<S> spliterator,
   373                                      Spliterator<S> spliterator,
   331                                      Sink<T> action) {
   374                                      Sink<T> action) {
   332             super(null);
   375             super(null);
   333             this.helper = helper;
   376             this.helper = helper;
   334             this.spliterator = spliterator;
   377             this.spliterator = spliterator;
   335             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
   378             this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
   336             this.completionMap = new ConcurrentHashMap<>();
   379             // Size map to avoid concurrent re-sizes
       
   380             this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
   337             this.action = action;
   381             this.action = action;
   338             this.lock = new Object();
       
   339             this.leftPredecessor = null;
   382             this.leftPredecessor = null;
   340         }
   383         }
   341 
   384 
   342         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
   385         ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
   343                            Spliterator<S> spliterator,
   386                            Spliterator<S> spliterator,
   346             this.helper = parent.helper;
   389             this.helper = parent.helper;
   347             this.spliterator = spliterator;
   390             this.spliterator = spliterator;
   348             this.targetSize = parent.targetSize;
   391             this.targetSize = parent.targetSize;
   349             this.completionMap = parent.completionMap;
   392             this.completionMap = parent.completionMap;
   350             this.action = parent.action;
   393             this.action = parent.action;
   351             this.lock = parent.lock;
       
   352             this.leftPredecessor = leftPredecessor;
   394             this.leftPredecessor = leftPredecessor;
   353         }
   395         }
   354 
   396 
   355         @Override
   397         @Override
   356         public final void compute() {
   398         public final void compute() {
   365                    (leftSplit = rightSplit.trySplit()) != null) {
   407                    (leftSplit = rightSplit.trySplit()) != null) {
   366                 ForEachOrderedTask<S, T> leftChild =
   408                 ForEachOrderedTask<S, T> leftChild =
   367                     new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
   409                     new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
   368                 ForEachOrderedTask<S, T> rightChild =
   410                 ForEachOrderedTask<S, T> rightChild =
   369                     new ForEachOrderedTask<>(task, rightSplit, leftChild);
   411                     new ForEachOrderedTask<>(task, rightSplit, leftChild);
       
   412 
       
   413                 // Fork the parent task
       
   414                 // Completion of the left and right children "happens-before"
       
   415                 // completion of the parent
       
   416                 task.addToPendingCount(1);
       
   417                 // Completion of the left child "happens-before" completion of
       
   418                 // the right child
       
   419                 rightChild.addToPendingCount(1);
   370                 task.completionMap.put(leftChild, rightChild);
   420                 task.completionMap.put(leftChild, rightChild);
   371                 task.addToPendingCount(1); // forking
   421 
   372                 rightChild.addToPendingCount(1); // right pending on left child
   422                 // If task is not on the left spine
   373                 if (task.leftPredecessor != null) {
   423                 if (task.leftPredecessor != null) {
   374                     leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
   424                     /*
   375                     if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
   425                      * Completion of left-predecessor, or left subtree,
   376                         task.addToPendingCount(-1); // transfer my "right child" count to my left child
   426                      * "happens-before" completion of left-most leaf node of
   377                     else
   427                      * right subtree.
   378                         leftChild.addToPendingCount(-1); // left child is ready to go when ready
   428                      * The left child's pending count needs to be updated before
       
   429                      * it is associated in the completion map, otherwise the
       
   430                      * left child can complete prematurely and violate the
       
   431                      * "happens-before" constraint.
       
   432                      */
       
   433                     leftChild.addToPendingCount(1);
       
   434                     // Update association of left-predecessor to left-most
       
   435                     // leaf node of right subtree
       
   436                     if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
       
   437                         // If replaced, adjust the pending count of the parent
       
   438                         // to complete when its children complete
       
   439                         task.addToPendingCount(-1);
       
   440                     } else {
       
   441                         // Left-predecessor has already completed, parent's
       
   442                         // pending count is adjusted by left-predecessor;
       
   443                         // left child is ready to complete
       
   444                         leftChild.addToPendingCount(-1);
       
   445                     }
   379                 }
   446                 }
       
   447 
   380                 ForEachOrderedTask<S, T> taskToFork;
   448                 ForEachOrderedTask<S, T> taskToFork;
   381                 if (forkRight) {
   449                 if (forkRight) {
   382                     forkRight = false;
   450                     forkRight = false;
   383                     rightSplit = leftSplit;
   451                     rightSplit = leftSplit;
   384                     task = leftChild;
   452                     task = leftChild;
   389                     task = rightChild;
   457                     task = rightChild;
   390                     taskToFork = leftChild;
   458                     taskToFork = leftChild;
   391                 }
   459                 }
   392                 taskToFork.fork();
   460                 taskToFork.fork();
   393             }
   461             }
   394             if (task.getPendingCount() == 0) {
   462 
   395                 task.helper.wrapAndCopyInto(task.action, rightSplit);
   463             /*
   396             }
   464              * Task's pending count is either 0 or 1.  If 1 then the completion
   397             else {
   465              * map will contain a value that is task, and two calls to
       
   466              * tryComplete are required for completion, one below and one
       
   467              * triggered by the completion of task's left-predecessor in
       
   468              * onCompletion.  Therefore there is no data race within the if
       
   469              * block.
       
   470              */
       
   471             if (task.getPendingCount() > 0) {
       
   472                 // Cannot complete just yet so buffer elements into a Node
       
   473                 // for use when completion occurs
   398                 Node.Builder<T> nb = task.helper.makeNodeBuilder(
   474                 Node.Builder<T> nb = task.helper.makeNodeBuilder(
   399                   task.helper.exactOutputSizeIfKnown(rightSplit),
   475                         task.helper.exactOutputSizeIfKnown(rightSplit),
   400                   size -> (T[]) new Object[size]);
   476                         size -> (T[]) new Object[size]);
   401                 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
   477                 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
       
   478                 task.spliterator = null;
   402             }
   479             }
   403             task.tryComplete();
   480             task.tryComplete();
   404         }
   481         }
   405 
   482 
   406         @Override
   483         @Override
   407         public void onCompletion(CountedCompleter<?> caller) {
   484         public void onCompletion(CountedCompleter<?> caller) {
   408             spliterator = null;
       
   409             if (node != null) {
   485             if (node != null) {
   410                 // Dump any data from this leaf into the sink
   486                 // Dump buffered elements from this leaf into the sink
   411                 synchronized (lock) {
   487                 node.forEach(action);
   412                     node.forEach(action);
       
   413                 }
       
   414                 node = null;
   488                 node = null;
   415             }
   489             }
   416             ForEachOrderedTask<S, T> victim = completionMap.remove(this);
   490             else if (spliterator != null) {
   417             if (victim != null)
   491                 // Dump elements output from this leaf's pipeline into the sink
   418                 victim.tryComplete();
   492                 helper.wrapAndCopyInto(action, spliterator);
       
   493                 spliterator = null;
       
   494             }
       
   495 
       
   496             // The completion of this task *and* the dumping of elements
       
   497             // "happens-before" completion of the associated left-most leaf task
       
   498             // of right subtree (if any, which can be this task's right sibling)
       
   499             //
       
   500             ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
       
   501             if (leftDescendant != null)
       
   502                 leftDescendant.tryComplete();
   419         }
   503         }
   420     }
   504     }
   421 }
   505 }