--- a/src/java.net.http/share/classes/java/net/http/HttpClient.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/java/net/http/HttpClient.java Tue May 29 13:42:04 2018 +0100
@@ -232,9 +232,9 @@
*
* <p> If this method is not invoked prior to {@linkplain #build()
* building}, a default executor is created for each newly built {@code
- * HttpClient}. The default executor uses a {@linkplain
- * Executors#newCachedThreadPool(ThreadFactory) cached thread pool},
- * with a custom thread factory.
+ * HttpClient}. The default executor uses a suitable {@linkplain
+ * java.util.concurrent.ThreadPoolExecutor thread pool}, with a custom
+ * thread factory.
*
* @implNote If a security manager has been installed, the thread
* factory creates threads that run with an access control context that
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Tue May 29 13:42:04 2018 +0100
@@ -332,6 +332,7 @@
int rcode = r1.statusCode();
if (rcode == 100) {
Log.logTrace("Received 100-Continue: sending body");
+ if (debug.on()) debug.log("Received 100-Continue for %s", r1);
CompletableFuture<Response> cf =
exchImpl.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
@@ -341,6 +342,7 @@
} else {
Log.logTrace("Expectation failed: Received {0}",
rcode);
+ if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
if (upgrading && rcode == 101) {
IOException failed = new IOException(
"Unable to handle 101 while waiting for 100");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue May 29 13:42:04 2018 +0100
@@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
@@ -530,43 +529,47 @@
if (dp == null) // publisher has not published anything yet
return null;
- synchronized (lock) {
- if (dp.throwable != null) {
+ if (dp.throwable != null) {
+ synchronized (lock) {
state = State.ERROR;
- exec.execute(() -> {
- headersSentCF.completeExceptionally(dp.throwable);
- bodySentCF.completeExceptionally(dp.throwable);
- connection.close();
- });
- return dp;
}
-
- switch (state) {
- case HEADERS:
- state = State.BODY;
- // completeAsync, since dependent tasks should run in another thread
- if (debug.on()) debug.log("initiating completion of headersSentCF");
- headersSentCF.completeAsync(() -> this, exec);
- break;
- case BODY:
- if (dp.data == Http1BodySubscriber.COMPLETED) {
- state = State.COMPLETING;
- if (debug.on()) debug.log("initiating completion of bodySentCF");
- bodySentCF.completeAsync(() -> this, exec);
- } else {
- exec.execute(this::requestMoreBody);
- }
- break;
- case INITIAL:
- case ERROR:
- case COMPLETING:
- case COMPLETED:
- default:
- assert false : "Unexpected state:" + state;
- }
-
+ exec.execute(() -> {
+ headersSentCF.completeExceptionally(dp.throwable);
+ bodySentCF.completeExceptionally(dp.throwable);
+ connection.close();
+ });
return dp;
}
+
+ switch (state) {
+ case HEADERS:
+ synchronized (lock) {
+ state = State.BODY;
+ }
+ // completeAsync, since dependent tasks should run in another thread
+ if (debug.on()) debug.log("initiating completion of headersSentCF");
+ headersSentCF.completeAsync(() -> this, exec);
+ break;
+ case BODY:
+ if (dp.data == Http1BodySubscriber.COMPLETED) {
+ synchronized (lock) {
+ state = State.COMPLETING;
+ }
+ if (debug.on()) debug.log("initiating completion of bodySentCF");
+ bodySentCF.completeAsync(() -> this, exec);
+ } else {
+ exec.execute(this::requestMoreBody);
+ }
+ break;
+ case INITIAL:
+ case ERROR:
+ case COMPLETING:
+ case COMPLETED:
+ default:
+ assert false : "Unexpected state:" + state;
+ }
+
+ return dp;
}
/** A Publisher of HTTP/1.1 headers and request body. */
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue May 29 13:42:04 2018 +0100
@@ -847,8 +847,12 @@
return;
if (streamid % 2 == 1) {
numReservedClientStreams--;
+ assert numReservedClientStreams >= 0 :
+ "negative client stream count for stream=" + streamid;
} else {
numReservedServerStreams--;
+ assert numReservedServerStreams >= 0 :
+ "negative server stream count for stream=" + streamid;
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue May 29 13:42:04 2018 +0100
@@ -56,13 +56,13 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
@@ -121,6 +121,34 @@
}
}
+ /**
+ * A DelegatingExecutor is an executor that delegates tasks to
+ * a wrapped executor when it detects that the current thread
+ * is the SelectorManager thread. If the current thread is not
+ * the selector manager thread the given task is executed inline.
+ */
+ final static class DelegatingExecutor implements Executor {
+ private final BooleanSupplier isInSelectorThread;
+ private final Executor delegate;
+ DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
+ this.isInSelectorThread = isInSelectorThread;
+ this.delegate = delegate;
+ }
+
+ Executor delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (isInSelectorThread.getAsBoolean()) {
+ delegate.execute(command);
+ } else {
+ command.run();
+ }
+ }
+ }
+
private final CookieHandler cookieHandler;
private final Redirect followRedirects;
private final Optional<ProxySelector> userProxySelector;
@@ -128,7 +156,7 @@
private final Authenticator authenticator;
private final Version version;
private final ConnectionPool connections;
- private final Executor executor;
+ private final DelegatingExecutor delegatingExecutor;
private final boolean isDefaultExecutor;
// Security parameters
private final SSLContext sslContext;
@@ -240,12 +268,11 @@
ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
isDefaultExecutor = true;
} else {
- ex = builder.executor;
isDefaultExecutor = false;
}
+ delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
- executor = ex;
cookieHandler = builder.cookieHandler;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
@@ -490,7 +517,7 @@
throws IOException, InterruptedException
{
try {
- return sendAsync(req, responseHandler, null).get();
+ return sendAsync(req, responseHandler, null, null).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof Error)
@@ -516,8 +543,16 @@
public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest,
BodyHandler<T> responseHandler,
- PushPromiseHandler<T> pushPromiseHandler)
- {
+ PushPromiseHandler<T> pushPromiseHandler) {
+ return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
+ }
+
+ private <T> CompletableFuture<HttpResponse<T>>
+ sendAsync(HttpRequest userRequest,
+ BodyHandler<T> responseHandler,
+ PushPromiseHandler<T> pushPromiseHandler,
+ Executor exchangeExecutor) {
+
Objects.requireNonNull(userRequest);
Objects.requireNonNull(responseHandler);
@@ -536,9 +571,17 @@
if (debugelapsed.on())
debugelapsed.log("ClientImpl (async) send %s", userRequest);
- Executor executor = acc == null
- ? this.executor
- : new PrivilegedExecutor(this.executor, acc);
+ // When using sendAsync(...) we explicitly pass the
+ // executor's delegate as exchange executor to force
+ // asynchronous scheduling of the exchange.
+ // When using send(...) we don't specify any executor
+ // and default to using the client's delegating executor
+ // which only spawns asynchronous tasks if it detects
+ // that the current thread is the selector manager
+ // thread. This will cause everything to execute inline
+ // until we need to schedule some event with the selector.
+ Executor executor = exchangeExecutor == null
+ ? this.delegatingExecutor : exchangeExecutor;
MultiExchange<T> mex = new MultiExchange<>(userRequest,
requestImpl,
@@ -547,15 +590,20 @@
pushPromiseHandler,
acc);
CompletableFuture<HttpResponse<T>> res =
- mex.responseAsync().whenComplete((b,t) -> unreference());
+ mex.responseAsync(executor).whenComplete((b,t) -> unreference());
if (DEBUGELAPSED) {
res = res.whenComplete(
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}
- // makes sure that any dependent actions happen in the executor
- res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
-
+ // makes sure that any dependent actions happen in the executor.
+ // we only need to do that for sendAsync(...), when exchangeExecutor
+ // is non null.
+ if (exchangeExecutor != null) {
+ executor = acc == null ? executor
+ : new PrivilegedExecutor(executor, acc);
+ res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, executor);
+ }
return res;
} catch(Throwable t) {
unreference();
@@ -1013,13 +1061,15 @@
return Optional.ofNullable(authenticator);
}
- /*package-private*/ final Executor theExecutor() {
- return executor;
+ /*package-private*/ final DelegatingExecutor theExecutor() {
+ return delegatingExecutor;
}
@Override
public final Optional<Executor> executor() {
- return isDefaultExecutor ? Optional.empty() : Optional.of(executor);
+ return isDefaultExecutor
+ ? Optional.empty()
+ : Optional.of(delegatingExecutor.delegate());
}
ConnectionPool connectionPool() {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java Tue May 29 13:42:04 2018 +0100
@@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Iterator;
@@ -70,7 +69,7 @@
final AccessControlContext acc;
final HttpClientImpl client;
final HttpResponse.BodyHandler<T> responseHandler;
- final Executor executor;
+ final HttpClientImpl.DelegatingExecutor executor;
final AtomicInteger attempts = new AtomicInteger();
HttpRequestImpl currentreq; // used for retries & redirect
HttpRequestImpl previousreq; // used for retries & redirect
@@ -125,8 +124,8 @@
if (pushPromiseHandler != null) {
Executor executor = acc == null
- ? this.executor
- : new PrivilegedExecutor(this.executor, acc);
+ ? this.executor.delegate()
+ : new PrivilegedExecutor(this.executor.delegate(), acc);
this.pushGroup = new PushGroup<>(pushPromiseHandler, request, executor);
} else {
pushGroup = null;
@@ -194,7 +193,7 @@
getExchange().cancel(cause);
}
- public CompletableFuture<HttpResponse<T>> responseAsync() {
+ public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
CompletableFuture<Void> start = new MinimalFuture<>();
CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
start.completeAsync( () -> null, executor); // trigger execution
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Tue May 29 13:42:04 2018 +0100
@@ -26,10 +26,8 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 25 16:13:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Tue May 29 13:42:04 2018 +0100
@@ -267,6 +267,7 @@
{
try {
Log.logTrace("Reading body on stream {0}", streamid);
+ debug.log("Getting BodySubscriber for: " + response);
BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
@@ -986,14 +987,18 @@
if (!cf.isDone()) {
Log.logTrace("Completing response (streamid={0}): {1}",
streamid, cf);
+ if (debug.on())
+ debug.log("Completing responseCF(%d) with response headers", i);
+ response_cfs.remove(cf);
cf.complete(resp);
- response_cfs.remove(cf);
return;
} // else we found the previous response: just leave it alone.
}
cf = MinimalFuture.completedFuture(resp);
Log.logTrace("Created completed future (streamid={0}): {1}",
streamid, cf);
+ if (debug.on())
+ debug.log("Adding completed responseCF(0) with response headers");
response_cfs.add(cf);
}
}
@@ -1024,8 +1029,8 @@
for (int i = 0; i < response_cfs.size(); i++) {
CompletableFuture<Response> cf = response_cfs.get(i);
if (!cf.isDone()) {
+ response_cfs.remove(i);
cf.completeExceptionally(t);
- response_cfs.remove(i);
return;
}
}
@@ -1311,6 +1316,7 @@
void reset() {
super.reset();
responseHeadersBuilder.clear();
+ debug.log("Response builder cleared, ready to receive new headers.");
}
@Override