test/jdk/java/net/httpclient/websocket/WebSocketTest.java
branchhttp-client-branch
changeset 56263 4933a477d628
parent 56130 c99ed32a4d12
child 56269 234813fd33bc
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Wed Mar 07 15:39:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Wed Mar 07 17:16:28 2018 +0000
@@ -24,9 +24,8 @@
 /*
  * @test
  * @build DummyWebSocketServer
- * @run testng/othervm -Djdk.httpclient.HttpClient.log=trace WebSocketTest
+ * @run testng/othervm WebSocketTest
  */
-
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -36,12 +35,14 @@
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static java.net.http.HttpClient.newHttpClient;
 import static java.net.http.WebSocket.NORMAL_CLOSURE;
@@ -58,16 +59,19 @@
     private static final Class<IllegalStateException> ISE = IllegalStateException.class;
     private static final Class<IOException> IOE = IOException.class;
 
-    @Test
-    public void abort() throws Exception {
+//    @Test
+    public void immediateAbort() throws Exception {
         try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
             server.open();
             CompletableFuture<Void> messageReceived = new CompletableFuture<>();
             WebSocket ws = newHttpClient()
                     .newWebSocketBuilder()
                     .buildAsync(server.getURI(), new WebSocket.Listener() {
+
                         @Override
-                        public void onOpen(WebSocket webSocket) { /* no initial request */ }
+                        public void onOpen(WebSocket webSocket) {
+                            /* no initial request */
+                        }
 
                         @Override
                         public CompletionStage<?> onText(WebSocket webSocket,
@@ -133,15 +137,17 @@
             try {
                 messageReceived.get(10, TimeUnit.SECONDS);
                 fail();
-            } catch (TimeoutException expected) { }
-            // TODO: No send operations MUST succeed
-//            assertCompletesExceptionally(IOE, ws.sendText("text!", false));
-//            assertCompletesExceptionally(IOE, ws.sendText("text!", true));
-//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
-//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
-//            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
-//            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
-//            assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
+            } catch (TimeoutException expected) {
+                System.out.println("Finished waiting");
+            }
+            assertCompletesExceptionally(IOE, ws.sendText("text!", false));
+            assertCompletesExceptionally(IOE, ws.sendText("text!", true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+            // Checked last because it changes the state of WebSocket
+            assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
         }
     }
 
@@ -154,15 +160,40 @@
             @Override
             protected void serve(SocketChannel channel) throws IOException {
                 ByteBuffer closeMessage = ByteBuffer.wrap(copy);
-                int wrote = channel.write(closeMessage);
-                System.out.println("Wrote bytes: " + wrote);
+                channel.write(closeMessage);
                 super.serve(channel);
             }
         };
     }
 
+    private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
+                                                     CompletableFuture<?> stage) {
+        stage.handle((result, error) -> {
+            if (error instanceof CompletionException) {
+                Throwable cause = error.getCause();
+                if (cause == null) {
+                    throw new AssertionError("Unexpected null cause: " + error);
+                }
+                assertException(clazz, cause);
+            } else {
+                assertException(clazz, error);
+            }
+            return null;
+        }).join();
+    }
+
+    private static void assertException(Class<? extends Throwable> clazz,
+                                        Throwable t) {
+        if (t == null) {
+            throw new AssertionError("Expected " + clazz + ", caught nothing");
+        }
+        if (!clazz.isInstance(t)) {
+            throw new AssertionError("Expected " + clazz + ", caught " + t);
+        }
+    }
+
     @Test
-    public void testNull() throws IOException {
+    public void sendMethodsThrowNPE() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -177,11 +208,25 @@
             assertThrows(NPE, () -> ws.sendPing(null));
             assertThrows(NPE, () -> ws.sendPong(null));
             assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
+
+            ws.abort();
+
+            assertThrows(NPE, () -> ws.sendText(null, false));
+            assertThrows(NPE, () -> ws.sendText(null, true));
+            assertThrows(NPE, () -> ws.sendBinary(null, false));
+            assertThrows(NPE, () -> ws.sendBinary(null, true));
+            assertThrows(NPE, () -> ws.sendPing(null));
+            assertThrows(NPE, () -> ws.sendPong(null));
+            assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
         }
     }
 
