# HG changeset patch # User prappo # Date 1511273857 -10800 # Node ID 3bac3bca4adb7721a70bdf57c492a52be2476f33 # Parent 2a7e2724a42256f56f628b0cceee30d15dc16652 http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK diff -r 2a7e2724a422 -r 3bac3bca4adb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PseudoPublisher.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PseudoPublisher.java Tue Nov 21 12:32:16 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http; - -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; - -// Completes the subscription on first request. Never calls onNext() - -class PseudoPublisher implements Flow.Publisher { - - private final Throwable throwable; - - PseudoPublisher() { - this(null); - } - - PseudoPublisher(Throwable throwable) { - this.throwable = throwable; - } - - @Override - public void subscribe(Flow.Subscriber subscriber) { - subscriber.onSubscribe(new Subscription(subscriber)); - } - - private class Subscription implements Flow.Subscription { - - private final Flow.Subscriber subscriber; - private final AtomicBoolean done = new AtomicBoolean(); - - Subscription(Flow.Subscriber subscriber) { - this.subscriber = subscriber; - } - - @Override - public void request(long n) { - if (done.compareAndSet(false, true)) { - if (n > 0) { - if (throwable == null) { - subscriber.onComplete(); - } else { - subscriber.onError(throwable); - } - } else { - subscriber.onError(new IllegalArgumentException("request(" + n + ")")); - } - } - } - - @Override - public void cancel() { - done.set(true); - } - - } -} diff -r 2a7e2724a422 -r 3bac3bca4adb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Tue Nov 21 12:32:16 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Tue Nov 21 17:17:37 2017 +0300 @@ -39,6 +39,7 @@ import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -198,7 +199,8 @@ } static class EmptyPublisher implements HttpRequest.BodyPublisher { - private final PseudoPublisher delegate = new PseudoPublisher<>(); + private final Flow.Publisher delegate = + new PullPublisher(Collections.emptyList(), null); @Override public long contentLength() { diff -r 2a7e2724a422 -r 3bac3bca4adb src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 21 12:32:16 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 21 17:17:37 2017 +0300 @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -46,6 +47,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import jdk.incubator.http.internal.common.MinimalFuture; @@ -57,6 +59,7 @@ private final Consumer> consumer; private Flow.Subscription subscription; private final CompletableFuture result = new MinimalFuture<>(); + private final AtomicBoolean subscribed = new AtomicBoolean(); ConsumerSubscriber(Consumer> consumer) { this.consumer = consumer; @@ -69,8 +72,12 @@ @Override public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); + if (!subscribed.compareAndSet(false, true)) { + subscription.cancel(); + } else { + this.subscription = subscription; + subscription.request(1); + } } @Override @@ -252,6 +259,7 @@ private volatile Throwable failed; private volatile Iterator currentListItr; private volatile ByteBuffer currentBuffer; + private final AtomicBoolean subscribed = new AtomicBoolean(); HttpResponseInputStream() { this(MAX_BUFFERS_IN_QUEUE); @@ -351,19 +359,20 @@ @Override public void onSubscribe(Flow.Subscription s) { - if (this.subscription != null) { + if (!subscribed.compareAndSet(false, true)) { s.cancel(); - return; + } else { + this.subscription = s; + assert buffers.remainingCapacity() > 1; // should contain at least 2 + DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " + + Math.max(1, buffers.remainingCapacity() - 1)); + s.request(Math.max(1, buffers.remainingCapacity() - 1)); } - this.subscription = s; - assert buffers.remainingCapacity() > 1; // should contain at least 2 - DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " - + Math.max(1, buffers.remainingCapacity() - 1)); - s.request(Math.max(1, buffers.remainingCapacity() - 1)); } @Override public void onNext(List t) { + Objects.requireNonNull(t); try { DEBUG_LOGGER.log(Level.DEBUG, "next item received"); if (!buffers.offer(t)) { @@ -383,7 +392,7 @@ @Override public void onError(Throwable thrwbl) { subscription = null; - failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl; + failed = Objects.requireNonNull(thrwbl); // The client process that reads the input stream might // be blocked in queue.take(). // Tries to offer LAST_LIST to the queue. If the queue is @@ -474,8 +483,9 @@ */ static class NullSubscriber implements HttpResponse.BodySubscriber { - final CompletableFuture cf = new MinimalFuture<>(); - final Optional result; + private final CompletableFuture cf = new MinimalFuture<>(); + private final Optional result; + private final AtomicBoolean subscribed = new AtomicBoolean(); NullSubscriber(Optional result) { this.result = result; @@ -483,12 +493,16 @@ @Override public void onSubscribe(Flow.Subscription subscription) { - subscription.request(Long.MAX_VALUE); + if (!subscribed.compareAndSet(false, true)) { + subscription.cancel(); + } else { + subscription.request(Long.MAX_VALUE); + } } @Override public void onNext(List items) { - // NO-OP + Objects.requireNonNull(items); } @Override