42 import jdk.internal.net.http.ResponseContent.BodyParser; |
42 import jdk.internal.net.http.ResponseContent.BodyParser; |
43 import jdk.internal.net.http.common.Log; |
43 import jdk.internal.net.http.common.Log; |
44 import jdk.internal.net.http.common.Logger; |
44 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.MinimalFuture; |
45 import jdk.internal.net.http.common.MinimalFuture; |
46 import jdk.internal.net.http.common.Utils; |
46 import jdk.internal.net.http.common.Utils; |
47 |
|
48 import static java.net.http.HttpClient.Version.HTTP_1_1; |
47 import static java.net.http.HttpClient.Version.HTTP_1_1; |
49 import static java.net.http.HttpResponse.BodySubscribers.discarding; |
48 import static java.net.http.HttpResponse.BodySubscribers.discarding; |
|
49 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; |
|
50 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; |
50 |
51 |
51 /** |
52 /** |
52 * Handles a HTTP/1.1 response (headers + body). |
53 * Handles a HTTP/1.1 response (headers + body). |
53 * There can be more than one of these per Http exchange. |
54 * There can be more than one of these per Http exchange. |
54 */ |
55 */ |
74 private volatile State readProgress = State.INITIAL; |
75 private volatile State readProgress = State.INITIAL; |
75 |
76 |
76 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
77 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
77 final static AtomicLong responseCount = new AtomicLong(); |
78 final static AtomicLong responseCount = new AtomicLong(); |
78 final long id = responseCount.incrementAndGet(); |
79 final long id = responseCount.incrementAndGet(); |
|
80 private Http1HeaderParser hd; |
79 |
81 |
80 Http1Response(HttpConnection conn, |
82 Http1Response(HttpConnection conn, |
81 Http1Exchange<T> exchange, |
83 Http1Exchange<T> exchange, |
82 Http1AsyncReceiver asyncReceiver) { |
84 Http1AsyncReceiver asyncReceiver) { |
83 this.readProgress = State.INITIAL; |
85 this.readProgress = State.INITIAL; |
85 this.exchange = exchange; |
87 this.exchange = exchange; |
86 this.connection = conn; |
88 this.connection = conn; |
87 this.asyncReceiver = asyncReceiver; |
89 this.asyncReceiver = asyncReceiver; |
88 headersReader = new HeadersReader(this::advance); |
90 headersReader = new HeadersReader(this::advance); |
89 bodyReader = new BodyReader(this::advance); |
91 bodyReader = new BodyReader(this::advance); |
|
92 |
|
93 hd = new Http1HeaderParser(); |
|
94 readProgress = State.READING_HEADERS; |
|
95 headersReader.start(hd); |
|
96 asyncReceiver.subscribe(headersReader); |
90 } |
97 } |
91 |
98 |
92 String dbgTag; |
99 String dbgTag; |
93 private String dbgString() { |
100 private String dbgString() { |
94 String dbg = dbgTag; |
101 String dbg = dbgTag; |
148 } |
155 } |
149 state |= 0x02; |
156 state |= 0x02; |
150 } |
157 } |
151 } |
158 } |
152 |
159 |
|
160 private volatile boolean firstTimeAround = true; |
|
161 |
153 public CompletableFuture<Response> readHeadersAsync(Executor executor) { |
162 public CompletableFuture<Response> readHeadersAsync(Executor executor) { |
154 if (debug.on()) |
163 if (debug.on()) |
155 debug.log("Reading Headers: (remaining: " |
164 debug.log("Reading Headers: (remaining: " |
156 + asyncReceiver.remaining() +") " + readProgress); |
165 + asyncReceiver.remaining() +") " + readProgress); |
157 // with expect continue we will resume reading headers + body. |
166 |
158 asyncReceiver.unsubscribe(bodyReader); |
167 if (firstTimeAround) { |
159 bodyReader.reset(); |
168 if (debug.on()) debug.log("First time around"); |
160 Http1HeaderParser hd = new Http1HeaderParser(); |
169 firstTimeAround = false; |
161 readProgress = State.READING_HEADERS; |
170 } else { |
162 headersReader.start(hd); |
171 // with expect continue we will resume reading headers + body. |
163 asyncReceiver.subscribe(headersReader); |
172 asyncReceiver.unsubscribe(bodyReader); |
|
173 bodyReader.reset(); |
|
174 |
|
175 hd = new Http1HeaderParser(); |
|
176 readProgress = State.READING_HEADERS; |
|
177 headersReader.reset(); |
|
178 headersReader.start(hd); |
|
179 asyncReceiver.subscribe(headersReader); |
|
180 } |
|
181 |
164 CompletableFuture<State> cf = headersReader.completion(); |
182 CompletableFuture<State> cf = headersReader.completion(); |
165 assert cf != null : "parsing not started"; |
183 assert cf != null : "parsing not started"; |
|
184 if (debug.on()) { |
|
185 debug.log("headersReader is %s", |
|
186 cf == null ? "not yet started" |
|
187 : cf.isDone() ? "already completed" |
|
188 : "not yet completed"); |
|
189 } |
166 |
190 |
167 Function<State, Response> lambda = (State completed) -> { |
191 Function<State, Response> lambda = (State completed) -> { |
168 assert completed == State.READING_HEADERS; |
192 assert completed == State.READING_HEADERS; |
169 if (debug.on()) |
193 if (debug.on()) |
170 debug.log("Reading Headers: creating Response object;" |
194 debug.log("Reading Headers: creating Response object;" |
205 synchronized boolean finished() { |
229 synchronized boolean finished() { |
206 return finished; |
230 return finished; |
207 } |
231 } |
208 |
232 |
209 int fixupContentLen(int clen) { |
233 int fixupContentLen(int clen) { |
210 if (request.method().equalsIgnoreCase("HEAD")) { |
234 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { |
211 return 0; |
235 return 0; |
212 } |
236 } |
213 if (clen == -1) { |
237 if (clen == -1) { |
214 if (headers.firstValue("Transfer-encoding").orElse("") |
238 if (headers.firstValue("Transfer-encoding").orElse("") |
215 .equalsIgnoreCase("chunked")) { |
239 .equalsIgnoreCase("chunked")) { |
287 if (t == null) { |
311 if (t == null) { |
288 assert subscribed; |
312 assert subscribed; |
289 try { |
313 try { |
290 userSubscriber.onComplete(); |
314 userSubscriber.onComplete(); |
291 } catch (Throwable x) { |
315 } catch (Throwable x) { |
|
316 // Simply propagate the error by calling |
|
317 // onError on the user subscriber, and let the |
|
318 // connection be reused since we should have received |
|
319 // and parsed all the bytes when we reach here. |
|
320 // If onError throws in turn, then we will simply |
|
321 // let that new exception flow up to the caller |
|
322 // and let it deal with it. |
|
323 // (i.e: log and close the connection) |
|
324 // Note that rethrowing here could introduce a |
|
325 // race that might cause the next send() operation to |
|
326 // fail as the connection has already been put back |
|
327 // into the cache when we reach here. |
292 propagateError(t = withError = Utils.getCompletionCause(x)); |
328 propagateError(t = withError = Utils.getCompletionCause(x)); |
293 // rethrow and let the caller deal with it. |
|
294 // (i.e: log and close the connection) |
|
295 // arguably we could decide to not throw and let the |
|
296 // connection be reused since we should have received and |
|
297 // parsed all the bytes when we reach here. |
|
298 throw x; |
|
299 } |
329 } |
300 } else { |
330 } else { |
301 propagateError(t); |
331 propagateError(t); |
302 } |
332 } |
303 } |
333 } |
690 return accept(b, parser, cf); |
721 return accept(b, parser, cf); |
691 } |
722 } |
692 |
723 |
693 @Override |
724 @Override |
694 public final void onReadError(Throwable t) { |
725 public final void onReadError(Throwable t) { |
|
726 t = wrapWithExtraDetail(t, parser::currentStateMessage); |
695 Http1Response.this.onReadError(t); |
727 Http1Response.this.onReadError(t); |
696 } |
728 } |
697 |
729 |
698 @Override |
730 @Override |
699 public AbstractSubscription subscription() { |
731 public AbstractSubscription subscription() { |