http-client-branch: review comment - WebSocket sizing queue; making queue faster; an extra test for the queue http-client-branch
authorprappo
Fri, 13 Apr 2018 15:33:13 +0100
branchhttp-client-branch
changeset 56427 7f1916397463
parent 56426 f39b316f10c9
child 56428 3dbf8ee93b08
http-client-branch: review comment - WebSocket sizing queue; making queue faster; an extra test for the queue
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/BlowupOutputQueue.java
test/jdk/java/net/httpclient/websocket/PendingOperations.java
test/jdk/java/net/httpclient/websocket/Support.java
test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Fri Apr 13 15:33:13 2018 +0100
@@ -25,11 +25,13 @@
 
 package jdk.internal.net.http.common;
 
-import java.net.http.HttpHeaders;
 import sun.net.NetProperties;
 import sun.net.util.IPAddressUtil;
 import sun.net.www.HeaderParser;
 
+import javax.net.ssl.ExtendedSSLSession;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSession;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
@@ -41,6 +43,7 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URLPermission;
+import java.net.http.HttpHeaders;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -59,9 +62,6 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.ExtendedSSLSession;
 
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -917,4 +917,47 @@
         }
         return address;
     }
+
+    /**
+     * Returns the smallest (closest to zero) positive number {@code m} (which
+     * is also a power of 2) such that {@code n <= m}.
+     * <pre>{@code
+     *          n  pow2Size(n)
+     * -----------------------
+     *          0           1
+     *          1           1
+     *          2           2
+     *          3           4
+     *          4           4
+     *          5           8
+     *          6           8
+     *          7           8
+     *          8           8
+     *          9          16
+     *         10          16
+     *        ...         ...
+     * 2147483647  1073741824
+     * } </pre>
+     *
+     * The result is capped at {@code 1 << 30} as beyond that int wraps.
+     *
+     * @param n
+     *         capacity
+     *
+     * @return the size of the array
+     * @apiNote Used to size arrays in circular buffers (rings), usually in
+     * order to squeeze extra performance substituting {@code %} operation for
+     * {@code &}, which is up to 2 times faster.
+     */
+    public static int pow2Size(int n) {
+        if (n < 0) {
+            throw new IllegalArgumentException();
+        } else if (n == 0) {
+            return 1;
+        } else if (n >= (1 << 30)) { // 2^31 is a negative int
+            return 1 << 30;
+        } else {
+            return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+        }
+    }
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java	Fri Apr 13 15:33:13 2018 +0100
@@ -25,7 +25,6 @@
 
 package jdk.internal.net.http.websocket;
 
-import jdk.internal.net.http.common.Utils;
 import jdk.internal.vm.annotation.Stable;
 
 import java.io.IOException;
