# HG changeset patch # User prappo # Date 1465395598 -3600 # Node ID bf2b41533aedc3e528f5e33210f238d80ea21d86 # Parent e031aa31b25f9a85ffac5c24a90fe4a04cc0ab02 8156693: Improve usability of CompletableFuture use in WebSocket API Reviewed-by: rriggs diff -r e031aa31b25f -r bf2b41533aed jdk/src/java.httpclient/share/classes/java/net/http/WS.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WS.java Wed Jun 08 15:50:11 2016 +0200 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WS.java Wed Jun 08 15:19:58 2016 +0100 @@ -86,7 +86,7 @@ } } }; - transmitter = new WSTransmitter(executor, channel, errorHandler); + transmitter = new WSTransmitter(this, executor, channel, errorHandler); receiver = new WSReceiver(this.listener, this, executor, channel); } @@ -95,7 +95,7 @@ } @Override - public CompletableFuture sendText(CharSequence message, boolean isLast) { + public CompletableFuture sendText(CharSequence message, boolean isLast) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -104,7 +104,7 @@ } @Override - public CompletableFuture sendText(Stream message) { + public CompletableFuture sendText(Stream message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -113,7 +113,7 @@ } @Override - public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { + public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -122,7 +122,7 @@ } @Override - public CompletableFuture sendPing(ByteBuffer message) { + public CompletableFuture sendPing(ByteBuffer message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -131,7 +131,7 @@ } @Override - public CompletableFuture sendPong(ByteBuffer message) { + public CompletableFuture sendPong(ByteBuffer message) { requireNonNull(message, "message"); synchronized (stateLock) { checkState(); @@ -140,7 +140,7 @@ } @Override - public CompletableFuture sendClose(CloseCode code, CharSequence reason) { + public CompletableFuture sendClose(CloseCode code, CharSequence reason) { requireNonNull(code, "code"); requireNonNull(reason, "reason"); synchronized (stateLock) { @@ -149,13 +149,13 @@ } @Override - public CompletableFuture sendClose() { + public CompletableFuture sendClose() { synchronized (stateLock) { return doSendClose(() -> transmitter.sendClose()); } } - private CompletableFuture doSendClose(Supplier> s) { + private CompletableFuture doSendClose(Supplier> s) { checkState(); boolean closeChannel = false; synchronized (stateLock) { @@ -165,7 +165,7 @@ tryChangeState(State.CLOSED_LOCALLY); } } - CompletableFuture sent = s.get(); + CompletableFuture sent = s.get(); if (closeChannel) { sent.whenComplete((v, t) -> { try { diff -r e031aa31b25f -r bf2b41533aed jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Wed Jun 08 15:50:11 2016 +0200 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WSTransmitter.java Wed Jun 08 15:19:58 2016 +0100 @@ -51,15 +51,17 @@ */ final class WSTransmitter { - private final BlockingQueue>> + private final BlockingQueue>> backlog = new LinkedBlockingQueue<>(); private final WSMessageSender sender; private final WSSignalHandler handler; + private final WebSocket webSocket; private boolean previousMessageSent = true; private boolean canSendBinary = true; private boolean canSendText = true; - WSTransmitter(Executor executor, RawChannel channel, Consumer errorHandler) { + WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer errorHandler) { + this.webSocket = ws; this.handler = new WSSignalHandler(executor, this::handleSignal); Consumer sendCompletion = (error) -> { synchronized (this) { @@ -76,41 +78,41 @@ this.sender = new WSMessageSender(channel, sendCompletion); } - CompletableFuture sendText(CharSequence message, boolean isLast) { + CompletableFuture sendText(CharSequence message, boolean isLast) { checkAndUpdateText(isLast); return acceptMessage(new Text(isLast, message)); } - CompletableFuture sendText(Stream message) { + CompletableFuture sendText(Stream message) { checkAndUpdateText(true); return acceptMessage(new StreamedText(message)); } - CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { + CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { checkAndUpdateBinary(isLast); return acceptMessage(new Binary(isLast, message)); } - CompletableFuture sendPing(ByteBuffer message) { + CompletableFuture sendPing(ByteBuffer message) { checkSize(message.remaining(), 125); return acceptMessage(new Ping(message)); } - CompletableFuture sendPong(ByteBuffer message) { + CompletableFuture sendPong(ByteBuffer message) { checkSize(message.remaining(), 125); return acceptMessage(new Pong(message)); } - CompletableFuture sendClose(WebSocket.CloseCode code, CharSequence reason) { + CompletableFuture sendClose(WebSocket.CloseCode code, CharSequence reason) { return acceptMessage(createCloseMessage(code, reason)); } - CompletableFuture sendClose() { + CompletableFuture sendClose() { return acceptMessage(new Close(ByteBuffer.allocate(0))); } - private CompletableFuture acceptMessage(WSOutgoingMessage m) { - CompletableFuture cf = new CompletableFuture<>(); + private CompletableFuture acceptMessage(WSOutgoingMessage m) { + CompletableFuture cf = new CompletableFuture<>(); synchronized (this) { backlog.offer(pair(m, cf)); } @@ -123,11 +125,11 @@ synchronized (this) { while (!backlog.isEmpty() && previousMessageSent) { previousMessageSent = false; - Pair> p = backlog.peek(); + Pair> p = backlog.peek(); boolean sent = sender.trySendFully(p.first); if (sent) { backlog.remove(); - p.second.complete(null); + p.second.complete(webSocket); previousMessageSent = true; } } diff -r e031aa31b25f -r bf2b41533aed jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java --- a/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java Wed Jun 08 15:50:11 2016 +0200 +++ b/jdk/src/java.httpclient/share/classes/java/net/http/WebSocket.java Wed Jun 08 15:19:58 2016 +0100 @@ -52,8 +52,8 @@ * *

