http-client-branch: move LineSubscriberAdapter to internal http-client-branch
authorchegar
Tue, 06 Feb 2018 11:39:55 +0000
branchhttp-client-branch
changeset 56078 6c11b48a0695
parent 56077 3f6b75adcdc0
child 56079 d23b02f37fce
http-client-branch: move LineSubscriberAdapter to internal
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Tue Feb 06 11:15:40 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Tue Feb 06 11:39:55 2018 +0000
@@ -52,6 +52,7 @@
 import java.util.stream.Stream;
 import javax.net.ssl.SSLParameters;
 import jdk.incubator.http.internal.BufferingSubscriber;
+import jdk.incubator.http.internal.LineSubscriberAdapter;
 import jdk.incubator.http.internal.ResponseSubscribers;
 import static jdk.incubator.http.internal.common.Utils.unchecked;
 import static jdk.incubator.http.internal.common.Utils.charsetFrom;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java	Tue Feb 06 11:15:40 2018 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,467 +0,0 @@
-/*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CoderResult;
-import java.nio.charset.CodingErrorAction;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Flow;
-import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import jdk.incubator.http.internal.common.Demand;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-
-/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
-final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
-        implements HttpResponse.BodySubscriber<R> {
-    private final CompletableFuture<R> cf = new MinimalFuture<>();
-    private final S subscriber;
-    private final Function<S, R> finisher;
-    private final Charset charset;
-    private final String eol;
-    private volatile LineSubscription downstream;
-
-    private LineSubscriberAdapter(S subscriber,
-                                  Function<S, R> finisher,
-                                  Charset charset,
-                                  String eol) {
-        if (eol != null && eol.isEmpty())
-            throw new IllegalArgumentException("empty line separator");
-        this.subscriber = Objects.requireNonNull(subscriber);
-        this.finisher = Objects.requireNonNull(finisher);
-        this.charset = Objects.requireNonNull(charset);
-        this.eol = eol;
-    }
-
-    @Override
-    public void onSubscribe(Subscription subscription) {
-        downstream = LineSubscription.create(subscription,
-                                             charset,
-                                             eol,
-                                             subscriber,
-                                             cf);
-        subscriber.onSubscribe(downstream);
-    }
-
-    @Override
-    public void onNext(List<ByteBuffer> item) {
-        try {
-            downstream.submit(item);
-        } catch (Throwable t) {
-            onError(t);
-        }
-    }
-
-    @Override
-    public void onError(Throwable throwable) {
-        try {
-            downstream.signalError(throwable);
-        } finally {
-            cf.completeExceptionally(throwable);
-        }
-    }
-
-    @Override
-    public void onComplete() {
-        try {
-            downstream.signalComplete();
-        } finally {
-            cf.complete(finisher.apply(subscriber));
-        }
-    }
-
-    @Override
-    public CompletionStage<R> getBody() {
-        return cf;
-    }
-
-    static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
-    create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
-    {
-        if (eol != null && eol.isEmpty())
-            throw new IllegalArgumentException("empty line separator");
-        return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
-                Objects.requireNonNull(finisher),
-                Objects.requireNonNull(charset),
-                eol);
-    }
-
-    static final class LineSubscription implements Flow.Subscription {
-        final Flow.Subscription upstreamSubscription;
-        final CharsetDecoder decoder;
-        final String newline;
-        final Demand downstreamDemand;
-        final ConcurrentLinkedDeque<ByteBuffer> queue;
-        final SequentialScheduler scheduler;
-        final Flow.Subscriber<? super String> upstream;
-        final CompletableFuture<?> cf;
-        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-        private final AtomicLong demanded = new AtomicLong();
-        private volatile boolean completed;
-        private volatile boolean cancelled;
-
-        private final char[] chars = new char[1024];
-        private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
-        private final CharBuffer buffer = CharBuffer.wrap(chars);
-        private final StringBuilder builder = new StringBuilder();
-        private int lineCount;
-        private String nextLine;
-
-        private LineSubscription(Flow.Subscription s,
-                                 CharsetDecoder dec,
-                                 String separator,
-                                 Flow.Subscriber<? super String> subscriber,
-                                 CompletableFuture<?> completion) {
-            downstreamDemand = new Demand();
-            queue = new ConcurrentLinkedDeque<>();
-            upstreamSubscription = Objects.requireNonNull(s);
-            decoder = Objects.requireNonNull(dec);
-            newline = separator;
-            upstream = Objects.requireNonNull(subscriber);
-            cf = Objects.requireNonNull(completion);
-            scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
-        }
-
-        @Override
-        public void request(long n) {
-            if (cancelled) return;
-            if (downstreamDemand.increase(n)) {
-                scheduler.runOrSchedule();
-            }
-        }
-
-        @Override
-        public void cancel() {
-            cancelled = true;
-            upstreamSubscription.cancel();
-        }
-
-        public void submit(List<ByteBuffer> list) {
-            queue.addAll(list);
-            demanded.decrementAndGet();
-            scheduler.runOrSchedule();
-        }
-
-        public void signalComplete() {
-            completed = true;
-            scheduler.runOrSchedule();
-        }
-
-        public void signalError(Throwable error) {
-            if (errorRef.compareAndSet(null,
-                    Objects.requireNonNull(error))) {
-                scheduler.runOrSchedule();
-            }
-        }
-
-        // This method looks at whether some bytes where left over (in leftover)
-        // from decoding the previous buffer when the previous buffer was in
-        // underflow. If so, it takes bytes one by one from the new buffer 'in'
-        // and combines them with the leftover bytes until 'in' is exhausted or a
-        // character was produced in 'out', resolving the previous underflow.
-        // Returns true if the buffer is still in underflow, false otherwise.
-        // However, in both situation some chars might have been produced in 'out'.
-        private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
-                throws CharacterCodingException {
-            int limit = leftover.position();
-            if (limit == 0) {
-                // no leftover
-                return false;
-            } else {
-                CoderResult res = null;
-                while (in.hasRemaining()) {
-                    leftover.position(limit);
-                    leftover.limit(++limit);
-                    leftover.put(in.get());
-                    leftover.position(0);
-                    res = decoder.decode(leftover, out,
-                            endOfInput && !in.hasRemaining());
-                    int remaining = leftover.remaining();
-                    if (remaining > 0) {
-                        assert leftover.position() == 0;
-                        leftover.position(remaining);
-                    } else {
-                        leftover.position(0);
-                    }
-                    leftover.limit(leftover.capacity());
-                    if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
-                        continue;
-                    }
-                    if (res.isError()) {
-                        res.throwException();
-                    }
-                    assert !res.isOverflow();
-                    return false;
-                }
-                return !endOfInput;
-            }
-        }
-
-        // extract characters from start to end and remove them from
-        // the StringBuilder
-        private static String take(StringBuilder b, int start, int end) {
-            assert start == 0;
-            String line;
-            if (end == start) return "";
-            line = b.substring(start, end);
-            b.delete(start, end);
-            return line;
-        }
-
-        // finds end of line, returns -1 if not found, or the position after
-        // the line delimiter if found, removing the delimiter in the process.
-        private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
-            int len = b.length();
-            if (eol != null) { // delimiter explicitly specified
-                int i = b.indexOf(eol);
-                if (i >= 0) {
-                    // remove the delimiter and returns the position
-                    // of the char after it.
-                    b.delete(i, i + eol.length());
-                    return i;
-                }
-            } else { // no delimiter specified, behaves as BufferedReader::readLine
-                boolean crfound = false;
-                for (int i = 0; i < len; i++) {
-                    char c = b.charAt(i);
-                    if (c == '\n') {
-                        // '\n' or '\r\n' found.
-                        // remove the delimiter and returns the position
-                        // of the char after it.
-                        b.delete(crfound ? i - 1 : i, i + 1);
-                        return crfound ? i - 1 : i;
-                    } else if (crfound) {
-                        // previous char was '\r', c != '\n'
-                        assert i != 0;
-                        // remove the delimiter and returns the position
-                        // of the char after it.
-                        b.delete(i - 1, i);
-                        return i - 1;
-                    }
-                    crfound = c == '\r';
-                }
-                if (crfound && endOfInput) {
-                    // remove the delimiter and returns the position
-                    // of the char after it.
-                    b.delete(len - 1, len);
-                    return len - 1;
-                }
-            }
-            return endOfInput && len > 0 ? len : -1;
-        }
-
-        // Looks at whether the StringBuilder contains a line.
-        // Returns null if more character are needed.
-        private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
-            int next = endOfLine(b, eol, endOfInput);
-            return (next > -1) ? take(b, 0, next) : null;
-        }
-
-        // Attempts to read the next line. Returns the next line if
-        // the delimiter was found, null otherwise. The delimiters are
-        // consumed.
-        private String nextLine()
-                throws CharacterCodingException {
-            assert nextLine == null;
-            LINES:
-            while (nextLine == null) {
-                boolean endOfInput = completed && queue.isEmpty();
-                nextLine = nextLine(builder, newline,
-                        endOfInput && leftover.position() == 0);
-                if (nextLine != null) return nextLine;
-                ByteBuffer b;
-                BUFFERS:
-                while ((b = queue.peek()) != null) {
-                    if (!b.hasRemaining()) {
-                        queue.poll();
-                        continue BUFFERS;
-                    }
-                    BYTES:
-                    while (b.hasRemaining()) {
-                        buffer.position(0);
-                        buffer.limit(buffer.capacity());
-                        boolean endofInput = completed && queue.size() <= 1;
-                        if (isUnderFlow(b, buffer, endofInput)) {
-                            assert !b.hasRemaining();
-                            if (buffer.position() > 0) {
-                                buffer.flip();
-                                builder.append(buffer);
-                            }
-                            continue BUFFERS;
-                        }
-                        CoderResult res = decoder.decode(b, buffer, endofInput);
-                        if (res.isError()) res.throwException();
-                        if (buffer.position() > 0) {
-                            buffer.flip();
-                            builder.append(buffer);
-                            continue LINES;
-                        }
-                        if (res.isUnderflow() && b.hasRemaining()) {
-                            //System.out.println("underflow: adding " + b.remaining() + " bytes");
-                            leftover.put(b);
-                            assert !b.hasRemaining();
-                            continue BUFFERS;
-                        }
-                    }
-                }
-
-                assert queue.isEmpty();
-                if (endOfInput) {
-                    // Time to cleanup: there may be some undecoded leftover bytes
-                    // We need to flush them out.
-                    // The decoder has been configured to replace malformed/unmappable
-                    // chars with some replacement, in order to behave like
-                    // InputStreamReader.
-                    leftover.flip();
-                    buffer.position(0);
-                    buffer.limit(buffer.capacity());
-
-                    // decode() must be called just before flush, even if there
-                    // is nothing to decode. We must do this even if leftover
-                    // has no remaining bytes.
-                    CoderResult res = decoder.decode(leftover, buffer, endOfInput);
-                    if (buffer.position() > 0) {
-                        buffer.flip();
-                        builder.append(buffer);
-                    }
-                    if (res.isError()) res.throwException();
-
-                    // Now call decoder.flush()
-                    buffer.position(0);
-                    buffer.limit(buffer.capacity());
-                    res = decoder.flush(buffer);
-                    if (buffer.position() > 0) {
-                        buffer.flip();
-                        builder.append(buffer);
-                    }
-                    if (res.isError()) res.throwException();
-
-                    // It's possible that we reach here twice - just for the
-                    // purpose of checking that no bytes were left over, so
-                    // we reset leftover/decoder to make the function reentrant.
-                    leftover.position(0);
-                    leftover.limit(leftover.capacity());
-                    decoder.reset();
-
-                    // if some chars were produced then this call will
-                    // return them.
-                    return nextLine = nextLine(builder, newline, endOfInput);
-                }
-                return null;
-            }
-            return null;
-        }
-
-        // The main sequential scheduler loop.
-        private void loop() {
-            try {
-                while (!cancelled) {
-                    Throwable error = errorRef.get();
-                    if (error != null) {
-                        cancelled = true;
-                        scheduler.stop();
-                        upstream.onError(error);
-                        cf.completeExceptionally(error);
-                        return;
-                    }
-                    if (nextLine == null) nextLine = nextLine();
-                    if (nextLine == null) {
-                        if (completed) {
-                            scheduler.stop();
-                            if (leftover.position() != 0) {
-                                // Underflow: not all bytes could be
-                                // decoded, but no more bytes will be coming.
-                                // This should not happen as we should already
-                                // have got a MalformedInputException, or
-                                // replaced the unmappable chars.
-                                errorRef.compareAndSet(null,
-                                        new IllegalStateException(
-                                                "premature end of input ("
-                                                        + leftover.position()
-                                                        + " undecoded bytes)"));
-                                continue;
-                            } else {
-                                upstream.onComplete();
-                            }
-                            return;
-                        } else if (demanded.get() == 0
-                                && !downstreamDemand.isFulfilled()) {
-                            long incr = Math.max(1, downstreamDemand.get());
-                            demanded.addAndGet(incr);
-                            upstreamSubscription.request(incr);
-                            continue;
-                        } else return;
-                    }
-                    assert nextLine != null;
-                    assert newline != null && !nextLine.endsWith(newline)
-                            || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
-                    if (downstreamDemand.tryDecrement()) {
-                        String forward = nextLine;
-                        nextLine = null;
-                        upstream.onNext(forward);
-                    } else return; // no demand: come back later
-                }
-            } catch (Throwable t) {
-                try {
-                    upstreamSubscription.cancel();
-                } finally {
-                    signalError(t);
-                }
-            }
-        }
-
-        static LineSubscription create(Flow.Subscription s,
-                                       Charset charset,
-                                       String lineSeparator,
-                                       Flow.Subscriber<? super String> upstream,
-                                       CompletableFuture<?> cf) {
-            return new LineSubscription(Objects.requireNonNull(s),
-                    Objects.requireNonNull(charset).newDecoder()
-                            // use the same decoder configuration than
-                            // java.io.InputStreamReader
-                            .onMalformedInput(CodingErrorAction.REPLACE)
-                            .onUnmappableCharacter(CodingErrorAction.REPLACE),
-                    lineSeparator,
-                    Objects.requireNonNull(upstream),
-                    Objects.requireNonNull(cf));
-        }
-    }
-}
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java	Tue Feb 06 11:39:55 2018 +0000
@@ -0,0 +1,467 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
+public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
+        implements BodySubscriber<R> {
+    private final CompletableFuture<R> cf = new MinimalFuture<>();
+    private final S subscriber;
+    private final Function<S, R> finisher;
+    private final Charset charset;
+    private final String eol;
+    private volatile LineSubscription downstream;
+
+    private LineSubscriberAdapter(S subscriber,
+                                  Function<S, R> finisher,
+                                  Charset charset,
+                                  String eol) {
+        if (eol != null && eol.isEmpty())
+            throw new IllegalArgumentException("empty line separator");
+        this.subscriber = Objects.requireNonNull(subscriber);
+        this.finisher = Objects.requireNonNull(finisher);
+        this.charset = Objects.requireNonNull(charset);
+        this.eol = eol;
+    }
+
+    @Override
+    public void onSubscribe(Subscription subscription) {
+        downstream = LineSubscription.create(subscription,
+                                             charset,
+                                             eol,
+                                             subscriber,
+                                             cf);
+        subscriber.onSubscribe(downstream);
+    }
+
+    @Override
+    public void onNext(List<ByteBuffer> item) {
+        try {
+            downstream.submit(item);
+        } catch (Throwable t) {
+            onError(t);
+        }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        try {
+            downstream.signalError(throwable);
+        } finally {
+            cf.completeExceptionally(throwable);
+        }
+    }
+
+    @Override
+    public void onComplete() {
+        try {
+            downstream.signalComplete();
+        } finally {
+            cf.complete(finisher.apply(subscriber));
+        }
+    }
+
+    @Override
+    public CompletionStage<R> getBody() {
+        return cf;
+    }
+
+    public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
+    create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
+    {
+        if (eol != null && eol.isEmpty())
+            throw new IllegalArgumentException("empty line separator");
+        return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
+                Objects.requireNonNull(finisher),
+                Objects.requireNonNull(charset),
+                eol);
+    }
+
+    static final class LineSubscription implements Flow.Subscription {
+        final Flow.Subscription upstreamSubscription;
+        final CharsetDecoder decoder;
+        final String newline;
+        final Demand downstreamDemand;
+        final ConcurrentLinkedDeque<ByteBuffer> queue;
+        final SequentialScheduler scheduler;
+        final Flow.Subscriber<? super String> upstream;
+        final CompletableFuture<?> cf;
+        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        private final AtomicLong demanded = new AtomicLong();
+        private volatile boolean completed;
+        private volatile boolean cancelled;
+
+        private final char[] chars = new char[1024];
+        private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
+        private final CharBuffer buffer = CharBuffer.wrap(chars);
+        private final StringBuilder builder = new StringBuilder();
+        private int lineCount;
+        private String nextLine;
+
+        private LineSubscription(Flow.Subscription s,
+                                 CharsetDecoder dec,
+                                 String separator,
+                                 Flow.Subscriber<? super String> subscriber,
+                                 CompletableFuture<?> completion) {
+            downstreamDemand = new Demand();
+            queue = new ConcurrentLinkedDeque<>();
+            upstreamSubscription = Objects.requireNonNull(s);
+            decoder = Objects.requireNonNull(dec);
+            newline = separator;
+            upstream = Objects.requireNonNull(subscriber);
+            cf = Objects.requireNonNull(completion);
+            scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
+        }
+
+        @Override
+        public void request(long n) {
+            if (cancelled) return;
+            if (downstreamDemand.increase(n)) {
+                scheduler.runOrSchedule();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            cancelled = true;
+            upstreamSubscription.cancel();
+        }
+
+        public void submit(List<ByteBuffer> list) {
+            queue.addAll(list);
+            demanded.decrementAndGet();
+            scheduler.runOrSchedule();
+        }
+
+        public void signalComplete() {
+            completed = true;
+            scheduler.runOrSchedule();
+        }
+
+        public void signalError(Throwable error) {
+            if (errorRef.compareAndSet(null,
+                    Objects.requireNonNull(error))) {
+                scheduler.runOrSchedule();
+            }
+        }
+
+        // This method looks at whether some bytes where left over (in leftover)
+        // from decoding the previous buffer when the previous buffer was in
+        // underflow. If so, it takes bytes one by one from the new buffer 'in'
+        // and combines them with the leftover bytes until 'in' is exhausted or a
+        // character was produced in 'out', resolving the previous underflow.
+        // Returns true if the buffer is still in underflow, false otherwise.
+        // However, in both situation some chars might have been produced in 'out'.
+        private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
+                throws CharacterCodingException {
+            int limit = leftover.position();
+            if (limit == 0) {
+                // no leftover
+                return false;
+            } else {
+                CoderResult res = null;
+                while (in.hasRemaining()) {
+                    leftover.position(limit);
+                    leftover.limit(++limit);
+                    leftover.put(in.get());
+                    leftover.position(0);
+                    res = decoder.decode(leftover, out,
+                            endOfInput && !in.hasRemaining());
+                    int remaining = leftover.remaining();
+                    if (remaining > 0) {
+                        assert leftover.position() == 0;
+                        leftover.position(remaining);
+                    } else {
+                        leftover.position(0);
+                    }
+                    leftover.limit(leftover.capacity());
+                    if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
+                        continue;
+                    }
+                    if (res.isError()) {
+                        res.throwException();
+                    }
+                    assert !res.isOverflow();
+                    return false;
+                }
+                return !endOfInput;
+            }
+        }
+
+        // extract characters from start to end and remove them from
+        // the StringBuilder
+        private static String take(StringBuilder b, int start, int end) {
+            assert start == 0;
+            String line;
+            if (end == start) return "";
+            line = b.substring(start, end);
+            b.delete(start, end);
+            return line;
+        }
+
+        // finds end of line, returns -1 if not found, or the position after
+        // the line delimiter if found, removing the delimiter in the process.
+        private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
+            int len = b.length();
+            if (eol != null) { // delimiter explicitly specified
+                int i = b.indexOf(eol);
+                if (i >= 0) {
+                    // remove the delimiter and returns the position
+                    // of the char after it.
+                    b.delete(i, i + eol.length());
+                    return i;
+                }
+            } else { // no delimiter specified, behaves as BufferedReader::readLine
+                boolean crfound = false;
+                for (int i = 0; i < len; i++) {
+                    char c = b.charAt(i);
+                    if (c == '\n') {
+                        // '\n' or '\r\n' found.
+                        // remove the delimiter and returns the position
+                        // of the char after it.
+                        b.delete(crfound ? i - 1 : i, i + 1);
+                        return crfound ? i - 1 : i;
+                    } else if (crfound) {
+                        // previous char was '\r', c != '\n'
+                        assert i != 0;
+                        // remove the delimiter and returns the position
+                        // of the char after it.
+                        b.delete(i - 1, i);
+                        return i - 1;
+                    }
+                    crfound = c == '\r';
+                }
+                if (crfound && endOfInput) {
+                    // remove the delimiter and returns the position
+                    // of the char after it.
+                    b.delete(len - 1, len);
+                    return len - 1;
+                }
+            }
+            return endOfInput && len > 0 ? len : -1;
+        }
+
+        // Looks at whether the StringBuilder contains a line.
+        // Returns null if more character are needed.
+        private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
+            int next = endOfLine(b, eol, endOfInput);
+            return (next > -1) ? take(b, 0, next) : null;
+        }
+
+        // Attempts to read the next line. Returns the next line if
+        // the delimiter was found, null otherwise. The delimiters are
+        // consumed.
+        private String nextLine()
+                throws CharacterCodingException {
+            assert nextLine == null;
+            LINES:
+            while (nextLine == null) {
+                boolean endOfInput = completed && queue.isEmpty();
+                nextLine = nextLine(builder, newline,
+                        endOfInput && leftover.position() == 0);
+                if (nextLine != null) return nextLine;
+                ByteBuffer b;
+                BUFFERS:
+                while ((b = queue.peek()) != null) {
+                    if (!b.hasRemaining()) {
+                        queue.poll();
+                        continue BUFFERS;
+                    }
+                    BYTES:
+                    while (b.hasRemaining()) {
+                        buffer.position(0);
+                        buffer.limit(buffer.capacity());
+                        boolean endofInput = completed && queue.size() <= 1;
+                        if (isUnderFlow(b, buffer, endofInput)) {
+                            assert !b.hasRemaining();
+                            if (buffer.position() > 0) {
+                                buffer.flip();
+                                builder.append(buffer);
+                            }
+                            continue BUFFERS;
+                        }
+                        CoderResult res = decoder.decode(b, buffer, endofInput);
+                        if (res.isError()) res.throwException();
+                        if (buffer.position() > 0) {
+                            buffer.flip();
+                            builder.append(buffer);
+                            continue LINES;
+                        }
+                        if (res.isUnderflow() && b.hasRemaining()) {
+                            //System.out.println("underflow: adding " + b.remaining() + " bytes");
+                            leftover.put(b);
+                            assert !b.hasRemaining();
+                            continue BUFFERS;
+                        }
+                    }
+                }
+
+                assert queue.isEmpty();
+                if (endOfInput) {
+                    // Time to cleanup: there may be some undecoded leftover bytes
+                    // We need to flush them out.
+                    // The decoder has been configured to replace malformed/unmappable
+                    // chars with some replacement, in order to behave like
+                    // InputStreamReader.
+                    leftover.flip();
+                    buffer.position(0);
+                    buffer.limit(buffer.capacity());
+
+                    // decode() must be called just before flush, even if there
+                    // is nothing to decode. We must do this even if leftover
+                    // has no remaining bytes.
+                    CoderResult res = decoder.decode(leftover, buffer, endOfInput);
+                    if (buffer.position() > 0) {
+                        buffer.flip();
+                        builder.append(buffer);
+                    }
+                    if (res.isError()) res.throwException();
+
+                    // Now call decoder.flush()
+                    buffer.position(0);
+                    buffer.limit(buffer.capacity());
+                    res = decoder.flush(buffer);
+                    if (buffer.position() > 0) {
+                        buffer.flip();
+                        builder.append(buffer);
+                    }
+                    if (res.isError()) res.throwException();
+
+                    // It's possible that we reach here twice - just for the
+                    // purpose of checking that no bytes were left over, so
+                    // we reset leftover/decoder to make the function reentrant.
+                    leftover.position(0);
+                    leftover.limit(leftover.capacity());
+                    decoder.reset();
+
+                    // if some chars were produced then this call will
+                    // return them.
+                    return nextLine = nextLine(builder, newline, endOfInput);
+                }
+                return null;
+            }
+            return null;
+        }
+
+        // The main sequential scheduler loop.
+        private void loop() {
+            try {
+                while (!cancelled) {
+                    Throwable error = errorRef.get();
+                    if (error != null) {
+                        cancelled = true;
+                        scheduler.stop();
+                        upstream.onError(error);
+                        cf.completeExceptionally(error);
+                        return;
+                    }
+                    if (nextLine == null) nextLine = nextLine();
+                    if (nextLine == null) {
+                        if (completed) {
+                            scheduler.stop();
+                            if (leftover.position() != 0) {
+                                // Underflow: not all bytes could be
+                                // decoded, but no more bytes will be coming.
+                                // This should not happen as we should already
+                                // have got a MalformedInputException, or
+                                // replaced the unmappable chars.
+                                errorRef.compareAndSet(null,
+                                        new IllegalStateException(
+                                                "premature end of input ("
+                                                        + leftover.position()
+                                                        + " undecoded bytes)"));
+                                continue;
+                            } else {
+                                upstream.onComplete();
+                            }
+                            return;
+                        } else if (demanded.get() == 0
+                                && !downstreamDemand.isFulfilled()) {
+                            long incr = Math.max(1, downstreamDemand.get());
+                            demanded.addAndGet(incr);
+                            upstreamSubscription.request(incr);
+                            continue;
+                        } else return;
+                    }
+                    assert nextLine != null;
+                    assert newline != null && !nextLine.endsWith(newline)
+                            || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
+                    if (downstreamDemand.tryDecrement()) {
+                        String forward = nextLine;
+                        nextLine = null;
+                        upstream.onNext(forward);
+                    } else return; // no demand: come back later
+                }
+            } catch (Throwable t) {
+                try {
+                    upstreamSubscription.cancel();
+                } finally {
+                    signalError(t);
+                }
+            }
+        }
+
+        static LineSubscription create(Flow.Subscription s,
+                                       Charset charset,
+                                       String lineSeparator,
+                                       Flow.Subscriber<? super String> upstream,
+                                       CompletableFuture<?> cf) {
+            return new LineSubscription(Objects.requireNonNull(s),
+                    Objects.requireNonNull(charset).newDecoder()
+                            // use the same decoder configuration than
+                            // java.io.InputStreamReader
+                            .onMalformedInput(CodingErrorAction.REPLACE)
+                            .onUnmappableCharacter(CodingErrorAction.REPLACE),
+                    lineSeparator,
+                    Objects.requireNonNull(upstream),
+                    Objects.requireNonNull(cf));
+        }
+    }
+}
+