src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 50985 cd41f34e548c
child 56795 03ece2518428
equal deleted inserted replaced
50678:818a23db260c 50681:4254bed3c09d
   307             Flow.Subscription subscription;
   307             Flow.Subscription subscription;
   308             boolean handleNow;
   308             boolean handleNow;
   309             synchronized (this) {
   309             synchronized (this) {
   310                 previous = pendingDelegate.getAndSet(delegateWrapper);
   310                 previous = pendingDelegate.getAndSet(delegateWrapper);
   311                 subscription = readSubscription;
   311                 subscription = readSubscription;
   312                 handleNow = this.errorRef.get() != null || finished;
   312                 handleNow = this.errorRef.get() != null || onCompleteReceived;
   313             }
   313             }
   314             if (previous != null) {
   314             if (previous != null) {
   315                 previous.dropSubscription();
   315                 previous.dropSubscription();
   316             }
   316             }
   317             if (subscription == null) {
   317             if (subscription == null) {
   422             // The following twisted logic is just here that we don't invoke
   422             // The following twisted logic is just here that we don't invoke
   423             // onError before onSubscribe. It also prevents race conditions
   423             // onError before onSubscribe. It also prevents race conditions
   424             // if onError is invoked concurrently with setDelegate.
   424             // if onError is invoked concurrently with setDelegate.
   425             synchronized (this) {
   425             synchronized (this) {
   426                 failed = this.errorRef.get();
   426                 failed = this.errorRef.get();
   427                 completed = finished;
   427                 completed = onCompleteReceived;
   428                 subscribed = subscriberImpl;
   428                 subscribed = subscriberImpl;
   429             }
   429             }
       
   430 
   430             if (failed != null) {
   431             if (failed != null) {
       
   432                 if (debug.on())
       
   433                     debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s",
       
   434                               subscriberImpl, failed);
   431                 subscriberImpl.onError(failed);
   435                 subscriberImpl.onError(failed);
   432             } else if (completed) {
   436             } else if (completed) {
       
   437                 if (debug.on())
       
   438                     debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
       
   439                               subscriberImpl);
       
   440                 finished = true;
   433                 subscriberImpl.onComplete();
   441                 subscriberImpl.onComplete();
   434             }
   442             }
   435         }
   443         }
   436 
   444 
   437         @Override
   445         @Override
   488         }
   496         }
   489 
   497 
   490         @Override
   498         @Override
   491         public void onComplete() {
   499         public void onComplete() {
   492             assert !finished && !onCompleteReceived;
   500             assert !finished && !onCompleteReceived;
   493             onCompleteReceived = true;
       
   494             DelegateWrapper subscriberImpl;
   501             DelegateWrapper subscriberImpl;
   495             synchronized(this) {
   502             synchronized(this) {
   496                 subscriberImpl = subscribed;
   503                 subscriberImpl = subscribed;
   497             }
   504             }
   498 
   505 
   503                               engine.isInboundDone(),
   510                               engine.isInboundDone(),
   504                               engine.isOutboundDone());
   511                               engine.isOutboundDone());
   505                 onErrorImpl(new SSLHandshakeException(
   512                 onErrorImpl(new SSLHandshakeException(
   506                         "Remote host terminated the handshake"));
   513                         "Remote host terminated the handshake"));
   507             } else if (subscriberImpl != null) {
   514             } else if (subscriberImpl != null) {
   508                 finished = true;
   515                 onCompleteReceived = finished = true;
   509                 subscriberImpl.onComplete();
   516                 subscriberImpl.onComplete();
       
   517             } else {
       
   518                 onCompleteReceived = true;
   510             }
   519             }
   511             // now if we have any pending subscriber, we should complete
   520             // now if we have any pending subscriber, we should complete
   512             // them immediately as the read scheduler will already be stopped.
   521             // them immediately as the read scheduler will already be stopped.
   513             processPendingSubscriber();
   522             processPendingSubscriber();
   514         }
   523         }
   526     private final Demand writeDemand = new Demand();
   535     private final Demand writeDemand = new Demand();
   527 
   536 
   528     final class SSLSubscriptionWrapper implements Flow.Subscription {
   537     final class SSLSubscriptionWrapper implements Flow.Subscription {
   529 
   538 
   530         volatile Flow.Subscription delegate;
   539         volatile Flow.Subscription delegate;
       
   540         private volatile boolean cancelled;
   531 
   541 
   532         void setSubscription(Flow.Subscription sub) {
   542         void setSubscription(Flow.Subscription sub) {
   533             long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
   543             long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
   534             delegate = sub;
   544             delegate = sub;
   535             if (debug.on()) debug.log("setSubscription: demand=%d", demand);
   545             if (debug.on())
   536             if (demand > 0)
   546                 debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled);
       
   547 
       
   548             if (cancelled)
       
   549                 delegate.cancel();
       
   550             else if (demand > 0)
   537                 sub.request(demand);
   551                 sub.request(demand);
   538         }
   552         }
   539 
   553 
   540         @Override
   554         @Override
   541         public void request(long n) {
   555         public void request(long n) {
   547             }
   561             }
   548         }
   562         }
   549 
   563 
   550         @Override
   564         @Override
   551         public void cancel() {
   565         public void cancel() {
   552             // TODO:  no-op or error?
   566             cancelled = true;
       
   567             if (delegate != null)
       
   568                 delegate.cancel();
   553         }
   569         }
   554     }
   570     }
   555 
   571 
   556     /* Subscriber - writing side */
   572     /* Subscriber - writing side */
   557     @Override
   573     @Override