http-client-branch: (WebSocket) moving http proxy-related code to OpeningHandshake
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Wed Nov 22 11:21:36 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Wed Nov 22 17:13:35 2017 +0300
@@ -28,7 +28,9 @@
import jdk.incubator.http.internal.common.MinimalFuture;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.Proxy;
+import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import jdk.incubator.http.HttpClient;
@@ -39,7 +41,9 @@
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.WebSocketHandshakeException;
import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.Utils;
+import java.net.URLPermission;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.MessageDigest;
@@ -57,12 +61,14 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static java.lang.String.format;
import static jdk.incubator.http.internal.common.Utils.isValidName;
+import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
import static jdk.incubator.http.internal.common.Utils.stringOf;
-final class OpeningHandshake {
+public class OpeningHandshake {
private static final String HEADER_CONNECTION = "Connection";
private static final String HEADER_UPGRADE = "Upgrade";
@@ -83,7 +89,7 @@
HEADER_VERSION));
}
- private static final SecureRandom srandom = new SecureRandom();
+ private static final SecureRandom random = new SecureRandom();
private final MessageDigest sha1;
private final HttpClient client;
@@ -102,7 +108,10 @@
private final Collection<String> subprotocols;
private final String nonce;
- OpeningHandshake(BuilderImpl b, Proxy proxy) {
+ public OpeningHandshake(BuilderImpl b) {
+ checkURI(b.getUri());
+ Proxy proxy = proxyFor(b.getProxySelector(), b.getUri());
+ checkPermissions(b, proxy);
this.client = b.getClient();
URI httpURI = createRequestURI(b.getUri());
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(httpURI);
@@ -158,12 +167,8 @@
* https://tools.ietf.org/html/rfc6455#section-3
*/
static URI createRequestURI(URI uri) {
- String s = uri.getScheme(); // The scheme might be null (i.e. undefined)
- if (!("ws".equalsIgnoreCase(s) || "wss".equalsIgnoreCase(s))
- || uri.getFragment() != null)
- {
- throw illegal("Bad URI: " + uri);
- }
+ String s = uri.getScheme();
+ assert "ws".equalsIgnoreCase(s) || "wss".equalsIgnoreCase(s);
String scheme = "ws".equalsIgnoreCase(s) ? "http" : "https";
try {
return new URI(scheme,
@@ -175,11 +180,11 @@
null); // No fragment
} catch (URISyntaxException e) {
// Shouldn't happen: URI invariant
- throw new InternalError(e); // TODO: should actually report on this instead of throwing in builder (rev. 47704:34d7cc00f87a4b18b6c30c122fc3d55456833ae0)
+ throw new InternalError(e);
}
}
- CompletableFuture<Result> send() {
+ public CompletableFuture<Result> send() {
PrivilegedAction<CompletableFuture<Result>> pa = () ->
client.sendAsync(this.request, BodyHandler.<Void>discard(null))
.thenCompose(this::resultFrom);
@@ -251,9 +256,7 @@
String expected = Base64.getEncoder().encodeToString(this.sha1.digest());
String actual = requireSingle(headers, HEADER_ACCEPT);
if (!actual.trim().equals(expected)) {
- // TODO: why do we need the value here?
- throw checkFailed("Bad " + HEADER_ACCEPT + ", expected:["
- + expected + "] ,got:[" + actual.trim() + "]");
+ throw checkFailed("Bad " + HEADER_ACCEPT);
}
String subprotocol = checkAndReturnSubprotocol(headers);
RawChannel channel = ((RawChannel.Provider) response).rawChannel();
@@ -306,15 +309,69 @@
private static String createNonce() {
byte[] bytes = new byte[16];
- OpeningHandshake.srandom.nextBytes(bytes);
+ OpeningHandshake.random.nextBytes(bytes);
return Base64.getEncoder().encodeToString(bytes);
}
+ private static CheckFailedException checkFailed(String message) {
+ throw new CheckFailedException(message);
+ }
+
+ private static URI checkURI(URI uri) {
+ String scheme = uri.getScheme();
+ if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
+ throw illegal("invalid URI scheme: " + scheme);
+ if (uri.getHost() == null)
+ throw illegal("URI must contain a host: " + uri);
+ if (uri.getFragment() != null)
+ throw illegal("URI must not contain a fragment: " + uri);
+ return uri;
+ }
+
private static IllegalArgumentException illegal(String message) {
return new IllegalArgumentException(message);
}
- private static CheckFailedException checkFailed(String message) {
- throw new CheckFailedException(message);
+ /**
+ * Returns the proxy for the given URI when sent through the given client,
+ * or {@code null} if none is required or applicable.
+ */
+ private static Proxy proxyFor(Optional<ProxySelector> selector, URI uri) {
+ if (!selector.isPresent()) {
+ return null;
+ }
+ URI requestURI = createRequestURI(uri); // Based on the HTTP scheme
+ List<Proxy> pl = selector.get().select(requestURI);
+ if (pl.isEmpty()) {
+ return null;
+ }
+ Proxy proxy = pl.get(0);
+ if (proxy.type() != Proxy.Type.HTTP) {
+ return null;
+ }
+ return proxy;
+ }
+
+ /**
+ * Performs the necessary security permissions checks to connect ( possibly
+ * through a proxy ) to the builders WebSocket URI.
+ *
+ * @throws SecurityException if the security manager denies access
+ */
+ static void checkPermissions(BuilderImpl b, Proxy proxy) {
+ SecurityManager sm = System.getSecurityManager();
+ if (sm == null) {
+ return;
+ }
+ Stream<String> headers = b.getHeaders().stream().map(p -> p.first).distinct();
+ URLPermission perm1 = Utils.permissionForServer(b.getUri(), "", headers);
+ sm.checkPermission(perm1);
+ if (proxy == null) {
+ return;
+ }
+ URLPermission perm2 = permissionForProxy((InetSocketAddress) proxy.address());
+ if (perm2 != null) {
+ sm.checkPermission(perm2);
+ }
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed Nov 22 11:21:36 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed Nov 22 17:13:35 2017 +0300
@@ -27,15 +27,9 @@
import java.io.IOException;
import java.lang.ref.Reference;
-import java.net.InetSocketAddress;
import java.net.ProtocolException;
-import java.net.Proxy;
-import java.net.ProxySelector;
import java.net.URI;
-import java.net.URLPermission;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -61,11 +55,9 @@
import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
-import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
-import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -77,7 +69,7 @@
private final URI uri;
private final String subprotocol;
- private final RawChannel channel;
+ private final RawChannel channel; /* Stored to call close() on */
private final Listener listener;
private volatile boolean intputClosed;
@@ -118,83 +110,10 @@
*/
private final Object lock = new Object();
- private final CompletableFuture<?> closeReceived = new MinimalFuture<>();
- private final CompletableFuture<?> closeSent = new MinimalFuture<>();
-
- /**
- * Returns the proxy for the given URI when sent through the given client,
- * or {@code null} if none is required or applicable.
- */
- private static Proxy proxyFor(Optional<ProxySelector> selector, URI uri) {
- if (!selector.isPresent()) {
- return null;
- }
- URI requestURI = OpeningHandshake.createRequestURI(uri); // based on the HTTP scheme
- List<Proxy> pl = selector.get().select(requestURI);
- if (pl.isEmpty()) {
- return null;
- }
- Proxy proxy = pl.get(0);
- if (proxy.type() != Proxy.Type.HTTP) {
- return null;
- }
- return proxy;
- }
-
- /**
- * Performs the necessary security permissions checks to connect ( possibly
- * through a proxy ) to the builders WebSocket URI.
- *
- * @throws SecurityException if the security manager denies access
- */
- static void checkPermissions(BuilderImpl b, Proxy proxy) {
- SecurityManager sm = System.getSecurityManager();
- if (sm == null) {
- return;
- }
- URLPermission perm1 = Utils.permissionForServer(
- b.getUri(), "", b.getHeaders().stream().map(p -> p.first).distinct());
- sm.checkPermission(perm1);
- if (proxy == null) {
- return;
- }
- URLPermission perm2 = permissionForProxy((InetSocketAddress) proxy.address());
- if (perm2 != null) {
- sm.checkPermission(perm2);
- }
- }
-
- private static IllegalArgumentException newIAE(String message, Object... args) {
- return new IllegalArgumentException(format(message, args));
- }
-
- private static URI checkURI(URI uri) {
- String scheme = uri.getScheme();
- if (scheme == null)
- throw newIAE("URI with undefined scheme");
- scheme = scheme.toLowerCase();
- if (!(scheme.equals("ws") || scheme.equals("wss")))
- throw newIAE("invalid URI scheme %s", scheme);
- if (uri.getHost() == null)
- throw newIAE("URI must contain a host: %s", uri);
- if (uri.getFragment() != null)
- throw newIAE("URI must not contain a fragment: %s", uri);
- return uri;
- }
+ private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
+ private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
- try {
- checkURI(b.getUri());
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- Proxy proxy = proxyFor(b.getProxySelector(), b.getUri());
- try {
- checkPermissions(b, proxy);
- } catch (Throwable throwable) {
- return failedFuture(throwable);
- }
-
Function<Result, WebSocket> newWebSocket = r -> {
WebSocketImpl ws = new WebSocketImpl(b.getUri(),
r.subprotocol,
@@ -213,8 +132,8 @@
};
OpeningHandshake h;
try {
- h = new OpeningHandshake(b, proxy);
- } catch (IllegalArgumentException e) {
+ h = new OpeningHandshake(b);
+ } catch (Exception e) {
return failedFuture(e);
}
return h.send().thenApply(newWebSocket);
@@ -231,10 +150,10 @@
this.listener = requireNonNull(listener);
this.transmitter = new Transmitter(channel);
this.receiver = new Receiver(messageConsumerOf(listener), channel);
- this.sendScheduler = new SequentialScheduler(new SendFirstTask());
+ this.sendScheduler = new SequentialScheduler(new SendTask());
- // Set up the Closing Handshake action
- CompletableFuture.allOf(closeReceived, closeSent)
+ // Set up automatic channel closing action
+ CompletableFuture.allOf(channelInputClosed, channelOutputClosed)
.whenComplete((result, error) -> {
try {
channel.close();
@@ -289,7 +208,7 @@
} catch (IOException e) {
Log.logError(e);
}
- boolean alreadyCompleted = !closeReceived.complete(null);
+ boolean alreadyCompleted = !channelInputClosed.complete(null);
if (alreadyCompleted) {
// This CF is supposed to be completed only once, the first time a
// Close message is received. No further messages are pulled from
@@ -364,7 +283,6 @@
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode,
String reason) {
- outputClosed = true;
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(
new IllegalArgumentException("statusCode: " + statusCode));
@@ -375,6 +293,7 @@
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
+ outputClosed = true;
return enqueueClose(msg);
}
@@ -398,7 +317,7 @@
} catch (IOException e) {
Log.logError(e);
}
- closeSent.complete(null);
+ channelOutputClosed.complete(null);
}
});
}
@@ -430,10 +349,11 @@
}
/*
- * This is the main sending task. It may be run in different threads,
- * but never concurrently.
+ * This is a message sending task. It pulls messages from the queue one by
+ * one and sends them. It may be run in different threads, but never
+ * concurrently.
*/
- private class SendFirstTask implements SequentialScheduler.RestartableTask {
+ private class SendTask implements SequentialScheduler.RestartableTask {
@Override
public void run(DeferredCompleter taskCompleter) {
Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
@@ -452,6 +372,10 @@
cf.completeExceptionally(e);
}
taskCompleter.complete();
+ // More than a single message may have been enqueued while
+ // the task has been busy with the current message, but
+ // there only one signal is recorded
+ sendScheduler.runOrSchedule();
};
transmitter.send(message, h);
} catch (Exception t) {