jdk/src/java.httpclient/share/classes/java/net/http/WS.java
changeset 42483 3850c235c3fb
parent 42482 15297dde0d55
parent 42479 a80dbf731cbe
child 42489 a9e4de33da2e
--- a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java	Thu Dec 08 21:22:02 2016 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,376 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package java.net.http;
-
-import java.io.IOException;
-import java.net.ProtocolException;
-import java.net.http.WSOpeningHandshake.Result;
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import static java.lang.System.Logger.Level.ERROR;
-import static java.lang.System.Logger.Level.WARNING;
-import static java.net.http.WSUtils.logger;
-import static java.util.Objects.requireNonNull;
-
-/*
- * A WebSocket client.
- *
- * Consists of two independent parts; a transmitter responsible for sending
- * messages, and a receiver which notifies the listener of incoming messages.
- */
-final class WS implements WebSocket {
-
-    private final String subprotocol;
-    private final RawChannel channel;
-    private final WSTransmitter transmitter;
-    private final WSReceiver receiver;
-    private final Listener listener;
-    private final Object stateLock = new Object();
-    private volatile State state = State.CONNECTED;
-    private final CompletableFuture<Void> whenClosed = new CompletableFuture<>();
-
-    static CompletableFuture<WebSocket> newInstanceAsync(WSBuilder b) {
-        CompletableFuture<Result> result = new WSOpeningHandshake(b).performAsync();
-        Listener listener = b.getListener();
-        Executor executor = b.getClient().executorService();
-        return result.thenApply(r -> {
-            WS ws = new WS(listener, r.subprotocol, r.channel, executor);
-            ws.start();
-            return ws;
-        });
-    }
-
-    private WS(Listener listener, String subprotocol, RawChannel channel,
-               Executor executor) {
-        this.listener = wrapListener(listener);
-        this.channel = channel;
-        this.subprotocol = subprotocol;
-        Consumer<Throwable> errorHandler = error -> {
-            if (error == null) {
-                throw new InternalError();
-            }
-            // If the channel is closed, we need to update the state, to denote
-            // there's no point in trying to continue using WebSocket
-            if (!channel.isOpen()) {
-                synchronized (stateLock) {
-                    tryChangeState(State.ERROR);
-                }
-            }
-        };
-        transmitter = new WSTransmitter(this, executor, channel, errorHandler);
-        receiver = new WSReceiver(this.listener, this, executor, channel);
-    }
-
-    private void start() {
-        receiver.start();
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
-        requireNonNull(message, "message");
-        synchronized (stateLock) {
-            checkState();
-            return transmitter.sendText(message, isLast);
-        }
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
-        requireNonNull(message, "message");
-        synchronized (stateLock) {
-            checkState();
-            return transmitter.sendBinary(message, isLast);
-        }
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
-        requireNonNull(message, "message");
-        synchronized (stateLock) {
-            checkState();
-            return transmitter.sendPing(message);
-        }
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
-        requireNonNull(message, "message");
-        synchronized (stateLock) {
-            checkState();
-            return transmitter.sendPong(message);
-        }
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendClose(CloseCode code, CharSequence reason) {
-        requireNonNull(code, "code");
-        requireNonNull(reason, "reason");
-        synchronized (stateLock) {
-            return doSendClose(() -> transmitter.sendClose(code, reason));
-        }
-    }
-
-    @Override
-    public CompletableFuture<WebSocket> sendClose() {
-        synchronized (stateLock) {
-            return doSendClose(() -> transmitter.sendClose());
-        }
-    }
-
-    private CompletableFuture<WebSocket> doSendClose(Supplier<CompletableFuture<WebSocket>> s) {
-        checkState();
-        boolean closeChannel = false;
-        synchronized (stateLock) {
-            if (state == State.CLOSED_REMOTELY) {
-                closeChannel = tryChangeState(State.CLOSED);
-            } else {
-                tryChangeState(State.CLOSED_LOCALLY);
-            }
-        }
-        CompletableFuture<WebSocket> sent = s.get();
-        if (closeChannel) {
-            sent.whenComplete((v, t) -> {
-                try {
-                    channel.close();
-                } catch (IOException e) {
-                    logger.log(ERROR, "Error transitioning to state " + State.CLOSED, e);
-                }
-            });
-        }
-        return sent;
-    }
-
-    @Override
-    public void request(long n) {
-        if (n < 0L) {
-            throw new IllegalArgumentException("The number must not be negative: " + n);
-        }
-        receiver.request(n);
-    }
-
-    @Override
-    public String getSubprotocol() {
-        return subprotocol;
-    }
-
-    @Override
-    public boolean isClosed() {
-        return state.isTerminal();
-    }
-
-    @Override
-    public void abort() throws IOException {
-        synchronized (stateLock) {
-            tryChangeState(State.ABORTED);
-        }
-        channel.close();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString() + "[" + state + "]";
-    }
-
-    private void checkState() {
-        if (state.isTerminal() || state == State.CLOSED_LOCALLY) {
-            throw new IllegalStateException("WebSocket is closed [" + state + "]");
-        }
-    }
-
-    /*
-     * Wraps the user's listener passed to the constructor into own listener to
-     * intercept transitions to terminal states (onClose and onError) and to act
-     * upon exceptions and values from the user's listener.
-     */
-    private Listener wrapListener(Listener listener) {
-        return new Listener() {
-
-            // Listener's method MUST be invoked in a happen-before order
-            private final Object visibilityLock = new Object();
-
-            @Override
-            public void onOpen(WebSocket webSocket) {
-                synchronized (visibilityLock) {
-                    listener.onOpen(webSocket);
-                }
-            }
-
-            @Override
-            public CompletionStage<?> onText(WebSocket webSocket, CharSequence message,
-                                             MessagePart part) {
-                synchronized (visibilityLock) {
-                    return listener.onText(webSocket, message, part);
-                }
-            }
-
-            @Override
-            public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer message,
-                                               MessagePart part) {
-                synchronized (visibilityLock) {
-                    return listener.onBinary(webSocket, message, part);
-                }
-            }
-
-            @Override
-            public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
-                synchronized (visibilityLock) {
-                    return listener.onPing(webSocket, message);
-                }
-            }
-
-            @Override
-            public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
-                synchronized (visibilityLock) {
-                    return listener.onPong(webSocket, message);
-                }
-            }
-
-            @Override
-            public void onClose(WebSocket webSocket, Optional<CloseCode> code, String reason) {
-                synchronized (stateLock) {
-                    if (state == State.CLOSED_REMOTELY || state.isTerminal()) {
-                        throw new InternalError("Unexpected onClose in state " + state);
-                    } else if (state == State.CLOSED_LOCALLY) {
-                        try {
-                            channel.close();
-                        } catch (IOException e) {
-                            logger.log(ERROR, "Error transitioning to state " + State.CLOSED, e);
-                        }
-                        tryChangeState(State.CLOSED);
-                    } else if (state == State.CONNECTED) {
-                        tryChangeState(State.CLOSED_REMOTELY);
-                    }
-                }
-                synchronized (visibilityLock) {
-                    listener.onClose(webSocket, code, reason);
-                }
-            }
-
-            @Override
-            public void onError(WebSocket webSocket, Throwable error) {
-                // An error doesn't necessarily mean the connection must be
-                // closed automatically
-                if (!channel.isOpen()) {
-                    synchronized (stateLock) {
-                        tryChangeState(State.ERROR);
-                    }
-                } else if (error instanceof ProtocolException
-                        && error.getCause() instanceof WSProtocolException) {
-                    WSProtocolException cause = (WSProtocolException) error.getCause();
-                    logger.log(WARNING, "Failing connection {0}, reason: ''{1}''",
-                            webSocket, cause.getMessage());
-                    CloseCode cc = cause.getCloseCode();
-                    transmitter.sendClose(cc, "").whenComplete((v, t) -> {
-                        synchronized (stateLock) {
-                            tryChangeState(State.ERROR);
-                        }
-                        try {
-                            channel.close();
-                        } catch (IOException e) {
-                            logger.log(ERROR, e);
-                        }
-                    });
-                }
-                synchronized (visibilityLock) {
-                    listener.onError(webSocket, error);
-                }
-            }
-        };
-    }
-
-    private boolean tryChangeState(State newState) {
-        assert Thread.holdsLock(stateLock);
-        if (state.isTerminal()) {
-            return false;
-        }
-        state = newState;
-        if (newState.isTerminal()) {
-            whenClosed.complete(null);
-        }
-        return true;
-    }
-
-    CompletionStage<Void> whenClosed() {
-        return whenClosed;
-    }
-
-    /*
-     * WebSocket connection internal state.
-     */
-    private enum State {
-
-        /*
-         * Initial WebSocket state. The WebSocket is connected (i.e. remains in
-         * this state) unless proven otherwise. For example, by reading or
-         * writing operations on the channel.
-         */
-        CONNECTED,
-
-        /*
-         * A Close message has been received by the client. No more messages
-         * will be received.
-         */
-        CLOSED_REMOTELY,
-
-        /*
-         * A Close message has been sent by the client. No more messages can be
-         * sent.
-         */
-        CLOSED_LOCALLY,
-
-        /*
-         * Close messages has been both sent and received (closing handshake)
-         * and TCP connection closed. Closed _cleanly_ in terms of RFC 6455.
-         */
-        CLOSED,
-
-        /*
-         * The connection has been aborted by the client. Closed not _cleanly_
-         * in terms of RFC 6455.
-         */
-        ABORTED,
-
-        /*
-         * The connection has been terminated due to a protocol or I/O error.
-         * Might happen during sending or receiving.
-         */
-        ERROR;
-
-        /*
-         * Returns `true` if this state is terminal. If WebSocket has transited
-         * to such a state, if remains in it forever.
-         */
-        boolean isTerminal() {
-            return this == CLOSED || this == ABORTED || this == ERROR;
-        }
-    }
-}