src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
changeset 48408 4f830b447edf
parent 48379 5382baab8371
child 56008 bbd688c6fbbb
equal deleted inserted replaced
48407:fcb5b835bf32 48408:4f830b447edf
    45 import java.util.concurrent.BlockingQueue;
    45 import java.util.concurrent.BlockingQueue;
    46 import java.util.concurrent.CompletableFuture;
    46 import java.util.concurrent.CompletableFuture;
    47 import java.util.concurrent.CompletionStage;
    47 import java.util.concurrent.CompletionStage;
    48 import java.util.concurrent.ConcurrentHashMap;
    48 import java.util.concurrent.ConcurrentHashMap;
    49 import java.util.concurrent.Flow;
    49 import java.util.concurrent.Flow;
       
    50 import java.util.concurrent.Flow.Subscriber;
       
    51 import java.util.concurrent.Flow.Subscription;
    50 import java.util.concurrent.atomic.AtomicBoolean;
    52 import java.util.concurrent.atomic.AtomicBoolean;
    51 import java.util.function.Consumer;
    53 import java.util.function.Consumer;
    52 import java.util.function.Function;
    54 import java.util.function.Function;
    53 import jdk.incubator.http.internal.common.MinimalFuture;
    55 import jdk.incubator.http.internal.common.MinimalFuture;
    54 import jdk.incubator.http.internal.common.Utils;
    56 import jdk.incubator.http.internal.common.Utils;
   556         @Override
   558         @Override
   557         public CompletionStage<T> getBody() {
   559         public CompletionStage<T> getBody() {
   558             return cf;
   560             return cf;
   559         }
   561         }
   560     }
   562     }
       
   563 
       
   564     /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
       
   565     static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
       
   566         implements HttpResponse.BodySubscriber<R>
       
   567     {
       
   568         private final CompletableFuture<R> cf = new MinimalFuture<>();
       
   569         private final S subscriber;
       
   570         private final Function<S,R> finisher;
       
   571         private volatile Subscription subscription;
       
   572 
       
   573         SubscriberAdapter(S subscriber, Function<S,R> finisher) {
       
   574             this.subscriber = Objects.requireNonNull(subscriber);
       
   575             this.finisher = Objects.requireNonNull(finisher);
       
   576         }
       
   577 
       
   578         @Override
       
   579         public void onSubscribe(Subscription subscription) {
       
   580             Objects.requireNonNull(subscription);
       
   581             if (this.subscription != null) {
       
   582                 subscription.cancel();
       
   583             } else {
       
   584                 this.subscription = subscription;
       
   585                 subscriber.onSubscribe(subscription);
       
   586             }
       
   587         }
       
   588 
       
   589         @Override
       
   590         public void onNext(List<ByteBuffer> item) {
       
   591             Objects.requireNonNull(item);
       
   592             try {
       
   593                 subscriber.onNext(item);
       
   594             } catch (Throwable throwable) {
       
   595                 subscription.cancel();
       
   596                 onError(throwable);
       
   597             }
       
   598         }
       
   599 
       
   600         @Override
       
   601         public void onError(Throwable throwable) {
       
   602             Objects.requireNonNull(throwable);
       
   603             try {
       
   604                 subscriber.onError(throwable);
       
   605             } finally {
       
   606                 cf.completeExceptionally(throwable);
       
   607             }
       
   608         }
       
   609 
       
   610         @Override
       
   611         public void onComplete() {
       
   612             try {
       
   613                 subscriber.onComplete();
       
   614             } finally {
       
   615                 try {
       
   616                     cf.complete(finisher.apply(subscriber));
       
   617                 } catch (Throwable throwable) {
       
   618                     cf.completeExceptionally(throwable);
       
   619                 }
       
   620             }
       
   621         }
       
   622 
       
   623         @Override
       
   624         public CompletionStage<R> getBody() {
       
   625             return cf;
       
   626         }
       
   627     }
   561 }
   628 }