jdk/test/java/util/concurrent/BlockingQueue/MultipleProducersSingleConsumerLoops.java
changeset 7976 f273c0d04215
parent 5506 202f599c92aa
child 9242 ef138d47df58
equal deleted inserted replaced
7975:f0de2d05f34c 7976:f273c0d04215
    32  */
    32  */
    33 
    33 
    34 /*
    34 /*
    35  * @test
    35  * @test
    36  * @bug 4486658
    36  * @bug 4486658
    37  * @compile MultipleProducersSingleConsumerLoops.java
    37  * @compile -source 1.5 MultipleProducersSingleConsumerLoops.java
    38  * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
    38  * @run main/timeout=3600 MultipleProducersSingleConsumerLoops
    39  * @summary  multiple producers and single consumer using blocking queues
    39  * @summary  multiple producers and single consumer using blocking queues
    40  */
    40  */
    41 
    41 
    42 import java.util.concurrent.*;
    42 import java.util.concurrent.*;
    85         pool.shutdown();
    85         pool.shutdown();
    86         if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    86         if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS))
    87             throw new Error();
    87             throw new Error();
    88    }
    88    }
    89 
    89 
    90     static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
       
    91         LTQasSQ() { super(); }
       
    92         public void put(T x) {
       
    93             try { super.transfer(x); }
       
    94             catch (InterruptedException ex) { throw new Error(); }
       
    95         }
       
    96         private final static long serialVersionUID = 42;
       
    97     }
       
    98 
       
    99     static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
       
   100         HalfSyncLTQ() { super(); }
       
   101         public void put(T x) {
       
   102             if (ThreadLocalRandom.current().nextBoolean())
       
   103                 super.put(x);
       
   104             else {
       
   105                 try { super.transfer(x); }
       
   106                 catch (InterruptedException ex) { throw new Error(); }
       
   107             }
       
   108         }
       
   109         private final static long serialVersionUID = 42;
       
   110     }
       
   111 
       
   112     static void oneTest(int producers, int iters) throws Exception {
    90     static void oneTest(int producers, int iters) throws Exception {
   113         oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
    91         oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
   114         oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
    92         oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
   115         oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
    93         oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), producers, iters);
   116         oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
    94         oneRun(new LinkedTransferQueue<Integer>(), producers, iters);
   117         oneRun(new LTQasSQ<Integer>(), producers, iters);
       
   118         oneRun(new HalfSyncLTQ<Integer>(), producers, iters);
       
   119 
    95 
   120         // Don't run PBQ since can legitimately run out of memory
    96         // Don't run PBQ since can legitimately run out of memory
   121         //        if (print)
    97         //        if (print)
   122         //            System.out.print("PriorityBlockingQueue   ");
    98         //            System.out.print("PriorityBlockingQueue   ");
   123         //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
    99         //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
   127             System.out.println("fair implementations:");
   103             System.out.println("fair implementations:");
   128         oneRun(new SynchronousQueue<Integer>(true), producers, iters);
   104         oneRun(new SynchronousQueue<Integer>(true), producers, iters);
   129         oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
   105         oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters);
   130     }
   106     }
   131 
   107 
   132     static abstract class Stage implements Runnable {
   108     abstract static class Stage implements Runnable {
   133         final int iters;
   109         final int iters;
   134         final BlockingQueue<Integer> queue;
   110         final BlockingQueue<Integer> queue;
   135         final CyclicBarrier barrier;
   111         final CyclicBarrier barrier;
   136         Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
   112         Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
   137             queue = q;
   113             queue = q;
   138             barrier = b;
   114             barrier = b;
   139             this.iters = iters;
   115             this.iters = iters;
   140         }
   116         }
   141     }
   117     }