26 * @bug 6267833 |
26 * @bug 6267833 |
27 * @summary Tests for invokeAny, invokeAll |
27 * @summary Tests for invokeAny, invokeAll |
28 * @author Martin Buchholz |
28 * @author Martin Buchholz |
29 */ |
29 */ |
30 |
30 |
31 import java.util.Arrays; |
31 import static java.util.concurrent.TimeUnit.NANOSECONDS; |
|
32 import static java.util.concurrent.TimeUnit.SECONDS; |
|
33 |
32 import java.util.List; |
34 import java.util.List; |
|
35 import java.util.stream.Collectors; |
|
36 import java.util.stream.IntStream; |
33 import java.util.concurrent.Callable; |
37 import java.util.concurrent.Callable; |
|
38 import java.util.concurrent.CyclicBarrier; |
34 import java.util.concurrent.ExecutorService; |
39 import java.util.concurrent.ExecutorService; |
35 import java.util.concurrent.Executors; |
40 import java.util.concurrent.Executors; |
36 import java.util.concurrent.Future; |
41 import java.util.concurrent.Future; |
|
42 import java.util.concurrent.ThreadLocalRandom; |
37 import java.util.concurrent.atomic.AtomicLong; |
43 import java.util.concurrent.atomic.AtomicLong; |
38 |
44 |
39 public class Invoke { |
45 public class Invoke { |
40 static volatile int passed = 0, failed = 0; |
46 static volatile int passed = 0, failed = 0; |
41 |
47 |
59 |
65 |
60 static void check(boolean condition) { |
66 static void check(boolean condition) { |
61 check(condition, "Assertion failure"); |
67 check(condition, "Assertion failure"); |
62 } |
68 } |
63 |
69 |
|
70 static long secondsElapsedSince(long startTime) { |
|
71 return NANOSECONDS.toSeconds(System.nanoTime() - startTime); |
|
72 } |
|
73 |
|
74 static void awaitInterrupt(long timeoutSeconds) { |
|
75 long startTime = System.nanoTime(); |
|
76 try { |
|
77 Thread.sleep(SECONDS.toMillis(timeoutSeconds)); |
|
78 fail("timed out waiting for interrupt"); |
|
79 } catch (InterruptedException expected) { |
|
80 check(secondsElapsedSince(startTime) < timeoutSeconds); |
|
81 } |
|
82 } |
|
83 |
64 public static void main(String[] args) { |
84 public static void main(String[] args) { |
65 try { |
85 try { |
66 final AtomicLong count = new AtomicLong(0); |
86 testInvokeAll(); |
67 ExecutorService fixed = Executors.newFixedThreadPool(5); |
87 testInvokeAny(); |
68 class Inc implements Callable<Long> { |
88 testInvokeAny_cancellationInterrupt(); |
69 public Long call() throws Exception { |
89 } catch (Throwable t) { unexpected(t); } |
70 Thread.sleep(200); // Catch IE from possible cancel |
90 |
71 return count.incrementAndGet(); |
91 if (failed > 0) |
72 } |
92 throw new Error( |
73 } |
93 String.format("Passed = %d, failed = %d", passed, failed)); |
74 List<Inc> tasks = Arrays.asList(new Inc(), new Inc(), new Inc()); |
94 } |
75 List<Future<Long>> futures = fixed.invokeAll(tasks); |
95 |
|
96 static final long timeoutSeconds = 10L; |
|
97 |
|
98 static void testInvokeAll() throws Throwable { |
|
99 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
|
100 final int nThreads = rnd.nextInt(2, 7); |
|
101 final boolean timed = rnd.nextBoolean(); |
|
102 final ExecutorService pool = Executors.newFixedThreadPool(nThreads); |
|
103 final AtomicLong count = new AtomicLong(0); |
|
104 class Task implements Callable<Long> { |
|
105 public Long call() throws Exception { |
|
106 return count.incrementAndGet(); |
|
107 } |
|
108 } |
|
109 |
|
110 try { |
|
111 final List<Task> tasks = |
|
112 IntStream.range(0, nThreads) |
|
113 .mapToObj(i -> new Task()) |
|
114 .collect(Collectors.toList()); |
|
115 |
|
116 List<Future<Long>> futures; |
|
117 if (timed) { |
|
118 long startTime = System.nanoTime(); |
|
119 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS); |
|
120 check(secondsElapsedSince(startTime) < timeoutSeconds); |
|
121 } |
|
122 else |
|
123 futures = pool.invokeAll(tasks); |
76 check(futures.size() == tasks.size()); |
124 check(futures.size() == tasks.size()); |
77 check(count.get() == tasks.size()); |
125 check(count.get() == tasks.size()); |
78 |
126 |
79 long gauss = 0; |
127 long gauss = 0; |
80 for (Future<Long> future : futures) gauss += future.get(); |
128 for (Future<Long> future : futures) gauss += future.get(); |
81 check(gauss == ((tasks.size()+1)*tasks.size())/2); |
129 check(gauss == (tasks.size()+1)*tasks.size()/2); |
82 |
130 |
83 ExecutorService single = Executors.newSingleThreadExecutor(); |
131 pool.shutdown(); |
84 long save = count.get(); |
132 check(pool.awaitTermination(10L, SECONDS)); |
85 check(single.invokeAny(tasks) == save + 1); |
133 } finally { |
86 check(count.get() == save + 1); |
134 pool.shutdownNow(); |
87 |
135 } |
88 fixed.shutdown(); |
136 } |
89 single.shutdown(); |
137 |
90 |
138 static void testInvokeAny() throws Throwable { |
91 } catch (Throwable t) { unexpected(t); } |
139 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
92 |
140 final boolean timed = rnd.nextBoolean(); |
93 System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); |
141 final ExecutorService pool = Executors.newSingleThreadExecutor(); |
94 if (failed > 0) throw new Error("Some tests failed"); |
142 final AtomicLong count = new AtomicLong(0); |
|
143 class Task implements Callable<Long> { |
|
144 public Long call() throws Exception { |
|
145 long x = count.incrementAndGet(); |
|
146 check(x <= 2); |
|
147 if (x == 2) |
|
148 // wait for main thread to interrupt us |
|
149 awaitInterrupt(timeoutSeconds); |
|
150 return x; |
|
151 } |
|
152 } |
|
153 |
|
154 try { |
|
155 final List<Task> tasks = |
|
156 IntStream.range(0, rnd.nextInt(1, 7)) |
|
157 .mapToObj(i -> new Task()) |
|
158 .collect(Collectors.toList()); |
|
159 |
|
160 long val; |
|
161 if (timed) { |
|
162 long startTime = System.nanoTime(); |
|
163 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS); |
|
164 check(secondsElapsedSince(startTime) < timeoutSeconds); |
|
165 } |
|
166 else |
|
167 val = pool.invokeAny(tasks); |
|
168 check(val == 1); |
|
169 |
|
170 // inherent race between main thread interrupt and |
|
171 // start of second task |
|
172 check(count.get() == 1 || count.get() == 2); |
|
173 |
|
174 pool.shutdown(); |
|
175 check(pool.awaitTermination(timeoutSeconds, SECONDS)); |
|
176 } finally { |
|
177 pool.shutdownNow(); |
|
178 } |
|
179 } |
|
180 |
|
181 /** |
|
182 * Every remaining running task is sent an interrupt for cancellation. |
|
183 */ |
|
184 static void testInvokeAny_cancellationInterrupt() throws Throwable { |
|
185 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
|
186 final int nThreads = rnd.nextInt(2, 7); |
|
187 final boolean timed = rnd.nextBoolean(); |
|
188 final ExecutorService pool = Executors.newFixedThreadPool(nThreads); |
|
189 final AtomicLong count = new AtomicLong(0); |
|
190 final AtomicLong interruptedCount = new AtomicLong(0); |
|
191 final CyclicBarrier allStarted = new CyclicBarrier(nThreads); |
|
192 class Task implements Callable<Long> { |
|
193 public Long call() throws Exception { |
|
194 allStarted.await(); |
|
195 long x = count.incrementAndGet(); |
|
196 if (x > 1) |
|
197 // main thread will interrupt us |
|
198 awaitInterrupt(timeoutSeconds); |
|
199 return x; |
|
200 } |
|
201 } |
|
202 |
|
203 try { |
|
204 final List<Task> tasks = |
|
205 IntStream.range(0, nThreads) |
|
206 .mapToObj(i -> new Task()) |
|
207 .collect(Collectors.toList()); |
|
208 |
|
209 long val; |
|
210 if (timed) { |
|
211 long startTime = System.nanoTime(); |
|
212 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS); |
|
213 check(secondsElapsedSince(startTime) < timeoutSeconds); |
|
214 } |
|
215 else |
|
216 val = pool.invokeAny(tasks); |
|
217 check(val == 1); |
|
218 |
|
219 pool.shutdown(); |
|
220 check(pool.awaitTermination(timeoutSeconds, SECONDS)); |
|
221 |
|
222 // Check after shutdown to avoid race |
|
223 check(count.get() == nThreads); |
|
224 } finally { |
|
225 pool.shutdownNow(); |
|
226 } |
95 } |
227 } |
96 } |
228 } |