--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Wed Mar 07 15:39:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Wed Mar 07 17:16:28 2018 +0000
@@ -24,9 +24,8 @@
/*
* @test
* @build DummyWebSocketServer
- * @run testng/othervm -Djdk.httpclient.HttpClient.log=trace WebSocketTest
+ * @run testng/othervm WebSocketTest
*/
-
import org.testng.annotations.Test;
import java.io.IOException;
@@ -36,12 +35,14 @@
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import static java.net.http.HttpClient.newHttpClient;
import static java.net.http.WebSocket.NORMAL_CLOSURE;
@@ -58,16 +59,19 @@
private static final Class<IllegalStateException> ISE = IllegalStateException.class;
private static final Class<IOException> IOE = IOException.class;
- @Test
- public void abort() throws Exception {
+// @Test
+ public void immediateAbort() throws Exception {
try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
server.open();
CompletableFuture<Void> messageReceived = new CompletableFuture<>();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() {
+
@Override
- public void onOpen(WebSocket webSocket) { /* no initial request */ }
+ public void onOpen(WebSocket webSocket) {
+ /* no initial request */
+ }
@Override
public CompletionStage<?> onText(WebSocket webSocket,
@@ -133,15 +137,17 @@
try {
messageReceived.get(10, TimeUnit.SECONDS);
fail();
- } catch (TimeoutException expected) { }
- // TODO: No send operations MUST succeed
-// assertCompletesExceptionally(IOE, ws.sendText("text!", false));
-// assertCompletesExceptionally(IOE, ws.sendText("text!", true));
-// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
-// assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
-// assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
-// assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
-// assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
+ } catch (TimeoutException expected) {
+ System.out.println("Finished waiting");
+ }
+ assertCompletesExceptionally(IOE, ws.sendText("text!", false));
+ assertCompletesExceptionally(IOE, ws.sendText("text!", true));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+ // Checked last because it changes the state of WebSocket
+ assertCompletesExceptionally(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
}
}
@@ -154,15 +160,40 @@
@Override
protected void serve(SocketChannel channel) throws IOException {
ByteBuffer closeMessage = ByteBuffer.wrap(copy);
- int wrote = channel.write(closeMessage);
- System.out.println("Wrote bytes: " + wrote);
+ channel.write(closeMessage);
super.serve(channel);
}
};
}
+ private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
+ CompletableFuture<?> stage) {
+ stage.handle((result, error) -> {
+ if (error instanceof CompletionException) {
+ Throwable cause = error.getCause();
+ if (cause == null) {
+ throw new AssertionError("Unexpected null cause: " + error);
+ }
+ assertException(clazz, cause);
+ } else {
+ assertException(clazz, error);
+ }
+ return null;
+ }).join();
+ }
+
+ private static void assertException(Class<? extends Throwable> clazz,
+ Throwable t) {
+ if (t == null) {
+ throw new AssertionError("Expected " + clazz + ", caught nothing");
+ }
+ if (!clazz.isInstance(t)) {
+ throw new AssertionError("Expected " + clazz + ", caught " + t);
+ }
+ }
+
@Test
- public void testNull() throws IOException {
+ public void sendMethodsThrowNPE() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -177,11 +208,25 @@
assertThrows(NPE, () -> ws.sendPing(null));
assertThrows(NPE, () -> ws.sendPong(null));
assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
+
+ ws.abort();
+
+ assertThrows(NPE, () -> ws.sendText(null, false));
+ assertThrows(NPE, () -> ws.sendText(null, true));
+ assertThrows(NPE, () -> ws.sendBinary(null, false));
+ assertThrows(NPE, () -> ws.sendBinary(null, true));
+ assertThrows(NPE, () -> ws.sendPing(null));
+ assertThrows(NPE, () -> ws.sendPong(null));
+ assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
}
}
+ // TODO: request in onClose/onError
+ // TODO: throw exception in onClose/onError
+ // TODO: exception is thrown from request()
+
@Test
- public void testSendClose1() throws IOException {
+ public void sendCloseCompleted() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -197,7 +242,7 @@
}
@Test
- public void testSendClose2() throws Exception {
+ public void sendClosePending() throws Exception {
try (DummyWebSocketServer server = notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -205,7 +250,7 @@
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
ByteBuffer data = ByteBuffer.allocate(65536);
- for (int i = 0; ; i++) {
+ for (int i = 0; ; i++) { // fill up the send buffer
System.out.println("cycle #" + i);
try {
ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -215,12 +260,11 @@
}
}
CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
+ // The output closes even if the Close message has not been sent
+ assertFalse(cf.isDone());
assertTrue(ws.isOutputClosed());
assertFalse(ws.isInputClosed());
assertEquals(ws.getSubprotocol(), "");
- // The output closes regardless of whether or not the Close message
- // has been sent
- assertFalse(cf.isDone());
}
}
@@ -242,6 +286,78 @@
};
}
+// @Test
+ public void abortPendingSendBinary() throws Exception {
+ try (DummyWebSocketServer server = notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+ ByteBuffer data = ByteBuffer.allocate(65536);
+ CompletableFuture<WebSocket> cf = null;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ try {
+ cf = ws.sendBinary(data, true);
+ cf.get(10, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ ws.abort();
+ assertTrue(ws.isOutputClosed());
+ assertTrue(ws.isInputClosed());
+ assertCompletesExceptionally(IOException.class, cf);
+ }
+ }
+
+// @Test
+ public void abortPendingSendText() throws Exception {
+ try (DummyWebSocketServer server = notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+ String data = stringWith2NBytes(32768);
+ CompletableFuture<WebSocket> cf = null;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ try {
+ cf = ws.sendText(data, true);
+ cf.get(10, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ ws.abort();
+ assertTrue(ws.isOutputClosed());
+ assertTrue(ws.isInputClosed());
+ assertCompletesExceptionally(IOException.class, cf);
+ }
+ }
+
+ private static String stringWith2NBytes(int n) {
+ // -- Russian Alphabet (33 characters, 2 bytes per char) --
+ char[] abc = {
+ 0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
+ 0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
+ 0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
+ 0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
+ 0x042F,
+ };
+ // repeat cyclically
+ StringBuilder sb = new StringBuilder(n);
+ for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
+ sb.append(abc[j]);
+ }
+ String s = sb.toString();
+ assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
+ return s;
+ }
+
@Test
public void testIllegalArgument() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
@@ -263,10 +379,10 @@
assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(129)));
assertCompletesExceptionally(IAE, ws.sendPong(ByteBuffer.allocate(256)));
- assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), true));
- assertCompletesExceptionally(IAE, ws.sendText(incompleteString(), false));
- assertCompletesExceptionally(IAE, ws.sendText(malformedString(), true));
- assertCompletesExceptionally(IAE, ws.sendText(malformedString(), false));
+ assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), true));
+ assertCompletesExceptionally(IOE, ws.sendText(incompleteString(), false));
+ assertCompletesExceptionally(IOE, ws.sendText(malformedString(), true));
+ assertCompletesExceptionally(IOE, ws.sendText(malformedString(), false));
assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(124)));
assertCompletesExceptionally(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(125)));
@@ -316,58 +432,13 @@
}
private static String stringWithNBytes(int n) {
- StringBuilder sb = new StringBuilder(n);
- for (int i = 0; i < n; i++) {
- sb.append("A");
- }
- return sb.toString();
- }
-
- private static String stringWith2NBytes(int n) {
- // Russian alphabet repeated cyclically
- char FIRST = '\u0410';
- char LAST = '\u042F';
- StringBuilder sb = new StringBuilder(n);
- char c = FIRST;
- for (int i = 0; i < n; i++) {
- if (++c > LAST) {
- c = FIRST;
- }
- sb.append(c);
- }
- String s = sb.toString();
- assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
- return s;
- }
-
- private static void assertCompletesExceptionally(Class<? extends Throwable> clazz,
- CompletableFuture<?> stage) {
- stage.handle((result, error) -> {
- if (error instanceof CompletionException) {
- Throwable cause = error.getCause();
- if (cause == null) {
- throw new AssertionError("Unexpected null cause: " + error);
- }
- assertException(clazz, cause);
- } else {
- assertException(clazz, error);
- }
- return null;
- }).join();
- }
-
- private static void assertException(Class<? extends Throwable> clazz,
- Throwable t) {
- if (t == null) {
- throw new AssertionError("Expected " + clazz + ", caught nothing");
- }
- if (!clazz.isInstance(t)) {
- throw new AssertionError("Expected " + clazz + ", caught " + t);
- }
+ char[] chars = new char[n];
+ Arrays.fill(chars, 'A');
+ return new String(chars);
}
@Test
- public void testIllegalStateOutstanding1() throws Exception {
+ public void outstanding1() throws Exception {
try (DummyWebSocketServer server = notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -376,7 +447,7 @@
.join();
ByteBuffer data = ByteBuffer.allocate(65536);
- for (int i = 0; ; i++) {
+ for (int i = 0; ; i++) { // fill up the send buffer
System.out.println("cycle #" + i);
try {
ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
@@ -391,7 +462,7 @@
}
@Test
- public void testIllegalStateOutstanding2() throws Exception {
+ public void outstanding2() throws Exception {
try (DummyWebSocketServer server = notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -400,7 +471,7 @@
.join();
CharBuffer data = CharBuffer.allocate(65536);
- for (int i = 0; ; i++) {
+ for (int i = 0; ; i++) { // fill up the send buffer
System.out.println("cycle #" + i);
try {
ws.sendText(data, true).get(10, TimeUnit.SECONDS);
@@ -415,7 +486,7 @@
}
@Test
- public void testIllegalStateIntermixed1() throws IOException {
+ public void interleavingTypes1() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -430,7 +501,7 @@
}
@Test
- public void testIllegalStateIntermixed2() throws IOException {
+ public void interleavingTypes2() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -445,7 +516,7 @@
}
@Test
- public void testIllegalStateSendClose() throws IOException {
+ public void sendMethodsThrowIOE1() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -453,31 +524,33 @@
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
- ws.sendClose(NORMAL_CLOSURE, "normal close").join();
+ ws.sendClose(NORMAL_CLOSURE, "ok").join();
+
+ assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
- assertCompletesExceptionally(ISE, ws.sendText("", true));
- assertCompletesExceptionally(ISE, ws.sendText("", false));
- assertCompletesExceptionally(ISE, ws.sendText("abc", true));
- assertCompletesExceptionally(ISE, ws.sendText("abc", false));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+ assertCompletesExceptionally(IOE, ws.sendText("", true));
+ assertCompletesExceptionally(IOE, ws.sendText("", false));
+ assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+ assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
}
}
@Test
- public void testIllegalStateOnClose() throws Exception {
+ public void sendMethodsThrowIOE2() throws Exception {
try (DummyWebSocketServer server = serverWithCannedData(0x88, 0x00)) {
server.open();
CompletableFuture<Void> onCloseCalled = new CompletableFuture<>();
@@ -490,7 +563,7 @@
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
- System.out.println("onClose(" + statusCode + ")");
+ System.out.printf("onClose(%s, '%s')%n", statusCode, reason);
onCloseCalled.complete(null);
return canClose;
}
@@ -498,38 +571,143 @@
@Override
public void onError(WebSocket webSocket, Throwable error) {
System.out.println("onError(" + error + ")");
- error.printStackTrace();
+ onCloseCalled.completeExceptionally(error);
}
})
.join();
onCloseCalled.join(); // Wait for onClose to be called
+ canClose.complete(null); // Signal to the WebSocket it can close the output
TimeUnit.SECONDS.sleep(5); // Give canClose some time to reach the WebSocket
- canClose.complete(null); // Signal to the WebSocket it can close the output
+
+ assertCompletesExceptionally(IOE, ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
- assertCompletesExceptionally(ISE, ws.sendText("", true));
- assertCompletesExceptionally(ISE, ws.sendText("", false));
- assertCompletesExceptionally(ISE, ws.sendText("abc", true));
- assertCompletesExceptionally(ISE, ws.sendText("abc", false));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), true));
- assertCompletesExceptionally(ISE, ws.sendBinary(ByteBuffer.allocate(1), false));
+ assertCompletesExceptionally(IOE, ws.sendText("", true));
+ assertCompletesExceptionally(IOE, ws.sendText("", false));
+ assertCompletesExceptionally(IOE, ws.sendText("abc", true));
+ assertCompletesExceptionally(IOE, ws.sendText("abc", false));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), true));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), true));
+ assertCompletesExceptionally(IOE, ws.sendBinary(ByteBuffer.allocate(1), false));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(125)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(124)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(1)));
- assertCompletesExceptionally(ISE, ws.sendPing(ByteBuffer.allocate(0)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(124)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(1)));
+ assertCompletesExceptionally(IOE, ws.sendPing(ByteBuffer.allocate(0)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(125)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(124)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(1)));
- assertCompletesExceptionally(ISE, ws.sendPong(ByteBuffer.allocate(0)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(125)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(124)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(1)));
+ assertCompletesExceptionally(IOE, ws.sendPong(ByteBuffer.allocate(0)));
}
}
@Test
- public void simpleAggregatingMessages() throws IOException {
+ public void simpleAggregatingBinaryMessages() throws IOException {
+ List<byte[]> expected = List.of("alpha", "beta", "gamma", "delta")
+ .stream()
+ .map(s -> s.getBytes(StandardCharsets.US_ASCII))
+ .collect(Collectors.toList());
+ int[] binary = new int[]{
+ 0x82, 0x05, 0x61, 0x6c, 0x70, 0x68, 0x61, // [alpha]
+ 0x02, 0x02, 0x62, 0x65, // [be
+ 0x80, 0x02, 0x74, 0x61, // ta]
+ 0x02, 0x01, 0x67, // [g
+ 0x00, 0x01, 0x61, // a
+ 0x00, 0x00, //
+ 0x00, 0x00, //
+ 0x00, 0x01, 0x6d, // m
+ 0x00, 0x01, 0x6d, // m
+ 0x80, 0x01, 0x61, // a]
+ 0x8a, 0x00, // <PONG>
+ 0x02, 0x04, 0x64, 0x65, 0x6c, 0x74, // [delt
+ 0x00, 0x01, 0x61, // a
+ 0x80, 0x00, // ]
+ 0x88, 0x00 // <CLOSE>
+ };
+ CompletableFuture<List<byte[]>> actual = new CompletableFuture<>();
+
+ try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+ server.open();
+
+ WebSocket.Listener listener = new WebSocket.Listener() {
+
+ List<byte[]> collectedBytes = new ArrayList<>();
+ ByteBuffer binary;
+
+ @Override
+ public CompletionStage<?> onBinary(WebSocket webSocket,
+ ByteBuffer message,
+ WebSocket.MessagePart part) {
+ System.out.printf("onBinary(%s, %s)%n", message, part);
+ webSocket.request(1);
+ byte[] bytes = null;
+ switch (part) {
+ case FIRST:
+ binary = ByteBuffer.allocate(message.remaining() * 2);
+ case PART:
+ append(message);
+ return null;
+ case LAST:
+ append(message);
+ binary.flip();
+ bytes = new byte[binary.remaining()];
+ binary.get(bytes);
+ binary.clear();
+ break;
+ case WHOLE:
+ bytes = new byte[message.remaining()];
+ message.get(bytes);
+ break;
+ }
+ processWholeBinary(bytes);
+ return null;
+ }
+
+ private void append(ByteBuffer message) {
+ if (binary.remaining() < message.remaining()) {
+ assert message.remaining() > 0;
+ int cap = (binary.capacity() + message.remaining()) * 2;
+ ByteBuffer b = ByteBuffer.allocate(cap);
+ b.put(binary.flip());
+ binary = b;
+ }
+ binary.put(message);
+ }
+
+ private void processWholeBinary(byte[] bytes) {
+ String stringBytes = new String(bytes, StandardCharsets.UTF_8);
+ System.out.println("processWholeBinary: " + stringBytes);
+ collectedBytes.add(bytes);
+ }
+
+ @Override
+ public CompletionStage<?> onClose(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ actual.complete(collectedBytes);
+ return null;
+ }
+
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ actual.completeExceptionally(error);
+ }
+ };
+
+ newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+
+ List<byte[]> a = actual.join();
+ System.out.println("joined");
+ assertEquals(a, expected);
+ }
+ }
+
+ @Test
+ public void simpleAggregatingTextMessages() throws IOException {
List<String> expected = List.of("alpha", "beta", "gamma", "delta");
@@ -557,24 +735,25 @@
WebSocket.Listener listener = new WebSocket.Listener() {
- List<String> collected = new ArrayList<>();
- StringBuilder text = new StringBuilder();
+ List<String> collectedStrings = new ArrayList<>();
+ StringBuilder text;
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
WebSocket.MessagePart part) {
+ System.out.printf("onText(%s, %s)%n", message, part);
webSocket.request(1);
String str = null;
switch (part) {
case FIRST:
+ text = new StringBuilder(message.length() * 2);
case PART:
text.append(message);
return null;
case LAST:
text.append(message);
str = text.toString();
- text.setLength(0);
break;
case WHOLE:
str = message.toString();
@@ -586,30 +765,38 @@
private void processWholeText(String string) {
System.out.println(string);
- // -- your code here --
- collected.add(string);
+ collectedStrings.add(string);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket,
int statusCode,
String reason) {
- actual.complete(collected);
+ actual.complete(collectedStrings);
return null;
}
+
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ actual.completeExceptionally(error);
+ }
};
newHttpClient().newWebSocketBuilder()
- .buildAsync(server.getURI(), listener)
- .join();
+ .buildAsync(server.getURI(), listener)
+ .join();
List<String> a = actual.join();
assertEquals(a, expected);
}
}
+ /*
+ * Exercises the scenario where requests for more messages are made prior to
+ * completing the returned CompletionStage instances.
+ */
@Test
- public void aggregatingMessages() throws IOException {
+ public void aggregatingTextMessages() throws IOException {
List<String> expected = List.of("alpha", "beta", "gamma", "delta");
@@ -638,7 +825,12 @@
WebSocket.Listener listener = new WebSocket.Listener() {
- List<CharSequence> parts = new ArrayList<>();
+ List<CharSequence> parts;
+ /*
+ * A CompletableFuture which will complete once the current
+ * message has been fully assembled (LAST/WHOLE). Until then
+ * the listener returns this instance for every call.
+ */
CompletableFuture<?> currentCf;
List<String> collected = new ArrayList<>();
@@ -653,6 +845,7 @@
processWholeMessage(List.of(message), cf);
return cf;
case FIRST:
+ parts = new ArrayList<>();
parts.add(message);
currentCf = new CompletableFuture<>();
currentCf.thenRun(() -> webSocket.request(1));
@@ -664,12 +857,11 @@
break;
case LAST:
parts.add(message);
- List<CharSequence> copy = List.copyOf(parts);
- parts.clear();
- CompletableFuture<?> cf1 = currentCf;
+ CompletableFuture<?> copyCf = this.currentCf;
+ processWholeMessage(parts, copyCf);
currentCf = null;
- processWholeMessage(copy, cf1);
- return cf1;
+ parts = null;
+ return copyCf;
}
return currentCf;
}
@@ -682,6 +874,11 @@
return null;
}
+ @Override
+ public void onError(WebSocket webSocket, Throwable error) {
+ actual.completeExceptionally(error);
+ }
+
public void processWholeMessage(List<CharSequence> data,
CompletableFuture<?> cf) {
StringBuilder b = new StringBuilder();
@@ -692,10 +889,10 @@
collected.add(s);
}
};
- WebSocket ws = newHttpClient()
- .newWebSocketBuilder()
- .buildAsync(server.getURI(), listener)
- .join();
+
+ newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
List<String> a = actual.join();
assertEquals(a, expected);