equal
deleted
inserted
replaced
31 * domain, as explained at |
31 * domain, as explained at |
32 * http://creativecommons.org/publicdomain/zero/1.0/ |
32 * http://creativecommons.org/publicdomain/zero/1.0/ |
33 */ |
33 */ |
34 |
34 |
35 import java.util.concurrent.CompletableFuture; |
35 import java.util.concurrent.CompletableFuture; |
|
36 import java.util.concurrent.CountDownLatch; |
36 import java.util.concurrent.Executor; |
37 import java.util.concurrent.Executor; |
37 import java.util.concurrent.Executors; |
38 import java.util.concurrent.Executors; |
38 import java.util.concurrent.Flow; |
39 import java.util.concurrent.Flow; |
39 import java.util.concurrent.ForkJoinPool; |
40 import java.util.concurrent.ForkJoinPool; |
40 import java.util.concurrent.SubmissionPublisher; |
41 import java.util.concurrent.SubmissionPublisher; |
427 |
428 |
428 /** |
429 /** |
429 * Cancelling a subscription eventually causes no more onNexts to be issued |
430 * Cancelling a subscription eventually causes no more onNexts to be issued |
430 */ |
431 */ |
431 public void testCancel() { |
432 public void testCancel() { |
432 SubmissionPublisher<Integer> p = basicPublisher(); |
433 SubmissionPublisher<Integer> p = |
|
434 new SubmissionPublisher<Integer>(basicExecutor, 4); // must be < 20 |
433 TestSubscriber s1 = new TestSubscriber(); |
435 TestSubscriber s1 = new TestSubscriber(); |
434 TestSubscriber s2 = new TestSubscriber(); |
436 TestSubscriber s2 = new TestSubscriber(); |
435 p.subscribe(s1); |
437 p.subscribe(s1); |
436 p.subscribe(s2); |
438 p.subscribe(s2); |
437 s1.awaitSubscribe(); |
439 s1.awaitSubscribe(); |
664 TestSubscriber s1 = new TestSubscriber(); |
666 TestSubscriber s1 = new TestSubscriber(); |
665 TestSubscriber s2 = new TestSubscriber(); |
667 TestSubscriber s2 = new TestSubscriber(); |
666 p.subscribe(s1); |
668 p.subscribe(s1); |
667 p.subscribe(s2); |
669 p.subscribe(s2); |
668 for (int i = 1; i <= 20; ++i) { |
670 for (int i = 1; i <= 20; ++i) { |
669 assertTrue(p.estimateMinimumDemand() <= 1); |
|
670 assertTrue(p.submit(i) >= 0); |
671 assertTrue(p.submit(i) >= 0); |
671 } |
672 } |
672 p.close(); |
673 p.close(); |
673 s2.awaitComplete(); |
674 s2.awaitComplete(); |
674 s1.awaitComplete(); |
675 s1.awaitComplete(); |
1003 for (int i = 1; i <= n; ++i) |
1004 for (int i = 1; i <= n; ++i) |
1004 p.submit(i); |
1005 p.submit(i); |
1005 assertTrue(count.get() < n); |
1006 assertTrue(count.get() < n); |
1006 } |
1007 } |
1007 |
1008 |
|
1009 /** |
|
1010 * Tests scenario for |
|
1011 * JDK-8187947: A race condition in SubmissionPublisher |
|
1012 * cvs update -D '2017-11-25' src/main/java/util/concurrent/SubmissionPublisher.java && ant -Djsr166.expensiveTests=true -Djsr166.tckTestClass=SubmissionPublisherTest -Djsr166.methodFilter=testMissedSignal tck; cvs update -A src/main/java/util/concurrent/SubmissionPublisher.java |
|
1013 */ |
|
1014 public void testMissedSignal_8187947() throws Exception { |
|
1015 final int N = expensiveTests ? (1 << 20) : (1 << 10); |
|
1016 final CountDownLatch finished = new CountDownLatch(1); |
|
1017 final SubmissionPublisher<Boolean> pub = new SubmissionPublisher<>(); |
|
1018 class Sub implements Subscriber<Boolean> { |
|
1019 int received; |
|
1020 public void onSubscribe(Subscription s) { |
|
1021 s.request(N); |
|
1022 } |
|
1023 public void onNext(Boolean item) { |
|
1024 if (++received == N) |
|
1025 finished.countDown(); |
|
1026 else |
|
1027 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); |
|
1028 } |
|
1029 public void onError(Throwable t) { throw new AssertionError(t); } |
|
1030 public void onComplete() {} |
|
1031 } |
|
1032 pub.subscribe(new Sub()); |
|
1033 CompletableFuture.runAsync(() -> pub.submit(Boolean.TRUE)); |
|
1034 await(finished); |
|
1035 } |
1008 } |
1036 } |