/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.net.http.internal.common;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import java.net.http.internal.common.SubscriberWrapper.SchedulingAction;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
/**
* An implementation of FlowTube that wraps another FlowTube in an
* SSL flow.
* <p>
* The following diagram shows a typical usage of the SSLTube, where
* the SSLTube wraps a SocketTube on the right hand side, and is connected
* to an HttpConnection on the left hand side.
*
* <pre>{@code
*                  +---------- SSLTube -------------------------+
*                  |                                            |
*                  |                    +---SSLFlowDelegate---+ |
*  HttpConnection  |                    |                     | |   SocketTube
*    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
*  (a subscriber)  |                    |    \         /      | |  (a publisher)
*                  |                    |     SSLEngine       | |
*  HttpConnection  |                    |    /         \      | |   SocketTube
*  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
*  (a publisher)   |                    |                     | |  (a subscriber)
*                  |                    +---------------------+ |
*                  |                                            |
*                  +---------------------------------------------+
* }</pre>
*/
public class SSLTube implements FlowTube {
// Debug switch copied from Utils.DEBUG; passed to Utils.getDebugLogger below.
static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
// Debug logger obtained from Utils.getDebugLogger, which is handed
// dbgString() as a supplier.
final System.Logger debug =
Utils.getDebugLogger(this::dbgString, DEBUG);
// The FlowTube given to the constructor (the tube being wrapped).
private final FlowTube tube;
// Read side: the permanent subscriber handed to the SSL flow delegate;
// it forwards to whichever delegate is currently set on it.
private final SSLSubscriberWrapper readSubscriber;
// Write side: the subscription handed to the delegate's upstream writer
// in SSLTubeFlowDelegate.connect.
private final SSLSubscriptionWrapper writeSubscription;
// The SSL flow delegate created in the constructor.
private final SSLFlowDelegate sslDelegate;
// The SSLEngine given to the constructor; queried for handshake status.
private final SSLEngine engine;
// Written by the read-side wrapper while handling its terminal signals;
// reported by isFinished().
private volatile boolean finished;
/**
 * Creates an SSLTube that layers the given {@code engine} over {@code tube}.
 *
 * @param engine   the SSLEngine passed on to the SSL flow delegate
 * @param executor the executor passed on to the SSL flow delegate
 * @param tube     the FlowTube being wrapped
 * @throws NullPointerException if any argument is {@code null}
 */
public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
    this.engine = Objects.requireNonNull(engine);
    Objects.requireNonNull(executor);
    this.tube = Objects.requireNonNull(tube);
    // The read-side wrapper is an argument of the delegate's constructor,
    // so both wrappers are created before the delegate is built.
    this.readSubscriber = new SSLSubscriberWrapper();
    this.writeSubscription = new SSLSubscriptionWrapper();
    this.sslDelegate = new SSLTubeFlowDelegate(engine,
                                               executor,
                                               readSubscriber,
                                               tube);
}
/**
 * The SSLFlowDelegate used by this SSLTube. It hooks the read scheduler
 * so that pending read delegates get subscribed, and wires the three
 * legs of the flow together in {@code connect}.
 */
final class SSLTubeFlowDelegate extends SSLFlowDelegate {

    SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
                        SSLSubscriberWrapper readSubscriber,
                        FlowTube tube) {
        super(engine, executor, readSubscriber, tube);
    }

    // Lets the read-side wrapper subscribe a pending delegate (if any),
    // then always answers CONTINUE.
    protected SchedulingAction enterReadScheduling() {
        readSubscriber.processPendingSubscriber();
        return SchedulingAction.CONTINUE;
    }

    void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
                 Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
        assert downWriter == tube;
        assert downReader == readSubscriber;

        // 1. Read sink first: the left-hand side downstream subscriber
        //    from the HttpConnection (or, more accurately, the
        //    SSLSubscriberWrapper that will wrap it when
        //    SSLTube::connectFlows is called).
        reader.subscribe(downReader);

        // 2. Right-hand side tube (the socket tube).
        //
        //    SSLFlowDelegate.writer publishes ByteBuffers to the
        //    SocketTube for writing on the socket, while
        //    SSLFlowDelegate::upstreamReader subscribes to the SocketTube
        //    to receive the ByteBuffers read from the socket.
        //
        //    In other words, the call below amounts to:
        //       // read source: subscribe the SSLFlowDelegate upstream
        //       // reader to the socket tube publisher.
        //       tube.subscribe(upstreamReader());
        //       // write sink: subscribe the socket tube write subscriber
        //       // with the SSLFlowDelegate downstream writer.
        //       writer.subscribe(tube);
        tube.connectFlows(FlowTube.asTubePublisher(writer),
                          FlowTube.asTubeSubscriber(upstreamReader()));

        // 3. Write source last: the left-hand side publisher that pushes
        //    ByteBuffers to the SSLFlowDelegate for encryption and
        //    writing. writeSubscription is the SSLSubscriptionWrapper
        //    that will wrap the subscription provided by the
        //    HttpConnection publisher when SSLTube::connectFlows is
        //    called.
        upstreamWriter().onSubscribe(writeSubscription);
    }
}
/**
 * Returns the future exposed by the SSL flow delegate's {@code alpn()}.
 * NOTE(review): presumably this completes with the negotiated ALPN
 * protocol name; confirm against SSLFlowDelegate.
 *
 * @return the delegate's ALPN future
 */
public CompletableFuture<String> getALPN() {
    final CompletableFuture<String> alpnFuture = sslDelegate.alpn();
    return alpnFuture;
}
/**
 * Makes {@code s} the read delegate of this tube: the currently
 * subscribed delegate (if any) is told to drop its subscription,
 * {@code s} is registered with the read-side wrapper, and {@code s}
 * is then handed the current read subscription.
 *
 * @param s the new read subscriber
 */
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
    final SSLSubscriberWrapper readSide = readSubscriber;
    readSide.dropSubscription();
    readSide.setDelegate(s);
    // NOTE(review): readSubscription is only assigned by the read-side
    // wrapper's onSubscribeImpl, so it may still be null at this point;
    // confirm that callers tolerate this.
    s.onSubscribe(readSubscription);
}
/**
 * Tells whether or not this FlowTube has finished receiving data.
 *
 * @return {@code true} once the read-side subscriber wrapper has set the
 *         {@code finished} flag while handling one of its terminal
 *         signals (onError or onComplete)
 */
@Override
public boolean isFinished() {
    final boolean done = finished;
    return done;
}
// The subscription received by the read-side wrapper (assigned in
// SSLSubscriberWrapper.onSubscribeImpl); null until then.
private volatile Flow.Subscription readSubscription;
// DelegateWrapper decorates a subscribed {@code Flow.Subscriber} and keeps
// track of where that subscriber is in its lifecycle. In particular it
// guarantees that onComplete/onError are never forwarded to the wrapped
// subscriber before its onSubscribe call has returned: a terminal signal
// that arrives too early is stored and replayed afterwards.
final static class DelegateWrapper implements FlowTube.TubeSubscriber {

    private final FlowTube.TubeSubscriber delegate;
    private final System.Logger debug;

    // true as soon as onSubscribe has been entered
    volatile boolean subscribedCalled;
    // true once the delegate's onSubscribe has returned
    volatile boolean subscribedDone;
    // true once a terminal signal (onError/onComplete) has been accepted
    volatile boolean completed;
    // the error accepted by onError, if any
    volatile Throwable error;

    DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
                    System.Logger debug) {
        this.delegate = FlowTube.asTubeSubscriber(delegate);
        this.debug = debug;
    }

    // Forwards dropSubscription only to a delegate that has been
    // subscribed and has not yet received a terminal signal.
    @Override
    public void dropSubscription() {
        if (!subscribedCalled || completed) {
            return;
        }
        delegate.dropSubscription();
    }

    @Override
    public void onNext(List<ByteBuffer> item) {
        assert subscribedCalled;
        delegate.onNext(item);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        onSubscribe(delegate::onSubscribe, subscription);
    }

    // Invokes the given onSubscribe method, then replays any terminal
    // signal that was stored while the subscription was in progress.
    private void onSubscribe(Consumer<Flow.Subscription> method,
                             Flow.Subscription subscription) {
        subscribedCalled = true;
        method.accept(subscription);

        final Throwable storedError;
        final boolean storedCompletion;
        synchronized (this) {
            subscribedDone = true;
            storedError = error;
            storedCompletion = completed;
        }

        if (storedError != null) {
            debug.log(Level.DEBUG,
                      "Subscriber completed before subscribe: forwarding %s",
                      (Object)storedError);
            delegate.onError(storedError);
            return;
        }
        if (storedCompletion) {
            debug.log(Level.DEBUG,
                      "Subscriber completed before subscribe: calling onComplete()");
            delegate.onComplete();
        }
    }

    // Accepts the first terminal signal only. The error is forwarded at
    // once if the delegate's onSubscribe has returned, and stored for
    // replay otherwise.
    @Override
    public void onError(Throwable t) {
        if (completed) {
            debug.log(Level.DEBUG,
                      "Subscriber already completed: ignoring %s",
                      (Object)t);
            return;
        }

        final boolean deliverNow;
        synchronized (this) {
            if (completed) {
                return;
            }
            error = t;
            completed = true;
            deliverNow = subscribedDone;
        }

        if (!deliverNow) {
            debug.log(Level.DEBUG,
                      "Subscriber not yet subscribed: stored %s",
                      (Object)t);
            return;
        }
        delegate.onError(t);
    }

    // Same protocol as onError, for normal completion.
    @Override
    public void onComplete() {
        if (completed) {
            return;
        }

        final boolean deliverNow;
        synchronized (this) {
            if (completed) {
                return;
            }
            completed = true;
            deliverNow = subscribedDone;
        }

        if (!deliverNow) {
            debug.log(Level.DEBUG,
                      "Subscriber not yet subscribed: stored completed=true");
            return;
        }
        debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
        delegate.onComplete();
    }

    @Override
    public String toString() {
        final String wrapped = delegate.toString();
        return "DelegateWrapper:" + wrapped;
    }
}
// Used to read data from the SSLTube.
final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
// The delegate waiting to be subscribed. Installed atomically by
// setDelegate and taken (reset to null) by processPendingSubscriber.
// The reference holder itself is never reassigned, hence final.
private final AtomicReference<DelegateWrapper> pendingDelegate =
        new AtomicReference<>();
// The delegate that was last subscribed through onNewSubscription,
// or null if none has been subscribed yet.
private volatile DelegateWrapper subscribed;
// Set when onComplete() has been invoked on this wrapper.
private volatile boolean onCompleteReceived;
// The first error passed to onErrorImpl; only ever set from null.
private final AtomicReference<Throwable> errorRef
        = new AtomicReference<>();
