test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java
changeset 48047 ff597804e8c1
parent 47730 c7b5b1ce8145
child 48541 946e34c2dec9
equal deleted inserted replaced
48046:98801bd22f5b 48047:ff597804e8c1
    31  * domain, as explained at
    31  * domain, as explained at
    32  * http://creativecommons.org/publicdomain/zero/1.0/
    32  * http://creativecommons.org/publicdomain/zero/1.0/
    33  */
    33  */
    34 
    34 
    35 import java.util.concurrent.CompletableFuture;
    35 import java.util.concurrent.CompletableFuture;
       
    36 import java.util.concurrent.CountDownLatch;
    36 import java.util.concurrent.Executor;
    37 import java.util.concurrent.Executor;
    37 import java.util.concurrent.Executors;
    38 import java.util.concurrent.Executors;
    38 import java.util.concurrent.Flow;
    39 import java.util.concurrent.Flow;
    39 import java.util.concurrent.ForkJoinPool;
    40 import java.util.concurrent.ForkJoinPool;
    40 import java.util.concurrent.SubmissionPublisher;
    41 import java.util.concurrent.SubmissionPublisher;
   427 
   428 
   428     /**
   429     /**
   429      * Cancelling a subscription eventually causes no more onNexts to be issued
   430      * Cancelling a subscription eventually causes no more onNexts to be issued
   430      */
   431      */
   431     public void testCancel() {
   432     public void testCancel() {
   432         SubmissionPublisher<Integer> p = basicPublisher();
   433         SubmissionPublisher<Integer> p =
       
   434             new SubmissionPublisher<Integer>(basicExecutor, 4); // must be < 20
   433         TestSubscriber s1 = new TestSubscriber();
   435         TestSubscriber s1 = new TestSubscriber();
   434         TestSubscriber s2 = new TestSubscriber();
   436         TestSubscriber s2 = new TestSubscriber();
   435         p.subscribe(s1);
   437         p.subscribe(s1);
   436         p.subscribe(s2);
   438         p.subscribe(s2);
   437         s1.awaitSubscribe();
   439         s1.awaitSubscribe();
   664         TestSubscriber s1 = new TestSubscriber();
   666         TestSubscriber s1 = new TestSubscriber();
   665         TestSubscriber s2 = new TestSubscriber();
   667         TestSubscriber s2 = new TestSubscriber();
   666         p.subscribe(s1);
   668         p.subscribe(s1);
   667         p.subscribe(s2);
   669         p.subscribe(s2);
   668         for (int i = 1; i <= 20; ++i) {
   670         for (int i = 1; i <= 20; ++i) {
   669             assertTrue(p.estimateMinimumDemand() <= 1);
       
   670             assertTrue(p.submit(i) >= 0);
   671             assertTrue(p.submit(i) >= 0);
   671         }
   672         }
   672         p.close();
   673         p.close();
   673         s2.awaitComplete();
   674         s2.awaitComplete();
   674         s1.awaitComplete();
   675         s1.awaitComplete();
  1003         for (int i = 1; i <= n; ++i)
  1004         for (int i = 1; i <= n; ++i)
  1004             p.submit(i);
  1005             p.submit(i);
  1005         assertTrue(count.get() < n);
  1006         assertTrue(count.get() < n);
  1006     }
  1007     }
  1007 
  1008 
       
  1009     /**
       
  1010      * Tests scenario for
       
  1011      * JDK-8187947: A race condition in SubmissionPublisher
       
  1012      * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java
       
  1013      */
       
  1014     public void testMissedSignal_8187947() throws Exception {
       
  1015         final int N = expensiveTests ? (1 << 20) : (1 << 10);
       
  1016         final CountDownLatch finished = new CountDownLatch(1);
       
  1017         final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>();
       
  1018         class Sub implements Subscriber<Boolean> {
       
  1019             int received;
       
  1020             public void onSubscribe(Subscription s) {
       
  1021                 s.request(N);
       
  1022             }
       
  1023             public void onNext(Boolean item) {
       
  1024                 if (++received == N)
       
  1025                     finished.countDown();
       
  1026                 else
       
  1027                     CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
       
  1028             }
       
  1029             public void onError(Throwable t) { throw new AssertionError(t); }
       
  1030             public void onComplete() {}
       
  1031         }
       
  1032         pub.subscribe(new Sub());
       
  1033         CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE));
       
  1034         await(finished);
       
  1035     }
  1008 }
  1036 }