@@ -36,6 +35,8 @@
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+import static jdk.internal.net.http.common.Utils.pow2Size;
+
 /*
  * A FIFO message storage facility.
  *
@@ -84,49 +85,28 @@
  */
 public class MessageQueue {
 
-    private final static boolean DEBUG = false;
-
     @Stable
     private final Message[] elements;
 
     private final AtomicInteger tail = new AtomicInteger();
     private volatile int head;
 
-    public MessageQueue() {
-        this(defaultSize());
-    }
-
-    /* Exposed for testing purposes */
-    protected MessageQueue(int size) {
-        if (size < 1) {
+    public MessageQueue(int capacity) {
+        if (capacity < 1) {
             throw new IllegalArgumentException();
         }
-        Message[] array = new Message[size + 1];
+        int s = pow2Size(capacity + 1);
+        assert s % 2 == 0 : s;
+        Message[] array = new Message[s];
         for (int i = 0; i < array.length; i++) {
             array[i] = new Message();
         }
         elements = array;
     }
 
-    private static int defaultSize() {
-        String property = "jdk.httpclient.websocket.outputQueueMaxSize";
-        int defaultSize = 128;
-        String value = Utils.getNetProperty(property);
-        int size;
-        if (value == null) {
-            size = defaultSize;
-        } else {
-            try {
-                size = Integer.parseUnsignedInt(value);
-            } catch (NumberFormatException ignored) {
-                size = defaultSize;
-            }
-        }
-        if (DEBUG) {
-            System.out.printf("[MessageQueue] %s=%s, using size %s%n",
-                              property, value, size);
-        }
-        return size;
+    /* Exposed for testing purposes */
+    protected static int effectiveCapacityOf(int n) {
+        return pow2Size(n + 1) - 1;
     }
 
     public <T> void addText(CharBuffer message,
@@ -158,7 +138,7 @@
         do {
             h = head;
             currentTail = tail.get();
-            newTail = (currentTail + 1) % elements.length;
+            newTail = (currentTail + 1) & (elements.length - 1);
             if (newTail == h) {
                 throw new IOException("Queue full");
             }
@@ -314,7 +294,7 @@
         h.action = null;
         h.future = null;
         h.ready = false;
-        head = (currentHead + 1) % elements.length;
+        head = (currentHead + 1) & (elements.length - 1);
     }
 
     private enum Type {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java	Fri Apr 13 15:33:13 2018 +0100
@@ -28,5 +28,6 @@
 @FunctionalInterface
 public interface TransportFactory {
 
-    Transport createTransport(MessageStreamConsumer consumer);
+    Transport createTransport(MessageQueue queue,
+                              MessageStreamConsumer consumer);
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java	Fri Apr 13 15:33:13 2018 +0100
@@ -34,7 +34,8 @@
     }
 
     @Override
-    public Transport createTransport(MessageStreamConsumer consumer) {
-        return new TransportImpl(consumer, channel);
+    public Transport createTransport(MessageQueue queue,
+                                     MessageStreamConsumer consumer) {
+        return new TransportImpl(queue, consumer, channel);
     }
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Apr 13 15:33:13 2018 +0100
@@ -60,7 +60,7 @@
 
     private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
 
-    private final MessageQueue queue = new MessageQueue();
+    private final MessageQueue queue;
     private final MessageEncoder encoder = new MessageEncoder();
     /* A reusable buffer for writing, initially with no remaining bytes */
     private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
@@ -82,7 +82,10 @@
     private volatile ChannelState readState = UNREGISTERED;
     private boolean inputClosed;
     private boolean outputClosed;
-    public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) {
+
+    public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer,
+                         RawChannel channel) {
+        this.queue = queue;
         this.messageConsumer = consumer;
         this.channel = channel;
         this.decoder = new MessageDecoder(this.messageConsumer);
@@ -145,6 +148,7 @@
             queue.addText(text, isLast, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -169,6 +173,7 @@
             queue.addBinary(message, isLast, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -192,6 +197,7 @@
             queue.addPing(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -215,6 +221,7 @@
             queue.addPong(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -238,6 +245,7 @@
             queue.addPong(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -262,6 +270,7 @@
             queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Fri Apr 13 15:33:13 2018 +0100
@@ -33,7 +33,6 @@
 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.lang.System.Logger.Level;
 import java.lang.ref.Reference;
 import java.net.ProtocolException;
@@ -55,7 +54,6 @@
 import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
-import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
@@ -160,7 +158,11 @@
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
         this.listener = requireNonNull(listener);
-        this.transport = transportFactory.createTransport(
+        // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
+        // 2 automatic Ping replies + 1 automatic Close = 6 messages
+        // Why 2 automatic Pong replies? One is being sent, but the byte buffer
+        // has been set to null, another just has been added.
+        this.transport = transportFactory.createTransport(new MessageQueue(6),
                 new SignallingMessageConsumer());
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/BlowupOutputQueue.java	Fri Apr 13 15:33:13 2018 +0100
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @build DummyWebSocketServer
+ * @run testng/othervm
+ *      -Djdk.internal.httpclient.debug=true
+ *      -Djdk.internal.httpclient.websocket.debug=true
+ *       BlowupOutputQueue
+ */
+
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.net.http.HttpClient.newHttpClient;
+import static org.testng.Assert.assertFalse;
+
+public class BlowupOutputQueue extends PendingOperations {
+
+    CompletableFuture<WebSocket> cfText;
+    CompletableFuture<WebSocket> cfPing;
+    CompletableFuture<WebSocket> cfClose;
+    MockListener listener;
+
+    /*
+     * The idea is to arrange things such that the internal queue will be fully
+     * utilized and then make sure there won't be any errors caused by that.
+     *
+     * First, fill the queue with Text messages. Once done, send a Ping message.
+     * At this stage, there are at least 2 messages are in queue. Now, start
+     * receiving. Received Ping messages will cause automatic Pong replies. When
+     * all done, there will be at least 3 messages in the queue. (As at least
+     * the a single Ping has to be replied). Then send a Close message. Now
+     * there are at least 4 messages in the queue. Finally, receive the last
+     * message which is a Close message. This will cause an automatic reply with
+     * a Close message from the client side. All in all there should be at least
+     * 5 messages in the queue.
+     */
+    @Test
+    public void full() throws Exception {
+        int N = 32;
+        int[] incoming = new int[2 * (N + 1)]; // 2 bytes per message
+        for (int i = 0; i < incoming.length - 2; i += 2) {
+            // <PING>
+            incoming[i + 0] = 0x89;
+            incoming[i + 1] = 0x00;
+        }
+        // <CLOSE>
+        incoming[incoming.length - 2] = 0x88;
+        incoming[incoming.length - 1] = 0x00;
+
+        repeatable(() -> {
+            CountDownLatch allButCloseReceived = new CountDownLatch(N);
+            server = Support.writingServer(incoming);
+            server.open();
+            listener = new MockListener() {
+
+                @Override
+                protected void replenish(WebSocket webSocket) {
+                    /* do nothing */
+                }
+
+                @Override
+                protected CompletionStage<?> onPing0(WebSocket webSocket,
+                                                     ByteBuffer message) {
+                    allButCloseReceived.countDown();
+                    return null;
+                }
+            };
+            webSocket = newHttpClient().newWebSocketBuilder()
+                    .buildAsync(server.getURI(), listener)
+                    .join();
+            CharBuffer data = CharBuffer.allocate(65536);
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                long start = System.currentTimeMillis();
+                System.out.printf("begin cycle #%s at %s%n", i, start);
+                cfText = webSocket.sendText(data, true);
+                try {
+                    cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
+                    data.clear();
+                } catch (TimeoutException e) {
+                    break;
+                } finally {
+                    long stop = System.currentTimeMillis();
+                    System.out.printf("end cycle #%s at %s (%s ms)%n", i, stop, stop - start);
+                }
+            }
+            cfPing = webSocket.sendPing(ByteBuffer.allocate(125));
+            webSocket.request(N);
+            allButCloseReceived.await();
+            webSocket.request(1); // Receive the last message: Close
+            return null;
+        }, () -> cfText.isDone());
+        List<MockListener.Invocation> invocations = listener.invocations();
+        cfClose = webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+
+        assertFalse(invocations.contains(new MockListener.OnError(webSocket, IOException.class)));
+        assertFalse(cfText.isDone());
+        assertFalse(cfPing.isDone());
+        assertFalse(cfClose.isDone());
+    }
+}
--- a/test/jdk/java/net/httpclient/websocket/PendingOperations.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/PendingOperations.java	Fri Apr 13 15:33:13 2018 +0100
@@ -27,11 +27,8 @@
 import java.io.IOException;
 import java.net.http.WebSocket;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
 
 /* Common infrastructure for tests that check pending operations */
 public class PendingOperations {
--- a/test/jdk/java/net/httpclient/websocket/Support.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/Support.java	Fri Apr 13 15:33:13 2018 +0100
@@ -126,6 +126,42 @@
         };
     }
 
+    public static DummyWebSocketServer writingServer(int... data) {
+        byte[] copy = new byte[data.length];
+        for (int i = 0; i < data.length; i++) {
+            copy[i] = (byte) data[i];
+        }
+        return new DummyWebSocketServer() {
+
+            @Override
+            protected void read(SocketChannel ch) throws IOException {
+                try {
+                    Thread.sleep(Long.MAX_VALUE);
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            @Override
+            protected void write(SocketChannel ch) throws IOException {
+                int off = 0; int n = 1; // 1 byte at a time
+                while (off + n < copy.length + n) {
+//                    try {
+//                        TimeUnit.MICROSECONDS.sleep(500);
+//                    } catch (InterruptedException e) {
+//                        return;
+//                    }
+                    int len = Math.min(copy.length - off, n);
+                    ByteBuffer bytes = ByteBuffer.wrap(copy, off, len);
+                    off += len;
+                    ch.write(bytes);
+                }
+                super.write(ch);
+            }
+        };
+
+    }
+
     public static String stringWith2NBytes(int n) {
         // -- Russian Alphabet (33 characters, 2 bytes per char) --
         char[] abc = {
--- a/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java	Fri Apr 13 15:33:13 2018 +0100
@@ -45,11 +45,16 @@
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
+import static jdk.internal.net.http.websocket.MessageQueue.effectiveCapacityOf;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 
+/*
+ * A unit test for MessageQueue. The test is aware of the details of the queue
+ * implementation.
+ */
 public class MessageQueueTest {
 
     private static final Random r = new SecureRandom();
@@ -77,18 +82,19 @@
     @Test(dataProvider = "capacities")
     public void fullness(int n) throws IOException {
         MessageQueue q = new MessageQueue(n);
+        int cap = effectiveCapacityOf(n);
         Adder adder = new Adder();
         Queue<Message> referenceQueue = new LinkedList<>();
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < cap; i++) {
             Message m = createRandomMessage();
             referenceQueue.add(m);
             adder.add(q, m);
         }
-        for (int i = 0; i < n + 1; i++) {
+        for (int i = 0; i < cap + 1; i++) {
             Message m = createRandomMessage();
             assertThrows(IOException.class, () -> adder.add(q, m));
         }
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < cap; i++) {
             Message expected = referenceQueue.remove();
             Message actual = new Remover().removeFrom(q);
             assertEquals(actual, expected);
@@ -141,12 +147,13 @@
     @Test(dataProvider = "capacities")
     public void caterpillarWalk(int n) throws IOException {
 //        System.out.println("n: " + n);
-        for (int p = 1; p <= n; p++) { // pace
+        int cap = effectiveCapacityOf(n);
+        for (int p = 1; p <= cap; p++) { // pace
 //            System.out.println("  pace: " + p);
             MessageQueue q = new MessageQueue(n);
             Queue<Message> referenceQueue = new LinkedList<>();
             Adder adder = new Adder();
-            for (int k = 0; k < (n / p) + 1; k++) {
+            for (int k = 0; k < (cap / p) + 1; k++) {
 //                System.out.println("    cycle: " + k);
                 for (int i = 0; i < p; i++) {
                     Message m = createRandomMessage();
@@ -188,9 +195,6 @@
                     f.get();    // Just to check for exceptions
                 }
                 futures.clear();
-                // Make sure the queue is full
-                assertThrows(IOException.class,
-                             () -> adder.add(q, createRandomMessage()));
             }
         } finally {
             executorService.shutdownNow();
@@ -257,13 +261,14 @@
     public void testSingleThreaded(int n) throws IOException {
         Queue<Message> referenceQueue = new LinkedList<>();
         MessageQueue q = new MessageQueue(n);
+        int cap = effectiveCapacityOf(n);
         Adder adder = new Adder();
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < cap; i++) {
             Message m = createRandomMessage();
             referenceQueue.add(m);
             adder.add(q, m);
         }
-        for (int i = 0; i < n; i++) {
+        for (int i = 0; i < cap; i++) {
             Message expected = referenceQueue.remove();
             Message actual = new Remover().removeFrom(q);
             assertEquals(actual, expected);