// setDelegate may be invoked asynchronously when the SSLTube flow is
// connected. At that point this permanent subscriber may or may not
// have received its own subscription yet:
//   1. If it has (readSubscription != null), the SSLFlowDelegate reader
//      is signalled so that the delegate's onSubscribe is invoked from
//      within the reader flow.
//   2. If it has not (readSubscription == null), the delegate is simply
//      parked until onSubscribe is called on this wrapper.
//
void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
    debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
              delegate);
    assert delegate != null;

    final DelegateWrapper incoming = new DelegateWrapper(delegate, debug);
    final DelegateWrapper replaced;
    final Flow.Subscription current;
    final boolean terminated;
    synchronized (this) {
        replaced = pendingDelegate.getAndSet(incoming);
        current = readSubscription;
        terminated = this.errorRef.get() != null || finished;
    }

    // A delegate that was still pending is superseded by the new one.
    if (replaced != null) {
        replaced.dropSubscription();
    }

    if (current == null) {
        debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
        return;
    }

    // If a terminal signal was already recorded, or the reader could not
    // be resumed, subscribe the pending delegate from here.
    if (terminated || !sslDelegate.resumeReader()) {
        processPendingSubscriber();
    }
}
// May be called outside of the flow if an error has already been
// raised. Otherwise, must be called within the SSLFlowDelegate
// downstream reader flow.
// When there is both a subscription and a pending delegate, this calls
// dropSubscription() on the previously subscribed delegate (if any) and
// then subscribes the pending delegate.
void processPendingSubscriber() {
    final Flow.Subscription current;
    final DelegateWrapper outgoing;
    synchronized (this) {
        if (pendingDelegate.get() == null) {
            return;
        }
        current = readSubscription;
        outgoing = subscribed;
    }

    if (current == null) {
        debug.log(Level.DEBUG,
                  "SSLSubscriberWrapper (reader) %s",
                  "processPendingSubscriber: no subscription yet");
        return;
    }

    // Take ownership of the pending delegate; another caller may have
    // taken it in the meantime.
    final DelegateWrapper incoming = pendingDelegate.getAndSet(null);
    if (incoming == null) {
        return;
    }

    if (outgoing != null) {
        outgoing.dropSubscription();
    }
    onNewSubscription(incoming, current);
}
// Forwards dropSubscription to the currently subscribed delegate, if any.
@Override
public void dropSubscription() {
    final DelegateWrapper current = subscribed;
    if (current == null) {
        return;
    }
    current.dropSubscription();
}
// Logs the incoming subscription and hands it to onSubscribeImpl.
@Override
public void onSubscribe(Flow.Subscription subscription) {
    debug.log(Level.DEBUG,
              "SSLSubscriberWrapper (reader) onSubscribe(%s)",
              subscription);
    onSubscribeImpl(subscription);
}
// Called in the reader flow, from onSubscribe. Records the new
// subscription, then passes it on to the pending delegate or, failing
// that, to the delegate that was already subscribed.
private void onSubscribeImpl(Flow.Subscription subscription) {
    assert subscription != null;

    final DelegateWrapper current;
    final DelegateWrapper waiting;
    synchronized (this) {
        readSubscription = subscription;
        current = subscribed;
        waiting = pendingDelegate.get();
    }

    if (waiting != null) {
        // There is a pending subscriber: subscribe it now that a
        // subscription is available. A previously subscribed delegate,
        // if any, will get a dropSubscription().
        debug.log(Level.DEBUG,
                  "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                  "subscribing pending");
        processPendingSubscriber();
        return;
    }

    if (current == null) {
        debug.log(Level.DEBUG,
                  "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                  "no delegate yet");
        return;
    }

    // No pending delegate, but one was subscribed before: this is a
    // re-subscribe. We are in the downstream reader flow, so
    // onSubscribe can be called directly.
    debug.log(Level.DEBUG,
              "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
              "resubscribing");
    onNewSubscription(current, subscription);
}
// Subscribes the given delegate with the given subscription, publishes
// it as the current delegate, and replays any terminal state that was
// recorded before it became current.
private void onNewSubscription(DelegateWrapper subscriberImpl,
                               Flow.Subscription subscription) {
    assert subscriberImpl != null;
    assert subscription != null;

    // Discard any demand left over from the previous subscriber.
    sslDelegate.resetReaderDemand();
    // Hand the subscription to the new subscriber.
    subscriberImpl.onSubscribe(subscription);

    // The terminal state is sampled only after onSubscribe, so that
    // onError is never invoked before onSubscribe. Sampling under the
    // lock also guards against onError racing with setDelegate.
    final Throwable recordedError;
    final boolean alreadyFinished;
    synchronized (this) {
        recordedError = this.errorRef.get();
        alreadyFinished = finished;
        subscribed = subscriberImpl;
    }

    if (recordedError != null) {
        subscriberImpl.onError(recordedError);
        return;
    }
    if (alreadyFinished) {
        subscriberImpl.onComplete();
    }
}
// Passes the item straight to the currently subscribed delegate.
@Override
public void onNext(List<ByteBuffer> item) {
    final DelegateWrapper current = subscribed;
    current.onNext(item);
}
// Records the error (the first one wins), marks the tube as finished and
// forwards the recorded error to the current delegate and to any
// pending delegate.
public void onErrorImpl(Throwable throwable) {
    // The ordering below ensures that onError is not invoked before
    // onSubscribe, and guards against onError being invoked concurrently
    // with setDelegate. See setDelegate.
    errorRef.compareAndSet(null, throwable);
    final Throwable recorded = errorRef.get();
    finished = true;
    debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);

    final DelegateWrapper current;
    synchronized (this) {
        current = subscribed;
    }

    if (current == null) {
        debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, recorded);
    } else {
        current.onError(recorded);
    }

    // A pending subscriber must be given the error right away, since
    // the read scheduler will already be stopped.
    processPendingSubscriber();
}
// Entry point for upstream errors: asserts that no terminal signal was
// seen before, then delegates to onErrorImpl.
@Override
public void onError(Throwable throwable) {
    assert !finished && !onCompleteReceived;
    onErrorImpl(throwable);
}
// True while the engine reports a handshake status other than
// NOT_HANDSHAKING or FINISHED.
private boolean handshaking() {
    final HandshakeStatus status = engine.getHandshakeStatus();
    return status != NOT_HANDSHAKING && status != FINISHED;
}
// True if the engine is still handshaking and either there is no
// delegate yet or the delegate has not received a close_notify.
private boolean handshakeFailed() {
    if (!handshaking()) {
        return false;
    }
    // sslDelegate can be null if we get here during the initial
    // handshake, as that happens within the SSLFlowDelegate
    // constructor. In that case we want to raise an exception.
    if (sslDelegate == null) {
        return true;
    }
    return !sslDelegate.closeNotifyReceived();
}
// Handles completion of the upstream read flow.
//
// If the engine is still handshaking and no close_notify was received
// (see handshakeFailed), the completion is turned into an
// SSLHandshakeException and routed through onErrorImpl. Otherwise the
// completion is recorded in 'finished' and forwarded to the currently
// subscribed delegate, if there is one. In both cases any pending
// delegate is processed afterwards.
@Override
public void onComplete() {
    assert !finished && !onCompleteReceived;
    onCompleteReceived = true;
    DelegateWrapper subscriberImpl;
    synchronized (this) {
        subscriberImpl = subscribed;
    }

    if (handshakeFailed()) {
        debug.log(Level.DEBUG,
                  "handshake: %s, inbound done: %s outbound done: %s",
                  engine.getHandshakeStatus(),
                  engine.isInboundDone(),
                  engine.isOutboundDone());
        onErrorImpl(new SSLHandshakeException(
                "Remote host terminated the handshake"));
    } else {
        // Record the completion even when no delegate is subscribed yet.
        // setDelegate and onNewSubscription both consult 'finished' to
        // decide whether a delegate must be completed; if the flag were
        // left unset here, a pending or later delegate would be
        // subscribed but never told that the flow has completed.
        finished = true;
        if (subscriberImpl != null) {
            subscriberImpl.onComplete();
        }
    }

    // now if we have any pending subscriber, we should complete
    // them immediately as the read scheduler will already be stopped.
    processPendingSubscriber();
}
}
/**
 * Connects both directions of this tube: {@code readSub} becomes the
 * read delegate, and this tube subscribes to {@code writePub}.
 *
 * @param writePub the publisher this tube subscribes to for data to write
 * @param readSub  the subscriber that becomes the read delegate
 */
