http-client-branch: (WebSocket) moving http proxy-related code to OpeningHandshake http-client-branch
authorprappo
Wed, 22 Nov 2017 17:13:35 +0300
branchhttp-client-branch
changeset 55853 937985fc3c45
parent 55852 32f6aefec11e
child 55854 8898d000aec5
http-client-branch: (WebSocket) moving http proxy-related code to OpeningHandshake
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Wed Nov 22 11:21:36 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Wed Nov 22 17:13:35 2017 +0300
@@ -28,7 +28,9 @@
 import jdk.incubator.http.internal.common.MinimalFuture;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.Proxy;
+import java.net.ProxySelector;
 import java.net.URI;
 import java.net.URISyntaxException;
 import jdk.incubator.http.HttpClient;
@@ -39,7 +41,9 @@
 import jdk.incubator.http.HttpResponse.BodyHandler;
 import jdk.incubator.http.WebSocketHandshakeException;
 import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.Utils;
 
+import java.net.URLPermission;
 import java.nio.charset.StandardCharsets;
 import java.security.AccessController;
 import java.security.MessageDigest;
@@ -57,12 +61,14 @@
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static jdk.incubator.http.internal.common.Utils.isValidName;
+import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
 import static jdk.incubator.http.internal.common.Utils.stringOf;
 
-final class OpeningHandshake {
+public class OpeningHandshake {
 
     private static final String HEADER_CONNECTION = "Connection";
     private static final String HEADER_UPGRADE    = "Upgrade";
@@ -83,7 +89,7 @@
                                        HEADER_VERSION));
     }
 
-    private static final SecureRandom srandom = new SecureRandom();
+    private static final SecureRandom random = new SecureRandom();
 
     private final MessageDigest sha1;
     private final HttpClient client;
@@ -102,7 +108,10 @@
     private final Collection<String> subprotocols;
     private final String nonce;
 
