http-client-branch: review comment - WebSocket sizing queue; making queue faster; an extra test for the queue
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri Apr 13 15:33:13 2018 +0100
@@ -25,11 +25,13 @@
package jdk.internal.net.http.common;
-import java.net.http.HttpHeaders;
import sun.net.NetProperties;
import sun.net.util.IPAddressUtil;
import sun.net.www.HeaderParser;
+import javax.net.ssl.ExtendedSSLSession;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSession;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
@@ -41,6 +43,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URLPermission;
+import java.net.http.HttpHeaders;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -59,9 +62,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.ExtendedSSLSession;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -917,4 +917,47 @@
}
return address;
}
+
+ /**
+ * Returns the smallest (closest to zero) positive number {@code m} (which
+ * is also a power of 2) such that {@code n <= m}.
+ * <pre>{@code
+ * n pow2Size(n)
+ * -----------------------
+ * 0 1
+ * 1 1
+ * 2 2
+ * 3 4
+ * 4 4
+ * 5 8
+ * 6 8
+ * 7 8
+ * 8 8
+ * 9 16
+ * 10 16
+ * ... ...
+ * 2147483647 1073741824
+ * } </pre>
+ *
+ * The result is capped at {@code 1 << 30} as beyond that int wraps.
+ *
+ * @param n
+ * capacity
+ *
+ * @return the size of the array
+ * @apiNote Used to size arrays in circular buffers (rings), usually in
+ * order to squeeze extra performance substituting {@code %} operation for
+ * {@code &}, which is up to 2 times faster.
+ */
+ public static int pow2Size(int n) {
+ if (n < 0) {
+ throw new IllegalArgumentException();
+ } else if (n == 0) {
+ return 1;
+ } else if (n >= (1 << 30)) { // 2^31 is a negative int
+ return 1 << 30;
+ } else {
+ return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+ }
+ }
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Fri Apr 13 15:33:13 2018 +0100
@@ -25,7 +25,6 @@
package jdk.internal.net.http.websocket;
-import jdk.internal.net.http.common.Utils;
import jdk.internal.vm.annotation.Stable;
import java.io.IOException;
@@ -36,6 +35,8 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;
+import static jdk.internal.net.http.common.Utils.pow2Size;
+
/*
* A FIFO message storage facility.
*
@@ -84,49 +85,28 @@
*/
public class MessageQueue {
- private final static boolean DEBUG = false;
-
@Stable
private final Message[] elements;
private final AtomicInteger tail = new AtomicInteger();
private volatile int head;
- public MessageQueue() {
- this(defaultSize());
- }
-
- /* Exposed for testing purposes */
- protected MessageQueue(int size) {
- if (size < 1) {
+ public MessageQueue(int capacity) {
+ if (capacity < 1) {
throw new IllegalArgumentException();
}
- Message[] array = new Message[size + 1];
+ int s = pow2Size(capacity + 1);
+ assert s % 2 == 0 : s;
+ Message[] array = new Message[s];
for (int i = 0; i < array.length; i++) {
array[i] = new Message();
}
elements = array;
}
- private static int defaultSize() {
- String property = "jdk.httpclient.websocket.outputQueueMaxSize";
- int defaultSize = 128;
- String value = Utils.getNetProperty(property);
- int size;
- if (value == null) {
- size = defaultSize;
- } else {
- try {
- size = Integer.parseUnsignedInt(value);
- } catch (NumberFormatException ignored) {
- size = defaultSize;
- }
- }
- if (DEBUG) {
- System.out.printf("[MessageQueue] %s=%s, using size %s%n",
- property, value, size);
- }
- return size;
+ /* Exposed for testing purposes */
+ protected static int effectiveCapacityOf(int n) {
+ return pow2Size(n + 1) - 1;
}
public <T> void addText(CharBuffer message,
@@ -158,7 +138,7 @@
do {
h = head;
currentTail = tail.get();
- newTail = (currentTail + 1) % elements.length;
+ newTail = (currentTail + 1) & (elements.length - 1);
if (newTail == h) {
throw new IOException("Queue full");
}
@@ -314,7 +294,7 @@
h.action = null;
h.future = null;
h.ready = false;
- head = (currentHead + 1) % elements.length;
+ head = (currentHead + 1) & (elements.length - 1);
}
private enum Type {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java Fri Apr 13 15:33:13 2018 +0100
@@ -28,5 +28,6 @@
@FunctionalInterface
public interface TransportFactory {
- Transport createTransport(MessageStreamConsumer consumer);
+ Transport createTransport(MessageQueue queue,
+ MessageStreamConsumer consumer);
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java Fri Apr 13 15:33:13 2018 +0100
@@ -34,7 +34,8 @@
}
@Override
- public Transport createTransport(MessageStreamConsumer consumer) {
- return new TransportImpl(consumer, channel);
+ public Transport createTransport(MessageQueue queue,
+ MessageStreamConsumer consumer) {
+ return new TransportImpl(queue, consumer, channel);
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:33:13 2018 +0100
@@ -60,7 +60,7 @@
private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
- private final MessageQueue queue = new MessageQueue();
+ private final MessageQueue queue;
private final MessageEncoder encoder = new MessageEncoder();
/* A reusable buffer for writing, initially with no remaining bytes */
private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
@@ -82,7 +82,10 @@
private volatile ChannelState readState = UNREGISTERED;
private boolean inputClosed;
private boolean outputClosed;
- public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) {
+
+ public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer,
+ RawChannel channel) {
+ this.queue = queue;
this.messageConsumer = consumer;
this.channel = channel;
this.decoder = new MessageDecoder(this.messageConsumer);
@@ -145,6 +148,7 @@
queue.addText(text, isLast, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -169,6 +173,7 @@
queue.addBinary(message, isLast, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -192,6 +197,7 @@
queue.addPing(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -215,6 +221,7 @@
queue.addPong(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -238,6 +245,7 @@
queue.addPong(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -262,6 +270,7 @@
queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Fri Apr 13 15:33:13 2018 +0100
@@ -33,7 +33,6 @@
import jdk.internal.net.http.websocket.OpeningHandshake.Result;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.lang.System.Logger.Level;
import java.lang.ref.Reference;
import java.net.ProtocolException;
@@ -55,7 +54,6 @@
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
-import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
@@ -160,7 +158,11 @@
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.listener = requireNonNull(listener);
- this.transport = transportFactory.createTransport(
+ // Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
+ // 2 automatic Ping replies + 1 automatic Close = 6 messages
+ // Why 2 automatic Pong replies? One is being sent, but the byte buffer
+ // has been set to null, another just has been added.
+ this.transport = transportFactory.createTransport(new MessageQueue(6),
new SignallingMessageConsumer());
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/BlowupOutputQueue.java Fri Apr 13 15:33:13 2018 +0100
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @build DummyWebSocketServer
+ * @run testng/othervm
+ * -Djdk.internal.httpclient.debug=true
+ * -Djdk.internal.httpclient.websocket.debug=true
+ * BlowupOutputQueue
+ */
+
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.net.http.HttpClient.newHttpClient;
+import static org.testng.Assert.assertFalse;
+
+public class BlowupOutputQueue extends PendingOperations {
+
+ CompletableFuture<WebSocket> cfText;
+ CompletableFuture<WebSocket> cfPing;
+ CompletableFuture<WebSocket> cfClose;
+ MockListener listener;
+
+ /*
+ * The idea is to arrange things such that the internal queue will be fully
+ * utilized and then make sure there won't be any errors caused by that.
+ *
+ * First, fill the queue with Text messages. Once done, send a Ping message.
+ * At this stage, there are at least 2 messages are in queue. Now, start
+ * receiving. Received Ping messages will cause automatic Pong replies. When
+ * all done, there will be at least 3 messages in the queue. (As at least
+ * the a single Ping has to be replied). Then send a Close message. Now
+ * there are at least 4 messages in the queue. Finally, receive the last
+ * message which is a Close message. This will cause an automatic reply with
+ * a Close message from the client side. All in all there should be at least
+ * 5 messages in the queue.
+ */
+ @Test
+ public void full() throws Exception {
+ int N = 32;
+ int[] incoming = new int[2 * (N + 1)]; // 2 bytes per message
+ for (int i = 0; i < incoming.length - 2; i += 2) {
+ // <PING>
+ incoming[i + 0] = 0x89;
+ incoming[i + 1] = 0x00;
+ }
+ // <CLOSE>
+ incoming[incoming.length - 2] = 0x88;
+ incoming[incoming.length - 1] = 0x00;
+
+ repeatable(() -> {
+ CountDownLatch allButCloseReceived = new CountDownLatch(N);
+ server = Support.writingServer(incoming);
+ server.open();
+ listener = new MockListener() {
+
+ @Override
+ protected void replenish(WebSocket webSocket) {
+ /* do nothing */
+ }
+
+ @Override
+ protected CompletionStage<?> onPing0(WebSocket webSocket,
+ ByteBuffer message) {
+ allButCloseReceived.countDown();
+ return null;
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ CharBuffer data = CharBuffer.allocate(65536);
+ for (int i = 0; ; i++) { // fill up the send buffer
+ long start = System.currentTimeMillis();
+ System.out.printf("begin cycle #%s at %s%n", i, start);
+ cfText = webSocket.sendText(data, true);
+ try {
+ cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ } finally {
+ long stop = System.currentTimeMillis();
+ System.out.printf("end cycle #%s at %s (%s ms)%n", i, stop, stop - start);
+ }
+ }
+ cfPing = webSocket.sendPing(ByteBuffer.allocate(125));
+ webSocket.request(N);
+ allButCloseReceived.await();
+ webSocket.request(1); // Receive the last message: Close
+ return null;
+ }, () -> cfText.isDone());
+ List<MockListener.Invocation> invocations = listener.invocations();
+ cfClose = webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+
+ assertFalse(invocations.contains(new MockListener.OnError(webSocket, IOException.class)));
+ assertFalse(cfText.isDone());
+ assertFalse(cfPing.isDone());
+ assertFalse(cfClose.isDone());
+ }
+}
--- a/test/jdk/java/net/httpclient/websocket/PendingOperations.java Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/PendingOperations.java Fri Apr 13 15:33:13 2018 +0100
@@ -27,11 +27,8 @@
import java.io.IOException;
import java.net.http.WebSocket;
import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
/* Common infrastructure for tests that check pending operations */
public class PendingOperations {
--- a/test/jdk/java/net/httpclient/websocket/Support.java Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/Support.java Fri Apr 13 15:33:13 2018 +0100
@@ -126,6 +126,42 @@
};
}
+ public static DummyWebSocketServer writingServer(int... data) {
+ byte[] copy = new byte[data.length];
+ for (int i = 0; i < data.length; i++) {
+ copy[i] = (byte) data[i];
+ }
+ return new DummyWebSocketServer() {
+
+ @Override
+ protected void read(SocketChannel ch) throws IOException {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void write(SocketChannel ch) throws IOException {
+ int off = 0; int n = 1; // 1 byte at a time
+ while (off + n < copy.length + n) {
+// try {
+// TimeUnit.MICROSECONDS.sleep(500);
+// } catch (InterruptedException e) {
+// return;
+// }
+ int len = Math.min(copy.length - off, n);
+ ByteBuffer bytes = ByteBuffer.wrap(copy, off, len);
+ off += len;
+ ch.write(bytes);
+ }
+ super.write(ch);
+ }
+ };
+
+ }
+
public static String stringWith2NBytes(int n) {
// -- Russian Alphabet (33 characters, 2 bytes per char) --
char[] abc = {
--- a/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java Fri Apr 13 15:06:50 2018 +0100
+++ b/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java Fri Apr 13 15:33:13 2018 +0100
@@ -45,11 +45,16 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;
+import static jdk.internal.net.http.websocket.MessageQueue.effectiveCapacityOf;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
+/*
+ * A unit test for MessageQueue. The test is aware of the details of the queue
+ * implementation.
+ */
public class MessageQueueTest {
private static final Random r = new SecureRandom();
@@ -77,18 +82,19 @@
@Test(dataProvider = "capacities")
public void fullness(int n) throws IOException {
MessageQueue q = new MessageQueue(n);
+ int cap = effectiveCapacityOf(n);
Adder adder = new Adder();
Queue<Message> referenceQueue = new LinkedList<>();
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < cap; i++) {
Message m = createRandomMessage();
referenceQueue.add(m);
adder.add(q, m);
}
- for (int i = 0; i < n + 1; i++) {
+ for (int i = 0; i < cap + 1; i++) {
Message m = createRandomMessage();
assertThrows(IOException.class, () -> adder.add(q, m));
}
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < cap; i++) {
Message expected = referenceQueue.remove();
Message actual = new Remover().removeFrom(q);
assertEquals(actual, expected);
@@ -141,12 +147,13 @@
@Test(dataProvider = "capacities")
public void caterpillarWalk(int n) throws IOException {
// System.out.println("n: " + n);
- for (int p = 1; p <= n; p++) { // pace
+ int cap = effectiveCapacityOf(n);
+ for (int p = 1; p <= cap; p++) { // pace
// System.out.println(" pace: " + p);
MessageQueue q = new MessageQueue(n);
Queue<Message> referenceQueue = new LinkedList<>();
Adder adder = new Adder();
- for (int k = 0; k < (n / p) + 1; k++) {
+ for (int k = 0; k < (cap / p) + 1; k++) {
// System.out.println(" cycle: " + k);
for (int i = 0; i < p; i++) {
Message m = createRandomMessage();
@@ -188,9 +195,6 @@
f.get(); // Just to check for exceptions
}
futures.clear();
- // Make sure the queue is full
- assertThrows(IOException.class,
- () -> adder.add(q, createRandomMessage()));
}
} finally {
executorService.shutdownNow();
@@ -257,13 +261,14 @@
public void testSingleThreaded(int n) throws IOException {
Queue<Message> referenceQueue = new LinkedList<>();
MessageQueue q = new MessageQueue(n);
+ int cap = effectiveCapacityOf(n);
Adder adder = new Adder();
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < cap; i++) {
Message m = createRandomMessage();
referenceQueue.add(m);
adder.add(q, m);
}
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < cap; i++) {
Message expected = referenceQueue.remove();
Message actual = new Remover().removeFrom(q);
assertEquals(actual, expected);