--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Wed Mar 14 09:01:15 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Wed Mar 14 13:03:11 2018 +0000
@@ -34,6 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
+import java.util.function.Supplier;
/*
* A FIFO message storage facility.
@@ -135,11 +136,12 @@
CompletableFuture<T> future)
throws IOException
{
- add(MessageQueue.Type.TEXT, null, message, isLast, -1, attachment,
+ add(MessageQueue.Type.TEXT, null, null, message, isLast, -1, attachment,
action, future);
}
private <T> void add(Type type,
+ Supplier<? extends ByteBuffer> binarySupplier,
ByteBuffer binary,
CharBuffer text,
boolean isLast,
@@ -149,6 +151,9 @@
CompletableFuture<? super T> future)
throws IOException
{
+ // Pong "subtype" is determined by whichever field (data carrier)
+ // is not null. Both fields cannot be null or non-null simultaneously.
+ assert type != Type.PONG || (binary == null ^ binarySupplier == null);
int h, currentTail, newTail;
do {
h = head;
@@ -163,6 +168,7 @@
throw new InternalError();
}
t.type = type;
+ t.binarySupplier = binarySupplier;
t.binary = binary;
t.text = text;
t.isLast = isLast;
@@ -180,7 +186,7 @@
CompletableFuture<? super T> future)
throws IOException
{
- add(MessageQueue.Type.BINARY, message, null, isLast, -1, attachment,
+ add(MessageQueue.Type.BINARY, null, message, null, isLast, -1, attachment,
action, future);
}
@@ -190,7 +196,7 @@
CompletableFuture<? super T> future)
throws IOException
{
- add(MessageQueue.Type.PING, message, null, false, -1, attachment,
+ add(MessageQueue.Type.PING, null, message, null, false, -1, attachment,
action, future);
}
@@ -200,7 +206,17 @@
CompletableFuture<? super T> future)
throws IOException
{
- add(MessageQueue.Type.PONG, message, null, false, -1, attachment,
+ add(MessageQueue.Type.PONG, null, message, null, false, -1, attachment,
+ action, future);
+ }
+
+ public <T> void addPong(Supplier<? extends ByteBuffer> message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.PONG, message, null, null, false, -1, attachment,
action, future);
}
@@ -211,7 +227,7 @@
CompletableFuture<? super T> future)
throws IOException
{
- add(MessageQueue.Type.CLOSE, null, reason, false, statusCode,
+ add(MessageQueue.Type.CLOSE, null, null, reason, false, statusCode,
attachment, action, future);
}
@@ -258,8 +274,13 @@
}
case PONG:
try {
- return (R) callback.onPong(h.binary, h.attachment, h.action,
- h.future);
+ if (h.binarySupplier != null) {
+ return (R) callback.onPong(h.binarySupplier, h.attachment,
+ h.action, h.future);
+ } else {
+ return (R) callback.onPong(h.binary, h.attachment, h.action,
+ h.future);
+ }
} catch (Throwable t) {
throw (E) t;
}
@@ -286,6 +307,7 @@
throw new InternalError("Queue empty");
}
h.type = null;
+ h.binarySupplier = null;
h.binary = null;
h.text = null;
h.attachment = null;
@@ -334,6 +356,11 @@
BiConsumer<? super T, ? super Throwable> action,
CompletableFuture<? super T> future) throws E;
+ <T> R onPong(Supplier<? extends ByteBuffer> message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
<T> R onClose(int statusCode,
CharBuffer reason,
T attachment,
@@ -358,6 +385,7 @@
// -- The source message fields --
private Type type;
+ private Supplier<? extends ByteBuffer> binarySupplier;
private ByteBuffer binary;
private CharBuffer text;
private boolean isLast;