src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branchhttp-client-branch
changeset 55847 3bac3bca4adb
parent 55803 259cf67b22ec
child 55858 cd5eeec735fb
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Tue Nov 21 12:32:16 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Tue Nov 21 17:17:37 2017 +0300
@@ -39,6 +39,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -46,6 +47,7 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import jdk.incubator.http.internal.common.MinimalFuture;
@@ -57,6 +59,7 @@
         private final Consumer<Optional<byte[]>> consumer;
         private Flow.Subscription subscription;
         private final CompletableFuture<Void> result = new MinimalFuture<>();
+        private final AtomicBoolean subscribed = new AtomicBoolean();
 
         ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
             this.consumer = consumer;
@@ -69,8 +72,12 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            this.subscription = subscription;
-            subscription.request(1);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+            } else {
+                this.subscription = subscription;
+                subscription.request(1);
+            }
         }
 
         @Override
@@ -252,6 +259,7 @@
         private volatile Throwable failed;
         private volatile Iterator<ByteBuffer> currentListItr;
         private volatile ByteBuffer currentBuffer;
+        private final AtomicBoolean subscribed = new AtomicBoolean();
 
         HttpResponseInputStream() {
             this(MAX_BUFFERS_IN_QUEUE);
@@ -351,19 +359,20 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
-            if (this.subscription != null) {
+            if (!subscribed.compareAndSet(false, true)) {
                 s.cancel();
-                return;
+            } else {
+                this.subscription = s;
+                assert buffers.remainingCapacity() > 1; // should contain at least 2
+                DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+                        + Math.max(1, buffers.remainingCapacity() - 1));
+                s.request(Math.max(1, buffers.remainingCapacity() - 1));
             }
-            this.subscription = s;
-            assert buffers.remainingCapacity() > 1; // should contain at least 2
-            DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
-                                +  Math.max(1, buffers.remainingCapacity() - 1));
-            s.request(Math.max(1, buffers.remainingCapacity() - 1));
         }
 
         @Override
         public void onNext(List<ByteBuffer> t) {
+            Objects.requireNonNull(t);
             try {
                 DEBUG_LOGGER.log(Level.DEBUG, "next item received");
                 if (!buffers.offer(t)) {
@@ -383,7 +392,7 @@
         @Override
         public void onError(Throwable thrwbl) {
             subscription = null;
-            failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl;
+            failed = Objects.requireNonNull(thrwbl);
             // The client process that reads the input stream might
             // be blocked in queue.take().
             // Tries to offer LAST_LIST to the queue. If the queue is
@@ -474,8 +483,9 @@
      */
     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
 
-        final CompletableFuture<T> cf = new MinimalFuture<>();
-        final Optional<T> result;
+        private final CompletableFuture<T> cf = new MinimalFuture<>();
+        private final Optional<T> result;
+        private final AtomicBoolean subscribed = new AtomicBoolean();
 
         NullSubscriber(Optional<T> result) {
             this.result = result;
@@ -483,12 +493,16 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            subscription.request(Long.MAX_VALUE);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+            } else {
+                subscription.request(Long.MAX_VALUE);
+            }
         }
 
         @Override
         public void onNext(List<ByteBuffer> items) {
-            // NO-OP
+            Objects.requireNonNull(items);
         }
 
         @Override