author | dl |
Wed, 28 Nov 2018 15:25:14 -0800 | |
changeset 52730 | 345266000aba |
parent 49565 | b5705ade8c8d |
permissions | -rw-r--r-- |
2 | 1 |
/* |
2 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
3 |
* |
|
4 |
* This code is free software; you can redistribute it and/or modify it |
|
5 |
* under the terms of the GNU General Public License version 2 only, as |
|
5506 | 6 |
* published by the Free Software Foundation. Oracle designates this |
2 | 7 |
* particular file as subject to the "Classpath" exception as provided |
5506 | 8 |
* by Oracle in the LICENSE file that accompanied this code. |
2 | 9 |
* |
10 |
* This code is distributed in the hope that it will be useful, but WITHOUT |
|
11 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
12 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
13 |
* version 2 for more details (a copy is included in the LICENSE file that |
|
14 |
* accompanied this code). |
|
15 |
* |
|
16 |
* You should have received a copy of the GNU General Public License version |
|
17 |
* 2 along with this work; if not, write to the Free Software Foundation, |
|
18 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
19 |
* |
|
5506 | 20 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
21 |
* or visit www.oracle.com if you need additional information or have any |
|
22 |
* questions. |
|
2 | 23 |
*/ |
24 |
||
25 |
/* |
|
26 |
* This file is available under and governed by the GNU General Public |
|
27 |
* License version 2 only, as published by the Free Software Foundation. |
|
28 |
* However, the following notice accompanied the original version of this |
|
29 |
* file: |
|
30 |
* |
|
31 |
* Written by Doug Lea, Bill Scherer, and Michael Scott with |
|
32 |
* assistance from members of JCP JSR-166 Expert Group and released to |
|
33 |
* the public domain, as explained at |
|
9242
ef138d47df58
7034657: Update Creative Commons license URL in legal notices
dl
parents:
8764
diff
changeset
|
34 |
* http://creativecommons.org/publicdomain/zero/1.0/ |
2 | 35 |
*/ |
36 |
||
37 |
package java.util.concurrent; |
|
38 |
||
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
39 |
import java.lang.invoke.MethodHandles; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
40 |
import java.lang.invoke.VarHandle; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
41 |
import java.util.concurrent.locks.LockSupport; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
42 |
|
2 | 43 |
/** |
44 |
* A synchronization point at which threads can pair and swap elements |
|
45 |
* within pairs. Each thread presents some object on entry to the |
|
46 |
* {@link #exchange exchange} method, matches with a partner thread, |
|
47 |
* and receives its partner's object on return. An Exchanger may be |
|
48 |
* viewed as a bidirectional form of a {@link SynchronousQueue}. |
|
49 |
* Exchangers may be useful in applications such as genetic algorithms |
|
50 |
* and pipeline designs. |
|
51 |
* |
|
52 |
* <p><b>Sample Usage:</b> |
|
53 |
* Here are the highlights of a class that uses an {@code Exchanger} |
|
54 |
* to swap buffers between threads so that the thread filling the |
|
55 |
* buffer gets a freshly emptied one when it needs it, handing off the |
|
56 |
* filled one to the thread emptying the buffer. |
|
32991
b27c76b82713
8134853: Bulk integration of java.util.concurrent classes
dl
parents:
25859
diff
changeset
|
57 |
* <pre> {@code |
2 | 58 |
* class FillAndEmpty { |
32991
b27c76b82713
8134853: Bulk integration of java.util.concurrent classes
dl
parents:
25859
diff
changeset
|
59 |
* Exchanger<DataBuffer> exchanger = new Exchanger<>(); |
2 | 60 |
* DataBuffer initialEmptyBuffer = ... a made-up type |
61 |
* DataBuffer initialFullBuffer = ... |
|
62 |
* |
|
63 |
* class FillingLoop implements Runnable { |
|
64 |
* public void run() { |
|
65 |
* DataBuffer currentBuffer = initialEmptyBuffer; |
|
66 |
* try { |
|
67 |
* while (currentBuffer != null) { |
|
68 |
* addToBuffer(currentBuffer); |
|
69 |
* if (currentBuffer.isFull()) |
|
70 |
* currentBuffer = exchanger.exchange(currentBuffer); |
|
71 |
* } |
|
72 |
* } catch (InterruptedException ex) { ... handle ... } |
|
73 |
* } |
|
74 |
* } |
|
75 |
* |
|
76 |
* class EmptyingLoop implements Runnable { |
|
77 |
* public void run() { |
|
78 |
* DataBuffer currentBuffer = initialFullBuffer; |
|
79 |
* try { |
|
80 |
* while (currentBuffer != null) { |
|
81 |
* takeFromBuffer(currentBuffer); |
|
82 |
* if (currentBuffer.isEmpty()) |
|
83 |
* currentBuffer = exchanger.exchange(currentBuffer); |
|
84 |
* } |
|
85 |
* } catch (InterruptedException ex) { ... handle ...} |
|
86 |
* } |
|
87 |
* } |
|
88 |
* |
|
89 |
* void start() { |
|
90 |
* new Thread(new FillingLoop()).start(); |
|
91 |
* new Thread(new EmptyingLoop()).start(); |
|
92 |
* } |
|
18768 | 93 |
* }}</pre> |
2 | 94 |
* |
95 |
* <p>Memory consistency effects: For each pair of threads that |
|
96 |
* successfully exchange objects via an {@code Exchanger}, actions |
|
97 |
* prior to the {@code exchange()} in each thread |
|
98 |
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> |
|
99 |
* those subsequent to a return from the corresponding {@code exchange()} |
|
100 |
* in the other thread. |
|
101 |
* |
|
102 |
* @since 1.5 |
|
103 |
* @author Doug Lea and Bill Scherer and Michael Scott |
|
104 |
* @param <V> The type of objects that may be exchanged |
|
105 |
*/ |
|
106 |
public class Exchanger<V> { |
|
18768 | 107 |
|
2 | 108 |
/* |
18768 | 109 |
* Overview: The core algorithm is, for an exchange "slot", |
110 |
* and a participant (caller) with an item: |
|
2 | 111 |
* |
18768 | 112 |
* for (;;) { |
113 |
* if (slot is empty) { // offer |
|
114 |
* place item in a Node; |
|
115 |
* if (can CAS slot from empty to node) { |
|
116 |
* wait for release; |
|
117 |
* return matching item in node; |
|
118 |
* } |
|
119 |
* } |
|
120 |
* else if (can CAS slot from node to empty) { // release |
|
121 |
* get the item in node; |
|
122 |
* set matching item in node; |
|
123 |
* release waiting thread; |
|
124 |
* } |
|
125 |
* // else retry on CAS failure |
|
126 |
* } |
|
127 |
* |
|
128 |
* This is among the simplest forms of a "dual data structure" -- |
|
129 |
* see Scott and Scherer's DISC 04 paper and |
|
130 |
* http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html |
|
2 | 131 |
* |
18768 | 132 |
* This works great in principle. But in practice, like many |
133 |
* algorithms centered on atomic updates to a single location, it |
|
134 |
* scales horribly when there are more than a few participants |
|
135 |
* using the same Exchanger. So the implementation instead uses a |
|
136 |
* form of elimination arena, that spreads out this contention by |
|
137 |
* arranging that some threads typically use different slots, |
|
138 |
* while still ensuring that eventually, any two parties will be |
|
139 |
* able to exchange items. That is, we cannot completely partition |
|
140 |
* across threads, but instead give threads arena indices that |
|
141 |
* will on average grow under contention and shrink under lack of |
|
142 |
* contention. We approach this by defining the Nodes that we need |
|
143 |
* anyway as ThreadLocals, and include in them per-thread index |
|
144 |
* and related bookkeeping state. (We can safely reuse per-thread |
|
145 |
* nodes rather than creating them fresh each time because slots |
|
146 |
* alternate between pointing to a node vs null, so cannot |
|
147 |
* encounter ABA problems. However, we do need some care in |
|
148 |
* resetting them between uses.) |
|
2 | 149 |
* |
18768 | 150 |
* Implementing an effective arena requires allocating a bunch of |
151 |
* space, so we only do so upon detecting contention (except on |
|
152 |
* uniprocessors, where they wouldn't help, so aren't used). |
|
153 |
* Otherwise, exchanges use the single-slot slotExchange method. |
|
154 |
* On contention, not only must the slots be in different |
|
155 |
* locations, but the locations must not encounter memory |
|
156 |
* contention due to being on the same cache line (or more |
|
157 |
* generally, the same coherence unit). Because, as of this |
|
158 |
* writing, there is no way to determine cacheline size, we define |
|
159 |
* a value that is enough for common platforms. Additionally, |
|
160 |
* extra care elsewhere is taken to avoid other false/unintended |
|
161 |
* sharing and to enhance locality, including adding padding (via |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
162 |
* @Contended) to Nodes, embedding "bound" as an Exchanger field. |
18768 | 163 |
* |
164 |
* The arena starts out with only one used slot. We expand the |
|
165 |
* effective arena size by tracking collisions; i.e., failed CASes |
|
166 |
* while trying to exchange. By nature of the above algorithm, the |
|
167 |
* only kinds of collision that reliably indicate contention are |
|
168 |
* when two attempted releases collide -- one of two attempted |
|
169 |
* offers can legitimately fail to CAS without indicating |
|
170 |
* contention by more than one other thread. (Note: it is possible |
|
171 |
* but not worthwhile to more precisely detect contention by |
|
172 |
* reading slot values after CAS failures.) When a thread has |
|
173 |
* collided at each slot within the current arena bound, it tries |
|
174 |
* to expand the arena size by one. We track collisions within |
|
175 |
* bounds by using a version (sequence) number on the "bound" |
|
176 |
* field, and conservatively reset collision counts when a |
|
177 |
* participant notices that bound has been updated (in either |
|
178 |
* direction). |
|
2 | 179 |
* |
18768 | 180 |
* The effective arena size is reduced (when there is more than |
181 |
* one slot) by giving up on waiting after a while and trying to |
|
182 |
* decrement the arena size on expiration. The value of "a while" |
|
183 |
* is an empirical matter. We implement by piggybacking on the |
|
184 |
* use of spin->yield->block that is essential for reasonable |
|
185 |
* waiting performance anyway -- in a busy exchanger, offers are |
|
186 |
* usually almost immediately released, in which case context |
|
187 |
* switching on multiprocessors is extremely slow/wasteful. Arena |
|
188 |
* waits just omit the blocking part, and instead cancel. The spin |
|
189 |
* count is empirically chosen to be a value that avoids blocking |
|
190 |
* 99% of the time under maximum sustained exchange rates on a |
|
191 |
* range of test machines. Spins and yields entail some limited |
|
192 |
* randomness (using a cheap xorshift) to avoid regular patterns |
|
193 |
* that can induce unproductive grow/shrink cycles. (Using a |
|
194 |
* pseudorandom also helps regularize spin cycle duration by |
|
195 |
* making branches unpredictable.) Also, during an offer, a |
|
196 |
* waiter can "know" that it will be released when its slot has |
|
197 |
* changed, but cannot yet proceed until match is set. In the |
|
198 |
* mean time it cannot cancel the offer, so instead spins/yields. |
|
199 |
* Note: It is possible to avoid this secondary check by changing |
|
200 |
* the linearization point to be a CAS of the match field (as done |
|
201 |
* in one case in the Scott & Scherer DISC paper), which also |
|
202 |
* increases asynchrony a bit, at the expense of poorer collision |
|
203 |
* detection and inability to always reuse per-thread nodes. So |
|
204 |
* the current scheme is typically a better tradeoff. |
|
205 |
* |
|
206 |
* On collisions, indices traverse the arena cyclically in reverse |
|
207 |
* order, restarting at the maximum index (which will tend to be |
|
208 |
* sparsest) when bounds change. (On expirations, indices instead |
|
209 |
* are halved until reaching 0.) It is possible (and has been |
|
210 |
* tried) to use randomized, prime-value-stepped, or double-hash |
|
211 |
* style traversal instead of simple cyclic traversal to reduce |
|
212 |
* bunching. But empirically, whatever benefits these may have |
|
213 |
* don't overcome their added overhead: We are managing operations |
|
214 |
* that occur very quickly unless there is sustained contention, |
|
215 |
* so simpler/faster control policies work better than more |
|
216 |
* accurate but slower ones. |
|
217 |
* |
|
218 |
* Because we use expiration for arena size control, we cannot |
|
219 |
* throw TimeoutExceptions in the timed version of the public |
|
220 |
* exchange method until the arena size has shrunken to zero (or |
|
221 |
* the arena isn't enabled). This may delay response to timeout |
|
222 |
* but is still within spec. |
|
2 | 223 |
* |
18768 | 224 |
* Essentially all of the implementation is in methods |
225 |
* slotExchange and arenaExchange. These have similar overall |
|
226 |
* structure, but differ in too many details to combine. The |
|
227 |
* slotExchange method uses the single Exchanger field "slot" |
|
228 |
* rather than arena array elements. However, it still needs |
|
229 |
* minimal collision detection to trigger arena construction. |
|
230 |
* (The messiest part is making sure interrupt status and |
|
231 |
* InterruptedExceptions come out right during transitions when |
|
232 |
* both methods may be called. This is done by using null return |
|
233 |
* as a sentinel to recheck interrupt status.) |
|
2 | 234 |
* |
18768 | 235 |
* As is too common in this sort of code, methods are monolithic |
236 |
* because most of the logic relies on reads of fields that are |
|
237 |
* maintained as local variables so can't be nicely factored -- |
|
39781
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
238 |
* mainly, here, bulky spin->yield->block/cancel code. Note that |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
239 |
* field Node.item is not declared as volatile even though it is |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
240 |
* read by releasing threads, because they only do so after CAS |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
241 |
* operations that must precede access, and all uses by the owning |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
242 |
* thread are otherwise acceptably ordered by other operations. |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
243 |
* (Because the actual points of atomicity are slot CASes, it |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
244 |
* would also be legal for the write to Node.match in a release to |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
245 |
* be weaker than a full volatile write. However, this is not done |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
246 |
* because it could allow further postponement of the write, |
8190c004acbd
8161591: Miscellaneous changes imported from jsr166 CVS 2016-07
dl
parents:
39725
diff
changeset
|
247 |
* delaying progress.) |
2 | 248 |
*/ |
249 |
||
18768 | 250 |
/** |
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
251 |
* The index distance (as a shift value) between any two used slots |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
252 |
* in the arena, spacing them out to avoid false sharing. |
18768 | 253 |
*/ |
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
254 |
private static final int ASHIFT = 5; |
18768 | 255 |
|
256 |
/** |
|
257 |
* The maximum supported arena index. The maximum allocatable |
|
258 |
* arena size is MMASK + 1. Must be a power of two minus one, less |
|
259 |
* than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices |
|
260 |
* for the expected scaling limits of the main algorithms. |
|
261 |
*/ |
|
262 |
private static final int MMASK = 0xff; |
|
263 |
||
264 |
/** |
|
265 |
* Unit for sequence/version bits of bound field. Each successful |
|
266 |
* change to the bound also adds SEQ. |
|
267 |
*/ |
|
268 |
private static final int SEQ = MMASK + 1; |
|
269 |
||
2 | 270 |
/** The number of CPUs, for sizing and spin control */ |
271 |
private static final int NCPU = Runtime.getRuntime().availableProcessors(); |
|
272 |
||
273 |
/** |
|
18768 | 274 |
* The maximum slot index of the arena: The number of slots that |
275 |
* can in principle hold all threads without contention, or at |
|
276 |
* most the maximum indexable value. |
|
2 | 277 |
*/ |
18768 | 278 |
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; |
2 | 279 |
|
280 |
/** |
|
18768 | 281 |
* The bound for spins while waiting for a match. The actual |
282 |
* number of iterations will on average be about twice this value |
|
283 |
* due to randomization. Note: Spinning is disabled when NCPU==1. |
|
2 | 284 |
*/ |
18768 | 285 |
private static final int SPINS = 1 << 10; |
2 | 286 |
|
287 |
/** |
|
288 |
* Value representing null arguments/returns from public |
|
18768 | 289 |
* methods. Needed because the API originally didn't disallow null |
290 |
* arguments, which it should have. |
|
2 | 291 |
*/ |
292 |
private static final Object NULL_ITEM = new Object(); |
|
293 |
||
294 |
/** |
|
18768 | 295 |
* Sentinel value returned by internal exchange methods upon |
296 |
* timeout, to avoid need for separate timed versions of these |
|
297 |
* methods. |
|
2 | 298 |
*/ |
18768 | 299 |
private static final Object TIMED_OUT = new Object(); |
2 | 300 |
|
18768 | 301 |
/** |
302 |
* Nodes hold partially exchanged data, plus other per-thread |
|
35981
e3e89c0bb3d9
8145485: Miscellaneous changes imported from jsr166 CVS 2016-02
dl
parents:
34369
diff
changeset
|
303 |
* bookkeeping. Padded via @Contended to reduce memory contention. |
18768 | 304 |
*/ |
34369
b6df4cc80001
8140687: Move @Contended to the jdk.internal.vm.annotation package
chegar
parents:
33674
diff
changeset
|
305 |
@jdk.internal.vm.annotation.Contended static final class Node { |
18768 | 306 |
int index; // Arena index |
307 |
int bound; // Last recorded value of Exchanger.bound |
|
308 |
int collides; // Number of CAS failures at current bound |
|
309 |
int hash; // Pseudo-random for spins |
|
310 |
Object item; // This thread's current item |
|
311 |
volatile Object match; // Item provided by releasing thread |
|
312 |
volatile Thread parked; // Set to this thread when parked, else null |
|
2 | 313 |
} |
314 |
||
18768 | 315 |
/** The corresponding thread local class */ |
316 |
static final class Participant extends ThreadLocal<Node> { |
|
317 |
public Node initialValue() { return new Node(); } |
|
2 | 318 |
} |
319 |
||
320 |
/** |
|
32991
b27c76b82713
8134853: Bulk integration of java.util.concurrent classes
dl
parents:
25859
diff
changeset
|
321 |
* Per-thread state. |
2 | 322 |
*/ |
18768 | 323 |
private final Participant participant; |
324 |
||
325 |
/** |
|
326 |
* Elimination array; null until enabled (within slotExchange). |
|
327 |
* Element accesses use emulation of volatile gets and CAS. |
|
328 |
*/ |
|
329 |
private volatile Node[] arena; |
|
2 | 330 |
|
331 |
/** |
|
18768 | 332 |
* Slot used until contention detected. |
2 | 333 |
*/ |
18768 | 334 |
private volatile Node slot; |
2 | 335 |
|
336 |
/** |
|
18768 | 337 |
* The index of the largest valid arena position, OR'ed with SEQ |
338 |
* number in high bits, incremented on each update. The initial |
|
339 |
* update from 0 to SEQ is used to ensure that the arena array is |
|
340 |
* constructed only once. |
|
341 |
*/ |
|
342 |
private volatile int bound; |
|
343 |
||
344 |
/** |
|
345 |
* Exchange function when arenas enabled. See above for explanation. |
|
2 | 346 |
* |
347 |
* @param item the (non-null) item to exchange |
|
348 |
* @param timed true if the wait is timed |
|
18768 | 349 |
* @param ns if timed, the maximum wait time, else 0L |
350 |
* @return the other thread's item; or null if interrupted; or |
|
351 |
* TIMED_OUT if timed and timed out |
|
2 | 352 |
*/ |
18768 | 353 |
private final Object arenaExchange(Object item, boolean timed, long ns) { |
354 |
Node[] a = arena; |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
355 |
int alen = a.length; |
18768 | 356 |
Node p = participant.get(); |
357 |
for (int i = p.index;;) { // access slot at i |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
358 |
int b, m, c; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
359 |
int j = (i << ASHIFT) + ((1 << ASHIFT) - 1); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
360 |
if (j < 0 || j >= alen) |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
361 |
j = alen - 1; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
362 |
Node q = (Node)AA.getAcquire(a, j); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
363 |
if (q != null && AA.compareAndSet(a, j, q, null)) { |
18768 | 364 |
Object v = q.item; // release |
365 |
q.match = item; |
|
366 |
Thread w = q.parked; |
|
367 |
if (w != null) |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
368 |
LockSupport.unpark(w); |
18768 | 369 |
return v; |
2 | 370 |
} |
18768 | 371 |
else if (i <= (m = (b = bound) & MMASK) && q == null) { |
372 |
p.item = item; // offer |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
373 |
if (AA.compareAndSet(a, j, null, p)) { |
18768 | 374 |
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; |
375 |
Thread t = Thread.currentThread(); // wait |
|
376 |
for (int h = p.hash, spins = SPINS;;) { |
|
377 |
Object v = p.match; |
|
378 |
if (v != null) { |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
379 |
MATCH.setRelease(p, null); |
18768 | 380 |
p.item = null; // clear for next use |
381 |
p.hash = h; |
|
382 |
return v; |
|
383 |
} |
|
384 |
else if (spins > 0) { |
|
385 |
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift |
|
386 |
if (h == 0) // initialize hash |
|
387 |
h = SPINS | (int)t.getId(); |
|
388 |
else if (h < 0 && // approx 50% true |
|
389 |
(--spins & ((SPINS >>> 1) - 1)) == 0) |
|
390 |
Thread.yield(); // two yields per wait |
|
391 |
} |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
392 |
else if (AA.getAcquire(a, j) != p) |
18768 | 393 |
spins = SPINS; // releaser hasn't set match yet |
394 |
else if (!t.isInterrupted() && m == 0 && |
|
395 |
(!timed || |
|
396 |
(ns = end - System.nanoTime()) > 0L)) { |
|
397 |
p.parked = t; // minimize window |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
398 |
if (AA.getAcquire(a, j) == p) { |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
399 |
if (ns == 0L) |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
400 |
LockSupport.park(this); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
401 |
else |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
402 |
LockSupport.parkNanos(this, ns); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
403 |
} |
18768 | 404 |
p.parked = null; |
405 |
} |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
406 |
else if (AA.getAcquire(a, j) == p && |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
407 |
AA.compareAndSet(a, j, p, null)) { |
18768 | 408 |
if (m != 0) // try to shrink |
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
409 |
BOUND.compareAndSet(this, b, b + SEQ - 1); |
18768 | 410 |
p.item = null; |
411 |
p.hash = h; |
|
412 |
i = p.index >>>= 1; // descend |
|
413 |
if (Thread.interrupted()) |
|
414 |
return null; |
|
415 |
if (timed && m == 0 && ns <= 0L) |
|
416 |
return TIMED_OUT; |
|
417 |
break; // expired; restart |
|
418 |
} |
|
419 |
} |
|
420 |
} |
|
421 |
else |
|
422 |
p.item = null; // clear offer |
|
2 | 423 |
} |
18768 | 424 |
else { |
425 |
if (p.bound != b) { // stale; reset |
|
426 |
p.bound = b; |
|
427 |
p.collides = 0; |
|
428 |
i = (i != m || m == 0) ? m : m - 1; |
|
429 |
} |
|
430 |
else if ((c = p.collides) < m || m == FULL || |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
431 |
!BOUND.compareAndSet(this, b, b + SEQ + 1)) { |
18768 | 432 |
p.collides = c + 1; |
433 |
i = (i == 0) ? m : i - 1; // cyclically traverse |
|
434 |
} |
|
435 |
else |
|
436 |
i = m + 1; // grow |
|
437 |
p.index = i; |
|
2 | 438 |
} |
439 |
} |
|
440 |
} |
|
441 |
||
442 |
/** |
|
18768 | 443 |
* Exchange function used until arenas enabled. See above for explanation. |
2 | 444 |
* |
18768 | 445 |
* @param item the item to exchange |
446 |
* @param timed true if the wait is timed |
|
447 |
* @param ns if timed, the maximum wait time, else 0L |
|
448 |
* @return the other thread's item; or null if either the arena |
|
449 |
* was enabled or the thread was interrupted before completion; or |
|
450 |
* TIMED_OUT if timed and timed out |
|
2 | 451 |
*/ |
18768 | 452 |
private final Object slotExchange(Object item, boolean timed, long ns) { |
453 |
Node p = participant.get(); |
|
454 |
Thread t = Thread.currentThread(); |
|
455 |
if (t.isInterrupted()) // preserve interrupt status so caller can recheck |
|
456 |
return null; |
|
2 | 457 |
|
18768 | 458 |
for (Node q;;) { |
459 |
if ((q = slot) != null) { |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
460 |
if (SLOT.compareAndSet(this, q, null)) { |
18768 | 461 |
Object v = q.item; |
462 |
q.match = item; |
|
463 |
Thread w = q.parked; |
|
464 |
if (w != null) |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
465 |
LockSupport.unpark(w); |
18768 | 466 |
return v; |
467 |
} |
|
468 |
// create arena on contention, but continue until slot null |
|
469 |
if (NCPU > 1 && bound == 0 && |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
470 |
BOUND.compareAndSet(this, 0, SEQ)) |
18768 | 471 |
arena = new Node[(FULL + 2) << ASHIFT]; |
2 | 472 |
} |
18768 | 473 |
else if (arena != null) |
474 |
return null; // caller must reroute to arenaExchange |
|
475 |
else { |
|
476 |
p.item = item; |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
477 |
if (SLOT.compareAndSet(this, null, p)) |
18768 | 478 |
break; |
479 |
p.item = null; |
|
2 | 480 |
} |
481 |
} |
|
18768 | 482 |
|
483 |
// await release |
|
484 |
int h = p.hash; |
|
485 |
long end = timed ? System.nanoTime() + ns : 0L; |
|
486 |
int spins = (NCPU > 1) ? SPINS : 1; |
|
487 |
Object v; |
|
488 |
while ((v = p.match) == null) { |
|
489 |
if (spins > 0) { |
|
490 |
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; |
|
491 |
if (h == 0) |
|
492 |
h = SPINS | (int)t.getId(); |
|
493 |
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) |
|
494 |
Thread.yield(); |
|
495 |
} |
|
496 |
else if (slot != p) |
|
497 |
spins = SPINS; |
|
498 |
else if (!t.isInterrupted() && arena == null && |
|
499 |
(!timed || (ns = end - System.nanoTime()) > 0L)) { |
|
500 |
p.parked = t; |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
501 |
if (slot == p) { |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
502 |
if (ns == 0L) |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
503 |
LockSupport.park(this); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
504 |
else |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
505 |
LockSupport.parkNanos(this, ns); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
506 |
} |
18768 | 507 |
p.parked = null; |
508 |
} |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
509 |
else if (SLOT.compareAndSet(this, p, null)) { |
18768 | 510 |
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; |
511 |
break; |
|
512 |
} |
|
513 |
} |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
514 |
MATCH.setRelease(p, null); |
18768 | 515 |
p.item = null; |
516 |
p.hash = h; |
|
517 |
return v; |
|
2 | 518 |
} |
519 |
||
520 |
/** |
|
521 |
* Creates a new Exchanger. |
|
522 |
*/ |
|
523 |
public Exchanger() { |
|
18768 | 524 |
participant = new Participant(); |
2 | 525 |
} |
526 |
||
527 |
/** |
|
528 |
* Waits for another thread to arrive at this exchange point (unless |
|
529 |
* the current thread is {@linkplain Thread#interrupt interrupted}), |
|
530 |
* and then transfers the given object to it, receiving its object |
|
531 |
* in return. |
|
532 |
* |
|
533 |
* <p>If another thread is already waiting at the exchange point then |
|
534 |
* it is resumed for thread scheduling purposes and receives the object |
|
535 |
* passed in by the current thread. The current thread returns immediately, |
|
536 |
* receiving the object passed to the exchange by that other thread. |
|
537 |
* |
|
538 |
* <p>If no other thread is already waiting at the exchange then the |
|
539 |
* current thread is disabled for thread scheduling purposes and lies |
|
540 |
* dormant until one of two things happens: |
|
541 |
* <ul> |
|
542 |
* <li>Some other thread enters the exchange; or |
|
7518 | 543 |
* <li>Some other thread {@linkplain Thread#interrupt interrupts} |
544 |
* the current thread. |
|
2 | 545 |
* </ul> |
546 |
* <p>If the current thread: |
|
547 |
* <ul> |
|
548 |
* <li>has its interrupted status set on entry to this method; or |
|
549 |
* <li>is {@linkplain Thread#interrupt interrupted} while waiting |
|
550 |
* for the exchange, |
|
551 |
* </ul> |
|
552 |
* then {@link InterruptedException} is thrown and the current thread's |
|
553 |
* interrupted status is cleared. |
|
554 |
* |
|
555 |
* @param x the object to exchange |
|
556 |
* @return the object provided by the other thread |
|
557 |
* @throws InterruptedException if the current thread was |
|
558 |
* interrupted while waiting |
|
559 |
*/ |
|
11279 | 560 |
@SuppressWarnings("unchecked") |
2 | 561 |
public V exchange(V x) throws InterruptedException { |
18768 | 562 |
Object v; |
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
563 |
Node[] a; |
18768 | 564 |
Object item = (x == null) ? NULL_ITEM : x; // translate null args |
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
565 |
if (((a = arena) != null || |
18768 | 566 |
(v = slotExchange(item, false, 0L)) == null) && |
52730
345266000aba
8211283: Miscellaneous changes imported from jsr166 CVS 2018-11
dl
parents:
49565
diff
changeset
|
567 |
(Thread.interrupted() || // disambiguates null return |
345266000aba
8211283: Miscellaneous changes imported from jsr166 CVS 2018-11
dl
parents:
49565
diff
changeset
|
568 |
(v = arenaExchange(item, false, 0L)) == null)) |
18768 | 569 |
throw new InterruptedException(); |
570 |
return (v == NULL_ITEM) ? null : (V)v; |
|
2 | 571 |
} |
572 |
||
573 |
/** |
|
574 |
* Waits for another thread to arrive at this exchange point (unless |
|
575 |
* the current thread is {@linkplain Thread#interrupt interrupted} or |
|
576 |
* the specified waiting time elapses), and then transfers the given |
|
577 |
* object to it, receiving its object in return. |
|
578 |
* |
|
579 |
* <p>If another thread is already waiting at the exchange point then |
|
580 |
* it is resumed for thread scheduling purposes and receives the object |
|
581 |
* passed in by the current thread. The current thread returns immediately, |
|
582 |
* receiving the object passed to the exchange by that other thread. |
|
583 |
* |
|
584 |
* <p>If no other thread is already waiting at the exchange then the |
|
585 |
* current thread is disabled for thread scheduling purposes and lies |
|
586 |
* dormant until one of three things happens: |
|
587 |
* <ul> |
|
588 |
* <li>Some other thread enters the exchange; or |
|
589 |
* <li>Some other thread {@linkplain Thread#interrupt interrupts} |
|
590 |
* the current thread; or |
|
591 |
* <li>The specified waiting time elapses. |
|
592 |
* </ul> |
|
593 |
* <p>If the current thread: |
|
594 |
* <ul> |
|
595 |
* <li>has its interrupted status set on entry to this method; or |
|
596 |
* <li>is {@linkplain Thread#interrupt interrupted} while waiting |
|
597 |
* for the exchange, |
|
598 |
* </ul> |
|
599 |
* then {@link InterruptedException} is thrown and the current thread's |
|
600 |
* interrupted status is cleared. |
|
601 |
* |
|
602 |
* <p>If the specified waiting time elapses then {@link |
|
603 |
* TimeoutException} is thrown. If the time is less than or equal |
|
604 |
* to zero, the method will not wait at all. |
|
605 |
* |
|
606 |
* @param x the object to exchange |
|
607 |
* @param timeout the maximum time to wait |
|
18768 | 608 |
* @param unit the time unit of the {@code timeout} argument |
2 | 609 |
* @return the object provided by the other thread |
610 |
* @throws InterruptedException if the current thread was |
|
611 |
* interrupted while waiting |
|
612 |
* @throws TimeoutException if the specified waiting time elapses |
|
613 |
* before another thread enters the exchange |
|
614 |
*/ |
|
11279 | 615 |
@SuppressWarnings("unchecked") |
2 | 616 |
public V exchange(V x, long timeout, TimeUnit unit) |
617 |
throws InterruptedException, TimeoutException { |
|
18768 | 618 |
Object v; |
619 |
Object item = (x == null) ? NULL_ITEM : x; |
|
620 |
long ns = unit.toNanos(timeout); |
|
621 |
if ((arena != null || |
|
622 |
(v = slotExchange(item, true, ns)) == null) && |
|
52730
345266000aba
8211283: Miscellaneous changes imported from jsr166 CVS 2018-11
dl
parents:
49565
diff
changeset
|
623 |
(Thread.interrupted() || |
345266000aba
8211283: Miscellaneous changes imported from jsr166 CVS 2018-11
dl
parents:
49565
diff
changeset
|
624 |
(v = arenaExchange(item, true, ns)) == null)) |
18768 | 625 |
throw new InterruptedException(); |
626 |
if (v == TIMED_OUT) |
|
627 |
throw new TimeoutException(); |
|
628 |
return (v == NULL_ITEM) ? null : (V)v; |
|
629 |
} |
|
630 |
||
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
631 |
// VarHandle mechanics |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
632 |
private static final VarHandle BOUND; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
633 |
private static final VarHandle SLOT; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
634 |
private static final VarHandle MATCH; |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
635 |
private static final VarHandle AA; |
18768 | 636 |
static { |
637 |
try { |
|
39725
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
638 |
MethodHandles.Lookup l = MethodHandles.lookup(); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
639 |
BOUND = l.findVarHandle(Exchanger.class, "bound", int.class); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
640 |
SLOT = l.findVarHandle(Exchanger.class, "slot", Node.class); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
641 |
MATCH = l.findVarHandle(Node.class, "match", Object.class); |
9548f8d846e9
8080603: Replace Unsafe with VarHandle in java.util.concurrent classes
dl
parents:
36936
diff
changeset
|
642 |
AA = MethodHandles.arrayElementVarHandle(Node[].class); |
32991
b27c76b82713
8134853: Bulk integration of java.util.concurrent classes
dl
parents:
25859
diff
changeset
|
643 |
} catch (ReflectiveOperationException e) { |
49565
b5705ade8c8d
8197531: Miscellaneous changes imported from jsr166 CVS 2018-04
dl
parents:
47216
diff
changeset
|
644 |
throw new ExceptionInInitializerError(e); |
2 | 645 |
} |
646 |
} |
|
18768 | 647 |
|
2 | 648 |
} |