src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
branchdatagramsocketimpl-branch
changeset 58678 9cf78a70fa4f
parent 53467 97cf88608d76
child 58679 9c3209ff7550
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Thu Oct 17 20:27:44 2019 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Thu Oct 17 20:53:35 2019 +0100
@@ -125,6 +125,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -135,6 +136,7 @@
 
         @Override
         public void onNext(List<ByteBuffer> items) {
+            Objects.requireNonNull(items);
             for (ByteBuffer item : items) {
                 byte[] buf = new byte[item.remaining()];
                 item.get(buf);
@@ -145,6 +147,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             result.completeExceptionally(throwable);
         }
 
@@ -172,6 +175,7 @@
         private final FilePermission[] filePermissions;
         private final CompletableFuture<Path> result = new MinimalFuture<>();
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
         private volatile Flow.Subscription subscription;
         private volatile FileChannel out;
 
@@ -211,6 +215,12 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+                return;
+            }
+
             this.subscription = subscription;
             if (System.getSecurityManager() == null) {
                 try {
@@ -428,6 +438,10 @@
 
         @Override
         public int read(byte[] bytes, int off, int len) throws IOException {
+            Objects.checkFromIndexSize(off, len, bytes.length);
+            if (len == 0) {
+                return 0;
+            }
             // get the buffer to read from, possibly blocking if
             // none is available
             ByteBuffer buffer;
@@ -470,6 +484,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
+            Objects.requireNonNull(s);
             try {
                 if (!subscribed.compareAndSet(false, true)) {
                     s.cancel();
@@ -600,6 +615,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -614,6 +630,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             cf.completeExceptionally(throwable);
         }
 
@@ -907,13 +924,21 @@
             }
         }
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
+
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            subscriptionCF.complete(subscription);
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+            } else {
+                subscriptionCF.complete(subscription);
+            }
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
+            Objects.requireNonNull(item);
             try {
                 // cannot be called before onSubscribe()
                 assert subscriptionCF.isDone();
@@ -941,6 +966,7 @@
             // onError can be called before request(1), and therefore can
             // be called before subscriberRef is set.
             signalError(throwable);
+            Objects.requireNonNull(throwable);
         }
 
         @Override