--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Wed Jun 13 19:11:47 2018 +0100
@@ -307,6 +307,7 @@
Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
t = Utils.getCompletionCause(t);
if (t instanceof ProxyAuthenticationRequired) {
+ if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
bodyIgnored = MinimalFuture.completedFuture(null);
Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
HttpConnection c = ex == null ? null : ex.connection();
@@ -315,8 +316,10 @@
proxyResponse.version, true);
return MinimalFuture.completedFuture(syntheticResponse);
} else if (t != null) {
+ if (debug.on()) debug.log("checkFor407: no response - %s", t);
return MinimalFuture.failedFuture(t);
} else {
+ if (debug.on()) debug.log("checkFor407: all clear");
return andThen.apply(ex);
}
}
@@ -359,6 +362,7 @@
// send the request body and proceed.
private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
assert !request.expectContinue();
+ if (debug.on()) debug.log("sendRequestBody");
CompletableFuture<Response> cf = ex.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 13 19:11:47 2018 +0100
@@ -27,7 +27,6 @@
import java.io.EOFException;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
@@ -43,6 +42,7 @@
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
import jdk.internal.net.http.common.Logger;
+import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;
@@ -166,6 +166,7 @@
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::flush);
+ final MinimalFuture<Void> whenFinished;
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
@@ -184,6 +185,7 @@
public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
this.pendingDelegateRef = new AtomicReference<>();
this.executor = executor;
+ this.whenFinished = new MinimalFuture<>();
this.owner = owner;
this.client = owner.client;
}
@@ -284,6 +286,7 @@
+ "\t\t queue.isEmpty: " + queue.isEmpty());
scheduler.stop();
delegate.onReadError(x);
+ whenFinished.completeExceptionally(x);
if (stopRequested) {
// This is the special case where the subscriber
// has requested an illegal number of items.
@@ -491,6 +494,7 @@
if (previous != null) previous.close(error);
delegate = null;
owner = null;
+ whenFinished.complete(null);
}
/**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 19:11:47 2018 +0100
@@ -219,16 +219,16 @@
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (debug.on()) debug.log("Sending headers only");
+ // If the first attempt to read something triggers EOF, or
+ // IOException("channel reset by peer"), we're going to retry.
+ // Instruct the asyncReceiver to throw ConnectionExpiredException
+ // to force a retry.
+ asyncReceiver.setRetryOnError(true);
if (response == null) {
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (debug.on()) debug.log("response created in advance");
- // If the first attempt to read something triggers EOF, or
- // IOException("channel reset by peer"), we're going to retry.
- // Instruct the asyncReceiver to throw ConnectionExpiredException
- // to force a retry.
- asyncReceiver.setRetryOnError(true);
CompletableFuture<Void> connectCF;
if (!connection.connected()) {
@@ -271,6 +271,8 @@
return cf;
} catch (Throwable t) {
if (debug.on()) debug.log("Failed to send headers: %s", t);
+ headersSentCF.completeExceptionally(t);
+ bodySentCF.completeExceptionally(t);
connection.close();
cf.completeExceptionally(t);
return cf;
@@ -278,28 +280,43 @@
.thenCompose(unused -> headersSentCF);
}
+ private void cancelIfFailed(Flow.Subscription s) {
+ asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
+ if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+ if (t != null) {
+ s.cancel();
+ bodySentCF.complete(this);
+ }
+ }, executor);
+ }
+
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
assert headersSentCF.isDone();
+ if (debug.on()) debug.log("sendBodyAsync");
try {
bodySubscriber = requestAction.continueRequest();
+ if (debug.on()) debug.log("bodySubscriber is %s",
+ bodySubscriber == null ? null : bodySubscriber.getClass());
if (bodySubscriber == null) {
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
} else {
// start
bodySubscriber.whenSubscribed
+ .thenAccept((s) -> cancelIfFailed(s))
.thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
- return bodySentCF;
+ return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
+ if (debug.on()) debug.log("reading headers");
CompletableFuture<Response> cf = response.readHeadersAsync(executor);
Throwable cause;
synchronized (lock) {
@@ -323,7 +340,7 @@
debug.log(acknowledged ? ("completed response with " + cause)
: ("response already completed, ignoring " + cause));
}
- return cf;
+ return Utils.wrapForDebug(debug, "getResponseAsync", cf);
}
@Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Jun 13 19:11:47 2018 +0100
@@ -165,6 +165,7 @@
+ asyncReceiver.remaining() +") " + readProgress);
if (firstTimeAround) {
+ if (debug.on()) debug.log("First time around");
firstTimeAround = false;
} else {
// with expect continue we will resume reading headers + body.
@@ -180,6 +181,12 @@
CompletableFuture<State> cf = headersReader.completion();
assert cf != null : "parsing not started";
+ if (debug.on()) {
+ debug.log("headersReader is %s",
+ cf == null ? "not yet started"
+ : cf.isDone() ? "already completed"
+ : "not yet completed");
+ }
Function<State, Response> lambda = (State completed) -> {
assert completed == State.READING_HEADERS;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Jun 13 19:11:47 2018 +0100
@@ -775,7 +775,6 @@
+ " to subscriber " + subscriber);
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
- writeSubscriber.subscription.cancel();
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 13 19:11:47 2018 +0100
@@ -309,7 +309,7 @@
synchronized (this) {
previous = pendingDelegate.getAndSet(delegateWrapper);
subscription = readSubscription;
- handleNow = this.errorRef.get() != null || finished || readSubscriber.onCompleteReceived;
+ handleNow = this.errorRef.get() != null || onCompleteReceived;
}
if (previous != null) {
previous.dropSubscription();
@@ -424,7 +424,7 @@
// if onError is invoked concurrently with setDelegate.
synchronized (this) {
failed = this.errorRef.get();
- completed = finished || onCompleteReceived;
+ completed = onCompleteReceived;
subscribed = subscriberImpl;
}
@@ -437,6 +437,7 @@
if (debug.on())
debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
subscriberImpl);
+ finished = true;
subscriberImpl.onComplete();
}
}
@@ -497,7 +498,6 @@
@Override
public void onComplete() {
assert !finished && !onCompleteReceived;
- onCompleteReceived = true;
DelegateWrapper subscriberImpl;
synchronized(this) {
subscriberImpl = subscribed;
@@ -512,8 +512,10 @@
onErrorImpl(new SSLHandshakeException(
"Remote host terminated the handshake"));
} else if (subscriberImpl != null) {
- finished = true;
+ onCompleteReceived = finished = true;
subscriberImpl.onComplete();
+ } else {
+ onCompleteReceived = true;
}
// now if we have any pending subscriber, we should complete
// them immediately as the read scheduler will already be stopped.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Wed Jun 13 19:11:47 2018 +0100
@@ -59,9 +59,11 @@
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -180,6 +182,17 @@
.collect(Collectors.toUnmodifiableSet());
}
+ public static <T> CompletableFuture<T> wrapForDebug(Logger logger, String name, CompletableFuture<T> cf) {
+ if (logger.on()) {
+ return cf.handle((r,t) -> {
+ logger.log("%s completed %s", name, t == null ? "successfully" : t );
+ return cf;
+ }).thenCompose(Function.identity());
+ } else {
+ return cf;
+ }
+ }
+
private static final String WSPACES = " \t\r\n";
private static final boolean isAllowedForProxy(String name,
String value,
--- a/test/jdk/java/net/httpclient/ShortResponseBody.java Wed Jun 13 15:45:27 2018 +0100
+++ b/test/jdk/java/net/httpclient/ShortResponseBody.java Wed Jun 13 19:11:47 2018 +0100
@@ -29,11 +29,9 @@
* @build jdk.testlibrary.SimpleSSLContext
* @run testng/othervm
* -Djdk.httpclient.HttpClient.log=headers,errors
- * -Djdk.internal.httpclient.debug=true
* ShortResponseBody
* @run testng/othervm
* -Djdk.httpclient.HttpClient.log=headers,errors
- * -Djdk.internal.httpclient.debug=true
* -Djdk.httpclient.enableAllMethodRetry
* ShortResponseBody
*/
@@ -278,7 +276,12 @@
@Override
public int read(byte[] buf, int offset, int length) {
- return length;
+ //int count = offset;
+ //length = Math.max(0, Math.min(buf.length - offset, length));
+ //for (; count < length; count++)
+ // buf[offset++] = 0x01;
+ //return count;
+ return Math.max(0, Math.min(buf.length - offset, length));
}
}