src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 21 12:32:16 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 21 17:17:37 2017 +0300
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -46,6 +47,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
@@ -57,6 +59,7 @@
private final Consumer<Optional<byte[]>> consumer;
private Flow.Subscription subscription;
private final CompletableFuture<Void> result = new MinimalFuture<>();
+ private final AtomicBoolean subscribed = new AtomicBoolean();
ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
this.consumer = consumer;
@@ -69,8 +72,12 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
- subscription.request(1);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ } else {
+ this.subscription = subscription;
+ subscription.request(1);
+ }
}
@Override
@@ -252,6 +259,7 @@
private volatile Throwable failed;
private volatile Iterator<ByteBuffer> currentListItr;
private volatile ByteBuffer currentBuffer;
+ private final AtomicBoolean subscribed = new AtomicBoolean();
HttpResponseInputStream() {
this(MAX_BUFFERS_IN_QUEUE);
@@ -351,19 +359,20 @@
@Override
public void onSubscribe(Flow.Subscription s) {
- if (this.subscription != null) {
+ if (!subscribed.compareAndSet(false, true)) {
s.cancel();
- return;
+ } else {
+ this.subscription = s;
+ assert buffers.remainingCapacity() > 1; // should contain at least 2
+ DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+ + Math.max(1, buffers.remainingCapacity() - 1));
+ s.request(Math.max(1, buffers.remainingCapacity() - 1));
}
- this.subscription = s;
- assert buffers.remainingCapacity() > 1; // should contain at least 2
- DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
- + Math.max(1, buffers.remainingCapacity() - 1));
- s.request(Math.max(1, buffers.remainingCapacity() - 1));
}
@Override
public void onNext(List<ByteBuffer> t) {
+ Objects.requireNonNull(t);
try {
DEBUG_LOGGER.log(Level.DEBUG, "next item received");
if (!buffers.offer(t)) {
@@ -383,7 +392,7 @@
@Override
public void onError(Throwable thrwbl) {
subscription = null;
- failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl;
+ failed = Objects.requireNonNull(thrwbl);
// The client process that reads the input stream might
// be blocked in queue.take().
// Tries to offer LAST_LIST to the queue. If the queue is
@@ -474,8 +483,9 @@
*/
static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
- final CompletableFuture<T> cf = new MinimalFuture<>();
- final Optional<T> result;
+ private final CompletableFuture<T> cf = new MinimalFuture<>();
+ private final Optional<T> result;
+ private final AtomicBoolean subscribed = new AtomicBoolean();
NullSubscriber(Optional<T> result) {
this.result = result;
@@ -483,12 +493,16 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
- subscription.request(Long.MAX_VALUE);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ } else {
+ subscription.request(Long.MAX_VALUE);
+ }
}
@Override
public void onNext(List<ByteBuffer> items) {
- // NO-OP
+ Objects.requireNonNull(items);
}
@Override