+    // TODO: request in onClose/onError
+    // TODO: throw exception in onClose/onError
+    // TODO: exception is thrown from request()
+
     @Test
-    public void testSendClose1() throws IOException {
+    public void sendCloseCompleted() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -197,7 +242,7 @@
     }
 
     @Test
-    public void testSendClose2() throws Exception {
+    public void sendClosePending() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -205,7 +250,7 @@
                     .buildAsync(server.getURI(), new WebSocket.Listener() { })
                     .join();
             ByteBuffer data = ByteBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) { // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -215,12 +260,11 @@
                 }
             }
             CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
+            // The output closes even if the Close message has not been sent
+            assertFalse(cf.isDone());
             assertTrue(ws.isOutputClosed());
             assertFalse(ws.isInputClosed());
             assertEquals(ws.getSubprotocol(), "");
-            // The output closes regardless of whether or not the Close message
-            // has been sent
-            assertFalse(cf.isDone());
         }
     }
 
@@ -242,6 +286,78 @@
         };
     }
 
+//    @Test
+    public void abortPendingSendBinary() throws Exception {
+        try (DummyWebSocketServer server = notReadingServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            ByteBuffer data = ByteBuffer.allocate(65536);
+            CompletableFuture<WebSocket> cf = null;
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                System.out.println("cycle #" + i);
+                try {
+                    cf = ws.sendBinary(data, true);
+                    cf.get(10, TimeUnit.SECONDS);
+                    data.clear();
+                } catch (TimeoutException e) {
+                    break;
+                }
+            }
+            ws.abort();
+            assertTrue(ws.isOutputClosed());
+            assertTrue(ws.isInputClosed());
+            assertCompletesExceptionally(IOException.class, cf);
+        }
+    }
+
+//    @Test
+    public void abortPendingSendText() throws Exception {
+        try (DummyWebSocketServer server = notReadingServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            String data = stringWith2NBytes(32768);
+            CompletableFuture<WebSocket> cf = null;
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                System.out.println("cycle #" + i);
+                try {
+                    cf = ws.sendText(data, true);
+                    cf.get(10, TimeUnit.SECONDS);
+                } catch (TimeoutException e) {
+                    break;
+                }
+            }
+            ws.abort();
+            assertTrue(ws.isOutputClosed());
+            assertTrue(ws.isInputClosed());
+            assertCompletesExceptionally(IOException.class, cf);
+        }
+    }
+
+    private static String stringWith2NBytes(int n) {
+        // -- Russian Alphabet (33 characters, 2 bytes per char) --
+        char[] abc = {
+                0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
+                0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
+                0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
+                0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
+                0x042F,
+        };
+        // repeat cyclically
+        StringBuilder sb = new StringBuilder(n);
+        for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
+            sb.append(abc[j]);
+        }
+        String s = sb.toString();
+        assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
+        return s;
+    }
+
     @Test
     public void testIllegalArgument() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
@@ -263,10 +379,10 @@
             assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(129)));
             assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(256)));
 
-            assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), true));
-            assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), false));
-            assertCompletesExceptionally(IAE, ws.sendText(malformedString(), true));
-            assertCompletesExceptionally(IAE, ws.sendText(malformedString(), false));
+            assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), true));
+            assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), false));
+            assertCompletesExceptionally(IOE, ws.sendText(malformedString(), true));
+            assertCompletesExceptionally(IOE, ws.sendText(malformedString(), false));
 
             assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(124)));
             assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(125)));
