jdk/src/share/classes/java/util/concurrent/ForkJoinTask.java
changeset 4110 ac033ba6ede4
child 5506 202f599c92aa
equal deleted inserted replaced
4109:b997a0a1005d 4110:ac033ba6ede4
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.  Sun designates this
       
     7  * particular file as subject to the "Classpath" exception as provided
       
     8  * by Sun in the LICENSE file that accompanied this code.
       
     9  *
       
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    13  * version 2 for more details (a copy is included in the LICENSE file that
       
    14  * accompanied this code).
       
    15  *
       
    16  * You should have received a copy of the GNU General Public License version
       
    17  * 2 along with this work; if not, write to the Free Software Foundation,
       
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    19  *
       
    20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    21  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    22  * have any questions.
       
    23  */
       
    24 
       
    25 /*
       
    26  * This file is available under and governed by the GNU General Public
       
    27  * License version 2 only, as published by the Free Software Foundation.
       
    28  * However, the following notice accompanied the original version of this
       
    29  * file:
       
    30  *
       
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    32  * Expert Group and released to the public domain, as explained at
       
    33  * http://creativecommons.org/licenses/publicdomain
       
    34  */
       
    35 
       
    36 package java.util.concurrent;
       
    37 
       
    38 import java.io.Serializable;
       
    39 import java.util.Collection;
       
    40 import java.util.Collections;
       
    41 import java.util.List;
       
    42 import java.util.RandomAccess;
       
    43 import java.util.Map;
       
    44 import java.util.WeakHashMap;
       
    45 
       
    46 /**
       
    47  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
       
    48  * A {@code ForkJoinTask} is a thread-like entity that is much
       
    49  * lighter weight than a normal thread.  Huge numbers of tasks and
       
    50  * subtasks may be hosted by a small number of actual threads in a
       
    51  * ForkJoinPool, at the price of some usage limitations.
       
    52  *
       
    53  * <p>A "main" {@code ForkJoinTask} begins execution when submitted
       
    54  * to a {@link ForkJoinPool}.  Once started, it will usually in turn
       
    55  * start other subtasks.  As indicated by the name of this class,
       
    56  * many programs using {@code ForkJoinTask} employ only methods
       
    57  * {@link #fork} and {@link #join}, or derivatives such as {@link
       
    58  * #invokeAll}.  However, this class also provides a number of other
       
    59  * methods that can come into play in advanced usages, as well as
       
    60  * extension mechanics that allow support of new forms of fork/join
       
    61  * processing.
       
    62  *
       
    63  * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
       
    64  * The efficiency of {@code ForkJoinTask}s stems from a set of
       
    65  * restrictions (that are only partially statically enforceable)
       
    66  * reflecting their intended use as computational tasks calculating
       
    67  * pure functions or operating on purely isolated objects.  The
       
    68  * primary coordination mechanisms are {@link #fork}, that arranges
       
    69  * asynchronous execution, and {@link #join}, that doesn't proceed
       
    70  * until the task's result has been computed.  Computations should
       
    71  * avoid {@code synchronized} methods or blocks, and should minimize
       
    72  * other blocking synchronization apart from joining other tasks or
       
    73  * using synchronizers such as Phasers that are advertised to
       
    74  * cooperate with fork/join scheduling. Tasks should also not perform
       
    75  * blocking IO, and should ideally access variables that are
       
    76  * completely independent of those accessed by other running
       
    77  * tasks. Minor breaches of these restrictions, for example using
       
    78  * shared output streams, may be tolerable in practice, but frequent
       
    79  * use may result in poor performance, and the potential to
       
    80  * indefinitely stall if the number of threads not waiting for IO or
       
    81  * other external synchronization becomes exhausted. This usage
       
    82  * restriction is in part enforced by not permitting checked
       
    83  * exceptions such as {@code IOExceptions} to be thrown. However,
       
    84  * computations may still encounter unchecked exceptions, that are
       
    85  * rethrown to callers attempting to join them. These exceptions may
       
    86  * additionally include {@link RejectedExecutionException} stemming
       
    87  * from internal resource exhaustion, such as failure to allocate
       
    88  * internal task queues.
       
    89  *
       
    90  * <p>The primary method for awaiting completion and extracting
       
    91  * results of a task is {@link #join}, but there are several variants:
       
    92  * The {@link Future#get} methods support interruptible and/or timed
       
    93  * waits for completion and report results using {@code Future}
       
    94  * conventions. Method {@link #helpJoin} enables callers to actively
       
    95  * execute other tasks while awaiting joins, which is sometimes more
       
    96  * efficient but only applies when all subtasks are known to be
       
    97  * strictly tree-structured. Method {@link #invoke} is semantically
       
    98  * equivalent to {@code fork(); join()} but always attempts to begin
       
    99  * execution in the current thread. The "<em>quiet</em>" forms of
       
   100  * these methods do not extract results or report exceptions. These
       
   101  * may be useful when a set of tasks are being executed, and you need
       
   102  * to delay processing of results or exceptions until all complete.
       
   103  * Method {@code invokeAll} (available in multiple versions)
       
   104  * performs the most common form of parallel invocation: forking a set
       
   105  * of tasks and joining them all.
       
   106  *
       
   107  * <p>The execution status of tasks may be queried at several levels
       
   108  * of detail: {@link #isDone} is true if a task completed in any way
       
   109  * (including the case where a task was cancelled without executing);
       
   110  * {@link #isCompletedNormally} is true if a task completed without
       
   111  * cancellation or encountering an exception; {@link #isCancelled} is
       
   112  * true if the task was cancelled (in which case {@link #getException}
       
   113  * returns a {@link java.util.concurrent.CancellationException}); and
       
   114  * {@link #isCompletedAbnormally} is true if a task was either
       
   115  * cancelled or encountered an exception, in which case {@link
       
   116  * #getException} will return either the encountered exception or
       
   117  * {@link java.util.concurrent.CancellationException}.
       
   118  *
       
   119  * <p>The ForkJoinTask class is not usually directly subclassed.
       
   120  * Instead, you subclass one of the abstract classes that support a
       
   121  * particular style of fork/join processing, typically {@link
       
   122  * RecursiveAction} for computations that do not return results, or
       
   123  * {@link RecursiveTask} for those that do.  Normally, a concrete
       
   124  * ForkJoinTask subclass declares fields comprising its parameters,
       
   125  * established in a constructor, and then defines a {@code compute}
       
   126  * method that somehow uses the control methods supplied by this base
       
   127  * class. While these methods have {@code public} access (to allow
       
   128  * instances of different task subclasses to call each other's
       
   129  * methods), some of them may only be called from within other
       
   130  * ForkJoinTasks (as may be determined using method {@link
       
   131  * #inForkJoinPool}).  Attempts to invoke them in other contexts
       
   132  * result in exceptions or errors, possibly including
       
   133  * ClassCastException.
       
   134  *
       
   135  * <p>Most base support methods are {@code final}, to prevent
       
   136  * overriding of implementations that are intrinsically tied to the
       
   137  * underlying lightweight task scheduling framework.  Developers
       
   138  * creating new basic styles of fork/join processing should minimally
       
   139  * implement {@code protected} methods {@link #exec}, {@link
       
   140  * #setRawResult}, and {@link #getRawResult}, while also introducing
       
   141  * an abstract computational method that can be implemented in its
       
   142  * subclasses, possibly relying on other {@code protected} methods
       
   143  * provided by this class.
       
   144  *
       
   145  * <p>ForkJoinTasks should perform relatively small amounts of
       
   146  * computation. Large tasks should be split into smaller subtasks,
       
   147  * usually via recursive decomposition. As a very rough rule of thumb,
       
   148  * a task should perform more than 100 and less than 10000 basic
       
   149  * computational steps. If tasks are too big, then parallelism cannot
       
   150  * improve throughput. If too small, then memory and internal task
       
   151  * maintenance overhead may overwhelm processing.
       
   152  *
       
   153  * <p>This class provides {@code adapt} methods for {@link Runnable}
       
   154  * and {@link Callable}, that may be of use when mixing execution of
       
   155  * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
       
   156  * are of this form, consider using a pool in
       
   157  * {@linkplain ForkJoinPool#setAsyncMode async mode}.
       
   158  *
       
   159  * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
       
   160  * used in extensions such as remote execution frameworks. It is
       
   161  * sensible to serialize tasks only before or after, but not during,
       
   162  * execution. Serialization is not relied on during execution itself.
       
   163  *
       
   164  * @since 1.7
       
   165  * @author Doug Lea
       
   166  */
       
   167 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
       
   168 
       
   169     /**
       
   170      * Run control status bits packed into a single int to minimize
       
   171      * footprint and to ensure atomicity (via CAS).  Status is
       
   172      * initially zero, and takes on nonnegative values until
       
   173      * completed, upon which status holds COMPLETED. CANCELLED, or
       
   174      * EXCEPTIONAL, which use the top 3 bits.  Tasks undergoing
       
   175      * blocking waits by other threads have SIGNAL_MASK bits set --
       
   176      * bit 15 for external (nonFJ) waits, and the rest a count of
       
   177      * waiting FJ threads.  (This representation relies on
       
   178      * ForkJoinPool max thread limits). Completion of a stolen task
       
   179      * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
       
   180      * though suboptimal for some purposes, we use basic builtin
       
   181      * wait/notify to take advantage of "monitor inflation" in JVMs
       
   182      * that we would otherwise need to emulate to avoid adding further
       
   183      * per-task bookkeeping overhead. Note that bits 16-28 are
       
   184      * currently unused. Also value 0x80000000 is available as spare
       
   185      * completion value.
       
   186      */
       
   187     volatile int status; // accessed directly by pool and workers
       
   188 
       
   189     static final int COMPLETION_MASK      = 0xe0000000;
       
   190     static final int NORMAL               = 0xe0000000; // == mask
       
   191     static final int CANCELLED            = 0xc0000000;
       
   192     static final int EXCEPTIONAL          = 0xa0000000;
       
   193     static final int SIGNAL_MASK          = 0x0000ffff;
       
   194     static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
       
   195     static final int EXTERNAL_SIGNAL      = 0x00008000; // top bit of low word
       
   196 
       
   197     /**
       
   198      * Table of exceptions thrown by tasks, to enable reporting by
       
   199      * callers. Because exceptions are rare, we don't directly keep
       
   200      * them with task objects, but instead use a weak ref table.  Note
       
   201      * that cancellation exceptions don't appear in the table, but are
       
   202      * instead recorded as status values.
       
   203      * TODO: Use ConcurrentReferenceHashMap
       
   204      */
       
   205     static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
       
   206         Collections.synchronizedMap
       
   207         (new WeakHashMap<ForkJoinTask<?>, Throwable>());
       
   208 
       
   209     // within-package utilities
       
   210 
       
   211     /**
       
   212      * Gets current worker thread, or null if not a worker thread.
       
   213      */
       
   214     static ForkJoinWorkerThread getWorker() {
       
   215         Thread t = Thread.currentThread();
       
   216         return ((t instanceof ForkJoinWorkerThread) ?
       
   217                 (ForkJoinWorkerThread) t : null);
       
   218     }
       
   219 
       
   220     final boolean casStatus(int cmp, int val) {
       
   221         return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
       
   222     }
       
   223 
       
   224     /**
       
   225      * Workaround for not being able to rethrow unchecked exceptions.
       
   226      */
       
   227     static void rethrowException(Throwable ex) {
       
   228         if (ex != null)
       
   229             UNSAFE.throwException(ex);
       
   230     }
       
   231 
       
   232     // Setting completion status
       
   233 
       
   234     /**
       
   235      * Marks completion and wakes up threads waiting to join this task.
       
   236      *
       
   237      * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
       
   238      */
       
   239     final void setCompletion(int completion) {
       
   240         ForkJoinPool pool = getPool();
       
   241         if (pool != null) {
       
   242             int s; // Clear signal bits while setting completion status
       
   243             do {} while ((s = status) >= 0 && !casStatus(s, completion));
       
   244 
       
   245             if ((s & SIGNAL_MASK) != 0) {
       
   246                 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
       
   247                     pool.updateRunningCount(s);
       
   248                 synchronized (this) { notifyAll(); }
       
   249             }
       
   250         }
       
   251         else
       
   252             externallySetCompletion(completion);
       
   253     }
       
   254 
       
   255     /**
       
   256      * Version of setCompletion for non-FJ threads.  Leaves signal
       
   257      * bits for unblocked threads to adjust, and always notifies.
       
   258      */
       
   259     private void externallySetCompletion(int completion) {
       
   260         int s;
       
   261         do {} while ((s = status) >= 0 &&
       
   262                      !casStatus(s, (s & SIGNAL_MASK) | completion));
       
   263         synchronized (this) { notifyAll(); }
       
   264     }
       
   265 
       
   266     /**
       
   267      * Sets status to indicate normal completion.
       
   268      */
       
   269     final void setNormalCompletion() {
       
   270         // Try typical fast case -- single CAS, no signal, not already done.
       
   271         // Manually expand casStatus to improve chances of inlining it
       
   272         if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
       
   273             setCompletion(NORMAL);
       
   274     }
       
   275 
       
   276     // internal waiting and notification
       
   277 
       
   278     /**
       
   279      * Performs the actual monitor wait for awaitDone.
       
   280      */
       
   281     private void doAwaitDone() {
       
   282         // Minimize lock bias and in/de-flation effects by maximizing
       
   283         // chances of waiting inside sync
       
   284         try {
       
   285             while (status >= 0)
       
   286                 synchronized (this) { if (status >= 0) wait(); }
       
   287         } catch (InterruptedException ie) {
       
   288             onInterruptedWait();
       
   289         }
       
   290     }
       
   291 
       
   292     /**
       
   293      * Performs the actual timed monitor wait for awaitDone.
       
   294      */
       
   295     private void doAwaitDone(long startTime, long nanos) {
       
   296         synchronized (this) {
       
   297             try {
       
   298                 while (status >= 0) {
       
   299                     long nt = nanos - (System.nanoTime() - startTime);
       
   300                     if (nt <= 0)
       
   301                         break;
       
   302                     wait(nt / 1000000, (int) (nt % 1000000));
       
   303                 }
       
   304             } catch (InterruptedException ie) {
       
   305                 onInterruptedWait();
       
   306             }
       
   307         }
       
   308     }
       
   309 
       
   310     // Awaiting completion
       
   311 
       
   312     /**
       
   313      * Sets status to indicate there is joiner, then waits for join,
       
   314      * surrounded with pool notifications.
       
   315      *
       
   316      * @return status upon exit
       
   317      */
       
   318     private int awaitDone(ForkJoinWorkerThread w,
       
   319                           boolean maintainParallelism) {
       
   320         ForkJoinPool pool = (w == null) ? null : w.pool;
       
   321         int s;
       
   322         while ((s = status) >= 0) {
       
   323             if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
       
   324                 if (pool == null || !pool.preJoin(this, maintainParallelism))
       
   325                     doAwaitDone();
       
   326                 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
       
   327                     adjustPoolCountsOnUnblock(pool);
       
   328                 break;
       
   329             }
       
   330         }
       
   331         return s;
       
   332     }
       
   333 
       
   334     /**
       
   335      * Timed version of awaitDone
       
   336      *
       
   337      * @return status upon exit
       
   338      */
       
   339     private int awaitDone(ForkJoinWorkerThread w, long nanos) {
       
   340         ForkJoinPool pool = (w == null) ? null : w.pool;
       
   341         int s;
       
   342         while ((s = status) >= 0) {
       
   343             if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
       
   344                 long startTime = System.nanoTime();
       
   345                 if (pool == null || !pool.preJoin(this, false))
       
   346                     doAwaitDone(startTime, nanos);
       
   347                 if ((s = status) >= 0) {
       
   348                     adjustPoolCountsOnCancelledWait(pool);
       
   349                     s = status;
       
   350                 }
       
   351                 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
       
   352                     adjustPoolCountsOnUnblock(pool);
       
   353                 break;
       
   354             }
       
   355         }
       
   356         return s;
       
   357     }
       
   358 
       
   359     /**
       
   360      * Notifies pool that thread is unblocked. Called by signalled
       
   361      * threads when woken by non-FJ threads (which is atypical).
       
   362      */
       
   363     private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
       
   364         int s;
       
   365         do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
       
   366         if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
       
   367             pool.updateRunningCount(s);
       
   368     }
       
   369 
       
   370     /**
       
   371      * Notifies pool to adjust counts on cancelled or timed out wait.
       
   372      */
       
   373     private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
       
   374         if (pool != null) {
       
   375             int s;
       
   376             while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
       
   377                 if (casStatus(s, s - 1)) {
       
   378                     pool.updateRunningCount(1);
       
   379                     break;
       
   380                 }
       
   381             }
       
   382         }
       
   383     }
       
   384 
       
   385     /**
       
   386      * Handles interruptions during waits.
       
   387      */
       
   388     private void onInterruptedWait() {
       
   389         ForkJoinWorkerThread w = getWorker();
       
   390         if (w == null)
       
   391             Thread.currentThread().interrupt(); // re-interrupt
       
   392         else if (w.isTerminating())
       
   393             cancelIgnoringExceptions();
       
   394         // else if FJworker, ignore interrupt
       
   395     }
       
   396 
       
   397     // Recording and reporting exceptions
       
   398 
       
   399     private void setDoneExceptionally(Throwable rex) {
       
   400         exceptionMap.put(this, rex);
       
   401         setCompletion(EXCEPTIONAL);
       
   402     }
       
   403 
       
   404     /**
       
   405      * Throws the exception associated with status s.
       
   406      *
       
   407      * @throws the exception
       
   408      */
       
   409     private void reportException(int s) {
       
   410         if ((s &= COMPLETION_MASK) < NORMAL) {
       
   411             if (s == CANCELLED)
       
   412                 throw new CancellationException();
       
   413             else
       
   414                 rethrowException(exceptionMap.get(this));
       
   415         }
       
   416     }
       
   417 
       
   418     /**
       
   419      * Returns result or throws exception using j.u.c.Future conventions.
       
   420      * Only call when {@code isDone} known to be true or thread known
       
   421      * to be interrupted.
       
   422      */
       
   423     private V reportFutureResult()
       
   424         throws InterruptedException, ExecutionException {
       
   425         if (Thread.interrupted())
       
   426             throw new InterruptedException();
       
   427         int s = status & COMPLETION_MASK;
       
   428         if (s < NORMAL) {
       
   429             Throwable ex;
       
   430             if (s == CANCELLED)
       
   431                 throw new CancellationException();
       
   432             if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
       
   433                 throw new ExecutionException(ex);
       
   434         }
       
   435         return getRawResult();
       
   436     }
       
   437 
       
   438     /**
       
   439      * Returns result or throws exception using j.u.c.Future conventions
       
   440      * with timeouts.
       
   441      */
       
   442     private V reportTimedFutureResult()
       
   443         throws InterruptedException, ExecutionException, TimeoutException {
       
   444         if (Thread.interrupted())
       
   445             throw new InterruptedException();
       
   446         Throwable ex;
       
   447         int s = status & COMPLETION_MASK;
       
   448         if (s == NORMAL)
       
   449             return getRawResult();
       
   450         else if (s == CANCELLED)
       
   451             throw new CancellationException();
       
   452         else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
       
   453             throw new ExecutionException(ex);
       
   454         else
       
   455             throw new TimeoutException();
       
   456     }
       
   457 
       
   458     // internal execution methods
       
   459 
       
   460     /**
       
   461      * Calls exec, recording completion, and rethrowing exception if
       
   462      * encountered. Caller should normally check status before calling.
       
   463      *
       
   464      * @return true if completed normally
       
   465      */
       
   466     private boolean tryExec() {
       
   467         try { // try block must contain only call to exec
       
   468             if (!exec())
       
   469                 return false;
       
   470         } catch (Throwable rex) {
       
   471             setDoneExceptionally(rex);
       
   472             rethrowException(rex);
       
   473             return false; // not reached
       
   474         }
       
   475         setNormalCompletion();
       
   476         return true;
       
   477     }
       
   478 
       
   479     /**
       
   480      * Main execution method used by worker threads. Invokes
       
   481      * base computation unless already complete.
       
   482      */
       
   483     final void quietlyExec() {
       
   484         if (status >= 0) {
       
   485             try {
       
   486                 if (!exec())
       
   487                     return;
       
   488             } catch (Throwable rex) {
       
   489                 setDoneExceptionally(rex);
       
   490                 return;
       
   491             }
       
   492             setNormalCompletion();
       
   493         }
       
   494     }
       
   495 
       
   496     /**
       
   497      * Calls exec(), recording but not rethrowing exception.
       
   498      * Caller should normally check status before calling.
       
   499      *
       
   500      * @return true if completed normally
       
   501      */
       
   502     private boolean tryQuietlyInvoke() {
       
   503         try {
       
   504             if (!exec())
       
   505                 return false;
       
   506         } catch (Throwable rex) {
       
   507             setDoneExceptionally(rex);
       
   508             return false;
       
   509         }
       
   510         setNormalCompletion();
       
   511         return true;
       
   512     }
       
   513 
       
   514     /**
       
   515      * Cancels, ignoring any exceptions it throws.
       
   516      */
       
   517     final void cancelIgnoringExceptions() {
       
   518         try {
       
   519             cancel(false);
       
   520         } catch (Throwable ignore) {
       
   521         }
       
   522     }
       
   523 
       
   524     /**
       
   525      * Main implementation of helpJoin
       
   526      */
       
   527     private int busyJoin(ForkJoinWorkerThread w) {
       
   528         int s;
       
   529         ForkJoinTask<?> t;
       
   530         while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
       
   531             t.quietlyExec();
       
   532         return (s >= 0) ? awaitDone(w, false) : s; // block if no work
       
   533     }
       
   534 
       
   535     // public methods
       
   536 
       
   537     /**
       
   538      * Arranges to asynchronously execute this task.  While it is not
       
   539      * necessarily enforced, it is a usage error to fork a task more
       
   540      * than once unless it has completed and been reinitialized.
       
   541      * Subsequent modifications to the state of this task or any data
       
   542      * it operates on are not necessarily consistently observable by
       
   543      * any thread other than the one executing it unless preceded by a
       
   544      * call to {@link #join} or related methods, or a call to {@link
       
   545      * #isDone} returning {@code true}.
       
   546      *
       
   547      * <p>This method may be invoked only from within {@code
       
   548      * ForkJoinTask} computations (as may be determined using method
       
   549      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   550      * result in exceptions or errors, possibly including {@code
       
   551      * ClassCastException}.
       
   552      *
       
   553      * @return {@code this}, to simplify usage
       
   554      */
       
   555     public final ForkJoinTask<V> fork() {
       
   556         ((ForkJoinWorkerThread) Thread.currentThread())
       
   557             .pushTask(this);
       
   558         return this;
       
   559     }
       
   560 
       
   561     /**
       
   562      * Returns the result of the computation when it {@link #isDone is done}.
       
   563      * This method differs from {@link #get()} in that
       
   564      * abnormal completion results in {@code RuntimeException} or
       
   565      * {@code Error}, not {@code ExecutionException}.
       
   566      *
       
   567      * @return the computed result
       
   568      */
       
   569     public final V join() {
       
   570         ForkJoinWorkerThread w = getWorker();
       
   571         if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
       
   572             reportException(awaitDone(w, true));
       
   573         return getRawResult();
       
   574     }
       
   575 
       
   576     /**
       
   577      * Commences performing this task, awaits its completion if
       
   578      * necessary, and return its result, or throws an (unchecked)
       
   579      * exception if the underlying computation did so.
       
   580      *
       
   581      * @return the computed result
       
   582      */
       
   583     public final V invoke() {
       
   584         if (status >= 0 && tryExec())
       
   585             return getRawResult();
       
   586         else
       
   587             return join();
       
   588     }
       
   589 
       
   590     /**
       
   591      * Forks the given tasks, returning when {@code isDone} holds for
       
   592      * each task or an (unchecked) exception is encountered, in which
       
   593      * case the exception is rethrown.  If either task encounters an
       
   594      * exception, the other one may be, but is not guaranteed to be,
       
   595      * cancelled.  If both tasks throw an exception, then this method
       
   596      * throws one of them.  The individual status of each task may be
       
   597      * checked using {@link #getException()} and related methods.
       
   598      *
       
   599      * <p>This method may be invoked only from within {@code
       
   600      * ForkJoinTask} computations (as may be determined using method
       
   601      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   602      * result in exceptions or errors, possibly including {@code
       
   603      * ClassCastException}.
       
   604      *
       
   605      * @param t1 the first task
       
   606      * @param t2 the second task
       
   607      * @throws NullPointerException if any task is null
       
   608      */
       
   609     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
       
   610         t2.fork();
       
   611         t1.invoke();
       
   612         t2.join();
       
   613     }
       
   614 
       
   615     /**
       
   616      * Forks the given tasks, returning when {@code isDone} holds for
       
   617      * each task or an (unchecked) exception is encountered, in which
       
   618      * case the exception is rethrown. If any task encounters an
       
   619      * exception, others may be, but are not guaranteed to be,
       
   620      * cancelled.  If more than one task encounters an exception, then
       
   621      * this method throws any one of these exceptions.  The individual
       
   622      * status of each task may be checked using {@link #getException()}
       
   623      * and related methods.
       
   624      *
       
   625      * <p>This method may be invoked only from within {@code
       
   626      * ForkJoinTask} computations (as may be determined using method
       
   627      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   628      * result in exceptions or errors, possibly including {@code
       
   629      * ClassCastException}.
       
   630      *
       
   631      * @param tasks the tasks
       
   632      * @throws NullPointerException if any task is null
       
   633      */
       
   634     public static void invokeAll(ForkJoinTask<?>... tasks) {
       
   635         Throwable ex = null;
       
   636         int last = tasks.length - 1;
       
   637         for (int i = last; i >= 0; --i) {
       
   638             ForkJoinTask<?> t = tasks[i];
       
   639             if (t == null) {
       
   640                 if (ex == null)
       
   641                     ex = new NullPointerException();
       
   642             }
       
   643             else if (i != 0)
       
   644                 t.fork();
       
   645             else {
       
   646                 t.quietlyInvoke();
       
   647                 if (ex == null)
       
   648                     ex = t.getException();
       
   649             }
       
   650         }
       
   651         for (int i = 1; i <= last; ++i) {
       
   652             ForkJoinTask<?> t = tasks[i];
       
   653             if (t != null) {
       
   654                 if (ex != null)
       
   655                     t.cancel(false);
       
   656                 else {
       
   657                     t.quietlyJoin();
       
   658                     if (ex == null)
       
   659                         ex = t.getException();
       
   660                 }
       
   661             }
       
   662         }
       
   663         if (ex != null)
       
   664             rethrowException(ex);
       
   665     }
       
   666 
       
   667     /**
       
   668      * Forks all tasks in the specified collection, returning when
       
   669      * {@code isDone} holds for each task or an (unchecked) exception
       
   670      * is encountered.  If any task encounters an exception, others
       
   671      * may be, but are not guaranteed to be, cancelled.  If more than
       
   672      * one task encounters an exception, then this method throws any
       
   673      * one of these exceptions.  The individual status of each task
       
   674      * may be checked using {@link #getException()} and related
       
   675      * methods.  The behavior of this operation is undefined if the
       
   676      * specified collection is modified while the operation is in
       
   677      * progress.
       
   678      *
       
   679      * <p>This method may be invoked only from within {@code
       
   680      * ForkJoinTask} computations (as may be determined using method
       
   681      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   682      * result in exceptions or errors, possibly including {@code
       
   683      * ClassCastException}.
       
   684      *
       
   685      * @param tasks the collection of tasks
       
   686      * @return the tasks argument, to simplify usage
       
   687      * @throws NullPointerException if tasks or any element are null
       
   688      */
       
   689     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
       
   690         if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
       
   691             invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
       
   692             return tasks;
       
   693         }
       
   694         @SuppressWarnings("unchecked")
       
   695         List<? extends ForkJoinTask<?>> ts =
       
   696             (List<? extends ForkJoinTask<?>>) tasks;
       
   697         Throwable ex = null;
       
   698         int last = ts.size() - 1;
       
   699         for (int i = last; i >= 0; --i) {
       
   700             ForkJoinTask<?> t = ts.get(i);
       
   701             if (t == null) {
       
   702                 if (ex == null)
       
   703                     ex = new NullPointerException();
       
   704             }
       
   705             else if (i != 0)
       
   706                 t.fork();
       
   707             else {
       
   708                 t.quietlyInvoke();
       
   709                 if (ex == null)
       
   710                     ex = t.getException();
       
   711             }
       
   712         }
       
   713         for (int i = 1; i <= last; ++i) {
       
   714             ForkJoinTask<?> t = ts.get(i);
       
   715             if (t != null) {
       
   716                 if (ex != null)
       
   717                     t.cancel(false);
       
   718                 else {
       
   719                     t.quietlyJoin();
       
   720                     if (ex == null)
       
   721                         ex = t.getException();
       
   722                 }
       
   723             }
       
   724         }
       
   725         if (ex != null)
       
   726             rethrowException(ex);
       
   727         return tasks;
       
   728     }
       
   729 
       
   730     /**
       
   731      * Attempts to cancel execution of this task. This attempt will
       
   732      * fail if the task has already completed, has already been
       
   733      * cancelled, or could not be cancelled for some other reason. If
       
   734      * successful, and this task has not started when cancel is
       
   735      * called, execution of this task is suppressed, {@link
       
   736      * #isCancelled} will report true, and {@link #join} will result
       
   737      * in a {@code CancellationException} being thrown.
       
   738      *
       
   739      * <p>This method may be overridden in subclasses, but if so, must
       
   740      * still ensure that these minimal properties hold. In particular,
       
   741      * the {@code cancel} method itself must not throw exceptions.
       
   742      *
       
   743      * <p>This method is designed to be invoked by <em>other</em>
       
   744      * tasks. To terminate the current task, you can just return or
       
   745      * throw an unchecked exception from its computation method, or
       
   746      * invoke {@link #completeExceptionally}.
       
   747      *
       
   748      * @param mayInterruptIfRunning this value is ignored in the
       
   749      * default implementation because tasks are not
       
   750      * cancelled via interruption
       
   751      *
       
   752      * @return {@code true} if this task is now cancelled
       
   753      */
       
   754     public boolean cancel(boolean mayInterruptIfRunning) {
       
   755         setCompletion(CANCELLED);
       
   756         return (status & COMPLETION_MASK) == CANCELLED;
       
   757     }
       
   758 
       
   759     public final boolean isDone() {
       
   760         return status < 0;
       
   761     }
       
   762 
       
   763     public final boolean isCancelled() {
       
   764         return (status & COMPLETION_MASK) == CANCELLED;
       
   765     }
       
   766 
       
   767     /**
       
   768      * Returns {@code true} if this task threw an exception or was cancelled.
       
   769      *
       
   770      * @return {@code true} if this task threw an exception or was cancelled
       
   771      */
       
   772     public final boolean isCompletedAbnormally() {
       
   773         return (status & COMPLETION_MASK) < NORMAL;
       
   774     }
       
   775 
       
   776     /**
       
   777      * Returns {@code true} if this task completed without throwing an
       
   778      * exception and was not cancelled.
       
   779      *
       
   780      * @return {@code true} if this task completed without throwing an
       
   781      * exception and was not cancelled
       
   782      */
       
   783     public final boolean isCompletedNormally() {
       
   784         return (status & COMPLETION_MASK) == NORMAL;
       
   785     }
       
   786 
       
   787     /**
       
   788      * Returns the exception thrown by the base computation, or a
       
   789      * {@code CancellationException} if cancelled, or {@code null} if
       
   790      * none or if the method has not yet completed.
       
   791      *
       
   792      * @return the exception, or {@code null} if none
       
   793      */
       
   794     public final Throwable getException() {
       
   795         int s = status & COMPLETION_MASK;
       
   796         return ((s >= NORMAL)    ? null :
       
   797                 (s == CANCELLED) ? new CancellationException() :
       
   798                 exceptionMap.get(this));
       
   799     }
       
   800 
       
   801     /**
       
   802      * Completes this task abnormally, and if not already aborted or
       
   803      * cancelled, causes it to throw the given exception upon
       
   804      * {@code join} and related operations. This method may be used
       
   805      * to induce exceptions in asynchronous tasks, or to force
       
   806      * completion of tasks that would not otherwise complete.  Its use
       
   807      * in other situations is discouraged.  This method is
       
   808      * overridable, but overridden versions must invoke {@code super}
       
   809      * implementation to maintain guarantees.
       
   810      *
       
   811      * @param ex the exception to throw. If this exception is not a
       
   812      * {@code RuntimeException} or {@code Error}, the actual exception
       
   813      * thrown will be a {@code RuntimeException} with cause {@code ex}.
       
   814      */
       
   815     public void completeExceptionally(Throwable ex) {
       
   816         setDoneExceptionally((ex instanceof RuntimeException) ||
       
   817                              (ex instanceof Error) ? ex :
       
   818                              new RuntimeException(ex));
       
   819     }
       
   820 
       
   821     /**
       
   822      * Completes this task, and if not already aborted or cancelled,
       
   823      * returning a {@code null} result upon {@code join} and related
       
   824      * operations. This method may be used to provide results for
       
   825      * asynchronous tasks, or to provide alternative handling for
       
   826      * tasks that would not otherwise complete normally. Its use in
       
   827      * other situations is discouraged. This method is
       
   828      * overridable, but overridden versions must invoke {@code super}
       
   829      * implementation to maintain guarantees.
       
   830      *
       
   831      * @param value the result value for this task
       
   832      */
       
   833     public void complete(V value) {
       
   834         try {
       
   835             setRawResult(value);
       
   836         } catch (Throwable rex) {
       
   837             setDoneExceptionally(rex);
       
   838             return;
       
   839         }
       
   840         setNormalCompletion();
       
   841     }
       
   842 
       
   843     public final V get() throws InterruptedException, ExecutionException {
       
   844         ForkJoinWorkerThread w = getWorker();
       
   845         if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
       
   846             awaitDone(w, true);
       
   847         return reportFutureResult();
       
   848     }
       
   849 
       
   850     public final V get(long timeout, TimeUnit unit)
       
   851         throws InterruptedException, ExecutionException, TimeoutException {
       
   852         long nanos = unit.toNanos(timeout);
       
   853         ForkJoinWorkerThread w = getWorker();
       
   854         if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
       
   855             awaitDone(w, nanos);
       
   856         return reportTimedFutureResult();
       
   857     }
       
   858 
       
   859     /**
       
   860      * Possibly executes other tasks until this task {@link #isDone is
       
   861      * done}, then returns the result of the computation.  This method
       
   862      * may be more efficient than {@code join}, but is only applicable
       
   863      * when there are no potential dependencies between continuation
       
   864      * of the current task and that of any other task that might be
       
   865      * executed while helping. (This usually holds for pure
       
   866      * divide-and-conquer tasks).
       
   867      *
       
   868      * <p>This method may be invoked only from within {@code
       
   869      * ForkJoinTask} computations (as may be determined using method
       
   870      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   871      * result in exceptions or errors, possibly including {@code
       
   872      * ClassCastException}.
       
   873      *
       
   874      * @return the computed result
       
   875      */
       
   876     public final V helpJoin() {
       
   877         ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
       
   878         if (status < 0 || !w.unpushTask(this) || !tryExec())
       
   879             reportException(busyJoin(w));
       
   880         return getRawResult();
       
   881     }
       
   882 
       
   883     /**
       
   884      * Possibly executes other tasks until this task {@link #isDone is
       
   885      * done}.  This method may be useful when processing collections
       
   886      * of tasks when some have been cancelled or otherwise known to
       
   887      * have aborted.
       
   888      *
       
   889      * <p>This method may be invoked only from within {@code
       
   890      * ForkJoinTask} computations (as may be determined using method
       
   891      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   892      * result in exceptions or errors, possibly including {@code
       
   893      * ClassCastException}.
       
   894      */
       
   895     public final void quietlyHelpJoin() {
       
   896         if (status >= 0) {
       
   897             ForkJoinWorkerThread w =
       
   898                 (ForkJoinWorkerThread) Thread.currentThread();
       
   899             if (!w.unpushTask(this) || !tryQuietlyInvoke())
       
   900                 busyJoin(w);
       
   901         }
       
   902     }
       
   903 
       
   904     /**
       
   905      * Joins this task, without returning its result or throwing an
       
   906      * exception. This method may be useful when processing
       
   907      * collections of tasks when some have been cancelled or otherwise
       
   908      * known to have aborted.
       
   909      */
       
   910     public final void quietlyJoin() {
       
   911         if (status >= 0) {
       
   912             ForkJoinWorkerThread w = getWorker();
       
   913             if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
       
   914                 awaitDone(w, true);
       
   915         }
       
   916     }
       
   917 
       
   918     /**
       
   919      * Commences performing this task and awaits its completion if
       
   920      * necessary, without returning its result or throwing an
       
   921      * exception. This method may be useful when processing
       
   922      * collections of tasks when some have been cancelled or otherwise
       
   923      * known to have aborted.
       
   924      */
       
   925     public final void quietlyInvoke() {
       
   926         if (status >= 0 && !tryQuietlyInvoke())
       
   927             quietlyJoin();
       
   928     }
       
   929 
       
   930     /**
       
   931      * Possibly executes tasks until the pool hosting the current task
       
   932      * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
       
   933      * be of use in designs in which many tasks are forked, but none
       
   934      * are explicitly joined, instead executing them until all are
       
   935      * processed.
       
   936      *
       
   937      * <p>This method may be invoked only from within {@code
       
   938      * ForkJoinTask} computations (as may be determined using method
       
   939      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   940      * result in exceptions or errors, possibly including {@code
       
   941      * ClassCastException}.
       
   942      */
       
   943     public static void helpQuiesce() {
       
   944         ((ForkJoinWorkerThread) Thread.currentThread())
       
   945             .helpQuiescePool();
       
   946     }
       
   947 
       
   948     /**
       
   949      * Resets the internal bookkeeping state of this task, allowing a
       
   950      * subsequent {@code fork}. This method allows repeated reuse of
       
   951      * this task, but only if reuse occurs when this task has either
       
   952      * never been forked, or has been forked, then completed and all
       
   953      * outstanding joins of this task have also completed. Effects
       
   954      * under any other usage conditions are not guaranteed.
       
   955      * This method may be useful when executing
       
   956      * pre-constructed trees of subtasks in loops.
       
   957      */
       
   958     public void reinitialize() {
       
   959         if ((status & COMPLETION_MASK) == EXCEPTIONAL)
       
   960             exceptionMap.remove(this);
       
   961         status = 0;
       
   962     }
       
   963 
       
   964     /**
       
   965      * Returns the pool hosting the current task execution, or null
       
   966      * if this task is executing outside of any ForkJoinPool.
       
   967      *
       
   968      * @see #inForkJoinPool
       
   969      * @return the pool, or {@code null} if none
       
   970      */
       
   971     public static ForkJoinPool getPool() {
       
   972         Thread t = Thread.currentThread();
       
   973         return (t instanceof ForkJoinWorkerThread) ?
       
   974             ((ForkJoinWorkerThread) t).pool : null;
       
   975     }
       
   976 
       
   977     /**
       
   978      * Returns {@code true} if the current thread is executing as a
       
   979      * ForkJoinPool computation.
       
   980      *
       
   981      * @return {@code true} if the current thread is executing as a
       
   982      * ForkJoinPool computation, or false otherwise
       
   983      */
       
   984     public static boolean inForkJoinPool() {
       
   985         return Thread.currentThread() instanceof ForkJoinWorkerThread;
       
   986     }
       
   987 
       
   988     /**
       
   989      * Tries to unschedule this task for execution. This method will
       
   990      * typically succeed if this task is the most recently forked task
       
   991      * by the current thread, and has not commenced executing in
       
   992      * another thread.  This method may be useful when arranging
       
   993      * alternative local processing of tasks that could have been, but
       
   994      * were not, stolen.
       
   995      *
       
   996      * <p>This method may be invoked only from within {@code
       
   997      * ForkJoinTask} computations (as may be determined using method
       
   998      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
   999      * result in exceptions or errors, possibly including {@code
       
  1000      * ClassCastException}.
       
  1001      *
       
  1002      * @return {@code true} if unforked
       
  1003      */
       
  1004     public boolean tryUnfork() {
       
  1005         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1006             .unpushTask(this);
       
  1007     }
       
  1008 
       
  1009     /**
       
  1010      * Returns an estimate of the number of tasks that have been
       
  1011      * forked by the current worker thread but not yet executed. This
       
  1012      * value may be useful for heuristic decisions about whether to
       
  1013      * fork other tasks.
       
  1014      *
       
  1015      * <p>This method may be invoked only from within {@code
       
  1016      * ForkJoinTask} computations (as may be determined using method
       
  1017      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
  1018      * result in exceptions or errors, possibly including {@code
       
  1019      * ClassCastException}.
       
  1020      *
       
  1021      * @return the number of tasks
       
  1022      */
       
  1023     public static int getQueuedTaskCount() {
       
  1024         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1025             .getQueueSize();
       
  1026     }
       
  1027 
       
  1028     /**
       
  1029      * Returns an estimate of how many more locally queued tasks are
       
  1030      * held by the current worker thread than there are other worker
       
  1031      * threads that might steal them.  This value may be useful for
       
  1032      * heuristic decisions about whether to fork other tasks. In many
       
  1033      * usages of ForkJoinTasks, at steady state, each worker should
       
  1034      * aim to maintain a small constant surplus (for example, 3) of
       
  1035      * tasks, and to process computations locally if this threshold is
       
  1036      * exceeded.
       
  1037      *
       
  1038      * <p>This method may be invoked only from within {@code
       
  1039      * ForkJoinTask} computations (as may be determined using method
       
  1040      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
  1041      * result in exceptions or errors, possibly including {@code
       
  1042      * ClassCastException}.
       
  1043      *
       
  1044      * @return the surplus number of tasks, which may be negative
       
  1045      */
       
  1046     public static int getSurplusQueuedTaskCount() {
       
  1047         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1048             .getEstimatedSurplusTaskCount();
       
  1049     }
       
  1050 
       
  1051     // Extension methods
       
  1052 
       
  1053     /**
       
  1054      * Returns the result that would be returned by {@link #join}, even
       
  1055      * if this task completed abnormally, or {@code null} if this task
       
  1056      * is not known to have been completed.  This method is designed
       
  1057      * to aid debugging, as well as to support extensions. Its use in
       
  1058      * any other context is discouraged.
       
  1059      *
       
  1060      * @return the result, or {@code null} if not completed
       
  1061      */
       
  1062     public abstract V getRawResult();
       
  1063 
       
  1064     /**
       
  1065      * Forces the given value to be returned as a result.  This method
       
  1066      * is designed to support extensions, and should not in general be
       
  1067      * called otherwise.
       
  1068      *
       
  1069      * @param value the value
       
  1070      */
       
  1071     protected abstract void setRawResult(V value);
       
  1072 
       
  1073     /**
       
  1074      * Immediately performs the base action of this task.  This method
       
  1075      * is designed to support extensions, and should not in general be
       
  1076      * called otherwise. The return value controls whether this task
       
  1077      * is considered to be done normally. It may return false in
       
  1078      * asynchronous actions that require explicit invocations of
       
  1079      * {@link #complete} to become joinable. It may also throw an
       
  1080      * (unchecked) exception to indicate abnormal exit.
       
  1081      *
       
  1082      * @return {@code true} if completed normally
       
  1083      */
       
  1084     protected abstract boolean exec();
       
  1085 
       
  1086     /**
       
  1087      * Returns, but does not unschedule or execute, a task queued by
       
  1088      * the current thread but not yet executed, if one is immediately
       
  1089      * available. There is no guarantee that this task will actually
       
  1090      * be polled or executed next. Conversely, this method may return
       
  1091      * null even if a task exists but cannot be accessed without
       
  1092      * contention with other threads.  This method is designed
       
  1093      * primarily to support extensions, and is unlikely to be useful
       
  1094      * otherwise.
       
  1095      *
       
  1096      * <p>This method may be invoked only from within {@code
       
  1097      * ForkJoinTask} computations (as may be determined using method
       
  1098      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
  1099      * result in exceptions or errors, possibly including {@code
       
  1100      * ClassCastException}.
       
  1101      *
       
  1102      * @return the next task, or {@code null} if none are available
       
  1103      */
       
  1104     protected static ForkJoinTask<?> peekNextLocalTask() {
       
  1105         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1106             .peekTask();
       
  1107     }
       
  1108 
       
  1109     /**
       
  1110      * Unschedules and returns, without executing, the next task
       
  1111      * queued by the current thread but not yet executed.  This method
       
  1112      * is designed primarily to support extensions, and is unlikely to
       
  1113      * be useful otherwise.
       
  1114      *
       
  1115      * <p>This method may be invoked only from within {@code
       
  1116      * ForkJoinTask} computations (as may be determined using method
       
  1117      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
  1118      * result in exceptions or errors, possibly including {@code
       
  1119      * ClassCastException}.
       
  1120      *
       
  1121      * @return the next task, or {@code null} if none are available
       
  1122      */
       
  1123     protected static ForkJoinTask<?> pollNextLocalTask() {
       
  1124         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1125             .pollLocalTask();
       
  1126     }
       
  1127 
       
  1128     /**
       
  1129      * Unschedules and returns, without executing, the next task
       
  1130      * queued by the current thread but not yet executed, if one is
       
  1131      * available, or if not available, a task that was forked by some
       
  1132      * other thread, if available. Availability may be transient, so a
       
  1133      * {@code null} result does not necessarily imply quiescence
       
  1134      * of the pool this task is operating in.  This method is designed
       
  1135      * primarily to support extensions, and is unlikely to be useful
       
  1136      * otherwise.
       
  1137      *
       
  1138      * <p>This method may be invoked only from within {@code
       
  1139      * ForkJoinTask} computations (as may be determined using method
       
  1140      * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
       
  1141      * result in exceptions or errors, possibly including {@code
       
  1142      * ClassCastException}.
       
  1143      *
       
  1144      * @return a task, or {@code null} if none are available
       
  1145      */
       
  1146     protected static ForkJoinTask<?> pollTask() {
       
  1147         return ((ForkJoinWorkerThread) Thread.currentThread())
       
  1148             .pollTask();
       
  1149     }
       
  1150 
       
  1151     /**
       
  1152      * Adaptor for Runnables. This implements RunnableFuture
       
  1153      * to be compliant with AbstractExecutorService constraints
       
  1154      * when used in ForkJoinPool.
       
  1155      */
       
  1156     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
       
  1157         implements RunnableFuture<T> {
       
  1158         final Runnable runnable;
       
  1159         final T resultOnCompletion;
       
  1160         T result;
       
  1161         AdaptedRunnable(Runnable runnable, T result) {
       
  1162             if (runnable == null) throw new NullPointerException();
       
  1163             this.runnable = runnable;
       
  1164             this.resultOnCompletion = result;
       
  1165         }
       
  1166         public T getRawResult() { return result; }
       
  1167         public void setRawResult(T v) { result = v; }
       
  1168         public boolean exec() {
       
  1169             runnable.run();
       
  1170             result = resultOnCompletion;
       
  1171             return true;
       
  1172         }
       
  1173         public void run() { invoke(); }
       
  1174         private static final long serialVersionUID = 5232453952276885070L;
       
  1175     }
       
  1176 
       
  1177     /**
       
  1178      * Adaptor for Callables
       
  1179      */
       
  1180     static final class AdaptedCallable<T> extends ForkJoinTask<T>
       
  1181         implements RunnableFuture<T> {
       
  1182         final Callable<? extends T> callable;
       
  1183         T result;
       
  1184         AdaptedCallable(Callable<? extends T> callable) {
       
  1185             if (callable == null) throw new NullPointerException();
       
  1186             this.callable = callable;
       
  1187         }
       
  1188         public T getRawResult() { return result; }
       
  1189         public void setRawResult(T v) { result = v; }
       
  1190         public boolean exec() {
       
  1191             try {
       
  1192                 result = callable.call();
       
  1193                 return true;
       
  1194             } catch (Error err) {
       
  1195                 throw err;
       
  1196             } catch (RuntimeException rex) {
       
  1197                 throw rex;
       
  1198             } catch (Exception ex) {
       
  1199                 throw new RuntimeException(ex);
       
  1200             }
       
  1201         }
       
  1202         public void run() { invoke(); }
       
  1203         private static final long serialVersionUID = 2838392045355241008L;
       
  1204     }
       
  1205 
       
  1206     /**
       
  1207      * Returns a new {@code ForkJoinTask} that performs the {@code run}
       
  1208      * method of the given {@code Runnable} as its action, and returns
       
  1209      * a null result upon {@link #join}.
       
  1210      *
       
  1211      * @param runnable the runnable action
       
  1212      * @return the task
       
  1213      */
       
  1214     public static ForkJoinTask<?> adapt(Runnable runnable) {
       
  1215         return new AdaptedRunnable<Void>(runnable, null);
       
  1216     }
       
  1217 
       
  1218     /**
       
  1219      * Returns a new {@code ForkJoinTask} that performs the {@code run}
       
  1220      * method of the given {@code Runnable} as its action, and returns
       
  1221      * the given result upon {@link #join}.
       
  1222      *
       
  1223      * @param runnable the runnable action
       
  1224      * @param result the result upon completion
       
  1225      * @return the task
       
  1226      */
       
  1227     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
       
  1228         return new AdaptedRunnable<T>(runnable, result);
       
  1229     }
       
  1230 
       
  1231     /**
       
  1232      * Returns a new {@code ForkJoinTask} that performs the {@code call}
       
  1233      * method of the given {@code Callable} as its action, and returns
       
  1234      * its result upon {@link #join}, translating any checked exceptions
       
  1235      * encountered into {@code RuntimeException}.
       
  1236      *
       
  1237      * @param callable the callable action
       
  1238      * @return the task
       
  1239      */
       
  1240     public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
       
  1241         return new AdaptedCallable<T>(callable);
       
  1242     }
       
  1243 
       
  1244     // Serialization support
       
  1245 
       
  1246     private static final long serialVersionUID = -7721805057305804111L;
       
  1247 
       
  1248     /**
       
  1249      * Saves the state to a stream.
       
  1250      *
       
  1251      * @serialData the current run status and the exception thrown
       
  1252      * during execution, or {@code null} if none
       
  1253      * @param s the stream
       
  1254      */
       
  1255     private void writeObject(java.io.ObjectOutputStream s)
       
  1256         throws java.io.IOException {
       
  1257         s.defaultWriteObject();
       
  1258         s.writeObject(getException());
       
  1259     }
       
  1260 
       
  1261     /**
       
  1262      * Reconstitutes the instance from a stream.
       
  1263      *
       
  1264      * @param s the stream
       
  1265      */
       
  1266     private void readObject(java.io.ObjectInputStream s)
       
  1267         throws java.io.IOException, ClassNotFoundException {
       
  1268         s.defaultReadObject();
       
  1269         status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
       
  1270         status |= EXTERNAL_SIGNAL; // conservatively set external signal
       
  1271         Object ex = s.readObject();
       
  1272         if (ex != null)
       
  1273             setDoneExceptionally((Throwable) ex);
       
  1274     }
       
  1275 
       
  1276     // Unsafe mechanics
       
  1277 
       
  1278     private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
       
  1279     private static final long statusOffset =
       
  1280         objectFieldOffset("status", ForkJoinTask.class);
       
  1281 
       
  1282     private static long objectFieldOffset(String field, Class<?> klazz) {
       
  1283         try {
       
  1284             return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
       
  1285         } catch (NoSuchFieldException e) {
       
  1286             // Convert Exception to corresponding Error
       
  1287             NoSuchFieldError error = new NoSuchFieldError(field);
       
  1288             error.initCause(e);
       
  1289             throw error;
       
  1290         }
       
  1291     }
       
  1292 }