45 import java.util.concurrent.BlockingQueue; |
45 import java.util.concurrent.BlockingQueue; |
46 import java.util.concurrent.CompletableFuture; |
46 import java.util.concurrent.CompletableFuture; |
47 import java.util.concurrent.CompletionStage; |
47 import java.util.concurrent.CompletionStage; |
48 import java.util.concurrent.ConcurrentHashMap; |
48 import java.util.concurrent.ConcurrentHashMap; |
49 import java.util.concurrent.Flow; |
49 import java.util.concurrent.Flow; |
|
50 import java.util.concurrent.Flow.Subscriber; |
|
51 import java.util.concurrent.Flow.Subscription; |
50 import java.util.concurrent.atomic.AtomicBoolean; |
52 import java.util.concurrent.atomic.AtomicBoolean; |
51 import java.util.function.Consumer; |
53 import java.util.function.Consumer; |
52 import java.util.function.Function; |
54 import java.util.function.Function; |
53 import jdk.incubator.http.internal.common.MinimalFuture; |
55 import jdk.incubator.http.internal.common.MinimalFuture; |
54 import jdk.incubator.http.internal.common.Utils; |
56 import jdk.incubator.http.internal.common.Utils; |
556 @Override |
558 @Override |
557 public CompletionStage<T> getBody() { |
559 public CompletionStage<T> getBody() { |
558 return cf; |
560 return cf; |
559 } |
561 } |
560 } |
562 } |
|
563 |
|
564 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ |
|
565 static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R> |
|
566 implements HttpResponse.BodySubscriber<R> |
|
567 { |
|
568 private final CompletableFuture<R> cf = new MinimalFuture<>(); |
|
569 private final S subscriber; |
|
570 private final Function<S,R> finisher; |
|
571 private volatile Subscription subscription; |
|
572 |
|
573 SubscriberAdapter(S subscriber, Function<S,R> finisher) { |
|
574 this.subscriber = Objects.requireNonNull(subscriber); |
|
575 this.finisher = Objects.requireNonNull(finisher); |
|
576 } |
|
577 |
|
578 @Override |
|
579 public void onSubscribe(Subscription subscription) { |
|
580 Objects.requireNonNull(subscription); |
|
581 if (this.subscription != null) { |
|
582 subscription.cancel(); |
|
583 } else { |
|
584 this.subscription = subscription; |
|
585 subscriber.onSubscribe(subscription); |
|
586 } |
|
587 } |
|
588 |
|
589 @Override |
|
590 public void onNext(List<ByteBuffer> item) { |
|
591 Objects.requireNonNull(item); |
|
592 try { |
|
593 subscriber.onNext(item); |
|
594 } catch (Throwable throwable) { |
|
595 subscription.cancel(); |
|
596 onError(throwable); |
|
597 } |
|
598 } |
|
599 |
|
600 @Override |
|
601 public void onError(Throwable throwable) { |
|
602 Objects.requireNonNull(throwable); |
|
603 try { |
|
604 subscriber.onError(throwable); |
|
605 } finally { |
|
606 cf.completeExceptionally(throwable); |
|
607 } |
|
608 } |
|
609 |
|
610 @Override |
|
611 public void onComplete() { |
|
612 try { |
|
613 subscriber.onComplete(); |
|
614 } finally { |
|
615 try { |
|
616 cf.complete(finisher.apply(subscriber)); |
|
617 } catch (Throwable throwable) { |
|
618 cf.completeExceptionally(throwable); |
|
619 } |
|
620 } |
|
621 } |
|
622 |
|
623 @Override |
|
624 public CompletionStage<R> getBody() { |
|
625 return cf; |
|
626 } |
|
627 } |
561 } |
628 } |