|
1 /* |
|
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
3 * |
|
4 * This code is free software; you can redistribute it and/or modify it |
|
5 * under the terms of the GNU General Public License version 2 only, as |
|
6 * published by the Free Software Foundation. Sun designates this |
|
7 * particular file as subject to the "Classpath" exception as provided |
|
8 * by Sun in the LICENSE file that accompanied this code. |
|
9 * |
|
10 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
13 * version 2 for more details (a copy is included in the LICENSE file that |
|
14 * accompanied this code). |
|
15 * |
|
16 * You should have received a copy of the GNU General Public License version |
|
17 * 2 along with this work; if not, write to the Free Software Foundation, |
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
19 * |
|
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
|
21 * CA 95054 USA or visit www.sun.com if you need additional information or |
|
22 * have any questions. |
|
23 */ |
|
24 |
|
25 /* |
|
26 * This file is available under and governed by the GNU General Public |
|
27 * License version 2 only, as published by the Free Software Foundation. |
|
28 * However, the following notice accompanied the original version of this |
|
29 * file: |
|
30 * |
|
31 * Written by Doug Lea with assistance from members of JCP JSR-166 |
|
32 * Expert Group and released to the public domain, as explained at |
|
33 * http://creativecommons.org/licenses/publicdomain |
|
34 */ |
|
35 |
|
36 package java.util.concurrent; |
|
37 import java.util.concurrent.locks.*; |
|
38 |
|
/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 * using a barrier in a parallel decomposition design:
 * <pre> {@code
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }}</pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then {@code done()} will return
 * {@code true} and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre> {@code
 * if (barrier.await() == 0) {
 *   // log the completion of this iteration
 * }}</pre>
 *
 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     *
     * @param timed {@code true} if the wait is bounded by {@code nanos};
     *        {@code false} to wait without a time limit
     * @param nanos the remaining time to wait, in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @return the arrival index of the calling thread: the value of
     *         {@code count} after this thread decremented it, so zero
     *         for the thread that trips the barrier
     * @throws InterruptedException if the calling thread's interrupted
     *         status is set on entry, or it is interrupted while waiting
     *         on a generation that is still current and unbroken
     * @throws BrokenBarrierException if the generation is already broken
     *         on entry, or is found broken after waking up
     * @throws TimeoutException if {@code timed} is {@code true} and no
     *         time remains while the generation is still current
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Snapshot the generation this thread is joining; later
            // checks compare against it to detect a trip or reset.
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                // ranAction only becomes true once the command has
                // returned normally, so an exception thrown by the
                // barrier action breaks the barrier in the finally.
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // The barrier moved on to a new generation without
                // breaking ours: this thread's generation was tripped.
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            // An untimed wait never reaches the timeout branch of dowait.
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}