src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 05 14:11:48 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Jan 12 15:36:28 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,11 +25,15 @@
package jdk.incubator.http;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.security.AccessControlContext;
@@ -52,6 +56,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Stream;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
@@ -461,6 +466,58 @@
}
+ /**
+ * A {@code Stream<String>} built on top of the Flow API.
+ */
+ static final class HttpLineStream implements HttpResponse.BodySubscriber<Stream<String>> {
+
+ private final HttpResponseInputStream responseInputStream;
+ private final Charset charset;
+ private HttpLineStream(Charset charset) {
+ this.charset = Objects.requireNonNull(charset);
+ responseInputStream = new HttpResponseInputStream();
+ }
+
+ @Override
+ public CompletionStage<Stream<String>> getBody() {
+ return responseInputStream.getBody().thenApply((is) ->
+ new BufferedReader(new InputStreamReader(is, charset))
+ .lines().onClose(this::close));
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ responseInputStream.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ responseInputStream.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ responseInputStream.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ responseInputStream.onComplete();
+ }
+
+ void close() {
+ try {
+ responseInputStream.close();
+ } catch (IOException x) {
+ // ignore
+ }
+ }
+
+ static HttpLineStream create(Charset charset) {
+ return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
+ }
+ }
+
static class MultiSubscriberImpl<V>
implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
{