27 |
27 |
28 import java.io.EOFException; |
28 import java.io.EOFException; |
29 import java.lang.System.Logger.Level; |
29 import java.lang.System.Logger.Level; |
30 import java.nio.ByteBuffer; |
30 import java.nio.ByteBuffer; |
31 import java.util.concurrent.CompletableFuture; |
31 import java.util.concurrent.CompletableFuture; |
32 import java.util.concurrent.CompletionStage; |
|
33 import java.util.concurrent.Executor; |
32 import java.util.concurrent.Executor; |
34 import java.util.function.BiConsumer; |
33 import java.util.concurrent.atomic.AtomicLong; |
35 import java.util.function.Consumer; |
34 import java.util.function.Consumer; |
36 import java.util.function.Function; |
35 import java.util.function.Function; |
37 import java.net.http.HttpHeaders; |
36 import java.net.http.HttpHeaders; |
38 import java.net.http.HttpResponse; |
37 import java.net.http.HttpResponse; |
39 import jdk.internal.net.http.ResponseContent.BodyParser; |
38 import jdk.internal.net.http.ResponseContent.BodyParser; |
65 |
64 |
66 // Revisit: can we get rid of this? |
65 // Revisit: can we get rid of this? |
67 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} |
66 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} |
68 private volatile State readProgress = State.INITIAL; |
67 private volatile State readProgress = State.INITIAL; |
69 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
68 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
70 final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); |
69 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
71 |
70 final static AtomicLong responseCount = new AtomicLong(); |
|
71 final long id = responseCount.incrementAndGet(); |
72 |
72 |
73 Http1Response(HttpConnection conn, |
73 Http1Response(HttpConnection conn, |
74 Http1Exchange<T> exchange, |
74 Http1Exchange<T> exchange, |
75 Http1AsyncReceiver asyncReceiver) { |
75 Http1AsyncReceiver asyncReceiver) { |
76 this.readProgress = State.INITIAL; |
76 this.readProgress = State.INITIAL; |
78 this.exchange = exchange; |
78 this.exchange = exchange; |
79 this.connection = conn; |
79 this.connection = conn; |
80 this.asyncReceiver = asyncReceiver; |
80 this.asyncReceiver = asyncReceiver; |
81 headersReader = new HeadersReader(this::advance); |
81 headersReader = new HeadersReader(this::advance); |
82 bodyReader = new BodyReader(this::advance); |
82 bodyReader = new BodyReader(this::advance); |
|
83 } |
|
84 |
|
85 String dbgTag; |
|
86 private String dbgString() { |
|
87 String dbg = dbgTag; |
|
88 if (dbg == null) { |
|
89 String cdbg = connection.dbgTag; |
|
90 if (cdbg != null) { |
|
91 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; |
|
92 } else { |
|
93 dbg = "Http1Response(id=" + id + ")"; |
|
94 } |
|
95 } |
|
96 return dbg; |
|
97 } |
|
98 |
|
99 // The ClientRefCountTracker is used to track the state |
|
100 // of a pending operation. Altough there usually is a single |
|
101 // point where the operation starts, it may terminate at |
|
102 // different places. |
|
103 private final class ClientRefCountTracker { |
|
104 final HttpClientImpl client = connection.client(); |
|
105 // state & 0x01 != 0 => acquire called |
|
106 // state & 0x02 != 0 => tryRelease called |
|
107 byte state; |
|
108 |
|
109 public synchronized void acquire() { |
|
110 if (state == 0) { |
|
111 // increment the reference count on the HttpClientImpl |
|
112 // to prevent the SelectorManager thread from exiting |
|
113 // until our operation is complete. |
|
114 debug.log(Level.DEBUG, "incrementing ref count for %s", client); |
|
115 client.reference(); |
|
116 state = 0x01; |
|
117 } else { |
|
118 assert (state & 0x01) == 0 : "reference count already incremented"; |
|
119 } |
|
120 } |
|
121 |
|
122 public synchronized void tryRelease() { |
|
123 if (state == 0x01) { |
|
124 // decrement the reference count on the HttpClientImpl |
|
125 // to allow the SelectorManager thread to exit if no |
|
126 // other operation is pending and the facade is no |
|
127 // longer referenced. |
|
128 debug.log(Level.DEBUG, "decrementing ref count for %s", client); |
|
129 client.unreference(); |
|
130 state |= 0x02; |
|
131 } |
|
132 } |
83 } |
133 } |
84 |
134 |
85 public CompletableFuture<Response> readHeadersAsync(Executor executor) { |
135 public CompletableFuture<Response> readHeadersAsync(Executor executor) { |
86 debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " |
136 debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " |
87 + asyncReceiver.remaining() +") " + readProgress); |
137 + asyncReceiver.remaining() +") " + readProgress); |
155 } else { |
205 } else { |
156 return readBody(HttpResponse.BodySubscriber.discard(), true, executor); |
206 return readBody(HttpResponse.BodySubscriber.discard(), true, executor); |
157 } |
207 } |
158 } |
208 } |
159 |
209 |
|
210 |
160 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, |
211 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, |
161 boolean return2Cache, |
212 boolean return2Cache, |
162 Executor executor) { |
213 Executor executor) { |
163 this.return2Cache = return2Cache; |
214 this.return2Cache = return2Cache; |
164 final HttpResponse.BodySubscriber<U> pusher = p; |
215 final HttpResponse.BodySubscriber<U> pusher = p; |
171 |
222 |
172 // expect-continue reads headers and body twice. |
223 // expect-continue reads headers and body twice. |
173 // if we reach here, we must reset the headersReader state. |
224 // if we reach here, we must reset the headersReader state. |
174 asyncReceiver.unsubscribe(headersReader); |
225 asyncReceiver.unsubscribe(headersReader); |
175 headersReader.reset(); |
226 headersReader.reset(); |
|
227 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); |
176 |
228 |
177 executor.execute(() -> { |
229 executor.execute(() -> { |
178 try { |
230 try { |
179 HttpClientImpl client = connection.client(); |
|
180 content = new ResponseContent( |
231 content = new ResponseContent( |
181 connection, clen, headers, pusher, |
232 connection, clen, headers, pusher, |
182 this::onFinished |
233 this::onFinished |
183 ); |
234 ); |
184 if (cf.isCompletedExceptionally()) { |
235 if (cf.isCompletedExceptionally()) { |
187 return; |
238 return; |
188 } |
239 } |
189 // increment the reference count on the HttpClientImpl |
240 // increment the reference count on the HttpClientImpl |
190 // to prevent the SelectorManager thread from exiting until |
241 // to prevent the SelectorManager thread from exiting until |
191 // the body is fully read. |
242 // the body is fully read. |
192 client.reference(); |
243 refCountTracker.acquire(); |
193 bodyReader.start(content.getBodyParser( |
244 bodyReader.start(content.getBodyParser( |
194 (t) -> { |
245 (t) -> { |
195 try { |
246 try { |
196 if (t != null) { |
247 if (t != null) { |
197 pusher.onError(t); |
248 pusher.onError(t); |
198 connection.close(); |
249 connection.close(); |
199 if (!cf.isDone()) |
250 if (!cf.isDone()) |
200 cf.completeExceptionally(t); |
251 cf.completeExceptionally(t); |
201 } |
252 } |
202 } finally { |
253 } finally { |
203 // decrement the reference count on the HttpClientImpl |
|
204 // to allow the SelectorManager thread to exit if no |
|
205 // other operation is pending and the facade is no |
|
206 // longer referenced. |
|
207 client.unreference(); |
|
208 bodyReader.onComplete(t); |
254 bodyReader.onComplete(t); |
209 } |
255 } |
210 })); |
256 })); |
211 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); |
257 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); |
212 asyncReceiver.subscribe(bodyReader); |
258 asyncReceiver.subscribe(bodyReader); |
214 // Make sure to keep a reference to asyncReceiver from |
260 // Make sure to keep a reference to asyncReceiver from |
215 // within this |
261 // within this |
216 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { |
262 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { |
217 t = Utils.getCompletionCause(t); |
263 t = Utils.getCompletionCause(t); |
218 try { |
264 try { |
219 if (t != null) { |
265 if (t == null) { |
220 debug.log(Level.DEBUG, () -> |
266 debug.log(Level.DEBUG, () -> |
221 "Finished reading body: " + s); |
267 "Finished reading body: " + s); |
222 assert s == State.READING_BODY; |
268 assert s == State.READING_BODY; |
223 } |
269 } |
224 if (t != null && !cf.isDone()) { |
270 if (t != null && !cf.isDone()) { |
226 cf.completeExceptionally(t); |
272 cf.completeExceptionally(t); |
227 } |
273 } |
228 } catch (Throwable x) { |
274 } catch (Throwable x) { |
229 // not supposed to happen |
275 // not supposed to happen |
230 asyncReceiver.onReadError(x); |
276 asyncReceiver.onReadError(x); |
|
277 } finally { |
|
278 // we're done: release the ref count for |
|
279 // the current operation. |
|
280 refCountTracker.tryRelease(); |
231 } |
281 } |
232 }); |
282 }); |
233 connection.addTrailingOperation(trailingOp); |
283 connection.addTrailingOperation(trailingOp); |
234 } catch (Throwable t) { |
284 } catch (Throwable t) { |
235 debug.log(Level.DEBUG, () -> "Failed reading body: " + t); |
285 debug.log(Level.DEBUG, () -> "Failed reading body: " + t); |
241 } finally { |
291 } finally { |
242 asyncReceiver.onReadError(t); |
292 asyncReceiver.onReadError(t); |
243 } |
293 } |
244 } |
294 } |
245 }); |
295 }); |
246 p.getBody().whenComplete((U u, Throwable t) -> { |
296 try { |
247 if (t == null) |
297 p.getBody().whenComplete((U u, Throwable t) -> { |
248 cf.complete(u); |
298 if (t == null) |
249 else |
299 cf.complete(u); |
250 cf.completeExceptionally(t); |
300 else |
|
301 cf.completeExceptionally(t); |
|
302 }); |
|
303 } catch (Throwable t) { |
|
304 cf.completeExceptionally(t); |
|
305 asyncReceiver.setRetryOnError(false); |
|
306 asyncReceiver.onReadError(t); |
|
307 } |
|
308 |
|
309 return cf.whenComplete((s,t) -> { |
|
310 if (t != null) { |
|
311 // If an exception occurred, release the |
|
312 // ref count for the current operation, as |
|
313 // it may never be triggered otherwise |
|
314 // (BodySubscriber ofInputStream) |
|
315 // If there was no exception then the |
|
316 // ref count will be/have been released when |
|
317 // the last byte of the response is/was received |
|
318 refCountTracker.tryRelease(); |
|
319 } |
251 }); |
320 }); |
252 |
|
253 return cf; |
|
254 } |
321 } |
255 |
322 |
256 |
323 |
257 private void onFinished() { |
324 private void onFinished() { |
258 asyncReceiver.clear(); |
325 asyncReceiver.clear(); |