src/java.net.http/share/classes/java/net/http/internal/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
--- a/src/java.net.http/share/classes/java/net/http/internal/websocket/WebSocketImpl.java	Wed Feb 07 15:46:30 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,533 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package java.net.http.internal.websocket;
-
-import java.net.http.WebSocket;
-import java.net.http.internal.common.Demand;
-import java.net.http.internal.common.Log;
-import java.net.http.internal.common.MinimalFuture;
-import java.net.http.internal.common.SequentialScheduler;
-import java.net.http.internal.common.Utils;
-import java.net.http.internal.websocket.OpeningHandshake.Result;
-
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.net.ProtocolException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import static java.util.Objects.requireNonNull;
-import static java.net.http.internal.common.MinimalFuture.failedFuture;
-import static java.net.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
-import static java.net.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
-import static java.net.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
-import static java.net.http.internal.websocket.WebSocketImpl.State.BINARY;
-import static java.net.http.internal.websocket.WebSocketImpl.State.CLOSE;
-import static java.net.http.internal.websocket.WebSocketImpl.State.ERROR;
-import static java.net.http.internal.websocket.WebSocketImpl.State.IDLE;
-import static java.net.http.internal.websocket.WebSocketImpl.State.OPEN;
-import static java.net.http.internal.websocket.WebSocketImpl.State.PING;
-import static java.net.http.internal.websocket.WebSocketImpl.State.PONG;
-import static java.net.http.internal.websocket.WebSocketImpl.State.TEXT;
-import static java.net.http.internal.websocket.WebSocketImpl.State.WAITING;
-
-/*
- * A WebSocket client.
- */
-public final class WebSocketImpl implements WebSocket {
-
-    enum State {
-        OPEN,
-        IDLE,
-        WAITING,
-        TEXT,
-        BINARY,
-        PING,
-        PONG,
-        CLOSE,
-        ERROR;
-    }
-
-    private volatile boolean inputClosed;
-    private volatile boolean outputClosed;
-
-    private final AtomicReference<State> state = new AtomicReference<>(OPEN);
-
-    /* Components of calls to Listener's methods */
-    private MessagePart part;
-    private ByteBuffer binaryData;
-    private CharSequence text;
-    private int statusCode;
-    private String reason;
-    private final AtomicReference<Throwable> error = new AtomicReference<>();
-
-    private final URI uri;
-    private final String subprotocol;
-    private final Listener listener;
-
-    private final AtomicBoolean outstandingSend = new AtomicBoolean();
-    private final Transport<WebSocket> transport;
-    private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
-    private final Demand demand = new Demand();
-
-    public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
-        Function<Result, WebSocket> newWebSocket = r -> {
-            WebSocket ws = newInstance(b.getUri(),
-                                       r.subprotocol,
-                                       b.getListener(),
-                                       r.transport);
-            // Make sure we don't release the builder until this lambda
-            // has been executed. The builder has a strong reference to
-            // the HttpClientFacade, and we want to keep that live until
-            // after the raw channel is created and passed to WebSocketImpl.
-            Reference.reachabilityFence(b);
-            return ws;
-        };
-        OpeningHandshake h;
-        try {
-            h = new OpeningHandshake(b);
-        } catch (Throwable e) {
-            return failedFuture(e);
-        }
-        return h.send().thenApply(newWebSocket);
-    }
-
-    /* Exposed for testing purposes */
-    static WebSocketImpl newInstance(URI uri,
-                                     String subprotocol,
-                                     Listener listener,
-                                     TransportFactory transport) {
-        WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
-        // This initialisation is outside of the constructor for the sake of
-        // safe publication of WebSocketImpl.this
-        ws.signalOpen();
-        return ws;
-    }
-
-    private WebSocketImpl(URI uri,
-                          String subprotocol,
-                          Listener listener,
-                          TransportFactory transportFactory) {
-        this.uri = requireNonNull(uri);
-        this.subprotocol = requireNonNull(subprotocol);
-        this.listener = requireNonNull(listener);
-        this.transport = transportFactory.createTransport(
-                () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
-                new SignallingMessageConsumer());
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendText(CharSequence message,
-                                                 boolean isLast) {
-        Objects.requireNonNull(message);
-        if (!outstandingSend.compareAndSet(false, true)) {
-            return failedFuture(new IllegalStateException("Send pending"));
-        }
-        CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
-        return cf.whenComplete((r, e) -> outstandingSend.set(false));
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
-                                                   boolean isLast) {
-        Objects.requireNonNull(message);
-        if (!outstandingSend.compareAndSet(false, true)) {
-            return failedFuture(new IllegalStateException("Send pending"));
-        }
-        CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
-        // Optimize?
-        //     if (cf.isDone()) {
-        //         outstandingSend.set(false);
-        //     } else {
-        //         cf.whenComplete((r, e) -> outstandingSend.set(false));
-        //     }
-        return cf.whenComplete((r, e) -> outstandingSend.set(false));
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
-        return transport.sendPing(message);
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
-        return transport.sendPong(message);
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
-        Objects.requireNonNull(reason);
-        if (!isLegalToSendFromClient(statusCode)) {
-            return failedFuture(new IllegalArgumentException("statusCode"));
-        }
-        return sendClose0(statusCode, reason);
-    }
-
-    /*
-     * Sends a Close message, then shuts down the output since no more
-     * messages are expected to be sent after this.
-     */
-    private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
-        outputClosed = true;
-        return transport.sendClose(statusCode, reason)
-                .whenComplete((result, error) -> {
-                    try {
-                        transport.closeOutput();
-                    } catch (IOException e) {
-                        Log.logError(e);
-                    }
-                    Throwable cause = Utils.getCompletionCause(error);
-                    if (cause instanceof TimeoutException) {
-                        try {
-                            transport.closeInput();
-                        } catch (IOException e) {
-                            Log.logError(e);
-                        }
-                    }
-                });
-    }
-
-    @Override
-    public void request(long n) {
-        if (demand.increase(n)) {
-            receiveScheduler.runOrSchedule();
-        }
-    }
-
-    @Override
-    public String getSubprotocol() {
-        return subprotocol;
-    }
-
-    @Override
-    public boolean isOutputClosed() {
-        return outputClosed;
-    }
-
-    @Override
-    public boolean isInputClosed() {
-        return inputClosed;
-    }
-
-    @Override
-    public void abort() {
-        inputClosed = true;
-        outputClosed = true;
-        receiveScheduler.stop();
-        close();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString()
-                + "[uri=" + uri
-                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
-                + "]";
-    }
-
-    /*
-     * The assumptions about order is as follows:
-     *
-     *     - state is never changed more than twice inside the `run` method:
-     *       x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
-     *       overwriting parts of messages creating a mess since there's no
-     *       queueing)
-     *     - OPEN is always the first state
-     *     - no messages are requested/delivered before onOpen is called (this
-     *       is implemented by making WebSocket instance accessible first in
-     *       onOpen)
-     *     - after the state has been observed as CLOSE/ERROR, the scheduler
-     *       is stopped
-     */
-    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
-
-        // Transport only asked here and nowhere else because we must make sure
-        // onOpen is invoked first and no messages become pending before onOpen
-        // finishes
-
-        @Override
-        public void run() {
-            while (true) {
-                State s = state.get();
-                try {
-                    switch (s) {
-                        case OPEN:
-                            processOpen();
-                            tryChangeState(OPEN, IDLE);
-                            break;
-                        case TEXT:
-                            processText();
-                            tryChangeState(TEXT, IDLE);
-                            break;
-                        case BINARY:
-                            processBinary();
-                            tryChangeState(BINARY, IDLE);
-                            break;
-                        case PING:
-                            processPing();
-                            tryChangeState(PING, IDLE);
-                            break;
-                        case PONG:
-                            processPong();
-                            tryChangeState(PONG, IDLE);
-                            break;
-                        case CLOSE:
-                            processClose();
-                            return;
-                        case ERROR:
-                            processError();
-                            return;
-                        case IDLE:
-                            if (demand.tryDecrement()
-                                    && tryChangeState(IDLE, WAITING)) {
-                                transport.request(1);
-                            }
-                            return;
-                        case WAITING:
-                            // For debugging spurious signalling: when there was a
-                            // signal, but apparently nothing has changed
-                            return;
-                        default:
-                            throw new InternalError(String.valueOf(s));
-                    }
-                } catch (Throwable t) {
-                    signalError(t);
-                }
-            }
-        }
-
-        private void processError() throws IOException {
-            transport.closeInput();
-            receiveScheduler.stop();
-            Throwable err = error.get();
-            if (err instanceof FailWebSocketException) {
-                int code1 = ((FailWebSocketException) err).getStatusCode();
-                err = new ProtocolException().initCause(err);
-                sendClose0(code1, "")
-                        .whenComplete(
-                                (r, e) -> {
-                                    if (e != null) {
-                                        Log.logError(e);
-                                    }
-                                });
-            }
-            listener.onError(WebSocketImpl.this, err);
-        }
-
-        private void processClose() throws IOException {
-            transport.closeInput();
-            receiveScheduler.stop();
-            CompletionStage<?> readyToClose;
-            readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
-            if (readyToClose == null) {
-                readyToClose = MinimalFuture.completedFuture(null);
-            }
-            int code;
-            if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
-                code = NORMAL_CLOSURE;
-            } else {
-                code = statusCode;
-            }
-            readyToClose.whenComplete((r, e) -> {
-                sendClose0(code, "")
-                        .whenComplete((r1, e1) -> {
-                            if (e1 != null) {
-                                Log.logError(e1);
-                            }
-                        });
-            });
-        }
-
-        private void processPong() {
-            listener.onPong(WebSocketImpl.this, binaryData);
-        }
-
-        private void processPing() {
-            // Let's make a full copy of this tiny data. What we want here
-            // is to rule out a possibility the shared data we send might be
-            // corrupted by processing in the listener.
-            ByteBuffer slice = binaryData.slice();
-            ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
-                    .put(binaryData)
-                    .flip();
-            // Non-exclusive send;
-            CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
-            pongSent.whenComplete(
-                    (r, e) -> {
-                        if (e != null) {
-                            signalError(Utils.getCompletionCause(e));
-                        }
-                    }
-            );
-            listener.onPing(WebSocketImpl.this, slice);
-        }
-
-        private void processBinary() {
-            listener.onBinary(WebSocketImpl.this, binaryData, part);
-        }
-
-        private void processText() {
-            listener.onText(WebSocketImpl.this, text, part);
-        }
-
-        private void processOpen() {
-            listener.onOpen(WebSocketImpl.this);
-        }
-    }
-
-    private void signalOpen() {
-        receiveScheduler.runOrSchedule();
-    }
-
-    private void signalError(Throwable error) {
-        inputClosed = true;
-        outputClosed = true;
-        if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
-            Log.logError(error);
-        } else {
-            close();
-        }
-    }
-
-    private void close() {
-        try {
-            try {
-                transport.closeInput();
-            } finally {
-                transport.closeOutput();
-            }
-        } catch (Throwable t) {
-            Log.logError(t);
-        }
-    }
-
-    /*
-     * Signals a Close event (might not correspond to anything happened on the
-     * channel, i.e. might be synthetic).
-     */
-    private void signalClose(int statusCode, String reason) {
-        inputClosed = true;
-        this.statusCode = statusCode;
-        this.reason = reason;
-        if (!trySetState(CLOSE)) {
-            Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
-        } else {
-            try {
-                transport.closeInput();
-            } catch (Throwable t) {
-                Log.logError(t);
-            }
-        }
-    }
-
-    private class SignallingMessageConsumer implements MessageStreamConsumer {
-
-        @Override
-        public void onText(CharSequence data, MessagePart part) {
-            transport.acknowledgeReception();
-            text = data;
-            WebSocketImpl.this.part = part;
-            tryChangeState(WAITING, TEXT);
-        }
-
-        @Override
-        public void onBinary(ByteBuffer data, MessagePart part) {
-            transport.acknowledgeReception();
-            binaryData = data;
-            WebSocketImpl.this.part = part;
-            tryChangeState(WAITING, BINARY);
-        }
-
-        @Override
-        public void onPing(ByteBuffer data) {
-            transport.acknowledgeReception();
-            binaryData = data;
-            tryChangeState(WAITING, PING);
-        }
-
-        @Override
-        public void onPong(ByteBuffer data) {
-            transport.acknowledgeReception();
-            binaryData = data;
-            tryChangeState(WAITING, PONG);
-        }
-
-        @Override
-        public void onClose(int statusCode, CharSequence reason) {
-            transport.acknowledgeReception();
-            signalClose(statusCode, reason.toString());
-        }
-
-        @Override
-        public void onComplete() {
-            transport.acknowledgeReception();
-            signalClose(CLOSED_ABNORMALLY, "");
-        }
-
-        @Override
-        public void onError(Throwable error) {
-            signalError(error);
-        }
-    }
-
-    private boolean trySetState(State newState) {
-        while (true) {
-            State currentState = state.get();
-            if (currentState == ERROR || currentState == CLOSE) {
-                return false;
-            } else if (state.compareAndSet(currentState, newState)) {
-                receiveScheduler.runOrSchedule();
-                return true;
-            }
-        }
-    }
-
-    private boolean tryChangeState(State expectedState, State newState) {
-        State witness = state.compareAndExchange(expectedState, newState);
-        if (witness == expectedState) {
-            receiveScheduler.runOrSchedule();
-            return true;
-        }
-        // This should be the only reason for inability to change the state from
-        // IDLE to WAITING: the state has changed to terminal
-        if (witness != ERROR && witness != CLOSE) {
-            throw new InternalError();
-        }
-        return false;
-    }
-
-    /* Exposed for testing purposes */
-    protected final Transport<WebSocket> transport() {
-        return transport;
-    }
-}