src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
branchhttp-client-branch
changeset 55983 e4a1f0c9d4c6
parent 55981 907bddce488c
child 56010 782b2f2d1e76
equal deleted inserted replaced
55982:b6ff245c0db6 55983:e4a1f0c9d4c6
   107     long requestContentLen;
   107     long requestContentLen;
   108 
   108 
   109     final Http2Connection connection;
   109     final Http2Connection connection;
   110     final HttpRequestImpl request;
   110     final HttpRequestImpl request;
   111     final DecodingCallback rspHeadersConsumer;
   111     final DecodingCallback rspHeadersConsumer;
   112     volatile boolean closeConnectionOnCompletion;
       
   113     HttpHeadersImpl responseHeaders;
   112     HttpHeadersImpl responseHeaders;
   114     final HttpHeadersImpl requestPseudoHeaders;
   113     final HttpHeadersImpl requestPseudoHeaders;
   115     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
   114     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
   116     final HttpRequest.BodyPublisher requestPublisher;
   115     final HttpRequest.BodyPublisher requestPublisher;
   117     volatile RequestSubscriber requestSubscriber;
   116     volatile RequestSubscriber requestSubscriber;
   195             responseSubscriber.onError(t);
   194             responseSubscriber.onError(t);
   196             close();
   195             close();
   197         }
   196         }
   198     }
   197     }
   199 
   198 
   200     // call this anywhere the stream is terminated
       
   201     void checkConnectionClosure() {
       
   202         if (closeConnectionOnCompletion) {
       
   203             connection.close();
       
   204         }
       
   205     }
       
   206 
       
   207     void closeConnectionOnCompletion() {
       
   208         closeConnectionOnCompletion = true;
       
   209     }
       
   210 
       
   211     // Callback invoked after the Response BodySubscriber has consumed the
   199     // Callback invoked after the Response BodySubscriber has consumed the
   212     // buffers contained in a DataFrame.
   200     // buffers contained in a DataFrame.
   213     // Returns true if END_STREAM is reached, false otherwise.
   201     // Returns true if END_STREAM is reached, false otherwise.
   214     private boolean consumed(DataFrame df) {
   202     private boolean consumed(DataFrame df) {
   215         // RFC 7540 6.1:
   203         // RFC 7540 6.1:
   946                 connection.resetStream(streamid, ResetFrame.CANCEL);
   934                 connection.resetStream(streamid, ResetFrame.CANCEL);
   947             }
   935             }
   948         } catch (IOException ex) {
   936         } catch (IOException ex) {
   949             Log.logError(ex);
   937             Log.logError(ex);
   950         }
   938         }
   951         checkConnectionClosure();
       
   952     }
   939     }
   953 
   940 
   954     // This method doesn't send any frame
   941     // This method doesn't send any frame
   955     void close() {
   942     void close() {
   956         if (closed) return;
   943         if (closed) return;
   959             closed = true;
   946             closed = true;
   960         }
   947         }
   961         Log.logTrace("Closing stream {0}", streamid);
   948         Log.logTrace("Closing stream {0}", streamid);
   962         connection.closeStream(streamid);
   949         connection.closeStream(streamid);
   963         Log.logTrace("Stream {0} closed", streamid);
   950         Log.logTrace("Stream {0} closed", streamid);
   964         checkConnectionClosure();
       
   965     }
   951     }
   966 
   952 
   967     static class PushedStream<U,T> extends Stream<T> {
   953     static class PushedStream<U,T> extends Stream<T> {
   968         final PushGroup<U,T> pushGroup;
   954         final PushGroup<U,T> pushGroup;
   969         // push streams need the response CF allocated up front as it is
   955         // push streams need the response CF allocated up front as it is