--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java Thu Nov 23 17:46:02 2017 +0300
@@ -71,12 +71,13 @@
* so it would be possible to defer the work it does until the most
* convenient moment (up to the point where sentTo is invoked).
*/
- protected void contextualize(Context context) {
+ protected boolean contextualize(Context context) {
// masking and charset decoding should be performed here rather than in
// the constructor (as of today)
if (context.isCloseSent()) {
throw new IllegalStateException("Close sent");
}
+ return true;
}
protected boolean sendTo(RawChannel channel) throws IOException {
@@ -115,7 +116,7 @@
}
@Override
- protected void contextualize(Context context) {
+ protected boolean contextualize(Context context) {
super.contextualize(context);
if (context.isPreviousBinary() && !context.isPreviousLast()) {
throw new IllegalStateException("Unexpected text message");
@@ -125,6 +126,7 @@
context.setPreviousBinary(false);
context.setPreviousText(true);
context.setPreviousLast(isLast);
+ return true;
}
}
@@ -139,7 +141,7 @@
}
@Override
- protected void contextualize(Context context) {
+ protected boolean contextualize(Context context) {
super.contextualize(context);
if (context.isPreviousText() && !context.isPreviousLast()) {
throw new IllegalStateException("Unexpected binary message");
@@ -150,6 +152,7 @@
context.setPreviousText(false);
context.setPreviousBinary(true);
context.setPreviousLast(isLast);
+ return true;
}
}
@@ -195,9 +198,13 @@
}
@Override
- protected void contextualize(Context context) {
- super.contextualize(context);
- context.setCloseSent();
+ protected boolean contextualize(Context context) {
+ if (context.isCloseSent()) {
+ return false;
+ } else {
+ context.setCloseSent();
+ return true;
+ }
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Thu Nov 23 17:46:02 2017 +0300
@@ -40,7 +40,7 @@
* to accept a new message. Until then, the transmitter is considered "busy" and
* an IllegalStateException will be thrown on each attempt to invoke send.
*/
-final class Transmitter {
+public class Transmitter {
/* This flag is used solely for assertions */
private final AtomicBoolean busy = new AtomicBoolean();
@@ -49,8 +49,8 @@
private final RawChannel channel;
private final RawChannel.RawEvent event;
- Transmitter(RawChannel channel) {
- this.channel = requireNonNull(channel);
+ public Transmitter(RawChannel channel) {
+ this.channel = channel;
this.event = createHandler();
}
@@ -59,7 +59,9 @@
* A {@code StackOverflowError} may thus occur if there's a possibility
* that this method is called again by the supplied handler.
*/
- void send(OutgoingMessage message, Consumer<Exception> completionHandler) {
+ public void send(OutgoingMessage message,
+ Consumer<Exception> completionHandler)
+ {
requireNonNull(message);
requireNonNull(completionHandler);
if (!busy.compareAndSet(false, true)) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Thu Nov 23 17:46:02 2017 +0300
@@ -65,14 +65,14 @@
/*
* A WebSocket client.
*/
-final class WebSocketImpl implements WebSocket {
+public final class WebSocketImpl implements WebSocket {
private final URI uri;
private final String subprotocol;
private final RawChannel channel; /* Stored to call close() on */
private final Listener listener;
- private volatile boolean intputClosed;
+ private volatile boolean inputClosed;
private volatile boolean outputClosed;
/*
@@ -113,7 +113,7 @@
private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
- static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
+ public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
WebSocketImpl ws = new WebSocketImpl(b.getUri(),
r.subprotocol,
@@ -144,11 +144,25 @@
RawChannel channel,
Listener listener)
{
+ this(uri,
+ subprotocol,
+ channel,
+ listener,
+ new Transmitter(channel));
+ }
+
+ /* Exposed for testing purposes */
+ WebSocketImpl(URI uri,
+ String subprotocol,
+ RawChannel channel,
+ Listener listener,
+ Transmitter transmitter)
+ {
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.channel = requireNonNull(channel);
this.listener = requireNonNull(listener);
- this.transmitter = new Transmitter(channel);
+ this.transmitter = transmitter;
this.receiver = new Receiver(messageConsumerOf(listener), channel);
this.sendScheduler = new SequentialScheduler(new SendTask());
@@ -201,7 +215,7 @@
* Processes a Close event that came from the channel. Invoked at most once.
*/
private void processClose(int statusCode, String reason) {
- intputClosed = true;
+ inputClosed = true;
receiver.close();
try {
channel.shutdownInput();
@@ -364,24 +378,33 @@
OutgoingMessage message = p.first;
CompletableFuture<WebSocket> cf = p.second;
try {
- message.contextualize(context);
+ if (!message.contextualize(context)) { // Do not send the message
+ cf.complete(null);
+ repeat(taskCompleter);
+ return;
+ }
Consumer<Exception> h = e -> {
if (e == null) {
cf.complete(WebSocketImpl.this);
} else {
cf.completeExceptionally(e);
}
- taskCompleter.complete();
- // More than a single message may have been enqueued while
- // the task has been busy with the current message, but
- // there only one signal is recorded
- sendScheduler.runOrSchedule();
+ repeat(taskCompleter);
};
transmitter.send(message, h);
} catch (Exception t) {
cf.completeExceptionally(t);
+ repeat(taskCompleter);
}
}
+
+ private void repeat(DeferredCompleter taskCompleter) {
+ taskCompleter.complete();
+ // More than a single message may have been enqueued while
+ // the task has been busy with the current message, but
+ // there is only a single signal recorded
+ sendScheduler.runOrSchedule();
+ }
}
@Override
@@ -401,12 +424,12 @@
@Override
public boolean isInputClosed() {
- return intputClosed;
+ return inputClosed;
}
@Override
public void abort() {
- intputClosed = true;
+ inputClosed = true;
outputClosed = true;
try {
channel.close();
@@ -504,7 +527,7 @@
@Override
public void onError(Exception error) {
- intputClosed = true;
+ inputClosed = true;
outputClosed = true;
if (!(error instanceof FailWebSocketException)) {
abort();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java Thu Nov 23 17:46:02 2017 +0300
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
+ */
+public class SendingTestDriver { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Thu Nov 23 17:46:02 2017 +0300
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static org.testng.Assert.assertEquals;
+
+public class SendingTest {
+
+ @Test
+ public void sendTextImmediately() {
+ MockTransmitter t = new MockTransmitter() {
+ @Override
+ protected CompletionStage<?> whenSent() {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ WebSocket ws = newWebSocket(t);
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .join();
+
+ assertEquals(t.queue().size(), 3);
+ }
+
+ @Test
+ public void sendTextWithDelay() {
+ MockTransmitter t = new MockTransmitter() {
+ @Override
+ protected CompletionStage<?> whenSent() {
+ return new CompletableFuture<>()
+ .completeOnTimeout(null, 1, TimeUnit.SECONDS);
+ }
+ };
+ WebSocket ws = newWebSocket(t);
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .join();
+
+ assertEquals(t.queue().size(), 3);
+ }
+
+ @Test
+ public void sendTextMixedDelay() {
+ Random r = new Random();
+
+ MockTransmitter t = new MockTransmitter() {
+ @Override
+ protected CompletionStage<?> whenSent() {
+ return r.nextBoolean() ?
+ new CompletableFuture<>().completeOnTimeout(null, 1, TimeUnit.SECONDS) :
+ CompletableFuture.completedFuture(null);
+ }
+ };
+ WebSocket ws = newWebSocket(t);
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .thenCompose(w -> w.sendText("4", true))
+ .thenCompose(w -> w.sendText("5", true))
+ .thenCompose(w -> w.sendText("6", true))
+ .thenCompose(w -> w.sendText("7", true))
+ .thenCompose(w -> w.sendText("8", true))
+ .thenCompose(w -> w.sendText("9", true))
+ .join();
+
+ assertEquals(t.queue().size(), 9);
+ }
+
+ @Test
+ public void sendControlMessagesConcurrently() {
+
+ CompletableFuture<?> first = new CompletableFuture<>();
+
+ MockTransmitter t = new MockTransmitter() {
+
+ final AtomicInteger i = new AtomicInteger();
+
+ @Override
+ protected CompletionStage<?> whenSent() {
+ if (i.incrementAndGet() == 1) {
+ return first;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ };
+ WebSocket ws = newWebSocket(t);
+
+ CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
+ CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
+ CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
+
+ first.complete(null);
+ // Don't care about exceptional completion, only that all of them have
+ // completed
+ CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
+ .handle((v, e) -> null).join();
+
+ cf3.join(); /* Check that sendClose has completed normally */
+ cf4.join(); /* Check that repeated sendClose has completed normally */
+ assertCompletesExceptionally(IllegalStateException.class, cf5);
+ assertCompletesExceptionally(IllegalStateException.class, cf6);
+
+ assertEquals(t.queue().size(), 3); // 6 minus 3 that were not accepted
+ }
+
+ private static WebSocket newWebSocket(Transmitter transmitter) {
+ URI uri = URI.create("ws://localhost");
+ String subprotocol = "";
+ RawChannel channel = new RawChannel() {
+
+ @Override
+ public void registerEvent(RawEvent event) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer initialByteBuffer() {
+ return ByteBuffer.allocate(0);
+ }
+
+ @Override
+ public ByteBuffer read() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void shutdownInput() {
+ }
+
+ @Override
+ public void shutdownOutput() {
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ return new WebSocketImpl(
+ uri,
+ subprotocol,
+ channel,
+ new WebSocket.Listener() { }, transmitter);
+ }
+
+ private abstract class MockTransmitter extends Transmitter {
+
+ private final long startTime = System.currentTimeMillis();
+
+ private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
+
+ public MockTransmitter() {
+ super(null);
+ }
+
+ @Override
+ public void send(OutgoingMessage message,
+ Consumer<Exception> completionHandler) {
+ System.out.printf("[%6s ms.] begin send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ messages.add(message);
+ whenSent().whenComplete((r, e) -> {
+ System.out.printf("[%6s ms.] complete send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ if (e != null) {
+ completionHandler.accept((Exception) e);
+ } else {
+ completionHandler.accept(null);
+ }
+ });
+ System.out.printf("[%6s ms.] end send(%s)%n",
+ System.currentTimeMillis() - startTime,
+ message);
+ }
+
+ protected abstract CompletionStage<?> whenSent();
+
+ public Queue<OutgoingMessage> queue() {
+ return messages;
+ }
+ }
+}