src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56263 4933a477d628
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Wed Feb 07 21:45:37 2018 +0000
@@ -0,0 +1,533 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import java.net.http.WebSocket;
+import jdk.internal.net.http.common.Demand;
+import jdk.internal.net.http.common.Log;
+import jdk.internal.net.http.common.MinimalFuture;
+import jdk.internal.net.http.common.SequentialScheduler;
+import jdk.internal.net.http.common.Utils;
+import jdk.internal.net.http.websocket.OpeningHandshake.Result;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.net.ProtocolException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static java.util.Objects.requireNonNull;
+import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
+import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
+import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
+import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.CLOSE;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.ERROR;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.IDLE;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.OPEN;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.PING;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.PONG;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.TEXT;
+import static jdk.internal.net.http.websocket.WebSocketImpl.State.WAITING;
+
+/*
+ * A WebSocket client.
+ */
+public final class WebSocketImpl implements WebSocket {
+
+    enum State {
+        OPEN,
+        IDLE,
+        WAITING,
+        TEXT,
+        BINARY,
+        PING,
+        PONG,
+        CLOSE,
+        ERROR;
+    }
+
+    private volatile boolean inputClosed;
+    private volatile boolean outputClosed;
+
+    private final AtomicReference<State> state = new AtomicReference<>(OPEN);
+
+    /* Components of calls to Listener's methods */
+    private MessagePart part;
+    private ByteBuffer binaryData;
+    private CharSequence text;
+    private int statusCode;
+    private String reason;
+    private final AtomicReference<Throwable> error = new AtomicReference<>();
+
+    private final URI uri;
+    private final String subprotocol;
+    private final Listener listener;
+
+    private final AtomicBoolean outstandingSend = new AtomicBoolean();
+    private final Transport<WebSocket> transport;
+    private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
+    private final Demand demand = new Demand();
+
+    public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
+        Function<Result, WebSocket> newWebSocket = r -> {
+            WebSocket ws = newInstance(b.getUri(),
+                                       r.subprotocol,
+                                       b.getListener(),
+                                       r.transport);
+            // Make sure we don't release the builder until this lambda
+            // has been executed. The builder has a strong reference to
+            // the HttpClientFacade, and we want to keep that live until
+            // after the raw channel is created and passed to WebSocketImpl.
+            Reference.reachabilityFence(b);
+            return ws;
+        };
+        OpeningHandshake h;
+        try {
+            h = new OpeningHandshake(b);
+        } catch (Throwable e) {
+            return failedFuture(e);
+        }
+        return h.send().thenApply(newWebSocket);
+    }
+
+    /* Exposed for testing purposes */
+    static WebSocketImpl newInstance(URI uri,
+                                     String subprotocol,
+                                     Listener listener,
+                                     TransportFactory transport) {
+        WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
+        // This initialisation is outside of the constructor for the sake of
+        // safe publication of WebSocketImpl.this
+        ws.signalOpen();
+        return ws;
+    }
+
+    private WebSocketImpl(URI uri,
+                          String subprotocol,
+                          Listener listener,
+                          TransportFactory transportFactory) {
+        this.uri = requireNonNull(uri);
+        this.subprotocol = requireNonNull(subprotocol);
+        this.listener = requireNonNull(listener);
+        this.transport = transportFactory.createTransport(
+                () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
+                new SignallingMessageConsumer());
+    }
+
+    @Override
+    public CompletableFuture<WebSocket> sendText(CharSequence message,
+                                                 boolean isLast) {
+        Objects.requireNonNull(message);
+        if (!outstandingSend.compareAndSet(false, true)) {
+            return failedFuture(new IllegalStateException("Send pending"));
+        }
+        CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
+        return cf.whenComplete((r, e) -> outstandingSend.set(false));
+    }
+
+    @Override
+    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
+                                                   boolean isLast) {
+        Objects.requireNonNull(message);
+        if (!outstandingSend.compareAndSet(false, true)) {
+            return failedFuture(new IllegalStateException("Send pending"));
+        }
+        CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
+        // Optimize?
+        //     if (cf.isDone()) {
+        //         outstandingSend.set(false);
+        //     } else {
+        //         cf.whenComplete((r, e) -> outstandingSend.set(false));
+        //     }
+        return cf.whenComplete((r, e) -> outstandingSend.set(false));
+    }
+
+    @Override
+    public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
+        return transport.sendPing(message);
+    }
+
+    @Override
+    public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
+        return transport.sendPong(message);
+    }
+
+    @Override
+    public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
+        Objects.requireNonNull(reason);
+        if (!isLegalToSendFromClient(statusCode)) {
+            return failedFuture(new IllegalArgumentException("statusCode"));
+        }
+        return sendClose0(statusCode, reason);
+    }
+
+    /*
+     * Sends a Close message, then shuts down the output since no more
+     * messages are expected to be sent after this.
+     */
+    private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
+        outputClosed = true;
+        return transport.sendClose(statusCode, reason)
+                .whenComplete((result, error) -> {
+                    try {
+                        transport.closeOutput();
+                    } catch (IOException e) {
+                        Log.logError(e);
+                    }
+                    Throwable cause = Utils.getCompletionCause(error);
+                    if (cause instanceof TimeoutException) {
+                        try {
+                            transport.closeInput();
+                        } catch (IOException e) {
+                            Log.logError(e);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void request(long n) {
+        if (demand.increase(n)) {
+            receiveScheduler.runOrSchedule();
+        }
+    }
+
+    @Override
+    public String getSubprotocol() {
+        return subprotocol;
+    }
+
+    @Override
+    public boolean isOutputClosed() {
+        return outputClosed;
+    }
+
+    @Override
+    public boolean isInputClosed() {
+        return inputClosed;
+    }
+
+    @Override
+    public void abort() {
+        inputClosed = true;
+        outputClosed = true;
+        receiveScheduler.stop();
+        close();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()
+                + "[uri=" + uri
+                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
+                + "]";
+    }
+
+    /*
+     * The assumptions about order is as follows:
+     *
+     *     - state is never changed more than twice inside the `run` method:
+     *       x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
+     *       overwriting parts of messages creating a mess since there's no
+     *       queueing)
+     *     - OPEN is always the first state
+     *     - no messages are requested/delivered before onOpen is called (this
+     *       is implemented by making WebSocket instance accessible first in
+     *       onOpen)
+     *     - after the state has been observed as CLOSE/ERROR, the scheduler
+     *       is stopped
+     */
+    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
+
+        // Transport only asked here and nowhere else because we must make sure
+        // onOpen is invoked first and no messages become pending before onOpen
+        // finishes
+
+        @Override
+        public void run() {
+            while (true) {
+                State s = state.get();
+                try {
+                    switch (s) {
+                        case OPEN:
+                            processOpen();
+                            tryChangeState(OPEN, IDLE);
+                            break;
+                        case TEXT:
+                            processText();
+                            tryChangeState(TEXT, IDLE);
+                            break;
+                        case BINARY:
+                            processBinary();
+                            tryChangeState(BINARY, IDLE);
+                            break;
+                        case PING:
+                            processPing();
+                            tryChangeState(PING, IDLE);
+                            break;
+                        case PONG:
+                            processPong();
+                            tryChangeState(PONG, IDLE);
+                            break;
+                        case CLOSE:
+                            processClose();
+                            return;
+                        case ERROR:
+                            processError();
+                            return;
+                        case IDLE:
+                            if (demand.tryDecrement()
+                                    && tryChangeState(IDLE, WAITING)) {
+                                transport.request(1);
+                            }
+                            return;
+                        case WAITING:
+                            // For debugging spurious signalling: when there was a
+                            // signal, but apparently nothing has changed
+                            return;
+                        default:
+                            throw new InternalError(String.valueOf(s));
+                    }
+                } catch (Throwable t) {
+                    signalError(t);
+                }
+            }
+        }
+
+        private void processError() throws IOException {
+            transport.closeInput();
+            receiveScheduler.stop();
+            Throwable err = error.get();
+            if (err instanceof FailWebSocketException) {
+                int code1 = ((FailWebSocketException) err).getStatusCode();
+                err = new ProtocolException().initCause(err);
+                sendClose0(code1, "")
+                        .whenComplete(
+                                (r, e) -> {
+                                    if (e != null) {
+                                        Log.logError(e);
+                                    }
+                                });
+            }
+            listener.onError(WebSocketImpl.this, err);
+        }
+
+        private void processClose() throws IOException {
+            transport.closeInput();
+            receiveScheduler.stop();
+            CompletionStage<?> readyToClose;
+            readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
+            if (readyToClose == null) {
+                readyToClose = MinimalFuture.completedFuture(null);
+            }
+            int code;
+            if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
+                code = NORMAL_CLOSURE;
+            } else {
+                code = statusCode;
+            }
+            readyToClose.whenComplete((r, e) -> {
+                sendClose0(code, "")
+                        .whenComplete((r1, e1) -> {
+                            if (e1 != null) {
+                                Log.logError(e1);
+                            }
+                        });
+            });
+        }
+
+        private void processPong() {
+            listener.onPong(WebSocketImpl.this, binaryData);
+        }
+
+        private void processPing() {
+            // Let's make a full copy of this tiny data. What we want here
+            // is to rule out a possibility the shared data we send might be
+            // corrupted by processing in the listener.
+            ByteBuffer slice = binaryData.slice();
+            ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
+                    .put(binaryData)
+                    .flip();
+            // Non-exclusive send;
+            CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
+            pongSent.whenComplete(
+                    (r, e) -> {
+                        if (e != null) {
+                            signalError(Utils.getCompletionCause(e));
+                        }
+                    }
+            );
+            listener.onPing(WebSocketImpl.this, slice);
+        }
+
+        private void processBinary() {
+            listener.onBinary(WebSocketImpl.this, binaryData, part);
+        }
+
+        private void processText() {
+            listener.onText(WebSocketImpl.this, text, part);
+        }
+
+        private void processOpen() {
+            listener.onOpen(WebSocketImpl.this);
+        }
+    }
+
+    private void signalOpen() {
+        receiveScheduler.runOrSchedule();
+    }
+
+    private void signalError(Throwable error) {
+        inputClosed = true;
+        outputClosed = true;
+        if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
+            Log.logError(error);
+        } else {
+            close();
+        }
+    }
+
+    private void close() {
+        try {
+            try {
+                transport.closeInput();
+            } finally {
+                transport.closeOutput();
+            }
+        } catch (Throwable t) {
+            Log.logError(t);
+        }
+    }
+
+    /*
+     * Signals a Close event (might not correspond to anything happened on the
+     * channel, i.e. might be synthetic).
+     */
+    private void signalClose(int statusCode, String reason) {
+        inputClosed = true;
+        this.statusCode = statusCode;
+        this.reason = reason;
+        if (!trySetState(CLOSE)) {
+            Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
+        } else {
+            try {
+                transport.closeInput();
+            } catch (Throwable t) {
+                Log.logError(t);
+            }
+        }
+    }
+
+    private class SignallingMessageConsumer implements MessageStreamConsumer {
+
+        @Override
+        public void onText(CharSequence data, MessagePart part) {
+            transport.acknowledgeReception();
+            text = data;
+            WebSocketImpl.this.part = part;
+            tryChangeState(WAITING, TEXT);
+        }
+
+        @Override
+        public void onBinary(ByteBuffer data, MessagePart part) {
+            transport.acknowledgeReception();
+            binaryData = data;
+            WebSocketImpl.this.part = part;
+            tryChangeState(WAITING, BINARY);
+        }
+
+        @Override
+        public void onPing(ByteBuffer data) {
+            transport.acknowledgeReception();
+            binaryData = data;
+            tryChangeState(WAITING, PING);
+        }
+
+        @Override
+        public void onPong(ByteBuffer data) {
+            transport.acknowledgeReception();
+            binaryData = data;
+            tryChangeState(WAITING, PONG);
+        }
+
+        @Override
+        public void onClose(int statusCode, CharSequence reason) {
+            transport.acknowledgeReception();
+            signalClose(statusCode, reason.toString());
+        }
+
+        @Override
+        public void onComplete() {
+            transport.acknowledgeReception();
+            signalClose(CLOSED_ABNORMALLY, "");
+        }
+
+        @Override
+        public void onError(Throwable error) {
+            signalError(error);
+        }
+    }
+
+    private boolean trySetState(State newState) {
+        while (true) {
+            State currentState = state.get();
+            if (currentState == ERROR || currentState == CLOSE) {
+                return false;
+            } else if (state.compareAndSet(currentState, newState)) {
+                receiveScheduler.runOrSchedule();
+                return true;
+            }
+        }
+    }
+
+    private boolean tryChangeState(State expectedState, State newState) {
+        State witness = state.compareAndExchange(expectedState, newState);
+        if (witness == expectedState) {
+            receiveScheduler.runOrSchedule();
+            return true;
+        }
+        // This should be the only reason for inability to change the state from
+        // IDLE to WAITING: the state has changed to terminal
+        if (witness != ERROR && witness != CLOSE) {
+            throw new InternalError();
+        }
+        return false;
+    }
+
+    /* Exposed for testing purposes */
+    protected final Transport<WebSocket> transport() {
+        return transport;
+    }
+}