equal
deleted
inserted
replaced
193 (ForkJoinPool.getCommonPoolParallelism() > 1) ? |
193 (ForkJoinPool.getCommonPoolParallelism() > 1) ? |
194 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); |
194 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); |
195 |
195 |
196 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ |
196 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ |
197 private static final class ThreadPerTaskExecutor implements Executor { |
197 private static final class ThreadPerTaskExecutor implements Executor { |
|
198 ThreadPerTaskExecutor() {} // prevent access constructor creation |
198 public void execute(Runnable r) { new Thread(r).start(); } |
199 public void execute(Runnable r) { new Thread(r).start(); } |
199 } |
200 } |
200 |
201 |
201 /** |
202 /** |
202 * Clients (BufferedSubscriptions) are maintained in a linked list |
203 * Clients (BufferedSubscriptions) are maintained in a linked list |
1452 /** |
1453 /** |
1453 * Responds to control events in consume(). |
1454 * Responds to control events in consume(). |
1454 */ |
1455 */ |
1455 private boolean checkControl(Flow.Subscriber<? super T> s, int c) { |
1456 private boolean checkControl(Flow.Subscriber<? super T> s, int c) { |
1456 boolean stat = true; |
1457 boolean stat = true; |
1457 if ((c & ERROR) != 0) { |
1458 if ((c & SUBSCRIBE) != 0) { |
|
1459 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) { |
|
1460 try { |
|
1461 if (s != null) |
|
1462 s.onSubscribe(this); |
|
1463 } catch (Throwable ex) { |
|
1464 onError(ex); |
|
1465 } |
|
1466 } |
|
1467 } |
|
1468 else if ((c & ERROR) != 0) { |
1458 Throwable ex = pendingError; |
1469 Throwable ex = pendingError; |
1459 ctl = DISABLED; // no need for CAS |
1470 ctl = DISABLED; // no need for CAS |
1460 if (ex != null) { // null if errorless cancel |
1471 if (ex != null) { // null if errorless cancel |
1461 try { |
1472 try { |
1462 if (s != null) |
1473 if (s != null) |
1463 s.onError(ex); |
1474 s.onError(ex); |
1464 } catch (Throwable ignore) { |
1475 } catch (Throwable ignore) { |
1465 } |
|
1466 } |
|
1467 } |
|
1468 else if ((c & SUBSCRIBE) != 0) { |
|
1469 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) { |
|
1470 try { |
|
1471 if (s != null) |
|
1472 s.onSubscribe(this); |
|
1473 } catch (Throwable ex) { |
|
1474 onError(ex); |
|
1475 } |
1476 } |
1476 } |
1477 } |
1477 } |
1478 } |
1478 else { |
1479 else { |
1479 detach(); |
1480 detach(); |