http-client-branch: (WebSocket) tests & bugfixes http-client-branch
authorprappo
Tue, 28 Nov 2017 17:30:14 +0300
branchhttp-client-branch
changeset 55907 f6a3a657416e
parent 55906 5917595fbf16
child 55908 a36a236e55d8
http-client-branch: (WebSocket) tests & bugfixes
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java
test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java
test/jdk/java/net/httpclient/websocket/MaskerDriver.java
test/jdk/java/net/httpclient/websocket/ReaderDriver.java
test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java
test/jdk/java/net/httpclient/websocket/SendingTestDriver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -58,7 +58,7 @@
     private final FrameConsumer frameConsumer;
     private final Frame.Reader reader = new Frame.Reader();
     private final RawChannel.RawEvent event = createHandler();
-    private final Demand demand = new Demand();
+    protected final Demand demand = new Demand(); /* Exposed for testing purposes */
     private final SequentialScheduler pushScheduler;
 
     private ByteBuffer data;
@@ -137,7 +137,7 @@
         public void run() {
             while (!pushScheduler.isStopped()) {
                 if (data.hasRemaining()) {
-                    if (demand.get() > 0) {
+                    if (!demand.isFulfilled()) {
                         try {
                             int oldPos = data.position();
                             reader.readFrame(data, frameConsumer);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java	Tue Nov 28 17:30:14 2017 +0300
@@ -35,7 +35,7 @@
  */
 public class TransportSupplier {
 
-    private final RawChannel channel;
+    protected final RawChannel channel; /* Exposed for testing purposes */
     private final Object lock = new Object();
     private Transmitter transmitter;
     private Receiver receiver;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Tue Nov 28 17:30:14 2017 +0300
@@ -107,14 +107,11 @@
 
     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
         Function<Result, WebSocket> newWebSocket = r -> {
-            WebSocketImpl ws = new WebSocketImpl(b.getUri(),
-                                                 r.subprotocol,
-                                                 b.getListener(),
-                                                 r.transport);
-            // This initialisation is outside of the constructor for the sake of
-            // safe publication of WebSocketImpl.this
-            ws.signalOpen();
-            // make sure we don't release the builder until this lambda
+            WebSocket ws = newInstance(b.getUri(),
+                                       r.subprotocol,
+                                       b.getListener(),
+                                       r.transport);
+            // Make sure we don't release the builder until this lambda
             // has been executed. The builder has a strong reference to
             // the HttpClientFacade, and we want to keep that live until
             // after the raw channel is created and passed to WebSocketImpl.
@@ -130,10 +127,22 @@
         return h.send().thenApply(newWebSocket);
     }
 
-    WebSocketImpl(URI uri,
-                  String subprotocol,
-                  Listener listener,
-                  TransportSupplier transport)
+    /* Exposed for testing purposes */
+    static WebSocket newInstance(URI uri,
+                                 String subprotocol,
+                                 Listener listener,
+                                 TransportSupplier transport) {
+        WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
+        // This initialisation is outside of the constructor for the sake of
+        // safe publication of WebSocketImpl.this
+        ws.signalOpen();
+        return ws;
+    }
+
+    private WebSocketImpl(URI uri,
+                          String subprotocol,
+                          Listener listener,
+                          TransportSupplier transport)
     {
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
@@ -362,12 +371,6 @@
                     default:
                         throw new InternalError(String.valueOf(s));
                 }
-                // Do not keep references to arbitrary big objects we no longer
-                // need. It is unknown when the next message might come (if
-                // ever), so the following references should be null the sooner
-                // the better:
-                binaryData = null;
-                text = null;
             } catch (Throwable t) {
                 signalError(t);
             }
--- a/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
  * @test
  * @bug 8159053
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
  * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.BuildingWebSocketTest
  */
 public final class BuildingWebSocketDriver { }
--- a/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
  * @test
  * @bug 8159053
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
  * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.HeaderWriterTest
  */
 public final class HeaderWriterDriver { }
--- a/test/jdk/java/net/httpclient/websocket/MaskerDriver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/MaskerDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
  * @test
  * @bug 8159053
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
  * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MaskerTest
  */
 public final class MaskerDriver { }
--- a/test/jdk/java/net/httpclient/websocket/ReaderDriver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ReaderDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
  * @test
  * @bug 8159053
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
  * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReaderTest
  */
 public final class ReaderDriver { }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReceivingTest
+ */
+public class ReceivingTestDriver { }
--- a/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -24,7 +24,6 @@
 /*
  * @test
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
  * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
  */
 public class SendingTestDriver { }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,82 @@
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.WebSocket.MessagePart;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletionStage;
+
+public class MockListener implements WebSocket.Listener {
+
+    private final long bufferSize;
+    private long count;
+
+    /*
+     * Typical buffer sizes: 1, n, Long.MAX_VALUE
+     */
+    public MockListener(long bufferSize) {
+        if (bufferSize < 1) {
+            throw new IllegalArgumentException();
+        }
+        this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public void onOpen(WebSocket webSocket) {
+        System.out.printf("onOpen(%s)%n", webSocket);
+        replenishDemandIfNeeded(webSocket);
+    }
+
+    @Override
+    public CompletionStage<?> onText(WebSocket webSocket,
+                                     CharSequence message,
+                                     MessagePart part) {
+        System.out.printf("onText(%s, %s, %s)%n", webSocket, message, part);
+        replenishDemandIfNeeded(webSocket);
+        return null;
+    }
+
+    @Override
+    public CompletionStage<?> onBinary(WebSocket webSocket,
+                                       ByteBuffer message,
+                                       MessagePart part) {
+        System.out.printf("onBinary(%s, %s, %s)%n", webSocket, message, part);
+        replenishDemandIfNeeded(webSocket);
+        return null;
+    }
+
+    @Override
+    public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
+        System.out.printf("onPing(%s, %s)%n", webSocket, message);
+        replenishDemandIfNeeded(webSocket);
+        return null;
+    }
+
+    @Override
+    public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
+        System.out.printf("onPong(%s, %s)%n", webSocket, message);
+        replenishDemandIfNeeded(webSocket);
+        return null;
+    }
+
+    @Override
+    public CompletionStage<?> onClose(WebSocket webSocket,
+                                      int statusCode,
+                                      String reason) {
+        System.out.printf("onClose(%s, %s, %s)%n", webSocket, statusCode, reason);
+        return null;
+    }
+
+    @Override
+    public void onError(WebSocket webSocket, Throwable error) {
+        System.out.printf("onError(%s, %s)%n", webSocket, error);
+    }
+
+    private void replenishDemandIfNeeded(WebSocket webSocket) {
+        if (--count <= 0) {
+            count = bufferSize - bufferSize / 2;
+            System.out.printf("request(%s)%n", count);
+            webSocket.request(count);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
+
+public class MockReceiver extends Receiver {
+
+    private final Iterator<Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>> iterator;
+    private final MessageStreamConsumer consumer;
+
+    public MockReceiver(MessageStreamConsumer consumer, RawChannel channel,
+                        Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>... pairs) {
+        super(consumer, channel);
+        this.consumer = consumer;
+        iterator = Arrays.asList(pairs).iterator();
+    }
+
+    @Override
+    protected SequentialScheduler createScheduler() {
+        class X { // Class is hack needed to allow the task to refer to the scheduler
+            SequentialScheduler scheduler = new SequentialScheduler(task());
+
+            SequentialScheduler.RestartableTask task() {
+                return new SequentialScheduler.RestartableTask() {
+                    @Override
+                    public void run(DeferredCompleter taskCompleter) {
+                        if (!scheduler.isStopped() && !demand.isFulfilled()) {
+                            if (!iterator.hasNext()) {
+                                taskCompleter.complete();
+                                return;
+                            }
+                            Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>> p = iterator.next();
+                            CompletableFuture<?> cf = p.first.toCompletableFuture();
+                            if (cf.isDone()) { // Forcing synchronous execution
+                                p.second.accept(consumer);
+                                repeat(taskCompleter);
+                            } else {
+                                cf.whenCompleteAsync((r, e) -> {
+                                    p.second.accept(consumer);
+                                    repeat(taskCompleter);
+                                });
+                            }
+                        }
+                    }
+
+                    private void repeat(DeferredCompleter taskCompleter) {
+                        taskCompleter.complete();
+                        scheduler.runOrSchedule();
+                    }
+                };
+            }
+        }
+        return new X().scheduler;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.util.Queue;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+public abstract class MockTransmitter extends Transmitter {
+
+    private final long startTime = System.currentTimeMillis();
+
+    private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
+
+    public MockTransmitter() {
+        super(null);
+    }
+
+    @Override
+    public void send(OutgoingMessage message,
+                     Consumer<Exception> completionHandler) {
+        System.out.printf("[%6s ms.] begin send(%s)%n",
+                          System.currentTimeMillis() - startTime,
+                          message);
+        messages.add(message);
+        whenSent().whenComplete((r, e) -> {
+            System.out.printf("[%6s ms.] complete send(%s)%n",
+                              System.currentTimeMillis() - startTime,
+                              message);
+            if (e != null) {
+                completionHandler.accept((Exception) e);
+            } else {
+                completionHandler.accept(null);
+            }
+        });
+        System.out.printf("[%6s ms.] end send(%s)%n",
+                          System.currentTimeMillis() - startTime,
+                          message);
+    }
+
+    @Override
+    public void close() { }
+
+    protected abstract CompletionStage<?> whenSent();
+
+    public Queue<OutgoingMessage> queue() {
+        return messages;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.nio.ByteBuffer;
+
+public class MockTransport extends TransportSupplier {
+
+    public MockTransport() {
+        super(new NullRawChannel());
+    }
+
+    public static class NullRawChannel implements RawChannel {
+
+        @Override
+        public void registerEvent(RawEvent event) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ByteBuffer initialByteBuffer() {
+            return ByteBuffer.allocate(0);
+        }
+
+        @Override
+        public ByteBuffer read() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long write(ByteBuffer[] srcs, int offset, int length) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void shutdownInput() {
+        }
+
+        @Override
+        public void shutdownOutput() {
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java	Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.WebSocket.MessagePart;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.CompletableFuture.completedStage;
+import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
+import static jdk.incubator.http.WebSocket.MessagePart.LAST;
+import static jdk.incubator.http.WebSocket.MessagePart.PART;
+import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.common.Pair.pair;
+import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
+
+public class ReceivingTest {
+
+    // TODO: request in onClose/onError
+    // TODO: throw exception in onClose/onError
+
+    @Test
+    public void testNonPositiveRequest() {
+        URI uri = URI.create("ws://localhost");
+        String subprotocol = "";
+        CompletableFuture<Throwable> result = new CompletableFuture<>();
+        newInstance(uri, subprotocol, new MockListener(Long.MAX_VALUE) {
+
+            final AtomicInteger onOpenCount = new AtomicInteger();
+            volatile WebSocket webSocket;
+
+            @Override
+            public void onOpen(WebSocket webSocket) {
+                int i = onOpenCount.incrementAndGet();
+                if (i > 1) {
+                    result.completeExceptionally(new IllegalStateException());
+                } else {
+                    this.webSocket = webSocket;
+                    webSocket.request(0);
+                }
+            }
+
+            @Override
+            public CompletionStage<?> onBinary(WebSocket webSocket,
+                                               ByteBuffer message,
+                                               MessagePart part) {
+                result.completeExceptionally(new IllegalStateException());
+                return null;
+            }
+
+            @Override
+            public CompletionStage<?> onText(WebSocket webSocket,
+                                             CharSequence message,
+                                             MessagePart part) {
+                result.completeExceptionally(new IllegalStateException());
+                return null;
+            }
+
+            @Override
+            public CompletionStage<?> onPing(WebSocket webSocket,
+                                             ByteBuffer message) {
+                result.completeExceptionally(new IllegalStateException());
+                return null;
+            }
+
+            @Override
+            public CompletionStage<?> onPong(WebSocket webSocket,
+                                             ByteBuffer message) {
+                result.completeExceptionally(new IllegalStateException());
+                return null;
+            }
+
+            @Override
+            public CompletionStage<?> onClose(WebSocket webSocket,
+                                              int statusCode,
+                                              String reason) {
+                result.completeExceptionally(new IllegalStateException());
+                return null;
+            }
+
+            @Override
+            public void onError(WebSocket webSocket, Throwable error) {
+                if (!this.webSocket.equals(webSocket)) {
+                    result.completeExceptionally(new IllegalArgumentException());
+                } else if (error == null || error.getClass() != IllegalArgumentException.class) {
+                    result.completeExceptionally(new IllegalArgumentException());
+                } else {
+                    result.complete(null);
+                }
+            }
+        }, new MockTransport() {
+            @Override
+            protected Receiver newReceiver(MessageStreamConsumer consumer) {
+                return new MockReceiver(consumer, channel, pair(now(), m -> m.onText("1", WHOLE) ));
+            }
+        });
+        result.join();
+    }
+
+    @Test
+    public void testText1() throws InterruptedException {
+        URI uri = URI.create("ws://localhost");
+        String subprotocol = "";
+        newInstance(uri, subprotocol, new MockListener(Long.MAX_VALUE),
+                    new MockTransport() {
+                        @Override
+                        protected Receiver newReceiver(MessageStreamConsumer consumer) {
+                            return new MockReceiver(consumer, channel,
+                                                    pair(now(), m -> m.onText("1", FIRST)),
+                                                    pair(now(), m -> m.onText("2", PART)),
+                                                    pair(now(), m -> m.onText("3", PART)),
+                                                    pair(now(), m -> m.onText("4", LAST)),
+                                                    pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
+                        }
+                    });
+        Thread.sleep(2000);
+    }
+
+    private CompletionStage<?> inSeconds(long s) {
+        return new CompletableFuture<>().completeOnTimeout(null, s, TimeUnit.SECONDS);
+    }
+
+    private CompletionStage<?> now() {
+        return completedStage(null);
+    }
+}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Tue Nov 28 17:30:14 2017 +0300
@@ -26,20 +26,17 @@
 import jdk.incubator.http.WebSocket;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
 import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
 import static org.testng.Assert.assertEquals;
 
 public class SendingTest {
@@ -153,92 +150,15 @@
     private static WebSocket newWebSocket(Transmitter transmitter) {
         URI uri = URI.create("ws://localhost");
         String subprotocol = "";
-        RawChannel channel = new RawChannel() {
-
-            @Override
-            public void registerEvent(RawEvent event) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public ByteBuffer initialByteBuffer() {
-                return ByteBuffer.allocate(0);
-            }
-
-            @Override
-            public ByteBuffer read() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public long write(ByteBuffer[] srcs, int offset, int length) {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public void shutdownInput() {
-            }
-
-            @Override
-            public void shutdownOutput() {
-            }
-
-            @Override
-            public void close() {
-            }
-        };
-        TransportSupplier transport = new TransportSupplier(channel) {
+        TransportSupplier transport = new MockTransport() {
             @Override
             public Transmitter transmitter() {
                 return transmitter;
             }
         };
-        return new WebSocketImpl(
-                uri,
-                subprotocol,
-                new WebSocket.Listener() { },
-                transport);
-    }
-
-    private abstract class MockTransmitter extends Transmitter {
-
-        private final long startTime = System.currentTimeMillis();
-
-        private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
-
-        public MockTransmitter() {
-            super(null);
-        }
-
-        @Override
-        public void send(OutgoingMessage message,
-                         Consumer<Exception> completionHandler) {
-            System.out.printf("[%6s ms.] begin send(%s)%n",
-                              System.currentTimeMillis() - startTime,
-                              message);
-            messages.add(message);
-            whenSent().whenComplete((r, e) -> {
-                System.out.printf("[%6s ms.] complete send(%s)%n",
-                                  System.currentTimeMillis() - startTime,
-                                  message);
-                if (e != null) {
-                    completionHandler.accept((Exception) e);
-                } else {
-                    completionHandler.accept(null);
-                }
-            });
-            System.out.printf("[%6s ms.] end send(%s)%n",
-                              System.currentTimeMillis() - startTime,
-                              message);
-        }
-
-        @Override
-        public void close() { }
-
-        protected abstract CompletionStage<?> whenSent();
-
-        public Queue<OutgoingMessage> queue() {
-            return messages;
-        }
+        return newInstance(uri,
+                           subprotocol,
+                           new MockListener(Long.MAX_VALUE),
+                           transport);
     }
 }