--- a/src/java.net.http/share/classes/java/net/http/internal/common/SSLTube.java Wed Feb 07 15:46:30 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,586 +0,0 @@
-/*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package java.net.http.internal.common;
-
-import java.lang.System.Logger.Level;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Flow;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import java.net.http.internal.common.SubscriberWrapper.SchedulingAction;
-import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
-import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
-
-/**
- * An implementation of FlowTube that wraps another FlowTube in an
- * SSL flow.
- * <p>
- * The following diagram shows a typical usage of the SSLTube, where
- * the SSLTube wraps a SocketTube on the right hand side, and is connected
- * to an HttpConnection on the left hand side.
- *
- * <preformatted>{@code
- * +---------- SSLTube -------------------------+
- * | |
- * | +---SSLFlowDelegate---+ |
- * HttpConnection | | | | SocketTube
- * read sink <- SSLSubscriberW. <- Reader <- upstreamR.() <---- read source
- * (a subscriber) | | \ / | | (a publisher)
- * | | SSLEngine | |
- * HttpConnection | | / \ | | SocketTube
- * write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
- * (a publisher) | | | | (a subscriber)
- * | +---------------------+ |
- * | |
- * +---------------------------------------------+
- * }</preformatted>
- */
-public class SSLTube implements FlowTube {
-
- static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
- final System.Logger debug =
- Utils.getDebugLogger(this::dbgString, DEBUG);
-
- private final FlowTube tube;
- private final SSLSubscriberWrapper readSubscriber;
- private final SSLSubscriptionWrapper writeSubscription;
- private final SSLFlowDelegate sslDelegate;
- private final SSLEngine engine;
- private volatile boolean finished;
-
- public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
- Objects.requireNonNull(engine);
- Objects.requireNonNull(executor);
- this.tube = Objects.requireNonNull(tube);
- writeSubscription = new SSLSubscriptionWrapper();
- readSubscriber = new SSLSubscriberWrapper();
- this.engine = engine;
- sslDelegate = new SSLTubeFlowDelegate(engine,
- executor,
- readSubscriber,
- tube);
- }
-
- final class SSLTubeFlowDelegate extends SSLFlowDelegate {
- SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
- SSLSubscriberWrapper readSubscriber,
- FlowTube tube) {
- super(engine, executor, readSubscriber, tube);
- }
- protected SchedulingAction enterReadScheduling() {
- readSubscriber.processPendingSubscriber();
- return SchedulingAction.CONTINUE;
- }
- void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
- Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
- assert downWriter == tube;
- assert downReader == readSubscriber;
-
- // Connect the read sink first. That's the left-hand side
- // downstream subscriber from the HttpConnection (or more
- // accurately, the SSLSubscriberWrapper that will wrap it
- // when SSLTube::connectFlows is called.
- reader.subscribe(downReader);
-
- // Connect the right hand side tube (the socket tube).
- //
- // The SSLFlowDelegate.writer publishes ByteBuffer to
- // the SocketTube for writing on the socket, and the
- // SSLFlowDelegate::upstreamReader subscribes to the
- // SocketTube to receive ByteBuffers read from the socket.
- //
- // Basically this method is equivalent to:
- // // connect the read source:
- // // subscribe the SSLFlowDelegate upstream reader
- // // to the socket tube publisher.
- // tube.subscribe(upstreamReader());
- // // connect the write sink:
- // // subscribe the socket tube write subscriber
- // // with the SSLFlowDelegate downstream writer.
- // writer.subscribe(tube);
- tube.connectFlows(FlowTube.asTubePublisher(writer),
- FlowTube.asTubeSubscriber(upstreamReader()));
-
- // Finally connect the write source. That's the left
- // hand side publisher which will push ByteBuffer for
- // writing and encryption to the SSLFlowDelegate.
- // The writeSubscription is in fact the SSLSubscriptionWrapper
- // that will wrap the subscription provided by the
- // HttpConnection publisher when SSLTube::connectFlows
- // is called.
- upstreamWriter().onSubscribe(writeSubscription);
- }
- }
-
- public CompletableFuture<String> getALPN() {
- return sslDelegate.alpn();
- }
-
- @Override
- public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
- readSubscriber.dropSubscription();
- readSubscriber.setDelegate(s);
- s.onSubscribe(readSubscription);
- }
-
- /**
- * Tells whether, or not, this FlowTube has finished receiving data.
- *
- * @return true when one of this FlowTube Subscriber's OnError or onComplete
- * methods have been invoked
- */
- @Override
- public boolean isFinished() {
- return finished;
- }
-
- private volatile Flow.Subscription readSubscription;
-
- // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
- // tracks the subscriber's state. In particular it makes sure that
- // onComplete/onError are not called before onSubscribed.
- final static class DelegateWrapper implements FlowTube.TubeSubscriber {
- private final FlowTube.TubeSubscriber delegate;
- private final System.Logger debug;
- volatile boolean subscribedCalled;
- volatile boolean subscribedDone;
- volatile boolean completed;
- volatile Throwable error;
- DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
- System.Logger debug) {
- this.delegate = FlowTube.asTubeSubscriber(delegate);
- this.debug = debug;
- }
-
- @Override
- public void dropSubscription() {
- if (subscribedCalled && !completed) {
- delegate.dropSubscription();
- }
- }
-
- @Override
- public void onNext(List<ByteBuffer> item) {
- assert subscribedCalled;
- delegate.onNext(item);
- }
-
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- onSubscribe(delegate::onSubscribe, subscription);
- }
-
- private void onSubscribe(Consumer<Flow.Subscription> method,
- Flow.Subscription subscription) {
- subscribedCalled = true;
- method.accept(subscription);
- Throwable x;
- boolean finished;
- synchronized (this) {
- subscribedDone = true;
- x = error;
- finished = completed;
- }
- if (x != null) {
- debug.log(Level.DEBUG,
- "Subscriber completed before subscribe: forwarding %s",
- (Object)x);
- delegate.onError(x);
- } else if (finished) {
- debug.log(Level.DEBUG,
- "Subscriber completed before subscribe: calling onComplete()");
- delegate.onComplete();
- }
- }
-
- @Override
- public void onError(Throwable t) {
- if (completed) {
- debug.log(Level.DEBUG,
- "Subscriber already completed: ignoring %s",
- (Object)t);
- return;
- }
- boolean subscribed;
- synchronized (this) {
- if (completed) return;
- error = t;
- completed = true;
- subscribed = subscribedDone;
- }
- if (subscribed) {
- delegate.onError(t);
- } else {
- debug.log(Level.DEBUG,
- "Subscriber not yet subscribed: stored %s",
- (Object)t);
- }
- }
-
- @Override
- public void onComplete() {
- if (completed) return;
- boolean subscribed;
- synchronized (this) {
- if (completed) return;
- completed = true;
- subscribed = subscribedDone;
- }
- if (subscribed) {
- debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
- delegate.onComplete();
- } else {
- debug.log(Level.DEBUG,
- "Subscriber not yet subscribed: stored completed=true");
- }
- }
-
- @Override
- public String toString() {
- return "DelegateWrapper:" + delegate.toString();
- }
-
- }
-
- // Used to read data from the SSLTube.
- final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
- private AtomicReference<DelegateWrapper> pendingDelegate =
- new AtomicReference<>();
- private volatile DelegateWrapper subscribed;
- private volatile boolean onCompleteReceived;
- private final AtomicReference<Throwable> errorRef
- = new AtomicReference<>();
-
- // setDelegate can be called asynchronously when the SSLTube flow
- // is connected. At this time the permanent subscriber (this class)
- // may already be subscribed (readSubscription != null) or not.
- // 1. If it's already subscribed (readSubscription != null), we
- // are going to signal the SSLFlowDelegate reader, and make sure
- // onSubscribed is called within the reader flow
- // 2. If it's not yet subscribed (readSubscription == null), then
- // we're going to wait for onSubscribe to be called.
- //
- void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
- debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
- delegate);
- assert delegate != null;
- DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
- DelegateWrapper previous;
- Flow.Subscription subscription;
- boolean handleNow;
- synchronized (this) {
- previous = pendingDelegate.getAndSet(delegateWrapper);
- subscription = readSubscription;
- handleNow = this.errorRef.get() != null || finished;
- }
- if (previous != null) {
- previous.dropSubscription();
- }
- if (subscription == null) {
- debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
- return;
- }
- if (handleNow || !sslDelegate.resumeReader()) {
- processPendingSubscriber();
- }
- }
-
- // Can be called outside of the flow if an error has already been
- // raise. Otherwise, must be called within the SSLFlowDelegate
- // downstream reader flow.
- // If there is a subscription, and if there is a pending delegate,
- // calls dropSubscription() on the previous delegate (if any),
- // then subscribe the pending delegate.
- void processPendingSubscriber() {
- Flow.Subscription subscription;
- DelegateWrapper delegateWrapper, previous;
- synchronized (this) {
- delegateWrapper = pendingDelegate.get();
- if (delegateWrapper == null) return;
- subscription = readSubscription;
- previous = subscribed;
- }
- if (subscription == null) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) %s",
- "processPendingSubscriber: no subscription yet");
- return;
- }
- delegateWrapper = pendingDelegate.getAndSet(null);
- if (delegateWrapper == null) return;
- if (previous != null) {
- previous.dropSubscription();
- }
- onNewSubscription(delegateWrapper, subscription);
- }
-
- @Override
- public void dropSubscription() {
- DelegateWrapper subscriberImpl = subscribed;
- if (subscriberImpl != null) {
- subscriberImpl.dropSubscription();
- }
- }
-
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribe(%s)",
- subscription);
- onSubscribeImpl(subscription);
- }
-
- // called in the reader flow, from onSubscribe.
- private void onSubscribeImpl(Flow.Subscription subscription) {
- assert subscription != null;
- DelegateWrapper subscriberImpl, pending;
- synchronized (this) {
- readSubscription = subscription;
- subscriberImpl = subscribed;
- pending = pendingDelegate.get();
- }
-
- if (subscriberImpl == null && pending == null) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
- "no delegate yet");
- return;
- }
-
- if (pending == null) {
- // There is no pending delegate, but we have a previously
- // subscribed delegate. This is obviously a re-subscribe.
- // We are in the downstream reader flow, so we should call
- // onSubscribe directly.
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
- "resubscribing");
- onNewSubscription(subscriberImpl, subscription);
- } else {
- // We have some pending subscriber: subscribe it now that we have
- // a subscription. If we already had a previous delegate then
- // it will get a dropSubscription().
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
- "subscribing pending");
- processPendingSubscriber();
- }
- }
-
- private void onNewSubscription(DelegateWrapper subscriberImpl,
- Flow.Subscription subscription) {
- assert subscriberImpl != null;
- assert subscription != null;
-
- Throwable failed;
- boolean completed;
- // reset any demand that may have been made by the previous
- // subscriber
- sslDelegate.resetReaderDemand();
- // send the subscription to the subscriber.
- subscriberImpl.onSubscribe(subscription);
-
- // The following twisted logic is just here that we don't invoke
- // onError before onSubscribe. It also prevents race conditions
- // if onError is invoked concurrently with setDelegate.
- synchronized (this) {
- failed = this.errorRef.get();
- completed = finished;
- subscribed = subscriberImpl;
- }
- if (failed != null) {
- subscriberImpl.onError(failed);
- } else if (completed) {
- subscriberImpl.onComplete();
- }
- }
-
- @Override
- public void onNext(List<ByteBuffer> item) {
- subscribed.onNext(item);
- }
-
- public void onErrorImpl(Throwable throwable) {
- // The following twisted logic is just here that we don't invoke
- // onError before onSubscribe. It also prevents race conditions
- // if onError is invoked concurrently with setDelegate.
- // See setDelegate.
-
- errorRef.compareAndSet(null, throwable);
- Throwable failed = errorRef.get();
- finished = true;
- debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);
- DelegateWrapper subscriberImpl;
- synchronized (this) {
- subscriberImpl = subscribed;
- }
- if (subscriberImpl != null) {
- subscriberImpl.onError(failed);
- } else {
- debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
- }
- // now if we have any pending subscriber, we should forward
- // the error to them immediately as the read scheduler will
- // already be stopped.
- processPendingSubscriber();
- }
-
- @Override
- public void onError(Throwable throwable) {
- assert !finished && !onCompleteReceived;
- onErrorImpl(throwable);
- }
-
- private boolean handshaking() {
- HandshakeStatus hs = engine.getHandshakeStatus();
- return !(hs == NOT_HANDSHAKING || hs == FINISHED);
- }
-
- private boolean handshakeFailed() {
- // sslDelegate can be null if we reach here
- // during the initial handshake, as that happens
- // within the SSLFlowDelegate constructor.
- // In that case we will want to raise an exception.
- return handshaking()
- && (sslDelegate == null
- || !sslDelegate.closeNotifyReceived());
- }
-
- @Override
- public void onComplete() {
- assert !finished && !onCompleteReceived;
- onCompleteReceived = true;
- DelegateWrapper subscriberImpl;
- synchronized(this) {
- subscriberImpl = subscribed;
- }
-
- if (handshakeFailed()) {
- debug.log(Level.DEBUG,
- "handshake: %s, inbound done: %s outbound done: %s",
- engine.getHandshakeStatus(),
- engine.isInboundDone(),
- engine.isOutboundDone());
- onErrorImpl(new SSLHandshakeException(
- "Remote host terminated the handshake"));
- } else if (subscriberImpl != null) {
- finished = true;
- subscriberImpl.onComplete();
- }
- // now if we have any pending subscriber, we should complete
- // them immediately as the read scheduler will already be stopped.
- processPendingSubscriber();
- }
- }
-
- @Override
- public void connectFlows(TubePublisher writePub,
- TubeSubscriber readSub) {
- debug.log(Level.DEBUG, "connecting flows");
- readSubscriber.setDelegate(readSub);
- writePub.subscribe(this);
- }
-
- /** Outstanding write demand from the SSL Flow Delegate. */
- private final Demand writeDemand = new Demand();
-
- final class SSLSubscriptionWrapper implements Flow.Subscription {
-
- volatile Flow.Subscription delegate;
-
- void setSubscription(Flow.Subscription sub) {
- long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
- delegate = sub;
- debug.log(Level.DEBUG, "setSubscription: demand=%d", demand);
- if (demand > 0)
- sub.request(demand);
- }
-
- @Override
- public void request(long n) {
- writeDemand.increase(n);
- debug.log(Level.DEBUG, "request: n=%d", n);
- Flow.Subscription sub = delegate;
- if (sub != null && n > 0) {
- sub.request(n);
- }
- }
-
- @Override
- public void cancel() {
- // TODO: no-op or error?
- }
- }
-
- /* Subscriber - writing side */
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- Objects.requireNonNull(subscription);
- Flow.Subscription x = writeSubscription.delegate;
- if (x != null)
- x.cancel();
-
- writeSubscription.setSubscription(subscription);
- }
-
- @Override
- public void onNext(List<ByteBuffer> item) {
- Objects.requireNonNull(item);
- boolean decremented = writeDemand.tryDecrement();
- assert decremented : "Unexpected writeDemand: ";
- debug.log(Level.DEBUG,
- "sending %d buffers to SSL flow delegate", item.size());
- sslDelegate.upstreamWriter().onNext(item);
- }
-
- @Override
- public void onError(Throwable throwable) {
- Objects.requireNonNull(throwable);
- sslDelegate.upstreamWriter().onError(throwable);
- }
-
- @Override
- public void onComplete() {
- sslDelegate.upstreamWriter().onComplete();
- }
-
- @Override
- public String toString() {
- return dbgString();
- }
-
- final String dbgString() {
- return "SSLTube(" + tube + ")";
- }
-
-}