--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue May 29 13:42:04 2018 +0100
@@ -56,13 +56,13 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
@@ -121,6 +121,34 @@
}
}
+ /**
+ * A DelegatingExecutor is an executor that delegates tasks to
+ * a wrapped executor when it detects that the current thread
+ * is the SelectorManager thread. If the current thread is not
+ * the selector manager thread the given task is executed inline.
+ */
+ final static class DelegatingExecutor implements Executor {
+ private final BooleanSupplier isInSelectorThread;
+ private final Executor delegate;
+ DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
+ this.isInSelectorThread = isInSelectorThread;
+ this.delegate = delegate;
+ }
+
+ Executor delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (isInSelectorThread.getAsBoolean()) {
+ delegate.execute(command);
+ } else {
+ command.run();
+ }
+ }
+ }
+
private final CookieHandler cookieHandler;
private final Redirect followRedirects;
private final Optional<ProxySelector> userProxySelector;
@@ -128,7 +156,7 @@
private final Authenticator authenticator;
private final Version version;
private final ConnectionPool connections;
- private final Executor executor;
+ private final DelegatingExecutor delegatingExecutor;
private final boolean isDefaultExecutor;
// Security parameters
private final SSLContext sslContext;
@@ -240,12 +268,11 @@
ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
isDefaultExecutor = true;
} else {
- ex = builder.executor;
isDefaultExecutor = false;
}
+ delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
- executor = ex;
cookieHandler = builder.cookieHandler;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
@@ -490,7 +517,7 @@
throws IOException, InterruptedException
{
try {
- return sendAsync(req, responseHandler, null).get();
+ return sendAsync(req, responseHandler, null, null).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof Error)
@@ -516,8 +543,16 @@
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
BodyHandler<T> responseHandler,
- PushPromiseHandler<T> pushPromiseHandler)
- {
+ PushPromiseHandler<T> pushPromiseHandler) {
+ return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
+ }
+
+ private <T> CompletableFuture<HttpResponse<T>>
+ sendAsync(HttpRequest userRequest,
+ BodyHandler<T> responseHandler,
+ PushPromiseHandler<T> pushPromiseHandler,
+ Executor exchangeExecutor) {
+
Objects.requireNonNull(userRequest);
Objects.requireNonNull(responseHandler);
@@ -536,9 +571,17 @@
if (debugelapsed.on())
debugelapsed.log("ClientImpl (async) send %s", userRequest);
- Executor executor = acc == null
- ? this.executor
- : new PrivilegedExecutor(this.executor, acc);
+ // When using sendAsync(...) we explicitly pass the
+ // executor's delegate as exchange executor to force
+ // asynchronous scheduling of the exchange.
+ // When using send(...) we don't specify any executor
+ // and default to using the client's delegating executor
+ // which only spawns asynchronous tasks if it detects
+ // that the current thread is the selector manager
+ // thread. This will cause everything to execute inline
+ // until we need to schedule some event with the selector.
+ Executor executor = exchangeExecutor == null
+ ? this.delegatingExecutor : exchangeExecutor;
MultiExchange<T> mex = new MultiExchange<>(userRequest,
requestImpl,
@@ -547,15 +590,20 @@
pushPromiseHandler,
acc);
CompletableFuture<HttpResponse<T>> res =
- mex.responseAsync().whenComplete((b,t) -> unreference());
+ mex.responseAsync(executor).whenComplete((b,t) -> unreference());
if (DEBUGELAPSED) {
res = res.whenComplete(
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}
- // makes sure that any dependent actions happen in the executor
- res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
-
+ // makes sure that any dependent actions happen in the executor.
+ // we only need to do that for sendAsync(...), when exchangeExecutor
+ // is non null.
+ if (exchangeExecutor != null) {
+ executor = acc == null ? executor
+ : new PrivilegedExecutor(executor, acc);
+ res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
+ }
return res;
} catch(Throwable t) {
unreference();
@@ -1013,13 +1061,15 @@
return Optional.ofNullable(authenticator);
}
- /*package-private*/ final Executor theExecutor() {
- return executor;
+ /*package-private*/ final DelegatingExecutor theExecutor() {
+ return delegatingExecutor;
}
@Override
public final Optional<Executor> executor() {
- return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
+ return isDefaultExecutor
+ ? Optional.empty()
+ : Optional.of(delegatingExecutor.delegate());
}
ConnectionPool connectionPool() {