jdk/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java
changeset 3414 cdf768813b4d
parent 2 90ce3da70b43
child 3708 f838f712922e
equal deleted inserted replaced
3330:04c1ec47b42e 3414:cdf768813b4d
       
     1 /*
       
     2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     3  *
       
     4  * This code is free software; you can redistribute it and/or modify it
       
     5  * under the terms of the GNU General Public License version 2 only, as
       
     6  * published by the Free Software Foundation.
       
     7  *
       
     8  * This code is distributed in the hope that it will be useful, but WITHOUT
       
     9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    11  * version 2 for more details (a copy is included in the LICENSE file that
       
    12  * accompanied this code).
       
    13  *
       
    14  * You should have received a copy of the GNU General Public License version
       
    15  * 2 along with this work; if not, write to the Free Software Foundation,
       
    16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    17  *
       
    18  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    19  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    20  * have any questions.
       
    21  */
       
    22 
       
    23 /*
       
    24  * This file is available under and governed by the GNU General Public
       
    25  * License version 2 only, as published by the Free Software Foundation.
       
    26  * However, the following notice accompanied the original version of this
       
    27  * file:
       
    28  *
       
    29  * Written by Doug Lea with assistance from members of JCP JSR-166
       
    30  * Expert Group and released to the public domain, as explained at
       
    31  * http://creativecommons.org/licenses/publicdomain
       
    32  */
       
    33 
       
    34 /*
       
    35  * @test
       
    36  * @bug 4486658 6785442
       
    37  * @run main ConcurrentQueueLoops 8 123456
       
    38  * @summary Checks that a set of threads can repeatedly get and modify items
       
    39  */
       
    40 
       
    41 import java.util.*;
       
    42 import java.util.concurrent.*;
       
    43 import java.util.concurrent.atomic.*;
       
    44 
       
    45 public class ConcurrentQueueLoops {
       
    46     ExecutorService pool;
       
    47     AtomicInteger totalItems;
       
    48     boolean print;
       
    49 
       
    50     // Suitable for benchmarking.  Overriden by args[0] for testing.
       
    51     int maxStages = 20;
       
    52 
       
    53     // Suitable for benchmarking.  Overriden by args[1] for testing.
       
    54     int items = 1024 * 1024;
       
    55 
       
    56     Collection<Queue<Integer>> concurrentQueues() {
       
    57         List<Queue<Integer>> queues = new ArrayList<Queue<Integer>>();
       
    58         queues.add(new ConcurrentLinkedQueue<Integer>());
       
    59         queues.add(new ArrayBlockingQueue<Integer>(items, false));
       
    60         //queues.add(new ArrayBlockingQueue<Integer>(count, true));
       
    61         queues.add(new LinkedBlockingQueue<Integer>());
       
    62         queues.add(new LinkedBlockingDeque<Integer>());
       
    63 
       
    64         try {
       
    65             queues.add((Queue<Integer>)
       
    66                        Class.forName("java.util.concurrent.LinkedTransferQueue")
       
    67                        .newInstance());
       
    68         } catch (IllegalAccessException e) {
       
    69         } catch (InstantiationException e) {
       
    70         } catch (ClassNotFoundException e) {
       
    71             // OK; not yet added to JDK
       
    72         }
       
    73 
       
    74         // Following additional implementations are available from:
       
    75         // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
       
    76         // queues.add(new LinkedTransferQueue<Integer>());
       
    77         // queues.add(new SynchronizedLinkedListQueue<Integer>());
       
    78 
       
    79         // Avoid "first fast, second slow" benchmark effect.
       
    80         Collections.shuffle(queues);
       
    81         return queues;
       
    82     }
       
    83 
       
    84     void test(String[] args) throws Throwable {
       
    85         if (args.length > 0)
       
    86             maxStages = Integer.parseInt(args[0]);
       
    87         if (args.length > 1)
       
    88             items = Integer.parseInt(args[1]);
       
    89 
       
    90         for (Queue<Integer> queue : concurrentQueues())
       
    91             test(queue);
       
    92     }
       
    93 
       
    94     void test(final Queue<Integer> q) throws Throwable {
       
    95         System.out.println(q.getClass().getSimpleName());
       
    96         pool = Executors.newCachedThreadPool();
       
    97         print = false;
       
    98 
       
    99         print = false;
       
   100         System.out.println("Warmup...");
       
   101         oneRun(1, items, q);
       
   102         //Thread.sleep(100);
       
   103         oneRun(3, items, q);
       
   104         Thread.sleep(100);
       
   105         print = true;
       
   106 
       
   107         for (int i = 1; i <= maxStages; i += (i+1) >>> 1) {
       
   108             oneRun(i, items, q);
       
   109         }
       
   110         pool.shutdown();
       
   111         check(pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS));
       
   112    }
       
   113 
       
   114     class Stage implements Callable<Integer> {
       
   115         final Queue<Integer> queue;
       
   116         final CyclicBarrier barrier;
       
   117         int items;
       
   118         Stage (Queue<Integer> q, CyclicBarrier b, int items) {
       
   119             queue = q;
       
   120             barrier = b;
       
   121             this.items = items;
       
   122         }
       
   123 
       
   124         public Integer call() {
       
   125             // Repeatedly take something from queue if possible,
       
   126             // transform it, and put back in.
       
   127             try {
       
   128                 barrier.await();
       
   129                 int l = 4321;
       
   130                 int takes = 0;
       
   131                 for (;;) {
       
   132                     Integer item = queue.poll();
       
   133                     if (item != null) {
       
   134                         ++takes;
       
   135                         l = LoopHelpers.compute2(item.intValue());
       
   136                     }
       
   137                     else if (takes != 0) {
       
   138                         totalItems.getAndAdd(-takes);
       
   139                         takes = 0;
       
   140                     }
       
   141                     else if (totalItems.get() <= 0)
       
   142                         break;
       
   143                     l = LoopHelpers.compute1(l);
       
   144                     if (items > 0) {
       
   145                         --items;
       
   146                         queue.offer(new Integer(l));
       
   147                     }
       
   148                     else if ( (l & (3 << 5)) == 0) // spinwait
       
   149                         Thread.sleep(1);
       
   150                 }
       
   151                 return new Integer(l);
       
   152             }
       
   153             catch (Throwable t) { unexpected(t); return null; }
       
   154         }
       
   155     }
       
   156 
       
   157     void oneRun(int n, int items, final Queue<Integer> q) throws Exception {
       
   158         LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
       
   159         CyclicBarrier barrier = new CyclicBarrier(n + 1, timer);
       
   160         totalItems = new AtomicInteger(n * items);
       
   161         ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(n);
       
   162         for (int i = 0; i < n; ++i)
       
   163             results.add(pool.submit(new Stage(q, barrier, items)));
       
   164 
       
   165         if (print)
       
   166             System.out.print("Threads: " + n + "\t:");
       
   167         barrier.await();
       
   168         int total = 0;
       
   169         for (int i = 0; i < n; ++i) {
       
   170             Future<Integer> f = results.get(i);
       
   171             Integer r = f.get();
       
   172             total += r.intValue();
       
   173         }
       
   174         long endTime = System.nanoTime();
       
   175         long time = endTime - timer.startTime;
       
   176         if (print)
       
   177             System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item");
       
   178         if (total == 0) // avoid overoptimization
       
   179             System.out.println("useless result: " + total);
       
   180     }
       
   181 
       
   182     //--------------------- Infrastructure ---------------------------
       
   183     volatile int passed = 0, failed = 0;
       
   184     void pass() {passed++;}
       
   185     void fail() {failed++; Thread.dumpStack();}
       
   186     void fail(String msg) {System.err.println(msg); fail();}
       
   187     void unexpected(Throwable t) {failed++; t.printStackTrace();}
       
   188     void check(boolean cond) {if (cond) pass(); else fail();}
       
   189     void equal(Object x, Object y) {
       
   190         if (x == null ? y == null : x.equals(y)) pass();
       
   191         else fail(x + " not equal to " + y);}
       
   192     public static void main(String[] args) throws Throwable {
       
   193         new ConcurrentQueueLoops().instanceMain(args);}
       
   194     public void instanceMain(String[] args) throws Throwable {
       
   195         try {test(args);} catch (Throwable t) {unexpected(t);}
       
   196         System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
       
   197         if (failed > 0) throw new AssertionError("Some tests failed");}
       
   198 }