/*
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import jdk.internal.net.http.common.Demand;
import java.net.http.HttpResponse.BodySubscriber;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
implements BodySubscriber<R> {
private final CompletableFuture<R> cf = new MinimalFuture<>();
private final S subscriber;
private final Function<? super S, ? extends R> finisher;
private final Charset charset;
private final String eol;
private final AtomicBoolean subscribed = new AtomicBoolean();
private volatile LineSubscription downstream;
private LineSubscriberAdapter(S subscriber,
Function<? super S, ? extends R> finisher,
Charset charset,
String eol) {
if (eol != null && eol.isEmpty())
throw new IllegalArgumentException("empty line separator");
this.subscriber = Objects.requireNonNull(subscriber);
this.finisher = Objects.requireNonNull(finisher);
this.charset = Objects.requireNonNull(charset);
this.eol = eol;
}
@Override
public void onSubscribe(Subscription subscription) {
Objects.requireNonNull(subscription);
if (!subscribed.compareAndSet(false, true)) {
subscription.cancel();
return;
}
downstream = LineSubscription.create(subscription,
charset,
eol,
subscriber,
cf);
subscriber.onSubscribe(downstream);
}
@Override
public void onNext(List<ByteBuffer> item) {
Objects.requireNonNull(item);
try {
downstream.submit(item);
} catch (Throwable t) {
onError(t);
}
}
@Override
public void onError(Throwable throwable) {
Objects.requireNonNull(throwable);
try {
downstream.signalError(throwable);
} finally {
cf.completeExceptionally(throwable);
}
}
@Override
public void onComplete() {
try {
downstream.signalComplete();
} finally {
cf.complete(finisher.apply(subscriber));
}
}
@Override
public CompletionStage<R> getBody() {
return cf;
}
public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
{
if (eol != null && eol.isEmpty())
throw new IllegalArgumentException("empty line separator");
return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
Objects.requireNonNull(finisher),
Objects.requireNonNull(charset),
eol);
}
static final class LineSubscription implements Flow.Subscription {
final Flow.Subscription upstreamSubscription;
final CharsetDecoder decoder;
final String newline;
final Demand downstreamDemand;
final ConcurrentLinkedDeque<ByteBuffer> queue;
final SequentialScheduler scheduler;
final Flow.Subscriber<? super String> upstream;
final CompletableFuture<?> cf;
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final AtomicLong demanded = new AtomicLong();
private volatile boolean completed;
private volatile boolean cancelled;
private final char[] chars = new char[1024];
private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
private final CharBuffer buffer = CharBuffer.wrap(chars);
private final StringBuilder builder = new StringBuilder();
private String nextLine;
private LineSubscription(Flow.Subscription s,
CharsetDecoder dec,
String separator,
Flow.Subscriber<? super String> subscriber,
CompletableFuture<?> completion) {
downstreamDemand = new Demand();
queue = new ConcurrentLinkedDeque<>();
upstreamSubscription = Objects.requireNonNull(s);
decoder = Objects.requireNonNull(dec);
newline = separator;
upstream = Objects.requireNonNull(subscriber);
cf = Objects.requireNonNull(completion);
scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
}
@Override
public void request(long n) {
if (cancelled) return;
if (downstreamDemand.increase(n)) {
scheduler.runOrSchedule();
}
}
@Override
public void cancel() {
cancelled = true;
upstreamSubscription.cancel();
}
public void submit(List<ByteBuffer> list) {
queue.addAll(list);
demanded.decrementAndGet();
scheduler.runOrSchedule();
}
public void signalComplete() {
completed = true;
scheduler.runOrSchedule();
}
public void signalError(Throwable error) {
if (errorRef.compareAndSet(null,
Objects.requireNonNull(error))) {
scheduler.runOrSchedule();
}
}
// This method looks at whether some bytes where left over (in leftover)
// from decoding the previous buffer when the previous buffer was in
// underflow. If so, it takes bytes one by one from the new buffer 'in'
// and combines them with the leftover bytes until 'in' is exhausted or a
// character was produced in 'out', resolving the previous underflow.
// Returns true if the buffer is still in underflow, false otherwise.
// However, in both situation some chars might have been produced in 'out'.
private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
throws CharacterCodingException {
int limit = leftover.position();
if (limit == 0) {
// no leftover
return false;
} else {
CoderResult res = null;
while (in.hasRemaining()) {
leftover.position(limit);
leftover.limit(++limit);
leftover.put(in.get());
leftover.position(0);
res = decoder.decode(leftover, out,
endOfInput && !in.hasRemaining());
int remaining = leftover.remaining();
if (remaining > 0) {
assert leftover.position() == 0;
leftover.position(remaining);
} else {
leftover.position(0);
}
leftover.limit(leftover.capacity());
if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
continue;
}
if (res.isError()) {
res.throwException();
}
assert !res.isOverflow();
return false;
}
return !endOfInput;
}
}
// extract characters from start to end and remove them from
// the StringBuilder
private static String take(StringBuilder b, int start, int end) {
assert start == 0;
String line;
if (end == start) return "";
line = b.substring(start, end);
b.delete(start, end);
return line;
}
// finds end of line, returns -1 if not found, or the position after
// the line delimiter if found, removing the delimiter in the process.
private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
int len = b.length();
if (eol != null) { // delimiter explicitly specified
int i = b.indexOf(eol);
if (i >= 0) {
// remove the delimiter and returns the position
// of the char after it.
b.delete(i, i + eol.length());
return i;
}
} else { // no delimiter specified, behaves as BufferedReader::readLine
boolean crfound = false;
for (int i = 0; i < len; i++) {
char c = b.charAt(i);
if (c == '\n') {
// '\n' or '\r\n' found.
// remove the delimiter and returns the position
// of the char after it.
b.delete(crfound ? i - 1 : i, i + 1);
return crfound ? i - 1 : i;
} else if (crfound) {
// previous char was '\r', c != '\n'
assert i != 0;
// remove the delimiter and returns the position
// of the char after it.
b.delete(i - 1, i);
return i - 1;
}
crfound = c == '\r';
}
if (crfound && endOfInput) {
// remove the delimiter and returns the position
// of the char after it.
b.delete(len - 1, len);
return len - 1;
}
}
return endOfInput && len > 0 ? len : -1;
}
// Looks at whether the StringBuilder contains a line.
// Returns null if more character are needed.
private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
int next = endOfLine(b, eol, endOfInput);
return (next > -1) ? take(b, 0, next) : null;
}
// Attempts to read the next line. Returns the next line if
// the delimiter was found, null otherwise. The delimiters are
// consumed.
private String nextLine()
throws CharacterCodingException {
assert nextLine == null;
LINES:
while (nextLine == null) {
boolean endOfInput = completed && queue.isEmpty();
nextLine = nextLine(builder, newline,
endOfInput && leftover.position() == 0);
if (nextLine != null) return nextLine;
ByteBuffer b;
BUFFERS:
while ((b = queue.peek()) != null) {
if (!b.hasRemaining()) {
queue.poll();
continue BUFFERS;
}
BYTES:
while (b.hasRemaining()) {
buffer.position(0);
buffer.limit(buffer.capacity());
boolean endofInput = completed && queue.size() <= 1;
if (isUnderFlow(b, buffer, endofInput)) {
assert !b.hasRemaining();
if (buffer.position() > 0) {
buffer.flip();
builder.append(buffer);
}
continue BUFFERS;
}
CoderResult res = decoder.decode(b, buffer, endofInput);
if (res.isError()) res.throwException();
if (buffer.position() > 0) {
buffer.flip();
builder.append(buffer);
continue LINES;
}
if (res.isUnderflow() && b.hasRemaining()) {
//System.out.println("underflow: adding " + b.remaining() + " bytes");
leftover.put(b);
assert !b.hasRemaining();
continue BUFFERS;
}
}
}
assert queue.isEmpty();
if (endOfInput) {
// Time to cleanup: there may be some undecoded leftover bytes
// We need to flush them out.
// The decoder has been configured to replace malformed/unmappable
// chars with some replacement, in order to behave like
// InputStreamReader.
leftover.flip();
buffer.position(0);
buffer.limit(buffer.capacity());
// decode() must be called just before flush, even if there
// is nothing to decode. We must do this even if leftover
// has no remaining bytes.
CoderResult res = decoder.decode(leftover, buffer, endOfInput);
if (buffer.position() > 0) {
buffer.flip();
builder.append(buffer);
}
if (res.isError()) res.throwException();
// Now call decoder.flush()
buffer.position(0);
buffer.limit(buffer.capacity());
res = decoder.flush(buffer);
if (buffer.position() > 0) {
buffer.flip();
builder.append(buffer);
}
if (res.isError()) res.throwException();
// It's possible that we reach here twice - just for the
// purpose of checking that no bytes were left over, so
// we reset leftover/decoder to make the function reentrant.
leftover.position(0);
leftover.limit(leftover.capacity());
decoder.reset();
// if some chars were produced then this call will
// return them.
return nextLine = nextLine(builder, newline, endOfInput);
}
return null;
}
return null;
}
// The main sequential scheduler loop.
private void loop() {
try {
while (!cancelled) {
Throwable error = errorRef.get();
if (error != null) {
cancelled = true;
scheduler.stop();
upstream.onError(error);
cf.completeExceptionally(error);
return;
}
if (nextLine == null) nextLine = nextLine();
if (nextLine == null) {
if (completed) {
scheduler.stop();
if (leftover.position() != 0) {
// Underflow: not all bytes could be
// decoded, but no more bytes will be coming.
// This should not happen as we should already
// have got a MalformedInputException, or
// replaced the unmappable chars.
errorRef.compareAndSet(null,
new IllegalStateException(
"premature end of input ("
+ leftover.position()
+ " undecoded bytes)"));
continue;
} else {
upstream.onComplete();
}
return;
} else if (demanded.get() == 0
&& !downstreamDemand.isFulfilled()) {
long incr = Math.max(1, downstreamDemand.get());
demanded.addAndGet(incr);
upstreamSubscription.request(incr);
continue;
} else return;
}
assert nextLine != null;
assert newline != null && !nextLine.endsWith(newline)
|| !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
if (downstreamDemand.tryDecrement()) {
String forward = nextLine;
nextLine = null;
upstream.onNext(forward);
} else return; // no demand: come back later
}
} catch (Throwable t) {
try {
upstreamSubscription.cancel();
} finally {
signalError(t);
}
}
}
static LineSubscription create(Flow.Subscription s,
Charset charset,
String lineSeparator,
Flow.Subscriber<? super String> upstream,
CompletableFuture<?> cf) {
return new LineSubscription(Objects.requireNonNull(s),
Objects.requireNonNull(charset).newDecoder()
// use the same decoder configuration than
// java.io.InputStreamReader
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE),
lineSeparator,
Objects.requireNonNull(upstream),
Objects.requireNonNull(cf));
}
}
}