8179021: Latest bugfixes to WebSocket/HPACK from the sandbox repo
Reviewed-by: dfuchs
--- a/jdk/src/java.base/share/classes/module-info.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/java.base/share/classes/module-info.java Wed May 10 12:36:14 2017 +0100
@@ -190,7 +190,8 @@
jdk.unsupported;
exports jdk.internal.vm.annotation to
jdk.unsupported,
- jdk.internal.vm.ci;
+ jdk.internal.vm.ci,
+ jdk.incubator.httpclient;
exports jdk.internal.util.jar to
jdk.jartool,
jdk.jdeps,
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -123,7 +123,7 @@
this.method = method;
this.systemHeaders = new HttpHeadersImpl();
this.userHeaders = ImmutableHeaders.empty();
- this.uri = null;
+ this.uri = URI.create("socket://" + authority.getHostString() + ":" + Integer.toString(authority.getPort()) + "/");
this.requestProcessor = HttpRequest.noBody();
this.authority = authority;
this.secure = false;
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -27,6 +27,7 @@
import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.HttpResponse.BodyHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -72,7 +73,8 @@
public void connect() throws IOException, InterruptedException {
delegate.connect();
HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
- Exchange<?> connectExchange = new Exchange<>(req, null);
+ MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
+ Exchange<Void> connectExchange = new Exchange<>(req, mul);
Response r = connectExchange.responseImpl(delegate);
if (r.statusCode() != 200) {
throw new IOException("Tunnel failed");
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -43,7 +43,7 @@
* <p> To create a {@code WebSocket} use a {@linkplain HttpClient#newWebSocketBuilder(
* URI, Listener) builder}. Once a {@code WebSocket} is built, it's ready
* to send and receive messages. When the {@code WebSocket} is no longer needed
- * it must be closed: a Close message must both be {@linkplain #sendClose()
+ * it must be closed: a Close message must both be {@linkplain #sendClose
* sent} and {@linkplain Listener#onClose(WebSocket, int, String) received}.
* The {@code WebSocket} may be also closed {@linkplain #abort() abruptly}.
*
@@ -94,17 +94,6 @@
int NORMAL_CLOSURE = 1000;
/**
- * The WebSocket Close message status code (<code>{@value}</code>), is
- * designated for use in applications expecting a status code to indicate
- * that the connection was closed abnormally, e.g., without sending or
- * receiving a Close message.
- *
- * @see Listener#onClose(WebSocket, int, String)
- * @see #abort()
- */
- int CLOSED_ABNORMALLY = 1006;
-
- /**
* A builder for creating {@code WebSocket} instances.
* {@Incubating}
*
@@ -509,7 +498,7 @@
*
* <p> The {@code WebSocket} will close at the earliest of completion of
* the returned {@code CompletionStage} or sending a Close message. In
- * particular, if a Close message has been {@link WebSocket#sendClose()
+ * particular, if a Close message has been {@linkplain WebSocket#sendClose
* sent} before, then this invocation completes the closing handshake
* and by the time this method is invoked, the {@code WebSocket} will
* have been closed.
@@ -643,44 +632,6 @@
CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast);
/**
- * Sends a whole Text message with characters from the given {@code
- * CharSequence}.
- *
- * <p> This is a convenience method. For the general case, use {@link
- * #sendText(CharSequence, boolean)}.
- *
- * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
- * normally when the message has been sent or completes exceptionally if an
- * error occurs.
- *
- * <p> The {@code CharSequence} must not be modified until the returned
- * {@code CompletableFuture} completes (either normally or exceptionally).
- *
- * <p> The returned {@code CompletableFuture} can complete exceptionally
- * with:
- * <ul>
- * <li> {@link IllegalArgumentException} -
- * if {@code message} is a malformed UTF-16 sequence
- * <li> {@link IllegalStateException} -
- * if the {@code WebSocket} is closed;
- * or if a Close message has been sent;
- * or if there is an outstanding send operation;
- * or if a previous Binary message was sent with {@code isLast == false}
- * <li> {@link IOException} -
- * if an I/O error occurs during this operation;
- * or if the {@code WebSocket} has been closed due to an error;
- * </ul>
- *
- * @param message
- * the message
- *
- * @return a {@code CompletableFuture} with this {@code WebSocket}
- */
- default CompletableFuture<WebSocket> sendText(CharSequence message) {
- return sendText(message, true);
- }
-
- /**
* Sends a Binary message with bytes from the given {@code ByteBuffer}.
*
* <p> Returns a {@code CompletableFuture<WebSocket>} which completes
@@ -831,47 +782,10 @@
* the reason
*
* @return a {@code CompletableFuture} with this {@code WebSocket}
- *
- * @see #sendClose()
*/
CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
/**
- * Sends an empty Close message.
- *
- * <p> When this method has been invoked, no further messages can be sent.
- *
- * <p> For more details on Close message see RFC 6455 section
- * <a href="https://tools.ietf.org/html/rfc6455#section-5.5.1">5.5.1. Close</a>
- *
- * <p> The method returns a {@code CompletableFuture<WebSocket>} which
- * completes normally when the message has been sent or completes
- * exceptionally if an error occurs.
- *
- * <p> The returned {@code CompletableFuture} can complete exceptionally
- * with:
- * <ul>
- * <li> {@link IOException} -
- * if an I/O error occurs during this operation;
- * or the {@code WebSocket} has been closed due to an error
- * </ul>
- *
- * <p> If this method has already been invoked or the {@code WebSocket} is
- * closed, then subsequent invocations of this method have no effect and the
- * returned {@code CompletableFuture} completes normally.
- *
- * <p> If a Close message has been {@linkplain Listener#onClose(WebSocket,
- * int, String) received} before, then this invocation completes the closing
- * handshake and by the time the returned {@code CompletableFuture}
- * completes, the {@code WebSocket} will have been closed.
- *
- * @return a {@code CompletableFuture} with this {@code WebSocket}
- *
- * @see #sendClose(int, String)
- */
- CompletableFuture<WebSocket> sendClose();
-
- /**
* Allows {@code n} more messages to be received by the {@link Listener
* Listener}.
*
@@ -928,8 +842,7 @@
* state.
*
* <p> As the result {@link Listener#onClose(WebSocket, int, String)
- * Listener.onClose} will be invoked with the status code {@link
- * #CLOSED_ABNORMALLY} unless either {@code onClose} or {@link
+ * Listener.onClose} will be invoked unless either {@code onClose} or {@link
* Listener#onError(WebSocket, Throwable) onError} has been invoked before.
* In which case no additional invocation will happen.
*
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,6 +24,8 @@
*/
package jdk.incubator.http.internal.hpack;
+import jdk.internal.vm.annotation.Stable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ProtocolException;
@@ -60,6 +62,7 @@
*/
public final class Decoder {
+ @Stable
private static final State[] states = new State[256];
static {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,6 +24,8 @@
*/
package jdk.incubator.http.internal.hpack;
+import jdk.internal.vm.annotation.Stable;
+
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -40,6 +42,7 @@
//
final class HeaderTable {
+ @Stable
private static final HeaderField[] staticTable = {
null, // To make index 1-based, instead of 0-based
new HeaderField(":authority"),
@@ -110,7 +113,7 @@
private static final Map<String, LinkedHashMap<String, Integer>> staticIndexes;
static {
- staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH);
+ staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH); // TODO: Map.of
for (int i = 1; i <= STATIC_TABLE_LENGTH; i++) {
HeaderField f = staticTable[i];
Map<String, Integer> values = staticIndexes
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -46,7 +46,7 @@
private final HttpClient client;
private final URI uri;
private final Listener listener;
- private final List<Pair<String, String>> headers = new LinkedList<>();
+ private final Collection<Pair<String, String>> headers = new LinkedList<>();
private final Collection<String> subprotocols = new LinkedList<>();
private Duration timeout;
@@ -65,17 +65,18 @@
}
@Override
- public Builder subprotocols(String mostPreferred, String... lesserPreferred)
+ public Builder subprotocols(String mostPreferred,
+ String... lesserPreferred)
{
requireNonNull(mostPreferred, "mostPreferred");
requireNonNull(lesserPreferred, "lesserPreferred");
List<String> subprotocols = new LinkedList<>();
+ subprotocols.add(mostPreferred);
for (int i = 0; i < lesserPreferred.length; i++) {
String p = lesserPreferred[i];
requireNonNull(p, "lesserPreferred[" + i + "]");
subprotocols.add(p);
}
- subprotocols.add(0, mostPreferred);
this.subprotocols.clear();
this.subprotocols.addAll(subprotocols);
return this;
@@ -98,20 +99,9 @@
Listener getListener() { return listener; }
- List<Pair<String, String>> getHeaders() { return headers; }
+ Collection<Pair<String, String>> getHeaders() { return headers; }
Collection<String> getSubprotocols() { return subprotocols; }
Duration getConnectTimeout() { return timeout; }
-
- @Override
- public String toString() {
- return "WebSocket.Builder{"
- + ", uri=" + uri
- + ", listener=" + listener
- + (!headers.isEmpty() ? ", headers=" + headers : "")
- + (!subprotocols.isEmpty() ? ", subprotocols=" + subprotocols : "")
- + ( timeout != null ? ", connectTimeout=" + timeout : "")
- + '}';
- }
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,70 +25,184 @@
package jdk.incubator.http.internal.websocket;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import static java.util.Objects.requireNonNull;
-final class CooperativeHandler {
+/*
+ * A synchronization aid that assists a number of parties in running a task
+ * in a mutually exclusive fashion.
+ *
+ * To run the task, a party invokes `handle`. To permanently prevent the task
+ * from subsequent runs, the party invokes `stop`.
+ *
+ * The parties do not have to operate in different threads.
+ *
+ * The task can be either synchronous or asynchronous.
+ *
+ * If the task is synchronous, it is represented with `Runnable`.
+ * The handler invokes `Runnable.run` to run the task.
+ *
+ * If the task is asynchronous, it is represented with `Consumer<Runnable>`.
+ * The handler invokes `Consumer.accept(end)` to begin the task. The task
+ * invokes `end.run()` when it has ended.
+ *
+ * The next run of the task will not begin until the previous run has finished.
+ *
+ * The task may invoke `handle()` by itself, it's a normal situation.
+ */
+public final class CooperativeHandler {
- private static final long CONTINUE = 0;
- private static final long OFF = 1;
- private static final long ON = 2;
- private static final long STOP = 4;
+ /*
+ Since the task is fixed and known beforehand, no blocking synchronization
+ (locks, queues, etc.) is required. The job can be done solely using
+ nonblocking primitives.
+
+ The machinery below addresses two problems:
+
+ 1. Running the task in a sequential order (no concurrent runs):
+
+ begin, end, begin, end...
+
+ 2. Avoiding indefinite recursion:
- private final AtomicLong state = new AtomicLong(OFF);
+ begin
+ end
+ begin
+ end
+ ...
+
+ Problem #1 is solved with a finite state machine with 4 states:
+
+ BEGIN, AGAIN, END, and STOP.
+
+ Problem #2 is solved with a "state modifier" OFFLOAD.
+
+ Parties invoke `handle()` to signal the task must run. A party that has
+ invoked `handle()` either begins the task or exploits the party that is
+ either beginning the task or ending it.
+
+ The party that is trying to end the task either ends it or begins it
+ again.
- private final Runnable task;
+ To avoid indefinite recursion, before re-running the task tryEnd() sets
+ OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent")
+ tryEnd() is available and the "child" must offload the task on to the
+ "parent". Then a race begins. Whichever invocation of tryEnd() manages
+ to unset OFFLOAD bit first does not do the work.
+
+ There is at most 1 thread that is beginning the task and at most 2
+ threads that are trying to end it: "parent" and "child". In case of a
+ synchronous task "parent" and "child" are the same thread.
+ */
- CooperativeHandler(Runnable task) {
- this.task = requireNonNull(task);
+ private static final int OFFLOAD = 1;
+ private static final int AGAIN = 2;
+ private static final int BEGIN = 4;
+ private static final int STOP = 8;
+ private static final int END = 16;
+
+ private final AtomicInteger state = new AtomicInteger(END);
+ private final Consumer<Runnable> begin;
+
+ public CooperativeHandler(Runnable task) {
+ this(asyncOf(task));
+ }
+
+ public CooperativeHandler(Consumer<Runnable> begin) {
+ this.begin = requireNonNull(begin);
}
/*
- * Causes the task supplied to the constructor to run. The task may be run
- * by this thread as well as by any other that has invoked this method.
+ * Runs the task (though maybe by a different party).
*
* The recursion which is possible here will have the maximum depth of 1:
*
- * task.run()
- * this.startOrContinue()
- * task.run()
+ * this.handle()
+ * begin.accept()
+ * this.handle()
*/
- void startOrContinue() {
- long s;
+ public void handle() {
while (true) {
- s = state.get();
- if (s == OFF && state.compareAndSet(OFF, ON)) {
- // No one is running the task, we are going to run it
- break;
- }
- if (s == ON && state.compareAndSet(ON, CONTINUE)) {
- // Some other thread is running the task. We have managed to
- // update the state, it will be surely noticed by that thread.
+ int s = state.get();
+ if (s == END) {
+ if (state.compareAndSet(END, BEGIN)) {
+ break;
+ }
+ } else if ((s & BEGIN) != 0) {
+ // Tries to change the state to AGAIN, preserving OFFLOAD bit
+ if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
+ return;
+ }
+ } else if ((s & AGAIN) != 0 || s == STOP) {
return;
- }
- if (s == CONTINUE || s == STOP) {
- return;
+ } else {
+ throw new InternalError(String.valueOf(s));
}
}
+ begin.accept(this::tryEnd);
+ }
+
+ private void tryEnd() {
while (true) {
- task.run();
- // State checks are ordered by the probability of expected values
- // (it might be different in different usage patterns, say, when
- // invocations to `startOrContinue()` are concurrent)
- if (state.compareAndSet(ON, OFF)) {
- break; // The state hasn't changed, all done
+ int s;
+ while (((s = state.get()) & OFFLOAD) != 0) {
+ // Tries to offload ending of the task to the parent
+ if (state.compareAndSet(s, s & ~OFFLOAD)) {
+ return;
+ }
}
- if (state.compareAndSet(CONTINUE, ON)) {
- continue;
+ while (true) {
+ if (s == BEGIN) {
+ if (state.compareAndSet(BEGIN, END)) {
+ return;
+ }
+ } else if (s == AGAIN) {
+ if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
+ break;
+ }
+ } else if (s == STOP) {
+ return;
+ } else {
+ throw new InternalError(String.valueOf(s));
+ }
+ s = state.get();
}
- // Other threads can change the state from CONTINUE to STOP only
- // So if it's not ON and not CONTINUE, it can only be STOP
- break;
+ begin.accept(this::tryEnd);
}
}
- void stop() {
+ /*
+ * Checks whether or not this handler has been permanently stopped.
+ *
+ * Should be used from inside the task to poll the status of the handler,
+ * pretty much the same way as it is done for threads:
+ *
+ * if (!Thread.currentThread().isInterrupted()) {
+ * ...
+ * }
+ */
+ public boolean isStopped() {
+ return state.get() == STOP;
+ }
+
+ /*
+ * Signals this handler to ignore subsequent invocations to `handle()`.
+ *
+ * If the task has already begun, this invocation will not affect it,
+ * unless the task itself uses `isStopped()` method to check the state
+ * of the handler.
+ */
+ public void stop() {
state.set(STOP);
}
+
+ private static Consumer<Runnable> asyncOf(Runnable task) {
+ requireNonNull(task);
+ return ender -> {
+ task.run();
+ ender.run();
+ };
+ }
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,8 @@
package jdk.incubator.http.internal.websocket;
+import jdk.internal.vm.annotation.Stable;
+
import java.nio.ByteBuffer;
import static jdk.incubator.http.internal.common.Utils.dump;
@@ -58,6 +60,7 @@
CONTROL_0xE (0xE),
CONTROL_0xF (0xF);
+ @Stable
private static final Opcode[] opcodes;
static {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -38,7 +38,7 @@
import static java.util.Objects.requireNonNull;
import static jdk.incubator.http.internal.common.Utils.dump;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.checkIncomingCode;
+import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToReceiveFromServer;
/*
* Consumes frame parts and notifies a message consumer, when there is
@@ -212,20 +212,20 @@
}
switch (opcode) {
case CLOSE:
- int statusCode = NO_STATUS_CODE;
+ char statusCode = NO_STATUS_CODE;
String reason = "";
if (payloadLen != 0) {
int len = binaryData.remaining();
assert 2 <= len && len <= 125 : dump(len, payloadLen);
+ statusCode = binaryData.getChar();
+ if (!isLegalToReceiveFromServer(statusCode)) {
+ throw new FailWebSocketException(
+ "Illegal status code: " + statusCode);
+ }
try {
- statusCode = checkIncomingCode(binaryData.getChar());
reason = UTF_8.newDecoder().decode(binaryData).toString();
- } catch (CheckFailedException e) {
- throw new FailWebSocketException("Incorrect status code")
- .initCause(e);
} catch (CharacterCodingException e) {
- throw new FailWebSocketException(
- "Close reason is a malformed UTF-8 sequence")
+ throw new FailWebSocketException("Illegal close reason")
.initCause(e);
}
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -37,6 +37,8 @@
import jdk.incubator.http.HttpResponse;
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.WebSocketHandshakeException;
+import jdk.incubator.http.internal.common.Pair;
+
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -66,7 +68,6 @@
private static final String HEADER_KEY = "Sec-WebSocket-Key";
private static final String HEADER_PROTOCOL = "Sec-WebSocket-Protocol";
private static final String HEADER_VERSION = "Sec-WebSocket-Version";
- private static final String VALUE_VERSION = "13"; // WebSocket's lucky number
private static final Set<String> FORBIDDEN_HEADERS;
@@ -106,12 +107,18 @@
if (connectTimeout != null) {
requestBuilder.timeout(connectTimeout);
}
+ for (Pair<String, String> p : b.getHeaders()) {
+ if (FORBIDDEN_HEADERS.contains(p.first)) {
+ throw illegal("Illegal header: " + p.first);
+ }
+ requestBuilder.header(p.first, p.second);
+ }
this.subprotocols = createRequestSubprotocols(b.getSubprotocols());
if (!this.subprotocols.isEmpty()) {
String p = this.subprotocols.stream().collect(Collectors.joining(", "));
requestBuilder.header(HEADER_PROTOCOL, p);
}
- requestBuilder.header(HEADER_VERSION, VALUE_VERSION);
+ requestBuilder.header(HEADER_VERSION, "13"); // WebSocket's lucky number
this.nonce = createNonce();
requestBuilder.header(HEADER_KEY, this.nonce);
// Setting request version to HTTP/1.1 forcibly, since it's not possible
@@ -133,11 +140,7 @@
if (s.trim().isEmpty() || !isValidName(s)) {
throw illegal("Bad subprotocol syntax: " + s);
}
- if (FORBIDDEN_HEADERS.contains(s)) {
- throw illegal("Forbidden header: " + s);
- }
- boolean unique = sp.add(s);
- if (!unique) {
+ if (!sp.add(s)) {
throw illegal("Duplicating subprotocol: " + s);
}
}
@@ -176,7 +179,7 @@
CompletableFuture<Result> send() {
return client.sendAsync(this.request, BodyHandler.<Void>discard(null))
- .thenCompose(this::resultFrom);
+ .thenCompose(this::resultFrom);
}
/*
@@ -283,7 +286,6 @@
private static String requireSingle(HttpHeaders responseHeaders,
String headerName)
- throws CheckFailedException
{
List<String> values = responseHeaders.allValues(headerName);
if (values.isEmpty()) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -60,6 +60,7 @@
*/
abstract class OutgoingMessage {
+ // Share per WebSocket?
private static final SecureRandom maskingKeys = new SecureRandom();
protected ByteBuffer[] frame;
@@ -71,6 +72,8 @@
* convenient moment (up to the point where sentTo is invoked).
*/
protected void contextualize(Context context) {
+ // masking and charset decoding should be performed here rather than in
+ // the constructor (as of today)
if (context.isCloseSent()) {
throw new IllegalStateException("Close sent");
}
@@ -101,7 +104,7 @@
private final boolean isLast;
Text(CharSequence characters, boolean isLast) {
- CharsetEncoder encoder = UTF_8.newEncoder();
+ CharsetEncoder encoder = UTF_8.newEncoder(); // Share per WebSocket?
try {
payload = encoder.encode(CharBuffer.wrap(characters));
} catch (CharacterCodingException e) {
@@ -172,11 +175,11 @@
Close(int statusCode, CharSequence reason) {
ByteBuffer payload = ByteBuffer.allocate(125)
- .putChar((char) statusCode);
+ .putChar((char) statusCode);
CoderResult result = UTF_8.newEncoder()
- .encode(CharBuffer.wrap(reason),
- payload,
- true);
+ .encode(CharBuffer.wrap(reason),
+ payload,
+ true);
if (result.isOverflow()) {
throw new IllegalArgumentException("Long reason");
} else if (result.isError()) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -58,8 +58,8 @@
private final Frame.Reader reader = new Frame.Reader();
private final RawChannel.RawEvent event = createHandler();
private final AtomicLong demand = new AtomicLong();
- private final CooperativeHandler receiveHandler =
- new CooperativeHandler(this::tryDeliver);
+ private final CooperativeHandler handler =
+ new CooperativeHandler(this::pushContinuously);
/*
* Used to ensure registering the channel event at most once (i.e. to avoid
* multiple registrations).
@@ -72,8 +72,8 @@
this.channel = channel;
this.data = channel.initialByteBuffer();
this.frameConsumer = new FrameConsumer(this.messageConsumer);
- // To ensure the initial `data` will be read correctly (happens-before)
- // after readable.get()
+ // To ensure the initial non-final `data` will be read correctly
+ // (happens-before) by reader after executing readable.get()
readable.set(true);
}
@@ -88,7 +88,7 @@
@Override
public void handle() {
readable.set(true);
- receiveHandler.startOrContinue();
+ handler.handle();
}
};
}
@@ -98,7 +98,7 @@
throw new IllegalArgumentException("Negative: " + n);
}
demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
- receiveHandler.startOrContinue();
+ handler.handle();
}
void acknowledge() {
@@ -113,41 +113,21 @@
* regardless of the current demand.
*/
void close() {
- receiveHandler.stop();
+ handler.stop();
}
- private void tryDeliver() {
- if (readable.get() && demand.get() > 0) {
- deliverAtMostOne();
+ private void pushContinuously() {
+ while (readable.get() && demand.get() > 0 && !handler.isStopped()) {
+ pushOnce();
}
}
- private void deliverAtMostOne() {
- if (data == null) {
- try {
- data = channel.read();
- } catch (IOException e) {
- readable.set(false);
- messageConsumer.onError(e);
- return;
- }
- if (data == null || !data.hasRemaining()) {
- readable.set(false);
- if (!data.hasRemaining()) {
- try {
- channel.registerEvent(event);
- } catch (IOException e) {
- messageConsumer.onError(e);
- return;
- }
- } else if (data == null) {
- messageConsumer.onComplete();
- }
- return;
- }
+ private void pushOnce() {
+ if (data == null && !readData()) {
+ return;
}
try {
- reader.readFrame(data, frameConsumer);
+ reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer
} catch (FailWebSocketException e) {
messageConsumer.onError(e);
return;
@@ -156,4 +136,28 @@
data = null;
}
}
+
+ private boolean readData() {
+ try {
+ data = channel.read();
+ } catch (IOException e) {
+ messageConsumer.onError(e);
+ return false;
+ }
+ if (data == null) { // EOF
+ messageConsumer.onComplete();
+ return false;
+ } else if (!data.hasRemaining()) { // No data in the socket at the moment
+ data = null;
+ readable.set(false);
+ try {
+ channel.registerEvent(event);
+ } catch (IOException e) {
+ messageConsumer.onError(e);
+ }
+ return false;
+ }
+ assert data.hasRemaining();
+ return true;
+ }
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,120 +25,72 @@
package jdk.incubator.http.internal.websocket;
-import static jdk.incubator.http.WebSocket.CLOSED_ABNORMALLY;
-
/*
- * Utilities and common constants for WebSocket status codes. For more details
- * on status codes and their meaning see:
+ * Utilities for WebSocket status codes.
*
* 1. https://tools.ietf.org/html/rfc6455#section-7.4
* 2. http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number
*/
final class StatusCodes {
- static final int PROTOCOL_ERROR = 1002;
- static final int CANNOT_ACCEPT = 1003;
- static final int NO_STATUS_CODE = 1005;
- static final int NOT_CONSISTENT = 1007;
- static final int TOO_BIG = 1009;
- static final int NO_EXTENSION = 1010;
- static final int SERVICE_RESTART = 1012;
- static final int TRY_AGAIN_LATER = 1013;
- static final int TLS_HANDSHAKE_FAILURE = 1015;
+ static final int PROTOCOL_ERROR = 1002;
+ static final int NO_STATUS_CODE = 1005;
+ static final int CLOSED_ABNORMALLY = 1006;
+ static final int NOT_CONSISTENT = 1007;
private StatusCodes() { }
- /*
- * Returns the given code if it doesn't violate any rules for outgoing
- * codes, otherwise throws a CFE with a detailed description.
- */
- static int checkOutgoingCode(int code) {
- checkCommon(code);
- if (code > 4999) {
- throw new CheckFailedException("Unspecified: " + code);
- }
- if (isNotUserSettable(code)) {
- throw new CheckFailedException("Cannot set: " + code);
- }
- return code;
- }
-
- /*
- * Returns the given code if it doesn't violate any rules for incoming
- * codes, otherwise throws a CFE with a detailed description.
- */
- static int checkIncomingCode(int code) {
- checkCommon(code);
- if (code == NO_EXTENSION) {
- throw new CheckFailedException("Bad server code: " + code);
+ static boolean isLegalToSendFromClient(int code) {
+ if (!isLegal(code)) {
+ return false;
}
- return code;
- }
-
- private static int checkCommon(int code) {
- if (isOutOfRange(code)) {
- throw new CheckFailedException("Out of range: " + code);
- }
- if (isForbidden(code)) {
- throw new CheckFailedException("Forbidden: " + code);
+ // Codes from unreserved range
+ if (code > 4999) {
+ return false;
}
- if (isUnassigned(code)) {
- throw new CheckFailedException("Unassigned: " + code);
- }
- return code;
- }
-
- /*
- * Returns true if the given code cannot be set by a user of the WebSocket
- * API. e.g. this code means something which only a WebSocket implementation
- * is responsible for or it doesn't make sense to be send by a WebSocket
- * client.
- */
- private static boolean isNotUserSettable(int code) {
+ // Codes below are not allowed to be sent using a WebSocket client API
switch (code) {
case PROTOCOL_ERROR:
- case CANNOT_ACCEPT:
case NOT_CONSISTENT:
- case TOO_BIG:
- case NO_EXTENSION:
- case TRY_AGAIN_LATER:
- case SERVICE_RESTART:
+ case 1003:
+ case 1009:
+ case 1010:
+ case 1012: // code sent by servers
+ case 1013: // code sent by servers
+ case 1014: // code sent by servers
+ return false;
+ default:
return true;
- default:
- return false;
}
}
- /*
- * Returns true if the given code cannot appear on the wire. It's always an
- * error to send a frame with such a code or to receive one.
- */
- private static boolean isForbidden(int code) {
+ static boolean isLegalToReceiveFromServer(int code) {
+ if (!isLegal(code)) {
+ return false;
+ }
+ return code != 1010; // code sent by clients
+ }
+
+ private static boolean isLegal(int code) {
+ // 2-byte unsigned integer excluding first 1000 numbers from the range
+ // [0, 999] which are never used
+ if (code < 1000 || code > 65535) {
+ return false;
+ }
+ // Codes from the range below has no known meaning under the WebSocket
+ // specification (i.e. unassigned/undefined)
+ if ((code >= 1016 && code <= 2999) || code == 1004) {
+ return false;
+ }
+ // Codes below cannot appear on the wire. It's always an error either
+ // to send a frame with such a code or to receive one.
switch (code) {
case NO_STATUS_CODE:
case CLOSED_ABNORMALLY:
- case TLS_HANDSHAKE_FAILURE:
+ case 1015:
+ return false;
+ default:
return true;
- default:
- return false;
}
}
-
- /*
- * Returns true if the given code has no known meaning under the WebSocket
- * specification (i.e. unassigned/undefined).
- */
- private static boolean isUnassigned(int code) {
- return (code >= 1016 && code <= 2999) || code == 1004 || code == 1014;
- }
-
- /*
- * Returns true if the given code is not in domain of status codes:
- *
- * 2-byte unsigned integer minus first 1000 numbers from the range [0, 999]
- * that are never used.
- */
- private static boolean isOutOfRange(int code) {
- return code < 1000 || code > 65535;
- }
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -37,11 +37,12 @@
*
* No matter whether the message has been fully sent or an error has occurred,
* the transmitter reports the outcome to the supplied handler and becomes ready
- * to accept a new message. Until then, it is considered "busy" and an
- * IllegalStateException will be thrown on each attempt to invoke send.
+ * to accept a new message. Until then, the transmitter is considered "busy" and
+ * an IllegalStateException will be thrown on each attempt to invoke send.
*/
final class Transmitter {
+ /* This flag is used solely for assertions */
private final AtomicBoolean busy = new AtomicBoolean();
private OutgoingMessage message;
private Consumer<Exception> completionHandler;
@@ -53,9 +54,10 @@
this.event = createHandler();
}
- /*
- * The supplied handler may be invoked in the calling thread, so watch out
- * for stack overflow.
+ /**
+ * The supplied handler may be invoked in the calling thread.
+ * A {@code StackOverflowError} may thus occur if there's a possibility
+ * that this method is called again by the supplied handler.
*/
void send(OutgoingMessage message, Consumer<Exception> completionHandler) {
requireNonNull(message);
@@ -86,8 +88,9 @@
private void send0(OutgoingMessage message, Consumer<Exception> handler) {
boolean b = busy.get();
assert b; // Please don't inline this, as busy.get() has memory
- // visibility effects and we don't want the correctness
- // of the algorithm to depend on assertions flag
+ // visibility effects and we don't want the program behaviour
+ // to depend on whether the assertions are turned on
+ // or turned off
try {
boolean sent = message.sendTo(channel);
if (sent) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -51,9 +51,9 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
+import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.TLS_HANDSHAKE_FAILURE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.checkOutgoingCode;
+import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
/*
* A WebSocket client.
@@ -74,8 +74,8 @@
private final AtomicBoolean outstandingSend = new AtomicBoolean();
private final CooperativeHandler sendHandler =
new CooperativeHandler(this::sendFirst);
- private final Queue<Pair<OutgoingMessage, Consumer<Exception>>> queue =
- new ConcurrentLinkedQueue<>();
+ private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
+ queue = new ConcurrentLinkedQueue<>();
private final Context context = new OutgoingMessage.Context();
private final Transmitter transmitter;
private final Receiver receiver;
@@ -110,6 +110,9 @@
r.subprotocol,
r.channel,
b.getListener());
+ // The order of calls might cause a subtle effects, like CF will be
+ // returned from the buildAsync _after_ onOpen has been signalled.
+ // This means if onOpen is lengthy, it might cause some problems.
ws.signalOpen();
return ws;
};
@@ -125,7 +128,8 @@
WebSocketImpl(URI uri,
String subprotocol,
RawChannel channel,
- Listener listener) {
+ Listener listener)
+ {
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.channel = requireNonNull(channel);
@@ -182,15 +186,17 @@
* Processes a Close event that came from the channel. Invoked at most once.
*/
private void processClose(int statusCode, String reason) {
- assert statusCode != TLS_HANDSHAKE_FAILURE; // TLS problems happen long before WebSocket is alive
receiver.close();
try {
channel.shutdownInput();
} catch (IOException e) {
Log.logError(e);
}
- boolean wasComplete = !closeReceived.complete(null);
- if (wasComplete) {
+ boolean alreadyCompleted = !closeReceived.complete(null);
+ if (alreadyCompleted) {
+ // This CF is supposed to be completed only once, the first time a
+ // Close message is received. No further messages are pulled from
+ // the socket.
throw new InternalError();
}
int code;
@@ -261,19 +267,17 @@
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode,
String reason) {
- try {
- checkOutgoingCode(statusCode);
- } catch (CheckFailedException e) {
- IllegalArgumentException ex = new IllegalArgumentException(
- "Bad status code: " + statusCode, e);
- failedFuture(ex);
+ if (!isLegalToSendFromClient(statusCode)) {
+ return failedFuture(
+ new IllegalArgumentException("statusCode: " + statusCode));
}
- return enqueueClose(new Close(statusCode, reason));
- }
-
- @Override
- public CompletableFuture<WebSocket> sendClose() {
- return enqueueClose(new Close());
+ Close msg;
+ try {
+ msg = new Close(statusCode, reason);
+ } catch (IllegalArgumentException e) {
+ return failedFuture(e);
+ }
+ return enqueueClose(msg);
}
/*
@@ -288,8 +292,8 @@
} catch (IOException e) {
Log.logError(e);
}
- boolean wasComplete = !closeSent.complete(null);
- if (wasComplete) {
+ boolean alreadyCompleted = !closeSent.complete(null);
+ if (alreadyCompleted) {
// Shouldn't happen as this callback must run at most once
throw new InternalError();
}
@@ -316,40 +320,41 @@
private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
CompletableFuture<WebSocket> cf = new CompletableFuture<>();
- Consumer<Exception> h = e -> {
- if (e == null) {
- cf.complete(WebSocketImpl.this);
- sendHandler.startOrContinue();
- } else {
-
-// what if this is not a users message? (must be different entry points for different messages)
-
- // TODO: think about correct behaviour in the face of error in
- // the queue, for now it seems like the best solution is to
- // deliver the error and stop
- cf.completeExceptionally(e);
- }
- };
- queue.add(pair(m, h)); // Always returns true
- sendHandler.startOrContinue();
+ boolean added = queue.add(pair(m, cf));
+ if (!added) {
+ // The queue is supposed to be unbounded
+ throw new InternalError();
+ }
+ sendHandler.handle();
return cf;
}
- private void sendFirst() {
- Pair<OutgoingMessage, Consumer<Exception>> p = queue.poll();
+ /*
+ * This is the main sending method. It may be run in different threads,
+ * but never concurrently.
+ */
+ private void sendFirst(Runnable whenSent) {
+ Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
if (p == null) {
+ whenSent.run();
return;
}
OutgoingMessage message = p.first;
- Consumer<Exception> h = p.second;
+ CompletableFuture<WebSocket> cf = p.second;
try {
- // At this point messages are finally ordered and will be written
- // one by one in a mutually exclusive fashion; thus it's a pretty
- // convenient place to contextualize them
message.contextualize(context);
+ Consumer<Exception> h = e -> {
+ if (e == null) {
+ cf.complete(WebSocketImpl.this);
+ } else {
+ cf.completeExceptionally(e);
+ }
+ sendHandler.handle();
+ whenSent.run();
+ };
transmitter.send(message, h);
} catch (Exception t) {
- h.accept(t);
+ cf.completeExceptionally(t);
}
}
@@ -381,7 +386,7 @@
@Override
public String toString() {
return super.toString()
- + "[" + (closed.get() ? "OPEN" : "CLOSED") + "]: " + uri
+ + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
+ (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
}
@@ -476,7 +481,9 @@
int code = ((FailWebSocketException) error).getStatusCode();
enqueueClose(new Close(code, ""))
.whenComplete((r, e) -> {
- ex.addSuppressed(e);
+ if (e != null) {
+ ex.addSuppressed(e);
+ }
try {
channel.close();
} catch (IOException e1) {
--- a/jdk/test/java/net/httpclient/whitebox/Driver.java Wed May 10 09:02:43 2017 +0200
+++ b/jdk/test/java/net/httpclient/whitebox/Driver.java Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -26,5 +26,6 @@
* @bug 8151299 8164704
* @modules jdk.incubator.httpclient
* @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
+ * @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
* @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
*/
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java Wed May 10 12:36:14 2017 +0100
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import jdk.incubator.http.internal.websocket.RawChannel;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static org.testng.Assert.assertEquals;
+
+/*
+ * This test exercises mechanics of _independent_ reads and writes on the
+ * RawChannel. It verifies that the underlying implementation can manage more
+ * than a single type of notifications at the same time.
+ */
+public class RawChannelTest {
+
+ private final AtomicLong clientWritten = new AtomicLong();
+ private final AtomicLong serverWritten = new AtomicLong();
+ private final AtomicLong clientRead = new AtomicLong();
+ private final AtomicLong serverRead = new AtomicLong();
+
+ /*
+ * Since at this level we don't have any control over the low level socket
+ * parameters, this latch ensures a write to the channel will stall at least
+ * once (socket's send buffer filled up).
+ */
+ private final CountDownLatch writeStall = new CountDownLatch(1);
+
+ /*
+ * This one works similarly by providing means to ensure a read from the
+ * channel will stall at least once (no more data available on the socket).
+ */
+ private final CountDownLatch readStall = new CountDownLatch(1);
+
+ private final AtomicInteger writeHandles = new AtomicInteger();
+ private final AtomicInteger readHandles = new AtomicInteger();
+
+ private final CountDownLatch exit = new CountDownLatch(1);
+
+ @Test
+ public void test() throws Exception {
+ try (ServerSocket server = new ServerSocket(0)) {
+ int port = server.getLocalPort();
+ new TestServer(server).start();
+ final RawChannel chan = channelOf(port);
+
+ // It's very important not to forget the initial bytes, possibly
+ // left from the HTTP thingy
+ int initialBytes = chan.initialByteBuffer().remaining();
+ print("RawChannel has %s initial bytes", initialBytes);
+ clientRead.addAndGet(initialBytes);
+
+ chan.registerEvent(new RawChannel.RawEvent() {
+
+ private final ByteBuffer reusableBuffer = ByteBuffer.allocate(32768);
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_WRITE;
+ }
+
+ @Override
+ public void handle() {
+ int i = writeHandles.incrementAndGet();
+ print("OP_WRITE #%s", i);
+ if (i > 3) { // Fill up the send buffer not more than 3 times
+ try {
+ chan.shutdownOutput();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+ long total = 0;
+ try {
+ long n;
+ do {
+ ByteBuffer[] array = {reusableBuffer.slice()};
+ n = chan.write(array, 0, 1);
+ total += n;
+ } while (n > 0);
+ print("OP_WRITE clogged SNDBUF with %s bytes", total);
+ clientWritten.addAndGet(total);
+ chan.registerEvent(this);
+ writeStall.countDown(); // signal send buffer is full
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ });
+
+ chan.registerEvent(new RawChannel.RawEvent() {
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_READ;
+ }
+
+ @Override
+ public void handle() {
+ int i = readHandles.incrementAndGet();
+ print("OP_READ #%s", i);
+ ByteBuffer read = null;
+ long total = 0;
+ while (true) {
+ try {
+ read = chan.read();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (read == null) {
+ print("OP_READ EOF");
+ break;
+ } else if (!read.hasRemaining()) {
+ print("OP_READ stall");
+ try {
+ chan.registerEvent(this);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ readStall.countDown();
+ break;
+ }
+ int r = read.remaining();
+ total += r;
+ clientRead.addAndGet(r);
+ }
+ print("OP_READ read %s bytes", total);
+ }
+ });
+ exit.await(); // All done, we need to compare results:
+ assertEquals(clientRead.get(), serverWritten.get());
+ assertEquals(serverRead.get(), clientWritten.get());
+ }
+ }
+
+ private static RawChannel channelOf(int port) throws Exception {
+ URI uri = URI.create("http://127.0.0.1:" + port + "/");
+ print("raw channel to %s", uri.toString());
+ HttpRequest req = HttpRequest.newBuilder(uri).build();
+ HttpResponse<?> r = HttpClient.newHttpClient().send(req, discard(null));
+ r.body();
+ return ((HttpResponseImpl) r).rawChannel();
+ }
+
+ private class TestServer extends Thread { // Powered by Slowpokes
+
+ private final ServerSocket server;
+
+ TestServer(ServerSocket server) throws IOException {
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ try (Socket s = server.accept()) {
+ InputStream is = s.getInputStream();
+ OutputStream os = s.getOutputStream();
+
+ processHttp(is, os);
+
+ Thread reader = new Thread(() -> {
+ try {
+ long n = readSlowly(is);
+ print("Server read %s bytes", n);
+ serverRead.addAndGet(n);
+ s.shutdownInput();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ Thread writer = new Thread(() -> {
+ try {
+ long n = writeSlowly(os);
+ print("Server written %s bytes", n);
+ serverWritten.addAndGet(n);
+ s.shutdownOutput();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ reader.start();
+ writer.start();
+
+ reader.join();
+ writer.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ exit.countDown();
+ }
+ }
+
+ private void processHttp(InputStream is, OutputStream os)
+ throws IOException
+ {
+ os.write("HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n".getBytes());
+ byte[] buf = new byte[1024];
+ String s = "";
+ while (true) {
+ int n = is.read(buf);
+ if (n <= 0) {
+ throw new RuntimeException("Unexpected end of request");
+ }
+ s = s + new String(buf, 0, n);
+ if (s.contains("\r\n\r\n")) {
+ break;
+ }
+ }
+ }
+
+ private long writeSlowly(OutputStream os) throws Exception {
+ byte[] first = byteArrayOfSize(1024);
+ long total = first.length;
+ os.write(first);
+ // Let's wait for the signal from the raw channel that its read has
+ // stalled, and then continue sending a bit more stuff
+ readStall.await();
+ for (int i = 0; i < 32; i++) {
+ byte[] b = byteArrayOfSize(1024);
+ os.write(b);
+ total += b.length;
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ return total;
+ }
+
+ private long readSlowly(InputStream is) throws Exception {
+ // Wait for the raw channel to fill up the its send buffer
+ writeStall.await();
+ long overall = 0;
+ byte[] array = new byte[1024];
+ for (int n = 0; n != -1; n = is.read(array)) {
+ TimeUnit.MILLISECONDS.sleep(1);
+ overall += n;
+ }
+ return overall;
+ }
+ }
+
+ private static void print(String format, Object... args) {
+ System.out.println(Thread.currentThread() + ": " + String.format(format, args));
+ }
+
+ private static byte[] byteArrayOfSize(int bound) {
+ return new byte[new Random().nextInt(1 + bound)];
+ }
+}