jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
changeset 18790 d25399d849bc
parent 16049 92a3a919d4dc
child 19048 7d0a94c79779
equal deleted inserted replaced
18789:b518cd4045bc 18790:d25399d849bc
   163  * (DAG). Otherwise, executions may encounter a form of deadlock as
   163  * (DAG). Otherwise, executions may encounter a form of deadlock as
   164  * tasks cyclically wait for each other.  However, this framework
   164  * tasks cyclically wait for each other.  However, this framework
   165  * supports other methods and techniques (for example the use of
   165  * supports other methods and techniques (for example the use of
   166  * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
   166  * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
   167  * may be of use in constructing custom subclasses for problems that
   167  * may be of use in constructing custom subclasses for problems that
   168  * are not statically structured as DAGs. To support such usages a
   168  * are not statically structured as DAGs. To support such usages, a
   169  * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
   169  * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
   170  * value using {@link #setForkJoinTaskTag} or {@link
   170  * value using {@link #setForkJoinTaskTag} or {@link
   171  * #compareAndSetForkJoinTaskTag} and checked using {@link
   171  * #compareAndSetForkJoinTaskTag} and checked using {@link
   172  * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
   172  * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
   173  * these {@code protected} methods or tags for any purpose, but they
   173  * these {@code protected} methods or tags for any purpose, but they
   312      * Blocks a non-worker-thread until completion.
   312      * Blocks a non-worker-thread until completion.
   313      * @return status upon completion
   313      * @return status upon completion
   314      */
   314      */
   315     private int externalAwaitDone() {
   315     private int externalAwaitDone() {
   316         int s;
   316         int s;
   317         ForkJoinPool.externalHelpJoin(this);
   317         ForkJoinPool cp = ForkJoinPool.common;
   318         boolean interrupted = false;
   318         if ((s = status) >= 0) {
   319         while ((s = status) >= 0) {
   319             if (cp != null) {
   320             if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   320                 if (this instanceof CountedCompleter)
   321                 synchronized (this) {
   321                     s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
   322                     if (status >= 0) {
   322                 else if (cp.tryExternalUnpush(this))
   323                         try {
   323                     s = doExec();
   324                             wait();
   324             }
   325                         } catch (InterruptedException ie) {
   325             if (s >= 0 && (s = status) >= 0) {
   326                             interrupted = true;
   326                 boolean interrupted = false;
       
   327                 do {
       
   328                     if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
       
   329                         synchronized (this) {
       
   330                             if (status >= 0) {
       
   331                                 try {
       
   332                                     wait();
       
   333                                 } catch (InterruptedException ie) {
       
   334                                     interrupted = true;
       
   335                                 }
       
   336                             }
       
   337                             else
       
   338                                 notifyAll();
   327                         }
   339                         }
   328                     }
   340                     }
   329                     else
   341                 } while ((s = status) >= 0);
   330                         notifyAll();
   342                 if (interrupted)
   331                 }
   343                     Thread.currentThread().interrupt();
   332             }
   344             }
   333         }
   345         }
   334         if (interrupted)
       
   335             Thread.currentThread().interrupt();
       
   336         return s;
   346         return s;
   337     }
   347     }
   338 
   348 
   339     /**
   349     /**
   340      * Blocks a non-worker-thread until completion or interruption.
   350      * Blocks a non-worker-thread until completion or interruption.
   341      */
   351      */
   342     private int externalInterruptibleAwaitDone() throws InterruptedException {
   352     private int externalInterruptibleAwaitDone() throws InterruptedException {
   343         int s;
   353         int s;
       
   354         ForkJoinPool cp = ForkJoinPool.common;
   344         if (Thread.interrupted())
   355         if (Thread.interrupted())
   345             throw new InterruptedException();
   356             throw new InterruptedException();
   346         ForkJoinPool.externalHelpJoin(this);
   357         if ((s = status) >= 0 && cp != null) {
       
   358             if (this instanceof CountedCompleter)
       
   359                 cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
       
   360             else if (cp.tryExternalUnpush(this))
       
   361                 doExec();
       
   362         }
   347         while ((s = status) >= 0) {
   363         while ((s = status) >= 0) {
   348             if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   364             if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
   349                 synchronized (this) {
   365                 synchronized (this) {
   350                     if (status >= 0)
   366                     if (status >= 0)
   351                         wait();
   367                         wait();
   354                 }
   370                 }
   355             }
   371             }
   356         }
   372         }
   357         return s;
   373         return s;
   358     }
   374     }
   359 
       
   360 
   375 
   361     /**
   376     /**
   362      * Implementation for join, get, quietlyJoin. Directly handles
   377      * Implementation for join, get, quietlyJoin. Directly handles
   363      * only cases of already-completed, external wait, and
   378      * only cases of already-completed, external wait, and
   364      * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
   379      * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
   627     }
   642     }
   628 
   643 
   629     /**
   644     /**
   630      * A version of "sneaky throw" to relay exceptions
   645      * A version of "sneaky throw" to relay exceptions
   631      */
   646      */
   632     static void rethrow(final Throwable ex) {
   647     static void rethrow(Throwable ex) {
   633         if (ex != null) {
   648         if (ex != null)
   634             if (ex instanceof Error)
       
   635                 throw (Error)ex;
       
   636             if (ex instanceof RuntimeException)
       
   637                 throw (RuntimeException)ex;
       
   638             ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
   649             ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
   639         }
       
   640     }
   650     }
   641 
   651 
   642     /**
   652     /**
   643      * The sneaky part of sneaky throw, relying on generics
   653      * The sneaky part of sneaky throw, relying on generics
   644      * limitations to evade compiler complaints about rethrowing
   654      * limitations to evade compiler complaints about rethrowing
   645      * unchecked exceptions
   655      * unchecked exceptions
   646      */
   656      */
   647     @SuppressWarnings("unchecked") static <T extends Throwable>
   657     @SuppressWarnings("unchecked") static <T extends Throwable>
   648         void uncheckedThrow(Throwable t) throws T {
   658         void uncheckedThrow(Throwable t) throws T {
   649         if (t != null)
   659         throw (T)t; // rely on vacuous cast
   650             throw (T)t; // rely on vacuous cast
       
   651     }
   660     }
   652 
   661 
   653     /**
   662     /**
   654      * Throws exception, if any, associated with the given status.
   663      * Throws exception, if any, associated with the given status.
   655      */
   664      */
  1008         if (Thread.interrupted())
  1017         if (Thread.interrupted())
  1009             throw new InterruptedException();
  1018             throw new InterruptedException();
  1010         // Messy in part because we measure in nanosecs, but wait in millisecs
  1019         // Messy in part because we measure in nanosecs, but wait in millisecs
  1011         int s; long ms;
  1020         int s; long ms;
  1012         long ns = unit.toNanos(timeout);
  1021         long ns = unit.toNanos(timeout);
       
  1022         ForkJoinPool cp;
  1013         if ((s = status) >= 0 && ns > 0L) {
  1023         if ((s = status) >= 0 && ns > 0L) {
  1014             long deadline = System.nanoTime() + ns;
  1024             long deadline = System.nanoTime() + ns;
  1015             ForkJoinPool p = null;
  1025             ForkJoinPool p = null;
  1016             ForkJoinPool.WorkQueue w = null;
  1026             ForkJoinPool.WorkQueue w = null;
  1017             Thread t = Thread.currentThread();
  1027             Thread t = Thread.currentThread();
  1019                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
  1029                 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
  1020                 p = wt.pool;
  1030                 p = wt.pool;
  1021                 w = wt.workQueue;
  1031                 w = wt.workQueue;
  1022                 p.helpJoinOnce(w, this); // no retries on failure
  1032                 p.helpJoinOnce(w, this); // no retries on failure
  1023             }
  1033             }
  1024             else
  1034             else if ((cp = ForkJoinPool.common) != null) {
  1025                 ForkJoinPool.externalHelpJoin(this);
  1035                 if (this instanceof CountedCompleter)
       
  1036                     cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
       
  1037                 else if (cp.tryExternalUnpush(this))
       
  1038                     doExec();
       
  1039             }
  1026             boolean canBlock = false;
  1040             boolean canBlock = false;
  1027             boolean interrupted = false;
  1041             boolean interrupted = false;
  1028             try {
  1042             try {
  1029                 while ((s = status) >= 0) {
  1043                 while ((s = status) >= 0) {
  1030                     if (w != null && w.qlock < 0)
  1044                     if (w != null && w.qlock < 0)
  1031                         cancelIgnoringExceptions(this);
  1045                         cancelIgnoringExceptions(this);
  1032                     else if (!canBlock) {
  1046                     else if (!canBlock) {
  1033                         if (p == null || p.tryCompensate())
  1047                         if (p == null || p.tryCompensate(p.ctl))
  1034                             canBlock = true;
  1048                             canBlock = true;
  1035                     }
  1049                     }
  1036                     else {
  1050                     else {
  1037                         if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
  1051                         if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
  1038                             U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
  1052                             U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
  1169      */
  1183      */
  1170     public boolean tryUnfork() {
  1184     public boolean tryUnfork() {
  1171         Thread t;
  1185         Thread t;
  1172         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  1186         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  1173                 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
  1187                 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
  1174                 ForkJoinPool.tryExternalUnpush(this));
  1188                 ForkJoinPool.common.tryExternalUnpush(this));
  1175     }
  1189     }
  1176 
  1190 
  1177     /**
  1191     /**
  1178      * Returns an estimate of the number of tasks that have been
  1192      * Returns an estimate of the number of tasks that have been
  1179      * forked by the current worker thread but not yet executed. This
  1193      * forked by the current worker thread but not yet executed. This
  1338      * before processing, otherwise exiting because the node has
  1352      * before processing, otherwise exiting because the node has
  1339      * already been visited.
  1353      * already been visited.
  1340      *
  1354      *
  1341      * @param e the expected tag value
  1355      * @param e the expected tag value
  1342      * @param tag the new tag value
  1356      * @param tag the new tag value
  1343      * @return true if successful; i.e., the current value was
  1357      * @return {@code true} if successful; i.e., the current value was
  1344      * equal to e and is now tag.
  1358      * equal to e and is now tag.
  1345      * @since 1.8
  1359      * @since 1.8
  1346      */
  1360      */
  1347     public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
  1361     public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
  1348         for (int s;;) {
  1362         for (int s;;) {