test/jdk/java/net/httpclient/websocket/WebSocketTest.java
changeset 49765 ee6f7a61f3a5
child 49944 4690a2871b44
child 56451 9585061fdb04
equal deleted inserted replaced
49707:f7fd051519ac 49765:ee6f7a61f3a5
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @build DummyWebSocketServer
       
    27  * @run testng/othervm
       
    28  *      -Djdk.internal.httpclient.websocket.debug=true
       
    29  *       WebSocketTest
       
    30  */
       
    31 
       
    32 import org.testng.annotations.AfterTest;
       
    33 import org.testng.annotations.Test;
       
    34 
       
    35 import java.io.IOException;
       
    36 import java.net.http.WebSocket;
       
    37 import java.nio.ByteBuffer;
       
    38 import java.nio.charset.StandardCharsets;
       
    39 import java.util.ArrayList;
       
    40 import java.util.List;
       
    41 import java.util.concurrent.CompletableFuture;
       
    42 import java.util.concurrent.CompletionStage;
       
    43 import java.util.concurrent.TimeUnit;
       
    44 import java.util.stream.Collectors;
       
    45 
       
    46 import static java.net.http.HttpClient.newHttpClient;
       
    47 import static java.net.http.WebSocket.NORMAL_CLOSURE;
       
    48 import static org.testng.Assert.assertEquals;
       
    49 import static org.testng.Assert.assertThrows;
       
    50 
       
    51 public class WebSocketTest {
       
    52 
       
    53     private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
       
    54     private static final Class<IllegalStateException> ISE = IllegalStateException.class;
       
    55     private static final Class<IOException> IOE = IOException.class;
       
    56 
       
    57     /* shortcut */
       
    58     private static void assertFails(Class<? extends Throwable> clazz,
       
    59                                     CompletionStage<?> stage) {
       
    60         Support.assertCompletesExceptionally(clazz, stage);
       
    61     }
       
    62 
       
    63     private DummyWebSocketServer server;
       
    64     private WebSocket webSocket;
       
    65 
       
    66     @AfterTest
       
    67     public void cleanup() {
       
    68         server.close();
       
    69         webSocket.abort();
       
    70     }
       
    71 
       
    72     @Test
       
    73     public void illegalArgument() throws IOException {
       
    74         server = new DummyWebSocketServer();
       
    75         server.open();
       
    76         webSocket = newHttpClient()
       
    77                 .newWebSocketBuilder()
       
    78                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
       
    79                 .join();
       
    80 
       
    81         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(126)));
       
    82         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(127)));
       
    83         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(128)));
       
    84         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(129)));
       
    85         assertFails(IAE, webSocket.sendPing(ByteBuffer.allocate(256)));
       
    86 
       
    87         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(126)));
       
    88         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(127)));
       
    89         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(128)));
       
    90         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(129)));
       
    91         assertFails(IAE, webSocket.sendPong(ByteBuffer.allocate(256)));
       
    92 
       
    93         assertFails(IOE, webSocket.sendText(Support.incompleteString(), true));
       
    94         assertFails(IOE, webSocket.sendText(Support.incompleteString(), false));
       
    95         assertFails(IOE, webSocket.sendText(Support.malformedString(), true));
       
    96         assertFails(IOE, webSocket.sendText(Support.malformedString(), false));
       
    97 
       
    98         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124)));
       
    99         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125)));
       
   100         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128)));
       
   101         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256)));
       
   102         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257)));
       
   103         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1)));
       
   104         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.malformedString()));
       
   105         assertFails(IAE, webSocket.sendClose(NORMAL_CLOSURE, Support.incompleteString()));
       
   106 
       
   107         assertFails(IAE, webSocket.sendClose(-2, "a reason"));
       
   108         assertFails(IAE, webSocket.sendClose(-1, "a reason"));
       
   109         assertFails(IAE, webSocket.sendClose(0, "a reason"));
       
   110         assertFails(IAE, webSocket.sendClose(1, "a reason"));
       
   111         assertFails(IAE, webSocket.sendClose(500, "a reason"));
       
   112         assertFails(IAE, webSocket.sendClose(998, "a reason"));
       
   113         assertFails(IAE, webSocket.sendClose(999, "a reason"));
       
   114         assertFails(IAE, webSocket.sendClose(1002, "a reason"));
       
   115         assertFails(IAE, webSocket.sendClose(1003, "a reason"));
       
   116         assertFails(IAE, webSocket.sendClose(1006, "a reason"));
       
   117         assertFails(IAE, webSocket.sendClose(1007, "a reason"));
       
   118         assertFails(IAE, webSocket.sendClose(1009, "a reason"));
       
   119         assertFails(IAE, webSocket.sendClose(1010, "a reason"));
       
   120         assertFails(IAE, webSocket.sendClose(1012, "a reason"));
       
   121         assertFails(IAE, webSocket.sendClose(1013, "a reason"));
       
   122         assertFails(IAE, webSocket.sendClose(1015, "a reason"));
       
   123         assertFails(IAE, webSocket.sendClose(5000, "a reason"));
       
   124         assertFails(IAE, webSocket.sendClose(32768, "a reason"));
       
   125         assertFails(IAE, webSocket.sendClose(65535, "a reason"));
       
   126         assertFails(IAE, webSocket.sendClose(65536, "a reason"));
       
   127         assertFails(IAE, webSocket.sendClose(Integer.MAX_VALUE, "a reason"));
       
   128         assertFails(IAE, webSocket.sendClose(Integer.MIN_VALUE, "a reason"));
       
   129 
       
   130         assertThrows(IAE, () -> webSocket.request(Integer.MIN_VALUE));
       
   131         assertThrows(IAE, () -> webSocket.request(Long.MIN_VALUE));
       
   132         assertThrows(IAE, () -> webSocket.request(-1));
       
   133         assertThrows(IAE, () -> webSocket.request(0));
       
   134     }
       
   135 
       
   136     @Test
       
   137     public void partialBinaryThenText() throws IOException {
       
   138         server = new DummyWebSocketServer();
       
   139         server.open();
       
   140         webSocket = newHttpClient().newWebSocketBuilder()
       
   141                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
       
   142                 .join();
       
   143         webSocket.sendBinary(ByteBuffer.allocate(16), false).join();
       
   144         assertFails(ISE, webSocket.sendText("text", false));
       
   145         assertFails(ISE, webSocket.sendText("text", true));
       
   146         // Pings & Pongs are fine
       
   147         webSocket.sendPing(ByteBuffer.allocate(125)).join();
       
   148         webSocket.sendPong(ByteBuffer.allocate(125)).join();
       
   149     }
       
   150 
       
   151     @Test
       
   152     public void partialTextThenBinary() throws IOException {
       
   153         server = new DummyWebSocketServer();
       
   154         server.open();
       
   155         webSocket = newHttpClient().newWebSocketBuilder()
       
   156                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
       
   157                 .join();
       
   158 
       
   159         webSocket.sendText("text", false).join();
       
   160         assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), false));
       
   161         assertFails(ISE, webSocket.sendBinary(ByteBuffer.allocate(16), true));
       
   162         // Pings & Pongs are fine
       
   163         webSocket.sendPing(ByteBuffer.allocate(125)).join();
       
   164         webSocket.sendPong(ByteBuffer.allocate(125)).join();
       
   165     }
       
   166 
       
   167     @Test
       
   168     public void sendMethodsThrowIOE1() throws IOException {
       
   169         server = new DummyWebSocketServer();
       
   170         server.open();
       
   171         webSocket = newHttpClient()
       
   172                 .newWebSocketBuilder()
       
   173                 .buildAsync(server.getURI(), new WebSocket.Listener() { })
       
   174                 .join();
       
   175 
       
   176         webSocket.sendClose(NORMAL_CLOSURE, "ok").join();
       
   177 
       
   178         assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
       
   179 
       
   180         assertFails(IOE, webSocket.sendText("", true));
       
   181         assertFails(IOE, webSocket.sendText("", false));
       
   182         assertFails(IOE, webSocket.sendText("abc", true));
       
   183         assertFails(IOE, webSocket.sendText("abc", false));
       
   184         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
       
   185         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
       
   186         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
       
   187         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
       
   188 
       
   189         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
       
   190         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
       
   191         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
       
   192         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
       
   193 
       
   194         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
       
   195         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
       
   196         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
       
   197         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
       
   198     }
       
   199 
       
   200     @Test
       
   201     public void sendMethodsThrowIOE2() throws Exception {
       
   202         server = Support.serverWithCannedData(0x88, 0x00);
       
   203         server.open();
       
   204         CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
       
   205         CompletableFuture<Void> canClose = new CompletableFuture<>();
       
   206 
       
   207         WebSocket.Listener listener = new WebSocket.Listener() {
       
   208             @Override
       
   209             public CompletionStage<?> onClose(WebSocket webSocket,
       
   210                                               int statusCode,
       
   211                                               String reason) {
       
   212                 System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
       
   213                 onCloseCalled.complete(null);
       
   214                 return canClose;
       
   215             }
       
   216 
       
   217             @Override
       
   218             public void onError(WebSocket webSocket, Throwable error) {
       
   219                 System.out.println("onError(" + error + ")");
       
   220                 onCloseCalled.completeExceptionally(error);
       
   221             }
       
   222         };
       
   223 
       
   224         webSocket = newHttpClient().newWebSocketBuilder()
       
   225                 .buildAsync(server.getURI(), listener)
       
   226                 .join();
       
   227 
       
   228         onCloseCalled.join();      // Wait for onClose to be called
       
   229         canClose.complete(null);   // Signal to the WebSocket it can close the output
       
   230         TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
       
   231 
       
   232         assertFails(IOE, webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
       
   233 
       
   234         assertFails(IOE, webSocket.sendText("", true));
       
   235         assertFails(IOE, webSocket.sendText("", false));
       
   236         assertFails(IOE, webSocket.sendText("abc", true));
       
   237         assertFails(IOE, webSocket.sendText("abc", false));
       
   238         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), true));
       
   239         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(0), false));
       
   240         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), true));
       
   241         assertFails(IOE, webSocket.sendBinary(ByteBuffer.allocate(1), false));
       
   242 
       
   243         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(125)));
       
   244         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(124)));
       
   245         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(1)));
       
   246         assertFails(IOE, webSocket.sendPing(ByteBuffer.allocate(0)));
       
   247 
       
   248         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(125)));
       
   249         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(124)));
       
   250         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(1)));
       
   251         assertFails(IOE, webSocket.sendPong(ByteBuffer.allocate(0)));
       
   252     }
       
   253 
       
   254     @Test
       
   255     public void simpleAggregatingBinaryMessages() throws IOException {
       
   256         List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta")
       
   257                 .stream()
       
   258                 .map(s -> s.getBytes(StandardCharsets.US_ASCII))
       
   259                 .collect(Collectors.toList());
       
   260         int[] binary = new int[]{
       
   261                 0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha]
       
   262                 0x02, 0x02, 0x62, 0x65,                   // [be
       
   263                 0x80, 0x02, 0x74, 0x61,                   // ta]
       
   264                 0x02, 0x01, 0x67,                         // [g
       
   265                 0x00, 0x01, 0x61,                         // a
       
   266                 0x00, 0x00,                               //
       
   267                 0x00, 0x00,                               //
       
   268                 0x00, 0x01, 0x6d,                         // m
       
   269                 0x00, 0x01, 0x6d,                         // m
       
   270                 0x80, 0x01, 0x61,                         // a]
       
   271                 0x8a, 0x00,                               // <PONG>
       
   272                 0x02, 0x04, 0x64, 0x65, 0x6c, 0x74,       // [delt
       
   273                 0x00, 0x01, 0x61,                         // a
       
   274                 0x80, 0x00,                               // ]
       
   275                 0x88, 0x00                                // <CLOSE>
       
   276         };
       
   277         CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
       
   278 
       
   279         server = Support.serverWithCannedData(binary);
       
   280         server.open();
       
   281 
       
   282         WebSocket.Listener listener = new WebSocket.Listener() {
       
   283 
       
   284             List<byte[]> collectedBytes = new ArrayList<>();
       
   285             ByteBuffer buffer = ByteBuffer.allocate(1024);
       
   286 
       
   287             @Override
       
   288             public CompletionStage<?> onBinary(WebSocket webSocket,
       
   289                                                ByteBuffer message,
       
   290                                                boolean last) {
       
   291                 System.out.printf("onBinary(%s, %s)%n", message, last);
       
   292                 webSocket.request(1);
       
   293 
       
   294                 append(message);
       
   295                 if (last) {
       
   296                     buffer.flip();
       
   297                     byte[] bytes = new byte[buffer.remaining()];
       
   298                     buffer.get(bytes);
       
   299                     buffer.clear();
       
   300                     processWholeBinary(bytes);
       
   301                 }
       
   302                 return null;
       
   303             }
       
   304 
       
   305             private void append(ByteBuffer message) {
       
   306                 if (buffer.remaining() < message.remaining()) {
       
   307                     assert message.remaining() > 0;
       
   308                     int cap = (buffer.capacity() + message.remaining()) * 2;
       
   309                     ByteBuffer b = ByteBuffer.allocate(cap);
       
   310                     b.put(buffer.flip());
       
   311                     buffer = b;
       
   312                 }
       
   313                 buffer.put(message);
       
   314             }
       
   315 
       
   316             private void processWholeBinary(byte[] bytes) {
       
   317                 String stringBytes = new String(bytes, StandardCharsets.UTF_8);
       
   318                 System.out.println("processWholeBinary: " + stringBytes);
       
   319                 collectedBytes.add(bytes);
       
   320             }
       
   321 
       
   322             @Override
       
   323             public CompletionStage<?> onClose(WebSocket webSocket,
       
   324                                               int statusCode,
       
   325                                               String reason) {
       
   326                 actual.complete(collectedBytes);
       
   327                 return null;
       
   328             }
       
   329 
       
   330             @Override
       
   331             public void onError(WebSocket webSocket, Throwable error) {
       
   332                 actual.completeExceptionally(error);
       
   333             }
       
   334         };
       
   335 
       
   336         webSocket = newHttpClient().newWebSocketBuilder()
       
   337                 .buildAsync(server.getURI(), listener)
       
   338                 .join();
       
   339 
       
   340         List<byte[]> a = actual.join();
       
   341         assertEquals(a, expected);
       
   342     }
       
   343 
       
   344     @Test
       
   345     public void simpleAggregatingTextMessages() throws IOException {
       
   346 
       
   347         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
       
   348 
       
   349         int[] binary = new int[]{
       
   350                 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
       
   351                 0x01, 0x02, 0x62, 0x65,                   // "be
       
   352                 0x80, 0x02, 0x74, 0x61,                   // ta"
       
   353                 0x01, 0x01, 0x67,                         // "g
       
   354                 0x00, 0x01, 0x61,                         // a
       
   355                 0x00, 0x00,                               //
       
   356                 0x00, 0x00,                               //
       
   357                 0x00, 0x01, 0x6d,                         // m
       
   358                 0x00, 0x01, 0x6d,                         // m
       
   359                 0x80, 0x01, 0x61,                         // a"
       
   360                 0x8a, 0x00,                               // <PONG>
       
   361                 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74,       // "delt
       
   362                 0x00, 0x01, 0x61,                         // a
       
   363                 0x80, 0x00,                               // "
       
   364                 0x88, 0x00                                // <CLOSE>
       
   365         };
       
   366         CompletableFuture<List<String>> actual = new CompletableFuture<>();
       
   367 
       
   368         server = Support.serverWithCannedData(binary);
       
   369         server.open();
       
   370 
       
   371         WebSocket.Listener listener = new WebSocket.Listener() {
       
   372 
       
   373             List<String> collectedStrings = new ArrayList<>();
       
   374             StringBuilder text = new StringBuilder();
       
   375 
       
   376             @Override
       
   377             public CompletionStage<?> onText(WebSocket webSocket,
       
   378                                              CharSequence message,
       
   379                                              boolean last) {
       
   380                 System.out.printf("onText(%s, %s)%n", message, last);
       
   381                 webSocket.request(1);
       
   382                 text.append(message);
       
   383                 if (last) {
       
   384                     String str = text.toString();
       
   385                     text.setLength(0);
       
   386                     processWholeText(str);
       
   387                 }
       
   388                 return null;
       
   389             }
       
   390 
       
   391             private void processWholeText(String string) {
       
   392                 System.out.println(string);
       
   393                 collectedStrings.add(string);
       
   394             }
       
   395 
       
   396             @Override
       
   397             public CompletionStage<?> onClose(WebSocket webSocket,
       
   398                                               int statusCode,
       
   399                                               String reason) {
       
   400                 actual.complete(collectedStrings);
       
   401                 return null;
       
   402             }
       
   403 
       
   404             @Override
       
   405             public void onError(WebSocket webSocket, Throwable error) {
       
   406                 actual.completeExceptionally(error);
       
   407             }
       
   408         };
       
   409 
       
   410         webSocket = newHttpClient().newWebSocketBuilder()
       
   411                 .buildAsync(server.getURI(), listener)
       
   412                 .join();
       
   413 
       
   414         List<String> a = actual.join();
       
   415         assertEquals(a, expected);
       
   416     }
       
   417 
       
   418     /*
       
   419      * Exercises the scenario where requests for more messages are made prior to
       
   420      * completing the returned CompletionStage instances.
       
   421      */
       
   422     @Test
       
   423     public void aggregatingTextMessages() throws IOException {
       
   424 
       
   425         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
       
   426 
       
   427         int[] binary = new int[]{
       
   428                 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
       
   429                 0x01, 0x02, 0x62, 0x65,                   // "be
       
   430                 0x80, 0x02, 0x74, 0x61,                   // ta"
       
   431                 0x01, 0x01, 0x67,                         // "g
       
   432                 0x00, 0x01, 0x61,                         // a
       
   433                 0x00, 0x00,                               //
       
   434                 0x00, 0x00,                               //
       
   435                 0x00, 0x01, 0x6d,                         // m
       
   436                 0x00, 0x01, 0x6d,                         // m
       
   437                 0x80, 0x01, 0x61,                         // a"
       
   438                 0x8a, 0x00,                               // <PONG>
       
   439                 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74,       // "delt
       
   440                 0x00, 0x01, 0x61,                         // a
       
   441                 0x80, 0x00,                               // "
       
   442                 0x88, 0x00                                // <CLOSE>
       
   443         };
       
   444         CompletableFuture<List<String>> actual = new CompletableFuture<>();
       
   445 
       
   446 
       
   447         server = Support.serverWithCannedData(binary);
       
   448         server.open();
       
   449 
       
   450         WebSocket.Listener listener = new WebSocket.Listener() {
       
   451 
       
   452             List<CharSequence> parts = new ArrayList<>();
       
   453             /*
       
   454              * A CompletableFuture which will complete once the current
       
   455              * message has been fully assembled. Until then the listener
       
   456              * returns this instance for every call.
       
   457              */
       
   458             CompletableFuture<?> currentCf = new CompletableFuture<>();
       
   459             List<String> collected = new ArrayList<>();
       
   460 
       
   461             @Override
       
   462             public CompletionStage<?> onText(WebSocket webSocket,
       
   463                                              CharSequence message,
       
   464                                              boolean last) {
       
   465                 parts.add(message);
       
   466                 if (!last) {
       
   467                     webSocket.request(1);
       
   468                 } else {
       
   469                     this.currentCf.thenRun(() -> webSocket.request(1));
       
   470                     CompletableFuture<?> refCf = this.currentCf;
       
   471                     processWholeMessage(new ArrayList<>(parts), refCf);
       
   472                     currentCf = new CompletableFuture<>();
       
   473                     parts.clear();
       
   474                     return refCf;
       
   475                 }
       
   476                 return currentCf;
       
   477             }
       
   478 
       
   479             @Override
       
   480             public CompletionStage<?> onClose(WebSocket webSocket,
       
   481                                               int statusCode,
       
   482                                               String reason) {
       
   483                 actual.complete(collected);
       
   484                 return null;
       
   485             }
       
   486 
       
   487             @Override
       
   488             public void onError(WebSocket webSocket, Throwable error) {
       
   489                 actual.completeExceptionally(error);
       
   490             }
       
   491 
       
   492             public void processWholeMessage(List<CharSequence> data,
       
   493                                             CompletableFuture<?> cf) {
       
   494                 StringBuilder b = new StringBuilder();
       
   495                 data.forEach(b::append);
       
   496                 String s = b.toString();
       
   497                 System.out.println(s);
       
   498                 cf.complete(null);
       
   499                 collected.add(s);
       
   500             }
       
   501         };
       
   502 
       
   503         webSocket = newHttpClient().newWebSocketBuilder()
       
   504                 .buildAsync(server.getURI(), listener)
       
   505                 .join();
       
   506 
       
   507         List<String> a = actual.join();
       
   508         assertEquals(a, expected);
       
   509     }
       
   510 }