test/jdk/java/util/concurrent/ExecutorService/Invoke.java
changeset 47341 ed1fd45b6eb5
parent 47216 71c04702a3d5
child 47727 53020d8cdf5b
equal deleted inserted replaced
47340:83f933b97787 47341:ed1fd45b6eb5
    26  * @bug     6267833
    26  * @bug     6267833
    27  * @summary Tests for invokeAny, invokeAll
    27  * @summary Tests for invokeAny, invokeAll
    28  * @author  Martin Buchholz
    28  * @author  Martin Buchholz
    29  */
    29  */
    30 
    30 
    31 import java.util.Arrays;
    31 import static java.util.concurrent.TimeUnit.NANOSECONDS;
       
    32 import static java.util.concurrent.TimeUnit.SECONDS;
       
    33 
    32 import java.util.List;
    34 import java.util.List;
       
    35 import java.util.stream.Collectors;
       
    36 import java.util.stream.IntStream;
    33 import java.util.concurrent.Callable;
    37 import java.util.concurrent.Callable;
       
    38 import java.util.concurrent.CyclicBarrier;
    34 import java.util.concurrent.ExecutorService;
    39 import java.util.concurrent.ExecutorService;
    35 import java.util.concurrent.Executors;
    40 import java.util.concurrent.Executors;
    36 import java.util.concurrent.Future;
    41 import java.util.concurrent.Future;
       
    42 import java.util.concurrent.ThreadLocalRandom;
    37 import java.util.concurrent.atomic.AtomicLong;
    43 import java.util.concurrent.atomic.AtomicLong;
    38 
    44 
    39 public class Invoke {
    45 public class Invoke {
    40     static volatile int passed = 0, failed = 0;
    46     static volatile int passed = 0, failed = 0;
    41 
    47 
    59 
    65 
    60     static void check(boolean condition) {
    66     static void check(boolean condition) {
               // Convenience overload: forwards the condition to the two-argument
               // check with the fixed message "Assertion failure". The two-argument
               // overload is outside this excerpt.
    61         check(condition, "Assertion failure");
    67         check(condition, "Assertion failure");
    62     }
    68     }
    63 
    69 
       
    70     static long secondsElapsedSince(long startTime) {
               // Returns the time elapsed since startTime in whole seconds.
               // startTime is subtracted from System.nanoTime(), so callers pass an
               // earlier nanoTime() reading; NANOSECONDS.toSeconds truncates, so
               // any fractional second is dropped.
       
    71         return NANOSECONDS.toSeconds(System.nanoTime() - startTime);
       
    72     }
       
    73 
       
    74     static void awaitInterrupt(long timeoutSeconds) {
               // Sleeps for up to timeoutSeconds and expects the sleep to be cut
               // short by an interrupt. The tasks in this file call it when they
               // are meant to be cancelled rather than to finish normally.
       
    75         long startTime = System.nanoTime();
       
    76         try {
       
    77             Thread.sleep(SECONDS.toMillis(timeoutSeconds));
       
                   // Reached only when the full sleep elapsed without an interrupt.
    78             fail("timed out waiting for interrupt");
       
    79         } catch (InterruptedException expected) {
       
                   // Expected path: the exception is consumed here (not rethrown),
                   // and the elapsed whole seconds must be below the timeout.
    80             check(secondsElapsedSince(startTime) < timeoutSeconds);
       
    81         }
       
    82     }
       
    83 
    64     public static void main(String[] args) {
    84     public static void main(String[] args) {
               // NOTE(review): this span interleaves two revisions of main. Gutter
               // numbers 64-75 look like the parent revision's inline test body;
               // gutter numbers 84-94 are the replacement, which runs the three
               // test methods in order and routes any Throwable to unexpected(t)
               // (defined outside this excerpt).
    65         try {
    85         try {
    66             final AtomicLong count = new AtomicLong(0);
    86             testInvokeAll();
    67             ExecutorService fixed = Executors.newFixedThreadPool(5);
    87             testInvokeAny();
    68             class Inc implements Callable<Long> {
    88             testInvokeAny_cancellationInterrupt();
    69                 public Long call() throws Exception {
    89         } catch (Throwable t) {  unexpected(t); }
    70                     Thread.sleep(200); // Catch IE from possible cancel
    90 
    71                     return count.incrementAndGet();
               // Replacement revision: report failure by throwing an Error whose
               // message carries the passed/failed counters.
    91         if (failed > 0)
    72                 }
    92             throw new Error(
    73             }
    93                     String.format("Passed = %d, failed = %d", passed, failed));
    74             List<Inc> tasks = Arrays.asList(new Inc(), new Inc(), new Inc());
    94     }
    75             List<Future<Long>> futures = fixed.invokeAll(tasks);
    95 
       
    96     static final long timeoutSeconds = 10L; // in seconds; bound used for timed invokeAll/invokeAny calls and passed to awaitInterrupt
       
    97 
       
    98     static void testInvokeAll() throws Throwable {
               // Runs nThreads trivial counter-incrementing tasks through invokeAll
               // on a fixed pool of nThreads threads (nThreads is random in 2..6),
               // choosing the timed or untimed overload at random. It then checks
               // that there is one Future per task, that every task ran, and that
               // the returned values sum to 1 + 2 + ... + n.
               // NOTE(review): from the first gutter-76 line onward this span is
               // interleaved with lines of the parent revision (gutter 76-89).
       
    99         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
       
   100         final int nThreads = rnd.nextInt(2, 7);
       
   101         final boolean timed = rnd.nextBoolean();
       
   102         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
       
   103         final AtomicLong count = new AtomicLong(0);
       
   104         class Task implements Callable<Long> {
       
   105             public Long call() throws Exception {
       
   106                 return count.incrementAndGet();
       
   107             }
       
   108         }
       
   109 
       
   110         try {
       
   111             final List<Task> tasks =
       
   112                 IntStream.range(0, nThreads)
       
   113                 .mapToObj(i -> new Task())
       
   114                 .collect(Collectors.toList());
       
   115 
       
   116             List<Future<Long>> futures;
       
   117             if (timed) {
       
   118                 long startTime = System.nanoTime();
       
   119                 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS);
       
                       // The timed call must return before the timeout, measured
                       // in whole seconds.
   120                 check(secondsElapsedSince(startTime) < timeoutSeconds);
       
   121             }
       
   122             else
       
   123                 futures = pool.invokeAll(tasks);
    76             check(futures.size() == tasks.size());
   124             check(futures.size() == tasks.size());
    77             check(count.get() == tasks.size());
   125             check(count.get() == tasks.size());
    78 
   126 
    79             long gauss = 0;
   127             long gauss = 0;
    80             for (Future<Long> future : futures) gauss += future.get();
   128             for (Future<Long> future : futures) gauss += future.get();
    81             check(gauss == ((tasks.size()+1)*tasks.size())/2);
   129             check(gauss == (tasks.size()+1)*tasks.size()/2);
    82 
   130 
    83             ExecutorService single = Executors.newSingleThreadExecutor();
   131             pool.shutdown();
    84             long save = count.get();
                   // NOTE(review): the literal 10L below has the same value as
                   // timeoutSeconds; the other two tests pass timeoutSeconds to
                   // awaitTermination. Consider using the constant here too.
   132             check(pool.awaitTermination(10L, SECONDS));
    85             check(single.invokeAny(tasks) == save + 1);
   133         } finally {
    86             check(count.get() == save + 1);
                   // In finally, so shutdownNow is also called when a check or
                   // call above throws.
   134             pool.shutdownNow();
    87 
   135         }
    88             fixed.shutdown();
   136     }
    89             single.shutdown();
   137 
    90 
   138     static void testInvokeAny() throws Throwable {
               // Hands 1..6 tasks to a single-thread executor via invokeAny, using
               // the timed or untimed overload at random. The task that observes
               // counter value 1 returns immediately; a task that observes 2 waits
               // in awaitInterrupt; check(x <= 2) fails if a third task ever runs.
               // NOTE(review): the first lines of this span are interleaved with
               // the tail of the parent revision's main (gutter 91-94).
    91         } catch (Throwable t) { unexpected(t); }
   139         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
    92 
   140         final boolean timed = rnd.nextBoolean();
    93         System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
   141         final ExecutorService pool = Executors.newSingleThreadExecutor();
    94         if (failed > 0) throw new Error("Some tests failed");
   142         final AtomicLong count = new AtomicLong(0);
       
   143         class Task implements Callable<Long> {
       
   144             public Long call() throws Exception {
       
   145                 long x = count.incrementAndGet();
       
   146                 check(x <= 2);
       
   147                 if (x == 2)
       
   148                     // wait for main thread to interrupt us
       
   149                     awaitInterrupt(timeoutSeconds);
       
   150                 return x;
       
   151             }
       
   152         }
       
   153 
       
   154         try {
       
   155             final List<Task> tasks =
       
   156                 IntStream.range(0, rnd.nextInt(1, 7))
       
   157                 .mapToObj(i -> new Task())
       
   158                 .collect(Collectors.toList());
       
   159 
       
   160             long val;
       
   161             if (timed) {
       
   162                 long startTime = System.nanoTime();
       
   163                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
       
                       // The timed call must return before the timeout, measured
                       // in whole seconds.
   164                 check(secondsElapsedSince(startTime) < timeoutSeconds);
       
   165             }
       
   166             else
       
   167                 val = pool.invokeAny(tasks);
       
                   // The result must come from the task that saw the counter at 1.
   168             check(val == 1);
       
   169 
       
   170             // inherent race between main thread interrupt and
       
   171             // start of second task
       
   172             check(count.get() == 1 || count.get() == 2);
       
   173 
       
   174             pool.shutdown();
       
   175             check(pool.awaitTermination(timeoutSeconds, SECONDS));
       
   176         } finally {
       
                   // In finally, so shutdownNow is also called when a check or
                   // call above throws.
   177             pool.shutdownNow();
       
   178         }
       
   179     }
       
   180 
       
   181     /**
       
   182      * Every remaining running task is sent an interrupt for cancellation.
       
   183      */
       
   184     static void testInvokeAny_cancellationInterrupt() throws Throwable {
               // nThreads tasks run on a fixed pool of nThreads threads and first
               // meet at a CyclicBarrier sized nThreads, so none proceeds until all
               // have started. The task that then sees counter value 1 returns it;
               // every other task waits in awaitInterrupt, which fails the test if
               // no interrupt arrives within timeoutSeconds.
       
   185         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
       
   186         final int nThreads = rnd.nextInt(2, 7);
       
   187         final boolean timed = rnd.nextBoolean();
       
   188         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
       
   189         final AtomicLong count = new AtomicLong(0);
       
               // NOTE(review): interruptedCount is not read or updated anywhere in
               // this method as shown; it looks unused.
   190         final AtomicLong interruptedCount = new AtomicLong(0);
       
   191         final CyclicBarrier allStarted = new CyclicBarrier(nThreads);
       
   192         class Task implements Callable<Long> {
       
   193             public Long call() throws Exception {
       
   194                 allStarted.await();
       
   195                 long x = count.incrementAndGet();
       
   196                 if (x > 1)
       
   197                     // main thread will interrupt us
       
   198                     awaitInterrupt(timeoutSeconds);
       
   199                 return x;
       
   200             }
       
   201         }
       
   202 
       
   203         try {
       
   204             final List<Task> tasks =
       
   205                 IntStream.range(0, nThreads)
       
   206                 .mapToObj(i -> new Task())
       
   207                 .collect(Collectors.toList());
       
   208 
       
   209             long val;
       
   210             if (timed) {
       
   211                 long startTime = System.nanoTime();
       
   212                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
       
                       // The timed call must return before the timeout, measured
                       // in whole seconds.
   213                 check(secondsElapsedSince(startTime) < timeoutSeconds);
       
   214             }
       
   215             else
       
   216                 val = pool.invokeAny(tasks);
       
                   // Only the task that saw the counter at 1 returns without
                   // waiting for an interrupt, so the result must be 1.
   217             check(val == 1);
       
   218 
       
   219             pool.shutdown();
       
   220             check(pool.awaitTermination(timeoutSeconds, SECONDS));
       
   221 
       
   222             // Check after shutdown to avoid race
       
   223             check(count.get() == nThreads);
       
   224         } finally {
       
                   // In finally, so shutdownNow is also called when a check or
                   // call above throws.
   225             pool.shutdownNow();
       
   226         }
    95     }
   227     }
    96 }
   228 }