/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLPermission;
import java.security.AccessControlContext;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.common.Log;
import static jdk.internal.net.http.common.Utils.permissionForProxy;
/**
* One request/response exchange (handles 100/101 intermediate response also).
* depth field used to track number of times a new request is being sent
* for a given API request. If limit exceeded exception is thrown.
*
* Security check is performed here:
* - uses AccessControlContext captured at API level
* - checks for appropriate URLPermission for request
* - if permission allowed, grants equivalent SocketPermission to call
* - in case of direct HTTP proxy, checks additionally for access to proxy
* (CONNECT proxying uses its own Exchange, so check done there)
*
*/
final class Exchange<T> {
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final HttpRequestImpl request;
final HttpClientImpl client;
volatile ExchangeImpl<T> exchImpl;
volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
volatile CompletableFuture<Void> bodyIgnored;
// used to record possible cancellation raised before the exchImpl
// has been established.
private volatile IOException failed;
final AccessControlContext acc;
final MultiExchange<T> multi;
final Executor parentExecutor;
boolean upgrading; // to HTTP/2
final PushGroup<T> pushGroup;
final String dbgTag;
// Keeps track of the underlying connection when establishing an HTTP/2
// exchange so that it can be aborted/timed out mid setup.
final ConnectionAborter connectionAborter = new ConnectionAborter();
Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
this.request = request;
this.upgrading = false;
this.client = multi.client();
this.multi = multi;
this.acc = multi.acc;
this.parentExecutor = multi.executor;
this.pushGroup = multi.pushGroup;
this.dbgTag = "Exchange";
}
/* If different AccessControlContext to be used */
Exchange(HttpRequestImpl request,
MultiExchange<T> multi,
AccessControlContext acc)
{
this.request = request;
this.acc = acc;
this.upgrading = false;
this.client = multi.client();
this.multi = multi;
this.parentExecutor = multi.executor;
this.pushGroup = multi.pushGroup;
this.dbgTag = "Exchange";
}
PushGroup<T> getPushGroup() {
return pushGroup;
}
Executor executor() {
return parentExecutor;
}
public HttpRequestImpl request() {
return request;
}
public Optional<Duration> remainingConnectTimeout() {
return multi.remainingConnectTimeout();
}
HttpClientImpl client() {
return client;
}
// Keeps track of the underlying connection when establishing an HTTP/2
// exchange so that it can be aborted/timed out mid setup.
static final class ConnectionAborter {
private volatile HttpConnection connection;
void connection(HttpConnection connection) {
this.connection = connection;
}
void closeConnection() {
HttpConnection connection = this.connection;
this.connection = null;
if (connection != null) {
try {
connection.close();
} catch (Throwable t) {
// ignore
}
}
}
}
// Called for 204 response - when no body is permitted
// This is actually only needed for HTTP/1.1 in order
// to return the connection to the pool (or close it)
void nullBody(HttpResponse<T> resp, Throwable t) {
exchImpl.nullBody(resp, t);
}
public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
// If we received a 407 while establishing the exchange
// there will be no body to read: bodyIgnored will be true,
// and exchImpl will be null (if we were trying to establish
// an HTTP/2 tunnel through an HTTP/1.1 proxy)
if (bodyIgnored != null) return MinimalFuture.completedFuture(null);
// The connection will not be returned to the pool in the case of WebSocket
return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
.whenComplete((r,t) -> exchImpl.completed());
}
/**
* Called after a redirect or similar kind of retry where a body might
* be sent but we don't want it. Should send a RESET in h2. For http/1.1
* we can consume small quantity of data, or close the connection in
* other cases.
*/
public CompletableFuture<Void> ignoreBody() {
if (bodyIgnored != null) return bodyIgnored;
return exchImpl.ignoreBody();
}
/**
* Called when a new exchange is created to replace this exchange.
* At this point it is guaranteed that readBody/readBodyAsync will
* not be called.
*/
public void released() {
ExchangeImpl<?> impl = exchImpl;
if (impl != null) impl.released();
// Don't set exchImpl to null here. We need to keep
// it alive until it's replaced by a Stream in wrapForUpgrade.
// Setting it to null here might get it GC'ed too early, because
// the Http1Response is now only weakly referenced by the Selector.
}
public void cancel() {
// cancel can be called concurrently before or at the same time
// that the exchange impl is being established.
// In that case we won't be able to propagate the cancellation
// right away
if (exchImpl != null) {
exchImpl.cancel();
} else {
// no impl - can't cancel impl yet.
// call cancel(IOException) instead which takes care
// of race conditions between impl/cancel.
cancel(new IOException("Request cancelled"));
}
}
public void cancel(IOException cause) {
if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);
// If the impl is non null, propagate the exception right away.
// Otherwise record it so that it can be propagated once the
// exchange impl has been established.
ExchangeImpl<?> impl = exchImpl;
if (impl != null) {
// propagate the exception to the impl
if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);
impl.cancel(cause);
} else {
// no impl yet. record the exception
failed = cause;
// abort/close the connection if setting up the exchange. This can
// be important when setting up HTTP/2
connectionAborter.closeConnection();
// now call checkCancelled to recheck the impl.
// if the failed state is set and the impl is not null, reset
// the failed state and propagate the exception to the impl.
checkCancelled();
}
}
// This method will raise an exception if one was reported and if
// it is possible to do so. If the exception can be raised, then
// the failed state will be reset. Otherwise, the failed state
// will persist until the exception can be raised and the failed state
// can be cleared.
// Takes care of possible race conditions.
private void checkCancelled() {
ExchangeImpl<?> impl = null;
IOException cause = null;
CompletableFuture<? extends ExchangeImpl<T>> cf = null;
if (failed != null) {
synchronized(this) {
cause = failed;
impl = exchImpl;
cf = exchangeCF;
}
}
if (cause == null) return;
if (impl != null) {
// The exception is raised by propagating it to the impl.
if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);
impl.cancel(cause);
failed = null;
} else {
Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
+ "\n\tCan''t cancel yet with {2}",
request.uri(),
request.timeout().isPresent() ?
// calling duration.toMillis() can throw an exception.
// this is just debugging, we don't care if it overflows.
(request.timeout().get().getSeconds() * 1000
+ request.timeout().get().getNano() / 1000000) : -1,
cause);
if (cf != null) cf.completeExceptionally(cause);
}
}
public void h2Upgrade() {
upgrading = true;
request.setH2Upgrade(client.client2());
}
synchronized IOException getCancelCause() {
return failed;
}
// get/set the exchange impl, solving race condition issues with
// potential concurrent calls to cancel() or cancel(IOException)
private CompletableFuture<? extends ExchangeImpl<T>>
establishExchange(HttpConnection connection) {
if (debug.on()) {
debug.log("establishing exchange for %s,%n\t proxy=%s",
request, request.proxy());
}
// check if we have been cancelled first.
Throwable t = getCancelCause();
checkCancelled();
if (t != null) {
return MinimalFuture.failedFuture(t);
}
CompletableFuture<? extends ExchangeImpl<T>> cf, res;
cf = ExchangeImpl.get(this, connection);
// We should probably use a VarHandle to get/set exchangeCF
// instead - as we need CAS semantics.
synchronized (this) { exchangeCF = cf; };
res = cf.whenComplete((r,x) -> {
synchronized(Exchange.this) {
if (exchangeCF == cf) exchangeCF = null;
}
});
checkCancelled();
return res.thenCompose((eimpl) -> {
// recheck for cancelled, in case of race conditions
exchImpl = eimpl;
IOException tt = getCancelCause();
checkCancelled();
if (tt != null) {
return MinimalFuture.failedFuture(tt);
} else {
// Now we're good to go. Because exchImpl is no longer
// null cancel() will be able to propagate directly to
// the impl after this point ( if needed ).
return MinimalFuture.completedFuture(eimpl);
} });
}
// Completed HttpResponse will be null if response succeeded
// will be a non null responseAsync if expect continue returns an error
public CompletableFuture<Response> responseAsync() {
return responseAsyncImpl(null);
}
CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
SecurityException e = checkPermissions();
if (e != null) {
return MinimalFuture.failedFuture(e);
} else {
return responseAsyncImpl0(connection);
}
}
// check whether the headersSentCF was completed exceptionally with
// ProxyAuthorizationRequired. If so the Response embedded in the
// exception is returned. Otherwise we proceed.
private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
t = Utils.getCompletionCause(t);
if (t instanceof ProxyAuthenticationRequired) {
if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
bodyIgnored = MinimalFuture.completedFuture(null);
Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
HttpConnection c = ex == null ? null : ex.connection();
Response syntheticResponse = new Response(request, this,
proxyResponse.headers, c, proxyResponse.statusCode,
proxyResponse.version, true);
return MinimalFuture.completedFuture(syntheticResponse);
} else if (t != null) {
if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
return MinimalFuture.failedFuture(t);
} else {
if (debug.on()) debug.log("checkFor407: all clear");
return andThen.apply(ex);
}
}
// After sending the request headers, if no ProxyAuthorizationRequired
// was raised and the expectContinue flag is on, we need to wait
// for the 100-Continue response
private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
assert request.expectContinue();
return ex.getResponseAsync(parentExecutor)
.thenCompose((Response r1) -> {
Log.logResponse(r1::toString);
int rcode = r1.statusCode();
if (rcode == 100) {
Log.logTrace("Received 100-Continue: sending body");
if (debug.on()) debug.log("Received 100-Continue for %s", r1);
CompletableFuture<Response> cf =
exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
} else {
Log.logTrace("Expectation failed: Received {0}",
rcode);
if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
if (upgrading && rcode == 101) {
IOException failed = new IOException(
"Unable to handle 101 while waiting for 100");
return MinimalFuture.failedFuture(failed);
}
return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
.thenApply(v -> r1);
}
});
}
// After sending the request headers, if no ProxyAuthorizationRequired
// was raised and the expectContinue flag is off, we can immediately
// send the request body and proceed.
private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
assert !request.expectContinue();
if (debug.on()) debug.log("sendRequestBody");
CompletableFuture<Response> cf = ex.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
cf = wrapForLog(cf);
return cf;
}
CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
if (request.expectContinue()) {
request.addSystemHeader("Expect", "100-Continue");
Log.logTrace("Sending Expect: 100-Continue");
// wait for 100-Continue before sending body
after407Check = this::expectContinue;
} else {
// send request body and proceed.
after407Check = this::sendRequestBody;
}
// The ProxyAuthorizationRequired can be triggered either by
// establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
// or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
// Therefore we handle it with a call to this checkFor407(...) after these
// two places.
Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
(ex) -> ex.sendHeadersAsync()
.handle((r,t) -> this.checkFor407(r, t, after407Check))
.thenCompose(Function.identity());
return establishExchange(connection)
.handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
.thenCompose(Function.identity());
}
private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
if (upgrading) {
return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
}
return cf;
}
private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
if (Log.requests()) {
return cf.thenApply(response -> {
Log.logResponse(response::toString);
return response;
});
}
return cf;
}
HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
return HttpResponse.BodySubscribers.replacing(null);
}
// if this response was received in reply to an upgrade
// then create the Http2Connection from the HttpConnection
// initialize it and wait for the real response on a newly created Stream
private CompletableFuture<Response>
checkForUpgradeAsync(Response resp,
ExchangeImpl<T> ex) {
int rcode = resp.statusCode();
if (upgrading && (rcode == 101)) {
Http1Exchange<T> e = (Http1Exchange<T>)ex;
// check for 101 switching protocols
// 101 responses are not supposed to contain a body.
// => should we fail if there is one?
if (debug.on()) debug.log("Upgrading async %s", e.connection());
return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
.thenCompose((T v) -> {// v is null
debug.log("Ignored body");
// we pass e::getBuffer to allow the ByteBuffers to accumulate
// while we build the Http2Connection
return Http2Connection.createAsync(e.connection(),
client.client2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
boolean cached = c.offerConnection();
Stream<T> s = c.getStream(1);
if (s == null) {
// s can be null if an exception occurred
// asynchronously while sending the preface.
Throwable t = c.getRecordedCause();
IOException ioe;
if (t != null) {
if (!cached)
c.close();
ioe = new IOException("Can't get stream 1: " + t, t);
} else {
ioe = new IOException("Can't get stream 1");
}
return MinimalFuture.failedFuture(ioe);
}
exchImpl.released();
Throwable t;
// There's a race condition window where an external
// thread (SelectorManager) might complete the
// exchange in timeout at the same time where we're
// trying to switch the exchange impl.
// 'failed' will be reset to null after
// exchImpl.cancel() has completed, so either we
// will observe failed != null here, or we will
// observe e.getCancelCause() != null, or the
// timeout exception will be routed to 's'.
// Either way, we need to relay it to s.
synchronized (this) {
exchImpl = s;
t = failed;
}
// Check whether the HTTP/1.1 was cancelled.
if (t == null) t = e.getCancelCause();
// if HTTP/1.1 exchange was timed out, don't
// try to go further.
if (t instanceof HttpTimeoutException) {
s.cancelImpl(t);
return MinimalFuture.failedFuture(t);
}
if (debug.on())
debug.log("Getting response async %s", s);
return s.getResponseAsync(null);
});}
);
}
return MinimalFuture.completedFuture(resp);
}
private URI getURIForSecurityCheck() {
URI u;
String method = request.method();
InetSocketAddress authority = request.authority();
URI uri = request.uri();
// CONNECT should be restricted at API level
if (method.equalsIgnoreCase("CONNECT")) {
try {
u = new URI("socket",
null,
authority.getHostString(),
authority.getPort(),
null,
null,
null);
} catch (URISyntaxException e) {
throw new InternalError(e); // shouldn't happen
}
} else {
u = uri;
}
return u;
}
/**
* Returns the security permission required for the given details.
* If method is CONNECT, then uri must be of form "scheme://host:port"
*/
private static URLPermission permissionForServer(URI uri,
String method,
Map<String, List<String>> headers) {
if (method.equals("CONNECT")) {
return new URLPermission(uri.toString(), "CONNECT");
} else {
return Utils.permissionForServer(uri, method, headers.keySet().stream());
}
}
/**
* Performs the necessary security permission checks required to retrieve
* the response. Returns a security exception representing the denied
* permission, or null if all checks pass or there is no security manager.
*/
private SecurityException checkPermissions() {
String method = request.method();
SecurityManager sm = System.getSecurityManager();
if (sm == null || method.equals("CONNECT")) {
// tunneling will have a null acc, which is fine. The proxy
// permission check will have already been preformed.
return null;
}
HttpHeaders userHeaders = request.getUserHeaders();
URI u = getURIForSecurityCheck();
URLPermission p = permissionForServer(u, method, userHeaders.map());
try {
assert acc != null;
sm.checkPermission(p, acc);
} catch (SecurityException e) {
return e;
}
String hostHeader = userHeaders.firstValue("Host").orElse(null);
if (hostHeader != null && !hostHeader.equalsIgnoreCase(u.getHost())) {
// user has set a Host header different to request URI
// must check that for URLPermission also
URI u1 = replaceHostInURI(u, hostHeader);
URLPermission p1 = permissionForServer(u1, method, userHeaders.map());
try {
assert acc != null;
sm.checkPermission(p1, acc);
} catch (SecurityException e) {
return e;
}
}
ProxySelector ps = client.proxySelector();
if (ps != null) {
if (!method.equals("CONNECT")) {
// a non-tunneling HTTP proxy. Need to check access
URLPermission proxyPerm = permissionForProxy(request.proxy());
if (proxyPerm != null) {
try {
sm.checkPermission(proxyPerm, acc);
} catch (SecurityException e) {
return e;
}
}
}
}
return null;
}
private static URI replaceHostInURI(URI u, String hostPort) {
StringBuilder sb = new StringBuilder();
sb.append(u.getScheme())
.append("://")
.append(hostPort)
.append(u.getRawPath());
return URI.create(sb.toString());
}
HttpClient.Version version() {
return multi.version();
}
String dbgString() {
return dbgTag;
}
}