315 * A {@code ForkJoinTask} for performing a parallel for-each operation |
315 * A {@code ForkJoinTask} for performing a parallel for-each operation |
316 * which visits the elements in encounter order. |
316 * which visits the elements in encounter order. |
317 */ |
317 */ |
318 @SuppressWarnings("serial") |
318 @SuppressWarnings("serial") |
319 static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> { |
319 static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> { |
|
320 /* |
|
321 * Our goal is to ensure that the elements associated with a task are |
|
322 * processed according to an in-order traversal of the computation tree. |
|
323 * We use completion counts for representing these dependencies, so that |
|
324 * a task does not complete until all the tasks preceding it in this |
|
325 * order complete. We use the "completion map" to associate the next |
|
326 * task in this order for any left child. We increase the pending count |
|
327 * of any node on the right side of such a mapping by one to indicate |
|
328 * its dependency, and when a node on the left side of such a mapping |
|
329 * completes, it decrements the pending count of its corresponding right |
|
330 * side. As the computation tree is expanded by splitting, we must |
|
331 * atomically update the mappings to maintain the invariant that the |
|
332 * completion map maps left children to the next node in the in-order |
|
333 * traversal. |
|
334 * |
|
335 * Take, for example, the following computation tree of tasks: |
|
336 * |
|
337 * a |
|
338 * / \ |
|
339 * b c |
|
340 * / \ / \ |
|
341 * d e f g |
|
342 * |
|
343 * The completion map will contain (not necessarily all at the same time) |
|
344 * the following associations: |
|
345 * |
|
346 * d -> e |
|
347 * b -> f |
|
348 * f -> g |
|
349 * |
|
350 * Tasks e, f, g will have their pending counts increased by 1. |
|
351 * |
|
352 * The following relationships hold: |
|
353 * |
|
354 * - completion of d "happens-before" e; |
|
355 * - completion of d and e "happens-before" b; |
|
356 * - completion of b "happens-before" f; and |
|
357 * - completion of f "happens-before" g. |
|
358 * |
|
359 * Thus overall the "happens-before" relationship holds for the |
|
360 * reporting of elements, covered by tasks d, e, f and g, as specified |
|
361 * by the forEachOrdered operation. |
|
362 */ |
|
363 |
320 private final PipelineHelper<T> helper; |
364 private final PipelineHelper<T> helper; |
321 private Spliterator<S> spliterator; |
365 private Spliterator<S> spliterator; |
322 private final long targetSize; |
366 private final long targetSize; |
323 private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap; |
367 private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap; |
324 private final Sink<T> action; |
368 private final Sink<T> action; |
325 private final Object lock; |
|
326 private final ForEachOrderedTask<S, T> leftPredecessor; |
369 private final ForEachOrderedTask<S, T> leftPredecessor; |
327 private Node<T> node; |
370 private Node<T> node; |
328 |
371 |
329 protected ForEachOrderedTask(PipelineHelper<T> helper, |
372 protected ForEachOrderedTask(PipelineHelper<T> helper, |
330 Spliterator<S> spliterator, |
373 Spliterator<S> spliterator, |
331 Sink<T> action) { |
374 Sink<T> action) { |
332 super(null); |
375 super(null); |
333 this.helper = helper; |
376 this.helper = helper; |
334 this.spliterator = spliterator; |
377 this.spliterator = spliterator; |
335 this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); |
378 this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize()); |
336 this.completionMap = new ConcurrentHashMap<>(); |
379 // Size map to avoid concurrent re-sizes |
|
380 this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1)); |
337 this.action = action; |
381 this.action = action; |
338 this.lock = new Object(); |
|
339 this.leftPredecessor = null; |
382 this.leftPredecessor = null; |
340 } |
383 } |
341 |
384 |
342 ForEachOrderedTask(ForEachOrderedTask<S, T> parent, |
385 ForEachOrderedTask(ForEachOrderedTask<S, T> parent, |
343 Spliterator<S> spliterator, |
386 Spliterator<S> spliterator, |
365 (leftSplit = rightSplit.trySplit()) != null) { |
407 (leftSplit = rightSplit.trySplit()) != null) { |
366 ForEachOrderedTask<S, T> leftChild = |
408 ForEachOrderedTask<S, T> leftChild = |
367 new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); |
409 new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor); |
368 ForEachOrderedTask<S, T> rightChild = |
410 ForEachOrderedTask<S, T> rightChild = |
369 new ForEachOrderedTask<>(task, rightSplit, leftChild); |
411 new ForEachOrderedTask<>(task, rightSplit, leftChild); |
|
412 |
|
413 // Fork the parent task |
|
414 // Completion of the left and right children "happens-before" |
|
415 // completion of the parent |
|
416 task.addToPendingCount(1); |
|
417 // Completion of the left child "happens-before" completion of |
|
418 // the right child |
|
419 rightChild.addToPendingCount(1); |
370 task.completionMap.put(leftChild, rightChild); |
420 task.completionMap.put(leftChild, rightChild); |
371 task.addToPendingCount(1); // forking |
421 |
372 rightChild.addToPendingCount(1); // right pending on left child |
422 // If task is not on the left spine |
373 if (task.leftPredecessor != null) { |
423 if (task.leftPredecessor != null) { |
374 leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine |
424 /* |
375 if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) |
425 * Completion of left-predecessor, or left subtree, |
376 task.addToPendingCount(-1); // transfer my "right child" count to my left child |
426 * "happens-before" completion of left-most leaf node of |
377 else |
427 * right subtree. |
378 leftChild.addToPendingCount(-1); // left child is ready to go when ready |
428 * The left child's pending count needs to be updated before |
|
429 * it is associated in the completion map, otherwise the |
|
430 * left child can complete prematurely and violate the |
|
431 * "happens-before" constraint. |
|
432 */ |
|
433 leftChild.addToPendingCount(1); |
|
434 // Update association of left-predecessor to left-most |
|
435 // leaf node of right subtree |
|
436 if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) { |
|
437 // If replaced, adjust the pending count of the parent |
|
438 // to complete when its children complete |
|
439 task.addToPendingCount(-1); |
|
440 } else { |
|
441 // Left-predecessor has already completed, parent's |
|
442 // pending count is adjusted by left-predecessor; |
|
443 // left child is ready to complete |
|
444 leftChild.addToPendingCount(-1); |
|
445 } |
379 } |
446 } |
|
447 |
380 ForEachOrderedTask<S, T> taskToFork; |
448 ForEachOrderedTask<S, T> taskToFork; |
381 if (forkRight) { |
449 if (forkRight) { |
382 forkRight = false; |
450 forkRight = false; |
383 rightSplit = leftSplit; |
451 rightSplit = leftSplit; |
384 task = leftChild; |
452 task = leftChild; |
389 task = rightChild; |
457 task = rightChild; |
390 taskToFork = leftChild; |
458 taskToFork = leftChild; |
391 } |
459 } |
392 taskToFork.fork(); |
460 taskToFork.fork(); |
393 } |
461 } |
394 if (task.getPendingCount() == 0) { |
462 |
395 task.helper.wrapAndCopyInto(task.action, rightSplit); |
463 /* |
396 } |
464 * Task's pending count is either 0 or 1. If 1 then the completion |
397 else { |
465 * map will contain a value that is task, and two calls to |
|
466 * tryComplete are required for completion, one below and one |
|
467 * triggered by the completion of task's left-predecessor in |
|
468 * onCompletion. Therefore there is no data race within the if |
|
469 * block. |
|
470 */ |
|
471 if (task.getPendingCount() > 0) { |
|
472 // Cannot complete just yet so buffer elements into a Node |
|
473 // for use when completion occurs |
398 Node.Builder<T> nb = task.helper.makeNodeBuilder( |
474 Node.Builder<T> nb = task.helper.makeNodeBuilder( |
399 task.helper.exactOutputSizeIfKnown(rightSplit), |
475 task.helper.exactOutputSizeIfKnown(rightSplit), |
400 size -> (T[]) new Object[size]); |
476 size -> (T[]) new Object[size]); |
401 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); |
477 task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build(); |
|
478 task.spliterator = null; |
402 } |
479 } |
403 task.tryComplete(); |
480 task.tryComplete(); |
404 } |
481 } |
405 |
482 |
406 @Override |
483 @Override |
407 public void onCompletion(CountedCompleter<?> caller) { |
484 public void onCompletion(CountedCompleter<?> caller) { |
408 spliterator = null; |
|
409 if (node != null) { |
485 if (node != null) { |
410 // Dump any data from this leaf into the sink |
486 // Dump buffered elements from this leaf into the sink |
411 synchronized (lock) { |
487 node.forEach(action); |
412 node.forEach(action); |
|
413 } |
|
414 node = null; |
488 node = null; |
415 } |
489 } |
416 ForEachOrderedTask<S, T> victim = completionMap.remove(this); |
490 else if (spliterator != null) { |
417 if (victim != null) |
491 // Dump elements output from this leaf's pipeline into the sink |
418 victim.tryComplete(); |
492 helper.wrapAndCopyInto(action, spliterator); |
|
493 spliterator = null; |
|
494 } |
|
495 |
|
496 // The completion of this task *and* the dumping of elements |
|
497 // "happens-before" completion of the associated left-most leaf task |
|
498 // of right subtree (if any, which can be this task's right sibling) |
|
499 // |
|
500 ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this); |
|
501 if (leftDescendant != null) |
|
502 leftDescendant.tryComplete(); |
419 } |
503 } |
420 } |
504 } |
421 } |
505 } |