http-client-branch: performance - reduce context switching http-client-branch
authordfuchs
Tue, 29 May 2018 13:42:04 +0100
branchhttp-client-branch
changeset 56621 a85c163fc41c
parent 56619 57f17e890a40
child 56623 1d020b5d73f1
http-client-branch: performance - reduce context switching
src/java.net.http/share/classes/java/net/http/HttpClient.java
src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java
src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
--- a/src/java.net.http/share/classes/java/net/http/HttpClient.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/java/net/http/HttpClient.java	Tue May 29 13:42:04 2018 +0100
@@ -232,9 +232,9 @@
          *
          * <p> If this method is not invoked prior to {@linkplain #build()
          * building}, a default executor is created for each newly built {@code
-         * HttpClient}. The default executor uses a {@linkplain
-         * Executors#newCachedThreadPool(ThreadFactory) cached thread pool},
-         * with a custom thread factory.
+         * HttpClient}. The default executor uses a suitable {@linkplain
+         * java.util.concurrent.ThreadPoolExecutor thread pool}, with a custom
+         * thread factory.
          *
          * @implNote If a security manager has been installed, the thread
          * factory creates threads that run with an access control context that
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Tue May 29 13:42:04 2018 +0100
@@ -332,6 +332,7 @@
             int rcode = r1.statusCode();
             if (rcode == 100) {
                 Log.logTrace("Received 100-Continue: sending body");
+                if (debug.on()) debug.log("Received 100-Continue for %s", r1);
                 CompletableFuture<Response> cf =
                         exchImpl.sendBodyAsync()
                                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
@@ -341,6 +342,7 @@
             } else {
                 Log.logTrace("Expectation failed: Received {0}",
                         rcode);
+                if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
                 if (upgrading && rcode == 101) {
                     IOException failed = new IOException(
                             "Unable to handle 101 while waiting for 100");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue May 29 13:42:04 2018 +0100
@@ -26,7 +26,6 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.http.HttpResponse.BodyHandler;
 import java.net.http.HttpResponse.BodySubscriber;
@@ -530,43 +529,47 @@
         if (dp == null)  // publisher has not published anything yet
             return null;
 
-        synchronized (lock) {
-            if (dp.throwable != null) {
+        if (dp.throwable != null) {
+            synchronized (lock) {
                 state = State.ERROR;
-                exec.execute(() -> {
-                    headersSentCF.completeExceptionally(dp.throwable);
-                    bodySentCF.completeExceptionally(dp.throwable);
-                    connection.close();
-                });
-                return dp;
             }
-
-            switch (state) {
-                case HEADERS:
-                    state = State.BODY;
-                    // completeAsync, since dependent tasks should run in another thread
-                    if (debug.on()) debug.log("initiating completion of headersSentCF");
-                    headersSentCF.completeAsync(() -> this, exec);
-                    break;
-                case BODY:
-                    if (dp.data == Http1BodySubscriber.COMPLETED) {
-                        state = State.COMPLETING;
-                        if (debug.on()) debug.log("initiating completion of bodySentCF");
-                        bodySentCF.completeAsync(() -> this, exec);
-                    } else {
-                        exec.execute(this::requestMoreBody);
-                    }
-                    break;
-                case INITIAL:
-                case ERROR:
-                case COMPLETING:
-                case COMPLETED:
-                default:
-                    assert false : "Unexpected state:" + state;
-            }
-
+            exec.execute(() -> {
+                headersSentCF.completeExceptionally(dp.throwable);
+                bodySentCF.completeExceptionally(dp.throwable);
+                connection.close();
+            });
             return dp;
         }
+
+        switch (state) {
+            case HEADERS:
+                synchronized (lock) {
+                    state = State.BODY;
+                }
+                // completeAsync, since dependent tasks should run in another thread
+                if (debug.on()) debug.log("initiating completion of headersSentCF");
+                headersSentCF.completeAsync(() -> this, exec);
+                break;
+            case BODY:
+                if (dp.data == Http1BodySubscriber.COMPLETED) {
+                    synchronized (lock) {
+                        state = State.COMPLETING;
+                    }
+                    if (debug.on()) debug.log("initiating completion of bodySentCF");
+                    bodySentCF.completeAsync(() -> this, exec);
+                } else {
+                    exec.execute(this::requestMoreBody);
+                }
+                break;
+            case INITIAL:
+            case ERROR:
+            case COMPLETING:
+            case COMPLETED:
+            default:
+                assert false : "Unexpected state:" + state;
+        }
+
+        return dp;
     }
 
     /** A Publisher of HTTP/1.1 headers and request body. */
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue May 29 13:42:04 2018 +0100
@@ -847,8 +847,12 @@
             return;
         if (streamid % 2 == 1) {
             numReservedClientStreams--;
+            assert numReservedClientStreams >= 0 :
+                    "negative client stream count for stream=" + streamid;
         } else {
             numReservedServerStreams--;
+            assert numReservedServerStreams >= 0 :
+                    "negative server stream count for stream=" + streamid;
         }
     }
 
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Tue May 29 13:42:04 2018 +0100
@@ -56,13 +56,13 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Stream;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -121,6 +121,34 @@
         }
     }
 
