src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branch http-client-branch
changeset 56009 cf8792f51dee
parent 56008 bbd688c6fbbb
child 56010 782b2f2d1e76
equal deleted inserted replaced
56008:bbd688c6fbbb 56009:cf8792f51dee
     1 /*
     1 /*
     2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
       
    28 import java.io.BufferedReader;
    28 import java.io.IOException;
    29 import java.io.IOException;
    29 import java.io.InputStream;
    30 import java.io.InputStream;
       
    31 import java.io.InputStreamReader;
    30 import java.lang.System.Logger.Level;
    32 import java.lang.System.Logger.Level;
    31 import java.nio.ByteBuffer;
    33 import java.nio.ByteBuffer;
    32 import java.nio.channels.FileChannel;
    34 import java.nio.channels.FileChannel;
       
    35 import java.nio.charset.Charset;
       
    36 import java.nio.charset.StandardCharsets;
    33 import java.nio.file.OpenOption;
    37 import java.nio.file.OpenOption;
    34 import java.nio.file.Path;
    38 import java.nio.file.Path;
    35 import java.security.AccessControlContext;
    39 import java.security.AccessControlContext;
    36 import java.security.AccessController;
    40 import java.security.AccessController;
    37 import java.security.PrivilegedActionException;
    41 import java.security.PrivilegedActionException;
    50 import java.util.concurrent.Flow.Subscriber;
    54 import java.util.concurrent.Flow.Subscriber;
    51 import java.util.concurrent.Flow.Subscription;
    55 import java.util.concurrent.Flow.Subscription;
    52 import java.util.concurrent.atomic.AtomicBoolean;
    56 import java.util.concurrent.atomic.AtomicBoolean;
    53 import java.util.function.Consumer;
    57 import java.util.function.Consumer;
    54 import java.util.function.Function;
    58 import java.util.function.Function;
       
    59 import java.util.stream.Stream;
    55 import jdk.incubator.http.internal.common.MinimalFuture;
    60 import jdk.incubator.http.internal.common.MinimalFuture;
    56 import jdk.incubator.http.internal.common.Utils;
    61 import jdk.incubator.http.internal.common.Utils;
    57 
    62 
    58 class ResponseSubscribers {
    63 class ResponseSubscribers {
    59 
    64 
   459             }
   464             }
   460         }
   465         }
   461 
   466 
   462     }
   467     }
   463 
   468 
       
   469     /**
       
   470      * A {@code Stream<String>} built on top of the Flow API.
       
   471      */
       
   472     static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> {
       
   473 
       
   474         private final HttpResponseInputStream responseInputStream;
       
   475         private final Charset charset;
       
   476         private HttpLineStream(Charset charset) {
       
   477             this.charset = Objects.requireNonNull(charset);
       
   478             responseInputStream = new HttpResponseInputStream();
       
   479         }
       
   480 
       
   481         @Override
       
   482         public CompletionStage<Stream<String>> getBody() {
       
   483             return responseInputStream.getBody().thenApply((is) ->
       
   484                     new BufferedReader(new InputStreamReader(is, charset))
       
   485                             .lines().onClose(this::close));
       
   486         }
       
   487 
       
   488         @Override
       
   489         public void onSubscribe(Subscription subscription) {
       
   490             responseInputStream.onSubscribe(subscription);
       
   491         }
       
   492 
       
   493         @Override
       
   494         public void onNext(List<ByteBuffer> item) {
       
   495             responseInputStream.onNext(item);
       
   496         }
       
   497 
       
   498         @Override
       
   499         public void onError(Throwable throwable) {
       
   500             responseInputStream.onError(throwable);
       
   501         }
       
   502 
       
   503         @Override
       
   504         public void onComplete() {
       
   505             responseInputStream.onComplete();
       
   506         }
       
   507 
       
   508         void close() {
       
   509             try {
       
   510                 responseInputStream.close();
       
   511             } catch (IOException x) {
       
   512                 // ignore
       
   513             }
       
   514         }
       
   515 
       
   516         static HttpLineStream create(Charset charset) {
       
   517             return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
       
   518         }
       
   519     }
       
   520 
   464     static class MultiSubscriberImpl<V>
   521     static class MultiSubscriberImpl<V>
   465         implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
   522         implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
   466     {
   523     {
   467         private final MultiMapResult<V> results;
   524         private final MultiMapResult<V> results;
   468         private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
   525         private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;