equal
deleted
inserted
replaced
111 |
111 |
112 long requestContentLen; |
112 long requestContentLen; |
113 |
113 |
114 final Http2Connection connection; |
114 final Http2Connection connection; |
115 final HttpRequestImpl request; |
115 final HttpRequestImpl request; |
116 final DecodingCallback rspHeadersConsumer; |
116 final HeadersConsumer rspHeadersConsumer; |
117 HttpHeadersImpl responseHeaders; |
117 final HttpHeadersImpl responseHeaders; |
118 final HttpHeadersImpl requestPseudoHeaders; |
118 final HttpHeadersImpl requestPseudoHeaders; |
119 volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
119 volatile HttpResponse.BodySubscriber<T> responseSubscriber; |
120 final HttpRequest.BodyPublisher requestPublisher; |
120 final HttpRequest.BodyPublisher requestPublisher; |
121 volatile RequestSubscriber requestSubscriber; |
121 volatile RequestSubscriber requestSubscriber; |
122 volatile int responseCode; |
122 volatile int responseCode; |
230 boolean returnConnectionToPool, |
230 boolean returnConnectionToPool, |
231 Executor executor) |
231 Executor executor) |
232 { |
232 { |
233 try { |
233 try { |
234 Log.logTrace("Reading body on stream {0}", streamid); |
234 Log.logTrace("Reading body on stream {0}", streamid); |
235 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders); |
235 BodySubscriber<T> bodySubscriber = handler.apply(responseCode, response.headers); |
236 CompletableFuture<T> cf = receiveData(bodySubscriber, executor); |
236 CompletableFuture<T> cf = receiveData(bodySubscriber, executor); |
237 |
237 |
238 PushGroup<?> pg = exchange.getPushGroup(); |
238 PushGroup<?> pg = exchange.getPushGroup(); |
239 if (pg != null) { |
239 if (pg != null) { |
240 // if an error occurs make sure it is recorded in the PushGroup |
240 // if an error occurs make sure it is recorded in the PushGroup |
241 cf = cf.whenComplete((t, e) -> pg.pushError(e)); |
241 cf = cf.whenComplete((t, e) -> pg.pushError(e)); |
242 } |
242 } |
243 return cf; |
243 return cf; |
244 } catch (Throwable t) { |
244 } catch (Throwable t) { |
245 // may be thrown by handler.apply |
245 // may be thrown by handler.apply |
|
246 cancelImpl(t); |
246 return MinimalFuture.failedFuture(t); |
247 return MinimalFuture.failedFuture(t); |
247 } |
248 } |
248 } |
249 } |
249 |
250 |
250 @Override |
251 @Override |
384 if (Log.headers()) { |
385 if (Log.headers()) { |
385 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); |
386 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); |
386 Log.dumpHeaders(sb, " ", responseHeaders); |
387 Log.dumpHeaders(sb, " ", responseHeaders); |
387 Log.logHeaders(sb.toString()); |
388 Log.logHeaders(sb.toString()); |
388 } |
389 } |
|
390 |
|
391 // this will clear the response headers |
|
392 rspHeadersConsumer.reset(); |
389 |
393 |
390 completeResponse(response); |
394 completeResponse(response); |
391 } |
395 } |
392 |
396 |
393 void incoming_reset(ResetFrame frame) { |
397 void incoming_reset(ResetFrame frame) { |
1143 sb.append(" (streamid=").append(streamid).append("): "); |
1147 sb.append(" (streamid=").append(streamid).append("): "); |
1144 Log.dumpHeaders(sb, " ", responseHeaders); |
1148 Log.dumpHeaders(sb, " ", responseHeaders); |
1145 Log.logHeaders(sb.toString()); |
1149 Log.logHeaders(sb.toString()); |
1146 } |
1150 } |
1147 |
1151 |
|
1152 rspHeadersConsumer.reset(); |
|
1153 |
1148 // different implementations for normal streams and pushed streams |
1154 // different implementations for normal streams and pushed streams |
1149 completeResponse(response); |
1155 completeResponse(response); |
1150 } |
1156 } |
1151 } |
1157 } |
1152 |
1158 |
1181 final String dbgString() { |
1187 final String dbgString() { |
1182 return connection.dbgString() + "/Stream("+streamid+")"; |
1188 return connection.dbgString() + "/Stream("+streamid+")"; |
1183 } |
1189 } |
1184 |
1190 |
1185 private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer { |
1191 private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer { |
|
1192 |
|
1193 void reset() { |
|
1194 responseHeaders.clear(); |
|
1195 } |
1186 |
1196 |
1187 @Override |
1197 @Override |
1188 public void onDecoded(CharSequence name, CharSequence value) |
1198 public void onDecoded(CharSequence name, CharSequence value) |
1189 throws UncheckedIOException |
1199 throws UncheckedIOException |
1190 { |
1200 { |