jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java
author michaelm
Mon, 16 May 2016 16:04:14 +0100
changeset 38322 f6f9d3ec14ba
parent 37874 02589df0999a
child 38864 bf2b41533aed
permissions -rw-r--r--
8156825: java/net/httpclient/BasicWebSocketAPITest.java failed with java.lang.AssertionError Reviewed-by: rriggs
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
37874
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     1
/*
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     2
 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     4
 *
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     6
 * under the terms of the GNU General  License version 2 only, as
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    10
 *
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General  License
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    15
 * accompanied this code).
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    16
 *
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    17
 * You should have received a copy of the GNU General  License version
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    20
 *
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    23
 * questions.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    24
 */
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    25
package java.net.http;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    26
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    27
import java.net.http.WSOutgoingMessage.Binary;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    28
import java.net.http.WSOutgoingMessage.Close;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    29
import java.net.http.WSOutgoingMessage.Ping;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    30
import java.net.http.WSOutgoingMessage.Pong;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    31
import java.net.http.WSOutgoingMessage.StreamedText;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    32
import java.net.http.WSOutgoingMessage.Text;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    33
import java.nio.ByteBuffer;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    34
import java.nio.CharBuffer;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    35
import java.nio.charset.CharacterCodingException;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    36
import java.nio.charset.CoderResult;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    37
import java.nio.charset.StandardCharsets;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    38
import java.util.concurrent.BlockingQueue;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    39
import java.util.concurrent.CompletableFuture;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    40
import java.util.concurrent.Executor;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    41
import java.util.concurrent.LinkedBlockingQueue;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    42
import java.util.function.Consumer;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    43
import java.util.stream.Stream;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    44
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    45
import static java.lang.String.format;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    46
import static java.net.http.Pair.pair;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    47
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    48
/*
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    49
 * Prepares outgoing messages for transmission.  Verifies the WebSocket state,
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    50
 * places the message on the outbound queue, and notifies the signal handler.
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    51
 */
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    52
final class WSTransmitter {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    53
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    54
    private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<Void>>>
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    55
            backlog = new LinkedBlockingQueue<>();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    56
    private final WSMessageSender sender;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    57
    private final WSSignalHandler handler;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    58
    private boolean previousMessageSent = true;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    59
    private boolean canSendBinary = true;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    60
    private boolean canSendText = true;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    61
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    62
    WSTransmitter(Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    63
        this.handler = new WSSignalHandler(executor, this::handleSignal);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    64
        Consumer<Throwable> sendCompletion = (error) -> {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    65
            synchronized (this) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    66
                if (error == null) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    67
                    previousMessageSent = true;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    68
                    handler.signal();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    69
                } else {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    70
                    errorHandler.accept(error);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    71
                    backlog.forEach(p -> p.second.completeExceptionally(error));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    72
                    backlog.clear();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    73
                }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    74
            }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    75
        };
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    76
        this.sender = new WSMessageSender(channel, sendCompletion);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    77
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    78
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    79
    CompletableFuture<Void> sendText(CharSequence message, boolean isLast) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    80
        checkAndUpdateText(isLast);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    81
        return acceptMessage(new Text(isLast, message));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    82
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    83
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    84
    CompletableFuture<Void> sendText(Stream<? extends CharSequence> message) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    85
        checkAndUpdateText(true);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    86
        return acceptMessage(new StreamedText(message));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    87
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    88
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    89
    CompletableFuture<Void> sendBinary(ByteBuffer message, boolean isLast) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    90
        checkAndUpdateBinary(isLast);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    91
        return acceptMessage(new Binary(isLast, message));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    92
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    93
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    94
    CompletableFuture<Void> sendPing(ByteBuffer message) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    95
        checkSize(message.remaining(), 125);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    96
        return acceptMessage(new Ping(message));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    97
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    98
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
    99
    CompletableFuture<Void> sendPong(ByteBuffer message) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   100
        checkSize(message.remaining(), 125);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   101
        return acceptMessage(new Pong(message));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   102
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   103
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   104
    CompletableFuture<Void> sendClose(WebSocket.CloseCode code, CharSequence reason) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   105
        return acceptMessage(createCloseMessage(code, reason));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   106
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   107
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   108
    CompletableFuture<Void> sendClose() {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   109
        return acceptMessage(new Close(ByteBuffer.allocate(0)));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   110
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   111
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   112
    private CompletableFuture<Void> acceptMessage(WSOutgoingMessage m) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   113
        CompletableFuture<Void> cf = new CompletableFuture<>();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   114
        synchronized (this) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   115
            backlog.offer(pair(m, cf));
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   116
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   117
        handler.signal();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   118
        return cf;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   119
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   120
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   121
    /* Callback for pulling messages from the queue, and initiating the send. */
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   122
    private void handleSignal() {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   123
        synchronized (this) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   124
            while (!backlog.isEmpty() && previousMessageSent) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   125
                previousMessageSent = false;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   126
                Pair<WSOutgoingMessage, CompletableFuture<Void>> p = backlog.peek();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   127
                boolean sent = sender.trySendFully(p.first);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   128
                if (sent) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   129
                    backlog.remove();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   130
                    p.second.complete(null);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   131
                    previousMessageSent = true;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   132
                }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   133
            }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   134
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   135
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   136
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   137
    private Close createCloseMessage(WebSocket.CloseCode code, CharSequence reason) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   138
        // TODO: move to construction of CloseDetail (JDK-8155621)
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   139
        ByteBuffer b = ByteBuffer.allocateDirect(125).putChar((char) code.getCode());
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   140
        CoderResult result = StandardCharsets.UTF_8.newEncoder()
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   141
                .encode(CharBuffer.wrap(reason), b, true);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   142
        if (result.isError()) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   143
            try {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   144
                result.throwException();
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   145
            } catch (CharacterCodingException e) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   146
                throw new IllegalArgumentException("Reason is a malformed UTF-16 sequence", e);
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   147
            }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   148
        } else if (result.isOverflow()) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   149
            throw new IllegalArgumentException("Reason is too long");
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   150
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   151
        return new Close(b.flip());
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   152
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   153
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   154
    private void checkSize(int size, int maxSize) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   155
        if (size > maxSize) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   156
            throw new IllegalArgumentException(
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   157
                    format("The message is too long: %s;" +
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   158
                            " expected not longer than %s", size, maxSize)
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   159
            );
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   160
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   161
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   162
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   163
    private void checkAndUpdateText(boolean isLast) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   164
        if (!canSendText) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   165
            throw new IllegalStateException("Unexpected text message");
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   166
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   167
        canSendBinary = isLast;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   168
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   169
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   170
    private void checkAndUpdateBinary(boolean isLast) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   171
        if (!canSendBinary) {
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   172
            throw new IllegalStateException("Unexpected binary message");
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   173
        }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   174
        canSendText = isLast;
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   175
    }
02589df0999a 8087113: Websocket API and implementation
prappo
parents:
diff changeset
   176
}