@Override
public void connectFlows(TubePublisher writePub,
                         TubeSubscriber readSub) {
    debug.log(Level.DEBUG, "connecting flows");
    final SSLSubscriberWrapper readSide = readSubscriber;
    readSide.setDelegate(readSub);
    writePub.subscribe(this);
}
/**
* Outstanding write demand from the SSL Flow Delegate.
* Increased by SSLSubscriptionWrapper.request, decremented by onNext,
* and read by SSLSubscriptionWrapper.setSubscription.
*/
private final Demand writeDemand = new Demand();
/**
 * The subscription handed to the SSL flow delegate's upstream writer.
 * It accumulates the requested demand in {@code writeDemand} and relays
 * requests to the real subscription once one has been set.
 */
final class SSLSubscriptionWrapper implements Flow.Subscription {

    // The real subscription; null until setSubscription is called.
    volatile Flow.Subscription delegate;

    // Installs the real subscription and passes on the demand
    // accumulated so far, if any.
    void setSubscription(Flow.Subscription sub) {
        // FIXME: isn't it a racy way of passing the demand?
        final long outstanding = writeDemand.get();
        delegate = sub;
        debug.log(Level.DEBUG, "setSubscription: demand=%d", outstanding);
        if (outstanding > 0) {
            sub.request(outstanding);
        }
    }

