http-client-branch: (WebSocket) refactoring for the sake of extra test coverage http-client-branch
authorprappo
Fri, 15 Dec 2017 00:47:16 +0300
branchhttp-client-branch
changeset 55988 7f1e0cf933a6
parent 55983 e4a1f0c9d4c6
child 55989 76ac25076fdc
http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactory.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactoryImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java
test/jdk/java/net/httpclient/websocket/SendingTestDriver.java
test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/TestSupport.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Thu Dec 14 18:41:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Fri Dec 15 00:47:16 2017 +0300
@@ -197,9 +197,9 @@
     static final class Result {
 
         final String subprotocol;
-        final TransportSupplier transport;
+        final TransportFactory transport;
 
-        private Result(String subprotocol, TransportSupplier transport) {
+        private Result(String subprotocol, TransportFactory transport) {
             this.subprotocol = subprotocol;
             this.transport = transport;
         }
@@ -263,7 +263,7 @@
         }
         String subprotocol = checkAndReturnSubprotocol(headers);
         RawChannel channel = ((RawChannel.Provider) response).rawChannel();
-        return new Result(subprotocol, new TransportSupplier(channel));
+        return new Result(subprotocol, new TransportFactoryImpl(channel));
     }
 
     private String checkAndReturnSubprotocol(HttpHeaders responseHeaders)
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.internal.common.Demand;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-
-/*
- * Receives incoming data from the channel on demand and converts it into a
- * stream of WebSocket messages which are then delivered to the supplied message
- * consumer in a strict sequential order and non-recursively. In other words,
- *
- *     onText()
- *     onText()
- *     onBinary()
- *     ...
- *
- * instead of
- *
- *     onText()
- *       onText()
- *         onBinary()
- *     ...
- *
- * even if `request(long n)` is called from inside these invocations.
- */
-public class Receiver {
-
-    private final MessageStreamConsumer messageConsumer;
-    private final RawChannel channel;
-    private final FrameConsumer frameConsumer;
-    private final Frame.Reader reader = new Frame.Reader();
-    private final RawChannel.RawEvent event = createHandler();
-    protected final Demand demand = new Demand(); /* Exposed for testing purposes */
-    private final SequentialScheduler pushScheduler;
-
-    private ByteBuffer data;
-    private volatile int state;
-
-    private static final int UNREGISTERED = 0;
-    private static final int AVAILABLE    = 1;
-    private static final int WAITING      = 2;
-
-    public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
-        this.messageConsumer = messageConsumer;
-        this.channel = channel;
-        this.frameConsumer = new FrameConsumer(this.messageConsumer);
-        this.data = channel.initialByteBuffer();
-        // To ensure the initial non-final `data` will be visible
-        // (happens-before) when `handler` invokes `pushContinuously`
-        // the following assignment is done last:
-        pushScheduler = createScheduler();
-    }
-
-    /* Exposed for testing purposes */
-    protected SequentialScheduler createScheduler() {
-        return new SequentialScheduler(new PushContinuouslyTask());
-    }
-
-    private RawChannel.RawEvent createHandler() {
-        return new RawChannel.RawEvent() {
-
-            @Override
-            public int interestOps() {
-                return SelectionKey.OP_READ;
-            }
-
-            @Override
-            public void handle() {
-                state = AVAILABLE;
-                pushScheduler.runOrSchedule();
-            }
-        };
-    }
-
-    public void request(long n) {
-        if (demand.increase(n)) {
-            pushScheduler.runOrSchedule();
-        }
-    }
-
-    /*
-     * Why is this method needed? Since Receiver operates through callbacks
-     * this method allows to abstract out what constitutes as a message being
-     * received (i.e. to decide outside this type when exactly one should
-     * decrement the demand).
-     */
-    void acknowledge() {
-        long x = demand.decreaseAndGet(1);
-        if (x < 0) {
-            throw new InternalError(String.valueOf(x));
-        }
-    }
-
-    /*
-     * Stops the machinery from reading and delivering messages permanently,
-     * regardless of the current demand and data availability.
-     */
-    public void close() throws IOException {
-        pushScheduler.stop();
-        channel.shutdownInput();
-    }
-
-    private class PushContinuouslyTask
-        extends SequentialScheduler.CompleteRestartableTask
-    {
-        @Override
-        public void run() {
-            while (!pushScheduler.isStopped()) {
-                if (data.hasRemaining()) {
-                    if (!demand.isFulfilled()) {
-                        try {
-                            int oldPos = data.position();
-                            reader.readFrame(data, frameConsumer);
-                            int newPos = data.position();
-                            assert oldPos != newPos : data; // reader always consumes bytes
-                        } catch (Throwable e) {
-                            pushScheduler.stop();
-                            messageConsumer.onError(e);
-                        }
-                        continue;
-                    }
-                    break;
-                }
-                switch (state) {
-                    case WAITING:
-                        return;
-                    case UNREGISTERED:
-                        try {
-                            state = WAITING;
-                            channel.registerEvent(event);
-                        } catch (Throwable e) {
-                            pushScheduler.stop();
-                            messageConsumer.onError(e);
-                        }
-                        return;
-                    case AVAILABLE:
-                        try {
-                            data = channel.read();
-                        } catch (Throwable e) {
-                            pushScheduler.stop();
-                            messageConsumer.onError(e);
-                            return;
-                        }
-                        if (data == null) { // EOF
-                            pushScheduler.stop();
-                            messageConsumer.onComplete();
-                            return;
-                        } else if (!data.hasRemaining()) { // No data at the moment
-                            // Pretty much a "goto", reusing the existing code path
-                            // for registration
-                            state = UNREGISTERED;
-                        }
-                        continue;
-                    default:
-                        throw new InternalError(String.valueOf(state));
-                }
-            }
-        }
-    }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-
-import static java.util.Objects.requireNonNull;
-
-/*
- * Sends messages one at a time, in an asynchronous and non-blocking fashion.
- *
- * No matter whether the message has been fully sent or an error has occurred,
- * the transmitter reports the outcome to the supplied handler and becomes ready
- * to accept a new message. Until then, the transmitter is considered "busy" and
- * an IllegalStateException will be thrown on each attempt to invoke send.
- */
-public class Transmitter {
-
-    /* This flag is used solely for assertions */
-    private final AtomicBoolean busy = new AtomicBoolean();
-    private OutgoingMessage message;
-    private Consumer<Exception> completionHandler;
-    private final RawChannel channel;
-    private final RawChannel.RawEvent event;
-
-    public Transmitter(RawChannel channel) {
-        this.channel = channel;
-        this.event = createHandler();
-    }
-
-    /**
-     * The supplied handler may be invoked in the calling thread.
-     * A {@code StackOverflowError} may thus occur if there's a possibility
-     * that this method is called again by the supplied handler.
-     */
-    public void send(OutgoingMessage message,
-                     Consumer<Exception> completionHandler)
-    {
-        requireNonNull(message);
-        requireNonNull(completionHandler);
-        if (!busy.compareAndSet(false, true)) {
-            throw new IllegalStateException();
-        }
-        send0(message, completionHandler);
-    }
-
-    public void close() throws IOException {
-        channel.shutdownOutput();
-    }
-
-    private RawChannel.RawEvent createHandler() {
-        return new RawChannel.RawEvent() {
-
-            @Override
-            public int interestOps() {
-                return SelectionKey.OP_WRITE;
-            }
-
-            @Override
-            public void handle() {
-                // registerEvent(e) happens-before subsequent e.handle(), so
-                // we're fine reading the stored message and the completionHandler
-                send0(message, completionHandler);
-            }
-        };
-    }
-
-    private void send0(OutgoingMessage message, Consumer<Exception> handler) {
-        boolean b = busy.get();
-        assert b; // Please don't inline this, as busy.get() has memory
-                  // visibility effects and we don't want the program behaviour
-                  // to depend on whether the assertions are turned on
-                  // or turned off
-        try {
-            boolean sent = message.sendTo(channel);
-            if (sent) {
-                busy.set(false);
-                handler.accept(null);
-            } else {
-                // The message has not been fully sent, the transmitter needs to
-                // remember the message until it can continue with sending it
-                this.message = message;
-                this.completionHandler = handler;
-                try {
-                    channel.registerEvent(event);
-                } catch (IOException e) {
-                    this.message = null;
-                    this.completionHandler = null;
-                    busy.set(false);
-                    handler.accept(e);
-                }
-            }
-        } catch (IOException e) {
-            busy.set(false);
-            handler.accept(e);
-        }
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/*
+ * The only parametrization of Transport<T> used is Transport<WebSocket>. The
+ * type parameter T was introduced solely to avoid circular dependency between
+ * Transport and WebSocket. After all, instances of T are used solely to
+ * complete CompletableFutures. Transport doesn't care about the exact type of
+ * T.
+ *
+ * This way the Transport is fully in charge of creating CompletableFutures.
+ * On the one hand, Transport may use it to cache/reuse CompletableFutures. On
+ * the other hand, the class that uses Transport, may benefit by not converting
+ * from CompletableFuture<K> returned from Transport, to CompletableFuture<V>
+ * needed by the said class.
+ */
+public interface Transport<T> {
+
+    CompletableFuture<T> sendText(CharSequence message, boolean isLast);
+
+    CompletableFuture<T> sendBinary(ByteBuffer message, boolean isLast);
+
+    CompletableFuture<T> sendPing(ByteBuffer message);
+
+    CompletableFuture<T> sendPong(ByteBuffer message);
+
+    CompletableFuture<T> sendClose(int statusCode, String reason);
+
+    void request(long n);
+
+    /*
+     * Why is this method needed? Since Receiver operates through callbacks
+     * this method allows to abstract out what constitutes as a message being
+     * received (i.e. to decide outside this type when exactly one should
+     * decrement the demand).
+     */
+    void acknowledgeReception();
+
+    void closeOutput() throws IOException;
+
+    void closeInput() throws IOException;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactory.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,10 @@
+package jdk.incubator.http.internal.websocket;
+
+import java.util.function.Supplier;
+
+@FunctionalInterface
+public interface TransportFactory {
+
+    <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                     MessageStreamConsumer consumer);
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactoryImpl.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.incubator.http.internal.websocket;
+
+import java.util.function.Supplier;
+
+public class TransportFactoryImpl implements TransportFactory {
+
+    private final RawChannel channel;
+
+    public TransportFactoryImpl(RawChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                            MessageStreamConsumer consumer) {
+        return new TransportImpl<T>(sendResultSupplier, consumer, channel);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,358 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+import static jdk.incubator.http.internal.common.Pair.pair;
+
+public class TransportImpl<T> implements Transport<T> {
+
+    /* This flag is used solely for assertions */
+    private final AtomicBoolean busy = new AtomicBoolean();
+    private OutgoingMessage message;
+    private Consumer<Exception> completionHandler;
+    private final RawChannel channel;
+    private final RawChannel.RawEvent writeEvent = createWriteEvent();
+    private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
+    private final Queue<Pair<OutgoingMessage, CompletableFuture<T>>>
+            queue = new ConcurrentLinkedQueue<>();
+    private final OutgoingMessage.Context context = new OutgoingMessage.Context();
+    private final Supplier<T> resultSupplier;
+
+    private final MessageStreamConsumer messageConsumer;
+    private final FrameConsumer frameConsumer;
+    private final Frame.Reader reader = new Frame.Reader();
+    private final RawChannel.RawEvent readEvent = createReadEvent();
+    private final Demand demand = new Demand();
+    private final SequentialScheduler receiveScheduler;
+
+    private ByteBuffer data;
+    private volatile int state;
+
+    private static final int UNREGISTERED = 0;
+    private static final int AVAILABLE    = 1;
+    private static final int WAITING      = 2;
+
+    private final Object lock = new Object();
+    private boolean inputClosed;
+    private boolean outputClosed;
+
+    public TransportImpl(Supplier<T> sendResultSupplier,
+                         MessageStreamConsumer consumer,
+                         RawChannel channel) {
+        this.resultSupplier = sendResultSupplier;
+        this.messageConsumer = consumer;
+        this.channel = channel;
+        this.frameConsumer = new FrameConsumer(this.messageConsumer);
+        this.data = channel.initialByteBuffer();
+        // To ensure the initial non-final `data` will be visible
+        // (happens-before) when `readEvent.handle()` invokes `receiveScheduler`
+        // the following assignment is done last:
+        receiveScheduler = new SequentialScheduler(new ReceiveTask());
+    }
+
+    /**
+     * The supplied handler may be invoked in the calling thread.
+     * A {@code StackOverflowError} may thus occur if there's a possibility
+     * that this method is called again by the supplied handler.
+     */
+    public void send(OutgoingMessage message,
+                     Consumer<Exception> completionHandler) {
+        requireNonNull(message);
+        requireNonNull(completionHandler);
+        if (!busy.compareAndSet(false, true)) {
+            throw new IllegalStateException();
+        }
+        send0(message, completionHandler);
+    }
+
+    private RawChannel.RawEvent createWriteEvent() {
+        return new RawChannel.RawEvent() {
+
+            @Override
+            public int interestOps() {
+                return SelectionKey.OP_WRITE;
+            }
+
+            @Override
+            public void handle() {
+                // registerEvent(e) happens-before subsequent e.handle(), so
+                // we're fine reading the stored message and the completionHandler
+                send0(message, completionHandler);
+            }
+        };
+    }
+
+    private void send0(OutgoingMessage message, Consumer<Exception> handler) {
+        boolean b = busy.get();
+        assert b; // Please don't inline this, as busy.get() has memory
+        // visibility effects and we don't want the program behaviour
+        // to depend on whether the assertions are turned on
+        // or turned off
+        try {
+            boolean sent = message.sendTo(channel);
+            if (sent) {
+                busy.set(false);
+                handler.accept(null);
+            } else {
+                // The message has not been fully sent, the transmitter needs to
+                // remember the message until it can continue with sending it
+                this.message = message;
+                this.completionHandler = handler;
+                try {
+                    channel.registerEvent(writeEvent);
+                } catch (IOException e) {
+                    this.message = null;
+                    this.completionHandler = null;
+                    busy.set(false);
+                    handler.accept(e);
+                }
+            }
+        } catch (IOException e) {
+            busy.set(false);
+            handler.accept(e);
+        }
+    }
+
+    public CompletableFuture<T> sendText(CharSequence message,
+                                         boolean isLast) {
+        return enqueue(new OutgoingMessage.Text(message, isLast));
+    }
+
+    public CompletableFuture<T> sendBinary(ByteBuffer message,
+                                           boolean isLast) {
+        return enqueue(new OutgoingMessage.Binary(message, isLast));
+    }
+
+    public CompletableFuture<T> sendPing(ByteBuffer message) {
+        return enqueue(new OutgoingMessage.Ping(message));
+    }
+
+    public CompletableFuture<T> sendPong(ByteBuffer message) {
+        return enqueue(new OutgoingMessage.Pong(message));
+    }
+
+    public CompletableFuture<T> sendClose(int statusCode, String reason) {
+        return enqueue(new OutgoingMessage.Close(statusCode, reason));
+    }
+
+    private CompletableFuture<T> enqueue(OutgoingMessage m) {
+        CompletableFuture<T> cf = new MinimalFuture<>();
+        boolean added = queue.add(pair(m, cf));
+        if (!added) {
+            // The queue is supposed to be unbounded
+            throw new InternalError();
+        }
+        sendScheduler.runOrSchedule();
+        return cf;
+    }
+
+    /*
+     * This is a message sending task. It pulls messages from the queue one by
+     * one and sends them. It may be run in different threads, but never
+     * concurrently.
+     */
+    private class SendTask implements SequentialScheduler.RestartableTask {
+
+        @Override
+        public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
+            Pair<OutgoingMessage, CompletableFuture<T>> p = queue.poll();
+            if (p == null) {
+                taskCompleter.complete();
+                return;
+            }
+            OutgoingMessage message = p.first;
+            CompletableFuture<T> cf = p.second;
+            try {
+                if (!message.contextualize(context)) { // Do not send the message
+                    cf.complete(null);
+                    repeat(taskCompleter);
+                    return;
+                }
+                Consumer<Exception> h = e -> {
+                    if (e == null) {
+                        cf.complete(resultSupplier.get());
+                    } else {
+                        cf.completeExceptionally(e);
+                    }
+                    repeat(taskCompleter);
+                };
+                send(message, h);
+            } catch (Throwable t) {
+                cf.completeExceptionally(t);
+                repeat(taskCompleter);
+            }
+        }
+
+        private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
+            taskCompleter.complete();
+            // More than a single message may have been enqueued while
+            // the task has been busy with the current message, but
+            // there is only a single signal recorded
+            sendScheduler.runOrSchedule();
+        }
+    }
+
+    private RawChannel.RawEvent createReadEvent() {
+        return new RawChannel.RawEvent() {
+
+            @Override
+            public int interestOps() {
+                return SelectionKey.OP_READ;
+            }
+
+            @Override
+            public void handle() {
+                state = AVAILABLE;
+                receiveScheduler.runOrSchedule();
+            }
+        };
+    }
+
+    @Override
+    public void request(long n) {
+        if (demand.increase(n)) {
+            receiveScheduler.runOrSchedule();
+        }
+    }
+
+    @Override
+    public void acknowledgeReception() {
+        long x = demand.decreaseAndGet(1);
+        if (x < 0) {
+            throw new InternalError(String.valueOf(x));
+        }
+    }
+
+    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
+
+        @Override
+        public void run() {
+            while (!receiveScheduler.isStopped()) {
+                if (data.hasRemaining()) {
+                    if (!demand.isFulfilled()) {
+                        try {
+                            int oldPos = data.position();
+                            reader.readFrame(data, frameConsumer);
+                            int newPos = data.position();
+                            assert oldPos != newPos : data; // reader always consumes bytes
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                        }
+                        continue;
+                    }
+                    break;
+                }
+                switch (state) {
+                    case WAITING:
+                        return;
+                    case UNREGISTERED:
+                        try {
+                            state = WAITING;
+                            channel.registerEvent(readEvent);
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                        }
+                        return;
+                    case AVAILABLE:
+                        try {
+                            data = channel.read();
+                        } catch (Throwable e) {
+                            receiveScheduler.stop();
+                            messageConsumer.onError(e);
+                            return;
+                        }
+                        if (data == null) { // EOF
+                            receiveScheduler.stop();
+                            messageConsumer.onComplete();
+                            return;
+                        } else if (!data.hasRemaining()) {
+                            // No data at the moment Pretty much a "goto",
+                            // reusing the existing code path for registration
+                            state = UNREGISTERED;
+                        }
+                        continue;
+                    default:
+                        throw new InternalError(String.valueOf(state));
+                }
+            }
+        }
+    }
+
+    /*
+     * Stops the machinery from reading and delivering messages permanently,
+     * regardless of the current demand and data availability.
+     */
+    @Override
+    public void closeInput() throws IOException {
+        synchronized (lock) {
+            if (!inputClosed) {
+                inputClosed = true;
+                try {
+                    receiveScheduler.stop();
+                    channel.shutdownInput();
+                } finally {
+                    if (outputClosed) {
+                        channel.close();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void closeOutput() throws IOException {
+        synchronized (lock) {
+            if (!outputClosed) {
+                outputClosed = true;
+                try {
+                    channel.shutdownOutput();
+                } finally {
+                    if (inputClosed) {
+                        channel.close();
+                    }
+                }
+            }
+        }
+    }
+}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.io.IOException;
-
-/*
- * Abstracts out I/O channel for the WebSocket implementation. The latter then
- * deals with input and output streams of messages and does not have to
- * understand the state machine of channels (e.g. how exactly they are closed).
- * Mocking this type will allow testing WebSocket message exchange in isolation.
- */
-public class TransportSupplier {
-
-    protected final RawChannel channel; /* Exposed for testing purposes */
-    private final Object lock = new Object();
-    private Transmitter transmitter;
-    private Receiver receiver;
-    private boolean receiverShutdown;
-    private boolean transmitterShutdown;
-    private boolean closed;
-
-    public TransportSupplier(RawChannel channel) {
-        this.channel = channel;
-    }
-
-    public Receiver receiver(MessageStreamConsumer consumer) {
-        synchronized (lock) {
-            if (receiver == null) {
-                receiver = newReceiver(consumer);
-            }
-            return receiver;
-        }
-    }
-
-    public Transmitter transmitter() {
-        synchronized (lock) {
-            if (transmitter == null) {
-                transmitter = newTransmitter();
-            }
-            return transmitter;
-        }
-    }
-
-    protected Receiver newReceiver(MessageStreamConsumer consumer) {
-        return new Receiver(consumer, channel) {
-            @Override
-            public void close() throws IOException {
-                synchronized (lock) {
-                    if (!closed) {
-                        try {
-                            super.close();
-                        } finally {
-                            receiverShutdown = true;
-                            if (transmitterShutdown) {
-                                closed = true;
-                                channel.close();
-                            }
-                        }
-                    }
-                }
-            }
-        };
-    }
-
-    protected Transmitter newTransmitter() {
-        return new Transmitter(channel) {
-            @Override
-            public void close() throws IOException {
-                synchronized (lock) {
-                    if (!closed) {
-                        try {
-                            super.close();
-                        } finally {
-                            transmitterShutdown = true;
-                            if (receiverShutdown) {
-                                closed = true;
-                                channel.close();
-                            }
-                        }
-                    }
-                }
-            }
-        };
-    }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Thu Dec 14 18:41:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Fri Dec 15 00:47:16 2017 +0300
@@ -29,37 +29,25 @@
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.Pair;
 import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
 
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.net.ProtocolException;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
-import static jdk.incubator.http.internal.common.Pair.pair;
 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -108,12 +96,7 @@
     private final Listener listener;
 
     private final AtomicBoolean outstandingSend = new AtomicBoolean();
-    private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
-    private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
-            queue = new ConcurrentLinkedQueue<>();
-    private final Context context = new OutgoingMessage.Context();
-    private final Transmitter transmitter;
-    private final Receiver receiver;
+    private final Transport<WebSocket> transport;
     private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
     private final Demand demand = new Demand();
 
@@ -140,10 +123,10 @@
     }
 
     /* Exposed for testing purposes */
-    static WebSocket newInstance(URI uri,
-                                 String subprotocol,
-                                 Listener listener,
-                                 TransportSupplier transport) {
+    static WebSocketImpl newInstance(URI uri,
+                                     String subprotocol,
+                                     Listener listener,
+                                     TransportFactory transport) {
         WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
         // This initialisation is outside of the constructor for the sake of
         // safe publication of WebSocketImpl.this
@@ -154,68 +137,82 @@
     private WebSocketImpl(URI uri,
                           String subprotocol,
                           Listener listener,
-                          TransportSupplier transport) {
+                          TransportFactory transportFactory) {
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
         this.listener = requireNonNull(listener);
-        this.transmitter = transport.transmitter();
-        this.receiver = transport.receiver(new SignallingMessageConsumer());
+        this.transport = transportFactory.createTransport(
+                () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
+                new SignallingMessageConsumer());
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
-        return enqueueExclusively(new Text(message, isLast));
+    public CompletableFuture<WebSocket> sendText(CharSequence message,
+                                                 boolean isLast) {
+        if (!outstandingSend.compareAndSet(false, true)) {
+            return failedFuture(new IllegalStateException("Send pending"));
+        }
+        CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
+        cf.whenComplete((r, e) -> outstandingSend.set(false));
+        return cf;
     }
 
     @Override
-    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
-        return enqueueExclusively(new Binary(message, isLast));
+    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
+                                                   boolean isLast) {
+        if (!outstandingSend.compareAndSet(false, true)) {
+            return failedFuture(new IllegalStateException("Send pending"));
+        }
+        CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
+        // Optimize?
+        //        if (cf.isDone()) {
+        //            outstandingSend.set(false);
+        //        } else {
+        //            cf.whenComplete((r, e) -> outstandingSend.set(false));
+        //        }
+        cf.whenComplete((r, e) -> outstandingSend.set(false));
+        return cf;
     }
 
     @Override
     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
-        return enqueue(new Ping(message));
+        return transport.sendPing(message);
     }
 
     @Override
     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
-        return enqueue(new Pong(message));
+        return transport.sendPong(message);
     }
 
     @Override
     public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
         if (!isLegalToSendFromClient(statusCode)) {
-            return failedFuture(
-                    new IllegalArgumentException("statusCode: " + statusCode));
+            return failedFuture(new IllegalArgumentException("statusCode"));
         }
-        Close msg;
-        try {
-            msg = new Close(statusCode, reason);
-        } catch (IllegalArgumentException e) {
-            return failedFuture(e);
-        }
-        outputClosed = true;
-        return enqueueClose(msg);
+        return sendClose0(statusCode, reason);
     }
 
     /*
-     * Sends a Close message, then shuts down the transmitter since no more
+     * Sends a Close message, then shuts down the output since no more
      * messages are expected to be sent after this.
+     *
+     * TODO: Even if arguments are illegal the default message will be sent.
      */
-    private CompletableFuture<WebSocket> enqueueClose(Close m) {
+    private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
         // TODO: MUST be a CF created once and shared across sendClose, otherwise
         // a second sendClose may prematurely close the channel
-        return enqueue(m)
+        outputClosed = true;
+        return transport.sendClose(statusCode, reason)
                 .orTimeout(60, TimeUnit.SECONDS)
                 .whenComplete((r, error) -> {
                     try {
-                        transmitter.close();
+                        transport.closeOutput();
                     } catch (IOException e) {
                         Log.logError(e);
                     }
                     if (error instanceof TimeoutException) {
                         try {
-                            receiver.close();
+                            transport.closeInput();
                         } catch (IOException e) {
                             Log.logError(e);
                         }
@@ -223,77 +220,6 @@
                 });
     }
 
-    /*
-     * Accepts the given message into the outgoing queue in a mutually-exclusive
-     * fashion in respect to other messages accepted through this method. No
-     * further messages will be accepted until the returned CompletableFuture
-     * completes. This method is used to enforce "one outstanding send
-     * operation" policy.
-     */
-    private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m) {
-        if (!outstandingSend.compareAndSet(false, true)) {
-            return failedFuture(new IllegalStateException("Send pending"));
-        }
-        return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
-    }
-
-    private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
-        CompletableFuture<WebSocket> cf = new MinimalFuture<>();
-        boolean added = queue.add(pair(m, cf));
-        if (!added) {
-            // The queue is supposed to be unbounded
-            throw new InternalError();
-        }
-        sendScheduler.runOrSchedule();
-        return cf;
-    }
-
-    /*
-     * This is a message sending task. It pulls messages from the queue one by
-     * one and sends them. It may be run in different threads, but never
-     * concurrently.
-     */
-    private class SendTask implements SequentialScheduler.RestartableTask {
-
-        @Override
-        public void run(DeferredCompleter taskCompleter) {
-            Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
-            if (p == null) {
-                taskCompleter.complete();
-                return;
-            }
-            OutgoingMessage message = p.first;
-            CompletableFuture<WebSocket> cf = p.second;
-            try {
-                if (!message.contextualize(context)) { // Do not send the message
-                    cf.complete(null);
-                    repeat(taskCompleter);
-                    return;
-                }
-                Consumer<Exception> h = e -> {
-                    if (e == null) {
-                        cf.complete(WebSocketImpl.this);
-                    } else {
-                        cf.completeExceptionally(e);
-                    }
-                    repeat(taskCompleter);
-                };
-                transmitter.send(message, h);
-            } catch (Throwable t) {
-                cf.completeExceptionally(t);
-                repeat(taskCompleter);
-            }
-        }
-
-        private void repeat(DeferredCompleter taskCompleter) {
-            taskCompleter.complete();
-            // More than a single message may have been enqueued while
-            // the task has been busy with the current message, but
-            // there is only a single signal recorded
-            sendScheduler.runOrSchedule();
-        }
-    }
-
     @Override
     public void request(long n) {
         if (demand.increase(n)) {
@@ -348,7 +274,7 @@
      */
     private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
 
-        // Receiver only asked here and nowhere else because we must make sure
+        // Transport only asked here and nowhere else because we must make sure
         // onOpen is invoked first and no messages become pending before onOpen
         // finishes
 
@@ -387,7 +313,7 @@
                         case IDLE:
                             if (demand.tryDecrement()
                                     && tryChangeState(IDLE, WAITING)) {
-                                receiver.request(1);
+                                transport.request(1);
                             }
                             return;
                         case WAITING:
@@ -404,13 +330,13 @@
         }
 
         private void processError() throws IOException {
-            receiver.close();
+            transport.closeInput();
             receiveScheduler.stop();
             Throwable err = error.get();
             if (err instanceof FailWebSocketException) {
                 int code1 = ((FailWebSocketException) err).getStatusCode();
                 err = new ProtocolException().initCause(err);
-                enqueueClose(new Close(code1, ""))
+                sendClose0(code1, "")
                         .whenComplete(
                                 (r, e) -> {
                                     if (e != null) {
@@ -422,7 +348,7 @@
         }
 
         private void processClose() throws IOException {
-            receiver.close();
+            transport.closeInput();
             receiveScheduler.stop();
             CompletionStage<?> readyToClose;
             readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
@@ -436,7 +362,7 @@
                 code = statusCode;
             }
             readyToClose.whenComplete((r, e) -> {
-                enqueueClose(new Close(code, ""))
+                sendClose0(code, "")
                         .whenComplete((r1, e1) -> {
                             if (e1 != null) {
                                 Log.logError(e1);
@@ -458,7 +384,7 @@
                     .put(binaryData)
                     .flip();
             // Non-exclusive send;
-            CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
+            CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
             pongSent.whenComplete(
                     (r, e) -> {
                         if (e != null) {
@@ -499,9 +425,9 @@
     private void close() {
         try {
             try {
-                receiver.close();
+                transport.closeInput();
             } finally {
-                transmitter.close();
+                transport.closeOutput();
             }
         } catch (Throwable t) {
             Log.logError(t);
@@ -520,7 +446,7 @@
             Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
         } else {
             try {
-                receiver.close();
+                transport.closeInput();
             } catch (Throwable t) {
                 Log.logError(t);
             }
@@ -531,7 +457,7 @@
 
         @Override
         public void onText(CharSequence data, MessagePart part) {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             text = data;
             WebSocketImpl.this.part = part;
             tryChangeState(WAITING, TEXT);
@@ -539,7 +465,7 @@
 
         @Override
         public void onBinary(ByteBuffer data, MessagePart part) {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             binaryData = data;
             WebSocketImpl.this.part = part;
             tryChangeState(WAITING, BINARY);
@@ -547,27 +473,27 @@
 
         @Override
         public void onPing(ByteBuffer data) {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             binaryData = data;
             tryChangeState(WAITING, PING);
         }
 
         @Override
         public void onPong(ByteBuffer data) {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             binaryData = data;
             tryChangeState(WAITING, PONG);
         }
 
         @Override
         public void onClose(int statusCode, CharSequence reason) {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             signalClose(statusCode, reason.toString());
         }
 
         @Override
         public void onComplete() {
-            receiver.acknowledge();
+            transport.acknowledgeReception();
             signalClose(CLOSED_ABNORMALLY, "");
         }
 
@@ -602,4 +528,9 @@
         }
         return false;
     }
+
+    /* Exposed for testing purposes */
+    protected final Transport<WebSocket> transport() {
+        return transport;
+    }
 }
--- a/test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReceivingTest
- */
-public class ReceivingTestDriver { }
--- a/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
- */
-public class SendingTestDriver { }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @run testng/othervm/timeout=30 --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
+ */
+public class WebSocketImplDriver { }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java	Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java	Fri Dec 15 00:47:16 2017 +0300
@@ -39,7 +39,7 @@
 
     private final long bufferSize;
     private long count;
-    private final List<ListenerInvocation> invocations = new ArrayList<>();
+    private final List<Invocation> invocations = new ArrayList<>();
     private final CompletableFuture<?> lastCall = new CompletableFuture<>();
 
     /*
@@ -147,11 +147,11 @@
         webSocket.request(count);
     }
 
-    public List<ListenerInvocation> invocations() {
+    public List<Invocation> invocations() {
         return new ArrayList<>(invocations);
     }
 
-    public abstract static class ListenerInvocation {
+    public abstract static class Invocation {
 
         public static OnOpen onOpen(WebSocket webSocket) {
             return new OnOpen(webSocket);
@@ -192,12 +192,12 @@
 
         final WebSocket webSocket;
 
-        private ListenerInvocation(WebSocket webSocket) {
+        private Invocation(WebSocket webSocket) {
             this.webSocket = webSocket;
         }
     }
 
-    public static final class OnOpen extends ListenerInvocation {
+    public static final class OnOpen extends Invocation {
 
         public OnOpen(WebSocket webSocket) {
             super(webSocket);
@@ -207,7 +207,7 @@
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-            ListenerInvocation that = (ListenerInvocation) o;
+            Invocation that = (Invocation) o;
             return Objects.equals(webSocket, that.webSocket);
         }
 
@@ -217,7 +217,7 @@
         }
     }
 
-    public static final class OnText extends ListenerInvocation {
+    public static final class OnText extends Invocation {
 
         final String text;
         final MessagePart part;
@@ -249,7 +249,7 @@
         }
     }
 
-    public static final class OnBinary extends ListenerInvocation {
+    public static final class OnBinary extends Invocation {
 
         final ByteBuffer data;
         final MessagePart part;
@@ -281,7 +281,7 @@
         }
     }
 
-    public static final class OnPing extends ListenerInvocation {
+    public static final class OnPing extends Invocation {
 
         final ByteBuffer data;
 
@@ -310,7 +310,7 @@
         }
     }
 
-    public static final class OnPong extends ListenerInvocation {
+    public static final class OnPong extends Invocation {
 
         final ByteBuffer data;
 
@@ -339,7 +339,7 @@
         }
     }
 
-    public static final class OnClose extends ListenerInvocation {
+    public static final class OnClose extends Invocation {
 
         final int statusCode;
         final String reason;
@@ -371,7 +371,7 @@
         }
     }
 
-    public static final class OnError extends ListenerInvocation {
+    public static final class OnError extends Invocation {
 
         final Class<? extends Throwable> clazz;
 
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Consumer;
-
-public class MockReceiver extends Receiver {
-
-    private final Iterator<Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>> iterator;
-    private final MessageStreamConsumer consumer;
-
-    public MockReceiver(MessageStreamConsumer consumer, RawChannel channel,
-                        Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>... pairs) {
-        super(consumer, channel);
-        this.consumer = consumer;
-        iterator = Arrays.asList(pairs).iterator();
-    }
-
-    @Override
-    protected SequentialScheduler createScheduler() {
-        class X { // Class is hack needed to allow the task to refer to the scheduler
-            SequentialScheduler scheduler = new SequentialScheduler(task());
-
-            SequentialScheduler.RestartableTask task() {
-                return new SequentialScheduler.RestartableTask() {
-                    @Override
-                    public void run(DeferredCompleter taskCompleter) {
-                        if (!scheduler.isStopped() && !demand.isFulfilled()) {
-                            if (!iterator.hasNext()) {
-                                taskCompleter.complete();
-                                return;
-                            }
-                            Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>> p = iterator.next();
-                            CompletableFuture<?> cf = p.first.toCompletableFuture();
-                            if (cf.isDone()) { // Forcing synchronous execution
-                                p.second.accept(consumer);
-                                repeat(taskCompleter);
-                            } else {
-                                cf.whenCompleteAsync((r, e) -> {
-                                    p.second.accept(consumer);
-                                    repeat(taskCompleter);
-                                });
-                            }
-                        } else {
-                            taskCompleter.complete();
-                        }
-                    }
-
-                    private void repeat(DeferredCompleter taskCompleter) {
-                        taskCompleter.complete();
-                        scheduler.runOrSchedule();
-                    }
-                };
-            }
-        }
-        return new X().scheduler;
-    }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.util.Queue;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-
-public abstract class MockTransmitter extends Transmitter {
-
-    private final long startTime = System.currentTimeMillis();
-
-    private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
-
-    public MockTransmitter() {
-        super(null);
-    }
-
-    @Override
-    public void send(OutgoingMessage message,
-                     Consumer<Exception> completionHandler) {
-        System.out.printf("[%6s ms.] begin send(%s)%n",
-                          System.currentTimeMillis() - startTime,
-                          message);
-        messages.add(message);
-        whenSent().whenComplete((r, e) -> {
-            System.out.printf("[%6s ms.] complete send(%s)%n",
-                              System.currentTimeMillis() - startTime,
-                              message);
-            if (e != null) {
-                completionHandler.accept((Exception) e);
-            } else {
-                completionHandler.accept(null);
-            }
-        });
-        System.out.printf("[%6s ms.] end send(%s)%n",
-                          System.currentTimeMillis() - startTime,
-                          message);
-    }
-
-    @Override
-    public void close() { }
-
-    protected abstract CompletionStage<?> whenSent();
-
-    public Queue<OutgoingMessage> queue() {
-        return messages;
-    }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java	Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java	Fri Dec 15 00:47:16 2017 +0300
@@ -23,46 +23,414 @@
 
 package jdk.incubator.http.internal.websocket;
 
+import jdk.incubator.http.WebSocket.MessagePart;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static jdk.incubator.http.internal.websocket.TestSupport.fullCopy;
+
+public class MockTransport<T> implements Transport<T> {
+
+    private final long startTime = System.currentTimeMillis();
+    private final Queue<Invocation> output = new ConcurrentLinkedQueue<>();
+    private final Queue<CompletableFuture<Consumer<MessageStreamConsumer>>>
+            input = new ConcurrentLinkedQueue<>();
+    private final Supplier<T> supplier;
+    private final MessageStreamConsumer consumer;
+    private final SequentialScheduler scheduler
+            = new SequentialScheduler(new ReceiveTask());
+    private final Demand demand = new Demand();
+
+    public MockTransport(Supplier<T> sendResultSupplier,
+                         MessageStreamConsumer consumer) {
+        this.supplier = sendResultSupplier;
+        this.consumer = consumer;
+        input.addAll(receive());
+    }
+
+    @Override
+    public final CompletableFuture<T> sendText(CharSequence message,
+                                               boolean isLast) {
+        output.add(Invocation.sendText(message, isLast));
+        return send(String.format("sendText(%s, %s)", message, isLast),
+                    () -> sendText0(message, isLast));
+    }
 
-public class MockTransport extends TransportSupplier {
+    protected CompletableFuture<T> sendText0(CharSequence message,
+                                             boolean isLast) {
+        return defaultSend();
+    }
+
+    protected CompletableFuture<T> defaultSend() {
+        return CompletableFuture.completedFuture(result());
+    }
+
+    @Override
+    public final CompletableFuture<T> sendBinary(ByteBuffer message,
+                                                 boolean isLast) {
+        output.add(Invocation.sendBinary(message, isLast));
+        return send(String.format("sendBinary(%s, %s)", message, isLast),
+                    () -> sendBinary0(message, isLast));
+    }
+
+    protected CompletableFuture<T> sendBinary0(ByteBuffer message,
+                                               boolean isLast) {
+        return defaultSend();
+    }
 
-    public MockTransport() {
-        super(new NullRawChannel());
+    @Override
+    public final CompletableFuture<T> sendPing(ByteBuffer message) {
+        output.add(Invocation.sendPing(message));
+        return send(String.format("sendPing(%s)", message),
+                    () -> sendPing0(message));
+    }
+
+    protected CompletableFuture<T> sendPing0(ByteBuffer message) {
+        return defaultSend();
+    }
+
+    @Override
+    public final CompletableFuture<T> sendPong(ByteBuffer message) {
+        output.add(Invocation.sendPong(message));
+        return send(String.format("sendPong(%s)", message),
+                    () -> sendPong0(message));
+    }
+
+    protected CompletableFuture<T> sendPong0(ByteBuffer message) {
+        return defaultSend();
+    }
+
+    @Override
+    public final CompletableFuture<T> sendClose(int statusCode, String reason) {
+        output.add(Invocation.sendClose(statusCode, reason));
+        return send(String.format("sendClose(%s, %s)", statusCode, reason),
+                    () -> sendClose0(statusCode, reason));
+    }
+
+    protected CompletableFuture<T> sendClose0(int statusCode, String reason) {
+        return defaultSend();
     }
 
-    public static class NullRawChannel implements RawChannel {
+    protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
+        return List.of();
+    }
+
+    public static Consumer<MessageStreamConsumer> onText(CharSequence data,
+                                                         MessagePart part) {
+        return c -> c.onText(data.toString(), part);
+    }
+
+    public static Consumer<MessageStreamConsumer> onBinary(ByteBuffer data,
+                                                           MessagePart part) {
+        return c -> c.onBinary(fullCopy(data), part);
+    }
+
+    public static Consumer<MessageStreamConsumer> onPing(ByteBuffer data) {
+        return c -> c.onPing(fullCopy(data));
+    }
+
+    public static Consumer<MessageStreamConsumer> onPong(ByteBuffer data) {
+        return c -> c.onPong(fullCopy(data));
+    }
+
+    public static Consumer<MessageStreamConsumer> onClose(int statusCode,
+                                                          String reason) {
+        return c -> c.onClose(statusCode, reason);
+    }
+
+    public static Consumer<MessageStreamConsumer> onError(Throwable error) {
+        return c -> c.onError(error);
+    }
+
+    public static Consumer<MessageStreamConsumer> onComplete() {
+        return c -> c.onComplete();
+    }
 
-        @Override
-        public void registerEvent(RawEvent event) {
-            throw new UnsupportedOperationException();
+    @Override
+    public void request(long n) {
+        demand.increase(n);
+        scheduler.runOrSchedule();
+    }
+
+    @Override
+    public void acknowledgeReception() {
+        demand.tryDecrement();
+    }
+
+    @Override
+    public final void closeOutput() throws IOException {
+        output.add(Invocation.closeOutput());
+        begin("closeOutput()");
+        closeOutput0();
+        end("closeOutput()");
+    }
+
+    protected void closeOutput0() throws IOException {
+        defaultClose();
+    }
+
+    protected void defaultClose() throws IOException {
+    }
+
+    @Override
+    public final void closeInput() throws IOException {
+        output.add(Invocation.closeInput());
+        begin("closeInput()");
+        closeInput0();
+        end("closeInput()");
+    }
+
+    protected void closeInput0() throws IOException {
+        defaultClose();
+    }
+
+    public abstract static class Invocation {
+
+        static Invocation.SendText sendText(CharSequence message,
+                                            boolean isLast) {
+            return new SendText(message, isLast);
         }
 
-        @Override
-        public ByteBuffer initialByteBuffer() {
-            return ByteBuffer.allocate(0);
+        static Invocation.SendBinary sendBinary(ByteBuffer message,
+                                                boolean isLast) {
+            return new SendBinary(message, isLast);
+        }
+
+        static Invocation.SendPing sendPing(ByteBuffer message) {
+            return new SendPing(message);
+        }
+
+        static Invocation.SendPong sendPong(ByteBuffer message) {
+            return new SendPong(message);
+        }
+
+        static Invocation.SendClose sendClose(int statusCode, String reason) {
+            return new SendClose(statusCode, reason);
+        }
+
+        public static CloseOutput closeOutput() {
+            return new CloseOutput();
         }
 
-        @Override
-        public ByteBuffer read() {
-            throw new UnsupportedOperationException();
+        public static CloseInput closeInput() {
+            return new CloseInput();
+        }
+
+        public static final class SendText extends Invocation {
+
+            final CharSequence message;
+            final boolean isLast;
+
+            SendText(CharSequence message, boolean isLast) {
+                this.message = message.toString();
+                this.isLast = isLast;
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (obj == null || getClass() != obj.getClass()) return false;
+                SendText sendText = (SendText) obj;
+                return isLast == sendText.isLast &&
+                        Objects.equals(message, sendText.message);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(isLast, message);
+            }
         }
 
-        @Override
-        public long write(ByteBuffer[] srcs, int offset, int length) {
-            throw new UnsupportedOperationException();
+        public static final class SendBinary extends Invocation {
+
+            final ByteBuffer message;
+            final boolean isLast;
+
+            SendBinary(ByteBuffer message, boolean isLast) {
+                this.message = fullCopy(message);
+                this.isLast = isLast;
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (obj == null || getClass() != obj.getClass()) return false;
+                SendBinary that = (SendBinary) obj;
+                return isLast == that.isLast &&
+                        Objects.equals(message, that.message);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(message, isLast);
+            }
+        }
+
+        private static final class SendPing extends Invocation {
+
+            final ByteBuffer message;
+
+            SendPing(ByteBuffer message) {
+                this.message = fullCopy(message);
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (obj == null || getClass() != obj.getClass()) return false;
+                SendPing sendPing = (SendPing) obj;
+                return Objects.equals(message, sendPing.message);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(message);
+            }
+        }
+
+        private static final class SendPong extends Invocation {
+
+            final ByteBuffer message;
+
+            SendPong(ByteBuffer message) {
+                this.message = fullCopy(message);
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (obj == null || getClass() != obj.getClass()) return false;
+                SendPing sendPing = (SendPing) obj;
+                return Objects.equals(message, sendPing.message);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(message);
+            }
         }
 
-        @Override
-        public void shutdownInput() {
+        private static final class SendClose extends Invocation {
+
+            final int statusCode;
+            final String reason;
+
+            SendClose(int statusCode, String reason) {
+                this.statusCode = statusCode;
+                this.reason = reason;
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (this == obj) return true;
+                if (obj == null || getClass() != obj.getClass()) return false;
+                SendClose sendClose = (SendClose) obj;
+                return statusCode == sendClose.statusCode &&
+                        Objects.equals(reason, sendClose.reason);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(statusCode, reason);
+            }
+        }
+
+        private static final class CloseOutput extends Invocation {
+
+            CloseOutput() { }
+
+            @Override
+            public int hashCode() {
+                return 0;
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                return obj instanceof CloseOutput;
+            }
         }
 
+        private static final class CloseInput extends Invocation {
+
+            CloseInput() { }
+
+            @Override
+            public int hashCode() {
+                return 0;
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                return obj instanceof CloseInput;
+            }
+        }
+    }
+
+    public Queue<Invocation> invocations() {
+        return new LinkedList<>(output);
+    }
+
+    protected final T result() {
+        return supplier.get();
+    }
+
+    private CompletableFuture<T> send(String name,
+                                      Supplier<CompletableFuture<T>> supplier) {
+        begin(name);
+        CompletableFuture<T> cf = supplier.get().whenComplete((r, e) -> {
+            System.out.printf("[%6s ms.] complete %s%n", elapsedTime(), name);
+        });
+        end(name);
+        return cf;
+    }
+
+    private void begin(String name) {
+        System.out.printf("[%6s ms.] begin %s%n", elapsedTime(), name);
+    }
+
+    private void end(String name) {
+        System.out.printf("[%6s ms.] end %s%n", elapsedTime(), name);
+    }
+
+    private long elapsedTime() {
+        return System.currentTimeMillis() - startTime;
+    }
+
+    private final class ReceiveTask implements SequentialScheduler.RestartableTask {
+
         @Override
-        public void shutdownOutput() {
+        public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
+            if (!scheduler.isStopped() && !demand.isFulfilled() && !input.isEmpty()) {
+                CompletableFuture<Consumer<MessageStreamConsumer>> cf = input.remove();
+                if (cf.isDone()) { // Forcing synchronous execution
+                    cf.join().accept(consumer);
+                    repeat(taskCompleter);
+                } else {
+                    cf.whenCompleteAsync((r, e) -> {
+                        r.accept(consumer);
+                        repeat(taskCompleter);
+                    });
+                }
+            } else {
+                taskCompleter.complete();
+            }
         }
 
-        @Override
-        public void close() {
+        private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
+            taskCompleter.complete();
+            scheduler.runOrSchedule();
         }
     }
 }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,206 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.WebSocket;
-import org.testng.annotations.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.CompletableFuture.completedStage;
-import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
-import static jdk.incubator.http.WebSocket.MessagePart.LAST;
-import static jdk.incubator.http.WebSocket.MessagePart.PART;
-import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
-import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
-import static jdk.incubator.http.internal.common.Pair.pair;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onClose;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onError;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onOpen;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onPing;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onPong;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onText;
-import static org.testng.Assert.assertEquals;
-
-public class ReceivingTest {
-
-    // TODO: request in onClose/onError
-    // TODO: throw exception in onClose/onError
-    // TODO: exception is thrown from request()
-
-    @Test
-    public void testNonPositiveRequest() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE) {
-            @Override
-            protected void onOpen0(WebSocket webSocket) {
-                webSocket.request(0);
-            }
-        };
-        MockTransport transport = new MockTransport() {
-            @Override
-            protected Receiver newReceiver(MessageStreamConsumer consumer) {
-                return new MockReceiver(consumer, channel, pair(now(), m -> m.onText("1", WHOLE)));
-            }
-        };
-        WebSocket ws = newInstance(listener, transport);
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.ListenerInvocation> invocations = listener.invocations();
-        assertEquals(invocations, List.of(onOpen(ws), onError(ws, IllegalArgumentException.class)));
-    }
-
-    @Test
-    public void testText1() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        MockTransport transport = new MockTransport() {
-            @Override
-            protected Receiver newReceiver(MessageStreamConsumer consumer) {
-                return new MockReceiver(consumer, channel,
-                                        pair(now(), m -> m.onText("1", FIRST)),
-                                        pair(now(), m -> m.onText("2", PART)),
-                                        pair(now(), m -> m.onText("3", LAST)),
-                                        pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
-            }
-        };
-        WebSocket ws = newInstance(listener, transport);
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.ListenerInvocation> invocations = listener.invocations();
-        assertEquals(invocations, List.of(onOpen(ws),
-                                          onText(ws, "1", FIRST),
-                                          onText(ws, "2", PART),
-                                          onText(ws, "3", LAST),
-                                          onClose(ws, NORMAL_CLOSURE, "no reason")));
-    }
-
-    @Test
-    public void testText2() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        MockTransport transport = new MockTransport() {
-            @Override
-            protected Receiver newReceiver(MessageStreamConsumer consumer) {
-                return new MockReceiver(consumer, channel,
-                                        pair(now(),      m -> m.onText("1", FIRST)),
-                                        pair(seconds(1), m -> m.onText("2", PART)),
-                                        pair(now(),      m -> m.onText("3", LAST)),
-                                        pair(seconds(1), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
-            }
-        };
-        WebSocket ws = newInstance(listener, transport);
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.ListenerInvocation> invocations = listener.invocations();
-        assertEquals(invocations, List.of(onOpen(ws),
-                                          onText(ws, "1", FIRST),
-                                          onText(ws, "2", PART),
-                                          onText(ws, "3", LAST),
-                                          onClose(ws, NORMAL_CLOSURE, "no reason")));
-    }
-
-    @Test
-    public void testTextIntermixedWithPongs() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        MockTransport transport = new MockTransport() {
-            @Override
-            protected Receiver newReceiver(MessageStreamConsumer consumer) {
-                return new MockReceiver(consumer, channel,
-                                        pair(now(),      m -> m.onText("1", FIRST)),
-                                        pair(now(),      m -> m.onText("2", PART)),
-                                        pair(now(),      m -> m.onPong(ByteBuffer.allocate(16))),
-                                        pair(seconds(1), m -> m.onPong(ByteBuffer.allocate(32))),
-                                        pair(now(),      m -> m.onText("3", LAST)),
-                                        pair(now(),      m -> m.onPong(ByteBuffer.allocate(64))),
-                                        pair(now(),      m -> m.onClose(NORMAL_CLOSURE, "no reason")));
-            }
-        };
-        WebSocket ws = newInstance(listener, transport);
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.ListenerInvocation> invocations = listener.invocations();
-        assertEquals(invocations, List.of(onOpen(ws),
-                                          onText(ws, "1", FIRST),
-                                          onText(ws, "2", PART),
-                                          onPong(ws, ByteBuffer.allocate(16)),
-                                          onPong(ws, ByteBuffer.allocate(32)),
-                                          onText(ws, "3", LAST),
-                                          onPong(ws, ByteBuffer.allocate(64)),
-                                          onClose(ws, NORMAL_CLOSURE, "no reason")));
-    }
-
-    @Test
-    public void testTextIntermixedWithPings() throws Exception {
-        MockListener listener = new MockListener(Long.MAX_VALUE);
-        MockTransport transport = new MockTransport() {
-            @Override
-            protected Receiver newReceiver(MessageStreamConsumer consumer) {
-                return new MockReceiver(consumer, channel,
-                                        pair(now(),      m -> m.onText("1", FIRST)),
-                                        pair(now(),      m -> m.onText("2", PART)),
-                                        pair(now(),      m -> m.onPing(ByteBuffer.allocate(16))),
-                                        pair(seconds(1), m -> m.onPing(ByteBuffer.allocate(32))),
-                                        pair(now(),      m -> m.onText("3", LAST)),
-                                        pair(now(),      m -> m.onPing(ByteBuffer.allocate(64))),
-                                        pair(now(),      m -> m.onClose(NORMAL_CLOSURE, "no reason")));
-            }
-
-            @Override
-            protected Transmitter newTransmitter() {
-                return new MockTransmitter() {
-                    @Override
-                    protected CompletionStage<?> whenSent() {
-                        return now();
-                    }
-                };
-            }
-        };
-        WebSocket ws = newInstance(listener, transport);
-        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
-        List<MockListener.ListenerInvocation> invocations = listener.invocations();
-        System.out.println(invocations);
-        assertEquals(invocations, List.of(onOpen(ws),
-                                          onText(ws, "1", FIRST),
-                                          onText(ws, "2", PART),
-                                          onPing(ws, ByteBuffer.allocate(16)),
-                                          onPing(ws, ByteBuffer.allocate(32)),
-                                          onText(ws, "3", LAST),
-                                          onPing(ws, ByteBuffer.allocate(64)),
-                                          onClose(ws, NORMAL_CLOSURE, "no reason")));
-    }
-
-    private static CompletionStage<?> seconds(long s) {
-        return new CompletableFuture<>().completeOnTimeout(null, s, TimeUnit.SECONDS);
-    }
-
-    private static CompletionStage<?> now() {
-        return completedStage(null);
-    }
-
-    private static WebSocket newInstance(WebSocket.Listener listener,
-                                         TransportSupplier transport) {
-        URI uri = URI.create("ws://localhost");
-        String subprotocol = "";
-        return WebSocketImpl.newInstance(uri, subprotocol, listener, transport);
-    }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,164 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.WebSocket;
-import org.testng.annotations.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
-import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
-import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
-import static org.testng.Assert.assertEquals;
-
-public class SendingTest {
-
-    @Test
-    public void sendTextImmediately() {
-        MockTransmitter t = new MockTransmitter() {
-            @Override
-            protected CompletionStage<?> whenSent() {
-                return CompletableFuture.completedFuture(null);
-            }
-        };
-        WebSocket ws = newWebSocket(t);
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .join();
-
-        assertEquals(t.queue().size(), 3);
-    }
-
-    @Test
-    public void sendTextWithDelay() {
-        MockTransmitter t = new MockTransmitter() {
-            @Override
-            protected CompletionStage<?> whenSent() {
-                return new CompletableFuture<>()
-                        .completeOnTimeout(null, 1, TimeUnit.SECONDS);
-            }
-        };
-        WebSocket ws = newWebSocket(t);
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .join();
-
-        assertEquals(t.queue().size(), 3);
-    }
-
-    @Test
-    public void sendTextMixedDelay() {
-        Random r = new Random();
-
-        MockTransmitter t = new MockTransmitter() {
-            @Override
-            protected CompletionStage<?> whenSent() {
-                return r.nextBoolean() ?
-                        new CompletableFuture<>().completeOnTimeout(null, 1, TimeUnit.SECONDS) :
-                        CompletableFuture.completedFuture(null);
-            }
-        };
-        WebSocket ws = newWebSocket(t);
-        CompletableFuture.completedFuture(ws)
-                .thenCompose(w -> w.sendText("1", true))
-                .thenCompose(w -> w.sendText("2", true))
-                .thenCompose(w -> w.sendText("3", true))
-                .thenCompose(w -> w.sendText("4", true))
-                .thenCompose(w -> w.sendText("5", true))
-                .thenCompose(w -> w.sendText("6", true))
-                .thenCompose(w -> w.sendText("7", true))
-                .thenCompose(w -> w.sendText("8", true))
-                .thenCompose(w -> w.sendText("9", true))
-                .join();
-
-        assertEquals(t.queue().size(), 9);
-    }
-
-    @Test
-    public void sendControlMessagesConcurrently() {
-
-        CompletableFuture<?> first = new CompletableFuture<>();
-
-        MockTransmitter t = new MockTransmitter() {
-
-            final AtomicInteger i = new AtomicInteger();
-
-            @Override
-            protected CompletionStage<?> whenSent() {
-                if (i.incrementAndGet() == 1) {
-                    return first;
-                } else {
-                    return CompletableFuture.completedFuture(null);
-                }
-            }
-        };
-        WebSocket ws = newWebSocket(t);
-
-        CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
-        CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
-        CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
-        CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
-
-        first.complete(null);
-        // Don't care about exceptional completion, only that all of them have
-        // completed
-        CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
-                .handle((v, e) -> null).join();
-
-        cf3.join(); /* Check that sendClose has completed normally */
-        cf4.join(); /* Check that repeated sendClose has completed normally */
-        assertCompletesExceptionally(IllegalStateException.class, cf5);
-        assertCompletesExceptionally(IllegalStateException.class, cf6);
-
-        assertEquals(t.queue().size(), 3); // 6 minus 3 that were not accepted
-    }
-
-    private static WebSocket newWebSocket(Transmitter transmitter) {
-        URI uri = URI.create("ws://localhost");
-        String subprotocol = "";
-        TransportSupplier transport = new MockTransport() {
-            @Override
-            public Transmitter transmitter() {
-                return transmitter;
-            }
-        };
-        return newInstance(uri,
-                           subprotocol,
-                           new MockListener(Long.MAX_VALUE),
-                           transport);
-    }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/TestSupport.java	Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/TestSupport.java	Fri Dec 15 00:47:16 2017 +0300
@@ -297,13 +297,13 @@
         } catch (Throwable t) {
             caught = t;
         }
+        if (caught == null) {
+            throw new AssertionFailedException("No exception was thrown");
+        }
         if (predicate.test(caught)) {
             System.out.println("Got expected exception: " + caught);
             return caught;
         }
-        if (caught == null) {
-            throw new AssertionFailedException("No exception was thrown");
-        }
         throw new AssertionFailedException("Caught exception didn't match the predicate", caught);
     }
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
+import static jdk.incubator.http.WebSocket.MessagePart.LAST;
+import static jdk.incubator.http.WebSocket.MessagePart.PART;
+import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onClose;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onError;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onOpen;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPing;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPong;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onText;
+import static jdk.incubator.http.internal.websocket.MockTransport.onClose;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPing;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPong;
+import static jdk.incubator.http.internal.websocket.MockTransport.onText;
+import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static org.testng.Assert.assertEquals;
+
+/*
+ * Formatting in this file may seem strange:
+ *
+ *  (
+ *   ( ...)
+ *  ...
+ *  )
+ *  ...
+ *
+ *  However there is a rationale behind it. Sometimes the level of argument
+ *  nesting is high, which makes it hard to manage parentheses.
+ */
+public class WebSocketImplTest {
+
+    // TODO: request in onClose/onError
+    // TODO: throw exception in onClose/onError
+    // TODO: exception is thrown from request()
+
+    @Test
+    public void testNonPositiveRequest() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE) {
+            @Override
+            protected void onOpen0(WebSocket webSocket) {
+                webSocket.request(0);
+            }
+        };
+        WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE))));
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onError(ws, IllegalArgumentException.class)
+                )
+        );
+    }
+
+    @Test
+    public void testText1() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onText("3", LAST)),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onText(ws, "3", LAST),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testText2() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        seconds(1, onText("2", PART)),
+                        now(onText("3", LAST)),
+                        seconds(1, onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onText(ws, "3", LAST),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testTextIntermixedWithPongs() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onPong(ByteBuffer.allocate(16))),
+                        seconds(1, onPong(ByteBuffer.allocate(32))),
+                        now(onText("3", LAST)),
+                        now(onPong(ByteBuffer.allocate(64))),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onPong(ws, ByteBuffer.allocate(16)),
+                        onPong(ws, ByteBuffer.allocate(32)),
+                        onText(ws, "3", LAST),
+                        onPong(ws, ByteBuffer.allocate(64)),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testTextIntermixedWithPings() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onPing(ByteBuffer.allocate(16))),
+                        seconds(1, onPing(ByteBuffer.allocate(32))),
+                        now(onText("3", LAST)),
+                        now(onPing(ByteBuffer.allocate(64))),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onPing(ws, ByteBuffer.allocate(16)),
+                        onPing(ws, ByteBuffer.allocate(32)),
+                        onText(ws, "3", LAST),
+                        onPing(ws, ByteBuffer.allocate(64)),
+                        onClose(ws, NORMAL_CLOSURE, "no reason"))
+        );
+    }
+
+    @Test
+    public void sendTextImmediately() {
+        WebSocketImpl ws = newInstance(
+                new MockListener(1),
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer);
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3);
+    }
+
+    @Test
+    public void sendTextWithDelay() {
+        MockListener listener = new MockListener(1);
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return seconds(1, result());
+                            }
+                        };
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3);
+    }
+
+    @Test
+    public void sendTextMixedDelay() {
+        MockListener listener = new MockListener(1);
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+
+                    final Random r = new Random();
+
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return r.nextBoolean()
+                                        ? seconds(1, result())
+                                        : now(result());
+                            }
+                        };
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .thenCompose(w -> w.sendText("4", true))
+                .thenCompose(w -> w.sendText("5", true))
+                .thenCompose(w -> w.sendText("6", true))
+                .thenCompose(w -> w.sendText("7", true))
+                .thenCompose(w -> w.sendText("8", true))
+                .thenCompose(w -> w.sendText("9", true))
+                .join();
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 9);
+    }
+
+    @Test(enabled = false) // temporarily disabled
+    public void sendControlMessagesConcurrently() {
+        MockListener listener = new MockListener(1);
+
+        CompletableFuture<?> first = new CompletableFuture<>(); // barrier
+
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+
+                    final AtomicInteger i = new AtomicInteger();
+
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                if (i.incrementAndGet() == 1) {
+                                    return first.thenApply(o -> result());
+                                } else {
+                                    return now(result());
+                                }
+                            }
+                        };
+                    }
+                });
+
+        CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
+
+        first.complete(null);
+        // Don't care about exceptional completion, only that all of them have
+        // completed
+        CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
+                .handle((v, e) -> null).join();
+
+        cf3.join(); /* Check that sendClose has completed normally */
+        cf4.join(); /* Check that repeated sendClose has completed normally */
+        assertCompletesExceptionally(IllegalStateException.class, cf5);
+        assertCompletesExceptionally(IllegalStateException.class, cf6);
+
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted
+    }
+
+    private static <T> CompletableFuture<T> seconds(long sec, T result) {
+        return new CompletableFuture<T>()
+                .completeOnTimeout(result, sec, TimeUnit.SECONDS);
+    }
+
+    private static <T> CompletableFuture<T> now(T result) {
+        return CompletableFuture.completedFuture(result);
+    }
+
+    private static WebSocketImpl newInstance(
+            WebSocket.Listener listener,
+            Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) {
+        TransportFactory factory = new TransportFactory() {
+            @Override
+            public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                    MessageStreamConsumer consumer) {
+                return new MockTransport<T>(sendResultSupplier, consumer) {
+                    @Override
+                    protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
+                        return input;
+                    }
+                };
+            }
+        };
+        return newInstance(listener, factory);
+    }
+
+    private static WebSocketImpl newInstance(WebSocket.Listener listener,
+                                             TransportFactory factory) {
+        URI uri = URI.create("ws://localhost");
+        String subprotocol = "";
+        return WebSocketImpl.newInstance(uri, subprotocol, listener, factory);
+    }
+}