8151299: Http client SelectorManager overwriting read and write events
authormichaelm
Wed, 09 Mar 2016 13:37:30 +0000
changeset 36434 3fd8dee1b158
parent 36433 dcbc230cfa4f
child 36435 0408881ad616
8151299: Http client SelectorManager overwriting read and write events Reviewed-by: chegar
jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java
jdk/test/java/net/httpclient/whitebox/TEST.properties
jdk/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java	Wed Mar 09 12:11:31 2016 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java	Wed Mar 09 13:37:30 2016 +0000
@@ -30,19 +30,17 @@
 import java.net.URI;
 import static java.net.http.Utils.BUFSIZE;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import static java.nio.channels.SelectionKey.OP_CONNECT;
 import static java.nio.channels.SelectionKey.OP_READ;
 import static java.nio.channels.SelectionKey.OP_WRITE;
 import java.nio.channels.Selector;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Stream;
 import java.util.concurrent.ExecutorService;
 import java.security.NoSuchAlgorithmException;
-import java.util.ListIterator;
-import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import javax.net.ssl.SSLContext;
@@ -72,12 +70,6 @@
     private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
     private final LinkedList<TimeoutEvent> timeouts;
 
-    //@Override
-    void debugPrint() {
-        selmgr.debugPrint();
-        client2.debugPrint();
-    }
-
     public static HttpClientImpl create(HttpClientBuilderImpl builder) {
         HttpClientImpl impl = new HttpClientImpl(builder);
         impl.start();
@@ -173,19 +165,15 @@
     // Main loop for this client's selector
 
     class SelectorManager extends Thread {
-
         final Selector selector;
         boolean closed;
 
         final List<AsyncEvent> readyList;
         final List<AsyncEvent> registrations;
 
-        List<AsyncEvent> debugList;
-
         SelectorManager() throws IOException {
             readyList = new LinkedList<>();
             registrations = new LinkedList<>();
-            debugList = new LinkedList<>();
             selector = Selector.open();
         }
 
@@ -216,13 +204,6 @@
             return c;
         }
 
-        synchronized void debugPrint() {
-            System.err.println("Selecting on:");
-            for (AsyncEvent e : debugList) {
-                System.err.println(opvals(e.interestOps()));
-            }
-        }
-
         String opvals(int i) {
             StringBuilder sb = new StringBuilder();
             if ((i & OP_READ) != 0)
@@ -239,14 +220,18 @@
             try {
                 while (true) {
                     synchronized (this) {
-                        debugList = copy(registrations);
                         for (AsyncEvent exchange : registrations) {
                             SelectableChannel c = exchange.channel();
                             try {
                                 c.configureBlocking(false);
-                                c.register(selector,
-                                           exchange.interestOps(),
-                                           exchange);
+                                SelectionKey key = c.keyFor(selector);
+                                SelectorAttachment sa;
+                                if (key == null) {
+                                    sa = new SelectorAttachment(c, selector);
+                                } else {
+                                    sa = (SelectorAttachment)key.attachment();
+                                }
+                                sa.register(exchange);
                             } catch (IOException e) {
                                 Log.logError("HttpClientImpl: " + e);
                                 c.close();
@@ -266,11 +251,10 @@
                     Set<SelectionKey> keys = selector.selectedKeys();
 
                     for (SelectionKey key : keys) {
-                        if (key.isReadable() || key.isConnectable() || key.isWritable()) {
-                            key.cancel();
-                            AsyncEvent exchange = (AsyncEvent) key.attachment();
-                            readyList.add(exchange);
-                        }
+                        SelectorAttachment sa = (SelectorAttachment)key.attachment();
+                        int eventsOccurred = key.readyOps();
+                        sa.events(eventsOccurred).forEach(readyList::add);
+                        sa.resetInterestOps(eventsOccurred);
                     }
                     selector.selectNow(); // complete cancellation
                     selector.selectedKeys().clear();
@@ -306,6 +290,80 @@
     }
 
     /**
+     * Tracks multiple user level registrations associated with one NIO
+     * registration (SelectionKey). In this implementation, registrations
+     * are one-off and when an event is posted the registration is cancelled
+     * until explicitly registered again.
+     *
+     * <p> No external synchronization required as this class is only used
+     * by the SelectorManager thread. One of these objects required per
+     * connection.
+     */
+    private static class SelectorAttachment {
+        private final SelectableChannel chan;
+        private final Selector selector;
+        private final ArrayList<AsyncEvent> pending;
+        private int interestops;
+
+        SelectorAttachment(SelectableChannel chan, Selector selector) {
+            this.pending = new ArrayList<>();
+            this.chan = chan;
+            this.selector = selector;
+        }
+
+        void register(AsyncEvent e) throws ClosedChannelException {
+            int newops = e.interestOps();
+            boolean reRegister = (interestops & newops) != newops;
+            interestops |= newops;
+            pending.add(e);
+            if (reRegister) {
+                // first time registration happens here also
+                chan.register(selector, interestops, this);
+            }
+        }
+
+        int interestOps() {
+            return interestops;
+        }
+
+        /**
+         * Returns a Stream<AsyncEvents> containing only events that are
+         * registered with the given {@code interestop}.
+         */
+        Stream<AsyncEvent> events(int interestop) {
+            return pending.stream()
+                          .filter(ev -> (ev.interestOps() & interestop) != 0);
+        }
+
+        /**
+         * Removes any events with the given {@code interestop}, and if no
+         * events remaining, cancels the associated SelectionKey.
+         */
+        void resetInterestOps(int interestop) {
+            int newops = 0;
+
+            Iterator<AsyncEvent> itr = pending.iterator();
+            while (itr.hasNext()) {
+                AsyncEvent event = itr.next();
+                int evops = event.interestOps();
+                if ((evops & interestop) != 0) {
+                    itr.remove();
+                } else {
+                    newops |= evops;
+                }
+            }
+
+            interestops = newops;
+            SelectionKey key = chan.keyFor(selector);
+            if (newops == 0) {
+                key.cancel();
+            } else {
+                key.interestOps(newops);
+            }
+        }
+    }
+
+    /**
      * Creates a HttpRequest associated with this group.
      *
      * @throws IllegalStateException if the group has been stopped
@@ -425,18 +483,9 @@
             }
         }
         iter.add(event);
-        //debugPrintList("register");
         selmgr.wakeupSelector();
     }
 
-    void debugPrintList(String s) {
-        System.err.printf("%s: {", s);
-        for (TimeoutEvent e : timeouts) {
-            System.err.printf("(%d,%d) ", e.delta, e.timeval);
-        }
-        System.err.println("}");
-    }
-
     synchronized void signalTimeouts(long then) {
         if (timeouts.isEmpty()) {
             return;
@@ -462,7 +511,6 @@
                 break;
             }
         }
-        //debugPrintList("signalTimeouts");
     }
 
     synchronized void cancelTimer(TimeoutEvent event) {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/whitebox/TEST.properties	Wed Mar 09 13:37:30 2016 +0000
@@ -0,0 +1,3 @@
+TestNG.dirs = .
+
+bootclasspath.dirs = /java/net/httpclient
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java	Wed Mar 09 13:37:30 2016 +0000
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/**
+ * @test
+ * @bug 8151299
+ * @summary Http client SelectorManager overwriting read and write events
+ */
+package java.net.http;
+
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.testng.annotations.Test;
+
+/**
+ * Whitebox test of selector mechanics. Currently only a simple test
+ * setting one read and one write event is done. It checks that the
+ * write event occurs first, followed by the read event and then no
+ * further events occur despite the conditions actually still existing.
+ */
+@Test
+public class SelectorTest {
+
+    AtomicInteger counter = new AtomicInteger();
+    volatile boolean error;
+    static final CountDownLatch finishingGate = new CountDownLatch(1);
+
+    String readSomeBytes(RawChannel chan) {
+        try {
+            ByteBuffer buf = ByteBuffer.allocate(1024);
+            int t = chan.read(buf);
+            if (t <= 0) {
+                out.printf("chan read returned %d\n", t);
+                return null;
+            }
+            byte[] bb = new byte[t];
+            buf.get(bb);
+            return new String(bb, US_ASCII);
+        } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void test() throws Exception {
+
+        try (ServerSocket server = new ServerSocket(0)) {
+            int port = server.getLocalPort();
+
+            out.println("Listening on port " + server.getLocalPort());
+
+            TestServer t = new TestServer(server);
+            t.start();
+            out.println("Started server thread");
+
+            final RawChannel chan = getARawChannel(port);
+
+            chan.registerEvent(new RawChannel.NonBlockingEvent() {
+                @Override
+                public int interestOps() {
+                    return SelectionKey.OP_READ;
+                }
+
+                @Override
+                public void handle() {
+                    readSomeBytes(chan);
+                    out.printf("OP_READ\n");
+                    if (counter.get() != 1) {
+                        out.printf("OP_READ error counter = %d\n", counter);
+                        error = true;
+                    }
+                }
+            });
+
+            chan.registerEvent(new RawChannel.NonBlockingEvent() {
+                @Override
+                public int interestOps() {
+                    return SelectionKey.OP_WRITE;
+                }
+
+                @Override
+                public void handle() {
+                    out.printf("OP_WRITE\n");
+                    if (counter.get() != 0) {
+                        out.printf("OP_WRITE error counter = %d\n", counter);
+                        error = true;
+                    } else {
+                        ByteBuffer bb = ByteBuffer.wrap(TestServer.INPUT);
+                        counter.incrementAndGet();
+                        try {
+                            chan.write(bb);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    }
+                }
+
+            });
+            out.println("Events registered. Waiting");
+            finishingGate.await(30, SECONDS);
+            if (error)
+                throw new RuntimeException("Error");
+            else
+                out.println("No error");
+        }
+    }
+
+    static RawChannel getARawChannel(int port) throws Exception {
+        URI uri = URI.create("http://127.0.0.1:" + port + "/");
+        out.println("client connecting to " + uri.toString());
+        HttpRequest req = HttpRequest.create(uri).GET();
+        HttpResponse r = req.response();
+        r.body(HttpResponse.ignoreBody());
+        return ((HttpResponseImpl) r).rawChannel();
+    }
+
+    static class TestServer extends Thread {
+        static final byte[] INPUT = "Hello world".getBytes(US_ASCII);
+        static final byte[] OUTPUT = "Goodbye world".getBytes(US_ASCII);
+        static final String FIRST_RESPONSE = "HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n";
+        final ServerSocket server;
+
+        TestServer(ServerSocket server) throws IOException {
+            this.server = server;
+        }
+
+        public void run() {
+            try (Socket s = server.accept();
+                 InputStream is = s.getInputStream();
+                 OutputStream os = s.getOutputStream()) {
+
+                out.println("Got connection");
+                readRequest(is);
+                os.write(FIRST_RESPONSE.getBytes());
+                read(is);
+                write(os);
+                Thread.sleep(1000);
+                // send some more data, and make sure WRITE op does not get called
+                write(os);
+                out.println("TestServer exiting");
+                SelectorTest.finishingGate.countDown();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        // consumes the HTTP request
+        static void readRequest(InputStream is) throws IOException {
+            out.println("starting readRequest");
+            byte[] buf = new byte[1024];
+            String s = "";
+            while (true) {
+                int n = is.read(buf);
+                if (n <= 0)
+                    throw new IOException("Error");
+                s = s + new String(buf, 0, n);
+                if (s.indexOf("\r\n\r\n") != -1)
+                    break;
+            }
+            out.println("returning from readRequest");
+        }
+
+        static void read(InputStream is) throws IOException {
+            out.println("starting read");
+            for (int i = 0; i < INPUT.length; i++) {
+                int c = is.read();
+                if (c == -1)
+                    throw new IOException("closed");
+                if (INPUT[i] != (byte) c)
+                    throw new IOException("Error. Expected:" + INPUT[i] + ", got:" + c);
+            }
+            out.println("returning from read");
+        }
+
+        static void write(OutputStream os) throws IOException {
+            out.println("doing write");
+            os.write(OUTPUT);
+        }
+    }
+}