src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 56474 fe2bf7b369b8
parent 56463 b583caf69b39
child 56481 247ed0848e48
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Mon Apr 23 15:45:40 2018 +0100
@@ -45,6 +45,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -96,6 +97,7 @@
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
+    final Consumer<ByteBuffer> recycler;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
 
@@ -109,8 +111,23 @@
                            Subscriber<? super List<ByteBuffer>> downReader,
                            Subscriber<? super List<ByteBuffer>> downWriter)
     {
+        this(engine, exec, null, downReader, downWriter);
+    }
+
+    /**
+     * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+     * Flow.Subscriber requires an associated {@link CompletableFuture}
+     * for errors that need to be signaled from downstream to upstream.
+     */
+    public SSLFlowDelegate(SSLEngine engine,
+            Executor exec,
+            Consumer<ByteBuffer> recycler,
+            Subscriber<? super List<ByteBuffer>> downReader,
+            Subscriber<? super List<ByteBuffer>> downWriter)
+        {
         this.id = scount.getAndIncrement();
         this.tubeName = String.valueOf(downWriter);
+        this.recycler = recycler;
         this.reader = new Reader();
         this.writer = new Writer();
         this.engine = engine;
@@ -212,7 +229,7 @@
      * Upstream subscription strategy is to try and keep no more than
      * TARGET_BUFSIZE bytes in readBuf
      */
-    class Reader extends SubscriberWrapper {
+    class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
         // Maximum record size is 16k.
         // Because SocketTube can feeds us up to 3 16K buffers,
         // then setting this size to 16K means that the readBuf
@@ -237,6 +254,11 @@
             readBuf.limit(0); // keep in read mode
         }
 
+        @Override
+        public boolean supportsRecycling() {
+            return recycler != null;
+        }
+
         protected SchedulingAction enterScheduling() {
             return enterReadScheduling();
         }
@@ -292,6 +314,11 @@
                         reallocReadBuf();
                     readBuf.put(buf);
                     readBuf.flip();
+                    // should be safe to call inside lock
+                    // since the only implementation
+                    // offers the buffer to an unbounded queue.
+                    // WARNING: do not touch buf after this point!
+                    if (recycler != null) recycler.accept(buf);
                 }
                 if (complete) {
                     this.completing = complete;