diff -r 38fac6d0521d -r 42208b2f224e src/java.net.http/share/classes/java/net/http/internal/LineSubscriberAdapter.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.net.http/share/classes/java/net/http/internal/LineSubscriberAdapter.java Wed Feb 07 14:17:24 2018 +0000 @@ -0,0 +1,467 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package java.net.http.internal; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.net.http.internal.common.Demand; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.internal.common.MinimalFuture; +import java.net.http.internal.common.SequentialScheduler; + +/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ +public final class LineSubscriberAdapter,R> + implements BodySubscriber { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + private final Charset charset; + private final String eol; + private volatile LineSubscription downstream; + + private LineSubscriberAdapter(S subscriber, + Function finisher, + Charset charset, + String eol) { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + this.charset = Objects.requireNonNull(charset); + this.eol = eol; + } + + @Override + public void onSubscribe(Subscription subscription) { + downstream = LineSubscription.create(subscription, + charset, + eol, + subscriber, + cf); + subscriber.onSubscribe(downstream); + } + + @Override + public void onNext(List item) { + try { + downstream.submit(item); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable throwable) { + try { + downstream.signalError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + downstream.signalComplete(); + } finally { + cf.complete(finisher.apply(subscriber)); + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + + public static , R> LineSubscriberAdapter + create(S subscriber, Function finisher, Charset charset, String eol) + { + if (eol != null && eol.isEmpty()) + throw new IllegalArgumentException("empty line separator"); + return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), + Objects.requireNonNull(finisher), + Objects.requireNonNull(charset), + eol); + } + + static final class LineSubscription implements Flow.Subscription { + final Flow.Subscription upstreamSubscription; + final CharsetDecoder decoder; + final String newline; + final Demand downstreamDemand; + final ConcurrentLinkedDeque queue; + final SequentialScheduler scheduler; + final Flow.Subscriber upstream; + final CompletableFuture cf; + private final AtomicReference errorRef = new AtomicReference<>(); + private final AtomicLong demanded = new AtomicLong(); + private volatile boolean completed; + private volatile boolean cancelled; + + private final char[] chars = new char[1024]; + private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); + private final CharBuffer buffer = CharBuffer.wrap(chars); + private final StringBuilder builder = new StringBuilder(); + private int lineCount; + private String nextLine; + + private LineSubscription(Flow.Subscription s, + CharsetDecoder dec, + String separator, + Flow.Subscriber subscriber, + CompletableFuture completion) { + downstreamDemand = new Demand(); + queue = new ConcurrentLinkedDeque<>(); + upstreamSubscription = Objects.requireNonNull(s); + decoder = Objects.requireNonNull(dec); + newline = separator; + upstream = Objects.requireNonNull(subscriber); + cf = Objects.requireNonNull(completion); + scheduler = SequentialScheduler.synchronizedScheduler(this::loop); + } + + @Override + public void request(long n) { + if (cancelled) return; + if (downstreamDemand.increase(n)) { + scheduler.runOrSchedule(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstreamSubscription.cancel(); + } + + public void submit(List list) { + queue.addAll(list); + demanded.decrementAndGet(); + scheduler.runOrSchedule(); + } + + public void signalComplete() { + completed = true; + scheduler.runOrSchedule(); + } + + public void signalError(Throwable error) { + if (errorRef.compareAndSet(null, + Objects.requireNonNull(error))) { + scheduler.runOrSchedule(); + } + } + + // This method looks at whether some bytes where left over (in leftover) + // from decoding the previous buffer when the previous buffer was in + // underflow. If so, it takes bytes one by one from the new buffer 'in' + // and combines them with the leftover bytes until 'in' is exhausted or a + // character was produced in 'out', resolving the previous underflow. + // Returns true if the buffer is still in underflow, false otherwise. + // However, in both situation some chars might have been produced in 'out'. + private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) + throws CharacterCodingException { + int limit = leftover.position(); + if (limit == 0) { + // no leftover + return false; + } else { + CoderResult res = null; + while (in.hasRemaining()) { + leftover.position(limit); + leftover.limit(++limit); + leftover.put(in.get()); + leftover.position(0); + res = decoder.decode(leftover, out, + endOfInput && !in.hasRemaining()); + int remaining = leftover.remaining(); + if (remaining > 0) { + assert leftover.position() == 0; + leftover.position(remaining); + } else { + leftover.position(0); + } + leftover.limit(leftover.capacity()); + if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { + continue; + } + if (res.isError()) { + res.throwException(); + } + assert !res.isOverflow(); + return false; + } + return !endOfInput; + } + } + + // extract characters from start to end and remove them from + // the StringBuilder + private static String take(StringBuilder b, int start, int end) { + assert start == 0; + String line; + if (end == start) return ""; + line = b.substring(start, end); + b.delete(start, end); + return line; + } + + // finds end of line, returns -1 if not found, or the position after + // the line delimiter if found, removing the delimiter in the process. + private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { + int len = b.length(); + if (eol != null) { // delimiter explicitly specified + int i = b.indexOf(eol); + if (i >= 0) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(i, i + eol.length()); + return i; + } + } else { // no delimiter specified, behaves as BufferedReader::readLine + boolean crfound = false; + for (int i = 0; i < len; i++) { + char c = b.charAt(i); + if (c == '\n') { + // '\n' or '\r\n' found. + // remove the delimiter and returns the position + // of the char after it. + b.delete(crfound ? i - 1 : i, i + 1); + return crfound ? i - 1 : i; + } else if (crfound) { + // previous char was '\r', c != '\n' + assert i != 0; + // remove the delimiter and returns the position + // of the char after it. + b.delete(i - 1, i); + return i - 1; + } + crfound = c == '\r'; + } + if (crfound && endOfInput) { + // remove the delimiter and returns the position + // of the char after it. + b.delete(len - 1, len); + return len - 1; + } + } + return endOfInput && len > 0 ? len : -1; + } + + // Looks at whether the StringBuilder contains a line. + // Returns null if more character are needed. + private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { + int next = endOfLine(b, eol, endOfInput); + return (next > -1) ? take(b, 0, next) : null; + } + + // Attempts to read the next line. Returns the next line if + // the delimiter was found, null otherwise. The delimiters are + // consumed. + private String nextLine() + throws CharacterCodingException { + assert nextLine == null; + LINES: + while (nextLine == null) { + boolean endOfInput = completed && queue.isEmpty(); + nextLine = nextLine(builder, newline, + endOfInput && leftover.position() == 0); + if (nextLine != null) return nextLine; + ByteBuffer b; + BUFFERS: + while ((b = queue.peek()) != null) { + if (!b.hasRemaining()) { + queue.poll(); + continue BUFFERS; + } + BYTES: + while (b.hasRemaining()) { + buffer.position(0); + buffer.limit(buffer.capacity()); + boolean endofInput = completed && queue.size() <= 1; + if (isUnderFlow(b, buffer, endofInput)) { + assert !b.hasRemaining(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + continue BUFFERS; + } + CoderResult res = decoder.decode(b, buffer, endofInput); + if (res.isError()) res.throwException(); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + continue LINES; + } + if (res.isUnderflow() && b.hasRemaining()) { + //System.out.println("underflow: adding " + b.remaining() + " bytes"); + leftover.put(b); + assert !b.hasRemaining(); + continue BUFFERS; + } + } + } + + assert queue.isEmpty(); + if (endOfInput) { + // Time to cleanup: there may be some undecoded leftover bytes + // We need to flush them out. + // The decoder has been configured to replace malformed/unmappable + // chars with some replacement, in order to behave like + // InputStreamReader. + leftover.flip(); + buffer.position(0); + buffer.limit(buffer.capacity()); + + // decode() must be called just before flush, even if there + // is nothing to decode. We must do this even if leftover + // has no remaining bytes. + CoderResult res = decoder.decode(leftover, buffer, endOfInput); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // Now call decoder.flush() + buffer.position(0); + buffer.limit(buffer.capacity()); + res = decoder.flush(buffer); + if (buffer.position() > 0) { + buffer.flip(); + builder.append(buffer); + } + if (res.isError()) res.throwException(); + + // It's possible that we reach here twice - just for the + // purpose of checking that no bytes were left over, so + // we reset leftover/decoder to make the function reentrant. + leftover.position(0); + leftover.limit(leftover.capacity()); + decoder.reset(); + + // if some chars were produced then this call will + // return them. + return nextLine = nextLine(builder, newline, endOfInput); + } + return null; + } + return null; + } + + // The main sequential scheduler loop. + private void loop() { + try { + while (!cancelled) { + Throwable error = errorRef.get(); + if (error != null) { + cancelled = true; + scheduler.stop(); + upstream.onError(error); + cf.completeExceptionally(error); + return; + } + if (nextLine == null) nextLine = nextLine(); + if (nextLine == null) { + if (completed) { + scheduler.stop(); + if (leftover.position() != 0) { + // Underflow: not all bytes could be + // decoded, but no more bytes will be coming. + // This should not happen as we should already + // have got a MalformedInputException, or + // replaced the unmappable chars. + errorRef.compareAndSet(null, + new IllegalStateException( + "premature end of input (" + + leftover.position() + + " undecoded bytes)")); + continue; + } else { + upstream.onComplete(); + } + return; + } else if (demanded.get() == 0 + && !downstreamDemand.isFulfilled()) { + long incr = Math.max(1, downstreamDemand.get()); + demanded.addAndGet(incr); + upstreamSubscription.request(incr); + continue; + } else return; + } + assert nextLine != null; + assert newline != null && !nextLine.endsWith(newline) + || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); + if (downstreamDemand.tryDecrement()) { + String forward = nextLine; + nextLine = null; + upstream.onNext(forward); + } else return; // no demand: come back later + } + } catch (Throwable t) { + try { + upstreamSubscription.cancel(); + } finally { + signalError(t); + } + } + } + + static LineSubscription create(Flow.Subscription s, + Charset charset, + String lineSeparator, + Flow.Subscriber upstream, + CompletableFuture cf) { + return new LineSubscription(Objects.requireNonNull(s), + Objects.requireNonNull(charset).newDecoder() + // use the same decoder configuration than + // java.io.InputStreamReader + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE), + lineSeparator, + Objects.requireNonNull(upstream), + Objects.requireNonNull(cf)); + } + } +} +