test/jdk/java/net/httpclient/websocket/java.net.http/java/net/http/internal/websocket/WebSocketImplTest.java
branch http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package java.net.http.internal.websocket;
       
    25 
       
    26 import java.net.http.WebSocket;
       
    27 import org.testng.annotations.Test;
       
    28 
       
    29 import java.net.URI;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.Collection;
       
    32 import java.util.List;
       
    33 import java.util.Random;
       
    34 import java.util.concurrent.CompletableFuture;
       
    35 import java.util.concurrent.TimeUnit;
       
    36 import java.util.concurrent.atomic.AtomicInteger;
       
    37 import java.util.function.Consumer;
       
    38 import java.util.function.Supplier;
       
    39 
       
    40 import static java.net.http.WebSocket.MessagePart.FIRST;
       
    41 import static java.net.http.WebSocket.MessagePart.LAST;
       
    42 import static java.net.http.WebSocket.MessagePart.PART;
       
    43 import static java.net.http.WebSocket.MessagePart.WHOLE;
       
    44 import static java.net.http.WebSocket.NORMAL_CLOSURE;
       
    45 import static java.net.http.internal.websocket.MockListener.Invocation.onClose;
       
    46 import static java.net.http.internal.websocket.MockListener.Invocation.onError;
       
    47 import static java.net.http.internal.websocket.MockListener.Invocation.onOpen;
       
    48 import static java.net.http.internal.websocket.MockListener.Invocation.onPing;
       
    49 import static java.net.http.internal.websocket.MockListener.Invocation.onPong;
       
    50 import static java.net.http.internal.websocket.MockListener.Invocation.onText;
       
    51 import static java.net.http.internal.websocket.MockTransport.onClose;
       
    52 import static java.net.http.internal.websocket.MockTransport.onPing;
       
    53 import static java.net.http.internal.websocket.MockTransport.onPong;
       
    54 import static java.net.http.internal.websocket.MockTransport.onText;
       
    55 import static java.net.http.internal.websocket.TestSupport.assertCompletesExceptionally;
       
    56 import static org.testng.Assert.assertEquals;
       
    57 
       
    58 /*
       
    59  * Formatting in this file may seem strange:
       
    60  *
       
    61  *  (
       
    62  *   ( ...)
       
    63  *  ...
       
    64  *  )
       
    65  *  ...
       
    66  *
       
    67  *  However there is a rationale behind it. Sometimes the level of argument
       
    68  *  nesting is high, which makes it hard to manage parentheses.
       
    69  */
       
    70 public class WebSocketImplTest {
       
    71 
       
    72     // TODO: request in onClose/onError
       
    73     // TODO: throw exception in onClose/onError
       
    74     // TODO: exception is thrown from request()
       
    75     // TODO: repeated sendClose complete normally
       
    76     // TODO: default Close message is sent if IAE is thrown from sendClose
       
    77 
       
    78     @Test
       
    79     public void testNonPositiveRequest() throws Exception {
       
    80         MockListener listener = new MockListener(Long.MAX_VALUE) {
       
    81             @Override
       
    82             protected void onOpen0(WebSocket webSocket) {
       
    83                 webSocket.request(0);
       
    84             }
       
    85         };
       
    86         WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE))));
       
    87         listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
       
    88         List<MockListener.Invocation> invocations = listener.invocations();
       
    89         assertEquals(
       
    90                 invocations,
       
    91                 List.of(
       
    92                         onOpen(ws),
       
    93                         onError(ws, IllegalArgumentException.class)
       
    94                 )
       
    95         );
       
    96     }
       
    97 
       
    98     @Test
       
    99     public void testText1() throws Exception {
       
   100         MockListener listener = new MockListener(Long.MAX_VALUE);
       
   101         WebSocket ws = newInstance(
       
   102                 listener,
       
   103                 List.of(
       
   104                         now(onText("1", FIRST)),
       
   105                         now(onText("2", PART)),
       
   106                         now(onText("3", LAST)),
       
   107                         now(onClose(NORMAL_CLOSURE, "no reason"))
       
   108                 )
       
   109         );
       
   110         listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
       
   111         List<MockListener.Invocation> invocations = listener.invocations();
       
   112         assertEquals(
       
   113                 invocations,
       
   114                 List.of(
       
   115                         onOpen(ws),
       
   116                         onText(ws, "1", FIRST),
       
   117                         onText(ws, "2", PART),
       
   118                         onText(ws, "3", LAST),
       
   119                         onClose(ws, NORMAL_CLOSURE, "no reason")
       
   120                 )
       
   121         );
       
   122     }
       
   123 
       
   124     @Test
       
   125     public void testText2() throws Exception {
       
   126         MockListener listener = new MockListener(Long.MAX_VALUE);
       
   127         WebSocket ws = newInstance(
       
   128                 listener,
       
   129                 List.of(
       
   130                         now(onText("1", FIRST)),
       
   131                         seconds(1, onText("2", PART)),
       
   132                         now(onText("3", LAST)),
       
   133                         seconds(1, onClose(NORMAL_CLOSURE, "no reason"))
       
   134                 )
       
   135         );
       
   136         listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
       
   137         List<MockListener.Invocation> invocations = listener.invocations();
       
   138         assertEquals(
       
   139                 invocations,
       
   140                 List.of(
       
   141                         onOpen(ws),
       
   142                         onText(ws, "1", FIRST),
       
   143                         onText(ws, "2", PART),
       
   144                         onText(ws, "3", LAST),
       
   145                         onClose(ws, NORMAL_CLOSURE, "no reason")
       
   146                 )
       
   147         );
       
   148     }
       
   149 
       
   150     @Test
       
   151     public void testTextIntermixedWithPongs() throws Exception {
       
   152         MockListener listener = new MockListener(Long.MAX_VALUE);
       
   153         WebSocket ws = newInstance(
       
   154                 listener,
       
   155                 List.of(
       
   156                         now(onText("1", FIRST)),
       
   157                         now(onText("2", PART)),
       
   158                         now(onPong(ByteBuffer.allocate(16))),
       
   159                         seconds(1, onPong(ByteBuffer.allocate(32))),
       
   160                         now(onText("3", LAST)),
       
   161                         now(onPong(ByteBuffer.allocate(64))),
       
   162                         now(onClose(NORMAL_CLOSURE, "no reason"))
       
   163                 )
       
   164         );
       
   165         listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
       
   166         List<MockListener.Invocation> invocations = listener.invocations();
       
   167         assertEquals(
       
   168                 invocations,
       
   169                 List.of(
       
   170                         onOpen(ws),
       
   171                         onText(ws, "1", FIRST),
       
   172                         onText(ws, "2", PART),
       
   173                         onPong(ws, ByteBuffer.allocate(16)),
       
   174                         onPong(ws, ByteBuffer.allocate(32)),
       
   175                         onText(ws, "3", LAST),
       
   176                         onPong(ws, ByteBuffer.allocate(64)),
       
   177                         onClose(ws, NORMAL_CLOSURE, "no reason")
       
   178                 )
       
   179         );
       
   180     }
       
   181 
       
   182     @Test
       
   183     public void testTextIntermixedWithPings() throws Exception {
       
   184         MockListener listener = new MockListener(Long.MAX_VALUE);
       
   185         WebSocket ws = newInstance(
       
   186                 listener,
       
   187                 List.of(
       
   188                         now(onText("1", FIRST)),
       
   189                         now(onText("2", PART)),
       
   190                         now(onPing(ByteBuffer.allocate(16))),
       
   191                         seconds(1, onPing(ByteBuffer.allocate(32))),
       
   192                         now(onText("3", LAST)),
       
   193                         now(onPing(ByteBuffer.allocate(64))),
       
   194                         now(onClose(NORMAL_CLOSURE, "no reason"))
       
   195                 )
       
   196         );
       
   197         listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
       
   198         List<MockListener.Invocation> invocations = listener.invocations();
       
   199         assertEquals(
       
   200                 invocations,
       
   201                 List.of(
       
   202                         onOpen(ws),
       
   203                         onText(ws, "1", FIRST),
       
   204                         onText(ws, "2", PART),
       
   205                         onPing(ws, ByteBuffer.allocate(16)),
       
   206                         onPing(ws, ByteBuffer.allocate(32)),
       
   207                         onText(ws, "3", LAST),
       
   208                         onPing(ws, ByteBuffer.allocate(64)),
       
   209                         onClose(ws, NORMAL_CLOSURE, "no reason"))
       
   210         );
       
   211     }
       
   212 
       
   213     // Tease out "java.lang.IllegalStateException: Send pending" due to possible
       
   214     // race between sending a message and replenishing the permit
       
   215     @Test
       
   216     public void testManyTextMessages() {
       
   217         WebSocketImpl ws = newInstance(
       
   218                 new MockListener(1),
       
   219                 new TransportFactory() {
       
   220                     @Override
       
   221                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   222                                                             MessageStreamConsumer consumer) {
       
   223 
       
   224                         final Random r = new Random();
       
   225 
       
   226                         return new MockTransport<>(sendResultSupplier, consumer) {
       
   227                             @Override
       
   228                             protected CompletableFuture<T> defaultSend() {
       
   229                                 return millis(r.nextInt(100), result());
       
   230                             }
       
   231                         };
       
   232                     }
       
   233                 });
       
   234         int NUM_MESSAGES = 512;
       
   235         CompletableFuture<WebSocket> current = CompletableFuture.completedFuture(ws);
       
   236         for (int i = 0; i < NUM_MESSAGES; i++) {
       
   237             current = current.thenCompose(w -> w.sendText(" ", true));
       
   238         }
       
   239         current.join();
       
   240         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   241         assertEquals(transport.invocations().size(), NUM_MESSAGES);
       
   242     }
       
   243 
       
   244     @Test
       
   245     public void testManyBinaryMessages() {
       
   246         WebSocketImpl ws = newInstance(
       
   247                 new MockListener(1),
       
   248                 new TransportFactory() {
       
   249                     @Override
       
   250                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   251                                                             MessageStreamConsumer consumer) {
       
   252 
       
   253                         final Random r = new Random();
       
   254 
       
   255                         return new MockTransport<>(sendResultSupplier, consumer) {
       
   256                             @Override
       
   257                             protected CompletableFuture<T> defaultSend() {
       
   258                                 return millis(r.nextInt(150), result());
       
   259                             }
       
   260                         };
       
   261                     }
       
   262                 });
       
   263         CompletableFuture<WebSocket> start = new CompletableFuture<>();
       
   264 
       
   265         int NUM_MESSAGES = 512;
       
   266         CompletableFuture<WebSocket> current = start;
       
   267         for (int i = 0; i < NUM_MESSAGES; i++) {
       
   268             current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true));
       
   269         }
       
   270 
       
   271         start.completeAsync(() -> ws);
       
   272         current.join();
       
   273 
       
   274         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   275         assertEquals(transport.invocations().size(), NUM_MESSAGES);
       
   276     }
       
   277 
       
   278 
       
   279     @Test
       
   280     public void sendTextImmediately() {
       
   281         WebSocketImpl ws = newInstance(
       
   282                 new MockListener(1),
       
   283                 new TransportFactory() {
       
   284                     @Override
       
   285                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   286                                                             MessageStreamConsumer consumer) {
       
   287                         return new MockTransport<>(sendResultSupplier, consumer);
       
   288                     }
       
   289                 });
       
   290         CompletableFuture.completedFuture(ws)
       
   291                 .thenCompose(w -> w.sendText("1", true))
       
   292                 .thenCompose(w -> w.sendText("2", true))
       
   293                 .thenCompose(w -> w.sendText("3", true))
       
   294                 .join();
       
   295         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   296         assertEquals(transport.invocations().size(), 3);
       
   297     }
       
   298 
       
   299     @Test
       
   300     public void sendTextWithDelay() {
       
   301         MockListener listener = new MockListener(1);
       
   302         WebSocketImpl ws = newInstance(
       
   303                 listener,
       
   304                 new TransportFactory() {
       
   305                     @Override
       
   306                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   307                                                             MessageStreamConsumer consumer) {
       
   308                         return new MockTransport<>(sendResultSupplier, consumer) {
       
   309                             @Override
       
   310                             protected CompletableFuture<T> defaultSend() {
       
   311                                 return seconds(1, result());
       
   312                             }
       
   313                         };
       
   314                     }
       
   315                 });
       
   316         CompletableFuture.completedFuture(ws)
       
   317                 .thenCompose(w -> w.sendText("1", true))
       
   318                 .thenCompose(w -> w.sendText("2", true))
       
   319                 .thenCompose(w -> w.sendText("3", true))
       
   320                 .join();
       
   321         assertEquals(listener.invocations(), List.of(onOpen(ws)));
       
   322         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   323         assertEquals(transport.invocations().size(), 3);
       
   324     }
       
   325 
       
   326     @Test
       
   327     public void sendTextMixedDelay() {
       
   328         MockListener listener = new MockListener(1);
       
   329         WebSocketImpl ws = newInstance(
       
   330                 listener,
       
   331                 new TransportFactory() {
       
   332 
       
   333                     final Random r = new Random();
       
   334 
       
   335                     @Override
       
   336                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   337                                                             MessageStreamConsumer consumer) {
       
   338                         return new MockTransport<>(sendResultSupplier, consumer) {
       
   339                             @Override
       
   340                             protected CompletableFuture<T> defaultSend() {
       
   341                                 return r.nextBoolean()
       
   342                                         ? seconds(1, result())
       
   343                                         : now(result());
       
   344                             }
       
   345                         };
       
   346                     }
       
   347                 });
       
   348         CompletableFuture.completedFuture(ws)
       
   349                 .thenCompose(w -> w.sendText("1", true))
       
   350                 .thenCompose(w -> w.sendText("2", true))
       
   351                 .thenCompose(w -> w.sendText("3", true))
       
   352                 .thenCompose(w -> w.sendText("4", true))
       
   353                 .thenCompose(w -> w.sendText("5", true))
       
   354                 .thenCompose(w -> w.sendText("6", true))
       
   355                 .thenCompose(w -> w.sendText("7", true))
       
   356                 .thenCompose(w -> w.sendText("8", true))
       
   357                 .thenCompose(w -> w.sendText("9", true))
       
   358                 .join();
       
   359         assertEquals(listener.invocations(), List.of(onOpen(ws)));
       
   360         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   361         assertEquals(transport.invocations().size(), 9);
       
   362     }
       
   363 
       
   364     @Test(enabled = false) // temporarily disabled
       
   365     public void sendControlMessagesConcurrently() {
       
   366         MockListener listener = new MockListener(1);
       
   367 
       
   368         CompletableFuture<?> first = new CompletableFuture<>(); // barrier
       
   369 
       
   370         WebSocketImpl ws = newInstance(
       
   371                 listener,
       
   372                 new TransportFactory() {
       
   373 
       
   374                     final AtomicInteger i = new AtomicInteger();
       
   375 
       
   376                     @Override
       
   377                     public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   378                                                             MessageStreamConsumer consumer) {
       
   379                         return new MockTransport<>(sendResultSupplier, consumer) {
       
   380                             @Override
       
   381                             protected CompletableFuture<T> defaultSend() {
       
   382                                 if (i.incrementAndGet() == 1) {
       
   383                                     return first.thenApply(o -> result());
       
   384                                 } else {
       
   385                                     return now(result());
       
   386                                 }
       
   387                             }
       
   388                         };
       
   389                     }
       
   390                 });
       
   391 
       
   392         CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
       
   393         CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
       
   394         CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
       
   395         CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
       
   396         CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
       
   397         CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
       
   398 
       
   399         first.complete(null);
       
   400         // Don't care about exceptional completion, only that all of them have
       
   401         // completed
       
   402         CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
       
   403                 .handle((v, e) -> null).join();
       
   404 
       
   405         cf3.join(); /* Check that sendClose has completed normally */
       
   406         cf4.join(); /* Check that repeated sendClose has completed normally */
       
   407         assertCompletesExceptionally(IllegalStateException.class, cf5);
       
   408         assertCompletesExceptionally(IllegalStateException.class, cf6);
       
   409 
       
   410         assertEquals(listener.invocations(), List.of(onOpen(ws)));
       
   411         MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
       
   412         assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted
       
   413     }
       
   414 
       
   415     private static <T> CompletableFuture<T> seconds(long val, T result) {
       
   416         return new CompletableFuture<T>()
       
   417                 .completeOnTimeout(result, val, TimeUnit.SECONDS);
       
   418     }
       
   419 
       
   420     private static <T> CompletableFuture<T> millis(long val, T result) {
       
   421         return new CompletableFuture<T>()
       
   422                 .completeOnTimeout(result, val, TimeUnit.MILLISECONDS);
       
   423     }
       
   424 
       
   425     private static <T> CompletableFuture<T> now(T result) {
       
   426         return CompletableFuture.completedFuture(result);
       
   427     }
       
   428 
       
   429     private static WebSocketImpl newInstance(
       
   430             WebSocket.Listener listener,
       
   431             Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) {
       
   432         TransportFactory factory = new TransportFactory() {
       
   433             @Override
       
   434             public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
       
   435                                                     MessageStreamConsumer consumer) {
       
   436                 return new MockTransport<T>(sendResultSupplier, consumer) {
       
   437                     @Override
       
   438                     protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
       
   439                         return input;
       
   440                     }
       
   441                 };
       
   442             }
       
   443         };
       
   444         return newInstance(listener, factory);
       
   445     }
       
   446 
       
   447     private static WebSocketImpl newInstance(WebSocket.Listener listener,
       
   448                                              TransportFactory factory) {
       
   449         URI uri = URI.create("ws://localhost");
       
   450         String subprotocol = "";
       
   451         return WebSocketImpl.newInstance(uri, subprotocol, listener, factory);
       
   452     }
       
   453 }