jdk/test/java/nio/channels/Selector/LotsOfCancels.java
changeset 4183 0e342cecb3a9
child 5506 202f599c92aa
equal deleted inserted replaced
4182:ffdc79c88a0e 4183:0e342cecb3a9
       
     1 /*
       
     2  * Copyright 2009 Google Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    20  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    21  * have any questions.
       
    22  */
       
    23 
       
    24 import java.net.InetSocketAddress;
       
    25 import java.net.SocketAddress;
       
    26 import java.nio.channels.SelectionKey;
       
    27 import java.nio.channels.Selector;
       
    28 import java.nio.channels.ServerSocketChannel;
       
    29 import java.nio.channels.SocketChannel;
       
    30 import java.util.ArrayList;
       
    31 import java.util.Iterator;
       
    32 import java.util.List;
       
    33 
       
    34 /**
       
    35  * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
       
    36  * a selector has many unprocessed updates to its interest set (e.g. adding
       
    37  * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
       
    38  * by cancelling a number of selection keys (or just closing a few sockets).
       
    39  * In this case, select() will first go through the list of cancelled keys
       
    40  * and try to deregister them. That deregistration is O(N^2) over the list
       
    41  * of unprocessed updates to the interest set.
       
    42  *
       
    43  * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
       
    44  *
       
    45  * <p> The test first creates initCount connections, and adds them
       
    46  * to the server epoll set. It then creates massCount connections,
       
    47  * registers interest (causing updateList to be populated with massCount*2
       
    48  * elements), but does not add them to epoll set (that would've cleared
       
    49  * updateList). The test then closes initCount connections, thus populating
       
    50  * deregistration queue. The subsequent call to selectNow() will first process
       
    51  * deregistration queue, performing O(N^2) over updateList size,
       
    52  * equal to massCount * 2.
       
    53  *
       
    54  * <p> Note that connect rate is artificially slowed down to compensate
       
    55  * for what I believe is a Linux bug, where too high of a connection rate
       
    56  * ends up in SYN's being dropped and then slow retransmits.
       
    57  *
       
    58  * @author Igor Chernyshev
       
    59  */
       
    60 public class LotsOfCancels {
       
    61 
       
    62     static long testStartTime;
       
    63 
       
    64     public static void main(String[] args) throws Exception {
       
    65         // the final select should run in less than 1000ms.
       
    66         runTest(500, 2700, 1000);
       
    67     }
       
    68 
       
    69     static void log(String msg) {
       
    70         System.out.println(getLogPrefix() + msg);
       
    71     }
       
    72 
       
    73     static String getLogPrefix() {
       
    74         return durationMillis(testStartTime) + ": ";
       
    75     }
       
    76 
       
    77     /**
       
    78      * Returns the elapsed time since startNanos, in milliseconds.
       
    79      * @param startNanos the start time; this must be a value returned
       
    80      * by {@link System.nanoTime}
       
    81      */
       
    82     static long durationMillis(long startNanos) {
       
    83         return (System.nanoTime() - startNanos) / (1000L * 1000L);
       
    84     }
       
    85 
       
    86     static void runTest(int initCount, int massCount, int maxSelectTime)
       
    87             throws Exception {
       
    88         testStartTime = System.nanoTime();
       
    89 
       
    90         InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7359);
       
    91 
       
    92         // Create server channel, add it to selector and run epoll_ctl.
       
    93         log("Setting up server");
       
    94         Selector serverSelector = Selector.open();
       
    95         ServerSocketChannel server = ServerSocketChannel.open();
       
    96         server.configureBlocking(false);
       
    97         server.socket().bind(address, 5000);
       
    98         server.register(serverSelector, SelectionKey.OP_ACCEPT);
       
    99         serverSelector.selectNow();
       
   100 
       
   101         log("Setting up client");
       
   102         ClientThread client = new ClientThread(address);
       
   103         client.start();
       
   104         Thread.sleep(100);
       
   105 
       
   106         // Set up initial set of client sockets.
       
   107         log("Starting initial client connections");
       
   108         client.connectClients(initCount);
       
   109         Thread.sleep(500);  // Wait for client connections to arrive
       
   110 
       
   111         // Accept all initial client sockets, add to selector and run
       
   112         // epoll_ctl.
       
   113         log("Accepting initial connections");
       
   114         List<SocketChannel> serverChannels1 =
       
   115             acceptAndAddAll(serverSelector, server, initCount);
       
   116         if (serverChannels1.size() != initCount) {
       
   117             throw new Exception("Accepted " + serverChannels1.size() +
       
   118                                 " instead of " + initCount);
       
   119         }
       
   120         serverSelector.selectNow();
       
   121 
       
   122         // Set up mass set of client sockets.
       
   123         log("Requesting mass client connections");
       
   124         client.connectClients(massCount);
       
   125         Thread.sleep(500);  // Wait for client connections to arrive
       
   126 
       
   127         // Accept all mass client sockets, add to selector and do NOT
       
   128         // run epoll_ctl.
       
   129         log("Accepting mass connections");
       
   130         List<SocketChannel> serverChannels2 =
       
   131             acceptAndAddAll(serverSelector, server, massCount);
       
   132         if (serverChannels2.size() != massCount) {
       
   133             throw new Exception("Accepted " + serverChannels2.size() +
       
   134                                 " instead of " + massCount);
       
   135         }
       
   136 
       
   137         // Close initial set of sockets.
       
   138         log("Closing initial connections");
       
   139         closeAll(serverChannels1);
       
   140 
       
   141         // Now get the timing of select() call.
       
   142         log("Running the final select call");
       
   143         long startTime = System.nanoTime();
       
   144         serverSelector.selectNow();
       
   145         long duration = durationMillis(startTime);
       
   146         log("Init count = " + initCount +
       
   147             ", mass count = " + massCount +
       
   148             ", duration = " + duration + "ms");
       
   149 
       
   150         if (duration > maxSelectTime) {
       
   151             System.out.println
       
   152                 ("\n\n\n\n\nFAILURE: The final selectNow() took " +
       
   153                  duration + "ms " +
       
   154                  "- seems like O(N^2) bug is still here\n\n");
       
   155             System.exit(1);
       
   156         }
       
   157     }
       
   158 
       
   159     static List<SocketChannel> acceptAndAddAll(Selector selector,
       
   160                                                ServerSocketChannel server,
       
   161                                                int expected)
       
   162             throws Exception {
       
   163         int retryCount = 0;
       
   164         int acceptCount = 0;
       
   165         List<SocketChannel> channels = new ArrayList<SocketChannel>();
       
   166         while (channels.size() < expected) {
       
   167             SocketChannel channel = server.accept();
       
   168             if (channel == null) {
       
   169                 log("accept() returned null " +
       
   170                     "after accepting " + acceptCount + " more connections");
       
   171                 acceptCount = 0;
       
   172                 if (retryCount < 10) {
       
   173                     // See if more new sockets got stacked behind.
       
   174                     retryCount++;
       
   175                     Thread.sleep(500);
       
   176                     continue;
       
   177                 }
       
   178                 break;
       
   179             }
       
   180             retryCount = 0;
       
   181             acceptCount++;
       
   182             channel.configureBlocking(false);
       
   183             channel.register(selector, SelectionKey.OP_READ);
       
   184             channels.add(channel);
       
   185         }
       
   186         // Cause an additional updateList entry per channel.
       
   187         for (SocketChannel channel : channels) {
       
   188             channel.register(selector, SelectionKey.OP_WRITE);
       
   189         }
       
   190         return channels;
       
   191     }
       
   192 
       
   193     static void closeAll(List<SocketChannel> channels)
       
   194             throws Exception {
       
   195         for (SocketChannel channel : channels) {
       
   196             channel.close();
       
   197         }
       
   198     }
       
   199 
       
   200     static class ClientThread extends Thread {
       
   201         private final SocketAddress address;
       
   202         private final Selector selector;
       
   203         private int connectionsNeeded;
       
   204         private int totalCreated;
       
   205 
       
   206         ClientThread(SocketAddress address) throws Exception {
       
   207             this.address = address;
       
   208             selector = Selector.open();
       
   209             setDaemon(true);
       
   210         }
       
   211 
       
   212         void connectClients(int count) throws Exception {
       
   213             synchronized (this) {
       
   214                 connectionsNeeded += count;
       
   215             }
       
   216             selector.wakeup();
       
   217         }
       
   218 
       
   219         @Override
       
   220         public void run() {
       
   221             try {
       
   222                 handleClients();
       
   223             } catch (Throwable e) {
       
   224                 e.printStackTrace();
       
   225                 System.exit(1);
       
   226             }
       
   227         }
       
   228 
       
   229         private void handleClients() throws Exception {
       
   230             int selectCount = 0;
       
   231             while (true) {
       
   232                 int createdCount = 0;
       
   233                 synchronized (this) {
       
   234                     if (connectionsNeeded > 0) {
       
   235 
       
   236                         while (connectionsNeeded > 0 && createdCount < 20) {
       
   237                             connectionsNeeded--;
       
   238                             createdCount++;
       
   239                             totalCreated++;
       
   240 
       
   241                             SocketChannel channel = SocketChannel.open();
       
   242                             channel.configureBlocking(false);
       
   243                             channel.connect(address);
       
   244                             if (!channel.finishConnect()) {
       
   245                                 channel.register(selector,
       
   246                                                  SelectionKey.OP_CONNECT);
       
   247                             }
       
   248                         }
       
   249 
       
   250                         log("Started total of " +
       
   251                             totalCreated + " client connections");
       
   252                         Thread.sleep(200);
       
   253                     }
       
   254                 }
       
   255 
       
   256                 if (createdCount > 0) {
       
   257                     selector.selectNow();
       
   258                 } else {
       
   259                     selectCount++;
       
   260                     long startTime = System.nanoTime();
       
   261                     selector.select();
       
   262                     long duration = durationMillis(startTime);
       
   263                     log("Exited clientSelector.select(), loop #"
       
   264                         + selectCount + ", duration = " + duration + "ms");
       
   265                 }
       
   266 
       
   267                 int keyCount = -1;
       
   268                 Iterator<SelectionKey> keys =
       
   269                     selector.selectedKeys().iterator();
       
   270                 while (keys.hasNext()) {
       
   271                     SelectionKey key = keys.next();
       
   272                     synchronized (key) {
       
   273                         keyCount++;
       
   274                         keys.remove();
       
   275                         if (!key.isValid()) {
       
   276                             log("Ignoring client key #" + keyCount);
       
   277                             continue;
       
   278                         }
       
   279                         int readyOps = key.readyOps();
       
   280                         if (readyOps == SelectionKey.OP_CONNECT) {
       
   281                             key.interestOps(0);
       
   282                             ((SocketChannel) key.channel()).finishConnect();
       
   283                         } else {
       
   284                             log("readyOps() on client key #" + keyCount +
       
   285                                 " returned " + readyOps);
       
   286                         }
       
   287                     }
       
   288                 }
       
   289             }
       
   290         }
       
   291     }
       
   292 }