# HG changeset patch # User chegar # Date 1517917195 0 # Node ID 6c11b48a0695ee275b15c6baa45a1db16baa431c # Parent 3f6b75adcdc00f34c6a8944185e5134fb263c0b3 http-client-branch: move LineSubscriberAdapter to internal diff -r 3f6b75adcdc0 -r 6c11b48a0695 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Feb 06 11:15:40 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Feb 06 11:39:55 2018 +0000 @@ -52,6 +52,7 @@ import java.util.stream.Stream; import javax.net.ssl.SSLParameters; import jdk.incubator.http.internal.BufferingSubscriber; +import jdk.incubator.http.internal.LineSubscriberAdapter; import jdk.incubator.http.internal.ResponseSubscribers; import static jdk.incubator.http.internal.common.Utils.unchecked; import static jdk.incubator.http.internal.common.Utils.charsetFrom; diff -r 3f6b75adcdc0 -r 6c11b48a0695 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java Tue Feb 06 11:15:40 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,467 +0,0 @@ -/* - * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CoderResult; -import java.nio.charset.CodingErrorAction; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Flow; -import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import jdk.incubator.http.internal.common.Demand; -import jdk.incubator.http.internal.common.MinimalFuture; -import jdk.incubator.http.internal.common.SequentialScheduler; - -/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ -final class LineSubscriberAdapter,R> - implements HttpResponse.BodySubscriber { - private final CompletableFuture cf = new MinimalFuture<>(); - private final S subscriber; - private final Function finisher; - private final Charset charset; - private final String eol; - private volatile LineSubscription downstream; - - private LineSubscriberAdapter(S subscriber, - Function finisher, - Charset charset, - String eol) { - if (eol != null && eol.isEmpty()) - throw new IllegalArgumentException("empty line separator"); - this.subscriber = Objects.requireNonNull(subscriber); - this.finisher = Objects.requireNonNull(finisher); - this.charset = Objects.requireNonNull(charset); - this.eol = eol; - } - - @Override - public void onSubscribe(Subscription subscription) { - downstream = LineSubscription.create(subscription, - charset, - eol, - subscriber, - cf); - subscriber.onSubscribe(downstream); - } - - @Override - public void onNext(List item) { - try { - downstream.submit(item); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable throwable) { - try { - downstream.signalError(throwable); - } finally { - cf.completeExceptionally(throwable); - } - } - - @Override - public void onComplete() { - try { - downstream.signalComplete(); - } finally { - cf.complete(finisher.apply(subscriber)); - } - } - - @Override - public CompletionStage getBody() { - return cf; - } - - static , R> LineSubscriberAdapter - create(S subscriber, Function finisher, Charset charset, String eol) - { - if (eol != null && eol.isEmpty()) - throw new IllegalArgumentException("empty line separator"); - return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), - Objects.requireNonNull(finisher), - Objects.requireNonNull(charset), - eol); - } - - static final class LineSubscription implements Flow.Subscription { - final Flow.Subscription upstreamSubscription; - final CharsetDecoder decoder; - final String newline; - final Demand downstreamDemand; - final ConcurrentLinkedDeque queue; - final SequentialScheduler scheduler; - final Flow.Subscriber upstream; - final CompletableFuture cf; - private final AtomicReference errorRef = new AtomicReference<>(); - private final AtomicLong demanded = new AtomicLong(); - private volatile boolean completed; - private volatile boolean cancelled; - - private final char[] chars = new char[1024]; - private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); - private final CharBuffer buffer = CharBuffer.wrap(chars); - private final StringBuilder builder = new StringBuilder(); - private int lineCount; - private String nextLine; - - private LineSubscription(Flow.Subscription s, - CharsetDecoder dec, - String separator, - Flow.Subscriber subscriber, - CompletableFuture completion) { - downstreamDemand = new Demand(); - queue = new ConcurrentLinkedDeque<>(); - upstreamSubscription = Objects.requireNonNull(s); - decoder = Objects.requireNonNull(dec); - newline = separator; - upstream = Objects.requireNonNull(subscriber); - cf = Objects.requireNonNull(completion); - scheduler = SequentialScheduler.synchronizedScheduler(this::loop); - } - - @Override - public void request(long n) { - if (cancelled) return; - if (downstreamDemand.increase(n)) { - scheduler.runOrSchedule(); - } - } - - @Override - public void cancel() { - cancelled = true; - upstreamSubscription.cancel(); - } - - public void submit(List list) { - queue.addAll(list); - demanded.decrementAndGet(); - scheduler.runOrSchedule(); - } - - public void signalComplete() { - completed = true; - scheduler.runOrSchedule(); - } - - public void signalError(Throwable error) { - if (errorRef.compareAndSet(null, - Objects.requireNonNull(error))) { - scheduler.runOrSchedule(); - } - } - - // This method looks at whether some bytes where left over (in leftover) - // from decoding the previous buffer when the previous buffer was in - // underflow. If so, it takes bytes one by one from the new buffer 'in' - // and combines them with the leftover bytes until 'in' is exhausted or a - // character was produced in 'out', resolving the previous underflow. - // Returns true if the buffer is still in underflow, false otherwise. - // However, in both situation some chars might have been produced in 'out'. - private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) - throws CharacterCodingException { - int limit = leftover.position(); - if (limit == 0) { - // no leftover - return false; - } else { - CoderResult res = null; - while (in.hasRemaining()) { - leftover.position(limit); - leftover.limit(++limit); - leftover.put(in.get()); - leftover.position(0); - res = decoder.decode(leftover, out, - endOfInput && !in.hasRemaining()); - int remaining = leftover.remaining(); - if (remaining > 0) { - assert leftover.position() == 0; - leftover.position(remaining); - } else { - leftover.position(0); - } - leftover.limit(leftover.capacity()); - if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { - continue; - } - if (res.isError()) { - res.throwException(); - } - assert !res.isOverflow(); - return false; - } - return !endOfInput; - } - } - - // extract characters from start to end and remove them from - // the StringBuilder - private static String take(StringBuilder b, int start, int end) { - assert start == 0; - String line; - if (end == start) return ""; - line = b.substring(start, end); - b.delete(start, end); - return line; - } - - // finds end of line, returns -1 if not found, or the position after - // the line delimiter if found, removing the delimiter in the process. - private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { - int len = b.length(); - if (eol != null) { // delimiter explicitly specified - int i = b.indexOf(eol); - if (i >= 0) { - // remove the delimiter and returns the position - // of the char after it. - b.delete(i, i + eol.length()); - return i; - } - } else { // no delimiter specified, behaves as BufferedReader::readLine - boolean crfound = false; - for (int i = 0; i < len; i++) { - char c = b.charAt(i); - if (c == '\n') { - // '\n' or '\r\n' found. - // remove the delimiter and returns the position - // of the char after it. - b.delete(crfound ? i - 1 : i, i + 1); - return crfound ? i - 1 : i; - } else if (crfound) { - // previous char was '\r', c != '\n' - assert i != 0; - // remove the delimiter and returns the position - // of the char after it. - b.delete(i - 1, i); - return i - 1; - } - crfound = c == '\r'; - } - if (crfound && endOfInput) { - // remove the delimiter and returns the position - // of the char after it. - b.delete(len - 1, len); - return len - 1; - } - } - return endOfInput && len > 0 ? len : -1; - } - - // Looks at whether the StringBuilder contains a line. - // Returns null if more character are needed. - private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { - int next = endOfLine(b, eol, endOfInput); - return (next > -1) ? take(b, 0, next) : null; - } - - // Attempts to read the next line. Returns the next line if - // the delimiter was found, null otherwise. The delimiters are - // consumed. - private String nextLine() - throws CharacterCodingException { - assert nextLine == null; - LINES: - while (nextLine == null) { - boolean endOfInput = completed && queue.isEmpty(); - nextLine = nextLine(builder, newline, - endOfInput && leftover.position() == 0); - if (nextLine != null) return nextLine; - ByteBuffer b; - BUFFERS: - while ((b = queue.peek()) != null) { - if (!b.hasRemaining()) { - queue.poll(); - continue BUFFERS; - } - BYTES: - while (b.hasRemaining()) { - buffer.position(0); - buffer.limit(buffer.capacity()); - boolean endofInput = completed && queue.size() <= 1; - if (isUnderFlow(b, buffer, endofInput)) { - assert !b.hasRemaining(); - if (buffer.position() > 0) { - buffer.flip(); - builder.append(buffer); - } - continue BUFFERS; - } - CoderResult res = decoder.decode(b, buffer, endofInput); - if (res.isError()) res.throwException(); - if (buffer.position() > 0) { - buffer.flip(); - builder.append(buffer); - continue LINES; - } - if (res.isUnderflow() && b.hasRemaining()) { - //System.out.println("underflow: adding " + b.remaining() + " bytes"); - leftover.put(b); - assert !b.hasRemaining(); - continue BUFFERS; - } - } - } - - assert queue.isEmpty(); - if (endOfInput) { - // Time to cleanup: there may be some undecoded leftover bytes - // We need to flush them out. - // The decoder has been configured to replace malformed/unmappable - // chars with some replacement, in order to behave like - // InputStreamReader. - leftover.flip(); - buffer.position(0); - buffer.limit(buffer.capacity()); - - // decode() must be called just before flush, even if there - // is nothing to decode. We must do this even if leftover - // has no remaining bytes. - CoderResult res = decoder.decode(leftover, buffer, endOfInput); - if (buffer.position() > 0) { - buffer.flip(); - builder.append(buffer); - } - if (res.isError()) res.throwException(); - - // Now call decoder.flush() - buffer.position(0); - buffer.limit(buffer.capacity()); - res = decoder.flush(buffer); - if (buffer.position() > 0) { - buffer.flip(); - builder.append(buffer); - } - if (res.isError()) res.throwException(); - - // It's possible that we reach here twice - just for the - // purpose of checking that no bytes were left over, so - // we reset leftover/decoder to make the function reentrant. - leftover.position(0); - leftover.limit(leftover.capacity()); - decoder.reset(); - - // if some chars were produced then this call will - // return them. - return nextLine = nextLine(builder, newline, endOfInput); - } - return null; - } - return null; - } - - // The main sequential scheduler loop. - private void loop() { - try { - while (!cancelled) { - Throwable error = errorRef.get(); - if (error != null) { - cancelled = true; - scheduler.stop(); - upstream.onError(error); - cf.completeExceptionally(error); - return; - } - if (nextLine == null) nextLine = nextLine(); - if (nextLine == null) { - if (completed) { - scheduler.stop(); - if (leftover.position() != 0) { - // Underflow: not all bytes could be - // decoded, but no more bytes will be coming. - // This should not happen as we should already - // have got a MalformedInputException, or - // replaced the unmappable chars. - errorRef.compareAndSet(null, - new IllegalStateException( - "premature end of input (" - + leftover.position() - + " undecoded bytes)")); - continue; - } else { - upstream.onComplete(); - } - return; - } else if (demanded.get() == 0 - && !downstreamDemand.isFulfilled()) { - long incr = Math.max(1, downstreamDemand.get()); - demanded.addAndGet(incr); - upstreamSubscription.request(incr); - continue; - } else return; - } - assert nextLine != null; - assert newline != null && !nextLine.endsWith(newline) - || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); - if (downstreamDemand.tryDecrement()) { - String forward = nextLine; - nextLine = null; - upstream.onNext(forward); - } else return; // no demand: come back later - } - } catch (Throwable t) { - try { - upstreamSubscription.cancel(); - } finally { - signalError(t); - } - } - } - - static LineSubscription create(Flow.Subscription s, - Charset charset, - String lineSeparator, - Flow.Subscriber upstream, - CompletableFuture cf) { - return new LineSubscription(Objects.requireNonNull(s), - Objects.requireNonNull(charset).newDecoder() - // use the same decoder configuration than - // java.io.InputStreamReader - .onMalformedInput(CodingErrorAction.REPLACE) - .onUnmappableCharacter(CodingErrorAction.REPLACE), - lineSeparator, - Objects.requireNonNull(upstream), - Objects.requireNonNull(cf)); - } - } -} - diff -r 3f6b75adcdc0 -r 6c11b48a0695 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java Tue Feb 06 11:39:55 2018 +0000 @@ -0,0 +1,467 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http.internal; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.HttpResponse.BodySubscriber; +import jdk.incubator.http.internal.common.MinimalFuture; +import jdk.incubator.http.internal.common.SequentialScheduler; + +/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ +public final class LineSubscriberAdapter,R> + implements BodySubscriber { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + private final Charset charset; + private final String eol; + private volatile LineSubscription downstream; + + private LineSubscriberAdapter(S subscriber, + Function finisher, + Charset charset, + String eol) { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + this.charset = Objects.requireNonNull(charset); + this.eol = eol; + } + + @Override + public void onSubscribe(Subscription subscription) { + downstream = LineSubscription.create(subscription, + charset, + eol, + subscriber, + cf); + subscriber.onSubscribe(downstream); + } + + @Override + public void onNext(List item) { + try { + downstream.submit(item); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable throwable) { + try { + downstream.signalError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + downstream.signalComplete(); + } finally { + cf.complete(finisher.apply(subscriber)); + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + + public static , R> LineSubscriberAdapter + create(S subscriber, Function finisher, Charset charset, String eol) + { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), + Objects.requireNonNull(finisher), + Objects.requireNonNull(charset), + eol); + } + + static final class LineSubscription implements Flow.Subscription { + final Flow.Subscription upstreamSubscription; + final CharsetDecoder decoder; + final String newline; + final Demand downstreamDemand; + final ConcurrentLinkedDeque queue; + final SequentialScheduler scheduler; + final Flow.Subscriber upstream; + final CompletableFuture cf; + private final AtomicReference errorRef = new AtomicReference<>(); + private final AtomicLong demanded = new AtomicLong(); + private volatile boolean completed; + private volatile boolean cancelled; + + private final char[] chars = new char[1024]; + private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); + private final CharBuffer buffer = CharBuffer.wrap(chars); + private final StringBuilder builder = new StringBuilder(); + private int lineCount; + private String nextLine; + + private LineSubscription(Flow.Subscription s, + CharsetDecoder dec, + String separator, + Flow.Subscriber subscriber, + CompletableFuture completion) { + downstreamDemand = new Demand(); + queue = new ConcurrentLinkedDeque<>(); + upstreamSubscription = Objects.requireNonNull(s); + decoder = Objects.requireNonNull(dec); + newline = separator; + upstream = Objects.requireNonNull(subscriber); + cf = Objects.requireNonNull(completion); + scheduler = SequentialScheduler.synchronizedScheduler(this::loop); + } + + @Override + public void request(long n) { + if (cancelled) return; + if (downstreamDemand.increase(n)) { + scheduler.runOrSchedule(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstreamSubscription.cancel(); + } + + public void submit(List list) { + queue.addAll(list); + demanded.decrementAndGet(); + scheduler.runOrSchedule(); + } + + public void signalComplete() { + completed = true; + scheduler.runOrSchedule(); + } + + public void signalError(Throwable error) { + if (errorRef.compareAndSet(null, + Objects.requireNonNull(error))) { + scheduler.runOrSchedule(); + } + } + + // This method looks at whether some bytes where left over (in leftover) + // from decoding the previous buffer when the previous buffer was in + // underflow. If so, it takes bytes one by one from the new buffer 'in' + // and combines them with the leftover bytes until 'in' is exhausted or a + // character was produced in 'out', resolving the previous underflow. + // Returns true if the buffer is still in underflow, false otherwise. + // However, in both situation some chars might have been produced in 'out'. + private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) + throws CharacterCodingException { + int limit = leftover.position(); + if (limit == 0) { + // no leftover + return false; + } else { + CoderResult res = null; + while (in.hasRemaining()) { + leftover.position(limit); + leftover.limit(++limit); + leftover.put(in.get()); + leftover.position(0); + res = decoder.decode(leftover, out, + endOfInput && !in.hasRemaining()); + int remaining = leftover.remaining(); + if (remaining > 0) { + assert leftover.position() == 0; + leftover.position(remaining); + } else { + leftover.position(0); + } + leftover.limit(leftover.capacity()); + if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { + continue; + } + if (res.isError()) { + res.throwException(); + } + assert !res.isOverflow(); + return false; + } + return !endOfInput; + } + } + + // extract characters from start to end and remove them from + // the StringBuilder + private static String take(StringBuilder b, int start, int end) { + assert start == 0; + String line; + if (end == start) return ""; + line = b.substring(start, end); + b.delete(start, end); + return line; + } + + // finds end of line, returns -1 if not found, or the position after + // the line delimiter if found, removing the delimiter in the process. + private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { + int len = b.length(); + if (eol != null) { // delimiter explicitly specified + int i = b.indexOf(eol); + if (i >= 0) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(i, i + eol.length()); + return i; + } + } else { // no delimiter specified, behaves as BufferedReader::readLine + boolean crfound = false; + for (int i = 0; i < len; i++) { + char c = b.charAt(i); + if (c == '\n') { + // '\n' or '\r\n' found. + // remove the delimiter and returns the position + // of the char after it. + b.delete(crfound ? i - 1 : i, i + 1); + return crfound ? i - 1 : i; + } else if (crfound) { + // previous char was '\r', c != '\n' + assert i != 0; + // remove the delimiter and returns the position + // of the char after it. + b.delete(i - 1, i); + return i - 1; + } + crfound = c == '\r'; + } + if (crfound && endOfInput) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(len - 1, len); + return len - 1; + } + } + return endOfInput && len > 0 ? len : -1; + } + + // Looks at whether the StringBuilder contains a line. + // Returns null if more character are needed. + private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { + int next = endOfLine(b, eol, endOfInput); + return (next > -1) ? take(b, 0, next) : null; + } + + // Attempts to read the next line. Returns the next line if + // the delimiter was found, null otherwise. The delimiters are + // consumed. + private String nextLine() + throws CharacterCodingException { + assert nextLine == null; + LINES: + while (nextLine == null) { + boolean endOfInput = completed && queue.isEmpty(); + nextLine = nextLine(builder, newline, + endOfInput && leftover.position() == 0); + if (nextLine != null) return nextLine; + ByteBuffer b; + BUFFERS: + while ((b = queue.peek()) != null) { + if (!b.hasRemaining()) { + queue.poll(); + continue BUFFERS; + } + BYTES: + while (b.hasRemaining()) { + buffer.position(0); + buffer.limit(buffer.capacity()); + boolean endofInput = completed && queue.size() <= 1; + if (isUnderFlow(b, buffer, endofInput)) { + assert !b.hasRemaining(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + continue BUFFERS; + } + CoderResult res = decoder.decode(b, buffer, endofInput); + if (res.isError()) res.throwException(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + continue LINES; + } + if (res.isUnderflow() && b.hasRemaining()) { + //System.out.println("underflow: adding " + b.remaining() + " bytes"); + leftover.put(b); + assert !b.hasRemaining(); + continue BUFFERS; + } + } + } + + assert queue.isEmpty(); + if (endOfInput) { + // Time to cleanup: there may be some undecoded leftover bytes + // We need to flush them out. + // The decoder has been configured to replace malformed/unmappable + // chars with some replacement, in order to behave like + // InputStreamReader. + leftover.flip(); + buffer.position(0); + buffer.limit(buffer.capacity()); + + // decode() must be called just before flush, even if there + // is nothing to decode. We must do this even if leftover + // has no remaining bytes. + CoderResult res = decoder.decode(leftover, buffer, endOfInput); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // Now call decoder.flush() + buffer.position(0); + buffer.limit(buffer.capacity()); + res = decoder.flush(buffer); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // It's possible that we reach here twice - just for the + // purpose of checking that no bytes were left over, so + // we reset leftover/decoder to make the function reentrant. + leftover.position(0); + leftover.limit(leftover.capacity()); + decoder.reset(); + + // if some chars were produced then this call will + // return them. + return nextLine = nextLine(builder, newline, endOfInput); + } + return null; + } + return null; + } + + // The main sequential scheduler loop. + private void loop() { + try { + while (!cancelled) { + Throwable error = errorRef.get(); + if (error != null) { + cancelled = true; + scheduler.stop(); + upstream.onError(error); + cf.completeExceptionally(error); + return; + } + if (nextLine == null) nextLine = nextLine(); + if (nextLine == null) { + if (completed) { + scheduler.stop(); + if (leftover.position() != 0) { + // Underflow: not all bytes could be + // decoded, but no more bytes will be coming. + // This should not happen as we should already + // have got a MalformedInputException, or + // replaced the unmappable chars. + errorRef.compareAndSet(null, + new IllegalStateException( + "premature end of input (" + + leftover.position() + + " undecoded bytes)")); + continue; + } else { + upstream.onComplete(); + } + return; + } else if (demanded.get() == 0 + && !downstreamDemand.isFulfilled()) { + long incr = Math.max(1, downstreamDemand.get()); + demanded.addAndGet(incr); + upstreamSubscription.request(incr); + continue; + } else return; + } + assert nextLine != null; + assert newline != null && !nextLine.endsWith(newline) + || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); + if (downstreamDemand.tryDecrement()) { + String forward = nextLine; + nextLine = null; + upstream.onNext(forward); + } else return; // no demand: come back later + } + } catch (Throwable t) { + try { + upstreamSubscription.cancel(); + } finally { + signalError(t); + } + } + } + + static LineSubscription create(Flow.Subscription s, + Charset charset, + String lineSeparator, + Flow.Subscriber upstream, + CompletableFuture cf) { + return new LineSubscription(Objects.requireNonNull(s), + Objects.requireNonNull(charset).newDecoder() + // use the same decoder configuration than + // java.io.InputStreamReader + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE), + lineSeparator, + Objects.requireNonNull(upstream), + Objects.requireNonNull(cf)); + } + } +} +