src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
branch http-client-branch
changeset 56616 5d2446adafaf
parent 56531 15ff86a732ea
child 56618 e4022357f852
equal deleted inserted replaced
56604:8a808d85fc1a 56616:5d2446adafaf
    37 import java.util.concurrent.ConcurrentLinkedDeque;
    37 import java.util.concurrent.ConcurrentLinkedDeque;
    38 import java.util.concurrent.ConcurrentLinkedQueue;
    38 import java.util.concurrent.ConcurrentLinkedQueue;
    39 import java.util.concurrent.Executor;
    39 import java.util.concurrent.Executor;
    40 import java.util.concurrent.Flow;
    40 import java.util.concurrent.Flow;
    41 import java.util.concurrent.Flow.Subscription;
    41 import java.util.concurrent.Flow.Subscription;
       
    42 import java.util.concurrent.atomic.AtomicBoolean;
    42 import java.util.concurrent.atomic.AtomicReference;
    43 import java.util.concurrent.atomic.AtomicReference;
    43 import java.util.function.BiPredicate;
    44 import java.util.function.BiPredicate;
    44 import java.net.http.HttpClient;
    45 import java.net.http.HttpClient;
    45 import java.net.http.HttpHeaders;
    46 import java.net.http.HttpHeaders;
    46 import java.net.http.HttpRequest;
    47 import java.net.http.HttpRequest;
   130 
   131 
   131     /** True if END_STREAM has been seen in a frame received on this stream. */
   132     /** True if END_STREAM has been seen in a frame received on this stream. */
   132     private volatile boolean remotelyClosed;
   133     private volatile boolean remotelyClosed;
   133     private volatile boolean closed;
   134     private volatile boolean closed;
   134     private volatile boolean endStreamSent;
   135     private volatile boolean endStreamSent;
       
   136 
       
   137     final AtomicBoolean deRegistered = new AtomicBoolean(false);
   135 
   138 
   136     // state flags
   139     // state flags
   137     private boolean requestSent, responseReceived;
   140     private boolean requestSent, responseReceived;
   138 
   141 
   139     /**
   142     /**
   183                 if (size == 0 && finished) {
   186                 if (size == 0 && finished) {
   184                     inputQ.remove();
   187                     inputQ.remove();
   185                     Log.logTrace("responseSubscriber.onComplete");
   188                     Log.logTrace("responseSubscriber.onComplete");
   186                     if (debug.on()) debug.log("incoming: onComplete");
   189                     if (debug.on()) debug.log("incoming: onComplete");
   187                     sched.stop();
   190                     sched.stop();
       
   191                     connection.decrementStreamsCount(streamid);
   188                     subscriber.onComplete();
   192                     subscriber.onComplete();
   189                     onCompleteCalled = true;
   193                     onCompleteCalled = true;
   190                     setEndStreamReceived();
   194                     setEndStreamReceived();
   191                     return;
   195                     return;
   192                 } else if (userSubscription.tryDecrement()) {
   196                 } else if (userSubscription.tryDecrement()) {
   247             // closed or half closed.
   251             // closed or half closed.
   248             windowUpdater.update(len);
   252             windowUpdater.update(len);
   249             return false; // more data coming
   253             return false; // more data coming
   250         }
   254         }
   251         return true; // end of stream
   255         return true; // end of stream
       
   256     }
       
   257 
       
   258     boolean deRegister() {
       
   259         return deRegistered.compareAndSet(false, true);
   252     }
   260     }
   253 
   261 
   254     @Override
   262     @Override
   255     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   263     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   256                                        boolean returnConnectionToPool,
   264                                        boolean returnConnectionToPool,
   470                 }
   478                 }
   471                 if (responseBodyCF != null) {
   479                 if (responseBodyCF != null) {
   472                     responseBodyCF.completeExceptionally(errorRef.get());
   480                     responseBodyCF.completeExceptionally(errorRef.get());
   473                 }
   481                 }
   474             } finally {
   482             } finally {
       
   483                 connection.decrementStreamsCount(streamid);
   475                 connection.closeStream(streamid);
   484                 connection.closeStream(streamid);
   476             }
   485             }
   477         } else {
   486         } else {
   478             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   487             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
   479         }
   488         }
    // released(): if streamid is positive, logs "Released stream" (when debug is on), then
    // calls connection.decrementStreamsCount(streamid) followed by
    // connection.closeStream(streamid); otherwise it only logs that the stream can't be released.
    // NOTE(review): the decrementStreamsCount row below carries a single line number (700) and
    // has no old-revision twin in this listing, so it appears to be the line this changeset
    // adds -- confirm against the full diff. What decrementStreamsCount does internally is not
    // visible here.
    686     @Override
    695     @Override
    687     void released() {
    696     void released() {
    688         if (streamid > 0) {
    697         if (streamid > 0) {
    689             if (debug.on()) debug.log("Released stream %d", streamid);
    698             if (debug.on()) debug.log("Released stream %d", streamid);
    690             // remove this stream from the Http2Connection map.
    699             // remove this stream from the Http2Connection map.
        
    700             connection.decrementStreamsCount(streamid);
    691             connection.closeStream(streamid);
    701             connection.closeStream(streamid);
    692         } else {
    702         } else {
    693             if (debug.on()) debug.log("Can't release stream %d", streamid);
    703             if (debug.on()) debug.log("Can't release stream %d", streamid);
    694         }
    704         }
    695     }
    705     }
  1100             responseBodyCF.completeExceptionally(errorRef.get());
  1110             responseBodyCF.completeExceptionally(errorRef.get());
  1101         }
  1111         }
  1102         try {
  1112         try {
  1103             // will send a RST_STREAM frame
  1113             // will send a RST_STREAM frame
  1104             if (streamid != 0) {
  1114             if (streamid != 0) {
       
  1115                 connection.decrementStreamsCount(streamid);
  1105                 e = Utils.getCompletionCause(e);
  1116                 e = Utils.getCompletionCause(e);
  1106                 if (e instanceof EOFException) {
  1117                 if (e instanceof EOFException) {
  1107                     // read EOF: no need to try & send reset
  1118                     // read EOF: no need to try & send reset
  1108                     connection.closeStream(streamid);
  1119                     connection.closeStream(streamid);
  1109                 } else {
  1120                 } else {