-    OpeningHandshake(BuilderImpl b, Proxy proxy) {
+    public OpeningHandshake(BuilderImpl b) {
+        checkURI(b.getUri());
+        Proxy proxy = proxyFor(b.getProxySelector(), b.getUri());
+        checkPermissions(b, proxy);
         this.client = b.getClient();
         URI httpURI = createRequestURI(b.getUri());
         HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(httpURI);
@@ -158,12 +167,8 @@
      * https://tools.ietf.org/html/rfc6455#section-3
      */
     static URI createRequestURI(URI uri) {
-        String s = uri.getScheme(); // The scheme might be null (i.e. undefined)
-        if (!("ws".equalsIgnoreCase(s) || "wss".equalsIgnoreCase(s))
-                || uri.getFragment() != null)
-        {
-            throw illegal("Bad URI: " + uri);
-        }
+        String s = uri.getScheme();
+        assert "ws".equalsIgnoreCase(s) || "wss".equalsIgnoreCase(s);
         String scheme = "ws".equalsIgnoreCase(s) ? "http" : "https";
         try {
             return new URI(scheme,
@@ -175,11 +180,11 @@
                            null); // No fragment
         } catch (URISyntaxException e) {
             // Shouldn't happen: URI invariant
-            throw new InternalError(e); // TODO: should actually report on this instead of throwing in builder (rev. 47704:34d7cc00f87a4b18b6c30c122fc3d55456833ae0)
+            throw new InternalError(e);
         }
     }
 
-    CompletableFuture<Result> send() {
+    public CompletableFuture<Result> send() {
         PrivilegedAction<CompletableFuture<Result>> pa = () ->
                 client.sendAsync(this.request, BodyHandler.<Void>discard(null))
                       .thenCompose(this::resultFrom);
@@ -251,9 +256,7 @@
         String expected = Base64.getEncoder().encodeToString(this.sha1.digest());
         String actual = requireSingle(headers, HEADER_ACCEPT);
         if (!actual.trim().equals(expected)) {
-            // TODO: why do we need the value here?
-            throw checkFailed("Bad " + HEADER_ACCEPT + ", expected:["
-                              + expected + "] ,got:[" + actual.trim() + "]");
+            throw checkFailed("Bad " + HEADER_ACCEPT);
         }
         String subprotocol = checkAndReturnSubprotocol(headers);
         RawChannel channel = ((RawChannel.Provider) response).rawChannel();
@@ -306,15 +309,69 @@
 
     private static String createNonce() {
         byte[] bytes = new byte[16];
-        OpeningHandshake.srandom.nextBytes(bytes);
+        OpeningHandshake.random.nextBytes(bytes);
         return Base64.getEncoder().encodeToString(bytes);
     }
 
+    private static CheckFailedException checkFailed(String message) {
+        throw new CheckFailedException(message);
+    }
+
+    private static URI checkURI(URI uri) {
+        String scheme = uri.getScheme();
+        if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
+            throw illegal("invalid URI scheme: " + scheme);
+        if (uri.getHost() == null)
+            throw illegal("URI must contain a host: " + uri);
+        if (uri.getFragment() != null)
+            throw illegal("URI must not contain a fragment: " + uri);
+        return uri;
+    }
+
     private static IllegalArgumentException illegal(String message) {
         return new IllegalArgumentException(message);
     }
 
-    private static CheckFailedException checkFailed(String message) {
-        throw new CheckFailedException(message);
+    /**
+     * Returns the proxy for the given URI when sent through the given client,
+     * or {@code null} if none is required or applicable.
+     */
+    private static Proxy proxyFor(Optional<ProxySelector> selector, URI uri) {
+        if (!selector.isPresent()) {
+            return null;
+        }
+        URI requestURI = createRequestURI(uri); // Based on the HTTP scheme
+        List<Proxy> pl = selector.get().select(requestURI);
+        if (pl.isEmpty()) {
+            return null;
+        }
+        Proxy proxy = pl.get(0);
+        if (proxy.type() != Proxy.Type.HTTP) {
+            return null;
+        }
+        return proxy;
+    }
+
+    /**
+     * Performs the necessary security permissions checks to connect ( possibly
+     * through a proxy ) to the builders WebSocket URI.
+     *
+     * @throws SecurityException if the security manager denies access
+     */
+    static void checkPermissions(BuilderImpl b, Proxy proxy) {
+        SecurityManager sm = System.getSecurityManager();
+        if (sm == null) {
+            return;
+        }
+        Stream<String> headers = b.getHeaders().stream().map(p -> p.first).distinct();
+        URLPermission perm1 = Utils.permissionForServer(b.getUri(), "", headers);
+        sm.checkPermission(perm1);
+        if (proxy == null) {
+            return;
+        }
+        URLPermission perm2 = permissionForProxy((InetSocketAddress) proxy.address());
+        if (perm2 != null) {
+            sm.checkPermission(perm2);
+        }
     }
 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Wed Nov 22 11:21:36 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Wed Nov 22 17:13:35 2017 +0300
@@ -27,15 +27,9 @@
 
 import java.io.IOException;
 import java.lang.ref.Reference;
-import java.net.InetSocketAddress;
 import java.net.ProtocolException;
-import java.net.Proxy;
-import java.net.ProxySelector;
 import java.net.URI;
-import java.net.URLPermission;
 import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -61,11 +55,9 @@
 import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
 import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
 
-import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
 import static jdk.incubator.http.internal.common.Pair.pair;
-import static jdk.incubator.http.internal.common.Utils.permissionForProxy;
 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -77,7 +69,7 @@
 
     private final URI uri;
     private final String subprotocol;
-    private final RawChannel channel;
+    private final RawChannel channel; /* Stored to call close() on */
     private final Listener listener;
 
     private volatile boolean intputClosed;
@@ -118,83 +110,10 @@
      */
     private final Object lock = new Object();
 
-    private final CompletableFuture<?> closeReceived = new MinimalFuture<>();
-    private final CompletableFuture<?> closeSent = new MinimalFuture<>();
-
-    /**
-     * Returns the proxy for the given URI when sent through the given client,
-     * or {@code null} if none is required or applicable.
-     */
-    private static Proxy proxyFor(Optional<ProxySelector> selector, URI uri) {
-        if (!selector.isPresent()) {
-            return null;
-        }
-        URI requestURI = OpeningHandshake.createRequestURI(uri);  // based on the HTTP scheme
-        List<Proxy> pl = selector.get().select(requestURI);
-        if (pl.isEmpty()) {
-            return null;
-        }
-        Proxy proxy = pl.get(0);
-        if (proxy.type() != Proxy.Type.HTTP) {
-            return null;
-        }
-        return proxy;
-    }
-
-    /**
-     * Performs the necessary security permissions checks to connect ( possibly
-     * through a proxy ) to the builders WebSocket URI.
-     *
-     * @throws SecurityException if the security manager denies access
-     */
-    static void checkPermissions(BuilderImpl b, Proxy proxy) {
-        SecurityManager sm = System.getSecurityManager();
-        if (sm == null) {
-            return;
-        }
-        URLPermission perm1 = Utils.permissionForServer(
-                b.getUri(), "", b.getHeaders().stream().map(p -> p.first).distinct());
-        sm.checkPermission(perm1);
-        if (proxy == null) {
-            return;
-        }
-        URLPermission perm2 = permissionForProxy((InetSocketAddress) proxy.address());
-        if (perm2 != null) {
-            sm.checkPermission(perm2);
-        }
-    }
-
-    private static IllegalArgumentException newIAE(String message, Object... args) {
-        return new IllegalArgumentException(format(message, args));
-    }
-
-    private static URI checkURI(URI uri) {
-        String scheme = uri.getScheme();
-        if (scheme == null)
-            throw newIAE("URI with undefined scheme");
-        scheme = scheme.toLowerCase();
-        if (!(scheme.equals("ws") || scheme.equals("wss")))
-            throw newIAE("invalid URI scheme %s", scheme);
-        if (uri.getHost() == null)
-            throw newIAE("URI must contain a host: %s", uri);
-        if (uri.getFragment() != null)
-            throw newIAE("URI must not contain a fragment: %s", uri);
-        return uri;
-    }
+    private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
+    private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
 
     static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
-        try {
-            checkURI(b.getUri());
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
-        }
-        Proxy proxy = proxyFor(b.getProxySelector(), b.getUri());
-        try {
-            checkPermissions(b, proxy);
-        } catch (Throwable throwable) {
-            return failedFuture(throwable);
-        }
-
         Function<Result, WebSocket> newWebSocket = r -> {
             WebSocketImpl ws = new WebSocketImpl(b.getUri(),
                                                  r.subprotocol,
@@ -213,8 +132,8 @@
         };
         OpeningHandshake h;
         try {
-            h = new OpeningHandshake(b, proxy);
-        } catch (IllegalArgumentException e) {
+            h = new OpeningHandshake(b);
+        } catch (Exception e) {
             return failedFuture(e);
         }
         return h.send().thenApply(newWebSocket);
@@ -231,10 +150,10 @@
         this.listener = requireNonNull(listener);
         this.transmitter = new Transmitter(channel);
         this.receiver = new Receiver(messageConsumerOf(listener), channel);
-        this.sendScheduler = new SequentialScheduler(new SendFirstTask());
+        this.sendScheduler = new SequentialScheduler(new SendTask());
 
-        // Set up the Closing Handshake action
-        CompletableFuture.allOf(closeReceived, closeSent)
+        // Set up automatic channel closing action
+        CompletableFuture.allOf(channelInputClosed, channelOutputClosed)
                 .whenComplete((result, error) -> {
                     try {
                         channel.close();
@@ -289,7 +208,7 @@
         } catch (IOException e) {
             Log.logError(e);
         }
-        boolean alreadyCompleted = !closeReceived.complete(null);
+        boolean alreadyCompleted = !channelInputClosed.complete(null);
         if (alreadyCompleted) {
             // This CF is supposed to be completed only once, the first time a
             // Close message is received. No further messages are pulled from
@@ -364,7 +283,6 @@
     @Override
     public CompletableFuture<WebSocket> sendClose(int statusCode,
                                                   String reason) {
-        outputClosed = true;
         if (!isLegalToSendFromClient(statusCode)) {
             return failedFuture(
                     new IllegalArgumentException("statusCode: " + statusCode));
@@ -375,6 +293,7 @@
         } catch (IllegalArgumentException e) {
             return failedFuture(e);
         }
+        outputClosed = true;
         return enqueueClose(msg);
     }
 
@@ -398,7 +317,7 @@
                         } catch (IOException e) {
                             Log.logError(e);
                         }
-                        closeSent.complete(null);
+                        channelOutputClosed.complete(null);
                     }
                 });
     }
@@ -430,10 +349,11 @@
     }
 
     /*
-     * This is the main sending task. It may be run in different threads,
-     * but never concurrently.
+     * This is a message sending task. It pulls messages from the queue one by
+     * one and sends them. It may be run in different threads, but never
+     * concurrently.
      */
-    private class SendFirstTask implements SequentialScheduler.RestartableTask {
+    private class SendTask implements SequentialScheduler.RestartableTask {
         @Override
         public void run(DeferredCompleter taskCompleter) {
             Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
@@ -452,6 +372,10 @@
                         cf.completeExceptionally(e);
                     }
                     taskCompleter.complete();
+                    // More than a single message may have been enqueued while
+                    // the task has been busy with the current message, but
+                    // there only one signal is recorded
+                    sendScheduler.runOrSchedule();
                 };
                 transmitter.send(message, h);
             } catch (Exception t) {