--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Tue Nov 28 17:30:14 2017 +0300
@@ -58,7 +58,7 @@
private final FrameConsumer frameConsumer;
private final Frame.Reader reader = new Frame.Reader();
private final RawChannel.RawEvent event = createHandler();
- private final Demand demand = new Demand();
+ protected final Demand demand = new Demand(); /* Exposed for testing purposes */
private final SequentialScheduler pushScheduler;
private ByteBuffer data;
@@ -137,7 +137,7 @@
public void run() {
while (!pushScheduler.isStopped()) {
if (data.hasRemaining()) {
- if (demand.get() > 0) {
+ if (!demand.isFulfilled()) {
try {
int oldPos = data.position();
reader.readFrame(data, frameConsumer);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java Tue Nov 28 17:30:14 2017 +0300
@@ -35,7 +35,7 @@
*/
public class TransportSupplier {
- private final RawChannel channel;
+ protected final RawChannel channel; /* Exposed for testing purposes */
private final Object lock = new Object();
private Transmitter transmitter;
private Receiver receiver;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Tue Nov 28 13:57:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Tue Nov 28 17:30:14 2017 +0300
@@ -107,14 +107,11 @@
public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
- WebSocketImpl ws = new WebSocketImpl(b.getUri(),
- r.subprotocol,
- b.getListener(),
- r.transport);
- // This initialisation is outside of the constructor for the sake of
- // safe publication of WebSocketImpl.this
- ws.signalOpen();
- // make sure we don't release the builder until this lambda
+ WebSocket ws = newInstance(b.getUri(),
+ r.subprotocol,
+ b.getListener(),
+ r.transport);
+ // Make sure we don't release the builder until this lambda
// has been executed. The builder has a strong reference to
// the HttpClientFacade, and we want to keep that live until
// after the raw channel is created and passed to WebSocketImpl.
@@ -130,10 +127,22 @@
return h.send().thenApply(newWebSocket);
}
- WebSocketImpl(URI uri,
- String subprotocol,
- Listener listener,
- TransportSupplier transport)
+ /* Exposed for testing purposes */
+ static WebSocket newInstance(URI uri,
+ String subprotocol,
+ Listener listener,
+ TransportSupplier transport) {
+ WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
+ // This initialisation is outside of the constructor for the sake of
+ // safe publication of WebSocketImpl.this
+ ws.signalOpen();
+ return ws;
+ }
+
+ private WebSocketImpl(URI uri,
+ String subprotocol,
+ Listener listener,
+ TransportSupplier transport)
{
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
@@ -362,12 +371,6 @@
default:
throw new InternalError(String.valueOf(s));
}
- // Do not keep references to arbitrary big objects we no longer
- // need. It is unknown when the next message might come (if
- // ever), so the following references should be null the sooner
- // the better:
- binaryData = null;
- text = null;
} catch (Throwable t) {
signalError(t);
}
--- a/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
* @test
* @bug 8159053
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
* @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.BuildingWebSocketTest
*/
public final class BuildingWebSocketDriver { }
--- a/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
* @test
* @bug 8159053
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
* @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.HeaderWriterTest
*/
public final class HeaderWriterDriver { }
--- a/test/jdk/java/net/httpclient/websocket/MaskerDriver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/MaskerDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
* @test
* @bug 8159053
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
* @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MaskerTest
*/
public final class MaskerDriver { }
--- a/test/jdk/java/net/httpclient/websocket/ReaderDriver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ReaderDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -25,7 +25,6 @@
* @test
* @bug 8159053
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
* @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReaderTest
*/
public final class ReaderDriver { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReceivingTest
+ */
+public class ReceivingTestDriver { }
--- a/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java Tue Nov 28 17:30:14 2017 +0300
@@ -24,7 +24,6 @@
/*
* @test
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
* @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
*/
public class SendingTestDriver { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,82 @@
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.WebSocket.MessagePart;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletionStage;
+
+public class MockListener implements WebSocket.Listener {
+
+ private final long bufferSize;
+ private long count;
+
+ /*
+ * Typical buffer sizes: 1, n, Long.MAX_VALUE
+ */
+ public MockListener(long bufferSize) {
+ if (bufferSize < 1) {
+ throw new IllegalArgumentException();
+ }
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public void onOpen(WebSocket webSocket) {
+ System.out.printf("onOpen(%s)%n", webSocket);
+ replenishDemandIfNeeded(webSocket);
+ }
+
+ @Override
+ public CompletionStage<?> onText(WebSocket webSocket,
+ CharSequence message,
+ MessagePart part) {
+ System.out.printf("onText(%s, %s, %s)%n", webSocket, message, part);
+ replenishDemandIfNeeded(webSocket);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onBinary(WebSocket webSocket,
+ ByteBuffer message,
+ MessagePart part) {
+ System.out.printf("onBinary(%s, %s, %s)%n", webSocket, message, part);
+ replenishDemandIfNeeded(webSocket);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
+ System.out.printf("onPing(%s, %s)%n", webSocket, message);
+ replenishDemandIfNeeded(webSocket);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
+ System.out.printf("onPong(%s, %s)%n", webSocket, message);
+ replenishDemandIfNeeded(webSocket);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onClose(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ System.out.printf("onClose(%s, %s, %s)%n", webSocket, statusCode, reason);
+ return null;
+ }
+
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ System.out.printf("onError(%s, %s)%n", webSocket, error);
+ }
+
+ private void replenishDemandIfNeeded(WebSocket webSocket) {
+ if (--count <= 0) {
+ count = bufferSize - bufferSize / 2;
+ System.out.printf("request(%s)%n", count);
+ webSocket.request(count);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
+
+public class MockReceiver extends Receiver {
+
+ private final Iterator<Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>> iterator;
+ private final MessageStreamConsumer consumer;
+
+ public MockReceiver(MessageStreamConsumer consumer, RawChannel channel,
+ Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>... pairs) {
+ super(consumer, channel);
+ this.consumer = consumer;
+ iterator = Arrays.asList(pairs).iterator();
+ }
+
+ @Override
+ protected SequentialScheduler createScheduler() {
+ class X { // Class is hack needed to allow the task to refer to the scheduler
+ SequentialScheduler scheduler = new SequentialScheduler(task());
+
+ SequentialScheduler.RestartableTask task() {
+ return new SequentialScheduler.RestartableTask() {
+ @Override
+ public void run(DeferredCompleter taskCompleter) {
+ if (!scheduler.isStopped() && !demand.isFulfilled()) {
+ if (!iterator.hasNext()) {
+ taskCompleter.complete();
+ return;
+ }
+ Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>> p = iterator.next();
+ CompletableFuture<?> cf = p.first.toCompletableFuture();
+ if (cf.isDone()) { // Forcing synchronous execution
+ p.second.accept(consumer);
+ repeat(taskCompleter);
+ } else {
+ cf.whenCompleteAsync((r, e) -> {
+ p.second.accept(consumer);
+ repeat(taskCompleter);
+ });
+ }
+ }
+ }
+
+ private void repeat(DeferredCompleter taskCompleter) {
+ taskCompleter.complete();
+ scheduler.runOrSchedule();
+ }
+ };
+ }
+ }
+ return new X().scheduler;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.util.Queue;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+public abstract class MockTransmitter extends Transmitter {
+
+ private final long startTime = System.currentTimeMillis();
+
+ private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
+
+ public MockTransmitter() {
+ super(null);
+ }
+
+ @Override
+ public void send(OutgoingMessage message,
+ Consumer<Exception> completionHandler) {
+ System.out.printf("[%6s ms.] begin send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ messages.add(message);
+ whenSent().whenComplete((r, e) -> {
+ System.out.printf("[%6s ms.] complete send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ if (e != null) {
+ completionHandler.accept((Exception) e);
+ } else {
+ completionHandler.accept(null);
+ }
+ });
+ System.out.printf("[%6s ms.] end send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ }
+
+ @Override
+ public void close() { }
+
+ protected abstract CompletionStage<?> whenSent();
+
+ public Queue<OutgoingMessage> queue() {
+ return messages;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.nio.ByteBuffer;
+
+public class MockTransport extends TransportSupplier {
+
+ public MockTransport() {
+ super(new NullRawChannel());
+ }
+
+ public static class NullRawChannel implements RawChannel {
+
+ @Override
+ public void registerEvent(RawEvent event) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer initialByteBuffer() {
+ return ByteBuffer.allocate(0);
+ }
+
+ @Override
+ public ByteBuffer read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void shutdownInput() {
+ }
+
+ @Override
+ public void shutdownOutput() {
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java Tue Nov 28 17:30:14 2017 +0300
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.WebSocket.MessagePart;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.CompletableFuture.completedStage;
+import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
+import static jdk.incubator.http.WebSocket.MessagePart.LAST;
+import static jdk.incubator.http.WebSocket.MessagePart.PART;
+import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.common.Pair.pair;
+import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
+
+public class ReceivingTest {
+
+ // TODO: request in onClose/onError
+ // TODO: throw exception in onClose/onError
+
+ @Test
+ public void testNonPositiveRequest() {
+ URI uri = URI.create("ws://localhost");
+ String subprotocol = "";
+ CompletableFuture<Throwable> result = new CompletableFuture<>();
+ newInstance(uri, subprotocol, new MockListener(Long.MAX_VALUE) {
+
+ final AtomicInteger onOpenCount = new AtomicInteger();
+ volatile WebSocket webSocket;
+
+ @Override
+ public void onOpen(WebSocket webSocket) {
+ int i = onOpenCount.incrementAndGet();
+ if (i > 1) {
+ result.completeExceptionally(new IllegalStateException());
+ } else {
+ this.webSocket = webSocket;
+ webSocket.request(0);
+ }
+ }
+
+ @Override
+ public CompletionStage<?> onBinary(WebSocket webSocket,
+ ByteBuffer message,
+ MessagePart part) {
+ result.completeExceptionally(new IllegalStateException());
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onText(WebSocket webSocket,
+ CharSequence message,
+ MessagePart part) {
+ result.completeExceptionally(new IllegalStateException());
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPing(WebSocket webSocket,
+ ByteBuffer message) {
+ result.completeExceptionally(new IllegalStateException());
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPong(WebSocket webSocket,
+ ByteBuffer message) {
+ result.completeExceptionally(new IllegalStateException());
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onClose(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ result.completeExceptionally(new IllegalStateException());
+ return null;
+ }
+
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ if (!this.webSocket.equals(webSocket)) {
+ result.completeExceptionally(new IllegalArgumentException());
+ } else if (error == null || error.getClass() != IllegalArgumentException.class) {
+ result.completeExceptionally(new IllegalArgumentException());
+ } else {
+ result.complete(null);
+ }
+ }
+ }, new MockTransport() {
+ @Override
+ protected Receiver newReceiver(MessageStreamConsumer consumer) {
+ return new MockReceiver(consumer, channel, pair(now(), m -> m.onText("1", WHOLE) ));
+ }
+ });
+ result.join();
+ }
+
+ @Test
+ public void testText1() throws InterruptedException {
+ URI uri = URI.create("ws://localhost");
+ String subprotocol = "";
+ newInstance(uri, subprotocol, new MockListener(Long.MAX_VALUE),
+ new MockTransport() {
+ @Override
+ protected Receiver newReceiver(MessageStreamConsumer consumer) {
+ return new MockReceiver(consumer, channel,
+ pair(now(), m -> m.onText("1", FIRST)),
+ pair(now(), m -> m.onText("2", PART)),
+ pair(now(), m -> m.onText("3", PART)),
+ pair(now(), m -> m.onText("4", LAST)),
+ pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
+ }
+ });
+ Thread.sleep(2000);
+ }
+
+ private CompletionStage<?> inSeconds(long s) {
+ return new CompletableFuture<>().completeOnTimeout(null, s, TimeUnit.SECONDS);
+ }
+
+ private CompletionStage<?> now() {
+ return completedStage(null);
+ }
+}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Tue Nov 28 13:57:23 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Tue Nov 28 17:30:14 2017 +0300
@@ -26,20 +26,17 @@
import jdk.incubator.http.WebSocket;
import org.testng.annotations.Test;
-import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
import static org.testng.Assert.assertEquals;
public class SendingTest {
@@ -153,92 +150,15 @@
private static WebSocket newWebSocket(Transmitter transmitter) {
URI uri = URI.create("ws://localhost");
String subprotocol = "";
- RawChannel channel = new RawChannel() {
-
- @Override
- public void registerEvent(RawEvent event) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ByteBuffer initialByteBuffer() {
- return ByteBuffer.allocate(0);
- }
-
- @Override
- public ByteBuffer read() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long write(ByteBuffer[] srcs, int offset, int length) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void shutdownInput() {
- }
-
- @Override
- public void shutdownOutput() {
- }
-
- @Override
- public void close() {
- }
- };
- TransportSupplier transport = new TransportSupplier(channel) {
+ TransportSupplier transport = new MockTransport() {
@Override
public Transmitter transmitter() {
return transmitter;
}
};
- return new WebSocketImpl(
- uri,
- subprotocol,
- new WebSocket.Listener() { },
- transport);
- }
-
- private abstract class MockTransmitter extends Transmitter {
-
- private final long startTime = System.currentTimeMillis();
-
- private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
-
- public MockTransmitter() {
- super(null);
- }
-
- @Override
- public void send(OutgoingMessage message,
- Consumer<Exception> completionHandler) {
- System.out.printf("[%6s ms.] begin send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- messages.add(message);
- whenSent().whenComplete((r, e) -> {
- System.out.printf("[%6s ms.] complete send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- if (e != null) {
- completionHandler.accept((Exception) e);
- } else {
- completionHandler.accept(null);
- }
- });
- System.out.printf("[%6s ms.] end send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- }
-
- @Override
- public void close() { }
-
- protected abstract CompletionStage<?> whenSent();
-
- public Queue<OutgoingMessage> queue() {
- return messages;
- }
+ return newInstance(uri,
+ subprotocol,
+ new MockListener(Long.MAX_VALUE),
+ transport);
}
}