# HG changeset patch # User prappo # Date 1466531478 -3600 # Node ID b5641ce64cf79b0e95390154f11bf6cc1f63e2f3 # Parent e9499c06e138796addd2734c8014945c3201a4a2 8156742: Miscellaneous WebSocket API improvements Reviewed-by: chegar, rriggs diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WS.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java Tue Jun 21 18:51:18 2016 +0100 @@ -34,7 +34,6 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Stream; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.WARNING; @@ -104,15 +103,6 @@ } @Override - public CompletableFuture sendText(Stream message) { - requireNonNull(message, "message"); - synchronized (stateLock) { - checkState(); - return transmitter.sendText(message); - } - } - - @Override public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { requireNonNull(message, "message"); synchronized (stateLock) { @@ -179,11 +169,11 @@ } @Override - public long request(long n) { + public void request(long n) { if (n < 0L) { throw new IllegalArgumentException("The number must not be negative: " + n); } - return receiver.request(n); + receiver.request(n); } @Override diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSBuilder.java Tue Jun 21 18:51:18 2016 +0100 @@ -25,6 +25,7 @@ package java.net.http; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,8 +60,7 @@ private final LinkedHashMap> headers = new LinkedHashMap<>(); private final WebSocket.Listener listener; private Collection subprotocols = Collections.emptyList(); - private long timeout; - private TimeUnit timeUnit; + private Duration timeout; WSBuilder(URI uri, HttpClient client, WebSocket.Listener listener) { checkURI(requireNonNull(uri, "uri")); @@ -93,13 +93,8 @@ } @Override - public WebSocket.Builder connectTimeout(long timeout, TimeUnit unit) { - if (timeout < 0) { - throw new IllegalArgumentException("Negative timeout: " + timeout); - } - requireNonNull(unit, "unit"); - this.timeout = timeout; - this.timeUnit = unit; + public WebSocket.Builder connectTimeout(Duration timeout) { + this.timeout = requireNonNull(timeout, "timeout"); return this; } @@ -139,9 +134,7 @@ return new ArrayList<>(subprotocols); } - long getTimeout() { return timeout; } - - TimeUnit getTimeUnit() { return timeUnit; } + Duration getConnectTimeout() { return timeout; } private static Collection checkSubprotocols(String mostPreferred, String... lesserPreferred) { diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSFrameConsumer.java Tue Jun 21 18:51:18 2016 +0100 @@ -206,7 +206,7 @@ boolean binaryNonEmpty = data.hasRemaining(); WSShared textData; try { - textData = decoder.decode(data, part.isLast()); + textData = decoder.decode(data, part == MessagePart.WHOLE || part == MessagePart.LAST); } catch (CharacterCodingException e) { throw new WSProtocolException ("5.6.", "Invalid UTF-8 sequence in frame " + opcode, NOT_CONSISTENT, e); diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSMessageSender.java Tue Jun 21 18:51:18 2016 +0100 @@ -30,7 +30,6 @@ import java.net.http.WSOutgoingMessage.Close; import java.net.http.WSOutgoingMessage.Ping; import java.net.http.WSOutgoingMessage.Pong; -import java.net.http.WSOutgoingMessage.StreamedText; import java.net.http.WSOutgoingMessage.Text; import java.net.http.WSOutgoingMessage.Visitor; import java.nio.ByteBuffer; @@ -123,11 +122,6 @@ } @Override - public void visit(StreamedText streamedText) { - throw new IllegalArgumentException("Not yet implemented"); - } - - @Override public void visit(Binary message) { buffers[1] = message.bytes; int mask = random.nextInt(); diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSOpeningHandshake.java Tue Jun 21 18:51:18 2016 +0100 @@ -32,11 +32,14 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -77,8 +80,12 @@ WSOpeningHandshake(WSBuilder b) { URI httpURI = createHttpUri(b.getUri()); HttpRequest.Builder requestBuilder = b.getClient().request(httpURI); - if (b.getTimeUnit() != null) { - requestBuilder.timeout(b.getTimeUnit(), b.getTimeout()); + Duration connectTimeout = b.getConnectTimeout(); + if (connectTimeout != null) { + requestBuilder.timeout( + TimeUnit.of(ChronoUnit.MILLIS), + connectTimeout.get(ChronoUnit.MILLIS) + ); } Collection s = b.getSubprotocols(); if (!s.isEmpty()) { diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSOutgoingMessage.java Tue Jun 21 18:51:18 2016 +0100 @@ -25,13 +25,11 @@ package java.net.http; import java.nio.ByteBuffer; -import java.util.stream.Stream; abstract class WSOutgoingMessage { interface Visitor { void visit(Text message); - void visit(StreamedText message); void visit(Binary message); void visit(Ping message); void visit(Pong message); @@ -64,25 +62,6 @@ } } - static final class StreamedText extends WSOutgoingMessage { - - public final Stream characters; - - StreamedText(Stream characters) { - this.characters = characters; - } - - @Override - void accept(Visitor visitor) { - visitor.visit(this); - } - - @Override - public String toString() { - return WSUtils.toStringSimple(this) + "[characters=" + characters + "]"; - } - } - static final class Binary extends WSOutgoingMessage { public final boolean isLast; diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSReceiver.java Tue Jun 21 18:51:18 2016 +0100 @@ -101,11 +101,10 @@ } } - long request(long n) { + void request(long n) { long newDemand = demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); handler.signal(); assert newDemand >= 0 : newDemand; - return newDemand; } private boolean getData() throws IOException { diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Tue Jun 21 18:51:18 2016 +0100 @@ -28,7 +28,6 @@ import java.net.http.WSOutgoingMessage.Close; import java.net.http.WSOutgoingMessage.Ping; import java.net.http.WSOutgoingMessage.Pong; -import java.net.http.WSOutgoingMessage.StreamedText; import java.net.http.WSOutgoingMessage.Text; import java.nio.ByteBuffer; import java.nio.CharBuffer; @@ -40,7 +39,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; -import java.util.stream.Stream; import static java.lang.String.format; import static java.net.http.Pair.pair; @@ -83,11 +81,6 @@ return acceptMessage(new Text(isLast, message)); } - CompletableFuture sendText(Stream message) { - checkAndUpdateText(true); - return acceptMessage(new StreamedText(message)); - } - CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { checkAndUpdateBinary(isLast); return acceptMessage(new Binary(isLast, message)); diff -r e9499c06e138 -r b5641ce64cf7 jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java Tue Jun 21 18:51:18 2016 +0100 @@ -28,13 +28,11 @@ import java.net.ProtocolException; import java.net.URI; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; /** * A WebSocket client conforming to RFC 6455. @@ -47,8 +45,9 @@ * receive messages. When the {@code WebSocket} is no longer * needed it must be closed: a Close message must both be {@linkplain * #sendClose() sent} and {@linkplain Listener#onClose(WebSocket, Optional, - * String) received}. Or to close abruptly, {@link #abort()} is called. Once - * closed it remains closed, cannot be reopened. + * String) received}. Otherwise, invoke {@link #abort() abort} to close abruptly. + * + *

