http-client-branch: (WebSocket) testing sending messages http-client-branch
authorprappo
Thu, 23 Nov 2017 17:46:02 +0300
branchhttp-client-branch
changeset 55862 faa39b5ec8e1
parent 55861 0683f22cf2b9
child 55863 04c2bb7a1693
http-client-branch: (WebSocket) testing sending messages
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/SendingTestDriver.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java	Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OutgoingMessage.java	Thu Nov 23 17:46:02 2017 +0300
@@ -71,12 +71,13 @@
      * so it would be possible to defer the work it does until the most
      * convenient moment (up to the point where sentTo is invoked).
      */
-    protected void contextualize(Context context) {
+    protected boolean contextualize(Context context) {
         // masking and charset decoding should be performed here rather than in
         // the constructor (as of today)
         if (context.isCloseSent()) {
             throw new IllegalStateException("Close sent");
         }
+        return true;
     }
 
     protected boolean sendTo(RawChannel channel) throws IOException {
@@ -115,7 +116,7 @@
         }
 
         @Override
-        protected void contextualize(Context context) {
+        protected boolean contextualize(Context context) {
             super.contextualize(context);
             if (context.isPreviousBinary() && !context.isPreviousLast()) {
                 throw new IllegalStateException("Unexpected text message");
@@ -125,6 +126,7 @@
             context.setPreviousBinary(false);
             context.setPreviousText(true);
             context.setPreviousLast(isLast);
+            return true;
         }
     }
 
@@ -139,7 +141,7 @@
         }
 
         @Override
-        protected void contextualize(Context context) {
+        protected boolean contextualize(Context context) {
             super.contextualize(context);
             if (context.isPreviousText() && !context.isPreviousLast()) {
                 throw new IllegalStateException("Unexpected binary message");
@@ -150,6 +152,7 @@
             context.setPreviousText(false);
             context.setPreviousBinary(true);
             context.setPreviousLast(isLast);
+            return true;
         }
     }
 
@@ -195,9 +198,13 @@
         }
 
         @Override
-        protected void contextualize(Context context) {
-            super.contextualize(context);
-            context.setCloseSent();
+        protected boolean contextualize(Context context) {
+            if (context.isCloseSent()) {
+                return false;
+            } else {
+                context.setCloseSent();
+                return true;
+            }
         }
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Thu Nov 23 17:46:02 2017 +0300
@@ -40,7 +40,7 @@
  * to accept a new message. Until then, the transmitter is considered "busy" and
  * an IllegalStateException will be thrown on each attempt to invoke send.
  */
-final class Transmitter {
+public class Transmitter {
 
     /* This flag is used solely for assertions */
     private final AtomicBoolean busy = new AtomicBoolean();
@@ -49,8 +49,8 @@
     private final RawChannel channel;
     private final RawChannel.RawEvent event;
 
-    Transmitter(RawChannel channel) {
-        this.channel = requireNonNull(channel);
+    public Transmitter(RawChannel channel) {
+        this.channel = channel;
         this.event = createHandler();
     }
 
@@ -59,7 +59,9 @@
      * A {@code StackOverflowError} may thus occur if there's a possibility
      * that this method is called again by the supplied handler.
      */
-    void send(OutgoingMessage message, Consumer<Exception> completionHandler) {
+    public void send(OutgoingMessage message,
+                     Consumer<Exception> completionHandler)
+    {
         requireNonNull(message);
         requireNonNull(completionHandler);
         if (!busy.compareAndSet(false, true)) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Thu Nov 23 12:50:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Thu Nov 23 17:46:02 2017 +0300
@@ -65,14 +65,14 @@
 /*
  * A WebSocket client.
  */
-final class WebSocketImpl implements WebSocket {
+public final class WebSocketImpl implements WebSocket {
 
     private final URI uri;
     private final String subprotocol;
     private final RawChannel channel; /* Stored to call close() on */
     private final Listener listener;
 
-    private volatile boolean intputClosed;
+    private volatile boolean inputClosed;
     private volatile boolean outputClosed;
 
     /*
@@ -113,7 +113,7 @@
     private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
     private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
 
-    static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
+    public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
         Function<Result, WebSocket> newWebSocket = r -> {
             WebSocketImpl ws = new WebSocketImpl(b.getUri(),
                                                  r.subprotocol,
@@ -144,11 +144,25 @@
                   RawChannel channel,
                   Listener listener)
     {
+        this(uri,
+             subprotocol,
+             channel,
+             listener,
+             new Transmitter(channel));
+    }
+
+    /* Exposed for testing purposes */
+    WebSocketImpl(URI uri,
+                  String subprotocol,
+                  RawChannel channel,
+                  Listener listener,
+                  Transmitter transmitter)
+    {
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
         this.channel = requireNonNull(channel);
         this.listener = requireNonNull(listener);
-        this.transmitter = new Transmitter(channel);
+        this.transmitter = transmitter;
         this.receiver = new Receiver(messageConsumerOf(listener), channel);
         this.sendScheduler = new SequentialScheduler(new SendTask());
 
@@ -201,7 +215,7 @@
      * Processes a Close event that came from the channel. Invoked at most once.
      */
     private void processClose(int statusCode, String reason) {
-        intputClosed = true;
+        inputClosed = true;
         receiver.close();
         try {
             channel.shutdownInput();
@@ -364,24 +378,33 @@
             OutgoingMessage message = p.first;
             CompletableFuture<WebSocket> cf = p.second;
             try {
-                message.contextualize(context);
+                if (!message.contextualize(context)) { // Do not send the message
+                    cf.complete(null);
+                    repeat(taskCompleter);
+                    return;
+                }
                 Consumer<Exception> h = e -> {
                     if (e == null) {
                         cf.complete(WebSocketImpl.this);
                     } else {
                         cf.completeExceptionally(e);
                     }
-                    taskCompleter.complete();
-                    // More than a single message may have been enqueued while
-                    // the task has been busy with the current message, but
-                    // there only one signal is recorded
-                    sendScheduler.runOrSchedule();
+                    repeat(taskCompleter);
                 };
                 transmitter.send(message, h);
             } catch (Exception t) {
                 cf.completeExceptionally(t);
+                repeat(taskCompleter);
             }
         }
+
+        private void repeat(DeferredCompleter taskCompleter) {
+            taskCompleter.complete();
+            // More than a single message may have been enqueued while
+            // the task has been busy with the current message, but
+            // there is only a single signal recorded
+            sendScheduler.runOrSchedule();
+        }
     }
 
     @Override
@@ -401,12 +424,12 @@
 
     @Override
     public boolean isInputClosed() {
-        return intputClosed;
+        return inputClosed;
     }
 
     @Override
     public void abort() {
-        intputClosed = true;
+        inputClosed = true;
         outputClosed = true;
         try {
             channel.close();
@@ -504,7 +527,7 @@
 
             @Override
             public void onError(Exception error) {
-                intputClosed = true;
+                inputClosed = true;
                 outputClosed = true;
                 if (!(error instanceof FailWebSocketException)) {
                     abort();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java	Thu Nov 23 17:46:02 2017 +0300
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java
+ * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
+ */
+public class SendingTestDriver { }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Thu Nov 23 17:46:02 2017 +0300
@@ -0,0 +1,234 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static org.testng.Assert.assertEquals;
+
+public class SendingTest {
+
+    @Test
+    public void sendTextImmediately() {
+        MockTransmitter t = new MockTransmitter() {
+            @Override
+            protected CompletionStage<?> whenSent() {
+                return CompletableFuture.completedFuture(null);
+            }
+        };
+        WebSocket ws = newWebSocket(t);
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+
+        assertEquals(t.queue().size(), 3);
+    }
+
+    @Test
+    public void sendTextWithDelay() {
+        MockTransmitter t = new MockTransmitter() {
+            @Override
+            protected CompletionStage<?> whenSent() {
+                return new CompletableFuture<>()
+                        .completeOnTimeout(null, 1, TimeUnit.SECONDS);
+            }
+        };
+        WebSocket ws = newWebSocket(t);
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+
+        assertEquals(t.queue().size(), 3);
+    }
+
+    @Test
+    public void sendTextMixedDelay() {
+        Random r = new Random();
+
+        MockTransmitter t = new MockTransmitter() {
+            @Override
+            protected CompletionStage<?> whenSent() {
+                return r.nextBoolean() ?
+                        new CompletableFuture<>().completeOnTimeout(null, 1, TimeUnit.SECONDS) :
+                        CompletableFuture.completedFuture(null);
+            }
+        };
+        WebSocket ws = newWebSocket(t);
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .thenCompose(w -> w.sendText("4", true))
+                .thenCompose(w -> w.sendText("5", true))
+                .thenCompose(w -> w.sendText("6", true))
+                .thenCompose(w -> w.sendText("7", true))
+                .thenCompose(w -> w.sendText("8", true))
+                .thenCompose(w -> w.sendText("9", true))
+                .join();
+
+        assertEquals(t.queue().size(), 9);
+    }
+
+    @Test
+    public void sendControlMessagesConcurrently() {
+
+        CompletableFuture<?> first = new CompletableFuture<>();
+
+        MockTransmitter t = new MockTransmitter() {
+
+            final AtomicInteger i = new AtomicInteger();
+
+            @Override
+            protected CompletionStage<?> whenSent() {
+                if (i.incrementAndGet() == 1) {
+                    return first;
+                } else {
+                    return CompletableFuture.completedFuture(null);
+                }
+            }
+        };
+        WebSocket ws = newWebSocket(t);
+
+        CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
+
+        first.complete(null);
+        // Don't care about exceptional completion, only that all of them have
+        // completed
+        CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
+                .handle((v, e) -> null).join();
+
+        cf3.join(); /* Check that sendClose has completed normally */
+        cf4.join(); /* Check that repeated sendClose has completed normally */
+        assertCompletesExceptionally(IllegalStateException.class, cf5);
+        assertCompletesExceptionally(IllegalStateException.class, cf6);
+
+        assertEquals(t.queue().size(), 3); // 6 minus 3 that were not accepted
+    }
+
+    private static WebSocket newWebSocket(Transmitter transmitter) {
+        URI uri = URI.create("ws://localhost");
+        String subprotocol = "";
+        RawChannel channel = new RawChannel() {
+
+            @Override
+            public void registerEvent(RawEvent event) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ByteBuffer initialByteBuffer() {
+                return ByteBuffer.allocate(0);
+            }
+
+            @Override
+            public ByteBuffer read() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long write(ByteBuffer[] srcs, int offset, int length) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void shutdownInput() {
+            }
+
+            @Override
+            public void shutdownOutput() {
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+        return new WebSocketImpl(
+                uri,
+                subprotocol,
+                channel,
+                new WebSocket.Listener() { }, transmitter);
+    }
+
+    private abstract class MockTransmitter extends Transmitter {
+
+        private final long startTime = System.currentTimeMillis();
+
+        private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
+
+        public MockTransmitter() {
+            super(null);
+        }
+
+        @Override
+        public void send(OutgoingMessage message,
+                         Consumer<Exception> completionHandler) {
+            System.out.printf("[%6s ms.] begin send(%s)%n",
+                              System.currentTimeMillis() - startTime,
+                              message);
+            messages.add(message);
+            whenSent().whenComplete((r, e) -> {
+                System.out.printf("[%6s ms.] complete send(%s)%n",
+                                  System.currentTimeMillis() - startTime,
+                                  message);
+                if (e != null) {
+                    completionHandler.accept((Exception) e);
+                } else {
+                    completionHandler.accept(null);
+                }
+            });
+            System.out.printf("[%6s ms.] end send(%s)%n",
+                              System.currentTimeMillis() - startTime,
+                              message);
+        }
+
+        protected abstract CompletionStage<?> whenSent();
+
+        public Queue<OutgoingMessage> queue() {
+            return messages;
+        }
+    }
+}