# HG changeset patch # User dfuchs # Date 1527597724 -3600 # Node ID a85c163fc41ccfb195d3ffae737ec6bf46084248 # Parent 57f17e890a4061c0b074079f4fcd26c47a13282b http-client-branch: performance - reduce context switching diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/java/net/http/HttpClient.java --- a/src/java.net.http/share/classes/java/net/http/HttpClient.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/java/net/http/HttpClient.java Tue May 29 13:42:04 2018 +0100 @@ -232,9 +232,9 @@ * *

If this method is not invoked prior to {@linkplain #build() * building}, a default executor is created for each newly built {@code - * HttpClient}. The default executor uses a {@linkplain - * Executors#newCachedThreadPool(ThreadFactory) cached thread pool}, - * with a custom thread factory. + * HttpClient}. The default executor uses a suitable {@linkplain + * java.util.concurrent.ThreadPoolExecutor thread pool}, with a custom + * thread factory. * * @implNote If a security manager has been installed, the thread * factory creates threads that run with an access control context that diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Tue May 29 13:42:04 2018 +0100 @@ -332,6 +332,7 @@ int rcode = r1.statusCode(); if (rcode == 100) { Log.logTrace("Received 100-Continue: sending body"); + if (debug.on()) debug.log("Received 100-Continue for %s", r1); CompletableFuture cf = exchImpl.sendBodyAsync() .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); @@ -341,6 +342,7 @@ } else { Log.logTrace("Expectation failed: Received {0}", rcode); + if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1); if (upgrading && rcode == 101) { IOException failed = new IOException( "Unable to handle 101 while waiting for 100"); diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue May 29 13:42:04 2018 +0100 @@ -26,7 +26,6 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodySubscriber; @@ -530,43 +529,47 @@ if (dp == null) // publisher has not published anything yet return null; - synchronized (lock) { - if (dp.throwable != null) { + if (dp.throwable != null) { + synchronized (lock) { state = State.ERROR; - exec.execute(() -> { - headersSentCF.completeExceptionally(dp.throwable); - bodySentCF.completeExceptionally(dp.throwable); - connection.close(); - }); - return dp; } - - switch (state) { - case HEADERS: - state = State.BODY; - // completeAsync, since dependent tasks should run in another thread - if (debug.on()) debug.log("initiating completion of headersSentCF"); - headersSentCF.completeAsync(() -> this, exec); - break; - case BODY: - if (dp.data == Http1BodySubscriber.COMPLETED) { - state = State.COMPLETING; - if (debug.on()) debug.log("initiating completion of bodySentCF"); - bodySentCF.completeAsync(() -> this, exec); - } else { - exec.execute(this::requestMoreBody); - } - break; - case INITIAL: - case ERROR: - case COMPLETING: - case COMPLETED: - default: - assert false : "Unexpected state:" + state; - } - + exec.execute(() -> { + headersSentCF.completeExceptionally(dp.throwable); + bodySentCF.completeExceptionally(dp.throwable); + connection.close(); + }); return dp; } + + switch (state) { + case HEADERS: + synchronized (lock) { + state = State.BODY; + } + // completeAsync, since dependent tasks should run in another thread + if (debug.on()) debug.log("initiating completion of headersSentCF"); + headersSentCF.completeAsync(() -> this, exec); + break; + case BODY: + if (dp.data == Http1BodySubscriber.COMPLETED) { + synchronized (lock) { + state = State.COMPLETING; + } + if (debug.on()) debug.log("initiating completion of bodySentCF"); + bodySentCF.completeAsync(() -> this, exec); + } else { + exec.execute(this::requestMoreBody); + } + break; + case INITIAL: + case ERROR: + case COMPLETING: + case COMPLETED: + default: + assert false : "Unexpected state:" + state; + } + + return dp; } /** A Publisher of HTTP/1.1 headers and request body. */ diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue May 29 13:42:04 2018 +0100 @@ -847,8 +847,12 @@ return; if (streamid % 2 == 1) { numReservedClientStreams--; + assert numReservedClientStreams >= 0 : + "negative client stream count for stream=" + streamid; } else { numReservedServerStreams--; + assert numReservedServerStreams >= 0 : + "negative server stream count for stream=" + streamid; } } diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue May 29 13:42:04 2018 +0100 @@ -56,13 +56,13 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.stream.Stream; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -121,6 +121,34 @@ } } + /** + * A DelegatingExecutor is an executor that delegates tasks to + * a wrapped executor when it detects that the current thread + * is the SelectorManager thread. If the current thread is not + * the selector manager thread the given task is executed inline. + */ + final static class DelegatingExecutor implements Executor { + private final BooleanSupplier isInSelectorThread; + private final Executor delegate; + DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) { + this.isInSelectorThread = isInSelectorThread; + this.delegate = delegate; + } + + Executor delegate() { + return delegate; + } + + @Override + public void execute(Runnable command) { + if (isInSelectorThread.getAsBoolean()) { + delegate.execute(command); + } else { + command.run(); + } + } + } + private final CookieHandler cookieHandler; private final Redirect followRedirects; private final Optional userProxySelector; @@ -128,7 +156,7 @@ private final Authenticator authenticator; private final Version version; private final ConnectionPool connections; - private final Executor executor; + private final DelegatingExecutor delegatingExecutor; private final boolean isDefaultExecutor; // Security parameters private final SSLContext sslContext; @@ -240,12 +268,11 @@ ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id)); isDefaultExecutor = true; } else { - ex = builder.executor; isDefaultExecutor = false; } + delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex); facadeRef = new WeakReference<>(facadeFactory.createFacade(this)); client2 = new Http2ClientImpl(this); - executor = ex; cookieHandler = builder.cookieHandler; followRedirects = builder.followRedirects == null ? Redirect.NEVER : builder.followRedirects; @@ -490,7 +517,7 @@ throws IOException, InterruptedException { try { - return sendAsync(req, responseHandler, null).get(); + return sendAsync(req, responseHandler, null, null).get(); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof Error) @@ -516,8 +543,16 @@ public CompletableFuture> sendAsync(HttpRequest userRequest, BodyHandler responseHandler, - PushPromiseHandler pushPromiseHandler) - { + PushPromiseHandler pushPromiseHandler) { + return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate); + } + + private CompletableFuture> + sendAsync(HttpRequest userRequest, + BodyHandler responseHandler, + PushPromiseHandler pushPromiseHandler, + Executor exchangeExecutor) { + Objects.requireNonNull(userRequest); Objects.requireNonNull(responseHandler); @@ -536,9 +571,17 @@ if (debugelapsed.on()) debugelapsed.log("ClientImpl (async) send %s", userRequest); - Executor executor = acc == null - ? this.executor - : new PrivilegedExecutor(this.executor, acc); + // When using sendAsync(...) we explicitly pass the + // executor's delegate as exchange executor to force + // asynchronous scheduling of the exchange. + // When using send(...) we don't specify any executor + // and default to using the client's delegating executor + // which only spawns asynchronous tasks if it detects + // that the current thread is the selector manager + // thread. This will cause everything to execute inline + // until we need to schedule some event with the selector. + Executor executor = exchangeExecutor == null + ? this.delegatingExecutor : exchangeExecutor; MultiExchange mex = new MultiExchange<>(userRequest, requestImpl, @@ -547,15 +590,20 @@ pushPromiseHandler, acc); CompletableFuture> res = - mex.responseAsync().whenComplete((b,t) -> unreference()); + mex.responseAsync(executor).whenComplete((b,t) -> unreference()); if (DEBUGELAPSED) { res = res.whenComplete( (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); } - // makes sure that any dependent actions happen in the executor - res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor); - + // makes sure that any dependent actions happen in the executor. + // we only need to do that for sendAsync(...), when exchangeExecutor + // is non null. + if (exchangeExecutor != null) { + executor = acc == null ? executor + : new PrivilegedExecutor(executor, acc); + res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor); + } return res; } catch(Throwable t) { unreference(); @@ -1013,13 +1061,15 @@ return Optional.ofNullable(authenticator); } - /*package-private*/ final Executor theExecutor() { - return executor; + /*package-private*/ final DelegatingExecutor theExecutor() { + return delegatingExecutor; } @Override public final Optional executor() { - return isDefaultExecutor ? Optional.empty() : Optional.of(executor); + return isDefaultExecutor + ? Optional.empty() + : Optional.of(delegatingExecutor.delegate()); } ConnectionPool connectionPool() { diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java Tue May 29 13:42:04 2018 +0100 @@ -26,7 +26,6 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.net.ConnectException; import java.time.Duration; import java.util.Iterator; @@ -70,7 +69,7 @@ final AccessControlContext acc; final HttpClientImpl client; final HttpResponse.BodyHandler responseHandler; - final Executor executor; + final HttpClientImpl.DelegatingExecutor executor; final AtomicInteger attempts = new AtomicInteger(); HttpRequestImpl currentreq; // used for retries & redirect HttpRequestImpl previousreq; // used for retries & redirect @@ -125,8 +124,8 @@ if (pushPromiseHandler != null) { Executor executor = acc == null - ? this.executor - : new PrivilegedExecutor(this.executor, acc); + ? this.executor.delegate() + : new PrivilegedExecutor(this.executor.delegate(), acc); this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor); } else { pushGroup = null; @@ -194,7 +193,7 @@ getExchange().cancel(cause); } - public CompletableFuture> responseAsync() { + public CompletableFuture> responseAsync(Executor executor) { CompletableFuture start = new MinimalFuture<>(); CompletableFuture> cf = responseAsync0(start); start.completeAsync( () -> null, executor); // trigger execution diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Tue May 29 13:42:04 2018 +0100 @@ -26,10 +26,8 @@ package jdk.internal.net.http; import java.io.IOException; -import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; diff -r 57f17e890a40 -r a85c163fc41c src/java.net.http/share/classes/jdk/internal/net/http/Stream.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 25 16:13:11 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Tue May 29 13:42:04 2018 +0100 @@ -267,6 +267,7 @@ { try { Log.logTrace("Reading body on stream {0}", streamid); + debug.log("Getting BodySubscriber for: " + response); BodySubscriber bodySubscriber = handler.apply(new ResponseInfoImpl(response)); CompletableFuture cf = receiveData(bodySubscriber, executor); @@ -986,14 +987,18 @@ if (!cf.isDone()) { Log.logTrace("Completing response (streamid={0}): {1}", streamid, cf); + if (debug.on()) + debug.log("Completing responseCF(%d) with response headers", i); + response_cfs.remove(cf); cf.complete(resp); - response_cfs.remove(cf); return; } // else we found the previous response: just leave it alone. } cf = MinimalFuture.completedFuture(resp); Log.logTrace("Created completed future (streamid={0}): {1}", streamid, cf); + if (debug.on()) + debug.log("Adding completed responseCF(0) with response headers"); response_cfs.add(cf); } } @@ -1024,8 +1029,8 @@ for (int i = 0; i < response_cfs.size(); i++) { CompletableFuture cf = response_cfs.get(i); if (!cf.isDone()) { + response_cfs.remove(i); cf.completeExceptionally(t); - response_cfs.remove(i); return; } } @@ -1311,6 +1316,7 @@ void reset() { super.reset(); responseHeadersBuilder.clear(); + debug.log("Response builder cleared, ready to receive new headers."); } @Override