8178699: Fail to send async requests if server doesn't response the first one
authorxiaofeya
Sun, 25 Jun 2017 23:51:46 -0700
changeset 45708 3512073b446f
parent 45707 28170236270a
child 45709 841996818446
8178699: Fail to send async requests if server doesn't response the first one Reviewed-by: dfuchs
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestProcessors.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
jdk/test/java/net/httpclient/SmokeTest.java
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Sun Jun 25 23:51:46 2017 -0700
@@ -266,7 +266,6 @@
         throws IOException, InterruptedException
     {
         establishExchange(connection);
-        exchImpl.setClientForRequest(requestProcessor);
         if (request.expectContinue()) {
             Log.logTrace("Sending Expect: 100-Continue");
             request.addSystemHeader("Expect", "100-Continue");
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Sun Jun 25 23:51:46 2017 -0700
@@ -26,7 +26,6 @@
 package jdk.incubator.http;
 
 import java.io.IOException;
-import jdk.incubator.http.RequestProcessors.ProcessorBase;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import jdk.incubator.http.internal.common.MinimalFuture;
@@ -50,11 +49,8 @@
     final Exchange<T> exchange;
 
     ExchangeImpl(Exchange<T> e) {
+        // e == null means a http/2 pushed stream
         this.exchange = e;
-        if (e != null) {
-            // e == null means a http/2 pushed stream, therefore no request
-            setClientForRequest(e.requestProcessor);
-        }
     }
 
     final Exchange<T> getExchange() {
@@ -134,22 +130,6 @@
                                                 boolean returnConnectionToPool,
                                                 Executor executor);
 
-    // Builtin processors need access to HttpClientImpl
-    final void setClientForResponse(HttpResponse.BodyProcessor<T> proc) {
-        if (proc instanceof ResponseProcessors.AbstractProcessor) {
-            ResponseProcessors.AbstractProcessor<T> abProc =
-                    (ResponseProcessors.AbstractProcessor<T>)proc;
-            abProc.setClient(exchange.client());
-        }
-    }
-
-    final void setClientForRequest(HttpRequest.BodyProcessor proc) {
-        if (proc instanceof ProcessorBase) {
-            ProcessorBase abProc = (ProcessorBase)proc;
-            abProc.setClient(exchange.client());
-        }
-    }
-
     /**
      * Async version of getResponse. Completes before body is read.
      */
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Sun Jun 25 23:51:46 2017 -0700
@@ -96,7 +96,6 @@
     {
         BodyProcessor<T> processor = handler.apply(response.responseCode(),
                                                    response.responseHeaders());
-        setClientForResponse(processor);
         CompletableFuture<T> bodyCF = response.readBody(processor,
                                                         returnConnectionToPool,
                                                         this::executeInline);
@@ -122,7 +121,6 @@
     {
         BodyProcessor<T> processor = handler.apply(response.responseCode(),
                                                    response.responseHeaders());
-        setClientForResponse(processor);
         CompletableFuture<T> bodyCF = response.readBody(processor,
                                                         returnConnectionToPool,
                                                         executor);
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java	Sun Jun 25 23:51:46 2017 -0700
@@ -33,7 +33,6 @@
 import java.util.Set;
 import java.net.InetSocketAddress;
 import jdk.incubator.http.HttpConnection.Mode;
-import jdk.incubator.http.RequestProcessors.ProcessorBase;
 import java.nio.charset.StandardCharsets;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import java.util.concurrent.CompletableFuture;
@@ -274,9 +273,7 @@
         setFinished();
     }
 
-    class StreamSubscriber extends ProcessorBase
-        implements Flow.Subscriber<ByteBuffer>
-    {
+    class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
         volatile Flow.Subscription subscription;
         volatile boolean includeHeaders;
 
@@ -361,13 +358,11 @@
         throws IOException
     {
         StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
-        subscriber.setClient(client);
         requestProc.subscribe(subscriber);
         waitForCompletion();
     }
 
-    class FixedContentSubscriber extends ProcessorBase
-        implements Flow.Subscriber<ByteBuffer>
+    class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
     {
         volatile Flow.Subscription subscription;
         volatile boolean includeHeaders;
@@ -451,7 +446,6 @@
             return;
         }
         FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
-        subscriber.setClient(client);
         requestProc.subscribe(subscriber);
         waitForCompletion();
     }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestProcessors.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestProcessors.java	Sun Jun 25 23:51:46 2017 -0700
@@ -38,29 +38,14 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Flow;
 import java.util.function.Supplier;
 import jdk.incubator.http.internal.common.Utils;
 
 class RequestProcessors {
-    // common base class for Publisher and Subscribers used here
-    abstract static class ProcessorBase {
-        HttpClientImpl client;
 
-        synchronized void setClient(HttpClientImpl client) {
-            this.client = client;
-        }
-
-        synchronized HttpClientImpl getClient() {
-            return client;
-        }
-    }
-
-    static class ByteArrayProcessor extends ProcessorBase
-        implements HttpRequest.BodyProcessor
-    {
+    static class ByteArrayProcessor implements HttpRequest.BodyProcessor {
         private volatile Flow.Publisher<ByteBuffer> delegate;
         private final int length;
         private final byte[] content;
@@ -105,9 +90,7 @@
     }
 
     // This implementation has lots of room for improvement.
-    static class IterableProcessor extends ProcessorBase
-        implements HttpRequest.BodyProcessor
-    {
+    static class IterableProcessor implements HttpRequest.BodyProcessor {
         private volatile Flow.Publisher<ByteBuffer> delegate;
         private final Iterable<byte[]> content;
         private volatile long contentLength;
@@ -202,8 +185,7 @@
         }
     }
 
-    static class EmptyProcessor extends ProcessorBase implements HttpRequest.BodyProcessor
-    {
+    static class EmptyProcessor implements HttpRequest.BodyProcessor {
         PseudoPublisher<ByteBuffer> delegate = new PseudoPublisher<>();
 
         @Override
@@ -303,9 +285,7 @@
 
     }
 
-    static class InputStreamProcessor extends ProcessorBase
-        implements HttpRequest.BodyProcessor
-    {
+    static class InputStreamProcessor implements HttpRequest.BodyProcessor {
         private final Supplier<? extends InputStream> streamSupplier;
         private Flow.Publisher<ByteBuffer> delegate;
 
@@ -315,11 +295,7 @@
 
         @Override
         public synchronized void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
-            if (!(subscriber instanceof ProcessorBase)) {
-                throw new UnsupportedOperationException();
-            }
-            ProcessorBase base = (ProcessorBase)subscriber;
-            HttpClientImpl client = base.getClient();
+
             InputStream is = streamSupplier.get();
             if (is == null) {
                 throw new UncheckedIOException(new IOException("no inputstream supplied"));
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java	Sun Jun 25 23:51:46 2017 -0700
@@ -51,21 +51,7 @@
 
 class ResponseProcessors {
 
-    abstract static class AbstractProcessor<T>
-        implements HttpResponse.BodyProcessor<T>
-    {
-        HttpClientImpl client;
-
-        synchronized void setClient(HttpClientImpl client) {
-            this.client = client;
-        }
-
-        synchronized HttpClientImpl getClient() {
-            return client;
-        }
-    }
-
-    static class ConsumerProcessor extends AbstractProcessor<Void> {
+    static class ConsumerProcessor implements HttpResponse.BodyProcessor<Void> {
         private final Consumer<Optional<byte[]>> consumer;
         private Flow.Subscription subscription;
         private final CompletableFuture<Void> result = new MinimalFuture<>();
@@ -106,7 +92,7 @@
 
     }
 
-    static class PathProcessor extends AbstractProcessor<Path> {
+    static class PathProcessor implements HttpResponse.BodyProcessor<Path> {
 
         private final Path file;
         private final CompletableFuture<Path> result = new MinimalFuture<>();
@@ -163,7 +149,7 @@
         }
     }
 
-    static class ByteArrayProcessor<T> extends AbstractProcessor<T> {
+    static class ByteArrayProcessor<T> implements HttpResponse.BodyProcessor<T> {
         private final Function<byte[], T> finisher;
         private final CompletableFuture<T> result = new MinimalFuture<>();
         private final List<ByteBuffer> received = new ArrayList<>();
@@ -301,7 +287,7 @@
     /**
      * Currently this consumes all of the data and ignores it
      */
-    static class NullProcessor<T> extends AbstractProcessor<T> {
+    static class NullProcessor<T> implements HttpResponse.BodyProcessor<T> {
 
         Flow.Subscription subscription;
         final CompletableFuture<T> cf = new MinimalFuture<>();
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Sun Jun 25 23:51:46 2017 -0700
@@ -153,7 +153,6 @@
     {
         Log.logTrace("Reading body on stream {0}", streamid);
         responseProcessor = handler.apply(responseCode, responseHeaders);
-        setClientForResponse(responseProcessor);
         publisher.subscribe(responseProcessor);
         CompletableFuture<T> cf = receiveData(executor);
 
@@ -573,10 +572,7 @@
         connection.putStream(this, streamid);
     }
 
-    class RequestSubscriber
-        extends RequestProcessors.ProcessorBase
-        implements Flow.Subscriber<ByteBuffer>
-    {
+    class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
         // can be < 0 if the actual length is not known.
         private volatile long remainingContentLength;
         private volatile Subscription subscription;
@@ -768,7 +764,6 @@
 
     CompletableFuture<Void> sendBodyImpl() {
         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
-        subscriber.setClient(client);
         requestProcessor.subscribe(subscriber);
         requestBodyCF.whenComplete((v,t) -> requestSent());
         return requestBodyCF;
--- a/jdk/test/java/net/httpclient/SmokeTest.java	Sun Jun 25 13:35:08 2017 -0700
+++ b/jdk/test/java/net/httpclient/SmokeTest.java	Sun Jun 25 23:51:46 2017 -0700
@@ -23,7 +23,7 @@
 
 /*
  * @test
- * @bug 8087112
+ * @bug 8087112 8178699
  * @modules jdk.incubator.httpclient
  *          java.logging
  *          jdk.httpserver