@@ -316,58 +432,13 @@
     }
 
     private static String stringWithNBytes(int n) {
-        StringBuilder sb = new StringBuilder(n);
-        for (int i = 0; i < n; i++) {
-            sb.append("A");
-        }
-        return sb.toString();
-    }
-
-    private static String stringWith2NBytes(int n) {
-        // Russian alphabet repeated cyclically
-        char FIRST = '\u0410';
-        char LAST = '\u042F';
-        StringBuilder sb = new StringBuilder(n);
-        char c = FIRST;
-        for (int i = 0; i < n; i++) {
-            if (++c > LAST) {
-                c = FIRST;
-            }
-            sb.append(c);
-        }
-        String s = sb.toString();
-        assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
-        return s;
-    }
-
-    private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
-                                                     CompletableFuture<?> stage) {
-        stage.handle((result, error) -> {
-            if (error instanceof CompletionException) {
-                Throwable cause = error.getCause();
-                if (cause == null) {
-                    throw new AssertionError("Unexpected null cause: " + error);
-                }
-                assertException(clazz, cause);
-            } else {
-                assertException(clazz, error);
-            }
-            return null;
-        }).join();
-    }
-
-    private static void assertException(Class<? extends Throwable> clazz,
-                                        Throwable t) {
-        if (t == null) {
-            throw new AssertionError("Expected " + clazz + ", caught nothing");
-        }
-        if (!clazz.isInstance(t)) {
-            throw new AssertionError("Expected " + clazz + ", caught " + t);
-        }
+        char[] chars = new char[n];
+        Arrays.fill(chars, 'A');
+        return new String(chars);
     }
 
     @Test
-    public void testIllegalStateOutstanding1() throws Exception {
+    public void outstanding1() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -376,7 +447,7 @@
                     .join();
 
             ByteBuffer data = ByteBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) {  // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -391,7 +462,7 @@
     }
 
     @Test
