http-client-branch: (WebSocket) example-test http-client-branch
authorprappo
Thu, 01 Feb 2018 15:37:38 +0000
branchhttp-client-branch
changeset 56058 a02d0098c630
parent 56057 ab28f851bd8c
child 56059 f5473559b6d3
child 56064 255e21e86b99
http-client-branch: (WebSocket) example-test
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/WebSocketTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java	Wed Jan 31 17:09:02 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java	Thu Feb 01 15:37:38 2018 +0000
@@ -49,7 +49,7 @@
 /* Non-final for testing purposes only */
 class FrameConsumer implements Frame.Consumer {
 
-    private final static boolean DEBUG = true;
+    private final static boolean DEBUG = false;
     private final MessageStreamConsumer output;
     private final UTF8AccumulatingDecoder decoder = new UTF8AccumulatingDecoder();
     private boolean fin;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Wed Jan 31 17:09:02 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Thu Feb 01 15:37:38 2018 +0000
@@ -184,11 +184,9 @@
         return transport.sendPong(message);
     }
 
-    // TODO: this is the only method that works unconditionally: e.g. even if CF
-    // completes with an exception, the output will be closed
-    // Even if arguments are illegal the closure will happen (e.g. a default message)
     @Override
     public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
+        Objects.requireNonNull(reason);
         if (!isLegalToSendFromClient(statusCode)) {
             return failedFuture(new IllegalArgumentException("statusCode"));
         }
@@ -200,8 +198,6 @@
      * messages are expected to be sent after this.
      */
     private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
-        // TODO: MUST be a CF created once and shared across sendClose, otherwise
-        // a second sendClose may prematurely close the channel
         outputClosed = true;
         return transport.sendClose(statusCode, reason)
                 .whenComplete((result, error) -> {
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Wed Jan 31 17:09:02 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Thu Feb 01 15:37:38 2018 +0000
@@ -35,6 +35,8 @@
 import java.nio.CharBuffer;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
@@ -132,6 +134,14 @@
                 messageReceived.get(10, TimeUnit.SECONDS);
                 fail();
             } catch (TimeoutException expected) { }
+            // TODO: No send operations MUST succeed
+//            assertCompletesExceptionally(IOE, ws.sendText("text!", false));
+//            assertCompletesExceptionally(IOE, ws.sendText("text!", true));
+//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+//            assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+//            assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+//            assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+//            assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
         }
     }
 
@@ -517,4 +527,99 @@
             assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
         }
     }
+
+    @Test
+    public void aggregatingMessages() throws IOException {
+
+        List<String> expected = List.of("alpha", "beta", "gamma", "delta");
+
+        int[] binary = new int[]{
+                0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
+                0x01, 0x02, 0x62, 0x65,                   // "be
+                0x80, 0x02, 0x74, 0x61,                   // ta"
+                0x01, 0x01, 0x67,                         // "g
+                0x00, 0x01, 0x61,                         // a
+                0x00, 0x00,                               //
+                0x00, 0x00,                               //
+                0x00, 0x01, 0x6d,                         // m
+                0x00, 0x01, 0x6d,                         // m
+                0x80, 0x01, 0x61,                         // a"
+                0x8a, 0x00,                               // <PONG>
+                0x01, 0x04, 0x64, 0x65, 0x6c, 0x74,       // "delt
+                0x00, 0x01, 0x61,                         // a
+                0x80, 0x00,                               // "
+                0x88, 0x00                                // <CLOSE>
+
+        };
+        CompletableFuture<List<String>> actual = new CompletableFuture<>();
+
+
+        try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+            server.open();
+
+            WebSocket.Listener listener = new WebSocket.Listener() {
+
+                List<CharSequence> parts = new ArrayList<>();
+                CompletableFuture<?> currentCf;
+                List<String> collected = new ArrayList<>();
+
+                @Override
+                public CompletionStage<?> onText(WebSocket webSocket,
+                                                 CharSequence message,
+                                                 WebSocket.MessagePart part) {
+                    switch (part) {
+                        case WHOLE:
+                            CompletableFuture<?> cf = new CompletableFuture<>();
+                            cf.thenRun(() -> webSocket.request(1));
+                            processWholeMessage(List.of(message), cf);
+                            return cf;
+                        case FIRST:
+                            parts.add(message);
+                            currentCf = new CompletableFuture<>();
+                            currentCf.thenRun(() -> webSocket.request(1));
+                            webSocket.request(1);
+                            break;
+                        case PART:
+                            parts.add(message);
+                            webSocket.request(1);
+                            break;
+                        case LAST:
+                            parts.add(message);
+                            List<CharSequence> copy = List.copyOf(parts);
+                            parts.clear();
+                            CompletableFuture<?> cf1 = currentCf;
+                            currentCf = null;
+                            processWholeMessage(copy, cf1);
+                            return cf1;
+                    }
+                    return currentCf;
+                }
+
+                @Override
+                public CompletionStage<?> onClose(WebSocket webSocket,
+                                                  int statusCode,
+                                                  String reason) {
+                    actual.complete(collected);
+                    return null;
+                }
+
+                public void processWholeMessage(List<CharSequence> data,
+                                                CompletableFuture<?> cf) {
+                    StringBuilder b = new StringBuilder();
+                    data.forEach(b::append);
+                    String s = b.toString();
+                    System.out.println(s);
+                    cf.complete(null);
+                    collected.add(s);
+                }
+            };
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), listener)
+                    .join();
+
+            List<String> a = actual.join();
+            assertEquals(a, expected);
+        }
+    }
 }