196 * design is roughly similar to those in the papers "Dynamic |
207 * design is roughly similar to those in the papers "Dynamic |
197 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 |
208 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 |
198 * (http://research.sun.com/scalable/pubs/index.html) and |
209 * (http://research.sun.com/scalable/pubs/index.html) and |
199 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, |
210 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, |
200 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). |
211 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). |
201 * See also "Correct and Efficient Work-Stealing for Weak Memory |
212 * The main differences ultimately stem from GC requirements that |
202 * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 |
213 * we null out taken slots as soon as we can, to maintain as small |
203 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an |
214 * a footprint as possible even in programs generating huge |
204 * analysis of memory ordering (atomic, volatile etc) issues. The |
215 * numbers of tasks. To accomplish this, we shift the CAS |
205 * main differences ultimately stem from GC requirements that we |
216 * arbitrating pop vs poll (steal) from being on the indices |
206 * null out taken slots as soon as we can, to maintain as small a |
217 * ("base" and "top") to the slots themselves. |
207 * footprint as possible even in programs generating huge numbers |
218 * |
208 * of tasks. To accomplish this, we shift the CAS arbitrating pop |
219 * Adding tasks then takes the form of a classic array push(task): |
209 * vs poll (steal) from being on the indices ("base" and "top") to |
220 * q.array[q.top] = task; ++q.top; |
210 * the slots themselves. So, both a successful pop and poll |
221 * |
211 * mainly entail a CAS of a slot from non-null to null. Because |
222 * (The actual code needs to null-check and size-check the array, |
212 * we rely on CASes of references, we do not need tag bits on base |
223 * properly fence the accesses, and possibly signal waiting |
213 * or top. They are simple ints as used in any circular |
224 * workers to start scanning -- see below.) Both a successful pop |
|
225 * and poll mainly entail a CAS of a slot from non-null to null. |
|
226 * |
|
227 * The pop operation (always performed by owner) is: |
|
228 * if ((base != top) and |
|
229 * (the task at top slot is not null) and |
|
230 * (CAS slot to null)) |
|
231 * decrement top and return task; |
|
232 * |
|
233 * And the poll operation (usually by a stealer) is |
|
234 * if ((base != top) and |
|
235 * (the task at base slot is not null) and |
|
236 * (base has not changed) and |
|
237 * (CAS slot to null)) |
|
238 * increment base and return task; |
|
239 * |
|
240 * Because we rely on CASes of references, we do not need tag bits |
|
241 * on base or top. They are simple ints as used in any circular |
214 * array-based queue (see for example ArrayDeque). Updates to the |
242 * array-based queue (see for example ArrayDeque). Updates to the |
215 * indices must still be ordered in a way that guarantees that top |
243 * indices guarantee that top == base means the queue is empty, |
216 * == base means the queue is empty, but otherwise may err on the |
244 * but otherwise may err on the side of possibly making the queue |
217 * side of possibly making the queue appear nonempty when a push, |
245 * appear nonempty when a push, pop, or poll has not fully
218 * pop, or poll have not fully committed. Note that this means |
246 * committed. (Method isEmpty() checks the case of a partially |
219 * that the poll operation, considered individually, is not |
247 * completed removal of the last element.) Because of this, the |
220 * wait-free. One thief cannot successfully continue until another |
248 * poll operation, considered individually, is not wait-free. One |
221 * in-progress one (or, if previously empty, a push) completes. |
249 * thief cannot successfully continue until another in-progress |
222 * However, in the aggregate, we ensure at least probabilistic |
250 * one (or, if previously empty, a push) completes. However, in |
|
251 * the aggregate, we ensure at least probabilistic |
223 * non-blockingness. If an attempted steal fails, a thief always |
252 * non-blockingness. If an attempted steal fails, a thief always |
224 * chooses a different random victim target to try next. So, in |
253 * chooses a different random victim target to try next. So, in |
225 * order for one thief to progress, it suffices for any |
254 * order for one thief to progress, it suffices for any |
226 * in-progress poll or new push on any empty queue to |
255 * in-progress poll or new push on any empty queue to |
227 * complete. (This is why we normally use method pollAt and its |
256 * complete. (This is why we normally use method pollAt and its |
228 * variants that try once at the apparent base index, else |
257 * variants that try once at the apparent base index, else |
229 * consider alternative actions, rather than method poll.) |
258 * consider alternative actions, rather than method poll, which |
230 * |
259 * retries.) |
231 * This approach also enables support of a user mode in which local |
260 * |
232 * task processing is in FIFO, not LIFO order, simply by using |
261 * This approach also enables support of a user mode in which |
233 * poll rather than pop. This can be useful in message-passing |
262 * local task processing is in FIFO, not LIFO order, simply by |
234 * frameworks in which tasks are never joined. However neither |
263 * using poll rather than pop. This can be useful in |
235 * mode considers affinities, loads, cache localities, etc, so |
264 * message-passing frameworks in which tasks are never joined. |
236 * rarely provide the best possible performance on a given |
265 * However neither mode considers affinities, loads, cache |
237 * machine, but portably provide good throughput by averaging over |
266 * localities, etc, so it rarely provides the best possible
238 * these factors. (Further, even if we did try to use such
267 * performance on a given machine, but portably provides good
239 * information, we do not usually have a basis for exploiting it. |
268 * throughput by averaging over these factors. Further, even if |
240 * For example, some sets of tasks profit from cache affinities, |
269 * we did try to use such information, we do not usually have a |
241 * but others are harmed by cache pollution effects.) |
270 * basis for exploiting it. For example, some sets of tasks |
|
271 * profit from cache affinities, but others are harmed by cache |
|
272 * pollution effects. Additionally, even though it requires |
|
273 * scanning, long-term throughput is often best using random |
|
274 * selection rather than directed selection policies, so cheap |
|
275 * randomization of sufficient quality is used whenever |
|
276 * applicable. Various Marsaglia XorShifts (some with different |
|
277 * shift constants) are inlined at use points. |
242 * |
278 * |
243 * WorkQueues are also used in a similar way for tasks submitted |
279 * WorkQueues are also used in a similar way for tasks submitted |
244 * to the pool. We cannot mix these tasks in the same queues used |
280 * to the pool. We cannot mix these tasks in the same queues used |
245 * for work-stealing (this would contaminate lifo/fifo |
281 * by workers. Instead, we randomly associate submission queues |
246 * processing). Instead, we randomly associate submission queues |
|
247 * with submitting threads, using a form of hashing. The |
282 * with submitting threads, using a form of hashing. The |
248 * ThreadLocalRandom probe value serves as a hash code for |
283 * ThreadLocalRandom probe value serves as a hash code for |
249 * choosing existing queues, and may be randomly repositioned upon |
284 * choosing existing queues, and may be randomly repositioned upon |
250 * contention with other submitters. In essence, submitters act |
285 * contention with other submitters. In essence, submitters act |
251 * like workers except that they are restricted to executing local |
286 * like workers except that they are restricted to executing local |
252 * tasks that they submitted (or in the case of CountedCompleters, |
287 * tasks that they submitted (or in the case of CountedCompleters, |
253 * others with the same root task). However, because most |
288 * others with the same root task). Insertion of tasks in shared |
254 * shared/external queue operations are more expensive than |
|
255 * internal, and because, at steady state, external submitters |
|
256 * will compete for CPU with workers, ForkJoinTask.join and |
|
257 * related methods disable them from repeatedly helping to process |
|
258 * tasks if all workers are active. Insertion of tasks in shared |
|
259 * mode requires a lock (mainly to protect in the case of |
289 * mode requires a lock (mainly to protect in the case of |
260 * resizing) but we use only a simple spinlock (using bits in |
290 * resizing) but we use only a simple spinlock (using field |
261 * field qlock), because submitters encountering a busy queue move |
291 * qlock), because submitters encountering a busy queue move on to |
262 * on to try or create other queues -- they block only when |
292 * try or create other queues -- they block only when creating and |
263 * creating and registering new queues. |
293 * registering new queues. Additionally, "qlock" saturates to an |
|
294 * unlockable value (-1) at shutdown. Unlocking still can be and |
|
295 * is performed by cheaper ordered writes of "qlock" in successful |
|
296 * cases, but uses CAS in unsuccessful cases. |
264 * |
297 * |
265 * Management |
298 * Management |
266 * ========== |
299 * ========== |
267 * |
300 * |
268 * The main throughput advantages of work-stealing stem from |
301 * The main throughput advantages of work-stealing stem from |
269 * decentralized control -- workers mostly take tasks from |
302 * decentralized control -- workers mostly take tasks from |
270 * themselves or each other. We cannot negate this in the |
303 * themselves or each other, at rates that can exceed a billion |
271 * implementation of other management responsibilities. The main |
304 * per second. The pool itself creates, activates (enables |
272 * tactic for avoiding bottlenecks is packing nearly all |
305 * scanning for and running tasks), deactivates, blocks, and |
273 * essentially atomic control state into two volatile variables |
306 * terminates threads, all with minimal central information. |
274 * that are by far most often read (not written) as status and |
307 * There are only a few properties that we can globally track or |
275 * consistency checks. |
308 * maintain, so we pack them into a small number of variables, |
276 * |
309 * often maintaining atomicity without blocking or locking. |
277 * Field "ctl" contains 64 bits holding all the information needed |
310 * Nearly all essentially atomic control state is held in two |
278 * to atomically decide to add, inactivate, enqueue (on an event |
311 * volatile variables that are by far most often read (not |
|
312 * written) as status and consistency checks. (Also, field |
|
313 * "config" holds unchanging configuration state.) |
|
314 * |
|
315 * Field "ctl" contains 64 bits holding information needed to |
|
316 * atomically decide to add, inactivate, enqueue (on an event |
279 * queue), dequeue, and/or re-activate workers. To enable this |
317 * queue), dequeue, and/or re-activate workers. To enable this |
280 * packing, we restrict maximum parallelism to (1<<15)-1 (which is |
318 * packing, we restrict maximum parallelism to (1<<15)-1 (which is |
281 * far in excess of normal operating range) to allow ids, counts, |
319 * far in excess of normal operating range) to allow ids, counts, |
282 * and their negations (used for thresholding) to fit into 16bit |
320 * and their negations (used for thresholding) to fit into 16bit |
283 * fields. |
321 * subfields. |
284 * |
322 * |
285 * Field "plock" is a form of sequence lock with a saturating |
323 * Field "runState" holds lockable state bits (STARTED, STOP, etc) |
286 * shutdown bit (similarly for per-queue "qlocks"), mainly |
324 * also protecting updates to the workQueues array. When used as |
287 * protecting updates to the workQueues array, as well as to |
325 * a lock, it is normally held only for a few instructions (the |
288 * enable shutdown. When used as a lock, it is normally only very |
326 * only exceptions are one-time array initialization and uncommon |
289 * briefly held, so is nearly always available after at most a |
327 * resizing), so is nearly always available after at most a brief |
290 * brief spin, but we use a monitor-based backup strategy to |
328 * spin. But to be extra-cautious, after spinning, method |
291 * block when needed. |
329 * awaitRunStateLock (called only if an initial CAS fails) uses
|
330 * wait/notify mechanics on a builtin monitor to block when |
|
331 * (rarely) needed. This would be a terrible idea for a highly |
|
332 * contended lock, but most pools run without the lock ever |
|
333 * contending after the spin limit, so this works fine as a more |
|
334 * conservative alternative. Because we don't otherwise have an |
|
335 * internal Object to use as a monitor, the "stealCounter" (an |
|
336 * AtomicLong) is used when available (it too must be lazily |
|
337 * initialized; see externalSubmit). |
|
338 * |
|
339 * Usages of "runState" vs "ctl" interact in only one case: |
|
340 * deciding to add a worker thread (see tryAddWorker), in which |
|
341 * case the ctl CAS is performed while the lock is held. |
292 * |
342 * |
293 * Recording WorkQueues. WorkQueues are recorded in the |
343 * Recording WorkQueues. WorkQueues are recorded in the |
294 * "workQueues" array that is created upon first use and expanded |
344 * "workQueues" array. The array is created upon first use (see |
295 * if necessary. Updates to the array while recording new workers |
345 * externalSubmit) and expanded if necessary. Updates to the |
296 * and unrecording terminated ones are protected from each other |
346 * array while recording new workers and unrecording terminated |
297 * by a lock but the array is otherwise concurrently readable, and |
347 * ones are protected from each other by the runState lock, but |
298 * accessed directly. To simplify index-based operations, the |
348 * the array is otherwise concurrently readable, and accessed |
299 * array size is always a power of two, and all readers must |
349 * directly. We also ensure that reads of the array reference |
300 * tolerate null slots. Worker queues are at odd indices. Shared |
350 * itself never become too stale. To simplify index-based |
301 * (submission) queues are at even indices, up to a maximum of 64 |
351 * operations, the array size is always a power of two, and all |
302 * slots, to limit growth even if array needs to expand to add |
352 * readers must tolerate null slots. Worker queues are at odd |
303 * more workers. Grouping them together in this way simplifies and |
353 * indices. Shared (submission) queues are at even indices, up to |
304 * speeds up task scanning. |
354 * a maximum of 64 slots, to limit growth even if array needs to |
|
355 * expand to add more workers. Grouping them together in this way |
|
356 * simplifies and speeds up task scanning. |
305 * |
357 * |
306 * All worker thread creation is on-demand, triggered by task |
358 * All worker thread creation is on-demand, triggered by task |
307 * submissions, replacement of terminated workers, and/or |
359 * submissions, replacement of terminated workers, and/or |
308 * compensation for blocked workers. However, all other support |
360 * compensation for blocked workers. However, all other support |
309 * code is set up to work with other policies. To ensure that we |
361 * code is set up to work with other policies. To ensure that we |
310 * do not hold on to worker references that would prevent GC, ALL |
362 * do not hold on to worker references that would prevent GC, ALL
311 * accesses to workQueues are via indices into the workQueues |
363 * accesses to workQueues are via indices into the workQueues |
312 * array (which is one source of some of the messy code |
364 * array (which is one source of some of the messy code |
313 * constructions here). In essence, the workQueues array serves as |
365 * constructions here). In essence, the workQueues array serves as |
314 * a weak reference mechanism. Thus for example the wait queue |
366 * a weak reference mechanism. Thus for example the stack top |
315 * field of ctl stores indices, not references. Access to the |
367 * subfield of ctl stores indices, not references. |
316 * workQueues in associated methods (for example signalWork) must |
368 * |
317 * both index-check and null-check the IDs. All such accesses |
369 * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we |
318 * ignore bad IDs by returning out early from what they are doing, |
370 * cannot let workers spin indefinitely scanning for tasks when |
319 * since this can only be associated with termination, in which |
371 * none can be found immediately, and we cannot start/resume |
320 * case it is OK to give up. All uses of the workQueues array |
372 * workers unless there appear to be tasks available. On the |
321 * also check that it is non-null (even if previously |
373 * other hand, we must quickly prod them into action when new |
322 * non-null). This allows nulling during termination, which is |
374 * tasks are submitted or generated. In many usages, ramp-up time |
323 * currently not necessary, but remains an option for |
375 * to activate workers is the main limiting factor in overall |
324 * resource-revocation-based shutdown schemes. It also helps |
376 * performance, which is compounded at program start-up by JIT |
325 * reduce JIT issuance of uncommon-trap code, which tends to |
377 * compilation and allocation. So we streamline this as much as |
326 * unnecessarily complicate control flow in some methods. |
378 * possible. |
327 * |
379 * |
328 * Event Queuing. Unlike HPC work-stealing frameworks, we cannot |
380 * The "ctl" field atomically maintains active and total worker |
329 * let workers spin indefinitely scanning for tasks when none can |
381 * counts as well as a queue to place waiting threads so they can |
330 * be found immediately, and we cannot start/resume workers unless |
382 * be located for signalling. Active counts also play the role of |
331 * there appear to be tasks available. On the other hand, we must |
383 * quiescence indicators, so are decremented when workers believe |
332 * quickly prod them into action when new tasks are submitted or |
384 * that there are no more tasks to execute. The "queue" is |
333 * generated. In many usages, ramp-up time to activate workers is |
385 * actually a form of Treiber stack. A stack is ideal for |
334 * the main limiting factor in overall performance (this is |
386 * activating threads in most-recently used order. This improves |
335 * compounded at program start-up by JIT compilation and |
387 * performance and locality, outweighing the disadvantages of |
336 * allocation). So we try to streamline this as much as possible. |
388 * being prone to contention and inability to release a worker |
337 * We park/unpark workers after placing in an event wait queue |
389 * unless it is topmost on stack. We park/unpark workers after |
338 * when they cannot find work. This "queue" is actually a simple |
390 * pushing on the idle worker stack (represented by the lower |
339 * Treiber stack, headed by the "id" field of ctl, plus a 15bit |
391 * 32bit subfield of ctl) when they cannot find work. The top |
340 * counter value (that reflects the number of times a worker has |
392 * stack state holds the value of the "scanState" field of the |
341 * been inactivated) to avoid ABA effects (we need only as many |
393 * worker: its index and status, plus a version counter that, in |
342 * version numbers as worker threads). Successors are held in |
394 * addition to the count subfields (also serving as version |
343 * field WorkQueue.nextWait. Queuing deals with several intrinsic |
395 * stamps), provides protection against Treiber stack ABA effects.
344 * races, mainly that a task-producing thread can miss seeing (and |
396 * |
345 * signalling) another thread that gave up looking for work but |
397 * Field scanState is used by both workers and the pool to manage |
346 * has not yet entered the wait queue. We solve this by requiring |
398 * and track whether a worker is INACTIVE (possibly blocked |
347 * a full sweep of all workers (via repeated calls to method |
399 * waiting for a signal), or SCANNING for tasks (when neither holds,
348 * scan()) both before and after a newly waiting worker is added |
400 * it is busy running tasks). When a worker is inactivated, its |
349 * to the wait queue. Because enqueued workers may actually be |
401 * scanState field is set, and it is prevented from executing tasks,
350 * rescanning rather than waiting, we set and clear the "parker" |
402 * even though it must scan once for them to avoid queuing |
|
403 * races. Note that scanState updates lag queue CAS releases so |
|
404 * usage requires care. When queued, the lower 16 bits of |
|
405 * scanState must hold its pool index. So we place the index there |
|
406 * upon initialization (see registerWorker) and otherwise keep it |
|
407 * there or restore it when necessary. |
|
408 * |
|
409 * Memory ordering. See "Correct and Efficient Work-Stealing for |
|
410 * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013 |
|
411 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an |
|
412 * analysis of memory ordering requirements in work-stealing |
|
413 * algorithms similar to the one used here. We usually need |
|
414 * stronger than minimal ordering because we must sometimes signal |
|
415 * workers, requiring Dekker-like full-fences to avoid lost |
|
416 * signals. Arranging for enough ordering without expensive |
|
417 * over-fencing requires tradeoffs among the supported means of |
|
418 * expressing access constraints. The most central operations, |
|
419 * taking from queues and updating ctl state, require full-fence |
|
420 * CAS. Array slots are read using the emulation of volatiles |
|
421 * provided by Unsafe. Access from other threads to WorkQueue |
|
422 * base, top, and array requires a volatile load of the first of |
|
423 * any of these that are read. We use the convention of declaring the
|
424 * "base" index volatile, and always read it before other fields. |
|
425 * The owner thread must ensure ordered updates, so writes use |
|
426 * ordered intrinsics unless they can piggyback on those for other |
|
427 * writes. Similar conventions and rationales hold for other |
|
428 * WorkQueue fields (such as "currentSteal") that are only written |
|
429 * by owners but observed by others. |
|
430 * |
|
431 * Creating workers. To create a worker, we pre-increment total |
|
432 * count (serving as a reservation), and attempt to construct a |
|
433 * ForkJoinWorkerThread via its factory. Upon construction, the |
|
434 * new thread invokes registerWorker, where it constructs a |
|
435 * WorkQueue and is assigned an index in the workQueues array |
|
436 * (expanding the array if necessary). The thread is then |
|
437 * started. Upon any exception across these steps, or null return |
|
438 * from factory, deregisterWorker adjusts counts and records |
|
439 * accordingly. If a null return, the pool continues running with |
|
440 * fewer than the target number of workers. If exceptional, the
|
441 * exception is propagated, generally to some external caller. |
|
442 * Worker index assignment avoids the bias in scanning that would |
|
443 * occur if entries were sequentially packed starting at the front |
|
444 * of the workQueues array. We treat the array as a simple |
|
445 * power-of-two hash table, expanding as needed. The seedIndex |
|
446 * increment ensures no collisions until a resize is needed or a |
|
447 * worker is deregistered and replaced, and thereafter keeps |
|
448 * probability of collision low. We cannot use |
|
449 * ThreadLocalRandom.getProbe() for similar purposes here because |
|
450 * the thread has not started yet, but do so for creating |
|
451 * submission queues for existing external threads. |
|
452 * |
|
453 * Deactivation and waiting. Queuing encounters several intrinsic |
|
454 * races; most notably that a task-producing thread can miss |
|
455 * seeing (and signalling) another thread that gave up looking for |
|
456 * work but has not yet entered the wait queue. When a worker |
|
457 * cannot find a task to steal, it deactivates and enqueues. Very |
|
458 * often, the lack of tasks is transient due to GC or OS |
|
459 * scheduling. To reduce false-alarm deactivation, scanners |
|
460 * compute checksums of queue states during sweeps. (The |
|
461 * stability checks used here and elsewhere are probabilistic |
|
462 * variants of snapshot techniques -- see Herlihy & Shavit.) |
|
463 * Workers give up and try to deactivate only after the sum is |
|
464 * stable across scans. Further, to avoid missed signals, they |
|
465 * repeat this scanning process after successful enqueuing until |
|
466 * again stable. In this state, the worker cannot take/run a task |
|
467 * it sees until it is released from the queue, so the worker |
|
468 * itself eventually tries to release itself or any successor (see |
|
469 * tryRelease). Otherwise, upon an empty scan, a deactivated |
|
470 * worker uses an adaptive local spin construction (see awaitWork) |
|
471 * before blocking (via park). Note the unusual conventions about |
|
472 * Thread.interrupts surrounding parking and other blocking: |
|
473 * Because interrupts are used solely to alert threads to check |
|
474 * termination, which is checked anyway upon blocking, we clear |
|
475 * status (using Thread.interrupted) before any call to park, so |
|
476 * that park does not immediately return due to status being set |
|
477 * via some other unrelated call to interrupt in user code. |
|
478 * |
|
479 * Signalling and activation. Workers are created or activated |
|
480 * only when there appears to be at least one task they might be |
|
481 * able to find and execute. Upon push (either by a worker or an |
|
482 * external submission) to a previously (possibly) empty queue, |
|
483 * workers are signalled if idle, or created if fewer exist than |
|
484 * the given parallelism level. These primary signals are |
|
485 * buttressed by others whenever other threads remove a task from |
|
486 * a queue and notice that there are other tasks there as well. |
|
487 * On most platforms, signalling (unpark) overhead time is |
|
488 * noticeably long, and the time between signalling a thread and |
|
489 * it actually making progress can be very noticeably long, so it |
|
490 * is worth offloading these delays from critical paths as much as |
|
491 * possible. Also, because inactive workers are often rescanning |
|
492 * or spinning rather than blocking, we set and clear the "parker" |
351 * field of WorkQueues to reduce unnecessary calls to unpark. |
493 * field of WorkQueues to reduce unnecessary calls to unpark. |
352 * (This requires a secondary recheck to avoid missed signals.) |
494 * (This requires a secondary recheck to avoid missed signals.) |
353 * Note the unusual conventions about Thread.interrupts |
|
354 * surrounding parking and other blocking: Because interrupts are |
|
355 * used solely to alert threads to check termination, which is |
|
356 * checked anyway upon blocking, we clear status (using |
|
357 * Thread.interrupted) before any call to park, so that park does |
|
358 * not immediately return due to status being set via some other |
|
359 * unrelated call to interrupt in user code. |
|
360 * |
|
361 * Signalling. We create or wake up workers only when there |
|
362 * appears to be at least one task they might be able to find and |
|
363 * execute. When a submission is added or another worker adds a |
|
364 * task to a queue that has fewer than two tasks, they signal |
|
365 * waiting workers (or trigger creation of new ones if fewer than |
|
366 * the given parallelism level -- signalWork). These primary |
|
367 * signals are buttressed by others whenever other threads remove |
|
368 * a task from a queue and notice that there are other tasks there |
|
369 * as well. So in general, pools will be over-signalled. On most |
|
370 * platforms, signalling (unpark) overhead time is noticeably |
|
371 * long, and the time between signalling a thread and it actually |
|
372 * making progress can be very noticeably long, so it is worth |
|
373 * offloading these delays from critical paths as much as |
|
374 * possible. Additionally, workers spin-down gradually, by staying |
|
375 * alive so long as they see the ctl state changing. Similar |
|
376 * stability-sensing techniques are also used before blocking in |
|
377 * awaitJoin and helpComplete. |
|
378 * |
495 * |
379 * Trimming workers. To release resources after periods of lack of |
496 * Trimming workers. To release resources after periods of lack of |
380 * use, a worker starting to wait when the pool is quiescent will |
497 * use, a worker starting to wait when the pool is quiescent will |
381 * time out and terminate if the pool has remained quiescent for a |
498 * time out and terminate (see awaitWork) if the pool has remained |
382 * given period -- a short period if there are more threads than |
499 * quiescent for period IDLE_TIMEOUT, increasing the period as the |
383 * parallelism, longer as the number of threads decreases. This |
500 * number of threads decreases, eventually removing all workers. |
384 * will slowly propagate, eventually terminating all workers after |
501 * Also, when more than two spare threads exist, excess threads |
385 * periods of non-use. |
502 * are immediately terminated at the next quiescent point. |
386 * |
503 * (Padding by two avoids hysteresis.) |
387 * Shutdown and Termination. A call to shutdownNow atomically sets |
504 * |
388 * a plock bit and then (non-atomically) sets each worker's |
505 * Shutdown and Termination. A call to shutdownNow invokes |
389 * qlock status, cancels all unprocessed tasks, and wakes up |
506 * tryTerminate to atomically set a runState bit. The calling |
390 * all waiting workers. Detecting whether termination should |
507 * thread, as well as every other worker thereafter terminating, |
391 * commence after a non-abrupt shutdown() call requires more work |
508 * helps terminate others by setting their (qlock) status, |
392 * and bookkeeping. We need consensus about quiescence (i.e., that |
509 * cancelling their unprocessed tasks, and waking them up, doing |
393 * there is no more work). The active count provides a primary |
510 * so repeatedly until stable (but with a loop bounded by the |
394 * indication but non-abrupt shutdown still requires a rechecking |
511 * number of workers). Calls to non-abrupt shutdown() preface |
395 * scan for any workers that are inactive but not queued. |
512 * this by checking whether termination should commence. This |
|
513 * relies primarily on the active count bits of "ctl" maintaining |
|
514 * consensus -- tryTerminate is called from awaitWork whenever |
|
515 * quiescent. However, external submitters do not take part in |
|
516 * this consensus. So, tryTerminate sweeps through queues (until |
|
517 * stable) to ensure lack of in-flight submissions and workers |
|
518 * about to process them before triggering the "STOP" phase of |
|
519 * termination. (Note: there is an intrinsic conflict if |
|
520 * helpQuiescePool is called when shutdown is enabled. Both wait |
|
521 * for quiescence, but tryTerminate is biased to not trigger until |
|
522 * helpQuiescePool completes.) |
|
523 * |
396 * |
524 * |
397 * Joining Tasks |
525 * Joining Tasks |
398 * ============= |
526 * ============= |
399 * |
527 * |
400 * Any of several actions may be taken when one worker is waiting |
528 * Any of several actions may be taken when one worker is waiting |
401 * to join a task stolen (or always held) by another. Because we |
529 * to join a task stolen (or always held) by another. Because we |
402 * are multiplexing many tasks on to a pool of workers, we can't |
530 * are multiplexing many tasks on to a pool of workers, we can't |
403 * just let them block (as in Thread.join). We also cannot just |
531 * just let them block (as in Thread.join). We also cannot just |
404 * reassign the joiner's run-time stack with another and replace |
532 * reassign the joiner's run-time stack with another and replace |
405 * it later, which would be a form of "continuation", that even if |
533 * it later, which would be a form of "continuation", that even if |
406 * possible is not necessarily a good idea since we sometimes need |
534 * possible is not necessarily a good idea since we may need both |
407 * both an unblocked task and its continuation to progress. |
535 * an unblocked task and its continuation to progress. Instead we |
408 * Instead we combine two tactics: |
536 * combine two tactics: |
409 * |
537 * |
410 * Helping: Arranging for the joiner to execute some task that it |
538 * Helping: Arranging for the joiner to execute some task that it |
411 * would be running if the steal had not occurred. |
539 * would be running if the steal had not occurred. |
412 * |
540 * |
413 * Compensating: Unless there are already enough live threads, |
541 * Compensating: Unless there are already enough live threads, |
450 * potentially cyclic mutual steals. (3) It is intentionally |
578 * potentially cyclic mutual steals. (3) It is intentionally |
451 * racy: field currentJoin is updated only while actively joining, |
579 * racy: field currentJoin is updated only while actively joining, |
452 * which means that we miss links in the chain during long-lived |
580 * which means that we miss links in the chain during long-lived |
453 * tasks, GC stalls etc (which is OK since blocking in such cases |
581 * tasks, GC stalls etc (which is OK since blocking in such cases |
454 * is usually a good idea). (4) We bound the number of attempts |
582 * is usually a good idea). (4) We bound the number of attempts |
455 * to find work (see MAX_HELP) and fall back to suspending the |
583 * to find work using checksums and fall back to suspending the |
456 * worker and if necessary replacing it with another. |
584 * worker and if necessary replacing it with another. |
457 * |
585 * |
458 * Helping actions for CountedCompleters are much simpler: Method |
586 * Helping actions for CountedCompleters do not require tracking |
459 * helpComplete can take and execute any task with the same root |
587 * currentJoins: Method helpComplete takes and executes any task |
460 * as the task being waited on. However, this still entails some |
588 * with the same root as the task being waited on (preferring |
461 * traversal of completer chains, so is less efficient than using |
589 * local pops to non-local polls). However, this still entails |
462 * CountedCompleters without explicit joins. |
590 * some traversal of completer chains, so is less efficient than |
463 * |
591 * using CountedCompleters without explicit joins. |
464 * It is impossible to keep exactly the target parallelism number |
592 * |
465 * of threads running at any given time. Determining the |
593 * Compensation does not aim to keep exactly the target |
466 * existence of conservatively safe helping targets, the |
594 * parallelism number of unblocked threads running at any given |
467 * availability of already-created spares, and the apparent need |
595 * time. Some previous versions of this class employed immediate |
468 * to create new spares are all racy, so we rely on multiple |
596 * compensations for any blocked join. However, in practice, the |
469 * retries of each. Compensation in the apparent absence of |
597 * vast majority of blockages are transient byproducts of GC and |
470 * helping opportunities is challenging to control on JVMs, where |
598 * other JVM or OS activities that are made worse by replacement. |
471 * GC and other activities can stall progress of tasks that in |
599 * Currently, compensation is attempted only after validating that |
472 * turn stall out many other dependent tasks, without us being |
600 * all purportedly active threads are processing tasks by checking |
473 * able to determine whether they will ever require compensation. |
601 * field WorkQueue.scanState, which eliminates most false |
474 * Even though work-stealing otherwise encounters little |
602 * positives. Also, compensation is bypassed (tolerating fewer |
475 * degradation in the presence of more threads than cores, |
603 * threads) in the most common case in which it is rarely |
476 * aggressively adding new threads in such cases entails risk of |
604 * beneficial: when a worker with an empty queue (thus no |
477 * unwanted positive feedback control loops in which more threads |
605 * continuation tasks) blocks on a join and there still remain |
478 * cause more dependent stalls (as well as delayed progress of |
606 * enough threads to ensure liveness. |
479 * unblocked threads to the point that we know they are available) |
607 * |
480 * leading to more situations requiring more threads, and so |
608 * The compensation mechanism may be bounded. Bounds for the |
481 * on. This aspect of control can be seen as an (analytically |
609 * commonPool (see commonMaxSpares) better enable JVMs to cope |
482 * intractable) game with an opponent that may choose the worst |
610 * with programming errors and abuse before running out of |
483 * (for us) active thread to stall at any time. We take several |
611 * resources to do so. In other cases, users may supply factories |
484 * precautions to bound losses (and thus bound gains), mainly in |
612 * that limit thread construction. The effects of bounding in this |
485 * methods tryCompensate and awaitJoin. |
613 * pool (like all others) is imprecise. Total worker counts are |
|
614 * decremented when threads deregister, not when they exit and |
|
615 * resources are reclaimed by the JVM and OS. So the number of |
|
616 * simultaneously live threads may transiently exceed bounds. |
486 * |
617 * |
487 * Common Pool |
618 * Common Pool |
488 * =========== |
619 * =========== |
489 * |
620 * |
490 * The static common pool always exists after static |
621 * The static common pool always exists after static |
491 * initialization. Since it (or any other created pool) need |
622 * initialization. Since it (or any other created pool) need |
492 * never be used, we minimize initial construction overhead and |
623 * never be used, we minimize initial construction overhead and |
493 * footprint to the setup of about a dozen fields, with no nested |
624 * footprint to the setup of about a dozen fields, with no nested |
494 * allocation. Most bootstrapping occurs within method |
625 * allocation. Most bootstrapping occurs within method |
495 * fullExternalPush during the first submission to the pool. |
626 * externalSubmit during the first submission to the pool. |
496 * |
627 * |
497 * When external threads submit to the common pool, they can |
628 * When external threads submit to the common pool, they can |
498 * perform subtask processing (see externalHelpJoin and related |
629 * perform subtask processing (see externalHelpComplete and |
499 * methods). This caller-helps policy makes it sensible to set |
630 * related methods) upon joins. This caller-helps policy makes it |
500 * common pool parallelism level to one (or more) less than the |
631 * sensible to set common pool parallelism level to one (or more) |
501 * total number of available cores, or even zero for pure |
632 * less than the total number of available cores, or even zero for |
502 * caller-runs. We do not need to record whether external |
633 * pure caller-runs. We do not need to record whether external |
503 * submissions are to the common pool -- if not, externalHelpJoin |
634 * submissions are to the common pool -- if not, external help |
504 * returns quickly (at the most helping to signal some common pool |
635 * methods return quickly. These submitters would otherwise be |
505 * workers). These submitters would otherwise be blocked waiting |
636 * blocked waiting for completion, so the extra effort (with |
506 * for completion, so the extra effort (with liberally sprinkled |
637 * liberally sprinkled task status checks) in inapplicable cases |
507 * task status checks) in inapplicable cases amounts to an odd |
638 * amounts to an odd form of limited spin-wait before blocking in |
508 * form of limited spin-wait before blocking in ForkJoinTask.join. |
639 * ForkJoinTask.join. |
509 * |
640 * |
510 * As a more appropriate default in managed environments, unless |
641 * As a more appropriate default in managed environments, unless |
511 * overridden by system properties, we use workers of subclass |
642 * overridden by system properties, we use workers of subclass |
512 * InnocuousForkJoinWorkerThread when there is a SecurityManager |
643 * InnocuousForkJoinWorkerThread when there is a SecurityManager |
513 * present. These workers have no permissions set, do not belong |
644 * present. These workers have no permissions set, do not belong |
514 * to any user-defined ThreadGroup, and erase all ThreadLocals |
645 * to any user-defined ThreadGroup, and erase all ThreadLocals |
515 * after executing any top-level task (see WorkQueue.runTask). The |
646 * after executing any top-level task (see WorkQueue.runTask). |
516 * associated mechanics (mainly in ForkJoinWorkerThread) may be |
647 * The associated mechanics (mainly in ForkJoinWorkerThread) may |
517 * JVM-dependent and must access particular Thread class fields to |
648 * be JVM-dependent and must access particular Thread class fields |
518 * achieve this effect. |
649 * to achieve this effect. |
519 * |
650 * |
520 * Style notes |
651 * Style notes |
521 * =========== |
652 * =========== |
|
653 * |
|
654 * Memory ordering relies mainly on Unsafe intrinsics that carry |
|
655 * the further responsibility of explicitly performing null- and |
|
656 * bounds- checks otherwise carried out implicitly by JVMs. This |
|
657 * can be awkward and ugly, but also reflects the need to control |
|
658 * outcomes across the unusual cases that arise in very racy code |
|
659 * with very few invariants. So these explicit checks would exist |
|
660 * in some form anyway. All fields are read into locals before |
|
661 * use, and null-checked if they are references. This is usually |
|
662 * done in a "C"-like style of listing declarations at the heads |
|
663 * of methods or blocks, and using inline assignments on first |
|
664 * encounter. Array bounds-checks are usually performed by |
|
665 * masking with array.length-1, which relies on the invariant that |
|
666 * these arrays are created with positive lengths, which is itself |
|
667 * paranoically checked. Nearly all explicit checks lead to |
|
668 * bypass/return, not exception throws, because they may |
|
669 * legitimately arise due to cancellation/revocation during |
|
670 * shutdown. |
522 * |
671 * |
523 * There is a lot of representation-level coupling among classes |
672 * There is a lot of representation-level coupling among classes |
524 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The |
673 * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The |
525 * fields of WorkQueue maintain data structures managed by |
674 * fields of WorkQueue maintain data structures managed by |
526 * ForkJoinPool, so are directly accessed. There is little point |
675 * ForkJoinPool, so are directly accessed. There is little point |
527 * trying to reduce this, since any associated future changes in |
676 * trying to reduce this, since any associated future changes in |
528 * representations will need to be accompanied by algorithmic |
677 * representations will need to be accompanied by algorithmic |
529 * changes anyway. Several methods intrinsically sprawl because |
678 * changes anyway. Several methods intrinsically sprawl because |
530 * they must accumulate sets of consistent reads of volatiles held |
679 * they must accumulate sets of consistent reads of fields held in |
531 * in local variables. Methods signalWork() and scan() are the |
680 * local variables. There are also other coding oddities |
532 * main bottlenecks, so are especially heavily |
681 * (including several unnecessary-looking hoisted null checks) |
533 * micro-optimized/mangled. There are lots of inline assignments |
682 * that help some methods perform reasonably even when interpreted |
534 * (of form "while ((local = field) != 0)") which are usually the |
683 * (not compiled). |
535 * simplest way to ensure the required read orderings (which are |
684 * |
536 * sometimes critical). This leads to a "C"-like style of listing |
685 * The order of declarations in this file is (with a few exceptions): |
537 * declarations of these locals at the heads of methods or blocks. |
|
538 * There are several occurrences of the unusual "do {} while |
|
539 * (!cas...)" which is the simplest way to force an update of a |
|
540 * CAS'ed variable. There are also other coding oddities (including |
|
541 * several unnecessary-looking hoisted null checks) that help |
|
542 * some methods perform reasonably even when interpreted (not |
|
543 * compiled). |
|
544 * |
|
545 * The order of declarations in this file is: |
|
546 * (1) Static utility functions |
686 * (1) Static utility functions |
547 * (2) Nested (static) classes |
687 * (2) Nested (static) classes |
548 * (3) Static fields |
688 * (3) Static fields |
549 * (4) Fields, along with constants used when unpacking some of them |
689 * (4) Fields, along with constants used when unpacking some of them |
550 * (5) Internal control methods |
690 * (5) Internal control methods |
891 for (ForkJoinTask<?> t; (t = poll()) != null;) |
1023 for (ForkJoinTask<?> t; (t = poll()) != null;) |
892 t.doExec(); |
1024 t.doExec(); |
893 } |
1025 } |
894 |
1026 |
895 /** |
1027 /** |
896 * Executes a top-level task and any local tasks remaining |
1028 * Removes and executes all local tasks. If LIFO, invokes |
897 * after execution. |
1029 * pollAndExecAll. Otherwise implements a specialized pop loop |
|
1030 * to exec until empty. |
898 */ |
1031 */ |
899 final void runTask(ForkJoinTask<?> task) { |
1032 final void execLocalTasks() { |
900 if ((currentSteal = task) != null) { |
1033 int b = base, m, s; |
901 ForkJoinWorkerThread thread; |
1034 ForkJoinTask<?>[] a = array; |
902 task.doExec(); |
1035 if (b - (s = top - 1) <= 0 && a != null && |
903 ForkJoinTask<?>[] a = array; |
1036 (m = a.length - 1) >= 0) { |
904 int md = mode; |
1037 if ((config & FIFO_QUEUE) == 0) { |
905 ++nsteals; |
1038 for (ForkJoinTask<?> t;;) { |
906 currentSteal = null; |
1039 if ((t = (ForkJoinTask<?>)U.getAndSetObject |
907 if (md != 0) |
1040 (a, ((m & s) << ASHIFT) + ABASE, null)) == null) |
908 pollAndExecAll(); |
1041 break; |
909 else if (a != null) { |
1042 U.putOrderedInt(this, QTOP, s); |
910 int s, m = a.length - 1; |
|
911 ForkJoinTask<?> t; |
|
912 while ((s = top - 1) - base >= 0 && |
|
913 (t = (ForkJoinTask<?>)U.getAndSetObject |
|
914 (a, ((m & s) << ASHIFT) + ABASE, null)) != null) { |
|
915 top = s; |
|
916 t.doExec(); |
1043 t.doExec(); |
917 } |
1044 if (base - (s = top - 1) > 0) |
918 } |
|
919 if ((thread = owner) != null) // no need to do in finally clause |
|
920 thread.afterTopLevelExec(); |
|
921 } |
|
922 } |
|
923 |
|
924 /** |
|
925 * If present, removes from queue and executes the given task, |
|
926 * or any other cancelled task. Returns (true) on any CAS |
|
927 * or consistency check failure so caller can retry. |
|
928 * |
|
929 * @return false if no progress can be made, else true |
|
930 */ |
|
931 final boolean tryRemoveAndExec(ForkJoinTask<?> task) { |
|
932 boolean stat; |
|
933 ForkJoinTask<?>[] a; int m, s, b, n; |
|
934 if (task != null && (a = array) != null && (m = a.length - 1) >= 0 && |
|
935 (n = (s = top) - (b = base)) > 0) { |
|
936 boolean removed = false, empty = true; |
|
937 stat = true; |
|
938 for (ForkJoinTask<?> t;;) { // traverse from s to b |
|
939 long j = ((--s & m) << ASHIFT) + ABASE; |
|
940 t = (ForkJoinTask<?>)U.getObject(a, j); |
|
941 if (t == null) // inconsistent length |
|
942 break; |
|
943 else if (t == task) { |
|
944 if (s + 1 == top) { // pop |
|
945 if (!U.compareAndSwapObject(a, j, task, null)) |
|
946 break; |
|
947 top = s; |
|
948 removed = true; |
|
949 } |
|
950 else if (base == b) // replace with proxy |
|
951 removed = U.compareAndSwapObject(a, j, task, |
|
952 new EmptyTask()); |
|
953 break; |
|
954 } |
|
955 else if (t.status >= 0) |
|
956 empty = false; |
|
957 else if (s + 1 == top) { // pop and throw away |
|
958 if (U.compareAndSwapObject(a, j, t, null)) |
|
959 top = s; |
|
960 break; |
|
961 } |
|
962 if (--n == 0) { |
|
963 if (!empty && base == b) |
|
964 stat = false; |
|
965 break; |
|
966 } |
|
967 } |
|
968 if (removed) |
|
969 task.doExec(); |
|
970 } |
|
971 else |
|
972 stat = false; |
|
973 return stat; |
|
974 } |
|
975 |
|
976 /** |
|
977 * Tries to poll for and execute the given task or any other |
|
978 * task in its CountedCompleter computation. |
|
979 */ |
|
980 final boolean pollAndExecCC(CountedCompleter<?> root) { |
|
981 ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r; |
|
982 if ((b = base) - top < 0 && (a = array) != null) { |
|
983 long j = (((a.length - 1) & b) << ASHIFT) + ABASE; |
|
984 if ((o = U.getObjectVolatile(a, j)) == null) |
|
985 return true; // retry |
|
986 if (o instanceof CountedCompleter) { |
|
987 for (t = (CountedCompleter<?>)o, r = t;;) { |
|
988 if (r == root) { |
|
989 if (base == b && |
|
990 U.compareAndSwapObject(a, j, t, null)) { |
|
991 U.putOrderedInt(this, QBASE, b + 1); |
|
992 t.doExec(); |
|
993 } |
|
994 return true; |
|
995 } |
|
996 else if ((r = r.completer) == null) |
|
997 break; // not part of root computation |
|
998 } |
|
999 } |
|
1000 } |
|
1001 return false; |
|
1002 } |
|
1003 |
|
1004 /** |
|
1005 * Tries to pop and execute the given task or any other task |
|
1006 * in its CountedCompleter computation. |
|
1007 */ |
|
1008 final boolean externalPopAndExecCC(CountedCompleter<?> root) { |
|
1009 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; |
|
1010 if (base - (s = top) < 0 && (a = array) != null) { |
|
1011 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
|
1012 if ((o = U.getObject(a, j)) instanceof CountedCompleter) { |
|
1013 for (t = (CountedCompleter<?>)o, r = t;;) { |
|
1014 if (r == root) { |
|
1015 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
|
1016 if (top == s && array == a && |
|
1017 U.compareAndSwapObject(a, j, t, null)) { |
|
1018 top = s - 1; |
|
1019 qlock = 0; |
|
1020 t.doExec(); |
|
1021 } |
|
1022 else |
|
1023 qlock = 0; |
|
1024 } |
|
1025 return true; |
|
1026 } |
|
1027 else if ((r = r.completer) == null) |
|
1028 break; |
1045 break; |
1029 } |
1046 } |
1030 } |
1047 } |
1031 } |
1048 else |
1032 return false; |
1049 pollAndExecAll(); |
|
1050 } |
1033 } |
1051 } |
1034 |
1052 |
1035 /** |
1053 /** |
1036 * Internal version |
1054 * Executes the given task and any remaining local tasks. |
1037 */ |
1055 */ |
1038 final boolean internalPopAndExecCC(CountedCompleter<?> root) { |
1056 final void runTask(ForkJoinTask<?> task) { |
1039 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; |
1057 if (task != null) { |
|
1058 scanState &= ~SCANNING; // mark as busy |
|
1059 (currentSteal = task).doExec(); |
|
1060 U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC |
|
1061 execLocalTasks(); |
|
1062 ForkJoinWorkerThread thread = owner; |
|
1063 if (++nsteals < 0) // collect on overflow |
|
1064 transferStealCount(pool); |
|
1065 scanState |= SCANNING; |
|
1066 if (thread != null) |
|
1067 thread.afterTopLevelExec(); |
|
1068 } |
|
1069 } |
|
1070 |
|
1071 /** |
|
1072 * Adds steal count to pool stealCounter if it exists, and resets. |
|
1073 */ |
|
1074 final void transferStealCount(ForkJoinPool p) { |
|
1075 AtomicLong sc; |
|
1076 if (p != null && (sc = p.stealCounter) != null) { |
|
1077 int s = nsteals; |
|
1078 nsteals = 0; // if negative, correct for overflow |
|
1079 sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); |
|
1080 } |
|
1081 } |
|
1082 |
|
1083 /** |
|
1084 * If present, removes from queue and executes the given task, |
|
1085 * or any other cancelled task. Used only by awaitJoin. |
|
1086 * |
|
1087 * @return true if queue empty and task not known to be done |
|
1088 */ |
|
1089 final boolean tryRemoveAndExec(ForkJoinTask<?> task) { |
|
1090 ForkJoinTask<?>[] a; int m, s, b, n; |
|
1091 if ((a = array) != null && (m = a.length - 1) >= 0 && |
|
1092 task != null) { |
|
1093 while ((n = (s = top) - (b = base)) > 0) { |
|
1094 for (ForkJoinTask<?> t;;) { // traverse from s to b |
|
1095 long j = ((--s & m) << ASHIFT) + ABASE; |
|
1096 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) |
|
1097 return s + 1 == top; // shorter than expected |
|
1098 else if (t == task) { |
|
1099 boolean removed = false; |
|
1100 if (s + 1 == top) { // pop |
|
1101 if (U.compareAndSwapObject(a, j, task, null)) { |
|
1102 U.putOrderedInt(this, QTOP, s); |
|
1103 removed = true; |
|
1104 } |
|
1105 } |
|
1106 else if (base == b) // replace with proxy |
|
1107 removed = U.compareAndSwapObject( |
|
1108 a, j, task, new EmptyTask()); |
|
1109 if (removed) |
|
1110 task.doExec(); |
|
1111 break; |
|
1112 } |
|
1113 else if (t.status < 0 && s + 1 == top) { |
|
1114 if (U.compareAndSwapObject(a, j, t, null)) |
|
1115 U.putOrderedInt(this, QTOP, s); |
|
1116 break; // was cancelled |
|
1117 } |
|
1118 if (--n == 0) |
|
1119 return false; |
|
1120 } |
|
1121 if (task.status < 0) |
|
1122 return false; |
|
1123 } |
|
1124 } |
|
1125 return true; |
|
1126 } |
|
1127 |
|
1128 /** |
|
1129 * Pops task if in the same CC computation as the given task, |
|
1130 * in either shared or owned mode. Used only by helpComplete. |
|
1131 */ |
|
1132 final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) { |
|
1133 int s; ForkJoinTask<?>[] a; Object o; |
1040 if (base - (s = top) < 0 && (a = array) != null) { |
1134 if (base - (s = top) < 0 && (a = array) != null) { |
1041 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
1135 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
1042 if ((o = U.getObject(a, j)) instanceof CountedCompleter) { |
1136 if ((o = U.getObjectVolatile(a, j)) != null && |
1043 for (t = (CountedCompleter<?>)o, r = t;;) { |
1137 (o instanceof CountedCompleter)) { |
1044 if (r == root) { |
1138 CountedCompleter<?> t = (CountedCompleter<?>)o; |
1045 if (U.compareAndSwapObject(a, j, t, null)) { |
1139 for (CountedCompleter<?> r = t;;) { |
1046 top = s - 1; |
1140 if (r == task) { |
1047 t.doExec(); |
1141 if (mode < 0) { // must lock |
|
1142 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { |
|
1143 if (top == s && array == a && |
|
1144 U.compareAndSwapObject(a, j, t, null)) { |
|
1145 U.putOrderedInt(this, QTOP, s - 1); |
|
1146 U.putOrderedInt(this, QLOCK, 0); |
|
1147 return t; |
|
1148 } |
|
1149 U.compareAndSwapInt(this, QLOCK, 1, 0); |
|
1150 } |
1048 } |
1151 } |
1049 return true; |
1152 else if (U.compareAndSwapObject(a, j, t, null)) { |
|
1153 U.putOrderedInt(this, QTOP, s - 1); |
|
1154 return t; |
|
1155 } |
|
1156 break; |
1050 } |
1157 } |
1051 else if ((r = r.completer) == null) |
1158 else if ((r = r.completer) == null) // try parent |
1052 break; |
1159 break; |
1053 } |
1160 } |
1054 } |
1161 } |
1055 } |
1162 } |
1056 return false; |
1163 return null; |
|
1164 } |
|
1165 |
|
1166 /** |
|
1167 * Steals and runs a task in the same CC computation as the |
|
1168 * given task if one exists and can be taken without |
|
1169 * contention. Otherwise returns a checksum/control value for |
|
1170 * use by method helpComplete. |
|
1171 * |
|
1172 * @return 1 if successful, 2 if retryable (lost to another |
|
1173 * stealer), -1 if non-empty but no matching task found, else |
|
1174 * the base index, forced negative. |
|
1175 */ |
|
1176 final int pollAndExecCC(CountedCompleter<?> task) { |
|
1177 int b, h; ForkJoinTask<?>[] a; Object o; |
|
1178 if ((b = base) - top >= 0 || (a = array) == null) |
|
1179 h = b | Integer.MIN_VALUE; // to sense movement on re-poll |
|
1180 else { |
|
1181 long j = (((a.length - 1) & b) << ASHIFT) + ABASE; |
|
1182 if ((o = U.getObjectVolatile(a, j)) == null) |
|
1183 h = 2; // retryable |
|
1184 else if (!(o instanceof CountedCompleter)) |
|
1185 h = -1; // unmatchable |
|
1186 else { |
|
1187 CountedCompleter<?> t = (CountedCompleter<?>)o; |
|
1188 for (CountedCompleter<?> r = t;;) { |
|
1189 if (r == task) { |
|
1190 if (base == b && |
|
1191 U.compareAndSwapObject(a, j, t, null)) { |
|
1192 base = b + 1; |
|
1193 t.doExec(); |
|
1194 h = 1; // success |
|
1195 } |
|
1196 else |
|
1197 h = 2; // lost CAS |
|
1198 break; |
|
1199 } |
|
1200 else if ((r = r.completer) == null) { |
|
1201 h = -1; // unmatched |
|
1202 break; |
|
1203 } |
|
1204 } |
|
1205 } |
|
1206 } |
|
1207 return h; |
1057 } |
1208 } |
1058 |
1209 |
1059 /** |
1210 /** |
1060 * Returns true if owned and not known to be blocked. |
1211 * Returns true if owned and not known to be blocked. |
1061 */ |
1212 */ |
1062 final boolean isApparentlyUnblocked() { |
1213 final boolean isApparentlyUnblocked() { |
1063 Thread wt; Thread.State s; |
1214 Thread wt; Thread.State s; |
1064 return (eventCount >= 0 && |
1215 return (scanState >= 0 && |
1065 (wt = owner) != null && |
1216 (wt = owner) != null && |
1066 (s = wt.getState()) != Thread.State.BLOCKED && |
1217 (s = wt.getState()) != Thread.State.BLOCKED && |
1067 s != Thread.State.WAITING && |
1218 s != Thread.State.WAITING && |
1068 s != Thread.State.TIMED_WAITING); |
1219 s != Thread.State.TIMED_WAITING); |
1069 } |
1220 } |
1070 |
1221 |
1071 // Unsafe mechanics |
1222 // Unsafe mechanics. Note that some are (and must be) the same as in FJP |
1072 private static final sun.misc.Unsafe U; |
1223 private static final sun.misc.Unsafe U; |
1073 private static final long QBASE; |
1224 private static final int ABASE; |
|
1225 private static final int ASHIFT; |
|
1226 private static final long QTOP; |
1074 private static final long QLOCK; |
1227 private static final long QLOCK; |
1075 private static final int ABASE; |
1228 private static final long QCURRENTSTEAL; |
1076 private static final int ASHIFT; |
|
1077 static { |
1229 static { |
1078 try { |
1230 try { |
1079 U = sun.misc.Unsafe.getUnsafe(); |
1231 U = sun.misc.Unsafe.getUnsafe(); |
1080 Class<?> k = WorkQueue.class; |
1232 Class<?> wk = WorkQueue.class; |
1081 Class<?> ak = ForkJoinTask[].class; |
1233 Class<?> ak = ForkJoinTask[].class; |
1082 QBASE = U.objectFieldOffset |
1234 QTOP = U.objectFieldOffset |
1083 (k.getDeclaredField("base")); |
1235 (wk.getDeclaredField("top")); |
1084 QLOCK = U.objectFieldOffset |
1236 QLOCK = U.objectFieldOffset |
1085 (k.getDeclaredField("qlock")); |
1237 (wk.getDeclaredField("qlock")); |
|
1238 QCURRENTSTEAL = U.objectFieldOffset |
|
1239 (wk.getDeclaredField("currentSteal")); |
1086 ABASE = U.arrayBaseOffset(ak); |
1240 ABASE = U.arrayBaseOffset(ak); |
1087 int scale = U.arrayIndexScale(ak); |
1241 int scale = U.arrayIndexScale(ak); |
1088 if ((scale & (scale - 1)) != 0) |
1242 if ((scale & (scale - 1)) != 0) |
1089 throw new Error("data type scale not a power of two"); |
1243 throw new Error("data type scale not a power of two"); |
1090 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); |
1244 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); |
1136 */ |
1295 */ |
1137 private static final synchronized int nextPoolId() { |
1296 private static final synchronized int nextPoolId() { |
1138 return ++poolNumberSequence; |
1297 return ++poolNumberSequence; |
1139 } |
1298 } |
1140 |
1299 |
1141 // static constants |
1300 // static configuration constants |
1142 |
1301 |
1143 /** |
1302 /** |
1144 * Initial timeout value (in nanoseconds) for the thread |
1303 * Initial timeout value (in nanoseconds) for the thread |
1145 * triggering quiescence to park waiting for new work. On timeout, |
1304 * triggering quiescence to park waiting for new work. On timeout, |
1146 * the thread will instead try to shrink the number of |
1305 * the thread will instead try to shrink the number of |
1147 * workers. The value should be large enough to avoid overly |
1306 * workers. The value should be large enough to avoid overly |
1148 * aggressive shrinkage during most transient stalls (long GCs |
1307 * aggressive shrinkage during most transient stalls (long GCs |
1149 * etc). |
1308 * etc). |
1150 */ |
1309 */ |
1151 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec |
1310 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec |
1152 |
|
1153 /** |
|
1154 * Timeout value when there are more threads than parallelism level |
|
1155 */ |
|
1156 private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; |
|
1157 |
1311 |
1158 /** |
1312 /** |
1159 * Tolerance for idle timeouts, to cope with timer undershoots |
1313 * Tolerance for idle timeouts, to cope with timer undershoots |
1160 */ |
1314 */ |
1161 private static final long TIMEOUT_SLOP = 2000000L; |
1315 private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms |
1162 |
1316 |
1163 /** |
1317 /** |
1164 * The maximum stolen->joining link depth allowed in method |
1318 * The initial value for commonMaxSpares during static |
1165 * tryHelpStealer. Must be a power of two. Depths for legitimate |
1319 * initialization unless overridden using System property |
1166 * chains are unbounded, but we use a fixed constant to avoid |
1320 * "java.util.concurrent.ForkJoinPool.common.maximumSpares". The |
1167 * (otherwise unchecked) cycles and to bound staleness of |
1321 * default value is far in excess of normal requirements, but also |
1168 * traversal parameters at the expense of sometimes blocking when |
1322 * far short of MAX_CAP and typical OS thread limits, so allows |
1169 * we could be helping. |
1323 * JVMs to catch misuse/abuse before running out of resources |
1170 */ |
1324 * needed to do so. |
1171 private static final int MAX_HELP = 64; |
1325 */ |
|
1326 private static final int DEFAULT_COMMON_MAX_SPARES = 256; |
|
1327 |
|
1328 /** |
|
1329 * Number of times to spin-wait before blocking. The spins (in |
|
1330 * awaitRunStateLock and awaitWork) currently use randomized |
|
1331 * spins. If/when MWAIT-like intrinsics becomes available, they |
|
1332 * may allow quieter spinning. The value of SPINS must be a power |
|
1333 * of two, at least 4. The current value causes spinning for a |
|
1334 * small fraction of typical context-switch times, well worthwhile |
|
1335 * given the typical likelihoods that blocking is not necessary. |
|
1336 */ |
|
1337 private static final int SPINS = 1 << 11; |
1172 |
1338 |
1173 /** |
1339 /** |
1174 * Increment for seed generators. See class ThreadLocal for |
1340 * Increment for seed generators. See class ThreadLocal for |
1175 * explanation. |
1341 * explanation. |
1176 */ |
1342 */ |
1177 private static final int SEED_INCREMENT = 0x9e3779b9; |
1343 private static final int SEED_INCREMENT = 0x9e3779b9; |
1178 |
1344 |
1179 /* |
1345 /* |
1180 * Bits and masks for control variables |
1346 * Bits and masks for field ctl, packed with 4 16 bit subfields: |
1181 * |
1347 * AC: Number of active running workers minus target parallelism |
1182 * Field ctl is a long packed with: |
1348 * TC: Number of total workers minus target parallelism |
1183 * AC: Number of active running workers minus target parallelism (16 bits) |
1349 * SS: version count and status of top waiting thread |
1184 * TC: Number of total workers minus target parallelism (16 bits) |
1350 * ID: poolIndex of top of Treiber stack of waiters |
1185 * ST: true if pool is terminating (1 bit) |
1351 * |
1186 * EC: the wait count of top waiting thread (15 bits) |
1352 * When convenient, we can extract the lower 32 stack top bits |
1187 * ID: poolIndex of top of Treiber stack of waiters (16 bits) |
1353 * (including version bits) as sp=(int)ctl. The offsets of counts |
1188 * |
1354 * by the target parallelism and the positionings of fields makes |
1189 * When convenient, we can extract the upper 32 bits of counts and |
1355 * it possible to perform the most common checks via sign tests of |
1190 * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = |
1356 * fields: When ac is negative, there are not enough active |
1191 * (int)ctl. The ec field is never accessed alone, but always |
1357 * workers, when tc is negative, there are not enough total |
1192 * together with id and st. The offsets of counts by the target |
1358 * workers. When sp is non-zero, there are waiting workers. To |
1193 * parallelism and the positionings of fields makes it possible to |
1359 * deal with possibly negative fields, we use casts in and out of |
1194 * perform the most common checks via sign tests of fields: When |
1360 * "short" and/or signed shifts to maintain signedness. |
1195 * ac is negative, there are not enough active workers, when tc is |
1361 * |
1196 * negative, there are not enough total workers, and when e is |
1362 * Because it occupies uppermost bits, we can add one active count |
1197 * negative, the pool is terminating. To deal with these possibly |
1363 * using getAndAddLong of AC_UNIT, rather than CAS, when returning |
1198 * negative fields, we use casts in and out of "short" and/or |
1364 * from a blocked join. Other updates entail multiple subfields |
1199 * signed shifts to maintain signedness. |
1365 * and masking, requiring CAS. |
1200 * |
1366 */ |
1201 * When a thread is queued (inactivated), its eventCount field is |
1367 |
1202 * set negative, which is the only way to tell if a worker is |
1368 // Lower and upper word masks |
1203 * prevented from executing tasks, even though it must continue to |
1369 private static final long SP_MASK = 0xffffffffL; |
1204 * scan for them to avoid queuing races. Note however that |
1370 private static final long UC_MASK = ~SP_MASK; |
1205 * eventCount updates lag releases so usage requires care. |
1371 |
1206 * |
1372 // Active counts |
1207 * Field plock is an int packed with: |
|
1208 * SHUTDOWN: true if shutdown is enabled (1 bit) |
|
1209 * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits) |
|
1210 * SIGNAL: set when threads may be waiting on the lock (1 bit) |
|
1211 * |
|
1212 * The sequence number enables simple consistency checks: |
|
1213 * Staleness of read-only operations on the workQueues array can |
|
1214 * be checked by comparing plock before vs after the reads. |
|
1215 */ |
|
1216 |
|
1217 // bit positions/shifts for fields |
|
1218 private static final int AC_SHIFT = 48; |
1373 private static final int AC_SHIFT = 48; |
|
1374 private static final long AC_UNIT = 0x0001L << AC_SHIFT; |
|
1375 private static final long AC_MASK = 0xffffL << AC_SHIFT; |
|
1376 |
|
1377 // Total counts |
1219 private static final int TC_SHIFT = 32; |
1378 private static final int TC_SHIFT = 32; |
1220 private static final int ST_SHIFT = 31; |
1379 private static final long TC_UNIT = 0x0001L << TC_SHIFT; |
1221 private static final int EC_SHIFT = 16; |
1380 private static final long TC_MASK = 0xffffL << TC_SHIFT; |
1222 |
1381 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign |
1223 // bounds |
1382 |
1224 private static final int SMASK = 0xffff; // short bits |
1383 // runState bits: SHUTDOWN must be negative, others arbitrary powers of two |
1225 private static final int MAX_CAP = 0x7fff; // max #workers - 1 |
1384 private static final int RSLOCK = 1; |
1226 private static final int EVENMASK = 0xfffe; // even short bits |
1385 private static final int RSIGNAL = 1 << 1; |
1227 private static final int SQMASK = 0x007e; // max 64 (even) slots |
1386 private static final int STARTED = 1 << 2; |
1228 private static final int SHORT_SIGN = 1 << 15; |
1387 private static final int STOP = 1 << 29; |
1229 private static final int INT_SIGN = 1 << 31; |
1388 private static final int TERMINATED = 1 << 30; |
1230 |
1389 private static final int SHUTDOWN = 1 << 31; |
1231 // masks |
|
1232 private static final long STOP_BIT = 0x0001L << ST_SHIFT; |
|
1233 private static final long AC_MASK = ((long)SMASK) << AC_SHIFT; |
|
1234 private static final long TC_MASK = ((long)SMASK) << TC_SHIFT; |
|
1235 |
|
1236 // units for incrementing and decrementing |
|
1237 private static final long TC_UNIT = 1L << TC_SHIFT; |
|
1238 private static final long AC_UNIT = 1L << AC_SHIFT; |
|
1239 |
|
1240 // masks and units for dealing with u = (int)(ctl >>> 32) |
|
1241 private static final int UAC_SHIFT = AC_SHIFT - 32; |
|
1242 private static final int UTC_SHIFT = TC_SHIFT - 32; |
|
1243 private static final int UAC_MASK = SMASK << UAC_SHIFT; |
|
1244 private static final int UTC_MASK = SMASK << UTC_SHIFT; |
|
1245 private static final int UAC_UNIT = 1 << UAC_SHIFT; |
|
1246 private static final int UTC_UNIT = 1 << UTC_SHIFT; |
|
1247 |
|
1248 // masks and units for dealing with e = (int)ctl |
|
1249 private static final int E_MASK = 0x7fffffff; // no STOP_BIT |
|
1250 private static final int E_SEQ = 1 << EC_SHIFT; |
|
1251 |
|
1252 // plock bits |
|
1253 private static final int SHUTDOWN = 1 << 31; |
|
1254 private static final int PL_LOCK = 2; |
|
1255 private static final int PL_SIGNAL = 1; |
|
1256 private static final int PL_SPINS = 1 << 8; |
|
1257 |
|
1258 // access mode for WorkQueue |
|
1259 static final int LIFO_QUEUE = 0; |
|
1260 static final int FIFO_QUEUE = 1; |
|
1261 static final int SHARED_QUEUE = -1; |
|
1262 |
1390 |
1263 // Instance fields |
1391 // Instance fields |
1264 volatile long stealCount; // collects worker counts |
1392 volatile long ctl; // main pool control |
1265 volatile long ctl; // main pool control |
1393 volatile int runState; // lockable status |
1266 volatile int plock; // shutdown status and seqLock |
1394 final int config; // parallelism, mode |
1267 volatile int indexSeed; // worker/submitter index seed |
1395 int indexSeed; // to generate worker index |
1268 final short parallelism; // parallelism level |
1396 volatile WorkQueue[] workQueues; // main registry |
1269 final short mode; // LIFO/FIFO |
|
1270 WorkQueue[] workQueues; // main registry |
|
1271 final ForkJoinWorkerThreadFactory factory; |
1397 final ForkJoinWorkerThreadFactory factory; |
1272 final UncaughtExceptionHandler ueh; // per-worker UEH |
1398 final UncaughtExceptionHandler ueh; // per-worker UEH |
1273 final String workerNamePrefix; // to create worker name string |
1399 final String workerNamePrefix; // to create worker name string |
1274 |
1400 volatile AtomicLong stealCounter; // also used as sync monitor |
1275 /** |
1401 |
1276 * Acquires the plock lock to protect worker array and related |
1402 /** |
1277 * updates. This method is called only if an initial CAS on plock |
1403 * Acquires the runState lock; returns current (locked) runState. |
1278 * fails. This acts as a spinlock for normal cases, but falls back |
1404 */ |
1279 * to builtin monitor to block when (rarely) needed. This would be |
1405 private int lockRunState() { |
1280 * a terrible idea for a highly contended lock, but works fine as |
1406 int rs; |
1281 * a more conservative alternative to a pure spinlock. |
1407 return ((((rs = runState) & RSLOCK) != 0 || |
1282 */ |
1408 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? |
1283 private int acquirePlock() { |
1409 awaitRunStateLock() : rs); |
1284 int spins = PL_SPINS, ps, nps; |
1410 } |
1285 for (;;) { |
1411 |
1286 if (((ps = plock) & PL_LOCK) == 0 && |
1412 /** |
1287 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) |
1413 * Spins and/or blocks until runstate lock is available. See |
1288 return nps; |
1414 * above for explanation. |
1289 else if (spins >= 0) { |
1415 */ |
1290 if (ThreadLocalRandom.nextSecondarySeed() >= 0) |
1416 private int awaitRunStateLock() { |
|
1417 Object lock; |
|
1418 boolean wasInterrupted = false; |
|
1419 for (int spins = SPINS, r = 0, rs, ns;;) { |
|
1420 if (((rs = runState) & RSLOCK) == 0) { |
|
1421 if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { |
|
1422 if (wasInterrupted) { |
|
1423 try { |
|
1424 Thread.currentThread().interrupt(); |
|
1425 } catch (SecurityException ignore) { |
|
1426 } |
|
1427 } |
|
1428 return ns; |
|
1429 } |
|
1430 } |
|
1431 else if (r == 0) |
|
1432 r = ThreadLocalRandom.nextSecondarySeed(); |
|
1433 else if (spins > 0) { |
|
1434 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift |
|
1435 if (r >= 0) |
1291 --spins; |
1436 --spins; |
1292 } |
1437 } |
1293 else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { |
1438 else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) |
1294 synchronized (this) { |
1439 Thread.yield(); // initialization race |
1295 if ((plock & PL_SIGNAL) != 0) { |
1440 else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) { |
|
1441 synchronized (lock) { |
|
1442 if ((runState & RSIGNAL) != 0) { |
1296 try { |
1443 try { |
1297 wait(); |
1444 lock.wait(); |
1298 } catch (InterruptedException ie) { |
1445 } catch (InterruptedException ie) { |
1299 try { |
1446 if (!(Thread.currentThread() instanceof |
1300 Thread.currentThread().interrupt(); |
1447 ForkJoinWorkerThread)) |
1301 } catch (SecurityException ignore) { |
1448 wasInterrupted = true; |
1302 } |
|
1303 } |
1449 } |
1304 } |
1450 } |
1305 else |
1451 else |
1306 notifyAll(); |
1452 lock.notifyAll(); |
1307 } |
1453 } |
1308 } |
1454 } |
1309 } |
1455 } |
1310 } |
1456 } |
1311 |
1457 |
1312 /** |
1458 /** |
1313 * Unlocks and signals any thread waiting for plock. Called only |
1459 * Unlocks and sets runState to newRunState. |
1314 * when CAS of seq value for unlock fails. |
1460 * |
1315 */ |
1461 * @param oldRunState a value returned from lockRunState |
1316 private void releasePlock(int ps) { |
1462 * @param newRunState the next value (must have lock bit clear). |
1317 plock = ps; |
1463 */ |
1318 synchronized (this) { notifyAll(); } |
1464 private void unlockRunState(int oldRunState, int newRunState) { |
1319 } |
1465 if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { |
1320 |
1466 Object lock = stealCounter; |
1321 /** |
1467 runState = newRunState; // clears RSIGNAL bit |
1322 * Tries to create and start one worker if fewer than target |
1468 if (lock != null) |
1323 * parallelism level exist. Adjusts counts etc on failure. |
1469 synchronized (lock) { lock.notifyAll(); } |
1324 */ |
1470 } |
1325 private void tryAddWorker() { |
1471 } |
1326 long c; int u, e; |
1472 |
1327 while ((u = (int)((c = ctl) >>> 32)) < 0 && |
1473 // Creating, registering and deregistering workers |
1328 (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) { |
1474 |
1329 long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) | |
1475 /** |
1330 ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e; |
1476 * Tries to construct and start one worker. Assumes that total |
1331 if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1477 * count has already been incremented as a reservation. Invokes |
1332 ForkJoinWorkerThreadFactory fac; |
1478 * deregisterWorker on any failure. |
1333 Throwable ex = null; |
1479 * |
1334 ForkJoinWorkerThread wt = null; |
1480 * @return true if successful |
1335 try { |
1481 */ |
1336 if ((fac = factory) != null && |
1482 private boolean createWorker() { |
1337 (wt = fac.newThread(this)) != null) { |
1483 ForkJoinWorkerThreadFactory fac = factory; |
1338 wt.start(); |
1484 Throwable ex = null; |
1339 break; |
1485 ForkJoinWorkerThread wt = null; |
1340 } |
1486 try { |
1341 } catch (Throwable rex) { |
1487 if (fac != null && (wt = fac.newThread(this)) != null) { |
1342 ex = rex; |
1488 wt.start(); |
|
1489 return true; |
|
1490 } |
|
1491 } catch (Throwable rex) { |
|
1492 ex = rex; |
|
1493 } |
|
1494 deregisterWorker(wt, ex); |
|
1495 return false; |
|
1496 } |
|
1497 |
|
1498 /** |
|
1499 * Tries to add one worker, incrementing ctl counts before doing |
|
1500 * so, relying on createWorker to back out on failure. |
|
1501 * |
|
1502 * @param c incoming ctl value, with total count negative and no |
|
1503 * idle workers. On CAS failure, c is refreshed and retried if |
|
1504 * this holds (otherwise, a new worker is not needed). |
|
1505 */ |
|
1506 private void tryAddWorker(long c) { |
|
1507 boolean add = false; |
|
1508 do { |
|
1509 long nc = ((AC_MASK & (c + AC_UNIT)) | |
|
1510 (TC_MASK & (c + TC_UNIT))); |
|
1511 if (ctl == c) { |
|
1512 int rs, stop; // check if terminating |
|
1513 if ((stop = (rs = lockRunState()) & STOP) == 0) |
|
1514 add = U.compareAndSwapLong(this, CTL, c, nc); |
|
1515 unlockRunState(rs, rs & ~RSLOCK); |
|
1516 if (stop != 0) |
|
1517 break; |
|
1518 if (add) { |
|
1519 createWorker(); |
|
1520 break; |
1343 } |
1521 } |
1344 deregisterWorker(wt, ex); |
1522 } |
1345 break; |
1523 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); |
1346 } |
1524 } |
1347 } |
1525 |
1348 } |
1526 /** |
1349 |
1527 * Callback from ForkJoinWorkerThread constructor to establish and |
1350 // Registering and deregistering workers |
1528 * record its WorkQueue. |
1351 |
|
1352 /** |
|
1353 * Callback from ForkJoinWorkerThread to establish and record its |
|
1354 * WorkQueue. To avoid scanning bias due to packing entries in |
|
1355 * front of the workQueues array, we treat the array as a simple |
|
1356 * power-of-two hash table using per-thread seed as hash, |
|
1357 * expanding as needed. |
|
1358 * |
1529 * |
1359 * @param wt the worker thread |
1530 * @param wt the worker thread |
1360 * @return the worker's queue |
1531 * @return the worker's queue |
1361 */ |
1532 */ |
1362 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { |
1533 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { |
1363 UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps; |
1534 UncaughtExceptionHandler handler; |
1364 wt.setDaemon(true); |
1535 wt.setDaemon(true); // configure thread |
1365 if ((handler = ueh) != null) |
1536 if ((handler = ueh) != null) |
1366 wt.setUncaughtExceptionHandler(handler); |
1537 wt.setUncaughtExceptionHandler(handler); |
1367 do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, |
1538 WorkQueue w = new WorkQueue(this, wt); |
1368 s += SEED_INCREMENT) || |
1539 int i = 0; // assign a pool index |
1369 s == 0); // skip 0 |
1540 int mode = config & MODE_MASK; |
1370 WorkQueue w = new WorkQueue(this, wt, mode, s); |
1541 int rs = lockRunState(); |
1371 if (((ps = plock) & PL_LOCK) != 0 || |
|
1372 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) |
|
1373 ps = acquirePlock(); |
|
1374 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); |
|
1375 try { |
1542 try { |
1376 if ((ws = workQueues) != null) { // skip if shutting down |
1543 WorkQueue[] ws; int n; // skip if no array |
1377 int n = ws.length, m = n - 1; |
1544 if ((ws = workQueues) != null && (n = ws.length) > 0) { |
1378 int r = (s << 1) | 1; // use odd-numbered indices |
1545 int s = indexSeed += SEED_INCREMENT; // unlikely to collide |
1379 if (ws[r &= m] != null) { // collision |
1546 int m = n - 1; |
1380 int probes = 0; // step by approx half size |
1547 i = ((s << 1) | 1) & m; // odd-numbered indices |
|
1548 if (ws[i] != null) { // collision |
|
1549 int probes = 0; // step by approx half n |
1381 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; |
1550 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; |
1382 while (ws[r = (r + step) & m] != null) { |
1551 while (ws[i = (i + step) & m] != null) { |
1383 if (++probes >= n) { |
1552 if (++probes >= n) { |
1384 workQueues = ws = Arrays.copyOf(ws, n <<= 1); |
1553 workQueues = ws = Arrays.copyOf(ws, n <<= 1); |
1385 m = n - 1; |
1554 m = n - 1; |
1386 probes = 0; |
1555 probes = 0; |
1387 } |
1556 } |
1388 } |
1557 } |
1389 } |
1558 } |
1390 w.poolIndex = (short)r; |
1559 w.hint = s; // use as random seed |
1391 w.eventCount = r; // volatile write orders |
1560 w.config = i | mode; |
1392 ws[r] = w; |
1561 w.scanState = i; // publication fence |
|
1562 ws[i] = w; |
1393 } |
1563 } |
1394 } finally { |
1564 } finally { |
1395 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) |
1565 unlockRunState(rs, rs & ~RSLOCK); |
1396 releasePlock(nps); |
1566 } |
1397 } |
1567 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); |
1398 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); |
|
1399 return w; |
1568 return w; |
1400 } |
1569 } |
1401 |
1570 |
1402 /** |
1571 /** |
1403 * Final callback from terminating worker, as well as upon failure |
1572 * Final callback from terminating worker, as well as upon failure |
1409 * @param ex the exception causing failure, or null if none |
1578 * @param ex the exception causing failure, or null if none |
1410 */ |
1579 */ |
1411 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1580 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { |
1412 WorkQueue w = null; |
1581 WorkQueue w = null; |
1413 if (wt != null && (w = wt.workQueue) != null) { |
1582 if (wt != null && (w = wt.workQueue) != null) { |
1414 int ps; |
1583 WorkQueue[] ws; // remove index from array |
1415 w.qlock = -1; // ensure set |
1584 int idx = w.config & SMASK; |
1416 U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals |
1585 int rs = lockRunState(); |
1417 if (((ps = plock) & PL_LOCK) != 0 || |
1586 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) |
1418 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) |
1587 ws[idx] = null; |
1419 ps = acquirePlock(); |
1588 unlockRunState(rs, rs & ~RSLOCK); |
1420 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); |
1589 } |
1421 try { |
1590 long c; // decrement counts |
1422 int idx = w.poolIndex; |
|
1423 WorkQueue[] ws = workQueues; |
|
1424 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) |
|
1425 ws[idx] = null; |
|
1426 } finally { |
|
1427 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) |
|
1428 releasePlock(nps); |
|
1429 } |
|
1430 } |
|
1431 |
|
1432 long c; // adjust ctl counts |
|
1433 do {} while (!U.compareAndSwapLong |
1591 do {} while (!U.compareAndSwapLong |
1434 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | |
1592 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | |
1435 ((c - TC_UNIT) & TC_MASK) | |
1593 (TC_MASK & (c - TC_UNIT)) | |
1436 (c & ~(AC_MASK|TC_MASK))))); |
1594 (SP_MASK & c)))); |
1437 |
1595 if (w != null) { |
1438 if (!tryTerminate(false, false) && w != null && w.array != null) { |
1596 w.qlock = -1; // ensure set |
1439 w.cancelAll(); // cancel remaining tasks |
1597 w.transferStealCount(this); |
1440 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; |
1598 w.cancelAll(); // cancel remaining tasks |
1441 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { |
1599 } |
1442 if (e > 0) { // activate or create replacement |
1600 for (;;) { // possibly replace |
1443 if ((ws = workQueues) == null || |
1601 WorkQueue[] ws; int m, sp; |
1444 (i = e & SMASK) >= ws.length || |
1602 if (tryTerminate(false, false) || w == null || w.array == null || |
1445 (v = ws[i]) == null) |
1603 (runState & STOP) != 0 || (ws = workQueues) == null || |
1446 break; |
1604 (m = ws.length - 1) < 0) // already terminating |
1447 long nc = (((long)(v.nextWait & E_MASK)) | |
1605 break; |
1448 ((long)(u + UAC_UNIT) << 32)); |
1606 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement |
1449 if (v.eventCount != (e | INT_SIGN)) |
1607 if (tryRelease(c, ws[sp & m], AC_UNIT)) |
1450 break; |
|
1451 if (U.compareAndSwapLong(this, CTL, c, nc)) { |
|
1452 v.eventCount = (e + E_SEQ) & E_MASK; |
|
1453 if ((p = v.parker) != null) |
|
1454 U.unpark(p); |
|
1455 break; |
|
1456 } |
|
1457 } |
|
1458 else { |
|
1459 if ((short)u < 0) |
|
1460 tryAddWorker(); |
|
1461 break; |
1608 break; |
1462 } |
1609 } |
1463 } |
1610 else if (ex != null && (c & ADD_WORKER) != 0L) { |
1464 } |
1611 tryAddWorker(c); // create replacement |
1465 if (ex == null) // help clean refs on way out |
1612 break; |
|
1613 } |
|
1614 else // don't need replacement |
|
1615 break; |
|
1616 } |
|
1617 if (ex == null) // help clean on way out |
1466 ForkJoinTask.helpExpungeStaleExceptions(); |
1618 ForkJoinTask.helpExpungeStaleExceptions(); |
1467 else // rethrow |
1619 else // rethrow |
1468 ForkJoinTask.rethrow(ex); |
1620 ForkJoinTask.rethrow(ex); |
1469 } |
1621 } |
1470 |
1622 |
1471 // Submissions |
1623 // Signalling |
1472 |
|
1473 /** |
|
1474 * Unless shutting down, adds the given task to a submission queue |
|
1475 * at submitter's current queue index (modulo submission |
|
1476 * range). Only the most common path is directly handled in this |
|
1477 * method. All others are relayed to fullExternalPush. |
|
1478 * |
|
1479 * @param task the task. Caller must ensure non-null. |
|
1480 */ |
|
1481 final void externalPush(ForkJoinTask<?> task) { |
|
1482 WorkQueue q; int m, s, n, am; ForkJoinTask<?>[] a; |
|
1483 int r = ThreadLocalRandom.getProbe(); |
|
1484 int ps = plock; |
|
1485 WorkQueue[] ws = workQueues; |
|
1486 if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && |
|
1487 (q = ws[m & r & SQMASK]) != null && r != 0 && |
|
1488 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock |
|
1489 if ((a = q.array) != null && |
|
1490 (am = a.length - 1) > (n = (s = q.top) - q.base)) { |
|
1491 int j = ((am & s) << ASHIFT) + ABASE; |
|
1492 U.putOrderedObject(a, j, task); |
|
1493 q.top = s + 1; // push on to deque |
|
1494 q.qlock = 0; |
|
1495 if (n <= 1) |
|
1496 signalWork(ws, q); |
|
1497 return; |
|
1498 } |
|
1499 q.qlock = 0; |
|
1500 } |
|
1501 fullExternalPush(task); |
|
1502 } |
|
1503 |
|
1504 /** |
|
1505 * Full version of externalPush. This method is called, among |
|
1506 * other times, upon the first submission of the first task to the |
|
1507 * pool, so must perform secondary initialization. It also |
|
1508 * detects first submission by an external thread by looking up |
|
1509 * its ThreadLocal, and creates a new shared queue if the one at |
|
1510 * index if empty or contended. The plock lock body must be |
|
1511 * exception-free (so no try/finally) so we optimistically |
|
1512 * allocate new queues outside the lock and throw them away if |
|
1513 * (very rarely) not needed. |
|
1514 * |
|
1515 * Secondary initialization occurs when plock is zero, to create |
|
1516 * workQueue array and set plock to a valid value. This lock body |
|
1517 * must also be exception-free. Because the plock seq value can |
|
1518 * eventually wrap around zero, this method harmlessly fails to |
|
1519 * reinitialize if workQueues exists, while still advancing plock. |
|
1520 */ |
|
1521 private void fullExternalPush(ForkJoinTask<?> task) { |
|
1522 int r; |
|
1523 if ((r = ThreadLocalRandom.getProbe()) == 0) { |
|
1524 ThreadLocalRandom.localInit(); |
|
1525 r = ThreadLocalRandom.getProbe(); |
|
1526 } |
|
1527 for (;;) { |
|
1528 WorkQueue[] ws; WorkQueue q; int ps, m, k; |
|
1529 boolean move = false; |
|
1530 if ((ps = plock) < 0) |
|
1531 throw new RejectedExecutionException(); |
|
1532 else if (ps == 0 || (ws = workQueues) == null || |
|
1533 (m = ws.length - 1) < 0) { // initialize workQueues |
|
1534 int p = parallelism; // find power of two table size |
|
1535 int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots |
|
1536 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
|
1537 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; |
|
1538 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? |
|
1539 new WorkQueue[n] : null); |
|
1540 if (((ps = plock) & PL_LOCK) != 0 || |
|
1541 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) |
|
1542 ps = acquirePlock(); |
|
1543 if (((ws = workQueues) == null || ws.length == 0) && nws != null) |
|
1544 workQueues = nws; |
|
1545 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); |
|
1546 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) |
|
1547 releasePlock(nps); |
|
1548 } |
|
1549 else if ((q = ws[k = r & m & SQMASK]) != null) { |
|
1550 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { |
|
1551 ForkJoinTask<?>[] a = q.array; |
|
1552 int s = q.top; |
|
1553 boolean submitted = false; |
|
1554 try { // locked version of push |
|
1555 if ((a != null && a.length > s + 1 - q.base) || |
|
1556 (a = q.growArray()) != null) { // must presize |
|
1557 int j = (((a.length - 1) & s) << ASHIFT) + ABASE; |
|
1558 U.putOrderedObject(a, j, task); |
|
1559 q.top = s + 1; |
|
1560 submitted = true; |
|
1561 } |
|
1562 } finally { |
|
1563 q.qlock = 0; // unlock |
|
1564 } |
|
1565 if (submitted) { |
|
1566 signalWork(ws, q); |
|
1567 return; |
|
1568 } |
|
1569 } |
|
1570 move = true; // move on failure |
|
1571 } |
|
1572 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue |
|
1573 q = new WorkQueue(this, null, SHARED_QUEUE, r); |
|
1574 q.poolIndex = (short)k; |
|
1575 if (((ps = plock) & PL_LOCK) != 0 || |
|
1576 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) |
|
1577 ps = acquirePlock(); |
|
1578 if ((ws = workQueues) != null && k < ws.length && ws[k] == null) |
|
1579 ws[k] = q; |
|
1580 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); |
|
1581 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) |
|
1582 releasePlock(nps); |
|
1583 } |
|
1584 else |
|
1585 move = true; // move if busy |
|
1586 if (move) |
|
1587 r = ThreadLocalRandom.advanceProbe(r); |
|
1588 } |
|
1589 } |
|
1590 |
|
1591 // Maintaining ctl counts |
|
1592 |
|
1593 /** |
|
1594 * Increments active count; mainly called upon return from blocking. |
|
1595 */ |
|
1596 final void incrementActiveCount() { |
|
1597 long c; |
|
1598 do {} while (!U.compareAndSwapLong |
|
1599 (this, CTL, c = ctl, ((c & ~AC_MASK) | |
|
1600 ((c & AC_MASK) + AC_UNIT)))); |
|
1601 } |
|
1602 |
1624 |
1603 /** |
1625 /** |
1604 * Tries to create or activate a worker if too few are active. |
1626 * Tries to create or activate a worker if too few are active. |
1605 * |
1627 * |
1606 * @param ws the worker array to use to find signallees |
1628 * @param ws the worker array to use to find signallees |
1607 * @param q if non-null, the queue holding tasks to be processed |
1629 * @param q a WorkQueue --if non-null, don't retry if now empty |
1608 */ |
1630 */ |
1609 final void signalWork(WorkQueue[] ws, WorkQueue q) { |
1631 final void signalWork(WorkQueue[] ws, WorkQueue q) { |
1610 for (;;) { |
1632 long c; int sp, i; WorkQueue v; Thread p; |
1611 long c; int e, u, i; WorkQueue w; Thread p; |
1633 while ((c = ctl) < 0L) { // too few active |
1612 if ((u = (int)((c = ctl) >>> 32)) >= 0) |
1634 if ((sp = (int)c) == 0) { // no idle workers |
|
1635 if ((c & ADD_WORKER) != 0L) // too few workers |
|
1636 tryAddWorker(c); |
1613 break; |
1637 break; |
1614 if ((e = (int)c) <= 0) { |
1638 } |
1615 if ((short)u < 0) |
1639 if (ws == null) // unstarted/terminated |
1616 tryAddWorker(); |
|
1617 break; |
1640 break; |
1618 } |
1641 if (ws.length <= (i = sp & SMASK)) // terminated |
1619 if (ws == null || ws.length <= (i = e & SMASK) || |
|
1620 (w = ws[i]) == null) |
|
1621 break; |
1642 break; |
1622 long nc = (((long)(w.nextWait & E_MASK)) | |
1643 if ((v = ws[i]) == null) // terminating |
1623 ((long)(u + UAC_UNIT)) << 32); |
1644 break; |
1624 int ne = (e + E_SEQ) & E_MASK; |
1645 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState |
1625 if (w.eventCount == (e | INT_SIGN) && |
1646 int d = sp - v.scanState; // screen CAS |
1626 U.compareAndSwapLong(this, CTL, c, nc)) { |
1647 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); |
1627 w.eventCount = ne; |
1648 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { |
1628 if ((p = w.parker) != null) |
1649 v.scanState = vs; // activate v |
|
1650 if ((p = v.parker) != null) |
1629 U.unpark(p); |
1651 U.unpark(p); |
1630 break; |
1652 break; |
1631 } |
1653 } |
1632 if (q != null && q.base >= q.top) |
1654 if (q != null && q.base == q.top) // no more work |
1633 break; |
1655 break; |
1634 } |
1656 } |
1635 } |
1657 } |
1636 |
1658 |
|
1659 /** |
|
1660 * Signals and releases worker v if it is top of idle worker |
|
1661 * stack. This performs a one-shot version of signalWork only if |
|
1662 * there is (apparently) at least one idle worker. |
|
1663 * |
|
1664 * @param c incoming ctl value |
|
1665 * @param v if non-null, a worker |
|
1666 * @param inc the increment to active count (zero when compensating) |
|
1667 * @return true if successful |
|
1668 */ |
|
1669 private boolean tryRelease(long c, WorkQueue v, long inc) { |
|
1670 int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p; |
|
1671 if (v != null && v.scanState == sp) { // v is at top of stack |
|
1672 long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred); |
|
1673 if (U.compareAndSwapLong(this, CTL, c, nc)) { |
|
1674 v.scanState = vs; |
|
1675 if ((p = v.parker) != null) |
|
1676 U.unpark(p); |
|
1677 return true; |
|
1678 } |
|
1679 } |
|
1680 return false; |
|
1681 } |
|
1682 |
1637 // Scanning for tasks |
1683 // Scanning for tasks |
1638 |
1684 |
1639 /** |
1685 /** |
1640 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. |
1686 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. |
1641 */ |
1687 */ |
1642 final void runWorker(WorkQueue w) { |
1688 final void runWorker(WorkQueue w) { |
1643 w.growArray(); // allocate queue |
1689 w.growArray(); // allocate queue |
1644 for (int r = w.hint; scan(w, r) == 0; ) { |
1690 int seed = w.hint; // initially holds randomization hint |
|
1691 int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift |
|
1692 for (ForkJoinTask<?> t;;) { |
|
1693 if ((t = scan(w, r)) != null) |
|
1694 w.runTask(t); |
|
1695 else if (!awaitWork(w, r)) |
|
1696 break; |
1645 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
1697 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
1646 } |
1698 } |
1647 } |
1699 } |
1648 |
1700 |
1649 /** |
1701 /** |
1650 * Scans for and, if found, runs one task, else possibly |
1702 * Scans for and tries to steal a top-level task. Scans start at a |
1651 * inactivates the worker. This method operates on single reads of |
1703 * random location, randomly moving on apparent contention, |
1652 * volatile state and is designed to be re-invoked continuously, |
1704 * otherwise continuing linearly until reaching two consecutive |
1653 * in part because it returns upon detecting inconsistencies, |
1705 * empty passes over all queues with the same checksum (summing |
1654 * contention, or state changes that indicate possible success on |
1706 * each base index of each queue, that moves on each steal), at |
1655 * re-invocation. |
1707 * which point the worker tries to inactivate and then re-scans, |
1656 * |
1708 * attempting to re-activate (itself or some other worker) if |
1657 * The scan searches for tasks across queues starting at a random |
1709 * finding a task; otherwise returning null to await work. Scans |
1658 * index, checking each at least twice. The scan terminates upon |
1710 * otherwise touch as little memory as possible, to reduce |
1659 * either finding a non-empty queue, or completing the sweep. If |
1711 * disruption on other scanning threads. |
1660 * the worker is not inactivated, it takes and runs a task from |
|
1661 * this queue. Otherwise, if not activated, it tries to activate |
|
1662 * itself or some other worker by signalling. On failure to find a |
|
1663 * task, returns (for retry) if pool state may have changed during |
|
1664 * an empty scan, or tries to inactivate if active, else possibly |
|
1665 * blocks or terminates via method awaitWork. |
|
1666 * |
1712 * |
1667 * @param w the worker (via its WorkQueue) |
1713 * @param w the worker (via its WorkQueue) |
1668 * @param r a random seed |
1714 * @param r a random seed |
1669 * @return worker qlock status if would have waited, else 0 |
1715 * @return a task, or null if none found |
1670 */ |
1716 */ |
1671 private final int scan(WorkQueue w, int r) { |
1717 private ForkJoinTask<?> scan(WorkQueue w, int r) { |
1672 WorkQueue[] ws; int m; |
1718 WorkQueue[] ws; int m; |
1673 long c = ctl; // for consistency check |
1719 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { |
1674 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) { |
1720 int ss = w.scanState; // initially non-negative |
1675 for (int j = m + m + 1, ec = w.eventCount;;) { |
1721 for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { |
1676 WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t; |
1722 WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; |
1677 if ((q = ws[(r - j) & m]) != null && |
1723 int b, n; long c; |
1678 (b = q.base) - q.top < 0 && (a = q.array) != null) { |
1724 if ((q = ws[k]) != null) { |
1679 long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1725 if ((n = (b = q.base) - q.top) < 0 && |
1680 if ((t = ((ForkJoinTask<?>) |
1726 (a = q.array) != null) { // non-empty |
1681 U.getObjectVolatile(a, i))) != null) { |
1727 long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1682 if (ec < 0) |
1728 if ((t = ((ForkJoinTask<?>) |
1683 helpRelease(c, ws, w, q, b); |
1729 U.getObjectVolatile(a, i))) != null && |
1684 else if (q.base == b && |
1730 q.base == b) { |
1685 U.compareAndSwapObject(a, i, t, null)) { |
1731 if (ss >= 0) { |
1686 U.putOrderedInt(q, QBASE, b + 1); |
1732 if (U.compareAndSwapObject(a, i, t, null)) { |
1687 if ((b + 1) - q.top < 0) |
1733 q.base = b + 1; |
1688 signalWork(ws, q); |
1734 if (n < -1) // signal others |
1689 w.runTask(t); |
1735 signalWork(ws, q); |
|
1736 return t; |
|
1737 } |
|
1738 } |
|
1739 else if (oldSum == 0 && // try to activate |
|
1740 w.scanState < 0) |
|
1741 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); |
1690 } |
1742 } |
|
1743 if (ss < 0) // refresh |
|
1744 ss = w.scanState; |
|
1745 r ^= r << 1; r ^= r >>> 3; r ^= r << 10; |
|
1746 origin = k = r & m; // move and rescan |
|
1747 oldSum = checkSum = 0; |
|
1748 continue; |
1691 } |
1749 } |
1692 break; |
1750 checkSum += b; |
1693 } |
1751 } |
1694 else if (--j < 0) { |
1752 if ((k = (k + 1) & m) == origin) { // continue until stable |
1695 if ((ec | (e = (int)c)) < 0) // inactive or terminating |
1753 if ((ss >= 0 || (ss == (ss = w.scanState))) && |
1696 return awaitWork(w, c, ec); |
1754 oldSum == (oldSum = checkSum)) { |
1697 else if (ctl == c) { // try to inactivate and enqueue |
1755 if (ss < 0 || w.qlock < 0) // already inactive |
1698 long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); |
1756 break; |
1699 w.nextWait = e; |
1757 int ns = ss | INACTIVE; // try to inactivate |
1700 w.eventCount = ec | INT_SIGN; |
1758 long nc = ((SP_MASK & ns) | |
1701 if (!U.compareAndSwapLong(this, CTL, c, nc)) |
1759 (UC_MASK & ((c = ctl) - AC_UNIT))); |
1702 w.eventCount = ec; // back out |
1760 w.stackPred = (int)c; // hold prev stack top |
|
1761 U.putInt(w, QSCANSTATE, ns); |
|
1762 if (U.compareAndSwapLong(this, CTL, c, nc)) |
|
1763 ss = ns; |
|
1764 else |
|
1765 w.scanState = ss; // back out |
1703 } |
1766 } |
1704 break; |
1767 checkSum = 0; |
1705 } |
1768 } |
1706 } |
1769 } |
1707 } |
1770 } |
1708 return 0; |
1771 return null; |
1709 } |
1772 } |
1710 |
1773 |
1711 /** |
1774 /** |
1712 * A continuation of scan(), possibly blocking or terminating |
1775 * Possibly blocks worker w waiting for a task to steal, or |
1713 * worker w. Returns without blocking if pool state has apparently |
1776 * returns false if the worker should terminate. If inactivating |
1714 * changed since last invocation. Also, if inactivating w has |
1777 * w has caused the pool to become quiescent, checks for pool |
1715 * caused the pool to become quiescent, checks for pool |
|
1716 * termination, and, so long as this is not the only worker, waits |
1778 * termination, and, so long as this is not the only worker, waits |
1717 * for event for up to a given duration. On timeout, if ctl has |
1779 * for up to a given duration. On timeout, if ctl has not |
1718 * not changed, terminates the worker, which will in turn wake up |
1780 * changed, terminates the worker, which will in turn wake up |
1719 * another worker to possibly repeat this process. |
1781 * another worker to possibly repeat this process. |
1720 * |
1782 * |
1721 * @param w the calling worker |
1783 * @param w the calling worker |
1722 * @param c the ctl value on entry to scan |
1784 * @param r a random seed (for spins) |
1723 * @param ec the worker's eventCount on entry to scan |
1785 * @return false if the worker should terminate |
1724 */ |
1786 */ |
1725 private final int awaitWork(WorkQueue w, long c, int ec) { |
1787 private boolean awaitWork(WorkQueue w, int r) { |
1726 int stat, ns; long parkTime, deadline; |
1788 if (w == null || w.qlock < 0) // w is terminating |
1727 if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c && |
1789 return false; |
1728 !Thread.interrupted()) { |
1790 for (int pred = w.stackPred, spins = SPINS, ss;;) { |
1729 int e = (int)c; |
1791 if ((ss = w.scanState) >= 0) |
1730 int u = (int)(c >>> 32); |
1792 break; |
1731 int d = (u >> UAC_SHIFT) + parallelism; // active count |
1793 else if (spins > 0) { |
1732 |
1794 r ^= r << 6; r ^= r >>> 21; r ^= r << 7; |
1733 if (e < 0 || (d <= 0 && tryTerminate(false, false))) |
1795 if (r >= 0 && --spins == 0) { // randomize spins |
1734 stat = w.qlock = -1; // pool is terminating |
1796 WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; |
1735 else if ((ns = w.nsteals) != 0) { // collect steals and retry |
1797 if (pred != 0 && (ws = workQueues) != null && |
1736 w.nsteals = 0; |
1798 (j = pred & SMASK) < ws.length && |
1737 U.getAndAddLong(this, STEALCOUNT, (long)ns); |
1799 (v = ws[j]) != null && // see if pred parking |
1738 } |
1800 (v.parker == null || v.scanState >= 0)) |
1739 else { |
1801 spins = SPINS; // continue spinning |
1740 long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : |
1802 } |
1741 ((long)(w.nextWait & E_MASK)) | // ctl to restore |
1803 } |
1742 ((long)(u + UAC_UNIT)) << 32); |
1804 else if (w.qlock < 0) // recheck after spins |
1743 if (pc != 0L) { // timed wait if last waiter |
1805 return false; |
1744 int dc = -(short)(c >>> TC_SHIFT); |
1806 else if (!Thread.interrupted()) { |
1745 parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT: |
1807 long c, prevctl, parkTime, deadline; |
1746 (dc + 1) * IDLE_TIMEOUT); |
1808 int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); |
|
1809 if ((ac <= 0 && tryTerminate(false, false)) || |
|
1810 (runState & STOP) != 0) // pool terminating |
|
1811 return false; |
|
1812 if (ac <= 0 && ss == (int)c) { // is last waiter |
|
1813 prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); |
|
1814 int t = (short)(c >>> TC_SHIFT); // shrink excess spares |
|
1815 if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) |
|
1816 return false; // else use timed wait |
|
1817 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); |
1747 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1818 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1748 } |
1819 } |
1749 else |
1820 else |
1750 parkTime = deadline = 0L; |
1821 prevctl = parkTime = deadline = 0L; |
1751 if (w.eventCount == ec && ctl == c) { |
1822 Thread wt = Thread.currentThread(); |
1752 Thread wt = Thread.currentThread(); |
1823 U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport |
1753 U.putObject(wt, PARKBLOCKER, this); |
1824 w.parker = wt; |
1754 w.parker = wt; // emulate LockSupport.park |
1825 if (w.scanState < 0 && ctl == c) // recheck before park |
1755 if (w.eventCount == ec && ctl == c) |
1826 U.park(false, parkTime); |
1756 U.park(false, parkTime); // must recheck before park |
1827 U.putOrderedObject(w, QPARKER, null); |
1757 w.parker = null; |
1828 U.putObject(wt, PARKBLOCKER, null); |
1758 U.putObject(wt, PARKBLOCKER, null); |
1829 if (w.scanState >= 0) |
1759 if (parkTime != 0L && ctl == c && |
1830 break; |
1760 deadline - System.nanoTime() <= 0L && |
1831 if (parkTime != 0L && ctl == c && |
1761 U.compareAndSwapLong(this, CTL, c, pc)) |
1832 deadline - System.nanoTime() <= 0L && |
1762 stat = w.qlock = -1; // shrink pool |
1833 U.compareAndSwapLong(this, CTL, c, prevctl)) |
|
1834 return false; // shrink pool |
|
1835 } |
|
1836 } |
|
1837 return true; |
|
1838 } |
|
1839 |
|
1840 // Joining tasks |
|
1841 |
|
1842 /** |
|
1843 * Tries to steal and run tasks within the target's computation. |
|
1844 * Uses a variant of the top-level algorithm, restricted to tasks |
|
1845 * with the given task as ancestor: It prefers taking and running |
|
1846 * eligible tasks popped from the worker's own queue (via |
|
1847 * popCC). Otherwise it scans others, randomly moving on |
|
1848 * contention or execution, deciding to give up based on a |
|
1849 * checksum (via return codes frob pollAndExecCC). The maxTasks |
|
1850 * argument supports external usages; internal calls use zero, |
|
1851 * allowing unbounded steps (external calls trap non-positive |
|
1852 * values). |
|
1853 * |
|
1854 * @param w caller |
|
1855 * @param maxTasks if non-zero, the maximum number of other tasks to run |
|
1856 * @return task status on exit |
|
1857 */ |
|
1858 final int helpComplete(WorkQueue w, CountedCompleter<?> task, |
|
1859 int maxTasks) { |
|
1860 WorkQueue[] ws; int s = 0, m; |
|
1861 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && |
|
1862 task != null && w != null) { |
|
1863 int mode = w.config; // for popCC |
|
1864 int r = w.hint ^ w.top; // arbitrary seed for origin |
|
1865 int origin = r & m; // first queue to scan |
|
1866 int h = 1; // 1:ran, >1:contended, <0:hash |
|
1867 for (int k = origin, oldSum = 0, checkSum = 0;;) { |
|
1868 CountedCompleter<?> p; WorkQueue q; |
|
1869 if ((s = task.status) < 0) |
|
1870 break; |
|
1871 if (h == 1 && (p = w.popCC(task, mode)) != null) { |
|
1872 p.doExec(); // run local task |
|
1873 if (maxTasks != 0 && --maxTasks == 0) |
|
1874 break; |
|
1875 origin = k; // reset |
|
1876 oldSum = checkSum = 0; |
1763 } |
1877 } |
1764 } |
1878 else { // poll other queues |
1765 } |
1879 if ((q = ws[k]) == null) |
1766 return stat; |
1880 h = 0; |
1767 } |
1881 else if ((h = q.pollAndExecCC(task)) < 0) |
1768 |
1882 checkSum += h; |
1769 /** |
1883 if (h > 0) { |
1770 * Possibly releases (signals) a worker. Called only from scan() |
1884 if (h == 1 && maxTasks != 0 && --maxTasks == 0) |
1771 * when a worker with apparently inactive status finds a non-empty |
1885 break; |
1772 * queue. This requires revalidating all of the associated state |
1886 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift |
1773 * from caller. |
1887 origin = k = r & m; // move and restart |
1774 */ |
1888 oldSum = checkSum = 0; |
1775 private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w, |
1889 } |
1776 WorkQueue q, int b) { |
1890 else if ((k = (k + 1) & m) == origin) { |
1777 WorkQueue v; int e, i; Thread p; |
1891 if (oldSum == (oldSum = checkSum)) |
1778 if (w != null && w.eventCount < 0 && (e = (int)c) > 0 && |
1892 break; |
1779 ws != null && ws.length > (i = e & SMASK) && |
1893 checkSum = 0; |
1780 (v = ws[i]) != null && ctl == c) { |
1894 } |
1781 long nc = (((long)(v.nextWait & E_MASK)) | |
1895 } |
1782 ((long)((int)(c >>> 32) + UAC_UNIT)) << 32); |
1896 } |
1783 int ne = (e + E_SEQ) & E_MASK; |
1897 } |
1784 if (q != null && q.base == b && w.eventCount < 0 && |
1898 return s; |
1785 v.eventCount == (e | INT_SIGN) && |
|
1786 U.compareAndSwapLong(this, CTL, c, nc)) { |
|
1787 v.eventCount = ne; |
|
1788 if ((p = v.parker) != null) |
|
1789 U.unpark(p); |
|
1790 } |
|
1791 } |
|
1792 } |
1899 } |
1793 |
1900 |
1794 /** |
1901 /** |
1795 * Tries to locate and execute tasks for a stealer of the given |
1902 * Tries to locate and execute tasks for a stealer of the given |
1796 * task, or in turn one of its stealers, Traces currentSteal -> |
1903 * task, or in turn one of its stealers, Traces currentSteal -> |
1797 * currentJoin links looking for a thread working on a descendant |
1904 * currentJoin links looking for a thread working on a descendant |
1798 * of the given task and with a non-empty queue to steal back and |
1905 * of the given task and with a non-empty queue to steal back and |
1799 * execute tasks from. The first call to this method upon a |
1906 * execute tasks from. The first call to this method upon a |
1800 * waiting join will often entail scanning/search, (which is OK |
1907 * waiting join will often entail scanning/search, (which is OK |
1801 * because the joiner has nothing better to do), but this method |
1908 * because the joiner has nothing better to do), but this method |
1802 * leaves hints in workers to speed up subsequent calls. The |
1909 * leaves hints in workers to speed up subsequent calls. |
1803 * implementation is very branchy to cope with potential |
1910 * |
1804 * inconsistencies or loops encountering chains that are stale, |
1911 * @param w caller |
1805 * unknown, or so long that they are likely cyclic. |
|
1806 * |
|
1807 * @param joiner the joining worker |
|
1808 * @param task the task to join |
1912 * @param task the task to join |
1809 * @return 0 if no progress can be made, negative if task |
1913 */ |
1810 * known complete, else positive |
1914 private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { |
1811 */ |
1915 WorkQueue[] ws = workQueues; |
1812 private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { |
1916 int oldSum = 0, checkSum, m; |
1813 int stat = 0, steps = 0; // bound to avoid cycles |
1917 if (ws != null && (m = ws.length - 1) >= 0 && w != null && |
1814 if (task != null && joiner != null && |
1918 task != null) { |
1815 joiner.base - joiner.top >= 0) { // hoist checks |
1919 do { // restart point |
1816 restart: for (;;) { |
1920 checkSum = 0; // for stability check |
1817 ForkJoinTask<?> subtask = task; // current target |
1921 ForkJoinTask<?> subtask; |
1818 for (WorkQueue j = joiner, v;;) { // v is stealer of subtask |
1922 WorkQueue j = w, v; // v is subtask stealer |
1819 WorkQueue[] ws; int m, s, h; |
1923 descent: for (subtask = task; subtask.status >= 0; ) { |
1820 if ((s = task.status) < 0) { |
1924 for (int h = j.hint | 1, k = 0, i; ; k += 2) { |
1821 stat = s; |
1925 if (k > m) // can't find stealer |
1822 break restart; |
1926 break descent; |
1823 } |
1927 if ((v = ws[i = (h + k) & m]) != null) { |
1824 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) |
1928 if (v.currentSteal == subtask) { |
1825 break restart; // shutting down |
1929 j.hint = i; |
1826 if ((v = ws[h = (j.hint | 1) & m]) == null || |
|
1827 v.currentSteal != subtask) { |
|
1828 for (int origin = h;;) { // find stealer |
|
1829 if (((h = (h + 2) & m) & 15) == 1 && |
|
1830 (subtask.status < 0 || j.currentJoin != subtask)) |
|
1831 continue restart; // occasional staleness check |
|
1832 if ((v = ws[h]) != null && |
|
1833 v.currentSteal == subtask) { |
|
1834 j.hint = h; // save hint |
|
1835 break; |
1930 break; |
1836 } |
1931 } |
1837 if (h == origin) |
1932 checkSum += v.base; |
1838 break restart; // cannot find stealer |
|
1839 } |
1933 } |
1840 } |
1934 } |
1841 for (;;) { // help stealer or descend to its stealer |
1935 for (;;) { // help v or descend |
1842 ForkJoinTask<?>[] a; int b; |
1936 ForkJoinTask<?>[] a; int b; |
1843 if (subtask.status < 0) // surround probes with |
1937 checkSum += (b = v.base); |
1844 continue restart; // consistency checks |
1938 ForkJoinTask<?> next = v.currentJoin; |
1845 if ((b = v.base) - v.top < 0 && (a = v.array) != null) { |
1939 if (subtask.status < 0 || j.currentJoin != subtask || |
1846 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1940 v.currentSteal != subtask) // stale |
1847 ForkJoinTask<?> t = |
1941 break descent; |
1848 (ForkJoinTask<?>)U.getObjectVolatile(a, i); |
1942 if (b - v.top >= 0 || (a = v.array) == null) { |
1849 if (subtask.status < 0 || j.currentJoin != subtask || |
1943 if ((subtask = next) == null) |
1850 v.currentSteal != subtask) |
1944 break descent; |
1851 continue restart; // stale |
1945 j = v; |
1852 stat = 1; // apparent progress |
1946 break; |
1853 if (v.base == b) { |
|
1854 if (t == null) |
|
1855 break restart; |
|
1856 if (U.compareAndSwapObject(a, i, t, null)) { |
|
1857 U.putOrderedInt(v, QBASE, b + 1); |
|
1858 ForkJoinTask<?> ps = joiner.currentSteal; |
|
1859 int jt = joiner.top; |
|
1860 do { |
|
1861 joiner.currentSteal = t; |
|
1862 t.doExec(); // clear local tasks too |
|
1863 } while (task.status >= 0 && |
|
1864 joiner.top != jt && |
|
1865 (t = joiner.pop()) != null); |
|
1866 joiner.currentSteal = ps; |
|
1867 break restart; |
|
1868 } |
|
1869 } |
|
1870 } |
1947 } |
1871 else { // empty -- try to descend |
1948 int i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1872 ForkJoinTask<?> next = v.currentJoin; |
1949 ForkJoinTask<?> t = ((ForkJoinTask<?>) |
1873 if (subtask.status < 0 || j.currentJoin != subtask || |
1950 U.getObjectVolatile(a, i)); |
1874 v.currentSteal != subtask) |
1951 if (v.base == b) { |
1875 continue restart; // stale |
1952 if (t == null) // stale |
1876 else if (next == null || ++steps == MAX_HELP) |
1953 break descent; |
1877 break restart; // dead-end or maybe cyclic |
1954 if (U.compareAndSwapObject(a, i, t, null)) { |
1878 else { |
1955 v.base = b + 1; |
1879 subtask = next; |
1956 ForkJoinTask<?> ps = w.currentSteal; |
1880 j = v; |
1957 int top = w.top; |
1881 break; |
1958 do { |
|
1959 U.putOrderedObject(w, QCURRENTSTEAL, t); |
|
1960 t.doExec(); // clear local tasks too |
|
1961 } while (task.status >= 0 && |
|
1962 w.top != top && |
|
1963 (t = w.pop()) != null); |
|
1964 U.putOrderedObject(w, QCURRENTSTEAL, ps); |
|
1965 if (w.base != w.top) |
|
1966 return; // can't further help |
1882 } |
1967 } |
1883 } |
1968 } |
1884 } |
1969 } |
1885 } |
1970 } |
1886 } |
1971 } while (task.status >= 0 && oldSum != (oldSum = checkSum)); |
1887 } |
1972 } |
1888 return stat; |
1973 } |
1889 } |
1974 |
1890 |
1975 /** |
1891 /** |
1976 * Tries to decrement active count (sometimes implicitly) and |
1892 * Analog of tryHelpStealer for CountedCompleters. Tries to steal |
1977 * possibly release or create a compensating worker in preparation |
1893 * and run tasks within the target's computation. |
1978 * for blocking. Returns false (retryable by caller), on |
1894 * |
1979 * contention, detected staleness, instability, or termination. |
1895 * @param task the task to join |
1980 * |
1896 * @param maxTasks the maximum number of other tasks to run |
1981 * @param w caller |
1897 */ |
1982 */ |
1898 final int helpComplete(WorkQueue joiner, CountedCompleter<?> task, |
1983 private boolean tryCompensate(WorkQueue w) { |
1899 int maxTasks) { |
1984 boolean canBlock; |
1900 WorkQueue[] ws; int m; |
1985 WorkQueue[] ws; long c; int m, pc, sp; |
|
1986 if (w == null || w.qlock < 0 || // caller terminating |
|
1987 (ws = workQueues) == null || (m = ws.length - 1) <= 0 || |
|
1988 (pc = config & SMASK) == 0) // parallelism disabled |
|
1989 canBlock = false; |
|
1990 else if ((sp = (int)(c = ctl)) != 0) // release idle worker |
|
1991 canBlock = tryRelease(c, ws[sp & m], 0L); |
|
1992 else { |
|
1993 int ac = (int)(c >> AC_SHIFT) + pc; |
|
1994 int tc = (short)(c >> TC_SHIFT) + pc; |
|
1995 int nbusy = 0; // validate saturation |
|
1996 for (int i = 0; i <= m; ++i) { // two passes of odd indices |
|
1997 WorkQueue v; |
|
1998 if ((v = ws[((i << 1) | 1) & m]) != null) { |
|
1999 if ((v.scanState & SCANNING) != 0) |
|
2000 break; |
|
2001 ++nbusy; |
|
2002 } |
|
2003 } |
|
2004 if (nbusy != (tc << 1) || ctl != c) |
|
2005 canBlock = false; // unstable or stale |
|
2006 else if (tc >= pc && ac > 1 && w.isEmpty()) { |
|
2007 long nc = ((AC_MASK & (c - AC_UNIT)) | |
|
2008 (~AC_MASK & c)); // uncompensated |
|
2009 canBlock = U.compareAndSwapLong(this, CTL, c, nc); |
|
2010 } |
|
2011 else if (tc >= MAX_CAP || |
|
2012 (this == common && tc >= pc + commonMaxSpares)) |
|
2013 throw new RejectedExecutionException( |
|
2014 "Thread limit exceeded replacing blocked worker"); |
|
2015 else { // similar to tryAddWorker |
|
2016 boolean add = false; int rs; // CAS within lock |
|
2017 long nc = ((AC_MASK & c) | |
|
2018 (TC_MASK & (c + TC_UNIT))); |
|
2019 if (((rs = lockRunState()) & STOP) == 0) |
|
2020 add = U.compareAndSwapLong(this, CTL, c, nc); |
|
2021 unlockRunState(rs, rs & ~RSLOCK); |
|
2022 canBlock = add && createWorker(); // throws on exception |
|
2023 } |
|
2024 } |
|
2025 return canBlock; |
|
2026 } |
|
2027 |
|
2028 /** |
|
2029 * Helps and/or blocks until the given task is done or timeout. |
|
2030 * |
|
2031 * @param w caller |
|
2032 * @param task the task |
|
2033 * @param deadline for timed waits, if nonzero |
|
2034 * @return task status on exit |
|
2035 */ |
|
2036 final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) { |
1901 int s = 0; |
2037 int s = 0; |
1902 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && |
2038 if (task != null && w != null) { |
1903 joiner != null && task != null) { |
2039 ForkJoinTask<?> prevJoin = w.currentJoin; |
1904 int j = joiner.poolIndex; |
2040 U.putOrderedObject(w, QCURRENTJOIN, task); |
1905 int scans = m + m + 1; |
2041 CountedCompleter<?> cc = (task instanceof CountedCompleter) ? |
1906 long c = 0L; // for stability check |
2042 (CountedCompleter<?>)task : null; |
1907 for (int k = scans; ; j += 2) { |
2043 for (;;) { |
1908 WorkQueue q; |
|
1909 if ((s = task.status) < 0) |
2044 if ((s = task.status) < 0) |
1910 break; |
2045 break; |
1911 else if (joiner.internalPopAndExecCC(task)) { |
2046 if (cc != null) |
1912 if (--maxTasks <= 0) { |
2047 helpComplete(w, cc, 0); |
1913 s = task.status; |
2048 else if (w.base == w.top || w.tryRemoveAndExec(task)) |
1914 break; |
2049 helpStealer(w, task); |
1915 } |
2050 if ((s = task.status) < 0) |
1916 k = scans; |
2051 break; |
|
2052 long ms, ns; |
|
2053 if (deadline == 0L) |
|
2054 ms = 0L; |
|
2055 else if ((ns = deadline - System.nanoTime()) <= 0L) |
|
2056 break; |
|
2057 else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) |
|
2058 ms = 1L; |
|
2059 if (tryCompensate(w)) { |
|
2060 task.internalWait(ms); |
|
2061 U.getAndAddLong(this, CTL, AC_UNIT); |
1917 } |
2062 } |
1918 else if ((s = task.status) < 0) |
2063 } |
1919 break; |
2064 U.putOrderedObject(w, QCURRENTJOIN, prevJoin); |
1920 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { |
|
1921 if (--maxTasks <= 0) { |
|
1922 s = task.status; |
|
1923 break; |
|
1924 } |
|
1925 k = scans; |
|
1926 } |
|
1927 else if (--k < 0) { |
|
1928 if (c == (c = ctl)) |
|
1929 break; |
|
1930 k = scans; |
|
1931 } |
|
1932 } |
|
1933 } |
2065 } |
1934 return s; |
2066 return s; |
1935 } |
2067 } |
1936 |
2068 |
1937 /** |
2069 // Specialized scanning |
1938 * Tries to decrement active count (sometimes implicitly) and |
|
1939 * possibly release or create a compensating worker in preparation |
|
1940 * for blocking. Fails on contention or termination. Otherwise, |
|
1941 * adds a new thread if no idle workers are available and pool |
|
1942 * may become starved. |
|
1943 * |
|
1944 * @param c the assumed ctl value |
|
1945 */ |
|
1946 final boolean tryCompensate(long c) { |
|
1947 WorkQueue[] ws = workQueues; |
|
1948 int pc = parallelism, e = (int)c, m, tc; |
|
1949 if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) { |
|
1950 WorkQueue w = ws[e & m]; |
|
1951 if (e != 0 && w != null) { |
|
1952 Thread p; |
|
1953 long nc = ((long)(w.nextWait & E_MASK) | |
|
1954 (c & (AC_MASK|TC_MASK))); |
|
1955 int ne = (e + E_SEQ) & E_MASK; |
|
1956 if (w.eventCount == (e | INT_SIGN) && |
|
1957 U.compareAndSwapLong(this, CTL, c, nc)) { |
|
1958 w.eventCount = ne; |
|
1959 if ((p = w.parker) != null) |
|
1960 U.unpark(p); |
|
1961 return true; // replace with idle worker |
|
1962 } |
|
1963 } |
|
1964 else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && |
|
1965 (int)(c >> AC_SHIFT) + pc > 1) { |
|
1966 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); |
|
1967 if (U.compareAndSwapLong(this, CTL, c, nc)) |
|
1968 return true; // no compensation |
|
1969 } |
|
1970 else if (tc + pc < MAX_CAP) { |
|
1971 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); |
|
1972 if (U.compareAndSwapLong(this, CTL, c, nc)) { |
|
1973 ForkJoinWorkerThreadFactory fac; |
|
1974 Throwable ex = null; |
|
1975 ForkJoinWorkerThread wt = null; |
|
1976 try { |
|
1977 if ((fac = factory) != null && |
|
1978 (wt = fac.newThread(this)) != null) { |
|
1979 wt.start(); |
|
1980 return true; |
|
1981 } |
|
1982 } catch (Throwable rex) { |
|
1983 ex = rex; |
|
1984 } |
|
1985 deregisterWorker(wt, ex); // clean up and return false |
|
1986 } |
|
1987 } |
|
1988 } |
|
1989 return false; |
|
1990 } |
|
1991 |
|
1992 /** |
|
1993 * Helps and/or blocks until the given task is done. |
|
1994 * |
|
1995 * @param joiner the joining worker |
|
1996 * @param task the task |
|
1997 * @return task status on exit |
|
1998 */ |
|
1999 final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { |
|
2000 int s = 0; |
|
2001 if (task != null && (s = task.status) >= 0 && joiner != null) { |
|
2002 ForkJoinTask<?> prevJoin = joiner.currentJoin; |
|
2003 joiner.currentJoin = task; |
|
2004 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks |
|
2005 (s = task.status) >= 0); |
|
2006 if (s >= 0 && (task instanceof CountedCompleter)) |
|
2007 s = helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE); |
|
2008 long cc = 0; // for stability checks |
|
2009 while (s >= 0 && (s = task.status) >= 0) { |
|
2010 if ((s = tryHelpStealer(joiner, task)) == 0 && |
|
2011 (s = task.status) >= 0) { |
|
2012 if (!tryCompensate(cc)) |
|
2013 cc = ctl; |
|
2014 else { |
|
2015 if (task.trySetSignal() && (s = task.status) >= 0) { |
|
2016 synchronized (task) { |
|
2017 if (task.status >= 0) { |
|
2018 try { // see ForkJoinTask |
|
2019 task.wait(); // for explanation |
|
2020 } catch (InterruptedException ie) { |
|
2021 } |
|
2022 } |
|
2023 else |
|
2024 task.notifyAll(); |
|
2025 } |
|
2026 } |
|
2027 long c; // reactivate |
|
2028 do {} while (!U.compareAndSwapLong |
|
2029 (this, CTL, c = ctl, |
|
2030 ((c & ~AC_MASK) | |
|
2031 ((c & AC_MASK) + AC_UNIT)))); |
|
2032 } |
|
2033 } |
|
2034 } |
|
2035 joiner.currentJoin = prevJoin; |
|
2036 } |
|
2037 return s; |
|
2038 } |
|
2039 |
|
2040 /** |
|
2041 * Stripped-down variant of awaitJoin used by timed joins. Tries |
|
2042 * to help join only while there is continuous progress. (Caller |
|
2043 * will then enter a timed wait.) |
|
2044 * |
|
2045 * @param joiner the joining worker |
|
2046 * @param task the task |
|
2047 */ |
|
2048 final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) { |
|
2049 int s; |
|
2050 if (joiner != null && task != null && (s = task.status) >= 0) { |
|
2051 ForkJoinTask<?> prevJoin = joiner.currentJoin; |
|
2052 joiner.currentJoin = task; |
|
2053 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks |
|
2054 (s = task.status) >= 0); |
|
2055 if (s >= 0) { |
|
2056 if (task instanceof CountedCompleter) |
|
2057 helpComplete(joiner, (CountedCompleter<?>)task, Integer.MAX_VALUE); |
|
2058 do {} while (task.status >= 0 && |
|
2059 tryHelpStealer(joiner, task) > 0); |
|
2060 } |
|
2061 joiner.currentJoin = prevJoin; |
|
2062 } |
|
2063 } |
|
2064 |
2070 |
2065 /** |
2071 /** |
2066 * Returns a (probably) non-empty steal queue, if one is found |
2072 * Returns a (probably) non-empty steal queue, if one is found |
2067 * during a scan, else null. This method must be retried by |
2073 * during a scan, else null. This method must be retried by |
2068 * caller if, by the time it tries to use the queue, it is empty. |
2074 * caller if, by the time it tries to use the queue, it is empty. |
2069 */ |
2075 */ |
2070 private WorkQueue findNonEmptyStealQueue() { |
2076 private WorkQueue findNonEmptyStealQueue() { |
|
2077 WorkQueue[] ws; int m; // one-shot version of scan loop |
2071 int r = ThreadLocalRandom.nextSecondarySeed(); |
2078 int r = ThreadLocalRandom.nextSecondarySeed(); |
2072 for (;;) { |
2079 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
2073 int ps = plock, m; WorkQueue[] ws; WorkQueue q; |
2080 for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { |
2074 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
2081 WorkQueue q; int b; |
2075 for (int j = (m + 1) << 2; j >= 0; --j) { |
2082 if ((q = ws[k]) != null) { |
2076 if ((q = ws[(((r - j) << 1) | 1) & m]) != null && |
2083 if ((b = q.base) - q.top < 0) |
2077 q.base - q.top < 0) |
|
2078 return q; |
2084 return q; |
|
2085 checkSum += b; |
2079 } |
2086 } |
2080 } |
2087 if ((k = (k + 1) & m) == origin) { |
2081 if (plock == ps) |
2088 if (oldSum == (oldSum = checkSum)) |
2082 return null; |
2089 break; |
2083 } |
2090 checkSum = 0; |
|
2091 } |
|
2092 } |
|
2093 } |
|
2094 return null; |
2084 } |
2095 } |
2085 |
2096 |
2086 /** |
2097 /** |
2087 * Runs tasks until {@code isQuiescent()}. We piggyback on |
2098 * Runs tasks until {@code isQuiescent()}. We piggyback on |
2088 * active count ctl maintenance, but rather than blocking |
2099 * active count ctl maintenance, but rather than blocking |
2089 * when tasks cannot be found, we rescan until all others cannot |
2100 * when tasks cannot be found, we rescan until all others cannot |
2090 * find tasks either. |
2101 * find tasks either. |
2091 */ |
2102 */ |
2092 final void helpQuiescePool(WorkQueue w) { |
2103 final void helpQuiescePool(WorkQueue w) { |
2093 ForkJoinTask<?> ps = w.currentSteal; |
2104 ForkJoinTask<?> ps = w.currentSteal; // save context |
2094 for (boolean active = true;;) { |
2105 for (boolean active = true;;) { |
2095 long c; WorkQueue q; ForkJoinTask<?> t; int b; |
2106 long c; WorkQueue q; ForkJoinTask<?> t; int b; |
2096 while ((t = w.nextLocalTask()) != null) |
2107 w.execLocalTasks(); // run locals before each scan |
2097 t.doExec(); |
|
2098 if ((q = findNonEmptyStealQueue()) != null) { |
2108 if ((q = findNonEmptyStealQueue()) != null) { |
2099 if (!active) { // re-establish active count |
2109 if (!active) { // re-establish active count |
2100 active = true; |
2110 active = true; |
2101 do {} while (!U.compareAndSwapLong |
2111 U.getAndAddLong(this, CTL, AC_UNIT); |
2102 (this, CTL, c = ctl, |
|
2103 ((c & ~AC_MASK) | |
|
2104 ((c & AC_MASK) + AC_UNIT)))); |
|
2105 } |
2112 } |
2106 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) |
2113 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { |
2107 w.runTask(t); |
2114 U.putOrderedObject(w, QCURRENTSTEAL, t); |
|
2115 t.doExec(); |
|
2116 if (++w.nsteals < 0) |
|
2117 w.transferStealCount(this); |
|
2118 } |
2108 } |
2119 } |
2109 else if (active) { // decrement active count without queuing |
2120 else if (active) { // decrement active count without queuing |
2110 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); |
2121 long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c); |
2111 if ((int)(nc >> AC_SHIFT) + parallelism == 0) |
2122 if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0) |
2112 break; // bypass decrement-then-increment |
2123 break; // bypass decrement-then-increment |
2113 if (U.compareAndSwapLong(this, CTL, c, nc)) |
2124 if (U.compareAndSwapLong(this, CTL, c, nc)) |
2114 active = false; |
2125 active = false; |
2115 } |
2126 } |
2116 else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && |
2127 else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 && |
2117 U.compareAndSwapLong |
2128 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT)) |
2118 (this, CTL, c, ((c & ~AC_MASK) | |
|
2119 ((c & AC_MASK) + AC_UNIT)))) |
|
2120 break; |
2129 break; |
2121 } |
2130 } |
|
2131 U.putOrderedObject(w, QCURRENTSTEAL, ps); |
2122 } |
2132 } |
2123 |
2133 |
2124 /** |
2134 /** |
2125 * Gets and removes a local or stolen task for the given worker. |
2135 * Gets and removes a local or stolen task for the given worker. |
2126 * |
2136 * |
2200 } |
2207 } |
2201 |
2208 |
2202 // Termination |
2209 // Termination |
2203 |
2210 |
2204 /** |
2211 /** |
2205 * Possibly initiates and/or completes termination. The caller |
2212 * Possibly initiates and/or completes termination. |
2206 * triggering termination runs three passes through workQueues: |
|
2207 * (0) Setting termination status, followed by wakeups of queued |
|
2208 * workers; (1) cancelling all tasks; (2) interrupting lagging |
|
2209 * threads (likely in external tasks, but possibly also blocked in |
|
2210 * joins). Each pass repeats previous steps because of potential |
|
2211 * lagging thread creation. |
|
2212 * |
2213 * |
2213 * @param now if true, unconditionally terminate, else only |
2214 * @param now if true, unconditionally terminate, else only |
2214 * if no work and no active workers |
2215 * if no work and no active workers |
2215 * @param enable if true, enable shutdown when next possible |
2216 * @param enable if true, enable shutdown when next possible |
2216 * @return true if now terminating or terminated |
2217 * @return true if now terminating or terminated |
2217 */ |
2218 */ |
2218 private boolean tryTerminate(boolean now, boolean enable) { |
2219 private boolean tryTerminate(boolean now, boolean enable) { |
2219 int ps; |
2220 int rs; |
2220 if (this == common) // cannot shut down |
2221 if (this == common) // cannot shut down |
2221 return false; |
2222 return false; |
2222 if ((ps = plock) >= 0) { // enable by setting plock |
2223 if ((rs = runState) >= 0) { |
2223 if (!enable) |
2224 if (!enable) |
2224 return false; |
2225 return false; |
2225 if ((ps & PL_LOCK) != 0 || |
2226 rs = lockRunState(); // enter SHUTDOWN phase |
2226 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) |
2227 unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN); |
2227 ps = acquirePlock(); |
2228 } |
2228 int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; |
2229 |
2229 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) |
2230 if ((rs & STOP) == 0) { |
2230 releasePlock(nps); |
2231 if (!now) { // check quiescence |
2231 } |
2232 for (long oldSum = 0L;;) { // repeat until stable |
2232 for (long c;;) { |
2233 WorkQueue[] ws; WorkQueue w; int m, b; long c; |
2233 if (((c = ctl) & STOP_BIT) != 0) { // already terminating |
2234 long checkSum = ctl; |
2234 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) { |
2235 if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0) |
2235 synchronized (this) { |
2236 return false; // still active workers |
2236 notifyAll(); // signal when 0 workers |
2237 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) |
|
2238 break; // check queues |
|
2239 for (int i = 0; i <= m; ++i) { |
|
2240 if ((w = ws[i]) != null) { |
|
2241 if ((b = w.base) != w.top || w.scanState >= 0 || |
|
2242 w.currentSteal != null) { |
|
2243 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); |
|
2244 return false; // arrange for recheck |
|
2245 } |
|
2246 checkSum += b; |
|
2247 if ((i & 1) == 0) |
|
2248 w.qlock = -1; // try to disable external |
|
2249 } |
2237 } |
2250 } |
|
2251 if (oldSum == (oldSum = checkSum)) |
|
2252 break; |
2238 } |
2253 } |
2239 return true; |
2254 } |
2240 } |
2255 if ((runState & STOP) == 0) { |
2241 if (!now) { // check if idle & no tasks |
2256 rs = lockRunState(); // enter STOP phase |
2242 WorkQueue[] ws; WorkQueue w; |
2257 unlockRunState(rs, (rs & ~RSLOCK) | STOP); |
2243 if ((int)(c >> AC_SHIFT) + parallelism > 0) |
2258 } |
2244 return false; |
2259 } |
2245 if ((ws = workQueues) != null) { |
2260 |
2246 for (int i = 0; i < ws.length; ++i) { |
2261 int pass = 0; // 3 passes to help terminate |
2247 if ((w = ws[i]) != null && |
2262 for (long oldSum = 0L;;) { // or until done or stable |
2248 (!w.isEmpty() || |
2263 WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m; |
2249 ((i & 1) != 0 && w.eventCount >= 0))) { |
2264 long checkSum = ctl; |
2250 signalWork(ws, w); |
2265 if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || |
2251 return false; |
2266 (ws = workQueues) == null || (m = ws.length - 1) <= 0) { |
|
2267 if ((runState & TERMINATED) == 0) { |
|
2268 rs = lockRunState(); // done |
|
2269 unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); |
|
2270 synchronized (this) { notifyAll(); } // for awaitTermination |
|
2271 } |
|
2272 break; |
|
2273 } |
|
2274 for (int i = 0; i <= m; ++i) { |
|
2275 if ((w = ws[i]) != null) { |
|
2276 checkSum += w.base; |
|
2277 w.qlock = -1; // try to disable |
|
2278 if (pass > 0) { |
|
2279 w.cancelAll(); // clear queue |
|
2280 if (pass > 1 && (wt = w.owner) != null) { |
|
2281 if (!wt.isInterrupted()) { |
|
2282 try { // unblock join |
|
2283 wt.interrupt(); |
|
2284 } catch (Throwable ignore) { |
|
2285 } |
|
2286 } |
|
2287 if (w.scanState < 0) |
|
2288 U.unpark(wt); // wake up |
2252 } |
2289 } |
2253 } |
2290 } |
2254 } |
2291 } |
2255 } |
2292 } |
2256 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { |
2293 if (checkSum != oldSum) { // unstable |
2257 for (int pass = 0; pass < 3; ++pass) { |
2294 oldSum = checkSum; |
2258 WorkQueue[] ws; WorkQueue w; Thread wt; |
2295 pass = 0; |
2259 if ((ws = workQueues) != null) { |
2296 } |
2260 int n = ws.length; |
2297 else if (pass > 3 && pass > m) // can't further help |
2261 for (int i = 0; i < n; ++i) { |
2298 break; |
2262 if ((w = ws[i]) != null) { |
2299 else if (++pass > 1) { // try to dequeue |
2263 w.qlock = -1; |
2300 long c; int j = 0, sp; // bound attempts |
2264 if (pass > 0) { |
2301 while (j++ <= m && (sp = (int)(c = ctl)) != 0) |
2265 w.cancelAll(); |
2302 tryRelease(c, ws[sp & m], AC_UNIT); |
2266 if (pass > 1 && (wt = w.owner) != null) { |
2303 } |
2267 if (!wt.isInterrupted()) { |
2304 } |
2268 try { |
2305 return true; |
2269 wt.interrupt(); |
2306 } |
2270 } catch (Throwable ignore) { |
2307 |
2271 } |
2308 // External operations |
2272 } |
2309 |
2273 U.unpark(wt); |
2310 /** |
2274 } |
2311 * Full version of externalPush, handling uncommon cases, as well |
2275 } |
2312 * as performing secondary initialization upon the first |
2276 } |
2313 * submission of the first task to the pool. It also detects |
|
2314 * first submission by an external thread and creates a new shared |
|
2315 * queue if the one at index if empty or contended. |
|
2316 * |
|
2317 * @param task the task. Caller must ensure non-null. |
|
2318 */ |
|
2319 private void externalSubmit(ForkJoinTask<?> task) { |
|
2320 int r; // initialize caller's probe |
|
2321 if ((r = ThreadLocalRandom.getProbe()) == 0) { |
|
2322 ThreadLocalRandom.localInit(); |
|
2323 r = ThreadLocalRandom.getProbe(); |
|
2324 } |
|
2325 for (;;) { |
|
2326 WorkQueue[] ws; WorkQueue q; int rs, m, k; |
|
2327 boolean move = false; |
|
2328 if ((rs = runState) < 0) { |
|
2329 tryTerminate(false, false); // help terminate |
|
2330 throw new RejectedExecutionException(); |
|
2331 } |
|
2332 else if ((rs & STARTED) == 0 || // initialize |
|
2333 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { |
|
2334 int ns = 0; |
|
2335 rs = lockRunState(); |
|
2336 try { |
|
2337 if ((rs & STARTED) == 0) { |
|
2338 U.compareAndSwapObject(this, STEALCOUNTER, null, |
|
2339 new AtomicLong()); |
|
2340 // create workQueues array with size a power of two |
|
2341 int p = config & SMASK; // ensure at least 2 slots |
|
2342 int n = (p > 1) ? p - 1 : 1; |
|
2343 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; |
|
2344 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; |
|
2345 workQueues = new WorkQueue[n]; |
|
2346 ns = STARTED; |
|
2347 } |
|
2348 } finally { |
|
2349 unlockRunState(rs, (rs & ~RSLOCK) | ns); |
|
2350 } |
|
2351 } |
|
2352 else if ((q = ws[k = r & m & SQMASK]) != null) { |
|
2353 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { |
|
2354 ForkJoinTask<?>[] a = q.array; |
|
2355 int s = q.top; |
|
2356 boolean submitted = false; // initial submission or resizing |
|
2357 try { // locked version of push |
|
2358 if ((a != null && a.length > s + 1 - q.base) || |
|
2359 (a = q.growArray()) != null) { |
|
2360 int j = (((a.length - 1) & s) << ASHIFT) + ABASE; |
|
2361 U.putOrderedObject(a, j, task); |
|
2362 U.putOrderedInt(q, QTOP, s + 1); |
|
2363 submitted = true; |
2277 } |
2364 } |
2278 // Wake up workers parked on event queue |
2365 } finally { |
2279 int i, e; long cc; Thread p; |
2366 U.compareAndSwapInt(q, QLOCK, 1, 0); |
2280 while ((e = (int)(cc = ctl) & E_MASK) != 0 && |
2367 } |
2281 (i = e & SMASK) < n && i >= 0 && |
2368 if (submitted) { |
2282 (w = ws[i]) != null) { |
2369 signalWork(ws, q); |
2283 long nc = ((long)(w.nextWait & E_MASK) | |
2370 return; |
2284 ((cc + AC_UNIT) & AC_MASK) | |
|
2285 (cc & (TC_MASK|STOP_BIT))); |
|
2286 if (w.eventCount == (e | INT_SIGN) && |
|
2287 U.compareAndSwapLong(this, CTL, cc, nc)) { |
|
2288 w.eventCount = (e + E_SEQ) & E_MASK; |
|
2289 w.qlock = -1; |
|
2290 if ((p = w.parker) != null) |
|
2291 U.unpark(p); |
|
2292 } |
|
2293 } |
|
2294 } |
2371 } |
2295 } |
2372 } |
2296 } |
2373 move = true; // move on failure |
2297 } |
2374 } |
2298 } |
2375 else if (((rs = runState) & RSLOCK) == 0) { // create new queue |
2299 |
2376 q = new WorkQueue(this, null); |
2300 // external operations on common pool |
2377 q.hint = r; |
2301 |
2378 q.config = k | SHARED_QUEUE; |
2302 /** |
2379 q.scanState = INACTIVE; |
2303 * Returns common pool queue for a thread that has submitted at |
2380 rs = lockRunState(); // publish index |
2304 * least one task. |
2381 if (rs > 0 && (ws = workQueues) != null && |
|
2382 k < ws.length && ws[k] == null) |
|
2383 ws[k] = q; // else terminated |
|
2384 unlockRunState(rs, rs & ~RSLOCK); |
|
2385 } |
|
2386 else |
|
2387 move = true; // move if busy |
|
2388 if (move) |
|
2389 r = ThreadLocalRandom.advanceProbe(r); |
|
2390 } |
|
2391 } |
|
2392 |
|
2393 /** |
|
2394 * Tries to add the given task to a submission queue at |
|
2395 * submitter's current queue. Only the (vastly) most common path |
|
2396 * is directly handled in this method, while screening for need |
|
2397 * for externalSubmit. |
|
2398 * |
|
2399 * @param task the task. Caller must ensure non-null. |
|
2400 */ |
|
2401 final void externalPush(ForkJoinTask<?> task) { |
|
2402 WorkQueue[] ws; WorkQueue q; int m; |
|
2403 int r = ThreadLocalRandom.getProbe(); |
|
2404 int rs = runState; |
|
2405 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && |
|
2406 (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && |
|
2407 U.compareAndSwapInt(q, QLOCK, 0, 1)) { |
|
2408 ForkJoinTask<?>[] a; int am, n, s; |
|
2409 if ((a = q.array) != null && |
|
2410 (am = a.length - 1) > (n = (s = q.top) - q.base)) { |
|
2411 int j = ((am & s) << ASHIFT) + ABASE; |
|
2412 U.putOrderedObject(a, j, task); |
|
2413 U.putOrderedInt(q, QTOP, s + 1); |
|
2414 U.putOrderedInt(q, QLOCK, 0); |
|
2415 if (n <= 1) |
|
2416 signalWork(ws, q); |
|
2417 return; |
|
2418 } |
|
2419 U.compareAndSwapInt(q, QLOCK, 1, 0); |
|
2420 } |
|
2421 externalSubmit(task); |
|
2422 } |
|
2423 |
|
2424 /** |
|
2425 * Returns common pool queue for an external thread. |
2305 */ |
2426 */ |
2306 static WorkQueue commonSubmitterQueue() { |
2427 static WorkQueue commonSubmitterQueue() { |
2307 ForkJoinPool p; WorkQueue[] ws; int m, z; |
2428 ForkJoinPool p = common; |
2308 return ((z = ThreadLocalRandom.getProbe()) != 0 && |
2429 int r = ThreadLocalRandom.getProbe(); |
2309 (p = common) != null && |
2430 WorkQueue[] ws; int m; |
2310 (ws = p.workQueues) != null && |
2431 return (p != null && (ws = p.workQueues) != null && |
2311 (m = ws.length - 1) >= 0) ? |
2432 (m = ws.length - 1) >= 0) ? |
2312 ws[m & z & SQMASK] : null; |
2433 ws[m & r & SQMASK] : null; |
2313 } |
2434 } |
2314 |
2435 |
2315 /** |
2436 /** |
2316 * Tries to pop the given task from submitter's queue in common pool. |
2437 * Performs tryUnpush for an external submitter: Finds queue, |
|
2438 * locks if apparently non-empty, validates upon locking, and |
|
2439 * adjusts top. Each check can fail but rarely does. |
2317 */ |
2440 */ |
2318 final boolean tryExternalUnpush(ForkJoinTask<?> task) { |
2441 final boolean tryExternalUnpush(ForkJoinTask<?> task) { |
2319 WorkQueue joiner; ForkJoinTask<?>[] a; int m, s; |
2442 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s; |
2320 WorkQueue[] ws = workQueues; |
2443 int r = ThreadLocalRandom.getProbe(); |
2321 int z = ThreadLocalRandom.getProbe(); |
2444 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && |
2322 boolean popped = false; |
2445 (w = ws[m & r & SQMASK]) != null && |
2323 if (ws != null && (m = ws.length - 1) >= 0 && |
2446 (a = w.array) != null && (s = w.top) != w.base) { |
2324 (joiner = ws[z & m & SQMASK]) != null && |
|
2325 joiner.base != (s = joiner.top) && |
|
2326 (a = joiner.array) != null) { |
|
2327 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
2447 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; |
2328 if (U.getObject(a, j) == task && |
2448 if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { |
2329 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { |
2449 if (w.top == s && w.array == a && |
2330 if (joiner.top == s && joiner.array == a && |
2450 U.getObject(a, j) == task && |
2331 U.compareAndSwapObject(a, j, task, null)) { |
2451 U.compareAndSwapObject(a, j, task, null)) { |
2332 joiner.top = s - 1; |
2452 U.putOrderedInt(w, QTOP, s - 1); |
2333 popped = true; |
2453 U.putOrderedInt(w, QLOCK, 0); |
|
2454 return true; |
2334 } |
2455 } |
2335 joiner.qlock = 0; |
2456 U.compareAndSwapInt(w, QLOCK, 1, 0); |
2336 } |
2457 } |
2337 } |
2458 } |
2338 return popped; |
2459 return false; |
2339 } |
2460 } |
2340 |
2461 |
|
2462 /** |
|
2463 * Performs helpComplete for an external submitter. |
|
2464 */ |
2341 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { |
2465 final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) { |
2342 WorkQueue joiner; int m; |
2466 WorkQueue[] ws; int n; |
2343 WorkQueue[] ws = workQueues; |
2467 int r = ThreadLocalRandom.getProbe(); |
2344 int j = ThreadLocalRandom.getProbe(); |
2468 return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 : |
2345 int s = 0; |
2469 helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks); |
2346 if (ws != null && (m = ws.length - 1) >= 0 && |
|
2347 (joiner = ws[j & m & SQMASK]) != null && task != null) { |
|
2348 int scans = m + m + 1; |
|
2349 long c = 0L; // for stability check |
|
2350 j |= 1; // poll odd queues |
|
2351 for (int k = scans; ; j += 2) { |
|
2352 WorkQueue q; |
|
2353 if ((s = task.status) < 0) |
|
2354 break; |
|
2355 else if (joiner.externalPopAndExecCC(task)) { |
|
2356 if (--maxTasks <= 0) { |
|
2357 s = task.status; |
|
2358 break; |
|
2359 } |
|
2360 k = scans; |
|
2361 } |
|
2362 else if ((s = task.status) < 0) |
|
2363 break; |
|
2364 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) { |
|
2365 if (--maxTasks <= 0) { |
|
2366 s = task.status; |
|
2367 break; |
|
2368 } |
|
2369 k = scans; |
|
2370 } |
|
2371 else if (--k < 0) { |
|
2372 if (c == (c = ctl)) |
|
2373 break; |
|
2374 k = scans; |
|
2375 } |
|
2376 } |
|
2377 } |
|
2378 return s; |
|
2379 } |
2470 } |
2380 |
2471 |
2381 // Exported methods |
2472 // Exported methods |
2382 |
2473 |
2383 // Constructors |
2474 // Constructors |