--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Mon Mar 12 12:47:29 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Tue Mar 13 17:10:20 2018 +0000
@@ -122,7 +122,7 @@
long id;
if (DEBUG) {
id = counter.incrementAndGet();
- System.out.printf("[Transport] %s: sendText message.length()=%s, last=%s%n",
+ System.out.printf("[Transport] enter send text %s message.length()=%s last=%s%n",
id, message.length(), isLast);
}
// TODO (optimization?):
@@ -142,7 +142,7 @@
f.completeExceptionally(e);
}
if (DEBUG) {
- System.out.printf("[Transport] %s: sendText returned %s%n", id, f);
+ System.out.printf("[Transport] exit send text %s returned %s%n", id, f);
}
return f;
}
@@ -155,7 +155,7 @@
long id;
if (DEBUG) {
id = counter.incrementAndGet();
- System.out.printf("[Transport] %s: sendBinary message.remaining()=%s, last=%s%n",
+ System.out.printf("[Transport] enter send binary %s message.remaining()=%s last=%s%n",
id, message.remaining(), isLast);
}
MinimalFuture<T> f = new MinimalFuture<>();
@@ -166,7 +166,7 @@
f.completeExceptionally(e);
}
if (DEBUG) {
- System.out.printf("[Transport] %s: sendBinary returned %s%n", id, f);
+ System.out.printf("[Transport] exit send binary %s returned %s%n", id, f);
}
return f;
}
@@ -178,7 +178,7 @@
long id;
if (DEBUG) {
id = counter.incrementAndGet();
- System.out.printf("[Transport] %s: sendPing message.remaining()=%s%n",
+ System.out.printf("[Transport] enter send ping %s message.remaining()=%s%n",
id, message.remaining());
}
MinimalFuture<T> f = new MinimalFuture<>();
@@ -189,7 +189,7 @@
f.completeExceptionally(e);
}
if (DEBUG) {
- System.out.printf("[Transport] %s: sendPing returned %s%n", id, f);
+ System.out.printf("[Transport] exit send ping %s returned %s%n", id, f);
}
return f;
}
@@ -201,7 +201,7 @@
long id;
if (DEBUG) {
id = counter.incrementAndGet();
- System.out.printf("[Transport] %s: sendPong message.remaining()=%s%n",
+ System.out.printf("[Transport] enter send pong %s message.remaining()=%s%n",
id, message.remaining());
}
MinimalFuture<T> f = new MinimalFuture<>();
@@ -212,7 +212,7 @@
f.completeExceptionally(e);
}
if (DEBUG) {
- System.out.printf("[Transport] %s: sendPong returned %s%n", id, f);
+ System.out.printf("[Transport] exit send pong %s returned %s%n", id, f);
}
return f;
}
@@ -225,7 +225,7 @@
long id;
if (DEBUG) {
id = counter.incrementAndGet();
- System.out.printf("[Transport] %s: sendClose statusCode=%s, reason.length()=%s%n",
+ System.out.printf("[Transport] enter send close %s statusCode=%s, reason.length()=%s%n",
id, statusCode, reason.length());
}
MinimalFuture<T> f = new MinimalFuture<>();
@@ -236,7 +236,7 @@
f.completeExceptionally(e);
}
if (DEBUG) {
- System.out.printf("[Transport] %s: sendClose returned %s%n", id, f);
+ System.out.printf("[Transport] exit send close %s returned %s%n", id, f);
}
return f;
}
@@ -461,7 +461,7 @@
// (a) A message has been added to the queue
// (b) The channel is ready for writing
if (DEBUG) {
- System.out.printf("[Transport] begin send task%n");
+ System.out.printf("[Transport] enter send task%n");
}
while (!queue.isEmpty()) {
try {
@@ -507,13 +507,13 @@
}
}
if (DEBUG) {
- System.out.printf("[Transport] end send task%n");
+ System.out.printf("[Transport] exit send task%n");
}
}
private boolean tryCompleteWrite() throws IOException {
if (DEBUG) {
- System.out.printf("[Transport] begin writing%n");
+ System.out.printf("[Transport] enter writing%n");
}
boolean finished = false;
loop:
@@ -554,7 +554,7 @@
}
}
if (DEBUG) {
- System.out.printf("[Transport] end writing%n");
+ System.out.printf("[Transport] exit writing%n");
}
return finished;
}
@@ -592,7 +592,7 @@
@Override
public void run() {
if (DEBUG) {
- System.out.printf("[Transport] begin receive task%n");
+ System.out.printf("[Transport] enter receive task%n");
}
loop:
while (!receiveScheduler.isStopped()) {
@@ -655,7 +655,7 @@
}
}
if (DEBUG) {
- System.out.printf("[Transport] end receive task%n");
+ System.out.printf("[Transport] exit receive task%n");
}
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Mon Mar 12 12:47:29 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Tue Mar 13 17:10:20 2018 +0000
@@ -33,6 +33,7 @@
import jdk.internal.net.http.websocket.OpeningHandshake.Result;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.ref.Reference;
import java.net.ProtocolException;
import java.net.URI;
@@ -41,6 +42,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -49,6 +51,7 @@
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
+import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
@@ -69,7 +72,8 @@
public final class WebSocketImpl implements WebSocket {
private final static boolean DEBUG = true;
- private final AtomicLong counter = new AtomicLong();
+ private final AtomicLong sendCounter = new AtomicLong();
+ private final AtomicLong receiveCounter = new AtomicLong();
enum State {
OPEN,
@@ -84,6 +88,7 @@
}
private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
+ private final long closeTimeout;
private volatile boolean inputClosed;
private volatile boolean outputClosed;
@@ -150,6 +155,28 @@
this.listener = requireNonNull(listener);
this.transport = transportFactory.createTransport(
new SignallingMessageConsumer());
+ closeTimeout = readCloseTimeout();
+ }
+
+ private static int readCloseTimeout() {
+ String property = "jdk.httpclient.websocket.closeTimeout";
+ int defaultValue = 30;
+ String value = Utils.getNetProperty(property);
+ int v;
+ if (value == null) {
+ v = defaultValue;
+ } else {
+ try {
+ v = Integer.parseUnsignedInt(value);
+ } catch (NumberFormatException ignored) {
+ v = defaultValue;
+ }
+ }
+ if (DEBUG) {
+ System.out.printf("[WebSocket] %s=%s, using value %s%n",
+ property, value, v);
+ }
+ return v;
}
// FIXME: add to action handling of errors -> signalError()
@@ -160,7 +187,7 @@
Objects.requireNonNull(message);
long id;
if (DEBUG) {
- id = counter.incrementAndGet();
+ id = sendCounter.incrementAndGet();
System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n",
id, message.length(), isLast);
}
@@ -184,7 +211,7 @@
Objects.requireNonNull(message);
long id;
if (DEBUG) {
- id = counter.incrementAndGet();
+ id = sendCounter.incrementAndGet();
System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n",
id, message, isLast);
}
@@ -217,7 +244,7 @@
Objects.requireNonNull(message);
long id;
if (DEBUG) {
- id = counter.incrementAndGet();
+ id = sendCounter.incrementAndGet();
System.out.printf("[WebSocket] %s send ping: payload=%s%n",
id, message);
}
@@ -235,7 +262,7 @@
Objects.requireNonNull(message);
long id;
if (DEBUG) {
- id = counter.incrementAndGet();
+ id = sendCounter.incrementAndGet();
System.out.printf("[WebSocket] %s send pong: payload=%s%n",
id, message);
}
@@ -254,7 +281,7 @@
Objects.requireNonNull(reason);
long id;
if (DEBUG) {
- id = counter.incrementAndGet();
+ id = sendCounter.incrementAndGet();
System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n",
id, statusCode, reason);
}
@@ -272,38 +299,50 @@
return replaceNull(result);
}
- /*
- * Sends a Close message, then shuts down the output since no more
- * messages are expected to be sent at this point.
- */
private CompletableFuture<WebSocket> sendClose0(int statusCode,
String reason) {
outputClosed = true;
- BiConsumer<WebSocket, Throwable> closer = (r, e) -> {
- Throwable cause = Utils.getCompletionCause(e);
- if (cause instanceof IllegalArgumentException) {
- // or pre=check it (isLegalToSendFromClient(statusCode))
- return;
- }
- try {
- transport.closeOutput();
- } catch (IOException ex) {
- Log.logError(ex);
+ CompletableFuture<WebSocket> cf
+ = transport.sendClose(statusCode, reason, this, (r, e) -> { });
+ CompletableFuture<WebSocket> closeOrTimeout
+ = replaceNull(cf).orTimeout(closeTimeout, TimeUnit.SECONDS);
+ // The snippet below, whose purpose might not be immediately obvious,
+ // is a trick used to complete a dependant stage with an IOException.
+ // A checked IOException cannot be thrown from inside the BiConsumer
+ // supplied to the handle method. Instead a CompletionStage completed
+ // exceptionally with this IOException is returned.
+ return closeOrTimeout.handle(this::processCloseOutcome)
+ .thenCompose(Function.identity());
+ }
+
+ private CompletionStage<WebSocket> processCloseOutcome(WebSocket webSocket,
+ Throwable e) {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] send close completed, error=%s%n", e);
+ if (e != null) {
+ e.printStackTrace(System.out);
}
- if (cause instanceof TimeoutException) { // FIXME: it is not the case anymore
- if (DEBUG) {
- System.out.println("[WebSocket] sendClose0 error: " + e);
- }
- try {
- transport.closeInput();
- } catch (IOException ex) {
- Log.logError(ex);
- }
- }
- };
- CompletableFuture<WebSocket> cf
- = transport.sendClose(statusCode, reason, this, closer);
- return cf;
+ }
+ if (e == null) {
+ return completedFuture(webSocket);
+ }
+ Throwable cause = Utils.getCompletionCause(e);
+ if (cause instanceof IllegalArgumentException) {
+ return failedFuture(cause);
+ }
+ try {
+ transport.closeOutput();
+ } catch (IOException ignored) { }
+
+ if (cause instanceof TimeoutException) {
+ inputClosed = true;
+ try {
+ transport.closeInput();
+ } catch (IOException ignored) { }
+ return failedFuture(new InterruptedIOException(
+ "Could not send close within a reasonable timeout"));
+ }
+ return failedFuture(cause);
}
@Override
@@ -372,8 +411,15 @@
@Override
public void run() {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] enter receive task%n");
+ }
+ loop:
while (true) {
State s = state.get();
+ if (DEBUG) {
+ System.out.printf("[WebSocket] receive state: %s%n", s);
+ }
try {
switch (s) {
case OPEN:
@@ -398,20 +444,20 @@
break;
case CLOSE:
processClose();
- return;
+ break loop;
case ERROR:
processError();
- return;
+ break loop;
case IDLE:
if (demand.tryDecrement()
&& tryChangeState(IDLE, WAITING)) {
transport.request(1);
}
- return;
+ break loop;
case WAITING:
// For debugging spurious signalling: when there was a
// signal, but apparently nothing has changed
- return;
+ break loop;
default:
throw new InternalError(String.valueOf(s));
}
@@ -419,6 +465,9 @@
signalError(t);
}
}
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit receive task%n");
+ }
}
private void processError() throws IOException {
@@ -431,7 +480,11 @@
if (err instanceof FailWebSocketException) {
int code1 = ((FailWebSocketException) err).getStatusCode();
err = new ProtocolException().initCause(err);
- sendClose0(code1, "")
+ if (DEBUG) {
+ System.out.printf("[WebSocket] failing %s with error=%s statusCode=%s%n",
+ WebSocketImpl.this, err, code1);
+ }
+ sendClose0(code1, "") // TODO handle errors from here
.whenComplete(
(r, e) -> {
if (e != null) {
@@ -439,7 +492,19 @@
}
});
}
- listener.onError(WebSocketImpl.this, err);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onError %s error=%s%n",
+ id, err);
+ }
+ try {
+ listener.onError(WebSocketImpl.this, err);
+ } finally {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit onError %s%n", id);
+ }
+ }
}
private void processClose() throws IOException {
@@ -448,10 +513,21 @@
}
transport.closeInput();
receiveScheduler.stop();
- CompletionStage<?> readyToClose;
- readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
- if (readyToClose == null) {
- readyToClose = DONE;
+ CompletionStage<?> cs = null; // when the listener is ready to close
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onClose %s statusCode=%s reason.length=%s%n",
+ id, statusCode, reason.length());
+ }
+ try {
+ cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
+ } finally {
+ System.out.printf("[WebSocket] exit onClose %s returned %s%n",
+ id, cs);
+ }
+ if (cs == null) {
+ cs = DONE;
}
int code;
if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
@@ -463,8 +539,8 @@
} else {
code = statusCode;
}
- readyToClose.whenComplete((r, e) -> {
- sendClose0(code, "") // FIXME errors from here?
+ cs.whenComplete((r, e) -> { // TODO log
+ sendClose0(code, "") // TODO handle errors from here
.whenComplete((r1, e1) -> {
if (DEBUG) {
if (e1 != null) {
@@ -476,37 +552,105 @@
}
private void processPong() {
- listener.onPong(WebSocketImpl.this, binaryData);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onPong %s payload=%s%n",
+ id, binaryData);
+ }
+ CompletionStage<?> cs = null;
+ try {
+ cs = listener.onPong(WebSocketImpl.this, binaryData);
+ } finally {
+ System.out.printf("[WebSocket] exit onPong %s returned %s%n",
+ id, cs);
+ }
}
private void processPing() {
- // Let's make a full copy of this tiny data. What we want here
- // is to rule out a possibility the shared data we send might be
- // corrupted by processing in the listener.
+ if (DEBUG) {
+ System.out.printf("[WebSocket] processPing%n");
+ }
ByteBuffer slice = binaryData.slice();
+ // A full copy of this (small) data is made. This way sending a
+ // replying Pong could be done in parallel with the listener
+ // handling this Ping.
ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
.put(binaryData)
.flip();
// Non-exclusive send;
BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
- if (e != null) { // Better error handing. What if already closed?
+ if (e != null) { // TODO: better error handing. What if already closed?
signalError(Utils.getCompletionCause(e));
}
};
transport.sendPong(copy, WebSocketImpl.this, reporter);
- listener.onPing(WebSocketImpl.this, slice);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onPing %s payload=%s%n",
+ id, slice);
+ }
+ CompletionStage<?> cs = null;
+ try {
+ cs = listener.onPing(WebSocketImpl.this, slice);
+ } finally {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit onPing %s returned %s%n",
+ id, cs);
+ }
+ }
}
private void processBinary() {
- listener.onBinary(WebSocketImpl.this, binaryData, part);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onBinary %s payload=%s, part=%s%n",
+ id, binaryData, part);
+ }
+ CompletionStage<?> cs = null;
+ try {
+ cs = listener.onBinary(WebSocketImpl.this, binaryData, part);
+ } finally {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit onBinary %s returned %s%n",
+ id, cs);
+ }
+ }
}
private void processText() {
- listener.onText(WebSocketImpl.this, text, part);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onText %s payload.length=%s part=%s%n",
+ id, text.length(), part);
+ }
+ CompletionStage<?> cs = null;
+ try {
+ cs = listener.onText(WebSocketImpl.this, text, part);
+ } finally {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit onText %s returned %s%n",
+ id, cs);
+ }
+ }
}
private void processOpen() {
- listener.onOpen(WebSocketImpl.this);
+ long id;
+ if (DEBUG) {
+ id = receiveCounter.incrementAndGet();
+ System.out.printf("[WebSocket] enter onOpen %s%n", id);
+ }
+ try {
+ listener.onOpen(WebSocketImpl.this);
+ } finally {
+ if (DEBUG) {
+ System.out.printf("[WebSocket] exit onOpen %s%n", id);
+ }
+ }
}
}
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Mon Mar 12 12:47:29 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Tue Mar 13 17:10:20 2018 +0000
@@ -248,21 +248,29 @@
.newWebSocketBuilder()
.buildAsync(server.getURI(), new WebSocket.Listener() { })
.join();
- ByteBuffer data = ByteBuffer.allocate(65536);
- for (int i = 0; ; i++) { // fill up the send buffer
- System.out.println("cycle #" + i);
- try {
- ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
- data.clear();
- } catch (TimeoutException e) {
- break;
+ try {
+ ByteBuffer data = ByteBuffer.allocate(65536);
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ try {
+ ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
+ data.clear();
+ } catch (TimeoutException e) {
+ break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ }
}
+ CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
+ // The output closes even if the Close message has not been sent
+ assertFalse(cf.isDone());
+ assertTrue(ws.isOutputClosed());
+ assertEquals(ws.getSubprotocol(), "");
+ } finally {
+ ws.abort();
}
- CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
- // The output closes even if the Close message has not been sent
- assertFalse(cf.isDone());
- assertTrue(ws.isOutputClosed());
- assertEquals(ws.getSubprotocol(), "");
}
}
@@ -295,13 +303,17 @@
ByteBuffer data = ByteBuffer.allocate(65536);
CompletableFuture<WebSocket> cf = null;
for (int i = 0; ; i++) { // fill up the send buffer
- System.out.println("cycle #" + i);
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
try {
cf = ws.sendBinary(data, true);
cf.get(10, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
}
}
ws.abort();
@@ -322,12 +334,16 @@
String data = stringWith2NBytes(32768);
CompletableFuture<WebSocket> cf = null;
for (int i = 0; ; i++) { // fill up the send buffer
- System.out.println("cycle #" + i);
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
try {
cf = ws.sendText(data, true);
cf.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
}
}
ws.abort();
@@ -337,6 +353,43 @@
}
}
+ @Test
+ public void sendCloseTimeout() throws Exception {
+ try (DummyWebSocketServer server = notReadingServer()) {
+ server.open();
+ WebSocket ws = newHttpClient()
+ .newWebSocketBuilder()
+ .buildAsync(server.getURI(), new WebSocket.Listener() { })
+ .join();
+ String data = stringWith2NBytes(32768);
+ CompletableFuture<WebSocket> cf = null;
+ for (int i = 0; ; i++) { // fill up the send buffer
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ try {
+ cf = ws.sendText(data, true);
+ cf.get(10, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
+ }
+ }
+ long before = System.currentTimeMillis();
+ assertCompletesExceptionally(IOException.class,
+ ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
+ long after = System.currentTimeMillis();
+ // default timeout should be 30 seconds
+ long elapsed = after - before;
+ System.out.printf("Elapsed %s ms%n", elapsed);
+ assertTrue(elapsed >= 29_000, String.valueOf(elapsed));
+ assertTrue(ws.isOutputClosed());
+ assertTrue(ws.isInputClosed());
+ assertCompletesExceptionally(IOException.class, cf);
+ }
+ }
+
private static String stringWith2NBytes(int n) {
// -- Russian Alphabet (33 characters, 2 bytes per char) --
char[] abc = {
@@ -470,12 +523,16 @@
CharBuffer data = CharBuffer.allocate(65536);
for (int i = 0; ; i++) { // fill up the send buffer
- System.out.println("cycle #" + i);
+ System.out.printf("begin cycle #%s at %s%n",
+ i, System.currentTimeMillis());
try {
ws.sendText(data, true).get(10, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
+ } finally {
+ System.out.printf("end cycle #%s at %s%n",
+ i, System.currentTimeMillis());
}
}
assertCompletesExceptionally(ISE, ws.sendText("", true));