# HG changeset patch # User dfuchs # Date 1528913507 -3600 # Node ID ba60eaef37d7f496a52365555d5a5552d862bd32 # Parent a78082bf94de3be9b8572ca7c0e74c60888410b9 http-client-branch: fixed ShortBodyResponse issues diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Wed Jun 13 19:11:47 2018 +0100 @@ -307,6 +307,7 @@ Function,CompletableFuture> andThen) { t = Utils.getCompletionCause(t); if (t instanceof ProxyAuthenticationRequired) { + if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response"); bodyIgnored = MinimalFuture.completedFuture(null); Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse; HttpConnection c = ex == null ? null : ex.connection(); @@ -315,8 +316,10 @@ proxyResponse.version, true); return MinimalFuture.completedFuture(syntheticResponse); } else if (t != null) { + if (debug.on()) debug.log("checkFor407: no response - %s", t); return MinimalFuture.failedFuture(t); } else { + if (debug.on()) debug.log("checkFor407: all clear"); return andThen.apply(ex); } } @@ -359,6 +362,7 @@ // send the request body and proceed. private CompletableFuture sendRequestBody(ExchangeImpl ex) { assert !request.expectContinue(); + if (debug.on()) debug.log("sendRequestBody"); CompletableFuture cf = ex.sendBodyAsync() .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); cf = wrapForUpgrade(cf); diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 13 19:11:47 2018 +0100 @@ -27,7 +27,6 @@ import java.io.EOFException; import java.io.IOException; -import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; @@ -43,6 +42,7 @@ import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.FlowTube.TubeSubscriber; import jdk.internal.net.http.common.Logger; +import jdk.internal.net.http.common.MinimalFuture; import jdk.internal.net.http.common.SequentialScheduler; import jdk.internal.net.http.common.ConnectionExpiredException; import jdk.internal.net.http.common.Utils; @@ -166,6 +166,7 @@ = new ConcurrentLinkedDeque<>(); private final SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::flush); + final MinimalFuture whenFinished; private final Executor executor; private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber(); private final AtomicReference pendingDelegateRef; @@ -184,6 +185,7 @@ public Http1AsyncReceiver(Executor executor, Http1Exchange owner) { this.pendingDelegateRef = new AtomicReference<>(); this.executor = executor; + this.whenFinished = new MinimalFuture<>(); this.owner = owner; this.client = owner.client; } @@ -284,6 +286,7 @@ + "\t\t queue.isEmpty: " + queue.isEmpty()); scheduler.stop(); delegate.onReadError(x); + whenFinished.completeExceptionally(x); if (stopRequested) { // This is the special case where the subscriber // has requested an illegal number of items. @@ -491,6 +494,7 @@ if (previous != null) previous.close(error); delegate = null; owner = null; + whenFinished.complete(null); } /** diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 13 19:11:47 2018 +0100 @@ -219,16 +219,16 @@ // create the response before sending the request headers, so that // the response can set the appropriate receivers. if (debug.on()) debug.log("Sending headers only"); + // If the first attempt to read something triggers EOF, or + // IOException("channel reset by peer"), we're going to retry. + // Instruct the asyncReceiver to throw ConnectionExpiredException + // to force a retry. + asyncReceiver.setRetryOnError(true); if (response == null) { response = new Http1Response<>(connection, this, asyncReceiver); } if (debug.on()) debug.log("response created in advance"); - // If the first attempt to read something triggers EOF, or - // IOException("channel reset by peer"), we're going to retry. - // Instruct the asyncReceiver to throw ConnectionExpiredException - // to force a retry. - asyncReceiver.setRetryOnError(true); CompletableFuture connectCF; if (!connection.connected()) { @@ -271,6 +271,8 @@ return cf; } catch (Throwable t) { if (debug.on()) debug.log("Failed to send headers: %s", t); + headersSentCF.completeExceptionally(t); + bodySentCF.completeExceptionally(t); connection.close(); cf.completeExceptionally(t); return cf; @@ -278,28 +280,43 @@ .thenCompose(unused -> headersSentCF); } + private void cancelIfFailed(Flow.Subscription s) { + asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> { + if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t); + if (t != null) { + s.cancel(); + bodySentCF.complete(this); + } + }, executor); + } + @Override CompletableFuture> sendBodyAsync() { assert headersSentCF.isDone(); + if (debug.on()) debug.log("sendBodyAsync"); try { bodySubscriber = requestAction.continueRequest(); + if (debug.on()) debug.log("bodySubscriber is %s", + bodySubscriber == null ? null : bodySubscriber.getClass()); if (bodySubscriber == null) { bodySubscriber = Http1BodySubscriber.completeSubscriber(debug); appendToOutgoing(Http1BodySubscriber.COMPLETED); } else { // start bodySubscriber.whenSubscribed + .thenAccept((s) -> cancelIfFailed(s)) .thenAccept((s) -> requestMoreBody()); } } catch (Throwable t) { cancelImpl(t); bodySentCF.completeExceptionally(t); } - return bodySentCF; + return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF); } @Override CompletableFuture getResponseAsync(Executor executor) { + if (debug.on()) debug.log("reading headers"); CompletableFuture cf = response.readHeadersAsync(executor); Throwable cause; synchronized (lock) { @@ -323,7 +340,7 @@ debug.log(acknowledged ? ("completed response with " + cause) : ("response already completed, ignoring " + cause)); } - return cf; + return Utils.wrapForDebug(debug, "getResponseAsync", cf); } @Override diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Jun 13 19:11:47 2018 +0100 @@ -165,6 +165,7 @@ + asyncReceiver.remaining() +") " + readProgress); if (firstTimeAround) { + if (debug.on()) debug.log("First time around"); firstTimeAround = false; } else { // with expect continue we will resume reading headers + body. @@ -180,6 +181,12 @@ CompletableFuture cf = headersReader.completion(); assert cf != null : "parsing not started"; + if (debug.on()) { + debug.log("headersReader is %s", + cf == null ? "not yet started" + : cf.isDone() ? "already completed" + : "not yet completed"); + } Function lambda = (State completed) -> { assert completed == State.READING_HEADERS; diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Jun 13 19:11:47 2018 +0100 @@ -775,7 +775,6 @@ + " to subscriber " + subscriber); current.errorRef.compareAndSet(null, error); current.signalCompletion(); - writeSubscriber.subscription.cancel(); readScheduler.stop(); debugState("leaving read() loop with error: "); return; diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 13 19:11:47 2018 +0100 @@ -309,7 +309,7 @@ synchronized (this) { previous = pendingDelegate.getAndSet(delegateWrapper); subscription = readSubscription; - handleNow = this.errorRef.get() != null || finished || readSubscriber.onCompleteReceived; + handleNow = this.errorRef.get() != null || onCompleteReceived; } if (previous != null) { previous.dropSubscription(); @@ -424,7 +424,7 @@ // if onError is invoked concurrently with setDelegate. synchronized (this) { failed = this.errorRef.get(); - completed = finished || onCompleteReceived; + completed = onCompleteReceived; subscribed = subscriberImpl; } @@ -437,6 +437,7 @@ if (debug.on()) debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted", subscriberImpl); + finished = true; subscriberImpl.onComplete(); } } @@ -497,7 +498,6 @@ @Override public void onComplete() { assert !finished && !onCompleteReceived; - onCompleteReceived = true; DelegateWrapper subscriberImpl; synchronized(this) { subscriberImpl = subscribed; @@ -512,8 +512,10 @@ onErrorImpl(new SSLHandshakeException( "Remote host terminated the handshake")); } else if (subscriberImpl != null) { - finished = true; + onCompleteReceived = finished = true; subscriberImpl.onComplete(); + } else { + onCompleteReceived = true; } // now if we have any pending subscriber, we should complete // them immediately as the read scheduler will already be stopped. diff -r a78082bf94de -r ba60eaef37d7 src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Wed Jun 13 15:45:27 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Wed Jun 13 19:11:47 2018 +0100 @@ -59,9 +59,11 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.function.BiPredicate; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -180,6 +182,17 @@ .collect(Collectors.toUnmodifiableSet()); } + public static CompletableFuture wrapForDebug(Logger logger, String name, CompletableFuture cf) { + if (logger.on()) { + return cf.handle((r,t) -> { + logger.log("%s completed %s", name, t == null ? "successfully" : t ); + return cf; + }).thenCompose(Function.identity()); + } else { + return cf; + } + } + private static final String WSPACES = " \t\r\n"; private static final boolean isAllowedForProxy(String name, String value, diff -r a78082bf94de -r ba60eaef37d7 test/jdk/java/net/httpclient/ShortResponseBody.java --- a/test/jdk/java/net/httpclient/ShortResponseBody.java Wed Jun 13 15:45:27 2018 +0100 +++ b/test/jdk/java/net/httpclient/ShortResponseBody.java Wed Jun 13 19:11:47 2018 +0100 @@ -29,11 +29,9 @@ * @build jdk.testlibrary.SimpleSSLContext * @run testng/othervm * -Djdk.httpclient.HttpClient.log=headers,errors - * -Djdk.internal.httpclient.debug=true * ShortResponseBody * @run testng/othervm * -Djdk.httpclient.HttpClient.log=headers,errors - * -Djdk.internal.httpclient.debug=true * -Djdk.httpclient.enableAllMethodRetry * ShortResponseBody */ @@ -278,7 +276,12 @@ @Override public int read(byte[] buf, int offset, int length) { - return length; + //int count = offset; + //length = Math.max(0, Math.min(buf.length - offset, length)); + //for (; count < length; count++) + // buf[offset++] = 0x01; + //return count; + return Math.max(0, Math.min(buf.length - offset, length)); } }