The {@code CompletableFuture} returned from this method can
* complete exceptionally with:
*
The {@code CompletableFuture} returned from this method can
* complete exceptionally with:
*
sendPong(ByteBuffer message);
/**
- * Sends a Close message with the given status code and the reason,
- * initiating an orderly closure.
+ * Initiates an orderly closure of this {@code WebSocket}'s output by
+ * sending a Close message with the given status code and the reason.
*
* The {@code statusCode} is an integer from the range
* {@code 1000 <= code <= 4999}. Status codes {@code 1002}, {@code 1003},
@@ -623,29 +637,33 @@
* status codes is implementation-specific. The {@code reason} is a string
* that has an UTF-8 representation not longer than {@code 123} bytes.
*
- *
Use the provided integer constant {@link #NORMAL_CLOSURE} as a status
- * code and an empty string as a reason in a typical case.
- *
*
A {@code CompletableFuture} returned from this method can
* complete exceptionally with:
*
* - {@link IllegalArgumentException} -
* if {@code statusCode} is illegal
*
- {@link IOException} -
- * if an I/O error occurs
+ * if an I/O error occurs, or if the output is closed
*
*
- * By the time the {@code CompletableFuture} returned from this method
- * completes normally, the output will have been closed.
+ *
Unless the {@code CompletableFuture} returned from this method
+ * completes with {@code IllegalArgumentException}, or the method throws
+ * {@code NullPointerException}, the output will be closed.
+ *
+ *
If not already closed, the input remains open until it is
+ * {@linkplain Listener#onClose(WebSocket, int, String) closed} by the server,
+ * or {@code abort} is invoked, or an
+ * {@linkplain Listener#onError(WebSocket, Throwable) error} occurs.
*
- * @implSpec An endpoint sending a Close message might not receive a
- * complementing Close message in a timely manner for a variety of reasons.
- * The {@code WebSocket} implementation is responsible for providing a
- * closure mechanism that guarantees that once {@code sendClose} method has
- * been invoked the {@code WebSocket} will close regardless of whether or
- * not a Close frame has been received and without further intervention from
- * the user of this API. Method {@code sendClose} is designed to be,
- * possibly, the last call from the user of this API.
+ * @apiNote Use the provided integer constant {@link #NORMAL_CLOSURE} as a
+ * status code and an empty string as a reason in a typical case
+ *
{@code
+ * CompletableFuture webSocket = ...
+ * webSocket.thenCompose(ws -> ws.sendText("Hello, ", false))
+ * .thenCompose(ws -> ws.sendText("world!", true))
+ * .thenCompose(ws -> ws.sendClose(WebSocket.NORMAL_CLOSURE, ""))
+ * .join();
+ * }
*
* @param statusCode
* the status code
@@ -681,8 +699,7 @@
String getSubprotocol();
/**
- * Tells whether or not this {@code WebSocket} is permanently closed
- * for sending messages.
+ * Tells whether this {@code WebSocket}'s output is closed.
*
* If this method returns {@code true}, subsequent invocations will also
* return {@code true}.
@@ -692,8 +709,7 @@
boolean isOutputClosed();
/**
- * Tells whether or not this {@code WebSocket} is permanently closed
- * for receiving messages.
+ * Tells whether this {@code WebSocket}'s input is closed.
*
*
If this method returns {@code true}, subsequent invocations will also
* return {@code true}.
@@ -703,15 +719,11 @@
boolean isInputClosed();
/**
- * Closes this {@code WebSocket} abruptly.
+ * Closes this {@code WebSocket}'s input and output abruptly.
*
*
When this method returns both the input and the output will have been
- * closed. Subsequent invocations will have no effect.
- *
- * @apiNote Depending on its implementation, the state (for example, whether
- * or not a message is being transferred at the moment) and possible errors
- * while releasing associated resources, this {@code WebSocket} may invoke
- * its listener's {@code onError}.
+ * closed. Any pending send operations will fail with {@code IOException}.
+ * Subsequent invocations of {@code abort()} will have no effect.
*/
void abort();
}
diff -r 52a9f6b74e43 -r 2a96e88888b2 src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Sat Mar 17 18:01:01 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Sun Mar 18 14:31:36 2018 +0000
@@ -115,7 +115,8 @@
private final String subprotocol;
private final Listener listener;
- private final AtomicBoolean outstandingSend = new AtomicBoolean();
+ private final AtomicBoolean pendingTextOrBinary = new AtomicBoolean();
+ private final AtomicBoolean pendingPingOrPong = new AtomicBoolean();
private final Transport transport;
private final SequentialScheduler receiveScheduler
= new SequentialScheduler(new ReceiveTask());
@@ -198,11 +199,11 @@
id, message.length(), isLast);
}
CompletableFuture result;
- if (!outstandingSend.compareAndSet(false, true)) {
+ if (!setPendingTextOrBinary()) {
result = failedFuture(new IllegalStateException("Send pending"));
} else {
result = transport.sendText(message, isLast, this,
- (r, e) -> outstandingSend.set(false));
+ (r, e) -> clearPendingTextOrBinary());
}
debug.log(Level.DEBUG, "exit send text %s returned %s", id, result);
@@ -220,16 +221,24 @@
id, message, isLast);
}
CompletableFuture result;
- if (!outstandingSend.compareAndSet(false, true)) {
+ if (!setPendingTextOrBinary()) {
result = failedFuture(new IllegalStateException("Send pending"));
} else {
result = transport.sendBinary(message, isLast, this,
- (r, e) -> outstandingSend.set(false));
+ (r, e) -> clearPendingTextOrBinary());
}
debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result);
return replaceNull(result);
}
+ private void clearPendingTextOrBinary() {
+ pendingTextOrBinary.set(false);
+ }
+
+ private boolean setPendingTextOrBinary() {
+ return pendingTextOrBinary.compareAndSet(false, true);
+ }
+
private CompletableFuture replaceNull(
CompletableFuture cf)
{
@@ -248,8 +257,13 @@
id = sendCounter.incrementAndGet();
debug.log(Level.DEBUG, "enter send ping %s payload=%s", id, message);
}
- CompletableFuture result = transport.sendPing(message, this,
- (r, e) -> { });
+ CompletableFuture result;
+ if (!setPendingPingOrPong()) {
+ result = failedFuture(new IllegalStateException("Send pending"));
+ } else {
+ result = transport.sendPing(message, this,
+ (r, e) -> clearPendingPingOrPong());
+ }
debug.log(Level.DEBUG, "exit send ping %s returned %s", id, result);
return replaceNull(result);
}
@@ -262,12 +276,25 @@
id = sendCounter.incrementAndGet();
debug.log(Level.DEBUG, "enter send pong %s payload=%s", id, message);
}
- CompletableFuture result = transport.sendPong(message, this,
- (r, e) -> { });
+ CompletableFuture result;
+ if (!setPendingPingOrPong()) {
+ result = failedFuture(new IllegalStateException("Send pending"));
+ } else {
+ result = transport.sendPong(message, this,
+ (r, e) -> clearPendingPingOrPong());
+ }
debug.log(Level.DEBUG, "exit send pong %s returned %s", id, result);
return replaceNull(result);
}
+ private boolean setPendingPingOrPong() {
+ return pendingPingOrPong.compareAndSet(false, true);
+ }
+
+ private void clearPendingPingOrPong() {
+ pendingPingOrPong.set(false);
+ }
+
@Override
public CompletableFuture sendClose(int statusCode,
String reason) {
diff -r 52a9f6b74e43 -r 2a96e88888b2 test/jdk/java/net/httpclient/websocket/ImmediateAbort.java
--- a/test/jdk/java/net/httpclient/websocket/ImmediateAbort.java Sat Mar 17 18:01:01 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ImmediateAbort.java Sun Mar 18 14:31:36 2018 +0000
@@ -29,16 +29,16 @@
* ImmediateAbort
*/
+import org.testng.annotations.Test;
+
import java.io.IOException;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.testng.annotations.Test;
+
import static java.net.http.HttpClient.newHttpClient;
import static java.net.http.WebSocket.NORMAL_CLOSURE;
import static org.testng.Assert.assertEquals;
@@ -57,7 +57,7 @@
*/
@Test
public void immediateAbort() throws Exception {
- try (DummyWebSocketServer server = serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
server.open();
CompletableFuture messageReceived = new CompletableFuture<>();
WebSocket.Listener listener = new WebSocket.Listener() {
@@ -113,8 +113,8 @@
for (int i = 0; i < 3; i++) {
System.out.printf("iteration #%s%n", i);
// after the first abort() each consecutive one must be a no-op,
- // moreover, query methods should continue to return consistent,
- // permanent values
+ // moreover, query methods should continue to return consistent
+ // values
for (int j = 0; j < 3; j++) {
System.out.printf("abort #%s%n", j);
ws.abort();
@@ -148,13 +148,13 @@
}
for (int i = 0; i < 3; i++) {
System.out.printf("send #%s%n", i);
- assertFails(IOE, ws.sendText("text!", false));
- assertFails(IOE, ws.sendText("text!", true));
- assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
- assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
- assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
- assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
- assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
+ Support.assertFails(IOE, ws.sendText("text!", false));
+ Support.assertFails(IOE, ws.sendText("text!", true));
+ Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+ Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+ Support.assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+ Support.assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+ Support.assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
assertThrows(NPE, () -> ws.sendText(null, false));
assertThrows(NPE, () -> ws.sendText(null, true));
assertThrows(NPE, () -> ws.sendBinary(null, false));
@@ -165,29 +165,4 @@
}
}
}
-
- private static void assertFails(Class extends Throwable> clazz,
- CompletionStage> stage) {
- Support.assertCompletesExceptionally(clazz, stage);
- }
-
- private static DummyWebSocketServer serverWithCannedData(int... data) {
- byte[] copy = new byte[data.length];
- for (int i = 0; i < data.length; i++) {
- copy[i] = (byte) data[i];
- }
- return serverWithCannedData(copy);
- }
-
- private static DummyWebSocketServer serverWithCannedData(byte... data) {
- byte[] copy = Arrays.copyOf(data, data.length);
- return new DummyWebSocketServer() {
- @Override
- protected void serve(SocketChannel channel) throws IOException {
- ByteBuffer closeMessage = ByteBuffer.wrap(copy);
- channel.write(closeMessage);
- super.serve(channel);
- }
- };
- }
}
diff -r 52a9f6b74e43 -r 2a96e88888b2 test/jdk/java/net/httpclient/websocket/SendTest.java
--- a/test/jdk/java/net/httpclient/websocket/SendTest.java Sat Mar 17 18:01:01 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendTest.java Sun Mar 18 14:31:36 2018 +0000
@@ -29,17 +29,15 @@
* SendTest
*/
+import org.testng.annotations.Test;
+
import java.io.IOException;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.testng.annotations.Test;
+
import static java.net.http.HttpClient.newHttpClient;
import static java.net.http.WebSocket.NORMAL_CLOSURE;
import static org.testng.Assert.assertEquals;
@@ -51,24 +49,6 @@
private static final Class NPE = NullPointerException.class;
- /* shortcut */
- private static void assertFails(Class extends Throwable> clazz,
- CompletionStage> stage) {
- Support.assertCompletesExceptionally(clazz, stage);
- }
-
- private static DummyWebSocketServer serverWithCannedData(byte... data) {
- byte[] copy = Arrays.copyOf(data, data.length);
- return new DummyWebSocketServer() {
- @Override
- protected void serve(SocketChannel channel) throws IOException {
- ByteBuffer closeMessage = ByteBuffer.wrap(copy);
- channel.write(closeMessage);
- super.serve(channel);
- }
- };
- }
-
@Test
public void sendMethodsThrowNPE() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
@@ -119,7 +99,7 @@
@Test
public void sendClosePending() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
@@ -151,27 +131,9 @@
}
}
- /*
- * This server does not read from the wire, allowing its client to fill up
- * their send buffer. Used to test scenarios with outstanding send
- * operations.
- */
- private static DummyWebSocketServer notReadingServer() {
- return new DummyWebSocketServer() {
- @Override
- protected void serve(SocketChannel channel) throws IOException {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- };
- }
-
@Test
public void abortPendingSendBinary() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
@@ -196,19 +158,19 @@
ws.abort();
assertTrue(ws.isOutputClosed());
assertTrue(ws.isInputClosed());
- assertFails(IOException.class, cf);
+ Support.assertFails(IOException.class, cf);
}
}
@Test
public void abortPendingSendText() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
- String data = stringWith2NBytes(32768);
+ String data = Support.stringWith2NBytes(32768);
CompletableFuture cf = null;
for (int i = 0; ; i++) { // fill up the send buffer
System.out.printf("begin cycle #%s at %s%n",
@@ -226,38 +188,19 @@
ws.abort();
assertTrue(ws.isOutputClosed());
assertTrue(ws.isInputClosed());
- assertFails(IOException.class, cf);
+ Support.assertFails(IOException.class, cf);
}
}
- private static String stringWith2NBytes(int n) {
- // -- Russian Alphabet (33 characters, 2 bytes per char) --
- char[] abc = {
- 0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
- 0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
- 0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
- 0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
- 0x042F,
- };
- // repeat cyclically
- StringBuilder sb = new StringBuilder(n);
- for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
- sb.append(abc[j]);
- }
- String s = sb.toString();
- assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
- return s;
- }
-
@Test
public void sendCloseTimeout() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
- String data = stringWith2NBytes(32768);
+ String data = Support.stringWith2NBytes(32768);
CompletableFuture cf = null;
for (int i = 0; ; i++) { // fill up the send buffer
System.out.printf("begin cycle #%s at %s%n",
@@ -273,7 +216,7 @@
}
}
long before = System.currentTimeMillis();
- assertFails(IOException.class,
+ Support.assertFails(IOException.class,
ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
long after = System.currentTimeMillis();
// default timeout should be 30 seconds
@@ -282,7 +225,7 @@
assertTrue(elapsed >= 29_000, String.valueOf(elapsed));
assertTrue(ws.isOutputClosed());
assertTrue(ws.isInputClosed());
- assertFails(IOException.class, cf);
+ Support.assertFails(IOException.class, cf);
}
}
}
diff -r 52a9f6b74e43 -r 2a96e88888b2 test/jdk/java/net/httpclient/websocket/Support.java
--- a/test/jdk/java/net/httpclient/websocket/Support.java Sat Mar 17 18:01:01 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/Support.java Sun Mar 18 14:31:36 2018 +0000
@@ -21,10 +21,16 @@
* questions.
*/
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.testng.Assert.assertThrows;
@@ -32,6 +38,11 @@
private Support() { }
+ public static void assertFails(Class extends Throwable> clazz,
+ CompletionStage> stage) {
+ Support.assertCompletesExceptionally(clazz, stage);
+ }
+
public static void assertCompletesExceptionally(Class extends Throwable> clazz,
CompletionStage> stage) {
CompletableFuture> cf =
@@ -45,6 +56,18 @@
});
}
+ public static void assertHangs(CompletionStage> stage) {
+ Support.assertDoesNotCompleteWithin(5, TimeUnit.SECONDS, stage);
+ }
+
+ public static void assertDoesNotCompleteWithin(long timeout,
+ TimeUnit unit,
+ CompletionStage> stage) {
+ CompletableFuture> cf =
+ CompletableFuture.completedFuture(null).thenCompose(x -> stage);
+ assertThrows(TimeoutException.class, () -> cf.get(timeout, unit));
+ }
+
public static ByteBuffer fullCopy(ByteBuffer src) {
ByteBuffer copy = ByteBuffer.allocate(src.capacity());
int p = src.position();
@@ -54,4 +77,75 @@
src.position(p).limit(l);
return copy;
}
+
+ public static DummyWebSocketServer serverWithCannedData(int... data) {
+ byte[] copy = new byte[data.length];
+ for (int i = 0; i < data.length; i++) {
+ copy[i] = (byte) data[i];
+ }
+ return serverWithCannedData(copy);
+ }
+
+ public static DummyWebSocketServer serverWithCannedData(byte... data) {
+ byte[] copy = Arrays.copyOf(data, data.length);
+ return new DummyWebSocketServer() {
+ @Override
+ protected void serve(SocketChannel channel) throws IOException {
+ ByteBuffer closeMessage = ByteBuffer.wrap(copy);
+ channel.write(closeMessage);
+ super.serve(channel);
+ }
+ };
+ }
+
+ /*
+ * This server does not read from the wire, allowing its client to fill up
+ * their send buffer. Used to test scenarios with outstanding send
+ * operations.
+ */
+ public static DummyWebSocketServer notReadingServer() {
+ return new DummyWebSocketServer() {
+ @Override
+ protected void serve(SocketChannel channel) throws IOException {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ }
+
+ public static String stringWith2NBytes(int n) {
+ // -- Russian Alphabet (33 characters, 2 bytes per char) --
+ char[] abc = {
+ 0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
+ 0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
+ 0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
+ 0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
+ 0x042F,
+ };
+ // repeat cyclically
+ StringBuilder sb = new StringBuilder(n);
+ for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
+ sb.append(abc[j]);
+ }
+ String s = sb.toString();
+ assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
+ return s;
+ }
+
+ public static String malformedString() {
+ return new String(new char[]{0xDC00, 0xD800});
+ }
+
+ public static String incompleteString() {
+ return new String(new char[]{0xD800});
+ }
+
+ public static String stringWithNBytes(int n) {
+ char[] chars = new char[n];
+ Arrays.fill(chars, 'A');
+ return new String(chars);
+ }
}
diff -r 52a9f6b74e43 -r 2a96e88888b2 test/jdk/java/net/httpclient/websocket/WebSocketTest.java
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Sat Mar 17 18:01:01 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Sun Mar 18 14:31:36 2018 +0000
@@ -24,27 +24,27 @@
/*
* @test
* @build DummyWebSocketServer
- * @run testng/othervm
+ * @run testng/othervm/timeout=600
* -Djdk.internal.httpclient.websocket.debug=true
* WebSocketTest
*/
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
import java.io.IOException;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
-import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
+
import static java.net.http.HttpClient.newHttpClient;
import static java.net.http.WebSocket.NORMAL_CLOSURE;
import static org.testng.Assert.assertEquals;
@@ -65,65 +65,8 @@
Support.assertCompletesExceptionally(clazz, stage);
}
- private static DummyWebSocketServer serverWithCannedData(int... data) {
- byte[] copy = new byte[data.length];
- for (int i = 0; i < data.length; i++) {
- copy[i] = (byte) data[i];
- }
- return serverWithCannedData(copy);
- }
-
- private static DummyWebSocketServer serverWithCannedData(byte... data) {
- byte[] copy = Arrays.copyOf(data, data.length);
- return new DummyWebSocketServer() {
- @Override
- protected void serve(SocketChannel channel) throws IOException {
- ByteBuffer closeMessage = ByteBuffer.wrap(copy);
- channel.write(closeMessage);
- super.serve(channel);
- }
- };
- }
-
- /*
- * This server does not read from the wire, allowing its client to fill up
- * their send buffer. Used to test scenarios with outstanding send
- * operations.
- */
- private static DummyWebSocketServer notReadingServer() {
- return new DummyWebSocketServer() {
- @Override
- protected void serve(SocketChannel channel) throws IOException {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- };
- }
-
- private static String stringWith2NBytes(int n) {
- // -- Russian Alphabet (33 characters, 2 bytes per char) --
- char[] abc = {
- 0x0410, 0x0411, 0x0412, 0x0413, 0x0414, 0x0415, 0x0401, 0x0416,
- 0x0417, 0x0418, 0x0419, 0x041A, 0x041B, 0x041C, 0x041D, 0x041E,
- 0x041F, 0x0420, 0x0421, 0x0422, 0x0423, 0x0424, 0x0425, 0x0426,
- 0x0427, 0x0428, 0x0429, 0x042A, 0x042B, 0x042C, 0x042D, 0x042E,
- 0x042F,
- };
- // repeat cyclically
- StringBuilder sb = new StringBuilder(n);
- for (int i = 0, j = 0; i < n; i++, j = (j + 1) % abc.length) {
- sb.append(abc[j]);
- }
- String s = sb.toString();
- assert s.length() == n && s.getBytes(StandardCharsets.UTF_8).length == 2 * n;
- return s;
- }
-
@Test
- public void testIllegalArgument() throws IOException {
+ public void illegalArgument() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -143,19 +86,19 @@
assertFails(IAE, ws.sendPong(ByteBuffer.allocate(129)));
assertFails(IAE, ws.sendPong(ByteBuffer.allocate(256)));
- assertFails(IOE, ws.sendText(incompleteString(), true));
- assertFails(IOE, ws.sendText(incompleteString(), false));
- assertFails(IOE, ws.sendText(malformedString(), true));
- assertFails(IOE, ws.sendText(malformedString(), false));
+ assertFails(IOE, ws.sendText(Support.incompleteString(), true));
+ assertFails(IOE, ws.sendText(Support.incompleteString(), false));
+ assertFails(IOE, ws.sendText(Support.malformedString(), true));
+ assertFails(IOE, ws.sendText(Support.malformedString(), false));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(124)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(125)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(128)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(256)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWithNBytes(257)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, stringWith2NBytes((123 / 2) + 1)));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, malformedString()));
- assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, incompleteString()));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(124)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(125)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(128)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(256)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWithNBytes(257)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.stringWith2NBytes((123 / 2) + 1)));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.malformedString()));
+ assertFails(IAE, ws.sendClose(NORMAL_CLOSURE, Support.incompleteString()));
assertFails(IAE, ws.sendClose(-2, "a reason"));
assertFails(IAE, ws.sendClose(-1, "a reason"));
@@ -187,23 +130,98 @@
}
}
- private static String malformedString() {
- return new String(new char[]{0xDC00, 0xD800});
- }
+ @Test(dataProvider = "booleans")
+ public void pendingTextPingClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
- private static String incompleteString() {
- return new String(new char[]{0xD800});
+ CharBuffer data = CharBuffer.allocate(65536);
+ CompletableFuture cfText;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ cfText = ws.sendText(data, last);
+ try {
+ cfText.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ }
+ }
+ assertFails(ISE, ws.sendText("", true));
+ assertFails(ISE, ws.sendText("", false));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ CompletableFuture cfPing = ws.sendPing(ByteBuffer.allocate(125));
+ assertHangs(cfPing);
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfClose = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfText);
+ assertFails(IOE, cfPing);
+ assertFails(IOE, cfClose);
+ }
}
- private static String stringWithNBytes(int n) {
- char[] chars = new char[n];
- Arrays.fill(chars, 'A');
- return new String(chars);
+ /* shortcut */
+ public static void assertHangs(CompletionStage> stage) {
+ Support.assertHangs(stage);
}
- @Test
- public void outstanding1() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ @Test(dataProvider = "booleans")
+ public void pendingTextPongClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+
+ CharBuffer data = CharBuffer.allocate(65536);
+ CompletableFuture cfText;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ cfText = ws.sendText(data, last);
+ try {
+ cfText.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ }
+ }
+ assertFails(ISE, ws.sendText("", true));
+ assertFails(ISE, ws.sendText("", false));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ CompletableFuture cfPong = ws.sendPong(ByteBuffer.allocate(125));
+ assertHangs(cfPong);
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfClose = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfText);
+ assertFails(IOE, cfPong);
+ assertFails(IOE, cfClose);
+ }
+ }
+
+ @Test(dataProvider = "booleans")
+ public void pendingBinaryPingClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
@@ -211,35 +229,55 @@
.join();
ByteBuffer data = ByteBuffer.allocate(65536);
+ CompletableFuture cfBinary;
for (int i = 0; ; i++) { // fill up the send buffer
- System.out.println("cycle #" + i);
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ cfBinary = ws.sendBinary(data, last);
try {
- ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
+ cfBinary.get(5, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
}
}
+ assertFails(ISE, ws.sendText("", true));
+ assertFails(ISE, ws.sendText("", false));
assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
- assertFails(ISE, ws.sendText("", true));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ CompletableFuture cfPing = ws.sendPing(ByteBuffer.allocate(125));
+ assertHangs(cfPing);
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfClose = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfBinary);
+ assertFails(IOE, cfPing);
+ assertFails(IOE, cfClose);
}
}
- @Test
- public void outstanding2() throws Exception {
- try (DummyWebSocketServer server = notReadingServer()) {
+ @Test(dataProvider = "booleans")
+ public void pendingBinaryPongClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
WebSocket ws = newHttpClient()
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
- CharBuffer data = CharBuffer.allocate(65536);
+ ByteBuffer data = ByteBuffer.allocate(65536);
+ CompletableFuture cfBinary;
for (int i = 0; ; i++) { // fill up the send buffer
System.out.printf("begin cycle #%s at %s%n",
i, System.currentTimeMillis());
+ cfBinary = ws.sendBinary(data, last);
try {
- ws.sendText(data, true).get(10, TimeUnit.SECONDS);
+ cfBinary.get(5, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
@@ -249,12 +287,166 @@
}
}
assertFails(ISE, ws.sendText("", true));
+ assertFails(ISE, ws.sendText("", false));
assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), true));
+ assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(0), false));
+ CompletableFuture cfPong = ws.sendPong(ByteBuffer.allocate(125));
+ assertHangs(cfPong);
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfClose = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfBinary);
+ assertFails(IOE, cfPong);
+ assertFails(IOE, cfClose);
+ }
+ }
+
+ @Test(dataProvider = "booleans")
+ public void pendingPingTextClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+
+ ByteBuffer data = ByteBuffer.allocate(125);
+ CompletableFuture cfPing;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ cfPing = ws.sendPing(data);
+ try {
+ cfPing.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfText = ws.sendText("hello", last);
+ assertHangs(cfText);
+ CompletableFuture cfClose
+ = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfPing);
+ assertFails(IOE, cfText);
+ assertFails(IOE, cfClose);
+ }
+ }
+
+ @Test(dataProvider = "booleans")
+ public void pendingPingBinaryClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+
+ ByteBuffer data = ByteBuffer.allocate(125);
+ CompletableFuture cfPing;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ cfPing = ws.sendPing(data);
+ try {
+ cfPing.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfBinary
+ = ws.sendBinary(ByteBuffer.allocate(4), last);
+ assertHangs(cfBinary);
+ CompletableFuture cfClose
+ = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfPing);
+ assertFails(IOE, cfBinary);
+ assertFails(IOE, cfClose);
+ }
+ }
+
+ @Test(dataProvider = "booleans")
+ public void pendingPongTextClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+
+ ByteBuffer data = ByteBuffer.allocate(125);
+ CompletableFuture cfPong;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ cfPong = ws.sendPong(data);
+ try {
+ cfPong.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfText = ws.sendText("hello", last);
+ assertHangs(cfText);
+ CompletableFuture cfClose
+ = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfPong);
+ assertFails(IOE, cfText);
+ assertFails(IOE, cfClose);
+ }
+ }
+
+ @Test(dataProvider = "booleans")
+ public void pendingPongBinaryClose(boolean last) throws Exception {
+ try (DummyWebSocketServer server = Support.notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+
+ ByteBuffer data = ByteBuffer.allocate(125);
+ CompletableFuture cfPong;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.println("cycle #" + i);
+ cfPong = ws.sendPong(data);
+ try {
+ cfPong.get(5, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ }
+ }
+ assertFails(ISE, ws.sendPing(ByteBuffer.allocate(125)));
+ assertFails(ISE, ws.sendPong(ByteBuffer.allocate(125)));
+ CompletableFuture cfBinary
+ = ws.sendBinary(ByteBuffer.allocate(4), last);
+ assertHangs(cfBinary);
+ CompletableFuture cfClose
+ = ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok");
+ assertHangs(cfClose);
+ ws.abort();
+ assertFails(IOE, cfPong);
+ assertFails(IOE, cfBinary);
+ assertFails(IOE, cfClose);
}
}
@Test
- public void interleavingTypes1() throws IOException {
+ public void partialBinaryThenText() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -265,11 +457,14 @@
ws.sendBinary(ByteBuffer.allocate(16), false).join();
assertFails(ISE, ws.sendText("text", false));
assertFails(ISE, ws.sendText("text", true));
+ // Pings & Pongs are fine
+ ws.sendPing(ByteBuffer.allocate(125)).join();
+ ws.sendPong(ByteBuffer.allocate(125)).join();
}
}
@Test
- public void interleavingTypes2() throws IOException {
+ public void partialTextThenBinary() throws IOException {
try (DummyWebSocketServer server = new DummyWebSocketServer()) {
server.open();
WebSocket ws = newHttpClient()
@@ -280,6 +475,9 @@
ws.sendText("text", false).join();
assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(16), false));
assertFails(ISE, ws.sendBinary(ByteBuffer.allocate(16), true));
+ // Pings & Pongs are fine
+ ws.sendPing(ByteBuffer.allocate(125)).join();
+ ws.sendPong(ByteBuffer.allocate(125)).join();
}
}
@@ -319,7 +517,7 @@
@Test
public void sendMethodsThrowIOE2() throws Exception {
- try (DummyWebSocketServer server = serverWithCannedData(0x88, 0x00)) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(0x88, 0x00)) {
server.open();
CompletableFuture onCloseCalled = new CompletableFuture<>();
CompletableFuture canClose = new CompletableFuture<>();
@@ -396,7 +594,7 @@
};
CompletableFuture> actual = new CompletableFuture<>();
- try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
@@ -498,7 +696,7 @@
};
CompletableFuture> actual = new CompletableFuture<>();
- try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
@@ -588,7 +786,7 @@
CompletableFuture> actual = new CompletableFuture<>();
- try (DummyWebSocketServer server = serverWithCannedData(binary)) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(binary)) {
server.open();
WebSocket.Listener listener = new WebSocket.Listener() {
@@ -668,13 +866,14 @@
}
/*
- * The server sends Pong messages. The WebSocket replies to messages automatically.
- * According to RFC 6455 The WebSocket is free
+ * The server sends Ping messages. The WebSocket replies to these messages
+ * automatically. According to RFC 6455 a WebSocket client is free to reply
+ * only to the most recent Pings.
*/
@Test(dataProvider = "nPings")
public void automaticPongs(int nPings) throws Exception {
// big enough to not bother with resize
- ByteBuffer buffer = ByteBuffer.allocate(16384);
+ ByteBuffer buffer = ByteBuffer.allocate(65536);
Frame.HeaderWriter w = new Frame.HeaderWriter();
for (int i = 0; i < nPings; i++) {
w.fin(true)
@@ -691,7 +890,7 @@
.write(buffer);
buffer.putChar((char) 1000);
buffer.flip();
- try (DummyWebSocketServer server = serverWithCannedData(buffer.array())) {
+ try (DummyWebSocketServer server = Support.serverWithCannedData(buffer.array())) {
MockListener listener = new MockListener();
server.open();
WebSocket ws = newHttpClient()
@@ -768,4 +967,9 @@
public Object[][] nPings() {
return new Object[][]{{1}, {2}, {4}, {8}, {9}, {1023}};
}
+
+ @DataProvider(name = "booleans")
+ public Object[][] booleans() {
+ return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
+ }
}