test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java
branchhttp-client-branch
changeset 55988 7f1e0cf933a6
child 55989 76ac25076fdc
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java	Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
+import static jdk.incubator.http.WebSocket.MessagePart.LAST;
+import static jdk.incubator.http.WebSocket.MessagePart.PART;
+import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onClose;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onError;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onOpen;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPing;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPong;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onText;
+import static jdk.incubator.http.internal.websocket.MockTransport.onClose;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPing;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPong;
+import static jdk.incubator.http.internal.websocket.MockTransport.onText;
+import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static org.testng.Assert.assertEquals;
+
+/*
+ * Formatting in this file may seem strange:
+ *
+ *  (
+ *   ( ...)
+ *  ...
+ *  )
+ *  ...
+ *
+ *  However there is a rationale behind it. Sometimes the level of argument
+ *  nesting is high, which makes it hard to manage parentheses.
+ */
+public class WebSocketImplTest {
+
+    // TODO: request in onClose/onError
+    // TODO: throw exception in onClose/onError
+    // TODO: exception is thrown from request()
+
+    @Test
+    public void testNonPositiveRequest() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE) {
+            @Override
+            protected void onOpen0(WebSocket webSocket) {
+                webSocket.request(0);
+            }
+        };
+        WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE))));
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onError(ws, IllegalArgumentException.class)
+                )
+        );
+    }
+
+    @Test
+    public void testText1() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onText("3", LAST)),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onText(ws, "3", LAST),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testText2() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        seconds(1, onText("2", PART)),
+                        now(onText("3", LAST)),
+                        seconds(1, onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onText(ws, "3", LAST),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testTextIntermixedWithPongs() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onPong(ByteBuffer.allocate(16))),
+                        seconds(1, onPong(ByteBuffer.allocate(32))),
+                        now(onText("3", LAST)),
+                        now(onPong(ByteBuffer.allocate(64))),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onPong(ws, ByteBuffer.allocate(16)),
+                        onPong(ws, ByteBuffer.allocate(32)),
+                        onText(ws, "3", LAST),
+                        onPong(ws, ByteBuffer.allocate(64)),
+                        onClose(ws, NORMAL_CLOSURE, "no reason")
+                )
+        );
+    }
+
+    @Test
+    public void testTextIntermixedWithPings() throws Exception {
+        MockListener listener = new MockListener(Long.MAX_VALUE);
+        WebSocket ws = newInstance(
+                listener,
+                List.of(
+                        now(onText("1", FIRST)),
+                        now(onText("2", PART)),
+                        now(onPing(ByteBuffer.allocate(16))),
+                        seconds(1, onPing(ByteBuffer.allocate(32))),
+                        now(onText("3", LAST)),
+                        now(onPing(ByteBuffer.allocate(64))),
+                        now(onClose(NORMAL_CLOSURE, "no reason"))
+                )
+        );
+        listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+        List<MockListener.Invocation> invocations = listener.invocations();
+        assertEquals(
+                invocations,
+                List.of(
+                        onOpen(ws),
+                        onText(ws, "1", FIRST),
+                        onText(ws, "2", PART),
+                        onPing(ws, ByteBuffer.allocate(16)),
+                        onPing(ws, ByteBuffer.allocate(32)),
+                        onText(ws, "3", LAST),
+                        onPing(ws, ByteBuffer.allocate(64)),
+                        onClose(ws, NORMAL_CLOSURE, "no reason"))
+        );
+    }
+
+    @Test
+    public void sendTextImmediately() {
+        WebSocketImpl ws = newInstance(
+                new MockListener(1),
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer);
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3);
+    }
+
+    @Test
+    public void sendTextWithDelay() {
+        MockListener listener = new MockListener(1);
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return seconds(1, result());
+                            }
+                        };
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .join();
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3);
+    }
+
+    @Test
+    public void sendTextMixedDelay() {
+        MockListener listener = new MockListener(1);
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+
+                    final Random r = new Random();
+
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                return r.nextBoolean()
+                                        ? seconds(1, result())
+                                        : now(result());
+                            }
+                        };
+                    }
+                });
+        CompletableFuture.completedFuture(ws)
+                .thenCompose(w -> w.sendText("1", true))
+                .thenCompose(w -> w.sendText("2", true))
+                .thenCompose(w -> w.sendText("3", true))
+                .thenCompose(w -> w.sendText("4", true))
+                .thenCompose(w -> w.sendText("5", true))
+                .thenCompose(w -> w.sendText("6", true))
+                .thenCompose(w -> w.sendText("7", true))
+                .thenCompose(w -> w.sendText("8", true))
+                .thenCompose(w -> w.sendText("9", true))
+                .join();
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 9);
+    }
+
+    @Test(enabled = false) // temporarily disabled
+    public void sendControlMessagesConcurrently() {
+        MockListener listener = new MockListener(1);
+
+        CompletableFuture<?> first = new CompletableFuture<>(); // barrier
+
+        WebSocketImpl ws = newInstance(
+                listener,
+                new TransportFactory() {
+
+                    final AtomicInteger i = new AtomicInteger();
+
+                    @Override
+                    public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                            MessageStreamConsumer consumer) {
+                        return new MockTransport<>(sendResultSupplier, consumer) {
+                            @Override
+                            protected CompletableFuture<T> defaultSend() {
+                                if (i.incrementAndGet() == 1) {
+                                    return first.thenApply(o -> result());
+                                } else {
+                                    return now(result());
+                                }
+                            }
+                        };
+                    }
+                });
+
+        CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
+        CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
+        CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
+
+        first.complete(null);
+        // Don't care about exceptional completion, only that all of them have
+        // completed
+        CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
+                .handle((v, e) -> null).join();
+
+        cf3.join(); /* Check that sendClose has completed normally */
+        cf4.join(); /* Check that repeated sendClose has completed normally */
+        assertCompletesExceptionally(IllegalStateException.class, cf5);
+        assertCompletesExceptionally(IllegalStateException.class, cf6);
+
+        assertEquals(listener.invocations(), List.of(onOpen(ws)));
+        MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+        assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted
+    }
+
+    private static <T> CompletableFuture<T> seconds(long sec, T result) {
+        return new CompletableFuture<T>()
+                .completeOnTimeout(result, sec, TimeUnit.SECONDS);
+    }
+
+    private static <T> CompletableFuture<T> now(T result) {
+        return CompletableFuture.completedFuture(result);
+    }
+
+    private static WebSocketImpl newInstance(
+            WebSocket.Listener listener,
+            Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) {
+        TransportFactory factory = new TransportFactory() {
+            @Override
+            public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+                                                    MessageStreamConsumer consumer) {
+                return new MockTransport<T>(sendResultSupplier, consumer) {
+                    @Override
+                    protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
+                        return input;
+                    }
+                };
+            }
+        };
+        return newInstance(listener, factory);
+    }
+
+    private static WebSocketImpl newInstance(WebSocket.Listener listener,
+                                             TransportFactory factory) {
+        URI uri = URI.create("ws://localhost");
+        String subprotocol = "";
+        return WebSocketImpl.newInstance(uri, subprotocol, listener, factory);
+    }
+}