# HG changeset patch # User prappo # Date 1517499458 0 # Node ID a02d0098c630752decfc4c718acafb01e03d5146 # Parent ab28f851bd8c5355a2b61e9dd6ef989a5dc739cf http-client-branch: (WebSocket) example-test diff -r ab28f851bd8c -r a02d0098c630 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java Wed Jan 31 17:09:02 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/FrameConsumer.java Thu Feb 01 15:37:38 2018 +0000 @@ -49,7 +49,7 @@ /* Non-final for testing purposes only */ class FrameConsumer implements Frame.Consumer { - private final static boolean DEBUG = true; + private final static boolean DEBUG = false; private final MessageStreamConsumer output; private final UTF8AccumulatingDecoder decoder = new UTF8AccumulatingDecoder(); private boolean fin; diff -r ab28f851bd8c -r a02d0098c630 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed Jan 31 17:09:02 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Thu Feb 01 15:37:38 2018 +0000 @@ -184,11 +184,9 @@ return transport.sendPong(message); } - // TODO: this is the only method that works unconditionally: e.g. even if CF - // completes with an exception, the output will be closed - // Even if arguments are illegal the closure will happen (e.g. a default message) @Override public CompletableFuture sendClose(int statusCode, String reason) { + Objects.requireNonNull(reason); if (!isLegalToSendFromClient(statusCode)) { return failedFuture(new IllegalArgumentException("statusCode")); } @@ -200,8 +198,6 @@ * messages are expected to be sent after this. */ private CompletableFuture sendClose0(int statusCode, String reason ) { - // TODO: MUST be a CF created once and shared across sendClose, otherwise - // a second sendClose may prematurely close the channel outputClosed = true; return transport.sendClose(statusCode, reason) .whenComplete((result, error) -> { diff -r ab28f851bd8c -r a02d0098c630 test/jdk/java/net/httpclient/websocket/WebSocketTest.java --- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Wed Jan 31 17:09:02 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Thu Feb 01 15:37:38 2018 +0000 @@ -35,6 +35,8 @@ import java.nio.CharBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -132,6 +134,14 @@ messageReceived.get(10, TimeUnit.SECONDS); fail(); } catch (TimeoutException expected) { } + // TODO: No send operations MUST succeed +// assertCompletesExceptionally(IOE, ws.sendText("text!", false)); +// assertCompletesExceptionally(IOE, ws.sendText("text!", true)); +// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false)); +// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true)); +// assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16))); +// assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16))); +// assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason")); } } @@ -517,4 +527,99 @@ assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0))); } } + + @Test + public void aggregatingMessages() throws IOException { + + List expected = List.of("alpha", "beta", "gamma", "delta"); + + int[] binary = new int[]{ + 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha" + 0x01, 0x02, 0x62, 0x65, // "be + 0x80, 0x02, 0x74, 0x61, // ta" + 0x01, 0x01, 0x67, // "g + 0x00, 0x01, 0x61, // a + 0x00, 0x00, // + 0x00, 0x00, // + 0x00, 0x01, 0x6d, // m + 0x00, 0x01, 0x6d, // m + 0x80, 0x01, 0x61, // a" + 0x8a, 0x00, // + 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74, // "delt + 0x00, 0x01, 0x61, // a + 0x80, 0x00, // " + 0x88, 0x00 // + + }; + CompletableFuture> actual = new CompletableFuture<>(); + + + try (DummyWebSocketServer server = serverWithCannedData(binary)) { + server.open(); + + WebSocket.Listener listener = new WebSocket.Listener() { + + List parts = new ArrayList<>(); + CompletableFuture currentCf; + List collected = new ArrayList<>(); + + @Override + public CompletionStage onText(WebSocket webSocket, + CharSequence message, + WebSocket.MessagePart part) { + switch (part) { + case WHOLE: + CompletableFuture cf = new CompletableFuture<>(); + cf.thenRun(() -> webSocket.request(1)); + processWholeMessage(List.of(message), cf); + return cf; + case FIRST: + parts.add(message); + currentCf = new CompletableFuture<>(); + currentCf.thenRun(() -> webSocket.request(1)); + webSocket.request(1); + break; + case PART: + parts.add(message); + webSocket.request(1); + break; + case LAST: + parts.add(message); + List copy = List.copyOf(parts); + parts.clear(); + CompletableFuture cf1 = currentCf; + currentCf = null; + processWholeMessage(copy, cf1); + return cf1; + } + return currentCf; + } + + @Override + public CompletionStage onClose(WebSocket webSocket, + int statusCode, + String reason) { + actual.complete(collected); + return null; + } + + public void processWholeMessage(List data, + CompletableFuture cf) { + StringBuilder b = new StringBuilder(); + data.forEach(b::append); + String s = b.toString(); + System.out.println(s); + cf.complete(null); + collected.add(s); + } + }; + WebSocket ws = newHttpClient() + .newWebSocketBuilder() + .buildAsync(server.getURI(), listener) + .join(); + + List a = actual.join(); + assertEquals(a, expected); + } + } }