163 * (DAG). Otherwise, executions may encounter a form of deadlock as |
163 * (DAG). Otherwise, executions may encounter a form of deadlock as |
164 * tasks cyclically wait for each other. However, this framework |
164 * tasks cyclically wait for each other. However, this framework |
165 * supports other methods and techniques (for example the use of |
165 * supports other methods and techniques (for example the use of |
166 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that |
166 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that |
167 * may be of use in constructing custom subclasses for problems that |
167 * may be of use in constructing custom subclasses for problems that |
168 * are not statically structured as DAGs. To support such usages a |
168 * are not statically structured as DAGs. To support such usages, a |
169 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} |
169 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short} |
170 * value using {@link #setForkJoinTaskTag} or {@link |
170 * value using {@link #setForkJoinTaskTag} or {@link |
171 * #compareAndSetForkJoinTaskTag} and checked using {@link |
171 * #compareAndSetForkJoinTaskTag} and checked using {@link |
172 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use |
172 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use |
173 * these {@code protected} methods or tags for any purpose, but they |
173 * these {@code protected} methods or tags for any purpose, but they |
312 * Blocks a non-worker-thread until completion. |
312 * Blocks a non-worker-thread until completion. |
313 * @return status upon completion |
313 * @return status upon completion |
314 */ |
314 */ |
315 private int externalAwaitDone() { |
315 private int externalAwaitDone() { |
316 int s; |
316 int s; |
317 ForkJoinPool.externalHelpJoin(this); |
317 ForkJoinPool cp = ForkJoinPool.common; |
318 boolean interrupted = false; |
318 if ((s = status) >= 0) { |
319 while ((s = status) >= 0) { |
319 if (cp != null) { |
320 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
320 if (this instanceof CountedCompleter) |
321 synchronized (this) { |
321 s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
322 if (status >= 0) { |
322 else if (cp.tryExternalUnpush(this)) |
323 try { |
323 s = doExec(); |
324 wait(); |
324 } |
325 } catch (InterruptedException ie) { |
325 if (s >= 0 && (s = status) >= 0) { |
326 interrupted = true; |
326 boolean interrupted = false; |
|
327 do { |
|
328 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
|
329 synchronized (this) { |
|
330 if (status >= 0) { |
|
331 try { |
|
332 wait(); |
|
333 } catch (InterruptedException ie) { |
|
334 interrupted = true; |
|
335 } |
|
336 } |
|
337 else |
|
338 notifyAll(); |
327 } |
339 } |
328 } |
340 } |
329 else |
341 } while ((s = status) >= 0); |
330 notifyAll(); |
342 if (interrupted) |
331 } |
343 Thread.currentThread().interrupt(); |
332 } |
344 } |
333 } |
345 } |
334 if (interrupted) |
|
335 Thread.currentThread().interrupt(); |
|
336 return s; |
346 return s; |
337 } |
347 } |
338 |
348 |
339 /** |
349 /** |
340 * Blocks a non-worker-thread until completion or interruption. |
350 * Blocks a non-worker-thread until completion or interruption. |
341 */ |
351 */ |
342 private int externalInterruptibleAwaitDone() throws InterruptedException { |
352 private int externalInterruptibleAwaitDone() throws InterruptedException { |
343 int s; |
353 int s; |
|
354 ForkJoinPool cp = ForkJoinPool.common; |
344 if (Thread.interrupted()) |
355 if (Thread.interrupted()) |
345 throw new InterruptedException(); |
356 throw new InterruptedException(); |
346 ForkJoinPool.externalHelpJoin(this); |
357 if ((s = status) >= 0 && cp != null) { |
|
358 if (this instanceof CountedCompleter) |
|
359 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
|
360 else if (cp.tryExternalUnpush(this)) |
|
361 doExec(); |
|
362 } |
347 while ((s = status) >= 0) { |
363 while ((s = status) >= 0) { |
348 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
364 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
349 synchronized (this) { |
365 synchronized (this) { |
350 if (status >= 0) |
366 if (status >= 0) |
351 wait(); |
367 wait(); |
627 } |
642 } |
628 |
643 |
629 /** |
644 /** |
630 * A version of "sneaky throw" to relay exceptions |
645 * A version of "sneaky throw" to relay exceptions |
631 */ |
646 */ |
632 static void rethrow(final Throwable ex) { |
647 static void rethrow(Throwable ex) { |
633 if (ex != null) { |
648 if (ex != null) |
634 if (ex instanceof Error) |
|
635 throw (Error)ex; |
|
636 if (ex instanceof RuntimeException) |
|
637 throw (RuntimeException)ex; |
|
638 ForkJoinTask.<RuntimeException>uncheckedThrow(ex); |
649 ForkJoinTask.<RuntimeException>uncheckedThrow(ex); |
639 } |
|
640 } |
650 } |
641 |
651 |
642 /** |
652 /** |
643 * The sneaky part of sneaky throw, relying on generics |
653 * The sneaky part of sneaky throw, relying on generics |
644 * limitations to evade compiler complaints about rethrowing |
654 * limitations to evade compiler complaints about rethrowing |
645 * unchecked exceptions |
655 * unchecked exceptions |
646 */ |
656 */ |
647 @SuppressWarnings("unchecked") static <T extends Throwable> |
657 @SuppressWarnings("unchecked") static <T extends Throwable> |
648 void uncheckedThrow(Throwable t) throws T { |
658 void uncheckedThrow(Throwable t) throws T { |
649 if (t != null) |
659 throw (T)t; // rely on vacuous cast |
650 throw (T)t; // rely on vacuous cast |
|
651 } |
660 } |
652 |
661 |
653 /** |
662 /** |
654 * Throws exception, if any, associated with the given status. |
663 * Throws exception, if any, associated with the given status. |
655 */ |
664 */ |
1008 if (Thread.interrupted()) |
1017 if (Thread.interrupted()) |
1009 throw new InterruptedException(); |
1018 throw new InterruptedException(); |
1010 // Messy in part because we measure in nanosecs, but wait in millisecs |
1019 // Messy in part because we measure in nanosecs, but wait in millisecs |
1011 int s; long ms; |
1020 int s; long ms; |
1012 long ns = unit.toNanos(timeout); |
1021 long ns = unit.toNanos(timeout); |
|
1022 ForkJoinPool cp; |
1013 if ((s = status) >= 0 && ns > 0L) { |
1023 if ((s = status) >= 0 && ns > 0L) { |
1014 long deadline = System.nanoTime() + ns; |
1024 long deadline = System.nanoTime() + ns; |
1015 ForkJoinPool p = null; |
1025 ForkJoinPool p = null; |
1016 ForkJoinPool.WorkQueue w = null; |
1026 ForkJoinPool.WorkQueue w = null; |
1017 Thread t = Thread.currentThread(); |
1027 Thread t = Thread.currentThread(); |
1019 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; |
1029 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; |
1020 p = wt.pool; |
1030 p = wt.pool; |
1021 w = wt.workQueue; |
1031 w = wt.workQueue; |
1022 p.helpJoinOnce(w, this); // no retries on failure |
1032 p.helpJoinOnce(w, this); // no retries on failure |
1023 } |
1033 } |
1024 else |
1034 else if ((cp = ForkJoinPool.common) != null) { |
1025 ForkJoinPool.externalHelpJoin(this); |
1035 if (this instanceof CountedCompleter) |
|
1036 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
|
1037 else if (cp.tryExternalUnpush(this)) |
|
1038 doExec(); |
|
1039 } |
1026 boolean canBlock = false; |
1040 boolean canBlock = false; |
1027 boolean interrupted = false; |
1041 boolean interrupted = false; |
1028 try { |
1042 try { |
1029 while ((s = status) >= 0) { |
1043 while ((s = status) >= 0) { |
1030 if (w != null && w.qlock < 0) |
1044 if (w != null && w.qlock < 0) |
1031 cancelIgnoringExceptions(this); |
1045 cancelIgnoringExceptions(this); |
1032 else if (!canBlock) { |
1046 else if (!canBlock) { |
1033 if (p == null || p.tryCompensate()) |
1047 if (p == null || p.tryCompensate(p.ctl)) |
1034 canBlock = true; |
1048 canBlock = true; |
1035 } |
1049 } |
1036 else { |
1050 else { |
1037 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1051 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1038 U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
1052 U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
1169 */ |
1183 */ |
1170 public boolean tryUnfork() { |
1184 public boolean tryUnfork() { |
1171 Thread t; |
1185 Thread t; |
1172 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
1186 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
1173 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : |
1187 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : |
1174 ForkJoinPool.tryExternalUnpush(this)); |
1188 ForkJoinPool.common.tryExternalUnpush(this)); |
1175 } |
1189 } |
1176 |
1190 |
1177 /** |
1191 /** |
1178 * Returns an estimate of the number of tasks that have been |
1192 * Returns an estimate of the number of tasks that have been |
1179 * forked by the current worker thread but not yet executed. This |
1193 * forked by the current worker thread but not yet executed. This |
1338 * before processing, otherwise exiting because the node has |
1352 * before processing, otherwise exiting because the node has |
1339 * already been visited. |
1353 * already been visited. |
1340 * |
1354 * |
1341 * @param e the expected tag value |
1355 * @param e the expected tag value |
1342 * @param tag the new tag value |
1356 * @param tag the new tag value |
1343 * @return true if successful; i.e., the current value was |
1357 * @return {@code true} if successful; i.e., the current value was |
1344 * equal to e and is now tag. |
1358 * equal to e and is now tag. |
1345 * @since 1.8 |
1359 * @since 1.8 |
1346 */ |
1360 */ |
1347 public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { |
1361 public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { |
1348 for (int s;;) { |
1362 for (int s;;) { |