# HG changeset patch # User prappo # Date 1523629993 -3600 # Node ID 7f19163974638440a699c49918887bcb92ecc663 # Parent f39b316f10c9379e6efe2b33bf6967e132cc8af3 http-client-branch: review comment - WebSocket sizing queue; making queue faster; an extra test for the queue diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri Apr 13 15:33:13 2018 +0100 @@ -25,11 +25,13 @@ package jdk.internal.net.http.common; -import java.net.http.HttpHeaders; import sun.net.NetProperties; import sun.net.util.IPAddressUtil; import sun.net.www.HeaderParser; +import javax.net.ssl.ExtendedSSLSession; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; @@ -41,6 +43,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URLPermission; +import java.net.http.HttpHeaders; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -59,9 +62,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.SSLSession; -import javax.net.ssl.ExtendedSSLSession; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -917,4 +917,47 @@ } return address; } + + /** + * Returns the smallest (closest to zero) positive number {@code m} (which + * is also a power of 2) such that {@code n <= m}. + *
{@code
+     *          n  pow2Size(n)
+     * -----------------------
+     *          0           1
+     *          1           1
+     *          2           2
+     *          3           4
+     *          4           4
+     *          5           8
+     *          6           8
+     *          7           8
+     *          8           8
+     *          9          16
+     *         10          16
+     *        ...         ...
+     * 2147483647  1073741824
+     * } 
+ * + * The result is capped at {@code 1 << 30} as beyond that int wraps. + * + * @param n + * capacity + * + * @return the size of the array + * @apiNote Used to size arrays in circular buffers (rings), usually in + * order to squeeze extra performance substituting {@code %} operation for + * {@code &}, which is up to 2 times faster. + */ + public static int pow2Size(int n) { + if (n < 0) { + throw new IllegalArgumentException(); + } else if (n == 0) { + return 1; + } else if (n >= (1 << 30)) { // 2^31 is a negative int + return 1 << 30; + } else { + return 1 << (32 - Integer.numberOfLeadingZeros(n - 1)); + } + } } diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Fri Apr 13 15:33:13 2018 +0100 @@ -25,7 +25,6 @@ package jdk.internal.net.http.websocket; -import jdk.internal.net.http.common.Utils; import jdk.internal.vm.annotation.Stable; import java.io.IOException; @@ -36,6 +35,8 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; +import static jdk.internal.net.http.common.Utils.pow2Size; + /* * A FIFO message storage facility. * @@ -84,49 +85,28 @@ */ public class MessageQueue { - private final static boolean DEBUG = false; - @Stable private final Message[] elements; private final AtomicInteger tail = new AtomicInteger(); private volatile int head; - public MessageQueue() { - this(defaultSize()); - } - - /* Exposed for testing purposes */ - protected MessageQueue(int size) { - if (size < 1) { + public MessageQueue(int capacity) { + if (capacity < 1) { throw new IllegalArgumentException(); } - Message[] array = new Message[size + 1]; + int s = pow2Size(capacity + 1); + assert s % 2 == 0 : s; + Message[] array = new Message[s]; for (int i = 0; i < array.length; i++) { array[i] = new Message(); } elements = array; } - private static int defaultSize() { - String property = "jdk.httpclient.websocket.outputQueueMaxSize"; - int defaultSize = 128; - String value = Utils.getNetProperty(property); - int size; - if (value == null) { - size = defaultSize; - } else { - try { - size = Integer.parseUnsignedInt(value); - } catch (NumberFormatException ignored) { - size = defaultSize; - } - } - if (DEBUG) { - System.out.printf("[MessageQueue] %s=%s, using size %s%n", - property, value, size); - } - return size; + /* Exposed for testing purposes */ + protected static int effectiveCapacityOf(int n) { + return pow2Size(n + 1) - 1; } public void addText(CharBuffer message, @@ -158,7 +138,7 @@ do { h = head; currentTail = tail.get(); - newTail = (currentTail + 1) % elements.length; + newTail = (currentTail + 1) & (elements.length - 1); if (newTail == h) { throw new IOException("Queue full"); } @@ -314,7 +294,7 @@ h.action = null; h.future = null; h.ready = false; - head = (currentHead + 1) % elements.length; + head = (currentHead + 1) & (elements.length - 1); } private enum Type { diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java Fri Apr 13 15:33:13 2018 +0100 @@ -28,5 +28,6 @@ @FunctionalInterface public interface TransportFactory { - Transport createTransport(MessageStreamConsumer consumer); + Transport createTransport(MessageQueue queue, + MessageStreamConsumer consumer); } diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java Fri Apr 13 15:33:13 2018 +0100 @@ -34,7 +34,8 @@ } @Override - public Transport createTransport(MessageStreamConsumer consumer) { - return new TransportImpl(consumer, channel); + public Transport createTransport(MessageQueue queue, + MessageStreamConsumer consumer) { + return new TransportImpl(queue, consumer, channel); } } diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:33:13 2018 +0100 @@ -60,7 +60,7 @@ private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask()); - private final MessageQueue queue = new MessageQueue(); + private final MessageQueue queue; private final MessageEncoder encoder = new MessageEncoder(); /* A reusable buffer for writing, initially with no remaining bytes */ private final ByteBuffer dst = createWriteBuffer().position(0).limit(0); @@ -82,7 +82,10 @@ private volatile ChannelState readState = UNREGISTERED; private boolean inputClosed; private boolean outputClosed; - public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) { + + public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer, + RawChannel channel) { + this.queue = queue; this.messageConsumer = consumer; this.channel = channel; this.decoder = new MessageDecoder(this.messageConsumer); @@ -145,6 +148,7 @@ queue.addText(text, isLast, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -169,6 +173,7 @@ queue.addBinary(message, isLast, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -192,6 +197,7 @@ queue.addPing(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -215,6 +221,7 @@ queue.addPong(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -238,6 +245,7 @@ queue.addPong(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -262,6 +270,7 @@ queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Apr 13 15:33:13 2018 +0100 @@ -33,7 +33,6 @@ import jdk.internal.net.http.websocket.OpeningHandshake.Result; import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.System.Logger.Level; import java.lang.ref.Reference; import java.net.ProtocolException; @@ -55,7 +54,6 @@ import java.util.function.Function; import static java.util.Objects.requireNonNull; -import static jdk.internal.net.http.common.MinimalFuture.completedFuture; import static jdk.internal.net.http.common.MinimalFuture.failedFuture; import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY; import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE; @@ -160,7 +158,11 @@ this.uri = requireNonNull(uri); this.subprotocol = requireNonNull(subprotocol); this.listener = requireNonNull(listener); - this.transport = transportFactory.createTransport( + // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close + + // 2 automatic Ping replies + 1 automatic Close = 6 messages + // Why 2 automatic Pong replies? One is being sent, but the byte buffer + // has been set to null, another just has been added. + this.transport = transportFactory.createTransport(new MessageQueue(6), new SignallingMessageConsumer()); } diff -r f39b316f10c9 -r 7f1916397463 test/jdk/java/net/httpclient/websocket/BlowupOutputQueue.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/websocket/BlowupOutputQueue.java Fri Apr 13 15:33:13 2018 +0100 @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @build DummyWebSocketServer + * @run testng/othervm + * -Djdk.internal.httpclient.debug=true + * -Djdk.internal.httpclient.websocket.debug=true + * BlowupOutputQueue + */ + +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.net.http.HttpClient.newHttpClient; +import static org.testng.Assert.assertFalse; + +public class BlowupOutputQueue extends PendingOperations { + + CompletableFuture cfText; + CompletableFuture cfPing; + CompletableFuture cfClose; + MockListener listener; + + /* + * The idea is to arrange things such that the internal queue will be fully + * utilized and then make sure there won't be any errors caused by that. + * + * First, fill the queue with Text messages. Once done, send a Ping message. + * At this stage, there are at least 2 messages are in queue. Now, start + * receiving. Received Ping messages will cause automatic Pong replies. When + * all done, there will be at least 3 messages in the queue. (As at least + * the a single Ping has to be replied). Then send a Close message. Now + * there are at least 4 messages in the queue. Finally, receive the last + * message which is a Close message. This will cause an automatic reply with + * a Close message from the client side. All in all there should be at least + * 5 messages in the queue. + */ + @Test + public void full() throws Exception { + int N = 32; + int[] incoming = new int[2 * (N + 1)]; // 2 bytes per message + for (int i = 0; i < incoming.length - 2; i += 2) { + // + incoming[i + 0] = 0x89; + incoming[i + 1] = 0x00; + } + // + incoming[incoming.length - 2] = 0x88; + incoming[incoming.length - 1] = 0x00; + + repeatable(() -> { + CountDownLatch allButCloseReceived = new CountDownLatch(N); + server = Support.writingServer(incoming); + server.open(); + listener = new MockListener() { + + @Override + protected void replenish(WebSocket webSocket) { + /* do nothing */ + } + + @Override + protected CompletionStage onPing0(WebSocket webSocket, + ByteBuffer message) { + allButCloseReceived.countDown(); + return null; + } + }; + webSocket = newHttpClient().newWebSocketBuilder() + .buildAsync(server.getURI(), listener) + .join(); + CharBuffer data = CharBuffer.allocate(65536); + for (int i = 0; ; i++) { // fill up the send buffer + long start = System.currentTimeMillis(); + System.out.printf("begin cycle #%s at %s%n", i, start); + cfText = webSocket.sendText(data, true); + try { + cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS); + data.clear(); + } catch (TimeoutException e) { + break; + } finally { + long stop = System.currentTimeMillis(); + System.out.printf("end cycle #%s at %s (%s ms)%n", i, stop, stop - start); + } + } + cfPing = webSocket.sendPing(ByteBuffer.allocate(125)); + webSocket.request(N); + allButCloseReceived.await(); + webSocket.request(1); // Receive the last message: Close + return null; + }, () -> cfText.isDone()); + List invocations = listener.invocations(); + cfClose = webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"); + + assertFalse(invocations.contains(new MockListener.OnError(webSocket, IOException.class))); + assertFalse(cfText.isDone()); + assertFalse(cfPing.isDone()); + assertFalse(cfClose.isDone()); + } +} diff -r f39b316f10c9 -r 7f1916397463 test/jdk/java/net/httpclient/websocket/PendingOperations.java --- a/test/jdk/java/net/httpclient/websocket/PendingOperations.java Fri Apr 13 15:06:50 2018 +0100 +++ b/test/jdk/java/net/httpclient/websocket/PendingOperations.java Fri Apr 13 15:33:13 2018 +0100 @@ -27,11 +27,8 @@ import java.io.IOException; import java.net.http.WebSocket; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BooleanSupplier; -import java.util.function.Predicate; -import java.util.function.Supplier; /* Common infrastructure for tests that check pending operations */ public class PendingOperations { diff -r f39b316f10c9 -r 7f1916397463 test/jdk/java/net/httpclient/websocket/Support.java --- a/test/jdk/java/net/httpclient/websocket/Support.java Fri Apr 13 15:06:50 2018 +0100 +++ b/test/jdk/java/net/httpclient/websocket/Support.java Fri Apr 13 15:33:13 2018 +0100 @@ -126,6 +126,42 @@ }; } + public static DummyWebSocketServer writingServer(int... data) { + byte[] copy = new byte[data.length]; + for (int i = 0; i < data.length; i++) { + copy[i] = (byte) data[i]; + } + return new DummyWebSocketServer() { + + @Override + protected void read(SocketChannel ch) throws IOException { + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + protected void write(SocketChannel ch) throws IOException { + int off = 0; int n = 1; // 1 byte at a time + while (off + n < copy.length + n) { +// try { +// TimeUnit.MICROSECONDS.sleep(500); +// } catch (InterruptedException e) { +// return; +// } + int len = Math.min(copy.length - off, n); + ByteBuffer bytes = ByteBuffer.wrap(copy, off, len); + off += len; + ch.write(bytes); + } + super.write(ch); + } + }; + + } + public static String stringWith2NBytes(int n) { // -- Russian Alphabet (33 characters, 2 bytes per char) -- char[] abc = { diff -r f39b316f10c9 -r 7f1916397463 test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java --- a/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java Fri Apr 13 15:06:50 2018 +0100 +++ b/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java Fri Apr 13 15:33:13 2018 +0100 @@ -45,11 +45,16 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; +import static jdk.internal.net.http.websocket.MessageQueue.effectiveCapacityOf; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +/* + * A unit test for MessageQueue. The test is aware of the details of the queue + * implementation. + */ public class MessageQueueTest { private static final Random r = new SecureRandom(); @@ -77,18 +82,19 @@ @Test(dataProvider = "capacities") public void fullness(int n) throws IOException { MessageQueue q = new MessageQueue(n); + int cap = effectiveCapacityOf(n); Adder adder = new Adder(); Queue referenceQueue = new LinkedList<>(); - for (int i = 0; i < n; i++) { + for (int i = 0; i < cap; i++) { Message m = createRandomMessage(); referenceQueue.add(m); adder.add(q, m); } - for (int i = 0; i < n + 1; i++) { + for (int i = 0; i < cap + 1; i++) { Message m = createRandomMessage(); assertThrows(IOException.class, () -> adder.add(q, m)); } - for (int i = 0; i < n; i++) { + for (int i = 0; i < cap; i++) { Message expected = referenceQueue.remove(); Message actual = new Remover().removeFrom(q); assertEquals(actual, expected); @@ -141,12 +147,13 @@ @Test(dataProvider = "capacities") public void caterpillarWalk(int n) throws IOException { // System.out.println("n: " + n); - for (int p = 1; p <= n; p++) { // pace + int cap = effectiveCapacityOf(n); + for (int p = 1; p <= cap; p++) { // pace // System.out.println(" pace: " + p); MessageQueue q = new MessageQueue(n); Queue referenceQueue = new LinkedList<>(); Adder adder = new Adder(); - for (int k = 0; k < (n / p) + 1; k++) { + for (int k = 0; k < (cap / p) + 1; k++) { // System.out.println(" cycle: " + k); for (int i = 0; i < p; i++) { Message m = createRandomMessage(); @@ -188,9 +195,6 @@ f.get(); // Just to check for exceptions } futures.clear(); - // Make sure the queue is full - assertThrows(IOException.class, - () -> adder.add(q, createRandomMessage())); } } finally { executorService.shutdownNow(); @@ -257,13 +261,14 @@ public void testSingleThreaded(int n) throws IOException { Queue referenceQueue = new LinkedList<>(); MessageQueue q = new MessageQueue(n); + int cap = effectiveCapacityOf(n); Adder adder = new Adder(); - for (int i = 0; i < n; i++) { + for (int i = 0; i < cap; i++) { Message m = createRandomMessage(); referenceQueue.add(m); adder.add(q, m); } - for (int i = 0; i < n; i++) { + for (int i = 0; i < cap; i++) { Message expected = referenceQueue.remove(); Message actual = new Remover().removeFrom(q); assertEquals(actual, expected);