+    /**
+     * A DelegatingExecutor is an executor that delegates tasks to
+     * a wrapped executor when it detects that the current thread
+     * is the SelectorManager thread. If the current thread is not
+     * the selector manager thread the given task is executed inline.
+     */
+    final static class DelegatingExecutor implements Executor {
+        private final BooleanSupplier isInSelectorThread;
+        private final Executor delegate;
+        DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
+            this.isInSelectorThread = isInSelectorThread;
+            this.delegate = delegate;
+        }
+
+        Executor delegate() {
+            return delegate;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            if (isInSelectorThread.getAsBoolean()) {
+                delegate.execute(command);
+            } else {
+                command.run();
+            }
+        }
+    }
+
     private final CookieHandler cookieHandler;
     private final Redirect followRedirects;
     private final Optional<ProxySelector> userProxySelector;
@@ -128,7 +156,7 @@
     private final Authenticator authenticator;
     private final Version version;
     private final ConnectionPool connections;
-    private final Executor executor;
+    private final DelegatingExecutor delegatingExecutor;
     private final boolean isDefaultExecutor;
     // Security parameters
     private final SSLContext sslContext;
@@ -240,12 +268,11 @@
             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
             isDefaultExecutor = true;
         } else {
-            ex = builder.executor;
             isDefaultExecutor = false;
         }
+        delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
         client2 = new Http2ClientImpl(this);
-        executor = ex;
         cookieHandler = builder.cookieHandler;
         followRedirects = builder.followRedirects == null ?
                 Redirect.NEVER : builder.followRedirects;
@@ -490,7 +517,7 @@
         throws IOException, InterruptedException
     {
         try {
-            return sendAsync(req, responseHandler, null).get();
+            return sendAsync(req, responseHandler, null, null).get();
         } catch (ExecutionException e) {
             Throwable t = e.getCause();
             if (t instanceof Error)
@@ -516,8 +543,16 @@
     public <T> CompletableFuture<HttpResponse<T>>
     sendAsync(HttpRequest userRequest,
               BodyHandler<T> responseHandler,
-              PushPromiseHandler<T> pushPromiseHandler)
-    {
+              PushPromiseHandler<T> pushPromiseHandler) {
+        return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
+    }
+
+    private <T> CompletableFuture<HttpResponse<T>>
+    sendAsync(HttpRequest userRequest,
+              BodyHandler<T> responseHandler,
+              PushPromiseHandler<T> pushPromiseHandler,
+              Executor exchangeExecutor)    {
+
         Objects.requireNonNull(userRequest);
         Objects.requireNonNull(responseHandler);
 
@@ -536,9 +571,17 @@
             if (debugelapsed.on())
                 debugelapsed.log("ClientImpl (async) send %s", userRequest);
 
-            Executor executor = acc == null
-                    ? this.executor
-                    : new PrivilegedExecutor(this.executor, acc);
+            // When using sendAsync(...) we explicitly pass the
+            // executor's delegate as exchange executor to force
+            // asynchronous scheduling of the exchange.
+            // When using send(...) we don't specify any executor
+            // and default to using the client's delegating executor
+            // which only spawns asynchronous tasks if it detects
+            // that the current thread is the selector manager
+            // thread. This will cause everything to execute inline
+            // until we need to schedule some event with the selector.
+            Executor executor = exchangeExecutor == null
+                    ? this.delegatingExecutor : exchangeExecutor;
 
             MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                             requestImpl,
@@ -547,15 +590,20 @@
                                                             pushPromiseHandler,
                                                             acc);
             CompletableFuture<HttpResponse<T>> res =
-                    mex.responseAsync().whenComplete((b,t) -> unreference());
+                    mex.responseAsync(executor).whenComplete((b,t) -> unreference());
             if (DEBUGELAPSED) {
                 res = res.whenComplete(
                         (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
             }
 
-            // makes sure that any dependent actions happen in the executor
-            res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
-
+            // makes sure that any dependent actions happen in the executor.
+            // we only need to do that for sendAsync(...), when exchangeExecutor
+            // is non null.
+            if (exchangeExecutor != null) {
+                executor = acc == null ? executor
+                        : new PrivilegedExecutor(executor, acc);
+                res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
+            }
             return res;
         } catch(Throwable t) {
             unreference();
@@ -1013,13 +1061,15 @@
         return Optional.ofNullable(authenticator);
     }
 
-    /*package-private*/ final Executor theExecutor() {
-        return executor;
+    /*package-private*/ final DelegatingExecutor theExecutor() {
+        return delegatingExecutor;
     }
 
     @Override
     public final Optional<Executor> executor() {
-        return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
+        return isDefaultExecutor
+                ? Optional.empty()
+                : Optional.of(delegatingExecutor.delegate());
     }
 
     ConnectionPool connectionPool() {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java	Tue May 29 13:42:04 2018 +0100
@@ -26,7 +26,6 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.net.ConnectException;
 import java.time.Duration;
 import java.util.Iterator;
@@ -70,7 +69,7 @@
     final AccessControlContext acc;
     final HttpClientImpl client;
     final HttpResponse.BodyHandler<T> responseHandler;
-    final Executor executor;
+    final HttpClientImpl.DelegatingExecutor executor;
     final AtomicInteger attempts = new AtomicInteger();
     HttpRequestImpl currentreq; // used for retries & redirect
     HttpRequestImpl previousreq; // used for retries & redirect
@@ -125,8 +124,8 @@
 
         if (pushPromiseHandler != null) {
             Executor executor = acc == null
-                    ? this.executor
-                    : new PrivilegedExecutor(this.executor, acc);
+                    ? this.executor.delegate()
+                    : new PrivilegedExecutor(this.executor.delegate(), acc);
             this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
         } else {
             pushGroup = null;
@@ -194,7 +193,7 @@
         getExchange().cancel(cause);
     }
 
-    public CompletableFuture<HttpResponse<T>> responseAsync() {
+    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
         CompletableFuture<Void> start = new MinimalFuture<>();
         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
         start.completeAsync( () -> null, executor); // trigger execution
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Tue May 29 13:42:04 2018 +0100
@@ -26,10 +26,8 @@
 package jdk.internal.net.http;
 
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Tue May 29 13:42:04 2018 +0100
@@ -267,6 +267,7 @@
     {
         try {
             Log.logTrace("Reading body on stream {0}", streamid);
+            debug.log("Getting BodySubscriber for: " + response);
             BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
             CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 
@@ -986,14 +987,18 @@
                 if (!cf.isDone()) {
                     Log.logTrace("Completing response (streamid={0}): {1}",
                                  streamid, cf);
+                    if (debug.on())
+                        debug.log("Completing responseCF(%d) with response headers", i);
+                    response_cfs.remove(cf);
                     cf.complete(resp);
-                    response_cfs.remove(cf);
                     return;
                 } // else we found the previous response: just leave it alone.
             }
             cf = MinimalFuture.completedFuture(resp);
             Log.logTrace("Created completed future (streamid={0}): {1}",
                          streamid, cf);
+            if (debug.on())
+                debug.log("Adding completed responseCF(0) with response headers");
             response_cfs.add(cf);
         }
     }
@@ -1024,8 +1029,8 @@
             for (int i = 0; i < response_cfs.size(); i++) {
                 CompletableFuture<Response> cf = response_cfs.get(i);
                 if (!cf.isDone()) {
+                    response_cfs.remove(i);
                     cf.completeExceptionally(t);
-                    response_cfs.remove(i);
                     return;
                 }
             }
@@ -1311,6 +1316,7 @@
         void reset() {
             super.reset();
             responseHeadersBuilder.clear();
+            debug.log("Response builder cleared, ready to receive new headers.");
         }
 
         @Override