http-client-branch: (WebSocket) refactoring http-client-branch
authorprappo
Fri, 15 Dec 2017 03:05:16 +0300
branchhttp-client-branch
changeset 55989 76ac25076fdc
parent 55988 7f1e0cf933a6
child 55990 002db7829808
http-client-branch: (WebSocket) refactoring
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java	Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java	Fri Dec 15 03:05:16 2017 +0300
@@ -30,6 +30,25 @@
 import java.util.concurrent.CompletableFuture;
 
 /*
+ * Transport needs some way to asynchronously notify the send operation has been
+ * completed. It can have several different designs each of which has its own
+ * pros and cons:
+ *
+ *     (1) void sendMessage(..., Callback)
+ *     (2) CompletableFuture<T> sendMessage(...)
+ *     (3) CompletableFuture<T> sendMessage(..., Callback)
+ *     (4) boolean sendMessage(..., Callback) throws IOException
+ *     ...
+ *
+ * If Transport's users use CFs, (1) forces these users to create CFs and pass
+ * them to the callback. If any additional (dependant) action needs to be
+ * attached to the returned CF, this means an extra object (CF) must be created
+ * in (2). (3) and (4) solves both issues, however (4) does not abstract out
+ * when exactly the operation has been performed. So the handling code needs to
+ * be repeated twice. And that leads to 2 different code paths (more bugs).
+ * Unless designed for this, the user should not assume any specific order of
+ * completion in (3) (e.g. callback first and then the returned CF).
+ *
  * The only parametrization of Transport<T> used is Transport<WebSocket>. The
  * type parameter T was introduced solely to avoid circular dependency between
  * Transport and WebSocket. After all, instances of T are used solely to
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java	Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java	Fri Dec 15 03:05:16 2017 +0300
@@ -124,9 +124,9 @@
     private void send0(OutgoingMessage message, Consumer<Exception> handler) {
         boolean b = busy.get();
         assert b; // Please don't inline this, as busy.get() has memory
-        // visibility effects and we don't want the program behaviour
-        // to depend on whether the assertions are turned on
-        // or turned off
+                  // visibility effects and we don't want the program behaviour
+                  // to depend on whether the assertions are turned on
+                  // or turned off
         try {
             boolean sent = message.sendTo(channel);
             if (sent) {
@@ -203,7 +203,7 @@
             CompletableFuture<T> cf = p.second;
             try {
                 if (!message.contextualize(context)) { // Do not send the message
-                    cf.complete(null);
+                    cf.complete(resultSupplier.get());
                     repeat(taskCompleter);
                     return;
                 }
@@ -256,9 +256,9 @@
 
     @Override
     public void acknowledgeReception() {
-        long x = demand.decreaseAndGet(1);
-        if (x < 0) {
-            throw new InternalError(String.valueOf(x));
+        boolean decremented = demand.tryDecrement();
+        if (!decremented) {
+            throw new InternalError();
         }
     }
 
@@ -320,7 +320,7 @@
     }
 
     /*
-     * Stops the machinery from reading and delivering messages permanently,
+     * Permanently stops reading from the channel and delivering messages
      * regardless of the current demand and data availability.
      */
     @Override
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Fri Dec 15 00:47:16 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Fri Dec 15 03:05:16 2017 +0300
@@ -153,8 +153,7 @@
             return failedFuture(new IllegalStateException("Send pending"));
         }
         CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
-        cf.whenComplete((r, e) -> outstandingSend.set(false));
-        return cf;
+        return cf.whenComplete((r, e) -> outstandingSend.set(false));
     }
 
     @Override
@@ -170,8 +169,7 @@
         //        } else {
         //            cf.whenComplete((r, e) -> outstandingSend.set(false));
         //        }
-        cf.whenComplete((r, e) -> outstandingSend.set(false));
-        return cf;
+        return cf.whenComplete((r, e) -> outstandingSend.set(false));
     }
 
     @Override
--- a/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java	Fri Dec 15 00:47:16 2017 +0300
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java	Fri Dec 15 03:05:16 2017 +0300
@@ -24,6 +24,6 @@
 /*
  * @test
  * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm/timeout=30 --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
  */
 public class WebSocketImplDriver { }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java	Fri Dec 15 00:47:16 2017 +0300
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java	Fri Dec 15 03:05:16 2017 +0300
@@ -72,6 +72,8 @@
     // TODO: request in onClose/onError
     // TODO: throw exception in onClose/onError
     // TODO: exception is thrown from request()
+    // TODO: repeated sendClose complete normally
+    // TODO: default Close message is sent if IAE is thrown from sendClose
 
     @Test
     public void testNonPositiveRequest() throws Exception {
@@ -208,6 +210,72 @@
         );
     }
 
+    // Tease out "java.lang.IllegalStateException: Send pending" due to possible
+    // race between sending a message and replenishing the permit
+    @Test
+    public void testManyTextMessages() {
+        WebSocketImpl ws = newInstance(
+                new MockListener(1),
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+
+                        final Random r = new Random();
+
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return millis(r.nextInt(100), result());
+                            }
+                        };
+                    }
+                });
+        int NUM_MESSAGES = 512;
+        CompletableFuture<WebSocket> current = CompletableFuture.completedFuture(ws);
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            current = current.thenCompose(w -> w.sendText(" ", true));
+        }
+        current.join();
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), NUM_MESSAGES);
+    }
+
+    @Test
+    public void testManyBinaryMessages() {
+        WebSocketImpl ws = newInstance(
+                new MockListener(1),
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+
+                        final Random r = new Random();
+
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return millis(r.nextInt(150), result());
+                            }
+                        };
+                    }
+                });
+        CompletableFuture<WebSocket> start = new CompletableFuture<>();
+
+        int NUM_MESSAGES = 512;
+        CompletableFuture<WebSocket> current = start;
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true));
+        }
+
+        start.completeAsync(() -> ws);
+        current.join();
+
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), NUM_MESSAGES);
+    }
+
+
     @Test
     public void sendTextImmediately() {
         WebSocketImpl ws = newInstance(
@@ -349,6 +417,11 @@
                 .completeOnTimeout(result, sec, TimeUnit.SECONDS);
     }
 
+    private static <T> CompletableFuture<T> millis(long sec, T result) {
+        return new CompletableFuture<T>()
+                .completeOnTimeout(result, sec, TimeUnit.MILLISECONDS);
+    }
+
     private static <T> CompletableFuture<T> now(T result) {
         return CompletableFuture.completedFuture(result);
     }