src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 10:26:03 2017 +0100
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 16:58:51 2017 +0000
@@ -47,6 +47,8 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -558,4 +560,69 @@
return cf;
}
}
+
+ /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
+ static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
+ implements HttpResponse.BodySubscriber<R>
+ {
+ private final CompletableFuture<R> cf = new MinimalFuture<>();
+ private final S subscriber;
+ private final Function<S,R> finisher;
+ private volatile Subscription subscription;
+
+ SubscriberAdapter(S subscriber, Function<S,R> finisher) {
+ this.subscriber = Objects.requireNonNull(subscriber);
+ this.finisher = Objects.requireNonNull(finisher);
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ Objects.requireNonNull(subscription);
+ if (this.subscription != null) {
+ subscription.cancel();
+ } else {
+ this.subscription = subscription;
+ subscriber.onSubscribe(subscription);
+ }
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ Objects.requireNonNull(item);
+ try {
+ subscriber.onNext(item);
+ } catch (Throwable throwable) {
+ subscription.cancel();
+ onError(throwable);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Objects.requireNonNull(throwable);
+ try {
+ subscriber.onError(throwable);
+ } finally {
+ cf.completeExceptionally(throwable);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ try {
+ subscriber.onComplete();
+ } finally {
+ try {
+ cf.complete(finisher.apply(subscriber));
+ } catch (Throwable throwable) {
+ cf.completeExceptionally(throwable);
+ }
+ }
+ }
+
+ @Override
+ public CompletionStage<R> getBody() {
+ return cf;
+ }
+ }
}