jdk/src/share/classes/java/util/concurrent/CyclicBarrier.java
changeset 2 90ce3da70b43
child 5506 202f599c92aa
equal deleted inserted replaced
0:fd16c54261b3 2:90ce3da70b43
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.  Sun designates this
       
     7  * particular file as subject to the "Classpath" exception as provided
       
     8  * by Sun in the LICENSE file that accompanied this code.
       
     9  *
       
    10  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    13  * version 2 for more details (a copy is included in the LICENSE file that
       
    14  * accompanied this code).
       
    15  *
       
    16  * You should have received a copy of the GNU General Public License version
       
    17  * 2 along with this work; if not, write to the Free Software Foundation,
       
    18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    19  *
       
    20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    21  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    22  * have any questions.
       
    23  */
       
    24 
       
    25 /*
       
    26  * This file is available under and governed by the GNU General Public
       
    27  * License version 2 only, as published by the Free Software Foundation.
       
    28  * However, the following notice accompanied the original version of this
       
    29  * file:
       
    30  *
       
    31  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    32  * Expert Group and released to the public domain, as explained at
       
    33  * http://creativecommons.org/licenses/publicdomain
       
    34  */
       
    35 
       
    36 package java.util.concurrent;
       
    37 import java.util.concurrent.locks.*;
       
    38 
       
    39 /**
       
    40  * A synchronization aid that allows a set of threads to all wait for
       
    41  * each other to reach a common barrier point.  CyclicBarriers are
       
    42  * useful in programs involving a fixed-size party of threads that
       
    43  * must occasionally wait for each other. The barrier is called
       
    44  * <em>cyclic</em> because it can be re-used after the waiting threads
       
    45  * are released.
       
    46  *
       
    47  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
       
    48  * that is run once per barrier point, after the last thread in the party
       
    49  * arrives, but before any threads are released.
       
    50  * This <em>barrier action</em> is useful
       
    51  * for updating shared state before any of the parties continue.
       
    52  *
       
    53  * <p><b>Sample usage:</b> Here is an example of
       
    54  *  using a barrier in a parallel decomposition design:
       
    55  * <pre>
       
    56  * class Solver {
       
    57  *   final int N;
       
    58  *   final float[][] data;
       
    59  *   final CyclicBarrier barrier;
       
    60  *
       
    61  *   class Worker implements Runnable {
       
    62  *     int myRow;
       
    63  *     Worker(int row) { myRow = row; }
       
    64  *     public void run() {
       
    65  *       while (!done()) {
       
    66  *         processRow(myRow);
       
    67  *
       
    68  *         try {
       
    69  *           barrier.await();
       
    70  *         } catch (InterruptedException ex) {
       
    71  *           return;
       
    72  *         } catch (BrokenBarrierException ex) {
       
    73  *           return;
       
    74  *         }
       
    75  *       }
       
    76  *     }
       
    77  *   }
       
    78  *
       
    79  *   public Solver(float[][] matrix) {
       
    80  *     data = matrix;
       
    81  *     N = matrix.length;
       
    82  *     barrier = new CyclicBarrier(N,
       
    83  *                                 new Runnable() {
       
    84  *                                   public void run() {
       
    85  *                                     mergeRows(...);
       
    86  *                                   }
       
    87  *                                 });
       
    88  *     for (int i = 0; i < N; ++i)
       
    89  *       new Thread(new Worker(i)).start();
       
    90  *
       
    91  *     waitUntilDone();
       
    92  *   }
       
    93  * }
       
    94  * </pre>
       
    95  * Here, each worker thread processes a row of the matrix then waits at the
       
    96  * barrier until all rows have been processed. When all rows are processed
       
    97  * the supplied {@link Runnable} barrier action is executed and merges the
       
    98  * rows. If the merger
       
    99  * determines that a solution has been found then <tt>done()</tt> will return
       
   100  * <tt>true</tt> and each worker will terminate.
       
   101  *
       
   102  * <p>If the barrier action does not rely on the parties being suspended when
       
   103  * it is executed, then any of the threads in the party could execute that
       
   104  * action when it is released. To facilitate this, each invocation of
       
   105  * {@link #await} returns the arrival index of that thread at the barrier.
       
   106  * You can then choose which thread should execute the barrier action, for
       
   107  * example:
       
   108  * <pre>  if (barrier.await() == 0) {
       
   109  *     // log the completion of this iteration
       
   110  *   }</pre>
       
   111  *
       
   112  * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
       
   113  * for failed synchronization attempts: If a thread leaves a barrier
       
   114  * point prematurely because of interruption, failure, or timeout, all
       
   115  * other threads waiting at that barrier point will also leave
       
   116  * abnormally via {@link BrokenBarrierException} (or
       
   117  * {@link InterruptedException} if they too were interrupted at about
       
   118  * the same time).
       
   119  *
       
   120  * <p>Memory consistency effects: Actions in a thread prior to calling
       
   121  * {@code await()}
       
   122  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
       
   123  * actions that are part of the barrier action, which in turn
       
   124  * <i>happen-before</i> actions following a successful return from the
       
   125  * corresponding {@code await()} in other threads.
       
   126  *
       
   127  * @since 1.5
       
   128  * @see CountDownLatch
       
   129  *
       
   130  * @author Doug Lea
       
   131  */
       
   public class CyclicBarrier {
       /**
        * Each use of the barrier is represented as a generation instance.
        * The generation changes whenever the barrier is tripped, or
        * is reset. There can be many generations associated with threads
        * using the barrier - due to the non-deterministic way the lock
        * may be allocated to waiting threads - but only one of these
        * can be active at a time (the one to which {@code count} applies)
        * and all the rest are either broken or tripped.
        * There need not be an active generation if there has been a break
        * but no subsequent reset.
        */
       private static class Generation {
           /** Set to {@code true} by {@code breakBarrier}; never cleared. */
           boolean broken = false;
       }

       /** The lock for guarding barrier entry */
       private final ReentrantLock lock = new ReentrantLock();
       /** Condition to wait on until tripped */
       private final Condition trip = lock.newCondition();
       /** The number of parties */
       private final int parties;
       /** The command to run when tripped, or {@code null} if there is none */
       private final Runnable barrierCommand;
       /** The current generation */
       private Generation generation = new Generation();

       /**
        * Number of parties still waiting. Counts down from parties to 0
        * on each generation.  It is reset to parties on each new
        * generation or when broken.
        */
       private int count;

       /**
        * Updates state on barrier trip and wakes up everyone.
        * Called only while holding lock.
        */
       private void nextGeneration() {
           // signal completion of last generation
           trip.signalAll();
           // set up next generation
           count = parties;
           generation = new Generation();
       }

       /**
        * Sets current barrier generation as broken and wakes up everyone.
        * Called only while holding lock.
        */
       private void breakBarrier() {
           generation.broken = true;
           count = parties;
           trip.signalAll();
       }

       /**
        * Main barrier code, covering the various policies.
        *
        * @param timed {@code true} if the wait is bounded by {@code nanos};
        *        {@code false} to wait until tripped, broken or interrupted
        * @param nanos the maximum time to wait, in nanoseconds; only
        *        consulted when {@code timed} is {@code true}
        * @return the arrival index of the calling thread; zero for the
        *         thread that trips the barrier
        * @throws InterruptedException if the calling thread had its
        *         interrupted status set on entry, or was interrupted while
        *         waiting on a generation that was still current and unbroken
        * @throws BrokenBarrierException if the generation observed on entry
        *         is, or becomes, broken
        * @throws TimeoutException if {@code timed} is {@code true} and the
        *         remaining time reaches zero before the barrier trips
        */
       private int dowait(boolean timed, long nanos)
           throws InterruptedException, BrokenBarrierException,
                  TimeoutException {
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               // Remember the generation we arrived in; every later check is
               // made against this snapshot rather than the current field.
               final Generation g = generation;

               if (g.broken)
                   throw new BrokenBarrierException();

               if (Thread.interrupted()) {
                   breakBarrier();
                   throw new InterruptedException();
               }

               int index = --count;
               if (index == 0) {  // tripped
                   boolean ranAction = false;
                   try {
                       final Runnable command = barrierCommand;
                       if (command != null)
                           command.run();
                       ranAction = true;
                       nextGeneration();
                       return 0;
                   } finally {
                       // The barrier action threw: break the barrier so the
                       // waiting parties are released abnormally.
                       if (!ranAction)
                           breakBarrier();
                   }
               }

               // loop until tripped, broken, interrupted, or timed out
               for (;;) {
                   try {
                       if (!timed)
                           trip.await();
                       else if (nanos > 0L)
                           nanos = trip.awaitNanos(nanos);
                   } catch (InterruptedException ie) {
                       if (g == generation && ! g.broken) {
                           breakBarrier();
                           throw ie;
                       } else {
                           // We're about to finish waiting even if we had not
                           // been interrupted, so this interrupt is deemed to
                           // "belong" to subsequent execution.
                           Thread.currentThread().interrupt();
                       }
                   }

                   if (g.broken)
                       throw new BrokenBarrierException();

                   // A different current generation means ours was tripped.
                   if (g != generation)
                       return index;

                   if (timed && nanos <= 0L) {
                       breakBarrier();
                       throw new TimeoutException();
                   }
               }
           } finally {
               lock.unlock();
           }
       }

       /**
        * Creates a new {@code CyclicBarrier} that will trip when the
        * given number of parties (threads) are waiting upon it, and which
        * will execute the given barrier action when the barrier is tripped,
        * performed by the last thread entering the barrier.
        *
        * @param parties the number of threads that must invoke {@link #await}
        *        before the barrier is tripped
        * @param barrierAction the command to execute when the barrier is
        *        tripped, or {@code null} if there is no action
        * @throws IllegalArgumentException if {@code parties} is less than 1
        */
       public CyclicBarrier(int parties, Runnable barrierAction) {
           if (parties <= 0) throw new IllegalArgumentException();
           this.parties = parties;
           this.count = parties;
           this.barrierCommand = barrierAction;
       }

       /**
        * Creates a new {@code CyclicBarrier} that will trip when the
        * given number of parties (threads) are waiting upon it, and
        * does not perform a predefined action when the barrier is tripped.
        *
        * @param parties the number of threads that must invoke {@link #await}
        *        before the barrier is tripped
        * @throws IllegalArgumentException if {@code parties} is less than 1
        */
       public CyclicBarrier(int parties) {
           this(parties, null);
       }

       /**
        * Returns the number of parties required to trip this barrier.
        *
        * @return the number of parties required to trip this barrier
        */
       public int getParties() {
           return parties;
       }

       /**
        * Waits until all {@linkplain #getParties parties} have invoked
        * {@code await} on this barrier.
        *
        * <p>If the current thread is not the last to arrive then it is
        * disabled for thread scheduling purposes and lies dormant until
        * one of the following things happens:
        * <ul>
        * <li>The last thread arrives; or
        * <li>Some other thread {@linkplain Thread#interrupt interrupts}
        * the current thread; or
        * <li>Some other thread {@linkplain Thread#interrupt interrupts}
        * one of the other waiting threads; or
        * <li>Some other thread times out while waiting for the barrier; or
        * <li>Some other thread invokes {@link #reset} on this barrier.
        * </ul>
        *
        * <p>If the current thread:
        * <ul>
        * <li>has its interrupted status set on entry to this method; or
        * <li>is {@linkplain Thread#interrupt interrupted} while waiting
        * </ul>
        * then {@link InterruptedException} is thrown and the current thread's
        * interrupted status is cleared.
        *
        * <p>If the barrier is {@link #reset} while any thread is waiting,
        * or if the barrier {@linkplain #isBroken is broken} when
        * {@code await} is invoked, or while any thread is waiting, then
        * {@link BrokenBarrierException} is thrown.
        *
        * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
        * waiting, then all other waiting threads will throw
        * {@link BrokenBarrierException} and the barrier is placed in the
        * broken state.
        *
        * <p>If the current thread is the last thread to arrive, and a
        * non-null barrier action was supplied in the constructor, then the
        * current thread runs the action before allowing the other threads to
        * continue.
        * If an exception occurs during the barrier action then that exception
        * will be propagated in the current thread and the barrier is placed in
        * the broken state.
        *
        * @return the arrival index of the current thread, where index
        *         <tt>{@link #getParties()} - 1</tt> indicates the first
        *         to arrive and zero indicates the last to arrive
        * @throws InterruptedException if the current thread was interrupted
        *         while waiting
        * @throws BrokenBarrierException if <em>another</em> thread was
        *         interrupted or timed out while the current thread was
        *         waiting, or the barrier was reset, or the barrier was
        *         broken when {@code await} was called, or the barrier
        *         action (if present) failed due to an exception
        */
       public int await() throws InterruptedException, BrokenBarrierException {
           try {
               return dowait(false, 0L);
           } catch (TimeoutException toe) {
               // An untimed wait never reaches the timeout branch of dowait.
               throw new Error(toe); // cannot happen
           }
       }

       /**
        * Waits until all {@linkplain #getParties parties} have invoked
        * {@code await} on this barrier, or the specified waiting time elapses.
        *
        * <p>If the current thread is not the last to arrive then it is
        * disabled for thread scheduling purposes and lies dormant until
        * one of the following things happens:
        * <ul>
        * <li>The last thread arrives; or
        * <li>The specified timeout elapses; or
        * <li>Some other thread {@linkplain Thread#interrupt interrupts}
        * the current thread; or
        * <li>Some other thread {@linkplain Thread#interrupt interrupts}
        * one of the other waiting threads; or
        * <li>Some other thread times out while waiting for the barrier; or
        * <li>Some other thread invokes {@link #reset} on this barrier.
        * </ul>
        *
        * <p>If the current thread:
        * <ul>
        * <li>has its interrupted status set on entry to this method; or
        * <li>is {@linkplain Thread#interrupt interrupted} while waiting
        * </ul>
        * then {@link InterruptedException} is thrown and the current thread's
        * interrupted status is cleared.
        *
        * <p>If the specified waiting time elapses then {@link TimeoutException}
        * is thrown. If the time is less than or equal to zero, the
        * method will not wait at all.
        *
        * <p>If the barrier is {@link #reset} while any thread is waiting,
        * or if the barrier {@linkplain #isBroken is broken} when
        * {@code await} is invoked, or while any thread is waiting, then
        * {@link BrokenBarrierException} is thrown.
        *
        * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
        * waiting, then all other waiting threads will throw {@link
        * BrokenBarrierException} and the barrier is placed in the broken
        * state.
        *
        * <p>If the current thread is the last thread to arrive, and a
        * non-null barrier action was supplied in the constructor, then the
        * current thread runs the action before allowing the other threads to
        * continue.
        * If an exception occurs during the barrier action then that exception
        * will be propagated in the current thread and the barrier is placed in
        * the broken state.
        *
        * @param timeout the time to wait for the barrier
        * @param unit the time unit of the timeout parameter
        * @return the arrival index of the current thread, where index
        *         <tt>{@link #getParties()} - 1</tt> indicates the first
        *         to arrive and zero indicates the last to arrive
        * @throws InterruptedException if the current thread was interrupted
        *         while waiting
        * @throws TimeoutException if the specified timeout elapses
        * @throws BrokenBarrierException if <em>another</em> thread was
        *         interrupted or timed out while the current thread was
        *         waiting, or the barrier was reset, or the barrier was broken
        *         when {@code await} was called, or the barrier action (if
        *         present) failed due to an exception
        */
       public int await(long timeout, TimeUnit unit)
           throws InterruptedException,
                  BrokenBarrierException,
                  TimeoutException {
           return dowait(true, unit.toNanos(timeout));
       }

       /**
        * Queries if this barrier is in a broken state.
        *
        * @return {@code true} if one or more parties broke out of this
        *         barrier due to interruption or timeout since
        *         construction or the last reset, or a barrier action
        *         failed due to an exception; {@code false} otherwise.
        */
       public boolean isBroken() {
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               return generation.broken;
           } finally {
               lock.unlock();
           }
       }

       /**
        * Resets the barrier to its initial state.  If any parties are
        * currently waiting at the barrier, they will return with a
        * {@link BrokenBarrierException}. Note that resets <em>after</em>
        * a breakage has occurred for other reasons can be complicated to
        * carry out; threads need to re-synchronize in some other way,
        * and choose one to perform the reset.  It may be preferable to
        * instead create a new barrier for subsequent use.
        */
       public void reset() {
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               breakBarrier();   // break the current generation
               nextGeneration(); // start a new generation
           } finally {
               lock.unlock();
           }
       }

       /**
        * Returns the number of parties currently waiting at the barrier.
        * This method is primarily useful for debugging and assertions.
        *
        * @return the number of parties currently blocked in {@link #await}
        */
       public int getNumberWaiting() {
           final ReentrantLock lock = this.lock;
           lock.lock();
           try {
               return parties - count;
           } finally {
               lock.unlock();
           }
       }
   }