--- a/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 14:52:36 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -40,6 +40,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -56,6 +57,7 @@
private final Function<? super S, ? extends R> finisher;
private final Charset charset;
private final String eol;
+ private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile LineSubscription downstream;
private LineSubscriberAdapter(S subscriber,
@@ -72,6 +74,12 @@
@Override
public void onSubscribe(Subscription subscription) {
+ Objects.requireNonNull(subscription);
+ if (!subscribed.compareAndSet(false, true)) {
+ subscription.cancel();
+ return;
+ }
+
downstream = LineSubscription.create(subscription,
charset,
eol,
@@ -82,6 +90,7 @@
@Override
public void onNext(List<ByteBuffer> item) {
+ Objects.requireNonNull(item);
try {
downstream.submit(item);
} catch (Throwable t) {
@@ -91,6 +100,7 @@
@Override
public void onError(Throwable throwable) {
+ Objects.requireNonNull(throwable);
try {
downstream.signalError(throwable);
} finally {