|
1 /* |
|
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
3 * |
|
4 * This code is free software; you can redistribute it and/or modify it |
|
5 * under the terms of the GNU General Public License version 2 only, as |
|
6 * published by the Free Software Foundation. Oracle designates this |
|
7 * particular file as subject to the "Classpath" exception as provided |
|
8 * by Oracle in the LICENSE file that accompanied this code. |
|
9 * |
|
10 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
13 * version 2 for more details (a copy is included in the LICENSE file that |
|
14 * accompanied this code). |
|
15 * |
|
16 * You should have received a copy of the GNU General Public License version |
|
17 * 2 along with this work; if not, write to the Free Software Foundation, |
|
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
19 * |
|
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
21 * or visit www.oracle.com if you need additional information or have any |
|
22 * questions. |
|
23 */ |
|
24 |
|
25 /* |
|
26 * This file is available under and governed by the GNU General Public |
|
27 * License version 2 only, as published by the Free Software Foundation. |
|
28 * However, the following notice accompanied the original version of this |
|
29 * file: |
|
30 * |
|
31 * Written by Doug Lea with assistance from members of JCP JSR-166 |
|
32 * Expert Group and released to the public domain, as explained at |
|
33 * http://creativecommons.org/publicdomain/zero/1.0/ |
|
34 */ |
|
35 |
|
36 package java.util.concurrent; |
|
37 import java.util.function.Supplier; |
|
38 import java.util.function.Consumer; |
|
39 import java.util.function.BiConsumer; |
|
40 import java.util.function.Function; |
|
41 import java.util.function.BiFunction; |
|
42 import java.util.concurrent.Future; |
|
43 import java.util.concurrent.TimeUnit; |
|
44 import java.util.concurrent.ForkJoinPool; |
|
45 import java.util.concurrent.ForkJoinTask; |
|
46 import java.util.concurrent.Executor; |
|
47 import java.util.concurrent.ThreadLocalRandom; |
|
48 import java.util.concurrent.ExecutionException; |
|
49 import java.util.concurrent.TimeoutException; |
|
50 import java.util.concurrent.CancellationException; |
|
51 import java.util.concurrent.atomic.AtomicInteger; |
|
52 import java.util.concurrent.locks.LockSupport; |
|
53 |
|
54 /** |
|
55 * A {@link Future} that may be explicitly completed (setting its |
|
56 * value and status), and may include dependent functions and actions |
|
57 * that trigger upon its completion. |
|
58 * |
|
59 * <p>When two or more threads attempt to |
|
60 * {@link #complete complete}, |
|
61 * {@link #completeExceptionally completeExceptionally}, or |
|
62 * {@link #cancel cancel} |
|
63 * a CompletableFuture, only one of them succeeds. |
|
64 * |
|
65 * <p>Methods are available for adding dependents based on |
|
66 * user-provided Functions, Consumers, or Runnables. The appropriate |
|
67 * form to use depends on whether actions require arguments and/or |
|
68 * produce results. Completion of a dependent action will trigger the |
|
69 * completion of another CompletableFuture. Actions may also be |
|
70 * triggered after either or both the current and another |
|
71 * CompletableFuture complete. Multiple CompletableFutures may also |
|
72 * be grouped as one using {@link #anyOf(CompletableFuture...)} and |
|
73 * {@link #allOf(CompletableFuture...)}. |
|
74 * |
|
75 * <p>CompletableFutures themselves do not execute asynchronously. |
|
76 * However, actions supplied for dependent completions of another |
|
77 * CompletableFuture may do so, depending on whether they are provided |
|
78 * via one of the <em>async</em> methods (that is, methods with names |
|
79 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em> |
|
80 * methods provide a way to commence asynchronous processing of an |
|
81 * action using either a given {@link Executor} or by default the |
|
82 * {@link ForkJoinPool#commonPool()}. To simplify monitoring, |
|
83 * debugging, and tracking, all generated asynchronous tasks are |
|
84 * instances of the marker interface {@link AsynchronousCompletionTask}. |
|
85 * |
|
86 * <p>Actions supplied for dependent completions of <em>non-async</em> |
|
87 * methods may be performed by the thread that completes the current |
|
88 * CompletableFuture, or by any other caller of these methods. There |
|
89 * are no guarantees about the order of processing completions unless |
|
90 * constrained by these methods. |
|
91 * |
|
92 * <p>Since (unlike {@link FutureTask}) this class has no direct |
|
93 * control over the computation that causes it to be completed, |
|
94 * cancellation is treated as just another form of exceptional completion. |
|
95 * Method {@link #cancel cancel} has the same effect as |
|
96 * {@code completeExceptionally(new CancellationException())}. |
|
97 * |
|
98 * <p>Upon exceptional completion (including cancellation), or when a |
|
99 * completion entails an additional computation which terminates |
|
100 * abruptly with an (unchecked) exception or error, then all of their |
|
101 * dependent completions (and their dependents in turn) generally act |
|
102 * as {@code completeExceptionally} with a {@link CompletionException} |
|
103 * holding that exception as its cause. However, the {@link |
|
104 * #exceptionally exceptionally} and {@link #handle handle} |
|
105 * completions <em>are</em> able to handle exceptional completions of |
|
106 * the CompletableFutures they depend on. |
|
107 * |
|
108 * <p>In case of exceptional completion with a CompletionException, |
|
109 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an |
|
110 * {@link ExecutionException} with the same cause as held in the |
|
111 * corresponding CompletionException. However, in these cases, |
|
112 * methods {@link #join()} and {@link #getNow} throw the |
|
113 * CompletionException, which simplifies usage. |
|
114 * |
|
115 * <p>Arguments used to pass a completion result (that is, for parameters |
|
116 * of type {@code T}) may be null, but passing a null value for any other |
|
117 * parameter will result in a {@link NullPointerException} being thrown. |
|
118 * |
|
119 * @author Doug Lea |
|
120 * @since 1.8 |
|
121 */ |
|
122 public class CompletableFuture<T> implements Future<T> { |
|
123 |
|
124 /* |
|
125 * Overview: |
|
126 * |
|
127 * 1. Non-nullness of field result (set via CAS) indicates done. |
|
128 * An AltResult is used to box null as a result, as well as to |
|
129 * hold exceptions. Using a single field makes completion fast |
|
130 * and simple to detect and trigger, at the expense of a lot of |
|
131 * encoding and decoding that infiltrates many methods. One minor |
|
132 * simplification relies on the (static) NIL (to box null results) |
|
133 * being the only AltResult with a null exception field, so we |
|
134 * don't usually need explicit comparisons with NIL. The CF |
|
135 * exception propagation mechanics surrounding decoding rely on |
|
136 * unchecked casts of decoded results really being unchecked, |
|
137 * where user type errors are caught at point of use, as is |
|
138 * currently the case in Java. These are highlighted by using |
|
139 * SuppressWarnings-annotated temporaries. |
|
140 * |
|
141 * 2. Waiters are held in a Treiber stack similar to the one used |
|
142 * in FutureTask, Phaser, and SynchronousQueue. See their |
|
143 * internal documentation for algorithmic details. |
|
144 * |
|
145 * 3. Completions are also kept in a list/stack, and pulled off |
|
146 * and run when completion is triggered. (We could even use the |
|
147 * same stack as for waiters, but would give up the potential |
|
148 * parallelism obtained because woken waiters help release/run |
|
149 * others -- see method postComplete). Because post-processing |
|
150 * may race with direct calls, class Completion opportunistically |
|
151 * extends AtomicInteger so callers can claim the action via |
|
152 * compareAndSet(0, 1). The Completion.run methods are all |
|
153 * written a boringly similar uniform way (that sometimes includes |
|
154 * unnecessary-looking checks, kept to maintain uniformity). |
|
155 * There are enough dimensions upon which they differ that |
|
156 * attempts to factor commonalities while maintaining efficiency |
|
157 * require more lines of code than they would save. |
|
158 * |
|
159 * 4. The exported then/and/or methods do support a bit of |
|
160 * factoring (see doThenApply etc). They must cope with the |
|
161 * intrinsic races surrounding addition of a dependent action |
|
162 * versus performing the action directly because the task is |
|
163 * already complete. For example, a CF may not be complete upon |
|
164 * entry, so a dependent completion is added, but by the time it |
|
165 * is added, the target CF is complete, so must be directly |
|
166 * executed. This is all done while avoiding unnecessary object |
|
167 * construction in safe-bypass cases. |
|
168 */ |
|
169 |
|
170 // preliminaries |
|
171 |
|
172 static final class AltResult { |
|
173 final Throwable ex; // null only for NIL |
|
174 AltResult(Throwable ex) { this.ex = ex; } |
|
175 } |
|
176 |
|
177 static final AltResult NIL = new AltResult(null); |
|
178 |
|
179 // Fields |
|
180 |
|
181 volatile Object result; // Either the result or boxed AltResult |
|
182 volatile WaitNode waiters; // Treiber stack of threads blocked on get() |
|
183 volatile CompletionNode completions; // list (Treiber stack) of completions |
|
184 |
|
185 // Basic utilities for triggering and processing completions |
|
186 |
|
187 /** |
|
188 * Removes and signals all waiting threads and runs all completions. |
|
189 */ |
|
190 final void postComplete() { |
|
191 WaitNode q; Thread t; |
|
192 while ((q = waiters) != null) { |
|
193 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) && |
|
194 (t = q.thread) != null) { |
|
195 q.thread = null; |
|
196 LockSupport.unpark(t); |
|
197 } |
|
198 } |
|
199 |
|
200 CompletionNode h; Completion c; |
|
201 while ((h = completions) != null) { |
|
202 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) && |
|
203 (c = h.completion) != null) |
|
204 c.run(); |
|
205 } |
|
206 } |
|
207 |
|
208 /** |
|
209 * Triggers completion with the encoding of the given arguments: |
|
210 * if the exception is non-null, encodes it as a wrapped |
|
211 * CompletionException unless it is one already. Otherwise uses |
|
212 * the given result, boxed as NIL if null. |
|
213 */ |
|
214 final void internalComplete(T v, Throwable ex) { |
|
215 if (result == null) |
|
216 UNSAFE.compareAndSwapObject |
|
217 (this, RESULT, null, |
|
218 (ex == null) ? (v == null) ? NIL : v : |
|
219 new AltResult((ex instanceof CompletionException) ? ex : |
|
220 new CompletionException(ex))); |
|
221 postComplete(); // help out even if not triggered |
|
222 } |
|
223 |
|
224 /** |
|
225 * If triggered, helps release and/or process completions. |
|
226 */ |
|
227 final void helpPostComplete() { |
|
228 if (result != null) |
|
229 postComplete(); |
|
230 } |
|
231 |
|
232 /* ------------- waiting for completions -------------- */ |
|
233 |
|
234 /** Number of processors, for spin control */ |
|
235 static final int NCPU = Runtime.getRuntime().availableProcessors(); |
|
236 |
|
237 /** |
|
238 * Heuristic spin value for waitingGet() before blocking on |
|
239 * multiprocessors |
|
240 */ |
|
241 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; |
|
242 |
|
243 /** |
|
244 * Linked nodes to record waiting threads in a Treiber stack. See |
|
245 * other classes such as Phaser and SynchronousQueue for more |
|
246 * detailed explanation. This class implements ManagedBlocker to |
|
247 * avoid starvation when blocking actions pile up in |
|
248 * ForkJoinPools. |
|
249 */ |
|
250 static final class WaitNode implements ForkJoinPool.ManagedBlocker { |
|
251 long nanos; // wait time if timed |
|
252 final long deadline; // non-zero if timed |
|
253 volatile int interruptControl; // > 0: interruptible, < 0: interrupted |
|
254 volatile Thread thread; |
|
255 volatile WaitNode next; |
|
256 WaitNode(boolean interruptible, long nanos, long deadline) { |
|
257 this.thread = Thread.currentThread(); |
|
258 this.interruptControl = interruptible ? 1 : 0; |
|
259 this.nanos = nanos; |
|
260 this.deadline = deadline; |
|
261 } |
|
262 public boolean isReleasable() { |
|
263 if (thread == null) |
|
264 return true; |
|
265 if (Thread.interrupted()) { |
|
266 int i = interruptControl; |
|
267 interruptControl = -1; |
|
268 if (i > 0) |
|
269 return true; |
|
270 } |
|
271 if (deadline != 0L && |
|
272 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { |
|
273 thread = null; |
|
274 return true; |
|
275 } |
|
276 return false; |
|
277 } |
|
278 public boolean block() { |
|
279 if (isReleasable()) |
|
280 return true; |
|
281 else if (deadline == 0L) |
|
282 LockSupport.park(this); |
|
283 else if (nanos > 0L) |
|
284 LockSupport.parkNanos(this, nanos); |
|
285 return isReleasable(); |
|
286 } |
|
287 } |
|
288 |
|
289 /** |
|
290 * Returns raw result after waiting, or null if interruptible and |
|
291 * interrupted. |
|
292 */ |
|
293 private Object waitingGet(boolean interruptible) { |
|
294 WaitNode q = null; |
|
295 boolean queued = false; |
|
296 int spins = SPINS; |
|
297 for (Object r;;) { |
|
298 if ((r = result) != null) { |
|
299 if (q != null) { // suppress unpark |
|
300 q.thread = null; |
|
301 if (q.interruptControl < 0) { |
|
302 if (interruptible) { |
|
303 removeWaiter(q); |
|
304 return null; |
|
305 } |
|
306 Thread.currentThread().interrupt(); |
|
307 } |
|
308 } |
|
309 postComplete(); // help release others |
|
310 return r; |
|
311 } |
|
312 else if (spins > 0) { |
|
313 int rnd = ThreadLocalRandom.nextSecondarySeed(); |
|
314 if (rnd == 0) |
|
315 rnd = ThreadLocalRandom.current().nextInt(); |
|
316 if (rnd >= 0) |
|
317 --spins; |
|
318 } |
|
319 else if (q == null) |
|
320 q = new WaitNode(interruptible, 0L, 0L); |
|
321 else if (!queued) |
|
322 queued = UNSAFE.compareAndSwapObject(this, WAITERS, |
|
323 q.next = waiters, q); |
|
324 else if (interruptible && q.interruptControl < 0) { |
|
325 removeWaiter(q); |
|
326 return null; |
|
327 } |
|
328 else if (q.thread != null && result == null) { |
|
329 try { |
|
330 ForkJoinPool.managedBlock(q); |
|
331 } catch (InterruptedException ex) { |
|
332 q.interruptControl = -1; |
|
333 } |
|
334 } |
|
335 } |
|
336 } |
|
337 |
|
338 /** |
|
339 * Awaits completion or aborts on interrupt or timeout. |
|
340 * |
|
341 * @param nanos time to wait |
|
342 * @return raw result |
|
343 */ |
|
344 private Object timedAwaitDone(long nanos) |
|
345 throws InterruptedException, TimeoutException { |
|
346 WaitNode q = null; |
|
347 boolean queued = false; |
|
348 for (Object r;;) { |
|
349 if ((r = result) != null) { |
|
350 if (q != null) { |
|
351 q.thread = null; |
|
352 if (q.interruptControl < 0) { |
|
353 removeWaiter(q); |
|
354 throw new InterruptedException(); |
|
355 } |
|
356 } |
|
357 postComplete(); |
|
358 return r; |
|
359 } |
|
360 else if (q == null) { |
|
361 if (nanos <= 0L) |
|
362 throw new TimeoutException(); |
|
363 long d = System.nanoTime() + nanos; |
|
364 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0 |
|
365 } |
|
366 else if (!queued) |
|
367 queued = UNSAFE.compareAndSwapObject(this, WAITERS, |
|
368 q.next = waiters, q); |
|
369 else if (q.interruptControl < 0) { |
|
370 removeWaiter(q); |
|
371 throw new InterruptedException(); |
|
372 } |
|
373 else if (q.nanos <= 0L) { |
|
374 if (result == null) { |
|
375 removeWaiter(q); |
|
376 throw new TimeoutException(); |
|
377 } |
|
378 } |
|
379 else if (q.thread != null && result == null) { |
|
380 try { |
|
381 ForkJoinPool.managedBlock(q); |
|
382 } catch (InterruptedException ex) { |
|
383 q.interruptControl = -1; |
|
384 } |
|
385 } |
|
386 } |
|
387 } |
|
388 |
|
389 /** |
|
390 * Tries to unlink a timed-out or interrupted wait node to avoid |
|
391 * accumulating garbage. Internal nodes are simply unspliced |
|
392 * without CAS since it is harmless if they are traversed anyway |
|
393 * by releasers. To avoid effects of unsplicing from already |
|
394 * removed nodes, the list is retraversed in case of an apparent |
|
395 * race. This is slow when there are a lot of nodes, but we don't |
|
396 * expect lists to be long enough to outweigh higher-overhead |
|
397 * schemes. |
|
398 */ |
|
399 private void removeWaiter(WaitNode node) { |
|
400 if (node != null) { |
|
401 node.thread = null; |
|
402 retry: |
|
403 for (;;) { // restart on removeWaiter race |
|
404 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { |
|
405 s = q.next; |
|
406 if (q.thread != null) |
|
407 pred = q; |
|
408 else if (pred != null) { |
|
409 pred.next = s; |
|
410 if (pred.thread == null) // check for race |
|
411 continue retry; |
|
412 } |
|
413 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) |
|
414 continue retry; |
|
415 } |
|
416 break; |
|
417 } |
|
418 } |
|
419 } |
|
420 |
|
421 /* ------------- Async tasks -------------- */ |
|
422 |
|
423 /** |
|
424 * A marker interface identifying asynchronous tasks produced by |
|
425 * {@code async} methods. This may be useful for monitoring, |
|
426 * debugging, and tracking asynchronous activities. |
|
427 * |
|
428 * @since 1.8 |
|
429 */ |
|
430 public static interface AsynchronousCompletionTask { |
|
431 } |
|
432 |
|
433 /** Base class can act as either FJ or plain Runnable */ |
|
434 abstract static class Async extends ForkJoinTask<Void> |
|
435 implements Runnable, AsynchronousCompletionTask { |
|
436 public final Void getRawResult() { return null; } |
|
437 public final void setRawResult(Void v) { } |
|
438 public final void run() { exec(); } |
|
439 } |
|
440 |
|
441 static final class AsyncRun extends Async { |
|
442 final Runnable fn; |
|
443 final CompletableFuture<Void> dst; |
|
444 AsyncRun(Runnable fn, CompletableFuture<Void> dst) { |
|
445 this.fn = fn; this.dst = dst; |
|
446 } |
|
447 public final boolean exec() { |
|
448 CompletableFuture<Void> d; Throwable ex; |
|
449 if ((d = this.dst) != null && d.result == null) { |
|
450 try { |
|
451 fn.run(); |
|
452 ex = null; |
|
453 } catch (Throwable rex) { |
|
454 ex = rex; |
|
455 } |
|
456 d.internalComplete(null, ex); |
|
457 } |
|
458 return true; |
|
459 } |
|
460 private static final long serialVersionUID = 5232453952276885070L; |
|
461 } |
|
462 |
|
463 static final class AsyncSupply<U> extends Async { |
|
464 final Supplier<U> fn; |
|
465 final CompletableFuture<U> dst; |
|
466 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) { |
|
467 this.fn = fn; this.dst = dst; |
|
468 } |
|
469 public final boolean exec() { |
|
470 CompletableFuture<U> d; U u; Throwable ex; |
|
471 if ((d = this.dst) != null && d.result == null) { |
|
472 try { |
|
473 u = fn.get(); |
|
474 ex = null; |
|
475 } catch (Throwable rex) { |
|
476 ex = rex; |
|
477 u = null; |
|
478 } |
|
479 d.internalComplete(u, ex); |
|
480 } |
|
481 return true; |
|
482 } |
|
483 private static final long serialVersionUID = 5232453952276885070L; |
|
484 } |
|
485 |
|
486 static final class AsyncApply<T,U> extends Async { |
|
487 final T arg; |
|
488 final Function<? super T,? extends U> fn; |
|
489 final CompletableFuture<U> dst; |
|
490 AsyncApply(T arg, Function<? super T,? extends U> fn, |
|
491 CompletableFuture<U> dst) { |
|
492 this.arg = arg; this.fn = fn; this.dst = dst; |
|
493 } |
|
494 public final boolean exec() { |
|
495 CompletableFuture<U> d; U u; Throwable ex; |
|
496 if ((d = this.dst) != null && d.result == null) { |
|
497 try { |
|
498 u = fn.apply(arg); |
|
499 ex = null; |
|
500 } catch (Throwable rex) { |
|
501 ex = rex; |
|
502 u = null; |
|
503 } |
|
504 d.internalComplete(u, ex); |
|
505 } |
|
506 return true; |
|
507 } |
|
508 private static final long serialVersionUID = 5232453952276885070L; |
|
509 } |
|
510 |
|
511 static final class AsyncCombine<T,U,V> extends Async { |
|
512 final T arg1; |
|
513 final U arg2; |
|
514 final BiFunction<? super T,? super U,? extends V> fn; |
|
515 final CompletableFuture<V> dst; |
|
516 AsyncCombine(T arg1, U arg2, |
|
517 BiFunction<? super T,? super U,? extends V> fn, |
|
518 CompletableFuture<V> dst) { |
|
519 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; |
|
520 } |
|
521 public final boolean exec() { |
|
522 CompletableFuture<V> d; V v; Throwable ex; |
|
523 if ((d = this.dst) != null && d.result == null) { |
|
524 try { |
|
525 v = fn.apply(arg1, arg2); |
|
526 ex = null; |
|
527 } catch (Throwable rex) { |
|
528 ex = rex; |
|
529 v = null; |
|
530 } |
|
531 d.internalComplete(v, ex); |
|
532 } |
|
533 return true; |
|
534 } |
|
535 private static final long serialVersionUID = 5232453952276885070L; |
|
536 } |
|
537 |
|
538 static final class AsyncAccept<T> extends Async { |
|
539 final T arg; |
|
540 final Consumer<? super T> fn; |
|
541 final CompletableFuture<Void> dst; |
|
542 AsyncAccept(T arg, Consumer<? super T> fn, |
|
543 CompletableFuture<Void> dst) { |
|
544 this.arg = arg; this.fn = fn; this.dst = dst; |
|
545 } |
|
546 public final boolean exec() { |
|
547 CompletableFuture<Void> d; Throwable ex; |
|
548 if ((d = this.dst) != null && d.result == null) { |
|
549 try { |
|
550 fn.accept(arg); |
|
551 ex = null; |
|
552 } catch (Throwable rex) { |
|
553 ex = rex; |
|
554 } |
|
555 d.internalComplete(null, ex); |
|
556 } |
|
557 return true; |
|
558 } |
|
559 private static final long serialVersionUID = 5232453952276885070L; |
|
560 } |
|
561 |
|
562 static final class AsyncAcceptBoth<T,U> extends Async { |
|
563 final T arg1; |
|
564 final U arg2; |
|
565 final BiConsumer<? super T,? super U> fn; |
|
566 final CompletableFuture<Void> dst; |
|
567 AsyncAcceptBoth(T arg1, U arg2, |
|
568 BiConsumer<? super T,? super U> fn, |
|
569 CompletableFuture<Void> dst) { |
|
570 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; |
|
571 } |
|
572 public final boolean exec() { |
|
573 CompletableFuture<Void> d; Throwable ex; |
|
574 if ((d = this.dst) != null && d.result == null) { |
|
575 try { |
|
576 fn.accept(arg1, arg2); |
|
577 ex = null; |
|
578 } catch (Throwable rex) { |
|
579 ex = rex; |
|
580 } |
|
581 d.internalComplete(null, ex); |
|
582 } |
|
583 return true; |
|
584 } |
|
585 private static final long serialVersionUID = 5232453952276885070L; |
|
586 } |
|
587 |
|
588 static final class AsyncCompose<T,U> extends Async { |
|
589 final T arg; |
|
590 final Function<? super T, CompletableFuture<U>> fn; |
|
591 final CompletableFuture<U> dst; |
|
592 AsyncCompose(T arg, |
|
593 Function<? super T, CompletableFuture<U>> fn, |
|
594 CompletableFuture<U> dst) { |
|
595 this.arg = arg; this.fn = fn; this.dst = dst; |
|
596 } |
|
597 public final boolean exec() { |
|
598 CompletableFuture<U> d, fr; U u; Throwable ex; |
|
599 if ((d = this.dst) != null && d.result == null) { |
|
600 try { |
|
601 fr = fn.apply(arg); |
|
602 ex = (fr == null) ? new NullPointerException() : null; |
|
603 } catch (Throwable rex) { |
|
604 ex = rex; |
|
605 fr = null; |
|
606 } |
|
607 if (ex != null) |
|
608 u = null; |
|
609 else { |
|
610 Object r = fr.result; |
|
611 if (r == null) |
|
612 r = fr.waitingGet(false); |
|
613 if (r instanceof AltResult) { |
|
614 ex = ((AltResult)r).ex; |
|
615 u = null; |
|
616 } |
|
617 else { |
|
618 @SuppressWarnings("unchecked") U ur = (U) r; |
|
619 u = ur; |
|
620 } |
|
621 } |
|
622 d.internalComplete(u, ex); |
|
623 } |
|
624 return true; |
|
625 } |
|
626 private static final long serialVersionUID = 5232453952276885070L; |
|
627 } |
|
628 |
|
629 /* ------------- Completions -------------- */ |
|
630 |
|
631 /** |
|
632 * Simple linked list nodes to record completions, used in |
|
633 * basically the same way as WaitNodes. (We separate nodes from |
|
634 * the Completions themselves mainly because for the And and Or |
|
635 * methods, the same Completion object resides in two lists.) |
|
636 */ |
|
637 static final class CompletionNode { |
|
638 final Completion completion; |
|
639 volatile CompletionNode next; |
|
640 CompletionNode(Completion completion) { this.completion = completion; } |
|
641 } |
|
642 |
|
643 // Opportunistically subclass AtomicInteger to use compareAndSet to claim. |
|
644 abstract static class Completion extends AtomicInteger implements Runnable { |
|
645 } |
|
646 |
|
647 static final class ThenApply<T,U> extends Completion { |
|
648 final CompletableFuture<? extends T> src; |
|
649 final Function<? super T,? extends U> fn; |
|
650 final CompletableFuture<U> dst; |
|
651 final Executor executor; |
|
652 ThenApply(CompletableFuture<? extends T> src, |
|
653 Function<? super T,? extends U> fn, |
|
654 CompletableFuture<U> dst, |
|
655 Executor executor) { |
|
656 this.src = src; this.fn = fn; this.dst = dst; |
|
657 this.executor = executor; |
|
658 } |
|
659 public final void run() { |
|
660 final CompletableFuture<? extends T> a; |
|
661 final Function<? super T,? extends U> fn; |
|
662 final CompletableFuture<U> dst; |
|
663 Object r; T t; Throwable ex; |
|
664 if ((dst = this.dst) != null && |
|
665 (fn = this.fn) != null && |
|
666 (a = this.src) != null && |
|
667 (r = a.result) != null && |
|
668 compareAndSet(0, 1)) { |
|
669 if (r instanceof AltResult) { |
|
670 ex = ((AltResult)r).ex; |
|
671 t = null; |
|
672 } |
|
673 else { |
|
674 ex = null; |
|
675 @SuppressWarnings("unchecked") T tr = (T) r; |
|
676 t = tr; |
|
677 } |
|
678 Executor e = executor; |
|
679 U u = null; |
|
680 if (ex == null) { |
|
681 try { |
|
682 if (e != null) |
|
683 e.execute(new AsyncApply<T,U>(t, fn, dst)); |
|
684 else |
|
685 u = fn.apply(t); |
|
686 } catch (Throwable rex) { |
|
687 ex = rex; |
|
688 } |
|
689 } |
|
690 if (e == null || ex != null) |
|
691 dst.internalComplete(u, ex); |
|
692 } |
|
693 } |
|
694 private static final long serialVersionUID = 5232453952276885070L; |
|
695 } |
|
696 |
|
697 static final class ThenAccept<T> extends Completion { |
|
698 final CompletableFuture<? extends T> src; |
|
699 final Consumer<? super T> fn; |
|
700 final CompletableFuture<Void> dst; |
|
701 final Executor executor; |
|
702 ThenAccept(CompletableFuture<? extends T> src, |
|
703 Consumer<? super T> fn, |
|
704 CompletableFuture<Void> dst, |
|
705 Executor executor) { |
|
706 this.src = src; this.fn = fn; this.dst = dst; |
|
707 this.executor = executor; |
|
708 } |
|
709 public final void run() { |
|
710 final CompletableFuture<? extends T> a; |
|
711 final Consumer<? super T> fn; |
|
712 final CompletableFuture<Void> dst; |
|
713 Object r; T t; Throwable ex; |
|
714 if ((dst = this.dst) != null && |
|
715 (fn = this.fn) != null && |
|
716 (a = this.src) != null && |
|
717 (r = a.result) != null && |
|
718 compareAndSet(0, 1)) { |
|
719 if (r instanceof AltResult) { |
|
720 ex = ((AltResult)r).ex; |
|
721 t = null; |
|
722 } |
|
723 else { |
|
724 ex = null; |
|
725 @SuppressWarnings("unchecked") T tr = (T) r; |
|
726 t = tr; |
|
727 } |
|
728 Executor e = executor; |
|
729 if (ex == null) { |
|
730 try { |
|
731 if (e != null) |
|
732 e.execute(new AsyncAccept<T>(t, fn, dst)); |
|
733 else |
|
734 fn.accept(t); |
|
735 } catch (Throwable rex) { |
|
736 ex = rex; |
|
737 } |
|
738 } |
|
739 if (e == null || ex != null) |
|
740 dst.internalComplete(null, ex); |
|
741 } |
|
742 } |
|
743 private static final long serialVersionUID = 5232453952276885070L; |
|
744 } |
|
745 |
|
746 static final class ThenRun extends Completion { |
|
747 final CompletableFuture<?> src; |
|
748 final Runnable fn; |
|
749 final CompletableFuture<Void> dst; |
|
750 final Executor executor; |
|
751 ThenRun(CompletableFuture<?> src, |
|
752 Runnable fn, |
|
753 CompletableFuture<Void> dst, |
|
754 Executor executor) { |
|
755 this.src = src; this.fn = fn; this.dst = dst; |
|
756 this.executor = executor; |
|
757 } |
|
758 public final void run() { |
|
759 final CompletableFuture<?> a; |
|
760 final Runnable fn; |
|
761 final CompletableFuture<Void> dst; |
|
762 Object r; Throwable ex; |
|
763 if ((dst = this.dst) != null && |
|
764 (fn = this.fn) != null && |
|
765 (a = this.src) != null && |
|
766 (r = a.result) != null && |
|
767 compareAndSet(0, 1)) { |
|
768 if (r instanceof AltResult) |
|
769 ex = ((AltResult)r).ex; |
|
770 else |
|
771 ex = null; |
|
772 Executor e = executor; |
|
773 if (ex == null) { |
|
774 try { |
|
775 if (e != null) |
|
776 e.execute(new AsyncRun(fn, dst)); |
|
777 else |
|
778 fn.run(); |
|
779 } catch (Throwable rex) { |
|
780 ex = rex; |
|
781 } |
|
782 } |
|
783 if (e == null || ex != null) |
|
784 dst.internalComplete(null, ex); |
|
785 } |
|
786 } |
|
787 private static final long serialVersionUID = 5232453952276885070L; |
|
788 } |
|
789 |
|
790 static final class ThenCombine<T,U,V> extends Completion { |
|
791 final CompletableFuture<? extends T> src; |
|
792 final CompletableFuture<? extends U> snd; |
|
793 final BiFunction<? super T,? super U,? extends V> fn; |
|
794 final CompletableFuture<V> dst; |
|
795 final Executor executor; |
|
796 ThenCombine(CompletableFuture<? extends T> src, |
|
797 CompletableFuture<? extends U> snd, |
|
798 BiFunction<? super T,? super U,? extends V> fn, |
|
799 CompletableFuture<V> dst, |
|
800 Executor executor) { |
|
801 this.src = src; this.snd = snd; |
|
802 this.fn = fn; this.dst = dst; |
|
803 this.executor = executor; |
|
804 } |
|
805 public final void run() { |
|
806 final CompletableFuture<? extends T> a; |
|
807 final CompletableFuture<? extends U> b; |
|
808 final BiFunction<? super T,? super U,? extends V> fn; |
|
809 final CompletableFuture<V> dst; |
|
810 Object r, s; T t; U u; Throwable ex; |
|
811 if ((dst = this.dst) != null && |
|
812 (fn = this.fn) != null && |
|
813 (a = this.src) != null && |
|
814 (r = a.result) != null && |
|
815 (b = this.snd) != null && |
|
816 (s = b.result) != null && |
|
817 compareAndSet(0, 1)) { |
|
818 if (r instanceof AltResult) { |
|
819 ex = ((AltResult)r).ex; |
|
820 t = null; |
|
821 } |
|
822 else { |
|
823 ex = null; |
|
824 @SuppressWarnings("unchecked") T tr = (T) r; |
|
825 t = tr; |
|
826 } |
|
827 if (ex != null) |
|
828 u = null; |
|
829 else if (s instanceof AltResult) { |
|
830 ex = ((AltResult)s).ex; |
|
831 u = null; |
|
832 } |
|
833 else { |
|
834 @SuppressWarnings("unchecked") U us = (U) s; |
|
835 u = us; |
|
836 } |
|
837 Executor e = executor; |
|
838 V v = null; |
|
839 if (ex == null) { |
|
840 try { |
|
841 if (e != null) |
|
842 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); |
|
843 else |
|
844 v = fn.apply(t, u); |
|
845 } catch (Throwable rex) { |
|
846 ex = rex; |
|
847 } |
|
848 } |
|
849 if (e == null || ex != null) |
|
850 dst.internalComplete(v, ex); |
|
851 } |
|
852 } |
|
853 private static final long serialVersionUID = 5232453952276885070L; |
|
854 } |
|
855 |
|
856 static final class ThenAcceptBoth<T,U> extends Completion { |
|
857 final CompletableFuture<? extends T> src; |
|
858 final CompletableFuture<? extends U> snd; |
|
859 final BiConsumer<? super T,? super U> fn; |
|
860 final CompletableFuture<Void> dst; |
|
861 final Executor executor; |
|
862 ThenAcceptBoth(CompletableFuture<? extends T> src, |
|
863 CompletableFuture<? extends U> snd, |
|
864 BiConsumer<? super T,? super U> fn, |
|
865 CompletableFuture<Void> dst, |
|
866 Executor executor) { |
|
867 this.src = src; this.snd = snd; |
|
868 this.fn = fn; this.dst = dst; |
|
869 this.executor = executor; |
|
870 } |
|
871 public final void run() { |
|
872 final CompletableFuture<? extends T> a; |
|
873 final CompletableFuture<? extends U> b; |
|
874 final BiConsumer<? super T,? super U> fn; |
|
875 final CompletableFuture<Void> dst; |
|
876 Object r, s; T t; U u; Throwable ex; |
|
877 if ((dst = this.dst) != null && |
|
878 (fn = this.fn) != null && |
|
879 (a = this.src) != null && |
|
880 (r = a.result) != null && |
|
881 (b = this.snd) != null && |
|
882 (s = b.result) != null && |
|
883 compareAndSet(0, 1)) { |
|
884 if (r instanceof AltResult) { |
|
885 ex = ((AltResult)r).ex; |
|
886 t = null; |
|
887 } |
|
888 else { |
|
889 ex = null; |
|
890 @SuppressWarnings("unchecked") T tr = (T) r; |
|
891 t = tr; |
|
892 } |
|
893 if (ex != null) |
|
894 u = null; |
|
895 else if (s instanceof AltResult) { |
|
896 ex = ((AltResult)s).ex; |
|
897 u = null; |
|
898 } |
|
899 else { |
|
900 @SuppressWarnings("unchecked") U us = (U) s; |
|
901 u = us; |
|
902 } |
|
903 Executor e = executor; |
|
904 if (ex == null) { |
|
905 try { |
|
906 if (e != null) |
|
907 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); |
|
908 else |
|
909 fn.accept(t, u); |
|
910 } catch (Throwable rex) { |
|
911 ex = rex; |
|
912 } |
|
913 } |
|
914 if (e == null || ex != null) |
|
915 dst.internalComplete(null, ex); |
|
916 } |
|
917 } |
|
918 private static final long serialVersionUID = 5232453952276885070L; |
|
919 } |
|
920 |
|
921 static final class RunAfterBoth extends Completion { |
|
922 final CompletableFuture<?> src; |
|
923 final CompletableFuture<?> snd; |
|
924 final Runnable fn; |
|
925 final CompletableFuture<Void> dst; |
|
926 final Executor executor; |
|
927 RunAfterBoth(CompletableFuture<?> src, |
|
928 CompletableFuture<?> snd, |
|
929 Runnable fn, |
|
930 CompletableFuture<Void> dst, |
|
931 Executor executor) { |
|
932 this.src = src; this.snd = snd; |
|
933 this.fn = fn; this.dst = dst; |
|
934 this.executor = executor; |
|
935 } |
|
936 public final void run() { |
|
937 final CompletableFuture<?> a; |
|
938 final CompletableFuture<?> b; |
|
939 final Runnable fn; |
|
940 final CompletableFuture<Void> dst; |
|
941 Object r, s; Throwable ex; |
|
942 if ((dst = this.dst) != null && |
|
943 (fn = this.fn) != null && |
|
944 (a = this.src) != null && |
|
945 (r = a.result) != null && |
|
946 (b = this.snd) != null && |
|
947 (s = b.result) != null && |
|
948 compareAndSet(0, 1)) { |
|
949 if (r instanceof AltResult) |
|
950 ex = ((AltResult)r).ex; |
|
951 else |
|
952 ex = null; |
|
953 if (ex == null && (s instanceof AltResult)) |
|
954 ex = ((AltResult)s).ex; |
|
955 Executor e = executor; |
|
956 if (ex == null) { |
|
957 try { |
|
958 if (e != null) |
|
959 e.execute(new AsyncRun(fn, dst)); |
|
960 else |
|
961 fn.run(); |
|
962 } catch (Throwable rex) { |
|
963 ex = rex; |
|
964 } |
|
965 } |
|
966 if (e == null || ex != null) |
|
967 dst.internalComplete(null, ex); |
|
968 } |
|
969 } |
|
970 private static final long serialVersionUID = 5232453952276885070L; |
|
971 } |
|
972 |
|
973 static final class AndCompletion extends Completion { |
|
974 final CompletableFuture<?> src; |
|
975 final CompletableFuture<?> snd; |
|
976 final CompletableFuture<Void> dst; |
|
977 AndCompletion(CompletableFuture<?> src, |
|
978 CompletableFuture<?> snd, |
|
979 CompletableFuture<Void> dst) { |
|
980 this.src = src; this.snd = snd; this.dst = dst; |
|
981 } |
|
982 public final void run() { |
|
983 final CompletableFuture<?> a; |
|
984 final CompletableFuture<?> b; |
|
985 final CompletableFuture<Void> dst; |
|
986 Object r, s; Throwable ex; |
|
987 if ((dst = this.dst) != null && |
|
988 (a = this.src) != null && |
|
989 (r = a.result) != null && |
|
990 (b = this.snd) != null && |
|
991 (s = b.result) != null && |
|
992 compareAndSet(0, 1)) { |
|
993 if (r instanceof AltResult) |
|
994 ex = ((AltResult)r).ex; |
|
995 else |
|
996 ex = null; |
|
997 if (ex == null && (s instanceof AltResult)) |
|
998 ex = ((AltResult)s).ex; |
|
999 dst.internalComplete(null, ex); |
|
1000 } |
|
1001 } |
|
1002 private static final long serialVersionUID = 5232453952276885070L; |
|
1003 } |
|
1004 |
|
1005 static final class ApplyToEither<T,U> extends Completion { |
|
1006 final CompletableFuture<? extends T> src; |
|
1007 final CompletableFuture<? extends T> snd; |
|
1008 final Function<? super T,? extends U> fn; |
|
1009 final CompletableFuture<U> dst; |
|
1010 final Executor executor; |
|
1011 ApplyToEither(CompletableFuture<? extends T> src, |
|
1012 CompletableFuture<? extends T> snd, |
|
1013 Function<? super T,? extends U> fn, |
|
1014 CompletableFuture<U> dst, |
|
1015 Executor executor) { |
|
1016 this.src = src; this.snd = snd; |
|
1017 this.fn = fn; this.dst = dst; |
|
1018 this.executor = executor; |
|
1019 } |
|
1020 public final void run() { |
|
1021 final CompletableFuture<? extends T> a; |
|
1022 final CompletableFuture<? extends T> b; |
|
1023 final Function<? super T,? extends U> fn; |
|
1024 final CompletableFuture<U> dst; |
|
1025 Object r; T t; Throwable ex; |
|
1026 if ((dst = this.dst) != null && |
|
1027 (fn = this.fn) != null && |
|
1028 (((a = this.src) != null && (r = a.result) != null) || |
|
1029 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1030 compareAndSet(0, 1)) { |
|
1031 if (r instanceof AltResult) { |
|
1032 ex = ((AltResult)r).ex; |
|
1033 t = null; |
|
1034 } |
|
1035 else { |
|
1036 ex = null; |
|
1037 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1038 t = tr; |
|
1039 } |
|
1040 Executor e = executor; |
|
1041 U u = null; |
|
1042 if (ex == null) { |
|
1043 try { |
|
1044 if (e != null) |
|
1045 e.execute(new AsyncApply<T,U>(t, fn, dst)); |
|
1046 else |
|
1047 u = fn.apply(t); |
|
1048 } catch (Throwable rex) { |
|
1049 ex = rex; |
|
1050 } |
|
1051 } |
|
1052 if (e == null || ex != null) |
|
1053 dst.internalComplete(u, ex); |
|
1054 } |
|
1055 } |
|
1056 private static final long serialVersionUID = 5232453952276885070L; |
|
1057 } |
|
1058 |
|
1059 static final class AcceptEither<T> extends Completion { |
|
1060 final CompletableFuture<? extends T> src; |
|
1061 final CompletableFuture<? extends T> snd; |
|
1062 final Consumer<? super T> fn; |
|
1063 final CompletableFuture<Void> dst; |
|
1064 final Executor executor; |
|
1065 AcceptEither(CompletableFuture<? extends T> src, |
|
1066 CompletableFuture<? extends T> snd, |
|
1067 Consumer<? super T> fn, |
|
1068 CompletableFuture<Void> dst, |
|
1069 Executor executor) { |
|
1070 this.src = src; this.snd = snd; |
|
1071 this.fn = fn; this.dst = dst; |
|
1072 this.executor = executor; |
|
1073 } |
|
1074 public final void run() { |
|
1075 final CompletableFuture<? extends T> a; |
|
1076 final CompletableFuture<? extends T> b; |
|
1077 final Consumer<? super T> fn; |
|
1078 final CompletableFuture<Void> dst; |
|
1079 Object r; T t; Throwable ex; |
|
1080 if ((dst = this.dst) != null && |
|
1081 (fn = this.fn) != null && |
|
1082 (((a = this.src) != null && (r = a.result) != null) || |
|
1083 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1084 compareAndSet(0, 1)) { |
|
1085 if (r instanceof AltResult) { |
|
1086 ex = ((AltResult)r).ex; |
|
1087 t = null; |
|
1088 } |
|
1089 else { |
|
1090 ex = null; |
|
1091 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1092 t = tr; |
|
1093 } |
|
1094 Executor e = executor; |
|
1095 if (ex == null) { |
|
1096 try { |
|
1097 if (e != null) |
|
1098 e.execute(new AsyncAccept<T>(t, fn, dst)); |
|
1099 else |
|
1100 fn.accept(t); |
|
1101 } catch (Throwable rex) { |
|
1102 ex = rex; |
|
1103 } |
|
1104 } |
|
1105 if (e == null || ex != null) |
|
1106 dst.internalComplete(null, ex); |
|
1107 } |
|
1108 } |
|
1109 private static final long serialVersionUID = 5232453952276885070L; |
|
1110 } |
|
1111 |
|
1112 static final class RunAfterEither extends Completion { |
|
1113 final CompletableFuture<?> src; |
|
1114 final CompletableFuture<?> snd; |
|
1115 final Runnable fn; |
|
1116 final CompletableFuture<Void> dst; |
|
1117 final Executor executor; |
|
1118 RunAfterEither(CompletableFuture<?> src, |
|
1119 CompletableFuture<?> snd, |
|
1120 Runnable fn, |
|
1121 CompletableFuture<Void> dst, |
|
1122 Executor executor) { |
|
1123 this.src = src; this.snd = snd; |
|
1124 this.fn = fn; this.dst = dst; |
|
1125 this.executor = executor; |
|
1126 } |
|
1127 public final void run() { |
|
1128 final CompletableFuture<?> a; |
|
1129 final CompletableFuture<?> b; |
|
1130 final Runnable fn; |
|
1131 final CompletableFuture<Void> dst; |
|
1132 Object r; Throwable ex; |
|
1133 if ((dst = this.dst) != null && |
|
1134 (fn = this.fn) != null && |
|
1135 (((a = this.src) != null && (r = a.result) != null) || |
|
1136 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1137 compareAndSet(0, 1)) { |
|
1138 if (r instanceof AltResult) |
|
1139 ex = ((AltResult)r).ex; |
|
1140 else |
|
1141 ex = null; |
|
1142 Executor e = executor; |
|
1143 if (ex == null) { |
|
1144 try { |
|
1145 if (e != null) |
|
1146 e.execute(new AsyncRun(fn, dst)); |
|
1147 else |
|
1148 fn.run(); |
|
1149 } catch (Throwable rex) { |
|
1150 ex = rex; |
|
1151 } |
|
1152 } |
|
1153 if (e == null || ex != null) |
|
1154 dst.internalComplete(null, ex); |
|
1155 } |
|
1156 } |
|
1157 private static final long serialVersionUID = 5232453952276885070L; |
|
1158 } |
|
1159 |
|
1160 static final class OrCompletion extends Completion { |
|
1161 final CompletableFuture<?> src; |
|
1162 final CompletableFuture<?> snd; |
|
1163 final CompletableFuture<Object> dst; |
|
1164 OrCompletion(CompletableFuture<?> src, |
|
1165 CompletableFuture<?> snd, |
|
1166 CompletableFuture<Object> dst) { |
|
1167 this.src = src; this.snd = snd; this.dst = dst; |
|
1168 } |
|
1169 public final void run() { |
|
1170 final CompletableFuture<?> a; |
|
1171 final CompletableFuture<?> b; |
|
1172 final CompletableFuture<Object> dst; |
|
1173 Object r, t; Throwable ex; |
|
1174 if ((dst = this.dst) != null && |
|
1175 (((a = this.src) != null && (r = a.result) != null) || |
|
1176 ((b = this.snd) != null && (r = b.result) != null)) && |
|
1177 compareAndSet(0, 1)) { |
|
1178 if (r instanceof AltResult) { |
|
1179 ex = ((AltResult)r).ex; |
|
1180 t = null; |
|
1181 } |
|
1182 else { |
|
1183 ex = null; |
|
1184 t = r; |
|
1185 } |
|
1186 dst.internalComplete(t, ex); |
|
1187 } |
|
1188 } |
|
1189 private static final long serialVersionUID = 5232453952276885070L; |
|
1190 } |
|
1191 |
|
1192 static final class ExceptionCompletion<T> extends Completion { |
|
1193 final CompletableFuture<? extends T> src; |
|
1194 final Function<? super Throwable, ? extends T> fn; |
|
1195 final CompletableFuture<T> dst; |
|
1196 ExceptionCompletion(CompletableFuture<? extends T> src, |
|
1197 Function<? super Throwable, ? extends T> fn, |
|
1198 CompletableFuture<T> dst) { |
|
1199 this.src = src; this.fn = fn; this.dst = dst; |
|
1200 } |
|
1201 public final void run() { |
|
1202 final CompletableFuture<? extends T> a; |
|
1203 final Function<? super Throwable, ? extends T> fn; |
|
1204 final CompletableFuture<T> dst; |
|
1205 Object r; T t = null; Throwable ex, dx = null; |
|
1206 if ((dst = this.dst) != null && |
|
1207 (fn = this.fn) != null && |
|
1208 (a = this.src) != null && |
|
1209 (r = a.result) != null && |
|
1210 compareAndSet(0, 1)) { |
|
1211 if ((r instanceof AltResult) && |
|
1212 (ex = ((AltResult)r).ex) != null) { |
|
1213 try { |
|
1214 t = fn.apply(ex); |
|
1215 } catch (Throwable rex) { |
|
1216 dx = rex; |
|
1217 } |
|
1218 } |
|
1219 else { |
|
1220 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1221 t = tr; |
|
1222 } |
|
1223 dst.internalComplete(t, dx); |
|
1224 } |
|
1225 } |
|
1226 private static final long serialVersionUID = 5232453952276885070L; |
|
1227 } |
|
1228 |
|
1229 static final class ThenCopy<T> extends Completion { |
|
1230 final CompletableFuture<?> src; |
|
1231 final CompletableFuture<T> dst; |
|
1232 ThenCopy(CompletableFuture<?> src, |
|
1233 CompletableFuture<T> dst) { |
|
1234 this.src = src; this.dst = dst; |
|
1235 } |
|
1236 public final void run() { |
|
1237 final CompletableFuture<?> a; |
|
1238 final CompletableFuture<T> dst; |
|
1239 Object r; T t; Throwable ex; |
|
1240 if ((dst = this.dst) != null && |
|
1241 (a = this.src) != null && |
|
1242 (r = a.result) != null && |
|
1243 compareAndSet(0, 1)) { |
|
1244 if (r instanceof AltResult) { |
|
1245 ex = ((AltResult)r).ex; |
|
1246 t = null; |
|
1247 } |
|
1248 else { |
|
1249 ex = null; |
|
1250 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1251 t = tr; |
|
1252 } |
|
1253 dst.internalComplete(t, ex); |
|
1254 } |
|
1255 } |
|
1256 private static final long serialVersionUID = 5232453952276885070L; |
|
1257 } |
|
1258 |
|
1259 // version of ThenCopy for CompletableFuture<Void> dst |
|
1260 static final class ThenPropagate extends Completion { |
|
1261 final CompletableFuture<?> src; |
|
1262 final CompletableFuture<Void> dst; |
|
1263 ThenPropagate(CompletableFuture<?> src, |
|
1264 CompletableFuture<Void> dst) { |
|
1265 this.src = src; this.dst = dst; |
|
1266 } |
|
1267 public final void run() { |
|
1268 final CompletableFuture<?> a; |
|
1269 final CompletableFuture<Void> dst; |
|
1270 Object r; Throwable ex; |
|
1271 if ((dst = this.dst) != null && |
|
1272 (a = this.src) != null && |
|
1273 (r = a.result) != null && |
|
1274 compareAndSet(0, 1)) { |
|
1275 if (r instanceof AltResult) |
|
1276 ex = ((AltResult)r).ex; |
|
1277 else |
|
1278 ex = null; |
|
1279 dst.internalComplete(null, ex); |
|
1280 } |
|
1281 } |
|
1282 private static final long serialVersionUID = 5232453952276885070L; |
|
1283 } |
|
1284 |
|
1285 static final class HandleCompletion<T,U> extends Completion { |
|
1286 final CompletableFuture<? extends T> src; |
|
1287 final BiFunction<? super T, Throwable, ? extends U> fn; |
|
1288 final CompletableFuture<U> dst; |
|
1289 HandleCompletion(CompletableFuture<? extends T> src, |
|
1290 BiFunction<? super T, Throwable, ? extends U> fn, |
|
1291 CompletableFuture<U> dst) { |
|
1292 this.src = src; this.fn = fn; this.dst = dst; |
|
1293 } |
|
1294 public final void run() { |
|
1295 final CompletableFuture<? extends T> a; |
|
1296 final BiFunction<? super T, Throwable, ? extends U> fn; |
|
1297 final CompletableFuture<U> dst; |
|
1298 Object r; T t; Throwable ex; |
|
1299 if ((dst = this.dst) != null && |
|
1300 (fn = this.fn) != null && |
|
1301 (a = this.src) != null && |
|
1302 (r = a.result) != null && |
|
1303 compareAndSet(0, 1)) { |
|
1304 if (r instanceof AltResult) { |
|
1305 ex = ((AltResult)r).ex; |
|
1306 t = null; |
|
1307 } |
|
1308 else { |
|
1309 ex = null; |
|
1310 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1311 t = tr; |
|
1312 } |
|
1313 U u = null; Throwable dx = null; |
|
1314 try { |
|
1315 u = fn.apply(t, ex); |
|
1316 } catch (Throwable rex) { |
|
1317 dx = rex; |
|
1318 } |
|
1319 dst.internalComplete(u, dx); |
|
1320 } |
|
1321 } |
|
1322 private static final long serialVersionUID = 5232453952276885070L; |
|
1323 } |
|
1324 |
|
1325 static final class ThenCompose<T,U> extends Completion { |
|
1326 final CompletableFuture<? extends T> src; |
|
1327 final Function<? super T, CompletableFuture<U>> fn; |
|
1328 final CompletableFuture<U> dst; |
|
1329 final Executor executor; |
|
1330 ThenCompose(CompletableFuture<? extends T> src, |
|
1331 Function<? super T, CompletableFuture<U>> fn, |
|
1332 CompletableFuture<U> dst, |
|
1333 Executor executor) { |
|
1334 this.src = src; this.fn = fn; this.dst = dst; |
|
1335 this.executor = executor; |
|
1336 } |
|
1337 public final void run() { |
|
1338 final CompletableFuture<? extends T> a; |
|
1339 final Function<? super T, CompletableFuture<U>> fn; |
|
1340 final CompletableFuture<U> dst; |
|
1341 Object r; T t; Throwable ex; Executor e; |
|
1342 if ((dst = this.dst) != null && |
|
1343 (fn = this.fn) != null && |
|
1344 (a = this.src) != null && |
|
1345 (r = a.result) != null && |
|
1346 compareAndSet(0, 1)) { |
|
1347 if (r instanceof AltResult) { |
|
1348 ex = ((AltResult)r).ex; |
|
1349 t = null; |
|
1350 } |
|
1351 else { |
|
1352 ex = null; |
|
1353 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1354 t = tr; |
|
1355 } |
|
1356 CompletableFuture<U> c = null; |
|
1357 U u = null; |
|
1358 boolean complete = false; |
|
1359 if (ex == null) { |
|
1360 if ((e = executor) != null) |
|
1361 e.execute(new AsyncCompose<T,U>(t, fn, dst)); |
|
1362 else { |
|
1363 try { |
|
1364 if ((c = fn.apply(t)) == null) |
|
1365 ex = new NullPointerException(); |
|
1366 } catch (Throwable rex) { |
|
1367 ex = rex; |
|
1368 } |
|
1369 } |
|
1370 } |
|
1371 if (c != null) { |
|
1372 ThenCopy<U> d = null; |
|
1373 Object s; |
|
1374 if ((s = c.result) == null) { |
|
1375 CompletionNode p = new CompletionNode |
|
1376 (d = new ThenCopy<U>(c, dst)); |
|
1377 while ((s = c.result) == null) { |
|
1378 if (UNSAFE.compareAndSwapObject |
|
1379 (c, COMPLETIONS, p.next = c.completions, p)) |
|
1380 break; |
|
1381 } |
|
1382 } |
|
1383 if (s != null && (d == null || d.compareAndSet(0, 1))) { |
|
1384 complete = true; |
|
1385 if (s instanceof AltResult) { |
|
1386 ex = ((AltResult)s).ex; // no rewrap |
|
1387 u = null; |
|
1388 } |
|
1389 else { |
|
1390 @SuppressWarnings("unchecked") U us = (U) s; |
|
1391 u = us; |
|
1392 } |
|
1393 } |
|
1394 } |
|
1395 if (complete || ex != null) |
|
1396 dst.internalComplete(u, ex); |
|
1397 if (c != null) |
|
1398 c.helpPostComplete(); |
|
1399 } |
|
1400 } |
|
1401 private static final long serialVersionUID = 5232453952276885070L; |
|
1402 } |
|
1403 |
|
1404 // public methods |
|
1405 |
|
1406 /** |
|
1407 * Creates a new incomplete CompletableFuture. |
|
1408 */ |
|
1409 public CompletableFuture() { |
|
1410 } |
|
1411 |
|
1412 /** |
|
1413 * Returns a new CompletableFuture that is asynchronously completed |
|
1414 * by a task running in the {@link ForkJoinPool#commonPool()} with |
|
1415 * the value obtained by calling the given Supplier. |
|
1416 * |
|
1417 * @param supplier a function returning the value to be used |
|
1418 * to complete the returned CompletableFuture |
|
1419 * @return the new CompletableFuture |
|
1420 */ |
|
1421 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { |
|
1422 if (supplier == null) throw new NullPointerException(); |
|
1423 CompletableFuture<U> f = new CompletableFuture<U>(); |
|
1424 ForkJoinPool.commonPool(). |
|
1425 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f)); |
|
1426 return f; |
|
1427 } |
|
1428 |
|
1429 /** |
|
1430 * Returns a new CompletableFuture that is asynchronously completed |
|
1431 * by a task running in the given executor with the value obtained |
|
1432 * by calling the given Supplier. |
|
1433 * |
|
1434 * @param supplier a function returning the value to be used |
|
1435 * to complete the returned CompletableFuture |
|
1436 * @param executor the executor to use for asynchronous execution |
|
1437 * @return the new CompletableFuture |
|
1438 */ |
|
1439 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, |
|
1440 Executor executor) { |
|
1441 if (executor == null || supplier == null) |
|
1442 throw new NullPointerException(); |
|
1443 CompletableFuture<U> f = new CompletableFuture<U>(); |
|
1444 executor.execute(new AsyncSupply<U>(supplier, f)); |
|
1445 return f; |
|
1446 } |
|
1447 |
|
1448 /** |
|
1449 * Returns a new CompletableFuture that is asynchronously completed |
|
1450 * by a task running in the {@link ForkJoinPool#commonPool()} after |
|
1451 * it runs the given action. |
|
1452 * |
|
1453 * @param runnable the action to run before completing the |
|
1454 * returned CompletableFuture |
|
1455 * @return the new CompletableFuture |
|
1456 */ |
|
1457 public static CompletableFuture<Void> runAsync(Runnable runnable) { |
|
1458 if (runnable == null) throw new NullPointerException(); |
|
1459 CompletableFuture<Void> f = new CompletableFuture<Void>(); |
|
1460 ForkJoinPool.commonPool(). |
|
1461 execute((ForkJoinTask<?>)new AsyncRun(runnable, f)); |
|
1462 return f; |
|
1463 } |
|
1464 |
|
1465 /** |
|
1466 * Returns a new CompletableFuture that is asynchronously completed |
|
1467 * by a task running in the given executor after it runs the given |
|
1468 * action. |
|
1469 * |
|
1470 * @param runnable the action to run before completing the |
|
1471 * returned CompletableFuture |
|
1472 * @param executor the executor to use for asynchronous execution |
|
1473 * @return the new CompletableFuture |
|
1474 */ |
|
1475 public static CompletableFuture<Void> runAsync(Runnable runnable, |
|
1476 Executor executor) { |
|
1477 if (executor == null || runnable == null) |
|
1478 throw new NullPointerException(); |
|
1479 CompletableFuture<Void> f = new CompletableFuture<Void>(); |
|
1480 executor.execute(new AsyncRun(runnable, f)); |
|
1481 return f; |
|
1482 } |
|
1483 |
|
1484 /** |
|
1485 * Returns a new CompletableFuture that is already completed with |
|
1486 * the given value. |
|
1487 * |
|
1488 * @param value the value |
|
1489 * @return the completed CompletableFuture |
|
1490 */ |
|
1491 public static <U> CompletableFuture<U> completedFuture(U value) { |
|
1492 CompletableFuture<U> f = new CompletableFuture<U>(); |
|
1493 f.result = (value == null) ? NIL : value; |
|
1494 return f; |
|
1495 } |
|
1496 |
|
1497 /** |
|
1498 * Returns {@code true} if completed in any fashion: normally, |
|
1499 * exceptionally, or via cancellation. |
|
1500 * |
|
1501 * @return {@code true} if completed |
|
1502 */ |
|
1503 public boolean isDone() { |
|
1504 return result != null; |
|
1505 } |
|
1506 |
|
1507 /** |
|
1508 * Waits if necessary for this future to complete, and then |
|
1509 * returns its result. |
|
1510 * |
|
1511 * @return the result value |
|
1512 * @throws CancellationException if this future was cancelled |
|
1513 * @throws ExecutionException if this future completed exceptionally |
|
1514 * @throws InterruptedException if the current thread was interrupted |
|
1515 * while waiting |
|
1516 */ |
|
1517 public T get() throws InterruptedException, ExecutionException { |
|
1518 Object r; Throwable ex, cause; |
|
1519 if ((r = result) == null && (r = waitingGet(true)) == null) |
|
1520 throw new InterruptedException(); |
|
1521 if (!(r instanceof AltResult)) { |
|
1522 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1523 return tr; |
|
1524 } |
|
1525 if ((ex = ((AltResult)r).ex) == null) |
|
1526 return null; |
|
1527 if (ex instanceof CancellationException) |
|
1528 throw (CancellationException)ex; |
|
1529 if ((ex instanceof CompletionException) && |
|
1530 (cause = ex.getCause()) != null) |
|
1531 ex = cause; |
|
1532 throw new ExecutionException(ex); |
|
1533 } |
|
1534 |
|
1535 /** |
|
1536 * Waits if necessary for at most the given time for this future |
|
1537 * to complete, and then returns its result, if available. |
|
1538 * |
|
1539 * @param timeout the maximum time to wait |
|
1540 * @param unit the time unit of the timeout argument |
|
1541 * @return the result value |
|
1542 * @throws CancellationException if this future was cancelled |
|
1543 * @throws ExecutionException if this future completed exceptionally |
|
1544 * @throws InterruptedException if the current thread was interrupted |
|
1545 * while waiting |
|
1546 * @throws TimeoutException if the wait timed out |
|
1547 */ |
|
1548 public T get(long timeout, TimeUnit unit) |
|
1549 throws InterruptedException, ExecutionException, TimeoutException { |
|
1550 Object r; Throwable ex, cause; |
|
1551 long nanos = unit.toNanos(timeout); |
|
1552 if (Thread.interrupted()) |
|
1553 throw new InterruptedException(); |
|
1554 if ((r = result) == null) |
|
1555 r = timedAwaitDone(nanos); |
|
1556 if (!(r instanceof AltResult)) { |
|
1557 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1558 return tr; |
|
1559 } |
|
1560 if ((ex = ((AltResult)r).ex) == null) |
|
1561 return null; |
|
1562 if (ex instanceof CancellationException) |
|
1563 throw (CancellationException)ex; |
|
1564 if ((ex instanceof CompletionException) && |
|
1565 (cause = ex.getCause()) != null) |
|
1566 ex = cause; |
|
1567 throw new ExecutionException(ex); |
|
1568 } |
|
1569 |
|
1570 /** |
|
1571 * Returns the result value when complete, or throws an |
|
1572 * (unchecked) exception if completed exceptionally. To better |
|
1573 * conform with the use of common functional forms, if a |
|
1574 * computation involved in the completion of this |
|
1575 * CompletableFuture threw an exception, this method throws an |
|
1576 * (unchecked) {@link CompletionException} with the underlying |
|
1577 * exception as its cause. |
|
1578 * |
|
1579 * @return the result value |
|
1580 * @throws CancellationException if the computation was cancelled |
|
1581 * @throws CompletionException if this future completed |
|
1582 * exceptionally or a completion computation threw an exception |
|
1583 */ |
|
1584 public T join() { |
|
1585 Object r; Throwable ex; |
|
1586 if ((r = result) == null) |
|
1587 r = waitingGet(false); |
|
1588 if (!(r instanceof AltResult)) { |
|
1589 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1590 return tr; |
|
1591 } |
|
1592 if ((ex = ((AltResult)r).ex) == null) |
|
1593 return null; |
|
1594 if (ex instanceof CancellationException) |
|
1595 throw (CancellationException)ex; |
|
1596 if (ex instanceof CompletionException) |
|
1597 throw (CompletionException)ex; |
|
1598 throw new CompletionException(ex); |
|
1599 } |
|
1600 |
|
1601 /** |
|
1602 * Returns the result value (or throws any encountered exception) |
|
1603 * if completed, else returns the given valueIfAbsent. |
|
1604 * |
|
1605 * @param valueIfAbsent the value to return if not completed |
|
1606 * @return the result value, if completed, else the given valueIfAbsent |
|
1607 * @throws CancellationException if the computation was cancelled |
|
1608 * @throws CompletionException if this future completed |
|
1609 * exceptionally or a completion computation threw an exception |
|
1610 */ |
|
1611 public T getNow(T valueIfAbsent) { |
|
1612 Object r; Throwable ex; |
|
1613 if ((r = result) == null) |
|
1614 return valueIfAbsent; |
|
1615 if (!(r instanceof AltResult)) { |
|
1616 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1617 return tr; |
|
1618 } |
|
1619 if ((ex = ((AltResult)r).ex) == null) |
|
1620 return null; |
|
1621 if (ex instanceof CancellationException) |
|
1622 throw (CancellationException)ex; |
|
1623 if (ex instanceof CompletionException) |
|
1624 throw (CompletionException)ex; |
|
1625 throw new CompletionException(ex); |
|
1626 } |
|
1627 |
|
1628 /** |
|
1629 * If not already completed, sets the value returned by {@link |
|
1630 * #get()} and related methods to the given value. |
|
1631 * |
|
1632 * @param value the result value |
|
1633 * @return {@code true} if this invocation caused this CompletableFuture |
|
1634 * to transition to a completed state, else {@code false} |
|
1635 */ |
|
1636 public boolean complete(T value) { |
|
1637 boolean triggered = result == null && |
|
1638 UNSAFE.compareAndSwapObject(this, RESULT, null, |
|
1639 value == null ? NIL : value); |
|
1640 postComplete(); |
|
1641 return triggered; |
|
1642 } |
|
1643 |
|
1644 /** |
|
1645 * If not already completed, causes invocations of {@link #get()} |
|
1646 * and related methods to throw the given exception. |
|
1647 * |
|
1648 * @param ex the exception |
|
1649 * @return {@code true} if this invocation caused this CompletableFuture |
|
1650 * to transition to a completed state, else {@code false} |
|
1651 */ |
|
1652 public boolean completeExceptionally(Throwable ex) { |
|
1653 if (ex == null) throw new NullPointerException(); |
|
1654 boolean triggered = result == null && |
|
1655 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); |
|
1656 postComplete(); |
|
1657 return triggered; |
|
1658 } |
|
1659 |
|
1660 /** |
|
1661 * Returns a new CompletableFuture that is completed |
|
1662 * when this CompletableFuture completes, with the result of the |
|
1663 * given function of this CompletableFuture's result. |
|
1664 * |
|
1665 * <p>If this CompletableFuture completes exceptionally, or the |
|
1666 * supplied function throws an exception, then the returned |
|
1667 * CompletableFuture completes exceptionally with a |
|
1668 * CompletionException holding the exception as its cause. |
|
1669 * |
|
1670 * @param fn the function to use to compute the value of |
|
1671 * the returned CompletableFuture |
|
1672 * @return the new CompletableFuture |
|
1673 */ |
|
1674 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { |
|
1675 return doThenApply(fn, null); |
|
1676 } |
|
1677 |
|
1678 /** |
|
1679 * Returns a new CompletableFuture that is asynchronously completed |
|
1680 * when this CompletableFuture completes, with the result of the |
|
1681 * given function of this CompletableFuture's result from a |
|
1682 * task running in the {@link ForkJoinPool#commonPool()}. |
|
1683 * |
|
1684 * <p>If this CompletableFuture completes exceptionally, or the |
|
1685 * supplied function throws an exception, then the returned |
|
1686 * CompletableFuture completes exceptionally with a |
|
1687 * CompletionException holding the exception as its cause. |
|
1688 * |
|
1689 * @param fn the function to use to compute the value of |
|
1690 * the returned CompletableFuture |
|
1691 * @return the new CompletableFuture |
|
1692 */ |
|
1693 public <U> CompletableFuture<U> thenApplyAsync |
|
1694 (Function<? super T,? extends U> fn) { |
|
1695 return doThenApply(fn, ForkJoinPool.commonPool()); |
|
1696 } |
|
1697 |
|
1698 /** |
|
1699 * Returns a new CompletableFuture that is asynchronously completed |
|
1700 * when this CompletableFuture completes, with the result of the |
|
1701 * given function of this CompletableFuture's result from a |
|
1702 * task running in the given executor. |
|
1703 * |
|
1704 * <p>If this CompletableFuture completes exceptionally, or the |
|
1705 * supplied function throws an exception, then the returned |
|
1706 * CompletableFuture completes exceptionally with a |
|
1707 * CompletionException holding the exception as its cause. |
|
1708 * |
|
1709 * @param fn the function to use to compute the value of |
|
1710 * the returned CompletableFuture |
|
1711 * @param executor the executor to use for asynchronous execution |
|
1712 * @return the new CompletableFuture |
|
1713 */ |
|
1714 public <U> CompletableFuture<U> thenApplyAsync |
|
1715 (Function<? super T,? extends U> fn, |
|
1716 Executor executor) { |
|
1717 if (executor == null) throw new NullPointerException(); |
|
1718 return doThenApply(fn, executor); |
|
1719 } |
|
1720 |
|
1721 private <U> CompletableFuture<U> doThenApply |
|
1722 (Function<? super T,? extends U> fn, |
|
1723 Executor e) { |
|
1724 if (fn == null) throw new NullPointerException(); |
|
1725 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
1726 ThenApply<T,U> d = null; |
|
1727 Object r; |
|
1728 if ((r = result) == null) { |
|
1729 CompletionNode p = new CompletionNode |
|
1730 (d = new ThenApply<T,U>(this, fn, dst, e)); |
|
1731 while ((r = result) == null) { |
|
1732 if (UNSAFE.compareAndSwapObject |
|
1733 (this, COMPLETIONS, p.next = completions, p)) |
|
1734 break; |
|
1735 } |
|
1736 } |
|
1737 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1738 T t; Throwable ex; |
|
1739 if (r instanceof AltResult) { |
|
1740 ex = ((AltResult)r).ex; |
|
1741 t = null; |
|
1742 } |
|
1743 else { |
|
1744 ex = null; |
|
1745 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1746 t = tr; |
|
1747 } |
|
1748 U u = null; |
|
1749 if (ex == null) { |
|
1750 try { |
|
1751 if (e != null) |
|
1752 e.execute(new AsyncApply<T,U>(t, fn, dst)); |
|
1753 else |
|
1754 u = fn.apply(t); |
|
1755 } catch (Throwable rex) { |
|
1756 ex = rex; |
|
1757 } |
|
1758 } |
|
1759 if (e == null || ex != null) |
|
1760 dst.internalComplete(u, ex); |
|
1761 } |
|
1762 helpPostComplete(); |
|
1763 return dst; |
|
1764 } |
|
1765 |
|
1766 /** |
|
1767 * Returns a new CompletableFuture that is completed |
|
1768 * when this CompletableFuture completes, after performing the given |
|
1769 * action with this CompletableFuture's result. |
|
1770 * |
|
1771 * <p>If this CompletableFuture completes exceptionally, or the |
|
1772 * supplied action throws an exception, then the returned |
|
1773 * CompletableFuture completes exceptionally with a |
|
1774 * CompletionException holding the exception as its cause. |
|
1775 * |
|
1776 * @param block the action to perform before completing the |
|
1777 * returned CompletableFuture |
|
1778 * @return the new CompletableFuture |
|
1779 */ |
|
1780 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) { |
|
1781 return doThenAccept(block, null); |
|
1782 } |
|
1783 |
|
1784 /** |
|
1785 * Returns a new CompletableFuture that is asynchronously completed |
|
1786 * when this CompletableFuture completes, after performing the given |
|
1787 * action with this CompletableFuture's result from a task running |
|
1788 * in the {@link ForkJoinPool#commonPool()}. |
|
1789 * |
|
1790 * <p>If this CompletableFuture completes exceptionally, or the |
|
1791 * supplied action throws an exception, then the returned |
|
1792 * CompletableFuture completes exceptionally with a |
|
1793 * CompletionException holding the exception as its cause. |
|
1794 * |
|
1795 * @param block the action to perform before completing the |
|
1796 * returned CompletableFuture |
|
1797 * @return the new CompletableFuture |
|
1798 */ |
|
1799 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) { |
|
1800 return doThenAccept(block, ForkJoinPool.commonPool()); |
|
1801 } |
|
1802 |
|
1803 /** |
|
1804 * Returns a new CompletableFuture that is asynchronously completed |
|
1805 * when this CompletableFuture completes, after performing the given |
|
1806 * action with this CompletableFuture's result from a task running |
|
1807 * in the given executor. |
|
1808 * |
|
1809 * <p>If this CompletableFuture completes exceptionally, or the |
|
1810 * supplied action throws an exception, then the returned |
|
1811 * CompletableFuture completes exceptionally with a |
|
1812 * CompletionException holding the exception as its cause. |
|
1813 * |
|
1814 * @param block the action to perform before completing the |
|
1815 * returned CompletableFuture |
|
1816 * @param executor the executor to use for asynchronous execution |
|
1817 * @return the new CompletableFuture |
|
1818 */ |
|
1819 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block, |
|
1820 Executor executor) { |
|
1821 if (executor == null) throw new NullPointerException(); |
|
1822 return doThenAccept(block, executor); |
|
1823 } |
|
1824 |
|
1825 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, |
|
1826 Executor e) { |
|
1827 if (fn == null) throw new NullPointerException(); |
|
1828 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1829 ThenAccept<T> d = null; |
|
1830 Object r; |
|
1831 if ((r = result) == null) { |
|
1832 CompletionNode p = new CompletionNode |
|
1833 (d = new ThenAccept<T>(this, fn, dst, e)); |
|
1834 while ((r = result) == null) { |
|
1835 if (UNSAFE.compareAndSwapObject |
|
1836 (this, COMPLETIONS, p.next = completions, p)) |
|
1837 break; |
|
1838 } |
|
1839 } |
|
1840 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1841 T t; Throwable ex; |
|
1842 if (r instanceof AltResult) { |
|
1843 ex = ((AltResult)r).ex; |
|
1844 t = null; |
|
1845 } |
|
1846 else { |
|
1847 ex = null; |
|
1848 @SuppressWarnings("unchecked") T tr = (T) r; |
|
1849 t = tr; |
|
1850 } |
|
1851 if (ex == null) { |
|
1852 try { |
|
1853 if (e != null) |
|
1854 e.execute(new AsyncAccept<T>(t, fn, dst)); |
|
1855 else |
|
1856 fn.accept(t); |
|
1857 } catch (Throwable rex) { |
|
1858 ex = rex; |
|
1859 } |
|
1860 } |
|
1861 if (e == null || ex != null) |
|
1862 dst.internalComplete(null, ex); |
|
1863 } |
|
1864 helpPostComplete(); |
|
1865 return dst; |
|
1866 } |
|
1867 |
|
1868 /** |
|
1869 * Returns a new CompletableFuture that is completed |
|
1870 * when this CompletableFuture completes, after performing the given |
|
1871 * action. |
|
1872 * |
|
1873 * <p>If this CompletableFuture completes exceptionally, or the |
|
1874 * supplied action throws an exception, then the returned |
|
1875 * CompletableFuture completes exceptionally with a |
|
1876 * CompletionException holding the exception as its cause. |
|
1877 * |
|
1878 * @param action the action to perform before completing the |
|
1879 * returned CompletableFuture |
|
1880 * @return the new CompletableFuture |
|
1881 */ |
|
1882 public CompletableFuture<Void> thenRun(Runnable action) { |
|
1883 return doThenRun(action, null); |
|
1884 } |
|
1885 |
|
1886 /** |
|
1887 * Returns a new CompletableFuture that is asynchronously completed |
|
1888 * when this CompletableFuture completes, after performing the given |
|
1889 * action from a task running in the {@link ForkJoinPool#commonPool()}. |
|
1890 * |
|
1891 * <p>If this CompletableFuture completes exceptionally, or the |
|
1892 * supplied action throws an exception, then the returned |
|
1893 * CompletableFuture completes exceptionally with a |
|
1894 * CompletionException holding the exception as its cause. |
|
1895 * |
|
1896 * @param action the action to perform before completing the |
|
1897 * returned CompletableFuture |
|
1898 * @return the new CompletableFuture |
|
1899 */ |
|
1900 public CompletableFuture<Void> thenRunAsync(Runnable action) { |
|
1901 return doThenRun(action, ForkJoinPool.commonPool()); |
|
1902 } |
|
1903 |
|
1904 /** |
|
1905 * Returns a new CompletableFuture that is asynchronously completed |
|
1906 * when this CompletableFuture completes, after performing the given |
|
1907 * action from a task running in the given executor. |
|
1908 * |
|
1909 * <p>If this CompletableFuture completes exceptionally, or the |
|
1910 * supplied action throws an exception, then the returned |
|
1911 * CompletableFuture completes exceptionally with a |
|
1912 * CompletionException holding the exception as its cause. |
|
1913 * |
|
1914 * @param action the action to perform before completing the |
|
1915 * returned CompletableFuture |
|
1916 * @param executor the executor to use for asynchronous execution |
|
1917 * @return the new CompletableFuture |
|
1918 */ |
|
1919 public CompletableFuture<Void> thenRunAsync(Runnable action, |
|
1920 Executor executor) { |
|
1921 if (executor == null) throw new NullPointerException(); |
|
1922 return doThenRun(action, executor); |
|
1923 } |
|
1924 |
|
1925 private CompletableFuture<Void> doThenRun(Runnable action, |
|
1926 Executor e) { |
|
1927 if (action == null) throw new NullPointerException(); |
|
1928 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
1929 ThenRun d = null; |
|
1930 Object r; |
|
1931 if ((r = result) == null) { |
|
1932 CompletionNode p = new CompletionNode |
|
1933 (d = new ThenRun(this, action, dst, e)); |
|
1934 while ((r = result) == null) { |
|
1935 if (UNSAFE.compareAndSwapObject |
|
1936 (this, COMPLETIONS, p.next = completions, p)) |
|
1937 break; |
|
1938 } |
|
1939 } |
|
1940 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
1941 Throwable ex; |
|
1942 if (r instanceof AltResult) |
|
1943 ex = ((AltResult)r).ex; |
|
1944 else |
|
1945 ex = null; |
|
1946 if (ex == null) { |
|
1947 try { |
|
1948 if (e != null) |
|
1949 e.execute(new AsyncRun(action, dst)); |
|
1950 else |
|
1951 action.run(); |
|
1952 } catch (Throwable rex) { |
|
1953 ex = rex; |
|
1954 } |
|
1955 } |
|
1956 if (e == null || ex != null) |
|
1957 dst.internalComplete(null, ex); |
|
1958 } |
|
1959 helpPostComplete(); |
|
1960 return dst; |
|
1961 } |
|
1962 |
|
1963 /** |
|
1964 * Returns a new CompletableFuture that is completed |
|
1965 * when both this and the other given CompletableFuture complete, |
|
1966 * with the result of the given function of the results of the two |
|
1967 * CompletableFutures. |
|
1968 * |
|
1969 * <p>If this and/or the other CompletableFuture complete |
|
1970 * exceptionally, or the supplied function throws an exception, |
|
1971 * then the returned CompletableFuture completes exceptionally |
|
1972 * with a CompletionException holding the exception as its cause. |
|
1973 * |
|
1974 * @param other the other CompletableFuture |
|
1975 * @param fn the function to use to compute the value of |
|
1976 * the returned CompletableFuture |
|
1977 * @return the new CompletableFuture |
|
1978 */ |
|
1979 public <U,V> CompletableFuture<V> thenCombine |
|
1980 (CompletableFuture<? extends U> other, |
|
1981 BiFunction<? super T,? super U,? extends V> fn) { |
|
1982 return doThenCombine(other, fn, null); |
|
1983 } |
|
1984 |
|
1985 /** |
|
1986 * Returns a new CompletableFuture that is asynchronously completed |
|
1987 * when both this and the other given CompletableFuture complete, |
|
1988 * with the result of the given function of the results of the two |
|
1989 * CompletableFutures from a task running in the |
|
1990 * {@link ForkJoinPool#commonPool()}. |
|
1991 * |
|
1992 * <p>If this and/or the other CompletableFuture complete |
|
1993 * exceptionally, or the supplied function throws an exception, |
|
1994 * then the returned CompletableFuture completes exceptionally |
|
1995 * with a CompletionException holding the exception as its cause. |
|
1996 * |
|
1997 * @param other the other CompletableFuture |
|
1998 * @param fn the function to use to compute the value of |
|
1999 * the returned CompletableFuture |
|
2000 * @return the new CompletableFuture |
|
2001 */ |
|
2002 public <U,V> CompletableFuture<V> thenCombineAsync |
|
2003 (CompletableFuture<? extends U> other, |
|
2004 BiFunction<? super T,? super U,? extends V> fn) { |
|
2005 return doThenCombine(other, fn, ForkJoinPool.commonPool()); |
|
2006 } |
|
2007 |
|
2008 /** |
|
2009 * Returns a new CompletableFuture that is asynchronously completed |
|
2010 * when both this and the other given CompletableFuture complete, |
|
2011 * with the result of the given function of the results of the two |
|
2012 * CompletableFutures from a task running in the given executor. |
|
2013 * |
|
2014 * <p>If this and/or the other CompletableFuture complete |
|
2015 * exceptionally, or the supplied function throws an exception, |
|
2016 * then the returned CompletableFuture completes exceptionally |
|
2017 * with a CompletionException holding the exception as its cause. |
|
2018 * |
|
2019 * @param other the other CompletableFuture |
|
2020 * @param fn the function to use to compute the value of |
|
2021 * the returned CompletableFuture |
|
2022 * @param executor the executor to use for asynchronous execution |
|
2023 * @return the new CompletableFuture |
|
2024 */ |
|
2025 public <U,V> CompletableFuture<V> thenCombineAsync |
|
2026 (CompletableFuture<? extends U> other, |
|
2027 BiFunction<? super T,? super U,? extends V> fn, |
|
2028 Executor executor) { |
|
2029 if (executor == null) throw new NullPointerException(); |
|
2030 return doThenCombine(other, fn, executor); |
|
2031 } |
|
2032 |
|
2033 private <U,V> CompletableFuture<V> doThenCombine |
|
2034 (CompletableFuture<? extends U> other, |
|
2035 BiFunction<? super T,? super U,? extends V> fn, |
|
2036 Executor e) { |
|
2037 if (other == null || fn == null) throw new NullPointerException(); |
|
2038 CompletableFuture<V> dst = new CompletableFuture<V>(); |
|
2039 ThenCombine<T,U,V> d = null; |
|
2040 Object r, s = null; |
|
2041 if ((r = result) == null || (s = other.result) == null) { |
|
2042 d = new ThenCombine<T,U,V>(this, other, fn, dst, e); |
|
2043 CompletionNode q = null, p = new CompletionNode(d); |
|
2044 while ((r == null && (r = result) == null) || |
|
2045 (s == null && (s = other.result) == null)) { |
|
2046 if (q != null) { |
|
2047 if (s != null || |
|
2048 UNSAFE.compareAndSwapObject |
|
2049 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2050 break; |
|
2051 } |
|
2052 else if (r != null || |
|
2053 UNSAFE.compareAndSwapObject |
|
2054 (this, COMPLETIONS, p.next = completions, p)) { |
|
2055 if (s != null) |
|
2056 break; |
|
2057 q = new CompletionNode(d); |
|
2058 } |
|
2059 } |
|
2060 } |
|
2061 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
2062 T t; U u; Throwable ex; |
|
2063 if (r instanceof AltResult) { |
|
2064 ex = ((AltResult)r).ex; |
|
2065 t = null; |
|
2066 } |
|
2067 else { |
|
2068 ex = null; |
|
2069 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2070 t = tr; |
|
2071 } |
|
2072 if (ex != null) |
|
2073 u = null; |
|
2074 else if (s instanceof AltResult) { |
|
2075 ex = ((AltResult)s).ex; |
|
2076 u = null; |
|
2077 } |
|
2078 else { |
|
2079 @SuppressWarnings("unchecked") U us = (U) s; |
|
2080 u = us; |
|
2081 } |
|
2082 V v = null; |
|
2083 if (ex == null) { |
|
2084 try { |
|
2085 if (e != null) |
|
2086 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); |
|
2087 else |
|
2088 v = fn.apply(t, u); |
|
2089 } catch (Throwable rex) { |
|
2090 ex = rex; |
|
2091 } |
|
2092 } |
|
2093 if (e == null || ex != null) |
|
2094 dst.internalComplete(v, ex); |
|
2095 } |
|
2096 helpPostComplete(); |
|
2097 other.helpPostComplete(); |
|
2098 return dst; |
|
2099 } |
|
2100 |
|
2101 /** |
|
2102 * Returns a new CompletableFuture that is completed |
|
2103 * when both this and the other given CompletableFuture complete, |
|
2104 * after performing the given action with the results of the two |
|
2105 * CompletableFutures. |
|
2106 * |
|
2107 * <p>If this and/or the other CompletableFuture complete |
|
2108 * exceptionally, or the supplied action throws an exception, |
|
2109 * then the returned CompletableFuture completes exceptionally |
|
2110 * with a CompletionException holding the exception as its cause. |
|
2111 * |
|
2112 * @param other the other CompletableFuture |
|
2113 * @param block the action to perform before completing the |
|
2114 * returned CompletableFuture |
|
2115 * @return the new CompletableFuture |
|
2116 */ |
|
2117 public <U> CompletableFuture<Void> thenAcceptBoth |
|
2118 (CompletableFuture<? extends U> other, |
|
2119 BiConsumer<? super T, ? super U> block) { |
|
2120 return doThenAcceptBoth(other, block, null); |
|
2121 } |
|
2122 |
|
2123 /** |
|
2124 * Returns a new CompletableFuture that is asynchronously completed |
|
2125 * when both this and the other given CompletableFuture complete, |
|
2126 * after performing the given action with the results of the two |
|
2127 * CompletableFutures from a task running in the {@link |
|
2128 * ForkJoinPool#commonPool()}. |
|
2129 * |
|
2130 * <p>If this and/or the other CompletableFuture complete |
|
2131 * exceptionally, or the supplied action throws an exception, |
|
2132 * then the returned CompletableFuture completes exceptionally |
|
2133 * with a CompletionException holding the exception as its cause. |
|
2134 * |
|
2135 * @param other the other CompletableFuture |
|
2136 * @param block the action to perform before completing the |
|
2137 * returned CompletableFuture |
|
2138 * @return the new CompletableFuture |
|
2139 */ |
|
2140 public <U> CompletableFuture<Void> thenAcceptBothAsync |
|
2141 (CompletableFuture<? extends U> other, |
|
2142 BiConsumer<? super T, ? super U> block) { |
|
2143 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool()); |
|
2144 } |
|
2145 |
|
2146 /** |
|
2147 * Returns a new CompletableFuture that is asynchronously completed |
|
2148 * when both this and the other given CompletableFuture complete, |
|
2149 * after performing the given action with the results of the two |
|
2150 * CompletableFutures from a task running in the given executor. |
|
2151 * |
|
2152 * <p>If this and/or the other CompletableFuture complete |
|
2153 * exceptionally, or the supplied action throws an exception, |
|
2154 * then the returned CompletableFuture completes exceptionally |
|
2155 * with a CompletionException holding the exception as its cause. |
|
2156 * |
|
2157 * @param other the other CompletableFuture |
|
2158 * @param block the action to perform before completing the |
|
2159 * returned CompletableFuture |
|
2160 * @param executor the executor to use for asynchronous execution |
|
2161 * @return the new CompletableFuture |
|
2162 */ |
|
2163 public <U> CompletableFuture<Void> thenAcceptBothAsync |
|
2164 (CompletableFuture<? extends U> other, |
|
2165 BiConsumer<? super T, ? super U> block, |
|
2166 Executor executor) { |
|
2167 if (executor == null) throw new NullPointerException(); |
|
2168 return doThenAcceptBoth(other, block, executor); |
|
2169 } |
|
2170 |
|
2171 private <U> CompletableFuture<Void> doThenAcceptBoth |
|
2172 (CompletableFuture<? extends U> other, |
|
2173 BiConsumer<? super T,? super U> fn, |
|
2174 Executor e) { |
|
2175 if (other == null || fn == null) throw new NullPointerException(); |
|
2176 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
2177 ThenAcceptBoth<T,U> d = null; |
|
2178 Object r, s = null; |
|
2179 if ((r = result) == null || (s = other.result) == null) { |
|
2180 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e); |
|
2181 CompletionNode q = null, p = new CompletionNode(d); |
|
2182 while ((r == null && (r = result) == null) || |
|
2183 (s == null && (s = other.result) == null)) { |
|
2184 if (q != null) { |
|
2185 if (s != null || |
|
2186 UNSAFE.compareAndSwapObject |
|
2187 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2188 break; |
|
2189 } |
|
2190 else if (r != null || |
|
2191 UNSAFE.compareAndSwapObject |
|
2192 (this, COMPLETIONS, p.next = completions, p)) { |
|
2193 if (s != null) |
|
2194 break; |
|
2195 q = new CompletionNode(d); |
|
2196 } |
|
2197 } |
|
2198 } |
|
2199 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
2200 T t; U u; Throwable ex; |
|
2201 if (r instanceof AltResult) { |
|
2202 ex = ((AltResult)r).ex; |
|
2203 t = null; |
|
2204 } |
|
2205 else { |
|
2206 ex = null; |
|
2207 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2208 t = tr; |
|
2209 } |
|
2210 if (ex != null) |
|
2211 u = null; |
|
2212 else if (s instanceof AltResult) { |
|
2213 ex = ((AltResult)s).ex; |
|
2214 u = null; |
|
2215 } |
|
2216 else { |
|
2217 @SuppressWarnings("unchecked") U us = (U) s; |
|
2218 u = us; |
|
2219 } |
|
2220 if (ex == null) { |
|
2221 try { |
|
2222 if (e != null) |
|
2223 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); |
|
2224 else |
|
2225 fn.accept(t, u); |
|
2226 } catch (Throwable rex) { |
|
2227 ex = rex; |
|
2228 } |
|
2229 } |
|
2230 if (e == null || ex != null) |
|
2231 dst.internalComplete(null, ex); |
|
2232 } |
|
2233 helpPostComplete(); |
|
2234 other.helpPostComplete(); |
|
2235 return dst; |
|
2236 } |
|
2237 |
|
2238 /** |
|
2239 * Returns a new CompletableFuture that is completed |
|
2240 * when both this and the other given CompletableFuture complete, |
|
2241 * after performing the given action. |
|
2242 * |
|
2243 * <p>If this and/or the other CompletableFuture complete |
|
2244 * exceptionally, or the supplied action throws an exception, |
|
2245 * then the returned CompletableFuture completes exceptionally |
|
2246 * with a CompletionException holding the exception as its cause. |
|
2247 * |
|
2248 * @param other the other CompletableFuture |
|
2249 * @param action the action to perform before completing the |
|
2250 * returned CompletableFuture |
|
2251 * @return the new CompletableFuture |
|
2252 */ |
|
2253 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, |
|
2254 Runnable action) { |
|
2255 return doRunAfterBoth(other, action, null); |
|
2256 } |
|
2257 |
|
2258 /** |
|
2259 * Returns a new CompletableFuture that is asynchronously completed |
|
2260 * when both this and the other given CompletableFuture complete, |
|
2261 * after performing the given action from a task running in the |
|
2262 * {@link ForkJoinPool#commonPool()}. |
|
2263 * |
|
2264 * <p>If this and/or the other CompletableFuture complete |
|
2265 * exceptionally, or the supplied action throws an exception, |
|
2266 * then the returned CompletableFuture completes exceptionally |
|
2267 * with a CompletionException holding the exception as its cause. |
|
2268 * |
|
2269 * @param other the other CompletableFuture |
|
2270 * @param action the action to perform before completing the |
|
2271 * returned CompletableFuture |
|
2272 * @return the new CompletableFuture |
|
2273 */ |
|
2274 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, |
|
2275 Runnable action) { |
|
2276 return doRunAfterBoth(other, action, ForkJoinPool.commonPool()); |
|
2277 } |
|
2278 |
|
2279 /** |
|
2280 * Returns a new CompletableFuture that is asynchronously completed |
|
2281 * when both this and the other given CompletableFuture complete, |
|
2282 * after performing the given action from a task running in the |
|
2283 * given executor. |
|
2284 * |
|
2285 * <p>If this and/or the other CompletableFuture complete |
|
2286 * exceptionally, or the supplied action throws an exception, |
|
2287 * then the returned CompletableFuture completes exceptionally |
|
2288 * with a CompletionException holding the exception as its cause. |
|
2289 * |
|
2290 * @param other the other CompletableFuture |
|
2291 * @param action the action to perform before completing the |
|
2292 * returned CompletableFuture |
|
2293 * @param executor the executor to use for asynchronous execution |
|
2294 * @return the new CompletableFuture |
|
2295 */ |
|
2296 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, |
|
2297 Runnable action, |
|
2298 Executor executor) { |
|
2299 if (executor == null) throw new NullPointerException(); |
|
2300 return doRunAfterBoth(other, action, executor); |
|
2301 } |
|
2302 |
|
2303 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, |
|
2304 Runnable action, |
|
2305 Executor e) { |
|
2306 if (other == null || action == null) throw new NullPointerException(); |
|
2307 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
2308 RunAfterBoth d = null; |
|
2309 Object r, s = null; |
|
2310 if ((r = result) == null || (s = other.result) == null) { |
|
2311 d = new RunAfterBoth(this, other, action, dst, e); |
|
2312 CompletionNode q = null, p = new CompletionNode(d); |
|
2313 while ((r == null && (r = result) == null) || |
|
2314 (s == null && (s = other.result) == null)) { |
|
2315 if (q != null) { |
|
2316 if (s != null || |
|
2317 UNSAFE.compareAndSwapObject |
|
2318 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2319 break; |
|
2320 } |
|
2321 else if (r != null || |
|
2322 UNSAFE.compareAndSwapObject |
|
2323 (this, COMPLETIONS, p.next = completions, p)) { |
|
2324 if (s != null) |
|
2325 break; |
|
2326 q = new CompletionNode(d); |
|
2327 } |
|
2328 } |
|
2329 } |
|
2330 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) { |
|
2331 Throwable ex; |
|
2332 if (r instanceof AltResult) |
|
2333 ex = ((AltResult)r).ex; |
|
2334 else |
|
2335 ex = null; |
|
2336 if (ex == null && (s instanceof AltResult)) |
|
2337 ex = ((AltResult)s).ex; |
|
2338 if (ex == null) { |
|
2339 try { |
|
2340 if (e != null) |
|
2341 e.execute(new AsyncRun(action, dst)); |
|
2342 else |
|
2343 action.run(); |
|
2344 } catch (Throwable rex) { |
|
2345 ex = rex; |
|
2346 } |
|
2347 } |
|
2348 if (e == null || ex != null) |
|
2349 dst.internalComplete(null, ex); |
|
2350 } |
|
2351 helpPostComplete(); |
|
2352 other.helpPostComplete(); |
|
2353 return dst; |
|
2354 } |
|
2355 |
|
2356 /** |
|
2357 * Returns a new CompletableFuture that is completed |
|
2358 * when either this or the other given CompletableFuture completes, |
|
2359 * with the result of the given function of either this or the other |
|
2360 * CompletableFuture's result. |
|
2361 * |
|
2362 * <p>If this and/or the other CompletableFuture complete |
|
2363 * exceptionally, then the returned CompletableFuture may also do so, |
|
2364 * with a CompletionException holding one of these exceptions as its |
|
2365 * cause. No guarantees are made about which result or exception is |
|
2366 * used in the returned CompletableFuture. If the supplied function |
|
2367 * throws an exception, then the returned CompletableFuture completes |
|
2368 * exceptionally with a CompletionException holding the exception as |
|
2369 * its cause. |
|
2370 * |
|
2371 * @param other the other CompletableFuture |
|
2372 * @param fn the function to use to compute the value of |
|
2373 * the returned CompletableFuture |
|
2374 * @return the new CompletableFuture |
|
2375 */ |
|
2376 public <U> CompletableFuture<U> applyToEither |
|
2377 (CompletableFuture<? extends T> other, |
|
2378 Function<? super T, U> fn) { |
|
2379 return doApplyToEither(other, fn, null); |
|
2380 } |
|
2381 |
|
2382 /** |
|
2383 * Returns a new CompletableFuture that is asynchronously completed |
|
2384 * when either this or the other given CompletableFuture completes, |
|
2385 * with the result of the given function of either this or the other |
|
2386 * CompletableFuture's result from a task running in the |
|
2387 * {@link ForkJoinPool#commonPool()}. |
|
2388 * |
|
2389 * <p>If this and/or the other CompletableFuture complete |
|
2390 * exceptionally, then the returned CompletableFuture may also do so, |
|
2391 * with a CompletionException holding one of these exceptions as its |
|
2392 * cause. No guarantees are made about which result or exception is |
|
2393 * used in the returned CompletableFuture. If the supplied function |
|
2394 * throws an exception, then the returned CompletableFuture completes |
|
2395 * exceptionally with a CompletionException holding the exception as |
|
2396 * its cause. |
|
2397 * |
|
2398 * @param other the other CompletableFuture |
|
2399 * @param fn the function to use to compute the value of |
|
2400 * the returned CompletableFuture |
|
2401 * @return the new CompletableFuture |
|
2402 */ |
|
2403 public <U> CompletableFuture<U> applyToEitherAsync |
|
2404 (CompletableFuture<? extends T> other, |
|
2405 Function<? super T, U> fn) { |
|
2406 return doApplyToEither(other, fn, ForkJoinPool.commonPool()); |
|
2407 } |
|
2408 |
|
2409 /** |
|
2410 * Returns a new CompletableFuture that is asynchronously completed |
|
2411 * when either this or the other given CompletableFuture completes, |
|
2412 * with the result of the given function of either this or the other |
|
2413 * CompletableFuture's result from a task running in the |
|
2414 * given executor. |
|
2415 * |
|
2416 * <p>If this and/or the other CompletableFuture complete |
|
2417 * exceptionally, then the returned CompletableFuture may also do so, |
|
2418 * with a CompletionException holding one of these exceptions as its |
|
2419 * cause. No guarantees are made about which result or exception is |
|
2420 * used in the returned CompletableFuture. If the supplied function |
|
2421 * throws an exception, then the returned CompletableFuture completes |
|
2422 * exceptionally with a CompletionException holding the exception as |
|
2423 * its cause. |
|
2424 * |
|
2425 * @param other the other CompletableFuture |
|
2426 * @param fn the function to use to compute the value of |
|
2427 * the returned CompletableFuture |
|
2428 * @param executor the executor to use for asynchronous execution |
|
2429 * @return the new CompletableFuture |
|
2430 */ |
|
2431 public <U> CompletableFuture<U> applyToEitherAsync |
|
2432 (CompletableFuture<? extends T> other, |
|
2433 Function<? super T, U> fn, |
|
2434 Executor executor) { |
|
2435 if (executor == null) throw new NullPointerException(); |
|
2436 return doApplyToEither(other, fn, executor); |
|
2437 } |
|
2438 |
|
2439 private <U> CompletableFuture<U> doApplyToEither |
|
2440 (CompletableFuture<? extends T> other, |
|
2441 Function<? super T, U> fn, |
|
2442 Executor e) { |
|
2443 if (other == null || fn == null) throw new NullPointerException(); |
|
2444 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
2445 ApplyToEither<T,U> d = null; |
|
2446 Object r; |
|
2447 if ((r = result) == null && (r = other.result) == null) { |
|
2448 d = new ApplyToEither<T,U>(this, other, fn, dst, e); |
|
2449 CompletionNode q = null, p = new CompletionNode(d); |
|
2450 while ((r = result) == null && (r = other.result) == null) { |
|
2451 if (q != null) { |
|
2452 if (UNSAFE.compareAndSwapObject |
|
2453 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2454 break; |
|
2455 } |
|
2456 else if (UNSAFE.compareAndSwapObject |
|
2457 (this, COMPLETIONS, p.next = completions, p)) |
|
2458 q = new CompletionNode(d); |
|
2459 } |
|
2460 } |
|
2461 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2462 T t; Throwable ex; |
|
2463 if (r instanceof AltResult) { |
|
2464 ex = ((AltResult)r).ex; |
|
2465 t = null; |
|
2466 } |
|
2467 else { |
|
2468 ex = null; |
|
2469 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2470 t = tr; |
|
2471 } |
|
2472 U u = null; |
|
2473 if (ex == null) { |
|
2474 try { |
|
2475 if (e != null) |
|
2476 e.execute(new AsyncApply<T,U>(t, fn, dst)); |
|
2477 else |
|
2478 u = fn.apply(t); |
|
2479 } catch (Throwable rex) { |
|
2480 ex = rex; |
|
2481 } |
|
2482 } |
|
2483 if (e == null || ex != null) |
|
2484 dst.internalComplete(u, ex); |
|
2485 } |
|
2486 helpPostComplete(); |
|
2487 other.helpPostComplete(); |
|
2488 return dst; |
|
2489 } |
|
2490 |
|
2491 /** |
|
2492 * Returns a new CompletableFuture that is completed |
|
2493 * when either this or the other given CompletableFuture completes, |
|
2494 * after performing the given action with the result of either this |
|
2495 * or the other CompletableFuture's result. |
|
2496 * |
|
2497 * <p>If this and/or the other CompletableFuture complete |
|
2498 * exceptionally, then the returned CompletableFuture may also do so, |
|
2499 * with a CompletionException holding one of these exceptions as its |
|
2500 * cause. No guarantees are made about which result or exception is |
|
2501 * used in the returned CompletableFuture. If the supplied action |
|
2502 * throws an exception, then the returned CompletableFuture completes |
|
2503 * exceptionally with a CompletionException holding the exception as |
|
2504 * its cause. |
|
2505 * |
|
2506 * @param other the other CompletableFuture |
|
2507 * @param block the action to perform before completing the |
|
2508 * returned CompletableFuture |
|
2509 * @return the new CompletableFuture |
|
2510 */ |
|
2511 public CompletableFuture<Void> acceptEither |
|
2512 (CompletableFuture<? extends T> other, |
|
2513 Consumer<? super T> block) { |
|
2514 return doAcceptEither(other, block, null); |
|
2515 } |
|
2516 |
|
2517 /** |
|
2518 * Returns a new CompletableFuture that is asynchronously completed |
|
2519 * when either this or the other given CompletableFuture completes, |
|
2520 * after performing the given action with the result of either this |
|
2521 * or the other CompletableFuture's result from a task running in |
|
2522 * the {@link ForkJoinPool#commonPool()}. |
|
2523 * |
|
2524 * <p>If this and/or the other CompletableFuture complete |
|
2525 * exceptionally, then the returned CompletableFuture may also do so, |
|
2526 * with a CompletionException holding one of these exceptions as its |
|
2527 * cause. No guarantees are made about which result or exception is |
|
2528 * used in the returned CompletableFuture. If the supplied action |
|
2529 * throws an exception, then the returned CompletableFuture completes |
|
2530 * exceptionally with a CompletionException holding the exception as |
|
2531 * its cause. |
|
2532 * |
|
2533 * @param other the other CompletableFuture |
|
2534 * @param block the action to perform before completing the |
|
2535 * returned CompletableFuture |
|
2536 * @return the new CompletableFuture |
|
2537 */ |
|
2538 public CompletableFuture<Void> acceptEitherAsync |
|
2539 (CompletableFuture<? extends T> other, |
|
2540 Consumer<? super T> block) { |
|
2541 return doAcceptEither(other, block, ForkJoinPool.commonPool()); |
|
2542 } |
|
2543 |
|
2544 /** |
|
2545 * Returns a new CompletableFuture that is asynchronously completed |
|
2546 * when either this or the other given CompletableFuture completes, |
|
2547 * after performing the given action with the result of either this |
|
2548 * or the other CompletableFuture's result from a task running in |
|
2549 * the given executor. |
|
2550 * |
|
2551 * <p>If this and/or the other CompletableFuture complete |
|
2552 * exceptionally, then the returned CompletableFuture may also do so, |
|
2553 * with a CompletionException holding one of these exceptions as its |
|
2554 * cause. No guarantees are made about which result or exception is |
|
2555 * used in the returned CompletableFuture. If the supplied action |
|
2556 * throws an exception, then the returned CompletableFuture completes |
|
2557 * exceptionally with a CompletionException holding the exception as |
|
2558 * its cause. |
|
2559 * |
|
2560 * @param other the other CompletableFuture |
|
2561 * @param block the action to perform before completing the |
|
2562 * returned CompletableFuture |
|
2563 * @param executor the executor to use for asynchronous execution |
|
2564 * @return the new CompletableFuture |
|
2565 */ |
|
2566 public CompletableFuture<Void> acceptEitherAsync |
|
2567 (CompletableFuture<? extends T> other, |
|
2568 Consumer<? super T> block, |
|
2569 Executor executor) { |
|
2570 if (executor == null) throw new NullPointerException(); |
|
2571 return doAcceptEither(other, block, executor); |
|
2572 } |
|
2573 |
|
2574 private CompletableFuture<Void> doAcceptEither |
|
2575 (CompletableFuture<? extends T> other, |
|
2576 Consumer<? super T> fn, |
|
2577 Executor e) { |
|
2578 if (other == null || fn == null) throw new NullPointerException(); |
|
2579 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
2580 AcceptEither<T> d = null; |
|
2581 Object r; |
|
2582 if ((r = result) == null && (r = other.result) == null) { |
|
2583 d = new AcceptEither<T>(this, other, fn, dst, e); |
|
2584 CompletionNode q = null, p = new CompletionNode(d); |
|
2585 while ((r = result) == null && (r = other.result) == null) { |
|
2586 if (q != null) { |
|
2587 if (UNSAFE.compareAndSwapObject |
|
2588 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2589 break; |
|
2590 } |
|
2591 else if (UNSAFE.compareAndSwapObject |
|
2592 (this, COMPLETIONS, p.next = completions, p)) |
|
2593 q = new CompletionNode(d); |
|
2594 } |
|
2595 } |
|
2596 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2597 T t; Throwable ex; |
|
2598 if (r instanceof AltResult) { |
|
2599 ex = ((AltResult)r).ex; |
|
2600 t = null; |
|
2601 } |
|
2602 else { |
|
2603 ex = null; |
|
2604 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2605 t = tr; |
|
2606 } |
|
2607 if (ex == null) { |
|
2608 try { |
|
2609 if (e != null) |
|
2610 e.execute(new AsyncAccept<T>(t, fn, dst)); |
|
2611 else |
|
2612 fn.accept(t); |
|
2613 } catch (Throwable rex) { |
|
2614 ex = rex; |
|
2615 } |
|
2616 } |
|
2617 if (e == null || ex != null) |
|
2618 dst.internalComplete(null, ex); |
|
2619 } |
|
2620 helpPostComplete(); |
|
2621 other.helpPostComplete(); |
|
2622 return dst; |
|
2623 } |
|
2624 |
|
2625 /** |
|
2626 * Returns a new CompletableFuture that is completed |
|
2627 * when either this or the other given CompletableFuture completes, |
|
2628 * after performing the given action. |
|
2629 * |
|
2630 * <p>If this and/or the other CompletableFuture complete |
|
2631 * exceptionally, then the returned CompletableFuture may also do so, |
|
2632 * with a CompletionException holding one of these exceptions as its |
|
2633 * cause. No guarantees are made about which result or exception is |
|
2634 * used in the returned CompletableFuture. If the supplied action |
|
2635 * throws an exception, then the returned CompletableFuture completes |
|
2636 * exceptionally with a CompletionException holding the exception as |
|
2637 * its cause. |
|
2638 * |
|
2639 * @param other the other CompletableFuture |
|
2640 * @param action the action to perform before completing the |
|
2641 * returned CompletableFuture |
|
2642 * @return the new CompletableFuture |
|
2643 */ |
|
2644 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, |
|
2645 Runnable action) { |
|
2646 return doRunAfterEither(other, action, null); |
|
2647 } |
|
2648 |
|
2649 /** |
|
2650 * Returns a new CompletableFuture that is asynchronously completed |
|
2651 * when either this or the other given CompletableFuture completes, |
|
2652 * after performing the given action from a task running in the |
|
2653 * {@link ForkJoinPool#commonPool()}. |
|
2654 * |
|
2655 * <p>If this and/or the other CompletableFuture complete |
|
2656 * exceptionally, then the returned CompletableFuture may also do so, |
|
2657 * with a CompletionException holding one of these exceptions as its |
|
2658 * cause. No guarantees are made about which result or exception is |
|
2659 * used in the returned CompletableFuture. If the supplied action |
|
2660 * throws an exception, then the returned CompletableFuture completes |
|
2661 * exceptionally with a CompletionException holding the exception as |
|
2662 * its cause. |
|
2663 * |
|
2664 * @param other the other CompletableFuture |
|
2665 * @param action the action to perform before completing the |
|
2666 * returned CompletableFuture |
|
2667 * @return the new CompletableFuture |
|
2668 */ |
|
2669 public CompletableFuture<Void> runAfterEitherAsync |
|
2670 (CompletableFuture<?> other, |
|
2671 Runnable action) { |
|
2672 return doRunAfterEither(other, action, ForkJoinPool.commonPool()); |
|
2673 } |
|
2674 |
|
2675 /** |
|
2676 * Returns a new CompletableFuture that is asynchronously completed |
|
2677 * when either this or the other given CompletableFuture completes, |
|
2678 * after performing the given action from a task running in the |
|
2679 * given executor. |
|
2680 * |
|
2681 * <p>If this and/or the other CompletableFuture complete |
|
2682 * exceptionally, then the returned CompletableFuture may also do so, |
|
2683 * with a CompletionException holding one of these exceptions as its |
|
2684 * cause. No guarantees are made about which result or exception is |
|
2685 * used in the returned CompletableFuture. If the supplied action |
|
2686 * throws an exception, then the returned CompletableFuture completes |
|
2687 * exceptionally with a CompletionException holding the exception as |
|
2688 * its cause. |
|
2689 * |
|
2690 * @param other the other CompletableFuture |
|
2691 * @param action the action to perform before completing the |
|
2692 * returned CompletableFuture |
|
2693 * @param executor the executor to use for asynchronous execution |
|
2694 * @return the new CompletableFuture |
|
2695 */ |
|
2696 public CompletableFuture<Void> runAfterEitherAsync |
|
2697 (CompletableFuture<?> other, |
|
2698 Runnable action, |
|
2699 Executor executor) { |
|
2700 if (executor == null) throw new NullPointerException(); |
|
2701 return doRunAfterEither(other, action, executor); |
|
2702 } |
|
2703 |
|
2704 private CompletableFuture<Void> doRunAfterEither |
|
2705 (CompletableFuture<?> other, |
|
2706 Runnable action, |
|
2707 Executor e) { |
|
2708 if (other == null || action == null) throw new NullPointerException(); |
|
2709 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
2710 RunAfterEither d = null; |
|
2711 Object r; |
|
2712 if ((r = result) == null && (r = other.result) == null) { |
|
2713 d = new RunAfterEither(this, other, action, dst, e); |
|
2714 CompletionNode q = null, p = new CompletionNode(d); |
|
2715 while ((r = result) == null && (r = other.result) == null) { |
|
2716 if (q != null) { |
|
2717 if (UNSAFE.compareAndSwapObject |
|
2718 (other, COMPLETIONS, q.next = other.completions, q)) |
|
2719 break; |
|
2720 } |
|
2721 else if (UNSAFE.compareAndSwapObject |
|
2722 (this, COMPLETIONS, p.next = completions, p)) |
|
2723 q = new CompletionNode(d); |
|
2724 } |
|
2725 } |
|
2726 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2727 Throwable ex; |
|
2728 if (r instanceof AltResult) |
|
2729 ex = ((AltResult)r).ex; |
|
2730 else |
|
2731 ex = null; |
|
2732 if (ex == null) { |
|
2733 try { |
|
2734 if (e != null) |
|
2735 e.execute(new AsyncRun(action, dst)); |
|
2736 else |
|
2737 action.run(); |
|
2738 } catch (Throwable rex) { |
|
2739 ex = rex; |
|
2740 } |
|
2741 } |
|
2742 if (e == null || ex != null) |
|
2743 dst.internalComplete(null, ex); |
|
2744 } |
|
2745 helpPostComplete(); |
|
2746 other.helpPostComplete(); |
|
2747 return dst; |
|
2748 } |
|
2749 |
|
2750 /** |
|
2751 * Returns a CompletableFuture that upon completion, has the same |
|
2752 * value as produced by the given function of the result of this |
|
2753 * CompletableFuture. |
|
2754 * |
|
2755 * <p>If this CompletableFuture completes exceptionally, then the |
|
2756 * returned CompletableFuture also does so, with a |
|
2757 * CompletionException holding this exception as its cause. |
|
2758 * Similarly, if the computed CompletableFuture completes |
|
2759 * exceptionally, then so does the returned CompletableFuture. |
|
2760 * |
|
2761 * @param fn the function returning a new CompletableFuture |
|
2762 * @return the CompletableFuture |
|
2763 */ |
|
2764 public <U> CompletableFuture<U> thenCompose |
|
2765 (Function<? super T, CompletableFuture<U>> fn) { |
|
2766 return doThenCompose(fn, null); |
|
2767 } |
|
2768 |
|
2769 /** |
|
2770 * Returns a CompletableFuture that upon completion, has the same |
|
2771 * value as that produced asynchronously using the {@link |
|
2772 * ForkJoinPool#commonPool()} by the given function of the result |
|
2773 * of this CompletableFuture. |
|
2774 * |
|
2775 * <p>If this CompletableFuture completes exceptionally, then the |
|
2776 * returned CompletableFuture also does so, with a |
|
2777 * CompletionException holding this exception as its cause. |
|
2778 * Similarly, if the computed CompletableFuture completes |
|
2779 * exceptionally, then so does the returned CompletableFuture. |
|
2780 * |
|
2781 * @param fn the function returning a new CompletableFuture |
|
2782 * @return the CompletableFuture |
|
2783 */ |
|
2784 public <U> CompletableFuture<U> thenComposeAsync |
|
2785 (Function<? super T, CompletableFuture<U>> fn) { |
|
2786 return doThenCompose(fn, ForkJoinPool.commonPool()); |
|
2787 } |
|
2788 |
|
2789 /** |
|
2790 * Returns a CompletableFuture that upon completion, has the same |
|
2791 * value as that produced asynchronously using the given executor |
|
2792 * by the given function of this CompletableFuture. |
|
2793 * |
|
2794 * <p>If this CompletableFuture completes exceptionally, then the |
|
2795 * returned CompletableFuture also does so, with a |
|
2796 * CompletionException holding this exception as its cause. |
|
2797 * Similarly, if the computed CompletableFuture completes |
|
2798 * exceptionally, then so does the returned CompletableFuture. |
|
2799 * |
|
2800 * @param fn the function returning a new CompletableFuture |
|
2801 * @param executor the executor to use for asynchronous execution |
|
2802 * @return the CompletableFuture |
|
2803 */ |
|
2804 public <U> CompletableFuture<U> thenComposeAsync |
|
2805 (Function<? super T, CompletableFuture<U>> fn, |
|
2806 Executor executor) { |
|
2807 if (executor == null) throw new NullPointerException(); |
|
2808 return doThenCompose(fn, executor); |
|
2809 } |
|
2810 |
|
2811 private <U> CompletableFuture<U> doThenCompose |
|
2812 (Function<? super T, CompletableFuture<U>> fn, |
|
2813 Executor e) { |
|
2814 if (fn == null) throw new NullPointerException(); |
|
2815 CompletableFuture<U> dst = null; |
|
2816 ThenCompose<T,U> d = null; |
|
2817 Object r; |
|
2818 if ((r = result) == null) { |
|
2819 dst = new CompletableFuture<U>(); |
|
2820 CompletionNode p = new CompletionNode |
|
2821 (d = new ThenCompose<T,U>(this, fn, dst, e)); |
|
2822 while ((r = result) == null) { |
|
2823 if (UNSAFE.compareAndSwapObject |
|
2824 (this, COMPLETIONS, p.next = completions, p)) |
|
2825 break; |
|
2826 } |
|
2827 } |
|
2828 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2829 T t; Throwable ex; |
|
2830 if (r instanceof AltResult) { |
|
2831 ex = ((AltResult)r).ex; |
|
2832 t = null; |
|
2833 } |
|
2834 else { |
|
2835 ex = null; |
|
2836 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2837 t = tr; |
|
2838 } |
|
2839 if (ex == null) { |
|
2840 if (e != null) { |
|
2841 if (dst == null) |
|
2842 dst = new CompletableFuture<U>(); |
|
2843 e.execute(new AsyncCompose<T,U>(t, fn, dst)); |
|
2844 } |
|
2845 else { |
|
2846 try { |
|
2847 if ((dst = fn.apply(t)) == null) |
|
2848 ex = new NullPointerException(); |
|
2849 } catch (Throwable rex) { |
|
2850 ex = rex; |
|
2851 } |
|
2852 } |
|
2853 } |
|
2854 if (dst == null) |
|
2855 dst = new CompletableFuture<U>(); |
|
2856 if (e == null || ex != null) |
|
2857 dst.internalComplete(null, ex); |
|
2858 } |
|
2859 helpPostComplete(); |
|
2860 dst.helpPostComplete(); |
|
2861 return dst; |
|
2862 } |
|
2863 |
|
2864 /** |
|
2865 * Returns a new CompletableFuture that is completed when this |
|
2866 * CompletableFuture completes, with the result of the given |
|
2867 * function of the exception triggering this CompletableFuture's |
|
2868 * completion when it completes exceptionally; otherwise, if this |
|
2869 * CompletableFuture completes normally, then the returned |
|
2870 * CompletableFuture also completes normally with the same value. |
|
2871 * |
|
2872 * @param fn the function to use to compute the value of the |
|
2873 * returned CompletableFuture if this CompletableFuture completed |
|
2874 * exceptionally |
|
2875 * @return the new CompletableFuture |
|
2876 */ |
|
2877 public CompletableFuture<T> exceptionally |
|
2878 (Function<Throwable, ? extends T> fn) { |
|
2879 if (fn == null) throw new NullPointerException(); |
|
2880 CompletableFuture<T> dst = new CompletableFuture<T>(); |
|
2881 ExceptionCompletion<T> d = null; |
|
2882 Object r; |
|
2883 if ((r = result) == null) { |
|
2884 CompletionNode p = |
|
2885 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst)); |
|
2886 while ((r = result) == null) { |
|
2887 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, |
|
2888 p.next = completions, p)) |
|
2889 break; |
|
2890 } |
|
2891 } |
|
2892 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2893 T t = null; Throwable ex, dx = null; |
|
2894 if (r instanceof AltResult) { |
|
2895 if ((ex = ((AltResult)r).ex) != null) { |
|
2896 try { |
|
2897 t = fn.apply(ex); |
|
2898 } catch (Throwable rex) { |
|
2899 dx = rex; |
|
2900 } |
|
2901 } |
|
2902 } |
|
2903 else { |
|
2904 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2905 t = tr; |
|
2906 } |
|
2907 dst.internalComplete(t, dx); |
|
2908 } |
|
2909 helpPostComplete(); |
|
2910 return dst; |
|
2911 } |
|
2912 |
|
2913 /** |
|
2914 * Returns a new CompletableFuture that is completed when this |
|
2915 * CompletableFuture completes, with the result of the given |
|
2916 * function of the result and exception of this CompletableFuture's |
|
2917 * completion. The given function is invoked with the result (or |
|
2918 * {@code null} if none) and the exception (or {@code null} if none) |
|
2919 * of this CompletableFuture when complete. |
|
2920 * |
|
2921 * @param fn the function to use to compute the value of the |
|
2922 * returned CompletableFuture |
|
2923 * @return the new CompletableFuture |
|
2924 */ |
|
2925 public <U> CompletableFuture<U> handle |
|
2926 (BiFunction<? super T, Throwable, ? extends U> fn) { |
|
2927 if (fn == null) throw new NullPointerException(); |
|
2928 CompletableFuture<U> dst = new CompletableFuture<U>(); |
|
2929 HandleCompletion<T,U> d = null; |
|
2930 Object r; |
|
2931 if ((r = result) == null) { |
|
2932 CompletionNode p = |
|
2933 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst)); |
|
2934 while ((r = result) == null) { |
|
2935 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, |
|
2936 p.next = completions, p)) |
|
2937 break; |
|
2938 } |
|
2939 } |
|
2940 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
2941 T t; Throwable ex; |
|
2942 if (r instanceof AltResult) { |
|
2943 ex = ((AltResult)r).ex; |
|
2944 t = null; |
|
2945 } |
|
2946 else { |
|
2947 ex = null; |
|
2948 @SuppressWarnings("unchecked") T tr = (T) r; |
|
2949 t = tr; |
|
2950 } |
|
2951 U u; Throwable dx; |
|
2952 try { |
|
2953 u = fn.apply(t, ex); |
|
2954 dx = null; |
|
2955 } catch (Throwable rex) { |
|
2956 dx = rex; |
|
2957 u = null; |
|
2958 } |
|
2959 dst.internalComplete(u, dx); |
|
2960 } |
|
2961 helpPostComplete(); |
|
2962 return dst; |
|
2963 } |
|
2964 |
|
2965 |
|
2966 /* ------------- Arbitrary-arity constructions -------------- */ |
|
2967 |
|
2968 /* |
|
2969 * The basic plan of attack is to recursively form binary |
|
2970 * completion trees of elements. This can be overkill for small |
|
2971 * sets, but scales nicely. The And/All vs Or/Any forms use the |
|
2972 * same idea, but details differ. |
|
2973 */ |
|
2974 |
|
2975 /** |
|
2976 * Returns a new CompletableFuture that is completed when all of |
|
2977 * the given CompletableFutures complete. If any of the given |
|
2978 * CompletableFutures complete exceptionally, then the returned |
|
2979 * CompletableFuture also does so, with a CompletionException |
|
2980 * holding this exception as its cause. Otherwise, the results, |
|
2981 * if any, of the given CompletableFutures are not reflected in |
|
2982 * the returned CompletableFuture, but may be obtained by |
|
2983 * inspecting them individually. If no CompletableFutures are |
|
2984 * provided, returns a CompletableFuture completed with the value |
|
2985 * {@code null}. |
|
2986 * |
|
2987 * <p>Among the applications of this method is to await completion |
|
2988 * of a set of independent CompletableFutures before continuing a |
|
2989 * program, as in: {@code CompletableFuture.allOf(c1, c2, |
|
2990 * c3).join();}. |
|
2991 * |
|
2992 * @param cfs the CompletableFutures |
|
2993 * @return a new CompletableFuture that is completed when all of the |
|
2994 * given CompletableFutures complete |
|
2995 * @throws NullPointerException if the array or any of its elements are |
|
2996 * {@code null} |
|
2997 */ |
|
2998 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { |
|
2999 int len = cfs.length; // Directly handle empty and singleton cases |
|
3000 if (len > 1) |
|
3001 return allTree(cfs, 0, len - 1); |
|
3002 else { |
|
3003 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
3004 CompletableFuture<?> f; |
|
3005 if (len == 0) |
|
3006 dst.result = NIL; |
|
3007 else if ((f = cfs[0]) == null) |
|
3008 throw new NullPointerException(); |
|
3009 else { |
|
3010 ThenPropagate d = null; |
|
3011 CompletionNode p = null; |
|
3012 Object r; |
|
3013 while ((r = f.result) == null) { |
|
3014 if (d == null) |
|
3015 d = new ThenPropagate(f, dst); |
|
3016 else if (p == null) |
|
3017 p = new CompletionNode(d); |
|
3018 else if (UNSAFE.compareAndSwapObject |
|
3019 (f, COMPLETIONS, p.next = f.completions, p)) |
|
3020 break; |
|
3021 } |
|
3022 if (r != null && (d == null || d.compareAndSet(0, 1))) |
|
3023 dst.internalComplete(null, (r instanceof AltResult) ? |
|
3024 ((AltResult)r).ex : null); |
|
3025 f.helpPostComplete(); |
|
3026 } |
|
3027 return dst; |
|
3028 } |
|
3029 } |
|
3030 |
|
3031 /** |
|
3032 * Recursively constructs an And'ed tree of CompletableFutures. |
|
3033 * Called only when array known to have at least two elements. |
|
3034 */ |
|
3035 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs, |
|
3036 int lo, int hi) { |
|
3037 CompletableFuture<?> fst, snd; |
|
3038 int mid = (lo + hi) >>> 1; |
|
3039 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null || |
|
3040 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null) |
|
3041 throw new NullPointerException(); |
|
3042 CompletableFuture<Void> dst = new CompletableFuture<Void>(); |
|
3043 AndCompletion d = null; |
|
3044 CompletionNode p = null, q = null; |
|
3045 Object r = null, s = null; |
|
3046 while ((r = fst.result) == null || (s = snd.result) == null) { |
|
3047 if (d == null) |
|
3048 d = new AndCompletion(fst, snd, dst); |
|
3049 else if (p == null) |
|
3050 p = new CompletionNode(d); |
|
3051 else if (q == null) { |
|
3052 if (UNSAFE.compareAndSwapObject |
|
3053 (fst, COMPLETIONS, p.next = fst.completions, p)) |
|
3054 q = new CompletionNode(d); |
|
3055 } |
|
3056 else if (UNSAFE.compareAndSwapObject |
|
3057 (snd, COMPLETIONS, q.next = snd.completions, q)) |
|
3058 break; |
|
3059 } |
|
3060 if ((r != null || (r = fst.result) != null) && |
|
3061 (s != null || (s = snd.result) != null) && |
|
3062 (d == null || d.compareAndSet(0, 1))) { |
|
3063 Throwable ex; |
|
3064 if (r instanceof AltResult) |
|
3065 ex = ((AltResult)r).ex; |
|
3066 else |
|
3067 ex = null; |
|
3068 if (ex == null && (s instanceof AltResult)) |
|
3069 ex = ((AltResult)s).ex; |
|
3070 dst.internalComplete(null, ex); |
|
3071 } |
|
3072 fst.helpPostComplete(); |
|
3073 snd.helpPostComplete(); |
|
3074 return dst; |
|
3075 } |
|
3076 |
|
3077 /** |
|
3078 * Returns a new CompletableFuture that is completed when any of |
|
3079 * the given CompletableFutures complete, with the same result. |
|
3080 * Otherwise, if it completed exceptionally, the returned |
|
3081 * CompletableFuture also does so, with a CompletionException |
|
3082 * holding this exception as its cause. If no CompletableFutures |
|
3083 * are provided, returns an incomplete CompletableFuture. |
|
3084 * |
|
3085 * @param cfs the CompletableFutures |
|
3086 * @return a new CompletableFuture that is completed with the |
|
3087 * result or exception of any of the given CompletableFutures when |
|
3088 * one completes |
|
3089 * @throws NullPointerException if the array or any of its elements are |
|
3090 * {@code null} |
|
3091 */ |
|
3092 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { |
|
3093 int len = cfs.length; // Same idea as allOf |
|
3094 if (len > 1) |
|
3095 return anyTree(cfs, 0, len - 1); |
|
3096 else { |
|
3097 CompletableFuture<Object> dst = new CompletableFuture<Object>(); |
|
3098 CompletableFuture<?> f; |
|
3099 if (len == 0) |
|
3100 ; // skip |
|
3101 else if ((f = cfs[0]) == null) |
|
3102 throw new NullPointerException(); |
|
3103 else { |
|
3104 ThenCopy<Object> d = null; |
|
3105 CompletionNode p = null; |
|
3106 Object r; |
|
3107 while ((r = f.result) == null) { |
|
3108 if (d == null) |
|
3109 d = new ThenCopy<Object>(f, dst); |
|
3110 else if (p == null) |
|
3111 p = new CompletionNode(d); |
|
3112 else if (UNSAFE.compareAndSwapObject |
|
3113 (f, COMPLETIONS, p.next = f.completions, p)) |
|
3114 break; |
|
3115 } |
|
3116 if (r != null && (d == null || d.compareAndSet(0, 1))) { |
|
3117 Throwable ex; Object t; |
|
3118 if (r instanceof AltResult) { |
|
3119 ex = ((AltResult)r).ex; |
|
3120 t = null; |
|
3121 } |
|
3122 else { |
|
3123 ex = null; |
|
3124 t = r; |
|
3125 } |
|
3126 dst.internalComplete(t, ex); |
|
3127 } |
|
3128 f.helpPostComplete(); |
|
3129 } |
|
3130 return dst; |
|
3131 } |
|
3132 } |
|
3133 |
|
3134 /** |
|
3135 * Recursively constructs an Or'ed tree of CompletableFutures. |
|
3136 */ |
|
3137 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs, |
|
3138 int lo, int hi) { |
|
3139 CompletableFuture<?> fst, snd; |
|
3140 int mid = (lo + hi) >>> 1; |
|
3141 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null || |
|
3142 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null) |
|
3143 throw new NullPointerException(); |
|
3144 CompletableFuture<Object> dst = new CompletableFuture<Object>(); |
|
3145 OrCompletion d = null; |
|
3146 CompletionNode p = null, q = null; |
|
3147 Object r; |
|
3148 while ((r = fst.result) == null && (r = snd.result) == null) { |
|
3149 if (d == null) |
|
3150 d = new OrCompletion(fst, snd, dst); |
|
3151 else if (p == null) |
|
3152 p = new CompletionNode(d); |
|
3153 else if (q == null) { |
|
3154 if (UNSAFE.compareAndSwapObject |
|
3155 (fst, COMPLETIONS, p.next = fst.completions, p)) |
|
3156 q = new CompletionNode(d); |
|
3157 } |
|
3158 else if (UNSAFE.compareAndSwapObject |
|
3159 (snd, COMPLETIONS, q.next = snd.completions, q)) |
|
3160 break; |
|
3161 } |
|
3162 if ((r != null || (r = fst.result) != null || |
|
3163 (r = snd.result) != null) && |
|
3164 (d == null || d.compareAndSet(0, 1))) { |
|
3165 Throwable ex; Object t; |
|
3166 if (r instanceof AltResult) { |
|
3167 ex = ((AltResult)r).ex; |
|
3168 t = null; |
|
3169 } |
|
3170 else { |
|
3171 ex = null; |
|
3172 t = r; |
|
3173 } |
|
3174 dst.internalComplete(t, ex); |
|
3175 } |
|
3176 fst.helpPostComplete(); |
|
3177 snd.helpPostComplete(); |
|
3178 return dst; |
|
3179 } |
|
3180 |
|
3181 /* ------------- Control and status methods -------------- */ |
|
3182 |
|
3183 /** |
|
3184 * If not already completed, completes this CompletableFuture with |
|
3185 * a {@link CancellationException}. Dependent CompletableFutures |
|
3186 * that have not already completed will also complete |
|
3187 * exceptionally, with a {@link CompletionException} caused by |
|
3188 * this {@code CancellationException}. |
|
3189 * |
|
3190 * @param mayInterruptIfRunning this value has no effect in this |
|
3191 * implementation because interrupts are not used to control |
|
3192 * processing. |
|
3193 * |
|
3194 * @return {@code true} if this task is now cancelled |
|
3195 */ |
|
3196 public boolean cancel(boolean mayInterruptIfRunning) { |
|
3197 boolean cancelled = (result == null) && |
|
3198 UNSAFE.compareAndSwapObject |
|
3199 (this, RESULT, null, new AltResult(new CancellationException())); |
|
3200 postComplete(); |
|
3201 return cancelled || isCancelled(); |
|
3202 } |
|
3203 |
|
3204 /** |
|
3205 * Returns {@code true} if this CompletableFuture was cancelled |
|
3206 * before it completed normally. |
|
3207 * |
|
3208 * @return {@code true} if this CompletableFuture was cancelled |
|
3209 * before it completed normally |
|
3210 */ |
|
3211 public boolean isCancelled() { |
|
3212 Object r; |
|
3213 return ((r = result) instanceof AltResult) && |
|
3214 (((AltResult)r).ex instanceof CancellationException); |
|
3215 } |
|
3216 |
|
3217 /** |
|
3218 * Forcibly sets or resets the value subsequently returned by |
|
3219 * method {@link #get()} and related methods, whether or not |
|
3220 * already completed. This method is designed for use only in |
|
3221 * error recovery actions, and even in such situations may result |
|
3222 * in ongoing dependent completions using established versus |
|
3223 * overwritten outcomes. |
|
3224 * |
|
3225 * @param value the completion value |
|
3226 */ |
|
3227 public void obtrudeValue(T value) { |
|
3228 result = (value == null) ? NIL : value; |
|
3229 postComplete(); |
|
3230 } |
|
3231 |
|
3232 /** |
|
3233 * Forcibly causes subsequent invocations of method {@link #get()} |
|
3234 * and related methods to throw the given exception, whether or |
|
3235 * not already completed. This method is designed for use only in |
|
3236 * recovery actions, and even in such situations may result in |
|
3237 * ongoing dependent completions using established versus |
|
3238 * overwritten outcomes. |
|
3239 * |
|
3240 * @param ex the exception |
|
3241 */ |
|
3242 public void obtrudeException(Throwable ex) { |
|
3243 if (ex == null) throw new NullPointerException(); |
|
3244 result = new AltResult(ex); |
|
3245 postComplete(); |
|
3246 } |
|
3247 |
|
3248 /** |
|
3249 * Returns the estimated number of CompletableFutures whose |
|
3250 * completions are awaiting completion of this CompletableFuture. |
|
3251 * This method is designed for use in monitoring system state, not |
|
3252 * for synchronization control. |
|
3253 * |
|
3254 * @return the number of dependent CompletableFutures |
|
3255 */ |
|
3256 public int getNumberOfDependents() { |
|
3257 int count = 0; |
|
3258 for (CompletionNode p = completions; p != null; p = p.next) |
|
3259 ++count; |
|
3260 return count; |
|
3261 } |
|
3262 |
|
3263 /** |
|
3264 * Returns a string identifying this CompletableFuture, as well as |
|
3265 * its completion state. The state, in brackets, contains the |
|
3266 * String {@code "Completed Normally"} or the String {@code |
|
3267 * "Completed Exceptionally"}, or the String {@code "Not |
|
3268 * completed"} followed by the number of CompletableFutures |
|
3269 * dependent upon its completion, if any. |
|
3270 * |
|
3271 * @return a string identifying this CompletableFuture, as well as its state |
|
3272 */ |
|
3273 public String toString() { |
|
3274 Object r = result; |
|
3275 int count; |
|
3276 return super.toString() + |
|
3277 ((r == null) ? |
|
3278 (((count = getNumberOfDependents()) == 0) ? |
|
3279 "[Not completed]" : |
|
3280 "[Not completed, " + count + " dependents]") : |
|
3281 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? |
|
3282 "[Completed exceptionally]" : |
|
3283 "[Completed normally]")); |
|
3284 } |
|
3285 |
|
3286 // Unsafe mechanics |
|
3287 private static final sun.misc.Unsafe UNSAFE; |
|
3288 private static final long RESULT; |
|
3289 private static final long WAITERS; |
|
3290 private static final long COMPLETIONS; |
|
3291 static { |
|
3292 try { |
|
3293 UNSAFE = sun.misc.Unsafe.getUnsafe(); |
|
3294 Class<?> k = CompletableFuture.class; |
|
3295 RESULT = UNSAFE.objectFieldOffset |
|
3296 (k.getDeclaredField("result")); |
|
3297 WAITERS = UNSAFE.objectFieldOffset |
|
3298 (k.getDeclaredField("waiters")); |
|
3299 COMPLETIONS = UNSAFE.objectFieldOffset |
|
3300 (k.getDeclaredField("completions")); |
|
3301 } catch (Exception e) { |
|
3302 throw new Error(e); |
|
3303 } |
|
3304 } |
|
3305 } |