src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
branchhttp-client-branch
changeset 56097 15dc43936d39
parent 56092 fd85b2bf2b0d
child 56138 4f92b988600e
equal deleted inserted replaced
56094:881f0ecb513f 56097:15dc43936d39
    46 import java.util.Optional;
    46 import java.util.Optional;
    47 import java.util.concurrent.ArrayBlockingQueue;
    47 import java.util.concurrent.ArrayBlockingQueue;
    48 import java.util.concurrent.BlockingQueue;
    48 import java.util.concurrent.BlockingQueue;
    49 import java.util.concurrent.CompletableFuture;
    49 import java.util.concurrent.CompletableFuture;
    50 import java.util.concurrent.CompletionStage;
    50 import java.util.concurrent.CompletionStage;
    51 import java.util.concurrent.ConcurrentHashMap;
       
    52 import java.util.concurrent.Executor;
       
    53 import java.util.concurrent.Flow;
    51 import java.util.concurrent.Flow;
    54 import java.util.concurrent.Flow.Subscriber;
    52 import java.util.concurrent.Flow.Subscriber;
    55 import java.util.concurrent.Flow.Subscription;
    53 import java.util.concurrent.Flow.Subscription;
    56 import java.util.concurrent.atomic.AtomicBoolean;
    54 import java.util.concurrent.atomic.AtomicBoolean;
    57 import java.util.function.Consumer;
    55 import java.util.function.Consumer;
   473     }
   471     }
   474 
   472 
   475     public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
   473     public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
   476         Objects.requireNonNull(charset);
   474         Objects.requireNonNull(charset);
   477         BodySubscriber<InputStream> s = new HttpResponseInputStream();
   475         BodySubscriber<InputStream> s = new HttpResponseInputStream();
   478         return new MappedSubscriber<InputStream,Stream<String>>(s,
   476         return new MappingSubscriber<InputStream,Stream<String>>(s,
   479             (InputStream stream) -> {
   477             (InputStream stream) -> {
   480                 return new BufferedReader(new InputStreamReader(stream, charset))
   478                 return new BufferedReader(new InputStreamReader(stream, charset))
   481                             .lines().onClose(() -> Utils.close(stream));
   479                             .lines().onClose(() -> Utils.close(stream));
   482             });
   480             });
   483     }
   481     }
   600      * delegates all flow operations directly to this object. The
   598      * delegates all flow operations directly to this object. The
   601      * {@link CompletionStage} returned by {@link #getBody()}} takes the output
   599      * {@link CompletionStage} returned by {@link #getBody()}} takes the output
   602      * of the upstream {@code getBody()} and applies the mapper function to
   600      * of the upstream {@code getBody()} and applies the mapper function to
   603      * obtain the new {@code CompletionStage} type.
   601      * obtain the new {@code CompletionStage} type.
   604      *
   602      *
   605      * Uses an Executor that must be set externally.
       
   606      *
       
   607      * @param <T> the upstream body type
   603      * @param <T> the upstream body type
   608      * @param <U> this subscriber's body type
   604      * @param <U> this subscriber's body type
   609      */
   605      */
   610     public static class MappedSubscriber<T,U> implements BodySubscriber<U> {
   606     public static class MappingSubscriber<T,U> implements BodySubscriber<U> {
   611         final BodySubscriber<T> upstream;
   607         private final BodySubscriber<T> upstream;
   612         final Function<T,U> mapper;
   608         private final Function<T,U> mapper;
   613 
   609 
   614         /**
   610         public MappingSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) {
   615          *
   611             this.upstream = Objects.requireNonNull(upstream);
   616          * @param upstream
   612             this.mapper = Objects.requireNonNull(mapper);
   617          * @param mapper
       
   618          */
       
   619         public MappedSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) {
       
   620             this.upstream = upstream;
       
   621             this.mapper = mapper;
       
   622         }
   613         }
   623 
   614 
   624         @Override
   615         @Override
   625         public CompletionStage<U> getBody() {
   616         public CompletionStage<U> getBody() {
   626             return upstream.getBody()
   617             return upstream.getBody().thenApply(mapper);
   627                     .thenApply(mapper);
       
   628         }
   618         }
   629 
   619 
   630         @Override
   620         @Override
   631         public void onSubscribe(Flow.Subscription subscription) {
   621         public void onSubscribe(Flow.Subscription subscription) {
   632             upstream.onSubscribe(subscription);
   622             upstream.onSubscribe(subscription);