--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/Abort.java Mon Mar 19 17:04:28 2018 +0000
@@ -0,0 +1,416 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @build DummyWebSocketServer
+ * @run testng/othervm
+ * -Djdk.internal.httpclient.websocket.debug=true
+ * Abort
+ */
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.net.http.WebSocket;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.net.http.HttpClient.newHttpClient;
+import static java.net.http.WebSocket.NORMAL_CLOSURE;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class Abort {
+
+ private static final Class<NullPointerException> NPE = NullPointerException.class;
+ private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
+ private static final Class<IOException> IOE = IOException.class;
+
+ private DummyWebSocketServer server;
+ private WebSocket webSocket;
+
+ @AfterTest
+ public void cleanup() {
+ server.close();
+ webSocket.abort();
+ }
+
+ @Test
+ public void onOpenThenAbort() throws Exception {
+ int[] bytes = new int[]{
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ // messages are available
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ webSocket.abort();
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen as WebSocket was aborted
+ assertEquals(inv, List.of(MockListener.Invocation.onOpen(webSocket)));
+ }
+
+ @Test
+ public void onOpenThenOnTextThenAbort() throws Exception {
+ int[] bytes = new int[]{
+ 0x81, 0x00, // opcode=text, fin=true
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected CompletionStage<?> onText0(WebSocket webSocket,
+ CharSequence message,
+ boolean last) {
+ webSocket.abort();
+ return super.onText0(webSocket, message, last);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onBinary as WebSocket was aborted
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onText(webSocket, "", true));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void onOpenThenOnBinaryThenAbort() throws Exception {
+ int[] bytes = new int[]{
+ 0x82, 0x00, // opcode=binary, fin=true
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected CompletionStage<?> onBinary0(WebSocket webSocket,
+ ByteBuffer message,
+ boolean last) {
+ webSocket.abort();
+ return super.onBinary0(webSocket, message, last);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onBinary as WebSocket was aborted
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onBinary(webSocket, ByteBuffer.allocate(0), true));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void onOpenThenOnPingThenAbort() throws Exception {
+ int[] bytes = {
+ 0x89, 0x00, // opcode=ping
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected CompletionStage<?> onPing0(WebSocket webSocket,
+ ByteBuffer message) {
+ webSocket.abort();
+ return super.onPing0(webSocket, message);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onPing as WebSocket was aborted
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onPing(webSocket, ByteBuffer.allocate(0)));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void onOpenThenOnPongThenAbort() throws Exception {
+ int[] bytes = {
+ 0x8a, 0x00, // opcode=pong
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected CompletionStage<?> onPong0(WebSocket webSocket,
+ ByteBuffer message) {
+ webSocket.abort();
+ return super.onPong0(webSocket, message);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onPong as WebSocket was aborted
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onPong(webSocket, ByteBuffer.allocate(0)));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void onOpenThenOnCloseThenAbort() throws Exception {
+ int[] bytes = {
+ 0x88, 0x00, // opcode=close
+ 0x8a, 0x00, // opcode=pong
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected CompletionStage<?> onClose0(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ webSocket.abort();
+ return super.onClose0(webSocket, statusCode, reason);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onClose
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onClose(webSocket, 1005, ""));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void onOpenThenOnErrorThenAbort() throws Exception {
+ // A header of 128 bytes long Ping (which is a protocol error)
+ int[] badPingHeader = new int[]{0x89, 0x7e, 0x00, 0x80};
+ int[] closeMessage = new int[]{0x88, 0x00};
+ int[] bytes = new int[badPingHeader.length + 128 + closeMessage.length];
+ System.arraycopy(badPingHeader, 0, bytes, 0, badPingHeader.length);
+ System.arraycopy(closeMessage, 0, bytes, badPingHeader.length + 128, closeMessage.length);
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+ MockListener listener = new MockListener() {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ // unbounded request
+ webSocket.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ protected void onError0(WebSocket webSocket, Throwable error) {
+ webSocket.abort();
+ super.onError0(webSocket, error);
+ }
+ };
+ webSocket = newHttpClient().newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ TimeUnit.SECONDS.sleep(5);
+ List<MockListener.Invocation> inv = listener.invocationsSoFar();
+ // no more invocations after onOpen, onError
+ List<MockListener.Invocation> expected = List.of(
+ MockListener.Invocation.onOpen(webSocket),
+ MockListener.Invocation.onError(webSocket, ProtocolException.class));
+ System.out.println("actual invocations:" + Arrays.toString(inv.toArray()));
+ assertEquals(inv, expected);
+ }
+
+ @Test
+ public void immediateAbort() throws Exception {
+ CompletableFuture<Void> messageReceived = new CompletableFuture<>();
+ WebSocket.Listener listener = new WebSocket.Listener() {
+
+ @Override
+ public void onOpen(WebSocket webSocket) {
+ /* no initial request */
+ }
+
+ @Override
+ public CompletionStage<?> onText(WebSocket webSocket,
+ CharSequence message,
+ boolean last) {
+ messageReceived.complete(null);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onBinary(WebSocket webSocket,
+ ByteBuffer message,
+ boolean last) {
+ messageReceived.complete(null);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPing(WebSocket webSocket,
+ ByteBuffer message) {
+ messageReceived.complete(null);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onPong(WebSocket webSocket,
+ ByteBuffer message) {
+ messageReceived.complete(null);
+ return null;
+ }
+
+ @Override
+ public CompletionStage<?> onClose(WebSocket webSocket,
+ int statusCode,
+ String reason) {
+ messageReceived.complete(null);
+ return null;
+ }
+ };
+
+ int[] bytes = new int[]{
+ 0x82, 0x00, // opcode=binary, fin=true
+ 0x88, 0x00, // opcode=close
+ };
+ server = Support.serverWithCannedData(bytes);
+ server.open();
+
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), listener)
+ .join();
+ for (int i = 0; i < 3; i++) {
+ System.out.printf("iteration #%s%n", i);
+ // after the first abort() each consecutive one must be a no-op,
+ // moreover, query methods should continue to return consistent
+ // values
+ for (int j = 0; j < 3; j++) {
+ System.out.printf("abort #%s%n", j);
+ ws.abort();
+ assertTrue(ws.isInputClosed());
+ assertTrue(ws.isOutputClosed());
+ assertEquals(ws.getSubprotocol(), "");
+ }
+ // at this point valid requests MUST be a no-op:
+ for (int j = 0; j < 3; j++) {
+ System.out.printf("request #%s%n", j);
+ ws.request(1);
+ ws.request(2);
+ ws.request(8);
+ ws.request(Integer.MAX_VALUE);
+ ws.request(Long.MAX_VALUE);
+ // invalid requests MUST throw IAE:
+ assertThrows(IAE, () -> ws.request(Integer.MIN_VALUE));
+ assertThrows(IAE, () -> ws.request(Long.MIN_VALUE));
+ assertThrows(IAE, () -> ws.request(-1));
+ assertThrows(IAE, () -> ws.request(0));
+ }
+ }
+ // even though there is a bunch of messages readily available on the
+ // wire we shouldn't have received any of them as we aborted before
+ // the first request
+ try {
+ messageReceived.get(5, TimeUnit.SECONDS);
+ fail();
+ } catch (TimeoutException expected) {
+ System.out.println("Finished waiting");
+ }
+ for (int i = 0; i < 3; i++) {
+ System.out.printf("send #%s%n", i);
+ Support.assertFails(IOE, ws.sendText("text!", false));
+ Support.assertFails(IOE, ws.sendText("text!", true));
+ Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
+ Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
+ Support.assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
+ Support.assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
+ Support.assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
+ assertThrows(NPE, () -> ws.sendText(null, false));
+ assertThrows(NPE, () -> ws.sendText(null, true));
+ assertThrows(NPE, () -> ws.sendBinary(null, false));
+ assertThrows(NPE, () -> ws.sendBinary(null, true));
+ assertThrows(NPE, () -> ws.sendPing(null));
+ assertThrows(NPE, () -> ws.sendPong(null));
+ assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
+ }
+ }
+}
--- a/test/jdk/java/net/httpclient/websocket/ImmediateAbort.java Mon Mar 19 14:20:18 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,168 +0,0 @@
-/*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @build DummyWebSocketServer
- * @run testng/othervm
- * -Djdk.internal.httpclient.websocket.debug=true
- * ImmediateAbort
- */
-
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.net.http.WebSocket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.net.http.HttpClient.newHttpClient;
-import static java.net.http.WebSocket.NORMAL_CLOSURE;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-public class ImmediateAbort {
-
- private static final Class<NullPointerException> NPE = NullPointerException.class;
- private static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
- private static final Class<IOException> IOE = IOException.class;
-
- /*
- * Examines WebSocket behaviour after a call to abort()
- */
- @Test
- public void immediateAbort() throws Exception {
- try (DummyWebSocketServer server = Support.serverWithCannedData(0x81, 0x00, 0x88, 0x00)) {
- server.open();
- CompletableFuture<Void> messageReceived = new CompletableFuture<>();
- WebSocket.Listener listener = new WebSocket.Listener() {
-
- @Override
- public void onOpen(WebSocket webSocket) {
- /* no initial request */
- }
-
- @Override
- public CompletionStage<?> onText(WebSocket webSocket,
- CharSequence message,
- boolean last) {
- messageReceived.complete(null);
- return null;
- }
-
- @Override
- public CompletionStage<?> onBinary(WebSocket webSocket,
- ByteBuffer message,
- boolean last) {
- messageReceived.complete(null);
- return null;
- }
-
- @Override
- public CompletionStage<?> onPing(WebSocket webSocket,
- ByteBuffer message) {
- messageReceived.complete(null);
- return null;
- }
-
- @Override
- public CompletionStage<?> onPong(WebSocket webSocket,
- ByteBuffer message) {
- messageReceived.complete(null);
- return null;
- }
-
- @Override
- public CompletionStage<?> onClose(WebSocket webSocket,
- int statusCode,
- String reason) {
- messageReceived.complete(null);
- return null;
- }
- };
-
- WebSocket ws = newHttpClient()
- .newWebSocketBuilder()
- .buildAsync(server.getURI(), listener)
- .join();
- for (int i = 0; i < 3; i++) {
- System.out.printf("iteration #%s%n", i);
- // after the first abort() each consecutive one must be a no-op,
- // moreover, query methods should continue to return consistent
- // values
- for (int j = 0; j < 3; j++) {
- System.out.printf("abort #%s%n", j);
- ws.abort();
- assertTrue(ws.isInputClosed());
- assertTrue(ws.isOutputClosed());
- assertEquals(ws.getSubprotocol(), "");
- }
- // at this point valid requests MUST be a no-op:
- for (int j = 0; j < 3; j++) {
- System.out.printf("request #%s%n", j);
- ws.request(1);
- ws.request(2);
- ws.request(8);
- ws.request(Integer.MAX_VALUE);
- ws.request(Long.MAX_VALUE);
- // invalid requests MUST throw IAE:
- assertThrows(IAE, () -> ws.request(Integer.MIN_VALUE));
- assertThrows(IAE, () -> ws.request(Long.MIN_VALUE));
- assertThrows(IAE, () -> ws.request(-1));
- assertThrows(IAE, () -> ws.request(0));
- }
- }
- // even though there is a bunch of messages readily available on the
- // wire we shouldn't have received any of them as we aborted before
- // the first request
- try {
- messageReceived.get(10, TimeUnit.SECONDS);
- fail();
- } catch (TimeoutException expected) {
- System.out.println("Finished waiting");
- }
- for (int i = 0; i < 3; i++) {
- System.out.printf("send #%s%n", i);
- Support.assertFails(IOE, ws.sendText("text!", false));
- Support.assertFails(IOE, ws.sendText("text!", true));
- Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), false));
- Support.assertFails(IOE, ws.sendBinary(ByteBuffer.allocate(16), true));
- Support.assertFails(IOE, ws.sendPing(ByteBuffer.allocate(16)));
- Support.assertFails(IOE, ws.sendPong(ByteBuffer.allocate(16)));
- Support.assertFails(IOE, ws.sendClose(NORMAL_CLOSURE, "a reason"));
- assertThrows(NPE, () -> ws.sendText(null, false));
- assertThrows(NPE, () -> ws.sendText(null, true));
- assertThrows(NPE, () -> ws.sendBinary(null, false));
- assertThrows(NPE, () -> ws.sendBinary(null, true));
- assertThrows(NPE, () -> ws.sendPing(null));
- assertThrows(NPE, () -> ws.sendPong(null));
- assertThrows(NPE, () -> ws.sendClose(NORMAL_CLOSURE, null));
- }
- }
- }
-}
--- a/test/jdk/java/net/httpclient/websocket/MockListener.java Mon Mar 19 14:20:18 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/MockListener.java Mon Mar 19 17:04:28 2018 +0000
@@ -28,16 +28,13 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
public class MockListener implements WebSocket.Listener {
private final long bufferSize;
private long count;
- private final List<Invocation> invocations = new ArrayList<>(); // better sync
+ private final List<Invocation> invocations = new ArrayList<>();
private final CompletableFuture<?> lastCall = new CompletableFuture<>();
private final Predicate<? super Invocation> collectUntil;
@@ -66,7 +63,9 @@
public void onOpen(WebSocket webSocket) {
System.out.printf("onOpen(%s)%n", webSocket);
OnOpen inv = new OnOpen(webSocket);
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
@@ -83,7 +82,9 @@
boolean last) {
System.out.printf("onText(%s, message.length=%s, %s)%n", webSocket, message.length(), last);
OnText inv = new OnText(webSocket, message.toString(), last);
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
@@ -103,7 +104,9 @@
boolean last) {
System.out.printf("onBinary(%s, %s, %s)%n", webSocket, message, last);
OnBinary inv = new OnBinary(webSocket, fullCopy(message), last);
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
@@ -121,7 +124,9 @@
public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
System.out.printf("onPing(%s, %s)%n", webSocket, message);
OnPing inv = new OnPing(webSocket, fullCopy(message));
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
@@ -137,7 +142,9 @@
public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
System.out.printf("onPong(%s, %s)%n", webSocket, message);
OnPong inv = new OnPong(webSocket, fullCopy(message));
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
@@ -155,10 +162,18 @@
String reason) {
System.out.printf("onClose(%s, %s, %s)%n", webSocket, statusCode, reason);
OnClose inv = new OnClose(webSocket, statusCode, reason);
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
+ return onClose0(webSocket, statusCode, reason);
+ }
+
+ protected CompletionStage<?> onClose0(WebSocket webSocket,
+ int statusCode,
+ String reason) {
return null;
}
@@ -166,22 +181,28 @@
public void onError(WebSocket webSocket, Throwable error) {
System.out.printf("onError(%s, %s)%n", webSocket, error);
OnError inv = new OnError(webSocket, error == null ? null : error.getClass());
- invocations.add(inv);
+ synchronized (invocations) {
+ invocations.add(inv);
+ }
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
+ onError0(webSocket, error);
}
- public List<Invocation> invocations(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException
- {
- lastCall.get(timeout, unit);
- return new ArrayList<>(invocations);
+ protected void onError0(WebSocket webSocket, Throwable error) { }
+
+ public List<Invocation> invocationsSoFar() {
+ synchronized (invocations) {
+ return new ArrayList<>(invocations);
+ }
}
public List<Invocation> invocations() {
lastCall.join();
- return new ArrayList<>(invocations);
+ synchronized (invocations) {
+ return new ArrayList<>(invocations);
+ }
}
protected void replenish(WebSocket webSocket) {