Messages of type {@code X} are sent through the {@code WebSocket.sendX} * methods and received through {@link WebSocket.Listener}{@code .onX} methods - * asynchronously. Each of the methods begins the operation and returns a {@link - * CompletionStage} which completes when the operation has completed. + * asynchronously. Each of the methods returns a {@link CompletionStage} which + * completes when the operation has completed. * *

Messages are received only if {@linkplain #request(long) requested}. * @@ -79,6 +79,9 @@ * or method of this type will cause a {@link NullPointerException * NullPointerException} to be thrown. * + * @implNote The default implementation's methods do not block before returning + * a {@code CompletableFuture}. + * * @since 9 */ public interface WebSocket { @@ -234,9 +237,9 @@ /** * Builds a {@code WebSocket}. * - *

Returns immediately with a {@code CompletableFuture} - * which completes with the {@code WebSocket} when it is connected, or - * completes exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally with the {@code WebSocket} when it is connected or completes + * exceptionally if an error occurs. * *

{@code CompletableFuture} may complete exceptionally with the * following errors: @@ -252,7 +255,7 @@ * if the opening handshake fails * * - * @return a {@code CompletableFuture} of {@code WebSocket} + * @return a {@code CompletableFuture} with the {@code WebSocket} */ CompletableFuture buildAsync(); } @@ -601,9 +604,9 @@ /** * Sends a Text message with characters from the given {@code CharSequence}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The {@code CharSequence} should not be modified until the returned * {@code CompletableFuture} completes (either normally or exceptionally). @@ -612,9 +615,12 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; or the - * {@code WebSocket} closes while this operation is in progress; - * or the {@code message} is a malformed UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @implNote This implementation does not accept partial UTF-16 @@ -624,22 +630,15 @@ * @param message * the message * @param isLast - * {@code true} if this is the final part of the message + * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - CompletableFuture sendText(CharSequence message, boolean isLast); + CompletableFuture sendText(CharSequence message, boolean isLast); /** * Sends a whole Text message with characters from the given {@code @@ -648,9 +647,9 @@ *

This is a convenience method. For the general case, use {@link * #sendText(CharSequence, boolean)}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The {@code CharSequence} should not be modified until the returned * {@code CompletableFuture} completes (either normally or exceptionally). @@ -659,27 +658,23 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; - * or the {@code WebSocket} closes while this operation is in progress; - * or the message is a malformed (or an incomplete) UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - default CompletableFuture sendText(CharSequence message) { + default CompletableFuture sendText(CharSequence message) { return sendText(message, true); } @@ -690,9 +685,9 @@ *

This is a convenience method. For the general case use {@link * #sendText(CharSequence, boolean)}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

Streamed character sequences should not be modified until the * returned {@code CompletableFuture} completes (either normally or @@ -702,41 +697,41 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation; - * or the {@code WebSocket} closes while this operation is in progress; - * or the message is a malformed (or an incomplete) UTF-16 sequence + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Binary message was not sent with {@code isLast == true} *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Binary message was not sent - * with {@code isLast == true} + * @throws IllegalArgumentException + * if {@code message} is a malformed (or an incomplete) UTF-16 sequence */ - CompletableFuture sendText(Stream message); + CompletableFuture sendText(Stream message); /** * Sends a Binary message with bytes from the given {@code ByteBuffer}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Text message was not sent with {@code isLast == true} *
* * @param message @@ -745,33 +740,27 @@ * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Text message was not sent - * with {@code isLast == true} + * @return a CompletableFuture with this WebSocket */ - CompletableFuture sendBinary(ByteBuffer message, boolean isLast); + CompletableFuture sendBinary(ByteBuffer message, boolean isLast); /** * Sends a Binary message with bytes from the given {@code byte[]}. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation; + * or if a previous Text message was not sent with {@code isLast == true} *
* * @implSpec This is equivalent to: @@ -785,19 +774,9 @@ * {@code true} if this is the final part of the message, * {@code false} otherwise * - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation - * @throws IllegalStateException - * if a previous Text message was not sent - * with {@code isLast == true} + * @return a CompletableFuture with this WebSocket */ - default CompletableFuture sendBinary(byte[] message, boolean isLast) { + default CompletableFuture sendBinary(byte[] message, boolean isLast) { Objects.requireNonNull(message, "message"); return sendBinary(ByteBuffer.wrap(message), isLast); } @@ -805,9 +784,9 @@ /** * Sends a Ping message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Ping message may be sent or received by either client or server. * It may serve either as a keepalive or as a means to verify that the @@ -820,32 +799,29 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException * if {@code message.remaining() > 125} */ - CompletableFuture sendPing(ByteBuffer message); + CompletableFuture sendPing(ByteBuffer message); /** * Sends a Pong message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Pong message may be unsolicited or may be sent in response to a * previously received Ping. In latter case the contents of the Pong is @@ -858,32 +834,29 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param message * the message * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException * if {@code message.remaining() > 125} */ - CompletableFuture sendPong(ByteBuffer message); + CompletableFuture sendPong(ByteBuffer message); /** * Sends a Close message with the given close code and the reason. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

A Close message may consist of a close code and a reason for closing. * The reason must have a valid UTF-8 representation not longer than {@code @@ -894,8 +867,11 @@ * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* * @param code @@ -903,45 +879,35 @@ * @param reason * the reason; can be empty * - * @return a CompletableFuture of Void + * @return a CompletableFuture with this WebSocket * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation * @throws IllegalArgumentException - * if the {@code reason} doesn't have a valid UTF-8 - * representation not longer than {@code 123} bytes + * if {@code reason} doesn't have an UTF-8 representation not longer + * than {@code 123} bytes */ - CompletableFuture sendClose(CloseCode code, CharSequence reason); + CompletableFuture sendClose(CloseCode code, CharSequence reason); /** * Sends an empty Close message. * - *

Returns immediately with a {@code CompletableFuture} which - * completes normally when the message has been sent, or completes - * exceptionally if an error occurs. + *

Returns a {@code CompletableFuture} which completes + * normally when the message has been sent or completes exceptionally if an + * error occurs. * *

The returned {@code CompletableFuture} can complete exceptionally * with: *

    *
  • {@link IOException} - * if an I/O error occurs during this operation or the - * {@code WebSocket} closes while this operation is in progress + * if an I/O error occurs during this operation + *
  • {@link IllegalStateException} + * if the {@code WebSocket} closes while this operation is in progress; + * or if a Close message has been sent already; + * or if there is an outstanding send operation *
* - * @return a CompletableFuture of Void - * - * @throws IllegalStateException - * if the WebSocket is closed - * @throws IllegalStateException - * if a Close message has been already sent - * @throws IllegalStateException - * if there is an outstanding send operation + * @return a CompletableFuture with this WebSocket */ - CompletableFuture sendClose(); + CompletableFuture sendClose(); /** * Requests {@code n} more messages to be received by the {@link Listener diff -r e031aa31b25f -r bf2b41533aed jdk/test/java/net/httpclient/BasicWebSocketAPITest.java --- a/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java Wed Jun 08 15:50:11 2016 +0200 +++ b/jdk/test/java/net/httpclient/BasicWebSocketAPITest.java Wed Jun 08 15:19:58 2016 +0100 @@ -30,6 +30,7 @@ import java.net.http.WebSocket; import java.net.http.WebSocket.CloseCode; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -92,12 +93,24 @@ ); checkAndClose( (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + ".*message.*", + () -> ws.sendPing(ByteBuffer.allocate(126))) + ); + checkAndClose( + (ws) -> TestKit.assertThrows(NullPointerException.class, "message", () -> ws.sendPing(null)) ); checkAndClose( (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + ".*message.*", + () -> ws.sendPong(ByteBuffer.allocate(126))) + ); + checkAndClose( + (ws) -> TestKit.assertThrows(NullPointerException.class, "message", () -> ws.sendPong(null)) @@ -106,7 +119,7 @@ (ws) -> TestKit.assertThrows(NullPointerException.class, "message", - () -> ws.sendText((CharSequence) null, true)) + () -> ws.sendText(null, true)) ); checkAndClose( (ws) -> @@ -122,6 +135,12 @@ ); checkAndClose( (ws) -> + TestKit.assertThrows(IllegalArgumentException.class, + "(?i).*reason.*", + () -> ws.sendClose(CloseCode.NORMAL_CLOSURE, CharBuffer.allocate(124))) + ); + checkAndClose( + (ws) -> TestKit.assertThrows(NullPointerException.class, "code", () -> ws.sendClose(null, ""))