http-client-branch: fixed ShortBodyResponse issues http-client-branch
authordfuchs
Wed, 13 Jun 2018 19:11:47 +0100
branchhttp-client-branch
changeset 56756 ba60eaef37d7
parent 56753 a78082bf94de
child 56762 79e371a6462c
http-client-branch: fixed ShortBodyResponse issues
src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
test/jdk/java/net/httpclient/ShortResponseBody.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java	Wed Jun 13 19:11:47 2018 +0100
@@ -307,6 +307,7 @@
                                                     Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
         t = Utils.getCompletionCause(t);
         if (t instanceof ProxyAuthenticationRequired) {
+            if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
             bodyIgnored = MinimalFuture.completedFuture(null);
             Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
             HttpConnection c = ex == null ? null : ex.connection();
@@ -315,8 +316,10 @@
                     proxyResponse.version, true);
             return MinimalFuture.completedFuture(syntheticResponse);
         } else if (t != null) {
+            if (debug.on()) debug.log("checkFor407: no response - %s", t);
             return MinimalFuture.failedFuture(t);
         } else {
+            if (debug.on()) debug.log("checkFor407: all clear");
             return andThen.apply(ex);
         }
     }
@@ -359,6 +362,7 @@
     // send the request body and proceed.
     private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
         assert !request.expectContinue();
+        if (debug.on()) debug.log("sendRequestBody");
         CompletableFuture<Response> cf = ex.sendBodyAsync()
                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
         cf = wrapForUpgrade(cf);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Wed Jun 13 19:11:47 2018 +0100
@@ -27,7 +27,6 @@
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -43,6 +42,7 @@
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
 import jdk.internal.net.http.common.Logger;
+import jdk.internal.net.http.common.MinimalFuture;
 import jdk.internal.net.http.common.SequentialScheduler;
 import jdk.internal.net.http.common.ConnectionExpiredException;
 import jdk.internal.net.http.common.Utils;
@@ -166,6 +166,7 @@
             = new ConcurrentLinkedDeque<>();
     private final SequentialScheduler scheduler =
             SequentialScheduler.synchronizedScheduler(this::flush);
+    final MinimalFuture<Void> whenFinished;
     private final Executor executor;
     private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
     private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
@@ -184,6 +185,7 @@
     public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
         this.pendingDelegateRef = new AtomicReference<>();
         this.executor = executor;
+        this.whenFinished = new MinimalFuture<>();
         this.owner = owner;
         this.client = owner.client;
     }
@@ -284,6 +286,7 @@
                           + "\t\t queue.isEmpty: " + queue.isEmpty());
             scheduler.stop();
             delegate.onReadError(x);
