src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
changeset 55402 b78af6d8a252
parent 53467 97cf88608d76
child 57685 e4cc5231ce2d
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Tue Jun 18 14:52:36 2019 +0100
@@ -125,6 +125,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -135,6 +136,7 @@
 
         @Override
         public void onNext(List<ByteBuffer> items) {
+            Objects.requireNonNull(items);
             for (ByteBuffer item : items) {
                 byte[] buf = new byte[item.remaining()];
                 item.get(buf);
@@ -145,6 +147,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             result.completeExceptionally(throwable);
         }
 
@@ -172,6 +175,7 @@
         private final FilePermission[] filePermissions;
         private final CompletableFuture<Path> result = new MinimalFuture<>();
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
         private volatile Flow.Subscription subscription;
         private volatile FileChannel out;
 
@@ -211,6 +215,12 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+                return;
+            }
+
             this.subscription = subscription;
             if (System.getSecurityManager() == null) {
                 try {
@@ -470,6 +480,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
+            Objects.requireNonNull(s);
             try {
                 if (!subscribed.compareAndSet(false, true)) {
                     s.cancel();
@@ -600,6 +611,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -614,6 +626,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             cf.completeExceptionally(throwable);
         }
 
@@ -907,13 +920,21 @@
             }
         }
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
+
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            subscriptionCF.complete(subscription);
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+            } else {
+                subscriptionCF.complete(subscription);
+            }
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
+            Objects.requireNonNull(item);
             try {
                 // cannot be called before onSubscribe()
                 assert subscriptionCF.isDone();
@@ -941,6 +962,7 @@
             // onError can be called before request(1), and therefore can
             // be called before subscriberRef is set.
             signalError(throwable);
+            Objects.requireNonNull(throwable);
         }
 
         @Override