equal
deleted
inserted
replaced
38 import java.util.function.Consumer; |
38 import java.util.function.Consumer; |
39 import java.util.function.Function; |
39 import java.util.function.Function; |
40 import java.net.http.HttpHeaders; |
40 import java.net.http.HttpHeaders; |
41 import java.net.http.HttpResponse; |
41 import java.net.http.HttpResponse; |
42 import jdk.internal.net.http.ResponseContent.BodyParser; |
42 import jdk.internal.net.http.ResponseContent.BodyParser; |
|
43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; |
43 import jdk.internal.net.http.common.Log; |
44 import jdk.internal.net.http.common.Log; |
44 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.MinimalFuture; |
46 import jdk.internal.net.http.common.MinimalFuture; |
46 import jdk.internal.net.http.common.Utils; |
47 import jdk.internal.net.http.common.Utils; |
47 import static java.net.http.HttpClient.Version.HTTP_1_1; |
48 import static java.net.http.HttpClient.Version.HTTP_1_1; |
65 private boolean return2Cache; // return connection to cache when finished |
66 private boolean return2Cache; // return connection to cache when finished |
66 private final HeadersReader headersReader; // used to read the headers |
67 private final HeadersReader headersReader; // used to read the headers |
67 private final BodyReader bodyReader; // used to read the body |
68 private final BodyReader bodyReader; // used to read the body |
68 private final Http1AsyncReceiver asyncReceiver; |
69 private final Http1AsyncReceiver asyncReceiver; |
69 private volatile EOFException eof; |
70 private volatile EOFException eof; |
|
71 private volatile BodyParser bodyParser; |
70 // max number of bytes of (fixed length) body to ignore on redirect |
72 // max number of bytes of (fixed length) body to ignore on redirect |
71 private final static int MAX_IGNORE = 1024; |
73 private final static int MAX_IGNORE = 1024; |
72 |
74 |
73 // Revisit: can we get rid of this? |
75 // Revisit: can we get rid of this? |
74 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} |
76 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} |
228 |
230 |
229 synchronized boolean finished() { |
231 synchronized boolean finished() { |
230 return finished; |
232 return finished; |
231 } |
233 } |
232 |
234 |
|
235 /** |
|
236 * Return known fixed content length or -1 if chunked, or -2 if no content-length |
|
237 * information in which case, connection termination delimits the response body |
|
238 */ |
233 int fixupContentLen(int clen) { |
239 int fixupContentLen(int clen) { |
234 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { |
240 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { |
235 return 0; |
241 return 0; |
236 } |
242 } |
237 if (clen == -1) { |
243 if (clen == -1) { |
238 if (headers.firstValue("Transfer-encoding").orElse("") |
244 if (headers.firstValue("Transfer-encoding").orElse("") |
239 .equalsIgnoreCase("chunked")) { |
245 .equalsIgnoreCase("chunked")) { |
240 return -1; |
246 return -1; |
241 } |
247 } |
242 return 0; |
248 if (responseCode == 101) { |
|
249 // this is a h2c or websocket upgrade, contentlength must be zero |
|
250 return 0; |
|
251 } |
|
252 return -2; |
243 } |
253 } |
244 return clen; |
254 return clen; |
245 } |
255 } |
246 |
256 |
247 /** |
257 /** |
399 } |
409 } |
400 // increment the reference count on the HttpClientImpl |
410 // increment the reference count on the HttpClientImpl |
401 // to prevent the SelectorManager thread from exiting until |
411 // to prevent the SelectorManager thread from exiting until |
402 // the body is fully read. |
412 // the body is fully read. |
403 refCountTracker.acquire(); |
413 refCountTracker.acquire(); |
404 bodyReader.start(content.getBodyParser( |
414 bodyParser = content.getBodyParser( |
405 (t) -> { |
415 (t) -> { |
406 try { |
416 try { |
407 if (t != null) { |
417 if (t != null) { |
408 try { |
418 try { |
409 subscriber.onError(t); |
419 subscriber.onError(t); |
415 bodyReader.onComplete(t); |
425 bodyReader.onComplete(t); |
416 if (t != null) { |
426 if (t != null) { |
417 connection.close(); |
427 connection.close(); |
418 } |
428 } |
419 } |
429 } |
420 })); |
430 }); |
|
431 bodyReader.start(bodyParser); |
421 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); |
432 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); |
422 asyncReceiver.subscribe(bodyReader); |
433 asyncReceiver.subscribe(bodyReader); |
423 assert bodyReaderCF != null : "parsing not started"; |
434 assert bodyReaderCF != null : "parsing not started"; |
424 // Make sure to keep a reference to asyncReceiver from |
435 // Make sure to keep a reference to asyncReceiver from |
425 // within this |
436 // within this |
721 return accept(b, parser, cf); |
732 return accept(b, parser, cf); |
722 } |
733 } |
723 |
734 |
724 @Override |
735 @Override |
725 public final void onReadError(Throwable t) { |
736 public final void onReadError(Throwable t) { |
|
737 if (t instanceof EOFException && bodyParser != null && |
|
738 bodyParser instanceof UnknownLengthBodyParser) { |
|
739 ((UnknownLengthBodyParser)bodyParser).complete(); |
|
740 return; |
|
741 } |
726 t = wrapWithExtraDetail(t, parser::currentStateMessage); |
742 t = wrapWithExtraDetail(t, parser::currentStateMessage); |
727 Http1Response.this.onReadError(t); |
743 Http1Response.this.onReadError(t); |
728 } |
744 } |
729 |
745 |
730 @Override |
746 @Override |