+            whenFinished.completeExceptionally(x);
             if (stopRequested) {
                 // This is the special case where the subscriber
                 // has requested an illegal number of items.
@@ -491,6 +494,7 @@
         if (previous != null) previous.close(error);
         delegate = null;
         owner  = null;
+        whenFinished.complete(null);
     }
 
     /**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Wed Jun 13 19:11:47 2018 +0100
@@ -219,16 +219,16 @@
         // create the response before sending the request headers, so that
         // the response can set the appropriate receivers.
         if (debug.on()) debug.log("Sending headers only");
+        // If the first attempt to read something triggers EOF, or
+        // IOException("channel reset by peer"), we're going to retry.
+        // Instruct the asyncReceiver to throw ConnectionExpiredException
+        // to force a retry.
+        asyncReceiver.setRetryOnError(true);
         if (response == null) {
             response = new Http1Response<>(connection, this, asyncReceiver);
         }
 
         if (debug.on()) debug.log("response created in advance");
-        // If the first attempt to read something triggers EOF, or
-        // IOException("channel reset by peer"), we're going to retry.
-        // Instruct the asyncReceiver to throw ConnectionExpiredException
-        // to force a retry.
-        asyncReceiver.setRetryOnError(true);
 
         CompletableFuture<Void> connectCF;
         if (!connection.connected()) {
@@ -271,6 +271,8 @@
                         return cf;
                     } catch (Throwable t) {
                         if (debug.on()) debug.log("Failed to send headers: %s", t);
+                        headersSentCF.completeExceptionally(t);
+                        bodySentCF.completeExceptionally(t);
                         connection.close();
                         cf.completeExceptionally(t);
                         return cf;
@@ -278,28 +280,43 @@
                 .thenCompose(unused -> headersSentCF);
     }
 
+    private void cancelIfFailed(Flow.Subscription s) {
+        asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
+            if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+            if (t != null) {
+                s.cancel();
+                bodySentCF.complete(this);
+            }
+        }, executor);
+    }
+
     @Override
     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
         assert headersSentCF.isDone();
+        if (debug.on()) debug.log("sendBodyAsync");
         try {
             bodySubscriber = requestAction.continueRequest();
+            if (debug.on()) debug.log("bodySubscriber is %s",
+                    bodySubscriber == null ? null : bodySubscriber.getClass());
             if (bodySubscriber == null) {
                 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
             } else {
                 // start
                 bodySubscriber.whenSubscribed
+                        .thenAccept((s) -> cancelIfFailed(s))
                         .thenAccept((s) -> requestMoreBody());
             }
         } catch (Throwable t) {
             cancelImpl(t);
             bodySentCF.completeExceptionally(t);
         }
-        return bodySentCF;
+        return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
     }
 
     @Override
     CompletableFuture<Response> getResponseAsync(Executor executor) {
+        if (debug.on()) debug.log("reading headers");
         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
         Throwable cause;
         synchronized (lock) {
@@ -323,7 +340,7 @@
                 debug.log(acknowledged ? ("completed response with " + cause)
                           : ("response already completed, ignoring " + cause));
         }
-        return cf;
+        return Utils.wrapForDebug(debug, "getResponseAsync", cf);
     }
 
     @Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Wed Jun 13 19:11:47 2018 +0100
@@ -165,6 +165,7 @@
                       + asyncReceiver.remaining() +") "  + readProgress);
 
         if (firstTimeAround) {
+            if (debug.on()) debug.log("First time around");
             firstTimeAround = false;
         } else {
             // with expect continue we will resume reading headers + body.
@@ -180,6 +181,12 @@
 
         CompletableFuture<State> cf = headersReader.completion();
         assert cf != null : "parsing not started";
+        if (debug.on()) {
+            debug.log("headersReader is %s",
+                    cf == null ? "not yet started"
+                            : cf.isDone() ? "already completed"
+                            : "not yet completed");
+        }
 
         Function<State, Response> lambda = (State completed) -> {
                 assert completed == State.READING_HEADERS;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Wed Jun 13 19:11:47 2018 +0100
@@ -775,7 +775,6 @@
                                           + " to subscriber " + subscriber);
                             current.errorRef.compareAndSet(null, error);
                             current.signalCompletion();
-                            writeSubscriber.subscription.cancel();
                             readScheduler.stop();
                             debugState("leaving read() loop with error: ");
                             return;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Wed Jun 13 19:11:47 2018 +0100
@@ -309,7 +309,7 @@
             synchronized (this) {
                 previous = pendingDelegate.getAndSet(delegateWrapper);
                 subscription = readSubscription;
-                handleNow = this.errorRef.get() != null || finished || readSubscriber.onCompleteReceived;
+                handleNow = this.errorRef.get() != null || onCompleteReceived;
             }
             if (previous != null) {
                 previous.dropSubscription();
@@ -424,7 +424,7 @@
             // if onError is invoked concurrently with setDelegate.
             synchronized (this) {
                 failed = this.errorRef.get();
-                completed = finished || onCompleteReceived;
+                completed = onCompleteReceived;
                 subscribed = subscriberImpl;
             }
 
@@ -437,6 +437,7 @@
                 if (debug.on())
                     debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
                               subscriberImpl);
+                finished = true;
                 subscriberImpl.onComplete();
             }
         }
@@ -497,7 +498,6 @@
         @Override
         public void onComplete() {
             assert !finished && !onCompleteReceived;
-            onCompleteReceived = true;
             DelegateWrapper subscriberImpl;
             synchronized(this) {
                 subscriberImpl = subscribed;
@@ -512,8 +512,10 @@
                 onErrorImpl(new SSLHandshakeException(
                         "Remote host terminated the handshake"));
             } else if (subscriberImpl != null) {
-                finished = true;
+                onCompleteReceived = finished = true;
                 subscriberImpl.onComplete();
+            } else {
+                onCompleteReceived = true;
             }
             // now if we have any pending subscriber, we should complete
             // them immediately as the read scheduler will already be stopped.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Wed Jun 13 19:11:47 2018 +0100
@@ -59,9 +59,11 @@
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiPredicate;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -180,6 +182,17 @@
                                 .collect(Collectors.toUnmodifiableSet());
     }
 
+    public static <T> CompletableFuture<T> wrapForDebug(Logger logger, String name, CompletableFuture<T> cf) {
+        if (logger.on()) {
+            return cf.handle((r,t) -> {
+                logger.log("%s completed %s", name, t == null ? "successfully" : t );
+                return cf;
+            }).thenCompose(Function.identity());
+        } else {
+            return cf;
+        }
+    }
+
     private static final String WSPACES = " \t\r\n";
     private static final boolean isAllowedForProxy(String name,
                                                    String value,
--- a/test/jdk/java/net/httpclient/ShortResponseBody.java	Wed Jun 13 15:45:27 2018 +0100
+++ b/test/jdk/java/net/httpclient/ShortResponseBody.java	Wed Jun 13 19:11:47 2018 +0100
@@ -29,11 +29,9 @@
  * @build jdk.testlibrary.SimpleSSLContext
  * @run testng/othervm
  *       -Djdk.httpclient.HttpClient.log=headers,errors
- *       -Djdk.internal.httpclient.debug=true
  *       ShortResponseBody
  * @run testng/othervm
  *       -Djdk.httpclient.HttpClient.log=headers,errors
- *       -Djdk.internal.httpclient.debug=true
  *       -Djdk.httpclient.enableAllMethodRetry
  *       ShortResponseBody
  */
@@ -278,7 +276,12 @@
 
         @Override
         public int read(byte[] buf, int offset, int length) {
-            return length;
+            //int count = offset;
+            //length = Math.max(0, Math.min(buf.length - offset, length));
+            //for (; count < length; count++)
+            //    buf[offset++] = 0x01;
+            //return count;
+            return Math.max(0, Math.min(buf.length - offset, length));
         }
     }