-    public void testIllegalStateOutstanding2() throws Exception {
+    public void outstanding2() throws Exception {
         try (DummyWebSocketServer server = notReadingServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -400,7 +471,7 @@
                     .join();
 
             CharBuffer data = CharBuffer.allocate(65536);
-            for (int i = 0; ; i++) {
+            for (int i = 0; ; i++) {  // fill up the send buffer
                 System.out.println("cycle #" + i);
                 try {
                     ws.sendText(data, true).get(10, TimeUnit.SECONDS);
@@ -415,7 +486,7 @@
     }
 
     @Test
-    public void testIllegalStateIntermixed1() throws IOException {
+    public void interleavingTypes1() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -430,7 +501,7 @@
     }
 
     @Test
-    public void testIllegalStateIntermixed2() throws IOException {
+    public void interleavingTypes2() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -445,7 +516,7 @@
     }
 
     @Test
-    public void testIllegalStateSendClose() throws IOException {
+    public void sendMethodsThrowIOE1() throws IOException {
         try (DummyWebSocketServer server = new DummyWebSocketServer()) {
             server.open();
             WebSocket ws = newHttpClient()
@@ -453,31 +524,33 @@
                     .buildAsync(server.getURI(), new WebSocket.Listener() { })
                     .join();
 
-            ws.sendClose(NORMAL_CLOSURE, "normal close").join();
+            ws.sendClose(NORMAL_CLOSURE, "ok").join();
+
+            assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 
-            assertCompletesExceptionally(ISE, ws.sendText("", true));
-            assertCompletesExceptionally(ISE, ws.sendText("", false));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", true));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+            assertCompletesExceptionally(IOE, ws.sendText("", true));
+            assertCompletesExceptionally(IOE, ws.sendText("", false));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
 
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
 
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
         }
     }
 
     @Test
-    public void testIllegalStateOnClose() throws Exception {
+    public void sendMethodsThrowIOE2() throws Exception {
         try (DummyWebSocketServer server = serverWithCannedData(0x88, 0x00)) {
             server.open();
             CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
@@ -490,7 +563,7 @@
                         public CompletionStage<?> onClose(WebSocket webSocket,
                                                           int statusCode,
                                                           String reason) {
-                            System.out.println("onClose(" + statusCode + ")");
+                            System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
                             onCloseCalled.complete(null);
                             return canClose;
                         }
@@ -498,38 +571,143 @@
                         @Override
                         public void onError(WebSocket webSocket, Throwable error) {
                             System.out.println("onError(" + error + ")");
-                            error.printStackTrace();
+                            onCloseCalled.completeExceptionally(error);
                         }
                     })
                     .join();
 
             onCloseCalled.join();      // Wait for onClose to be called
+            canClose.complete(null);   // Signal to the WebSocket it can close the output
             TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
-            canClose.complete(null);   // Signal to the WebSocket it can close the output
+
+            assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
 
-            assertCompletesExceptionally(ISE, ws.sendText("", true));
-            assertCompletesExceptionally(ISE, ws.sendText("", false));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", true));
-            assertCompletesExceptionally(ISE, ws.sendText("abc", false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
-            assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+            assertCompletesExceptionally(IOE, ws.sendText("", true));
+            assertCompletesExceptionally(IOE, ws.sendText("", false));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+            assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
 
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
 
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
-            assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
         }
     }
 
     @Test
-    public void simpleAggregatingMessages() throws IOException {
+    public void simpleAggregatingBinaryMessages() throws IOException {
+        List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta")
+                .stream()
+                .map(s -> s.getBytes(StandardCharsets.US_ASCII))
+                .collect(Collectors.toList());
+        int[] binary = new int[]{
+                0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha]
+                0x02, 0x02, 0x62, 0x65,                   // [be
+                0x80, 0x02, 0x74, 0x61,                   // ta]
+                0x02, 0x01, 0x67,                         // [g
+                0x00, 0x01, 0x61,                         // a
+                0x00, 0x00,                               //
+                0x00, 0x00,                               //
+                0x00, 0x01, 0x6d,                         // m
+                0x00, 0x01, 0x6d,                         // m
+                0x80, 0x01, 0x61,                         // a]
+                0x8a, 0x00,                               // <PONG>
+                0x02, 0x04, 0x64, 0x65, 0x6c, 0x74,       // [delt
+                0x00, 0x01, 0x61,                         // a
+                0x80, 0x00,                               // ]
+                0x88, 0x00                                // <CLOSE>
+        };
+        CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
+
+        try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+            server.open();
+
+            WebSocket.Listener listener = new WebSocket.Listener() {
+
+                List<byte[]> collectedBytes = new ArrayList<>();
+                ByteBuffer binary;
+
+                @Override
+                public CompletionStage<?> onBinary(WebSocket webSocket,
+                                                   ByteBuffer message,
+                                                   WebSocket.MessagePart part) {
+                    System.out.printf("onBinary(%s, %s)%n", message, part);
+                    webSocket.request(1);
+                    byte[] bytes = null;
+                    switch (part) {
+                        case FIRST:
+                            binary = ByteBuffer.allocate(message.remaining() * 2);
+                        case PART:
+                            append(message);
+                            return null;
+                        case LAST:
+                            append(message);
+                            binary.flip();
+                            bytes = new byte[binary.remaining()];
+                            binary.get(bytes);
+                            binary.clear();
+                            break;
+                        case WHOLE:
+                            bytes = new byte[message.remaining()];
+                            message.get(bytes);
+                            break;
+                    }
+                    processWholeBinary(bytes);
+                    return null;
+                }
+
+                private void append(ByteBuffer message) {
+                    if (binary.remaining() < message.remaining()) {
+                        assert message.remaining() > 0;
+                        int cap = (binary.capacity() + message.remaining()) * 2;
+                        ByteBuffer b = ByteBuffer.allocate(cap);
+                        b.put(binary.flip());
+                        binary = b;
+                    }
+                    binary.put(message);
+                }
+
+                private void processWholeBinary(byte[] bytes) {
+                    String stringBytes = new String(bytes, StandardCharsets.UTF_8);
+                    System.out.println("processWholeBinary: " + stringBytes);
+                    collectedBytes.add(bytes);
+                }
+
+                @Override
+                public CompletionStage<?> onClose(WebSocket webSocket,
+                                                  int statusCode,
+                                                  String reason) {
+                    actual.complete(collectedBytes);
+                    return null;
+                }
+
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
+            };
+
+            newHttpClient().newWebSocketBuilder()
+                           .buildAsync(server.getURI(), listener)
+                           .join();
+
+            List<byte[]> a = actual.join();
+            System.out.println("joined");
+            assertEquals(a, expected);
+        }
+    }
+
+    @Test
+    public void simpleAggregatingTextMessages() throws IOException {
 
         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 
@@ -557,24 +735,25 @@
 
             WebSocket.Listener listener = new WebSocket.Listener() {
 
-                List<String> collected = new ArrayList<>();
-                StringBuilder text = new StringBuilder();
+                List<String> collectedStrings = new ArrayList<>();
+                StringBuilder text;
 
                 @Override
                 public CompletionStage<?> onText(WebSocket webSocket,
                                                  CharSequence message,
                                                  WebSocket.MessagePart part) {
+                    System.out.printf("onText(%s, %s)%n", message, part);
                     webSocket.request(1);
                     String str = null;
                     switch (part) {
                         case FIRST:
+                            text = new StringBuilder(message.length() * 2);
                         case PART:
                             text.append(message);
                             return null;
                         case LAST:
                             text.append(message);
                             str = text.toString();
-                            text.setLength(0);
                             break;
                         case WHOLE:
                             str = message.toString();
@@ -586,30 +765,38 @@
 
                 private void processWholeText(String string) {
                     System.out.println(string);
-                    // -- your code here --
-                    collected.add(string);
+                    collectedStrings.add(string);
                 }
 
                 @Override
                 public CompletionStage<?> onClose(WebSocket webSocket,
                                                   int statusCode,
                                                   String reason) {
-                    actual.complete(collected);
+                    actual.complete(collectedStrings);
                     return null;
                 }
+
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
             };
 
             newHttpClient().newWebSocketBuilder()
-                    .buildAsync(server.getURI(), listener)
-                    .join();
+                           .buildAsync(server.getURI(), listener)
+                           .join();
 
             List<String> a = actual.join();
             assertEquals(a, expected);
         }
     }
 
+    /*
+     * Exercises the scenario where requests for more messages are made prior to
+     * completing the returned CompletionStage instances.
+     */
     @Test
-    public void aggregatingMessages() throws IOException {
+    public void aggregatingTextMessages() throws IOException {
 
         List<String> expected = List.of("alpha", "beta", "gamma", "delta");
 
@@ -638,7 +825,12 @@
 
             WebSocket.Listener listener = new WebSocket.Listener() {
 
-                List<CharSequence> parts = new ArrayList<>();
+                List<CharSequence> parts;
+                /*
+                 * A CompletableFuture which will complete once the current
+                 * message has been fully assembled (LAST/WHOLE). Until then
+                 * the listener returns this instance for every call.
+                 */
                 CompletableFuture<?> currentCf;
                 List<String> collected = new ArrayList<>();
 
@@ -653,6 +845,7 @@
                             processWholeMessage(List.of(message), cf);
                             return cf;
                         case FIRST:
+                            parts = new ArrayList<>();
                             parts.add(message);
                             currentCf = new CompletableFuture<>();
                             currentCf.thenRun(() -> webSocket.request(1));
@@ -664,12 +857,11 @@
                             break;
                         case LAST:
                             parts.add(message);
-                            List<CharSequence> copy = List.copyOf(parts);
-                            parts.clear();
-                            CompletableFuture<?> cf1 = currentCf;
+                            CompletableFuture<?> copyCf = this.currentCf;
+                            processWholeMessage(parts, copyCf);
                             currentCf = null;
-                            processWholeMessage(copy, cf1);
-                            return cf1;
+                            parts = null;
+                            return copyCf;
                     }
                     return currentCf;
                 }
@@ -682,6 +874,11 @@
                     return null;
                 }
 
+                @Override
+                public void onError(WebSocket webSocket, Throwable error) {
+                    actual.completeExceptionally(error);
+                }
+
                 public void processWholeMessage(List<CharSequence> data,
                                                 CompletableFuture<?> cf) {
                     StringBuilder b = new StringBuilder();
@@ -692,10 +889,10 @@
                     collected.add(s);
                 }
             };
-            WebSocket ws = newHttpClient()
-                    .newWebSocketBuilder()
-                    .buildAsync(server.getURI(), listener)
-                    .join();
+
+            newHttpClient().newWebSocketBuilder()
+                           .buildAsync(server.getURI(), listener)
+                           .join();
 
             List<String> a = actual.join();
             assertEquals(a, expected);