http-client-branch: make SSLTube connect to SocketTube using FlowTube::connectFlows http-client-branch
authordfuchs
Fri, 10 Nov 2017 12:36:44 +0000
branchhttp-client-branch
changeset 55798 fa84be3c77e4
parent 55797 85a692428288
child 55799 c71f52f48d97
http-client-branch: make SSLTube connect to SocketTube using FlowTube::connectFlows
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Thu Nov 09 17:59:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Fri Nov 10 12:36:44 2017 +0000
@@ -105,18 +105,37 @@
     {
         this.tubeName = String.valueOf(downWriter);
         this.reader = new Reader();
-        this.reader.subscribe(downReader);
         this.writer = new Writer();
-        this.writer.subscribe(downWriter);
         this.engine = engine;
         this.exec = exec;
         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
         this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
                                    .thenRun(this::normalStop);
         this.alpnCF = new MinimalFuture<>();
+
+        // connect the Reader to the downReader and the
+        // Writer to the downWriter.
+        connect(downReader, downWriter);
+
         //Monitor.add(this::monitor);
     }
 
+    /**
+     * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
+     * and the write sink (downWriter) to the SSLFlowDelegate Writer.
+     * Called from within the constructor. Overwritten by SSLTube.
+     *
+     * @param downReader  The left hand side read sink (typically, the
+     *                    HttpConnection read subscriber).
+     * @param downWriter  The right hand side write sink (typically
+     *                    the SocketTube write subscriber).
+     */
+    void connect(Subscriber<? super List<ByteBuffer>> downReader,
+                 Subscriber<? super List<ByteBuffer>> downWriter) {
+        this.reader.subscribe(downReader);
+        this.writer.subscribe(downWriter);
+    }
+
    /**
     * Returns a CompletableFuture<String> which completes after
     * the initial handshake completes, and which contains the negotiated
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Thu Nov 09 17:59:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Fri Nov 10 12:36:44 2017 +0000
@@ -41,6 +41,30 @@
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 
+/**
+ * An implementation of FlowTube that wraps another FlowTube in an
+ * SSL flow.
+ * <p>
+ * The following diagram shows a typical usage of the SSLTube, where
+ * the SSLTube wraps a SocketTube on the right hand side, and is connected
+ * to an HttpConnection on the left hand side.
+ *
+ * <preformatted>{@code
+ *                  +----------  SSLTube -------------------------+
+ *                  |                                             |
+ *                  |                    +---SSLFlowDelegate---+  |
+ *  HttpConnection  |                    |                     |  |   SocketTube
+ *    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
+ *  (a subscriber)  |                    |    \         /      |  |  (a publisher)
+ *                  |                    |     SSLEngine       |  |
+ *  HttpConnection  |                    |    /         \      |  |   SocketTube
+ *  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
+ *  (a publisher)   |                    |                     |  |  (a subscriber)
+ *                  |                    +---------------------+  |
+ *                  |                                             |
+ *                  +---------------------------------------------+
+ * }</preformatted>
+ */
 public class SSLTube implements FlowTube {
 
     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
@@ -62,15 +86,11 @@
         readSubscriber = new SSLSubscriberWrapper();
         this.engine = engine;
         sslDelegate = new SSLTubeFlowDelegate(engine,
-                                          executor,
-                                          readSubscriber,
-                                          tube);
-        tube.subscribe(sslDelegate.upstreamReader());
-        sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
+                                              executor,
+                                              readSubscriber,
+                                              tube);
     }
 
-    // the other possibility would be to pass a lambda to the
-    // constructor of SSLFlowDelegate (instead of subclassing it).
     final class SSLTubeFlowDelegate extends SSLFlowDelegate {
         SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
                             SSLSubscriberWrapper readSubscriber,
@@ -81,6 +101,45 @@
             readSubscriber.processPendingSubscriber();
             return SchedulingAction.CONTINUE;
         }
+        void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
+                     Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
+            assert downWriter == tube;
+            assert downReader == readSubscriber;
+
+            // Connect the read sink first. That's the left-hand side
+            // downstream subscriber from the HttpConnection (or more
+            // accurately, the SSLSubscriberWrapper that will wrap it
+            // when SSLTube::connectFlows is called.
+            reader.subscribe(downReader);
+
+            // Connect the right hand side tube (the socket tube).
+            //
+            // The SSLFlowDelegate.writer publishes ByteBuffer to
+            // the SocketTube for writing on the socket, and the
+            // SSLFlowDelegate::upstreamReader subscribes to the
+            // SocketTube to receive ByteBuffers read from the socket.
+            //
+            // Basically this method is equivalent to:
+            //     // connect the read source:
+            //     //   subscribe the SSLFlowDelegate upstream reader
+            //     //   to the socket tube publisher.
+            //     tube.subscribe(upstreamReader());
+            //     // connect the write sink:
+            //     //   subscribe the socket tube write subscriber
+            //     //   with the SSLFlowDelegate downstream writer.
+            //     writer.subscribe(tube);
+            tube.connectFlows(FlowTube.asTubePublisher(writer),
+                              FlowTube.asTubeSubscriber(upstreamReader()));
+
+            // Finally connect the write source. That's the left
+            // hand side publisher which will push ByteBuffer for
+            // writing and encryption to the SSLFlowDelegate.
+            // The writeSubscription is in fact the SSLSubscriptionWrapper
+            // that will wrap the subscription provided by the
+            // HttpConnection publisher when SSLTube::connectFlows
+            // is called.
+            upstreamWriter().onSubscribe(writeSubscription);
+        }
     }
 
     public CompletableFuture<String> getALPN() {