jdk/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
changeset 42927 1d31e540bfcb
parent 40817 4f5fb115676d
child 44099 bc1a91ee90f0
equal deleted inserted replaced
42926:8b9cacdadb2d 42927:1d31e540bfcb
   193         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
   193         (ForkJoinPool.getCommonPoolParallelism() > 1) ?
   194         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
   194         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
   195 
   195 
   196     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
   196     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
   197     private static final class ThreadPerTaskExecutor implements Executor {
   197     private static final class ThreadPerTaskExecutor implements Executor {
       
   198         ThreadPerTaskExecutor() {}      // prevent synthetic access constructor creation
   198         public void execute(Runnable r) { new Thread(r).start(); }
   199         public void execute(Runnable r) { new Thread(r).start(); }
   199     }
   200     }
   200 
   201 
   201     /**
   202     /**
   202      * Clients (BufferedSubscriptions) are maintained in a linked list
   203      * Clients (BufferedSubscriptions) are maintained in a linked list
  1452         /**
  1453         /**
  1453          * Responds to control events in consume().
  1454          * Responds to control events in consume().
  1454          */
  1455          */
  1455         private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
  1456         private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
  1456             boolean stat = true;
  1457             boolean stat = true;
  1457             if ((c & ERROR) != 0) {
  1458             if ((c & SUBSCRIBE) != 0) {
       
  1459                 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
       
  1460                     try {
       
  1461                         if (s != null)
       
  1462                             s.onSubscribe(this);
       
  1463                     } catch (Throwable ex) {
       
  1464                         onError(ex);
       
  1465                     }
       
  1466                 }
       
  1467             }
       
  1468             else if ((c & ERROR) != 0) {
  1458                 Throwable ex = pendingError;
  1469                 Throwable ex = pendingError;
  1459                 ctl = DISABLED;           // no need for CAS
  1470                 ctl = DISABLED;           // no need for CAS
  1460                 if (ex != null) {         // null if errorless cancel
  1471                 if (ex != null) {         // null if errorless cancel
  1461                     try {
  1472                     try {
  1462                         if (s != null)
  1473                         if (s != null)
  1463                             s.onError(ex);
  1474                             s.onError(ex);
  1464                     } catch (Throwable ignore) {
  1475                     } catch (Throwable ignore) {
  1465                     }
       
  1466                 }
       
  1467             }
       
  1468             else if ((c & SUBSCRIBE) != 0) {
       
  1469                 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
       
  1470                     try {
       
  1471                         if (s != null)
       
  1472                             s.onSubscribe(this);
       
  1473                     } catch (Throwable ex) {
       
  1474                         onError(ex);
       
  1475                     }
  1476                     }
  1476                 }
  1477                 }
  1477             }
  1478             }
  1478             else {
  1479             else {
  1479                 detach();
  1480                 detach();