Once closed the {@code WebSocket} remains closed and cannot be reopened. * *

Messages of type {@code X} are sent through the {@code WebSocket.sendX} * methods and received through {@link WebSocket.Listener}{@code .onX} methods @@ -71,10 +70,6 @@ * arranged from the {@code buffer}'s {@link ByteBuffer#position() position} to * the {@code buffer}'s {@link ByteBuffer#limit() limit}. * - *

All message exchange is run by the threads belonging to the {@linkplain - * HttpClient#executorService() executor service} of {@code WebSocket}'s {@link - * HttpClient}. - * *

Unless otherwise noted, passing a {@code null} argument to a constructor * or method of this type will cause a {@link NullPointerException * NullPointerException} to be thrown. @@ -217,22 +212,17 @@ * Sets a timeout for the opening handshake. * *

If the opening handshake is not finished within the specified - * timeout then {@link #buildAsync()} completes exceptionally with a - * {@code HttpTimeoutException}. + * amount of time then {@link #buildAsync()} completes exceptionally + * with a {@code HttpTimeoutException}. * - *

If the timeout is not specified then it's deemed infinite. + *

If this method is not invoked then the timeout is deemed infinite. * * @param timeout - * the maximum time to wait - * @param unit - * the time unit of the timeout argument + * the timeout * * @return this builder - * - * @throws IllegalArgumentException - * if the {@code timeout} is negative */ - Builder connectTimeout(long timeout, TimeUnit unit); + Builder connectTimeout(Duration timeout); /** * Builds a {@code WebSocket}. @@ -506,7 +496,7 @@ *

Once a Close message is received, the server will not send any * more messages. * - *

A Close message may consist of a close code and a reason for + *

A Close message may consist of a status code and a reason for * closing. The reason will have a UTF-8 representation not longer than * {@code 123} bytes. The reason may be useful for debugging or passing * information relevant to the connection but is not necessarily human @@ -545,12 +535,8 @@ * the time {@code onError} is invoked, no more messages can be sent on * this {@code WebSocket}. * - * @apiNote Errors associated with send operations ({@link - * WebSocket#sendText(CharSequence, boolean) sendText}, {@link - * #sendBinary(ByteBuffer, boolean) sendBinary}, {@link - * #sendPing(ByteBuffer) sendPing}, {@link #sendPong(ByteBuffer) - * sendPong} and {@link #sendClose(CloseCode, CharSequence) sendClose}) - * are reported to the {@code CompletionStage} operations return. + * @apiNote Errors associated with {@code sendX} methods are reported to + * the {@code CompletableFuture} these methods return. * * @implSpec The default implementation does nothing. * @@ -563,8 +549,8 @@ } /** - * A marker used by {@link WebSocket.Listener} for partial message - * receiving. + * A marker used by {@link WebSocket.Listener} in cases where a partial + * message may be received. * * @since 9 */ @@ -586,19 +572,9 @@ LAST, /** - * A whole message. The message consists of a single part. + * A whole message consisting of a single part. */ - WHOLE; - - /** - * Tells whether a part of a message received with this marker is the - * last part. - * - * @return {@code true} if LAST or WHOLE, {@code false} otherwise - */ - public boolean isLast() { - return this == LAST || this == WHOLE; - } + WHOLE } /** @@ -630,7 +606,7 @@ * @param message * the message * @param isLast - * {@code true} if this is the final part of the message, + * {@code true} if this is the last part of the message, * {@code false} otherwise * * @return a CompletableFuture with this WebSocket @@ -679,43 +655,6 @@ } /** - * Sends a whole Text message with characters from {@code - * CharacterSequence}s provided by the given {@code Stream}. - * - *

This is a convenience method. For the general case use {@link - * #sendText(CharSequence, boolean)}. - * - *

Returns a {@code CompletableFuture} which completes - * normally when the message has been sent or completes exceptionally if an - * error occurs. - * - *

Streamed character sequences should not be modified until the - * returned {@code CompletableFuture} completes (either normally or - * exceptionally). - * - *

The returned {@code CompletableFuture} can complete exceptionally - * with: - *

    - *
  • {@link IOException} - * if an I/O error occurs during this operation - *
  • {@link IllegalStateException} - * if the {@code WebSocket} closes while this operation is in progress; - * or if a Close message has been sent already; - * or if there is an outstanding send operation; - * or if a previous Binary message was not sent with {@code isLast == true} - *
- * - * @param message - * the message - * - * @return a CompletableFuture with this WebSocket - * - * @throws IllegalArgumentException - * if {@code message} is a malformed (or an incomplete) UTF-16 sequence - */ - CompletableFuture sendText(Stream message); - - /** * Sends a Binary message with bytes from the given {@code ByteBuffer}. * *

Returns a {@code CompletableFuture} which completes @@ -737,7 +676,7 @@ * @param message * the message * @param isLast - * {@code true} if this is the final part of the message, + * {@code true} if this is the last part of the message, * {@code false} otherwise * * @return a CompletableFuture with this WebSocket @@ -745,43 +684,6 @@ CompletableFuture sendBinary(ByteBuffer message, boolean isLast); /** - * Sends a Binary message with bytes from the given {@code byte[]}. - * - *

Returns a {@code CompletableFuture} which completes - * normally when the message has been sent or completes exceptionally if an - * error occurs. - * - *

The returned {@code CompletableFuture} can complete exceptionally - * with: - *

    - *
  • {@link IOException} - * if an I/O error occurs during this operation - *
  • {@link IllegalStateException} - * if the {@code WebSocket} closes while this operation is in progress; - * or if a Close message has been sent already; - * or if there is an outstanding send operation; - * or if a previous Text message was not sent with {@code isLast == true} - *
- * - * @implSpec This is equivalent to: - *
{@code
-     *     sendBinary(ByteBuffer.wrap(message), isLast)
-     * }
- * - * @param message - * the message - * @param isLast - * {@code true} if this is the final part of the message, - * {@code false} otherwise - * - * @return a CompletableFuture with this WebSocket - */ - default CompletableFuture sendBinary(byte[] message, boolean isLast) { - Objects.requireNonNull(message, "message"); - return sendBinary(ByteBuffer.wrap(message), isLast); - } - - /** * Sends a Ping message. * *

Returns a {@code CompletableFuture} which completes @@ -858,10 +760,11 @@ * normally when the message has been sent or completes exceptionally if an * error occurs. * - *

A Close message may consist of a close code and a reason for closing. - * The reason must have a valid UTF-8 representation not longer than {@code - * 123} bytes. The reason may be useful for debugging or passing information - * relevant to the connection but is not necessarily human readable. + *

A Close message may consist of a status code and a reason for + * closing. The reason must have a UTF-8 representation not longer than + * {@code 123} bytes. The reason may be useful for debugging or passing + * information relevant to the connection but is not necessarily human + * readable. * *

The returned {@code CompletableFuture} can complete exceptionally * with: @@ -910,24 +813,21 @@ CompletableFuture sendClose(); /** - * Requests {@code n} more messages to be received by the {@link Listener + * Allows {@code n} more messages to be received by the {@link Listener * Listener}. * - *

The actual number might be fewer if either of the endpoints decide to - * close the connection before that or an error occurs. + *

The actual number of received messages might be fewer if a Close + * message is received, the connection closes or an error occurs. * *

A {@code WebSocket} that has just been created, hasn't requested * anything yet. Usually the initial request for messages is done in {@link * Listener#onOpen(java.net.http.WebSocket) Listener.onOpen}. * - * If all requested messages have been received, and the server sends more, - * then these messages are queued. - * * @implNote This implementation does not distinguish between partial and * whole messages, because it's not known beforehand how a message will be * received. * - *

If a server sends more messages than requested, the implementation + *

If a server sends more messages than requested, this implementation * queues up these messages on the TCP connection and may eventually force * the sender to stop sending through TCP flow control. * @@ -936,12 +836,8 @@ * * @throws IllegalArgumentException * if {@code n < 0} - * - * @return resulting unfulfilled demand with this request taken into account */ - // TODO return void as it's breaking encapsulation (leaking info when exactly something deemed delivered) - // or demand behaves after LONG.MAX_VALUE - long request(long n); + void request(long n); /** * Returns a {@linkplain Builder#subprotocols(String, String...) subprotocol} diff -r e9499c06e138 -r b5641ce64cf7 jdk/test/java/net/httpclient/BasicWebSocketAPITest.java --- a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java Tue Jun 21 16:52:16 2016 +0100 +++ b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java Tue Jun 21 18:51:18 2016 +0100 @@ -32,10 +32,9 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SocketChannel; +import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.stream.Stream; /* * @test @@ -83,13 +82,7 @@ (ws) -> TestKit.assertThrows(NullPointerException.class, "message", - () -> ws.sendBinary((byte[]) null, true)) - ); - checkAndClose( - (ws) -> - TestKit.assertThrows(NullPointerException.class, - "message", - () -> ws.sendBinary((ByteBuffer) null, true)) + () -> ws.sendBinary(null, true)) ); checkAndClose( (ws) -> @@ -125,13 +118,7 @@ (ws) -> TestKit.assertThrows(NullPointerException.class, "message", - () -> ws.sendText((CharSequence) null)) - ); - checkAndClose( - (ws) -> - TestKit.assertThrows(NullPointerException.class, - "message", - () -> ws.sendText((Stream) null)) + () -> ws.sendText(null)) ); checkAndClose( (ws) -> @@ -214,17 +201,7 @@ // FIXME: check timeout works // (i.e. it directly influences the time WebSocket waits for connection + opening handshake) TestKit.assertNotThrows( - () -> WebSocket.newBuilder(ws, defaultListener()).connectTimeout(1, TimeUnit.SECONDS) - ); - WebSocket.Builder builder = WebSocket.newBuilder(ws, defaultListener()); - TestKit.assertThrows(IllegalArgumentException.class, - "(?i).*\\bnegative\\b.*", - () -> builder.connectTimeout(-1, TimeUnit.SECONDS) - ); - WebSocket.Builder builder1 = WebSocket.newBuilder(ws, defaultListener()); - TestKit.assertThrows(NullPointerException.class, - "unit", - () -> builder1.connectTimeout(1, null) + () -> WebSocket.newBuilder(ws, defaultListener()).connectTimeout(Duration.ofSeconds(1)) ); // FIXME: check these headers are actually received by the server TestKit.assertNotThrows(