src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branchhttp-client-branch
changeset 56009 cf8792f51dee
parent 56008 bbd688c6fbbb
child 56010 782b2f2d1e76
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -25,11 +25,15 @@
 
 package jdk.incubator.http;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.security.AccessControlContext;
@@ -52,6 +56,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 
@@ -461,6 +466,58 @@
 
     }
 
+    /**
+     * A {@code Stream<String>} built on top of the Flow API.
+     */
+    static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> {
+
+        private final HttpResponseInputStream responseInputStream;
+        private final Charset charset;
+        private HttpLineStream(Charset charset) {
+            this.charset = Objects.requireNonNull(charset);
+            responseInputStream = new HttpResponseInputStream();
+        }
+
+        @Override
+        public CompletionStage<Stream<String>> getBody() {
+            return responseInputStream.getBody().thenApply((is) ->
+                    new BufferedReader(new InputStreamReader(is, charset))
+                            .lines().onClose(this::close));
+        }
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            responseInputStream.onSubscribe(subscription);
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            responseInputStream.onNext(item);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            responseInputStream.onError(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            responseInputStream.onComplete();
+        }
+
+        void close() {
+            try {
+                responseInputStream.close();
+            } catch (IOException x) {
+                // ignore
+            }
+        }
+
+        static HttpLineStream create(Charset charset) {
+            return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
+        }
+    }
+
     static class MultiSubscriberImpl<V>
         implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
     {