--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java Fri Dec 15 03:05:16 2017 +0300
@@ -30,6 +30,25 @@
import java.util.concurrent.CompletableFuture;
/*
+ * Transport needs some way to asynchronously notify the send operation has been
+ * completed. It can have several different designs each of which has its own
+ * pros and cons:
+ *
+ * (1) void sendMessage(..., Callback)
+ * (2) CompletableFuture<T> sendMessage(...)
+ * (3) CompletableFuture<T> sendMessage(..., Callback)
+ * (4) boolean sendMessage(..., Callback) throws IOException
+ * ...
+ *
+ * If Transport's users use CFs, (1) forces these users to create CFs and pass
+ * them to the callback. If any additional (dependant) action needs to be
+ * attached to the returned CF, this means an extra object (CF) must be created
+ * in (2). (3) and (4) solves both issues, however (4) does not abstract out
+ * when exactly the operation has been performed. So the handling code needs to
+ * be repeated twice. And that leads to 2 different code paths (more bugs).
+ * Unless designed for this, the user should not assume any specific order of
+ * completion in (3) (e.g. callback first and then the returned CF).
+ *
* The only parametrization of Transport<T> used is Transport<WebSocket>. The
* type parameter T was introduced solely to avoid circular dependency between
* Transport and WebSocket. After all, instances of T are used solely to
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java Fri Dec 15 03:05:16 2017 +0300
@@ -124,9 +124,9 @@
private void send0(OutgoingMessage message, Consumer<Exception> handler) {
boolean b = busy.get();
assert b; // Please don't inline this, as busy.get() has memory
- // visibility effects and we don't want the program behaviour
- // to depend on whether the assertions are turned on
- // or turned off
+ // visibility effects and we don't want the program behaviour
+ // to depend on whether the assertions are turned on
+ // or turned off
try {
boolean sent = message.sendTo(channel);
if (sent) {
@@ -203,7 +203,7 @@
CompletableFuture<T> cf = p.second;
try {
if (!message.contextualize(context)) { // Do not send the message
- cf.complete(null);
+ cf.complete(resultSupplier.get());
repeat(taskCompleter);
return;
}
@@ -256,9 +256,9 @@
@Override
public void acknowledgeReception() {
- long x = demand.decreaseAndGet(1);
- if (x < 0) {
- throw new InternalError(String.valueOf(x));
+ boolean decremented = demand.tryDecrement();
+ if (!decremented) {
+ throw new InternalError();
}
}
@@ -320,7 +320,7 @@
}
/*
- * Stops the machinery from reading and delivering messages permanently,
+ * Permanently stops reading from the channel and delivering messages
* regardless of the current demand and data availability.
*/
@Override
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Dec 15 03:05:16 2017 +0300
@@ -153,8 +153,7 @@
return failedFuture(new IllegalStateException("Send pending"));
}
CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
- cf.whenComplete((r, e) -> outstandingSend.set(false));
- return cf;
+ return cf.whenComplete((r, e) -> outstandingSend.set(false));
}
@Override
@@ -170,8 +169,7 @@
// } else {
// cf.whenComplete((r, e) -> outstandingSend.set(false));
// }
- cf.whenComplete((r, e) -> outstandingSend.set(false));
- return cf;
+ return cf.whenComplete((r, e) -> outstandingSend.set(false));
}
@Override
--- a/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java Fri Dec 15 00:47:16 2017 +0300
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java Fri Dec 15 03:05:16 2017 +0300
@@ -24,6 +24,6 @@
/*
* @test
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm/timeout=30 --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
*/
public class WebSocketImplDriver { }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java Fri Dec 15 00:47:16 2017 +0300
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java Fri Dec 15 03:05:16 2017 +0300
@@ -72,6 +72,8 @@
// TODO: request in onClose/onError
// TODO: throw exception in onClose/onError
// TODO: exception is thrown from request()
+ // TODO: repeated sendClose complete normally
+ // TODO: default Close message is sent if IAE is thrown from sendClose
@Test
public void testNonPositiveRequest() throws Exception {
@@ -208,6 +210,72 @@
);
}
+ // Tease out "java.lang.IllegalStateException: Send pending" due to possible
+ // race between sending a message and replenishing the permit
+ @Test
+ public void testManyTextMessages() {
+ WebSocketImpl ws = newInstance(
+ new MockListener(1),
+ new TransportFactory() {
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+
+ final Random r = new Random();
+
+ return new MockTransport<>(sendResultSupplier, consumer) {
+ @Override
+ protected CompletableFuture<T> defaultSend() {
+ return millis(r.nextInt(100), result());
+ }
+ };
+ }
+ });
+ int NUM_MESSAGES = 512;
+ CompletableFuture<WebSocket> current = CompletableFuture.completedFuture(ws);
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ current = current.thenCompose(w -> w.sendText(" ", true));
+ }
+ current.join();
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), NUM_MESSAGES);
+ }
+
+ @Test
+ public void testManyBinaryMessages() {
+ WebSocketImpl ws = newInstance(
+ new MockListener(1),
+ new TransportFactory() {
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+
+ final Random r = new Random();
+
+ return new MockTransport<>(sendResultSupplier, consumer) {
+ @Override
+ protected CompletableFuture<T> defaultSend() {
+ return millis(r.nextInt(150), result());
+ }
+ };
+ }
+ });
+ CompletableFuture<WebSocket> start = new CompletableFuture<>();
+
+ int NUM_MESSAGES = 512;
+ CompletableFuture<WebSocket> current = start;
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true));
+ }
+
+ start.completeAsync(() -> ws);
+ current.join();
+
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), NUM_MESSAGES);
+ }
+
+
@Test
public void sendTextImmediately() {
WebSocketImpl ws = newInstance(
@@ -349,6 +417,11 @@
.completeOnTimeout(result, sec, TimeUnit.SECONDS);
}
+ private static <T> CompletableFuture<T> millis(long sec, T result) {
+ return new CompletableFuture<T>()
+ .completeOnTimeout(result, sec, TimeUnit.MILLISECONDS);
+ }
+
private static <T> CompletableFuture<T> now(T result) {
return CompletableFuture.completedFuture(result);
}