43 import java.util.concurrent.BlockingQueue; |
43 import java.util.concurrent.BlockingQueue; |
44 import java.util.concurrent.CountDownLatch; |
44 import java.util.concurrent.CountDownLatch; |
45 import java.util.concurrent.Executors; |
45 import java.util.concurrent.Executors; |
46 import java.util.concurrent.ExecutorService; |
46 import java.util.concurrent.ExecutorService; |
47 import java.util.concurrent.SynchronousQueue; |
47 import java.util.concurrent.SynchronousQueue; |
48 import java.util.concurrent.ThreadLocalRandom; |
|
49 |
48 |
50 import junit.framework.Test; |
49 import junit.framework.Test; |
51 |
50 |
52 public class SynchronousQueueTest extends JSR166TestCase { |
51 public class SynchronousQueueTest extends JSR166TestCase { |
53 |
52 |
205 assertEquals(0, q.remainingCapacity()); |
204 assertEquals(0, q.remainingCapacity()); |
206 try { assertSame(one, q.take()); } |
205 try { assertSame(one, q.take()); } |
207 catch (InterruptedException e) { threadUnexpectedException(e); } |
206 catch (InterruptedException e) { threadUnexpectedException(e); } |
208 |
207 |
209 await(pleaseInterrupt); |
208 await(pleaseInterrupt); |
210 assertThreadBlocks(t, Thread.State.WAITING); |
209 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING); |
211 t.interrupt(); |
210 t.interrupt(); |
212 awaitTermination(t); |
211 awaitTermination(t); |
213 assertEquals(0, q.remainingCapacity()); |
212 assertEquals(0, q.remainingCapacity()); |
214 } |
213 } |
215 |
214 |
216 /** |
215 /** |
217 * timed offer times out if elements not taken |
216 * timed offer times out if elements not taken |
218 */ |
217 */ |
219 public void testTimedOffer() { |
218 public void testTimedOffer() { |
220 final boolean fair = ThreadLocalRandom.current().nextBoolean(); |
219 final boolean fair = randomBoolean(); |
221 final SynchronousQueue q = new SynchronousQueue(fair); |
220 final SynchronousQueue q = new SynchronousQueue(fair); |
222 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
221 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
223 Thread t = newStartedThread(new CheckedRunnable() { |
222 Thread t = newStartedThread(new CheckedRunnable() { |
224 public void realRun() throws InterruptedException { |
223 public void realRun() throws InterruptedException { |
225 long startTime = System.nanoTime(); |
224 long startTime = System.nanoTime(); |
|
225 |
226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); |
226 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); |
227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); |
227 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); |
228 |
228 |
229 Thread.currentThread().interrupt(); |
229 Thread.currentThread().interrupt(); |
230 try { |
230 try { |
231 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); |
231 q.offer(new Object(), randomTimeout(), randomTimeUnit()); |
232 shouldThrow(); |
232 shouldThrow(); |
233 } catch (InterruptedException success) {} |
233 } catch (InterruptedException success) {} |
234 assertFalse(Thread.interrupted()); |
234 assertFalse(Thread.interrupted()); |
235 |
235 |
236 pleaseInterrupt.countDown(); |
236 pleaseInterrupt.countDown(); |
237 try { |
237 try { |
238 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); |
238 q.offer(new Object(), LONGER_DELAY_MS, MILLISECONDS); |
239 shouldThrow(); |
239 shouldThrow(); |
240 } catch (InterruptedException success) {} |
240 } catch (InterruptedException success) {} |
241 assertFalse(Thread.interrupted()); |
241 assertFalse(Thread.interrupted()); |
242 }}); |
242 }}); |
243 |
243 |
244 await(pleaseInterrupt); |
244 await(pleaseInterrupt); |
245 assertThreadBlocks(t, Thread.State.TIMED_WAITING); |
245 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); |
246 t.interrupt(); |
246 t.interrupt(); |
247 awaitTermination(t); |
247 awaitTermination(t); |
248 } |
248 } |
249 |
249 |
250 /** |
250 /** |
270 |
270 |
271 /** |
271 /** |
272 * timed poll with nonzero timeout times out if no active putter |
272 * timed poll with nonzero timeout times out if no active putter |
273 */ |
273 */ |
274 public void testTimedPoll() { |
274 public void testTimedPoll() { |
275 final boolean fair = ThreadLocalRandom.current().nextBoolean(); |
275 final boolean fair = randomBoolean(); |
276 final SynchronousQueue q = new SynchronousQueue(fair); |
276 final SynchronousQueue q = new SynchronousQueue(fair); |
277 final long startTime = System.nanoTime(); |
277 final long startTime = System.nanoTime(); |
278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } |
278 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } |
279 catch (InterruptedException e) { threadUnexpectedException(e); } |
279 catch (InterruptedException e) { threadUnexpectedException(e); } |
280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); |
280 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); |
283 /** |
283 /** |
284 * timed poll before a delayed offer times out, returning null; |
284 * timed poll before a delayed offer times out, returning null; |
285 * after offer succeeds; on interruption throws |
285 * after offer succeeds; on interruption throws |
286 */ |
286 */ |
287 public void testTimedPollWithOffer() { |
287 public void testTimedPollWithOffer() { |
288 final boolean fair = ThreadLocalRandom.current().nextBoolean(); |
288 final boolean fair = randomBoolean(); |
289 final SynchronousQueue q = new SynchronousQueue(fair); |
289 final SynchronousQueue q = new SynchronousQueue(fair); |
290 final CountDownLatch pleaseOffer = new CountDownLatch(1); |
290 final CountDownLatch pleaseOffer = new CountDownLatch(1); |
291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
291 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); |
292 Thread t = newStartedThread(new CheckedRunnable() { |
292 Thread t = newStartedThread(new CheckedRunnable() { |
293 public void realRun() throws InterruptedException { |
293 public void realRun() throws InterruptedException { |
299 startTime = System.nanoTime(); |
299 startTime = System.nanoTime(); |
300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); |
300 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); |
301 |
301 |
302 Thread.currentThread().interrupt(); |
302 Thread.currentThread().interrupt(); |
303 try { |
303 try { |
304 q.poll(LONG_DELAY_MS, MILLISECONDS); |
304 q.poll(randomTimeout(), randomTimeUnit()); |
305 shouldThrow(); |
305 shouldThrow(); |
306 } catch (InterruptedException success) {} |
306 } catch (InterruptedException success) {} |
307 assertFalse(Thread.interrupted()); |
307 assertFalse(Thread.interrupted()); |
308 |
308 |
309 pleaseInterrupt.countDown(); |
309 pleaseInterrupt.countDown(); |
321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } |
321 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } |
322 catch (InterruptedException e) { threadUnexpectedException(e); } |
322 catch (InterruptedException e) { threadUnexpectedException(e); } |
323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); |
323 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); |
324 |
324 |
325 await(pleaseInterrupt); |
325 await(pleaseInterrupt); |
326 assertThreadBlocks(t, Thread.State.TIMED_WAITING); |
326 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING); |
327 t.interrupt(); |
327 t.interrupt(); |
328 awaitTermination(t); |
328 awaitTermination(t); |
329 } |
329 } |
330 |
330 |
331 /** |
331 /** |