111 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
112 |
113 /* |
114 * Overview: |
115 * |
116 * 1. Non-nullness of field result (set via CAS) indicates done. |
117 * An AltResult is used to box null as a result, as well as to |
118 * hold exceptions. Using a single field makes completion fast |
119 * and simple to detect and trigger, at the expense of a lot of |
120 * encoding and decoding that infiltrates many methods. One minor |
121 * simplification relies on the (static) NIL (to box null results) |
122 * being the only AltResult with a null exception field, so we |
123 * don't usually need explicit comparisons with NIL. The CF |
124 * exception propagation mechanics surrounding decoding rely on |
125 * unchecked casts of decoded results really being unchecked, |
126 * where user type errors are caught at point of use, as is |
127 * currently the case in Java. These are highlighted by using |
128 * SuppressWarnings-annotated temporaries. |
129 * |
130 * 2. Waiters are held in a Treiber stack similar to the one used |
131 * in FutureTask, Phaser, and SynchronousQueue. See their |
132 * internal documentation for algorithmic details. |
133 * |
134 * 3. Completions are also kept in a list/stack, and pulled off |
135 * and run when completion is triggered. (We could even use the |
136 * same stack as for waiters, but would give up the potential |
137 * parallelism obtained because woken waiters help release/run |
138 * others -- see method postComplete). Because post-processing |
139 * may race with direct calls, class Completion opportunistically |
140 * extends AtomicInteger so callers can claim the action via |
141 * compareAndSet(0, 1). The Completion.run methods are all |
142 * written a boringly similar uniform way (that sometimes includes |
143 * unnecessary-looking checks, kept to maintain uniformity). |
144 * There are enough dimensions upon which they differ that |
145 * attempts to factor commonalities while maintaining efficiency |
146 * require more lines of code than they would save. |
147 * |
148 * 4. The exported then/and/or methods do support a bit of |
149 * factoring (see doThenApply etc). They must cope with the |
150 * intrinsic races surrounding addition of a dependent action |
151 * versus performing the action directly because the task is |
152 * already complete. For example, a CF may not be complete upon |
153 * entry, so a dependent completion is added, but by the time it |
154 * is added, the target CF is complete, so must be directly |
155 * executed. This is all done while avoiding unnecessary object |
156 * construction in safe-bypass cases. |
157 */ |
158 |
159 // preliminaries |
160 |
161 static final class AltResult { |
162 final Throwable ex; // null only for NIL |
163 AltResult(Throwable ex) { this.ex = ex; } |
164 } |
165 |
110 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
111 |
112 /* |
113 * Overview: |
114 * |
115 * A CompletableFuture may have dependent completion actions, |
116 * collected in a linked stack. It atomically completes by CASing |
117 * a result field, and then pops off and runs those actions. This |
118 * applies across normal vs exceptional outcomes, sync vs async |
119 * actions, binary triggers, and various forms of completions. |
120 * |
121 * Non-nullness of field result (set via CAS) indicates done. An |
122 * AltResult is used to box null as a result, as well as to hold |
123 * exceptions. Using a single field makes completion simple to |
124 * detect and trigger. Encoding and decoding is straightforward |
125 * but adds to the sprawl of trapping and associating exceptions |
126 * with targets. Minor simplifications rely on (static) NIL (to |
127 * box null results) being the only AltResult with a null |
128 * exception field, so we don't usually need explicit comparisons. |
129 * Even though some of the generics casts are unchecked (see |
130 * SuppressWarnings annotations), they are placed to be |
131 * appropriate even if checked. |
132 * |
133 * Dependent actions are represented by Completion objects linked |
134 * as Treiber stacks headed by field "stack". There are Completion |
135 * classes for each kind of action, grouped into single-input |
136 * (UniCompletion), two-input (BiCompletion), projected |
137 * (BiCompletions using either (not both) of two inputs), shared |
138 * (CoCompletion, used by the second of two sources), zero-input |
139 * source actions, and Signallers that unblock waiters. Class |
140 * Completion extends ForkJoinTask to enable async execution |
141 * (adding no space overhead because we exploit its "tag" methods |
142 * to maintain claims). It is also declared as Runnable to allow |
143 * usage with arbitrary executors. |
144 * |
145 * Support for each kind of CompletionStage relies on a separate |
146 * class, along with two CompletableFuture methods: |
147 * |
148 * * A Completion class with name X corresponding to function, |
149 * prefaced with "Uni", "Bi", or "Or". Each class contains |
150 * fields for source(s), actions, and dependent. They are |
151 * boringly similar, differing from others only with respect to |
152 * underlying functional forms. We do this so that users don't |
153 * encounter layers of adaptors in common usages. We also |
154 * include "Relay" classes/methods that don't correspond to user |
155 * methods; they copy results from one stage to another. |
156 * |
157 * * Boolean CompletableFuture method x(...) (for example |
158 * uniApply) takes all of the arguments needed to check that an |
159 * action is triggerable, and then either runs the action or |
160 * arranges its async execution by executing its Completion |
161 * argument, if present. The method returns true if known to be |
162 * complete. |
163 * |
164 * * Completion method tryFire(int mode) invokes the associated x |
|
165 * method with its held arguments, and on success cleans up. |
|
166 * The mode argument allows tryFire to be called twice (SYNC, |
|
167 * then ASYNC); the first to screen and trap exceptions while |
|
168 * arranging to execute, and the second when called from a |
|
169 * task. (A few classes are not used async so take slightly |
|
170 * different forms.) The claim() callback suppresses function |
|
171 * invocation if already claimed by another thread. |
|
172 * |
|
173 * * CompletableFuture method xStage(...) is called from a public |
|
174 * stage method of CompletableFuture x. It screens user |
|
175 * arguments and invokes and/or creates the stage object. If |
|
176 * not async and x is already complete, the action is run |
|
177 * immediately. Otherwise a Completion c is created, pushed to |
|
178 * x's stack (unless done), and started or triggered via |
|
179 * c.tryFire. This also covers races possible if x completes |
|
180 * while pushing. Classes with two inputs (for example BiApply) |
|
181 * deal with races across both while pushing actions. The |
|
182 * second completion is a CoCompletion pointing to the first, |
|
183 * shared so that at most one performs the action. The |
|
184 * multiple-arity methods allOf and anyOf do this pairwise to |
|
185 * form trees of completions. |
|
186 * |
|
187 * Note that the generic type parameters of methods vary according |
|
188 * to whether "this" is a source, dependent, or completion. |
|
189 * |
|
190 * Method postComplete is called upon completion unless the target |
|
191 * is guaranteed not to be observable (i.e., not yet returned or |
|
192 * linked). Multiple threads can call postComplete, which |
|
193 * atomically pops each dependent action, and tries to trigger it |
|
194 * via method tryFire, in NESTED mode. Triggering can propagate |
|
195 * recursively, so NESTED mode returns its completed dependent (if |
|
196 * one exists) for further processing by its caller (see method |
|
197 * postFire). |
|
198 * |
|
199 * Blocking methods get() and join() rely on Signaller Completions |
|
200 * that wake up waiting threads. The mechanics are similar to |
|
201 * Treiber stack wait-nodes used in FutureTask, Phaser, and |
|
202 * SynchronousQueue. See their internal documentation for |
|
203 * algorithmic details. |
|
204 * |
|
205 * Without precautions, CompletableFutures would be prone to |
|
206 * garbage accumulation as chains of Completions build up, each |
|
207 * pointing back to its sources. So we null out fields as soon as |
|
208 * possible (see especially method Completion.detach). The |
|
209 * screening checks needed anyway harmlessly ignore null arguments |
|
210 * that may have been obtained during races with threads nulling |
|
211 * out fields. We also try to unlink fired Completions from |
|
212 * stacks that might never be popped (see method postFire). |
|
213 * Completion fields need not be declared as final or volatile |
|
214 * because they are only visible to other threads upon safe |
|
215 * publication. |
|
216 */ |
|
217 |
|
218 volatile Object result; // Either the result or boxed AltResult |
|
219 volatile Completion stack; // Top of Treiber stack of dependent actions |
|
220 |
|
221 final boolean internalComplete(Object r) { // CAS from null to r |
|
222 return UNSAFE.compareAndSwapObject(this, RESULT, null, r); |
|
223 } |
|
224 |
|
225 final boolean casStack(Completion cmp, Completion val) { |
|
226 return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); |
|
227 } |
|
228 |
|
229 /** Returns true if successfully pushed c onto stack. */ |
|
230 final boolean tryPushStack(Completion c) { |
|
231 Completion h = stack; |
|
232 lazySetNext(c, h); |
|
233 return UNSAFE.compareAndSwapObject(this, STACK, h, c); |
|
234 } |
|
235 |
|
236 /** Unconditionally pushes c onto stack, retrying if necessary. */ |
|
237 final void pushStack(Completion c) { |
|
238 do {} while (!tryPushStack(c)); |
|
239 } |
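The push logic in tryPushStack and pushStack can be illustrated outside the class with a minimal Treiber stack. This is only a sketch: it assumes java.util.concurrent.atomic.AtomicReference in place of the UNSAFE calls, and the names TreiberStackSketch, Node, tryPush, push and pop are hypothetical, not part of CompletableFuture.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TreiberStackSketch {
    /** Hypothetical stand-in for Completion: a payload plus a stack link. */
    static final class Node {
        final String name;
        Node next;                        // set before the node is published by CAS
        Node(String name) { this.name = name; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    /** One attempt: link n to the current head, then CAS the head to n. */
    boolean tryPush(Node n) {
        Node h = head.get();
        n.next = h;
        return head.compareAndSet(h, n);
    }

    /** Unconditional push: retry until the CAS succeeds. */
    void push(Node n) {
        while (!tryPush(n)) { }
    }

    /** Pops the top node, or returns null if the stack is empty. */
    Node pop() {
        Node h;
        do {
            h = head.get();
            if (h == null)
                return null;
        } while (!head.compareAndSet(h, h.next));
        return h;
    }
}
```

The retry loop in push has the same shape as pushStack above: a failed CAS means another thread changed the head, so the link is recomputed and the attempt repeated.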
|
240 |
|
241 /* ------------- Encoding and decoding outcomes -------------- */ |
|
242 |
|
243 static final class AltResult { // See above |
|
244 final Throwable ex; // null only for NIL |
|
245 AltResult(Throwable x) { this.ex = x; } |
|
246 } |
|
247 |
|
248 /** The encoding of the null value. */ |
249 static final AltResult NIL = new AltResult(null); |
250 |
251 /** Completes with the null value, unless already completed. */ |
252 final boolean completeNull() { |
253 return UNSAFE.compareAndSwapObject(this, RESULT, null, |
254 NIL); |
255 } |
256 |
257 /** Returns the encoding of the given non-exceptional value. */ |
258 final Object encodeValue(T t) { |
259 return (t == null) ? NIL : t; |
260 } |
166 static final AltResult NIL = new AltResult(null); |
167 |
168 // Fields |
169 |
170 volatile Object result; // Either the result or boxed AltResult |
171 volatile WaitNode waiters; // Treiber stack of threads blocked on get() |
172 volatile CompletionNode completions; // list (Treiber stack) of completions |
173 |
174 // Basic utilities for triggering and processing completions |
175 |
176 /** |
177 * Removes and signals all waiting threads and runs all completions. |
|
261 |
|
262 /** Completes with a non-exceptional result, unless already completed. */ |
|
263 final boolean completeValue(T t) { |
|
264 return UNSAFE.compareAndSwapObject(this, RESULT, null, |
|
265 (t == null) ? NIL : t); |
|
266 } |
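The single-field encoding used by completeNull and completeValue can be sketched in isolation. The following assumes an AtomicReference instead of UNSAFE and handles only non-exceptional outcomes; ResultBoxSketch, Alt, isDone and decode are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ResultBoxSketch<T> {
    /** Hypothetical stand-in for AltResult. */
    static final class Alt {
        final Throwable ex;               // null only for NIL
        Alt(Throwable ex) { this.ex = ex; }
    }

    /** Boxes a null result so that a null field can mean "not done". */
    static final Alt NIL = new Alt(null);

    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Completes with t (boxed as NIL if null) unless already completed. */
    boolean completeValue(T t) {
        return result.compareAndSet(null, (t == null) ? NIL : t);
    }

    /** Done exactly when the field is non-null. */
    boolean isDone() {
        return result.get() != null;
    }

    /** Decodes a non-exceptional outcome; NIL decodes back to null. */
    @SuppressWarnings("unchecked")
    T decode() {
        Object r = result.get();
        if (r == null)
            throw new IllegalStateException("not done");
        return (r == NIL) ? null : (T) r;
    }
}
```

Because the CAS only succeeds from null, the first completion wins and later attempts return false, even when the winning value was itself null.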
|
267 |
|
268 /** |
|
269 * Returns the encoding of the given (non-null) exception as a |
|
270 * wrapped CompletionException unless it is one already. |
|
271 */ |
|
272 static AltResult encodeThrowable(Throwable x) { |
|
273 return new AltResult((x instanceof CompletionException) ? x : |
|
274 new CompletionException(x)); |
|
275 } |
|
276 |
|
277 /** Completes with an exceptional result, unless already completed. */ |
|
278 final boolean completeThrowable(Throwable x) { |
|
279 return UNSAFE.compareAndSwapObject(this, RESULT, null, |
|
280 encodeThrowable(x)); |
|
281 } |
|
282 |
|
283 /** |
|
284 * Returns the encoding of the given (non-null) exception as a |
|
285 * wrapped CompletionException unless it is one already. May |
|
286 * return the given Object r (which must have been the result of a |
|
287 * source future) if it is equivalent, i.e. if this is a simple |
|
288 * relay of an existing CompletionException. |
|
289 */ |
|
290 static Object encodeThrowable(Throwable x, Object r) { |
|
291 if (!(x instanceof CompletionException)) |
|
292 x = new CompletionException(x); |
|
293 else if (r instanceof AltResult && x == ((AltResult)r).ex) |
|
294 return r; |
|
295 return new AltResult(x); |
|
296 } |
|
297 |
|
298 /** |
|
299 * Completes with the given (non-null) exceptional result as a |
|
300 * wrapped CompletionException unless it is one already, unless |
|
301 * already completed. May complete with the given Object r |
|
302 * (which must have been the result of a source future) if it is |
|
303 * equivalent, i.e. if this is a simple propagation of an |
|
304 * existing CompletionException. |
|
305 */ |
|
306 final boolean completeThrowable(Throwable x, Object r) { |
|
307 return UNSAFE.compareAndSwapObject(this, RESULT, null, |
|
308 encodeThrowable(x, r)); |
|
309 } |
|
310 |
|
311 /** |
|
312 * Returns the encoding of the given arguments: if the exception |
|
313 * is non-null, encodes as AltResult. Otherwise uses the given |
|
314 * value, boxed as NIL if null. |
|
315 */ |
|
316 Object encodeOutcome(T t, Throwable x) { |
|
317 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); |
|
318 } |
|
319 |
|
320 /** |
|
321 * Returns the encoding of a copied outcome; if exceptional, |
|
322 * rewraps as a CompletionException, else returns argument. |
|
323 */ |
|
324 static Object encodeRelay(Object r) { |
|
325 Throwable x; |
|
326 return (((r instanceof AltResult) && |
|
327 (x = ((AltResult)r).ex) != null && |
|
328 !(x instanceof CompletionException)) ? |
|
329 new AltResult(new CompletionException(x)) : r); |
|
330 } |
|
331 |
|
332 /** |
|
333 * Completes with r or a copy of r, unless already completed. |
|
334 * If exceptional, r is first coerced to a CompletionException. |
|
335 */ |
|
336 final boolean completeRelay(Object r) { |
|
337 return UNSAFE.compareAndSwapObject(this, RESULT, null, |
|
338 encodeRelay(r)); |
|
339 } |
|
340 |
|
341 /** |
|
342 * Reports result using Future.get conventions. |
|
343 */ |
|
344 private static <T> T reportGet(Object r) |
|
345 throws InterruptedException, ExecutionException { |
|
346 if (r == null) // by convention below, null means interrupted |
|
347 throw new InterruptedException(); |
|
348 if (r instanceof AltResult) { |
|
349 Throwable x, cause; |
|
350 if ((x = ((AltResult)r).ex) == null) |
|
351 return null; |
|
352 if (x instanceof CancellationException) |
|
353 throw (CancellationException)x; |
|
354 if ((x instanceof CompletionException) && |
|
355 (cause = x.getCause()) != null) |
|
356 x = cause; |
|
357 throw new ExecutionException(x); |
|
358 } |
|
359 @SuppressWarnings("unchecked") T t = (T) r; |
|
360 return t; |
|
361 } |
|
362 |
|
363 /** |
|
364 * Decodes outcome to return result or throw unchecked exception. |
|
365 */ |
|
366 private static <T> T reportJoin(Object r) { |
|
367 if (r instanceof AltResult) { |
|
368 Throwable x; |
|
369 if ((x = ((AltResult)r).ex) == null) |
|
370 return null; |
|
371 if (x instanceof CancellationException) |
|
372 throw (CancellationException)x; |
|
373 if (x instanceof CompletionException) |
|
374 throw (CompletionException)x; |
|
375 throw new CompletionException(x); |
|
376 } |
|
377 @SuppressWarnings("unchecked") T t = (T) r; |
|
378 return t; |
|
379 } |
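The difference between reportGet and reportJoin is visible through the public API. The sketch below uses only documented CompletableFuture methods; the class ReportSketch and its helper methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class ReportSketch {
    /** A future completed exceptionally with a plain IllegalStateException. */
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    /** Returns whatever join() throws, or null if it returns normally. */
    static Throwable viaJoin(CompletableFuture<?> f) {
        try { f.join(); return null; }
        catch (Throwable t) { return t; }
    }

    /** Returns whatever get() throws, or null if it returns normally. */
    static Throwable viaGet(CompletableFuture<?> f) {
        try { f.get(); return null; }
        catch (Throwable t) { return t; }
    }
}
```

For the failed future, viaJoin yields an unchecked CompletionException and viaGet yields a checked ExecutionException; in both cases getCause() is the original IllegalStateException.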
|
380 |
|
381 /* ------------- Async task preliminaries -------------- */ |
|
382 |
|
383 /** |
|
384 * A marker interface identifying asynchronous tasks produced by |
|
385 * {@code async} methods. This may be useful for monitoring, |
|
386 * debugging, and tracking asynchronous activities. |
|
387 * |
|
388 * @since 1.8 |
|
389 */ |
|
390 public static interface AsynchronousCompletionTask { |
|
391 } |
|
392 |
|
393 private static final boolean useCommonPool = |
|
394 (ForkJoinPool.getCommonPoolParallelism() > 1); |
|
395 |
|
396 /** |
|
397 * Default executor -- ForkJoinPool.commonPool() unless it cannot |
|
398 * support parallelism. |
|
399 */ |
|
400 private static final Executor asyncPool = useCommonPool ? |
|
401 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); |
|
402 |
|
403 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ |
|
404 static final class ThreadPerTaskExecutor implements Executor { |
|
405 public void execute(Runnable r) { new Thread(r).start(); } |
|
406 } |
|
407 |
|
408 /** |
|
409 * Null-checks user executor argument, and translates uses of |
|
410 * commonPool to asyncPool in case parallelism disabled. |
|
411 */ |
|
412 static Executor screenExecutor(Executor e) { |
|
413 if (!useCommonPool && e == ForkJoinPool.commonPool()) |
|
414 return asyncPool; |
|
415 if (e == null) throw new NullPointerException(); |
|
416 return e; |
|
417 } |
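The fallback in asyncPool can be approximated with public APIs. This is a sketch under the assumption that a plain lambda is an acceptable substitute for ThreadPerTaskExecutor; DefaultExecutorSketch, choose and defaultExecutor are hypothetical names.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

public class DefaultExecutorSketch {
    /** Picks the common pool when it can run tasks in parallel, else one thread per task. */
    static Executor choose(int parallelism) {
        if (parallelism > 1)
            return ForkJoinPool.commonPool();
        return r -> new Thread(r).start();   // thread-per-task fallback
    }

    /** Applies the same rule to the actual common-pool parallelism. */
    static Executor defaultExecutor() {
        return choose(ForkJoinPool.getCommonPoolParallelism());
    }
}
```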
|
418 |
|
419 // Modes for Completion.tryFire. Signedness matters. |
|
420 static final int SYNC = 0; |
|
421 static final int ASYNC = 1; |
|
422 static final int NESTED = -1; |
|
423 |
|
424 /* ------------- Base Completion classes and operations -------------- */ |
|
425 |
|
426 @SuppressWarnings("serial") |
|
427 abstract static class Completion extends ForkJoinTask<Void> |
|
428 implements Runnable, AsynchronousCompletionTask { |
|
429 volatile Completion next; // Treiber stack link |
|
430 |
|
431 /** |
|
432 * Performs completion action if triggered, returning a |
|
433 * dependent that may need propagation, if one exists. |
|
434 * |
|
435 * @param mode SYNC, ASYNC, or NESTED |
|
436 */ |
|
437 abstract CompletableFuture<?> tryFire(int mode); |
|
438 |
|
439 /** Returns true if possibly still triggerable. Used by cleanStack. */ |
|
440 abstract boolean isLive(); |
|
441 |
|
442 public final void run() { tryFire(ASYNC); } |
|
443 public final boolean exec() { tryFire(ASYNC); return true; } |
|
444 public final Void getRawResult() { return null; } |
|
445 public final void setRawResult(Void v) {} |
|
446 } |
|
447 |
|
448 static void lazySetNext(Completion c, Completion next) { |
|
449 UNSAFE.putOrderedObject(c, NEXT, next); |
|
450 } |
|
451 |
|
452 /** |
|
453 * Pops and tries to trigger all reachable dependents. Call only |
|
454 * when known to be done. |
455 */ |
456 final void postComplete() { |
457 /* |
458 * On each step, variable f holds current dependents to pop |
459 * and run. It is extended along only one path at a time, |
460 * pushing others to avoid unbounded recursion. |
461 */ |
462 CompletableFuture<?> f = this; Completion h; |
463 while ((h = f.stack) != null || |
464 (f != this && (h = (f = this).stack) != null)) { |
465 CompletableFuture<?> d; Completion t; |
466 if (f.casStack(h, t = h.next)) { |
467 if (t != null) { |
468 if (f != this) { |
469 pushStack(h); |
470 continue; |
471 } |
472 h.next = null; // detach |
473 } |
474 f = (d = h.tryFire(NESTED)) == null ? this : d; |
475 } |
476 } |
477 } |
478 |
178 */ |
179 final void postComplete() { |
180 WaitNode q; Thread t; |
181 while ((q = waiters) != null) { |
182 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && |
183 (t = q.thread) != null) { |
184 q.thread = null; |
185 LockSupport.unpark(t); |
186 } |
187 } |
188 |
189 CompletionNode h; Completion c; |
190 while ((h = completions) != null) { |
191 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && |
192 (c = h.completion) != null) |
193 c.run(); |
194 } |
195 } |
196 |
197 /** |
198 * Triggers completion with the encoding of the given arguments: |
199 * if the exception is non-null, encodes it as a wrapped |
200 * CompletionException unless it is one already. Otherwise uses |
201 * the given result, boxed as NIL if null. |
202 */ |
203 final void internalComplete(T v, Throwable ex) { |
479 /** Traverses stack and unlinks dead Completions. */ |
480 final void cleanStack() { |
|
481 for (Completion p = null, q = stack; q != null;) { |
|
482 Completion s = q.next; |
|
483 if (q.isLive()) { |
|
484 p = q; |
|
485 q = s; |
|
486 } |
|
487 else if (p == null) { |
|
488 casStack(q, s); |
|
489 q = stack; |
|
490 } |
|
491 else { |
|
492 p.next = s; |
|
493 if (p.isLive()) |
|
494 q = s; |
|
495 else { |
|
496 p = null; // restart |
|
497 q = stack; |
|
498 } |
|
499 } |
|
500 } |
|
501 } |
|
502 |
|
503 /* ------------- One-input Completions -------------- */ |
|
504 |
|
505 /** A Completion with a source, dependent, and executor. */ |
|
506 @SuppressWarnings("serial") |
|
507 abstract static class UniCompletion<T,V> extends Completion { |
|
508 Executor executor; // executor to use (null if none) |
|
509 CompletableFuture<V> dep; // the dependent to complete |
|
510 CompletableFuture<T> src; // source for action |
|
511 |
|
512 UniCompletion(Executor executor, CompletableFuture<V> dep, |
|
513 CompletableFuture<T> src) { |
|
514 this.executor = executor; this.dep = dep; this.src = src; |
|
515 } |
|
516 |
|
517 /** |
|
518 * Returns true if action can be run. Call only when known to |
|
519 * be triggerable. Uses FJ tag bit to ensure that only one |
|
520 * thread claims ownership. If async, starts as task -- a |
|
521 * later call to tryFire will run action. |
|
522 */ |
|
523 final boolean claim() { |
|
524 Executor e = executor; |
|
525 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { |
|
526 if (e == null) |
|
527 return true; |
|
528 executor = null; // disable |
|
529 e.execute(this); |
|
530 } |
|
531 return false; |
|
532 } |
|
533 |
|
534 final boolean isLive() { return dep != null; } |
|
535 } |
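The ownership rule behind claim() is that only one of several racing threads may run the action. The sketch below shows that idea with an AtomicInteger and compareAndSet(0, 1); it does not use the ForkJoinTask tag that claim() relies on, and ClaimSketch, tryRun and race are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimSketch {
    private final AtomicInteger tag = new AtomicInteger();   // 0 = unclaimed, 1 = claimed
    private final AtomicInteger runs = new AtomicInteger();  // how often the action ran

    /** Succeeds for exactly one caller. */
    boolean claim() {
        return tag.compareAndSet(0, 1);
    }

    /** Runs the (counted) action only if this caller won the claim. */
    void tryRun() {
        if (claim())
            runs.incrementAndGet();
    }

    /** Lets several threads race on one instance and reports how many times the action ran. */
    static int race(int threads) {
        ClaimSketch c = new ClaimSketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(c::tryRun);
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return c.runs.get();
    }
}
```

However many threads call tryRun, the count stays at one, which is the guarantee the tryFire callers depend on.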
|
536 |
|
537 /** Pushes the given completion (if it exists) unless done. */ |
|
538 final void push(UniCompletion<?,?> c) { |
|
539 if (c != null) { |
|
540 while (result == null && !tryPushStack(c)) |
|
541 lazySetNext(c, null); // clear on failure |
|
542 } |
|
543 } |
|
544 |
|
545 /** |
|
546 * Post-processing by dependent after successful UniCompletion |
|
547 * tryFire. Tries to clean stack of source a, and then either runs |
|
548 * postComplete or returns this to caller, depending on mode. |
|
549 */ |
|
550 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { |
|
551 if (a != null && a.stack != null) { |
|
552 if (mode < 0 || a.result == null) |
|
553 a.cleanStack(); |
|
554 else |
|
555 a.postComplete(); |
|
556 } |
|
557 if (result != null && stack != null) { |
|
558 if (mode < 0) |
|
559 return this; |
|
560 else |
|
561 postComplete(); |
|
562 } |
|
563 return null; |
|
564 } |
|
565 |
|
566 @SuppressWarnings("serial") |
|
567 static final class UniApply<T,V> extends UniCompletion<T,V> { |
|
568 Function<? super T,? extends V> fn; |
|
569 UniApply(Executor executor, CompletableFuture<V> dep, |
|
570 CompletableFuture<T> src, |
|
571 Function<? super T,? extends V> fn) { |
|
572 super(executor, dep, src); this.fn = fn; |
|
573 } |
|
574 final CompletableFuture<V> tryFire(int mode) { |
|
575 CompletableFuture<V> d; CompletableFuture<T> a; |
|
576 if ((d = dep) == null || |
|
577 !d.uniApply(a = src, fn, mode > 0 ? null : this)) |
|
578 return null; |
|
579 dep = null; src = null; fn = null; |
|
580 return d.postFire(a, mode); |
|
581 } |
|
582 } |
|
583 |
|
584 final <S> boolean uniApply(CompletableFuture<S> a, |
|
585 Function<? super S,? extends T> f, |
|
586 UniApply<S,T> c) { |
|
587 Object r; Throwable x; |
|
588 if (a == null || (r = a.result) == null || f == null) |
|
589 return false; |
|
590 tryComplete: if (result == null) { |
|
591 if (r instanceof AltResult) { |
|
592 if ((x = ((AltResult)r).ex) != null) { |
|
593 completeThrowable(x, r); |
|
594 break tryComplete; |
|
595 } |
|
596 r = null; |
|
597 } |
|
598 try { |
|
599 if (c != null && !c.claim()) |
|
600 return false; |
|
601 @SuppressWarnings("unchecked") S s = (S) r; |
|
602 completeValue(f.apply(s)); |
|
603 } catch (Throwable ex) { |
|
604 completeThrowable(ex); |
|
605 } |
|
606 } |
|
607 return true; |
|
608 } |
|
609 |
|
610 private <V> CompletableFuture<V> uniApplyStage( |
|
611 Executor e, Function<? super T,? extends V> f) { |
|
612 if (f == null) throw new NullPointerException(); |
|
613 CompletableFuture<V> d = new CompletableFuture<V>(); |
|
614 if (e != null || !d.uniApply(this, f, null)) { |
|
615 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); |
|
616 push(c); |
|
617 c.tryFire(SYNC); |
|
618 } |
|
619 return d; |
|
620 } |
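The two paths through uniApplyStage (run immediately if the source is already complete and no executor is given, otherwise push a Completion) can be observed from the public thenApply method. StageTimingSketch and its methods are hypothetical names for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class StageTimingSketch {
    /** With a completed source and no executor, the function runs in the calling thread. */
    static Thread runnerWhenAlreadyDone() {
        return CompletableFuture.completedFuture(1)
            .thenApply(x -> Thread.currentThread())
            .join();
    }

    /** With a pending source, the dependent stays incomplete until the source completes. */
    static boolean deferredUntilComplete() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x + 1);
        boolean pendingBefore = !dep.isDone();
        src.complete(41);                 // triggers the pushed dependent action
        return pendingBefore && dep.join() == 42;
    }
}
```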
|
621 |
|
622 @SuppressWarnings("serial") |
|
623 static final class UniAccept<T> extends UniCompletion<T,Void> { |
|
624 Consumer<? super T> fn; |
|
625 UniAccept(Executor executor, CompletableFuture<Void> dep, |
|
626 CompletableFuture<T> src, Consumer<? super T> fn) { |
|
627 super(executor, dep, src); this.fn = fn; |
|
628 } |
|
629 final CompletableFuture<Void> tryFire(int mode) { |
|
630 CompletableFuture<Void> d; CompletableFuture<T> a; |
|
631 if ((d = dep) == null || |
|
632 !d.uniAccept(a = src, fn, mode > 0 ? null : this)) |
|
633 return null; |
|
634 dep = null; src = null; fn = null; |
|
635 return d.postFire(a, mode); |
|
636 } |
|
637 } |
|
638 |
|
639 final <S> boolean uniAccept(CompletableFuture<S> a, |
|
640 Consumer<? super S> f, UniAccept<S> c) { |
|
641 Object r; Throwable x; |
|
642 if (a == null || (r = a.result) == null || f == null) |
|
643 return false; |
|
644 tryComplete: if (result == null) { |
|
645 if (r instanceof AltResult) { |
|
646 if ((x = ((AltResult)r).ex) != null) { |
|
647 completeThrowable(x, r); |
|
648 break tryComplete; |
|
649 } |
|
650 r = null; |
|
651 } |
|
652 try { |
|
653 if (c != null && !c.claim()) |
|
654 return false; |
|
655 @SuppressWarnings("unchecked") S s = (S) r; |
|
656 f.accept(s); |
|
657 completeNull(); |
|
658 } catch (Throwable ex) { |
|
659 completeThrowable(ex); |
|
660 } |
|
661 } |
|
662 return true; |
|
663 } |
|
664 |
|
665 private CompletableFuture<Void> uniAcceptStage(Executor e, |
|
666 Consumer<? super T> f) { |
|
667 if (f == null) throw new NullPointerException(); |
|
668 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
669 if (e != null || !d.uniAccept(this, f, null)) { |
|
670 UniAccept<T> c = new UniAccept<T>(e, d, this, f); |
|
671 push(c); |
|
672 c.tryFire(SYNC); |
|
673 } |
|
674 return d; |
|
675 } |
|
676 |
|
677 @SuppressWarnings("serial") |
|
678 static final class UniRun<T> extends UniCompletion<T,Void> { |
|
679 Runnable fn; |
|
680 UniRun(Executor executor, CompletableFuture<Void> dep, |
|
681 CompletableFuture<T> src, Runnable fn) { |
|
682 super(executor, dep, src); this.fn = fn; |
|
683 } |
|
684 final CompletableFuture<Void> tryFire(int mode) { |
|
685 CompletableFuture<Void> d; CompletableFuture<T> a; |
|
686 if ((d = dep) == null || |
|
687 !d.uniRun(a = src, fn, mode > 0 ? null : this)) |
|
688 return null; |
|
689 dep = null; src = null; fn = null; |
|
690 return d.postFire(a, mode); |
|
691 } |
|
692 } |
|
693 |
|
694 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { |
|
695 Object r; Throwable x; |
|
696 if (a == null || (r = a.result) == null || f == null) |
|
697 return false; |
|
698 if (result == null) { |
|
699 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) |
|
700 completeThrowable(x, r); |
|
701 else |
|
702 try { |
|
703 if (c != null && !c.claim()) |
|
704 return false; |
|
705 f.run(); |
|
706 completeNull(); |
|
707 } catch (Throwable ex) { |
|
708 completeThrowable(ex); |
|
709 } |
|
710 } |
|
711 return true; |
|
712 } |
|
713 |
|
714 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { |
|
715 if (f == null) throw new NullPointerException(); |
|
716 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
717 if (e != null || !d.uniRun(this, f, null)) { |
|
718 UniRun<T> c = new UniRun<T>(e, d, this, f); |
|
719 push(c); |
|
720 c.tryFire(SYNC); |
|
721 } |
|
722 return d; |
|
723 } |
|
724 |
|
725 @SuppressWarnings("serial") |
|
726 static final class UniWhenComplete<T> extends UniCompletion<T,T> { |
|
727 BiConsumer<? super T, ? super Throwable> fn; |
|
728 UniWhenComplete(Executor executor, CompletableFuture<T> dep, |
|
729 CompletableFuture<T> src, |
|
730 BiConsumer<? super T, ? super Throwable> fn) { |
|
731 super(executor, dep, src); this.fn = fn; |
|
732 } |
|
733 final CompletableFuture<T> tryFire(int mode) { |
|
734 CompletableFuture<T> d; CompletableFuture<T> a; |
|
735 if ((d = dep) == null || |
|
736 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) |
|
737 return null; |
|
738 dep = null; src = null; fn = null; |
|
739 return d.postFire(a, mode); |
|
740 } |
|
741 } |
|
742 |
|
743 final boolean uniWhenComplete(CompletableFuture<T> a, |
|
744 BiConsumer<? super T,? super Throwable> f, |
|
745 UniWhenComplete<T> c) { |
|
746 Object r; T t; Throwable x = null; |
|
747 if (a == null || (r = a.result) == null || f == null) |
|
748 return false; |
|
749 if (result == null) { |
|
750 try { |
|
751 if (c != null && !c.claim()) |
|
752 return false; |
|
753 if (r instanceof AltResult) { |
|
754 x = ((AltResult)r).ex; |
|
755 t = null; |
|
756 } else { |
|
757 @SuppressWarnings("unchecked") T tr = (T) r; |
|
758 t = tr; |
|
759 } |
|
760 f.accept(t, x); |
|
761 if (x == null) { |
|
762 internalComplete(r); |
|
763 return true; |
|
764 } |
|
765 } catch (Throwable ex) { |
|
766 if (x == null) |
|
767 x = ex; |
|
768 } |
|
769 completeThrowable(x, r); |
|
770 } |
|
771 return true; |
|
772 } |
|
773 |
|
774 private CompletableFuture<T> uniWhenCompleteStage( |
|
775 Executor e, BiConsumer<? super T, ? super Throwable> f) { |
|
776 if (f == null) throw new NullPointerException(); |
|
777 CompletableFuture<T> d = new CompletableFuture<T>(); |
|
778 if (e != null || !d.uniWhenComplete(this, f, null)) { |
|
779 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); |
|
780 push(c); |
|
781 c.tryFire(SYNC); |
|
782 } |
|
783 return d; |
|
784 } |
|
785 |
|
786 @SuppressWarnings("serial") |
|
787 static final class UniHandle<T,V> extends UniCompletion<T,V> { |
|
788 BiFunction<? super T, Throwable, ? extends V> fn; |
|
789 UniHandle(Executor executor, CompletableFuture<V> dep, |
|
790 CompletableFuture<T> src, |
|
791 BiFunction<? super T, Throwable, ? extends V> fn) { |
|
792 super(executor, dep, src); this.fn = fn; |
|
793 } |
|
794 final CompletableFuture<V> tryFire(int mode) { |
|
795 CompletableFuture<V> d; CompletableFuture<T> a; |
|
796 if ((d = dep) == null || |
|
797 !d.uniHandle(a = src, fn, mode > 0 ? null : this)) |
|
798 return null; |
|
799 dep = null; src = null; fn = null; |
|
800 return d.postFire(a, mode); |
|
801 } |
|
802 } |
|
803 |
|
804 final <S> boolean uniHandle(CompletableFuture<S> a, |
|
805 BiFunction<? super S, Throwable, ? extends T> f, |
|
806 UniHandle<S,T> c) { |
|
807 Object r; S s; Throwable x; |
|
808 if (a == null || (r = a.result) == null || f == null) |
|
809 return false; |
|
810 if (result == null) { |
|
811 try { |
|
812 if (c != null && !c.claim()) |
|
813 return false; |
|
814 if (r instanceof AltResult) { |
|
815 x = ((AltResult)r).ex; |
|
816 s = null; |
|
817 } else { |
|
818 x = null; |
|
819 @SuppressWarnings("unchecked") S ss = (S) r; |
|
820 s = ss; |
|
821 } |
|
822 completeValue(f.apply(s, x)); |
|
823 } catch (Throwable ex) { |
|
824 completeThrowable(ex); |
|
825 } |
|
826 } |
|
827 return true; |
|
828 } |
|
829 |
|
830 private <V> CompletableFuture<V> uniHandleStage( |
|
831 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { |
|
832 if (f == null) throw new NullPointerException(); |
|
833 CompletableFuture<V> d = new CompletableFuture<V>(); |
|
834 if (e != null || !d.uniHandle(this, f, null)) { |
|
835 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); |
|
836 push(c); |
|
837 c.tryFire(SYNC); |
|
838 } |
|
839 return d; |
|
840 } |
|
841 |
|
842 @SuppressWarnings("serial") |
|
843 static final class UniExceptionally<T> extends UniCompletion<T,T> { |
|
844 Function<? super Throwable, ? extends T> fn; |
|
845 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, |
|
846 Function<? super Throwable, ? extends T> fn) { |
|
847 super(null, dep, src); this.fn = fn; |
|
848 } |
|
849 final CompletableFuture<T> tryFire(int mode) { // never ASYNC |
|
850 // assert mode != ASYNC; |
|
851 CompletableFuture<T> d; CompletableFuture<T> a; |
|
852 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) |
|
853 return null; |
|
854 dep = null; src = null; fn = null; |
|
855 return d.postFire(a, mode); |
|
856 } |
|
857 } |
|
858 |
|
859 final boolean uniExceptionally(CompletableFuture<T> a, |
|
860 Function<? super Throwable, ? extends T> f, |
|
861 UniExceptionally<T> c) { |
|
862 Object r; Throwable x; |
|
863 if (a == null || (r = a.result) == null || f == null) |
|
864 return false; |
|
865 if (result == null) { |
|
866 try { |
|
867 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { |
|
868 if (c != null && !c.claim()) |
|
869 return false; |
|
870 completeValue(f.apply(x)); |
|
871 } else |
|
872 internalComplete(r); |
|
873 } catch (Throwable ex) { |
|
874 completeThrowable(ex); |
|
875 } |
|
876 } |
|
877 return true; |
|
878 } |
|
879 |
|
880 private CompletableFuture<T> uniExceptionallyStage( |
|
881 Function<Throwable, ? extends T> f) { |
|
882 if (f == null) throw new NullPointerException(); |
|
883 CompletableFuture<T> d = new CompletableFuture<T>(); |
|
884 if (!d.uniExceptionally(this, f, null)) { |
|
885 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); |
|
886 push(c); |
|
887 c.tryFire(SYNC); |
|
888 } |
|
889 return d; |
|
890 } |
|
891 |
|
892 @SuppressWarnings("serial") |
|
893 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose |
|
894 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { |
|
895 super(null, dep, src); |
|
896 } |
|
897 final CompletableFuture<T> tryFire(int mode) { |
|
898 CompletableFuture<T> d; CompletableFuture<T> a; |
|
899 if ((d = dep) == null || !d.uniRelay(a = src)) |
|
900 return null; |
|
901 src = null; dep = null; |
|
902 return d.postFire(a, mode); |
|
903 } |
|
904 } |
|
905 |
|
906 final boolean uniRelay(CompletableFuture<T> a) { |
|
907 Object r; |
|
908 if (a == null || (r = a.result) == null) |
|
909 return false; |
|
910 if (result == null) // no need to claim |
|
911 completeRelay(r); |
|
912 return true; |
|
913 } |
|
914 |
|
915 @SuppressWarnings("serial") |
|
916 static final class UniCompose<T,V> extends UniCompletion<T,V> { |
|
917 Function<? super T, ? extends CompletionStage<V>> fn; |
|
918 UniCompose(Executor executor, CompletableFuture<V> dep, |
|
919 CompletableFuture<T> src, |
|
920 Function<? super T, ? extends CompletionStage<V>> fn) { |
|
921 super(executor, dep, src); this.fn = fn; |
|
922 } |
|
923 final CompletableFuture<V> tryFire(int mode) { |
|
924 CompletableFuture<V> d; CompletableFuture<T> a; |
|
925 if ((d = dep) == null || |
|
926 !d.uniCompose(a = src, fn, mode > 0 ? null : this)) |
|
927 return null; |
|
928 dep = null; src = null; fn = null; |
|
929 return d.postFire(a, mode); |
|
930 } |
|
931 } |
|
932 |
|
933 final <S> boolean uniCompose( |
|
934 CompletableFuture<S> a, |
|
935 Function<? super S, ? extends CompletionStage<T>> f, |
|
936 UniCompose<S,T> c) { |
|
937 Object r; Throwable x; |
|
938 if (a == null || (r = a.result) == null || f == null) |
|
939 return false; |
|
940 tryComplete: if (result == null) { |
|
941 if (r instanceof AltResult) { |
|
942 if ((x = ((AltResult)r).ex) != null) { |
|
943 completeThrowable(x, r); |
|
944 break tryComplete; |
|
945 } |
|
946 r = null; |
|
947 } |
|
948 try { |
|
949 if (c != null && !c.claim()) |
|
950 return false; |
|
951 @SuppressWarnings("unchecked") S s = (S) r; |
|
952 CompletableFuture<T> g = f.apply(s).toCompletableFuture(); |
|
953 if (g.result == null || !uniRelay(g)) { |
|
954 UniRelay<T> copy = new UniRelay<T>(this, g); |
|
955 g.push(copy); |
|
956 copy.tryFire(SYNC); |
|
957 if (result == null) |
|
958 return false; |
|
959 } |
|
960 } catch (Throwable ex) { |
|
961 completeThrowable(ex); |
|
962 } |
|
963 } |
|
964 return true; |
|
965 } |
|
966 |
|
967 private <V> CompletableFuture<V> uniComposeStage( |
|
968 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { |
|
969 if (f == null) throw new NullPointerException(); |
|
970 Object r; Throwable x; |
|
971 if (e == null && (r = result) != null) { |
|
972 // try to return function result directly |
|
973 if (r instanceof AltResult) { |
|
974 if ((x = ((AltResult)r).ex) != null) { |
|
975 return new CompletableFuture<V>(encodeThrowable(x, r)); |
|
976 } |
|
977 r = null; |
|
978 } |
|
979 try { |
|
980 @SuppressWarnings("unchecked") T t = (T) r; |
|
981 return f.apply(t).toCompletableFuture(); |
|
982 } catch (Throwable ex) { |
|
983 return new CompletableFuture<V>(encodeThrowable(ex)); |
|
984 } |
|
985 } |
|
986 CompletableFuture<V> d = new CompletableFuture<V>(); |
|
987 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); |
|
988 push(c); |
|
989 c.tryFire(SYNC); |
|
990 return d; |
|
991 } |
|
992 |
|
993 /* ------------- Two-input Completions -------------- */ |
|
994 |
|
995 /** A Completion for an action with two sources */ |
|
996 @SuppressWarnings("serial") |
|
997 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { |
|
998 CompletableFuture<U> snd; // second source for action |
|
999 BiCompletion(Executor executor, CompletableFuture<V> dep, |
|
1000 CompletableFuture<T> src, CompletableFuture<U> snd) { |
|
1001 super(executor, dep, src); this.snd = snd; |
|
1002 } |
|
1003 } |
|
1004 |
|
1005 /** A Completion delegating to a BiCompletion */ |
|
1006 @SuppressWarnings("serial") |
|
1007 static final class CoCompletion extends Completion { |
|
1008 BiCompletion<?,?,?> base; |
|
1009 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } |
|
1010 final CompletableFuture<?> tryFire(int mode) { |
|
1011 BiCompletion<?,?,?> c; CompletableFuture<?> d; |
|
1012 if ((c = base) == null || (d = c.tryFire(mode)) == null) |
|
1013 return null; |
|
1014 base = null; // detach |
|
1015 return d; |
|
1016 } |
|
1017 final boolean isLive() { |
|
1018 BiCompletion<?,?,?> c; |
|
1019 return (c = base) != null && c.dep != null; |
|
1020 } |
|
1021 } |
|
1022 |
|
1023 /** Pushes completion to this and b unless both done. */ |
|
1024 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { |
|
1025 if (c != null) { |
|
1026 Object r; |
|
1027 while ((r = result) == null && !tryPushStack(c)) |
|
1028 lazySetNext(c, null); // clear on failure |
|
1029 if (b != null && b != this && b.result == null) { |
|
1030 Completion q = (r != null) ? c : new CoCompletion(c); |
|
1031 while (b.result == null && !b.tryPushStack(q)) |
|
1032 lazySetNext(q, null); // clear on failure |
|
1033 } |
|
1034 } |
|
1035 } |
|
1036 |
|
1037 /** Post-processing after successful BiCompletion tryFire. */ |
|
1038 final CompletableFuture<T> postFire(CompletableFuture<?> a, |
|
1039 CompletableFuture<?> b, int mode) { |
|
1040 if (b != null && b.stack != null) { // clean second source |
|
1041 if (mode < 0 || b.result == null) |
|
1042 b.cleanStack(); |
|
1043 else |
|
1044 b.postComplete(); |
|
1045 } |
|
1046 return postFire(a, mode); |
|
1047 } |
|
1048 |
|
1049 @SuppressWarnings("serial") |
|
1050 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { |
|
1051 BiFunction<? super T,? super U,? extends V> fn; |
|
1052 BiApply(Executor executor, CompletableFuture<V> dep, |
|
1053 CompletableFuture<T> src, CompletableFuture<U> snd, |
|
1054 BiFunction<? super T,? super U,? extends V> fn) { |
|
1055 super(executor, dep, src, snd); this.fn = fn; |
|
1056 } |
|
1057 final CompletableFuture<V> tryFire(int mode) { |
|
1058 CompletableFuture<V> d; |
|
1059 CompletableFuture<T> a; |
|
1060 CompletableFuture<U> b; |
|
1061 if ((d = dep) == null || |
|
1062 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1063 return null; |
|
1064 dep = null; src = null; snd = null; fn = null; |
|
1065 return d.postFire(a, b, mode); |
|
1066 } |
|
1067 } |
|
1068 |
|
1069 final <R,S> boolean biApply(CompletableFuture<R> a, |
|
1070 CompletableFuture<S> b, |
|
1071 BiFunction<? super R,? super S,? extends T> f, |
|
1072 BiApply<R,S,T> c) { |
|
1073 Object r, s; Throwable x; |
|
1074 if (a == null || (r = a.result) == null || |
|
1075 b == null || (s = b.result) == null || f == null) |
|
1076 return false; |
|
1077 tryComplete: if (result == null) { |
|
1078 if (r instanceof AltResult) { |
|
1079 if ((x = ((AltResult)r).ex) != null) { |
|
1080 completeThrowable(x, r); |
|
1081 break tryComplete; |
|
1082 } |
|
1083 r = null; |
|
1084 } |
|
1085 if (s instanceof AltResult) { |
|
1086 if ((x = ((AltResult)s).ex) != null) { |
|
1087 completeThrowable(x, s); |
|
1088 break tryComplete; |
|
1089 } |
|
1090 s = null; |
|
1091 } |
|
1092 try { |
|
1093 if (c != null && !c.claim()) |
|
1094 return false; |
|
1095 @SuppressWarnings("unchecked") R rr = (R) r; |
|
1096 @SuppressWarnings("unchecked") S ss = (S) s; |
|
1097 completeValue(f.apply(rr, ss)); |
|
1098 } catch (Throwable ex) { |
|
1099 completeThrowable(ex); |
|
1100 } |
|
1101 } |
|
1102 return true; |
|
1103 } |
|
1104 |
|
1105 private <U,V> CompletableFuture<V> biApplyStage( |
|
1106 Executor e, CompletionStage<U> o, |
|
1107 BiFunction<? super T,? super U,? extends V> f) { |
|
1108 CompletableFuture<U> b; |
|
1109 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1110 throw new NullPointerException(); |
|
1111 CompletableFuture<V> d = new CompletableFuture<V>(); |
|
1112 if (e != null || !d.biApply(this, b, f, null)) { |
|
1113 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); |
|
1114 bipush(b, c); |
|
1115 c.tryFire(SYNC); |
|
1116 } |
|
1117 return d; |
|
1118 } |
|
1119 |
|
1120 @SuppressWarnings("serial") |
|
1121 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { |
|
1122 BiConsumer<? super T,? super U> fn; |
|
1123 BiAccept(Executor executor, CompletableFuture<Void> dep, |
|
1124 CompletableFuture<T> src, CompletableFuture<U> snd, |
|
1125 BiConsumer<? super T,? super U> fn) { |
|
1126 super(executor, dep, src, snd); this.fn = fn; |
|
1127 } |
|
1128 final CompletableFuture<Void> tryFire(int mode) { |
|
1129 CompletableFuture<Void> d; |
|
1130 CompletableFuture<T> a; |
|
1131 CompletableFuture<U> b; |
|
1132 if ((d = dep) == null || |
|
1133 !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1134 return null; |
|
1135 dep = null; src = null; snd = null; fn = null; |
|
1136 return d.postFire(a, b, mode); |
|
1137 } |
|
1138 } |
|
1139 |
|
1140 final <R,S> boolean biAccept(CompletableFuture<R> a, |
|
1141 CompletableFuture<S> b, |
|
1142 BiConsumer<? super R,? super S> f, |
|
1143 BiAccept<R,S> c) { |
|
1144 Object r, s; Throwable x; |
|
1145 if (a == null || (r = a.result) == null || |
|
1146 b == null || (s = b.result) == null || f == null) |
|
1147 return false; |
|
1148 tryComplete: if (result == null) { |
|
1149 if (r instanceof AltResult) { |
|
1150 if ((x = ((AltResult)r).ex) != null) { |
|
1151 completeThrowable(x, r); |
|
1152 break tryComplete; |
|
1153 } |
|
1154 r = null; |
|
1155 } |
|
1156 if (s instanceof AltResult) { |
|
1157 if ((x = ((AltResult)s).ex) != null) { |
|
1158 completeThrowable(x, s); |
|
1159 break tryComplete; |
|
1160 } |
|
1161 s = null; |
|
1162 } |
|
1163 try { |
|
1164 if (c != null && !c.claim()) |
|
1165 return false; |
|
1166 @SuppressWarnings("unchecked") R rr = (R) r; |
|
1167 @SuppressWarnings("unchecked") S ss = (S) s; |
|
1168 f.accept(rr, ss); |
|
1169 completeNull(); |
|
1170 } catch (Throwable ex) { |
|
1171 completeThrowable(ex); |
|
1172 } |
|
1173 } |
|
1174 return true; |
|
1175 } |
|
1176 |
|
1177 private <U> CompletableFuture<Void> biAcceptStage( |
|
1178 Executor e, CompletionStage<U> o, |
|
1179 BiConsumer<? super T,? super U> f) { |
|
1180 CompletableFuture<U> b; |
|
1181 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1182 throw new NullPointerException(); |
|
1183 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1184 if (e != null || !d.biAccept(this, b, f, null)) { |
|
1185 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); |
|
1186 bipush(b, c); |
|
1187 c.tryFire(SYNC); |
|
1188 } |
|
1189 return d; |
|
1190 } |
|
1191 |
|
1192 @SuppressWarnings("serial") |
|
1193 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { |
|
1194 Runnable fn; |
|
1195 BiRun(Executor executor, CompletableFuture<Void> dep, |
|
1196 CompletableFuture<T> src, |
|
1197 CompletableFuture<U> snd, |
|
1198 Runnable fn) { |
|
1199 super(executor, dep, src, snd); this.fn = fn; |
|
1200 } |
|
1201 final CompletableFuture<Void> tryFire(int mode) { |
|
1202 CompletableFuture<Void> d; |
|
1203 CompletableFuture<T> a; |
|
1204 CompletableFuture<U> b; |
|
1205 if ((d = dep) == null || |
|
1206 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1207 return null; |
|
1208 dep = null; src = null; snd = null; fn = null; |
|
1209 return d.postFire(a, b, mode); |
|
1210 } |
|
1211 } |
|
1212 |
|
1213 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, |
|
1214 Runnable f, BiRun<?,?> c) { |
|
1215 Object r, s; Throwable x; |
|
1216 if (a == null || (r = a.result) == null || |
|
1217 b == null || (s = b.result) == null || f == null) |
|
1218 return false; |
|
1219 if (result == null) { |
|
1220 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) |
|
1221 completeThrowable(x, r); |
|
1222 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) |
|
1223 completeThrowable(x, s); |
|
1224 else |
|
1225 try { |
|
1226 if (c != null && !c.claim()) |
|
1227 return false; |
|
1228 f.run(); |
|
1229 completeNull(); |
|
1230 } catch (Throwable ex) { |
|
1231 completeThrowable(ex); |
|
1232 } |
|
1233 } |
|
1234 return true; |
|
1235 } |
|
1236 |
|
1237 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, |
|
1238 Runnable f) { |
|
1239 CompletableFuture<?> b; |
|
1240 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1241 throw new NullPointerException(); |
|
1242 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1243 if (e != null || !d.biRun(this, b, f, null)) { |
|
1244 BiRun<T,?> c = new BiRun<>(e, d, this, b, f); |
|
1245 bipush(b, c); |
|
1246 c.tryFire(SYNC); |
|
1247 } |
|
1248 return d; |
|
1249 } |
|
1250 |
|
1251 @SuppressWarnings("serial") |
|
1252 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And |
|
1253 BiRelay(CompletableFuture<Void> dep, |
|
1254 CompletableFuture<T> src, |
|
1255 CompletableFuture<U> snd) { |
|
1256 super(null, dep, src, snd); |
|
1257 } |
|
1258 final CompletableFuture<Void> tryFire(int mode) { |
|
1259 CompletableFuture<Void> d; |
|
1260 CompletableFuture<T> a; |
|
1261 CompletableFuture<U> b; |
|
1262 if ((d = dep) == null || !d.biRelay(a = src, b = snd)) |
|
1263 return null; |
|
1264 src = null; snd = null; dep = null; |
|
1265 return d.postFire(a, b, mode); |
|
1266 } |
|
1267 } |
|
1268 |
|
1269 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { |
|
1270 Object r, s; Throwable x; |
|
1271 if (a == null || (r = a.result) == null || |
|
1272 b == null || (s = b.result) == null) |
|
1273 return false; |
|
1274 if (result == null) { |
|
1275 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) |
|
1276 completeThrowable(x, r); |
|
1277 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) |
|
1278 completeThrowable(x, s); |
|
1279 else |
|
1280 completeNull(); |
|
1281 } |
|
1282 return true; |
|
1283 } |
|
1284 |
|
1285 /** Recursively constructs a tree of completions. */ |
|
1286 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, |
|
1287 int lo, int hi) { |
|
1288 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1289 if (lo > hi) // empty |
|
1290 d.result = NIL; |
|
1291 else { |
|
1292 CompletableFuture<?> a, b; |
|
1293 int mid = (lo + hi) >>> 1; |
|
1294 if ((a = (lo == mid ? cfs[lo] : |
|
1295 andTree(cfs, lo, mid))) == null || |
|
1296 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : |
|
1297 andTree(cfs, mid+1, hi))) == null) |
|
1298 throw new NullPointerException(); |
|
1299 if (!d.biRelay(a, b)) { |
|
1300 BiRelay<?,?> c = new BiRelay<>(d, a, b); |
|
1301 a.bipush(b, c); |
|
1302 c.tryFire(SYNC); |
|
1303 } |
|
1304 } |
|
1305 return d; |
|
1306 } |
|
1307 |
|
1308 /* ------------- Projected (Ored) BiCompletions -------------- */ |
|
1309 |
|
1310 /** Pushes completion to this and b unless either done. */ |
|
1311 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { |
|
1312 if (c != null) { |
|
1313 while ((b == null || b.result == null) && result == null) { |
|
1314 if (tryPushStack(c)) { |
|
1315 if (b != null && b != this && b.result == null) { |
|
1316 Completion q = new CoCompletion(c); |
|
1317 while (result == null && b.result == null && |
|
1318 !b.tryPushStack(q)) |
|
1319 lazySetNext(q, null); // clear on failure |
|
1320 } |
|
1321 break; |
|
1322 } |
|
1323 lazySetNext(c, null); // clear on failure |
|
1324 } |
|
1325 } |
|
1326 } |
|
1327 |
|
1328 @SuppressWarnings("serial") |
|
1329 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { |
|
1330 Function<? super T,? extends V> fn; |
|
1331 OrApply(Executor executor, CompletableFuture<V> dep, |
|
1332 CompletableFuture<T> src, |
|
1333 CompletableFuture<U> snd, |
|
1334 Function<? super T,? extends V> fn) { |
|
1335 super(executor, dep, src, snd); this.fn = fn; |
|
1336 } |
|
1337 final CompletableFuture<V> tryFire(int mode) { |
|
1338 CompletableFuture<V> d; |
|
1339 CompletableFuture<T> a; |
|
1340 CompletableFuture<U> b; |
|
1341 if ((d = dep) == null || |
|
1342 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1343 return null; |
|
1344 dep = null; src = null; snd = null; fn = null; |
|
1345 return d.postFire(a, b, mode); |
|
1346 } |
|
1347 } |
|
1348 |
|
1349 final <R,S extends R> boolean orApply(CompletableFuture<R> a, |
|
1350 CompletableFuture<S> b, |
|
1351 Function<? super R, ? extends T> f, |
|
1352 OrApply<R,S,T> c) { |
|
1353 Object r; Throwable x; |
|
1354 if (a == null || b == null || |
|
1355 ((r = a.result) == null && (r = b.result) == null) || f == null) |
|
1356 return false; |
|
1357 tryComplete: if (result == null) { |
|
1358 try { |
|
1359 if (c != null && !c.claim()) |
|
1360 return false; |
|
1361 if (r instanceof AltResult) { |
|
1362 if ((x = ((AltResult)r).ex) != null) { |
|
1363 completeThrowable(x, r); |
|
1364 break tryComplete; |
|
1365 } |
|
1366 r = null; |
|
1367 } |
|
1368 @SuppressWarnings("unchecked") R rr = (R) r; |
|
1369 completeValue(f.apply(rr)); |
|
1370 } catch (Throwable ex) { |
|
1371 completeThrowable(ex); |
|
1372 } |
|
1373 } |
|
1374 return true; |
|
1375 } |
|
1376 |
|
1377 private <U extends T,V> CompletableFuture<V> orApplyStage( |
|
1378 Executor e, CompletionStage<U> o, |
|
1379 Function<? super T, ? extends V> f) { |
|
1380 CompletableFuture<U> b; |
|
1381 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1382 throw new NullPointerException(); |
|
1383 CompletableFuture<V> d = new CompletableFuture<V>(); |
|
1384 if (e != null || !d.orApply(this, b, f, null)) { |
|
1385 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); |
|
1386 orpush(b, c); |
|
1387 c.tryFire(SYNC); |
|
1388 } |
|
1389 return d; |
|
1390 } |
|
1391 |
|
1392 @SuppressWarnings("serial") |
|
1393 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { |
|
1394 Consumer<? super T> fn; |
|
1395 OrAccept(Executor executor, CompletableFuture<Void> dep, |
|
1396 CompletableFuture<T> src, |
|
1397 CompletableFuture<U> snd, |
|
1398 Consumer<? super T> fn) { |
|
1399 super(executor, dep, src, snd); this.fn = fn; |
|
1400 } |
|
1401 final CompletableFuture<Void> tryFire(int mode) { |
|
1402 CompletableFuture<Void> d; |
|
1403 CompletableFuture<T> a; |
|
1404 CompletableFuture<U> b; |
|
1405 if ((d = dep) == null || |
|
1406 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1407 return null; |
|
1408 dep = null; src = null; snd = null; fn = null; |
|
1409 return d.postFire(a, b, mode); |
|
1410 } |
|
1411 } |
|
1412 |
|
1413 final <R,S extends R> boolean orAccept(CompletableFuture<R> a, |
|
1414 CompletableFuture<S> b, |
|
1415 Consumer<? super R> f, |
|
1416 OrAccept<R,S> c) { |
|
1417 Object r; Throwable x; |
|
1418 if (a == null || b == null || |
|
1419 ((r = a.result) == null && (r = b.result) == null) || f == null) |
|
1420 return false; |
|
1421 tryComplete: if (result == null) { |
|
1422 try { |
|
1423 if (c != null && !c.claim()) |
|
1424 return false; |
|
1425 if (r instanceof AltResult) { |
|
1426 if ((x = ((AltResult)r).ex) != null) { |
|
1427 completeThrowable(x, r); |
|
1428 break tryComplete; |
|
1429 } |
|
1430 r = null; |
|
1431 } |
|
1432 @SuppressWarnings("unchecked") R rr = (R) r; |
|
1433 f.accept(rr); |
|
1434 completeNull(); |
|
1435 } catch (Throwable ex) { |
|
1436 completeThrowable(ex); |
|
1437 } |
|
1438 } |
|
1439 return true; |
|
1440 } |
|
1441 |
|
1442 private <U extends T> CompletableFuture<Void> orAcceptStage( |
|
1443 Executor e, CompletionStage<U> o, Consumer<? super T> f) { |
|
1444 CompletableFuture<U> b; |
|
1445 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1446 throw new NullPointerException(); |
|
1447 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1448 if (e != null || !d.orAccept(this, b, f, null)) { |
|
1449 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); |
|
1450 orpush(b, c); |
|
1451 c.tryFire(SYNC); |
|
1452 } |
|
1453 return d; |
|
1454 } |
|
1455 |
|
1456 @SuppressWarnings("serial") |
|
1457 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { |
|
1458 Runnable fn; |
|
1459 OrRun(Executor executor, CompletableFuture<Void> dep, |
|
1460 CompletableFuture<T> src, |
|
1461 CompletableFuture<U> snd, |
|
1462 Runnable fn) { |
|
1463 super(executor, dep, src, snd); this.fn = fn; |
|
1464 } |
|
1465 final CompletableFuture<Void> tryFire(int mode) { |
|
1466 CompletableFuture<Void> d; |
|
1467 CompletableFuture<T> a; |
|
1468 CompletableFuture<U> b; |
|
1469 if ((d = dep) == null || |
|
1470 !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) |
|
1471 return null; |
|
1472 dep = null; src = null; snd = null; fn = null; |
|
1473 return d.postFire(a, b, mode); |
|
1474 } |
|
1475 } |
|
1476 |
|
1477 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, |
|
1478 Runnable f, OrRun<?,?> c) { |
|
1479 Object r; Throwable x; |
|
1480 if (a == null || b == null || |
|
1481 ((r = a.result) == null && (r = b.result) == null) || f == null) |
|
1482 return false; |
|
1483 if (result == null) { |
|
1484 try { |
|
1485 if (c != null && !c.claim()) |
|
1486 return false; |
|
1487 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) |
|
1488 completeThrowable(x, r); |
|
1489 else { |
|
1490 f.run(); |
|
1491 completeNull(); |
|
1492 } |
|
1493 } catch (Throwable ex) { |
|
1494 completeThrowable(ex); |
|
1495 } |
|
1496 } |
|
1497 return true; |
|
1498 } |
|
1499 |
|
1500 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, |
|
1501 Runnable f) { |
|
1502 CompletableFuture<?> b; |
|
1503 if (f == null || (b = o.toCompletableFuture()) == null) |
|
1504 throw new NullPointerException(); |
|
1505 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1506 if (e != null || !d.orRun(this, b, f, null)) { |
|
1507 OrRun<T,?> c = new OrRun<>(e, d, this, b, f); |
|
1508 orpush(b, c); |
|
1509 c.tryFire(SYNC); |
|
1510 } |
|
1511 return d; |
|
1512 } |
|
1513 |
|
1514 @SuppressWarnings("serial") |
|
1515 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or |
|
1516 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, |
|
1517 CompletableFuture<U> snd) { |
|
1518 super(null, dep, src, snd); |
|
1519 } |
|
1520 final CompletableFuture<Object> tryFire(int mode) { |
|
1521 CompletableFuture<Object> d; |
|
1522 CompletableFuture<T> a; |
|
1523 CompletableFuture<U> b; |
|
1524 if ((d = dep) == null || !d.orRelay(a = src, b = snd)) |
|
1525 return null; |
|
1526 src = null; snd = null; dep = null; |
|
1527 return d.postFire(a, b, mode); |
|
1528 } |
|
1529 } |
|
1530 |
|
1531 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { |
|
1532 Object r; |
|
1533 if (a == null || b == null || |
|
1534 ((r = a.result) == null && (r = b.result) == null)) |
|
1535 return false; |
204 if (result == null) |
1536 if (result == null) |
205 UNSAFE.compareAndSwapObject |
1537 completeRelay(r); |
206 (this, RESULT, null, |
1538 return true; |
207 (ex == null) ? (v == null) ? NIL : v : |
1539 } |
208 new AltResult((ex instanceof CompletionException) ? ex : |
1540 |
209 new CompletionException(ex))); |
1541 /** Recursively constructs a tree of completions. */ |
210 postComplete(); // help out even if not triggered |
1542 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, |
211 } |
1543 int lo, int hi) { |
212 |
1544 CompletableFuture<Object> d = new CompletableFuture<Object>(); |
213 /** |
1545 if (lo <= hi) { |
214 * If triggered, helps release and/or process completions. |
1546 CompletableFuture<?> a, b; |
215 */ |
1547 int mid = (lo + hi) >>> 1; |
216 final void helpPostComplete() { |
1548 if ((a = (lo == mid ? cfs[lo] : |
217 if (result != null) |
1549 orTree(cfs, lo, mid))) == null || |
218 postComplete(); |
1550 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : |
219 } |
1551 orTree(cfs, mid+1, hi))) == null) |
220 |
1552 throw new NullPointerException(); |
221 /* ------------- waiting for completions -------------- */ |
1553 if (!d.orRelay(a, b)) { |
222 |
1554 OrRelay<?,?> c = new OrRelay<>(d, a, b); |
223 /** Number of processors, for spin control */ |
1555 a.orpush(b, c); |
224 static final int NCPU = Runtime.getRuntime().availableProcessors(); |
1556 c.tryFire(SYNC); |
225 |
1557 } |
226 /** |
1558 } |
227 * Heuristic spin value for waitingGet() before blocking on |
1559 return d; |
228 * multiprocessors |
1560 } |
229 */ |
1561 |
230 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; |
1562 /* ------------- Zero-input Async forms -------------- */ |
231 |
1563 |
232 /** |
1564 @SuppressWarnings("serial") |
233 * Linked nodes to record waiting threads in a Treiber stack. See |
1565 static final class AsyncSupply<T> extends ForkJoinTask<Void> |
234 * other classes such as Phaser and SynchronousQueue for more |
1566 implements Runnable, AsynchronousCompletionTask { |
235 * detailed explanation. This class implements ManagedBlocker to |
1567 CompletableFuture<T> dep; Supplier<T> fn; |
236 * avoid starvation when blocking actions pile up in |
1568 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { |
237 * ForkJoinPools. |
1569 this.dep = dep; this.fn = fn; |
238 */ |
1570 } |
239 static final class WaitNode implements ForkJoinPool.ManagedBlocker { |
1571 |
240 long nanos; // wait time if timed |
1572 public final Void getRawResult() { return null; } |
241 final long deadline; // non-zero if timed |
1573 public final void setRawResult(Void v) {} |
|
1574 public final boolean exec() { run(); return true; } |
|
1575 |
|
1576 public void run() { |
|
1577 CompletableFuture<T> d; Supplier<T> f; |
|
1578 if ((d = dep) != null && (f = fn) != null) { |
|
1579 dep = null; fn = null; |
|
1580 if (d.result == null) { |
|
1581 try { |
|
1582 d.completeValue(f.get()); |
|
1583 } catch (Throwable ex) { |
|
1584 d.completeThrowable(ex); |
|
1585 } |
|
1586 } |
|
1587 d.postComplete(); |
|
1588 } |
|
1589 } |
|
1590 } |
|
1591 |
|
1592 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, |
|
1593 Supplier<U> f) { |
|
1594 if (f == null) throw new NullPointerException(); |
|
1595 CompletableFuture<U> d = new CompletableFuture<U>(); |
|
1596 e.execute(new AsyncSupply<U>(d, f)); |
|
1597 return d; |
|
1598 } |
|
1599 |
|
1600 @SuppressWarnings("serial") |
|
1601 static final class AsyncRun extends ForkJoinTask<Void> |
|
1602 implements Runnable, AsynchronousCompletionTask { |
|
1603 CompletableFuture<Void> dep; Runnable fn; |
|
1604 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { |
|
1605 this.dep = dep; this.fn = fn; |
|
1606 } |
|
1607 |
|
1608 public final Void getRawResult() { return null; } |
|
1609 public final void setRawResult(Void v) {} |
|
1610 public final boolean exec() { run(); return true; } |
|
1611 |
|
1612 public void run() { |
|
1613 CompletableFuture<Void> d; Runnable f; |
|
1614 if ((d = dep) != null && (f = fn) != null) { |
|
1615 dep = null; fn = null; |
|
1616 if (d.result == null) { |
|
1617 try { |
|
1618 f.run(); |
|
1619 d.completeNull(); |
|
1620 } catch (Throwable ex) { |
|
1621 d.completeThrowable(ex); |
|
1622 } |
|
1623 } |
|
1624 d.postComplete(); |
|
1625 } |
|
1626 } |
|
1627 } |
|
1628 |
|
1629 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { |
|
1630 if (f == null) throw new NullPointerException(); |
|
1631 CompletableFuture<Void> d = new CompletableFuture<Void>(); |
|
1632 e.execute(new AsyncRun(d, f)); |
|
1633 return d; |
|
1634 } |
|
1635 |
|
1636 /* ------------- Signallers -------------- */ |
|
1637 |
|
1638 /** |
|
1639 * Completion for recording and releasing a waiting thread. This |
|
1640 * class implements ManagedBlocker to avoid starvation when |
|
1641 * blocking actions pile up in ForkJoinPools. |
|
1642 */ |
|
1643 @SuppressWarnings("serial") |
|
1644 static final class Signaller extends Completion |
|
1645 implements ForkJoinPool.ManagedBlocker { |
|
1646 long nanos; // wait time if timed |
|
1647 final long deadline; // non-zero if timed |
242 volatile int interruptControl; // > 0: interruptible, < 0: interrupted |
1648 volatile int interruptControl; // > 0: interruptible, < 0: interrupted |
243 volatile Thread thread; |
1649 volatile Thread thread; |
244 volatile WaitNode next; |
1650 |
245 WaitNode(boolean interruptible, long nanos, long deadline) { |
1651 Signaller(boolean interruptible, long nanos, long deadline) { |
246 this.thread = Thread.currentThread(); |
1652 this.thread = Thread.currentThread(); |
247 this.interruptControl = interruptible ? 1 : 0; |
1653 this.interruptControl = interruptible ? 1 : 0; |
248 this.nanos = nanos; |
1654 this.nanos = nanos; |
249 this.deadline = deadline; |
1655 this.deadline = deadline; |
|
1656 } |
|
1657 final CompletableFuture<?> tryFire(int ignore) { |
|
1658 Thread w; // no need to atomically claim |
|
1659 if ((w = thread) != null) { |
|
1660 thread = null; |
|
1661 LockSupport.unpark(w); |
|
1662 } |
|
1663 return null; |
250 } |
1664 } |
251 public boolean isReleasable() { |
1665 public boolean isReleasable() { |
252 if (thread == null) |
1666 if (thread == null) |
253 return true; |
1667 return true; |
254 if (Thread.interrupted()) { |
1668 if (Thread.interrupted()) { |
271 LockSupport.park(this); |
1685 LockSupport.park(this); |
272 else if (nanos > 0L) |
1686 else if (nanos > 0L) |
273 LockSupport.parkNanos(this, nanos); |
1687 LockSupport.parkNanos(this, nanos); |
274 return isReleasable(); |
1688 return isReleasable(); |
275 } |
1689 } |
|
1690 final boolean isLive() { return thread != null; } |
276 } |
1691 } |
277 |
1692 |
278 /** |
1693 /** |
279 * Returns raw result after waiting, or null if interruptible and |
1694 * Returns raw result after waiting, or null if interruptible and |
280 * interrupted. |
1695 * interrupted. |
281 */ |
1696 */ |
282 private Object waitingGet(boolean interruptible) { |
1697 private Object waitingGet(boolean interruptible) { |
283 WaitNode q = null; |
1698 Signaller q = null; |
284 boolean queued = false; |
1699 boolean queued = false; |
285 int spins = SPINS; |
1700 int spins = -1; |
286 for (Object r;;) { |
1701 Object r; |
287 if ((r = result) != null) { |
1702 while ((r = result) == null) { |
288 if (q != null) { // suppress unpark |
1703 if (spins < 0) |
289 q.thread = null; |
1704 spins = (Runtime.getRuntime().availableProcessors() > 1) ? |
290 if (q.interruptControl < 0) { |
1705 1 << 8 : 0; // Use brief spin-wait on multiprocessors |
291 if (interruptible) { |
|
292 removeWaiter(q); |
|
293 return null; |
|
294 } |
|
295 Thread.currentThread().interrupt(); |
|
296 } |
|
297 } |
|
298 postComplete(); // help release others |
|
299 return r; |
|
300 } |
|
301 else if (spins > 0) { |
1706 else if (spins > 0) { |
302 int rnd = ThreadLocalRandom.nextSecondarySeed(); |
1707 if (ThreadLocalRandom.nextSecondarySeed() >= 0) |
303 if (rnd == 0) |
|
304 rnd = ThreadLocalRandom.current().nextInt(); |
|
305 if (rnd >= 0) |
|
306 --spins; |
1708 --spins; |
307 } |
1709 } |
308 else if (q == null) |
1710 else if (q == null) |
309 q = new WaitNode(interruptible, 0L, 0L); |
1711 q = new Signaller(interruptible, 0L, 0L); |
310 else if (!queued) |
1712 else if (!queued) |
311 queued = UNSAFE.compareAndSwapObject(this, WAITERS, |
1713 queued = tryPushStack(q); |
312 q.next = waiters, q); |
|
313 else if (interruptible && q.interruptControl < 0) { |
1714 else if (interruptible && q.interruptControl < 0) { |
314 removeWaiter(q); |
1715 q.thread = null; |
|
1716 cleanStack(); |
315 return null; |
1717 return null; |
316 } |
1718 } |
317 else if (q.thread != null && result == null) { |
1719 else if (q.thread != null && result == null) { |
318 try { |
1720 try { |
319 ForkJoinPool.managedBlock(q); |
1721 ForkJoinPool.managedBlock(q); |
320 } catch (InterruptedException ex) { |
1722 } catch (InterruptedException ie) { |
321 q.interruptControl = -1; |
1723 q.interruptControl = -1; |
322 } |
1724 } |
323 } |
1725 } |
324 } |
1726 } |
325 } |
1727 if (q != null) { |
326 |
1728 q.thread = null; |
327 /** |
1729 if (q.interruptControl < 0) { |
328 * Awaits completion or aborts on interrupt or timeout. |
1730 if (interruptible) |
329 * |
1731 r = null; // report interruption |
330 * @param nanos time to wait |
1732 else |
331 * @return raw result |
1733 Thread.currentThread().interrupt(); |
332 */ |
1734 } |
333 private Object timedAwaitDone(long nanos) |
1735 } |
334 throws InterruptedException, TimeoutException { |
1736 postComplete(); |
335 WaitNode q = null; |
1737 return r; |
|
1738 } |
|
1739 |
|
1740 /** |
|
1741 * Returns raw result after waiting, or null if interrupted, or |
|
1742 * throws TimeoutException on timeout. |
|
1743 */ |
|
1744 private Object timedGet(long nanos) throws TimeoutException { |
|
1745 if (Thread.interrupted()) |
|
1746 return null; |
|
1747 if (nanos <= 0L) |
|
1748 throw new TimeoutException(); |
|
1749 long d = System.nanoTime() + nanos; |
|
1750 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 |
336 boolean queued = false; |
1751 boolean queued = false; |
337 for (Object r;;) { |
1752 Object r; |
338 if ((r = result) != null) { |
1753 // We intentionally don't spin here (as waitingGet does) because |
339 if (q != null) { |
1754 // the call to nanoTime() above acts much like a spin. |
340 q.thread = null; |
1755 while ((r = result) == null) { |
341 if (q.interruptControl < 0) { |
1756 if (!queued) |
342 removeWaiter(q); |
1757 queued = tryPushStack(q); |
343 throw new InterruptedException(); |
1758 else if (q.interruptControl < 0 || q.nanos <= 0L) { |
344 } |
1759 q.thread = null; |
345 } |
1760 cleanStack(); |
346 postComplete(); |
1761 if (q.interruptControl < 0) |
347 return r; |
1762 return null; |
348 } |
1763 throw new TimeoutException(); |
349 else if (q == null) { |
|
350 if (nanos <= 0L) |
|
351 throw new TimeoutException(); |
|
352 long d = System.nanoTime() + nanos; |
|
353 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 |
|
354 } |
|
355 else if (!queued) |
|
356 queued = UNSAFE.compareAndSwapObject(this, WAITERS, |
|
357 q.next = waiters, q); |
|
358 else if (q.interruptControl < 0) { |
|
359 removeWaiter(q); |
|
360 throw new InterruptedException(); |
|
361 } |
|
362 else if (q.nanos <= 0L) { |
|
363 if (result == null) { |
|
364 removeWaiter(q); |
|
365 throw new TimeoutException(); |
|
366 } |
|
367 } |
1764 } |
368 else if (q.thread != null && result == null) { |
1765 else if (q.thread != null && result == null) { |
369 try { |
1766 try { |
370 ForkJoinPool.managedBlock(q); |
1767 ForkJoinPool.managedBlock(q); |
371 } catch (InterruptedException ex) { |
1768 } catch (InterruptedException ie) { |
372 q.interruptControl = -1; |
1769 q.interruptControl = -1; |
373 } |
1770 } |
374 } |
1771 } |
375 } |
1772 } |
376 } |
1773 if (q.interruptControl < 0) |
377 |
1774 r = null; |
378 /** |
1775 q.thread = null; |
379 * Tries to unlink a timed-out or interrupted wait node to avoid |
1776 postComplete(); |
380 * accumulating garbage. Internal nodes are simply unspliced |
1777 return r; |
381 * without CAS since it is harmless if they are traversed anyway |
1778 } |
382 * by releasers. To avoid effects of unsplicing from already |
1779 |
383 * removed nodes, the list is retraversed in case of an apparent |
1780 /* ------------- public methods -------------- */ |
384 * race. This is slow when there are a lot of nodes, but we don't |
|
385 * expect lists to be long enough to outweigh higher-overhead |
|
386 * schemes. |
|
387 */ |
|
388 private void removeWaiter(WaitNode node) { |
|
389 if (node != null) { |
|
390 node.thread = null; |
|
391 retry: |
|
392 for (;;) { // restart on removeWaiter race |
|
393 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { |
|
394 s = q.next; |
|
395 if (q.thread != null) |
|
396 pred = q; |
|
397 else if (pred != null) { |
|
398 pred.next = s; |
|
399 if (pred.thread == null) // check for race |
|
400 continue retry; |
|
401 } |
|
402 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) |
|
403 continue retry; |
|
404 } |
|
405 break; |
|
406 } |
|
407 } |
|
408 } |
|
409 |
|
410 /* ------------- Async tasks -------------- */ |
|
411 |
|
412 /** |
|
413 * A marker interface identifying asynchronous tasks produced by |
|
414 * {@code async} methods. This may be useful for monitoring, |
|
415 * debugging, and tracking asynchronous activities. |
|
416 * |
|
417 * @since 1.8 |
|
418 */ |
|
419 public static interface AsynchronousCompletionTask { |
|
420 } |
|
421 |
|
422 /** Base class can act as either FJ or plain Runnable */ |
|
423 @SuppressWarnings("serial") |
|
424 abstract static class Async extends ForkJoinTask<Void> |
|
425 implements Runnable, AsynchronousCompletionTask { |
|
426 public final Void getRawResult() { return null; } |
|
427 public final void setRawResult(Void v) { } |
|
428 public final void run() { exec(); } |
|
429 } |
|
430 |
|
431 /** |
|
432 * Starts the given async task using the given executor, unless |
|
433 * the executor is ForkJoinPool.commonPool and it has been |
|
434 * disabled, in which case starts a new thread. |
|
435 */ |
|
436 static void execAsync(Executor e, Async r) { |
|
437 if (e == ForkJoinPool.commonPool() && |
|
438 ForkJoinPool.getCommonPoolParallelism() <= 1) |
|
439 new Thread(r).start(); |
|
440 else |
|
441 e.execute(r); |
|
442 } |
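
Reviewer note: the dispatch rule in the removed `execAsync` can be tried in isolation. The following is a minimal sketch, not the JDK code: it assumes a plain `Runnable` in place of the internal `Async` task type, and the class `ExecAsyncSketch` and helper `runsTask` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ExecAsyncSketch {
    /** Same dispatch rule as the removed helper, but for a plain Runnable (assumption). */
    static void execAsync(Executor e, Runnable r) {
        if (e == ForkJoinPool.commonPool() &&
            ForkJoinPool.getCommonPoolParallelism() <= 1)
            new Thread(r).start();   // common pool offers no parallelism: use a fresh thread
        else
            e.execute(r);            // otherwise hand the task to the executor
    }

    /** Hypothetical helper: runs one task via execAsync, reports whether it finished in time. */
    static boolean runsTask(Executor e) {
        CountDownLatch done = new CountDownLatch(1);
        execAsync(e, done::countDown);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runsTask(ForkJoinPool.commonPool()));
        System.out.println(runsTask(Runnable::run));   // caller-runs executor
    }
}
```

Either branch should end up running the task; which branch is taken for the common pool depends on the parallelism reported by the running JVM.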
|
443 |
|
444 static final class AsyncRun extends Async { |
|
445 final Runnable fn; |
|
446 final CompletableFuture<Void> dst; |
|
447 AsyncRun(Runnable fn, CompletableFuture<Void> dst) { |
|
448 this.fn = fn; this.dst = dst; |
|
449 } |
|
450 public final boolean exec() { |
|
451 CompletableFuture<Void> d; Throwable ex; |
|
452 if ((d = this.dst) != null && d.result == null) { |
|
453 try { |
|
454 fn.run(); |
|
455 ex = null; |
|
456 } catch (Throwable rex) { |
|
457 ex = rex; |
|
458 } |
|
459 d.internalComplete(null, ex); |
|
460 } |
|
461 return true; |
|
462 } |
|
463 private static final long serialVersionUID = 5232453952276885070L; |
|
464 } |
|
465 |
|
466 static final class AsyncSupply<U> extends Async { |
|
467 final Supplier<U> fn; |
|
468 final CompletableFuture<U> dst; |
|
469 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) { |
|
470 this.fn = fn; this.dst = dst; |
|
471 } |
|
472 public final boolean exec() { |
|
473 CompletableFuture<U> d; U u; Throwable ex; |
|
474 if ((d = this.dst) != null && d.result == null) { |
|
475 try { |
|
476 u = fn.get(); |
|
477 ex = null; |
|
478 } catch (Throwable rex) { |
|
479 ex = rex; |
|
480 u = null; |
|
481 } |
|
482 d.internalComplete(u, ex); |
|
483 } |
|
484 return true; |
|
485 } |
|
486 private static final long serialVersionUID = 5232453952276885070L; |
|
487 } |
|
488 |
|
489 static final class AsyncApply<T,U> extends Async { |
|
490 final T arg; |
|
491 final Function<? super T,? extends U> fn; |
|
492 final CompletableFuture<U> dst; |
|
493 AsyncApply(T arg, Function<? super T,? extends U> fn, |
|
494 CompletableFuture<U> dst) { |
|
495 this.arg = arg; this.fn = fn; this.dst = dst; |
|
496 } |
|
497 public final boolean exec() { |
|
498 CompletableFuture<U> d; U u; Throwable ex; |
|
499 if ((d = this.dst) != null && d.result == null) { |
|
500 try { |
|
501 u = fn.apply(arg); |
|
502 ex = null; |
|
503 } catch (Throwable rex) { |
|
504 ex = rex; |
|
505 u = null; |
|
506 } |
|
507 d.internalComplete(u, ex); |
|
508 } |
|
509 return true; |
|
510 } |
|
511 private static final long serialVersionUID = 5232453952276885070L; |
|
512 } |
|
513 |
|
514 static final class AsyncCombine<T,U,V> extends Async { |
|
515 final T arg1; |
|
516 final U arg2; |
|
517 final BiFunction<? super T,? super U,? extends V> fn; |
|
518 final CompletableFuture<V> dst; |
|
519 AsyncCombine(T arg1, U arg2, |
|
520 BiFunction<? super T,? super U,? extends V> fn, |
|
521 CompletableFuture<V> dst) { |
|
522 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; |
|
523 } |
|
524 public final boolean exec() { |
|
525 CompletableFuture<V> d; V v; Throwable ex; |
|
526 if ((d = this.dst) != null && d.result == null) { |
|
527 try { |
|
528 v = fn.apply(arg1, arg2); |
|
529 ex = null; |
|
530 } catch (Throwable rex) { |
|
531 ex = rex; |
|
532 v = null; |
|
533 } |
|
534 d.internalComplete(v, ex); |
|
535 } |
|
536 return true; |
|
537 } |
|
538 private static final long serialVersionUID = 5232453952276885070L; |
|
539 } |
|
540 |
|
541 static final class AsyncAccept<T> extends Async { |
|
542 final T arg; |
|
543 final Consumer<? super T> fn; |
|
544 final CompletableFuture<?> dst; |
|
545 AsyncAccept(T arg, Consumer<? super T> fn, |
|
546 CompletableFuture<?> dst) { |
|
547 this.arg = arg; this.fn = fn; this.dst = dst; |
|
548 } |
|
549 public final boolean exec() { |
|
550 CompletableFuture<?> d; Throwable ex; |
|
551 if ((d = this.dst) != null && d.result == null) { |
|
552 try { |
|
553 fn.accept(arg); |
|
554 ex = null; |
|
555 } catch (Throwable rex) { |
|
556 ex = rex; |
|
557 } |
|
558 d.internalComplete(null, ex); |
|
559 } |
|
560 return true; |
|
561 } |
|
562 private static final long serialVersionUID = 5232453952276885070L; |
|
563 } |
|
564 |
|
565 static final class AsyncAcceptBoth<T,U> extends Async { |
|
566 final T arg1; |
|
567 final U arg2; |
|
568 final BiConsumer<? super T,? super U> fn; |
|
569 final CompletableFuture<?> dst; |
|
570 AsyncAcceptBoth(T arg1, U arg2, |
|
571 BiConsumer<? super T,? super U> fn, |
|
572 CompletableFuture<?> dst) { |
|
573 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; |
|
574 } |
|
575 public final boolean exec() { |
|
576 CompletableFuture<?> d; Throwable ex; |
|
577 if ((d = this.dst) != null && d.result == null) { |
|
578 try { |
|
579 fn.accept(arg1, arg2); |
|
580 ex = null; |
|
581 } catch (Throwable rex) { |
|
582 ex = rex; |
|
583 } |
|
584 d.internalComplete(null, ex); |
|
585 } |
|
586 return true; |
|
587 } |
|
588 private static final long serialVersionUID = 5232453952276885070L; |
|
589 } |
|
590 |
|
591 static final class AsyncCompose<T,U> extends Async { |
|
592 final T arg; |
|
593 final Function<? super T, ? extends CompletionStage<U>> fn; |
|
594 final CompletableFuture<U> dst; |
|
595 AsyncCompose(T arg, |
|
596 Function<? super T, ? extends CompletionStage<U>> fn, |
|
597 CompletableFuture<U> dst) { |
|
598 this.arg = arg; this.fn = fn; this.dst = dst; |
|
599 } |
|
600 public final boolean exec() { |
|
601 CompletableFuture<U> d, fr; U u; Throwable ex; |
|
602 if ((d = this.dst) != null && d.result == null) { |
|
603 try { |
|
604 CompletionStage<U> cs = fn.apply(arg); |
|
605 fr = (cs == null) ? null : cs.toCompletableFuture(); |
|
606 ex = (fr == null) ? new NullPointerException() : null; |
|
607 } catch (Throwable rex) { |
|
608 ex = rex; |
|
609 fr = null; |
|
610 } |
|
611 if (ex != null) |
|
612 u = null; |
|
613 else { |
|
614 Object r = fr.result; |
|
615 if (r == null) |
|
616 r = fr.waitingGet(false); |
|
617 if (r instanceof AltResult) { |
|
618 ex = ((AltResult)r).ex; |
|
619 u = null; |
|
620 } |
|
621 else { |
|
622 @SuppressWarnings("unchecked") U ur = (U) r; |
|
623 u = ur; |
|
624 } |
|
625 } |
|
626 d.internalComplete(u, ex); |
|
627 } |
|
628 return true; |
|
629 } |
|
630 private static final long serialVersionUID = 5232453952276885070L; |
|
631 } |
|
632 |
|
633 static final class AsyncWhenComplete<T> extends Async { |
|
634 final T arg1; |
|
635 final Throwable arg2; |
|
636 final BiConsumer<? super T,? super Throwable> fn; |
|
637 final CompletableFuture<T> dst; |
|
638 AsyncWhenComplete(T arg1, Throwable arg2, |
|
639 BiConsumer<? super T,? super Throwable> fn, |
|
640 CompletableFuture<T> dst) { |
|
641 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; |
|
642 } |
|
643 public final boolean exec() { |
|
644 CompletableFuture<T> d; |
|
645 if ((d = this.dst) != null && d.result == null) { |
|
646 Throwable ex = arg2; |
|
647 try { |
|
648 fn.accept(arg1, ex); |
|
649 } catch (Throwable rex) { |
|
650 if (ex == null) |
|
651 ex = rex; |
|
652 } |
|
653 d.internalComplete(arg1, ex); |
|
654 } |
|
655 return true; |
|
656 } |
|
657 private static final long serialVersionUID = 5232453952276885070L; |
|
658 } |
|
659 |
|
660 /* ------------- Completions -------------- */ |
|
661 |
|
662 /** |
|
663 * Simple linked list nodes to record completions, used in |
|
664 * basically the same way as WaitNodes. (We separate nodes from |
|
665 * the Completions themselves mainly because for the And and Or |
|
666 * methods, the same Completion object resides in two lists.) |
|
667 */ |
|
668 static final class CompletionNode { |
|
669 final Completion completion; |
|
670 volatile CompletionNode next; |
|
671 CompletionNode(Completion completion) { this.completion = completion; } |
|
672 } |
|
673 |
|
674 // Opportunistically subclass AtomicInteger to use compareAndSet to claim. |
|
675 @SuppressWarnings("serial") |
|
676 abstract static class Completion extends AtomicInteger implements Runnable { |
|
677 } |
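
Reviewer note: the comment on the removed `Completion` class says it subclasses `AtomicInteger` so that `compareAndSet` can be used to claim the action. A minimal sketch of that claiming idiom follows; `ClaimSketch`, `OneShot`, and `runsAfterRace` are hypothetical names, and the body is an illustration of the idiom rather than the removed implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimSketch {
    /** Hypothetical one-shot action: the inherited int value doubles as the claim flag. */
    @SuppressWarnings("serial")
    static final class OneShot extends AtomicInteger implements Runnable {
        final AtomicInteger runs = new AtomicInteger();   // counts how often the body ran
        public void run() {
            if (compareAndSet(0, 1))      // only the first caller wins the claim
                runs.incrementAndGet();
        }
    }

    /** Lets several threads race to run the same action; returns how often the body ran. */
    static int runsAfterRace(int threads) {
        OneShot action = new OneShot();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(action);
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        return action.runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsAfterRace(8));   // prints 1
    }
}
```

The same guard appears as `compareAndSet(0, 1)` at the end of each `run()` condition in the removed completion classes, which is why a completion that sits in two lists still fires only once.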
|
678 |
|
679 static final class ThenApply<T,U> extends Completion { |
|
680 final CompletableFuture<? extends T> src; |
|
681 final Function<? super T,? extends U> fn; |
|
682 final CompletableFuture<U> dst; |
|
683 final Executor executor; |
|
684 ThenApply(CompletableFuture<? extends T> src, |
|
685 Function<? super T,? extends U> fn, |
|
686 CompletableFuture<U> dst, |
|
687 Executor executor) { |
|
688 this.src = src; this.fn = fn; this.dst = dst; |
|
689 this.executor = executor; |
|
690 } |
|
691 public final void run() { |
|
692 final CompletableFuture<? extends T> a; |
|
693 final Function<? super T,? extends U> fn; |
|
694 final CompletableFuture<U> dst; |
|
695 Object r; T t; Throwable ex; |
|
696 if ((dst = this.dst) != null && |
|
697 (fn = this.fn) != null && |
|
698 (a = this.src) != null && |
|
699 (r = a.result) != null && |
|
700 compareAndSet(0, 1)) { |
|
701 if (r instanceof AltResult) { |
|
702 ex = ((AltResult)r).ex; |
|
703 t = null; |
|
704 } |
|
705 else { |
|
706 ex = null; |
|
707 @SuppressWarnings("unchecked") T tr = (T) r; |
|
708 t = tr; |
|
709 } |
|
710 Executor e = executor; |
|
711 U u = null; |
|
712 if (ex == null) { |
|
713 try { |
|
714 if (e != null) |
|
715 execAsync(e, new AsyncApply<T,U>(t, fn, dst)); |
|
716 else |
|
717 u = fn.apply(t); |
|
718 } catch (Throwable rex) { |
|
719 ex = rex; |
|
720 } |
|
721 } |
|
722 if (e == null || ex != null) |
|
723 dst.internalComplete(u, ex); |
|
724 } |
|
725 } |
|
726 private static final long serialVersionUID = 5232453952276885070L; |
|
727 } |
|
728 |
|
729 static final class ThenAccept<T> extends Completion { |
|
730 final CompletableFuture<? extends T> src; |
|
731 final Consumer<? super T> fn; |
|
732 final CompletableFuture<?> dst; |
|
733 final Executor executor; |
|
734 ThenAccept(CompletableFuture<? extends T> src, |
|
735 Consumer<? super T> fn, |
|
736 CompletableFuture<?> dst, |
|
737 Executor executor) { |
|
738 this.src = src; this.fn = fn; this.dst = dst; |
|
739 this.executor = executor; |
|
740 } |
|
741 public final void run() { |
|
742 final CompletableFuture<? extends T> a; |
|
743 final Consumer<? super T> fn; |
|
744 final CompletableFuture<?> dst; |
|
745 Object r; T t; Throwable ex; |
|
746 if ((dst = this.dst) != null && |
|
747 (fn = this.fn) != null && |
|
748 (a = this.src) != null && |
|
749 (r = a.result) != null && |
|
750 compareAndSet(0, 1)) { |
|
751 if (r instanceof AltResult) { |
|
752 ex = ((AltResult)r).ex; |
|
753 t = null; |
|
754 } |
|
755 else { |
|
756 ex = null; |
|
757 @SuppressWarnings("unchecked") T tr = (T) r; |
|
758 t = tr; |
|
759 } |
|
760 Executor e = executor; |
|
761 if (ex == null) { |
|
762 try { |
|
763 if (e != null) |
|
764 execAsync(e, new AsyncAccept<T>(t, fn, dst)); |
|
765 else |
|
766 fn.accept(t); |
|
767 } catch (Throwable rex) { |
|
768 ex = rex; |
|
769 } |
|
770 } |
|
771 if (e == null || ex != null) |
|
772 dst.internalComplete(null, ex); |
|
773 } |
|
774 } |
|
775 private static final long serialVersionUID = 5232453952276885070L; |
|
776 } |
|
777 |
|
778 static final class ThenRun extends Completion { |
|
779 final CompletableFuture<?> src; |
|
780 final Runnable fn; |
|
781 final CompletableFuture<Void> dst; |
|
782 final Executor executor; |
|
783 ThenRun(CompletableFuture<?> src, |
|
784 Runnable fn, |
|
785 CompletableFuture<Void> dst, |
|
786 Executor executor) { |
|
787 this.src = src; this.fn = fn; this.dst = dst; |
|
788 this.executor = executor; |
|
789 } |
|
790 public final void run() { |
|
791 final CompletableFuture<?> a; |
|
792 final Runnable fn; |
|
793 final CompletableFuture<Void> dst; |
|
794 Object r; Throwable ex; |
|
795 if ((dst = this.dst) != null && |
|
796 (fn = this.fn) != null && |
|
797 (a = this.src) != null && |
|
798 (r = a.result) != null && |
|
799 compareAndSet(0, 1)) { |
|
800 if (r instanceof AltResult) |
|
801 ex = ((AltResult)r).ex; |
|
802 else |
|
803 ex = null; |
|
804 Executor e = executor; |
|
805 if (ex == null) { |
|
806 try { |
|
807 if (e != null) |
|
808 execAsync(e, new AsyncRun(fn, dst)); |
|
809 else |
|
810 fn.run(); |
|
811 } catch (Throwable rex) { |
|
812 ex = rex; |
|
813 } |
|
814 } |
|
815 if (e == null || ex != null) |
|
816 dst.internalComplete(null, ex); |
|
817 } |
|
818 } |
|
819 private static final long serialVersionUID = 5232453952276885070L; |
|
820 } |
|
821 |
|
822 static final class ThenCombine<T,U,V> extends Completion { |
|
823 final CompletableFuture<? extends T> src; |
|
824 final CompletableFuture<? extends U> snd; |
|
825 final BiFunction<? super T,? super U,? extends V> fn; |
|
826 final CompletableFuture<V> dst; |
|
827 final Executor executor; |
|
828 ThenCombine(CompletableFuture<? extends T> src, |
|
829 CompletableFuture<? extends U> snd, |
|
830 BiFunction<? super T,? super U,? extends V> fn, |
|
831 CompletableFuture<V> dst, |
|
832 Executor executor) { |
|
833 this.src = src; this.snd = snd; |
|
834 this.fn = fn; this.dst = dst; |
|
835 this.executor = executor; |
|
836 } |
|
837 public final void run() { |
|
838 final CompletableFuture<? extends T> a; |
|
839 final CompletableFuture<? extends U> b; |
|
840 final BiFunction<? super T,? super U,? extends V> fn; |
|
841 final CompletableFuture<V> dst; |
|
842 Object r, s; T t; U u; Throwable ex; |
|
843 if ((dst = this.dst) != null && |
|
844 (fn = this.fn) != null && |
|
845 (a = this.src) != null && |
|
846 (r = a.result) != null && |
|
847 (b = this.snd) != null && |
|
848 (s = b.result) != null && |
|
849 compareAndSet(0, 1)) { |
|
850 if (r instanceof AltResult) { |
|
851 ex = ((AltResult)r).ex; |
|
852 t = null; |
|
853 } |
|
854 else { |
|
855 ex = null; |
|
856 @SuppressWarnings("unchecked") T tr = (T) r; |
|
857 t = tr; |
|
858 } |
|
859 if (ex != null) |
|
860 u = null; |
|
861 else if (s instanceof AltResult) { |
|
862 ex = ((AltResult)s).ex; |
|
863 u = null; |
|
864 } |
|
865 else { |
|
866 @SuppressWarnings("unchecked") U us = (U) s; |
|
867 u = us; |
|
868 } |
|
869 Executor e = executor; |
|
870 V v = null; |
|
871 if (ex == null) { |
|
872 try { |
|
873 if (e != null) |
|
874 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); |
|
875 else |
|
876 v = fn.apply(t, u); |
|
877 } catch (Throwable rex) { |
|
878 ex = rex; |
|
879 } |
|
880 } |
|
881 if (e == null || ex != null) |
|
882 dst.internalComplete(v, ex); |
|
883 } |
|
884 } |
|
885 private static final long serialVersionUID = 5232453952276885070L; |
|
886 } |
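
Reviewer note: the removed `ThenCombine.run()` fires only once both `src` and `snd` have a non-null result, and it skips the function when either source carries an exception. The sketch below observes that behavior through the public `thenCombine` method only; `CombineSketch` and its helper methods are hypothetical names used for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CombineSketch {
    /** The dependent stays incomplete until both sources have completed. */
    static boolean waitsForBoth() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum);
        a.complete(2);
        boolean pendingAfterFirst = !sum.isDone();
        b.complete(3);
        return pendingAfterFirst && sum.join() == 5;
    }

    /** An exception in either source completes the dependent exceptionally. */
    static boolean propagatesFailure() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum);
        a.completeExceptionally(new IllegalStateException("boom"));
        b.complete(3);
        return sum.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(waitsForBoth());        // prints true
        System.out.println(propagatesFailure());   // prints true
    }
}
```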
|
887 |
|
888 static final class ThenAcceptBoth<T,U> extends Completion { |
|
889 final CompletableFuture<? extends T> src; |
|
890 final CompletableFuture<? extends U> snd; |
|
891 final BiConsumer<? super T,? super U> fn; |
|
892 final CompletableFuture<Void> dst; |
|
893 final Executor executor; |
|
894 ThenAcceptBoth(CompletableFuture<? extends T> src, |
|
895 CompletableFuture<? extends U> snd, |
|
896 BiConsumer<? super T,? super U> fn, |
|
897 CompletableFuture<Void> dst, |
|
898 Executor executor) { |
|
899 this.src = src; this.snd = snd; |
|
900 this.fn = fn; this.dst = dst; |
|
901 this.executor = executor; |
|
902 } |
|
903 public final void run() { |
|
904 final CompletableFuture<? extends T> a; |
|
905 final CompletableFuture<? extends U> b; |
|
906 final BiConsumer<? super T,? super U> fn; |
|
907 final CompletableFuture<Void> dst; |
|
908 Object r, s; T t; U u; Throwable ex; |
|
909 if ((dst = this.dst) != null && |
|
910 (fn = this.fn) != null && |
|
911 (a = this.src) != null && |
|
912 (r = a.result) != null && |
|
913 (b = this.snd) != null && |
|
914 (s = b.result) != null && |
|
915 compareAndSet(0, 1)) { |
|
916 if (r instanceof AltResult) { |
|
917 ex = ((AltResult)r).ex; |
|
918 t = null; |
|
919 } |
|
920 else { |
|
921 ex = null; |
|
922 @SuppressWarnings("unchecked") T tr = (T) r; |
|
923 t = tr; |
|
924 } |
|
925 if (ex != null) |
|
926 u = null; |
|
927 else if (s instanceof AltResult) { |
|
928 ex = ((AltResult)s).ex; |
|
929 u = null; |
|
930 } |
|
931 else { |
|
932 @SuppressWarnings("unchecked") U us = (U) s; |
|
933 u = us; |
|
934 } |
|
935 Executor e = executor; |
|
936 if (ex == null) { |
|
937 try { |
|
938 if (e != null) |
|
939 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); |
|
940 else |
|
941 fn.accept(t, u); |
|
942 } catch (Throwable rex) { |
|
943 ex = rex; |
|
944 } |
|
945 } |
|
946 if (e == null || ex != null) |
|
947 dst.internalComplete(null, ex); |
|
948 } |
|
949 } |
|
950 private static final long serialVersionUID = 5232453952276885070L; |
|
951 } |
|
952 |
|
953 static final class RunAfterBoth extends Completion { |
|
954 final CompletableFuture<?> src; |
|
955 final CompletableFuture<?> snd; |
|
956 final Runnable fn; |
|
957 final CompletableFuture<Void> dst; |
|
958 final Executor executor; |
|
959 RunAfterBoth(CompletableFuture<?> src, |
|
960 CompletableFuture<?> snd, |
|
961 Runnable fn, |
|
962 CompletableFuture<Void> dst, |
|
963 Executor executor) { |
|
964 this.src = src; this.snd = snd; |
|
965 this.fn = fn; this.dst = dst; |
|
966 this.executor = executor; |
|
967 } |
|
968 public final void run() { |
|
969 final CompletableFuture<?> a; |
|
970 final CompletableFuture<?> b; |
|
971 final Runnable fn; |
|
972 final CompletableFuture<Void> dst; |
|
973 Object r, s; Throwable ex; |
|
974 if ((dst = this.dst) != null && |
|
975 (fn = this.fn) != null && |
|
976 (a = this.src) != null && |
|
977 (r = a.result) != null && |
|
978 (b = this.snd) != null && |
|
979 (s = b.result) != null && |
|
980 compareAndSet(0, 1)) { |
|
981 if (r instanceof AltResult) |
|
982 ex = ((AltResult)r).ex; |
|
983 else |
|
984 ex = null; |
|
985 if (ex == null && (s instanceof AltResult)) |
|
986 ex = ((AltResult)s).ex; |
|
987 Executor e = executor; |
|
988 if (ex == null) { |
|
989 try { |
|
990 if (e != null) |
|
991 execAsync(e, new AsyncRun(fn, dst)); |
|
992 else |
|
993 fn.run(); |
|
994 } catch (Throwable rex) { |
|
995 ex = rex; |
|
996 } |
|
997 } |
|
998 if (e == null || ex != null) |
|
999 dst.internalComplete(null, ex); |
|
1000 } |
|
1001 } |
|
1002 private static final long serialVersionUID = 5232453952276885070L; |
|
1003 } |
|
1004 |
|
1005 static final class AndCompletion extends Completion { |
|
1006 final CompletableFuture<?> src; |
|
1007 final CompletableFuture<?> snd; |
|
1008 final CompletableFuture<Void> dst; |
|
1009 AndCompletion(CompletableFuture<?> src, |
|
1010 CompletableFuture<?> snd, |
|
1011 CompletableFuture<Void> dst) { |
|
1012 this.src = src; this.snd = snd; this.dst = dst; |
|
1013 } |
|
1014 public final void run() { |
|
1015 final CompletableFuture<?> a; |
|
1016 final CompletableFuture<?> b; |
|
1017 final CompletableFuture<Void> dst; |
|
1018 Object r, s; Throwable ex; |
|
1019 if ((dst = this.dst) != null && |
|
1020 (a = this.src) != null && |
|
1021 (r = a.result) != null && |
|
1022 (b = this.snd) != null && |
|
1023 (s = b.result) != null && |
|
1024 compareAndSet(0, 1)) { |
|
1025 if (r instanceof AltResult) |
|
1026 ex = ((AltResult)r).ex; |
|
1027 else |
|
1028 ex = null; |
|
1029 if (ex == null && (s instanceof AltResult)) |
|
1030 ex = ((AltResult)s).ex; |
|
1031 dst.internalComplete(null, ex); |
|
1032 } |
|
1033 } |
|
1034 private static final long serialVersionUID = 5232453952276885070L; |
|
1035 } |
|
1036 |
|
1037 static final class ApplyToEither<T,U> extends Completion { |
|
1038 final CompletableFuture<? extends T> src; |
|
1039 final CompletableFuture<? extends T> snd; |
|
1040 final Function<? super T,? extends U> fn; |
|
1041 final CompletableFuture<U> dst; |
|
1042 final Executor executor; |
|
1043 ApplyToEither(CompletableFuture<? extends T> src, |
|
1044 CompletableFuture<? extends T> snd, |
|
1045 Function<? super T,? extends U> fn, |
|
1046 CompletableFuture<U> dst, |
|
1047 Executor executor) { |
|
1048 this.src = src; this.snd = snd; |
|
1049 this.fn = fn; this.dst = dst; |
|
1050 this.executor = executor; |
|
1051 } |
|
1052 public final void run() { |
|
1053 final CompletableFuture<? extends T> a; |
|
1054 final CompletableFuture<? extends T> b; |
|
1055 final Function<? super T,? extends U> fn; |
|
1056 final CompletableFuture<U> dst; |
|
1057 Object r; T t; Throwable ex; |
|
1058 if ((dst = this.dst) != null && |
|
1059 (fn = this.fn) != null && |
|
1060 (((a = this.src) != null && (r = a.result) != null) || |
|
1061 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1062 compareAndSet(0, 1)) { |
|
1063 if (r instanceof AltResult) { |
|
1064 ex = ((AltResult)r).ex; |
|
1065 t = null; |
|
1066 } |
|
1067 else { |
|
1068 ex = null; |
|
1069 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1070 t = tr; |
|
1071 } |
|
1072 Executor e = executor; |
|
1073 U u = null; |
|
1074 if (ex == null) { |
|
1075 try { |
|
1076 if (e != null) |
|
1077 execAsync(e, new AsyncApply<T,U>(t, fn, dst)); |
|
1078 else |
|
1079 u = fn.apply(t); |
|
1080 } catch (Throwable rex) { |
|
1081 ex = rex; |
|
1082 } |
|
1083 } |
|
1084 if (e == null || ex != null) |
|
1085 dst.internalComplete(u, ex); |
|
1086 } |
|
1087 } |
|
1088 private static final long serialVersionUID = 5232453952276885070L; |
|
1089 } |
|
1090 |
|
1091 static final class AcceptEither<T> extends Completion { |
|
1092 final CompletableFuture<? extends T> src; |
|
1093 final CompletableFuture<? extends T> snd; |
|
1094 final Consumer<? super T> fn; |
|
1095 final CompletableFuture<Void> dst; |
|
1096 final Executor executor; |
|
1097 AcceptEither(CompletableFuture<? extends T> src, |
|
1098 CompletableFuture<? extends T> snd, |
|
1099 Consumer<? super T> fn, |
|
1100 CompletableFuture<Void> dst, |
|
1101 Executor executor) { |
|
1102 this.src = src; this.snd = snd; |
|
1103 this.fn = fn; this.dst = dst; |
|
1104 this.executor = executor; |
|
1105 } |
|
1106 public final void run() { |
|
1107 final CompletableFuture<? extends T> a; |
|
1108 final CompletableFuture<? extends T> b; |
|
1109 final Consumer<? super T> fn; |
|
1110 final CompletableFuture<Void> dst; |
|
1111 Object r; T t; Throwable ex; |
|
1112 if ((dst = this.dst) != null && |
|
1113 (fn = this.fn) != null && |
|
1114 (((a = this.src) != null && (r = a.result) != null) || |
|
1115 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1116 compareAndSet(0, 1)) { |
|
1117 if (r instanceof AltResult) { |
|
1118 ex = ((AltResult)r).ex; |
|
1119 t = null; |
|
1120 } |
|
1121 else { |
|
1122 ex = null; |
|
1123 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1124 t = tr; |
|
1125 } |
|
1126 Executor e = executor; |
|
1127 if (ex == null) { |
|
1128 try { |
|
1129 if (e != null) |
|
1130 execAsync(e, new AsyncAccept<T>(t, fn, dst)); |
|
1131 else |
|
1132 fn.accept(t); |
|
1133 } catch (Throwable rex) { |
|
1134 ex = rex; |
|
1135 } |
|
1136 } |
|
1137 if (e == null || ex != null) |
|
1138 dst.internalComplete(null, ex); |
|
1139 } |
|
1140 } |
|
1141 private static final long serialVersionUID = 5232453952276885070L; |
|
1142 } |
|
1143 |
|
1144 static final class RunAfterEither extends Completion { |
|
1145 final CompletableFuture<?> src; |
|
1146 final CompletableFuture<?> snd; |
|
1147 final Runnable fn; |
|
1148 final CompletableFuture<Void> dst; |
|
1149 final Executor executor; |
|
1150 RunAfterEither(CompletableFuture<?> src, |
|
1151 CompletableFuture<?> snd, |
|
1152 Runnable fn, |
|
1153 CompletableFuture<Void> dst, |
|
1154 Executor executor) { |
|
1155 this.src = src; this.snd = snd; |
|
1156 this.fn = fn; this.dst = dst; |
|
1157 this.executor = executor; |
|
1158 } |
|
1159 public final void run() { |
|
1160 final CompletableFuture<?> a; |
|
1161 final CompletableFuture<?> b; |
|
1162 final Runnable fn; |
|
1163 final CompletableFuture<Void> dst; |
|
1164 Object r; Throwable ex; |
|
1165 if ((dst = this.dst) != null && |
|
1166 (fn = this.fn) != null && |
|
1167 (((a = this.src) != null && (r = a.result) != null) || |
|
1168 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1169 compareAndSet(0, 1)) { |
|
1170 if (r instanceof AltResult) |
|
1171 ex = ((AltResult)r).ex; |
|
1172 else |
|
1173 ex = null; |
|
1174 Executor e = executor; |
|
1175 if (ex == null) { |
|
1176 try { |
|
1177 if (e != null) |
|
1178 execAsync(e, new AsyncRun(fn, dst)); |
|
1179 else |
|
1180 fn.run(); |
|
1181 } catch (Throwable rex) { |
|
1182 ex = rex; |
|
1183 } |
|
1184 } |
|
1185 if (e == null || ex != null) |
|
1186 dst.internalComplete(null, ex); |
|
1187 } |
|
1188 } |
|
1189 private static final long serialVersionUID = 5232453952276885070L; |
|
1190 } |
|
1191 |
|
1192 static final class OrCompletion extends Completion { |
|
1193 final CompletableFuture<?> src; |
|
1194 final CompletableFuture<?> snd; |
|
1195 final CompletableFuture<Object> dst; |
|
1196 OrCompletion(CompletableFuture<?> src, |
|
1197 CompletableFuture<?> snd, |
|
1198 CompletableFuture<Object> dst) { |
|
1199 this.src = src; this.snd = snd; this.dst = dst; |
|
1200 } |
|
1201 public final void run() { |
|
1202 final CompletableFuture<?> a; |
|
1203 final CompletableFuture<?> b; |
|
1204 final CompletableFuture<Object> dst; |
|
1205 Object r, t; Throwable ex; |
|
1206 if ((dst = this.dst) != null && |
|
1207 (((a = this.src) != null && (r = a.result) != null) || |
|
1208 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1209 compareAndSet(0, 1)) { |
|
1210 if (r instanceof AltResult) { |
|
1211 ex = ((AltResult)r).ex; |
|
1212 t = null; |
|
1213 } |
|
1214 else { |
|
1215 ex = null; |
|
1216 t = r; |
|
1217 } |
|
1218 dst.internalComplete(t, ex); |
|
1219 } |
|
1220 } |
|
1221 private static final long serialVersionUID = 5232453952276885070L; |
|
1222 } |
|
1223 |
|
1224 static final class ExceptionCompletion<T> extends Completion { |
|
1225 final CompletableFuture<? extends T> src; |
|
1226 final Function<? super Throwable, ? extends T> fn; |
|
1227 final CompletableFuture<T> dst; |
|
1228 ExceptionCompletion(CompletableFuture<? extends T> src, |
|
1229 Function<? super Throwable, ? extends T> fn, |
|
1230 CompletableFuture<T> dst) { |
|
1231 this.src = src; this.fn = fn; this.dst = dst; |
|
1232 } |
|
1233 public final void run() { |
|
1234 final CompletableFuture<? extends T> a; |
|
1235 final Function<? super Throwable, ? extends T> fn; |
|
1236 final CompletableFuture<T> dst; |
|
1237 Object r; T t = null; Throwable ex, dx = null; |
|
1238 if ((dst = this.dst) != null && |
|
1239 (fn = this.fn) != null && |
|
1240 (a = this.src) != null && |
|
1241 (r = a.result) != null && |
|
1242 compareAndSet(0, 1)) { |
|
1243 if ((r instanceof AltResult) && |
|
1244 (ex = ((AltResult)r).ex) != null) { |
|
1245 try { |
|
1246 t = fn.apply(ex); |
|
1247 } catch (Throwable rex) { |
|
1248 dx = rex; |
|
1249 } |
|
1250 } |
|
1251 else { |
|
1252 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1253 t = tr; |
|
1254 } |
|
1255 dst.internalComplete(t, dx); |
|
1256 } |
|
1257 } |
|
1258 private static final long serialVersionUID = 5232453952276885070L; |
|
1259 } |
|
1260 |
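ExceptionCompletion applies fn only when the source result carries an exception; a normal result is passed through unchanged. The sketch below shows both paths via the public `exceptionally` method. The class and method names are illustrative assumptions:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; ExceptionallySketch and recover are hypothetical names.
final class ExceptionallySketch {
    // Returns the dependent stage's value after the source succeeds or fails.
    static String recover(boolean fail) {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<String> dst =
            src.exceptionally(ex -> "recovered: " + ex.getMessage());
        if (fail)
            src.completeExceptionally(new IllegalStateException("boom"));
        else
            src.complete("ok"); // normal value bypasses the recovery function
        return dst.join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```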
|
1261 static final class WhenCompleteCompletion<T> extends Completion { |
|
1262 final CompletableFuture<? extends T> src; |
|
1263 final BiConsumer<? super T, ? super Throwable> fn; |
|
1264 final CompletableFuture<T> dst; |
|
1265 final Executor executor; |
|
1266 WhenCompleteCompletion(CompletableFuture<? extends T> src, |
|
1267 BiConsumer<? super T, ? super Throwable> fn, |
|
1268 CompletableFuture<T> dst, |
|
1269 Executor executor) { |
|
1270 this.src = src; this.fn = fn; this.dst = dst; |
|
1271 this.executor = executor; |
|
1272 } |
|
1273 public final void run() { |
|
1274 final CompletableFuture<? extends T> a; |
|
1275 final BiConsumer<? super T, ? super Throwable> fn; |
|
1276 final CompletableFuture<T> dst; |
|
1277 Object r; T t; Throwable ex; |
|
1278 if ((dst = this.dst) != null && |
|
1279 (fn = this.fn) != null && |
|
1280 (a = this.src) != null && |
|
1281 (r = a.result) != null && |
|
1282 compareAndSet(0, 1)) { |
|
1283 if (r instanceof AltResult) { |
|
1284 ex = ((AltResult)r).ex; |
|
1285 t = null; |
|
1286 } |
|
1287 else { |
|
1288 ex = null; |
|
1289 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1290 t = tr; |
|
1291 } |
|
1292 Executor e = executor; |
|
1293 Throwable dx = null; |
|
1294 try { |
|
1295 if (e != null) |
|
1296 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); |
|
1297 else |
|
1298 fn.accept(t, ex); |
|
1299 } catch (Throwable rex) { |
|
1300 dx = rex; |
|
1301 } |
|
1302 if (e == null || dx != null) |
|
1303 dst.internalComplete(t, ex != null ? ex : dx); |
|
1304 } |
|
1305 } |
|
1306 private static final long serialVersionUID = 5232453952276885070L; |
|
1307 } |
|
1308 |
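WhenCompleteCompletion passes the source outcome through to dst, and the expression `ex != null ? ex : dx` means a source exception takes precedence over one thrown by the action. A hedged sketch of both properties using the public `whenComplete` method follows; the names are illustrative only:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative sketch; WhenCompleteSketch and its methods are hypothetical names.
final class WhenCompleteSketch {
    // The action observes (value, exception); the stage still yields the value.
    static String passThrough(StringBuilder log) {
        CompletableFuture<String> src = CompletableFuture.completedFuture("v");
        return src.whenComplete((value, ex) -> log.append(value).append('/').append(ex))
                  .join();
    }

    // When both the source and the action fail, the source's exception is reported.
    static Throwable failureCause() {
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new IllegalStateException("source failed"));
        try {
            src.whenComplete((v, ex) -> { throw new RuntimeException("action failed"); })
               .join();
            return null;
        } catch (CompletionException ce) {
            return ce.getCause();
        }
    }
}
```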
|
1309 static final class ThenCopy<T> extends Completion { |
|
1310 final CompletableFuture<?> src; |
|
1311 final CompletableFuture<T> dst; |
|
1312 ThenCopy(CompletableFuture<?> src, |
|
1313 CompletableFuture<T> dst) { |
|
1314 this.src = src; this.dst = dst; |
|
1315 } |
|
1316 public final void run() { |
|
1317 final CompletableFuture<?> a; |
|
1318 final CompletableFuture<T> dst; |
|
1319 Object r; T t; Throwable ex; |
|
1320 if ((dst = this.dst) != null && |
|
1321 (a = this.src) != null && |
|
1322 (r = a.result) != null && |
|
1323 compareAndSet(0, 1)) { |
|
1324 if (r instanceof AltResult) { |
|
1325 ex = ((AltResult)r).ex; |
|
1326 t = null; |
|
1327 } |
|
1328 else { |
|
1329 ex = null; |
|
1330 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1331 t = tr; |
|
1332 } |
|
1333 dst.internalComplete(t, ex); |
|
1334 } |
|
1335 } |
|
1336 private static final long serialVersionUID = 5232453952276885070L; |
|
1337 } |
|
1338 |
|
1339 // Variant of ThenCopy for a CompletableFuture<Void> dst: propagates completion and any exception, never a value |
|
1340 static final class ThenPropagate extends Completion { |
|
1341 final CompletableFuture<?> src; |
|
1342 final CompletableFuture<Void> dst; |
|
1343 ThenPropagate(CompletableFuture<?> src, |
|
1344 CompletableFuture<Void> dst) { |
|
1345 this.src = src; this.dst = dst; |
|
1346 } |
|
1347 public final void run() { |
|
1348 final CompletableFuture<?> a; |
|
1349 final CompletableFuture<Void> dst; |
|
1350 Object r; Throwable ex; |
|
1351 if ((dst = this.dst) != null && |
|
1352 (a = this.src) != null && |
|
1353 (r = a.result) != null && |
|
1354 compareAndSet(0, 1)) { |
|
1355 if (r instanceof AltResult) |
|
1356 ex = ((AltResult)r).ex; |
|
1357 else |
|
1358 ex = null; |
|
1359 dst.internalComplete(null, ex); |
|
1360 } |
|
1361 } |
|
1362 private static final long serialVersionUID = 5232453952276885070L; |
|
1363 } |
|
1364 |
|
1365 static final class HandleCompletion<T,U> extends Completion { |
|
1366 final CompletableFuture<? extends T> src; |
|
1367 final BiFunction<? super T, Throwable, ? extends U> fn; |
|
1368 final CompletableFuture<U> dst; |
|
1369 final Executor executor; |
|
1370 HandleCompletion(CompletableFuture<? extends T> src, |
|
1371 BiFunction<? super T, Throwable, ? extends U> fn, |
|
1372 CompletableFuture<U> dst, |
|
1373 Executor executor) { |
|
1374 this.src = src; this.fn = fn; this.dst = dst; |
|
1375 this.executor = executor; |
|
1376 } |
|
1377 public final void run() { |
|
1378 final CompletableFuture<? extends T> a; |
|
1379 final BiFunction<? super T, Throwable, ? extends U> fn; |
|
1380 final CompletableFuture<U> dst; |
|
1381 Object r; T t; Throwable ex; |
|
1382 if ((dst = this.dst) != null && |
|
1383 (fn = this.fn) != null && |
|
1384 (a = this.src) != null && |
|
1385 (r = a.result) != null && |
|
1386 compareAndSet(0, 1)) { |
|
1387 if (r instanceof AltResult) { |
|
1388 ex = ((AltResult)r).ex; |
|
1389 t = null; |
|
1390 } |
|
1391 else { |
|
1392 ex = null; |
|
1393 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1394 t = tr; |
|
1395 } |
|
1396 Executor e = executor; |
|
1397 U u = null; |
|
1398 Throwable dx = null; |
|
1399 try { |
|
1400 if (e != null) |
|
1401 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); |
|
1402 else |
|
1403 u = fn.apply(t, ex); |
|
1404 } catch (Throwable rex) { |
|
1405 dx = rex; |
|
1406 } |
|
1407 if (e == null || dx != null) |
|
1408 dst.internalComplete(u, dx); |
|
1409 } |
|
1410 } |
|
1411 private static final long serialVersionUID = 5232453952276885070L; |
|
1412 } |
|
1413 |
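Unlike the function-applying completions, HandleCompletion invokes fn for both outcomes, passing `(t, null)` on success and `(null, ex)` on failure. A small sketch with the public `handle` method, where `HandleSketch` and `describe` are names made up for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; HandleSketch and describe are hypothetical names.
final class HandleSketch {
    // Maps either outcome of the source to a description string.
    static String describe(boolean fail) {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        if (fail)
            src.completeExceptionally(new IllegalArgumentException("bad"));
        else
            src.complete(7);
        return src.handle((value, ex) ->
                ex == null ? "value " + value : "error " + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(false));
        System.out.println(describe(true));
    }
}
```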
|
1414 static final class ThenCompose<T,U> extends Completion { |
|
1415 final CompletableFuture<? extends T> src; |
|
1416 final Function<? super T, ? extends CompletionStage<U>> fn; |
|
1417 final CompletableFuture<U> dst; |
|
1418 final Executor executor; |
|
1419 ThenCompose(CompletableFuture<? extends T> src, |
|
1420 Function<? super T, ? extends CompletionStage<U>> fn, |
|
1421 CompletableFuture<U> dst, |
|
1422 Executor executor) { |
|
1423 this.src = src; this.fn = fn; this.dst = dst; |
|
1424 this.executor = executor; |
|
1425 } |
|
1426 public final void run() { |
|
1427 final CompletableFuture<? extends T> a; |
|
1428 final Function<? super T, ? extends CompletionStage<U>> fn; |
|
1429 final CompletableFuture<U> dst; |
|
1430 Object r; T t; Throwable ex; Executor e; |
|
1431 if ((dst = this.dst) != null && |
|
1432 (fn = this.fn) != null && |
|
1433 (a = this.src) != null && |
|
1434 (r = a.result) != null && |
|
1435 compareAndSet(0, 1)) { |
|
1436 if (r instanceof AltResult) { |
|
1437 ex = ((AltResult)r).ex; |
|
1438 t = null; |
|
1439 } |
|
1440 else { |
|
1441 ex = null; |
|
1442 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1443 t = tr; |
|
1444 } |
|
1445 CompletableFuture<U> c = null; |
|
1446 U u = null; |
|
1447 boolean complete = false; |
|
1448 if (ex == null) { |
|
1449 if ((e = executor) != null) |
|
1450 execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); |
|
1451 else { |
|
1452 try { |
|
1453 CompletionStage<U> cs = fn.apply(t); |
|
1454 c = (cs == null) ? null : cs.toCompletableFuture(); |
|
1455 if (c == null) |
|
1456 ex = new NullPointerException(); |
|
1457 } catch (Throwable rex) { |
|
1458 ex = rex; |
|
1459 } |
|
1460 } |
|
1461 } |
|
1462 if (c != null) { |
|
1463 ThenCopy<U> d = null; |
|
1464 Object s; |
|
1465 if ((s = c.result) == null) { |
|
1466 CompletionNode p = new CompletionNode |
|
1467 (d = new ThenCopy<U>(c, dst)); |
|
1468 while ((s = c.result) == null) { |
|
1469 if (UNSAFE.compareAndSwapObject |
|
1470 (c, COMPLETIONS, p.next = c.completions, p)) |
|
1471 break; |
|
1472 } |
|
1473 } |
|
1474 if (s != null && (d == null || d.compareAndSet(0, 1))) { |
|
1475 complete = true; |
|
1476 if (s instanceof AltResult) { |
|
1477 ex = ((AltResult)s).ex; // no rewrap |
|
1478 u = null; |
|
1479 } |
|
1480 else { |
|
1481 @SuppressWarnings("unchecked") U us = (U) s; |
|
1482 u = us; |
|
1483 } |
|
1484 } |
|
1485 } |
|
1486 if (complete || ex != null) |
|
1487 dst.internalComplete(u, ex); |
|
1488 if (c != null) |
|
1489 c.helpPostComplete(); |
|
1490 } |
|
1491 } |
|
1492 private static final long serialVersionUID = 5232453952276885070L; |
|
1493 } |
|
1494 |
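ThenCompose runs fn to obtain an inner stage and, if that stage is still pending, links it to dst with a ThenCopy so dst completes when the inner stage does. The sketch below illustrates that flattening through the public `thenCompose` method; the class and method names are assumptions for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; ThenComposeSketch and composed are hypothetical names.
final class ThenComposeSketch {
    static int composed() {
        CompletableFuture<Integer> outer = CompletableFuture.completedFuture(20);
        CompletableFuture<Integer> inner = new CompletableFuture<>();
        // fn returns a stage that is still pending, so dst must wait for it.
        CompletableFuture<Integer> dst =
            outer.thenCompose(x -> inner.thenApply(y -> x + y));
        if (dst.isDone())
            throw new IllegalStateException("dst should still be pending");
        inner.complete(22); // completing the inner stage completes dst
        return dst.join();
    }

    public static void main(String[] args) {
        System.out.println(composed());
    }
}
```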
|
1495 // Implementations of stage methods with (plain, async, Executor) forms |
|
1496 |
|
1497 private <U> CompletableFuture<U> doThenApply |
|
1498 (Function<? super T,? extends U> fn, |
|
1499 Executor e) { |
|
1500 if (fn == null) throw new NullPointerException(); |
|
1501 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
1502 ThenApply<T,U> d = null; |
|
1503 Object r; |
|
1504 if ((r = result) == null) { |
|
1505 CompletionNode p = new CompletionNode |
|
1506 (d = new ThenApply<T,U>(this, fn, dst, e)); |
|
1507 while ((r = result) == null) { |
|
1508 if (UNSAFE.compareAndSwapObject |
|
1509 (this, COMPLETIONS, p.next = completions, p)) |
|
1510 break; |
|
1511 } |
|
1512 } |
|
1513 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1514 T t; Throwable ex; |
|
1515 if (r instanceof AltResult) { |
|
1516 ex = ((AltResult)r).ex; |
|
1517 t = null; |
|
1518 } |
|
1519 else { |
|
1520 ex = null; |
|
1521 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1522 t = tr; |
|
1523 } |
|
1524 U u = null; |
|
1525 if (ex == null) { |
|
1526 try { |
|
1527 if (e != null) |
|
1528 execAsync(e, new AsyncApply<T,U>(t, fn, dst)); |
|
1529 else |
|
1530 u = fn.apply(t); |
|
1531 } catch (Throwable rex) { |
|
1532 ex = rex; |
|
1533 } |
|
1534 } |
|
1535 if (e == null || ex != null) |
|
1536 dst.internalComplete(u, ex); |
|
1537 } |
|
1538 helpPostComplete(); |
|
1539 return dst; |
|
1540 } |
|
1541 |
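In doThenApply the function is applied only when `ex == null`; a source exception is sent straight to dst and fn is never called. A minimal sketch of that short-circuit via the public `thenApply` method, with illustrative names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch; ThenApplySketch and functionSkippedOnFailure are hypothetical names.
final class ThenApplySketch {
    // Returns true if dst failed and the mapping function was never invoked.
    static boolean functionSkippedOnFailure() {
        AtomicBoolean called = new AtomicBoolean();
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(x -> {
            called.set(true);
            return x + 1;
        });
        src.completeExceptionally(new IllegalStateException("boom"));
        return dst.isCompletedExceptionally() && !called.get();
    }

    public static void main(String[] args) {
        System.out.println(functionSkippedOnFailure());
    }
}
```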
|
1542 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, |
|
1543 Executor e) { |
|
1544 if (fn == null) throw new NullPointerException(); |
|
1545 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1546 ThenAccept<T> d = null; |
|
1547 Object r; |
|
1548 if ((r = result) == null) { |
|
1549 CompletionNode p = new CompletionNode |
|
1550 (d = new ThenAccept<T>(this, fn, dst, e)); |
|
1551 while ((r = result) == null) { |
|
1552 if (UNSAFE.compareAndSwapObject |
|
1553 (this, COMPLETIONS, p.next = completions, p)) |
|
1554 break; |
|
1555 } |
|
1556 } |
|
1557 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1558 T t; Throwable ex; |
|
1559 if (r instanceof AltResult) { |
|
1560 ex = ((AltResult)r).ex; |
|
1561 t = null; |
|
1562 } |
|
1563 else { |
|
1564 ex = null; |
|
1565 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1566 t = tr; |
|
1567 } |
|
1568 if (ex == null) { |
|
1569 try { |
|
1570 if (e != null) |
|
1571 execAsync(e, new AsyncAccept<T>(t, fn, dst)); |
|
1572 else |
|
1573 fn.accept(t); |
|
1574 } catch (Throwable rex) { |
|
1575 ex = rex; |
|
1576 } |
|
1577 } |
|
1578 if (e == null || ex != null) |
|
1579 dst.internalComplete(null, ex); |
|
1580 } |
|
1581 helpPostComplete(); |
|
1582 return dst; |
|
1583 } |
|
1584 |
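Each doXxx method registers its completion by CAS-pushing a CompletionNode onto the `completions` field, retrying while the future is still incomplete. The sketch below shows the same lock-free stack push, using `AtomicReference` in place of `UNSAFE.compareAndSwapObject`. It is a simplified stand-in: it omits the `result == null` recheck that the original loop performs on every retry, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch of a CAS-based (Treiber-style) stack push; not the JDK's code.
final class TreiberPushSketch {
    static final class Node {
        final Runnable action;
        Node next;
        Node(Runnable action) { this.action = action; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    // Link the new node to the current head, then try to swing head to it.
    void push(Runnable action) {
        Node p = new Node(action);
        do {
            p.next = head.get();
        } while (!head.compareAndSet(p.next, p));
    }

    // Detach the whole stack and run the actions in LIFO order.
    int runAll() {
        int n = 0;
        for (Node p = head.getAndSet(null); p != null; p = p.next) {
            p.action.run();
            n++;
        }
        return n;
    }
}
```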
|
1585 private CompletableFuture<Void> doThenRun(Runnable action, |
|
1586 Executor e) { |
|
1587 if (action == null) throw new NullPointerException(); |
|
1588 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1589 ThenRun d = null; |
|
1590 Object r; |
|
1591 if ((r = result) == null) { |
|
1592 CompletionNode p = new CompletionNode |
|
1593 (d = new ThenRun(this, action, dst, e)); |
|
1594 while ((r = result) == null) { |
|
1595 if (UNSAFE.compareAndSwapObject |
|
1596 (this, COMPLETIONS, p.next = completions, p)) |
|
1597 break; |
|
1598 } |
|
1599 } |
|
1600 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1601 Throwable ex; |
|
1602 if (r instanceof AltResult) |
|
1603 ex = ((AltResult)r).ex; |
|
1604 else |
|
1605 ex = null; |
|
1606 if (ex == null) { |
|
1607 try { |
|
1608 if (e != null) |
|
1609 execAsync(e, new AsyncRun(action, dst)); |
|
1610 else |
|
1611 action.run(); |
|
1612 } catch (Throwable rex) { |
|
1613 ex = rex; |
|
1614 } |
|
1615 } |
|
1616 if (e == null || ex != null) |
|
1617 dst.internalComplete(null, ex); |
|
1618 } |
|
1619 helpPostComplete(); |
|
1620 return dst; |
|
1621 } |
|
1622 |
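The `compareAndSet(0, 1)` calls guard against a completion running twice, for example once from the registering thread and once from the completing thread. The sketch below assumes the completion object behaves like an `AtomicInteger` used as a one-shot claim flag, which the calls above suggest but this excerpt does not show; `ClaimSketch` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch; assumes the completion acts as an AtomicInteger claim flag.
final class ClaimSketch extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;
    private final transient Runnable action;

    ClaimSketch(Runnable action) { this.action = action; }

    // Only the caller that wins the 0 -> 1 transition runs the action.
    @Override public void run() {
        if (compareAndSet(0, 1))
            action.run();
    }

    // Triggers the same completion twice and reports how often the action ran.
    static int invocationsAfterTwoTriggers() {
        AtomicInteger count = new AtomicInteger();
        ClaimSketch c = new ClaimSketch(count::incrementAndGet);
        c.run();
        c.run();
        return count.get();
    }
}
```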
|
1623 private <U,V> CompletableFuture<V> doThenCombine |
|
1624 (CompletableFuture<? extends U> other, |
|
1625 BiFunction<? super T,? super U,? extends V> fn, |
|
1626 Executor e) { |
|
1627 if (other == null || fn == null) throw new NullPointerException(); |
|
1628 CompletableFuture<V> dst = new CompletableFuture<V>(); |
|
1629 ThenCombine<T,U,V> d = null; |
|
1630 Object r, s = null; |
|
1631 if ((r = result) == null || (s = other.result) == null) { |
|
1632 d = new ThenCombine<T,U,V>(this, other, fn, dst, e); |
|
1633 CompletionNode q = null, p = new CompletionNode(d); |
|
1634 while ((r == null && (r = result) == null) || |
|
1635 (s == null && (s = other.result) == null)) { |
|
1636 if (q != null) { |
|
1637 if (s != null || |
|
1638 UNSAFE.compareAndSwapObject |
|
1639 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1640 break; |
|
1641 } |
|
1642 else if (r != null || |
|
1643 UNSAFE.compareAndSwapObject |
|
1644 (this, COMPLETIONS, p.next = completions, p)) { |
|
1645 if (s != null) |
|
1646 break; |
|
1647 q = new CompletionNode(d); |
|
1648 } |
|
1649 } |
|
1650 } |
|
1651 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
1652 T t; U u; Throwable ex; |
|
1653 if (r instanceof AltResult) { |
|
1654 ex = ((AltResult)r).ex; |
|
1655 t = null; |
|
1656 } |
|
1657 else { |
|
1658 ex = null; |
|
1659 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1660 t = tr; |
|
1661 } |
|
1662 if (ex != null) |
|
1663 u = null; |
|
1664 else if (s instanceof AltResult) { |
|
1665 ex = ((AltResult)s).ex; |
|
1666 u = null; |
|
1667 } |
|
1668 else { |
|
1669 @SuppressWarnings("unchecked") U us = (U) s; |
|
1670 u = us; |
|
1671 } |
|
1672 V v = null; |
|
1673 if (ex == null) { |
|
1674 try { |
|
1675 if (e != null) |
|
1676 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); |
|
1677 else |
|
1678 v = fn.apply(t, u); |
|
1679 } catch (Throwable rex) { |
|
1680 ex = rex; |
|
1681 } |
|
1682 } |
|
1683 if (e == null || ex != null) |
|
1684 dst.internalComplete(v, ex); |
|
1685 } |
|
1686 helpPostComplete(); |
|
1687 other.helpPostComplete(); |
|
1688 return dst; |
|
1689 } |
|
1690 |
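doThenCombine fires only once both `result` and `other.result` are non-null, so dst stays pending after the first source completes. A short sketch of that both-sources requirement with the public `thenCombine` method, using illustrative names:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; ThenCombineSketch and sum are hypothetical names.
final class ThenCombineSketch {
    static int sum() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> dst = a.thenCombine(b, Integer::sum);
        a.complete(40);
        if (dst.isDone()) // one source alone must not trigger the function
            throw new IllegalStateException("dst completed too early");
        b.complete(2);
        return dst.join();
    }

    public static void main(String[] args) {
        System.out.println(sum());
    }
}
```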
|
1691 private <U> CompletableFuture<Void> doThenAcceptBoth |
|
1692 (CompletableFuture<? extends U> other, |
|
1693 BiConsumer<? super T,? super U> fn, |
|
1694 Executor e) { |
|
1695 if (other == null || fn == null) throw new NullPointerException(); |
|
1696 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1697 ThenAcceptBoth<T,U> d = null; |
|
1698 Object r, s = null; |
|
1699 if ((r = result) == null || (s = other.result) == null) { |
|
1700 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e); |
|
1701 CompletionNode q = null, p = new CompletionNode(d); |
|
1702 while ((r == null && (r = result) == null) || |
|
1703 (s == null && (s = other.result) == null)) { |
|
1704 if (q != null) { |
|
1705 if (s != null || |
|
1706 UNSAFE.compareAndSwapObject |
|
1707 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1708 break; |
|
1709 } |
|
1710 else if (r != null || |
|
1711 UNSAFE.compareAndSwapObject |
|
1712 (this, COMPLETIONS, p.next = completions, p)) { |
|
1713 if (s != null) |
|
1714 break; |
|
1715 q = new CompletionNode(d); |
|
1716 } |
|
1717 } |
|
1718 } |
|
1719 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
1720 T t; U u; Throwable ex; |
|
1721 if (r instanceof AltResult) { |
|
1722 ex = ((AltResult)r).ex; |
|
1723 t = null; |
|
1724 } |
|
1725 else { |
|
1726 ex = null; |
|
1727 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1728 t = tr; |
|
1729 } |
|
1730 if (ex != null) |
|
1731 u = null; |
|
1732 else if (s instanceof AltResult) { |
|
1733 ex = ((AltResult)s).ex; |
|
1734 u = null; |
|
1735 } |
|
1736 else { |
|
1737 @SuppressWarnings("unchecked") U us = (U) s; |
|
1738 u = us; |
|
1739 } |
|
1740 if (ex == null) { |
|
1741 try { |
|
1742 if (e != null) |
|
1743 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); |
|
1744 else |
|
1745 fn.accept(t, u); |
|
1746 } catch (Throwable rex) { |
|
1747 ex = rex; |
|
1748 } |
|
1749 } |
|
1750 if (e == null || ex != null) |
|
1751 dst.internalComplete(null, ex); |
|
1752 } |
|
1753 helpPostComplete(); |
|
1754 other.helpPostComplete(); |
|
1755 return dst; |
|
1756 } |
|
1757 |
|
1758 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, |
|
1759 Runnable action, |
|
1760 Executor e) { |
|
1761 if (other == null || action == null) throw new NullPointerException(); |
|
1762 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1763 RunAfterBoth d = null; |
|
1764 Object r, s = null; |
|
1765 if ((r = result) == null || (s = other.result) == null) { |
|
1766 d = new RunAfterBoth(this, other, action, dst, e); |
|
1767 CompletionNode q = null, p = new CompletionNode(d); |
|
1768 while ((r == null && (r = result) == null) || |
|
1769 (s == null && (s = other.result) == null)) { |
|
1770 if (q != null) { |
|
1771 if (s != null || |
|
1772 UNSAFE.compareAndSwapObject |
|
1773 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1774 break; |
|
1775 } |
|
1776 else if (r != null || |
|
1777 UNSAFE.compareAndSwapObject |
|
1778 (this, COMPLETIONS, p.next = completions, p)) { |
|
1779 if (s != null) |
|
1780 break; |
|
1781 q = new CompletionNode(d); |
|
1782 } |
|
1783 } |
|
1784 } |
|
1785 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
1786 Throwable ex; |
|
1787 if (r instanceof AltResult) |
|
1788 ex = ((AltResult)r).ex; |
|
1789 else |
|
1790 ex = null; |
|
1791 if (ex == null && (s instanceof AltResult)) |
|
1792 ex = ((AltResult)s).ex; |
|
1793 if (ex == null) { |
|
1794 try { |
|
1795 if (e != null) |
|
1796 execAsync(e, new AsyncRun(action, dst)); |
|
1797 else |
|
1798 action.run(); |
|
1799 } catch (Throwable rex) { |
|
1800 ex = rex; |
|
1801 } |
|
1802 } |
|
1803 if (e == null || ex != null) |
|
1804 dst.internalComplete(null, ex); |
|
1805 } |
|
1806 helpPostComplete(); |
|
1807 other.helpPostComplete(); |
|
1808 return dst; |
|
1809 } |
|
1810 |
|
1811 private <U> CompletableFuture<U> doApplyToEither |
|
1812 (CompletableFuture<? extends T> other, |
|
1813 Function<? super T, U> fn, |
|
1814 Executor e) { |
|
1815 if (other == null || fn == null) throw new NullPointerException(); |
|
1816 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
1817 ApplyToEither<T,U> d = null; |
|
1818 Object r; |
|
1819 if ((r = result) == null && (r = other.result) == null) { |
|
1820 d = new ApplyToEither<T,U>(this, other, fn, dst, e); |
|
1821 CompletionNode q = null, p = new CompletionNode(d); |
|
1822 while ((r = result) == null && (r = other.result) == null) { |
|
1823 if (q != null) { |
|
1824 if (UNSAFE.compareAndSwapObject |
|
1825 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1826 break; |
|
1827 } |
|
1828 else if (UNSAFE.compareAndSwapObject |
|
1829 (this, COMPLETIONS, p.next = completions, p)) |
|
1830 q = new CompletionNode(d); |
|
1831 } |
|
1832 } |
|
1833 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1834 T t; Throwable ex; |
|
1835 if (r instanceof AltResult) { |
|
1836 ex = ((AltResult)r).ex; |
|
1837 t = null; |
|
1838 } |
|
1839 else { |
|
1840 ex = null; |
|
1841 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1842 t = tr; |
|
1843 } |
|
1844 U u = null; |
|
1845 if (ex == null) { |
|
1846 try { |
|
1847 if (e != null) |
|
1848 execAsync(e, new AsyncApply<T,U>(t, fn, dst)); |
|
1849 else |
|
1850 u = fn.apply(t); |
|
1851 } catch (Throwable rex) { |
|
1852 ex = rex; |
|
1853 } |
|
1854 } |
|
1855 if (e == null || ex != null) |
|
1856 dst.internalComplete(u, ex); |
|
1857 } |
|
1858 helpPostComplete(); |
|
1859 other.helpPostComplete(); |
|
1860 return dst; |
|
1861 } |
|
1862 |
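doApplyToEither checks `result` and then `other.result` and applies fn to whichever is available first. The sketch below shows the observable effect through the public `applyToEither` method; `ApplyToEitherSketch` and `fastest` are names chosen for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch; ApplyToEitherSketch and fastest are hypothetical names.
final class ApplyToEitherSketch {
    static String fastest() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> dst = slow.applyToEither(fast, String::toUpperCase);
        fast.complete("fast"); // the first available result feeds the function
        slow.complete("slow"); // ignored: dst has already been completed
        return dst.join();
    }

    public static void main(String[] args) {
        System.out.println(fastest());
    }
}
```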
|
1863 private CompletableFuture<Void> doAcceptEither |
|
1864 (CompletableFuture<? extends T> other, |
|
1865 Consumer<? super T> fn, |
|
1866 Executor e) { |
|
1867 if (other == null || fn == null) throw new NullPointerException(); |
|
1868 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1869 AcceptEither<T> d = null; |
|
1870 Object r; |
|
1871 if ((r = result) == null && (r = other.result) == null) { |
|
1872 d = new AcceptEither<T>(this, other, fn, dst, e); |
|
1873 CompletionNode q = null, p = new CompletionNode(d); |
|
1874 while ((r = result) == null && (r = other.result) == null) { |
|
1875 if (q != null) { |
|
1876 if (UNSAFE.compareAndSwapObject |
|
1877 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1878 break; |
|
1879 } |
|
1880 else if (UNSAFE.compareAndSwapObject |
|
1881 (this, COMPLETIONS, p.next = completions, p)) |
|
1882 q = new CompletionNode(d); |
|
1883 } |
|
1884 } |
|
1885 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1886 T t; Throwable ex; |
|
1887 if (r instanceof AltResult) { |
|
1888 ex = ((AltResult)r).ex; |
|
1889 t = null; |
|
1890 } |
|
1891 else { |
|
1892 ex = null; |
|
1893 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1894 t = tr; |
|
1895 } |
|
1896 if (ex == null) { |
|
1897 try { |
|
1898 if (e != null) |
|
1899 execAsync(e, new AsyncAccept<T>(t, fn, dst)); |
|
1900 else |
|
1901 fn.accept(t); |
|
1902 } catch (Throwable rex) { |
|
1903 ex = rex; |
|
1904 } |
|
1905 } |
|
1906 if (e == null || ex != null) |
|
1907 dst.internalComplete(null, ex); |
|
1908 } |
|
1909 helpPostComplete(); |
|
1910 other.helpPostComplete(); |
|
1911 return dst; |
|
1912 } |
|
1913 |
|
1914 private CompletableFuture<Void> doRunAfterEither |
|
1915 (CompletableFuture<?> other, |
|
1916 Runnable action, |
|
1917 Executor e) { |
|
1918 if (other == null || action == null) throw new NullPointerException(); |
|
1919 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1920 RunAfterEither d = null; |
|
1921 Object r; |
|
1922 if ((r = result) == null && (r = other.result) == null) { |
|
1923 d = new RunAfterEither(this, other, action, dst, e); |
|
1924 CompletionNode q = null, p = new CompletionNode(d); |
|
1925 while ((r = result) == null && (r = other.result) == null) { |
|
1926 if (q != null) { |
|
1927 if (UNSAFE.compareAndSwapObject |
|
1928 (other, COMPLETIONS, q.next = other.completions, q)) |
|
1929 break; |
|
1930 } |
|
1931 else if (UNSAFE.compareAndSwapObject |
|
1932 (this, COMPLETIONS, p.next = completions, p)) |
|
1933 q = new CompletionNode(d); |
|
1934 } |
|
1935 } |
|
1936 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1937 Throwable ex; |
|
1938 if (r instanceof AltResult) |
|
1939 ex = ((AltResult)r).ex; |
|
1940 else |
|
1941 ex = null; |
|
1942 if (ex == null) { |
|
1943 try { |
|
1944 if (e != null) |
|
1945 execAsync(e, new AsyncRun(action, dst)); |
|
1946 else |
|
1947 action.run(); |
|
1948 } catch (Throwable rex) { |
|
1949 ex = rex; |
|
1950 } |
|
1951 } |
|
1952 if (e == null || ex != null) |
|
1953 dst.internalComplete(null, ex); |
|
1954 } |
|
1955 helpPostComplete(); |
|
1956 other.helpPostComplete(); |
|
1957 return dst; |
|
1958 } |
|
1959 |
|
1960 private <U> CompletableFuture<U> doThenCompose |
|
1961 (Function<? super T, ? extends CompletionStage<U>> fn, |
|
1962 Executor e) { |
|
1963 if (fn == null) throw new NullPointerException(); |
|
1964 CompletableFuture<U> dst = null; |
|
1965 ThenCompose<T,U> d = null; |
|
1966 Object r; |
|
1967 if ((r = result) == null) { |
|
1968 dst = new CompletableFuture<U>(); |
|
1969 CompletionNode p = new CompletionNode |
|
1970 (d = new ThenCompose<T,U>(this, fn, dst, e)); |
|
1971 while ((r = result) == null) { |
|
1972 if (UNSAFE.compareAndSwapObject |
|
1973 (this, COMPLETIONS, p.next = completions, p)) |
|
1974 break; |
|
1975 } |
|
1976 } |
|
1977 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1978 T t; Throwable ex; |
|
1979 if (r instanceof AltResult) { |
|
1980 ex = ((AltResult)r).ex; |
|
1981 t = null; |
|
1982 } |
|
1983 else { |
|
1984 ex = null; |
|
1985 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1986 t = tr; |
|
1987 } |
|
1988 if (ex == null) { |
|
1989 if (e != null) { |
|
1990 if (dst == null) |
|
1991 dst = new CompletableFuture<U>(); |
|
1992 execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); |
|
1993 } |
|
1994 else { |
|
1995 try { |
|
1996 CompletionStage<U> cs = fn.apply(t); |
|
1997 if (cs == null || |
|
1998 (dst = cs.toCompletableFuture()) == null) |
|
1999 ex = new NullPointerException(); |
|
2000 } catch (Throwable rex) { |
|
2001 ex = rex; |
|
2002 } |
|
2003 } |
|
2004 } |
|
2005 if (dst == null) |
|
2006 dst = new CompletableFuture<U>(); |
|
2007 if (ex != null) |
|
2008 dst.internalComplete(null, ex); |
|
2009 } |
|
2010 helpPostComplete(); |
|
2011 dst.helpPostComplete(); |
|
2012 return dst; |
|
2013 } |
|
2014 |
|
2015 private CompletableFuture<T> doWhenComplete |
|
2016 (BiConsumer<? super T, ? super Throwable> fn, |
|
2017 Executor e) { |
|
2018 if (fn == null) throw new NullPointerException(); |
|
2019 CompletableFuture<T> dst = new CompletableFuture<T>(); |
|
2020 WhenCompleteCompletion<T> d = null; |
|
2021 Object r; |
|
2022 if ((r = result) == null) { |
|
2023 CompletionNode p = |
|
2024 new CompletionNode(d = new WhenCompleteCompletion<T> |
|
2025 (this, fn, dst, e)); |
|
2026 while ((r = result) == null) { |
|
2027 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, |
|
2028 p.next = completions, p)) |
|
2029 break; |
|
2030 } |
|
2031 } |
|
2032 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2033 T t; Throwable ex; |
|
2034 if (r instanceof AltResult) { |
|
2035 ex = ((AltResult)r).ex; |
|
2036 t = null; |
|
2037 } |
|
2038 else { |
|
2039 ex = null; |
|
2040 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2041 t = tr; |
|
2042 } |
|
2043 Throwable dx = null; |
|
2044 try { |
|
2045 if (e != null) |
|
2046 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); |
|
2047 else |
|
2048 fn.accept(t, ex); |
|
2049 } catch (Throwable rex) { |
|
2050 dx = rex; |
|
2051 } |
|
2052 if (e == null || dx != null) |
|
2053 dst.internalComplete(t, ex != null ? ex : dx); |
|
2054 } |
|
2055 helpPostComplete(); |
|
2056 return dst; |
|
2057 } |
|
2058 |
|
2059 private <U> CompletableFuture<U> doHandle |
|
2060 (BiFunction<? super T, Throwable, ? extends U> fn, |
|
2061 Executor e) { |
|
2062 if (fn == null) throw new NullPointerException(); |
|
2063 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
2064 HandleCompletion<T,U> d = null; |
|
2065 Object r; |
|
2066 if ((r = result) == null) { |
|
2067 CompletionNode p = |
|
2068 new CompletionNode(d = new HandleCompletion<T,U> |
|
2069 (this, fn, dst, e)); |
|
2070 while ((r = result) == null) { |
|
2071 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, |
|
2072 p.next = completions, p)) |
|
2073 break; |
|
2074 } |
|
2075 } |
|
2076 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2077 T t; Throwable ex; |
|
2078 if (r instanceof AltResult) { |
|
2079 ex = ((AltResult)r).ex; |
|
2080 t = null; |
|
2081 } |
|
2082 else { |
|
2083 ex = null; |
|
2084 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2085 t = tr; |
|
2086 } |
|
2087 U u = null; |
|
2088 Throwable dx = null; |
|
2089 try { |
|
2090 if (e != null) |
|
2091 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); |
|
2092 else { |
|
2093 u = fn.apply(t, ex); |
|
2094 dx = null; |
|
2095 } |
|
2096 } catch (Throwable rex) { |
|
2097 dx = rex; |
|
2098 u = null; |
|
2099 } |
|
2100 if (e == null || dx != null) |
|
2101 dst.internalComplete(u, dx); |
|
2102 } |
|
2103 helpPostComplete(); |
|
2104 return dst; |
|
2105 } |
|
2106 |
|
2107 |
|
2108 // public methods |
|
2109 |
1781 |
2110 /** |
1782 /** |
2111 * Creates a new incomplete CompletableFuture. |
1783 * Creates a new incomplete CompletableFuture. |
2112 */ |
1784 */ |
2113 public CompletableFuture() { |
1785 public CompletableFuture() { |
|
1786 } |
|
1787 |
|
1788 /** |
|
1789 * Creates a new complete CompletableFuture with given encoded result. |
|
1790 */ |
|
1791 private CompletableFuture(Object r) { |
|
1792 this.result = r; |
2114 } |
1793 } |
2115 |
1794 |
2116 /** |
1795 /** |
2117 * Returns a new CompletableFuture that is asynchronously completed |
1796 * Returns a new CompletableFuture that is asynchronously completed |
2118 * by a task running in the {@link ForkJoinPool#commonPool()} with |
1797 * by a task running in the {@link ForkJoinPool#commonPool()} with |
2354 * @return {@code true} if this invocation caused this CompletableFuture |
1963 * @return {@code true} if this invocation caused this CompletableFuture |
2355 * to transition to a completed state, else {@code false} |
1964 * to transition to a completed state, else {@code false} |
2356 */ |
1965 */ |
2357 public boolean completeExceptionally(Throwable ex) { |
1966 public boolean completeExceptionally(Throwable ex) { |
2358 if (ex == null) throw new NullPointerException(); |
1967 if (ex == null) throw new NullPointerException(); |
2359 boolean triggered = result == null && |
1968 boolean triggered = internalComplete(new AltResult(ex)); |
2360 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); |
|
2361 postComplete(); |
1969 postComplete(); |
2362 return triggered; |
1970 return triggered; |
2363 } |
1971 } |
2364 |
1972 |
2365 // CompletionStage methods |
1973 public <U> CompletableFuture<U> thenApply( |
2366 |
1974 Function<? super T,? extends U> fn) { |
2367 public <U> CompletableFuture<U> thenApply |
1975 return uniApplyStage(null, fn); |
2368 (Function<? super T,? extends U> fn) { |
1976 } |
2369 return doThenApply(fn, null); |
1977 |
2370 } |
1978 public <U> CompletableFuture<U> thenApplyAsync( |
2371 |
1979 Function<? super T,? extends U> fn) { |
2372 public <U> CompletableFuture<U> thenApplyAsync |
1980 return uniApplyStage(asyncPool, fn); |
2373 (Function<? super T,? extends U> fn) { |
1981 } |
2374 return doThenApply(fn, ForkJoinPool.commonPool()); |
1982 |
2375 } |
1983 public <U> CompletableFuture<U> thenApplyAsync( |
2376 |
1984 Function<? super T,? extends U> fn, Executor executor) { |
2377 public <U> CompletableFuture<U> thenApplyAsync |
1985 return uniApplyStage(screenExecutor(executor), fn); |
2378 (Function<? super T,? extends U> fn, |
1986 } |
2379 Executor executor) { |
1987 |
2380 if (executor == null) throw new NullPointerException(); |
1988 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { |
2381 return doThenApply(fn, executor); |
1989 return uniAcceptStage(null, action); |
2382 } |
1990 } |
2383 |
1991 |
2384 public CompletableFuture<Void> thenAccept |
1992 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { |
2385 (Consumer<? super T> action) { |
1993 return uniAcceptStage(asyncPool, action); |
2386 return doThenAccept(action, null); |
1994 } |
2387 } |
1995 |
2388 |
1996 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, |
2389 public CompletableFuture<Void> thenAcceptAsync |
1997 Executor executor) { |
2390 (Consumer<? super T> action) { |
1998 return uniAcceptStage(screenExecutor(executor), action); |
2391 return doThenAccept(action, ForkJoinPool.commonPool()); |
1999 } |
2392 } |
2000 |
2393 |
2001 public CompletableFuture<Void> thenRun(Runnable action) { |
2394 public CompletableFuture<Void> thenAcceptAsync |
2002 return uniRunStage(null, action); |
2395 (Consumer<? super T> action, |
2003 } |
2396 Executor executor) { |
2004 |
2397 if (executor == null) throw new NullPointerException(); |
2005 public CompletableFuture<Void> thenRunAsync(Runnable action) { |
2398 return doThenAccept(action, executor); |
2006 return uniRunStage(asyncPool, action); |
2399 } |
2007 } |
2400 |
2008 |
2401 public CompletableFuture<Void> thenRun |
2009 public CompletableFuture<Void> thenRunAsync(Runnable action, |
2402 (Runnable action) { |
2010 Executor executor) { |
2403 return doThenRun(action, null); |
2011 return uniRunStage(screenExecutor(executor), action); |
2404 } |
2012 } |
2405 |
2013 |
2406 public CompletableFuture<Void> thenRunAsync |
2014 public <U,V> CompletableFuture<V> thenCombine( |
2407 (Runnable action) { |
2015 CompletionStage<? extends U> other, |
2408 return doThenRun(action, ForkJoinPool.commonPool()); |
2016 BiFunction<? super T,? super U,? extends V> fn) { |
2409 } |
2017 return biApplyStage(null, other, fn); |
2410 |
2018 } |
2411 public CompletableFuture<Void> thenRunAsync |
2019 |
2412 (Runnable action, |
2020 public <U,V> CompletableFuture<V> thenCombineAsync( |
2413 Executor executor) { |
2021 CompletionStage<? extends U> other, |
2414 if (executor == null) throw new NullPointerException(); |
2022 BiFunction<? super T,? super U,? extends V> fn) { |
2415 return doThenRun(action, executor); |
2023 return biApplyStage(asyncPool, other, fn); |
2416 } |
2024 } |
2417 |
2025 |
2418 public <U,V> CompletableFuture<V> thenCombine |
2026 public <U,V> CompletableFuture<V> thenCombineAsync( |
2419 (CompletionStage<? extends U> other, |
2027 CompletionStage<? extends U> other, |
2420 BiFunction<? super T,? super U,? extends V> fn) { |
2028 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { |
2421 return doThenCombine(other.toCompletableFuture(), fn, null); |
2029 return biApplyStage(screenExecutor(executor), other, fn); |
2422 } |
2030 } |
2423 |
2031 |
2424 public <U,V> CompletableFuture<V> thenCombineAsync |
2032 public <U> CompletableFuture<Void> thenAcceptBoth( |
2425 (CompletionStage<? extends U> other, |
2033 CompletionStage<? extends U> other, |
2426 BiFunction<? super T,? super U,? extends V> fn) { |
2034 BiConsumer<? super T, ? super U> action) { |
2427 return doThenCombine(other.toCompletableFuture(), fn, |
2035 return biAcceptStage(null, other, action); |
2428 ForkJoinPool.commonPool()); |
2036 } |
2429 } |
2037 |
2430 |
2038 public <U> CompletableFuture<Void> thenAcceptBothAsync( |
2431 public <U,V> CompletableFuture<V> thenCombineAsync |
2039 CompletionStage<? extends U> other, |
2432 (CompletionStage<? extends U> other, |
2040 BiConsumer<? super T, ? super U> action) { |
2433 BiFunction<? super T,? super U,? extends V> fn, |
2041 return biAcceptStage(asyncPool, other, action); |
2434 Executor executor) { |
2042 } |
2435 if (executor == null) throw new NullPointerException(); |
2043 |
2436 return doThenCombine(other.toCompletableFuture(), fn, executor); |
2044 public <U> CompletableFuture<Void> thenAcceptBothAsync( |
2437 } |
2045 CompletionStage<? extends U> other, |
2438 |
2046 BiConsumer<? super T, ? super U> action, Executor executor) { |
2439 public <U> CompletableFuture<Void> thenAcceptBoth |
2047 return biAcceptStage(screenExecutor(executor), other, action); |
2440 (CompletionStage<? extends U> other, |
2048 } |
2441 BiConsumer<? super T, ? super U> action) { |
2049 |
2442 return doThenAcceptBoth(other.toCompletableFuture(), action, null); |
2050 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, |
2443 } |
2051 Runnable action) { |
2444 |
2052 return biRunStage(null, other, action); |
2445 public <U> CompletableFuture<Void> thenAcceptBothAsync |
2053 } |
2446 (CompletionStage<? extends U> other, |
2054 |
2447 BiConsumer<? super T, ? super U> action) { |
2055 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, |
2448 return doThenAcceptBoth(other.toCompletableFuture(), action, |
2056 Runnable action) { |
2449 ForkJoinPool.commonPool()); |
2057 return biRunStage(asyncPool, other, action); |
2450 } |
2058 } |
2451 |
2059 |
2452 public <U> CompletableFuture<Void> thenAcceptBothAsync |
2060 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, |
2453 (CompletionStage<? extends U> other, |
2061 Runnable action, |
2454 BiConsumer<? super T, ? super U> action, |
2062 Executor executor) { |
2455 Executor executor) { |
2063 return biRunStage(screenExecutor(executor), other, action); |
2456 if (executor == null) throw new NullPointerException(); |
2064 } |
2457 return doThenAcceptBoth(other.toCompletableFuture(), action, executor); |
2065 |
2458 } |
2066 public <U> CompletableFuture<U> applyToEither( |
2459 |
2067 CompletionStage<? extends T> other, Function<? super T, U> fn) { |
2460 public CompletableFuture<Void> runAfterBoth |
2068 return orApplyStage(null, other, fn); |
2461 (CompletionStage<?> other, |
2069 } |
2462 Runnable action) { |
2070 |
2463 return doRunAfterBoth(other.toCompletableFuture(), action, null); |
2071 public <U> CompletableFuture<U> applyToEitherAsync( |
2464 } |
2072 CompletionStage<? extends T> other, Function<? super T, U> fn) { |
2465 |
2073 return orApplyStage(asyncPool, other, fn); |
2466 public CompletableFuture<Void> runAfterBothAsync |
2074 } |
2467 (CompletionStage<?> other, |
2075 |
2468 Runnable action) { |
2076 public <U> CompletableFuture<U> applyToEitherAsync( |
2469 return doRunAfterBoth(other.toCompletableFuture(), action, |
2077 CompletionStage<? extends T> other, Function<? super T, U> fn, |
2470 ForkJoinPool.commonPool()); |
2078 Executor executor) { |
2471 } |
2079 return orApplyStage(screenExecutor(executor), other, fn); |
2472 |
2080 } |
2473 public CompletableFuture<Void> runAfterBothAsync |
2081 |
2474 (CompletionStage<?> other, |
2082 public CompletableFuture<Void> acceptEither( |
2475 Runnable action, |
2083 CompletionStage<? extends T> other, Consumer<? super T> action) { |
2476 Executor executor) { |
2084 return orAcceptStage(null, other, action); |
2477 if (executor == null) throw new NullPointerException(); |
2085 } |
2478 return doRunAfterBoth(other.toCompletableFuture(), action, executor); |
2086 |
2479 } |
2087 public CompletableFuture<Void> acceptEitherAsync( |
2480 |
2088 CompletionStage<? extends T> other, Consumer<? super T> action) { |
2481 |
2089 return orAcceptStage(asyncPool, other, action); |
2482 public <U> CompletableFuture<U> applyToEither |
2090 } |
2483 (CompletionStage<? extends T> other, |
2091 |
2484 Function<? super T, U> fn) { |
2092 public CompletableFuture<Void> acceptEitherAsync( |
2485 return doApplyToEither(other.toCompletableFuture(), fn, null); |
2093 CompletionStage<? extends T> other, Consumer<? super T> action, |
2486 } |
2094 Executor executor) { |
2487 |
2095 return orAcceptStage(screenExecutor(executor), other, action); |
2488 public <U> CompletableFuture<U> applyToEitherAsync |
2489 (CompletionStage<? extends T> other, |
2490 Function<? super T, U> fn) { |
2491 return doApplyToEither(other.toCompletableFuture(), fn, |
2492 ForkJoinPool.commonPool()); |
2493 } |
2494 |
2495 public <U> CompletableFuture<U> applyToEitherAsync |
2496 (CompletionStage<? extends T> other, |
2497 Function<? super T, U> fn, |
2498 Executor executor) { |
2499 if (executor == null) throw new NullPointerException(); |
2500 return doApplyToEither(other.toCompletableFuture(), fn, executor); |
2501 } |
2502 |
2503 public CompletableFuture<Void> acceptEither |
2504 (CompletionStage<? extends T> other, |
2505 Consumer<? super T> action) { |
2506 return doAcceptEither(other.toCompletableFuture(), action, null); |
2507 } |
2508 |
2509 public CompletableFuture<Void> acceptEitherAsync |
2510 (CompletionStage<? extends T> other, |
2511 Consumer<? super T> action) { |
2512 return doAcceptEither(other.toCompletableFuture(), action, |
2513 ForkJoinPool.commonPool()); |
2514 } |
2515 |
2516 public CompletableFuture<Void> acceptEitherAsync |
2517 (CompletionStage<? extends T> other, |
2518 Consumer<? super T> action, |
2519 Executor executor) { |
2520 if (executor == null) throw new NullPointerException(); |
2521 return doAcceptEither(other.toCompletableFuture(), action, executor); |
2522 } |
2096 } |
2523 |
2097 |
2524 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, |
2098 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, |
2525 Runnable action) { |
2099 Runnable action) { |
2526 return doRunAfterEither(other.toCompletableFuture(), action, null); |
2100 return orRunStage(null, other, action); |
2527 } |
2101 } |
2528 |
2102 |
2529 public CompletableFuture<Void> runAfterEitherAsync |
2103 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, |
2530 (CompletionStage<?> other, |
2104 Runnable action) { |
2531 Runnable action) { |
2105 return orRunStage(asyncPool, other, action); |
2532 return doRunAfterEither(other.toCompletableFuture(), action, |
2106 } |
2533 ForkJoinPool.commonPool()); |
2107 |
2534 } |
2108 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, |
2535 |
2109 Runnable action, |
2536 public CompletableFuture<Void> runAfterEitherAsync |
2110 Executor executor) { |
2537 (CompletionStage<?> other, |
2111 return orRunStage(screenExecutor(executor), other, action); |
2538 Runnable action, |
2112 } |
2539 Executor executor) { |
2113 |
2540 if (executor == null) throw new NullPointerException(); |
2114 public <U> CompletableFuture<U> thenCompose( |
2541 return doRunAfterEither(other.toCompletableFuture(), action, executor); |
2115 Function<? super T, ? extends CompletionStage<U>> fn) { |
2542 } |
2116 return uniComposeStage(null, fn); |
2543 |
2117 } |
2544 public <U> CompletableFuture<U> thenCompose |
2118 |
2545 (Function<? super T, ? extends CompletionStage<U>> fn) { |
2119 public <U> CompletableFuture<U> thenComposeAsync( |
2546 return doThenCompose(fn, null); |
2120 Function<? super T, ? extends CompletionStage<U>> fn) { |
2547 } |
2121 return uniComposeStage(asyncPool, fn); |
2548 |
2122 } |
2549 public <U> CompletableFuture<U> thenComposeAsync |
2123 |
2550 (Function<? super T, ? extends CompletionStage<U>> fn) { |
2124 public <U> CompletableFuture<U> thenComposeAsync( |
2551 return doThenCompose(fn, ForkJoinPool.commonPool()); |
2125 Function<? super T, ? extends CompletionStage<U>> fn, |
2552 } |
2126 Executor executor) { |
2553 |
2127 return uniComposeStage(screenExecutor(executor), fn); |
2554 public <U> CompletableFuture<U> thenComposeAsync |
2128 } |
2555 (Function<? super T, ? extends CompletionStage<U>> fn, |
2129 |
2556 Executor executor) { |
2130 public CompletableFuture<T> whenComplete( |
2557 if (executor == null) throw new NullPointerException(); |
2131 BiConsumer<? super T, ? super Throwable> action) { |
2558 return doThenCompose(fn, executor); |
2132 return uniWhenCompleteStage(null, action); |
2559 } |
2133 } |
2560 |
2134 |
2561 public CompletableFuture<T> whenComplete |
2135 public CompletableFuture<T> whenCompleteAsync( |
2562 (BiConsumer<? super T, ? super Throwable> action) { |
2136 BiConsumer<? super T, ? super Throwable> action) { |
2563 return doWhenComplete(action, null); |
2137 return uniWhenCompleteStage(asyncPool, action); |
2564 } |
2138 } |
2565 |
2139 |
2566 public CompletableFuture<T> whenCompleteAsync |
2140 public CompletableFuture<T> whenCompleteAsync( |
2567 (BiConsumer<? super T, ? super Throwable> action) { |
2141 BiConsumer<? super T, ? super Throwable> action, Executor executor) { |
2568 return doWhenComplete(action, ForkJoinPool.commonPool()); |
2142 return uniWhenCompleteStage(screenExecutor(executor), action); |
2569 } |
2143 } |
2570 |
2144 |
2571 public CompletableFuture<T> whenCompleteAsync |
2145 public <U> CompletableFuture<U> handle( |
2572 (BiConsumer<? super T, ? super Throwable> action, |
2146 BiFunction<? super T, Throwable, ? extends U> fn) { |
2573 Executor executor) { |
2147 return uniHandleStage(null, fn); |
2574 if (executor == null) throw new NullPointerException(); |
2148 } |
2575 return doWhenComplete(action, executor); |
2149 |
2576 } |
2150 public <U> CompletableFuture<U> handleAsync( |
2577 |
2151 BiFunction<? super T, Throwable, ? extends U> fn) { |
2578 public <U> CompletableFuture<U> handle |
2152 return uniHandleStage(asyncPool, fn); |
2579 (BiFunction<? super T, Throwable, ? extends U> fn) { |
2153 } |
2580 return doHandle(fn, null); |
2154 |
2581 } |
2155 public <U> CompletableFuture<U> handleAsync( |
2582 |
2156 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { |
2583 public <U> CompletableFuture<U> handleAsync |
2157 return uniHandleStage(screenExecutor(executor), fn); |
2584 (BiFunction<? super T, Throwable, ? extends U> fn) { |
2158 } |
2585 return doHandle(fn, ForkJoinPool.commonPool()); |
2159 |
2586 } |
2160 /** |
2587 |
2161 * Returns this CompletableFuture. |
2588 public <U> CompletableFuture<U> handleAsync |
2589 (BiFunction<? super T, Throwable, ? extends U> fn, |
2590 Executor executor) { |
2591 if (executor == null) throw new NullPointerException(); |
2592 return doHandle(fn, executor); |
2593 } |
2594 |
2595 /** |
2596 * Returns this CompletableFuture |
2597 * |
2162 * |
2598 * @return this CompletableFuture |
2163 * @return this CompletableFuture |
2599 */ |
2164 */ |
2600 public CompletableFuture<T> toCompletableFuture() { |
2165 public CompletableFuture<T> toCompletableFuture() { |
2601 return this; |
2166 return this; |