test/jdk/java/nio/channels/Selector/LotsOfCancels.java
author mli
Fri, 31 Aug 2018 10:00:22 +0800
changeset 51602 dbb0e798deeb
parent 47216 71c04702a3d5
child 54086 ccb4a50bee06
permissions -rw-r--r--
8208280: java/nio/channels/Selector/RegisterDuringSelect.java fails with "key not removed from key set" Reviewed-by: alanb

/*
 * Copyright 2009 Google Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
 * a selector has many unprocessed updates to its interest set (e.g. adding
 * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
 * by cancelling a number of selection keys (or just closing a few sockets).
 * In this case, select() will first go through the list of cancelled keys
 * and try to deregister them. That deregistration is O(N^2) over the list
 * of unprocessed updates to the interest set.
 *
 * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
 *
 * <p> The test first creates initCount connections, and adds them
 * to the server epoll set. It then creates massCount connections,
 * registers interest (causing updateList to be populated with massCount*2
 * elements), but does not add them to epoll set (that would've cleared
 * updateList). The test then closes initCount connections, thus populating
 * deregistration queue. The subsequent call to selectNow() will first process
 * deregistration queue, performing O(N^2) over updateList size,
 * equal to massCount * 2.
 *
 * <p> Note that the connect rate is artificially slowed down to compensate
 * for what I believe is a Linux bug, where too high a connection rate
 * results in SYNs being dropped, followed by slow retransmits.
 *
 * @author Igor Chernyshev
 */
public class LotsOfCancels {

    /** Backlog requested when binding the listening socket. */
    private static final int SERVER_BACKLOG = 5000;

    /**
     * Reference point for log timestamps: the {@link System#nanoTime()}
     * value captured when {@link #runTest} starts.
     */
    static long testStartTime;

    /**
     * Runs the scenario with 500 initial and 2700 mass connections.
     */
    public static void main(String[] args) throws Exception {
        // the final select should run in less than 1000ms.
        runTest(500, 2700, 1000);
    }

    /**
     * Prints {@code msg} to standard output, prefixed with the number of
     * milliseconds elapsed since {@link #testStartTime}.
     */
    static void log(String msg) {
        System.out.println(getLogPrefix() + msg);
    }

    /**
     * Returns the log prefix: milliseconds elapsed since
     * {@link #testStartTime}, followed by {@code ": "}.
     */
    static String getLogPrefix() {
        return durationMillis(testStartTime) + ": ";
    }

