equal
deleted
inserted
replaced
107 long requestContentLen; |
107 long requestContentLen; |
108 |
108 |
109 final Http2Connection connection; |
109 final Http2Connection connection; |
110 final HttpRequestImpl request; |
110 final HttpRequestImpl request; |
111 final DecodingCallback rspHeadersConsumer; |
111 final DecodingCallback rspHeadersConsumer; |
|
112 volatile boolean closeConnectionOnCompletion; |
112 HttpHeadersImpl responseHeaders; |
113 HttpHeadersImpl responseHeaders; |
113 final HttpHeadersImpl requestPseudoHeaders; |
114 final HttpHeadersImpl requestPseudoHeaders; |
114 volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
115 volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
115 final HttpRequest.BodyPublisher requestPublisher; |
116 final HttpRequest.BodyPublisher requestPublisher; |
116 volatile RequestSubscriber requestSubscriber; |
117 volatile RequestSubscriber requestSubscriber; |
194 responseSubscriber.onError(t); |
195 responseSubscriber.onError(t); |
195 close(); |
196 close(); |
196 } |
197 } |
197 } |
198 } |
198 |
199 |
|
200 // call this anywhere the stream is terminated |
|
201 void checkConnectionClosure() { |
|
202 if (closeConnectionOnCompletion) { |
|
203 connection.close(); |
|
204 } |
|
205 } |
|
206 |
|
207 void closeConnectionOnCompletion() { |
|
208 closeConnectionOnCompletion = true; |
|
209 } |
|
210 |
199 // Callback invoked after the Response BodySubscriber has consumed the |
211 // Callback invoked after the Response BodySubscriber has consumed the |
200 // buffers contained in a DataFrame. |
212 // buffers contained in a DataFrame. |
201 // Returns true if END_STREAM is reached, false otherwise. |
213 // Returns true if END_STREAM is reached, false otherwise. |
202 private boolean consumed(DataFrame df) { |
214 private boolean consumed(DataFrame df) { |
203 // RFC 7540 6.1: |
215 // RFC 7540 6.1: |
934 connection.resetStream(streamid, ResetFrame.CANCEL); |
946 connection.resetStream(streamid, ResetFrame.CANCEL); |
935 } |
947 } |
936 } catch (IOException ex) { |
948 } catch (IOException ex) { |
937 Log.logError(ex); |
949 Log.logError(ex); |
938 } |
950 } |
|
951 checkConnectionClosure(); |
939 } |
952 } |
940 |
953 |
941 // This method doesn't send any frame |
954 // This method doesn't send any frame |
942 void close() { |
955 void close() { |
943 if (closed) return; |
956 if (closed) return; |
946 closed = true; |
959 closed = true; |
947 } |
960 } |
948 Log.logTrace("Closing stream {0}", streamid); |
961 Log.logTrace("Closing stream {0}", streamid); |
949 connection.closeStream(streamid); |
962 connection.closeStream(streamid); |
950 Log.logTrace("Stream {0} closed", streamid); |
963 Log.logTrace("Stream {0} closed", streamid); |
|
964 checkConnectionClosure(); |
951 } |
965 } |
952 |
966 |
953 static class PushedStream<U,T> extends Stream<T> { |
967 static class PushedStream<U,T> extends Stream<T> { |
954 final PushGroup<U,T> pushGroup; |
968 final PushGroup<U,T> pushGroup; |
955 // push streams need the response CF allocated up front as it is |
969 // push streams need the response CF allocated up front as it is |