34 import java.util.concurrent.CompletionException; |
34 import java.util.concurrent.CompletionException; |
35 import java.util.concurrent.ExecutionException; |
35 import java.util.concurrent.ExecutionException; |
36 import java.util.concurrent.Executor; |
36 import java.util.concurrent.Executor; |
37 import java.util.concurrent.atomic.AtomicInteger; |
37 import java.util.concurrent.atomic.AtomicInteger; |
38 import java.util.function.Function; |
38 import java.util.function.Function; |
|
39 import jdk.incubator.http.HttpResponse.PushPromiseHandler; |
39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; |
40 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; |
40 import jdk.incubator.http.internal.common.Log; |
41 import jdk.incubator.http.internal.common.Log; |
41 import jdk.incubator.http.internal.common.MinimalFuture; |
42 import jdk.incubator.http.internal.common.MinimalFuture; |
42 import jdk.incubator.http.internal.common.ConnectionExpiredException; |
43 import jdk.incubator.http.internal.common.ConnectionExpiredException; |
43 import jdk.incubator.http.internal.common.Utils; |
44 import jdk.incubator.http.internal.common.Utils; |
50 * - retries due to filters. |
51 * - retries due to filters. |
51 * - I/O errors and most other exceptions get returned directly to user |
52 * - I/O errors and most other exceptions get returned directly to user |
52 * |
53 * |
53 * Creates a new Exchange for each request/response interaction |
54 * Creates a new Exchange for each request/response interaction |
54 */ |
55 */ |
55 class MultiExchange<U,T> { |
56 class MultiExchange<T> { |
56 |
57 |
57 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
58 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
58 static final System.Logger DEBUG_LOGGER = |
59 static final System.Logger DEBUG_LOGGER = |
59 Utils.getDebugLogger("MultiExchange"::toString, DEBUG); |
60 Utils.getDebugLogger("MultiExchange"::toString, DEBUG); |
60 |
61 |
62 private final HttpRequestImpl request; // a copy of the user request |
63 private final HttpRequestImpl request; // a copy of the user request |
63 final AccessControlContext acc; |
64 final AccessControlContext acc; |
64 final HttpClientImpl client; |
65 final HttpClientImpl client; |
65 final HttpResponse.BodyHandler<T> responseHandler; |
66 final HttpResponse.BodyHandler<T> responseHandler; |
66 final Executor executor; |
67 final Executor executor; |
67 final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber; |
|
68 final AtomicInteger attempts = new AtomicInteger(); |
68 final AtomicInteger attempts = new AtomicInteger(); |
69 HttpRequestImpl currentreq; // used for async only |
69 HttpRequestImpl currentreq; // used for async only |
70 Exchange<T> exchange; // the current exchange |
70 Exchange<T> exchange; // the current exchange |
71 Exchange<T> previous; |
71 Exchange<T> previous; |
72 volatile Throwable retryCause; |
72 volatile Throwable retryCause; |
82 ); |
82 ); |
83 |
83 |
84 private final List<HeaderFilter> filters; |
84 private final List<HeaderFilter> filters; |
85 TimedEvent timedEvent; |
85 TimedEvent timedEvent; |
86 volatile boolean cancelled; |
86 volatile boolean cancelled; |
87 final PushGroup<U,T> pushGroup; |
87 final PushGroup<T> pushGroup; |
88 |
88 |
89 /** |
89 /** |
90 * Filter fields. These are attached as required by filters |
90 * Filter fields. These are attached as required by filters |
91 * and only used by the filter implementations. This could be |
91 * and only used by the filter implementations. This could be |
92 * generalised into Objects that are passed explicitly to the filters |
92 * generalised into Objects that are passed explicitly to the filters |
101 */ |
101 */ |
102 MultiExchange(HttpRequest userRequest, |
102 MultiExchange(HttpRequest userRequest, |
103 HttpRequestImpl requestImpl, |
103 HttpRequestImpl requestImpl, |
104 HttpClientImpl client, |
104 HttpClientImpl client, |
105 HttpResponse.BodyHandler<T> responseHandler, |
105 HttpResponse.BodyHandler<T> responseHandler, |
|
106 PushPromiseHandler<T> pushPromiseHandler, |
106 AccessControlContext acc) { |
107 AccessControlContext acc) { |
107 this.previous = null; |
108 this.previous = null; |
108 this.userRequest = userRequest; |
109 this.userRequest = userRequest; |
109 this.request = requestImpl; |
110 this.request = requestImpl; |
110 this.currentreq = request; |
111 this.currentreq = request; |
116 if (acc != null) { |
117 if (acc != null) { |
117 // Restricts the file publisher with the senders ACC, if any |
118 // Restricts the file publisher with the senders ACC, if any |
118 if (responseHandler instanceof UntrustedBodyHandler) |
119 if (responseHandler instanceof UntrustedBodyHandler) |
119 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); |
120 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc); |
120 } |
121 } |
|
122 |
|
123 if (pushPromiseHandler != null) { |
|
124 this.pushGroup = new PushGroup<>(pushPromiseHandler, request, acc); |
|
125 } else { |
|
126 pushGroup = null; |
|
127 } |
|
128 |
121 this.exchange = new Exchange<>(request, this); |
129 this.exchange = new Exchange<>(request, this); |
122 this.multiResponseSubscriber = null; |
130 } |
123 this.pushGroup = null; |
|
124 } |
|
125 |
|
126 /** |
|
127 * MultiExchange with multiple responses (HTTP/2 server pushes). |
|
128 */ |
|
129 MultiExchange(HttpRequest userRequest, |
|
130 HttpRequestImpl requestImpl, |
|
131 HttpClientImpl client, |
|
132 HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber, |
|
133 AccessControlContext acc) { |
|
134 this.previous = null; |
|
135 this.userRequest = userRequest; |
|
136 this.request = requestImpl; |
|
137 this.currentreq = request; |
|
138 this.client = client; |
|
139 this.filters = client.filterChain(); |
|
140 this.acc = acc; |
|
141 this.executor = client.theExecutor(); |
|
142 this.multiResponseSubscriber = multiResponseSubscriber; |
|
143 this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc); |
|
144 this.exchange = new Exchange<>(request, this); |
|
145 this.responseHandler = pushGroup.mainResponseHandler(); |
|
146 } |
|
147 |
|
148 // CompletableFuture<Void> multiCompletionCF() { |
|
149 // return pushGroup.groupResult(); |
|
150 // } |
|
151 |
131 |
152 private synchronized Exchange<T> getExchange() { |
132 private synchronized Exchange<T> getExchange() { |
153 return exchange; |
133 return exchange; |
154 } |
134 } |
155 |
135 |
156 HttpClientImpl client() { |
136 HttpClientImpl client() { |
157 return client; |
137 return client; |
158 } |
138 } |
159 |
|
160 // HttpClient.Redirect followRedirects() { |
|
161 // return client.followRedirects(); |
|
162 // } |
|
163 |
139 |
164 HttpClient.Version version() { |
140 HttpClient.Version version() { |
165 return request.version().orElse(client.version()); |
141 return request.version().orElse(client.version()); |
166 } |
142 } |
167 |
143 |
229 this.response = |
205 this.response = |
230 new HttpResponseImpl<>(userRequest, r, this.response, body, exch); |
206 new HttpResponseImpl<>(userRequest, r, this.response, body, exch); |
231 return this.response; |
207 return this.response; |
232 }); |
208 }); |
233 }); |
209 }); |
234 } |
|
235 |
|
236 CompletableFuture<U> multiResponseAsync() { |
|
237 CompletableFuture<Void> start = new MinimalFuture<>(); |
|
238 CompletableFuture<HttpResponse<T>> cf = responseAsync0(start); |
|
239 CompletableFuture<HttpResponse<T>> mainResponse = |
|
240 cf.thenApply(b -> { |
|
241 multiResponseSubscriber.onResponse(b); |
|
242 pushGroup.noMorePushes(true); |
|
243 return b; }); |
|
244 pushGroup.setMainResponse(mainResponse); |
|
245 CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(), |
|
246 pushGroup.pushesCF()); |
|
247 start.completeAsync( () -> null, executor); // trigger execution |
|
248 return res; |
|
249 } |
210 } |
250 |
211 |
251 private CompletableFuture<Response> responseAsyncImpl() { |
212 private CompletableFuture<Response> responseAsyncImpl() { |
252 CompletableFuture<Response> cf; |
213 CompletableFuture<Response> cf; |
253 if (attempts.incrementAndGet() > max_attempts) { |
214 if (attempts.incrementAndGet() > max_attempts) { |