jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
changeset 26448 5853628b0e63
parent 25859 3317bb8137f4
child 32988 da3715f8eec3
equal deleted inserted replaced
26367:5da963ed0720 26448:5853628b0e63
   295         }
   295         }
   296         return s;
   296         return s;
   297     }
   297     }
   298 
   298 
   299     /**
   299     /**
   300      * Tries to set SIGNAL status unless already completed. Used by
   300      * If not done, sets SIGNAL status and performs Object.wait(timeout).
   301      * ForkJoinPool. Other variants are directly incorporated into
   301      * This task may or may not be done on exit. Ignores interrupts.
   302      * externalAwaitDone etc.
   302      *
   303      *
   303      * @param timeout using Object.wait conventions.
   304      * @return true if successful
   304      */
   305      */
   305     final void internalWait(long timeout) {
   306     final boolean trySetSignal() {
   306         int s;
   307         int s = status;
   307         if ((s = status) >= 0 && // force completer to issue notify
   308         return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
   308             U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
       
   309             synchronized (this) {
       
   310                 if (status >= 0)
       
   311                     try { wait(timeout); } catch (InterruptedException ie) { }
       
   312                 else
       
   313                     notifyAll();
       
   314             }
       
   315         }
   309     }
   316     }
   310 
   317 
   311     /**
   318     /**
   312      * Blocks a non-worker-thread until completion.
   319      * Blocks a non-worker-thread until completion.
   313      * @return status upon completion
   320      * @return status upon completion
   314      */
   321      */
   315     private int externalAwaitDone() {
   322     private int externalAwaitDone() {
   316         int s;
   323         int s = ((this instanceof CountedCompleter) ? // try helping
   317         ForkJoinPool cp = ForkJoinPool.common;
   324                  ForkJoinPool.common.externalHelpComplete(
   318         if ((s = status) >= 0) {
   325                      (CountedCompleter<?>)this, 0) :
   319             if (cp != null) {
   326                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
   320                 if (this instanceof CountedCompleter)
   327         if (s >= 0 && (s = status) >= 0) {
   321                     s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
   328             boolean interrupted = false;
   322                 else if (cp.tryExternalUnpush(this))
   329             do {
   323                     s = doExec();
   330                 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   324             }
   331                     synchronized (this) {
   325             if (s >= 0 && (s = status) >= 0) {
   332                         if (status >= 0) {
   326                 boolean interrupted = false;
   333                             try {
   327                 do {
   334                                 wait(0L);
   328                     if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   335                             } catch (InterruptedException ie) {
   329                         synchronized (this) {
   336                                 interrupted = true;
   330                             if (status >= 0) {
       
   331                                 try {
       
   332                                     wait();
       
   333                                 } catch (InterruptedException ie) {
       
   334                                     interrupted = true;
       
   335                                 }
       
   336                             }
   337                             }
   337                             else
       
   338                                 notifyAll();
       
   339                         }
   338                         }
       
   339                         else
       
   340                             notifyAll();
   340                     }
   341                     }
   341                 } while ((s = status) >= 0);
   342                 }
   342                 if (interrupted)
   343             } while ((s = status) >= 0);
   343                     Thread.currentThread().interrupt();
   344             if (interrupted)
   344             }
   345                 Thread.currentThread().interrupt();
   345         }
   346         }
   346         return s;
   347         return s;
   347     }
   348     }
   348 
   349 
   349     /**
   350     /**
   350      * Blocks a non-worker-thread until completion or interruption.
   351      * Blocks a non-worker-thread until completion or interruption.
   351      */
   352      */
   352     private int externalInterruptibleAwaitDone() throws InterruptedException {
   353     private int externalInterruptibleAwaitDone() throws InterruptedException {
   353         int s;
   354         int s;
   354         ForkJoinPool cp = ForkJoinPool.common;
       
   355         if (Thread.interrupted())
   355         if (Thread.interrupted())
   356             throw new InterruptedException();
   356             throw new InterruptedException();
   357         if ((s = status) >= 0 && cp != null) {
   357         if ((s = status) >= 0 &&
   358             if (this instanceof CountedCompleter)
   358             (s = ((this instanceof CountedCompleter) ?
   359                 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
   359                   ForkJoinPool.common.externalHelpComplete(
   360             else if (cp.tryExternalUnpush(this))
   360                       (CountedCompleter<?>)this, 0) :
   361                 doExec();
   361                   ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
   362         }
   362                   0)) >= 0) {
   363         while ((s = status) >= 0) {
   363             while ((s = status) >= 0) {
   364             if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   364                 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   365                 synchronized (this) {
   365                     synchronized (this) {
   366                     if (status >= 0)
   366                         if (status >= 0)
   367                         wait();
   367                             wait(0L);
   368                     else
   368                         else
   369                         notifyAll();
   369                             notifyAll();
       
   370                     }
   370                 }
   371                 }
   371             }
   372             }
   372         }
   373         }
   373         return s;
   374         return s;
   374     }
   375     }
   384         int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
   385         int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
   385         return (s = status) < 0 ? s :
   386         return (s = status) < 0 ? s :
   386             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
   387             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
   387             (w = (wt = (ForkJoinWorkerThread)t).workQueue).
   388             (w = (wt = (ForkJoinWorkerThread)t).workQueue).
   388             tryUnpush(this) && (s = doExec()) < 0 ? s :
   389             tryUnpush(this) && (s = doExec()) < 0 ? s :
   389             wt.pool.awaitJoin(w, this) :
   390             wt.pool.awaitJoin(w, this, 0L) :
   390             externalAwaitDone();
   391             externalAwaitDone();
   391     }
   392     }
   392 
   393 
   393     /**
   394     /**
   394      * Implementation for invoke, quietlyInvoke.
   395      * Implementation for invoke, quietlyInvoke.
   397      */
   398      */
   398     private int doInvoke() {
   399     private int doInvoke() {
   399         int s; Thread t; ForkJoinWorkerThread wt;
   400         int s; Thread t; ForkJoinWorkerThread wt;
   400         return (s = doExec()) < 0 ? s :
   401         return (s = doExec()) < 0 ? s :
   401             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
   402             ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
   402             (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
   403             (wt = (ForkJoinWorkerThread)t).pool.
       
   404             awaitJoin(wt.workQueue, this, 0L) :
   403             externalAwaitDone();
   405             externalAwaitDone();
   404     }
   406     }
   405 
   407 
   406     // Exception table support
   408     // Exception table support
   407 
   409 
   575             lock.unlock();
   577             lock.unlock();
   576         }
   578         }
   577         Throwable ex;
   579         Throwable ex;
   578         if (e == null || (ex = e.ex) == null)
   580         if (e == null || (ex = e.ex) == null)
   579             return null;
   581             return null;
   580         if (false && e.thrower != Thread.currentThread().getId()) {
   582         if (e.thrower != Thread.currentThread().getId()) {
   581             Class<? extends Throwable> ec = ex.getClass();
   583             Class<? extends Throwable> ec = ex.getClass();
   582             try {
   584             try {
   583                 Constructor<?> noArgCtor = null;
   585                 Constructor<?> noArgCtor = null;
   584                 Constructor<?>[] cs = ec.getConstructors();// public ctors only
   586                 Constructor<?>[] cs = ec.getConstructors();// public ctors only
   585                 for (int i = 0; i < cs.length; ++i) {
   587                 for (int i = 0; i < cs.length; ++i) {
   586                     Constructor<?> c = cs[i];
   588                     Constructor<?> c = cs[i];
   587                     Class<?>[] ps = c.getParameterTypes();
   589                     Class<?>[] ps = c.getParameterTypes();
   588                     if (ps.length == 0)
   590                     if (ps.length == 0)
   589                         noArgCtor = c;
   591                         noArgCtor = c;
   590                     else if (ps.length == 1 && ps[0] == Throwable.class)
   592                     else if (ps.length == 1 && ps[0] == Throwable.class) {
   591                         return (Throwable)(c.newInstance(ex));
   593                         Throwable wx = (Throwable)c.newInstance(ex);
       
   594                         return (wx == null) ? ex : wx;
       
   595                     }
   592                 }
   596                 }
   593                 if (noArgCtor != null) {
   597                 if (noArgCtor != null) {
   594                     Throwable wx = (Throwable)(noArgCtor.newInstance());
   598                     Throwable wx = (Throwable)(noArgCtor.newInstance());
   595                     wx.initCause(ex);
   599                     if (wx != null) {
   596                     return wx;
   600                         wx.initCause(ex);
       
   601                         return wx;
       
   602                     }
   597                 }
   603                 }
   598             } catch (Exception ignore) {
   604             } catch (Exception ignore) {
   599             }
   605             }
   600         }
   606         }
   601         return ex;
   607         return ex;
  1015      * member of a ForkJoinPool and was interrupted while waiting
  1021      * member of a ForkJoinPool and was interrupted while waiting
  1016      * @throws TimeoutException if the wait timed out
  1022      * @throws TimeoutException if the wait timed out
  1017      */
  1023      */
  1018     public final V get(long timeout, TimeUnit unit)
  1024     public final V get(long timeout, TimeUnit unit)
  1019         throws InterruptedException, ExecutionException, TimeoutException {
  1025         throws InterruptedException, ExecutionException, TimeoutException {
       
  1026         int s;
       
  1027         long nanos = unit.toNanos(timeout);
  1020         if (Thread.interrupted())
  1028         if (Thread.interrupted())
  1021             throw new InterruptedException();
  1029             throw new InterruptedException();
  1022         // Messy in part because we measure in nanosecs, but wait in millisecs
  1030         if ((s = status) >= 0 && nanos > 0L) {
  1023         int s; long ms;
  1031             long d = System.nanoTime() + nanos;
  1024         long ns = unit.toNanos(timeout);
  1032             long deadline = (d == 0L) ? 1L : d; // avoid 0
  1025         ForkJoinPool cp;
       
  1026         if ((s = status) >= 0 && ns > 0L) {
       
  1027             long deadline = System.nanoTime() + ns;
       
  1028             ForkJoinPool p = null;
       
  1029             ForkJoinPool.WorkQueue w = null;
       
  1030             Thread t = Thread.currentThread();
  1033             Thread t = Thread.currentThread();
  1031             if (t instanceof ForkJoinWorkerThread) {
  1034             if (t instanceof ForkJoinWorkerThread) {
  1032                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
  1035                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
  1033                 p = wt.pool;
  1036                 s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
  1034                 w = wt.workQueue;
  1037             }
  1035                 p.helpJoinOnce(w, this); // no retries on failure
  1038             else if ((s = ((this instanceof CountedCompleter) ?
  1036             }
  1039                            ForkJoinPool.common.externalHelpComplete(
  1037             else if ((cp = ForkJoinPool.common) != null) {
  1040                                (CountedCompleter<?>)this, 0) :
  1038                 if (this instanceof CountedCompleter)
  1041                            ForkJoinPool.common.tryExternalUnpush(this) ?
  1039                     cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
  1042                            doExec() : 0)) >= 0) {
  1040                 else if (cp.tryExternalUnpush(this))
  1043                 long ns, ms; // measure in nanosecs, but wait in millisecs
  1041                     doExec();
  1044                 while ((s = status) >= 0 &&
  1042             }
  1045                        (ns = deadline - System.nanoTime()) > 0L) {
  1043             boolean canBlock = false;
  1046                     if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
  1044             boolean interrupted = false;
  1047                         U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
  1045             try {
  1048                         synchronized (this) {
  1046                 while ((s = status) >= 0) {
  1049                             if (status >= 0)
  1047                     if (w != null && w.qlock < 0)
  1050                                 wait(ms); // OK to throw InterruptedException
  1048                         cancelIgnoringExceptions(this);
  1051                             else
  1049                     else if (!canBlock) {
  1052                                 notifyAll();
  1050                         if (p == null || p.tryCompensate(p.ctl))
       
  1051                             canBlock = true;
       
  1052                     }
       
  1053                     else {
       
  1054                         if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
       
  1055                             U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
       
  1056                             synchronized (this) {
       
  1057                                 if (status >= 0) {
       
  1058                                     try {
       
  1059                                         wait(ms);
       
  1060                                     } catch (InterruptedException ie) {
       
  1061                                         if (p == null)
       
  1062                                             interrupted = true;
       
  1063                                     }
       
  1064                                 }
       
  1065                                 else
       
  1066                                     notifyAll();
       
  1067                             }
       
  1068                         }
  1053                         }
  1069                         if ((s = status) < 0 || interrupted ||
       
  1070                             (ns = deadline - System.nanoTime()) <= 0L)
       
  1071                             break;
       
  1072                     }
  1054                     }
  1073                 }
  1055                 }
  1074             } finally {
  1056             }
  1075                 if (p != null && canBlock)
  1057         }
  1076                     p.incrementActiveCount();
  1058         if (s >= 0)
  1077             }
  1059             s = status;
  1078             if (interrupted)
       
  1079                 throw new InterruptedException();
       
  1080         }
       
  1081         if ((s &= DONE_MASK) != NORMAL) {
  1060         if ((s &= DONE_MASK) != NORMAL) {
  1082             Throwable ex;
  1061             Throwable ex;
  1083             if (s == CANCELLED)
  1062             if (s == CANCELLED)
  1084                 throw new CancellationException();
  1063                 throw new CancellationException();
  1085             if (s != EXCEPTIONAL)
  1064             if (s != EXCEPTIONAL)