46 import java.util.Optional; |
46 import java.util.Optional; |
47 import java.util.concurrent.ArrayBlockingQueue; |
47 import java.util.concurrent.ArrayBlockingQueue; |
48 import java.util.concurrent.BlockingQueue; |
48 import java.util.concurrent.BlockingQueue; |
49 import java.util.concurrent.CompletableFuture; |
49 import java.util.concurrent.CompletableFuture; |
50 import java.util.concurrent.CompletionStage; |
50 import java.util.concurrent.CompletionStage; |
51 import java.util.concurrent.ConcurrentHashMap; |
|
52 import java.util.concurrent.Executor; |
|
53 import java.util.concurrent.Flow; |
51 import java.util.concurrent.Flow; |
54 import java.util.concurrent.Flow.Subscriber; |
52 import java.util.concurrent.Flow.Subscriber; |
55 import java.util.concurrent.Flow.Subscription; |
53 import java.util.concurrent.Flow.Subscription; |
56 import java.util.concurrent.atomic.AtomicBoolean; |
54 import java.util.concurrent.atomic.AtomicBoolean; |
57 import java.util.function.Consumer; |
55 import java.util.function.Consumer; |
473 } |
471 } |
474 |
472 |
475 public static BodySubscriber<Stream<String>> createLineStream(Charset charset) { |
473 public static BodySubscriber<Stream<String>> createLineStream(Charset charset) { |
476 Objects.requireNonNull(charset); |
474 Objects.requireNonNull(charset); |
477 BodySubscriber<InputStream> s = new HttpResponseInputStream(); |
475 BodySubscriber<InputStream> s = new HttpResponseInputStream(); |
478 return new MappedSubscriber<InputStream,Stream<String>>(s, |
476 return new MappingSubscriber<InputStream,Stream<String>>(s, |
479 (InputStream stream) -> { |
477 (InputStream stream) -> { |
480 return new BufferedReader(new InputStreamReader(stream, charset)) |
478 return new BufferedReader(new InputStreamReader(stream, charset)) |
481 .lines().onClose(() -> Utils.close(stream)); |
479 .lines().onClose(() -> Utils.close(stream)); |
482 }); |
480 }); |
483 } |
481 } |
600 * delegates all flow operations directly to this object. The |
598 * delegates all flow operations directly to this object. The |
601 * {@link CompletionStage} returned by {@link #getBody()}} takes the output |
599 * {@link CompletionStage} returned by {@link #getBody()}} takes the output |
602 * of the upstream {@code getBody()} and applies the mapper function to |
600 * of the upstream {@code getBody()} and applies the mapper function to |
603 * obtain the new {@code CompletionStage} type. |
601 * obtain the new {@code CompletionStage} type. |
604 * |
602 * |
605 * Uses an Executor that must be set externally. |
|
606 * |
|
607 * @param <T> the upstream body type |
603 * @param <T> the upstream body type |
608 * @param <U> this subscriber's body type |
604 * @param <U> this subscriber's body type |
609 */ |
605 */ |
610 public static class MappedSubscriber<T,U> implements BodySubscriber<U> { |
606 public static class MappingSubscriber<T,U> implements BodySubscriber<U> { |
611 final BodySubscriber<T> upstream; |
607 private final BodySubscriber<T> upstream; |
612 final Function<T,U> mapper; |
608 private final Function<T,U> mapper; |
613 |
609 |
614 /** |
610 public MappingSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) { |
615 * |
611 this.upstream = Objects.requireNonNull(upstream); |
616 * @param upstream |
612 this.mapper = Objects.requireNonNull(mapper); |
617 * @param mapper |
|
618 */ |
|
619 public MappedSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) { |
|
620 this.upstream = upstream; |
|
621 this.mapper = mapper; |
|
622 } |
613 } |
623 |
614 |
624 @Override |
615 @Override |
625 public CompletionStage<U> getBody() { |
616 public CompletionStage<U> getBody() { |
626 return upstream.getBody() |
617 return upstream.getBody().thenApply(mapper); |
627 .thenApply(mapper); |
|
628 } |
618 } |
629 |
619 |
630 @Override |
620 @Override |
631 public void onSubscribe(Flow.Subscription subscription) { |
621 public void onSubscribe(Flow.Subscription subscription) { |
632 upstream.onSubscribe(subscription); |
622 upstream.onSubscribe(subscription); |