src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
changeset 48408 4f830b447edf
parent 48379 5382baab8371
child 56008 bbd688c6fbbb
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Thu Dec 21 10:26:03 2017 +0100
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Thu Dec 21 16:58:51 2017 +0000
@@ -47,6 +47,8 @@
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -558,4 +560,69 @@
             return cf;
         }
     }
+
+    /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
+    static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
+        implements HttpResponse.BodySubscriber<R>
+    {
+        private final CompletableFuture<R> cf = new MinimalFuture<>();
+        private final S subscriber;
+        private final Function<S,R> finisher;
+        private volatile Subscription subscription;
+
+        SubscriberAdapter(S subscriber, Function<S,R> finisher) {
+            this.subscriber = Objects.requireNonNull(subscriber);
+            this.finisher = Objects.requireNonNull(finisher);
+        }
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            Objects.requireNonNull(subscription);
+            if (this.subscription != null) {
+                subscription.cancel();
+            } else {
+                this.subscription = subscription;
+                subscriber.onSubscribe(subscription);
+            }
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            Objects.requireNonNull(item);
+            try {
+                subscriber.onNext(item);
+            } catch (Throwable throwable) {
+                subscription.cancel();
+                onError(throwable);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            Objects.requireNonNull(throwable);
+            try {
+                subscriber.onError(throwable);
+            } finally {
+                cf.completeExceptionally(throwable);
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            try {
+                subscriber.onComplete();
+            } finally {
+                try {
+                    cf.complete(finisher.apply(subscriber));
+                } catch (Throwable throwable) {
+                    cf.completeExceptionally(throwable);
+                }
+            }
+        }
+
+        @Override
+        public CompletionStage<R> getBody() {
+            return cf;
+        }
+    }
 }