8225583: Examine the HttpResponse.BodySubscribers for null handling
Reviewed-by: dfuchs, prappo
--- a/src/java.net.http/share/classes/java/net/http/HttpResponse.java Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/java/net/http/HttpResponse.java Tue Jun 18 14:52:36 2019 +0100
@@ -1253,7 +1253,7 @@
/**
* Returns a {@code BodySubscriber} which buffers data before delivering
* it to the given downstream subscriber. The subscriber guarantees to
- * deliver {@code buffersize} bytes of data to each invocation of the
+ * deliver {@code bufferSize} bytes of data to each invocation of the
* downstream's {@link BodySubscriber#onNext(Object) onNext} method,
* except for the final invocation, just before
* {@link BodySubscriber#onComplete() onComplete} is invoked. The final
--- a/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 14:52:36 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -40,6 +40,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -56,6 +57,7 @@
private final Function<? super S, ? extends R> finisher;
private final Charset charset;
private final String eol;
+ private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile LineSubscription downstream;
private LineSubscriberAdapter(S subscriber,
@@ -72,6 +74,12 @@
@Override
public void onSubscribe(Subscription subscription) {
+ Objects.requireNonNull(subscription);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ return;
+ }
+
downstream = LineSubscription.create(subscription,
charset,
eol,
@@ -82,6 +90,7 @@
@Override
public void onNext(List<ByteBuffer> item) {
+ Objects.requireNonNull(item);
try {
downstream.submit(item);
} catch (Throwable t) {
@@ -91,6 +100,7 @@
@Override
public void onError(Throwable throwable) {
+ Objects.requireNonNull(throwable);
try {
downstream.signalError(throwable);
} finally {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java Tue Jun 18 14:52:36 2019 +0100
@@ -125,6 +125,7 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
+ Objects.requireNonNull(subscription);
if (!subscribed.compareAndSet(false, true)) {
subscription.cancel();
} else {
@@ -135,6 +136,7 @@
@Override
public void onNext(List<ByteBuffer> items) {
+ Objects.requireNonNull(items);
for (ByteBuffer item : items) {
byte[] buf = new byte[item.remaining()];
item.get(buf);
@@ -145,6 +147,7 @@
@Override
public void onError(Throwable throwable) {
+ Objects.requireNonNull(throwable);
result.completeExceptionally(throwable);
}
@@ -172,6 +175,7 @@
private final FilePermission[] filePermissions;
private final CompletableFuture<Path> result = new MinimalFuture<>();
+ private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile Flow.Subscription subscription;
private volatile FileChannel out;
@@ -211,6 +215,12 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
+ Objects.requireNonNull(subscription);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ return;
+ }
+
this.subscription = subscription;
if (System.getSecurityManager() == null) {
try {
@@ -470,6 +480,7 @@
@Override
public void onSubscribe(Flow.Subscription s) {
+ Objects.requireNonNull(s);
try {
if (!subscribed.compareAndSet(false, true)) {
s.cancel();
@@ -600,6 +611,7 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
+ Objects.requireNonNull(subscription);
if (!subscribed.compareAndSet(false, true)) {
subscription.cancel();
} else {
@@ -614,6 +626,7 @@
@Override
public void onError(Throwable throwable) {
+ Objects.requireNonNull(throwable);
cf.completeExceptionally(throwable);
}
@@ -907,13 +920,21 @@
}
}
+ private final AtomicBoolean subscribed = new AtomicBoolean();
+
@Override
public void onSubscribe(Flow.Subscription subscription) {
- subscriptionCF.complete(subscription);
+ Objects.requireNonNull(subscription);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ } else {
+ subscriptionCF.complete(subscription);
+ }
}
@Override
public void onNext(List<ByteBuffer> item) {
+ Objects.requireNonNull(item);
try {
// cannot be called before onSubscribe()
assert subscriptionCF.isDone();
@@ -941,6 +962,7 @@
// onError can be called before request(1), and therefore can
// be called before subscriberRef is set.
signalError(throwable);
+ Objects.requireNonNull(throwable);
}
@Override