66 |
73 |
67 /** |
74 /** |
68 * Initiates a new exchange and assigns it to a connection if one exists |
75 * Initiates a new exchange and assigns it to a connection if one exists |
69 * already. connection usually null. |
76 * already. connection usually null. |
70 */ |
77 */ |
71 static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection) |
78 static <U> CompletableFuture<? extends ExchangeImpl<U>> |
72 throws IOException, InterruptedException |
79 get(Exchange<U> exchange, HttpConnection connection) |
73 { |
80 { |
74 HttpRequestImpl req = exchange.request(); |
81 HttpRequestImpl req = exchange.request(); |
75 if (exchange.version() == HTTP_1_1) { |
82 if (exchange.version() == HTTP_1_1) { |
76 return new Http1Exchange<>(exchange, connection); |
83 DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange"); |
|
84 return createHttp1Exchange(exchange, connection); |
77 } else { |
85 } else { |
78 Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve |
86 Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve |
79 HttpRequestImpl request = exchange.request(); |
87 HttpRequestImpl request = exchange.request(); |
80 Http2Connection c; |
88 CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request); |
81 try { |
89 DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection"); |
82 c = c2.getConnectionFor(request); |
90 return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection)) |
83 } catch (Http2Connection.ALPNException e) { |
91 .thenCompose(Function.identity()); |
84 // failed to negotiate "h2" |
92 } |
85 AbstractAsyncSSLConnection as = e.getConnection(); |
93 } |
86 as.stopAsyncReading(); |
94 |
87 HttpConnection sslc = as.downgrade(); |
95 private static <U> CompletableFuture<? extends ExchangeImpl<U>> |
88 ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc); |
96 createExchangeImpl(Http2Connection c, |
|
97 Throwable t, |
|
98 Exchange<U> exchange, |
|
99 HttpConnection connection) |
|
100 { |
|
101 DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result"); |
|
102 if (t != null) { |
|
103 DEBUG_LOGGER.log(Level.DEBUG, |
|
104 "handling HTTP/2 connection creation failed: %s", |
|
105 (Object)t); |
|
106 t = Utils.getCompletionCause(t); |
|
107 if (t instanceof Http2Connection.ALPNException) { |
|
108 Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t; |
|
109 AbstractAsyncSSLConnection as = ee.getConnection(); |
|
110 DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as); |
|
111 CompletableFuture<? extends ExchangeImpl<U>> ex = |
|
112 createHttp1Exchange(exchange, as); |
89 return ex; |
113 return ex; |
|
114 } else { |
|
115 DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed " |
|
116 + "with unexpected exception: %s", (Object)t); |
|
117 return CompletableFuture.failedFuture(t); |
90 } |
118 } |
91 if (c == null) { |
119 } |
92 // no existing connection. Send request with HTTP 1 and then |
120 if (c == null) { |
93 // upgrade if successful |
121 // no existing connection. Send request with HTTP 1 and then |
94 ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection); |
122 // upgrade if successful |
95 exchange.h2Upgrade(); |
123 DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade"); |
96 return ex; |
124 return createHttp1Exchange(exchange, connection) |
97 } |
125 .thenApply((e) -> { |
98 return c.createStream(exchange); |
126 exchange.h2Upgrade(); |
|
127 return e; |
|
128 }); |
|
129 } else { |
|
130 DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams"); |
|
131 Stream<U> s = c.createStream(exchange); |
|
132 CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s); |
|
133 return ex; |
|
134 } |
|
135 } |
|
136 |
|
137 private static <T> CompletableFuture<Http1Exchange<T>> |
|
138 createHttp1Exchange(Exchange<T> ex, HttpConnection as) |
|
139 { |
|
140 try { |
|
141 return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as)); |
|
142 } catch (Throwable e) { |
|
143 return MinimalFuture.failedFuture(e); |
99 } |
144 } |
100 } |
145 } |
101 |
146 |
102 /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */ |
147 /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */ |
103 |
148 |
104 /** |
149 abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync(); |
105 * Sends the request headers only. May block until all sent. |
|
106 */ |
|
107 abstract void sendHeadersOnly() throws IOException, InterruptedException; |
|
108 |
150 |
109 // Blocking impl but in async style |
151 /** Sends a request body, after request headers have been sent. */ |
110 |
152 abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync(); |
111 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
|
112 // this is blocking. cf will already be completed. |
|
113 return MinimalFuture.supply(() -> { |
|
114 sendHeadersOnly(); |
|
115 return this; |
|
116 }); |
|
117 } |
|
118 |
|
119 /** |
|
120 * Gets response by blocking if necessary. This may be an |
|
121 * intermediate response (like 101) or a final response 200 etc. Returns |
|
122 * before body is read. |
|
123 */ |
|
124 abstract Response getResponse() throws IOException; |
|
125 |
|
126 abstract T readBody(HttpResponse.BodyHandler<T> handler, |
|
127 boolean returnConnectionToPool) throws IOException; |
|
128 |
153 |
129 abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
154 abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
130 boolean returnConnectionToPool, |
155 boolean returnConnectionToPool, |
131 Executor executor); |
156 Executor executor); |
132 |
157 |
133 /** |
158 /** Gets the response headers. Completes before body is read. */ |
134 * Async version of getResponse. Completes before body is read. |
|
135 */ |
|
136 abstract CompletableFuture<Response> getResponseAsync(Executor executor); |
159 abstract CompletableFuture<Response> getResponseAsync(Executor executor); |
137 |
160 |
138 /** |
|
139 * Sends a request body after request headers. |
|
140 */ |
|
141 abstract void sendBody() throws IOException, InterruptedException; |
|
142 |
161 |
143 // Async version of sendBody(). This only used when body sent separately |
162 /** Cancels a request. Not currently exposed through API. */ |
144 // to headers (100 continue) |
|
145 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
|
146 return MinimalFuture.supply(() -> { |
|
147 sendBody(); |
|
148 return this; |
|
149 }); |
|
150 } |
|
151 |
|
152 /** |
|
153 * Cancels a request. Not currently exposed through API. |
|
154 */ |
|
155 abstract void cancel(); |
163 abstract void cancel(); |
156 |
164 |
157 /** |
165 /** |
158 * Cancels a request with a cause. Not currently exposed through API. |
166 * Cancels a request with a cause. Not currently exposed through API. |
159 */ |
167 */ |
160 abstract void cancel(IOException cause); |
168 abstract void cancel(IOException cause); |
|
169 |
|
170 /** |
|
171 * Called when the exchange is released, so that cleanup actions may be |
|
172 * performed - such as deregistering callbacks. |
|
173 * Typically released is called during upgrade, when an HTTP/2 stream |
|
174 * takes over from an Http1Exchange, or when a new exchange is created |
|
175 * during a multi exchange before the final response body was received. |
|
176 */ |
|
177 abstract void released(); |
|
178 |
|
179 /** |
|
180 * Called when the exchange is completed, so that cleanup actions may be |
|
181 * performed - such as deregistering callbacks. |
|
182 * Typically, completed is called at the end of the exchange, when the |
|
183 * final response body has been received (or an error has caused the |
|
184 * completion of the exchange). |
|
185 */ |
|
186 abstract void completed(); |
|
187 |
|
188 /** |
|
189 * Returns true if this exchange was canceled. |
|
190 * @return true if this exchange was canceled. |
|
191 */ |
|
192 abstract boolean isCanceled(); |
|
193 |
|
194 /** |
|
195 * Returns the cause for which this exchange was canceled, if available. |
|
196 * @return the cause for which this exchange was canceled, if available. |
|
197 */ |
|
198 abstract Throwable getCancelCause(); |
161 } |
199 } |