295 } |
295 } |
296 return s; |
296 return s; |
297 } |
297 } |
298 |
298 |
299 /** |
299 /** |
300 * Tries to set SIGNAL status unless already completed. Used by |
300 * If not done, sets SIGNAL status and performs Object.wait(timeout). |
301 * ForkJoinPool. Other variants are directly incorporated into |
301 * This task may or may not be done on exit. Ignores interrupts. |
302 * externalAwaitDone etc. |
302 * |
303 * |
303 * @param timeout using Object.wait conventions. |
304 * @return true if successful |
304 */ |
305 */ |
305 final void internalWait(long timeout) { |
306 final boolean trySetSignal() { |
306 int s; |
307 int s = status; |
307 if ((s = status) >= 0 && // force completer to issue notify |
308 return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL); |
308 U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
|
309 synchronized (this) { |
|
310 if (status >= 0) |
|
311 try { wait(timeout); } catch (InterruptedException ie) { } |
|
312 else |
|
313 notifyAll(); |
|
314 } |
|
315 } |
309 } |
316 } |
310 |
317 |
311 /** |
318 /** |
312 * Blocks a non-worker-thread until completion. |
319 * Blocks a non-worker-thread until completion. |
313 * @return status upon completion |
320 * @return status upon completion |
314 */ |
321 */ |
315 private int externalAwaitDone() { |
322 private int externalAwaitDone() { |
316 int s; |
323 int s = ((this instanceof CountedCompleter) ? // try helping |
317 ForkJoinPool cp = ForkJoinPool.common; |
324 ForkJoinPool.common.externalHelpComplete( |
318 if ((s = status) >= 0) { |
325 (CountedCompleter<?>)this, 0) : |
319 if (cp != null) { |
326 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); |
320 if (this instanceof CountedCompleter) |
327 if (s >= 0 && (s = status) >= 0) { |
321 s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
328 boolean interrupted = false; |
322 else if (cp.tryExternalUnpush(this)) |
329 do { |
323 s = doExec(); |
330 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
324 } |
331 synchronized (this) { |
325 if (s >= 0 && (s = status) >= 0) { |
332 if (status >= 0) { |
326 boolean interrupted = false; |
333 try { |
327 do { |
334 wait(0L); |
328 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
335 } catch (InterruptedException ie) { |
329 synchronized (this) { |
336 interrupted = true; |
330 if (status >= 0) { |
|
331 try { |
|
332 wait(); |
|
333 } catch (InterruptedException ie) { |
|
334 interrupted = true; |
|
335 } |
|
336 } |
337 } |
337 else |
|
338 notifyAll(); |
|
339 } |
338 } |
|
339 else |
|
340 notifyAll(); |
340 } |
341 } |
341 } while ((s = status) >= 0); |
342 } |
342 if (interrupted) |
343 } while ((s = status) >= 0); |
343 Thread.currentThread().interrupt(); |
344 if (interrupted) |
344 } |
345 Thread.currentThread().interrupt(); |
345 } |
346 } |
346 return s; |
347 return s; |
347 } |
348 } |
348 |
349 |
349 /** |
350 /** |
350 * Blocks a non-worker-thread until completion or interruption. |
351 * Blocks a non-worker-thread until completion or interruption. |
351 */ |
352 */ |
352 private int externalInterruptibleAwaitDone() throws InterruptedException { |
353 private int externalInterruptibleAwaitDone() throws InterruptedException { |
353 int s; |
354 int s; |
354 ForkJoinPool cp = ForkJoinPool.common; |
|
355 if (Thread.interrupted()) |
355 if (Thread.interrupted()) |
356 throw new InterruptedException(); |
356 throw new InterruptedException(); |
357 if ((s = status) >= 0 && cp != null) { |
357 if ((s = status) >= 0 && |
358 if (this instanceof CountedCompleter) |
358 (s = ((this instanceof CountedCompleter) ? |
359 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
359 ForkJoinPool.common.externalHelpComplete( |
360 else if (cp.tryExternalUnpush(this)) |
360 (CountedCompleter<?>)this, 0) : |
361 doExec(); |
361 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : |
362 } |
362 0)) >= 0) { |
363 while ((s = status) >= 0) { |
363 while ((s = status) >= 0) { |
364 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
364 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
365 synchronized (this) { |
365 synchronized (this) { |
366 if (status >= 0) |
366 if (status >= 0) |
367 wait(); |
367 wait(0L); |
368 else |
368 else |
369 notifyAll(); |
369 notifyAll(); |
|
370 } |
370 } |
371 } |
371 } |
372 } |
372 } |
373 } |
373 return s; |
374 return s; |
374 } |
375 } |
384 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; |
385 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; |
385 return (s = status) < 0 ? s : |
386 return (s = status) < 0 ? s : |
386 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
387 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? |
387 (w = (wt = (ForkJoinWorkerThread)t).workQueue). |
388 (w = (wt = (ForkJoinWorkerThread)t).workQueue). |
388 tryUnpush(this) && (s = doExec()) < 0 ? s : |
389 tryUnpush(this) && (s = doExec()) < 0 ? s : |
389 wt.pool.awaitJoin(w, this) : |
390 wt.pool.awaitJoin(w, this, 0L) : |
390 externalAwaitDone(); |
391 externalAwaitDone(); |
391 } |
392 } |
392 |
393 |
393 /** |
394 /** |
394 * Implementation for invoke, quietlyInvoke. |
395 * Implementation for invoke, quietlyInvoke. |
575 lock.unlock(); |
577 lock.unlock(); |
576 } |
578 } |
577 Throwable ex; |
579 Throwable ex; |
578 if (e == null || (ex = e.ex) == null) |
580 if (e == null || (ex = e.ex) == null) |
579 return null; |
581 return null; |
580 if (false && e.thrower != Thread.currentThread().getId()) { |
582 if (e.thrower != Thread.currentThread().getId()) { |
581 Class<? extends Throwable> ec = ex.getClass(); |
583 Class<? extends Throwable> ec = ex.getClass(); |
582 try { |
584 try { |
583 Constructor<?> noArgCtor = null; |
585 Constructor<?> noArgCtor = null; |
584 Constructor<?>[] cs = ec.getConstructors();// public ctors only |
586 Constructor<?>[] cs = ec.getConstructors();// public ctors only |
585 for (int i = 0; i < cs.length; ++i) { |
587 for (int i = 0; i < cs.length; ++i) { |
586 Constructor<?> c = cs[i]; |
588 Constructor<?> c = cs[i]; |
587 Class<?>[] ps = c.getParameterTypes(); |
589 Class<?>[] ps = c.getParameterTypes(); |
588 if (ps.length == 0) |
590 if (ps.length == 0) |
589 noArgCtor = c; |
591 noArgCtor = c; |
590 else if (ps.length == 1 && ps[0] == Throwable.class) |
592 else if (ps.length == 1 && ps[0] == Throwable.class) { |
591 return (Throwable)(c.newInstance(ex)); |
593 Throwable wx = (Throwable)c.newInstance(ex); |
|
594 return (wx == null) ? ex : wx; |
|
595 } |
592 } |
596 } |
593 if (noArgCtor != null) { |
597 if (noArgCtor != null) { |
594 Throwable wx = (Throwable)(noArgCtor.newInstance()); |
598 Throwable wx = (Throwable)(noArgCtor.newInstance()); |
595 wx.initCause(ex); |
599 if (wx != null) { |
596 return wx; |
600 wx.initCause(ex); |
|
601 return wx; |
|
602 } |
597 } |
603 } |
598 } catch (Exception ignore) { |
604 } catch (Exception ignore) { |
599 } |
605 } |
600 } |
606 } |
601 return ex; |
607 return ex; |
1015 * member of a ForkJoinPool and was interrupted while waiting |
1021 * member of a ForkJoinPool and was interrupted while waiting |
1016 * @throws TimeoutException if the wait timed out |
1022 * @throws TimeoutException if the wait timed out |
1017 */ |
1023 */ |
1018 public final V get(long timeout, TimeUnit unit) |
1024 public final V get(long timeout, TimeUnit unit) |
1019 throws InterruptedException, ExecutionException, TimeoutException { |
1025 throws InterruptedException, ExecutionException, TimeoutException { |
|
1026 int s; |
|
1027 long nanos = unit.toNanos(timeout); |
1020 if (Thread.interrupted()) |
1028 if (Thread.interrupted()) |
1021 throw new InterruptedException(); |
1029 throw new InterruptedException(); |
1022 // Messy in part because we measure in nanosecs, but wait in millisecs |
1030 if ((s = status) >= 0 && nanos > 0L) { |
1023 int s; long ms; |
1031 long d = System.nanoTime() + nanos; |
1024 long ns = unit.toNanos(timeout); |
1032 long deadline = (d == 0L) ? 1L : d; // avoid 0 |
1025 ForkJoinPool cp; |
|
1026 if ((s = status) >= 0 && ns > 0L) { |
|
1027 long deadline = System.nanoTime() + ns; |
|
1028 ForkJoinPool p = null; |
|
1029 ForkJoinPool.WorkQueue w = null; |
|
1030 Thread t = Thread.currentThread(); |
1033 Thread t = Thread.currentThread(); |
1031 if (t instanceof ForkJoinWorkerThread) { |
1034 if (t instanceof ForkJoinWorkerThread) { |
1032 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; |
1035 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; |
1033 p = wt.pool; |
1036 s = wt.pool.awaitJoin(wt.workQueue, this, deadline); |
1034 w = wt.workQueue; |
1037 } |
1035 p.helpJoinOnce(w, this); // no retries on failure |
1038 else if ((s = ((this instanceof CountedCompleter) ? |
1036 } |
1039 ForkJoinPool.common.externalHelpComplete( |
1037 else if ((cp = ForkJoinPool.common) != null) { |
1040 (CountedCompleter<?>)this, 0) : |
1038 if (this instanceof CountedCompleter) |
1041 ForkJoinPool.common.tryExternalUnpush(this) ? |
1039 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE); |
1042 doExec() : 0)) >= 0) { |
1040 else if (cp.tryExternalUnpush(this)) |
1043 long ns, ms; // measure in nanosecs, but wait in millisecs |
1041 doExec(); |
1044 while ((s = status) >= 0 && |
1042 } |
1045 (ns = deadline - System.nanoTime()) > 0L) { |
1043 boolean canBlock = false; |
1046 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
1044 boolean interrupted = false; |
1047 U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
1045 try { |
1048 synchronized (this) { |
1046 while ((s = status) >= 0) { |
1049 if (status >= 0) |
1047 if (w != null && w.qlock < 0) |
1050 wait(ms); // OK to throw InterruptedException |
1048 cancelIgnoringExceptions(this); |
1051 else |
1049 else if (!canBlock) { |
1052 notifyAll(); |
1050 if (p == null || p.tryCompensate(p.ctl)) |
|
1051 canBlock = true; |
|
1052 } |
|
1053 else { |
|
1054 if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && |
|
1055 U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { |
|
1056 synchronized (this) { |
|
1057 if (status >= 0) { |
|
1058 try { |
|
1059 wait(ms); |
|
1060 } catch (InterruptedException ie) { |
|
1061 if (p == null) |
|
1062 interrupted = true; |
|
1063 } |
|
1064 } |
|
1065 else |
|
1066 notifyAll(); |
|
1067 } |
|
1068 } |
1053 } |
1069 if ((s = status) < 0 || interrupted || |
|
1070 (ns = deadline - System.nanoTime()) <= 0L) |
|
1071 break; |
|
1072 } |
1054 } |
1073 } |
1055 } |
1074 } finally { |
1056 } |
1075 if (p != null && canBlock) |
1057 } |
1076 p.incrementActiveCount(); |
1058 if (s >= 0) |
1077 } |
1059 s = status; |
1078 if (interrupted) |
|
1079 throw new InterruptedException(); |
|
1080 } |
|
1081 if ((s &= DONE_MASK) != NORMAL) { |
1060 if ((s &= DONE_MASK) != NORMAL) { |
1082 Throwable ex; |
1061 Throwable ex; |
1083 if (s == CANCELLED) |
1062 if (s == CANCELLED) |
1084 throw new CancellationException(); |
1063 throw new CancellationException(); |
1085 if (s != EXCEPTIONAL) |
1064 if (s != EXCEPTIONAL) |