8179021: Latest bugfixes to WebSocket/HPACK from the sandbox repo
authorprappo
Wed, 10 May 2017 12:36:14 +0100
changeset 45119 decbbff9fdb4
parent 45118 e4258d800b54
child 45120 ba4c8ba79c19
8179021: Latest bugfixes to WebSocket/HPACK from the sandbox repo Reviewed-by: dfuchs
jdk/src/java.base/share/classes/module-info.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
jdk/test/java/net/httpclient/whitebox/Driver.java
jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java
--- a/jdk/src/java.base/share/classes/module-info.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/java.base/share/classes/module-info.java	Wed May 10 12:36:14 2017 +0100
@@ -190,7 +190,8 @@
         jdk.unsupported;
     exports jdk.internal.vm.annotation to
         jdk.unsupported,
-        jdk.internal.vm.ci;
+        jdk.internal.vm.ci,
+        jdk.incubator.httpclient;
     exports jdk.internal.util.jar to
         jdk.jartool,
         jdk.jdeps,
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequestImpl.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -123,7 +123,7 @@
         this.method = method;
         this.systemHeaders = new HttpHeadersImpl();
         this.userHeaders = ImmutableHeaders.empty();
-        this.uri = null;
+        this.uri = URI.create("socket://" + authority.getHostString() + ":" + Integer.toString(authority.getPort()) + "/");
         this.requestProcessor = HttpRequest.noBody();
         this.authority = authority;
         this.secure = false;
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -27,6 +27,7 @@
 
 import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.HttpResponse.BodyHandler;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -72,7 +73,8 @@
     public void connect() throws IOException, InterruptedException {
         delegate.connect();
         HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
-        Exchange<?> connectExchange = new Exchange<>(req, null);
+        MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
+        Exchange<Void> connectExchange = new Exchange<>(req, mul);
         Response r = connectExchange.responseImpl(delegate);
         if (r.statusCode() != 200) {
             throw new IOException("Tunnel failed");
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WebSocket.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -43,7 +43,7 @@
  * <p> To create a {@code WebSocket} use a {@linkplain HttpClient#newWebSocketBuilder(
  * URI, Listener) builder}. Once a {@code WebSocket} is built, it's ready
  * to send and receive messages. When the {@code WebSocket} is no longer needed
- * it must be closed: a Close message must both be {@linkplain #sendClose()
+ * it must be closed: a Close message must both be {@linkplain #sendClose
  * sent} and {@linkplain Listener#onClose(WebSocket, int, String) received}.
  * The {@code WebSocket} may be also closed {@linkplain #abort() abruptly}.
  *
@@ -94,17 +94,6 @@
     int NORMAL_CLOSURE = 1000;
 
     /**
-     * The WebSocket Close message status code (<code>{@value}</code>), is
-     * designated for use in applications expecting a status code to indicate
-     * that the connection was closed abnormally, e.g., without sending or
-     * receiving a Close message.
-     *
-     * @see Listener#onClose(WebSocket, int, String)
-     * @see #abort()
-     */
-    int CLOSED_ABNORMALLY = 1006;
-
-    /**
      * A builder for creating {@code WebSocket} instances.
      * {@Incubating}
      *
@@ -509,7 +498,7 @@
          *
          * <p> The {@code WebSocket} will close at the earliest of completion of
          * the returned {@code CompletionStage} or sending a Close message. In
-         * particular, if a Close message has been {@link WebSocket#sendClose()
+         * particular, if a Close message has been {@linkplain WebSocket#sendClose
          * sent} before, then this invocation completes the closing handshake
          * and by the time this method is invoked, the {@code WebSocket} will
          * have been closed.
@@ -643,44 +632,6 @@
     CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast);
 
     /**
-     * Sends a whole Text message with characters from the given {@code
-     * CharSequence}.
-     *
-     * <p> This is a convenience method. For the general case, use {@link
-     * #sendText(CharSequence, boolean)}.
-     *
-     * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
-     * normally when the message has been sent or completes exceptionally if an
-     * error occurs.
-     *
-     * <p> The {@code CharSequence} must not be modified until the returned
-     * {@code CompletableFuture} completes (either normally or exceptionally).
-     *
-     * <p> The returned {@code CompletableFuture} can complete exceptionally
-     * with:
-     * <ul>
-     * <li> {@link IllegalArgumentException} -
-     *          if {@code message} is a malformed UTF-16 sequence
-     * <li> {@link IllegalStateException} -
-     *          if the {@code WebSocket} is closed;
-     *          or if a Close message has been sent;
-     *          or if there is an outstanding send operation;
-     *          or if a previous Binary message was sent with {@code isLast == false}
-     * <li> {@link IOException} -
-     *          if an I/O error occurs during this operation;
-     *          or if the {@code WebSocket} has been closed due to an error;
-     * </ul>
-     *
-     * @param message
-     *         the message
-     *
-     * @return a {@code CompletableFuture} with this {@code WebSocket}
-     */
-    default CompletableFuture<WebSocket> sendText(CharSequence message) {
-        return sendText(message, true);
-    }
-
-    /**
      * Sends a Binary message with bytes from the given {@code ByteBuffer}.
      *
      * <p> Returns a {@code CompletableFuture<WebSocket>} which completes
@@ -831,47 +782,10 @@
      *         the reason
      *
      * @return a {@code CompletableFuture} with this {@code WebSocket}
-     *
-     * @see #sendClose()
      */
     CompletableFuture<WebSocket> sendClose(int statusCode, String reason);
 
     /**
-     * Sends an empty Close message.
-     *
-     * <p> When this method has been invoked, no further messages can be sent.
-     *
-     * <p> For more details on Close message see RFC 6455 section
-     * <a href="https://tools.ietf.org/html/rfc6455#section-5.5.1">5.5.1. Close</a>
-     *
-     * <p> The method returns a {@code CompletableFuture<WebSocket>} which
-     * completes normally when the message has been sent or completes
-     * exceptionally if an error occurs.
-     *
-     * <p> The returned {@code CompletableFuture} can complete exceptionally
-     * with:
-     * <ul>
-     * <li> {@link IOException} -
-     *          if an I/O error occurs during this operation;
-     *          or the {@code WebSocket} has been closed due to an error
-     * </ul>
-     *
-     * <p> If this method has already been invoked or the {@code WebSocket} is
-     * closed, then subsequent invocations of this method have no effect and the
-     * returned {@code CompletableFuture} completes normally.
-     *
-     * <p> If a Close message has been {@linkplain Listener#onClose(WebSocket,
-     * int, String) received} before, then this invocation completes the closing
-     * handshake and by the time the returned {@code CompletableFuture}
-     * completes, the {@code WebSocket} will have been closed.
-     *
-     * @return a {@code CompletableFuture} with this {@code WebSocket}
-     *
-     * @see #sendClose(int, String)
-     */
-    CompletableFuture<WebSocket> sendClose();
-
-    /**
      * Allows {@code n} more messages to be received by the {@link Listener
      * Listener}.
      *
@@ -928,8 +842,7 @@
      * state.
      *
      * <p> As the result {@link Listener#onClose(WebSocket, int, String)
-     * Listener.onClose} will be invoked with the status code {@link
-     * #CLOSED_ABNORMALLY} unless either {@code onClose} or {@link
+     * Listener.onClose} will be invoked unless either {@code onClose} or {@link
      * Listener#onError(WebSocket, Throwable) onError} has been invoked before.
      * In which case no additional invocation will happen.
      *
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/Decoder.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -24,6 +24,8 @@
  */
 package jdk.incubator.http.internal.hpack;
 
+import jdk.internal.vm.annotation.Stable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.ProtocolException;
@@ -60,6 +62,7 @@
  */
 public final class Decoder {
 
+    @Stable
     private static final State[] states = new State[256];
 
     static {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/hpack/HeaderTable.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -24,6 +24,8 @@
  */
 package jdk.incubator.http.internal.hpack;
 
+import jdk.internal.vm.annotation.Stable;
+
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -40,6 +42,7 @@
 //
 final class HeaderTable {
 
+    @Stable
     private static final HeaderField[] staticTable = {
             null, // To make index 1-based, instead of 0-based
             new HeaderField(":authority"),
@@ -110,7 +113,7 @@
     private static final Map<String, LinkedHashMap<String, Integer>> staticIndexes;
 
     static {
-        staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH);
+        staticIndexes = new HashMap<>(STATIC_TABLE_LENGTH); // TODO: Map.of
         for (int i = 1; i <= STATIC_TABLE_LENGTH; i++) {
             HeaderField f = staticTable[i];
             Map<String, Integer> values = staticIndexes
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/BuilderImpl.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -46,7 +46,7 @@
     private final HttpClient client;
     private final URI uri;
     private final Listener listener;
-    private final List<Pair<String, String>> headers = new LinkedList<>();
+    private final Collection<Pair<String, String>> headers = new LinkedList<>();
     private final Collection<String> subprotocols = new LinkedList<>();
     private Duration timeout;
 
@@ -65,17 +65,18 @@
     }
 
     @Override
-    public Builder subprotocols(String mostPreferred, String... lesserPreferred)
+    public Builder subprotocols(String mostPreferred,
+                                String... lesserPreferred)
     {
         requireNonNull(mostPreferred, "mostPreferred");
         requireNonNull(lesserPreferred, "lesserPreferred");
         List<String> subprotocols = new LinkedList<>();
+        subprotocols.add(mostPreferred);
         for (int i = 0; i < lesserPreferred.length; i++) {
             String p = lesserPreferred[i];
             requireNonNull(p, "lesserPreferred[" + i + "]");
             subprotocols.add(p);
         }
-        subprotocols.add(0, mostPreferred);
         this.subprotocols.clear();
         this.subprotocols.addAll(subprotocols);
         return this;
@@ -98,20 +99,9 @@
 
     Listener getListener() { return listener; }
 
-    List<Pair<String, String>> getHeaders() { return headers; }
+    Collection<Pair<String, String>> getHeaders() { return headers; }
 
     Collection<String> getSubprotocols() { return subprotocols; }
 
     Duration getConnectTimeout() { return timeout; }
-
-    @Override
-    public String toString() {
-        return "WebSocket.Builder{"
-                + ", uri=" + uri
-                + ", listener=" + listener
-                + (!headers.isEmpty() ? ", headers=" + headers : "")
-                + (!subprotocols.isEmpty() ? ", subprotocols=" + subprotocols : "")
-                + ( timeout != null ? ", connectTimeout=" + timeout : "")
-                + '}';
-    }
 }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,70 +25,184 @@
 
 package jdk.incubator.http.internal.websocket;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static java.util.Objects.requireNonNull;
 
-final class CooperativeHandler {
+/*
+ * A synchronization aid that assists a number of parties in running a task
+ * in a mutually exclusive fashion.
+ *
+ * To run the task, a party invokes `handle`. To permanently prevent the task
+ * from subsequent runs, the party invokes `stop`.
+ *
+ * The parties do not have to operate in different threads.
+ *
+ * The task can be either synchronous or asynchronous.
+ *
+ * If the task is synchronous, it is represented with `Runnable`.
+ * The handler invokes `Runnable.run` to run the task.
+ *
+ * If the task is asynchronous, it is represented with `Consumer<Runnable>`.
+ * The handler invokes `Consumer.accept(end)` to begin the task. The task
+ * invokes `end.run()` when it has ended.
+ *
+ * The next run of the task will not begin until the previous run has finished.
+ *
+ * The task may invoke `handle()` by itself, it's a normal situation.
+ */
+public final class CooperativeHandler {
 
-    private static final long CONTINUE = 0;
-    private static final long OFF      = 1;
-    private static final long ON       = 2;
-    private static final long STOP     = 4;
+    /*
+       Since the task is fixed and known beforehand, no blocking synchronization
+       (locks, queues, etc.) is required. The job can be done solely using
+       nonblocking primitives.
+
+       The machinery below addresses two problems:
+
+         1. Running the task in a sequential order (no concurrent runs):
+
+                begin, end, begin, end...
+
+         2. Avoiding indefinite recursion:
 
-    private final AtomicLong state = new AtomicLong(OFF);
+                begin
+                  end
+                    begin
+                      end
+                        ...
+
+       Problem #1 is solved with a finite state machine with 4 states:
+
+           BEGIN, AGAIN, END, and STOP.
+
+       Problem #2 is solved with a "state modifier" OFFLOAD.
+
+       Parties invoke `handle()` to signal the task must run. A party that has
+       invoked `handle()` either begins the task or exploits the party that is
+       either beginning the task or ending it.
+
+       The party that is trying to end the task either ends it or begins it
+       again.
 
-    private final Runnable task;
+       To avoid indefinite recursion, before re-running the task tryEnd() sets
+       OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent")
+       tryEnd() is available and the "child" must offload the task on to the
+       "parent". Then a race begins. Whichever invocation of tryEnd() manages
+       to unset OFFLOAD bit first does not do the work.
+
+       There is at most 1 thread that is beginning the task and at most 2
+       threads that are trying to end it: "parent" and "child". In case of a
+       synchronous task "parent" and "child" are the same thread.
+     */
 
-    CooperativeHandler(Runnable task) {
-        this.task = requireNonNull(task);
+    private static final int OFFLOAD =  1;
+    private static final int AGAIN   =  2;
+    private static final int BEGIN   =  4;
+    private static final int STOP    =  8;
+    private static final int END     = 16;
+
+    private final AtomicInteger state = new AtomicInteger(END);
+    private final Consumer<Runnable> begin;
+
+    public CooperativeHandler(Runnable task) {
+        this(asyncOf(task));
+    }
+
+    public CooperativeHandler(Consumer<Runnable> begin) {
+        this.begin = requireNonNull(begin);
     }
 
     /*
-     * Causes the task supplied to the constructor to run. The task may be run
-     * by this thread as well as by any other that has invoked this method.
+     * Runs the task (though maybe by a different party).
      *
      * The recursion which is possible here will have the maximum depth of 1:
      *
-     *     task.run()
-     *         this.startOrContinue()
-     *             task.run()
+     *     this.handle()
+     *         begin.accept()
+     *             this.handle()
      */
-    void startOrContinue() {
-        long s;
+    public void handle() {
         while (true) {
-            s = state.get();
-            if (s == OFF && state.compareAndSet(OFF, ON)) {
-                // No one is running the task, we are going to run it
-                break;
-            }
-            if (s == ON && state.compareAndSet(ON, CONTINUE)) {
-                // Some other thread is running the task. We have managed to
-                // update the state, it will be surely noticed by that thread.
+            int s = state.get();
+            if (s == END) {
+                if (state.compareAndSet(END, BEGIN)) {
+                    break;
+                }
+            } else if ((s & BEGIN) != 0) {
+                // Tries to change the state to AGAIN, preserving OFFLOAD bit
+                if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
+                    return;
+                }
+            } else if ((s & AGAIN) != 0 || s == STOP) {
                 return;
-            }
-            if (s == CONTINUE || s == STOP) {
-                return;
+            } else {
+                throw new InternalError(String.valueOf(s));
             }
         }
+        begin.accept(this::tryEnd);
+    }
+
+    private void tryEnd() {
         while (true) {
-            task.run();
-            // State checks are ordered by the probability of expected values
-            // (it might be different in different usage patterns, say, when
-            // invocations to `startOrContinue()` are concurrent)
-            if (state.compareAndSet(ON, OFF)) {
-                break; // The state hasn't changed, all done
+            int s;
+            while (((s = state.get()) & OFFLOAD) != 0) {
+                // Tries to offload ending of the task to the parent
+                if (state.compareAndSet(s, s & ~OFFLOAD)) {
+                    return;
+                }
             }
-            if (state.compareAndSet(CONTINUE, ON)) {
-                continue;
+            while (true) {
+                if (s == BEGIN) {
+                    if (state.compareAndSet(BEGIN, END)) {
+                        return;
+                    }
+                } else if (s == AGAIN) {
+                    if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
+                        break;
+                    }
+                } else if (s == STOP) {
+                    return;
+                } else {
+                    throw new InternalError(String.valueOf(s));
+                }
+                s = state.get();
             }
-            // Other threads can change the state from CONTINUE to STOP only
-            // So if it's not ON and not CONTINUE, it can only be STOP
-            break;
+            begin.accept(this::tryEnd);
         }
     }
 
-    void stop() {
+    /*
+     * Checks whether or not this handler has been permanently stopped.
+     *
+     * Should be used from inside the task to poll the status of the handler,
+     * pretty much the same way as it is done for threads:
+     *
+     *     if (!Thread.currentThread().isInterrupted()) {
+     *         ...
+     *     }
+     */
+    public boolean isStopped() {
+        return state.get() == STOP;
+    }
+
+    /*
+     * Signals this handler to ignore subsequent invocations to `handle()`.
+     *
+     * If the task has already begun, this invocation will not affect it,
+     * unless the task itself uses `isStopped()` method to check the state
+     * of the handler.
+     */
+    public void stop() {
         state.set(STOP);
     }
+
+    private static Consumer<Runnable> asyncOf(Runnable task) {
+        requireNonNull(task);
+        return ender -> {
+            task.run();
+            ender.run();
+        };
+    }
 }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Frame.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,6 +25,8 @@
 
 package jdk.incubator.http.internal.websocket;
 
+import jdk.internal.vm.annotation.Stable;
+
 import java.nio.ByteBuffer;
 
 import static jdk.incubator.http.internal.common.Utils.dump;
@@ -58,6 +60,7 @@
         CONTROL_0xE    (0xE),
         CONTROL_0xF    (0xF);
 
+        @Stable
         private static final Opcode[] opcodes;
 
         static {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -38,7 +38,7 @@
 import static java.util.Objects.requireNonNull;
 import static jdk.incubator.http.internal.common.Utils.dump;
 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.checkIncomingCode;
+import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToReceiveFromServer;
 
 /*
  * Consumes frame parts and notifies a message consumer, when there is
@@ -212,20 +212,20 @@
         }
         switch (opcode) {
             case CLOSE:
-                int statusCode = NO_STATUS_CODE;
+                char statusCode = NO_STATUS_CODE;
                 String reason = "";
                 if (payloadLen != 0) {
                     int len = binaryData.remaining();
                     assert 2 <= len && len <= 125 : dump(len, payloadLen);
+                    statusCode = binaryData.getChar();
+                    if (!isLegalToReceiveFromServer(statusCode)) {
+                        throw new FailWebSocketException(
+                                "Illegal status code: " + statusCode);
+                    }
                     try {
-                        statusCode = checkIncomingCode(binaryData.getChar());
                         reason = UTF_8.newDecoder().decode(binaryData).toString();
-                    } catch (CheckFailedException e) {
-                        throw new FailWebSocketException("Incorrect status code")
-                                .initCause(e);
                     } catch (CharacterCodingException e) {
-                        throw new FailWebSocketException(
-                                "Close reason is a malformed UTF-8 sequence")
+                        throw new FailWebSocketException("Illegal close reason")
                                 .initCause(e);
                     }
                 }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -37,6 +37,8 @@
 import jdk.incubator.http.HttpResponse;
 import jdk.incubator.http.HttpResponse.BodyHandler;
 import jdk.incubator.http.WebSocketHandshakeException;
+import jdk.incubator.http.internal.common.Pair;
+
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -66,7 +68,6 @@
     private static final String HEADER_KEY        = "Sec-WebSocket-Key";
     private static final String HEADER_PROTOCOL   = "Sec-WebSocket-Protocol";
     private static final String HEADER_VERSION    = "Sec-WebSocket-Version";
-    private static final String VALUE_VERSION     = "13"; // WebSocket's lucky number
 
     private static final Set<String> FORBIDDEN_HEADERS;
 
@@ -106,12 +107,18 @@
         if (connectTimeout != null) {
             requestBuilder.timeout(connectTimeout);
         }
+        for (Pair<String, String> p : b.getHeaders()) {
+            if (FORBIDDEN_HEADERS.contains(p.first)) {
+                throw illegal("Illegal header: " + p.first);
+            }
+            requestBuilder.header(p.first, p.second);
+        }
         this.subprotocols = createRequestSubprotocols(b.getSubprotocols());
         if (!this.subprotocols.isEmpty()) {
             String p = this.subprotocols.stream().collect(Collectors.joining(", "));
             requestBuilder.header(HEADER_PROTOCOL, p);
         }
-        requestBuilder.header(HEADER_VERSION, VALUE_VERSION);
+        requestBuilder.header(HEADER_VERSION, "13"); // WebSocket's lucky number
         this.nonce = createNonce();
         requestBuilder.header(HEADER_KEY, this.nonce);
         // Setting request version to HTTP/1.1 forcibly, since it's not possible
@@ -133,11 +140,7 @@
             if (s.trim().isEmpty() || !isValidName(s)) {
                 throw illegal("Bad subprotocol syntax: " + s);
             }
-            if (FORBIDDEN_HEADERS.contains(s)) {
-                throw illegal("Forbidden header: " + s);
-            }
-            boolean unique = sp.add(s);
-            if (!unique) {
+            if (!sp.add(s)) {
                 throw illegal("Duplicating subprotocol: " + s);
             }
         }
@@ -176,7 +179,7 @@
 
     CompletableFuture<Result> send() {
         return client.sendAsync(this.request, BodyHandler.<Void>discard(null))
-                     .thenCompose(this::resultFrom);
+                .thenCompose(this::resultFrom);
     }
 
     /*
@@ -283,7 +286,6 @@
 
     private static String requireSingle(HttpHeaders responseHeaders,
                                         String headerName)
-            throws CheckFailedException
     {
         List<String> values = responseHeaders.allValues(headerName);
         if (values.isEmpty()) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -60,6 +60,7 @@
  */
 abstract class OutgoingMessage {
 
+    // Share per WebSocket?
     private static final SecureRandom maskingKeys = new SecureRandom();
 
     protected ByteBuffer[] frame;
@@ -71,6 +72,8 @@
      * convenient moment (up to the point where sentTo is invoked).
      */
     protected void contextualize(Context context) {
+        // masking and charset decoding should be performed here rather than in
+        // the constructor (as of today)
         if (context.isCloseSent()) {
             throw new IllegalStateException("Close sent");
         }
@@ -101,7 +104,7 @@
         private final boolean isLast;
 
         Text(CharSequence characters, boolean isLast) {
-            CharsetEncoder encoder = UTF_8.newEncoder();
+            CharsetEncoder encoder = UTF_8.newEncoder(); // Share per WebSocket?
             try {
                 payload = encoder.encode(CharBuffer.wrap(characters));
             } catch (CharacterCodingException e) {
@@ -172,11 +175,11 @@
 
         Close(int statusCode, CharSequence reason) {
             ByteBuffer payload = ByteBuffer.allocate(125)
-                                           .putChar((char) statusCode);
+                    .putChar((char) statusCode);
             CoderResult result = UTF_8.newEncoder()
-                                      .encode(CharBuffer.wrap(reason),
-                                              payload,
-                                              true);
+                    .encode(CharBuffer.wrap(reason),
+                            payload,
+                            true);
             if (result.isOverflow()) {
                 throw new IllegalArgumentException("Long reason");
             } else if (result.isError()) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -58,8 +58,8 @@
     private final Frame.Reader reader = new Frame.Reader();
     private final RawChannel.RawEvent event = createHandler();
     private final AtomicLong demand = new AtomicLong();
-    private final CooperativeHandler receiveHandler =
-              new CooperativeHandler(this::tryDeliver);
+    private final CooperativeHandler handler =
+              new CooperativeHandler(this::pushContinuously);
     /*
      * Used to ensure registering the channel event at most once (i.e. to avoid
      * multiple registrations).
@@ -72,8 +72,8 @@
         this.channel = channel;
         this.data = channel.initialByteBuffer();
         this.frameConsumer = new FrameConsumer(this.messageConsumer);
-        // To ensure the initial `data` will be read correctly (happens-before)
-        // after readable.get()
+        // To ensure the initial non-final `data` will be read correctly
+        // (happens-before) by reader after executing readable.get()
         readable.set(true);
     }
 
@@ -88,7 +88,7 @@
             @Override
             public void handle() {
                 readable.set(true);
-                receiveHandler.startOrContinue();
+                handler.handle();
             }
         };
     }
@@ -98,7 +98,7 @@
             throw new IllegalArgumentException("Negative: " + n);
         }
         demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
-        receiveHandler.startOrContinue();
+        handler.handle();
     }
 
     void acknowledge() {
@@ -113,41 +113,21 @@
      * regardless of the current demand.
      */
     void close() {
-        receiveHandler.stop();
+        handler.stop();
     }
 
-    private void tryDeliver() {
-        if (readable.get() && demand.get() > 0) {
-            deliverAtMostOne();
+    private void pushContinuously() {
+        while (readable.get() && demand.get() > 0 && !handler.isStopped()) {
+            pushOnce();
         }
     }
 
-    private void deliverAtMostOne() {
-        if (data == null) {
-            try {
-                data = channel.read();
-            } catch (IOException e) {
-                readable.set(false);
-                messageConsumer.onError(e);
-                return;
-            }
-            if (data == null || !data.hasRemaining()) {
-                readable.set(false);
-                if (!data.hasRemaining()) {
-                    try {
-                        channel.registerEvent(event);
-                    } catch (IOException e) {
-                        messageConsumer.onError(e);
-                        return;
-                    }
-                } else if (data == null) {
-                    messageConsumer.onComplete();
-                }
-                return;
-            }
+    private void pushOnce() {
+        if (data == null && !readData()) {
+            return;
         }
         try {
-            reader.readFrame(data, frameConsumer);
+            reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer
         } catch (FailWebSocketException e) {
             messageConsumer.onError(e);
             return;
@@ -156,4 +136,28 @@
             data = null;
         }
     }
+
+    private boolean readData() {
+        try {
+            data = channel.read();
+        } catch (IOException e) {
+            messageConsumer.onError(e);
+            return false;
+        }
+        if (data == null) { // EOF
+            messageConsumer.onComplete();
+            return false;
+        } else if (!data.hasRemaining()) { // No data in the socket at the moment
+            data = null;
+            readable.set(false);
+            try {
+                channel.registerEvent(event);
+            } catch (IOException e) {
+                messageConsumer.onError(e);
+            }
+            return false;
+        }
+        assert data.hasRemaining();
+        return true;
+    }
 }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/StatusCodes.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,120 +25,72 @@
 
 package jdk.incubator.http.internal.websocket;
 
-import static jdk.incubator.http.WebSocket.CLOSED_ABNORMALLY;
-
 /*
- * Utilities and common constants for WebSocket status codes. For more details
- * on status codes and their meaning see:
+ * Utilities for WebSocket status codes.
  *
  *     1. https://tools.ietf.org/html/rfc6455#section-7.4
  *     2. http://www.iana.org/assignments/websocket/websocket.xhtml#close-code-number
  */
 final class StatusCodes {
 
-    static final int PROTOCOL_ERROR        = 1002;
-    static final int CANNOT_ACCEPT         = 1003;
-    static final int NO_STATUS_CODE        = 1005;
-    static final int NOT_CONSISTENT        = 1007;
-    static final int TOO_BIG               = 1009;
-    static final int NO_EXTENSION          = 1010;
-    static final int SERVICE_RESTART       = 1012;
-    static final int TRY_AGAIN_LATER       = 1013;
-    static final int TLS_HANDSHAKE_FAILURE = 1015;
+    static final int PROTOCOL_ERROR    = 1002;
+    static final int NO_STATUS_CODE    = 1005;
+    static final int CLOSED_ABNORMALLY = 1006;
+    static final int NOT_CONSISTENT    = 1007;
 
     private StatusCodes() { }
 
-    /*
-     * Returns the given code if it doesn't violate any rules for outgoing
-     * codes, otherwise throws a CFE with a detailed description.
-     */
-    static int checkOutgoingCode(int code) {
-        checkCommon(code);
-        if (code > 4999) {
-            throw new CheckFailedException("Unspecified: " + code);
-        }
-        if (isNotUserSettable(code)) {
-            throw new CheckFailedException("Cannot set: " + code);
-        }
-        return code;
-    }
-
-    /*
-     * Returns the given code if it doesn't violate any rules for incoming
-     * codes, otherwise throws a CFE with a detailed description.
-     */
-    static int checkIncomingCode(int code) {
-        checkCommon(code);
-        if (code == NO_EXTENSION) {
-            throw new CheckFailedException("Bad server code: " + code);
+    static boolean isLegalToSendFromClient(int code) {
+        if (!isLegal(code)) {
+            return false;
         }
-        return code;
-    }
-
-    private static int checkCommon(int code) {
-        if (isOutOfRange(code)) {
-            throw new CheckFailedException("Out of range: " + code);
-        }
-        if (isForbidden(code)) {
-            throw new CheckFailedException("Forbidden: " + code);
+        // Codes from unreserved range
+        if (code > 4999) {
+            return false;
         }
-        if (isUnassigned(code)) {
-            throw new CheckFailedException("Unassigned: " + code);
-        }
-        return code;
-    }
-
-    /*
-     * Returns true if the given code cannot be set by a user of the WebSocket
-     * API. e.g. this code means something which only a WebSocket implementation
-     * is responsible for or it doesn't make sense to be send by a WebSocket
-     * client.
-     */
-    private static boolean isNotUserSettable(int code) {
+        // Codes below are not allowed to be sent using a WebSocket client API
         switch (code) {
             case PROTOCOL_ERROR:
-            case CANNOT_ACCEPT:
             case NOT_CONSISTENT:
-            case TOO_BIG:
-            case NO_EXTENSION:
-            case TRY_AGAIN_LATER:
-            case SERVICE_RESTART:
+            case 1003:
+            case 1009:
+            case 1010:
+            case 1012:  // code sent by servers
+            case 1013:  // code sent by servers
+            case 1014:  // code sent by servers
+                return false;
+            default:
                 return true;
-            default:
-                return false;
         }
     }
 
-    /*
-     * Returns true if the given code cannot appear on the wire. It's always an
-     * error to send a frame with such a code or to receive one.
-     */
-    private static boolean isForbidden(int code) {
+    static boolean isLegalToReceiveFromServer(int code) {
+        if (!isLegal(code)) {
+            return false;
+        }
+        return code != 1010;  // code sent by clients
+    }
+
+    private static boolean isLegal(int code) {
+        // 2-byte unsigned integer excluding first 1000 numbers from the range
+        // [0, 999] which are never used
+        if (code < 1000 || code > 65535) {
+            return false;
+        }
+        // Codes from the range below has no known meaning under the WebSocket
+        // specification (i.e. unassigned/undefined)
+        if ((code >= 1016 && code <= 2999) || code == 1004) {
+            return false;
+        }
+        // Codes below cannot appear on the wire. It's always an error either
+        // to send a frame with such a code or to receive one.
         switch (code) {
             case NO_STATUS_CODE:
             case CLOSED_ABNORMALLY:
-            case TLS_HANDSHAKE_FAILURE:
+            case 1015:
+                return false;
+            default:
                 return true;
-            default:
-                return false;
         }
     }
-
-    /*
-     * Returns true if the given code has no known meaning under the WebSocket
-     * specification (i.e. unassigned/undefined).
-     */
-    private static boolean isUnassigned(int code) {
-        return (code >= 1016 && code <= 2999) || code == 1004 || code == 1014;
-    }
-
-    /*
-     * Returns true if the given code is not in domain of status codes:
-     *
-     * 2-byte unsigned integer minus first 1000 numbers from the range [0, 999]
-     * that are never used.
-     */
-    private static boolean isOutOfRange(int code) {
-        return code < 1000 || code > 65535;
-    }
 }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -37,11 +37,12 @@
  *
  * No matter whether the message has been fully sent or an error has occurred,
  * the transmitter reports the outcome to the supplied handler and becomes ready
- * to accept a new message. Until then, it is considered "busy" and an
- * IllegalStateException will be thrown on each attempt to invoke send.
+ * to accept a new message. Until then, the transmitter is considered "busy" and
+ * an IllegalStateException will be thrown on each attempt to invoke send.
  */
 final class Transmitter {
 
+    /* This flag is used solely for assertions */
     private final AtomicBoolean busy = new AtomicBoolean();
     private OutgoingMessage message;
     private Consumer<Exception> completionHandler;
@@ -53,9 +54,10 @@
         this.event = createHandler();
     }
 
-    /*
-     * The supplied handler may be invoked in the calling thread, so watch out
-     * for stack overflow.
+    /**
+     * The supplied handler may be invoked in the calling thread.
+     * A {@code StackOverflowError} may thus occur if there's a possibility
+     * that this method is called again by the supplied handler.
      */
     void send(OutgoingMessage message, Consumer<Exception> completionHandler) {
         requireNonNull(message);
@@ -86,8 +88,9 @@
     private void send0(OutgoingMessage message, Consumer<Exception> handler) {
         boolean b = busy.get();
         assert b; // Please don't inline this, as busy.get() has memory
-                  // visibility effects and we don't want the correctness
-                  // of the algorithm to depend on assertions flag
+                  // visibility effects and we don't want the program behaviour
+                  // to depend on whether the assertions are turned on
+                  // or turned off
         try {
             boolean sent = message.sendTo(channel);
             if (sent) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -51,9 +51,9 @@
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static jdk.incubator.http.internal.common.Pair.pair;
+import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.TLS_HANDSHAKE_FAILURE;
-import static jdk.incubator.http.internal.websocket.StatusCodes.checkOutgoingCode;
+import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
 
 /*
  * A WebSocket client.
@@ -74,8 +74,8 @@
     private final AtomicBoolean outstandingSend = new AtomicBoolean();
     private final CooperativeHandler sendHandler =
               new CooperativeHandler(this::sendFirst);
-    private final Queue<Pair<OutgoingMessage, Consumer<Exception>>> queue =
-              new ConcurrentLinkedQueue<>();
+    private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
+            queue = new ConcurrentLinkedQueue<>();
     private final Context context = new OutgoingMessage.Context();
     private final Transmitter transmitter;
     private final Receiver receiver;
@@ -110,6 +110,9 @@
                                                  r.subprotocol,
                                                  r.channel,
                                                  b.getListener());
+            // The order of calls might cause a subtle effects, like CF will be
+            // returned from the buildAsync _after_ onOpen has been signalled.
+            // This means if onOpen is lengthy, it might cause some problems.
             ws.signalOpen();
             return ws;
         };
@@ -125,7 +128,8 @@
     WebSocketImpl(URI uri,
                   String subprotocol,
                   RawChannel channel,
-                  Listener listener) {
+                  Listener listener)
+    {
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
         this.channel = requireNonNull(channel);
@@ -182,15 +186,17 @@
      * Processes a Close event that came from the channel. Invoked at most once.
      */
     private void processClose(int statusCode, String reason) {
-        assert statusCode != TLS_HANDSHAKE_FAILURE; // TLS problems happen long before WebSocket is alive
         receiver.close();
         try {
             channel.shutdownInput();
         } catch (IOException e) {
             Log.logError(e);
         }
-        boolean wasComplete = !closeReceived.complete(null);
-        if (wasComplete) {
+        boolean alreadyCompleted = !closeReceived.complete(null);
+        if (alreadyCompleted) {
+            // This CF is supposed to be completed only once, the first time a
+            // Close message is received. No further messages are pulled from
+            // the socket.
             throw new InternalError();
         }
         int code;
@@ -261,19 +267,17 @@
     @Override
     public CompletableFuture<WebSocket> sendClose(int statusCode,
                                                   String reason) {
-        try {
-            checkOutgoingCode(statusCode);
-        } catch (CheckFailedException e) {
-            IllegalArgumentException ex = new IllegalArgumentException(
-                    "Bad status code: " + statusCode, e);
-            failedFuture(ex);
+        if (!isLegalToSendFromClient(statusCode)) {
+            return failedFuture(
+                    new IllegalArgumentException("statusCode: " + statusCode));
         }
-        return enqueueClose(new Close(statusCode, reason));
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendClose() {
-        return enqueueClose(new Close());
+        Close msg;
+        try {
+            msg = new Close(statusCode, reason);
+        } catch (IllegalArgumentException e) {
+            return failedFuture(e);
+        }
+        return enqueueClose(msg);
     }
 
     /*
@@ -288,8 +292,8 @@
             } catch (IOException e) {
                 Log.logError(e);
             }
-            boolean wasComplete = !closeSent.complete(null);
-            if (wasComplete) {
+            boolean alreadyCompleted = !closeSent.complete(null);
+            if (alreadyCompleted) {
                 // Shouldn't happen as this callback must run at most once
                 throw new InternalError();
             }
@@ -316,40 +320,41 @@
 
     private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
         CompletableFuture<WebSocket> cf = new CompletableFuture<>();
-        Consumer<Exception> h = e -> {
-            if (e == null) {
-                cf.complete(WebSocketImpl.this);
-                sendHandler.startOrContinue();
-            } else {
-
-//                what if this is not a users message? (must be different entry points for different messages)
-
-                // TODO: think about correct behaviour in the face of error in
-                // the queue, for now it seems like the best solution is to
-                // deliver the error and stop
-                cf.completeExceptionally(e);
-            }
-        };
-        queue.add(pair(m, h)); // Always returns true
-        sendHandler.startOrContinue();
+        boolean added = queue.add(pair(m, cf));
+        if (!added) {
+            // The queue is supposed to be unbounded
+            throw new InternalError();
+        }
+        sendHandler.handle();
         return cf;
     }
 
-    private void sendFirst() {
-        Pair<OutgoingMessage, Consumer<Exception>> p = queue.poll();
+    /*
+     * This is the main sending method. It may be run in different threads,
+     * but never concurrently.
+     */
+    private void sendFirst(Runnable whenSent) {
+        Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
         if (p == null) {
+            whenSent.run();
             return;
         }
         OutgoingMessage message = p.first;
-        Consumer<Exception> h = p.second;
+        CompletableFuture<WebSocket> cf = p.second;
         try {
-            // At this point messages are finally ordered and will be written
-            // one by one in a mutually exclusive fashion; thus it's a pretty
-            // convenient place to contextualize them
             message.contextualize(context);
+            Consumer<Exception> h = e -> {
+                if (e == null) {
+                    cf.complete(WebSocketImpl.this);
+                } else {
+                    cf.completeExceptionally(e);
+                }
+                sendHandler.handle();
+                whenSent.run();
+            };
             transmitter.send(message, h);
         } catch (Exception t) {
-            h.accept(t);
+            cf.completeExceptionally(t);
         }
     }
 
@@ -381,7 +386,7 @@
     @Override
     public String toString() {
         return super.toString()
-                + "[" + (closed.get() ? "OPEN" : "CLOSED") + "]: " + uri
+                + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
                 + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
     }
 
@@ -476,7 +481,9 @@
                     int code = ((FailWebSocketException) error).getStatusCode();
                     enqueueClose(new Close(code, ""))
                             .whenComplete((r, e) -> {
-                                ex.addSuppressed(e);
+                                if (e != null) {
+                                    ex.addSuppressed(e);
+                                }
                                 try {
                                     channel.close();
                                 } catch (IOException e1) {
--- a/jdk/test/java/net/httpclient/whitebox/Driver.java	Wed May 10 09:02:43 2017 +0200
+++ b/jdk/test/java/net/httpclient/whitebox/Driver.java	Wed May 10 12:36:14 2017 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -26,5 +26,6 @@
  * @bug 8151299 8164704
  * @modules jdk.incubator.httpclient
  * @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
+ * @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
  * @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
  */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/RawChannelTest.java	Wed May 10 12:36:14 2017 +0100
@@ -0,0 +1,287 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import jdk.incubator.http.internal.websocket.RawChannel;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static org.testng.Assert.assertEquals;
+
+/*
+ * This test exercises mechanics of _independent_ reads and writes on the
+ * RawChannel. It verifies that the underlying implementation can manage more
+ * than a single type of notifications at the same time.
+ */
+public class RawChannelTest {
+
+    private final AtomicLong clientWritten = new AtomicLong();
+    private final AtomicLong serverWritten = new AtomicLong();
+    private final AtomicLong clientRead = new AtomicLong();
+    private final AtomicLong serverRead = new AtomicLong();
+
+    /*
+     * Since at this level we don't have any control over the low level socket
+     * parameters, this latch ensures a write to the channel will stall at least
+     * once (socket's send buffer filled up).
+     */
+    private final CountDownLatch writeStall = new CountDownLatch(1);
+
+    /*
+     * This one works similarly by providing means to ensure a read from the
+     * channel will stall at least once (no more data available on the socket).
+     */
+    private final CountDownLatch readStall = new CountDownLatch(1);
+
+    private final AtomicInteger writeHandles = new AtomicInteger();
+    private final AtomicInteger readHandles = new AtomicInteger();
+
+    private final CountDownLatch exit = new CountDownLatch(1);
+
+    @Test
+    public void test() throws Exception {
+        try (ServerSocket server = new ServerSocket(0)) {
+            int port = server.getLocalPort();
+            new TestServer(server).start();
+            final RawChannel chan = channelOf(port);
+
+            // It's very important not to forget the initial bytes, possibly
+            // left from the HTTP thingy
+            int initialBytes = chan.initialByteBuffer().remaining();
+            print("RawChannel has %s initial bytes", initialBytes);
+            clientRead.addAndGet(initialBytes);
+
+            chan.registerEvent(new RawChannel.RawEvent() {
+
+                private final ByteBuffer reusableBuffer = ByteBuffer.allocate(32768);
+
+                @Override
+                public int interestOps() {
+                    return SelectionKey.OP_WRITE;
+                }
+
+                @Override
+                public void handle() {
+                    int i = writeHandles.incrementAndGet();
+                    print("OP_WRITE #%s", i);
+                    if (i > 3) { // Fill up the send buffer not more than 3 times
+                        try {
+                            chan.shutdownOutput();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        return;
+                    }
+                    long total = 0;
+                    try {
+                        long n;
+                        do {
+                            ByteBuffer[] array = {reusableBuffer.slice()};
+                            n = chan.write(array, 0, 1);
+                            total += n;
+                        } while (n > 0);
+                        print("OP_WRITE clogged SNDBUF with %s bytes", total);
+                        clientWritten.addAndGet(total);
+                        chan.registerEvent(this);
+                        writeStall.countDown(); // signal send buffer is full
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                }
+            });
+
+            chan.registerEvent(new RawChannel.RawEvent() {
+
+                @Override
+                public int interestOps() {
+                    return SelectionKey.OP_READ;
+                }
+
+                @Override
+                public void handle() {
+                    int i = readHandles.incrementAndGet();
+                    print("OP_READ #%s", i);
+                    ByteBuffer read = null;
+                    long total = 0;
+                    while (true) {
+                        try {
+                            read = chan.read();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        if (read == null) {
+                            print("OP_READ EOF");
+                            break;
+                        } else if (!read.hasRemaining()) {
+                            print("OP_READ stall");
+                            try {
+                                chan.registerEvent(this);
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            readStall.countDown();
+                            break;
+                        }
+                        int r = read.remaining();
+                        total += r;
+                        clientRead.addAndGet(r);
+                    }
+                    print("OP_READ read %s bytes", total);
+                }
+            });
+            exit.await(); // All done, we need to compare results:
+            assertEquals(clientRead.get(), serverWritten.get());
+            assertEquals(serverRead.get(), clientWritten.get());
+        }
+    }
+
+    private static RawChannel channelOf(int port) throws Exception {
+        URI uri = URI.create("http://127.0.0.1:" + port + "/");
+        print("raw channel to %s", uri.toString());
+        HttpRequest req = HttpRequest.newBuilder(uri).build();
+        HttpResponse<?> r = HttpClient.newHttpClient().send(req, discard(null));
+        r.body();
+        return ((HttpResponseImpl) r).rawChannel();
+    }
+
+    private class TestServer extends Thread { // Powered by Slowpokes
+
+        private final ServerSocket server;
+
+        TestServer(ServerSocket server) throws IOException {
+            this.server = server;
+        }
+
+        @Override
+        public void run() {
+            try (Socket s = server.accept()) {
+                InputStream is = s.getInputStream();
+                OutputStream os = s.getOutputStream();
+
+                processHttp(is, os);
+
+                Thread reader = new Thread(() -> {
+                    try {
+                        long n = readSlowly(is);
+                        print("Server read %s bytes", n);
+                        serverRead.addAndGet(n);
+                        s.shutdownInput();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+
+                Thread writer = new Thread(() -> {
+                    try {
+                        long n = writeSlowly(os);
+                        print("Server written %s bytes", n);
+                        serverWritten.addAndGet(n);
+                        s.shutdownOutput();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+
+                reader.start();
+                writer.start();
+
+                reader.join();
+                writer.join();
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                exit.countDown();
+            }
+        }
+
+        private void processHttp(InputStream is, OutputStream os)
+                throws IOException
+        {
+            os.write("HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n".getBytes());
+            byte[] buf = new byte[1024];
+            String s = "";
+            while (true) {
+                int n = is.read(buf);
+                if (n <= 0) {
+                    throw new RuntimeException("Unexpected end of request");
+                }
+                s = s + new String(buf, 0, n);
+                if (s.contains("\r\n\r\n")) {
+                    break;
+                }
+            }
+        }
+
+        private long writeSlowly(OutputStream os) throws Exception {
+            byte[] first = byteArrayOfSize(1024);
+            long total = first.length;
+            os.write(first);
+            // Let's wait for the signal from the raw channel that its read has
+            // stalled, and then continue sending a bit more stuff
+            readStall.await();
+            for (int i = 0; i < 32; i++) {
+                byte[] b = byteArrayOfSize(1024);
+                os.write(b);
+                total += b.length;
+                TimeUnit.MILLISECONDS.sleep(1);
+            }
+            return total;
+        }
+
+        private long readSlowly(InputStream is) throws Exception {
+            // Wait for the raw channel to fill up the its send buffer
+            writeStall.await();
+            long overall = 0;
+            byte[] array = new byte[1024];
+            for (int n = 0; n != -1; n = is.read(array)) {
+                TimeUnit.MILLISECONDS.sleep(1);
+                overall += n;
+            }
+            return overall;
+        }
+    }
+
+    private static void print(String format, Object... args) {
+        System.out.println(Thread.currentThread() + ": " + String.format(format, args));
+    }
+
+    private static byte[] byteArrayOfSize(int bound) {
+        return new byte[new Random().nextInt(1 + bound)];
+    }
+}