diff -r fcb5b835bf32 -r 4f830b447edf src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 10:26:03 2017 +0100 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 16:58:51 2017 +0000 @@ -47,6 +47,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -558,4 +560,69 @@ return cf; } } + + /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ + static final class SubscriberAdapter>,R> + implements HttpResponse.BodySubscriber + { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + private volatile Subscription subscription; + + SubscriberAdapter(S subscriber, Function finisher) { + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + } + + @Override + public void onSubscribe(Subscription subscription) { + Objects.requireNonNull(subscription); + if (this.subscription != null) { + subscription.cancel(); + } else { + this.subscription = subscription; + subscriber.onSubscribe(subscription); + } + } + + @Override + public void onNext(List item) { + Objects.requireNonNull(item); + try { + subscriber.onNext(item); + } catch (Throwable throwable) { + subscription.cancel(); + onError(throwable); + } + } + + @Override + public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); + try { + subscriber.onError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + try { + cf.complete(finisher.apply(subscriber)); + } catch (Throwable throwable) { + cf.completeExceptionally(throwable); + } + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + } }