--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 19:11:47 2018 +0100
@@ -219,16 +219,16 @@
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
if (debug.on()) debug.log("Sending headers only");
+ // If the first attempt to read something triggers EOF, or
+ // IOException("channel reset by peer"), we're going to retry.
+ // Instruct the asyncReceiver to throw ConnectionExpiredException
+ // to force a retry.
+ asyncReceiver.setRetryOnError(true);
if (response == null) {
response = new Http1Response<>(connection, this, asyncReceiver);
}
if (debug.on()) debug.log("response created in advance");
- // If the first attempt to read something triggers EOF, or
- // IOException("channel reset by peer"), we're going to retry.
- // Instruct the asyncReceiver to throw ConnectionExpiredException
- // to force a retry.
- asyncReceiver.setRetryOnError(true);
CompletableFuture<Void> connectCF;
if (!connection.connected()) {
@@ -271,6 +271,8 @@
return cf;
} catch (Throwable t) {
if (debug.on()) debug.log("Failed to send headers: %s", t);
+ headersSentCF.completeExceptionally(t);
+ bodySentCF.completeExceptionally(t);
connection.close();
cf.completeExceptionally(t);
return cf;
@@ -278,28 +280,43 @@
.thenCompose(unused -> headersSentCF);
}
+ private void cancelIfFailed(Flow.Subscription s) {
+ asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
+ if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+ if (t != null) {
+ s.cancel();
+ bodySentCF.complete(this);
+ }
+ }, executor);
+ }
+
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
assert headersSentCF.isDone();
+ if (debug.on()) debug.log("sendBodyAsync");
try {
bodySubscriber = requestAction.continueRequest();
+ if (debug.on()) debug.log("bodySubscriber is %s",
+ bodySubscriber == null ? null : bodySubscriber.getClass());
if (bodySubscriber == null) {
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
} else {
// start
bodySubscriber.whenSubscribed
+ .thenAccept((s) -> cancelIfFailed(s))
.thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
- return bodySentCF;
+ return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
+ if (debug.on()) debug.log("reading headers");
CompletableFuture<Response> cf = response.readHeadersAsync(executor);
Throwable cause;
synchronized (lock) {
@@ -323,7 +340,7 @@
debug.log(acknowledged ? ("completed response with " + cause)
: ("response already completed, ignoring " + cause));
}
- return cf;
+ return Utils.wrapForDebug(debug, "getResponseAsync", cf);
}
@Override