jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
author prappo
Mon, 09 May 2016 23:33:09 +0100
changeset 37874 02589df0999a
child 38864 bf2b41533aed
permissions -rw-r--r--
8087113: Websocket API and implementation Reviewed-by: chegar

/*
 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General  License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General  License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General  License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.net.http;

import java.net.http.WSOutgoingMessage.Binary;
import java.net.http.WSOutgoingMessage.Close;
import java.net.http.WSOutgoingMessage.Ping;
import java.net.http.WSOutgoingMessage.Pong;
import java.net.http.WSOutgoingMessage.StreamedText;
import java.net.http.WSOutgoingMessage.Text;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.net.http.Pair.pair;

/*
 * Prepares outgoing messages for transmission.  Verifies the WebSocket state,
 * places the message on the outbound queue, and notifies the signal handler.
 */
final class WSTransmitter {

    private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<Void>>>
            backlog = new LinkedBlockingQueue<>();
    private final WSMessageSender sender;
    private final WSSignalHandler handler;
    private boolean previousMessageSent = true;
    private boolean canSendBinary = true;
    private boolean canSendText = true;

    WSTransmitter(Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
        this.handler = new WSSignalHandler(executor, this::handleSignal);
        Consumer<Throwable> sendCompletion = (error) -> {
            synchronized (this) {
                if (error == null) {
                    previousMessageSent = true;
                    handler.signal();
                } else {
                    errorHandler.accept(error);
                    backlog.forEach(p -> p.second.completeExceptionally(error));
                    backlog.clear();
                }
            }
        };
        this.sender = new WSMessageSender(channel, sendCompletion);
    }

    CompletableFuture<Void> sendText(CharSequence message, boolean isLast) {
        checkAndUpdateText(isLast);
        return acceptMessage(new Text(isLast, message));
    }

    CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) {
        checkAndUpdateText(true);
        return acceptMessage(new StreamedText(message));
    }

    CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) {
        checkAndUpdateBinary(isLast);
        return acceptMessage(new Binary(isLast, message));
    }

    CompletableFuture<Void> sendPing(ByteBuffer message) {
        checkSize(message.remaining(), 125);
        return acceptMessage(new Ping(message));
    }

    CompletableFuture<Void> sendPong(ByteBuffer message) {
        checkSize(message.remaining(), 125);
        return acceptMessage(new Pong(message));
    }

    CompletableFuture<Void> sendClose(WebSocket.CloseCode code, CharSequence reason) {
        return acceptMessage(createCloseMessage(code, reason));
    }

    CompletableFuture<Void> sendClose() {
        return acceptMessage(new Close(ByteBuffer.allocate(0)));
    }

    private CompletableFuture<Void> acceptMessage(WSOutgoingMessage m) {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        synchronized (this) {
            backlog.offer(pair(m, cf));
        }
        handler.signal();
        return cf;
    }

    /* Callback for pulling messages from the queue, and initiating the send. */
    private void handleSignal() {
        synchronized (this) {
            while (!backlog.isEmpty() && previousMessageSent) {
                previousMessageSent = false;
                Pair<WSOutgoingMessage, CompletableFuture<Void>> p = backlog.peek();
                boolean sent = sender.trySendFully(p.first);
                if (sent) {
                    backlog.remove();
                    p.second.complete(null);
                    previousMessageSent = true;
                }
            }
        }
    }

    private Close createCloseMessage(WebSocket.CloseCode code, CharSequence reason) {
        // TODO: move to construction of CloseDetail (JDK-8155621)
        ByteBuffer b = ByteBuffer.allocateDirect(125).putChar((char) code.getCode());
        CoderResult result = StandardCharsets.UTF_8.newEncoder()
                .encode(CharBuffer.wrap(reason), b, true);
        if (result.isError()) {
            try {
                result.throwException();
            } catch (CharacterCodingException e) {
                throw new IllegalArgumentException("Reason is a malformed UTF-16 sequence", e);
            }
        } else if (result.isOverflow()) {
            throw new IllegalArgumentException("Reason is too long");
        }
        return new Close(b.flip());
    }

    private void checkSize(int size, int maxSize) {
        if (size > maxSize) {
            throw new IllegalArgumentException(
                    format("The message is too long: %s;" +
                            " expected not longer than %s", size, maxSize)
            );
        }
    }

    private void checkAndUpdateText(boolean isLast) {
        if (!canSendText) {
            throw new IllegalStateException("Unexpected text message");
        }
        canSendBinary = isLast;
    }

    private void checkAndUpdateBinary(boolean isLast) {
        if (!canSendBinary) {
            throw new IllegalStateException("Unexpected binary message");
        }
        canSendText = isLast;
    }
}