--- a/src/java.net.http/share/classes/java/net/http/WebSocket.java Mon Mar 19 12:24:07 2018 +0000
+++ b/src/java.net.http/share/classes/java/net/http/WebSocket.java Mon Mar 19 14:20:18 2018 +0000
@@ -255,15 +255,8 @@
/**
* A Text message has been received.
*
- * <p> If a whole message has been received, this method will be invoked
- * with {@code MessagePart.WHOLE} marker. Otherwise, it will be invoked
- * with {@code FIRST}, possibly a number of times with {@code PART} and,
- * finally, with {@code LAST} markers. If this message is partial, it
- * may be an incomplete UTF-16 sequence. However, the concatenation of
- * all messages through the last will be a complete UTF-16 sequence.
- *
* <p> Return a {@code CompletionStage} which will be used by the
- * {@code WebSocket} as a signal it may reclaim the
+ * {@code WebSocket} as an indication it may reclaim the
* {@code CharSequence}. Do not access the {@code CharSequence} after
* this {@ode CompletionStage} has completed.
*
@@ -281,8 +274,8 @@
* the WebSocket on which the message has been received
* @param message
* the message
- * @param part
- * the part
+ * @param last
+ * whether this is the last part of the message
*
* @return a {@code CompletionStage} which completes when the
* {@code CharSequence} may be reclaimed; or {@code null} if it may be
@@ -290,7 +283,7 @@
*/
default CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
- MessagePart part) {
+ boolean last) {
webSocket.request(1);
return null;
}
@@ -298,16 +291,11 @@
/**
* A Binary message has been received.
*
- * <p> If a whole message has been received, this method will be invoked
- * with {@code MessagePart.WHOLE} marker. Otherwise, it will be invoked
- * with {@code FIRST}, possibly a number of times with {@code PART} and,
- * finally, with {@code LAST} markers.
- *
* <p> This message consists of bytes from the buffer's position to
* its limit.
*
* <p> Return a {@code CompletionStage} which will be used by the
- * {@code WebSocket} as a signal it may reclaim the
+ * {@code WebSocket} as an indication it may reclaim the
* {@code ByteBuffer}. Do not access the {@code ByteBuffer} after
* this {@ode CompletionStage} has completed.
*
@@ -322,8 +310,8 @@
* the WebSocket on which the message has been received
* @param message
* the message
- * @param part
- * the part
+ * @param last
+ * whether this is the last part of the message
*
* @return a {@code CompletionStage} which completes when the
* {@code ByteBuffer} may be reclaimed; or {@code null} if it may be
@@ -331,7 +319,7 @@
*/
default CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
- MessagePart part) {
+ boolean last) {
webSocket.request(1);
return null;
}
@@ -482,35 +470,6 @@
}
/**
- * A marker used by {@link WebSocket.Listener} for identifying partial
- * messages.
- *
- * @since 11
- */
- enum MessagePart {
-
- /**
- * The first part of a message.
- */
- FIRST,
-
- /**
- * A middle part of a message.
- */
- PART,
-
- /**
- * The last part of a message.
- */
- LAST,
-
- /**
- * A whole message consisting of a single part.
- */
- WHOLE
- }
-
- /**
* Sends a Text message with characters from the given {@code CharSequence}.
*
* <p> The character sequence must not be modified until the
@@ -522,7 +481,7 @@
* <li> {@link IllegalStateException} -
* if the previous Text or Binary message has not been sent yet
* or if a previous Binary message has been sent with
- * {@code isLast == false}
+ * {@code last} equals {@code false}
* <li> {@link IOException} -
* if an I/O error occurs, or if the output is closed
* </ul>
@@ -533,14 +492,14 @@
*
* @param message
* the message
- * @param isLast
+ * @param last
* {@code true} if this is the last part of the message,
* {@code false} otherwise
*
* @return a {@code CompletableFuture} that completes, with this
* {@code WebSocket}, when the message has been sent
*/
- CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast);
+ CompletableFuture<WebSocket> sendText(CharSequence message, boolean last);
/**
* Sends a Binary message with bytes from the given {@code ByteBuffer}.
@@ -556,21 +515,21 @@
* <li> {@link IllegalStateException} -
* if the previous Binary or Text message has not been sent yet
* or if a previous Text message has been sent with
- * {@code isLast == false}
+ * {@code last} equals {@code false}
* <li> {@link IOException} -
* if an I/O error occurs, or if the output is closed
* </ul>
*
* @param message
* the message
- * @param isLast
+ * @param last
* {@code true} if this is the last part of the message,
* {@code false} otherwise
*
* @return a {@code CompletableFuture} that completes, with this
* {@code WebSocket}, when the message has been sent
*/
- CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast);
+ CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean last);
/**
* Sends a Ping message with bytes from the given {@code ByteBuffer}.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageDecoder.java Mon Mar 19 12:24:07 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageDecoder.java Mon Mar 19 14:20:18 2018 +0000
@@ -28,7 +28,6 @@
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.websocket.Frame.Opcode;
-import java.net.http.WebSocket.MessagePart;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
@@ -59,7 +58,6 @@
private final UTF8AccumulatingDecoder decoder = new UTF8AccumulatingDecoder();
private boolean fin;
private Opcode opcode, originatingOpcode;
- private MessagePart part = MessagePart.WHOLE;
private long payloadLen;
private long unconsumedPayloadLen;
private ByteBuffer binaryData;
@@ -171,11 +169,11 @@
public void payloadData(ByteBuffer data) {
debug.log(Level.DEBUG, "payload %s", data);
unconsumedPayloadLen -= data.remaining();
- boolean isLast = unconsumedPayloadLen == 0;
+ boolean lastPayloadChunk = unconsumedPayloadLen == 0;
if (opcode.isControl()) {
if (binaryData != null) { // An intermediate or the last chunk
binaryData.put(data);
- } else if (!isLast) { // The first chunk
+ } else if (!lastPayloadChunk) { // The first chunk
int remaining = data.remaining();
// It shouldn't be 125, otherwise the next chunk will be of size
// 0, which is not what Reader promises to deliver (eager
@@ -188,18 +186,16 @@
binaryData = ByteBuffer.allocate(data.remaining()).put(data);
}
} else {
- part = determinePart(isLast);
+ boolean last = fin && lastPayloadChunk;
boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
if (!text) {
- output.onBinary(data.slice(), part);
+ output.onBinary(data.slice(), last);
data.position(data.limit()); // Consume
} else {
boolean binaryNonEmpty = data.hasRemaining();
CharBuffer textData;
try {
- boolean eof = part == MessagePart.WHOLE
- || part == MessagePart.LAST;
- textData = decoder.decode(data, eof);
+ textData = decoder.decode(data, last);
} catch (CharacterCodingException e) {
throw new FailWebSocketException(
"Invalid UTF-8 in frame " + opcode,
@@ -207,8 +203,8 @@
}
if (!(binaryNonEmpty && !textData.hasRemaining())) {
// If there's a binary data, that result in no text, then we
- // don't deliver anything
- output.onText(textData, part);
+ // don't deliver anything, otherwise:
+ output.onText(textData, last);
}
}
}
@@ -264,18 +260,4 @@
payloadLen = 0;
opcode = null;
}
-
- private MessagePart determinePart(boolean isLast) {
- boolean lastChunk = fin && isLast;
- switch (part) {
- case LAST:
- case WHOLE:
- return lastChunk ? MessagePart.WHOLE : MessagePart.FIRST;
- case FIRST:
- case PART:
- return lastChunk ? MessagePart.LAST : MessagePart.PART;
- default:
- throw new InternalError(String.valueOf(part));
- }
- }
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageStreamConsumer.java Mon Mar 19 12:24:07 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageStreamConsumer.java Mon Mar 19 14:20:18 2018 +0000
@@ -25,8 +25,6 @@
package jdk.internal.net.http.websocket;
-import java.net.http.WebSocket.MessagePart;
-
import java.nio.ByteBuffer;
/*
@@ -34,9 +32,9 @@
*/
interface MessageStreamConsumer {
- void onText(CharSequence data, MessagePart part);
+ void onText(CharSequence data, boolean last);
- void onBinary(ByteBuffer data, MessagePart part);
+ void onBinary(ByteBuffer data, boolean last);
void onPing(ByteBuffer data);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Mon Mar 19 12:24:07 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Mon Mar 19 14:20:18 2018 +0000
@@ -104,7 +104,7 @@
private final AtomicReference<State> state = new AtomicReference<>(OPEN);
/* Components of calls to Listener's methods */
- private MessagePart part;
+ private boolean last;
private ByteBuffer binaryData;
private CharSequence text;
private int statusCode;
@@ -190,19 +190,19 @@
@Override
public CompletableFuture<WebSocket> sendText(CharSequence message,
- boolean isLast) {
+ boolean last) {
Objects.requireNonNull(message);
long id = 0;
if (debug.isLoggable(Level.DEBUG)) {
id = sendCounter.incrementAndGet();
debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s",
- id, message.length(), isLast);
+ id, message.length(), last);
}
CompletableFuture<WebSocket> result;
if (!setPendingTextOrBinary()) {
result = failedFuture(new IllegalStateException("Send pending"));
} else {
- result = transport.sendText(message, isLast, this,
+ result = transport.sendText(message, last, this,
(r, e) -> clearPendingTextOrBinary());
}
debug.log(Level.DEBUG, "exit send text %s returned %s", id, result);
@@ -212,19 +212,19 @@
@Override
public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
- boolean isLast) {
+ boolean last) {
Objects.requireNonNull(message);
long id = 0;
if (debug.isLoggable(Level.DEBUG)) {
id = sendCounter.incrementAndGet();
debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s",
- id, message, isLast);
+ id, message, last);
}
CompletableFuture<WebSocket> result;
if (!setPendingTextOrBinary()) {
result = failedFuture(new IllegalStateException("Send pending"));
} else {
- result = transport.sendBinary(message, isLast, this,
+ result = transport.sendBinary(message, last, this,
(r, e) -> clearPendingTextOrBinary());
}
debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result);
@@ -621,12 +621,12 @@
long id = 0;
if (debug.isLoggable(Level.DEBUG)) {
id = receiveCounter.incrementAndGet();
- debug.log(Level.DEBUG, "enter onBinary %s payload=%s part=%s",
- id, binaryData, part);
+ debug.log(Level.DEBUG, "enter onBinary %s payload=%s last=%s",
+ id, binaryData, last);
}
CompletionStage<?> cs = null;
try {
- cs = listener.onBinary(WebSocketImpl.this, binaryData, part);
+ cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
} finally {
debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs);
}
@@ -637,12 +637,12 @@
if (debug.isLoggable(Level.DEBUG)) {
id = receiveCounter.incrementAndGet();
debug.log(Level.DEBUG,
- "enter onText %s payload.length=%s part=%s",
- id, text.length(), part);
+ "enter onText %s payload.length=%s last=%s",
+ id, text.length(), last);
}
CompletionStage<?> cs = null;
try {
- cs = listener.onText(WebSocketImpl.this, text, part);
+ cs = listener.onText(WebSocketImpl.this, text, last);
} finally {
debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs);
}
@@ -777,18 +777,18 @@
private class SignallingMessageConsumer implements MessageStreamConsumer {
@Override
- public void onText(CharSequence data, MessagePart part) {
+ public void onText(CharSequence data, boolean last) {
transport.acknowledgeReception();
text = data;
- WebSocketImpl.this.part = part;
+ WebSocketImpl.this.last = last;
tryChangeState(WAITING, TEXT);
}
@Override
- public void onBinary(ByteBuffer data, MessagePart part) {
+ public void onBinary(ByteBuffer data, boolean last) {
transport.acknowledgeReception();
binaryData = data;
- WebSocketImpl.this.part = part;
+ WebSocketImpl.this.last = last;
tryChangeState(WAITING, BINARY);
}
--- a/test/jdk/java/net/httpclient/websocket/ImmediateAbort.java Mon Mar 19 12:24:07 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/ImmediateAbort.java Mon Mar 19 14:20:18 2018 +0000
@@ -70,7 +70,7 @@
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
- WebSocket.MessagePart part) {
+ boolean last) {
messageReceived.complete(null);
return null;
}
@@ -78,7 +78,7 @@
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
- WebSocket.MessagePart part) {
+ boolean last) {
messageReceived.complete(null);
return null;
}
--- a/test/jdk/java/net/httpclient/websocket/MockListener.java Mon Mar 19 12:24:07 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/MockListener.java Mon Mar 19 14:20:18 2018 +0000
@@ -22,7 +22,6 @@
*/
import java.net.http.WebSocket;
-import java.net.http.WebSocket.MessagePart;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -81,19 +80,19 @@
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
- MessagePart part) {
- System.out.printf("onText(%s, %s, %s)%n", webSocket, message, part);
- OnText inv = new OnText(webSocket, message.toString(), part);
+ boolean last) {
+ System.out.printf("onText(%s, message.length=%s, %s)%n", webSocket, message.length(), last);
+ OnText inv = new OnText(webSocket, message.toString(), last);
invocations.add(inv);
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
- return onText0(webSocket, message, part);
+ return onText0(webSocket, message, last);
}
protected CompletionStage<?> onText0(WebSocket webSocket,
CharSequence message,
- MessagePart part) {
+ boolean last) {
replenish(webSocket);
return null;
}
@@ -101,19 +100,19 @@
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
- MessagePart part) {
- System.out.printf("onBinary(%s, %s, %s)%n", webSocket, message, part);
- OnBinary inv = new OnBinary(webSocket, fullCopy(message), part);
+ boolean last) {
+ System.out.printf("onBinary(%s, %s, %s)%n", webSocket, message, last);
+ OnBinary inv = new OnBinary(webSocket, fullCopy(message), last);
invocations.add(inv);
if (collectUntil.test(inv)) {
lastCall.complete(null);
}
- return onBinary0(webSocket, message, part);
+ return onBinary0(webSocket, message, last);
}
protected CompletionStage<?> onBinary0(WebSocket webSocket,
ByteBuffer message,
- MessagePart part) {
+ boolean last) {
replenish(webSocket);
return null;
}
@@ -200,14 +199,14 @@
public static OnText onText(WebSocket webSocket,
String text,
- MessagePart part) {
- return new OnText(webSocket, text, part);
+ boolean last) {
+ return new OnText(webSocket, text, last);
}
public static OnBinary onBinary(WebSocket webSocket,
ByteBuffer data,
- MessagePart part) {
- return new OnBinary(webSocket, data, part);
+ boolean last) {
+ return new OnBinary(webSocket, data, last);
}
public static OnPing onPing(WebSocket webSocket,
@@ -266,12 +265,12 @@
public static final class OnText extends Invocation {
final String text;
- final MessagePart part;
+ final boolean last;
- public OnText(WebSocket webSocket, String text, MessagePart part) {
+ public OnText(WebSocket webSocket, String text, boolean last) {
super(webSocket);
this.text = text;
- this.part = part;
+ this.last = last;
}
@Override
@@ -280,30 +279,30 @@
if (o == null || getClass() != o.getClass()) return false;
OnText onText = (OnText) o;
return Objects.equals(text, onText.text) &&
- part == onText.part &&
+ last == onText.last &&
Objects.equals(webSocket, onText.webSocket);
}
@Override
public int hashCode() {
- return Objects.hash(text, part, webSocket);
+ return Objects.hash(text, last, webSocket);
}
@Override
public String toString() {
- return String.format("onText(%s, %s, %s)", webSocket, text, part);
+ return String.format("onText(%s, message.length=%s, %s)", webSocket, text.length(), last);
}
}
public static final class OnBinary extends Invocation {
final ByteBuffer data;
- final MessagePart part;
+ final boolean last;
- public OnBinary(WebSocket webSocket, ByteBuffer data, MessagePart part) {
+ public OnBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
super(webSocket);
this.data = data;
- this.part = part;
+ this.last = last;
}
@Override
@@ -312,18 +311,18 @@
if (o == null || getClass() != o.getClass()) return false;
OnBinary onBinary = (OnBinary) o;
return Objects.equals(data, onBinary.data) &&
- part == onBinary.part &&
+ last == onBinary.last &&
Objects.equals(webSocket, onBinary.webSocket);
}
@Override
public int hashCode() {
- return Objects.hash(data, part, webSocket);
+ return Objects.hash(data, last, webSocket);
}
@Override
public String toString() {
- return String.format("onBinary(%s, %s, %s)", webSocket, data, part);
+ return String.format("onBinary(%s, %s, %s)", webSocket, data, last);
}
}
--- a/test/jdk/java/net/httpclient/websocket/SendTest.java Mon Mar 19 12:24:07 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/SendTest.java Mon Mar 19 14:20:18 2018 +0000
@@ -192,7 +192,7 @@
}
}
- @Test
+ @Test // FIXME: TO BE REMOVED as we agreed upon no timeout in sendClose
public void sendCloseTimeout() throws Exception {
try (DummyWebSocketServer server = Support.notReadingServer()) {
server.open();
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Mon Mar 19 12:24:07 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java Mon Mar 19 14:20:18 2018 +0000
@@ -600,34 +600,23 @@
WebSocket.Listener listener = new WebSocket.Listener() {
List<byte[]> collectedBytes = new ArrayList<>();
- ByteBuffer binary;
+ ByteBuffer binary = ByteBuffer.allocate(1024);
@Override
public CompletionStage<?> onBinary(WebSocket webSocket,
ByteBuffer message,
- WebSocket.MessagePart part) {
- System.out.printf("onBinary(%s, %s)%n", message, part);
+ boolean last) {
+ System.out.printf("onBinary(%s, %s)%n", message, last);
webSocket.request(1);
- byte[] bytes = null;
- switch (part) {
- case FIRST:
- binary = ByteBuffer.allocate(message.remaining() * 2);
- case PART:
- append(message);
- return null;
- case LAST:
- append(message);
- binary.flip();
- bytes = new byte[binary.remaining()];
- binary.get(bytes);
- binary.clear();
- break;
- case WHOLE:
- bytes = new byte[message.remaining()];
- message.get(bytes);
- break;
+
+ append(message);
+ if (last) {
+ binary.flip();
+ byte[] bytes = new byte[binary.remaining()];
+ binary.get(bytes);
+ binary.clear();
+ processWholeBinary(bytes);
}
- processWholeBinary(bytes);
return null;
}
@@ -702,30 +691,20 @@
WebSocket.Listener listener = new WebSocket.Listener() {
List<String> collectedStrings = new ArrayList<>();
- StringBuilder text;
+ StringBuilder text = new StringBuilder();
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
- WebSocket.MessagePart part) {
- System.out.printf("onText(%s, %s)%n", message, part);
+ boolean last) {
+ System.out.printf("onText(%s, %s)%n", message, last);
webSocket.request(1);
- String str = null;
- switch (part) {
- case FIRST:
- text = new StringBuilder(message.length() * 2);
- case PART:
- text.append(message);
- return null;
- case LAST:
- text.append(message);
- str = text.toString();
- break;
- case WHOLE:
- str = message.toString();
- break;
+ text.append(message);
+ if (last) {
+ String str = text.toString();
+ text.setLength(0);
+ processWholeText(str);
}
- processWholeText(str);
return null;
}
@@ -791,43 +770,29 @@
WebSocket.Listener listener = new WebSocket.Listener() {
- List<CharSequence> parts;
+ List<CharSequence> parts = new ArrayList<>();
/*
* A CompletableFuture which will complete once the current
- * message has been fully assembled (LAST/WHOLE). Until then
- * the listener returns this instance for every call.
+ * message has been fully assembled. Until then the listener
+ * returns this instance for every call.
*/
- CompletableFuture<?> currentCf;
+ CompletableFuture<?> currentCf = new CompletableFuture<>();
List<String> collected = new ArrayList<>();
@Override
public CompletionStage<?> onText(WebSocket webSocket,
CharSequence message,
- WebSocket.MessagePart part) {
- switch (part) {
- case WHOLE:
- CompletableFuture<?> cf = new CompletableFuture<>();
- cf.thenRun(() -> webSocket.request(1));
- processWholeMessage(List.of(message), cf);
- return cf;
- case FIRST:
- parts = new ArrayList<>();
- parts.add(message);
- currentCf = new CompletableFuture<>();
- currentCf.thenRun(() -> webSocket.request(1));
- webSocket.request(1);
- break;
- case PART:
- parts.add(message);
- webSocket.request(1);
- break;
- case LAST:
- parts.add(message);
- CompletableFuture<?> copyCf = this.currentCf;
- processWholeMessage(parts, copyCf);
- currentCf = null;
- parts = null;
- return copyCf;
+ boolean last) {
+ parts.add(message);
+ if (!last) {
+ webSocket.request(1);
+ } else {
+ this.currentCf.thenRun(() -> webSocket.request(1));
+ CompletableFuture<?> refCf = this.currentCf;
+ processWholeMessage(new ArrayList<>(parts), refCf);
+ currentCf = new CompletableFuture<>();
+ parts.clear();
+ return refCf;
}
return currentCf;
}