http-client-branch: (WebSocket) impl change http-client-branch
authorprappo
Wed, 07 Mar 2018 17:16:28 +0000
branchhttp-client-branch
changeset 56263 4933a477d628
parent 56262 d818a6a8295a
child 56264 c012b93297b0
http-client-branch: (WebSocket) impl change
src/java.net.http/share/classes/java/net/http/WebSocket.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/Frame.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/FrameConsumer.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageDecoder.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/OutgoingMessage.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/DummyWebSocketServer.java
test/jdk/java/net/httpclient/websocket/Frame.java
test/jdk/java/net/httpclient/websocket/MessageQueueTestDriver.java
test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java
test/jdk/java/net/httpclient/websocket/WebSocketTest.java
test/jdk/java/net/httpclient/websocket/WebSocketTextTest.java
test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java
test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/WebSocketImplTest.java
--- a/src/java.net.http/share/classes/java/net/http/WebSocket.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/java/net/http/WebSocket.java	Wed Mar 07 17:16:28 2018 +0000
@@ -508,8 +508,6 @@
      * <p> A {@code CompletableFuture} returned from this method can
      * complete exceptionally with:
      * <ul>
-     * <li> {@link IllegalArgumentException} -
-     *          if {@code message} is a malformed UTF-16 sequence
      * <li> {@link IllegalStateException} -
      *          if the previous Text or Binary message has not been sent yet
      *          or if a previous Binary message has been sent with
@@ -518,9 +516,9 @@
      *          if an I/O error occurs
      * </ul>
      *
-     * @implNote If a partial UTF-16 sequence is passed to this method, a
-     * {@code CompletableFuture} returned will complete exceptionally with
-     * {@code IOException}.
+     * @implNote If a partial or malformed UTF-16 sequence is passed to this
+     * method, a {@code CompletableFuture} returned will complete exceptionally
+     * with {@code IOException}.
      *
      * @param message
      *         the message
@@ -632,7 +630,7 @@
      * complete exceptionally with:
      * <ul>
      * <li> {@link IllegalArgumentException} -
-     *          if {@code statusCode} or {@code reason} are illegal
+     *          if {@code statusCode} is illegal
      * <li> {@link IOException} -
      *          if an I/O error occurs
      * </ul>
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Wed Mar 07 17:16:28 2018 +0000
@@ -373,7 +373,7 @@
                 NetProperties.getInteger(name, defaultValue));
     }
 
