/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.ConnectException;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.WebSocket;
import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Pair;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.common.OperationTrackers.Trackable;
import jdk.internal.net.http.common.OperationTrackers.Tracker;
import jdk.internal.net.http.websocket.BuilderImpl;
import jdk.internal.misc.InnocuousThread;
/**
* Client implementation. Contains all configuration information and also
* the selector manager thread which allows async events to be registered
* and delivered when they occur. See AsyncEvent.
*/
final class HttpClientImpl extends HttpClient implements Trackable {
static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG; // dev flag
static final boolean DEBUGTIMEOUT = false; // dev flag
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
static final AtomicLong CLIENT_IDS = new AtomicLong();
// Define the default factory as a static inner class
// that embeds all the necessary logic to avoid
// the risk of using a lambda that might keep a reference on the
// HttpClient instance from which it was created (helps with
// heapdump analysis).
private static final class DefaultThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger();
DefaultThreadFactory(long clientID) {
namePrefix = "HttpClient-" + clientID + "-Worker-";
}
@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.getAndIncrement();
Thread t;
if (System.getSecurityManager() == null) {
t = new Thread(null, r, name, 0, false);
} else {
t = InnocuousThread.newThread(name, r);
}
t.setDaemon(true);
return t;
}
}
/**
* A DelegatingExecutor is an executor that delegates tasks to
* a wrapped executor when it detects that the current thread
* is the SelectorManager thread. If the current thread is not
* the selector manager thread the given task is executed inline.
*/
final static class DelegatingExecutor implements Executor {
private final BooleanSupplier isInSelectorThread;
private final Executor delegate;
DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
this.isInSelectorThread = isInSelectorThread;
this.delegate = delegate;
}
Executor delegate() {
return delegate;
}
@Override
public void execute(Runnable command) {
if (isInSelectorThread.getAsBoolean()) {
delegate.execute(command);
} else {
command.run();
}
}
}
private final CookieHandler cookieHandler;
private final Duration connectTimeout;
private final Redirect followRedirects;
private final ProxySelector userProxySelector;
private final ProxySelector proxySelector;
private final Authenticator authenticator;
private final Version version;
private final ConnectionPool connections;
private final DelegatingExecutor delegatingExecutor;
private final boolean isDefaultExecutor;
// Security parameters
private final SSLContext sslContext;
private final SSLParameters sslParams;
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
private final long id;
private final String dbgTag;
// The SSL DirectBuffer Supplier provides the ability to recycle
// buffers used between the socket reader and the SSLEngine, or
// more precisely between the SocketTube publisher and the
// SSLFlowDelegate reader.
private final SSLDirectBufferSupplier sslBufferSupplier
= new SSLDirectBufferSupplier(this);
// This reference is used to keep track of the facade HttpClient
// that was returned to the application code.
// It makes it possible to know when the application no longer
// holds any reference to the HttpClient.
// Unfortunately, this information is not enough to know when
// to exit the SelectorManager thread. Because of the asynchronous
// nature of the API, we also need to wait until all pending operations
// have completed.
private final WeakReference<HttpClientFacade> facadeRef;
// This counter keeps track of the number of operations pending
// on the HttpClient. The SelectorManager thread will wait
// until there are no longer any pending operations and the
// facadeRef is cleared before exiting.
//
// The pendingOperationCount is incremented every time a send/sendAsync
// operation is invoked on the HttpClient, and is decremented when
// the HttpResponse<T> object is returned to the user.
// However, at this point, the body may not have been fully read yet.
// This is the case when the response T is implemented as a streaming
// subscriber (such as an InputStream).
//
// To take care of this issue the pendingOperationCount will additionally
// be incremented/decremented in the following cases:
//
// 1. For HTTP/2 it is incremented when a stream is added to the
// Http2Connection streams map, and decreased when the stream is removed
// from the map. This should also take care of push promises.
// 2. For WebSocket the count is increased when creating a
// DetachedConnectionChannel for the socket, and decreased
// when the channel is closed.
// In addition, the HttpClient facade is passed to the WebSocket builder,
// (instead of the client implementation delegate).
// 3. For HTTP/1.1 the count is incremented before starting to parse the body
// response, and decremented when the parser has reached the end of the
// response body flow.
//
// This should ensure that the selector manager thread remains alive until
// the response has been fully received or the web socket is closed.
private final AtomicLong pendingOperationCount = new AtomicLong();
private final AtomicLong pendingWebSocketCount = new AtomicLong();
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
/** A Set of, deadline first, ordered timeout events. */
private final TreeSet<TimeoutEvent> timeouts;
/**
* This is a bit tricky:
* 1. an HttpClientFacade has a final HttpClientImpl field.
* 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
* where the referent is the facade created for that instance.
* 3. We cannot just create the HttpClientFacade in the HttpClientImpl
* constructor, because it would be only weakly referenced and could
* be GC'ed before we can return it.
* The solution is to use an instance of SingleFacadeFactory which will
* allow the caller of new HttpClientImpl(...) to retrieve the facade
* after the HttpClientImpl has been created.
*/
private static final class SingleFacadeFactory {
HttpClientFacade facade;
HttpClientFacade createFacade(HttpClientImpl impl) {
assert facade == null;
return (facade = new HttpClientFacade(impl));
}
}
static HttpClientFacade create(HttpClientBuilderImpl builder) {
SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
impl.start();
assert facadeFactory.facade != null;
assert impl.facadeRef.get() == facadeFactory.facade;
return facadeFactory.facade;
}
private HttpClientImpl(HttpClientBuilderImpl builder,
SingleFacadeFactory facadeFactory) {
id = CLIENT_IDS.incrementAndGet();
dbgTag = "HttpClientImpl(" + id +")";
if (builder.sslContext == null) {
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException ex) {
throw new InternalError(ex);
}
} else {
sslContext = builder.sslContext;
}
Executor ex = builder.executor;
if (ex == null) {
ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
isDefaultExecutor = true;
} else {
isDefaultExecutor = false;
}
delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
cookieHandler = builder.cookieHandler;
connectTimeout = builder.connectTimeout;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
this.userProxySelector = builder.proxy;
this.proxySelector = Optional.ofNullable(userProxySelector)
.orElseGet(HttpClientImpl::getDefaultProxySelector);
if (debug.on())
debug.log("proxySelector is %s (user-supplied=%s)",
this.proxySelector, userProxySelector != null);
authenticator = builder.authenticator;
if (builder.version == null) {
version = HttpClient.Version.HTTP_2;
} else {
version = builder.version;
}
if (builder.sslParams == null) {
sslParams = getDefaultParams(sslContext);
} else {
sslParams = builder.sslParams;
}
connections = new ConnectionPool(id);
connections.start();
timeouts = new TreeSet<>();
try {
selmgr = new SelectorManager(this);
} catch (IOException e) {
// unlikely
throw new InternalError(e);
}
selmgr.setDaemon(true);
filters = new FilterFactory();
initFilters();
assert facadeRef.get() != null;
}
private void start() {
selmgr.start();
}
// Called from the SelectorManager thread, just before exiting.
// Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
// that may be still lingering there are properly closed (and their
// possibly still opened SocketChannel released).
private void stop() {
// Clears HTTP/1.1 cache and close its connections
connections.stop();
// Clears HTTP/2 cache and close its connections.
client2.stop();
}
private static SSLParameters getDefaultParams(SSLContext ctx) {
SSLParameters params = ctx.getSupportedSSLParameters();
String[] protocols = params.getProtocols();
boolean found13 = false;
for (String proto : protocols) {
if (proto.equals("TLSv1.3")) {
found13 = true;
break;
}
}
if (found13)
params.setProtocols(new String[] {"TLSv1.3", "TLSv1.2"});
else
params.setProtocols(new String[] {"TLSv1.2"});
return params;
}
private static ProxySelector getDefaultProxySelector() {
PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
return AccessController.doPrivileged(action);
}
// Returns the facade that was returned to the application code.
// May be null if that facade is no longer referenced.
final HttpClientFacade facade() {
return facadeRef.get();
}
// Increments the pendingOperationCount.
final long reference() {
pendingHttpRequestCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
final long unreference() {
final long count = pendingOperationCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
assert webSocketCount >= 0 : "count of WS operations < 0";
assert count >= 0 : "count of pending operations < 0";
return count;
}
// Increments the pendingOperationCount.
final long streamReference() {
pendingHttp2StreamCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
final long streamUnreference() {
final long count = pendingOperationCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
assert webSocketCount >= 0 : "count of WS operations < 0";
assert count >= 0 : "count of pending operations < 0";
return count;
}
// Increments the pendingOperationCount.
final long webSocketOpen() {
pendingWebSocketCount.incrementAndGet();
return pendingOperationCount.incrementAndGet();
}
// Decrements the pendingOperationCount.
final long webSocketClose() {
final long count = pendingOperationCount.decrementAndGet();
final long webSocketCount = pendingWebSocketCount.decrementAndGet();
final long httpCount = pendingHttpRequestCount.get();
final long http2Count = pendingHttp2StreamCount.get();
if (count == 0 && facade() == null) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
assert http2Count >= 0 : "count of HTTP/2 operations < 0";
assert webSocketCount >= 0 : "count of WS operations < 0";
assert count >= 0 : "count of pending operations < 0";
return count;
}
// Returns the pendingOperationCount.
final long referenceCount() {
return pendingOperationCount.get();
}
final static class HttpClientTracker implements Tracker {
final AtomicLong httpCount;
final AtomicLong http2Count;
final AtomicLong websocketCount;
final AtomicLong operationsCount;
final Reference<?> reference;
final String name;
HttpClientTracker(AtomicLong http,
AtomicLong http2,
AtomicLong ws,
AtomicLong ops,
Reference<?> ref,
String name) {
this.httpCount = http;
this.http2Count = http2;
this.websocketCount = ws;
this.operationsCount = ops;
this.reference = ref;
this.name = name;
}
@Override
public long getOutstandingOperations() {
return operationsCount.get();
}
@Override
public long getOutstandingHttpOperations() {
return httpCount.get();
}
@Override
public long getOutstandingHttp2Streams() { return http2Count.get(); }
@Override
public long getOutstandingWebSocketOperations() {
return websocketCount.get();
}
@Override
public boolean isFacadeReferenced() {
return reference.get() != null;
}
@Override
public String getName() {
return name;
}
}
public Tracker getOperationsTracker() {
return new HttpClientTracker(pendingHttpRequestCount,
pendingHttp2StreamCount,
pendingWebSocketCount,
pendingOperationCount,
facadeRef,
dbgTag);
}
// Called by the SelectorManager thread to figure out whether it's time
// to terminate.
final boolean isReferenced() {
HttpClient facade = facade();
return facade != null || referenceCount() > 0;
}
/**
* Wait for activity on given exchange.
* The following occurs in the SelectorManager thread.
*
* 1) add to selector
* 2) If selector fires for this exchange then
* call AsyncEvent.handle()
*
* If exchange needs to change interest ops, then call registerEvent() again.
*/
void registerEvent(AsyncEvent exchange) throws IOException {
selmgr.register(exchange);
}
/**
* Allows an AsyncEvent to modify its interestOps.
* @param event The modified event.
*/
void eventUpdated(AsyncEvent event) throws ClosedChannelException {
assert !(event instanceof AsyncTriggerEvent);
selmgr.eventUpdated(event);
}
boolean isSelectorThread() {
return Thread.currentThread() == selmgr;
}
Http2ClientImpl client2() {
return client2;
}
private void debugCompleted(String tag, long startNanos, HttpRequest req) {
if (debugelapsed.on()) {
debugelapsed.log(tag + " elapsed "
+ (System.nanoTime() - startNanos)/1000_000L
+ " millis for " + req.method()
+ " to " + req.uri());
}
}
@Override
public <T> HttpResponse<T>
send(HttpRequest req, BodyHandler<T> responseHandler)
throws IOException, InterruptedException
{
CompletableFuture<HttpResponse<T>> cf = null;
try {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null )
cf.cancel(true);
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
final String msg = throwable.getMessage();
if (throwable instanceof IllegalArgumentException) {
throw new IllegalArgumentException(msg, throwable);
} else if (throwable instanceof SecurityException) {
throw new SecurityException(msg, throwable);
} else if (throwable instanceof HttpConnectTimeoutException) {
HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
hcte.initCause(throwable);
throw hcte;
} else if (throwable instanceof HttpTimeoutException) {
throw new HttpTimeoutException(msg);
} else if (throwable instanceof ConnectException) {
ConnectException ce = new ConnectException(msg);
ce.initCause(throwable);
throw ce;
} else if (throwable instanceof SSLHandshakeException) {
// special case for SSLHandshakeException
SSLHandshakeException he = new SSLHandshakeException(msg);
he.initCause(throwable);
throw he;
} else if (throwable instanceof SSLException) {
// any other SSLException is wrapped in a plain
// SSLException
throw new SSLException(msg, throwable);
} else if (throwable instanceof IOException) {
throw new IOException(msg, throwable);
} else {
throw new IOException(msg, throwable);
}
}
}
private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
@Override
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
{
return sendAsync(userRequest, responseHandler, null);
}
@Override
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
BodyHandler<T> responseHandler,
PushPromiseHandler<T> pushPromiseHandler) {
return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
}
private <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
BodyHandler<T> responseHandler,
PushPromiseHandler<T> pushPromiseHandler,
Executor exchangeExecutor) {
Objects.requireNonNull(userRequest);
Objects.requireNonNull(responseHandler);
AccessControlContext acc = null;
if (System.getSecurityManager() != null)
acc = AccessController.getContext();
// Clone the, possibly untrusted, HttpRequest
HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
if (requestImpl.method().equals("CONNECT"))
throw new IllegalArgumentException("Unsupported method CONNECT");
long start = DEBUGELAPSED ? System.nanoTime() : 0;
reference();
try {
if (debugelapsed.on())
debugelapsed.log("ClientImpl (async) send %s", userRequest);
// When using sendAsync(...) we explicitly pass the
// executor's delegate as exchange executor to force
// asynchronous scheduling of the exchange.
// When using send(...) we don't specify any executor
// and default to using the client's delegating executor
// which only spawns asynchronous tasks if it detects
// that the current thread is the selector manager
// thread. This will cause everything to execute inline
// until we need to schedule some event with the selector.
Executor executor = exchangeExecutor == null
? this.delegatingExecutor : exchangeExecutor;
MultiExchange<T> mex = new MultiExchange<>(userRequest,
requestImpl,
this,
responseHandler,
pushPromiseHandler,
acc);
CompletableFuture<HttpResponse<T>> res =
mex.responseAsync(executor).whenComplete((b,t) -> unreference());
if (DEBUGELAPSED) {
res = res.whenComplete(
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}
return res;
} catch(Throwable t) {
unreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
}
}
// Main loop for this client's selector
private final static class SelectorManager extends Thread {
// For testing purposes we have an internal System property that
// can control the frequency at which the selector manager will wake
// up when there are no pending operations.
// Increasing the frequency (shorter delays) might allow the selector
// to observe that the facade is no longer referenced and might allow
// the selector thread to terminate more timely - for when nothing is
// ongoing it will only check for that condition every NODEADLINE ms.
// To avoid misuse of the property, the delay that can be specified
// is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
// value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
// The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
private static final int MIN_NODEADLINE = 1000; // ms
private static final int MAX_NODEADLINE = 1000 * 1200; // ms
private static final int DEF_NODEADLINE = 3000; // ms
private static final long NODEADLINE; // default is DEF_NODEADLINE ms
static {
// ensure NODEADLINE is initialized with some valid value.
long deadline = Utils.getIntegerProperty(
"jdk.internal.httpclient.selectorTimeout",
DEF_NODEADLINE); // millis
if (deadline <= 0) deadline = DEF_NODEADLINE;
deadline = Math.max(deadline, MIN_NODEADLINE);
NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
}
private final Selector selector;
private volatile boolean closed;
private final List<AsyncEvent> registrations;
private final List<AsyncTriggerEvent> deregistrations;
private final Logger debug;
private final Logger debugtimeout;
HttpClientImpl owner;
ConnectionPool pool;
SelectorManager(HttpClientImpl ref) throws IOException {
super(null, null,
"HttpClient-" + ref.id + "-SelectorManager",
0, false);
owner = ref;
debug = ref.debug;
debugtimeout = ref.debugtimeout;
pool = ref.connectionPool();
registrations = new ArrayList<>();
deregistrations = new ArrayList<>();
selector = Selector.open();
}
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
if (key != null && key.isValid()) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
sa.register(e);
} else if (e.interestOps() != 0){
// We don't care about paused events.
// These are actually handled by
// SelectorAttachment::resetInterestOps later on.
// But if we reach here when trying to resume an
// event then it's better to fail fast.
if (debug.on()) debug.log("No key for channel");
e.abort(new IOException("No key for channel"));
}
} else {
register(e);
}
}
// This returns immediately. So caller not allowed to send/receive
// on connection.
synchronized void register(AsyncEvent e) {
registrations.add(e);
selector.wakeup();
}
synchronized void cancel(SocketChannel e) {
SelectionKey key = e.keyFor(selector);
if (key != null) {
key.cancel();
}
selector.wakeup();
}
void wakeupSelector() {
selector.wakeup();
}
synchronized void shutdown() {
Log.logTrace("{0}: shutting down", getName());
if (debug.on()) debug.log("SelectorManager shutting down");
closed = true;
try {
selector.close();
} catch (IOException ignored) {
} finally {
owner.stop();
}
}
@Override
public void run() {
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
List<AsyncEvent> readyList = new ArrayList<>();
List<Runnable> resetList = new ArrayList<>();
try {
if (Log.channel()) Log.logChannel(getName() + ": starting");
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
assert errorList.isEmpty();
assert readyList.isEmpty();
assert resetList.isEmpty();
for (AsyncTriggerEvent event : deregistrations) {
event.handle();
}
deregistrations.clear();
for (AsyncEvent event : registrations) {
if (event instanceof AsyncTriggerEvent) {
readyList.add(event);
continue;
}
SelectableChannel chan = event.channel();
SelectionKey key = null;
try {
key = chan.keyFor(selector);
SelectorAttachment sa;
if (key == null || !key.isValid()) {
if (key != null) {
// key is canceled.
// invoke selectNow() to purge it
// before registering the new event.
selector.selectNow();
}
sa = new SelectorAttachment(chan, selector);
} else {
sa = (SelectorAttachment) key.attachment();
}
// may throw IOE if channel closed: that's OK
sa.register(event);
if (!chan.isOpen()) {
throw new IOException("Channel closed");
}
} catch (IOException e) {
Log.logTrace("{0}: {1}", getName(), e);
if (debug.on())
debug.log("Got " + e.getClass().getName()
+ " while handling registration events");
chan.close();
// let the event abort deal with it
errorList.add(new Pair<>(event, e));
if (key != null) {
key.cancel();
selector.selectNow();
}
}
}
registrations.clear();
selector.selectedKeys().clear();
}
for (AsyncEvent event : readyList) {
assert event instanceof AsyncTriggerEvent;
event.handle();
}
readyList.clear();
for (Pair<AsyncEvent,IOException> error : errorList) {
// an IOException was raised and the channel closed.
handleEvent(error.first, error.second);
}
errorList.clear();
// Check whether client is still alive, and if not,
// gracefully stop this thread
if (!owner.isReferenced()) {
Log.logTrace("{0}: {1}",
getName(),
"HttpClient no longer referenced. Exiting...");
return;
}
// Timeouts will have milliseconds granularity. It is important
// to handle them in a timely fashion.
long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
if (debugtimeout.on())
debugtimeout.log("next timeout: %d", nextTimeout);
// Keep-alive have seconds granularity. It's not really an
// issue if we keep connections linger a bit more in the keep
// alive cache.
long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
if (debugtimeout.on())
debugtimeout.log("next expired: %d", nextExpiry);
assert nextTimeout >= 0;
assert nextExpiry >= 0;
// Don't wait for ever as it might prevent the thread to
// stop gracefully. millis will be 0 if no deadline was found.
if (nextTimeout <= 0) nextTimeout = NODEADLINE;
// Clip nextExpiry at NODEADLINE limit. The default
// keep alive is 1200 seconds (half an hour) - we don't
// want to wait that long.
if (nextExpiry <= 0) nextExpiry = NODEADLINE;
else nextExpiry = Math.min(NODEADLINE, nextExpiry);
// takes the least of the two.
long millis = Math.min(nextExpiry, nextTimeout);
if (debugtimeout.on())
debugtimeout.log("Next deadline is %d",
(millis == 0 ? NODEADLINE : millis));
//debugPrint(selector);
int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
// Check whether client is still alive, and if not,
// gracefully stop this thread
if (!owner.isReferenced()) {
Log.logTrace("{0}: {1}",
getName(),
"HttpClient no longer referenced. Exiting...");
return;
}
owner.purgeTimeoutsAndReturnNextDeadline();
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
assert errorList.isEmpty();
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
if (!key.isValid()) {
IOException ex = sa.chan.isOpen()
? new IOException("Invalid key")
: new ClosedChannelException();
sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
sa.pending.clear();
continue;
}
int eventsOccurred;
try {
eventsOccurred = key.readyOps();
} catch (CancelledKeyException ex) {
IOException io = Utils.getIOException(ex);
sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
sa.pending.clear();
continue;
}
sa.events(eventsOccurred).forEach(readyList::add);
resetList.add(() -> sa.resetInterestOps(eventsOccurred));
}
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
// handle selected events
readyList.forEach((e) -> handleEvent(e, null));
readyList.clear();
// handle errors (closed channels etc...)
errorList.forEach((p) -> handleEvent(p.first, p.second));
errorList.clear();
// reset interest ops for selected channels
resetList.forEach(r -> r.run());
resetList.clear();
}
} catch (Throwable e) {
if (!closed) {
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("{0}: {1}: {2}", getName(),
"HttpClientImpl shutting down due to fatal error", err);
}
if (debug.on()) debug.log("shutting down", e);
if (Utils.ASSERTIONSENABLED && !debug.on()) {
e.printStackTrace(System.err); // always print the stack
}
} finally {
if (Log.channel()) Log.logChannel(getName() + ": stopping");
shutdown();
}
}
// void debugPrint(Selector selector) {
// System.err.println("Selector: debugprint start");
// Set<SelectionKey> keys = selector.keys();
// for (SelectionKey key : keys) {
// SelectableChannel c = key.channel();
// int ops = key.interestOps();
// System.err.printf("selector chan:%s ops:%d\n", c, ops);
// }
// System.err.println("Selector: debugprint end");
// }
/** Handles the given event. The given ioe may be null. */
void handleEvent(AsyncEvent event, IOException ioe) {
if (closed || ioe != null) {
event.abort(ioe);
} else {
event.handle();
}
}
}
final String debugInterestOps(SelectableChannel channel) {
try {
SelectionKey key = channel.keyFor(selmgr.selector);
if (key == null) return "channel not registered with selector";
String keyInterestOps = key.isValid()
? "key.interestOps=" + key.interestOps() : "invalid key";
return String.format("channel registered with selector, %s, sa.interestOps=%s",
keyInterestOps,
((SelectorAttachment)key.attachment()).interestOps);
} catch (Throwable t) {
return String.valueOf(t);
}
}
/**
* Tracks multiple user level registrations associated with one NIO
* registration (SelectionKey). In this implementation, registrations
* are one-off and when an event is posted the registration is cancelled
* until explicitly registered again.
*
* <p> No external synchronization required as this class is only used
* by the SelectorManager thread. One of these objects required per
* connection.
*/
private static class SelectorAttachment {
private final SelectableChannel chan;
private final Selector selector;
private final Set<AsyncEvent> pending;
private final static Logger debug =
Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new HashSet<>();
this.chan = chan;
this.selector = selector;
}
void register(AsyncEvent e) throws ClosedChannelException {
int newOps = e.interestOps();
// re register interest if we are not already interested
// in the event. If the event is paused, then the pause will
// be taken into account later when resetInterestOps is called.
boolean reRegister = (interestOps & newOps) != newOps;
interestOps |= newOps;
pending.add(e);
if (debug.on())
debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
if (reRegister) {
// first time registration happens here also
try {
chan.register(selector, interestOps, this);
} catch (Throwable x) {
abortPending(x);
}
} else if (!chan.isOpen()) {
abortPending(new ClosedChannelException());
}
}
/**
* Returns a Stream<AsyncEvents> containing only events that are
* registered with the given {@code interestOps}.
*/
Stream<AsyncEvent> events(int interestOps) {
return pending.stream()
.filter(ev -> (ev.interestOps() & interestOps) != 0);
}
/**
* Removes any events with the given {@code interestOps}, and if no
* events remaining, cancels the associated SelectionKey.
*/
void resetInterestOps(int interestOps) {
int newOps = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
if (event.repeating()) {
newOps |= evops;
continue;
}
if ((evops & interestOps) != 0) {
itr.remove();
} else {
newOps |= evops;
}
}
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
if (newOps == 0 && key != null && pending.isEmpty()) {
key.cancel();
} else {
try {
if (key == null || !key.isValid()) {
throw new CancelledKeyException();
}
key.interestOps(newOps);
// double check after
if (!chan.isOpen()) {
abortPending(new ClosedChannelException());
return;
}
assert key.interestOps() == newOps;
} catch (CancelledKeyException x) {
// channel may have been closed
if (debug.on()) debug.log("key cancelled for " + chan);
abortPending(x);
}
}
}
void abortPending(Throwable x) {
if (!pending.isEmpty()) {
AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
pending.clear();
IOException io = Utils.getIOException(x);
for (AsyncEvent event : evts) {
event.abort(io);
}
}
}
}
/*package-private*/ SSLContext theSSLContext() {
return sslContext;
}
@Override
public SSLContext sslContext() {
return sslContext;
}
@Override
public SSLParameters sslParameters() {
return Utils.copySSLParameters(sslParams);
}
@Override
public Optional<Authenticator> authenticator() {
return Optional.ofNullable(authenticator);
}
/*package-private*/ final DelegatingExecutor theExecutor() {
return delegatingExecutor;
}
@Override
public final Optional<Executor> executor() {
return isDefaultExecutor
? Optional.empty()
: Optional.of(delegatingExecutor.delegate());
}
ConnectionPool connectionPool() {
return connections;
}
@Override
public Redirect followRedirects() {
return followRedirects;
}
@Override
public Optional<CookieHandler> cookieHandler() {
return Optional.ofNullable(cookieHandler);
}
@Override
public Optional<Duration> connectTimeout() {
return Optional.ofNullable(connectTimeout);
}
@Override
public Optional<ProxySelector> proxy() {
return Optional.ofNullable(userProxySelector);
}
// Return the effective proxy that this client uses.
ProxySelector proxySelector() {
return proxySelector;
}
@Override
public WebSocket.Builder newWebSocketBuilder() {
// Make sure to pass the HttpClientFacade to the WebSocket builder.
// This will ensure that the facade is not released before the
// WebSocket has been created, at which point the pendingOperationCount
// will have been incremented by the RawChannelTube.
// See RawChannelTube.
return new BuilderImpl(this.facade(), proxySelector);
}
@Override
public Version version() {
return version;
}
String dbgString() {
return dbgTag;
}
@Override
public String toString() {
// Used by tests to get the client's id and compute the
// name of the SelectorManager thread.
return super.toString() + ("(" + id + ")");
}
private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
if (this.cookieHandler != null) {
addFilter(CookieFilter.class);
}
}
private void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
final LinkedList<HeaderFilter> filterChain() {
return filters.getFilterChain();
}
// Timer controls.
// Timers are implemented through timed Selector.select() calls.
synchronized void registerTimer(TimeoutEvent event) {
Log.logTrace("Registering timer {0}", event);
timeouts.add(event);
selmgr.wakeupSelector();
}
synchronized void cancelTimer(TimeoutEvent event) {
Log.logTrace("Canceling timer {0}", event);
timeouts.remove(event);
}
/**
* Purges ( handles ) timer events that have passed their deadline, and
* returns the amount of time, in milliseconds, until the next earliest
* event. A return value of 0 means that there are no events.
*/
private long purgeTimeoutsAndReturnNextDeadline() {
long diff = 0L;
List<TimeoutEvent> toHandle = null;
int remaining = 0;
// enter critical section to retrieve the timeout event to handle
synchronized(this) {
if (timeouts.isEmpty()) return 0L;
Instant now = Instant.now();
Iterator<TimeoutEvent> itr = timeouts.iterator();
while (itr.hasNext()) {
TimeoutEvent event = itr.next();
diff = now.until(event.deadline(), ChronoUnit.MILLIS);
if (diff <= 0) {
itr.remove();
toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
toHandle.add(event);
} else {
break;
}
}
remaining = timeouts.size();
}
// can be useful for debugging
if (toHandle != null && Log.trace()) {
Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
+ toHandle.size() + " events, "
+ "remaining " + remaining
+ ", next deadline: " + (diff < 0 ? 0L : diff));
}
// handle timeout events out of critical section
if (toHandle != null) {
Throwable failed = null;
for (TimeoutEvent event : toHandle) {
try {
Log.logTrace("Firing timer {0}", event);
event.handle();
} catch (Error | RuntimeException e) {
// Not expected. Handle remaining events then throw...
// If e is an OOME or SOE it might simply trigger a new
// error from here - but in this case there's not much we
// could do anyway. Just let it flow...
if (failed == null) failed = e;
else failed.addSuppressed(e);
Log.logTrace("Failed to handle event {0}: {1}", event, e);
}
}
if (failed instanceof Error) throw (Error) failed;
if (failed instanceof RuntimeException) throw (RuntimeException) failed;
}
// return time to wait until next event. 0L if there's no more events.
return diff < 0 ? 0L : diff;
}
// used for the connection window
int getReceiveBufferSize() {
return Utils.getIntegerNetProperty(
"jdk.httpclient.receiveBufferSize",
0 // only set the size if > 0
);
}
// Optimization for reading SSL encrypted data
// --------------------------------------------
// Returns a BufferSupplier that can be used for reading
// encrypted bytes of the channel. These buffers can then
// be recycled by the SSLFlowDelegate::Reader after their
// content has been copied in the SSLFlowDelegate::Reader
// readBuf.
// Because allocating, reading, copying, and recycling
// all happen in the SelectorManager thread,
// then this BufferSupplier can be shared between all
// the SSL connections managed by this client.
BufferSupplier getSSLBufferSupplier() {
return sslBufferSupplier;
}
// An implementation of BufferSupplier that manages a pool of
// maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
// are used for reading encrypted bytes off the channel before
// copying and subsequent unwrapping.
private static final class SSLDirectBufferSupplier implements BufferSupplier {
private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
private final HttpClientImpl client;
private final Logger debug;
private int tail, count; // no need for volatile: only accessed in SM thread.
SSLDirectBufferSupplier(HttpClientImpl client) {
this.client = Objects.requireNonNull(client);
this.debug = client.debug;
}
// Gets a buffer from the pool, or allocates a new one if needed.
@Override
public ByteBuffer get() {
assert client.isSelectorThread();
assert tail <= POOL_SIZE : "allocate tail is " + tail;
ByteBuffer buf;
if (tail == 0) {
if (debug.on()) {
// should not appear more than SocketTube.MAX_BUFFERS
debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
}
assert count++ < POOL_SIZE : "trying to allocate more than "
+ POOL_SIZE + " buffers";
buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
} else {
assert tail > 0 : "non positive tail value: " + tail;
tail--;
buf = pool[tail];
pool[tail] = null;
}
assert buf.isDirect();
assert buf.position() == 0;
assert buf.hasRemaining();
assert buf.limit() == Utils.BUFSIZE;
assert tail < POOL_SIZE;
assert tail >= 0;
return buf;
}
// Returns the given buffer to the pool.
@Override
public void recycle(ByteBuffer buffer) {
assert client.isSelectorThread();
assert buffer.isDirect();
assert !buffer.hasRemaining();
assert tail < POOL_SIZE : "recycle tail is " + tail;
assert tail >= 0;
buffer.position(0);
buffer.limit(buffer.capacity());
// don't fail if assertions are off. we have asserted above.
if (tail < POOL_SIZE) {
pool[tail] = buffer;
tail++;
}
assert tail <= POOL_SIZE;
assert tail > 0;
}
}
}