src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
branchhttp-client-branch
changeset 55909 583695a0ed6a
parent 55798 fa84be3c77e4
child 55950 5e1707e5a254
equal deleted inserted replaced
55908:a36a236e55d8 55909:583695a0ed6a
   198         @Override
   198         @Override
   199         public void onSubscribe(Flow.Subscription subscription) {
   199         public void onSubscribe(Flow.Subscription subscription) {
   200             onSubscribe(delegate::onSubscribe, subscription);
   200             onSubscribe(delegate::onSubscribe, subscription);
   201         }
   201         }
   202 
   202 
   203         @Override
       
   204         public void onConnection(Flow.Subscription subscription) {
       
   205             onSubscribe(delegate::onConnection, subscription);
       
   206         }
       
   207 
       
   208         private void onSubscribe(Consumer<Flow.Subscription> method,
   203         private void onSubscribe(Consumer<Flow.Subscription> method,
   209                                  Flow.Subscription subscription) {
   204                                  Flow.Subscription subscription) {
   210             subscribedCalled = true;
   205             subscribedCalled = true;
   211             method.accept(subscription);
   206             method.accept(subscription);
   212             Throwable x;
   207             Throwable x;
   290         // may already be subscribed (readSubscription != null) or not.
   285         // may already be subscribed (readSubscription != null) or not.
   291         // 1. If it's already subscribed (readSubscription != null), we
   286         // 1. If it's already subscribed (readSubscription != null), we
   292         //    are going to signal the SSLFlowDelegate reader, and make sure
   287         //    are going to signal the SSLFlowDelegate reader, and make sure
   293         //    onSubscribed is called within the reader flow
   288         //    onSubscribed is called within the reader flow
   294         // 2. If it's not yet subscribed (readSubscription == null), then
   289         // 2. If it's not yet subscribed (readSubscription == null), then
   295         //    we're going to wait for onSubscribe/onConnection to be called.
   290         //    we're going to wait for onSubscribe to be called.
   296         //
   291         //
   297         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
   292         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
   298             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
   293             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
   299                       delegate);
   294                       delegate);
   300             assert delegate != null;
   295             assert delegate != null;
   343             delegateWrapper = pendingDelegate.getAndSet(null);
   338             delegateWrapper = pendingDelegate.getAndSet(null);
   344             if (delegateWrapper == null) return;
   339             if (delegateWrapper == null) return;
   345             if (previous != null) {
   340             if (previous != null) {
   346                 previous.dropSubscription();
   341                 previous.dropSubscription();
   347             }
   342             }
   348             onNewSubscription(delegateWrapper,
   343             onNewSubscription(delegateWrapper, subscription);
   349                               delegateWrapper::onSubscribe,
       
   350                               subscription);
       
   351         }
   344         }
   352 
   345 
   353         @Override
   346         @Override
   354         public void dropSubscription() {
   347         public void dropSubscription() {
   355             DelegateWrapper subscriberImpl = subscribed;
   348             DelegateWrapper subscriberImpl = subscribed;
   356             if (subscriberImpl != null) {
   349             if (subscriberImpl != null) {
   357                 subscriberImpl.dropSubscription();
   350                 subscriberImpl.dropSubscription();
   358             }
   351             }
   359         }
       
   360 
       
   361         @Override
       
   362         public void onConnection(Flow.Subscription subscription) {
       
   363             debug.log(Level.DEBUG,
       
   364                       "SSLSubscriberWrapper (reader) onConnection(%s)",
       
   365                       subscription);
       
   366             onSubscribeImpl(subscription);
       
   367         }
   352         }
   368 
   353 
   369         @Override
   354         @Override
   370         public void onSubscribe(Flow.Subscription subscription) {
   355         public void onSubscribe(Flow.Subscription subscription) {
   371             debug.log(Level.DEBUG,
   356             debug.log(Level.DEBUG,
   372                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
   357                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
   373                       subscription);
   358                       subscription);
   374             onSubscribeImpl(subscription);
   359             onSubscribeImpl(subscription);
   375         }
   360         }
   376 
   361 
   377         // called in the reader flow, from either onSubscribe or onConnection.
   362         // called in the reader flow, from onSubscribe.
   378         private void onSubscribeImpl(Flow.Subscription subscription) {
   363         private void onSubscribeImpl(Flow.Subscription subscription) {
   379             assert subscription != null;
   364             assert subscription != null;
   380             DelegateWrapper subscriberImpl, pending;
   365             DelegateWrapper subscriberImpl, pending;
   381             synchronized (this) {
   366             synchronized (this) {
   382                 readSubscription = subscription;
   367                 readSubscription = subscription;
   393 
   378 
   394             if (pending == null) {
   379             if (pending == null) {
   395                 // There is no pending delegate, but we have a previously
   380                 // There is no pending delegate, but we have a previously
   396                 // subscribed delegate. This is obviously a re-subscribe.
   381                 // subscribed delegate. This is obviously a re-subscribe.
   397                 // We are in the downstream reader flow, so we should call
   382                 // We are in the downstream reader flow, so we should call
   398                 // onConnection directly.
   383                 // onSubscribe directly.
   399                 debug.log(Level.DEBUG,
   384                 debug.log(Level.DEBUG,
   400                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
   385                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
   401                       "resubscribing");
   386                       "resubscribing");
   402                 onNewSubscription(subscriberImpl,
   387                 onNewSubscription(subscriberImpl, subscription);
   403                                   subscriberImpl::onConnection,
       
   404                                   subscription);
       
   405             } else {
   388             } else {
   406                 // We have some pending subscriber: subscribe it now that we have
   389                 // We have some pending subscriber: subscribe it now that we have
   407                 // a subscription. If we already had a previous delegate then
   390                 // a subscription. If we already had a previous delegate then
   408                 // it will get a dropSubscription().
   391                 // it will get a dropSubscription().
   409                 debug.log(Level.DEBUG,
   392                 debug.log(Level.DEBUG,
   412                 processPendingSubscriber();
   395                 processPendingSubscriber();
   413             }
   396             }
   414         }
   397         }
   415 
   398 
   416         private void onNewSubscription(DelegateWrapper subscriberImpl,
   399         private void onNewSubscription(DelegateWrapper subscriberImpl,
   417                                        Consumer<Flow.Subscription> method,
       
   418                                        Flow.Subscription subscription) {
   400                                        Flow.Subscription subscription) {
   419             assert subscriberImpl != null;
   401             assert subscriberImpl != null;
   420             assert method != null;
       
   421             assert subscription != null;
   402             assert subscription != null;
   422 
   403 
   423             Throwable failed;
   404             Throwable failed;
   424             boolean completed;
   405             boolean completed;
   425             // reset any demand that may have been made by the previous
   406             // reset any demand that may have been made by the previous
   426             // subscriber
   407             // subscriber
   427             sslDelegate.resetReaderDemand();
   408             sslDelegate.resetReaderDemand();
   428             // send the subscription to the subscriber.
   409             // send the subscription to the subscriber.
   429             method.accept(subscription);
   410             subscriberImpl.onSubscribe(subscription);
   430 
   411 
   431             // The following twisted logic is just here that we don't invoke
   412             // The following twisted logic is just here that we don't invoke
   432             // onError before onSubscribe. It also prevents race conditions
   413             // onError before onSubscribe. It also prevents race conditions
   433             // if onError is invoked concurrently with setDelegate.
   414             // if onError is invoked concurrently with setDelegate.
   434             synchronized (this) {
   415             synchronized (this) {
   480         }
   461         }
   481 
   462 
   482         private boolean handshaking() {
   463         private boolean handshaking() {
   483             HandshakeStatus hs = engine.getHandshakeStatus();
   464             HandshakeStatus hs = engine.getHandshakeStatus();
   484             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
   465             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
       
   466         }
       
   467 
       
   468         private boolean handshakeFailed() {
       
   469             // sslDelegate can be null if we reach here
       
   470             // during the initial handshake, as that happens
       
   471             // within the SSLFlowDelegate constructor.
       
   472             // In that case we will want to raise an exception.
       
   473             return handshaking()
       
   474                     && (sslDelegate == null
       
   475                     || !sslDelegate.closeNotifyReceived());
   485         }
   476         }
   486 
   477 
   487         @Override
   478         @Override
   488         public void onComplete() {
   479         public void onComplete() {
   489             assert !finished && !onCompleteReceived;
   480             assert !finished && !onCompleteReceived;
   491             DelegateWrapper subscriberImpl;
   482             DelegateWrapper subscriberImpl;
   492             synchronized(this) {
   483             synchronized(this) {
   493                 subscriberImpl = subscribed;
   484                 subscriberImpl = subscribed;
   494             }
   485             }
   495 
   486 
   496             if (handshaking()) {
   487             if (handshakeFailed()) {
       
   488                 debug.log(Level.DEBUG,
       
   489                         "handshake: %s, inbound done: %s outbound done: %s",
       
   490                         engine.getHandshakeStatus(),
       
   491                         engine.isInboundDone(),
       
   492                         engine.isOutboundDone());
   497                 onErrorImpl(new SSLHandshakeException(
   493                 onErrorImpl(new SSLHandshakeException(
   498                         "Remote host terminated the handshake"));
   494                         "Remote host terminated the handshake"));
   499             } else if (subscriberImpl != null) {
   495             } else if (subscriberImpl != null) {
   500                 finished = true;
   496                 finished = true;
   501                 subscriberImpl.onComplete();
   497                 subscriberImpl.onComplete();