    // Records the demand, then relays it if a real subscription is
    // already in place and n is positive.
    @Override
    public void request(long n) {
        writeDemand.increase(n);
        debug.log(Level.DEBUG, "request: n=%d", n);
        final Flow.Subscription target = delegate;
        if (target == null || n <= 0) {
            return;
        }
        target.request(n);
    }

    @Override
    public void cancel() {
        // TODO: no-op or error?
    }
}
/* Subscriber - writing side */

/**
 * Receives the subscription of the write publisher. A previously set
 * subscription, if any, is cancelled before the new one is installed
 * in the write subscription wrapper.
 *
 * @param subscription the new write subscription
 * @throws NullPointerException if {@code subscription} is {@code null}
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    Objects.requireNonNull(subscription);
    final Flow.Subscription previous = writeSubscription.delegate;
    if (previous != null) {
        previous.cancel();
    }
    writeSubscription.setSubscription(subscription);
}
/**
 * Consumes one unit of write demand and forwards the buffers to the
 * SSL flow delegate's upstream writer.
 *
 * @param item the buffers to write
 * @throws NullPointerException if {@code item} is {@code null}
 */
@Override
public void onNext(List<ByteBuffer> item) {
    Objects.requireNonNull(item);
    final boolean demandConsumed = writeDemand.tryDecrement();
    assert demandConsumed : "Unexpected writeDemand: ";
    debug.log(Level.DEBUG,
              "sending %d buffers to SSL flow delegate", item.size());
    sslDelegate.upstreamWriter().onNext(item);
}
/**
 * Forwards a write-side error to the SSL flow delegate's upstream writer.
 *
 * @param throwable the error signalled by the write publisher
 * @throws NullPointerException if {@code throwable} is {@code null}
 */
@Override
public void onError(Throwable throwable) {
    Objects.requireNonNull(throwable);
    sslDelegate.upstreamWriter()
               .onError(throwable);
}
/**
 * Forwards write-side completion to the SSL flow delegate's upstream
 * writer.
 */
@Override
public void onComplete() {
    sslDelegate.upstreamWriter()
               .onComplete();
}
/**
 * Returns the same text as {@link #dbgString()}.
 */
@Override
public String toString() {
    final String description = dbgString();
    return description;
}
// Short description of this tube, of the form "SSLTube(<wrapped tube>)".
final String dbgString() {
    return new StringBuilder("SSLTube(")
            .append(tube)
            .append(")")
            .toString();
}
}