src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java
branchhttp-client-branch
changeset 56303 a82058c084ef
parent 56295 898dfb226bd0
child 56427 7f1916397463
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java	Wed Mar 14 09:01:15 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java	Wed Mar 14 13:03:11 2018 +0000
@@ -34,6 +34,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 
 /*
  * A FIFO message storage facility.
@@ -135,11 +136,12 @@
                             CompletableFuture<T> future)
             throws IOException
     {
-        add(MessageQueue.Type.TEXT, null, message, isLast, -1, attachment,
+        add(MessageQueue.Type.TEXT, null, null, message, isLast, -1, attachment,
             action, future);
     }
 
     private <T> void add(Type type,
+                         Supplier<? extends ByteBuffer> binarySupplier,
                          ByteBuffer binary,
                          CharBuffer text,
                          boolean isLast,
@@ -149,6 +151,9 @@
                          CompletableFuture<? super T> future)
             throws IOException
     {
+        // Pong "subtype" is determined by whichever field (data carrier)
+        // is not null. Both fields cannot be null or non-null simultaneously.
+        assert type != Type.PONG || (binary == null ^ binarySupplier == null);
         int h, currentTail, newTail;
         do {
             h = head;
@@ -163,6 +168,7 @@
             throw new InternalError();
         }
         t.type = type;
+        t.binarySupplier = binarySupplier;
         t.binary = binary;
         t.text = text;
         t.isLast = isLast;
@@ -180,7 +186,7 @@
                               CompletableFuture<? super T> future)
             throws IOException
     {
-        add(MessageQueue.Type.BINARY, message, null, isLast, -1, attachment,
+        add(MessageQueue.Type.BINARY, null, message, null, isLast, -1, attachment,
             action, future);
     }
 
@@ -190,7 +196,7 @@
                             CompletableFuture<? super T> future)
             throws IOException
     {
-        add(MessageQueue.Type.PING, message, null, false, -1, attachment,
+        add(MessageQueue.Type.PING, null, message, null, false, -1, attachment,
             action, future);
     }
 
@@ -200,7 +206,17 @@
                             CompletableFuture<? super T> future)
             throws IOException
     {
-        add(MessageQueue.Type.PONG, message, null, false, -1, attachment,
+        add(MessageQueue.Type.PONG, null, message, null, false, -1, attachment,
+            action, future);
+    }
+
+    public <T> void addPong(Supplier<? extends ByteBuffer> message,
+                            T attachment,
+                            BiConsumer<? super T, ? super Throwable> action,
+                            CompletableFuture<? super T> future)
+            throws IOException
+    {
+        add(MessageQueue.Type.PONG, message, null, null, false, -1, attachment,
             action, future);
     }
 
@@ -211,7 +227,7 @@
                              CompletableFuture<? super T> future)
             throws IOException
     {
-        add(MessageQueue.Type.CLOSE, null, reason, false, statusCode,
+        add(MessageQueue.Type.CLOSE, null, null, reason, false, statusCode,
             attachment, action, future);
     }
 
@@ -258,8 +274,13 @@
                 }
             case PONG:
                 try {
-                    return (R) callback.onPong(h.binary, h.attachment, h.action,
-                                               h.future);
+                    if (h.binarySupplier != null) {
+                        return (R) callback.onPong(h.binarySupplier, h.attachment,
+                                                   h.action, h.future);
+                    } else {
+                        return (R) callback.onPong(h.binary, h.attachment, h.action,
+                                                   h.future);
+                    }
                 } catch (Throwable t) {
                     throw (E) t;
                 }
@@ -286,6 +307,7 @@
             throw new InternalError("Queue empty");
         }
         h.type = null;
+        h.binarySupplier = null;
         h.binary = null;
         h.text = null;
         h.attachment = null;
@@ -334,6 +356,11 @@
                      BiConsumer<? super T, ? super Throwable> action,
                      CompletableFuture<? super T> future) throws E;
 
+        <T> R onPong(Supplier<? extends ByteBuffer> message,
+                     T attachment,
+                     BiConsumer<? super T, ? super Throwable> action,
+                     CompletableFuture<? super T> future) throws E;
+
         <T> R onClose(int statusCode,
                       CharBuffer reason,
                       T attachment,
@@ -358,6 +385,7 @@
         // -- The source message fields --
 
         private Type type;
+        private Supplier<? extends ByteBuffer> binarySupplier;
         private ByteBuffer binary;
         private CharBuffer text;
         private boolean isLast;