--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Mon Nov 06 18:17:09 2017 +0000
@@ -37,6 +37,7 @@
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
@@ -60,14 +61,28 @@
writeSubscription = new SSLSubscriptionWrapper();
readSubscriber = new SSLSubscriberWrapper();
this.engine = engine;
- sslDelegate = new SSLFlowDelegate(engine,
+ sslDelegate = new SSLTubeFlowDelegate(engine,
executor,
readSubscriber,
- tube); // FIXME
+ tube);
tube.subscribe(sslDelegate.upstreamReader());
sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
}
+ // the other possibility would be to pass a lambda to the
+ // constructor of SSLFlowDelegate (instead of subclassing it).
+ final class SSLTubeFlowDelegate extends SSLFlowDelegate {
+ SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+ SSLSubscriberWrapper readSubscriber,
+ FlowTube tube) {
+ super(engine, executor, readSubscriber, tube);
+ }
+ protected SchedulingAction enterReadScheduling() {
+ readSubscriber.processPendingSubscriber();
+ return SchedulingAction.CONTINUE;
+ }
+ }
+
public CompletableFuture<String> getALPN() {
return sslDelegate.alpn();
}
@@ -97,17 +112,20 @@
// onComplete/onError are not called before onSubscribed.
final static class DelegateWrapper implements FlowTube.TubeSubscriber {
private final FlowTube.TubeSubscriber delegate;
+ private final System.Logger debug;
volatile boolean subscribedCalled;
volatile boolean subscribedDone;
volatile boolean completed;
volatile Throwable error;
- DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
+ DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
+ System.Logger debug) {
this.delegate = FlowTube.asTubeSubscriber(delegate);
+ this.debug = debug;
}
@Override
public void dropSubscription() {
- if (subscribedCalled) {
+ if (subscribedCalled && !completed) {
delegate.dropSubscription();
}
}
@@ -140,15 +158,25 @@
finished = completed;
}
if (x != null) {
+ debug.log(Level.DEBUG,
+ "Subscriber completed before subscribe: forwarding %s",
+ (Object)x);
delegate.onError(x);
} else if (finished) {
+ debug.log(Level.DEBUG,
+ "Subscriber completed before subscribe: calling onComplete()");
delegate.onComplete();
}
}
@Override
public void onError(Throwable t) {
- if (completed) return;
+ if (completed) {
+ debug.log(Level.DEBUG,
+ "Subscriber already completed: ignoring %s",
+ (Object)t);
+ return;
+ }
boolean subscribed;
synchronized (this) {
if (completed) return;
@@ -158,6 +186,10 @@
}
if (subscribed) {
delegate.onError(t);
+ } else {
+ debug.log(Level.DEBUG,
+ "Subscriber not yet subscribed: stored %s",
+ (Object)t);
}
}
@@ -172,6 +204,9 @@
}
if (subscribed) {
delegate.onComplete();
+ } else {
+ debug.log(Level.DEBUG,
+ "Subscriber not yet subscribed: stored completed=true");
}
}
@@ -184,27 +219,73 @@
// Used to read data from the SSLTube.
final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
- private volatile DelegateWrapper delegate;
+ private AtomicReference<DelegateWrapper> pendingDelegate =
+ new AtomicReference<>();
private volatile DelegateWrapper subscribed;
private volatile boolean onCompleteReceived;
private final AtomicReference<Throwable> errorRef
= new AtomicReference<>();
+ // setDelegate can be called asynchronously when the SSLTube flow
+ // is connected. At this time the permanent subscriber (this class)
+ // may already be subscribed (readSubscription != null) or not.
+ // 1. If it's already subscribed (readSubscription != null), we
+ // are going to signal the SSLFlowDelegate reader, and make sure
+ // onSubscribed is called within the reader flow
+ // 2. If it's not yet subscribed (readSubscription == null), then
+ // we're going to wait for onSubscribe/onConnection to be called.
+ //
void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
delegate);
assert delegate != null;
- DelegateWrapper delegateWrapper = new DelegateWrapper(delegate);
+ DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
+ DelegateWrapper previous;
Flow.Subscription subscription;
+ boolean handleNow;
synchronized (this) {
- this.delegate = delegateWrapper;
+ previous = pendingDelegate.getAndSet(delegateWrapper);
subscription = readSubscription;
+ handleNow = this.errorRef.get() != null || finished;
+ }
+ if (previous != null) {
+ previous.dropSubscription();
}
if (subscription == null) {
debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
return;
}
+ if (handleNow || !sslDelegate.resumeReader()) {
+ processPendingSubscriber();
+ }
+ }
+ // Can be called outside of the flow if an error has already been
+ // raise. Otherwise, must be called within the SSLFlowDelegate
+ // downstream reader flow.
+ // If there is a subscription, and if there is a pending delegate,
+ // calls dropSubscription() on the previous delegate (if any),
+ // then subscribe the pending delegate.
+ void processPendingSubscriber() {
+ Flow.Subscription subscription;
+ DelegateWrapper delegateWrapper, previous;
+ synchronized (this) {
+ delegateWrapper = pendingDelegate.get();
+ if (delegateWrapper == null) return;
+ subscription = readSubscription;
+ previous = subscribed;
+ }
+ if (subscription == null) {
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) %s",
+ "processPendingSubscriber: no subscription yet");
+ return;
+ }
+ delegateWrapper = pendingDelegate.getAndSet(null);
+ if (delegateWrapper == null) return;
+ if (previous != null) {
+ previous.dropSubscription();
+ }
onNewSubscription(delegateWrapper,
delegateWrapper::onSubscribe,
subscription);
@@ -212,7 +293,7 @@
@Override
public void dropSubscription() {
- DelegateWrapper subscriberImpl = delegate;
+ DelegateWrapper subscriberImpl = subscribed;
if (subscriberImpl != null) {
subscriberImpl.dropSubscription();
}
@@ -223,20 +304,7 @@
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onConnection(%s)",
subscription);
- assert subscription != null;
- DelegateWrapper subscriberImpl;
- synchronized (this) {
- subscriberImpl = delegate;
- readSubscription = subscription;
- }
- if (subscriberImpl == null) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onConnection: no delegate yet");
- return;
- }
- onNewSubscription(subscriberImpl,
- subscriberImpl::onConnection,
- subscription);
+ onSubscribeImpl(subscription);
}
@Override
@@ -244,21 +312,46 @@
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onSubscribe(%s)",
subscription);
- readSubscription = subscription;
+ onSubscribeImpl(subscription);
+ }
+
+ // called in the reader flow, from either onSubscribe or onConnection.
+ private void onSubscribeImpl(Flow.Subscription subscription) {
assert subscription != null;
- DelegateWrapper subscriberImpl;
+ DelegateWrapper subscriberImpl, pending;
synchronized (this) {
- subscriberImpl = delegate;
readSubscription = subscription;
+ subscriberImpl = subscribed;
+ pending = pendingDelegate.get();
}
- if (subscriberImpl == null) {
+
+ if (subscriberImpl == null && pending == null) {
debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribe: no delegate yet");
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "no delegate yet");
return;
}
- onNewSubscription(subscriberImpl,
- subscriberImpl::onSubscribe,
- subscription);
+
+ if (pending == null) {
+ // There is no pending delegate, but we have a previously
+ // subscribed delegate. This is obviously a re-subscribe.
+ // We are in the downstream reader flow, so we should call
+ // onConnection directly.
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "resusbcribing");
+ onNewSubscription(subscriberImpl,
+ subscriberImpl::onConnection,
+ subscription);
+ } else {
+ // We have some pending subscriber: subscribe it now that we have
+ // a subscription. If we already had a previous delegate then
+ // it will get a dropSubscription().
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "subscribing pending");
+ processPendingSubscriber();
+ }
}
private void onNewSubscription(DelegateWrapper subscriberImpl,
@@ -275,11 +368,6 @@
sslDelegate.resetReaderDemand();
// send the subscription to the subscriber.
method.accept(subscription);
- // reschedule after calling onSubscribe (this should not be
- // strictly needed as the first call to subscription.request()
- // coming after resetting the demand should trigger it).
- // However, it should not do any harm.
- sslDelegate.resumeReader();
// The following twisted logic is just here that we don't invoke
// onError before onSubscribe. It also prevents race conditions
@@ -287,9 +375,7 @@
synchronized (this) {
failed = this.errorRef.get();
completed = finished;
- if (delegate == subscriberImpl) {
- subscribed = subscriberImpl;
- }
+ subscribed = subscriberImpl;
}
if (failed != null) {
subscriberImpl.onError(failed);
@@ -300,7 +386,7 @@
@Override
public void onNext(List<ByteBuffer> item) {
- delegate.onNext(item);
+ subscribed.onNext(item);
}
public void onErrorImpl(Throwable throwable) {
@@ -322,6 +408,10 @@
} else {
debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
}
+ // now if we have any pending subscriber, we should forward
+ // the error to them immediately as the read scheduler will
+ // already be stopped.
+ processPendingSubscriber();
}
@Override
@@ -351,6 +441,9 @@
finished = true;
subscriberImpl.onComplete();
}
+ // now if we have any pending subscriber, we should complete
+ // them immediately as the read scheduler will already be stopped.
+ processPendingSubscriber();
}
}