    /**
     * Returns the elapsed time since startNanos, in milliseconds.
     * @param startNanos the start time; this must be a value returned
     * by {@link System#nanoTime()}
     */
    static long durationMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / (1000L * 1000L);
    }

    /**
     * Opens a non-blocking server channel bound to an ephemeral port on
     * 127.0.0.1 and registers it with {@code selector} for
     * {@link SelectionKey#OP_ACCEPT}.
     *
     * <p> Port 0 lets the operating system pick a free port, so the test
     * cannot fail merely because a fixed port is already in use. Callers
     * obtain the real address from the returned channel's socket.
     *
     * @param selector the selector to register the server channel with
     * @return the bound and registered server channel
     * @throws Exception if the channel cannot be opened, bound or registered;
     *         the channel is closed before the exception propagates
     */
    static ServerSocketChannel openServer(Selector selector) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        try {
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0),
                                 SERVER_BACKLOG);
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            // Do not leak the half-initialized channel.
            server.close();
            throw e;
        }
        return server;
    }

    /**
     * Sets up the server, runs the scenario described in the class comment
     * and releases the server channel and selector afterwards.
     *
     * @param initCount number of connections that are accepted, fully
     *        registered and later closed
     * @param massCount number of connections that are accepted and
     *        registered without an intervening select
     * @param maxSelectTime upper bound, in milliseconds, for the final
     *        {@code selectNow()}; exceeding it terminates the JVM with
     *        exit status 1
     * @throws Exception if setup fails or fewer connections than requested
     *         could be accepted
     */
    static void runTest(int initCount, int massCount, int maxSelectTime)
            throws Exception {
        testStartTime = System.nanoTime();

        // Create server channel, add it to selector and run epoll_ctl.
        log("Setting up server");
        Selector serverSelector = Selector.open();
        try {
            ServerSocketChannel server = openServer(serverSelector);
            try {
                serverSelector.selectNow();
                runScenario(serverSelector, server,
                            initCount, massCount, maxSelectTime);
            } finally {
                server.close();
            }
        } finally {
            serverSelector.close();
        }
    }

    /**
     * Drives the client, accepts both groups of connections, closes the
     * initial group and times the final {@code selectNow()}.
     */
    private static void runScenario(Selector serverSelector,
                                    ServerSocketChannel server,
                                    int initCount,
                                    int massCount,
                                    int maxSelectTime)
            throws Exception {
        // The port was chosen by the OS, so ask the socket where it is bound.
        SocketAddress address = server.socket().getLocalSocketAddress();

        log("Setting up client");
        ClientThread client = new ClientThread(address);
        client.start();
        Thread.sleep(100);

        // Set up initial set of client sockets.
        log("Starting initial client connections");
        client.connectClients(initCount);
        Thread.sleep(500);  // Wait for client connections to arrive

        // Accept all initial client sockets, add to selector and run
        // epoll_ctl.
        log("Accepting initial connections");
        List<SocketChannel> serverChannels1 =
            acceptAndAddAll(serverSelector, server, initCount);
        if (serverChannels1.size() != initCount) {
            throw new Exception("Accepted " + serverChannels1.size() +
                                " instead of " + initCount);
        }
        serverSelector.selectNow();

        // Set up mass set of client sockets.
        log("Requesting mass client connections");
        client.connectClients(massCount);
        Thread.sleep(500);  // Wait for client connections to arrive

        // Accept all mass client sockets, add to selector and do NOT
        // run epoll_ctl.
        log("Accepting mass connections");
        List<SocketChannel> serverChannels2 =
            acceptAndAddAll(serverSelector, server, massCount);
        if (serverChannels2.size() != massCount) {
            throw new Exception("Accepted " + serverChannels2.size() +
                                " instead of " + massCount);
        }

        // Close initial set of sockets.
        log("Closing initial connections");
        closeAll(serverChannels1);

        // Now get the timing of select() call.
        log("Running the final select call");
        long startTime = System.nanoTime();
        serverSelector.selectNow();
        long duration = durationMillis(startTime);
        log("Init count = " + initCount +
            ", mass count = " + massCount +
            ", duration = " + duration + "ms");

        if (duration > maxSelectTime) {
            System.out.println
                ("\n\n\n\n\nFAILURE: The final selectNow() took " +
                 duration + "ms " +
                 "- seems like O(N^2) bug is still here\n\n");
            System.exit(1);
        }

        // The measurement is done; release the remaining accepted sockets.
        closeAll(serverChannels2);
    }

    /**
     * Accepts up to {@code expected} connections from {@code server},
     * registering each one with {@code selector} for
     * {@link SelectionKey#OP_READ} and then again for
     * {@link SelectionKey#OP_WRITE}.
     *
     * <p> When {@code accept()} has nothing to return, the method waits
     * 500ms and retries; after 10 consecutive empty retries it gives up and
     * returns what it has, so callers must check the size of the result.
     *
     * @return the accepted channels, possibly fewer than {@code expected}
     */
    static List<SocketChannel> acceptAndAddAll(Selector selector,
                                               ServerSocketChannel server,
                                               int expected)
            throws Exception {
        int retryCount = 0;
        int acceptCount = 0;
        List<SocketChannel> channels = new ArrayList<SocketChannel>();
        while (channels.size() < expected) {
            SocketChannel channel = server.accept();
            if (channel == null) {
                log("accept() returned null " +
                    "after accepting " + acceptCount + " more connections");
                acceptCount = 0;
                if (retryCount < 10) {
                    // See if more new sockets got stacked behind.
                    retryCount++;
                    Thread.sleep(500);
                    continue;
                }
                break;
            }
            retryCount = 0;
            acceptCount++;
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            channels.add(channel);
        }
        // Cause an additional updateList entry per channel.
        for (SocketChannel channel : channels) {
            channel.register(selector, SelectionKey.OP_WRITE);
        }
        return channels;
    }

    /**
     * Closes every channel in {@code channels}, stopping at the first
     * failure.
     */
    static void closeAll(List<SocketChannel> channels)
            throws Exception {
        for (SocketChannel channel : channels) {
            channel.close();
        }
    }

    /**
     * Daemon thread that opens client connections to the server on request,
     * in throttled batches, and completes pending connects through its own
     * selector.
     */
    static class ClientThread extends Thread {
        /** Maximum number of connections started per loop iteration. */
        private static final int BATCH_SIZE = 20;

        /** Pause after each batch, in milliseconds, to throttle connects. */
        private static final long BATCH_PAUSE_MILLIS = 200;

        private final SocketAddress address;
        private final Selector selector;

        /** Connections requested but not yet started; guarded by this. */
        private int connectionsNeeded;

        /** Connections started so far; used only by the client loop. */
        private int totalCreated;

        ClientThread(SocketAddress address) throws Exception {
            this.address = address;
            selector = Selector.open();
            setDaemon(true);
        }

        /**
         * Requests {@code count} additional connections and wakes the
         * client loop so that it notices the request.
         */
        void connectClients(int count) throws Exception {
            synchronized (this) {
                connectionsNeeded += count;
            }
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                handleClients();
            } catch (Throwable e) {
                // Any client-side failure invalidates the test.
                e.printStackTrace();
                System.exit(1);
            }
        }

        /**
         * Client loop: starts the next batch of requested connections if
         * there is one, otherwise blocks in {@code select()}, and then
         * processes whatever keys were selected. Never returns normally.
         */
        private void handleClients() throws Exception {
            int selectCount = 0;
            while (true) {
                int createdCount = takeBatch();
                if (createdCount > 0) {
                    // Sockets are opened and the throttle sleep is taken
                    // outside the lock so connectClients() is never stalled.
                    for (int i = 0; i < createdCount; i++) {
                        startConnection();
                    }
                    totalCreated += createdCount;
                    log("Started total of " +
                        totalCreated + " client connections");
                    Thread.sleep(BATCH_PAUSE_MILLIS);
                    selector.selectNow();
                } else {
                    selectCount++;
                    long startTime = System.nanoTime();
                    selector.select();
                    long duration = durationMillis(startTime);
                    log("Exited clientSelector.select(), loop #"
                        + selectCount + ", duration = " + duration + "ms");
                }
                processSelectedKeys();
            }
        }

        /**
         * Claims up to {@link #BATCH_SIZE} of the outstanding connection
         * requests and returns how many were claimed (possibly zero).
         */
        private synchronized int takeBatch() {
            if (connectionsNeeded <= 0) {
                return 0;
            }
            int batch = Math.min(connectionsNeeded, BATCH_SIZE);
            connectionsNeeded -= batch;
            return batch;
        }

        /**
         * Starts one non-blocking connect; if it does not complete
         * immediately, registers the channel for
         * {@link SelectionKey#OP_CONNECT} so it is finished later.
         */
        private void startConnection() throws Exception {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(address);
            if (!channel.finishConnect()) {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
        }

        /**
         * Removes and handles every selected key: connectable keys have
         * their interest set cleared and their connect finished; invalid
         * keys and unexpected ready sets are only logged.
         */
        private void processSelectedKeys() throws Exception {
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            for (int keyCount = 0; keys.hasNext(); keyCount++) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid()) {
                    log("Ignoring client key #" + keyCount);
                    continue;
                }
                int readyOps = key.readyOps();
                if (readyOps == SelectionKey.OP_CONNECT) {
                    key.interestOps(0);
                    ((SocketChannel) key.channel()).finishConnect();
                } else {
                    log("readyOps() on client key #" + keyCount +
                        " returned " + readyOps);
                }
            }
        }
    }
}