24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
29 import java.time.Duration; |
30 import java.time.Duration; |
30 import java.util.List; |
31 import java.util.List; |
31 import java.security.AccessControlContext; |
32 import java.security.AccessControlContext; |
32 import java.security.AccessController; |
|
33 import java.util.concurrent.CompletableFuture; |
33 import java.util.concurrent.CompletableFuture; |
34 import java.util.concurrent.CompletionException; |
34 import java.util.concurrent.CompletionException; |
35 import java.util.concurrent.ExecutionException; |
35 import java.util.concurrent.ExecutionException; |
36 import java.util.function.BiFunction; |
|
37 import java.util.concurrent.Executor; |
36 import java.util.concurrent.Executor; |
|
37 import java.util.concurrent.atomic.AtomicInteger; |
38 import java.util.function.UnaryOperator; |
38 import java.util.function.UnaryOperator; |
39 |
39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; |
40 import jdk.incubator.http.internal.common.Log; |
40 import jdk.incubator.http.internal.common.Log; |
41 import jdk.incubator.http.internal.common.MinimalFuture; |
41 import jdk.incubator.http.internal.common.MinimalFuture; |
42 import jdk.incubator.http.internal.common.Pair; |
42 import jdk.incubator.http.internal.common.ConnectionExpiredException; |
43 import jdk.incubator.http.internal.common.Utils; |
43 import jdk.incubator.http.internal.common.Utils; |
44 import static jdk.incubator.http.internal.common.Pair.pair; |
44 import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture; |
|
45 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; |
45 |
46 |
46 /** |
47 /** |
47 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. |
48 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl. |
48 * - manages filters |
49 * - manages filters |
49 * - retries due to filters. |
50 * - retries due to filters. |
51 * |
52 * |
52 * Creates a new Exchange for each request/response interaction |
53 * Creates a new Exchange for each request/response interaction |
53 */ |
54 */ |
54 class MultiExchange<U,T> { |
55 class MultiExchange<U,T> { |
55 |
56 |
|
57 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
58 static final System.Logger DEBUG_LOGGER = |
|
59 Utils.getDebugLogger("MultiExchange"::toString, DEBUG); |
|
60 |
56 private final HttpRequest userRequest; // the user request |
61 private final HttpRequest userRequest; // the user request |
57 private final HttpRequestImpl request; // a copy of the user request |
62 private final HttpRequestImpl request; // a copy of the user request |
58 final AccessControlContext acc; |
63 final AccessControlContext acc; |
59 final HttpClientImpl client; |
64 final HttpClientImpl client; |
60 final HttpResponse.BodyHandler<T> responseHandler; |
65 final HttpResponse.BodyHandler<T> responseHandler; |
61 final ExecutorWrapper execWrapper; |
|
62 final Executor executor; |
66 final Executor executor; |
63 final HttpResponse.MultiProcessor<U,T> multiResponseHandler; |
67 final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber; |
|
68 final AtomicInteger attempts = new AtomicInteger(); |
64 HttpRequestImpl currentreq; // used for async only |
69 HttpRequestImpl currentreq; // used for async only |
65 Exchange<T> exchange; // the current exchange |
70 Exchange<T> exchange; // the current exchange |
66 Exchange<T> previous; |
71 Exchange<T> previous; |
67 int attempts; |
72 volatile Throwable retryCause; |
|
73 volatile boolean expiredOnce; |
|
74 |
68 // Maximum number of times a request will be retried/redirected |
75 // Maximum number of times a request will be retried/redirected |
69 // for any reason |
76 // for any reason |
70 |
77 |
71 static final int DEFAULT_MAX_ATTEMPTS = 5; |
78 static final int DEFAULT_MAX_ATTEMPTS = 5; |
72 static final int max_attempts = Utils.getIntegerNetProperty( |
79 static final int max_attempts = Utils.getIntegerNetProperty( |
91 /** |
98 /** |
92 * MultiExchange with one final response. |
99 * MultiExchange with one final response. |
93 */ |
100 */ |
94 MultiExchange(HttpRequest req, |
101 MultiExchange(HttpRequest req, |
95 HttpClientImpl client, |
102 HttpClientImpl client, |
96 HttpResponse.BodyHandler<T> responseHandler) { |
103 HttpResponse.BodyHandler<T> responseHandler, |
|
104 AccessControlContext acc) { |
97 this.previous = null; |
105 this.previous = null; |
98 this.userRequest = req; |
106 this.userRequest = req; |
99 this.request = new HttpRequestImpl(req); |
107 this.request = new HttpRequestImpl(req, acc); |
100 this.currentreq = request; |
108 this.currentreq = request; |
101 this.attempts = 0; |
|
102 this.client = client; |
109 this.client = client; |
103 this.filters = client.filterChain(); |
110 this.filters = client.filterChain(); |
104 if (System.getSecurityManager() != null) { |
111 this.acc = acc; |
105 this.acc = AccessController.getContext(); |
112 this.executor = client.theExecutor(); |
106 } else { |
|
107 this.acc = null; |
|
108 } |
|
109 this.execWrapper = new ExecutorWrapper(client.executor(), acc); |
|
110 this.executor = execWrapper.executor(); |
|
111 this.responseHandler = responseHandler; |
113 this.responseHandler = responseHandler; |
|
114 if (acc != null) { |
|
115 // Restricts the file publisher with the senders ACC, if any |
|
116 if (responseHandler instanceof UntrustedBodyHandler) |
|
117 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); |
|
118 } |
112 this.exchange = new Exchange<>(request, this); |
119 this.exchange = new Exchange<>(request, this); |
113 this.multiResponseHandler = null; |
120 this.multiResponseSubscriber = null; |
114 this.pushGroup = null; |
121 this.pushGroup = null; |
115 } |
122 } |
116 |
123 |
117 /** |
124 /** |
118 * MultiExchange with multiple responses (HTTP/2 server pushes). |
125 * MultiExchange with multiple responses (HTTP/2 server pushes). |
119 */ |
126 */ |
120 MultiExchange(HttpRequest req, |
127 MultiExchange(HttpRequest req, |
121 HttpClientImpl client, |
128 HttpClientImpl client, |
122 HttpResponse.MultiProcessor<U, T> multiResponseHandler) { |
129 HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber, |
|
130 AccessControlContext acc) { |
123 this.previous = null; |
131 this.previous = null; |
124 this.userRequest = req; |
132 this.userRequest = req; |
125 this.request = new HttpRequestImpl(req); |
133 this.request = new HttpRequestImpl(req, acc); |
126 this.currentreq = request; |
134 this.currentreq = request; |
127 this.attempts = 0; |
|
128 this.client = client; |
135 this.client = client; |
129 this.filters = client.filterChain(); |
136 this.filters = client.filterChain(); |
130 if (System.getSecurityManager() != null) { |
137 this.acc = acc; |
131 this.acc = AccessController.getContext(); |
138 this.executor = client.theExecutor(); |
132 } else { |
139 this.multiResponseSubscriber = multiResponseSubscriber; |
133 this.acc = null; |
140 this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc); |
134 } |
|
135 this.execWrapper = new ExecutorWrapper(client.executor(), acc); |
|
136 this.executor = execWrapper.executor(); |
|
137 this.multiResponseHandler = multiResponseHandler; |
|
138 this.pushGroup = new PushGroup<>(multiResponseHandler, request); |
|
139 this.exchange = new Exchange<>(request, this); |
141 this.exchange = new Exchange<>(request, this); |
140 this.responseHandler = pushGroup.mainResponseHandler(); |
142 this.responseHandler = pushGroup.mainResponseHandler(); |
141 } |
143 } |
142 |
144 |
143 public HttpResponseImpl<T> response() throws IOException, InterruptedException { |
|
144 HttpRequestImpl r = request; |
|
145 if (r.duration() != null) { |
|
146 timedEvent = new TimedEvent(r.duration()); |
|
147 client.registerTimer(timedEvent); |
|
148 } |
|
149 while (attempts < max_attempts) { |
|
150 try { |
|
151 attempts++; |
|
152 Exchange<T> currExchange = getExchange(); |
|
153 requestFilters(r); |
|
154 Response response = currExchange.response(); |
|
155 HttpRequestImpl newreq = responseFilters(response); |
|
156 if (newreq == null) { |
|
157 if (attempts > 1) { |
|
158 Log.logError("Succeeded on attempt: " + attempts); |
|
159 } |
|
160 T body = currExchange.readBody(responseHandler); |
|
161 cancelTimer(); |
|
162 return new HttpResponseImpl<>(userRequest, response, body, currExchange); |
|
163 } |
|
164 //response.body(HttpResponse.ignoreBody()); |
|
165 setExchange(new Exchange<>(newreq, this, acc)); |
|
166 r = newreq; |
|
167 } catch (IOException e) { |
|
168 if (cancelled) { |
|
169 throw new HttpTimeoutException("Request timed out"); |
|
170 } |
|
171 throw e; |
|
172 } |
|
173 } |
|
174 cancelTimer(); |
|
175 throw new IOException("Retry limit exceeded"); |
|
176 } |
|
177 |
|
178 CompletableFuture<Void> multiCompletionCF() { |
145 CompletableFuture<Void> multiCompletionCF() { |
179 return pushGroup.groupResult(); |
146 return pushGroup.groupResult(); |
180 } |
147 } |
181 |
148 |
182 private synchronized Exchange<T> getExchange() { |
149 private synchronized Exchange<T> getExchange() { |
237 public void cancel(IOException cause) { |
207 public void cancel(IOException cause) { |
238 cancelled = true; |
208 cancelled = true; |
239 getExchange().cancel(cause); |
209 getExchange().cancel(cause); |
240 } |
210 } |
241 |
211 |
242 public CompletableFuture<HttpResponseImpl<T>> responseAsync() { |
212 public CompletableFuture<HttpResponse<T>> responseAsync() { |
243 CompletableFuture<Void> start = new MinimalFuture<>(); |
213 CompletableFuture<Void> start = new MinimalFuture<>(); |
244 CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); |
214 CompletableFuture<HttpResponse<T>> cf = responseAsync0(start); |
245 start.completeAsync( () -> null, executor); // trigger execution |
215 start.completeAsync( () -> null, executor); // trigger execution |
246 return cf; |
216 return cf; |
247 } |
217 } |
248 |
218 |
249 private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) { |
219 private CompletableFuture<HttpResponse<T>> |
|
220 responseAsync0(CompletableFuture<Void> start) { |
250 return start.thenCompose( v -> responseAsyncImpl()) |
221 return start.thenCompose( v -> responseAsyncImpl()) |
251 .thenCompose((Response r) -> { |
222 .thenCompose((Response r) -> { |
252 Exchange<T> exch = getExchange(); |
223 Exchange<T> exch = getExchange(); |
253 return exch.readBodyAsync(responseHandler) |
224 return exch.readBodyAsync(responseHandler) |
254 .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch)); |
225 .thenApply((T body) -> |
255 }); |
226 new HttpResponseImpl<>(userRequest, |
|
227 r, |
|
228 body, |
|
229 exch)); |
|
230 }); |
256 } |
231 } |
257 |
232 |
258 CompletableFuture<U> multiResponseAsync() { |
233 CompletableFuture<U> multiResponseAsync() { |
259 CompletableFuture<Void> start = new MinimalFuture<>(); |
234 CompletableFuture<Void> start = new MinimalFuture<>(); |
260 CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start); |
235 CompletableFuture<HttpResponse<T>> cf = responseAsync0(start); |
261 CompletableFuture<HttpResponse<T>> mainResponse = |
236 CompletableFuture<HttpResponse<T>> mainResponse = |
262 cf.thenApply((HttpResponseImpl<T> b) -> { |
237 cf.thenApply(b -> { |
263 multiResponseHandler.onResponse(b); |
238 multiResponseSubscriber.onResponse(b); |
264 return (HttpResponse<T>)b; |
239 pushGroup.noMorePushes(true); |
265 }); |
240 return b; }); |
266 |
|
267 pushGroup.setMainResponse(mainResponse); |
241 pushGroup.setMainResponse(mainResponse); |
268 // set up house-keeping related to multi-response |
242 CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(), |
269 mainResponse.thenAccept((r) -> { |
243 pushGroup.pushesCF()); |
270 // All push promises received by now. |
|
271 pushGroup.noMorePushes(true); |
|
272 }); |
|
273 CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); |
|
274 start.completeAsync( () -> null, executor); // trigger execution |
244 start.completeAsync( () -> null, executor); // trigger execution |
275 return res; |
245 return res; |
276 } |
246 } |
277 |
247 |
278 private CompletableFuture<Response> responseAsyncImpl() { |
248 private CompletableFuture<Response> responseAsyncImpl() { |
279 CompletableFuture<Response> cf; |
249 CompletableFuture<Response> cf; |
280 if (++attempts > max_attempts) { |
250 if (attempts.incrementAndGet() > max_attempts) { |
281 cf = MinimalFuture.failedFuture(new IOException("Too many retries")); |
251 cf = failedFuture(new IOException("Too many retries", retryCause)); |
282 } else { |
252 } else { |
283 if (currentreq.duration() != null) { |
253 if (currentreq.timeout().isPresent()) { |
284 timedEvent = new TimedEvent(currentreq.duration()); |
254 timedEvent = new TimedEvent(currentreq.timeout().get()); |
285 client.registerTimer(timedEvent); |
255 client.registerTimer(timedEvent); |
286 } |
256 } |
287 try { |
257 try { |
288 // 1. Apply request filters |
258 // 1. apply request filters |
289 requestFilters(currentreq); |
259 requestFilters(currentreq); |
290 } catch (IOException e) { |
260 } catch (IOException e) { |
291 return MinimalFuture.failedFuture(e); |
261 return failedFuture(e); |
292 } |
262 } |
293 Exchange<T> exch = getExchange(); |
263 Exchange<T> exch = getExchange(); |
294 // 2. get response |
264 // 2. get response |
295 cf = exch.responseAsync() |
265 cf = exch.responseAsync() |
296 .thenCompose((Response response) -> { |
266 .thenCompose((Response response) -> { |
297 HttpRequestImpl newrequest = null; |
267 HttpRequestImpl newrequest; |
298 try { |
268 try { |
299 // 3. Apply response filters |
269 // 3. apply response filters |
300 newrequest = responseFilters(response); |
270 newrequest = responseFilters(response); |
301 } catch (IOException e) { |
271 } catch (IOException e) { |
302 return MinimalFuture.failedFuture(e); |
272 return failedFuture(e); |
303 } |
|
304 // 4. Check filter result and repeat or continue |
|
305 if (newrequest == null) { |
|
306 if (attempts > 1) { |
|
307 Log.logError("Succeeded on attempt: " + attempts); |
|
308 } |
273 } |
309 return MinimalFuture.completedFuture(response); |
274 // 4. check filter result and repeat or continue |
310 } else { |
275 if (newrequest == null) { |
311 currentreq = newrequest; |
276 if (attempts.get() > 1) { |
312 setExchange(new Exchange<>(currentreq, this, acc)); |
277 Log.logError("Succeeded on attempt: " + attempts); |
313 //reads body off previous, and then waits for next response |
278 } |
314 return responseAsyncImpl(); |
279 return completedFuture(response); |
315 } |
280 } else { |
316 }) |
281 currentreq = newrequest; |
317 // 5. Handle errors and cancel any timer set |
282 expiredOnce = false; |
318 .handle((response, ex) -> { |
283 setExchange(new Exchange<>(currentreq, this, acc)); |
319 cancelTimer(); |
284 //reads body off previous, and then waits for next response |
320 if (ex == null) { |
285 return responseAsyncImpl(); |
321 assert response != null; |
286 } }) |
322 return MinimalFuture.completedFuture(response); |
287 .handle((response, ex) -> { |
323 } |
288 // 5. handle errors and cancel any timer set |
324 // all exceptions thrown are handled here |
289 cancelTimer(); |
325 CompletableFuture<Response> error = getExceptionalCF(ex); |
290 if (ex == null) { |
326 if (error == null) { |
291 assert response != null; |
327 return responseAsyncImpl(); |
292 return completedFuture(response); |
328 } else { |
293 } |
329 return error; |
294 // all exceptions thrown are handled here |
330 } |
295 CompletableFuture<Response> errorCF = getExceptionalCF(ex); |
331 }) |
296 if (errorCF == null) { |
332 .thenCompose(UnaryOperator.identity()); |
297 return responseAsyncImpl(); |
|
298 } else { |
|
299 return errorCF; |
|
300 } }) |
|
301 .thenCompose(UnaryOperator.identity()); |
333 } |
302 } |
334 return cf; |
303 return cf; |
335 } |
304 } |
336 |
305 |
337 /** |
306 /** |
338 * Take a Throwable and return a suitable CompletableFuture that is |
307 * Takes a Throwable and returns a suitable CompletableFuture that is |
339 * completed exceptionally. |
308 * completed exceptionally, or null. |
340 */ |
309 */ |
341 private CompletableFuture<Response> getExceptionalCF(Throwable t) { |
310 private CompletableFuture<Response> getExceptionalCF(Throwable t) { |
342 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { |
311 if ((t instanceof CompletionException) || (t instanceof ExecutionException)) { |
343 if (t.getCause() != null) { |
312 if (t.getCause() != null) { |
344 t = t.getCause(); |
313 t = t.getCause(); |
345 } |
314 } |
346 } |
315 } |
347 if (cancelled && t instanceof IOException) { |
316 if (cancelled && t instanceof IOException) { |
348 t = new HttpTimeoutException("request timed out"); |
317 t = new HttpTimeoutException("request timed out"); |
349 } |
318 } else if (t instanceof ConnectionExpiredException) { |
350 return MinimalFuture.failedFuture(t); |
319 // allow the retry mechanism to do its work |
|
320 // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? ) |
|
321 if (t.getCause() != null) retryCause = t.getCause(); |
|
322 if (!expiredOnce) { |
|
323 DEBUG_LOGGER.log(Level.DEBUG, |
|
324 "MultiExchange: ConnectionExpiredException (async): retrying...", |
|
325 t); |
|
326 expiredOnce = true; |
|
327 return null; |
|
328 } else { |
|
329 DEBUG_LOGGER.log(Level.DEBUG, |
|
330 "MultiExchange: ConnectionExpiredException (async): already retried once.", |
|
331 t); |
|
332 if (t.getCause() != null) t = t.getCause(); |
|
333 } |
|
334 } |
|
335 return failedFuture(t); |
351 } |
336 } |
352 |
337 |
353 class TimedEvent extends TimeoutEvent { |
338 class TimedEvent extends TimeoutEvent { |
354 TimedEvent(Duration duration) { |
339 TimedEvent(Duration duration) { |
355 super(duration); |
340 super(duration); |
356 } |
341 } |
357 @Override |
342 @Override |
358 public void handle() { |
343 public void handle() { |
|
344 DEBUG_LOGGER.log(Level.DEBUG, |
|
345 "Cancelling MultiExchange due to timeout for request %s", |
|
346 request); |
359 cancel(new HttpTimeoutException("request timed out")); |
347 cancel(new HttpTimeoutException("request timed out")); |
360 } |
348 } |
361 } |
349 } |
362 } |
350 } |