-    static String getNetProperty(String name) {
+    public static String getNetProperty(String name) {
         return AccessController.doPrivileged((PrivilegedAction<String>) () ->
                 NetProperties.get(name));
     }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Frame.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Frame.java	Wed Mar 07 17:16:28 2018 +0000
@@ -40,6 +40,7 @@
     private Frame() { }
 
     static final int MAX_HEADER_SIZE_BYTES = 2 + 8 + 4;
+    static final int MAX_CONTROL_FRAME_PAYLOAD_LENGTH = 125;
 
     enum Opcode {
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/FrameConsumer.java	Wed Mar 07 15:39:25 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,288 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.internal.net.http.websocket;
-
-import java.net.http.WebSocket.MessagePart;
-import jdk.internal.net.http.websocket.Frame.Opcode;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-import static jdk.internal.net.http.common.Utils.dump;
-import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
-import static jdk.internal.net.http.websocket.StatusCodes.isLegalToReceiveFromServer;
-
-/*
- * Consumes frame parts and notifies a message consumer, when there is
- * sufficient data to produce a message, or part thereof.
- *
- * Data consumed but not yet translated is accumulated until it's sufficient to
- * form a message.
- */
-/* Non-final for testing purposes only */
-class FrameConsumer implements Frame.Consumer {
-
-    private final static boolean DEBUG = false;
-    private final MessageStreamConsumer output;
-    private final UTF8AccumulatingDecoder decoder = new UTF8AccumulatingDecoder();
-    private boolean fin;
-    private Opcode opcode, originatingOpcode;
-    private MessagePart part = MessagePart.WHOLE;
-    private long payloadLen;
-    private long unconsumedPayloadLen;
-    private ByteBuffer binaryData;
-
-    FrameConsumer(MessageStreamConsumer output) {
-        this.output = requireNonNull(output);
-    }
-
-    /* Exposed for testing purposes only */
-    MessageStreamConsumer getOutput() {
-        return output;
-    }
-
-    @Override
-    public void fin(boolean value) {
-        if (DEBUG) {
-            System.out.printf("Reading fin: %s%n", value);
-        }
-        fin = value;
-    }
-
-    @Override
-    public void rsv1(boolean value) {
-        if (DEBUG) {
-            System.out.printf("Reading rsv1: %s%n", value);
-        }
-        if (value) {
-            throw new FailWebSocketException("Unexpected rsv1 bit");
-        }
-    }
-
-    @Override
-    public void rsv2(boolean value) {
-        if (DEBUG) {
-            System.out.printf("Reading rsv2: %s%n", value);
-        }
-        if (value) {
-            throw new FailWebSocketException("Unexpected rsv2 bit");
-        }
-    }
-
-    @Override
-    public void rsv3(boolean value) {
-        if (DEBUG) {
-            System.out.printf("Reading rsv3: %s%n", value);
-        }
-        if (value) {
-            throw new FailWebSocketException("Unexpected rsv3 bit");
-        }
-    }
-
-    @Override
-    public void opcode(Opcode v) {
-        if (DEBUG) {
-            System.out.printf("Reading opcode: %s%n", v);
-        }
-        if (v == Opcode.PING || v == Opcode.PONG || v == Opcode.CLOSE) {
-            if (!fin) {
-                throw new FailWebSocketException("Fragmented control frame  " + v);
-            }
-            opcode = v;
-        } else if (v == Opcode.TEXT || v == Opcode.BINARY) {
-            if (originatingOpcode != null) {
-                throw new FailWebSocketException(
-                        format("Unexpected frame %s (fin=%s)", v, fin));
-            }
-            opcode = v;
-            if (!fin) {
-                originatingOpcode = v;
-            }
-        } else if (v == Opcode.CONTINUATION) {
-            if (originatingOpcode == null) {
-                throw new FailWebSocketException(
-                        format("Unexpected frame %s (fin=%s)", v, fin));
-            }
-            opcode = v;
-        } else {
-            throw new FailWebSocketException("Unexpected opcode " + v);
-        }
-    }
-
-    @Override
-    public void mask(boolean value) {
-        if (DEBUG) {
-            System.out.printf("Reading mask: %s%n", value);
-        }
-        if (value) {
-            throw new FailWebSocketException("Masked frame received");
-        }
-    }
-
-    @Override
-    public void payloadLen(long value) {
-        if (DEBUG) {
-            System.out.printf("Reading payloadLen: %s%n", value);
-        }
-        if (opcode.isControl()) {
-            if (value > 125) {
-                throw new FailWebSocketException(
-                        format("%s's payload length %s", opcode, value));
-            }
-            assert Opcode.CLOSE.isControl();
-            if (opcode == Opcode.CLOSE && value == 1) {
-                throw new FailWebSocketException("Incomplete status code");
-            }
-        }
-        payloadLen = value;
-        unconsumedPayloadLen = value;
-    }
-
-    @Override
-    public void maskingKey(int value) {
-        // `FrameConsumer.mask(boolean)` is where a masked frame is detected and
-        // reported on; `FrameConsumer.mask(boolean)` MUST be invoked before
-        // this method;
-        // So this method (`maskingKey`) is not supposed to be invoked while
-        // reading a frame that has came from the server. If this method is
-        // invoked, then it's an error in implementation, thus InternalError
-        throw new InternalError();
-    }
-
-    @Override
-    public void payloadData(ByteBuffer data) {
-        if (DEBUG) {
-            System.out.printf("Reading payloadData: %s%n", data);
-        }
-        unconsumedPayloadLen -= data.remaining();
-        boolean isLast = unconsumedPayloadLen == 0;
-        if (opcode.isControl()) {
-            if (binaryData != null) { // An intermediate or the last chunk
-                binaryData.put(data);
-            } else if (!isLast) { // The first chunk
-                int remaining = data.remaining();
-                // It shouldn't be 125, otherwise the next chunk will be of size
-                // 0, which is not what Reader promises to deliver (eager
-                // reading)
-                assert remaining < 125 : dump(remaining);
-                binaryData = ByteBuffer.allocate(125).put(data);
-            } else { // The only chunk
-                binaryData = ByteBuffer.allocate(data.remaining()).put(data);
-            }
-        } else {
-            part = determinePart(isLast);
-            boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
-            if (!text) {
-                output.onBinary(data.slice(), part);
-                data.position(data.limit()); // Consume
-            } else {
-                boolean binaryNonEmpty = data.hasRemaining();
-                CharBuffer textData;
-                try {
-                    textData = decoder.decode(data, part == MessagePart.WHOLE || part == MessagePart.LAST);
-                } catch (CharacterCodingException e) {
-                    throw new FailWebSocketException(
-                            "Invalid UTF-8 in frame " + opcode, StatusCodes.NOT_CONSISTENT)
-                            .initCause(e);
-                }
-                if (!(binaryNonEmpty && !textData.hasRemaining())) {
-                    // If there's a binary data, that result in no text, then we
-                    // don't deliver anything
-                    output.onText(textData, part);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void endFrame() {
-        if (DEBUG) {
-            System.out.println("End frame");
-        }
-        if (opcode.isControl()) {
-            binaryData.flip();
-        }
-        switch (opcode) {
-            case CLOSE:
-                char statusCode = NO_STATUS_CODE;
-                String reason = "";
-                if (payloadLen != 0) {
-                    int len = binaryData.remaining();
-                    assert 2 <= len && len <= 125 : dump(len, payloadLen);
-                    statusCode = binaryData.getChar();
-                    if (!isLegalToReceiveFromServer(statusCode)) {
-                        throw new FailWebSocketException(
-                                "Illegal status code: " + statusCode);
-                    }
-                    try {
-                        reason = UTF_8.newDecoder().decode(binaryData).toString();
-                    } catch (CharacterCodingException e) {
-                        throw new FailWebSocketException("Illegal close reason")
-                                .initCause(e);
-                    }
-                }
-                output.onClose(statusCode, reason);
-                break;
-            case PING:
-                output.onPing(binaryData);
-                binaryData = null;
-                break;
-            case PONG:
-                output.onPong(binaryData);
-                binaryData = null;
-                break;
-            default:
-                assert opcode == Opcode.TEXT || opcode == Opcode.BINARY
-                        || opcode == Opcode.CONTINUATION : dump(opcode);
-                if (fin) {
-                    // It is always the last chunk:
-                    // either TEXT(FIN=TRUE)/BINARY(FIN=TRUE) or CONT(FIN=TRUE)
-                    originatingOpcode = null;
-                }
-                break;
-        }
-        payloadLen = 0;
-        opcode = null;
-    }
-
-    private MessagePart determinePart(boolean isLast) {
-        boolean lastChunk = fin && isLast;
-        switch (part) {
-            case LAST:
-            case WHOLE:
-                return lastChunk ? MessagePart.WHOLE : MessagePart.FIRST;
-            case FIRST:
-            case PART:
-                return lastChunk ? MessagePart.LAST : MessagePart.PART;
-            default:
-                throw new InternalError(String.valueOf(part));
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageDecoder.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,294 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import jdk.internal.net.http.websocket.Frame.Opcode;
+
+import java.net.http.WebSocket.MessagePart;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static jdk.internal.net.http.common.Utils.dump;
+import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
+import static jdk.internal.net.http.websocket.StatusCodes.isLegalToReceiveFromServer;
+
+/*
+ * Consumes frame parts and notifies a message consumer, when there is
+ * sufficient data to produce a message, or part thereof.
+ *
+ * Data consumed but not yet translated is accumulated until it's sufficient to
+ * form a message.
+ */
+/* Non-final for testing purposes only */
+class MessageDecoder implements Frame.Consumer {
+
+    private final static boolean DEBUG = false;
+    private final MessageStreamConsumer output;
+    private final UTF8AccumulatingDecoder decoder = new UTF8AccumulatingDecoder();
+    private boolean fin;
+    private Opcode opcode, originatingOpcode;
+    private MessagePart part = MessagePart.WHOLE;
+    private long payloadLen;
+    private long unconsumedPayloadLen;
+    private ByteBuffer binaryData;
+
+    MessageDecoder(MessageStreamConsumer output) {
+        this.output = requireNonNull(output);
+    }
+
+    /* Exposed for testing purposes only */
+    MessageStreamConsumer getOutput() {
+        return output;
+    }
+
+    @Override
+    public void fin(boolean value) {
+        if (DEBUG) {
+            System.out.printf("[Input] fin %s%n", value);
+        }
+        fin = value;
+    }
+
+    @Override
+    public void rsv1(boolean value) {
+        if (DEBUG) {
+            System.out.printf("[Input] rsv1 %s%n", value);
+        }
+        if (value) {
+            throw new FailWebSocketException("Unexpected rsv1 bit");
+        }
+    }
+
+    @Override
+    public void rsv2(boolean value) {
+        if (DEBUG) {
+            System.out.printf("[Input] rsv2 %s%n", value);
+        }
+        if (value) {
+            throw new FailWebSocketException("Unexpected rsv2 bit");
+        }
+    }
+
+    @Override
+    public void rsv3(boolean value) {
+        if (DEBUG) {
+            System.out.printf("[Input] rsv3 %s%n", value);
+        }
+        if (value) {
+            throw new FailWebSocketException("Unexpected rsv3 bit");
+        }
+    }
+
+    @Override
+    public void opcode(Opcode v) {
+        if (DEBUG) {
+            System.out.printf("[Input] opcode %s%n", v);
+        }
+        if (v == Opcode.PING || v == Opcode.PONG || v == Opcode.CLOSE) {
+            if (!fin) {
+                throw new FailWebSocketException("Fragmented control frame  " + v);
+            }
+            opcode = v;
+        } else if (v == Opcode.TEXT || v == Opcode.BINARY) {
+            if (originatingOpcode != null) {
+                throw new FailWebSocketException(
+                        format("Unexpected frame %s (fin=%s)", v, fin));
+            }
+            opcode = v;
+            if (!fin) {
+                originatingOpcode = v;
+            }
+        } else if (v == Opcode.CONTINUATION) {
+            if (originatingOpcode == null) {
+                throw new FailWebSocketException(
+                        format("Unexpected frame %s (fin=%s)", v, fin));
+            }
+            opcode = v;
+        } else {
+            throw new FailWebSocketException("Unexpected opcode " + v);
+        }
+    }
+
+    @Override
+    public void mask(boolean value) {
+        if (DEBUG) {
+            System.out.printf("[Input] mask %s%n", value);
+        }
+        if (value) {
+            throw new FailWebSocketException("Masked frame received");
+        }
+    }
+
+    @Override
+    public void payloadLen(long value) {
+        if (DEBUG) {
+            System.out.printf("[Input] payloadLen %s%n", value);
+        }
+        if (opcode.isControl()) {
+            if (value > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) {
+                throw new FailWebSocketException(
+                        format("%s's payload length %s", opcode, value));
+            }
+            assert Opcode.CLOSE.isControl();
+            if (opcode == Opcode.CLOSE && value == 1) {
+                throw new FailWebSocketException("Incomplete status code");
+            }
+        }
+        payloadLen = value;
+        unconsumedPayloadLen = value;
+    }
+
+    @Override
+    public void maskingKey(int value) {
+        // `MessageDecoder.mask(boolean)` is where a masked frame is detected and
+        // reported on; `MessageDecoder.mask(boolean)` MUST be invoked before
+        // this method;
+        // So this method (`maskingKey`) is not supposed to be invoked while
+        // reading a frame that has came from the server. If this method is
+        // invoked, then it's an error in implementation, thus InternalError
+        throw new InternalError();
+    }
+
+    @Override
+    public void payloadData(ByteBuffer data) {
+        if (DEBUG) {
+            System.out.printf("[Input] payload %s%n", data);
+        }
+        unconsumedPayloadLen -= data.remaining();
+        boolean isLast = unconsumedPayloadLen == 0;
+        if (opcode.isControl()) {
+            if (binaryData != null) { // An intermediate or the last chunk
+                binaryData.put(data);
+            } else if (!isLast) { // The first chunk
+                int remaining = data.remaining();
+                // It shouldn't be 125, otherwise the next chunk will be of size
+                // 0, which is not what Reader promises to deliver (eager
+                // reading)
+                assert remaining < Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH
+                        : dump(remaining);
+                binaryData = ByteBuffer.allocate(
+                        Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH).put(data);
+            } else { // The only chunk
+                binaryData = ByteBuffer.allocate(data.remaining()).put(data);
+            }
+        } else {
+            part = determinePart(isLast);
+            boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
+            if (!text) {
+                output.onBinary(data.slice(), part);
+                data.position(data.limit()); // Consume
+            } else {
+                boolean binaryNonEmpty = data.hasRemaining();
+                CharBuffer textData;
+                try {
+                    boolean eof = part == MessagePart.WHOLE
+                            || part == MessagePart.LAST;
+                    textData = decoder.decode(data, eof);
+                } catch (CharacterCodingException e) {
+                    throw new FailWebSocketException(
+                            "Invalid UTF-8 in frame " + opcode,
+                            StatusCodes.NOT_CONSISTENT).initCause(e);
+                }
+                if (!(binaryNonEmpty && !textData.hasRemaining())) {
+                    // If there's a binary data, that result in no text, then we
+                    // don't deliver anything
+                    output.onText(textData, part);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void endFrame() {
+        if (DEBUG) {
+            System.out.println("[Input] end frame");
+        }
+        if (opcode.isControl()) {
+            binaryData.flip();
+        }
+        switch (opcode) {
+            case CLOSE:
+                char statusCode = NO_STATUS_CODE;
+                String reason = "";
+                if (payloadLen != 0) {
+                    int len = binaryData.remaining();
+                    assert 2 <= len
+                            && len <= Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH
+                            : dump(len, payloadLen);
+                    statusCode = binaryData.getChar();
+                    if (!isLegalToReceiveFromServer(statusCode)) {
+                        throw new FailWebSocketException(
+                                "Illegal status code: " + statusCode);
+                    }
+                    try {
+                        reason = UTF_8.newDecoder().decode(binaryData).toString();
+                    } catch (CharacterCodingException e) {
+                        throw new FailWebSocketException("Illegal close reason")
+                                .initCause(e);
+                    }
+                }
+                output.onClose(statusCode, reason);
+                break;
+            case PING:
+                output.onPing(binaryData);
+                binaryData = null;
+                break;
+            case PONG:
+                output.onPong(binaryData);
+                binaryData = null;
+                break;
+            default:
+                assert opcode == Opcode.TEXT || opcode == Opcode.BINARY
+                        || opcode == Opcode.CONTINUATION : dump(opcode);
+                if (fin) {
+                    // It is always the last chunk:
+                    // either TEXT(FIN=TRUE)/BINARY(FIN=TRUE) or CONT(FIN=TRUE)
+                    originatingOpcode = null;
+                }
+                break;
+        }
+        payloadLen = 0;
+        opcode = null;
+    }
+
+    private MessagePart determinePart(boolean isLast) {
+        boolean lastChunk = fin && isLast;
+        switch (part) {
+            case LAST:
+            case WHOLE:
+                return lastChunk ? MessagePart.WHOLE : MessagePart.FIRST;
+            case FIRST:
+            case PART:
+                return lastChunk ? MessagePart.LAST : MessagePart.PART;
+            default:
+                throw new InternalError(String.valueOf(part));
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,416 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import jdk.internal.net.http.common.Utils;
+import jdk.internal.net.http.websocket.Frame.Opcode;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+
+/*
+ * A stateful producer of binary representations of WebSocket messages being
+ * sent from the client to the server.
+ *
+ * An encoding methods are given original messages and byte buffers to put the
+ * resulting bytes to.
+ *
+ * The method is called
+ * repeatedly with a non-empty target buffer. Once the caller finds the buffer
+ * unmodified after the call returns, the message has been completely encoded.
+ */
+
+/*
+ * The state of encoding.An instance of this class is passed sequentially between messages, so
+ * every message in a sequence can check the context it is in and update it
+ * if necessary.
+ */
+
+public class MessageEncoder {
+
+    // FIXME: write frame method
+
+    private final static boolean DEBUG = false;
+
+    private final SecureRandom maskingKeySource = new SecureRandom();
+    private final Frame.HeaderWriter headerWriter = new Frame.HeaderWriter();
+    private final Frame.Masker payloadMasker = new Frame.Masker();
+    private final CharsetEncoder charsetEncoder
+            = StandardCharsets.UTF_8.newEncoder()
+            .onMalformedInput(CodingErrorAction.REPORT)
+            .onUnmappableCharacter(CodingErrorAction.REPORT);
+    /*
+     * This buffer is used both to encode characters to UTF-8 and to calculate
+     * the length of the resulting frame's payload. The length of the payload
+     * must be known before the frame's header can be written.
+     * For implementation reasons, this buffer must have a capacity of at least
+     * the maximum size of a Close frame payload, which is 125 bytes
+     * (or Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH).
+     */
+    private final ByteBuffer intermediateBuffer = createIntermediateBuffer(
+            Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH);
+    private final ByteBuffer headerBuffer = ByteBuffer.allocate(
+            Frame.MAX_HEADER_SIZE_BYTES);
+
+    private boolean started;
+    private boolean flushing;
+    private boolean moreText = true;
+    private long headerCount;
+    private boolean previousLast = true;
+    private boolean previousText;
+    private boolean closed;
+
+    /*
+     * How many bytes of the current message have been already encoded.
+     *
+     * Even though the user hands their buffers over to us, they still can
+     * manipulate these buffers while we are getting data out of them.
+     * The number of produced bytes guards us from such behaviour in the
+     * case of messages that must be restricted in size (Ping, Pong and Close).
+     * For other messages this measure provides a best-effort attempt to detect
+     * concurrent changes to buffer.
+     *
+     * Making a shallow copy (duplicate/slice) and then checking the size
+     * precondition on it would also solve the problem, but at the cost of this
+     * extra copy.
+     */
+    private int actualLen;
+
+    /*
+     * How many bytes were originally there in the message, before the encoding
+     * started.
+     */
+    private int expectedLen;
+
+    /* Exposed for testing purposes */
+    protected ByteBuffer createIntermediateBuffer(int minSize) {
+        int capacity = Utils.getIntegerNetProperty(
+                "jdk.httpclient.websocket.intermediateBufferSize", 16384);
+        return ByteBuffer.allocate(Math.max(minSize, capacity));
+    }
+
+    public void reset() {
+        // Do not reset the message stream state fields, e.g. previousLast,
+        // previousText. Just an individual message state:
+        started = false;
+        flushing = false;
+        moreText = true;
+        headerCount = 0;
+        actualLen = 0;
+    }
+
+    /*
+     * Encodes text messages by cutting them into fragments of maximum size of
+     * intermediateBuffer.capacity()
+     */
+    public boolean encodeText(CharBuffer src, boolean last, ByteBuffer dst)
+            throws IOException
+    {
+        if (DEBUG) {
+            System.out.printf("[Output] encodeText src.remaining()=%s, %s, %s%n",
+                              src.remaining(), last, dst);
+        }
+        if (closed) {
+            throw new IOException("Output closed");
+        }
+        if (!started) {
+            if (!previousText && !previousLast) {
+                // Previous data message was a partial binary message
+                throw new IllegalStateException("Unexpected text message");
+            }
+            started = true;
+            headerBuffer.position(0).limit(0);
+            intermediateBuffer.position(0).limit(0);
+            charsetEncoder.reset();
+        }
+        while (true) {
+            if (DEBUG) {
+                System.out.printf("[Output] put%n");
+            }
+            if (!putAvailable(headerBuffer, dst)) {
+                return false;
+            }
+            if (DEBUG) {
+                System.out.printf("[Output] mask%n");
+            }
+            if (maskAvailable(intermediateBuffer, dst) < 0) {
+                return false;
+            }
+            if (DEBUG) {
+                System.out.printf("[Output] moreText%n");
+            }
+            if (!moreText) {
+                return true;
+            }
+            intermediateBuffer.clear();
+            CoderResult r = null;
+            if (!flushing) {
+                r = charsetEncoder.encode(src, intermediateBuffer, true);
+                if (r.isUnderflow()) {
+                    flushing = true;
+                }
+            }
+            if (flushing) {
+                r = charsetEncoder.flush(intermediateBuffer);
+                if (r.isUnderflow()) {
+                    moreText = false;
+                }
+            }
+            if (r.isError()) {
+                try {
+                    r.throwException();
+                } catch (CharacterCodingException e) {
+                    throw new IOException("Malformed text message", e);
+                }
+            }
+            if (DEBUG) {
+                System.out.printf("[Output] header #%s%n", headerCount);
+            }
+            if (headerCount == 0) { // set once
+                previousLast = last;
+                previousText = true;
+            }
+            intermediateBuffer.flip();
+            headerBuffer.clear();
+            int mask = maskingKeySource.nextInt();
+            Opcode opcode = previousLast && headerCount == 0
+                    ? Opcode.TEXT : Opcode.CONTINUATION;
+            if (DEBUG) {
+                System.out.printf("[Output] opcode %s%n", opcode);
+            }
+            headerWriter.fin(last && !moreText)
+                    .opcode(opcode)
+                    .payloadLen(intermediateBuffer.remaining())
+                    .mask(mask)
+                    .write(headerBuffer);
+            headerBuffer.flip();
+            headerCount++;
+            payloadMasker.mask(mask);
+        }
+    }
+
+    private boolean putAvailable(ByteBuffer src, ByteBuffer dst) {
+        int available = dst.remaining();
+        if (available >= src.remaining()) {
+            dst.put(src);
+            return true;
+        } else {
+            int lim = src.limit();                   // save the limit
+            src.limit(src.position() + available);
+            dst.put(src);
+            src.limit(lim);                          // restore the limit
+            return false;
+        }
+    }
+
+    public boolean encodeBinary(ByteBuffer src, boolean last, ByteBuffer dst)
+            throws IOException
+    {
+        if (DEBUG) {
+            System.out.printf("[Output] encodeBinary %s, %s, %s%n",
+                              src, last, dst);
+        }
+        if (closed) {
+            throw new IOException("Output closed");
+        }
+        if (!started) {
+            if (previousText && !previousLast) {
+                // Previous data message was a partial text message
+                throw new IllegalStateException("Unexpected binary message");
+            }
+            expectedLen = src.remaining();
+            int mask = maskingKeySource.nextInt();
+            headerBuffer.clear();
+            headerWriter.fin(last)
+                    .opcode(previousLast ? Opcode.BINARY : Opcode.CONTINUATION)
+                    .payloadLen(expectedLen)
+                    .mask(mask)
+                    .write(headerBuffer);
+            headerBuffer.flip();
+            payloadMasker.mask(mask);
+            previousLast = last;
+            previousText = false;
+            started = true;
+        }
+        if (!putAvailable(headerBuffer, dst)) {
+            return false;
+        }
+        int count = maskAvailable(src, dst);
+        actualLen += Math.abs(count);
+        if (count >= 0 && actualLen != expectedLen) {
+            throw new IOException("Concurrent message modification");
+        }
+        return count >= 0;
+    }
+
+    private int maskAvailable(ByteBuffer src, ByteBuffer dst) {
+        int r0 = dst.remaining();
+        payloadMasker.transferMasking(src, dst);
+        int masked = r0 - dst.remaining();
+        return src.hasRemaining() ? -masked : masked;
+    }
+
+    public boolean encodePing(ByteBuffer src, ByteBuffer dst)
+            throws IOException
+    {
+        if (closed) {
+            throw new IOException("Output closed");
+        }
+        if (DEBUG) System.out.printf("[Output] encodePing %s, %s%n", src, dst);
+        if (!started) {
+            expectedLen = src.remaining();
+            if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) {
+                throw new IllegalArgumentException("Long message: " + expectedLen);
+            }
+            int mask = maskingKeySource.nextInt();
+            headerBuffer.clear();
+            headerWriter.fin(true)
+                    .opcode(Opcode.PING)
+                    .payloadLen(expectedLen)
+                    .mask(mask)
+                    .write(headerBuffer);
+            headerBuffer.flip();
+            payloadMasker.mask(mask);
+            started = true;
+        }
+        if (!putAvailable(headerBuffer, dst)) {
+            return false;
+        }
+        int count = maskAvailable(src, dst);
+        actualLen += Math.abs(count);
+        if (count >= 0 && actualLen != expectedLen) {
+            throw new IOException("Concurrent message modification");
+        }
+        return count >= 0;
+    }
+
+    public boolean encodePong(ByteBuffer src, ByteBuffer dst)
+            throws IOException
+    {
+        if (closed) {
+            throw new IOException("Output closed");
+        }
+        if (DEBUG) {
+            System.out.printf("[Output] encodePong %s, %s%n",
+                              src, dst);
+        }
+        if (!started) {
+            expectedLen = src.remaining();
+            if (expectedLen > Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH) {
+                throw new IllegalArgumentException("Long message: " + expectedLen);
+            }
+            int mask = maskingKeySource.nextInt();
+            headerBuffer.clear();
+            headerWriter.fin(true)
+                    .opcode(Opcode.PONG)
+                    .payloadLen(expectedLen)
+                    .mask(mask)
+                    .write(headerBuffer);
+            headerBuffer.flip();
+            payloadMasker.mask(mask);
+            started = true;
+        }
+        if (!putAvailable(headerBuffer, dst)) {
+            return false;
+        }
+        int count = maskAvailable(src, dst);
+        actualLen += Math.abs(count);
+        if (count >= 0 && actualLen != expectedLen) {
+            throw new IOException("Concurrent message modification");
+        }
+        return count >= 0;
+    }
+
+    public boolean encodeClose(int statusCode, CharBuffer reason, ByteBuffer dst)
+            throws IOException
+    {
+        if (DEBUG) {
+            System.out.printf("[Output] encodeClose %s, reason.length=%s, %s%n",
+                              statusCode, reason.length(), dst);
+        }
+        if (closed) {
+            throw new IOException("Output closed");
+        }
+        if (!started) {
+            if (DEBUG) {
+                System.out.printf("[Output] reason size %s%n", reason.remaining());
+            }
+            intermediateBuffer.position(0).limit(Frame.MAX_CONTROL_FRAME_PAYLOAD_LENGTH);
+            intermediateBuffer.putChar((char) statusCode);
+            CoderResult r = charsetEncoder.reset().encode(reason, intermediateBuffer, true);
+            if (r.isUnderflow()) {
+                if (DEBUG) {
+                    System.out.printf("[Output] flushing%n");
+                }
+                r = charsetEncoder.flush(intermediateBuffer);
+            }
+            if (DEBUG) {
+                System.out.printf("[Output] encoding result: %s%n", r);
+            }
+            if (r.isError()) {
+                try {
+                    r.throwException();
+                } catch (CharacterCodingException e) {
+                    throw new IllegalArgumentException("Malformed reason", e);
+                }
+            } else if (r.isOverflow()) {
+                // Here the 125 bytes size is ensured by the check for overflow
+                throw new IllegalArgumentException("Long reason");
+            } else if (!r.isUnderflow()) {
+                throw new InternalError(); // assertion
+            }
+            intermediateBuffer.flip();
+            headerBuffer.clear();
+            int mask = maskingKeySource.nextInt();
+            headerWriter.fin(true)
+                    .opcode(Opcode.CLOSE)
+                    .payloadLen(intermediateBuffer.remaining())
+                    .mask(mask)
+                    .write(headerBuffer);
+            headerBuffer.flip();
+            payloadMasker.mask(mask);
+            started = true;
+            closed = true;
+            if (DEBUG) {
+                System.out.printf("[Output] intermediateBuffer=%s%n",
+                                  intermediateBuffer);
+            }
+        }
+        if (!putAvailable(headerBuffer, dst)) {
+            return false;
+        }
+        return maskAvailable(intermediateBuffer, dst) >= 0;
+    }
+}
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import jdk.internal.net.http.common.Utils;
+import jdk.internal.vm.annotation.Stable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+/*
+ * A FIFO message storage facility.
+ *
+ * The queue supports at most one consumer and an arbitrary number of producers.
+ * Methods `peek`, `remove` and `isEmpty` must not be invoked concurrently.
+ * Methods `addText`, `addBinary`, `addPing`, `addPong` and `addClose` may be
+ * invoked concurrently.
+ *
+ * This queue is of a bounded size. The queue pre-allocates array of the said
+ * size and fills it with `Message` elements. The resulting structure never
+ * changes. This allows to avoid re-allocation and garbage collection of
+ * elements and arrays thereof. For this reason `Message` elements are never
+ * returned from the `peek` method. Instead their components passed to the
+ * provided callback.
+ *
+ * The queue consists of:
+ *
+ *   - a ring array of n + 1 `Message` elements
+ *   - indexes H and T denoting the head and the tail elements of the queue
+ *     respectively
+ *
+ * Each `Message` element contains a boolean flag. This flag is an auxiliary
+ * communication between the producers and the consumer. The flag shows
+ * whether or not the element is ready to be consumed (peeked at, removed). The
+ * flag is required since updating an element involves many fields and thus is
+ * not an atomic action. An addition to the queue happens in two steps:
+ *
+ * # Step 1
+ *
+ * Producers race with each other to secure an index for the element they add.
+ * T is atomically advanced [1] only if the advanced value doesn't equal to H
+ * (a producer doesn't bump into the head of the queue).
+ *
+ * # Step 2
+ *
+ * Once T is advanced in the previous step, the producer updates the message
+ * fields of the element at the previous value of T and then sets the flag of
+ * this element.
+ *
+ * A removal happens in a single step. The consumer gets the element at index H.
+ * If the flag of this element is set, the consumer clears the fields of the
+ * element, clears the flag and finally advances H.
+ *
+ * ----------------------------------------------------------------------------
+ * [1] To advance the index is to change it from i to (i + 1) % (n + 1).
+ */
+public class MessageQueue {
+
+    private final static boolean DEBUG = false;
+
+    @Stable
+    private final Message[] elements;
+
+    private final AtomicInteger tail = new AtomicInteger();
+    private volatile int head;
+
+    public MessageQueue() {
+        this(defaultSize());
+    }
+
+    /* Exposed for testing */
+    protected MessageQueue(int size) {
+        if (size < 1) {
+            throw new IllegalArgumentException();
+        }
+        Message[] array = new Message[size + 1];
+        for (int i = 0; i < array.length; i++) {
+            array[i] = new Message();
+        }
+        elements = array;
+    }
+
+    private static int defaultSize() {
+        String property = "jdk.httpclient.websocket.outputQueueMaxSize";
+        int defaultSize = 128;
+        String value = Utils.getNetProperty(property);
+        int size;
+        if (value == null) {
+            size = defaultSize;
+        } else {
+            try {
+                size = Integer.parseUnsignedInt(value);
+            } catch (NumberFormatException ignored) {
+                size = defaultSize;
+            }
+        }
+        if (DEBUG) {
+            System.out.printf("[MessageQueue] %s=%s, using size %s%n",
+                              property, value, size);
+        }
+        return size;
+    }
+
+    public <T> void addText(CharBuffer message,
+                            boolean isLast,
+                            T attachment,
+                            BiConsumer<? super T, ? super Throwable> action,
+                            CompletableFuture<T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.TEXT, null, message, isLast, -1, attachment,
+            action, future);
+    }
+
+    private <T> void add(Type type,
+                         ByteBuffer binary,
+                         CharBuffer text,
+                         boolean isLast,
+                         int statusCode,
+                         T attachment,
+                         BiConsumer<? super T, ? super Throwable> action,
+                         CompletableFuture<? super T> future)
+            throws IOException
+    {
+        int h, currentTail, newTail;
+        do {
+            h = head;
+            currentTail = tail.get();
+            newTail = (currentTail + 1) % elements.length;
+            if (newTail == h) {
+                throw new IOException("Queue full");
+            }
+        } while (!tail.compareAndSet(currentTail, newTail));
+        Message t = elements[currentTail];
+        if (t.ready) {
+            throw new InternalError();
+        }
+        t.type = type;
+        t.binary = binary;
+        t.text = text;
+        t.isLast = isLast;
+        t.statusCode = statusCode;
+        t.attachment = attachment;
+        t.action = action;
+        t.future = future;
+        t.ready = true;
+    }
+
+    public <T> void addBinary(ByteBuffer message,
+                              boolean isLast,
+                              T attachment,
+                              BiConsumer<? super T, ? super Throwable> action,
+                              CompletableFuture<? super T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.BINARY, message, null, isLast, -1, attachment,
+            action, future);
+    }
+
+    public <T> void addPing(ByteBuffer message,
+                            T attachment,
+                            BiConsumer<? super T, ? super Throwable> action,
+                            CompletableFuture<? super T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.PING, message, null, false, -1, attachment,
+            action, future);
+    }
+
+    public <T> void addPong(ByteBuffer message,
+                            T attachment,
+                            BiConsumer<? super T, ? super Throwable> action,
+                            CompletableFuture<? super T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.PONG, message, null, false, -1, attachment,
+            action, future);
+    }
+
+    public <T> void addClose(int statusCode,
+                             CharBuffer reason,
+                             T attachment,
+                             BiConsumer<? super T, ? super Throwable> action,
+                             CompletableFuture<? super T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.CLOSE, null, reason, false, statusCode,
+            attachment, action, future);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <R, E extends Throwable> R peek(QueueCallback<R, E> callback)
+            throws E
+    {
+        Message h = elements[head];
+        if (!h.ready) {
+            return callback.onEmpty();
+        }
+        Type type = h.type;
+        switch (type) {
+            case TEXT:
+                try {
+                    return (R) callback.onText(h.text, h.isLast, h.attachment,
+                                               h.action, h.future);
+                } catch (Throwable t) {
+                    // Something unpleasant is going on here with the compiler.
+                    // If this seemingly useless catch is omitted, the compiler
+                    // reports an error:
+                    //
+                    //   java: unreported exception java.lang.Throwable;
+                    //   must be caught or declared to be thrown
+                    //
+                    // My guess is there is a problem with both the type
+                    // inference for the method AND @SuppressWarnings("unchecked")
+                    // being working at the same time.
+                    throw (E) t;
+                }
+            case BINARY:
+                try {
+                    return (R) callback.onBinary(h.binary, h.isLast, h.attachment,
+                                                 h.action, h.future);
+                } catch (Throwable t) {
+                    throw (E) t;
+                }
+            case PING:
+                try {
+                    return (R) callback.onPing(h.binary, h.attachment, h.action,
+                                               h.future);
+                } catch (Throwable t) {
+                    throw (E) t;
+                }
+            case PONG:
+                try {
+                    return (R) callback.onPong(h.binary, h.attachment, h.action,
+                                               h.future);
+                } catch (Throwable t) {
+                    throw (E) t;
+                }
+            case CLOSE:
+                try {
+                    return (R) callback.onClose(h.statusCode, h.text, h.attachment,
+                                                h.action, h.future);
+                } catch (Throwable t) {
+                    throw (E) t;
+                }
+            default:
+                throw new InternalError(String.valueOf(type));
+        }
+    }
+
+    public boolean isEmpty() {
+        return !elements[head].ready;
+    }
+
+    public void remove() {
+        int currentHead = head;
+        Message h = elements[currentHead];
+        if (!h.ready) {
+            throw new InternalError("Queue empty");
+        }
+        h.type = null;
+        h.binary = null;
+        h.text = null;
+        h.attachment = null;
+        h.action = null;
+        h.future = null;
+        h.ready = false;
+        head = (currentHead + 1) % elements.length;
+    }
+
+    private enum Type {
+
+        TEXT,
+        BINARY,
+        PING,
+        PONG,
+        CLOSE
+    }
+
+    /*
+     * A callback for consuming a queue element's fields. Can return a result of
+     * type T or throw an exception of type E. This design allows to avoid
+     * "returning" results or "throwing" errors by updating some objects from
+     * the outside of the methods.
+     */
+    public interface QueueCallback<R, E extends Throwable> {
+
+        <T> R onText(CharBuffer message,
+                     boolean isLast,
+                     T attachment,
+                     BiConsumer<? super T, ? super Throwable> action,
+                     CompletableFuture<? super T> future) throws E;
+
+        <T> R onBinary(ByteBuffer message,
+                       boolean isLast,
+                       T attachment,
+                       BiConsumer<? super T, ? super Throwable> action,
+                       CompletableFuture<? super T> future) throws E;
+
+        <T> R onPing(ByteBuffer message,
+                     T attachment,
+                     BiConsumer<? super T, ? super Throwable> action,
+                     CompletableFuture<? super T> future) throws E;
+
+        <T> R onPong(ByteBuffer message,
+                     T attachment,
+                     BiConsumer<? super T, ? super Throwable> action,
+                     CompletableFuture<? super T> future) throws E;
+
+        <T> R onClose(int statusCode,
+                      CharBuffer reason,
+                      T attachment,
+                      BiConsumer<? super T, ? super Throwable> action,
+                      CompletableFuture<? super T> future) throws E;
+
+        /* The queue is empty*/
+        R onEmpty() throws E;
+    }
+
+    /*
+     * A union of components of all WebSocket message types; also a node in a
+     * queue.
+     *
+     * A `Message` never leaves the context of the queue, thus the reference to
+     * it cannot be retained by anyone other than the queue.
+     */
+    private static class Message {
+
+        private volatile boolean ready;
+
+        // -- The source message fields --
+
+        private Type type;
+        private ByteBuffer binary;
+        private CharBuffer text;
+        private boolean isLast;
+        private int statusCode;
+        private Object attachment;
+        @SuppressWarnings("rawtypes")
+        private BiConsumer action;
+        @SuppressWarnings("rawtypes")
+        private CompletableFuture future;
+    }
+}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/OutgoingMessage.java	Wed Mar 07 15:39:25 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,296 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.internal.net.http.websocket;
-
-import jdk.internal.net.http.websocket.Frame.Opcode;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.CharsetEncoder;
-import java.nio.charset.CoderResult;
-import java.security.SecureRandom;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-import static jdk.internal.net.http.common.Utils.EMPTY_BYTEBUFFER;
-import static jdk.internal.net.http.websocket.Frame.MAX_HEADER_SIZE_BYTES;
-import static jdk.internal.net.http.websocket.Frame.Opcode.BINARY;
-import static jdk.internal.net.http.websocket.Frame.Opcode.CLOSE;
-import static jdk.internal.net.http.websocket.Frame.Opcode.CONTINUATION;
-import static jdk.internal.net.http.websocket.Frame.Opcode.PING;
-import static jdk.internal.net.http.websocket.Frame.Opcode.PONG;
-import static jdk.internal.net.http.websocket.Frame.Opcode.TEXT;
-
-/*
- * A stateful object that represents a WebSocket message being sent to the
- * channel.
- *
- * Data provided to the constructors is copied. Otherwise we would have to deal
- * with mutability, security, masking/unmasking, readonly status, etc. So
- * copying greatly simplifies the implementation.
- *
- * In the case of memory-sensitive environments an alternative implementation
- * could use an internal pool of buffers though at the cost of extra complexity
- * and possible performance degradation.
- */
-abstract class OutgoingMessage {
-
-    // Share per WebSocket?
-    private static final SecureRandom maskingKeys = new SecureRandom();
-
-    protected ByteBuffer[] frame;
-    protected int offset;
-
-    /*
-     * Performs contextualization. This method is not a part of the constructor
-     * so it would be possible to defer the work it does until the most
-     * convenient moment (up to the point where sentTo is invoked).
-     */
-    protected boolean contextualize(Context context) {
-        // masking and charset decoding should be performed here rather than in
-        // the constructor (as of today)
-        if (context.isCloseSent()) {
-            throw new IllegalStateException("Close sent");
-        }
-        return true;
-    }
-
-    protected boolean sendTo(RawChannel channel) throws IOException {
-        while ((offset = nextUnwrittenIndex()) != -1) {
-            long n = channel.write(frame, offset, frame.length - offset);
-            if (n == 0) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private int nextUnwrittenIndex() {
-        for (int i = offset; i < frame.length; i++) {
-            if (frame[i].hasRemaining()) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
-    static final class Text extends OutgoingMessage {
-
-        private final ByteBuffer payload;
-        private final boolean isLast;
-
-        Text(CharSequence characters, boolean isLast) {
-            CharsetEncoder encoder = UTF_8.newEncoder(); // Share per WebSocket?
-            try {
-                payload = encoder.encode(CharBuffer.wrap(characters));
-            } catch (CharacterCodingException e) {
-                throw new IllegalArgumentException(
-                        "Malformed UTF-8 text message");
-            }
-            this.isLast = isLast;
-        }
-
-        @Override
-        protected boolean contextualize(Context context) {
-            super.contextualize(context);
-            if (context.isPreviousBinary() && !context.isPreviousLast()) {
-                throw new IllegalStateException("Unexpected text message");
-            }
-            frame = getDataMessageBuffers(
-                    TEXT, context.isPreviousLast(), isLast, payload, payload);
-            context.setPreviousBinary(false);
-            context.setPreviousText(true);
-            context.setPreviousLast(isLast);
-            return true;
-        }
-    }
-
-    static final class Binary extends OutgoingMessage {
-
-        private final ByteBuffer payload;
-        private final boolean isLast;
-
-        Binary(ByteBuffer payload, boolean isLast) {
-            this.payload = requireNonNull(payload);
-            this.isLast = isLast;
-        }
-
-        @Override
-        protected boolean contextualize(Context context) {
-            super.contextualize(context);
-            if (context.isPreviousText() && !context.isPreviousLast()) {
-                throw new IllegalStateException("Unexpected binary message");
-            }
-            ByteBuffer newBuffer = ByteBuffer.allocate(payload.remaining());
-            frame = getDataMessageBuffers(
-                    BINARY, context.isPreviousLast(), isLast, payload, newBuffer);
-            context.setPreviousText(false);
-            context.setPreviousBinary(true);
-            context.setPreviousLast(isLast);
-            return true;
-        }
-    }
-
-    static final class Ping extends OutgoingMessage {
-
-        Ping(ByteBuffer payload) {
-            frame = getControlMessageBuffers(PING, payload);
-        }
-    }
-
-    static final class Pong extends OutgoingMessage {
-
-        Pong(ByteBuffer payload) {
-            frame = getControlMessageBuffers(PONG, payload);
-        }
-    }
-
-    static final class Close extends OutgoingMessage {
-
-        Close() {
-            frame = getControlMessageBuffers(CLOSE, EMPTY_BYTEBUFFER);
-        }
-
-        Close(int statusCode, CharSequence reason) {
-            ByteBuffer payload = ByteBuffer.allocate(125)
-                    .putChar((char) statusCode);
-            CoderResult result = UTF_8.newEncoder()
-                    .encode(CharBuffer.wrap(reason),
-                            payload,
-                            true);
-            if (result.isOverflow()) {
-                throw new IllegalArgumentException("Long reason");
-            } else if (result.isError()) {
-                try {
-                    result.throwException();
-                } catch (CharacterCodingException e) {
-                    throw new IllegalArgumentException(
-                            "Malformed UTF-8 reason", e);
-                }
-            }
-            payload.flip();
-            frame = getControlMessageBuffers(CLOSE, payload);
-        }
-
-        @Override
-        protected boolean contextualize(Context context) {
-            if (context.isCloseSent()) {
-                return false;
-            } else {
-                context.setCloseSent();
-                return true;
-            }
-        }
-    }
-
-    private static ByteBuffer[] getControlMessageBuffers(Opcode opcode,
-                                                         ByteBuffer payload) {
-        assert opcode.isControl() : opcode;
-        int remaining = payload.remaining();
-        if (remaining > 125) {
-            throw new IllegalArgumentException
-                    ("Long message: " + remaining);
-        }
-        ByteBuffer frame = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES + remaining);
-        int mask = maskingKeys.nextInt();
-        new Frame.HeaderWriter()
-                .fin(true)
-                .opcode(opcode)
-                .payloadLen(remaining)
-                .mask(mask)
-                .write(frame);
-        Frame.Masker.transferMasking(payload, frame, mask);
-        frame.flip();
-        return new ByteBuffer[]{frame};
-    }
-
-    private static ByteBuffer[] getDataMessageBuffers(Opcode type,
-                                                      boolean isPreviousLast,
-                                                      boolean isLast,
-                                                      ByteBuffer payloadSrc,
-                                                      ByteBuffer payloadDst) {
-        assert !type.isControl() && type != CONTINUATION : type;
-        ByteBuffer header = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES);
-        int mask = maskingKeys.nextInt();
-        new Frame.HeaderWriter()
-                .fin(isLast)
-                .opcode(isPreviousLast ? type : CONTINUATION)
-                .payloadLen(payloadDst.remaining())
-                .mask(mask)
-                .write(header);
-        header.flip();
-        Frame.Masker.transferMasking(payloadSrc, payloadDst, mask);
-        payloadDst.flip();
-        return new ByteBuffer[]{header, payloadDst};
-    }
-
-    /*
-     * An instance of this class is passed sequentially between messages, so
-     * every message in a sequence can check the context it is in and update it
-     * if necessary.
-     */
-    public static class Context {
-
-        boolean previousLast = true;
-        boolean previousBinary;
-        boolean previousText;
-        boolean closeSent;
-
-        private boolean isPreviousText() {
-            return this.previousText;
-        }
-
-        private void setPreviousText(boolean value) {
-            this.previousText = value;
-        }
-
-        private boolean isPreviousBinary() {
-            return this.previousBinary;
-        }
-
-        private void setPreviousBinary(boolean value) {
-            this.previousBinary = value;
-        }
-
-        private boolean isPreviousLast() {
-            return this.previousLast;
-        }
-
-        private void setPreviousLast(boolean value) {
-            this.previousLast = value;
-        }
-
-        private boolean isCloseSent() {
-            return closeSent;
-        }
-
-        private void setCloseSent() {
-            closeSent = true;
-        }
-    }
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/Transport.java	Wed Mar 07 17:16:28 2018 +0000
@@ -28,60 +28,69 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 
 /*
- * Transport needs some way to asynchronously notify the send operation has been
- * completed. It can have several different designs each of which has its own
- * pros and cons:
+ * A WebSocket view of the underlying communication channel. This view provides
+ * an asynchronous exchange of WebSocket messages rather than asynchronous
+ * exchange of bytes.
  *
- *     (1) void sendMessage(..., Callback)
- *     (2) CompletableFuture<T> sendMessage(...)
- *     (3) CompletableFuture<T> sendMessage(..., Callback)
- *     (4) boolean sendMessage(..., Callback) throws IOException
- *     ...
+ * Methods sendText, sendBinary, sendPing, sendPong and sendClose initiate a
+ * corresponding operation and return a CompletableFuture (CF) which will
+ * complete once the operation has completed (succeeded or failed).
  *
- * If Transport's users use CFs, (1) forces these users to create CFs and pass
- * them to the callback. If any additional (dependant) action needs to be
- * attached to the returned CF, this means an extra object (CF) must be created
- * in (2). (3) and (4) solves both issues, however (4) does not abstract out
- * when exactly the operation has been performed. So the handling code needs to
- * be repeated twice. And that leads to 2 different code paths (more bugs).
- * Unless designed for this, the user should not assume any specific order of
- * completion in (3) (e.g. callback first and then the returned CF).
+ * These methods are designed such that their clients may take an advantage on
+ * possible implementation optimizations. Namely, these methods:
+ *
+ * 1. May return null which is considered the same as a CF completed normally
+ * 2. Accept an arbitrary attachment to complete a CF with
+ * 3. Accept an action to take once the operation has completed
  *
- * The only parametrization of Transport<T> used is Transport<WebSocket>. The
- * type parameter T was introduced solely to avoid circular dependency between
- * Transport and WebSocket. After all, instances of T are used solely to
- * complete CompletableFutures. Transport doesn't care about the exact type of
- * T.
- *
- * This way the Transport is fully in charge of creating CompletableFutures.
- * On the one hand, Transport may use it to cache/reuse CompletableFutures. On
- * the other hand, the class that uses Transport, may benefit by not converting
- * from CompletableFuture<K> returned from Transport, to CompletableFuture<V>
- * needed by the said class.
+ * All of the above allows not to create unnecessary instances of CF.
+ * For example, if a message has been sent straight away, there's no need to
+ * create a CF (given the parties agree on the meaning of null and are prepared
+ * to handle it).
+ * If the result of a returned CF is useless to the client, they may specify the
+ * exact instance (attachment) they want the CF to complete with. Thus, no need
+ * to create transforming stages (e.g. thenApply(useless -> myResult)).
+ * If there is the same action that needs to be done each time the CF completes,
+ * the client may pass it directly to the method instead of creating a dependant
+ * stage (e.g. whenComplete(action)).
  */
-public interface Transport<T> {
+public interface Transport {
 
-    CompletableFuture<T> sendText(CharSequence message, boolean isLast);
+    <T> CompletableFuture<T> sendText(CharSequence message,
+                                      boolean isLast,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action);
 
-    CompletableFuture<T> sendBinary(ByteBuffer message, boolean isLast);
+    <T> CompletableFuture<T> sendBinary(ByteBuffer message,
+                                        boolean isLast,
+                                        T attachment,
+                                        BiConsumer<? super T, ? super Throwable> action);
 
-    CompletableFuture<T> sendPing(ByteBuffer message);
+    <T> CompletableFuture<T> sendPing(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action);
 
-    CompletableFuture<T> sendPong(ByteBuffer message);
+    <T> CompletableFuture<T> sendPong(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action);
 
-    CompletableFuture<T> sendClose(int statusCode, String reason);
+    <T> CompletableFuture<T> sendClose(int statusCode,
+                                       String reason,
+                                       T attachment,
+                                       BiConsumer<? super T, ? super Throwable> action);
 
     void request(long n);
 
     /*
-     * Why is this method needed? Since Receiver operates through callbacks
-     * this method allows to abstract out what constitutes as a message being
-     * received (i.e. to decide outside this type when exactly one should
-     * decrement the demand).
+     * Why is this method needed? Since receiving of messages operates through
+     * callbacks this method allows to abstract out what constitutes as a
+     * message being received (i.e. to decide outside this type when exactly one
+     * should decrement the demand).
      */
-    void acknowledgeReception();
+    void acknowledgeReception(); // TODO: hide
 
     void closeOutput() throws IOException;
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactory.java	Wed Mar 07 17:16:28 2018 +0000
@@ -1,10 +1,32 @@
-package jdk.internal.net.http.websocket;
+/*
+ * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
 
-import java.util.function.Supplier;
+package jdk.internal.net.http.websocket;
 
 @FunctionalInterface
 public interface TransportFactory {
 
-    <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                     MessageStreamConsumer consumer);
+    Transport createTransport(MessageStreamConsumer consumer);
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportFactoryImpl.java	Wed Mar 07 17:16:28 2018 +0000
@@ -22,10 +22,9 @@
  * or visit www.oracle.com if you need additional information or have any
  * questions.
  */
+
 package jdk.internal.net.http.websocket;
 
-import java.util.function.Supplier;
-
 public class TransportFactoryImpl implements TransportFactory {
 
     private final RawChannel channel;
@@ -35,8 +34,7 @@
     }
 
     @Override
-    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                            MessageStreamConsumer consumer) {
-        return new TransportImpl<T>(sendResultSupplier, consumer, channel);
+    public Transport createTransport(MessageStreamConsumer consumer) {
+        return new TransportImpl(consumer, channel);
     }
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Wed Mar 07 17:16:28 2018 +0000
@@ -27,62 +27,58 @@
 
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.MinimalFuture;
-import jdk.internal.net.http.common.Pair;
 import jdk.internal.net.http.common.SequentialScheduler;
+import jdk.internal.net.http.common.SequentialScheduler.CompleteRestartableTask;
+import jdk.internal.net.http.common.Utils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
 import java.nio.channels.SelectionKey;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 
-import static java.util.Objects.requireNonNull;
-import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
-import static jdk.internal.net.http.common.Pair.pair;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.AVAILABLE;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.UNREGISTERED;
+import static jdk.internal.net.http.websocket.TransportImpl.ChannelState.WAITING;
 
-public class TransportImpl<T> implements Transport<T> {
+public class TransportImpl implements Transport {
+
+    // -- Debugging infrastructure --
+
+    private final static boolean DEBUG = false;
 
-    /* This flag is used solely for assertions */
-    private final AtomicBoolean busy = new AtomicBoolean();
-    private OutgoingMessage message;
-    private Consumer<Exception> completionHandler;
-    private final RawChannel channel;
-    private final RawChannel.RawEvent writeEvent = createWriteEvent();
+    /* Used for correlating enters to and exists from a method */
+    private final static AtomicLong COUNTER = new AtomicLong();
+
     private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
-    private final Queue<Pair<OutgoingMessage, CompletableFuture<T>>>
-            queue = new ConcurrentLinkedQueue<>();
-    private final OutgoingMessage.Context context = new OutgoingMessage.Context();
-    private final Supplier<T> resultSupplier;
 
+    private final MessageQueue queue = new MessageQueue();
+    private final MessageEncoder encoder = new MessageEncoder();
+    /* A reusable buffer for writing, initially with no remaining bytes */
+    private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
+    /* This array is created once for gathering writes accepted by RawChannel */
+    private final ByteBuffer[] dstArray = new ByteBuffer[]{dst};
     private final MessageStreamConsumer messageConsumer;
-    private final FrameConsumer frameConsumer;
+    private final MessageDecoder decoder;
     private final Frame.Reader reader = new Frame.Reader();
-    private final RawChannel.RawEvent readEvent = createReadEvent();
+
     private final Demand demand = new Demand();
     private final SequentialScheduler receiveScheduler;
-
+    private final RawChannel channel;
+    private final Object closeLock = new Object();
+    private final RawChannel.RawEvent writeEvent = new WriteEvent();
+    private final RawChannel.RawEvent readEvent = new ReadEvent();
+    private volatile ChannelState writeState = UNREGISTERED;
     private ByteBuffer data;
-    private volatile int state;
-
-    private static final int UNREGISTERED = 0;
-    private static final int AVAILABLE    = 1;
-    private static final int WAITING      = 2;
-
-    private final Object lock = new Object();
+    private volatile ChannelState readState = UNREGISTERED;
     private boolean inputClosed;
     private boolean outputClosed;
-
-    public TransportImpl(Supplier<T> sendResultSupplier,
-                         MessageStreamConsumer consumer,
-                         RawChannel channel) {
-        this.resultSupplier = sendResultSupplier;
+    public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) {
         this.messageConsumer = consumer;
         this.channel = channel;
-        this.frameConsumer = new FrameConsumer(this.messageConsumer);
+        this.decoder = new MessageDecoder(this.messageConsumer);
         this.data = channel.initialByteBuffer();
         // To ensure the initial non-final `data` will be visible
         // (happens-before) when `readEvent.handle()` invokes `receiveScheduler`
@@ -90,190 +86,164 @@
         receiveScheduler = new SequentialScheduler(new ReceiveTask());
     }
 
-    /**
-     * The supplied handler may be invoked in the calling thread.
-     * A {@code StackOverflowError} may thus occur if there's a possibility
-     * that this method is called again by the supplied handler.
-     */
-    public void send(OutgoingMessage message,
-                     Consumer<Exception> completionHandler) {
-        requireNonNull(message);
-        requireNonNull(completionHandler);
-        if (!busy.compareAndSet(false, true)) {
-            throw new IllegalStateException();
+    private ByteBuffer createWriteBuffer() {
+        String name = "jdk.httpclient.websocket.writeBufferSize";
+        int capacity = Utils.getIntegerNetProperty(name, 16384);
+        if (DEBUG) {
+            System.out.printf("[Transport] write buffer capacity %s", capacity);
         }
-        send0(message, completionHandler);
+        // TODO (optimization?): allocateDirect if SSL?
+        return ByteBuffer.allocate(capacity);
     }
 
-    private RawChannel.RawEvent createWriteEvent() {
-        return new RawChannel.RawEvent() {
-
-            @Override
-            public int interestOps() {
-                return SelectionKey.OP_WRITE;
+    private boolean write() throws IOException {
+        if (DEBUG) {
+            System.out.printf("[Transport] writing to the channel%n");
+        }
+        long count = channel.write(dstArray, 0, dstArray.length);
+        if (DEBUG) {
+            System.out.printf("[Transport] %s bytes written%n", count);
+        }
+        for (ByteBuffer b : dstArray) {
+            if (b.hasRemaining()) {
+                return false;
             }
-
-            @Override
-            public void handle() {
-                // registerEvent(e) happens-before subsequent e.handle(), so
-                // we're fine reading the stored message and the completionHandler
-                send0(message, completionHandler);
-            }
-        };
+        }
+        return true;
     }
 
-    private void send0(OutgoingMessage message, Consumer<Exception> handler) {
-        boolean b = busy.get();
-        assert b; // Please don't inline this, as busy.get() has memory
-                  // visibility effects and we don't want the program behaviour
-                  // to depend on whether the assertions are turned on
-                  // or turned off
+    @Override
+    public <T> CompletableFuture<T> sendText(CharSequence message,
+                                             boolean isLast,
+                                             T attachment,
+                                             BiConsumer<? super T, ? super Throwable> action) {
+        long id;
+        if (DEBUG) {
+            id = COUNTER.incrementAndGet();
+            System.out.printf("[Transport] %s: sendText message.length()=%s, last=%s%n",
+                              id, message.length(), isLast);
+        }
+        // TODO (optimization?):
+        // These sendXXX methods might be a good place to decide whether or not
+        // we can write straight ahead, possibly returning null instead of
+        // creating a CompletableFuture
+
+        // Even if the text is already CharBuffer, the client will not be happy
+        // if they discover the position is changing. So, no instanceof
+        // cheating, wrap always.
+        CharBuffer text = CharBuffer.wrap(message);
+        MinimalFuture<T> f = new MinimalFuture<>();
         try {
-            boolean sent = message.sendTo(channel);
-            if (sent) {
-                busy.set(false);
-                handler.accept(null);
-            } else {
-                // The message has not been fully sent, the transmitter needs to
-                // remember the message until it can continue with sending it
-                this.message = message;
-                this.completionHandler = handler;
-                try {
-                    channel.registerEvent(writeEvent);
-                } catch (IOException e) {
-                    this.message = null;
-                    this.completionHandler = null;
-                    busy.set(false);
-                    handler.accept(e);
-                }
-            }
+            queue.addText(text, isLast, attachment, action, f);
+            sendScheduler.runOrSchedule();
         } catch (IOException e) {
-            busy.set(false);
-            handler.accept(e);
-        }
-    }
-
-    public CompletableFuture<T> sendText(CharSequence message,
-                                         boolean isLast) {
-        OutgoingMessage.Text m;
-        try {
-            m = new OutgoingMessage.Text(message, isLast);
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
+            f.completeExceptionally(e);
         }
-        return enqueue(m);
-    }
-
-    public CompletableFuture<T> sendBinary(ByteBuffer message,
-                                           boolean isLast) {
-        return enqueue(new OutgoingMessage.Binary(message, isLast));
-    }
-
-    public CompletableFuture<T> sendPing(ByteBuffer message) {
-        OutgoingMessage.Ping m;
-        try {
-            m = new OutgoingMessage.Ping(message);
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
+        if (DEBUG) {
+            System.out.printf("[Transport] %s: sendText returned %s%n", id, f);
         }
-        return enqueue(m);
+        return f;
     }
 
-    public CompletableFuture<T> sendPong(ByteBuffer message) {
-        OutgoingMessage.Pong m;
-        try {
-            m = new OutgoingMessage.Pong(message);
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
+    @Override
+    public <T> CompletableFuture<T> sendBinary(ByteBuffer message,
+                                               boolean isLast,
+                                               T attachment,
+                                               BiConsumer<? super T, ? super Throwable> action) {
+        long id;
+        if (DEBUG) {
+            id = COUNTER.incrementAndGet();
+            System.out.printf("[Transport] %s: sendBinary message.remaining()=%s, last=%s%n",
+                              id, message.remaining(), isLast);
         }
-        return enqueue(m);
-    }
-
-    public CompletableFuture<T> sendClose(int statusCode, String reason) {
-        OutgoingMessage.Close m;
+        MinimalFuture<T> f = new MinimalFuture<>();
         try {
-            m = new OutgoingMessage.Close(statusCode, reason);
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
+            queue.addBinary(message, isLast, attachment, action, f);
+            sendScheduler.runOrSchedule();
+        } catch (IOException e) {
+            f.completeExceptionally(e);
         }
-        return enqueue(m);
-    }
-
-    private CompletableFuture<T> enqueue(OutgoingMessage m) {
-        CompletableFuture<T> cf = new MinimalFuture<>();
-        boolean added = queue.add(pair(m, cf));
-        if (!added) {
-            // The queue is supposed to be unbounded
-            throw new InternalError();
+        if (DEBUG) {
+            System.out.printf("[Transport] %s: sendBinary returned %s%n", id, f);
         }
-        sendScheduler.runOrSchedule();
-        return cf;
+        return f;
     }
 
-    /*
-     * This is a message sending task. It pulls messages from the queue one by
-     * one and sends them. It may be run in different threads, but never
-     * concurrently.
-     */
-    private class SendTask implements SequentialScheduler.RestartableTask {
-
-        @Override
-        public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
-            Pair<OutgoingMessage, CompletableFuture<T>> p = queue.poll();
-            if (p == null) {
-                taskCompleter.complete();
-                return;
-            }
-            OutgoingMessage message = p.first;
-            CompletableFuture<T> cf = p.second;
-            try {
-                if (!message.contextualize(context)) { // Do not send the message
-                    cf.complete(resultSupplier.get());
-                    repeat(taskCompleter);
-                    return;
-                }
-                Consumer<Exception> h = e -> {
-                    if (e == null) {
-                        cf.complete(resultSupplier.get());
-                    } else {
-                        cf.completeExceptionally(e);
-                    }
-                    repeat(taskCompleter);
-                };
-                send(message, h);
-            } catch (Throwable t) {
-                cf.completeExceptionally(t);
-                repeat(taskCompleter);
-            }
+    @Override
+    public <T> CompletableFuture<T> sendPing(ByteBuffer message,
+                                             T attachment,
+                                             BiConsumer<? super T, ? super Throwable> action) {
+        long id;
+        if (DEBUG) {
+            id = COUNTER.incrementAndGet();
+            System.out.printf("[Transport] %s: sendPing message.remaining()=%s%n",
+                              id, message.remaining());
         }
-
-        private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
-            taskCompleter.complete();
-            // More than a single message may have been enqueued while
-            // the task has been busy with the current message, but
-            // there is only a single signal recorded
+        MinimalFuture<T> f = new MinimalFuture<>();
+        try {
+            queue.addPing(message, attachment, action, f);
             sendScheduler.runOrSchedule();
+        } catch (IOException e) {
+            f.completeExceptionally(e);
         }
+        if (DEBUG) {
+            System.out.printf("[Transport] %s: sendPing returned %s%n", id, f);
+        }
+        return f;
     }
 
-    private RawChannel.RawEvent createReadEvent() {
-        return new RawChannel.RawEvent() {
+    @Override
+    public <T> CompletableFuture<T> sendPong(ByteBuffer message,
+                                             T attachment,
+                                             BiConsumer<? super T, ? super Throwable> action) {
+        long id;
+        if (DEBUG) {
+            id = COUNTER.incrementAndGet();
+            System.out.printf("[Transport] %s: sendPong message.remaining()=%s%n",
+                              id, message.remaining());
+        }
+        MinimalFuture<T> f = new MinimalFuture<>();
+        try {
+            queue.addPong(message, attachment, action, f);
+            sendScheduler.runOrSchedule();
+        } catch (IOException e) {
+            f.completeExceptionally(e);
+        }
+        if (DEBUG) {
+            System.out.printf("[Transport] %s: sendPong returned %s%n", id, f);
+        }
+        return f;
+    }
 
-            @Override
-            public int interestOps() {
-                return SelectionKey.OP_READ;
-            }
-
-            @Override
-            public void handle() {
-                state = AVAILABLE;
-                receiveScheduler.runOrSchedule();
-            }
-        };
+    @Override
+    public <T> CompletableFuture<T> sendClose(int statusCode,
+                                              String reason,
+                                              T attachment,
+                                              BiConsumer<? super T, ? super Throwable> action) {
+        long id;
+        if (DEBUG) {
+            id = COUNTER.incrementAndGet();
+            System.out.printf("[Transport] %s: sendClose statusCode=%s, reason.length()=%s%n",
+                              id, statusCode, reason.length());
+        }
+        MinimalFuture<T> f = new MinimalFuture<>();
+        try {
+            queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
+            sendScheduler.runOrSchedule();
+        } catch (IOException e) {
+            f.completeExceptionally(e);
+        }
+        if (DEBUG) {
+            System.out.printf("[Transport] %s: sendClose returned %s%n", id, f);
+        }
+        return f;
     }
 
     @Override
     public void request(long n) {
+        if (DEBUG) {
+            System.out.printf("[Transport] request %s%n", n);
+        }
         if (demand.increase(n)) {
             receiveScheduler.runOrSchedule();
         }
@@ -287,58 +257,20 @@
         }
     }
 
-    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
-
-        @Override
-        public void run() {
-            while (!receiveScheduler.isStopped()) {
-                if (data.hasRemaining()) {
-                    if (!demand.isFulfilled()) {
-                        try {
-                            int oldPos = data.position();
-                            reader.readFrame(data, frameConsumer);
-                            int newPos = data.position();
-                            assert oldPos != newPos : data; // reader always consumes bytes
-                        } catch (Throwable e) {
-                            receiveScheduler.stop();
-                            messageConsumer.onError(e);
-                        }
-                        continue;
+    @Override
+    public void closeOutput() throws IOException {
+        if (DEBUG) {
+            System.out.printf("[Transport] closeOutput%n");
+        }
+        synchronized (closeLock) {
+            if (!outputClosed) {
+                outputClosed = true;
+                try {
+                    channel.shutdownOutput();
+                } finally {
+                    if (inputClosed) {
+                        channel.close();
                     }
-                    break;
-                }
-                switch (state) {
-                    case WAITING:
-                        return;
-                    case UNREGISTERED:
-                        try {
-                            state = WAITING;
-                            channel.registerEvent(readEvent);
-                        } catch (Throwable e) {
-                            receiveScheduler.stop();
-                            messageConsumer.onError(e);
-                        }
-                        return;
-                    case AVAILABLE:
-                        try {
-                            data = channel.read();
-                        } catch (Throwable e) {
-                            receiveScheduler.stop();
-                            messageConsumer.onError(e);
-                            return;
-                        }
-                        if (data == null) { // EOF
-                            receiveScheduler.stop();
-                            messageConsumer.onComplete();
-                            return;
-                        } else if (!data.hasRemaining()) {
-                            // No data at the moment Pretty much a "goto",
-                            // reusing the existing code path for registration
-                            state = UNREGISTERED;
-                        }
-                        continue;
-                    default:
-                        throw new InternalError(String.valueOf(state));
                 }
             }
         }
@@ -350,7 +282,10 @@
      */
     @Override
     public void closeInput() throws IOException {
-        synchronized (lock) {
+        if (DEBUG) {
+            System.out.printf("[Transport] closeInput%n");
+        }
+        synchronized (closeLock) {
             if (!inputClosed) {
                 inputClosed = true;
                 try {
@@ -365,19 +300,394 @@
         }
     }
 
-    @Override
-    public void closeOutput() throws IOException {
-        synchronized (lock) {
-            if (!outputClosed) {
-                outputClosed = true;
+    /*  Common states for send and receive tasks */
+    enum ChannelState {
+        UNREGISTERED,
+        AVAILABLE,
+        WAITING
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    private class SendTask extends CompleteRestartableTask {
+
+        private final MessageQueue.QueueCallback<Boolean, IOException>
+                encodingCallback = new MessageQueue.QueueCallback<>() {
+
+            @Override
+            public <T> Boolean onText(CharBuffer message,
+                                      boolean isLast,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future) throws IOException
+            {
+                return encoder.encodeText(message, isLast, dst);
+            }
+
+            @Override
+            public <T> Boolean onBinary(ByteBuffer message,
+                                        boolean isLast,
+                                        T attachment,
+                                        BiConsumer<? super T, ? super Throwable> action,
+                                        CompletableFuture<? super T> future) throws IOException
+            {
+                return encoder.encodeBinary(message, isLast, dst);
+            }
+
+            @Override
+            public <T> Boolean onPing(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future) throws IOException
+            {
+                return encoder.encodePing(message, dst);
+            }
+
+            @Override
+            public <T> Boolean onPong(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future) throws IOException
+            {
+                return encoder.encodePong(message, dst);
+            }
+
+            @Override
+            public <T> Boolean onClose(int statusCode,
+                                       CharBuffer reason,
+                                       T attachment,
+                                       BiConsumer<? super T, ? super Throwable> action,
+                                       CompletableFuture<? super T> future) throws IOException
+            {
+                return encoder.encodeClose(statusCode, reason, dst);
+            }
+
+            @Override
+            public Boolean onEmpty() {
+                return false;
+            }
+        };
+
+        /* Whether the task sees the current head message for first time */
+        private boolean firstPass = true;
+        /* Whether the message has been fully encoded */
+        private boolean encoded;
+
+        // -- Current message completion communication fields --
+
+        private Object attachment;
+        private BiConsumer action;
+        private CompletableFuture future;
+        private final MessageQueue.QueueCallback<Boolean, RuntimeException>
+                /* If there is a message, loads its completion communication fields */
+                loadCallback = new MessageQueue.QueueCallback<Boolean, RuntimeException>() {
+
+            @Override
+            public <T> Boolean onText(CharBuffer message,
+                                      boolean isLast,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future)
+            {
+                SendTask.this.attachment = attachment;
+                SendTask.this.action = action;
+                SendTask.this.future = future;
+                return true;
+            }
+
+            @Override
+            public <T> Boolean onBinary(ByteBuffer message,
+                                        boolean isLast,
+                                        T attachment,
+                                        BiConsumer<? super T, ? super Throwable> action,
+                                        CompletableFuture<? super T> future)
+            {
+                SendTask.this.attachment = attachment;
+                SendTask.this.action = action;
+                SendTask.this.future = future;
+                return true;
+            }
+
+            @Override
+            public <T> Boolean onPing(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future)
+            {
+                SendTask.this.attachment = attachment;
+                SendTask.this.action = action;
+                SendTask.this.future = future;
+                return true;
+            }
+
+            @Override
+            public <T> Boolean onPong(ByteBuffer message,
+                                      T attachment,
+                                      BiConsumer<? super T, ? super Throwable> action,
+                                      CompletableFuture<? super T> future)
+            {
+                SendTask.this.attachment = attachment;
+                SendTask.this.action = action;
+                SendTask.this.future = future;
+                return true;
+            }
+
+            @Override
+            public <T> Boolean onClose(int statusCode,
+                                       CharBuffer reason,
+                                       T attachment,
+                                       BiConsumer<? super T, ? super Throwable> action,
+                                       CompletableFuture<? super T> future)
+            {
+                SendTask.this.attachment = attachment;
+                SendTask.this.action = action;
+                SendTask.this.future = future;
+                return true;
+            }
+
+            @Override
+            public Boolean onEmpty() {
+                return false;
+            }
+        };
+
+        @Override
+        public void run() {
+            // Could have been only called in one of the following cases:
+            //   (a) A message has been added to the queue
+            //   (b) The channel is ready for writing
+            if (DEBUG) {
+                System.out.printf("[Transport] begin send task%n");
+            }
+            while (!queue.isEmpty()) {
                 try {
-                    channel.shutdownOutput();
+                    if (dst.hasRemaining()) {
+                        if (DEBUG) {
+                            System.out.printf("[Transport] %s bytes in buffer%n",
+                                              dst.remaining());
+                        }
+                        // The previous part of the binary representation of the message
+                        // hasn't been fully written
+                        if (!tryCompleteWrite()) {
+                            return;
+                        }
+                    } else if (!encoded) {
+                        if (firstPass) {
+                            firstPass = false;
+                            queue.peek(loadCallback);
+                            if (DEBUG) {
+                                System.out.printf("[Transport] loaded message%n");
+                            }
+                        }
+                        dst.clear();
+                        encoded = queue.peek(encodingCallback);
+                        dst.flip();
+                        if (!tryCompleteWrite()) {
+                            return;
+                        }
+                    } else {
+                        // All done, remove and complete
+                        encoder.reset();
+                        removeAndComplete(null);
+                    }
+                } catch (Throwable t) {
+                    if (DEBUG) {
+                        System.out.printf("[Transport] exception %s; cleanup%n", t);
+                    }
+                    // buffer cleanup: if there is an exception, the buffer
+                    // should appear empty for the next write as there is
+                    // nothing to write
+                    dst.position(dst.limit());
+                    encoder.reset();
+                    removeAndComplete(t);
+                }
+            }
+            if (DEBUG) {
+                System.out.printf("[Transport] end send task%n");
+            }
+        }
+
+        private boolean tryCompleteWrite() throws IOException {
+            if (DEBUG) {
+                System.out.printf("[Transport] begin writing%n");
+            }
+            boolean finished = false;
+            loop:
+            while (true) {
+                final ChannelState ws = writeState;
+                if (DEBUG) {
+                    System.out.printf("[Transport] write state: %s%n", ws);
+                }
+                switch (ws) {
+                    case WAITING:
+                        break loop;
+                    case UNREGISTERED:
+                        if (DEBUG) {
+                            System.out.printf("[Transport] registering write event%n");
+                        }
+                        writeState = WAITING;
+                        try {
+                            channel.registerEvent(writeEvent);
+                        } catch (Throwable t) {
+                            writeState = UNREGISTERED;
+                            throw t;
+                        }
+                        if (DEBUG) {
+                            System.out.printf("[Transport] registered write event%n");
+                        }
+                        break loop;
+                    case AVAILABLE:
+                        boolean written = write();
+                        if (written) {
+                            if (DEBUG) {
+                                System.out.printf("[Transport] finished writing to the channel%n");
+                            }
+                            finished = true;
+                            break loop;   // All done
+                        } else {
+                            writeState = UNREGISTERED;
+                            continue loop; //  Effectively "goto UNREGISTERED"
+                        }
+                    default:
+                        throw new InternalError(String.valueOf(ws));
+                }
+            }
+            if (DEBUG) {
+                System.out.printf("[Transport] end writing%n");
+            }
+            return finished;
+        }
+
+        @SuppressWarnings("unchecked")
+        private void removeAndComplete(Throwable error) {
+            if (DEBUG) {
+                System.out.printf("[Transport] removeAndComplete error=%s%n", error);
+            }
+            queue.remove();
+            if (error != null) {
+                try {
+                    action.accept(null, error);
                 } finally {
-                    if (inputClosed) {
-                        channel.close();
+                    future.completeExceptionally(error);
+                }
+            } else {
+                try {
+                    action.accept(attachment, null);
+                } finally {
+                    future.complete(attachment);
+                }
+            }
+            encoded = false;
+            firstPass = true;
+            attachment = null;
+            action = null;
+            future = null;
+        }
+    }
+
+    private class ReceiveTask extends CompleteRestartableTask {
+
+        @Override
+        public void run() {
+            if (DEBUG) {
+                System.out.printf("[Transport] begin receive task%n");
+            }
+            loop:
+            while (!receiveScheduler.isStopped()) {
+                if (data.hasRemaining()) {
+                    if (DEBUG) {
+                        System.out.printf("[Transport] remaining bytes received %s%n",
+                                          data.remaining());
                     }
+                    if (!demand.isFulfilled()) {
+                        try {
+                            int oldPos = data.position();
+                            reader.readFrame(data, decoder);
+                            int newPos = data.position();
+                            // Reader always consumes bytes:
+                            assert oldPos != newPos : data;
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                        }
+                        continue;
+                    }
+                    break loop;
                 }
+                final ChannelState rs = readState;
+                if (DEBUG) {
+                    System.out.printf("[Transport] receive state: %s%n", rs);
+                }
+                switch (rs) {
+                    case WAITING:
+                        break loop;
+                    case UNREGISTERED:
+                        try {
+                            readState = WAITING;
+                            channel.registerEvent(readEvent);
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                        }
+                        break loop;
+                    case AVAILABLE:
+                        try {
+                            data = channel.read();
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                            break loop;
+                        }
+                        if (data == null) { // EOF
+                            receiveScheduler.stop();
+                            messageConsumer.onComplete();
+                            break loop;
+                        } else if (!data.hasRemaining()) {
+                            // No data at the moment. Pretty much a "goto",
+                            // reusing the existing code path for registration
+                            readState = UNREGISTERED;
+                        }
+                        continue loop;
+                    default:
+                        throw new InternalError(String.valueOf(rs));
+                }
+            }
+            if (DEBUG) {
+                System.out.printf("[Transport] end receive task%n");
             }
         }
     }
+
+    private class WriteEvent implements RawChannel.RawEvent {
+
+        @Override
+        public int interestOps() {
+            return SelectionKey.OP_WRITE;
+        }
+
+        @Override
+        public void handle() {
+            if (DEBUG) {
+                System.out.printf("[Transport] ready to write%n");
+            }
+            writeState = AVAILABLE;
+            sendScheduler.runOrSchedule();
+        }
+    }
+
+    private class ReadEvent implements RawChannel.RawEvent {
+
+        @Override
+        public int interestOps() {
+            return SelectionKey.OP_READ;
+        }
+
+        @Override
+        public void handle() {
+            if (DEBUG) {
+                System.out.printf("[Transport] ready to read%n");
+            }
+            readState = AVAILABLE;
+            receiveScheduler.runOrSchedule();
+        }
+    }
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Wed Mar 07 17:16:28 2018 +0000
@@ -25,7 +25,6 @@
 
 package jdk.internal.net.http.websocket;
 
-import java.net.http.WebSocket;
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.MinimalFuture;
@@ -37,6 +36,7 @@
 import java.lang.ref.Reference;
 import java.net.ProtocolException;
 import java.net.URI;
+import java.net.http.WebSocket;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -44,6 +44,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
@@ -66,6 +67,8 @@
  */
 public final class WebSocketImpl implements WebSocket {
 
+    private final static boolean DEBUG = false;
+
     enum State {
         OPEN,
         IDLE,
@@ -78,6 +81,7 @@
         ERROR;
     }
 
+    private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
     private volatile boolean inputClosed;
     private volatile boolean outputClosed;
 
@@ -96,8 +100,9 @@
     private final Listener listener;
 
     private final AtomicBoolean outstandingSend = new AtomicBoolean();
-    private final Transport<WebSocket> transport;
-    private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
+    private final Transport transport;
+    private final SequentialScheduler receiveScheduler
+            = new SequentialScheduler(new ReceiveTask());
     private final Demand demand = new Demand();
 
     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
@@ -142,10 +147,11 @@
         this.subprotocol = requireNonNull(subprotocol);
         this.listener = requireNonNull(listener);
         this.transport = transportFactory.createTransport(
-                () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
                 new SignallingMessageConsumer());
     }
 
+    // FIXME: add to action handling of errors -> signalError()
+
     @Override
     public CompletableFuture<WebSocket> sendText(CharSequence message,
                                                  boolean isLast) {
@@ -153,8 +159,10 @@
         if (!outstandingSend.compareAndSet(false, true)) {
             return failedFuture(new IllegalStateException("Send pending"));
         }
-        CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
-        return cf.whenComplete((r, e) -> outstandingSend.set(false));
+        CompletableFuture<WebSocket> cf
+                = transport.sendText(message, isLast, this,
+                                     (r, e) -> outstandingSend.set(false));
+        return replaceNull(cf);
     }
 
     @Override
@@ -164,61 +172,88 @@
         if (!outstandingSend.compareAndSet(false, true)) {
             return failedFuture(new IllegalStateException("Send pending"));
         }
-        CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
-        // Optimize?
-        //     if (cf.isDone()) {
-        //         outstandingSend.set(false);
-        //     } else {
-        //         cf.whenComplete((r, e) -> outstandingSend.set(false));
-        //     }
-        return cf.whenComplete((r, e) -> outstandingSend.set(false));
+        CompletableFuture<WebSocket> cf
+                = transport.sendBinary(message, isLast, this,
+                                       (r, e) -> outstandingSend.set(false));
+        return replaceNull(cf);
+    }
+
+    private CompletableFuture<WebSocket> replaceNull(
+            CompletableFuture<WebSocket> cf)
+    {
+        if (cf == null) {
+            return DONE;
+        } else {
+            return cf;
+        }
     }
 
     @Override
     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
-        return transport.sendPing(message);
+        Objects.requireNonNull(message);
+        CompletableFuture<WebSocket> cf
+                = transport.sendPing(message, this, (r, e) -> { });
+        return replaceNull(cf);
     }
 
     @Override
     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
-        return transport.sendPong(message);
+        Objects.requireNonNull(message);
+        CompletableFuture<WebSocket> cf
+                = transport.sendPong(message, this, (r, e) -> { });
+        return replaceNull(cf);
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
+    public CompletableFuture<WebSocket> sendClose(int statusCode,
+                                                  String reason) {
         Objects.requireNonNull(reason);
         if (!isLegalToSendFromClient(statusCode)) {
             return failedFuture(new IllegalArgumentException("statusCode"));
         }
-        return sendClose0(statusCode, reason);
+        CompletableFuture<WebSocket> cf = sendClose0(statusCode, reason);
+        return replaceNull(cf);
     }
 
     /*
      * Sends a Close message, then shuts down the output since no more
-     * messages are expected to be sent after this.
+     * messages are expected to be sent at this point.
      */
-    private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
+    private CompletableFuture<WebSocket> sendClose0(int statusCode,
+                                                    String reason) {
         outputClosed = true;
-        return transport.sendClose(statusCode, reason)
-                .whenComplete((result, error) -> {
-                    try {
-                        transport.closeOutput();
-                    } catch (IOException e) {
-                        Log.logError(e);
-                    }
-                    Throwable cause = Utils.getCompletionCause(error);
-                    if (cause instanceof TimeoutException) {
-                        try {
-                            transport.closeInput();
-                        } catch (IOException e) {
-                            Log.logError(e);
-                        }
-                    }
-                });
+        BiConsumer<WebSocket, Throwable> closer = (r, e) -> {
+            Throwable cause = Utils.getCompletionCause(e);
+            if (cause instanceof IllegalArgumentException) {
+                // or pre=check it (isLegalToSendFromClient(statusCode))
+                return;
+            }
+            try {
+                transport.closeOutput();
+            } catch (IOException ex) {
+                Log.logError(ex);
+            }
+            if (cause instanceof TimeoutException) { // FIXME: it is not the case anymore
+                if (DEBUG) {
+                    System.out.println("[WebSocket] sendClose0 error: " + e);
+                }
+                try {
+                    transport.closeInput();
+                } catch (IOException ex) {
+                    Log.logError(ex);
+                }
+            }
+        };
+        CompletableFuture<WebSocket> cf
+                = transport.sendClose(statusCode, reason, this, closer);
+        return cf;
     }
 
     @Override
     public void request(long n) {
+        if (DEBUG) {
+            System.out.printf("[WebSocket] request(%s)%n", n);
+        }
         if (demand.increase(n)) {
             receiveScheduler.runOrSchedule();
         }
@@ -241,6 +276,9 @@
 
     @Override
     public void abort() {
+        if (DEBUG) {
+            System.out.printf("[WebSocket] abort()%n");
+        }
         inputClosed = true;
         outputClosed = true;
         receiveScheduler.stop();
@@ -327,6 +365,9 @@
         }
 
         private void processError() throws IOException {
+            if (DEBUG) {
+                System.out.println("[WebSocket] processError");
+            }
             transport.closeInput();
             receiveScheduler.stop();
             Throwable err = error.get();
@@ -345,24 +386,33 @@
         }
 
         private void processClose() throws IOException {
+            if (DEBUG) {
+                System.out.println("[WebSocket] processClose");
+            }
             transport.closeInput();
             receiveScheduler.stop();
             CompletionStage<?> readyToClose;
             readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
             if (readyToClose == null) {
-                readyToClose = MinimalFuture.completedFuture(null);
+                readyToClose = DONE;
             }
             int code;
             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
                 code = NORMAL_CLOSURE;
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] using statusCode %s instead of %s%n",
+                                      statusCode, code);
+                }
             } else {
                 code = statusCode;
             }
             readyToClose.whenComplete((r, e) -> {
-                sendClose0(code, "")
+                sendClose0(code, "") // FIXME errors from here?
                         .whenComplete((r1, e1) -> {
-                            if (e1 != null) {
-                                Log.logError(e1);
+                            if (DEBUG) {
+                                if (e1 != null) {
+                                    e1.printStackTrace(System.out);
+                                }
                             }
                         });
             });
@@ -381,14 +431,12 @@
                     .put(binaryData)
                     .flip();
             // Non-exclusive send;
-            CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
-            pongSent.whenComplete(
-                    (r, e) -> {
-                        if (e != null) {
-                            signalError(Utils.getCompletionCause(e));
-                        }
-                    }
-            );
+            BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
+                if (e != null) {
+                    signalError(Utils.getCompletionCause(e));
+                }
+            };
+            transport.sendPong(copy, WebSocketImpl.this, reporter);
             listener.onPing(WebSocketImpl.this, slice);
         }
 
@@ -406,10 +454,16 @@
     }
 
     private void signalOpen() {
+        if (DEBUG) {
+            System.out.printf("[WebSocket] signalOpen%n");
+        }
         receiveScheduler.runOrSchedule();
     }
 
     private void signalError(Throwable error) {
+        if (DEBUG) {
+            System.out.printf("[WebSocket] signalError %s%n", error);
+        }
         inputClosed = true;
         outputClosed = true;
         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
@@ -420,32 +474,56 @@
     }
 
     private void close() {
+        if (DEBUG) {
+            System.out.println("[WebSocket] close");
+        }
+        Throwable first = null;
         try {
+            transport.closeInput();
+        } catch (Throwable t1) {
+            first = t1;
+        } finally {
+            Throwable second = null;
             try {
-                transport.closeInput();
+                transport.closeOutput();
+            } catch (Throwable t2) {
+                second = t2;
             } finally {
-                transport.closeOutput();
+                Throwable e = null;
+                if (first != null && second != null) {
+                    first.addSuppressed(second);
+                    e = first;
+                } else if (first != null) {
+                    e = first;
+                } else if (second != null) {
+                    e = second;
+                }
+                if (DEBUG) {
+                    if (e != null) {
+                        e.printStackTrace(System.out);
+                    }
+                }
             }
-        } catch (Throwable t) {
-            Log.logError(t);
         }
     }
 
-    /*
-     * Signals a Close event (might not correspond to anything happened on the
-     * channel, i.e. might be synthetic).
-     */
     private void signalClose(int statusCode, String reason) {
+        // FIXME: make sure no race reason & close are not intermixed
         inputClosed = true;
         this.statusCode = statusCode;
         this.reason = reason;
-        if (!trySetState(CLOSE)) {
-            Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
-        } else {
+        boolean managed = trySetState(CLOSE);
+        if (DEBUG) {
+            System.out.printf("[WebSocket] signalClose statusCode=%s, reason.length()=%s: %s%n",
+                              statusCode, reason.length(), managed);
+        }
+        if (managed) {
             try {
                 transport.closeInput();
             } catch (Throwable t) {
-                Log.logError(t);
+                if (DEBUG) {
+                    t.printStackTrace(System.out);
+                }
             }
         }
     }
@@ -501,33 +579,45 @@
     }
 
     private boolean trySetState(State newState) {
+        State currentState;
+        boolean success = false;
         while (true) {
-            State currentState = state.get();
+            currentState = state.get();
             if (currentState == ERROR || currentState == CLOSE) {
-                return false;
+                break;
             } else if (state.compareAndSet(currentState, newState)) {
                 receiveScheduler.runOrSchedule();
-                return true;
+                success = true;
+                break;
             }
         }
+        if (DEBUG) {
+            System.out.printf("[WebSocket] set state %s (previous %s) %s%n",
+                              newState, currentState, success);
+        }
+        return success;
     }
 
     private boolean tryChangeState(State expectedState, State newState) {
         State witness = state.compareAndExchange(expectedState, newState);
+        boolean success = false;
         if (witness == expectedState) {
             receiveScheduler.runOrSchedule();
-            return true;
-        }
-        // This should be the only reason for inability to change the state from
-        // IDLE to WAITING: the state has changed to terminal
-        if (witness != ERROR && witness != CLOSE) {
+            success = true;
+        } else if (witness != ERROR && witness != CLOSE) {
+            // This should be the only reason for inability to change the state
+            // from IDLE to WAITING: the state has changed to terminal
             throw new InternalError();
         }
-        return false;
+        if (DEBUG) {
+            System.out.printf("[WebSocket] change state from %s to %s %s%n",
+                              expectedState, newState, success);
+        }
+        return success;
     }
 
     /* Exposed for testing purposes */
-    protected final Transport<WebSocket> transport() {
+    protected Transport transport() {
         return transport;
     }
 }
--- a/test/jdk/java/net/httpclient/websocket/DummyWebSocketServer.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/DummyWebSocketServer.java	Wed Mar 07 17:16:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -42,6 +42,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.regex.Pattern;
@@ -86,6 +87,8 @@
     private final Thread thread;
     private volatile ServerSocketChannel ssc;
     private volatile InetSocketAddress address;
+    private ByteBuffer read = ByteBuffer.allocate(1024);
+    private final CountDownLatch readReady = new CountDownLatch(1);
 
     public DummyWebSocketServer() {
         this(defaultMapping());
@@ -114,6 +117,7 @@
                     } finally {
                         err.println("Closed: " + channel);
                         close(channel);
+                        readReady.countDown();
                     }
                 }
             } catch (ClosedByInterruptException ignored) {
@@ -133,8 +137,26 @@
         // or the input is shutdown
         ByteBuffer b = ByteBuffer.allocate(1024);
         while (channel.read(b) != -1) {
+            b.flip();
+            if (read.remaining() < b.remaining()) {
+                int required = read.capacity() - read.remaining() + b.remaining();
+                int log2required = 32 - Integer.numberOfLeadingZeros(required - 1);
+                ByteBuffer newBuffer = ByteBuffer.allocate(1 << log2required);
+                newBuffer.put(read.flip());
+                read = newBuffer;
+            }
+            read.put(b);
             b.clear();
         }
+        ByteBuffer close = ByteBuffer.wrap(new byte[]{(byte) 0x88, 0x00});
+        while (close.hasRemaining()) {
+            channel.write(close);
+        }
+    }
+
+    public ByteBuffer read() throws InterruptedException {
+        readReady.await();
+        return read.duplicate().asReadOnlyBuffer().flip();
     }
 
     public void open() throws IOException {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/Frame.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,497 @@
+/*
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.nio.ByteBuffer;
+
+/* Copied from jdk.internal.net.http.websocket.Frame */
+final class Frame {
+
+    final Opcode opcode;
+    final ByteBuffer data;
+    final boolean last;
+
+    public Frame(Opcode opcode, ByteBuffer data, boolean last) {
+        this.opcode = opcode;
+        /* copy */
+        this.data = ByteBuffer.allocate(data.remaining()).put(data.slice()).flip();
+        this.last = last;
+    }
+
+    static final int MAX_HEADER_SIZE_BYTES = 2 + 8 + 4;
+    static final int MAX_CONTROL_FRAME_PAYLOAD_SIZE = 125;
+
+    enum Opcode {
+
+        CONTINUATION   (0x0),
+        TEXT           (0x1),
+        BINARY         (0x2),
+        NON_CONTROL_0x3(0x3),
+        NON_CONTROL_0x4(0x4),
+        NON_CONTROL_0x5(0x5),
+        NON_CONTROL_0x6(0x6),
+        NON_CONTROL_0x7(0x7),
+        CLOSE          (0x8),
+        PING           (0x9),
+        PONG           (0xA),
+        CONTROL_0xB    (0xB),
+        CONTROL_0xC    (0xC),
+        CONTROL_0xD    (0xD),
+        CONTROL_0xE    (0xE),
+        CONTROL_0xF    (0xF);
+
+        private static final Opcode[] opcodes;
+
+        static {
+            Opcode[] values = values();
+            opcodes = new Opcode[values.length];
+            for (Opcode c : values) {
+                opcodes[c.code] = c;
+            }
+        }
+
+        private final byte code;
+
+        Opcode(int code) {
+            this.code = (byte) code;
+        }
+
+        boolean isControl() {
+            return (code & 0x8) != 0;
+        }
+
+        static Opcode ofCode(int code) {
+            return opcodes[code & 0xF];
+        }
+    }
+
+    /*
+     * A utility for masking frame payload data.
+     */
+    static final class Masker {
+
+        // Exploiting ByteBuffer's ability to read/write multi-byte integers
+        private final ByteBuffer acc = ByteBuffer.allocate(8);
+        private final int[] maskBytes = new int[4];
+        private int offset;
+        private long maskLong;
+
+        /*
+         * Reads all remaining bytes from the given input buffer, masks them
+         * with the supplied mask and writes the resulting bytes to the given
+         * output buffer.
+         *
+         * The source and the destination buffers may be the same instance.
+         */
+        static void transferMasking(ByteBuffer src, ByteBuffer dst, int mask) {
+            if (src.remaining() > dst.remaining()) {
+                throw new IllegalArgumentException();
+            }
+            new Masker().mask(mask).transferMasking(src, dst);
+        }
+
+        /*
+         * Clears this instance's state and sets the mask.
+         *
+         * The behaviour is as if the mask was set on a newly created instance.
+         */
+        Masker mask(int value) {
+            acc.clear().putInt(value).putInt(value).flip();
+            for (int i = 0; i < maskBytes.length; i++) {
+                maskBytes[i] = acc.get(i);
+            }
+            offset = 0;
+            maskLong = acc.getLong(0);
+            return this;
+        }
+
+        /*
+         * Reads as many remaining bytes as possible from the given input
+         * buffer, masks them with the previously set mask and writes the
+         * resulting bytes to the given output buffer.
+         *
+         * The source and the destination buffers may be the same instance. If
+         * the mask hasn't been previously set it is assumed to be 0.
+         */
+        Masker transferMasking(ByteBuffer src, ByteBuffer dst) {
+            begin(src, dst);
+            loop(src, dst);
+            end(src, dst);
+            return this;
+        }
+
+        /*
+         * Applies up to 3 remaining from the previous pass bytes of the mask.
+         */
+        private void begin(ByteBuffer src, ByteBuffer dst) {
+            if (offset == 0) { // No partially applied mask from the previous invocation
+                return;
+            }
+            int i = src.position(), j = dst.position();
+            final int srcLim = src.limit(), dstLim = dst.limit();
+            for (; offset < 4 && i < srcLim && j < dstLim; i++, j++, offset++)
+            {
+                dst.put(j, (byte) (src.get(i) ^ maskBytes[offset]));
+            }
+            offset &= 3; // Will become 0 if the mask has been fully applied
+            src.position(i);
+            dst.position(j);
+        }
+
+        /*
+         * Gallops one long (mask + mask) at a time.
+         */
+        private void loop(ByteBuffer src, ByteBuffer dst) {
+            int i = src.position();
+            int j = dst.position();
+            final int srcLongLim = src.limit() - 7, dstLongLim = dst.limit() - 7;
+            for (; i < srcLongLim && j < dstLongLim; i += 8, j += 8) {
+                dst.putLong(j, src.getLong(i) ^ maskLong);
+            }
+            if (i > src.limit()) {
+                src.position(i - 8);
+            } else {
+                src.position(i);
+            }
+            if (j > dst.limit()) {
+                dst.position(j - 8);
+            } else {
+                dst.position(j);
+            }
+        }
+
+        /*
+         * Applies up to 7 remaining from the "galloping" phase bytes of the
+         * mask.
+         */
+        private void end(ByteBuffer src, ByteBuffer dst) {
+            assert Math.min(src.remaining(), dst.remaining()) < 8;
+            final int srcLim = src.limit(), dstLim = dst.limit();
+            int i = src.position(), j = dst.position();
+            for (; i < srcLim && j < dstLim;
+                 i++, j++, offset = (offset + 1) & 3) // offset cycles through 0..3
+            {
+                dst.put(j, (byte) (src.get(i) ^ maskBytes[offset]));
+            }
+            src.position(i);
+            dst.position(j);
+        }
+    }
+
+    /*
+     * A builder-style writer of frame headers.
+     *
+     * The writer does not enforce any protocol-level rules, it simply writes a
+     * header structure to the given buffer. The order of calls to intermediate
+     * methods is NOT significant.
+     */
+    static final class HeaderWriter {
+
+        private char firstChar;
+        private long payloadLen;
+        private int maskingKey;
+        private boolean mask;
+
+        HeaderWriter fin(boolean value) {
+            if (value) {
+                firstChar |=  0b10000000_00000000;
+            } else {
+                firstChar &= ~0b10000000_00000000;
+            }
+            return this;
+        }
+
+        HeaderWriter rsv1(boolean value) {
+            if (value) {
+                firstChar |=  0b01000000_00000000;
+            } else {
+                firstChar &= ~0b01000000_00000000;
+            }
+            return this;
+        }
+
+        HeaderWriter rsv2(boolean value) {
+            if (value) {
+                firstChar |=  0b00100000_00000000;
+            } else {
+                firstChar &= ~0b00100000_00000000;
+            }
+            return this;
+        }
+
+        HeaderWriter rsv3(boolean value) {
+            if (value) {
+                firstChar |=  0b00010000_00000000;
+            } else {
+                firstChar &= ~0b00010000_00000000;
+            }
+            return this;
+        }
+
+        HeaderWriter opcode(Opcode value) {
+            firstChar = (char) ((firstChar & 0xF0FF) | (value.code << 8));
+            return this;
+        }
+
+        HeaderWriter payloadLen(long value) {
+            if (value < 0) {
+                throw new IllegalArgumentException("Negative: " + value);
+            }
+            payloadLen = value;
+            firstChar &= 0b11111111_10000000; // Clear previous payload length leftovers
+            if (payloadLen < 126) {
+                firstChar |= payloadLen;
+            } else if (payloadLen < 65536) {
+                firstChar |= 126;
+            } else {
+                firstChar |= 127;
+            }
+            return this;
+        }
+
+        HeaderWriter mask(int value) {
+            firstChar |= 0b00000000_10000000;
+            maskingKey = value;
+            mask = true;
+            return this;
+        }
+
+        HeaderWriter noMask() {
+            firstChar &= ~0b00000000_10000000;
+            mask = false;
+            return this;
+        }
+
+        /*
+         * Writes the header to the given buffer.
+         *
+         * The buffer must have at least MAX_HEADER_SIZE_BYTES remaining. The
+         * buffer's position is incremented by the number of bytes written.
+         */
+        void write(ByteBuffer buffer) {
+            buffer.putChar(firstChar);
+            if (payloadLen >= 126) {
+                if (payloadLen < 65536) {
+                    buffer.putChar((char) payloadLen);
+                } else {
+                    buffer.putLong(payloadLen);
+                }
+            }
+            if (mask) {
+                buffer.putInt(maskingKey);
+            }
+        }
+    }
+
+    /*
+     * A consumer of frame parts.
+     *
+     * Frame.Reader invokes the consumer's methods in the following order:
+     *
+     *     fin rsv1 rsv2 rsv3 opcode mask payloadLength maskingKey? payloadData+ endFrame
+     */
+    interface Consumer {
+
+        void fin(boolean value);
+
+        void rsv1(boolean value);
+
+        void rsv2(boolean value);
+
+        void rsv3(boolean value);
+
+        void opcode(Opcode value);
+
+        void mask(boolean value);
+
+        void payloadLen(long value);
+
+        void maskingKey(int value);
+
+        /*
+         * Called by the Frame.Reader when a part of the (or a complete) payload
+         * is ready to be consumed.
+         *
+         * The sum of numbers of bytes consumed in each invocation of this
+         * method corresponding to the given frame WILL be equal to
+         * 'payloadLen', reported to `void payloadLen(long value)` before that.
+         *
+         * In particular, if `payloadLen` is 0, then there WILL be a single
+         * invocation to this method.
+         *
+         * No unmasking is done.
+         */
+        void payloadData(ByteBuffer data);
+
+        void endFrame();
+    }
+
+    /*
+     * A Reader of frames.
+     *
+     * No protocol-level rules are checked.
+     */
+    static final class Reader {
+
+        private static final int AWAITING_FIRST_BYTE  =  1;
+        private static final int AWAITING_SECOND_BYTE =  2;
+        private static final int READING_16_LENGTH    =  4;
+        private static final int READING_64_LENGTH    =  8;
+        private static final int READING_MASK         = 16;
+        private static final int READING_PAYLOAD      = 32;
+
+        // Exploiting ByteBuffer's ability to read multi-byte integers
+        private final ByteBuffer accumulator = ByteBuffer.allocate(8);
+        private int state = AWAITING_FIRST_BYTE;
+        private boolean mask;
+        private long remainingPayloadLength;
+
+        /*
+         * Reads at most one frame from the given buffer invoking the consumer's
+         * methods corresponding to the frame parts found.
+         *
+         * As much of the frame's payload, if any, is read. The buffer's
+         * position is updated to reflect the number of bytes read.
+         *
+         * Throws FailWebSocketException if detects the frame is malformed.
+         */
+        void readFrame(ByteBuffer input, Consumer consumer) {
+            loop:
+            while (true) {
+                byte b;
+                switch (state) {
+                    case AWAITING_FIRST_BYTE:
+                        if (!input.hasRemaining()) {
+                            break loop;
+                        }
+                        b = input.get();
+                        consumer.fin( (b & 0b10000000) != 0);
+                        consumer.rsv1((b & 0b01000000) != 0);
+                        consumer.rsv2((b & 0b00100000) != 0);
+                        consumer.rsv3((b & 0b00010000) != 0);
+                        consumer.opcode(Opcode.ofCode(b));
+                        state = AWAITING_SECOND_BYTE;
+                        continue loop;
+                    case AWAITING_SECOND_BYTE:
+                        if (!input.hasRemaining()) {
+                            break loop;
+                        }
+                        b = input.get();
+                        consumer.mask(mask = (b & 0b10000000) != 0);
+                        byte p1 = (byte) (b & 0b01111111);
+                        if (p1 < 126) {
+                            assert p1 >= 0 : p1;
+                            consumer.payloadLen(remainingPayloadLength = p1);
+                            state = mask ? READING_MASK : READING_PAYLOAD;
+                        } else if (p1 < 127) {
+                            state = READING_16_LENGTH;
+                        } else {
+                            state = READING_64_LENGTH;
+                        }
+                        continue loop;
+                    case READING_16_LENGTH:
+                        if (!input.hasRemaining()) {
+                            break loop;
+                        }
+                        b = input.get();
+                        if (accumulator.put(b).position() < 2) {
+                            continue loop;
+                        }
+                        remainingPayloadLength = accumulator.flip().getChar();
+                        if (remainingPayloadLength < 126) {
+                            throw notMinimalEncoding(remainingPayloadLength);
+                        }
+                        consumer.payloadLen(remainingPayloadLength);
+                        accumulator.clear();
+                        state = mask ? READING_MASK : READING_PAYLOAD;
+                        continue loop;
+                    case READING_64_LENGTH:
+                        if (!input.hasRemaining()) {
+                            break loop;
+                        }
+                        b = input.get();
+                        if (accumulator.put(b).position() < 8) {
+                            continue loop;
+                        }
+                        remainingPayloadLength = accumulator.flip().getLong();
+                        if (remainingPayloadLength < 0) {
+                            throw negativePayload(remainingPayloadLength);
+                        } else if (remainingPayloadLength < 65536) {
+                            throw notMinimalEncoding(remainingPayloadLength);
+                        }
+                        consumer.payloadLen(remainingPayloadLength);
+                        accumulator.clear();
+                        state = mask ? READING_MASK : READING_PAYLOAD;
+                        continue loop;
+                    case READING_MASK:
+                        if (!input.hasRemaining()) {
+                            break loop;
+                        }
+                        b = input.get();
+                        if (accumulator.put(b).position() != 4) {
+                            continue loop;
+                        }
+                        consumer.maskingKey(accumulator.flip().getInt());
+                        accumulator.clear();
+                        state = READING_PAYLOAD;
+                        continue loop;
+                    case READING_PAYLOAD:
+                        // This state does not require any bytes to be available
+                        // in the input buffer in order to proceed
+                        int deliverable = (int) Math.min(remainingPayloadLength,
+                                                         input.remaining());
+                        int oldLimit = input.limit();
+                        input.limit(input.position() + deliverable);
+                        if (deliverable != 0 || remainingPayloadLength == 0) {
+                            consumer.payloadData(input);
+                        }
+                        int consumed = deliverable - input.remaining();
+                        if (consumed < 0) {
+                            // Consumer cannot consume more than there was available
+                            throw new InternalError();
+                        }
+                        input.limit(oldLimit);
+                        remainingPayloadLength -= consumed;
+                        if (remainingPayloadLength == 0) {
+                            consumer.endFrame();
+                            state = AWAITING_FIRST_BYTE;
+                        }
+                        break loop;
+                    default:
+                        throw new InternalError(String.valueOf(state));
+                }
+            }
+        }
+
+        private static IllegalArgumentException negativePayload(long payloadLength)
+        {
+            return new IllegalArgumentException("Negative payload length: "
+                                                        + payloadLength);
+        }
+
+        private static IllegalArgumentException notMinimalEncoding(long payloadLength)
+        {
+            return new IllegalArgumentException("Not minimally-encoded payload length:"
+                                                      + payloadLength);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/MessageQueueTestDriver.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8159053
+ * @modules java.net.http/jdk.internal.net.http.websocket:open
+ * @run testng/othervm
+ *      --add-reads java.net.http=ALL-UNNAMED
+ *      java.net.http/jdk.internal.net.http.websocket.MessageQueueTest
+ */
+public final class MessageQueueTestDriver { }
--- a/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java	Wed Mar 07 15:39:25 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @modules java.net.http/jdk.internal.net.http.websocket:open
- * @run testng/othervm
- *      --add-reads java.net.http=ALL-UNNAMED
- *      java.net.http/jdk.internal.net.http.websocket.WebSocketImplTest
- */
-public class WebSocketImplDriver { }
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Wed Mar 07 17:16:28 2018 +0000
@@ -24,9 +24,8 @@
 /*
  * @test
  * @build DummyWebSocketServer
- * @run testng/othervm -Djdk.httpclient.HttpClient.log=trace WebSocketTest
+ * @run testng/othervm WebSocketTest
  */
-
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -36,12 +35,14 @@
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static java.net.http.HttpClient.newHttpClient;
 import static java.net.http.WebSocket.NORMAL_CLOSURE;
@@ -58,16 +59,19 @@
     private static final Class<IllegalStateException> ISE = IllegalStateException.class;
     private static final Class<IOException> IOE = IOException.class;
 
-    @Test
-    public void abort() throws Exception {
+//    @Test
+    public void immediateAbort() throws Exception {
         try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
             server.open();
             CompletableFuture<Void> messageReceived = new CompletableFuture<>();
             WebSocket ws = newHttpClient()
                     .newWebSocketBuilder()
                     .buildAsync(server.getURI(), new WebSocket.Listener() {
+
                         @Override
-                        public void onOpen(WebSocket webSocket) { /* no initial request */ }
+                        public void onOpen(WebSocket webSocket) {
+                            /* no initial request */
+                        }
 
                         @Override
                         public CompletionStage<?> onText(WebSocket webSocket,
@@ -133,15 +137,17 @@
             try {
                 messageReceived.get(10, TimeUnit.SECONDS);
                 fail();
-            } catch (TimeoutException expected) { }
-            // TODO: No send operations MUST succeed
-//            assertCompletesExceptionally(IOE, ws.sendText("text!", false));
-//            assertCompletesExceptionally(IOE, ws.sendText("text!", true));
-//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
-//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
-//            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
-//            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
-//            assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
+            } catch (TimeoutException expected) {
+                System.out.println("Finished waiting");
+            }
+            assertCompletesExceptionally(IOE, ws.sendText("text!", false));
+            assertCompletesExceptionally(IOE, ws.sendText("text!", true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+            // Checked last because it changes the state of WebSocket
+            assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
         }
     }
 
@@ -154,15 +160,40 @@
             @Override
             protected void serve(SocketChannel channel) throws IOException {
                 ByteBuffer closeMessage = ByteBuffer.wrap(copy);
-                int wrote = channel.write(closeMessage);
-                System.out.println("Wrote bytes: " + wrote);
+                channel.write(closeMessage);
                 super.serve(channel);
             }
         };
     }
 
+    private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
+                                                     CompletableFuture<?> stage) {
+        stage.handle((result, error) -> {
+            if (error instanceof CompletionException) {
+                Throwable cause = error.getCause();
+                if (cause == null) {
+                    throw new AssertionError("Unexpected null cause: " + error);
+                }
+                assertException(clazz, cause);
+            } else {
+                assertException(clazz, error);
+            }
+            return null;
+        }).join();
+    }
+
+    private static void assertException(Class<? extends Throwable> clazz,
+                                        Throwable t) {
+        if (t == null) {
+            throw new AssertionError("Expected " + clazz + ", caught nothing");
+        }
+        if (!clazz.isInstance(t)) {
+            throw new AssertionError("Expected " + clazz + ", caught " + t);
+        }
+    }
+
     @Test
-    public void testNull() throws IOException {
+    public void sendMethodsThrowNPE() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -177,11 +208,25 @@
             assertThrows(NPE, () -> ws.sendPing(null));
             assertThrows(NPE, () -> ws.sendPong(null));
             assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
+
+            ws.abort();
+
+            assertThrows(NPE, () -> ws.sendText(null, false));
+            assertThrows(NPE, () -> ws.sendText(null, true));
+            assertThrows(NPE, () -> ws.sendBinary(null, false));
+            assertThrows(NPE, () -> ws.sendBinary(null, true));
+            assertThrows(NPE, () -> ws.sendPing(null));
+            assertThrows(NPE, () -> ws.sendPong(null));
+            assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
         }
     }
 
+    // TODO: request in onClose/onError
+    // TODO: throw exception in onClose/onError
+    // TODO: exception is thrown from request()
+
     @Test
-    public void testSendClose1() throws IOException {
+    public void sendCloseCompleted() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -197,7 +242,7 @@
     }
 
     @Test
-    public void testSendClose2() throws Exception {
+    public void sendClosePending() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -205,7 +250,7 @@
                     .buildAsync(server.getURI(), new WebSocket.Listener() { })
                     .join();
             ByteBuffer data = ByteBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) { // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -215,12 +260,11 @@
                 }
             }
             CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
+            // The output closes even if the Close message has not been sent
+            assertFalse(cf.isDone());
             assertTrue(ws.isOutputClosed());
             assertFalse(ws.isInputClosed());
             assertEquals(ws.getSubprotocol(), "");
-            // The output closes regardless of whether or not the Close message
-            // has been sent
-            assertFalse(cf.isDone());
         }
     }
 
@@ -242,6 +286,78 @@
         };
     }
 
+//    @Test
+    public void abortPendingSendBinary() throws Exception {
+        try (DummyWebSocketServer server = notReadingServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ByteBuffer data = ByteBuffer.allocate(65536);
+            CompletableFuture<WebSocket> cf = null;
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                System.out.println("cycle #" + i);
+                try {
+                    cf = ws.sendBinary(data, true);
+                    cf.get(10, TimeUnit.SECONDS);
+                    data.clear();
+                } catch (TimeoutException e) {
+                    break;
+                }
+            }
+            ws.abort();
+            assertTrue(ws.isOutputClosed());
+            assertTrue(ws.isInputClosed());
+            assertCompletesExceptionally(IOException.class, cf);
+        }
+    }
+
+//    @Test
+    public void abortPendingSendText() throws Exception {
+        try (DummyWebSocketServer server = notReadingServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            String data = stringWith2NBytes(32768);
+            CompletableFuture<WebSocket> cf = null;
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                System.out.println("cycle #" + i);
+                try {
+                    cf = ws.sendText(data, true);
+                    cf.get(10, TimeUnit.SECONDS);
+                } catch (TimeoutException e) {
+                    break;
+                }
+            }
+            ws.abort();
+            assertTrue(ws.isOutputClosed());
+            assertTrue(ws.isInputClosed());
+            assertCompletesExceptionally(IOException.class, cf);
+        }
+    }
+
+    private static String stringWith2NBytes(int n) {
+        // -- Russian Alphabet (33 characters, 2 bytes per char) --
+        char[] abc = {
+                0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
+                0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
+                0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
+                0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
+                0x042F,
+        };
+        // repeat cyclically
+        StringBuilder sb = new StringBuilder(n);
+        for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
+            sb.append(abc[j]);
+        }
+        String s = sb.toString();
+        assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
+        return s;
+    }
+
     @Test
     public void testIllegalArgument() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
@@ -263,10 +379,10 @@
             assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(129)));
             assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(256)));
 
-            assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), true));
-            assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), false));
-            assertCompletesExceptionally(IAE, ws.sendText(malformedString(), true));
-            assertCompletesExceptionally(IAE, ws.sendText(malformedString(), false));
+            assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), true));
+            assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), false));
+            assertCompletesExceptionally(IOE, ws.sendText(malformedString(), true));
+            assertCompletesExceptionally(IOE, ws.sendText(malformedString(), false));
 
             assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(124)));
             assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(125)));
@@ -316,58 +432,13 @@
     }
 
     private static String stringWithNBytes(int n) {
-        StringBuilder sb = new StringBuilder(n);
-        for (int i = 0; i < n; i++) {
-            sb.append("A");
-        }
-        return sb.toString();
-    }
-
-    private static String stringWith2NBytes(int n) {
-        // Russian alphabet repeated cyclically
-        char FIRST = '\u0410';
-        char LAST = '\u042F';
-        StringBuilder sb = new StringBuilder(n);
-        char c = FIRST;
-        for (int i = 0; i < n; i++) {
-            if (++c > LAST) {
-                c = FIRST;
-            }
-            sb.append(c);
-        }
-        String s = sb.toString();
-        assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
-        return s;
-    }
-
-    private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
-                                                     CompletableFuture<?> stage) {
-        stage.handle((result, error) -> {
-            if (error instanceof CompletionException) {
-                Throwable cause = error.getCause();
-                if (cause == null) {
-                    throw new AssertionError("Unexpected null cause: " + error);
-                }
-                assertException(clazz, cause);
-            } else {
-                assertException(clazz, error);
-            }
-            return null;
-        }).join();
-    }
-
-    private static void assertException(Class<? extends Throwable> clazz,
-                                        Throwable t) {
-        if (t == null) {
-            throw new AssertionError("Expected " + clazz + ", caught nothing");
-        }
-        if (!clazz.isInstance(t)) {
-            throw new AssertionError("Expected " + clazz + ", caught " + t);
-        }
+        char[] chars = new char[n];
+        Arrays.fill(chars, 'A');
+        return new String(chars);
     }
 
     @Test
-    public void testIllegalStateOutstanding1() throws Exception {
+    public void outstanding1() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -376,7 +447,7 @@
                     .join();
 
             ByteBuffer data = ByteBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) {  // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -391,7 +462,7 @@
     }
 
     @Test
-    public void testIllegalStateOutstanding2() throws Exception {
+    public void outstanding2() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -400,7 +471,7 @@
                     .join();
 
             CharBuffer data = CharBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) {  // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendText(data, true).get(10, TimeUnit.SECONDS);
@@ -415,7 +486,7 @@
     }
 
     @Test
-    public void testIllegalStateIntermixed1() throws IOException {
+    public void interleavingTypes1() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -430,7 +501,7 @@
     }
 
     @Test
-    public void testIllegalStateIntermixed2() throws IOException {
+    public void interleavingTypes2() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -445,7 +516,7 @@
     }
 
     @Test
-    public void testIllegalStateSendClose() throws IOException {
+    public void sendMethodsThrowIOE1() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -453,31 +524,33 @@
                     .buildAsync(server.getURI(), new WebSocket.Listener() { })
                     .join();
 
-            ws.sendClose(NORMAL_CLOSURE, "normal close").join();
+            ws.sendClose(NORMAL_CLOSURE, "ok").join();
+
+            assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 
-            assertCompletesExceptionally(ISE, ws.sendText("", true));
-            assertCompletesExceptionally(ISE, ws.sendText("", false));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", true));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+            assertCompletesExceptionally(IOE, ws.sendText("", true));
+            assertCompletesExceptionally(IOE, ws.sendText("", false));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
 
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
 
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
         }
     }
 
     @Test
-    public void testIllegalStateOnClose() throws Exception {
+    public void sendMethodsThrowIOE2() throws Exception {
         try (DummyWebSocketServer server = serverWithCannedData(0x88, 0x00)) {
             server.open();
             CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
@@ -490,7 +563,7 @@
                         public CompletionStage<?> onClose(WebSocket webSocket,
                                                           int statusCode,
                                                           String reason) {
-                            System.out.println("onClose(" + statusCode + ")");
+                            System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
                             onCloseCalled.complete(null);
                             return canClose;
                         }
@@ -498,38 +571,143 @@
                         @Override
                         public void onError(WebSocket webSocket, Throwable error) {
                             System.out.println("onError(" + error + ")");
-                            error.printStackTrace();
+                            onCloseCalled.completeExceptionally(error);
                         }
                     })
                     .join();
 
             onCloseCalled.join();      // Wait for onClose to be called
+            canClose.complete(null);   // Signal to the WebSocket it can close the output
             TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
-            canClose.complete(null);   // Signal to the WebSocket it can close the output
+
+            assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 
-            assertCompletesExceptionally(ISE, ws.sendText("", true));
-            assertCompletesExceptionally(ISE, ws.sendText("", false));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", true));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+            assertCompletesExceptionally(IOE, ws.sendText("", true));
+            assertCompletesExceptionally(IOE, ws.sendText("", false));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
 
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
 
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
         }
     }
 
     @Test
-    public void simpleAggregatingMessages() throws IOException {
+    public void simpleAggregatingBinaryMessages() throws IOException {
+        List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta")
+                .stream()
+                .map(s -> s.getBytes(StandardCharsets.US_ASCII))
+                .collect(Collectors.toList());
+        int[] binary = new int[]{
+                0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha]
+                0x02, 0x02, 0x62, 0x65,                   // [be
+                0x80, 0x02, 0x74, 0x61,                   // ta]
+                0x02, 0x01, 0x67,                         // [g
+                0x00, 0x01, 0x61,                         // a
+                0x00, 0x00,                               //
+                0x00, 0x00,                               //
+                0x00, 0x01, 0x6d,                         // m
+                0x00, 0x01, 0x6d,                         // m
+                0x80, 0x01, 0x61,                         // a]
+                0x8a, 0x00,                               // <PONG>
+                0x02, 0x04, 0x64, 0x65, 0x6c, 0x74,       // [delt
+                0x00, 0x01, 0x61,                         // a
+                0x80, 0x00,                               // ]
+                0x88, 0x00                                // <CLOSE>
+        };
+        CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
+
+        try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+            server.open();
+
+            WebSocket.Listener listener = new WebSocket.Listener() {
+
+                List<byte[]> collectedBytes = new ArrayList<>();
+                ByteBuffer binary;
+
+                @Override
+                public CompletionStage<?> onBinary(WebSocket webSocket,
+                                                   ByteBuffer message,
+                                                   WebSocket.MessagePart part) {
+                    System.out.printf("onBinary(%s, %s)%n", message, part);
+                    webSocket.request(1);
+                    byte[] bytes = null;
+                    switch (part) {
+                        case FIRST:
+                            binary = ByteBuffer.allocate(message.remaining() * 2);
+                        case PART:
+                            append(message);
+                            return null;
+                        case LAST:
+                            append(message);
+                            binary.flip();
+                            bytes = new byte[binary.remaining()];
+                            binary.get(bytes);
+                            binary.clear();
+                            break;
+                        case WHOLE:
+                            bytes = new byte[message.remaining()];
+                            message.get(bytes);
+                            break;
+                    }
+                    processWholeBinary(bytes);
+                    return null;
+                }
+
+                private void append(ByteBuffer message) {
+                    if (binary.remaining() < message.remaining()) {
+                        assert message.remaining() > 0;
+                        int cap = (binary.capacity() + message.remaining()) * 2;
+                        ByteBuffer b = ByteBuffer.allocate(cap);
+                        b.put(binary.flip());
+                        binary = b;
+                    }
+                    binary.put(message);
+                }
+
+                private void processWholeBinary(byte[] bytes) {
+                    String stringBytes = new String(bytes, StandardCharsets.UTF_8);
+                    System.out.println("processWholeBinary: " + stringBytes);
+                    collectedBytes.add(bytes);
+                }
+
+                @Override
+                public CompletionStage<?> onClose(WebSocket webSocket,
+                                                  int statusCode,
+                                                  String reason) {
+                    actual.complete(collectedBytes);
+                    return null;
+                }
+
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
+            };
+
+            newHttpClient().newWebSocketBuilder()
+                           .buildAsync(server.getURI(), listener)
+                           .join();
+
+            List<byte[]> a = actual.join();
+            System.out.println("joined");
+            assertEquals(a, expected);
+        }
+    }
+
+    @Test
+    public void simpleAggregatingTextMessages() throws IOException {
 
         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 
@@ -557,24 +735,25 @@
 
             WebSocket.Listener listener = new WebSocket.Listener() {
 
-                List<String> collected = new ArrayList<>();
-                StringBuilder text = new StringBuilder();
+                List<String> collectedStrings = new ArrayList<>();
+                StringBuilder text;
 
                 @Override
                 public CompletionStage<?> onText(WebSocket webSocket,
                                                  CharSequence message,
                                                  WebSocket.MessagePart part) {
+                    System.out.printf("onText(%s, %s)%n", message, part);
                     webSocket.request(1);
                     String str = null;
                     switch (part) {
                         case FIRST:
+                            text = new StringBuilder(message.length() * 2);
                         case PART:
                             text.append(message);
                             return null;
                         case LAST:
                             text.append(message);
                             str = text.toString();
-                            text.setLength(0);
                             break;
                         case WHOLE:
                             str = message.toString();
@@ -586,30 +765,38 @@
 
                 private void processWholeText(String string) {
                     System.out.println(string);
-                    // -- your code here --
-                    collected.add(string);
+                    collectedStrings.add(string);
                 }
 
                 @Override
                 public CompletionStage<?> onClose(WebSocket webSocket,
                                                   int statusCode,
                                                   String reason) {
-                    actual.complete(collected);
+                    actual.complete(collectedStrings);
                     return null;
                 }
+
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
             };
 
             newHttpClient().newWebSocketBuilder()
-                    .buildAsync(server.getURI(), listener)
-                    .join();
+                           .buildAsync(server.getURI(), listener)
+                           .join();
 
             List<String> a = actual.join();
             assertEquals(a, expected);
         }
     }
 
+    /*
+     * Exercises the scenario where requests for more messages are made prior to
+     * completing the returned CompletionStage instances.
+     */
     @Test
-    public void aggregatingMessages() throws IOException {
+    public void aggregatingTextMessages() throws IOException {
 
         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 
@@ -638,7 +825,12 @@
 
             WebSocket.Listener listener = new WebSocket.Listener() {
 
-                List<CharSequence> parts = new ArrayList<>();
+                List<CharSequence> parts;
+                /*
+                 * A CompletableFuture which will complete once the current
+                 * message has been fully assembled (LAST/WHOLE). Until then
+                 * the listener returns this instance for every call.
+                 */
                 CompletableFuture<?> currentCf;
                 List<String> collected = new ArrayList<>();
 
@@ -653,6 +845,7 @@
                             processWholeMessage(List.of(message), cf);
                             return cf;
                         case FIRST:
+                            parts = new ArrayList<>();
                             parts.add(message);
                             currentCf = new CompletableFuture<>();
                             currentCf.thenRun(() -> webSocket.request(1));
@@ -664,12 +857,11 @@
                             break;
                         case LAST:
                             parts.add(message);
-                            List<CharSequence> copy = List.copyOf(parts);
-                            parts.clear();
-                            CompletableFuture<?> cf1 = currentCf;
+                            CompletableFuture<?> copyCf = this.currentCf;
+                            processWholeMessage(parts, copyCf);
                             currentCf = null;
-                            processWholeMessage(copy, cf1);
-                            return cf1;
+                            parts = null;
+                            return copyCf;
                     }
                     return currentCf;
                 }
@@ -682,6 +874,11 @@
                     return null;
                 }
 
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
+
                 public void processWholeMessage(List<CharSequence> data,
                                                 CompletableFuture<?> cf) {
                     StringBuilder b = new StringBuilder();
@@ -692,10 +889,10 @@
                     collected.add(s);
                 }
             };
-            WebSocket ws = newHttpClient()
-                    .newWebSocketBuilder()
-                    .buildAsync(server.getURI(), listener)
-                    .join();
+
+            newHttpClient().newWebSocketBuilder()
+                           .buildAsync(server.getURI(), listener)
+                           .join();
 
             List<String> a = actual.join();
             assertEquals(a, expected);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTextTest.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,318 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static java.net.http.HttpClient.newHttpClient;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @bug 8159053
+ *
+ *
+ * @run testng/othervm
+ *      -Djdk.httpclient.websocket.writeBufferSize=1024
+ *      -Djdk.httpclient.websocket.intermediateBufferSize=2048 WebSocketTextTest
+ */
+public class WebSocketTextTest {
+
+    private final static Random random;
+    static {
+        long seed = System.currentTimeMillis();
+        System.out.println("seed=" + seed);
+        random = new Random(seed);
+    }
+
+// * @run testng/othervm
+// *      -Djdk.httpclient.websocket.writeBufferSize=16
+// *      -Djdk.httpclient.sendBufferSize=32 WebSocketTextTest
+
+
+
+    // FIXME ensure subsequent (sendText/Binary, false) only CONTINUATIONs
+
+    @Test(dataProvider = "binary")
+    public void binary(ByteBuffer expected) throws IOException, InterruptedException {
+        try (DummyWebSocketServer server = new DummyWebSocketServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ws.sendBinary(expected.duplicate(), true).join();
+            ws.abort();
+            ByteBuffer data = server.read();
+            List<Frame> frames = readFrames(data);
+            assertEquals(frames.size(), 1);
+            Frame f = frames.get(0);
+            assertTrue(f.last);
+            assertEquals(f.opcode, Frame.Opcode.BINARY);
+            assertEquals(f.data, expected);
+        }
+    }
+
+    private static List<Frame> readFrames(ByteBuffer src) {
+        List<Frame> frames = new ArrayList<>();
+        Frame.Consumer consumer = new Frame.Consumer() {
+
+            ByteBuffer data;
+            Frame.Opcode opcode;
+            Frame.Masker masker = new Frame.Masker();
+            boolean last;
+
+            @Override
+            public void fin(boolean value) {
+                last = value;
+            }
+
+            @Override
+            public void rsv1(boolean value) {
+                if (value) {
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void rsv2(boolean value) {
+                if (value) {
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void rsv3(boolean value) {
+                if (value) {
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void opcode(Frame.Opcode value) {
+                opcode = value;
+            }
+
+            @Override
+            public void mask(boolean value) {
+                if (!value) { // Frames from the client MUST be masked
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void payloadLen(long value) {
+                data = ByteBuffer.allocate((int) value);
+            }
+
+            @Override
+            public void maskingKey(int value) {
+                masker.mask(value);
+            }
+
+            @Override
+            public void payloadData(ByteBuffer data) {
+                masker.transferMasking(data, this.data);
+            }
+
+            @Override
+            public void endFrame() {
+                frames.add(new Frame(opcode, this.data.flip(), last));
+            }
+        };
+
+        Frame.Reader r = new Frame.Reader();
+        while (src.hasRemaining()) {
+            r.readFrame(src, consumer);
+        }
+        return frames;
+    }
+
+    @Test(dataProvider = "pingPong")
+    public void ping(ByteBuffer expected) throws Exception {
+        try (DummyWebSocketServer server = new DummyWebSocketServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ws.sendPing(expected.duplicate()).join();
+            ws.abort();
+            ByteBuffer data = server.read();
+            List<Frame> frames = readFrames(data);
+            assertEquals(frames.size(), 1);
+            Frame f = frames.get(0);
+            assertEquals(f.opcode, Frame.Opcode.PING);
+            ByteBuffer actual = ByteBuffer.allocate(expected.remaining());
+            actual.put(f.data);
+            actual.flip();
+            assertEquals(actual, expected);
+        }
+    }
+
+    @Test(dataProvider = "pingPong")
+    public void pong(ByteBuffer expected) throws Exception {
+        try (DummyWebSocketServer server = new DummyWebSocketServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ws.sendPong(expected.duplicate()).join();
+            ws.abort();
+            ByteBuffer data = server.read();
+            List<Frame> frames = readFrames(data);
+            assertEquals(frames.size(), 1);
+            Frame f = frames.get(0);
+            assertEquals(f.opcode, Frame.Opcode.PONG);
+            ByteBuffer actual = ByteBuffer.allocate(expected.remaining());
+            actual.put(f.data);
+            actual.flip();
+            assertEquals(actual, expected);
+        }
+    }
+
+    @Test(dataProvider = "close")
+    public void close(int statusCode, String reason) throws Exception {
+        try (DummyWebSocketServer server = new DummyWebSocketServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ws.sendClose(statusCode, reason).join();
+            ws.abort();
+            ByteBuffer data = server.read();
+            List<Frame> frames = readFrames(data);
+            assertEquals(frames.size(), 1);
+            Frame f = frames.get(0);
+            assertEquals(f.opcode, Frame.Opcode.CLOSE);
+            ByteBuffer actual = ByteBuffer.allocate(Frame.MAX_CONTROL_FRAME_PAYLOAD_SIZE);
+            actual.put(f.data);
+            actual.flip();
+            assertEquals(actual.getChar(), statusCode);
+            assertEquals(StandardCharsets.UTF_8.decode(actual).toString(), reason);
+        }
+    }
+
+    @Test(dataProvider = "text")
+    public void text(String expected) throws Exception {
+        try (DummyWebSocketServer server = new DummyWebSocketServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ws.sendText(expected, true).join();
+            ws.abort();
+            ByteBuffer data = server.read();
+            List<Frame> frames = readFrames(data);
+
+            int maxBytes = (int) StandardCharsets.UTF_8.newEncoder().maxBytesPerChar() * expected.length();
+            ByteBuffer actual = ByteBuffer.allocate(maxBytes);
+            frames.stream().forEachOrdered(f -> actual.put(f.data));
+            actual.flip();
+            assertEquals(StandardCharsets.UTF_8.decode(actual).toString(), expected);
+        }
+    }
+
+    @DataProvider(name = "pingPong")
+    public Object[][] pingPongSizes() {
+        return new Object[][]{
+                {bytes(  0)},
+                {bytes(  1)},
+                {bytes( 63)},
+                {bytes(125)},
+        };
+    }
+
+    @DataProvider(name = "close")
+    public Object[][] closeArguments() {
+        return new Object[][]{
+                {WebSocket.NORMAL_CLOSURE, utf8String( 0)},
+                {WebSocket.NORMAL_CLOSURE, utf8String( 1)},
+                // 123 / 3 = max reason bytes / max bytes per char
+                {WebSocket.NORMAL_CLOSURE, utf8String(41)},
+        };
+    }
+
+    private static String utf8String(int n) {
+        char[] abc = {
+                // -- English Alphabet (26 characters, 1 byte per char) --
+                0x0041, 0x0042, 0x0043, 0x0044, 0x0045, 0x0046, 0x0047, 0x0048,
+                0x0049, 0x004A, 0x004B, 0x004C, 0x004D, 0x004E, 0x004F, 0x0050,
+                0x0051, 0x0052, 0x0053, 0x0054, 0x0055, 0x0056, 0x0057, 0x0058,
+                0x0059, 0x005A,
+                // -- Russian Alphabet (33 characters, 2 bytes per char) --
+                0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
+                0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
+                0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
+                0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
+                0x042F,
+                // -- Hiragana base characters (46 characters, 3 bytes per char) --
+                0x3042, 0x3044, 0x3046, 0x3048, 0x304A, 0x304B, 0x304D, 0x304F,
+                0x3051, 0x3053, 0x3055, 0x3057, 0x3059, 0x305B, 0x305D, 0x305F,
+                0x3061, 0x3064, 0x3066, 0x3068, 0x306A, 0x306B, 0x306C, 0x306D,
+                0x306E, 0x306F, 0x3072, 0x3075, 0x3078, 0x307B, 0x307E, 0x307F,
+                0x3080, 0x3081, 0x3082, 0x3084, 0x3086, 0x3088, 0x3089, 0x308A,
+                0x308B, 0x308C, 0x308D, 0x308F, 0x3092, 0x3093,
+        };
+
+        assert new String(abc).getBytes(StandardCharsets.UTF_8).length > abc.length;
+
+        StringBuilder str = new StringBuilder(n);
+        random.ints(0, abc.length).limit(n).forEach(i -> str.append(abc[i]));
+        return str.toString();
+    }
+
+    @DataProvider(name = "text")
+    public Object[][] texts() {
+        return new Object[][]{
+                {utf8String(   0)},
+                {utf8String(1024)},
+        };
+    }
+
+    @DataProvider(name = "binary")
+    public Object[][] binary() {
+        return new Object[][]{
+                {bytes(   0)},
+                {bytes(1024)},
+        };
+    }
+
+    private static ByteBuffer bytes(int n) {
+        byte[] array = new byte[n];
+        random.nextBytes(array);
+        return ByteBuffer.wrap(array);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MessageQueueTest.java	Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,449 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+public class MessageQueueTest {
+
+    private static final Random r = new SecureRandom();
+
+    @DataProvider(name = "illegalCapacities")
+    public static Object[][] illegalCapacities() {
+        return new Object[][]{
+                new Object[]{Integer.MIN_VALUE},
+                new Object[]{-2},
+                new Object[]{-1},
+                new Object[]{ 0},
+        };
+    }
+
+    @Test(dataProvider = "illegalCapacities")
+    public void illegalCapacity(int n) {
+        assertThrows(IllegalArgumentException.class, () -> new MessageQueue(n));
+    }
+
+    @Test(dataProvider = "capacities")
+    public void emptiness(int n) {
+        assertTrue(new MessageQueue(n).isEmpty());
+    }
+
+    @Test(dataProvider = "capacities")
+    public void fullness(int n) throws IOException {
+        MessageQueue q = new MessageQueue(n);
+        Adder adder = new Adder();
+        Queue<Message> referenceQueue = new LinkedList<>();
+        for (int i = 0; i < n; i++) {
+            Message m = createRandomMessage();
+            referenceQueue.add(m);
+            adder.add(q, m);
+        }
+        for (int i = 0; i < n + 1; i++) {
+            Message m = createRandomMessage();
+            assertThrows(IOException.class, () -> adder.add(q, m));
+        }
+        for (int i = 0; i < n; i++) {
+            Message expected = referenceQueue.remove();
+            Message actual = new Remover().removeFrom(q);
+            assertEquals(actual, expected);
+        }
+    }
+
+    private Message createRandomMessage() {
+        Message.Type[] values = Message.Type.values();
+        Message.Type type = values[r.nextInt(values.length)];
+        ByteBuffer binary = null;
+        CharBuffer text = null;
+        boolean isLast = false;
+        int statusCode = -1;
+        switch (type) {
+            case TEXT:
+                text = CharBuffer.allocate(r.nextInt(17));
+                isLast = r.nextBoolean();
+                break;
+            case BINARY:
+                binary = ByteBuffer.allocate(r.nextInt(19));
+                isLast = r.nextBoolean();
+                break;
+            case PING:
+                binary = ByteBuffer.allocate(r.nextInt(19));
+                break;
+            case PONG:
+                binary = ByteBuffer.allocate(r.nextInt(19));
+                break;
+            case CLOSE:
+                text = CharBuffer.allocate(r.nextInt(17));
+                statusCode = r.nextInt();
+                break;
+            default:
+                throw new AssertionError();
+        }
+        BiConsumer<Integer, Throwable> action = new BiConsumer<>() {
+            @Override
+            public void accept(Integer o, Throwable throwable) { }
+        };
+        CompletableFuture<Integer> future = new CompletableFuture<>();
+        return new Message(type, binary, text, isLast, statusCode, r.nextInt(),
+                           action, future);
+    }
+
+    @Test(dataProvider = "capacities")
+    public void caterpillarWalk(int n) throws IOException {
+//        System.out.println("n: " + n);
+        for (int p = 1; p <= n; p++) { // pace
+//            System.out.println("  pace: " + p);
+            MessageQueue q = new MessageQueue(n);
+            Queue<Message> referenceQueue = new LinkedList<>();
+            Adder adder = new Adder();
+            for (int k = 0; k < (n / p) + 1; k++) {
+//                System.out.println("    cycle: " + k);
+                for (int i = 0; i < p; i++) {
+                    Message m = createRandomMessage();
+                    referenceQueue.add(m);
+                    adder.add(q, m);
+                }
+                Remover remover = new Remover();
+                for (int i = 0; i < p; i++) {
+                    Message expected = referenceQueue.remove();
+                    Message actual = remover.removeFrom(q);
+                    assertEquals(actual, expected);
+                }
+                assertTrue(q.isEmpty());
+            }
+        }
+    }
+
+    /* Exercises only concurrent additions */
+    @Test
+    public void halfConcurrency() throws ExecutionException, InterruptedException {
+        int n = Runtime.getRuntime().availableProcessors() + 2;
+        ExecutorService executorService = Executors.newFixedThreadPool(n);
+        CyclicBarrier start = new CyclicBarrier(n);
+        Adder adder = new Adder();
+        List<Future<?>> futures = new ArrayList<>(n);
+        try {
+            for (int k = 0; k < 1024; k++) {
+                MessageQueue q = new MessageQueue(n);
+                for (int i = 0; i < n; i++) {
+                    Message m = createRandomMessage();
+                    Future<Void> f = executorService.submit(() -> {
+                        start.await();
+                        adder.add(q, m);
+                        return null;
+                    });
+                    futures.add(f);
+                }
+                for (Future<?> f : futures) {
+                    f.get();    // Just to check for exceptions
+                }
+                futures.clear();
+                // Make sure the queue is full
+                assertThrows(IOException.class,
+                             () -> adder.add(q, createRandomMessage()));
+            }
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    // TODO: same message; different messages; a mix thereof
+
+    @Test
+    public void concurrency() throws ExecutionException, InterruptedException {
+        int nProducers = Runtime.getRuntime().availableProcessors() + 2;
+        int nThreads = nProducers + 1;
+        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+        CyclicBarrier start = new CyclicBarrier(nThreads);
+        MessageQueue q = new MessageQueue(nProducers);
+        Adder adder = new Adder();
+        Remover remover = new Remover();
+        List<Message> expectedList = new ArrayList<>(nProducers);
+        List<Message> actualList = new ArrayList<>(nProducers);
+        List<Future<?>> futures = new ArrayList<>(nProducers);
+        try {
+            for (int k = 0; k < 1024; k++) {
+                for (int i = 0; i < nProducers; i++) {
+                    Message m = createRandomMessage();
+                    expectedList.add(m);
+                    Future<Void> f = executorService.submit(() -> {
+                        start.await();
+                        adder.add(q, m);
+                        return null;
+                    });
+                    futures.add(f);
+                }
+                Future<Void> consumer = executorService.submit(() -> {
+                    int i = 0;
+                    start.await();
+                    while (i < nProducers) {
+                        Message m = remover.removeFrom(q);
+                        if (m != null) {
+                            actualList.add(m);
+                            i++;
+                        }
+                    }
+                    return null;
+                });
+                for (Future<?> f : futures) {
+                    f.get();    // Just to check for exceptions
+                }
+                consumer.get(); // Waiting for consumer to collect all the messages
+                assertEquals(actualList.size(), expectedList.size());
+                for (Message m : expectedList) {
+                    assertTrue(actualList.remove(m));
+                }
+                assertTrue(actualList.isEmpty());
+                assertTrue(q.isEmpty());
+                expectedList.clear();
+                futures.clear();
+            }
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    @Test(dataProvider = "capacities")
+    public void testSingleThreaded(int n) throws IOException {
+        Queue<Message> referenceQueue = new LinkedList<>();
+        MessageQueue q = new MessageQueue(n);
+        Adder adder = new Adder();
+        for (int i = 0; i < n; i++) {
+            Message m = createRandomMessage();
+            referenceQueue.add(m);
+            adder.add(q, m);
+        }
+        for (int i = 0; i < n; i++) {
+            Message expected = referenceQueue.remove();
+            Message actual = new Remover().removeFrom(q);
+            assertEquals(actual, expected);
+        }
+        assertTrue(q.isEmpty());
+    }
+
+    @DataProvider(name = "capacities")
+    public Object[][] capacities() {
+        return new Object[][]{
+                new Object[]{  1},
+                new Object[]{  2},
+                new Object[]{  3},
+                new Object[]{  4},
+                new Object[]{  5},
+                new Object[]{  6},
+                new Object[]{  7},
+                new Object[]{  8},
+                new Object[]{  9},
+                new Object[]{128},
+                new Object[]{256},
+        };
+    }
+
+    // -- auxiliary test infrastructure --
+
+    static class Adder {
+
+        @SuppressWarnings("unchecked")
+        void add(MessageQueue q, Message m) throws IOException {
+            switch (m.type) {
+                case TEXT:
+                    q.addText(m.text, m.isLast, m.attachment, m.action, m.future);
+                    break;
+                case BINARY:
+                    q.addBinary(m.binary, m.isLast, m.attachment, m.action, m.future);
+                    break;
+                case PING:
+                    q.addPing(m.binary, m.attachment, m.action, m.future);
+                    break;
+                case PONG:
+                    q.addPong(m.binary, m.attachment, m.action, m.future);
+                    break;
+                case CLOSE:
+                    q.addClose(m.statusCode, m.text, m.attachment, m.action, m.future);
+                    break;
+                default:
+                    throw new InternalError();
+            }
+        }
+    }
+
+    static class Remover {
+
+        Message removeFrom(MessageQueue q) {
+            Message m = q.peek(new MessageQueue.QueueCallback<>() {
+
+                boolean called;
+
+                @Override
+                public <T> Message onText(CharBuffer message,
+                                          boolean isLast,
+                                          T attachment,
+                                          BiConsumer<? super T, ? super Throwable> action,
+                                          CompletableFuture<? super T> future) {
+                    assertFalse(called);
+                    called = true;
+                    return new Message(Message.Type.TEXT, null, message, isLast,
+                                       -1, attachment, action, future);
+                }
+
+                @Override
+                public <T> Message onBinary(ByteBuffer message,
+                                            boolean isLast,
+                                            T attachment,
+                                            BiConsumer<? super T, ? super Throwable> action,
+                                            CompletableFuture<? super T> future) {
+                    assertFalse(called);
+                    called = true;
+                    return new Message(Message.Type.BINARY, message, null, isLast,
+                                       -1, attachment, action, future);
+                }
+
+                @Override
+                public <T> Message onPing(ByteBuffer message,
+                                          T attachment,
+                                          BiConsumer<? super T, ? super Throwable> action,
+                                          CompletableFuture<? super T> future) {
+                    assertFalse(called);
+                    called = true;
+                    return new Message(Message.Type.PING, message, null, false,
+                                       -1, attachment, action, future);
+                }
+
+                @Override
+                public <T> Message onPong(ByteBuffer message,
+                                          T attachment,
+                                          BiConsumer<? super T, ? super Throwable> action,
+                                          CompletableFuture<? super T> future) {
+                    assertFalse(called);
+                    called = true;
+                    return new Message(Message.Type.PONG, message, null, false,
+                                       -1, attachment, action, future);
+                }
+
+                @Override
+                public <T> Message onClose(int statusCode,
+                                           CharBuffer reason,
+                                           T attachment,
+                                           BiConsumer<? super T, ? super Throwable> action,
+                                           CompletableFuture<? super T> future) {
+                    assertFalse(called);
+                    called = true;
+                    return new Message(Message.Type.CLOSE, null, reason, false,
+                                       statusCode, attachment, action, future);
+                }
+
+                @Override
+                public Message onEmpty() throws RuntimeException {
+                    return null;
+                }
+            });
+            if (m != null) {
+                q.remove();
+            }
+            return m;
+        }
+    }
+
+    static class Message {
+
+        private final Type type;
+        private final ByteBuffer binary;
+        private final CharBuffer text;
+        private final boolean isLast;
+        private final int statusCode;
+        private final Object attachment;
+        @SuppressWarnings("rawtypes")
+        private final BiConsumer action;
+        @SuppressWarnings("rawtypes")
+        private final CompletableFuture future;
+
+        <T> Message(Type type,
+                    ByteBuffer binary,
+                    CharBuffer text,
+                    boolean isLast,
+                    int statusCode,
+                    T attachment,
+                    BiConsumer<? super T, ? super Throwable> action,
+                    CompletableFuture<? super T> future) {
+            this.type = type;
+            this.binary = binary;
+            this.text = text;
+            this.isLast = isLast;
+            this.statusCode = statusCode;
+            this.attachment = attachment;
+            this.action = action;
+            this.future = future;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(type, binary, text, isLast, statusCode, attachment, action, future);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Message message = (Message) o;
+            return isLast == message.isLast &&
+                    statusCode == message.statusCode &&
+                    type == message.type &&
+                    Objects.equals(binary, message.binary) &&
+                    Objects.equals(text, message.text) &&
+                    Objects.equals(attachment, message.attachment) &&
+                    Objects.equals(action, message.action) &&
+                    Objects.equals(future, message.future);
+        }
+
+        enum Type {
+            TEXT,
+            BINARY,
+            PING,
+            PONG,
+            CLOSE
+        }
+    }
+}
--- a/test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/WebSocketImplTest.java	Wed Mar 07 15:39:25 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,453 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.internal.net.http.websocket;
-
-import java.net.http.WebSocket;
-import org.testng.annotations.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import static java.net.http.WebSocket.MessagePart.FIRST;
-import static java.net.http.WebSocket.MessagePart.LAST;
-import static java.net.http.WebSocket.MessagePart.PART;
-import static java.net.http.WebSocket.MessagePart.WHOLE;
-import static java.net.http.WebSocket.NORMAL_CLOSURE;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onClose;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onError;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onOpen;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onPing;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onPong;
-import static jdk.internal.net.http.websocket.MockListener.Invocation.onText;
-import static jdk.internal.net.http.websocket.MockTransport.onClose;
-import static jdk.internal.net.http.websocket.MockTransport.onPing;
-import static jdk.internal.net.http.websocket.MockTransport.onPong;
-import static jdk.internal.net.http.websocket.MockTransport.onText;
-import static jdk.internal.net.http.websocket.TestSupport.assertCompletesExceptionally;
-import static org.testng.Assert.assertEquals;
-
-/*
- * Formatting in this file may seem strange:
- *
- *  (
- *   ( ...)
- *  ...
- *  )
- *  ...
- *
- *  However there is a rationale behind it. Sometimes the level of argument
- *  nesting is high, which makes it hard to manage parentheses.
- */
-public class WebSocketImplTest {
-
-    // TODO: request in onClose/onError
-    // TODO: throw exception in onClose/onError
-    // TODO: exception is thrown from request()
-    // TODO: repeated sendClose complete normally
-    // TODO: default Close message is sent if IAE is thrown from sendClose
-
-    @Test
-    public void testNonPositiveRequest() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE) {
-            @Override
-            protected void onOpen0(WebSocket webSocket) {
-                webSocket.request(0);
-            }
-        };
-        WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE))));
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.Invocation> invocations = listener.invocations();
-        assertEquals(
-                invocations,
-                List.of(
-                        onOpen(ws),
-                        onError(ws, IllegalArgumentException.class)
-                )
-        );
-    }
-
-    @Test
-    public void testText1() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        WebSocket ws = newInstance(
-                listener,
-                List.of(
-                        now(onText("1", FIRST)),
-                        now(onText("2", PART)),
-                        now(onText("3", LAST)),
-                        now(onClose(NORMAL_CLOSURE, "no reason"))
-                )
-        );
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.Invocation> invocations = listener.invocations();
-        assertEquals(
-                invocations,
-                List.of(
-                        onOpen(ws),
-                        onText(ws, "1", FIRST),
-                        onText(ws, "2", PART),
-                        onText(ws, "3", LAST),
-                        onClose(ws, NORMAL_CLOSURE, "no reason")
-                )
-        );
-    }
-
-    @Test
-    public void testText2() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        WebSocket ws = newInstance(
-                listener,
-                List.of(
-                        now(onText("1", FIRST)),
-                        seconds(1, onText("2", PART)),
-                        now(onText("3", LAST)),
-                        seconds(1, onClose(NORMAL_CLOSURE, "no reason"))
-                )
-        );
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.Invocation> invocations = listener.invocations();
-        assertEquals(
-                invocations,
-                List.of(
-                        onOpen(ws),
-                        onText(ws, "1", FIRST),
-                        onText(ws, "2", PART),
-                        onText(ws, "3", LAST),
-                        onClose(ws, NORMAL_CLOSURE, "no reason")
-                )
-        );
-    }
-
-    @Test
-    public void testTextIntermixedWithPongs() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        WebSocket ws = newInstance(
-                listener,
-                List.of(
-                        now(onText("1", FIRST)),
-                        now(onText("2", PART)),
-                        now(onPong(ByteBuffer.allocate(16))),
-                        seconds(1, onPong(ByteBuffer.allocate(32))),
-                        now(onText("3", LAST)),
-                        now(onPong(ByteBuffer.allocate(64))),
-                        now(onClose(NORMAL_CLOSURE, "no reason"))
-                )
-        );
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.Invocation> invocations = listener.invocations();
-        assertEquals(
-                invocations,
-                List.of(
-                        onOpen(ws),
-                        onText(ws, "1", FIRST),
-                        onText(ws, "2", PART),
-                        onPong(ws, ByteBuffer.allocate(16)),
-                        onPong(ws, ByteBuffer.allocate(32)),
-                        onText(ws, "3", LAST),
-                        onPong(ws, ByteBuffer.allocate(64)),
-                        onClose(ws, NORMAL_CLOSURE, "no reason")
-                )
-        );
-    }
-
-    @Test
-    public void testTextIntermixedWithPings() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        WebSocket ws = newInstance(
-                listener,
-                List.of(
-                        now(onText("1", FIRST)),
-                        now(onText("2", PART)),
-                        now(onPing(ByteBuffer.allocate(16))),
-                        seconds(1, onPing(ByteBuffer.allocate(32))),
-                        now(onText("3", LAST)),
-                        now(onPing(ByteBuffer.allocate(64))),
-                        now(onClose(NORMAL_CLOSURE, "no reason"))
-                )
-        );
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.Invocation> invocations = listener.invocations();
-        assertEquals(
-                invocations,
-                List.of(
-                        onOpen(ws),
-                        onText(ws, "1", FIRST),
-                        onText(ws, "2", PART),
-                        onPing(ws, ByteBuffer.allocate(16)),
-                        onPing(ws, ByteBuffer.allocate(32)),
-                        onText(ws, "3", LAST),
-                        onPing(ws, ByteBuffer.allocate(64)),
-                        onClose(ws, NORMAL_CLOSURE, "no reason"))
-        );
-    }
-
-    // Tease out "java.lang.IllegalStateException: Send pending" due to possible
-    // race between sending a message and replenishing the permit
-    @Test
-    public void testManyTextMessages() {
-        WebSocketImpl ws = newInstance(
-                new MockListener(1),
-                new TransportFactory() {
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-
-                        final Random r = new Random();
-
-                        return new MockTransport<>(sendResultSupplier, consumer) {
-                            @Override
-                            protected CompletableFuture<T> defaultSend() {
-                                return millis(r.nextInt(100), result());
-                            }
-                        };
-                    }
-                });
-        int NUM_MESSAGES = 512;
-        CompletableFuture<WebSocket> current = CompletableFuture.completedFuture(ws);
-        for (int i = 0; i < NUM_MESSAGES; i++) {
-            current = current.thenCompose(w -> w.sendText(" ", true));
-        }
-        current.join();
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), NUM_MESSAGES);
-    }
-
-    @Test
-    public void testManyBinaryMessages() {
-        WebSocketImpl ws = newInstance(
-                new MockListener(1),
-                new TransportFactory() {
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-
-                        final Random r = new Random();
-
-                        return new MockTransport<>(sendResultSupplier, consumer) {
-                            @Override
-                            protected CompletableFuture<T> defaultSend() {
-                                return millis(r.nextInt(150), result());
-                            }
-                        };
-                    }
-                });
-        CompletableFuture<WebSocket> start = new CompletableFuture<>();
-
-        int NUM_MESSAGES = 512;
-        CompletableFuture<WebSocket> current = start;
-        for (int i = 0; i < NUM_MESSAGES; i++) {
-            current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true));
-        }
-
-        start.completeAsync(() -> ws);
-        current.join();
-
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), NUM_MESSAGES);
-    }
-
-
-    @Test
-    public void sendTextImmediately() {
-        WebSocketImpl ws = newInstance(
-                new MockListener(1),
-                new TransportFactory() {
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-                        return new MockTransport<>(sendResultSupplier, consumer);
-                    }
-                });
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .join();
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), 3);
-    }
-
-    @Test
-    public void sendTextWithDelay() {
-        MockListener listener = new MockListener(1);
-        WebSocketImpl ws = newInstance(
-                listener,
-                new TransportFactory() {
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-                        return new MockTransport<>(sendResultSupplier, consumer) {
-                            @Override
-                            protected CompletableFuture<T> defaultSend() {
-                                return seconds(1, result());
-                            }
-                        };
-                    }
-                });
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .join();
-        assertEquals(listener.invocations(), List.of(onOpen(ws)));
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), 3);
-    }
-
-    @Test
-    public void sendTextMixedDelay() {
-        MockListener listener = new MockListener(1);
-        WebSocketImpl ws = newInstance(
-                listener,
-                new TransportFactory() {
-
-                    final Random r = new Random();
-
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-                        return new MockTransport<>(sendResultSupplier, consumer) {
-                            @Override
-                            protected CompletableFuture<T> defaultSend() {
-                                return r.nextBoolean()
-                                        ? seconds(1, result())
-                                        : now(result());
-                            }
-                        };
-                    }
-                });
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .thenCompose(w -> w.sendText("4", true))
-                .thenCompose(w -> w.sendText("5", true))
-                .thenCompose(w -> w.sendText("6", true))
-                .thenCompose(w -> w.sendText("7", true))
-                .thenCompose(w -> w.sendText("8", true))
-                .thenCompose(w -> w.sendText("9", true))
-                .join();
-        assertEquals(listener.invocations(), List.of(onOpen(ws)));
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), 9);
-    }
-
-    @Test(enabled = false) // temporarily disabled
-    public void sendControlMessagesConcurrently() {
-        MockListener listener = new MockListener(1);
-
-        CompletableFuture<?> first = new CompletableFuture<>(); // barrier
-
-        WebSocketImpl ws = newInstance(
-                listener,
-                new TransportFactory() {
-
-                    final AtomicInteger i = new AtomicInteger();
-
-                    @Override
-                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                            MessageStreamConsumer consumer) {
-                        return new MockTransport<>(sendResultSupplier, consumer) {
-                            @Override
-                            protected CompletableFuture<T> defaultSend() {
-                                if (i.incrementAndGet() == 1) {
-                                    return first.thenApply(o -> result());
-                                } else {
-                                    return now(result());
-                                }
-                            }
-                        };
-                    }
-                });
-
-        CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
-        CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
-        CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
-
-        first.complete(null);
-        // Don't care about exceptional completion, only that all of them have
-        // completed
-        CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
-                .handle((v, e) -> null).join();
-
-        cf3.join(); /* Check that sendClose has completed normally */
-        cf4.join(); /* Check that repeated sendClose has completed normally */
-        assertCompletesExceptionally(IllegalStateException.class, cf5);
-        assertCompletesExceptionally(IllegalStateException.class, cf6);
-
-        assertEquals(listener.invocations(), List.of(onOpen(ws)));
-        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
-        assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted
-    }
-
-    private static <T> CompletableFuture<T> seconds(long val, T result) {
-        return new CompletableFuture<T>()
-                .completeOnTimeout(result, val, TimeUnit.SECONDS);
-    }
-
-    private static <T> CompletableFuture<T> millis(long val, T result) {
-        return new CompletableFuture<T>()
-                .completeOnTimeout(result, val, TimeUnit.MILLISECONDS);
-    }
-
-    private static <T> CompletableFuture<T> now(T result) {
-        return CompletableFuture.completedFuture(result);
-    }
-
-    private static WebSocketImpl newInstance(
-            WebSocket.Listener listener,
-            Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) {
-        TransportFactory factory = new TransportFactory() {
-            @Override
-            public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
-                                                    MessageStreamConsumer consumer) {
-                return new MockTransport<T>(sendResultSupplier, consumer) {
-                    @Override
-                    protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
-                        return input;
-                    }
-                };
-            }
-        };
-        return newInstance(listener, factory);
-    }
-
-    private static WebSocketImpl newInstance(WebSocket.Listener listener,
-                                             TransportFactory factory) {
-        URI uri = URI.create("ws://localhost");
-        String subprotocol = "";
-        return WebSocketImpl.newInstance(uri, subprotocol, listener, factory);
-    }
-}