http-client-branch: make SSLTube connect to SocketTube using FlowTube::connectFlows
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Thu Nov 09 17:59:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Fri Nov 10 12:36:44 2017 +0000
@@ -105,18 +105,37 @@
{
this.tubeName = String.valueOf(downWriter);
this.reader = new Reader();
- this.reader.subscribe(downReader);
this.writer = new Writer();
- this.writer.subscribe(downWriter);
this.engine = engine;
this.exec = exec;
this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
.thenRun(this::normalStop);
this.alpnCF = new MinimalFuture<>();
+
+ // connect the Reader to the downReader and the
+ // Writer to the downWriter.
+ connect(downReader, downWriter);
+
//Monitor.add(this::monitor);
}
+ /**
+ * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
+ * and the write sink (downWriter) to the SSLFlowDelegate Writer.
+ * Called from within the constructor. Overwritten by SSLTube.
+ *
+ * @param downReader The left hand side read sink (typically, the
+ * HttpConnection read subscriber).
+ * @param downWriter The right hand side write sink (typically
+ * the SocketTube write subscriber).
+ */
+ void connect(Subscriber<? super List<ByteBuffer>> downReader,
+ Subscriber<? super List<ByteBuffer>> downWriter) {
+ this.reader.subscribe(downReader);
+ this.writer.subscribe(downWriter);
+ }
+
/**
* Returns a CompletableFuture<String> which completes after
* the initial handshake completes, and which contains the negotiated
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Thu Nov 09 17:59:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Fri Nov 10 12:36:44 2017 +0000
@@ -41,6 +41,30 @@
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
+/**
+ * An implementation of FlowTube that wraps another FlowTube in an
+ * SSL flow.
+ * <p>
+ * The following diagram shows a typical usage of the SSLTube, where
+ * the SSLTube wraps a SocketTube on the right hand side, and is connected
+ * to an HttpConnection on the left hand side.
+ *
+ * <preformatted>{@code
+ * +---------- SSLTube -------------------------+
+ * | |
+ * | +---SSLFlowDelegate---+ |
+ * HttpConnection | | | | SocketTube
+ * read sink <- SSLSubscriberW. <- Reader <- upstreamR.() <---- read source
+ * (a subscriber) | | \ / | | (a publisher)
+ * | | SSLEngine | |
+ * HttpConnection | | / \ | | SocketTube
+ * write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
+ * (a publisher) | | | | (a subscriber)
+ * | +---------------------+ |
+ * | |
+ * +---------------------------------------------+
+ * }</preformatted>
+ */
public class SSLTube implements FlowTube {
static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
@@ -62,15 +86,11 @@
readSubscriber = new SSLSubscriberWrapper();
this.engine = engine;
sslDelegate = new SSLTubeFlowDelegate(engine,
- executor,
- readSubscriber,
- tube);
- tube.subscribe(sslDelegate.upstreamReader());
- sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
+ executor,
+ readSubscriber,
+ tube);
}
- // the other possibility would be to pass a lambda to the
- // constructor of SSLFlowDelegate (instead of subclassing it).
final class SSLTubeFlowDelegate extends SSLFlowDelegate {
SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
SSLSubscriberWrapper readSubscriber,
@@ -81,6 +101,45 @@
readSubscriber.processPendingSubscriber();
return SchedulingAction.CONTINUE;
}
+ void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
+ Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
+ assert downWriter == tube;
+ assert downReader == readSubscriber;
+
+ // Connect the read sink first. That's the left-hand side
+ // downstream subscriber from the HttpConnection (or more
+ // accurately, the SSLSubscriberWrapper that will wrap it
+ // when SSLTube::connectFlows is called.
+ reader.subscribe(downReader);
+
+ // Connect the right hand side tube (the socket tube).
+ //
+ // The SSLFlowDelegate.writer publishes ByteBuffer to
+ // the SocketTube for writing on the socket, and the
+ // SSLFlowDelegate::upstreamReader subscribes to the
+ // SocketTube to receive ByteBuffers read from the socket.
+ //
+ // Basically this method is equivalent to:
+ // // connect the read source:
+ // // subscribe the SSLFlowDelegate upstream reader
+ // // to the socket tube publisher.
+ // tube.subscribe(upstreamReader());
+ // // connect the write sink:
+ // // subscribe the socket tube write subscriber
+ // // with the SSLFlowDelegate downstream writer.
+ // writer.subscribe(tube);
+ tube.connectFlows(FlowTube.asTubePublisher(writer),
+ FlowTube.asTubeSubscriber(upstreamReader()));
+
+ // Finally connect the write source. That's the left
+ // hand side publisher which will push ByteBuffer for
+ // writing and encryption to the SSLFlowDelegate.
+ // The writeSubscription is in fact the SSLSubscriptionWrapper
+ // that will wrap the subscription provided by the
+ // HttpConnection publisher when SSLTube::connectFlows
+ // is called.
+ upstreamWriter().onSubscribe(writeSubscription);
+ }
}
public CompletableFuture<String> getALPN() {