src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java
author chegar
Tue, 06 Feb 2018 11:39:55 +0000
branchhttp-client-branch
changeset 56078 6c11b48a0695
parent 56009 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/LineSubscriberAdapter.java@cf8792f51dee
permissions -rw-r--r--
http-client-branch: move LineSubscriberAdapter to internal

/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http.internal;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.HttpResponse.BodySubscriber;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.SequentialScheduler;

/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
        implements BodySubscriber<R> {
    private final CompletableFuture<R> cf = new MinimalFuture<>();
    private final S subscriber;
    private final Function<S, R> finisher;
    private final Charset charset;
    private final String eol;
    private volatile LineSubscription downstream;

    private LineSubscriberAdapter(S subscriber,
                                  Function<S, R> finisher,
                                  Charset charset,
                                  String eol) {
        if (eol != null && eol.isEmpty())
            throw new IllegalArgumentException("empty line separator");
        this.subscriber = Objects.requireNonNull(subscriber);
        this.finisher = Objects.requireNonNull(finisher);
        this.charset = Objects.requireNonNull(charset);
        this.eol = eol;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        downstream = LineSubscription.create(subscription,
                                             charset,
                                             eol,
                                             subscriber,
                                             cf);
        subscriber.onSubscribe(downstream);
    }

    @Override
    public void onNext(List<ByteBuffer> item) {
        try {
            downstream.submit(item);
        } catch (Throwable t) {
            onError(t);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        try {
            downstream.signalError(throwable);
        } finally {
            cf.completeExceptionally(throwable);
        }
    }

    @Override
    public void onComplete() {
        try {
            downstream.signalComplete();
        } finally {
            cf.complete(finisher.apply(subscriber));
        }
    }

    @Override
    public CompletionStage<R> getBody() {
        return cf;
    }

    public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
    create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
    {
        if (eol != null && eol.isEmpty())
            throw new IllegalArgumentException("empty line separator");
        return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
                Objects.requireNonNull(finisher),
                Objects.requireNonNull(charset),
                eol);
    }

    static final class LineSubscription implements Flow.Subscription {
        final Flow.Subscription upstreamSubscription;
        final CharsetDecoder decoder;
        final String newline;
        final Demand downstreamDemand;
        final ConcurrentLinkedDeque<ByteBuffer> queue;
        final SequentialScheduler scheduler;
        final Flow.Subscriber<? super String> upstream;
        final CompletableFuture<?> cf;
        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
        private final AtomicLong demanded = new AtomicLong();
        private volatile boolean completed;
        private volatile boolean cancelled;

        private final char[] chars = new char[1024];
        private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
        private final CharBuffer buffer = CharBuffer.wrap(chars);
        private final StringBuilder builder = new StringBuilder();
        private int lineCount;
        private String nextLine;

        private LineSubscription(Flow.Subscription s,
                                 CharsetDecoder dec,
                                 String separator,
                                 Flow.Subscriber<? super String> subscriber,
                                 CompletableFuture<?> completion) {
            downstreamDemand = new Demand();
            queue = new ConcurrentLinkedDeque<>();
            upstreamSubscription = Objects.requireNonNull(s);
            decoder = Objects.requireNonNull(dec);
            newline = separator;
            upstream = Objects.requireNonNull(subscriber);
            cf = Objects.requireNonNull(completion);
            scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
        }

        @Override
        public void request(long n) {
            if (cancelled) return;
            if (downstreamDemand.increase(n)) {
                scheduler.runOrSchedule();
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
            upstreamSubscription.cancel();
        }

        public void submit(List<ByteBuffer> list) {
            queue.addAll(list);
            demanded.decrementAndGet();
            scheduler.runOrSchedule();
        }

        public void signalComplete() {
            completed = true;
            scheduler.runOrSchedule();
        }

        public void signalError(Throwable error) {
            if (errorRef.compareAndSet(null,
                    Objects.requireNonNull(error))) {
                scheduler.runOrSchedule();
            }
        }

        // This method looks at whether some bytes where left over (in leftover)
        // from decoding the previous buffer when the previous buffer was in
        // underflow. If so, it takes bytes one by one from the new buffer 'in'
        // and combines them with the leftover bytes until 'in' is exhausted or a
        // character was produced in 'out', resolving the previous underflow.
        // Returns true if the buffer is still in underflow, false otherwise.
        // However, in both situation some chars might have been produced in 'out'.
        private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
                throws CharacterCodingException {
            int limit = leftover.position();
            if (limit == 0) {
                // no leftover
                return false;
            } else {
                CoderResult res = null;
                while (in.hasRemaining()) {
                    leftover.position(limit);
                    leftover.limit(++limit);
                    leftover.put(in.get());
                    leftover.position(0);
                    res = decoder.decode(leftover, out,
                            endOfInput && !in.hasRemaining());
                    int remaining = leftover.remaining();
                    if (remaining > 0) {
                        assert leftover.position() == 0;
                        leftover.position(remaining);
                    } else {
                        leftover.position(0);
                    }
                    leftover.limit(leftover.capacity());
                    if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
                        continue;
                    }
                    if (res.isError()) {
                        res.throwException();
                    }
                    assert !res.isOverflow();
                    return false;
                }
                return !endOfInput;
            }
        }

        // extract characters from start to end and remove them from
        // the StringBuilder
        private static String take(StringBuilder b, int start, int end) {
            assert start == 0;
            String line;
            if (end == start) return "";
            line = b.substring(start, end);
            b.delete(start, end);
            return line;
        }

        // finds end of line, returns -1 if not found, or the position after
        // the line delimiter if found, removing the delimiter in the process.
        private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
            int len = b.length();
            if (eol != null) { // delimiter explicitly specified
                int i = b.indexOf(eol);
                if (i >= 0) {
                    // remove the delimiter and returns the position
                    // of the char after it.
                    b.delete(i, i + eol.length());
                    return i;
                }
            } else { // no delimiter specified, behaves as BufferedReader::readLine
                boolean crfound = false;
                for (int i = 0; i < len; i++) {
                    char c = b.charAt(i);
                    if (c == '\n') {
                        // '\n' or '\r\n' found.
                        // remove the delimiter and returns the position
                        // of the char after it.
                        b.delete(crfound ? i - 1 : i, i + 1);
                        return crfound ? i - 1 : i;
                    } else if (crfound) {
                        // previous char was '\r', c != '\n'
                        assert i != 0;
                        // remove the delimiter and returns the position
                        // of the char after it.
                        b.delete(i - 1, i);
                        return i - 1;
                    }
                    crfound = c == '\r';
                }
                if (crfound && endOfInput) {
                    // remove the delimiter and returns the position
                    // of the char after it.
                    b.delete(len - 1, len);
                    return len - 1;
                }
            }
            return endOfInput && len > 0 ? len : -1;
        }

        // Looks at whether the StringBuilder contains a line.
        // Returns null if more character are needed.
        private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
            int next = endOfLine(b, eol, endOfInput);
            return (next > -1) ? take(b, 0, next) : null;
        }

        // Attempts to read the next line. Returns the next line if
        // the delimiter was found, null otherwise. The delimiters are
        // consumed.
        private String nextLine()
                throws CharacterCodingException {
            assert nextLine == null;
            LINES:
            while (nextLine == null) {
                boolean endOfInput = completed && queue.isEmpty();
                nextLine = nextLine(builder, newline,
                        endOfInput && leftover.position() == 0);
                if (nextLine != null) return nextLine;
                ByteBuffer b;
                BUFFERS:
                while ((b = queue.peek()) != null) {
                    if (!b.hasRemaining()) {
                        queue.poll();
                        continue BUFFERS;
                    }
                    BYTES:
                    while (b.hasRemaining()) {
                        buffer.position(0);
                        buffer.limit(buffer.capacity());
                        boolean endofInput = completed && queue.size() <= 1;
                        if (isUnderFlow(b, buffer, endofInput)) {
                            assert !b.hasRemaining();
                            if (buffer.position() > 0) {
                                buffer.flip();
                                builder.append(buffer);
                            }
                            continue BUFFERS;
                        }
                        CoderResult res = decoder.decode(b, buffer, endofInput);
                        if (res.isError()) res.throwException();
                        if (buffer.position() > 0) {
                            buffer.flip();
                            builder.append(buffer);
                            continue LINES;
                        }
                        if (res.isUnderflow() && b.hasRemaining()) {
                            //System.out.println("underflow: adding " + b.remaining() + " bytes");
                            leftover.put(b);
                            assert !b.hasRemaining();
                            continue BUFFERS;
                        }
                    }
                }

                assert queue.isEmpty();
                if (endOfInput) {
                    // Time to cleanup: there may be some undecoded leftover bytes
                    // We need to flush them out.
                    // The decoder has been configured to replace malformed/unmappable
                    // chars with some replacement, in order to behave like
                    // InputStreamReader.
                    leftover.flip();
                    buffer.position(0);
                    buffer.limit(buffer.capacity());

                    // decode() must be called just before flush, even if there
                    // is nothing to decode. We must do this even if leftover
                    // has no remaining bytes.
                    CoderResult res = decoder.decode(leftover, buffer, endOfInput);
                    if (buffer.position() > 0) {
                        buffer.flip();
                        builder.append(buffer);
                    }
                    if (res.isError()) res.throwException();

                    // Now call decoder.flush()
                    buffer.position(0);
                    buffer.limit(buffer.capacity());
                    res = decoder.flush(buffer);
                    if (buffer.position() > 0) {
                        buffer.flip();
                        builder.append(buffer);
                    }
                    if (res.isError()) res.throwException();

                    // It's possible that we reach here twice - just for the
                    // purpose of checking that no bytes were left over, so
                    // we reset leftover/decoder to make the function reentrant.
                    leftover.position(0);
                    leftover.limit(leftover.capacity());
                    decoder.reset();

                    // if some chars were produced then this call will
                    // return them.
                    return nextLine = nextLine(builder, newline, endOfInput);
                }
                return null;
            }
            return null;
        }

        // The main sequential scheduler loop.
        private void loop() {
            try {
                while (!cancelled) {
                    Throwable error = errorRef.get();
                    if (error != null) {
                        cancelled = true;
                        scheduler.stop();
                        upstream.onError(error);
                        cf.completeExceptionally(error);
                        return;
                    }
                    if (nextLine == null) nextLine = nextLine();
                    if (nextLine == null) {
                        if (completed) {
                            scheduler.stop();
                            if (leftover.position() != 0) {
                                // Underflow: not all bytes could be
                                // decoded, but no more bytes will be coming.
                                // This should not happen as we should already
                                // have got a MalformedInputException, or
                                // replaced the unmappable chars.
                                errorRef.compareAndSet(null,
                                        new IllegalStateException(
                                                "premature end of input ("
                                                        + leftover.position()
                                                        + " undecoded bytes)"));
                                continue;
                            } else {
                                upstream.onComplete();
                            }
                            return;
                        } else if (demanded.get() == 0
                                && !downstreamDemand.isFulfilled()) {
                            long incr = Math.max(1, downstreamDemand.get());
                            demanded.addAndGet(incr);
                            upstreamSubscription.request(incr);
                            continue;
                        } else return;
                    }
                    assert nextLine != null;
                    assert newline != null && !nextLine.endsWith(newline)
                            || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
                    if (downstreamDemand.tryDecrement()) {
                        String forward = nextLine;
                        nextLine = null;
                        upstream.onNext(forward);
                    } else return; // no demand: come back later
                }
            } catch (Throwable t) {
                try {
                    upstreamSubscription.cancel();
                } finally {
                    signalError(t);
                }
            }
        }

        static LineSubscription create(Flow.Subscription s,
                                       Charset charset,
                                       String lineSeparator,
                                       Flow.Subscriber<? super String> upstream,
                                       CompletableFuture<?> cf) {
            return new LineSubscription(Objects.requireNonNull(s),
                    Objects.requireNonNull(charset).newDecoder()
                            // use the same decoder configuration than
                            // java.io.InputStreamReader
                            .onMalformedInput(CodingErrorAction.REPLACE)
                            .onUnmappableCharacter(CodingErrorAction.REPLACE),
                    lineSeparator,
                    Objects.requireNonNull(upstream),
                    Objects.requireNonNull(cf));
        }
    }
}