8225583: Examine the HttpResponse.BodySubscribers for null handling
authorchegar
Tue, 18 Jun 2019 14:52:36 +0100
changeset 55402 b78af6d8a252
parent 55401 32cce302a1fd
child 55403 8d50ff464ae5
8225583: Examine the HttpResponse.BodySubscribers for null handling Reviewed-by: dfuchs, prappo
src/java.net.http/share/classes/java/net/http/HttpResponse.java
src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java
src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
--- a/src/java.net.http/share/classes/java/net/http/HttpResponse.java	Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/java/net/http/HttpResponse.java	Tue Jun 18 14:52:36 2019 +0100
@@ -1253,7 +1253,7 @@
         /**
          * Returns a {@code BodySubscriber} which buffers data before delivering
          * it to the given downstream subscriber. The subscriber guarantees to
-         * deliver {@code buffersize} bytes of data to each invocation of the
+         * deliver {@code bufferSize} bytes of data to each invocation of the
          * downstream's {@link BodySubscriber#onNext(Object) onNext} method,
          * except for the final invocation, just before
          * {@link BodySubscriber#onComplete() onComplete} is invoked. The final
--- a/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java	Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java	Tue Jun 18 14:52:36 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -40,6 +40,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -56,6 +57,7 @@
     private final Function<? super S, ? extends R> finisher;
     private final Charset charset;
     private final String eol;
+    private final AtomicBoolean subscribed = new AtomicBoolean();
     private volatile LineSubscription downstream;
 
     private LineSubscriberAdapter(S subscriber,
@@ -72,6 +74,12 @@
 
     @Override
     public void onSubscribe(Subscription subscription) {
+        Objects.requireNonNull(subscription);
+        if (!subscribed.compareAndSet(false, true)) {
+            subscription.cancel();
+            return;
+        }
+
         downstream = LineSubscription.create(subscription,
                                              charset,
                                              eol,
@@ -82,6 +90,7 @@
 
     @Override
     public void onNext(List<ByteBuffer> item) {
+        Objects.requireNonNull(item);
         try {
             downstream.submit(item);
         } catch (Throwable t) {
@@ -91,6 +100,7 @@
 
     @Override
     public void onError(Throwable throwable) {
+        Objects.requireNonNull(throwable);
         try {
             downstream.signalError(throwable);
         } finally {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java	Tue Jun 18 14:52:36 2019 +0100
@@ -125,6 +125,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -135,6 +136,7 @@
 
         @Override
         public void onNext(List<ByteBuffer> items) {
+            Objects.requireNonNull(items);
             for (ByteBuffer item : items) {
                 byte[] buf = new byte[item.remaining()];
                 item.get(buf);
@@ -145,6 +147,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             result.completeExceptionally(throwable);
         }
 
@@ -172,6 +175,7 @@
         private final FilePermission[] filePermissions;
         private final CompletableFuture<Path> result = new MinimalFuture<>();
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
         private volatile Flow.Subscription subscription;
         private volatile FileChannel out;
 
@@ -211,6 +215,12 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+                return;
+            }
+
             this.subscription = subscription;
             if (System.getSecurityManager() == null) {
                 try {
@@ -470,6 +480,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
+            Objects.requireNonNull(s);
             try {
                 if (!subscribed.compareAndSet(false, true)) {
                     s.cancel();
@@ -600,6 +611,7 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
+            Objects.requireNonNull(subscription);
             if (!subscribed.compareAndSet(false, true)) {
                 subscription.cancel();
             } else {
@@ -614,6 +626,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
             cf.completeExceptionally(throwable);
         }
 
@@ -907,13 +920,21 @@
             }
         }
 
+        private final AtomicBoolean subscribed = new AtomicBoolean();
+
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            subscriptionCF.complete(subscription);
+            Objects.requireNonNull(subscription);
+            if (!subscribed.compareAndSet(false, true)) {
+                subscription.cancel();
+            } else {
+                subscriptionCF.complete(subscription);
+            }
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
+            Objects.requireNonNull(item);
             try {
                 // cannot be called before onSubscribe()
                 assert subscriptionCF.isDone();
@@ -941,6 +962,7 @@
             // onError can be called before request(1), and therefore can
             // be called before subscriberRef is set.
             signalError(throwable);
+            Objects.requireNonNull(throwable);
         }
 
         @Override