--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Mon Nov 27 16:40:01 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Tue Nov 28 12:25:13 2017 +0300
@@ -52,6 +52,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -67,36 +69,41 @@
*/
public final class WebSocketImpl implements WebSocket {
- private final URI uri;
- private final String subprotocol;
- private final Listener listener;
+ private static final int IDLE = 0;
+ private static final int OPEN = 1;
+ private static final int TEXT = 2;
+ private static final int BINARY = 4;
+ private static final int PING = 8;
+ private static final int PONG = 16;
+ private static final int CLOSE = 32;
+ private static final int ERROR = 64;
private volatile boolean inputClosed;
private volatile boolean outputClosed;
- /*
- * Whether or not Listener.onClose or Listener.onError has been already
- * invoked. We keep track of this since only one of these methods is invoked
- * and it is invoked at most once.
- */
- private boolean lastMethodInvoked;
+ /* Which of the listener's methods to call next? */
+ private final AtomicInteger state = new AtomicInteger(OPEN);
+
+ /* Components of calls to Listener's methods */
+ private MessagePart part;
+ private ByteBuffer binaryData;
+ private CharSequence text;
+ private int statusCode;
+ private String reason;
+ private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ private final URI uri;
+ private final String subprotocol;
+ private final Listener listener;
+
private final AtomicBoolean outstandingSend = new AtomicBoolean();
- private final SequentialScheduler sendScheduler;
+ private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
queue = new ConcurrentLinkedQueue<>();
private final Context context = new OutgoingMessage.Context();
private final Transmitter transmitter;
private final Receiver receiver;
-
- /*
- * This lock is enforcing sequential ordering of invocations to listener's
- * methods. It is supposed to be uncontended. The only contention that can
- * happen is when onOpen, an asynchronous onError (not related to reading
- * from the channel, e.g. an error from automatic Pong reply) or onClose
- * (related to abort) happens. Since all of the above are one-shot actions,
- * the said contention is insignificant.
- */
- private final Object lock = new Object();
+ private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
@@ -104,9 +111,8 @@
r.subprotocol,
b.getListener(),
r.transport);
- // The order of calls might cause a subtle effects, like CF will be
- // returned from the buildAsync _after_ onOpen has been signalled.
- // This means if onOpen is lengthy, it might cause some problems.
+ // This initialisation is outside of the constructor for the sake of
+ // safe publication of WebSocketImpl.this
ws.signalOpen();
// make sure we don't release the builder until this lambda
// has been executed. The builder has a strong reference to
@@ -118,7 +124,7 @@
OpeningHandshake h;
try {
h = new OpeningHandshake(b);
- } catch (Exception e) {
+ } catch (Throwable e) {
return failedFuture(e);
}
return h.send().thenApply(newWebSocket);
@@ -133,117 +139,16 @@
this.subprotocol = requireNonNull(subprotocol);
this.listener = requireNonNull(listener);
this.transmitter = transport.transmitter();
- this.receiver = transport.receiver(messageConsumerOf(listener));
- this.sendScheduler = new SequentialScheduler(new SendTask());
- }
-
- /*
- * This initialisation is outside of the constructor for the sake of
- * safe publication.
- */
- private void signalOpen() {
- synchronized (lock) {
- // TODO: might hold lock longer than needed causing prolonged
- // contention? substitute lock for ConcurrentLinkedQueue<Runnable>?
- try {
- listener.onOpen(this);
- } catch (Exception e) {
- signalError(e);
- }
- }
- }
-
- private void signalError(Throwable error) {
- synchronized (lock) {
- if (lastMethodInvoked) {
- Log.logError(error);
- } else {
- lastMethodInvoked = true;
- try {
- try {
- receiver.close();
- } finally {
- transmitter.close();
- }
- } catch (IOException e) {
- Log.logError(e);
- }
- try {
- listener.onError(this, error);
- } catch (Exception e) {
- Log.logError(e);
- }
- }
- }
- }
-
- /*
- * Processes a Close event that came from the receiver. Invoked at most
- * once. No further messages are pulled from the receiver.
- */
- private void processClose(int statusCode, String reason) {
- inputClosed = true;
- try {
- receiver.close();
- } catch (IOException e) {
- Log.logError(e);
- }
- int code;
- if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
- code = NORMAL_CLOSURE;
- } else {
- code = statusCode;
- }
- CompletionStage<?> readyToClose = signalClose(statusCode, reason);
- if (readyToClose == null) {
- readyToClose = MinimalFuture.completedFuture(null);
- }
- readyToClose.whenComplete((r, error) -> {
- enqueueClose(new Close(code, ""))
- .whenComplete((r1, error1) -> {
- if (error1 != null) {
- Log.logError(error1);
- }
- });
- });
- }
-
- /*
- * Signals a Close event (might not correspond to anything happened on the
- * channel, e.g. `abort()`).
- */
- private CompletionStage<?> signalClose(int statusCode, String reason) {
- synchronized (lock) {
- if (lastMethodInvoked) {
- Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
- } else {
- lastMethodInvoked = true;
- try {
- receiver.close();
- } catch (IOException e) {
- Log.logError(e);
- }
- try {
- return listener.onClose(this, statusCode, reason);
- } catch (Exception e) {
- Log.logError(e);
- }
- }
- }
- return null;
+ this.receiver = transport.receiver(new SignallingMessageConsumer());
}
@Override
- public CompletableFuture<WebSocket> sendText(CharSequence message,
- boolean isLast)
- {
+ public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
return enqueueExclusively(new Text(message, isLast));
}
@Override
- public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
- boolean isLast)
- {
+ public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
return enqueueExclusively(new Binary(message, isLast));
}
@@ -258,8 +163,7 @@
}
@Override
- public CompletableFuture<WebSocket> sendClose(int statusCode,
- String reason) {
+ public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
if (!isLegalToSendFromClient(statusCode)) {
return failedFuture(
new IllegalArgumentException("statusCode: " + statusCode));
@@ -275,11 +179,11 @@
}
/*
- * Sends a Close message and then shuts down the channel for writing since
- * no more messages are expected to be sent after this.
+ * Sends a Close message, then shuts down the transmitter since no more
+ * messages are expected to be sent after this.
*/
private CompletableFuture<WebSocket> enqueueClose(Close m) {
- // MUST be a CF created once and shared across sendClose, otherwise
+ // TODO: MUST be a CF created once and shared across sendClose, otherwise
// a second sendClose may prematurely close the channel
return enqueue(m)
.orTimeout(60, TimeUnit.SECONDS)
@@ -309,7 +213,7 @@
private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m)
{
if (!outstandingSend.compareAndSet(false, true)) {
- return failedFuture(new IllegalStateException("Outstanding send"));
+ return failedFuture(new IllegalStateException("Send pending"));
}
return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
}
@@ -331,6 +235,7 @@
* concurrently.
*/
private class SendTask implements SequentialScheduler.RestartableTask {
+
@Override
public void run(DeferredCompleter taskCompleter) {
Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
@@ -355,7 +260,7 @@
repeat(taskCompleter);
};
transmitter.send(message, h);
- } catch (Exception t) {
+ } catch (Throwable t) {
cf.completeExceptionally(t);
repeat(taskCompleter);
}
@@ -372,6 +277,8 @@
@Override
public void request(long n) {
+ // TODO: delay until state becomes ACTIVE, otherwise messages might be
+ // requested and consecutively become pending before onOpen is signalled
receiver.request(n);
}
@@ -394,13 +301,8 @@
public void abort() {
inputClosed = true;
outputClosed = true;
- try {
- try {
- receiver.close();
- } finally {
- transmitter.close();
- }
- } catch (IOException ignored) { }
+ receiveScheduler.stop();
+ close();
}
@Override
@@ -411,108 +313,248 @@
+ "]";
}
- private MessageStreamConsumer messageConsumerOf(Listener listener) {
- // Synchronization performed here in every method is not for the sake of
- // ordering invocations to this consumer, after all they are naturally
- // ordered in the channel. The reason is to avoid an interference with
- // any unrelated to the channel calls to onOpen, onClose and onError.
- return new MessageStreamConsumer() {
+ /*
+ * The assumptions about order is as follows:
+ *
+ * - state is never changed more than twice inside the `run` method:
+ * x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
+ * overwriting parts of messages creating a mess since there's no
+ * queueing)
+ * - OPEN is always the first state
+ * - no messages are requested/delivered before onOpen is called (this
+ * is implemented by making WebSocket instance accessible first in
+ * onOpen)
+ * - after the state has been observed as CLOSE/ERROR, the scheduler
+ * is stopped
+ */
+ private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
- @Override
- public void onText(CharSequence data, MessagePart part) {
- receiver.acknowledge();
- synchronized (WebSocketImpl.this.lock) {
- try {
- listener.onText(WebSocketImpl.this, data, part);
- } catch (Exception e) {
- signalError(e);
- }
+ @Override
+ public void run() {
+ final int s = state.getAndSet(IDLE);
+ try {
+ switch (s) {
+ case OPEN:
+ processOpen();
+ break;
+ case TEXT:
+ processText();
+ break;
+ case BINARY:
+ processBinary();
+ break;
+ case PING:
+ processPing();
+ break;
+ case PONG:
+ processPong();
+ break;
+ case CLOSE:
+ processClose();
+ break;
+ case ERROR:
+ processError();
+ break;
+ case IDLE:
+ // For debugging spurious signalling: when there was a
+ // signal, but apparently nothing has changed
+ break;
+ default:
+ throw new InternalError(String.valueOf(s));
}
+ // Do not keep references to arbitrary big objects we no longer
+ // need. It is unknown when the next message might come (if
+ // ever), so the following references should be null the sooner
+ // the better:
+ binaryData = null;
+ text = null;
+ } catch (Throwable t) {
+ signalError(t);
}
+ }
- @Override
- public void onBinary(ByteBuffer data, MessagePart part) {
- receiver.acknowledge();
- synchronized (WebSocketImpl.this.lock) {
- try {
- listener.onBinary(WebSocketImpl.this, data.slice(), part);
- } catch (Exception e) {
- signalError(e);
- }
- }
+ private void processError() throws IOException {
+ receiver.close();
+ receiveScheduler.stop();
+ Throwable err = error.get();
+ if (err instanceof FailWebSocketException) {
+ int code1 = ((FailWebSocketException) err).getStatusCode();
+ err = new ProtocolException().initCause(err);
+ enqueueClose(new Close(code1, ""))
+ .whenComplete(
+ (r, e) -> {
+ if (e != null) {
+ Log.logError(e);
+ }
+ });
}
+ listener.onError(WebSocketImpl.this, err);
+ }
- @Override
- public void onPing(ByteBuffer data) {
- receiver.acknowledge();
- // Let's make a full copy of this tiny data. What we want here
- // is to rule out a possibility the shared data we send might be
- // corrupted by processing in the listener.
- ByteBuffer slice = data.slice();
- ByteBuffer copy = ByteBuffer.allocate(data.remaining())
- .put(data)
- .flip();
- // Non-exclusive send;
- CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
- pongSent.whenComplete(
- (r, error) -> {
- if (error != null) {
- WebSocketImpl.this.signalError(
- Utils.getCompletionCause(error));
+ private void processClose() throws IOException {
+ receiver.close();
+ receiveScheduler.stop();
+ CompletionStage<?> readyToClose;
+ readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
+ if (readyToClose == null) {
+ readyToClose = MinimalFuture.completedFuture(null);
+ }
+ int code;
+ if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
+ code = NORMAL_CLOSURE;
+ } else {
+ code = statusCode;
+ }
+ readyToClose.whenComplete((r, e) -> {
+ enqueueClose(new Close(code, ""))
+ .whenComplete((r1, e1) -> {
+ if (e1 != null) {
+ Log.logError(e1);
}
+ });
+ });
+ }
+
+ private void processPong() {
+ listener.onPong(WebSocketImpl.this, binaryData);
+ }
+
+ private void processPing() {
+ // Let's make a full copy of this tiny data. What we want here
+ // is to rule out a possibility the shared data we send might be
+ // corrupted by processing in the listener.
+ ByteBuffer slice = binaryData.slice();
+ ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
+ .put(binaryData)
+ .flip();
+ // Non-exclusive send;
+ CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
+ pongSent.whenComplete(
+ (r, e) -> {
+ if (e != null) {
+ signalError(Utils.getCompletionCause(e));
}
- );
- synchronized (WebSocketImpl.this.lock) {
- try {
- listener.onPing(WebSocketImpl.this, slice);
- } catch (Exception e) {
- signalError(e);
}
- }
- }
+ );
+ listener.onPing(WebSocketImpl.this, slice);
+ }
+
+ private void processBinary() {
+ listener.onBinary(WebSocketImpl.this, binaryData, part);
+ }
+
+ private void processText() {
+ listener.onText(WebSocketImpl.this, text, part);
+ }
+
+ private void processOpen() {
+ listener.onOpen(WebSocketImpl.this);
+ }
+ }
+
+ private void signalOpen() {
+ receiveScheduler.runOrSchedule();
+ }
- @Override
- public void onPong(ByteBuffer data) {
- receiver.acknowledge();
- synchronized (WebSocketImpl.this.lock) {
- try {
- listener.onPong(WebSocketImpl.this, data.slice());
- } catch (Exception e) {
- signalError(e);
- }
- }
+ private void signalError(Throwable error) {
+ inputClosed = true;
+ outputClosed = true;
+ if (!this.error.compareAndSet(null, error) || !tryChangeState(ERROR)) {
+ Log.logError(error);
+ } else {
+ close();
+ }
+ }
+
+ private void close() {
+ try {
+ try {
+ receiver.close();
+ } finally {
+ transmitter.close();
}
+ } catch (Throwable t) {
+ Log.logError(t);
+ }
+ }
- @Override
- public void onClose(int statusCode, CharSequence reason) {
- receiver.acknowledge();
- processClose(statusCode, reason.toString());
+ /*
+ * Signals a Close event (might not correspond to anything happened on the
+ * channel, i.e. might be synthetic).
+ */
+ private void signalClose(int statusCode, String reason) {
+ inputClosed = true;
+ this.statusCode = statusCode;
+ this.reason = reason;
+ if (!tryChangeState(CLOSE)) {
+ Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
+ } else {
+ try {
+ receiver.close();
+ } catch (Throwable t) {
+ Log.logError(t);
}
+ }
+ }
- @Override
- public void onError(Exception error) {
- inputClosed = true;
- outputClosed = true;
- if (!(error instanceof FailWebSocketException)) {
- abort();
- signalError(error);
- } else {
- Exception ex = (Exception) new ProtocolException().initCause(error);
- int code = ((FailWebSocketException) error).getStatusCode();
- enqueueClose(new Close(code, "")) // do we have to wait for 60 secs? nah...
- .whenComplete((r, e) -> {
- if (e != null) {
- ex.addSuppressed(Utils.getCompletionCause(e));
- }
- signalError(ex);
- });
- }
+ private class SignallingMessageConsumer implements MessageStreamConsumer {
+
+ @Override
+ public void onText(CharSequence data, MessagePart part) {
+ receiver.acknowledge();
+ text = data;
+ WebSocketImpl.this.part = part;
+ tryChangeState(TEXT);
+ }
+
+ @Override
+ public void onBinary(ByteBuffer data, MessagePart part) {
+ receiver.acknowledge();
+ binaryData = data;
+ WebSocketImpl.this.part = part;
+ tryChangeState(BINARY);
+ }
+
+ @Override
+ public void onPing(ByteBuffer data) {
+ receiver.acknowledge();
+ binaryData = data;
+ tryChangeState(PING);
+ }
+
+ @Override
+ public void onPong(ByteBuffer data) {
+ receiver.acknowledge();
+ binaryData = data;
+ tryChangeState(PONG);
+ }
+
+ @Override
+ public void onClose(int statusCode, CharSequence reason) {
+ receiver.acknowledge();
+ signalClose(statusCode, reason.toString());
+ }
+
+ @Override
+ public void onComplete() {
+ signalClose(CLOSED_ABNORMALLY, "");
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ signalError(error);
+ }
+ }
+
+ private boolean tryChangeState(int newState) {
+ while (true) {
+ int currentState = state.get();
+ if (currentState == ERROR || currentState == CLOSE) {
+ return false;
+ } else if (state.compareAndSet(currentState, newState)) {
+ receiveScheduler.runOrSchedule();
+ return true;
}
-
- @Override
- public void onComplete() {
- processClose(CLOSED_ABNORMALLY, "");
- }
- };
+ }
}
}