--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Fri Nov 24 17:51:51 2017 +0300
@@ -197,11 +197,11 @@
static final class Result {
final String subprotocol;
- final RawChannel channel;
+ final TransportSupplier transport;
- private Result(String subprotocol, RawChannel channel) {
+ private Result(String subprotocol, TransportSupplier transport) {
this.subprotocol = subprotocol;
- this.channel = channel;
+ this.transport = transport;
}
}
@@ -260,7 +260,7 @@
}
String subprotocol = checkAndReturnSubprotocol(headers);
RawChannel channel = ((RawChannel.Provider) response).rawChannel();
- return new Result(subprotocol, channel);
+ return new Result(subprotocol, new TransportSupplier(channel));
}
private String checkAndReturnSubprotocol(HttpHeaders responseHeaders)
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Fri Nov 24 17:51:51 2017 +0300
@@ -50,7 +50,7 @@
*
* even if `request(long n)` is called from inside these invocations.
*/
-final class Receiver {
+public class Receiver {
private final MessageStreamConsumer messageConsumer;
private final RawChannel channel;
@@ -67,7 +67,7 @@
private static final int AVAILABLE = 1;
private static final int WAITING = 2;
- Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
+ public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
this.messageConsumer = messageConsumer;
this.channel = channel;
this.frameConsumer = new FrameConsumer(this.messageConsumer);
@@ -94,7 +94,7 @@
};
}
- void request(long n) {
+ public void request(long n) {
if (n <= 0L) {
throw new IllegalArgumentException("Non-positive request: " + n);
}
@@ -113,8 +113,9 @@
* Stops the machinery from reading and delivering messages permanently,
* regardless of the current demand and data availability.
*/
- void close() {
+ public void close() throws IOException {
pushScheduler.stop();
+ channel.shutdownInput();
}
private class PushContinuouslyTask
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Fri Nov 24 17:51:51 2017 +0300
@@ -70,6 +70,10 @@
send0(message, completionHandler);
}
+ public void close() throws IOException {
+ channel.shutdownOutput();
+ }
+
private RawChannel.RawEvent createHandler() {
return new RawChannel.RawEvent() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java Fri Nov 24 17:51:51 2017 +0300
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.io.IOException;
+
+/*
+ * Abstracts out I/O channel for the WebSocket implementation. The latter then
+ * deals with input and output streams of messages and does not have to
+ * understand the state machine of channels (e.g. how exactly they are closed).
+ * Mocking this type will allow testing WebSocket message exchange in isolation.
+ */
+public class TransportSupplier {
+
+ private final RawChannel channel;
+ private final Object lock = new Object();
+ private Transmitter transmitter;
+ private Receiver receiver;
+ private boolean receiverShutdown;
+ private boolean transmitterShutdown;
+ private boolean closed;
+
+ public TransportSupplier(RawChannel channel) {
+ this.channel = channel;
+ }
+
+ public Receiver receiver(MessageStreamConsumer consumer) {
+ synchronized (lock) {
+ if (receiver == null) {
+ receiver = newReceiver(consumer);
+ }
+ return receiver;
+ }
+ }
+
+ public Transmitter transmitter() {
+ synchronized (lock) {
+ if (transmitter == null) {
+ transmitter = newTransmitter();
+ }
+ return transmitter;
+ }
+ }
+
+ protected Receiver newReceiver(MessageStreamConsumer consumer) {
+ return new Receiver(consumer, channel) {
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (!closed) {
+ try {
+ super.close();
+ } finally {
+ receiverShutdown = true;
+ if (transmitterShutdown) {
+ closed = true;
+ channel.close();
+ }
+ }
+ }
+ }
+ }
+ };
+ }
+
+ protected Transmitter newTransmitter() {
+ return new Transmitter(channel) {
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ if (!closed) {
+ try {
+ super.close();
+ } finally {
+ transmitterShutdown = true;
+ if (receiverShutdown) {
+ closed = true;
+ channel.close();
+ }
+ }
+ }
+ }
+ }
+ };
+ }
+}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Nov 24 17:51:51 2017 +0300
@@ -25,6 +25,21 @@
package jdk.incubator.http.internal.websocket;
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+import jdk.incubator.http.internal.common.Utils;
+import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
+
import java.io.IOException;
import java.lang.ref.Reference;
import java.net.ProtocolException;
@@ -40,21 +55,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
-import jdk.incubator.http.WebSocket;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
-import jdk.incubator.http.internal.common.Utils;
-import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
-
import static java.util.Objects.requireNonNull;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
import static jdk.incubator.http.internal.common.Pair.pair;
@@ -69,7 +69,6 @@
private final URI uri;
private final String subprotocol;
- private final RawChannel channel; /* Stored to call close() on */
private final Listener listener;
private volatile boolean inputClosed;
@@ -90,17 +89,6 @@
private final Receiver receiver;
/*
- * Whether or not the WebSocket has been closed. When a WebSocket has been
- * closed it means that no further messages can be sent or received.
- * A closure can be triggered by:
- *
- * 1. abort()
- * 2. "Failing the WebSocket Connection" (i.e. a fatal error)
- * 3. Completion of the Closing handshake
- */
- private final AtomicBoolean closed = new AtomicBoolean();
-
- /*
* This lock is enforcing sequential ordering of invocations to listener's
* methods. It is supposed to be uncontended. The only contention that can
* happen is when onOpen, an asynchronous onError (not related to reading
@@ -110,15 +98,12 @@
*/
private final Object lock = new Object();
- private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
- private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
-
public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
WebSocketImpl ws = new WebSocketImpl(b.getUri(),
r.subprotocol,
- r.channel,
- b.getListener());
+ b.getListener(),
+ r.transport);
// The order of calls might cause a subtle effects, like CF will be
// returned from the buildAsync _after_ onOpen has been signalled.
// This means if onOpen is lengthy, it might cause some problems.
@@ -141,42 +126,15 @@
WebSocketImpl(URI uri,
String subprotocol,
- RawChannel channel,
- Listener listener)
- {
- this(uri,
- subprotocol,
- channel,
- listener,
- new Transmitter(channel));
- }
-
- /* Exposed for testing purposes */
- WebSocketImpl(URI uri,
- String subprotocol,
- RawChannel channel,
Listener listener,
- Transmitter transmitter)
+ TransportSupplier transport)
{
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
- this.channel = requireNonNull(channel);
this.listener = requireNonNull(listener);
- this.transmitter = transmitter;
- this.receiver = new Receiver(messageConsumerOf(listener), channel);
+ this.transmitter = transport.transmitter();
+ this.receiver = transport.receiver(messageConsumerOf(listener));
this.sendScheduler = new SequentialScheduler(new SendTask());
-
- // Set up automatic channel closing action
- CompletableFuture.allOf(channelInputClosed, channelOutputClosed)
- .whenComplete((result, error) -> {
- try {
- channel.close();
- } catch (IOException e) {
- Log.logError(e);
- } finally {
- closed.set(true);
- }
- });
}
/*
@@ -201,7 +159,15 @@
Log.logError(error);
} else {
lastMethodInvoked = true;
- receiver.close();
+ try {
+ try {
+ receiver.close();
+ } finally {
+ transmitter.close();
+ }
+ } catch (IOException e) {
+ Log.logError(e);
+ }
try {
listener.onError(this, error);
} catch (Exception e) {
@@ -212,23 +178,16 @@
}
/*
- * Processes a Close event that came from the channel. Invoked at most once.
+ * Processes a Close event that came from the receiver. Invoked at most
+ * once. No further messages are pulled from the receiver.
*/
private void processClose(int statusCode, String reason) {
inputClosed = true;
- receiver.close();
try {
- channel.shutdownInput();
+ receiver.close();
} catch (IOException e) {
Log.logError(e);
}
- boolean alreadyCompleted = !channelInputClosed.complete(null);
- if (alreadyCompleted) {
- // This CF is supposed to be completed only once, the first time a
- // Close message is received. No further messages are pulled from
- // the socket.
- throw new InternalError();
- }
int code;
if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
code = NORMAL_CLOSURE;
@@ -259,7 +218,11 @@
Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
} else {
lastMethodInvoked = true;
- receiver.close();
+ try {
+ receiver.close();
+ } catch (IOException e) {
+ Log.logError(e);
+ }
try {
return listener.onClose(this, statusCode, reason);
} catch (Exception e) {
@@ -316,22 +279,22 @@
* no more messages are expected to be sent after this.
*/
private CompletableFuture<WebSocket> enqueueClose(Close m) {
+ // MUST be a CF created once and shared across sendClose, otherwise
+ // a second sendClose may prematurely close the channel
return enqueue(m)
.orTimeout(60, TimeUnit.SECONDS)
.whenComplete((r, error) -> {
+ try {
+ transmitter.close();
+ } catch (IOException e) {
+ Log.logError(e);
+ }
if (error instanceof TimeoutException) {
try {
- channel.close();
+ receiver.close();
} catch (IOException e) {
Log.logError(e);
}
- } else {
- try {
- channel.shutdownOutput();
- } catch (IOException e) {
- Log.logError(e);
- }
- channelOutputClosed.complete(null);
}
});
}
@@ -432,19 +395,20 @@
inputClosed = true;
outputClosed = true;
try {
- channel.close();
- } catch (IOException ignored) {
- } finally {
- closed.set(true);
- signalClose(CLOSED_ABNORMALLY, "");
- }
+ try {
+ receiver.close();
+ } finally {
+ transmitter.close();
+ }
+ } catch (IOException ignored) { }
}
@Override
public String toString() {
return super.toString()
- + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
- + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
+ + "[uri=" + uri
+ + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
+ + "]";
}
private MessageStreamConsumer messageConsumerOf(Listener listener) {
@@ -483,7 +447,7 @@
receiver.acknowledge();
// Let's make a full copy of this tiny data. What we want here
// is to rule out a possibility the shared data we send might be
- // corrupted the by processing in the listener.
+ // corrupted by processing in the listener.
ByteBuffer slice = data.slice();
ByteBuffer copy = ByteBuffer.allocate(data.remaining())
.put(data)
@@ -535,18 +499,11 @@
} else {
Exception ex = (Exception) new ProtocolException().initCause(error);
int code = ((FailWebSocketException) error).getStatusCode();
- enqueueClose(new Close(code, ""))
+ enqueueClose(new Close(code, "")) // do we have to wait for 60 secs? nah...
.whenComplete((r, e) -> {
if (e != null) {
ex.addSuppressed(Utils.getCompletionCause(e));
}
- try {
- channel.close();
- } catch (IOException e1) {
- ex.addSuppressed(e1);
- } finally {
- closed.set(true);
- }
signalError(ex);
});
}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Fri Nov 24 12:10:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Fri Nov 24 17:51:51 2017 +0300
@@ -26,6 +26,7 @@
import jdk.incubator.http.WebSocket;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Queue;
@@ -186,11 +187,17 @@
public void close() {
}
};
+ TransportSupplier transport = new TransportSupplier(channel) {
+ @Override
+ public Transmitter transmitter() {
+ return transmitter;
+ }
+ };
return new WebSocketImpl(
uri,
subprotocol,
- channel,
- new WebSocket.Listener() { }, transmitter);
+ new WebSocket.Listener() { },
+ transport);
}
private abstract class MockTransmitter extends Transmitter {
@@ -225,6 +232,9 @@
message);
}
+ @Override
+ public void close() { }
+
protected abstract CompletionStage<?> whenSent();
public Queue<OutgoingMessage> queue() {