src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java
changeset 55402 b78af6d8a252
parent 49765 ee6f7a61f3a5
--- a/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java	Tue Jun 18 11:06:29 2019 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java	Tue Jun 18 14:52:36 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -40,6 +40,7 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -56,6 +57,7 @@
     private final Function<? super S, ? extends R> finisher;
     private final Charset charset;
     private final String eol;
+    private final AtomicBoolean subscribed = new AtomicBoolean();
     private volatile LineSubscription downstream;
 
     private LineSubscriberAdapter(S subscriber,
@@ -72,6 +74,12 @@
 
     @Override
     public void onSubscribe(Subscription subscription) {
+        Objects.requireNonNull(subscription);
+        if (!subscribed.compareAndSet(false, true)) {
+            subscription.cancel();
+            return;
+        }
+
         downstream = LineSubscription.create(subscription,
                                              charset,
                                              eol,
@@ -82,6 +90,7 @@
 
     @Override
     public void onNext(List<ByteBuffer> item) {
+        Objects.requireNonNull(item);
         try {
             downstream.submit(item);
         } catch (Throwable t) {
@@ -91,6 +100,7 @@
 
     @Override
     public void onError(Throwable throwable) {
+        Objects.requireNonNull(throwable);
         try {
             downstream.signalError(throwable);
         } finally {