--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Wed Jan 31 17:09:02 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Thu Feb 01 15:37:38 2018 +0000
@@ -184,11 +184,9 @@
return transport.sendPong(message);
}
- // TODO: this is the only method that works unconditionally: e.g. even if CF
- // completes with an exception, the output will be closed
- // Even if arguments are illegal the closure will happen (e.g. a default message)
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
+ Objects.requireNonNull(reason);
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(new IllegalArgumentException("statusCode"));
}
@@ -200,8 +198,6 @@
* messages are expected to be sent after this.
*/
private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
- // TODO: MUST be a CF created once and shared across sendClose, otherwise
- // a second sendClose may prematurely close the channel
outputClosed = true;
return transport.sendClose(statusCode, reason)
.whenComplete((result, error) -> {
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Wed Jan 31 17:09:02 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Thu Feb 01 15:37:38 2018 +0000
@@ -35,6 +35,8 @@
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@@ -132,6 +134,14 @@
messageReceived.get(10, TimeUnit.SECONDS);
fail();
} catch (TimeoutException expected) { }
+ // TODO: No send operations MUST succeed
+// assertCompletesExceptionally(IOE, ws.sendText("text!", false));
+// assertCompletesExceptionally(IOE, ws.sendText("text!", true));
+// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+// assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+// assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+// assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
}
}
@@ -517,4 +527,99 @@
assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
}
}
+
+ @Test
+ public void aggregatingMessages() throws IOException {
+
+ List<String> expected = List.of("alpha", "beta", "gamma", "delta");
+
+ int[] binary = new int[]{
+ 0x81, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // "alpha"
+ 0x01, 0x02, 0x62, 0x65, // "be
+ 0x80, 0x02, 0x74, 0x61, // ta"
+ 0x01, 0x01, 0x67, // "g
+ 0x00, 0x01, 0x61, // a
+ 0x00, 0x00, //
+ 0x00, 0x00, //
+ 0x00, 0x01, 0x6d, // m
+ 0x00, 0x01, 0x6d, // m
+ 0x80, 0x01, 0x61, // a"
+ 0x8a, 0x00, // <PONG>
+ 0x01, 0x04, 0x64, 0x65, 0x6c, 0x74, // "delt
+ 0x00, 0x01, 0x61, // a
+ 0x80, 0x00, // "
+ 0x88, 0x00 // <CLOSE>
+
+ };
+ CompletableFuture<List<String>> actual = new CompletableFuture<>();
+
+
+ try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+ server.open();
+
+ WebSocket.Listener listener = new WebSocket.Listener() {
+
+ List<CharSequence> parts = new ArrayList<>();
+ CompletableFuture<?> currentCf;
+ List<String> collected = new ArrayList<>();
+
+ @Override
+ public CompletionStage<?> onText(WebSocket webSocket,
+ CharSequence message,
+ WebSocket.MessagePart part) {
+ switch (part) {
+ case WHOLE:
+ CompletableFuture<?> cf = new CompletableFuture<>();
+ cf.thenRun(() -> webSocket.request(1));
+ processWholeMessage(List.of(message), cf);
+ return cf;
+ case FIRST:
+ parts.add(message);
+ currentCf = new CompletableFuture<>();
+ currentCf.thenRun(() -> webSocket.request(1));
+ webSocket.request(1);
+ break;
+ case PART:
+ parts.add(message);
+ webSocket.request(1);
+ break;
+ case LAST:
+ parts.add(message);
+ List<CharSequence> copy = List.copyOf(parts);
+ parts.clear();
+ CompletableFuture<?> cf1 = currentCf;
+ currentCf = null;
+ processWholeMessage(copy, cf1);
+ return cf1;
+ }
+ return currentCf;
+ }
+
+ @Override
+ public CompletionStage<?> onClose(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ actual.complete(collected);
+ return null;
+ }
+
+ public void processWholeMessage(List<CharSequence> data,
+ CompletableFuture<?> cf) {
+ StringBuilder b = new StringBuilder();
+ data.forEach(b::append);
+ String s = b.toString();
+ System.out.println(s);
+ cf.complete(null);
+ collected.add(s);
+ }
+ };
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+
+ List<String> a = actual.join();
+ assertEquals(a, expected);
+ }
+ }
}