src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseContent.java
author chegar
Tue, 06 Feb 2018 14:10:28 +0000
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56071 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java@3353cb42b1b4
permissions -rw-r--r--
http-client-branch: more remaining impl types to internal

/*
 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http.internal;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import jdk.incubator.http.HttpHeaders;
import jdk.incubator.http.HttpResponse;
import jdk.incubator.http.internal.common.Utils;

/**
 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
 *
 * Call pushBody() to read the body (blocking). Data and errors are provided
 * to given Consumers. After final buffer delivered, empty optional delivered
 */
class ResponseContent {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.

    final HttpResponse.BodySubscriber<?> pusher;
    final int contentLength;
    final HttpHeaders headers;
    // this needs to run before we complete the body
    // so that connection can be returned to pool
    private final Runnable onFinished;
    private final String dbgTag;

    ResponseContent(HttpConnection connection,
                    int contentLength,
                    HttpHeaders h,
                    HttpResponse.BodySubscriber<?> userSubscriber,
                    Runnable onFinished)
    {
        this.pusher = userSubscriber;
        this.contentLength = contentLength;
        this.headers = h;
        this.onFinished = onFinished;
        this.dbgTag = connection.dbgString() + "/ResponseContent";
    }

    static final int LF = 10;
    static final int CR = 13;

    private boolean chunkedContent, chunkedContentInitialized;

    boolean contentChunked() throws IOException {
        if (chunkedContentInitialized) {
            return chunkedContent;
        }
        if (contentLength == -1) {
            String tc = headers.firstValue("Transfer-Encoding")
                               .orElse("");
            if (!tc.equals("")) {
                if (tc.equalsIgnoreCase("chunked")) {
                    chunkedContent = true;
                } else {
                    throw new IOException("invalid content");
                }
            } else {
                chunkedContent = false;
            }
        }
        chunkedContentInitialized = true;
        return chunkedContent;
    }

    interface BodyParser extends Consumer<ByteBuffer> {
        void onSubscribe(AbstractSubscription sub);
    }

    // Returns a parser that will take care of parsing the received byte
    // buffers and forward them to the BodySubscriber.
    // When the parser is done, it will call onComplete.
    // If parsing was successful, the throwable parameter will be null.
    // Otherwise it will be the exception that occurred
    // Note: revisit: it might be better to use a CompletableFuture than
    //       a completion handler.
    BodyParser getBodyParser(Consumer<Throwable> onComplete)
        throws IOException {
        if (contentChunked()) {
            return new ChunkedBodyParser(onComplete);
        } else {
            return new FixedLengthBodyParser(contentLength, onComplete);
        }
    }


    static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
    class ChunkedBodyParser implements BodyParser {
        final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
        final Consumer<Throwable> onComplete;
        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";

        volatile Throwable closedExceptionally;
        volatile int partialChunklen = 0; // partially read chunk len
        volatile int chunklen = -1;  // number of bytes in chunk
        volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
        volatile boolean cr = false;  // tryReadChunkLength has found CR
        volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
        volatile ChunkState state = ChunkState.READING_LENGTH; // current state
        volatile AbstractSubscription sub;
        ChunkedBodyParser(Consumer<Throwable> onComplete) {
            this.onComplete = onComplete;
        }

        String dbgString() {
            return dbgTag;
        }

        @Override
        public void onSubscribe(AbstractSubscription sub) {
            debug.log(Level.DEBUG, () ->  "onSubscribe: "
                        + pusher.getClass().getName());
            pusher.onSubscribe(this.sub = sub);
        }

        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                debug.log(Level.DEBUG, () ->  "already closed: "
                            + closedExceptionally);
                return;
            }
            boolean completed = false;
            try {
                List<ByteBuffer> out = new ArrayList<>();
                do {
                    if (tryPushOneHunk(b, out))  {
                        // We're done! (true if the final chunk was parsed).
                        if (!out.isEmpty()) {
                            // push what we have and complete
                            // only reduce demand if we actually push something.
                            // we would not have come here if there was no
                            // demand.
                            boolean hasDemand = sub.demand().tryDecrement();
                            assert hasDemand;
                            pusher.onNext(Collections.unmodifiableList(out));
                        }
                        debug.log(Level.DEBUG, () ->  "done!");
                        assert closedExceptionally == null;
                        assert state == ChunkState.DONE;
                        onFinished.run();
                        pusher.onComplete();
                        completed = true;
                        onComplete.accept(closedExceptionally); // should be null
                        break;
                    }
                    // the buffer may contain several hunks, and therefore
                    // we must loop while it's not exhausted.
                } while (b.hasRemaining());

                if (!completed && !out.isEmpty()) {
                    // push what we have.
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    pusher.onNext(Collections.unmodifiableList(out));
                }
                assert state == ChunkState.DONE || !b.hasRemaining();
            } catch(Throwable t) {
                closedExceptionally = t;
                if (!completed) onComplete.accept(t);
            }
        }

        // reads and returns chunklen. Position of chunkbuf is first byte
        // of chunk on return. chunklen includes the CR LF at end of chunk
        // returns -1 if needs more bytes
        private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
            assert state == ChunkState.READING_LENGTH;
            while (chunkbuf.hasRemaining()) {
                int c = chunkbuf.get();
                if (cr) {
                    if (c == LF) {
                        return partialChunklen;
                    } else {
                        throw new IOException("invalid chunk header");
                    }
                }
                if (c == CR) {
                    cr = true;
                } else {
                    int digit = toDigit(c);
                    partialChunklen = partialChunklen * 16 + digit;
                }
            }
            return -1;
        }


        // try to consume as many bytes as specified by bytesToConsume.
        // returns the number of bytes that still need to be consumed.
        // In practice this method is only called to consume one CRLF pair
        // with bytesToConsume set to 2, so it will only return 0 (if completed),
        // 1, or 2 (if chunkbuf doesn't have the 2 chars).
        private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
            int n = bytesToConsume;
            if (n > 0) {
                int e = Math.min(chunkbuf.remaining(), n);

                // verifies some assertions
                // this methods is called only to consume CRLF
                if (Utils.ASSERTIONSENABLED) {
                    assert n <= 2 && e <= 2;
                    ByteBuffer tmp = chunkbuf.slice();
                    // if n == 2 assert that we will first consume CR
                    assert (n == 2 && e > 0) ? tmp.get() == CR : true;
                    // if n == 1 || n == 2 && e == 2 assert that we then consume LF
                    assert (n == 1 || e == 2) ? tmp.get() == LF : true;
                }

                chunkbuf.position(chunkbuf.position() + e);
                n -= e;
                bytesToConsume = n;
            }
            assert n >= 0;
            return n;
        }

        /**
         * Returns a ByteBuffer containing chunk of data or a "hunk" of data
         * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
         * If the given chunk does not have enough data this method return
         * an empty ByteBuffer (READMORE).
         * If we encounter the final chunk (an empty chunk) this method
         * returns null.
         */
        ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
            int unfulfilled = bytesremaining;
            int toconsume = bytesToConsume;
            ChunkState st = state;
            if (st == ChunkState.READING_LENGTH && chunklen == -1) {
                debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
                        + " (remaining in buffer:"+chunk.remaining()+")");
                int clen = chunklen = tryReadChunkLen(chunk);
                if (clen == -1) return READMORE;
                debug.log(Level.DEBUG, "Got chunk len %d", clen);
                cr = false; partialChunklen = 0;
                unfulfilled = bytesremaining =  clen;
                if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
                else st = state = ChunkState.READING_DATA; // read the data
            }

            if (toconsume > 0) {
                debug.log(Level.DEBUG,
                        "Trying to consume bytes: %d (remaining in buffer: %s)",
                        toconsume, chunk.remaining());
                if (tryConsumeBytes(chunk) > 0) {
                    return READMORE;
                }
            }

            toconsume = bytesToConsume;
            assert toconsume == 0;


            if (st == ChunkState.READING_LENGTH) {
                // we will come here only if chunklen was 0, after having
                // consumed the trailing CRLF
                int clen = chunklen;
                assert clen == 0;
                debug.log(Level.DEBUG, "No more chunks: %d", clen);
                // the DONE state is not really needed but it helps with
                // assertions...
                state = ChunkState.DONE;
                return null;
            }

            int clen = chunklen;
            assert clen > 0;
            assert st == ChunkState.READING_DATA;

            ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
            if (unfulfilled > 0) {
                int bytesread = chunk.remaining();
                debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
                          bytesread, unfulfilled);

                int bytes2return = Math.min(bytesread, unfulfilled);
                debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
                returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
                unfulfilled = bytesremaining -= bytes2return;
                if (unfulfilled == 0) bytesToConsume = 2;
            }

            assert unfulfilled >= 0;

            if (unfulfilled == 0) {
                debug.log(Level.DEBUG,
                        "No more bytes to read - %d yet to consume.",
                        unfulfilled);
                // check whether the trailing CRLF is consumed, try to
                // consume it if not. If tryConsumeBytes needs more bytes
                // then we will come back here later - skipping the block
                // that reads data because remaining==0, and finding
                // that the two bytes are now consumed.
                if (tryConsumeBytes(chunk) == 0) {
                    // we're done for this chunk! reset all states and
                    // prepare to read the next chunk.
                    chunklen = -1;
                    partialChunklen = 0;
                    cr = false;
                    state = ChunkState.READING_LENGTH;
                    debug.log(Level.DEBUG, "Ready to read next chunk");
                }
            }
            if (returnBuffer == READMORE) {
                debug.log(Level.DEBUG, "Need more data");
            }
            return returnBuffer;
        }


        // Attempt to parse and push one hunk from the buffer.
        // Returns true if the final chunk was parsed.
        // Returns false if we need to push more chunks.
        private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
                throws IOException {
            assert state != ChunkState.DONE;
            ByteBuffer b1 = tryReadOneHunk(b);
            if (b1 != null) {
                //assert b1.hasRemaining() || b1 == READMORE;
                if (b1.hasRemaining()) {
                    debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
                              b1.remaining());
                    out.add(b1);
                    debug.log(Level.DEBUG, "Chunk sent.");
                }
                return false; // we haven't parsed the final chunk yet.
            } else {
                return true; // we're done! the final chunk was parsed.
            }
        }

        private int toDigit(int b) throws IOException {
            if (b >= 0x30 && b <= 0x39) {
                return b - 0x30;
            }
            if (b >= 0x41 && b <= 0x46) {
                return b - 0x41 + 10;
            }
            if (b >= 0x61 && b <= 0x66) {
                return b - 0x61 + 10;
            }
            throw new IOException("Invalid chunk header byte " + b);
        }

    }

    class FixedLengthBodyParser implements BodyParser {
        final int contentLength;
        final Consumer<Throwable> onComplete;
        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
        final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
        volatile int remaining;
        volatile Throwable closedExceptionally;
        volatile AbstractSubscription sub;
        FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
            this.contentLength = this.remaining = contentLength;
            this.onComplete = onComplete;
        }

        String dbgString() {
            return dbgTag;
        }

        @Override
        public void onSubscribe(AbstractSubscription sub) {
            debug.log(Level.DEBUG, () -> "length="
                        + contentLength +", onSubscribe: "
                        + pusher.getClass().getName());
            pusher.onSubscribe(this.sub = sub);
            try {
                if (contentLength == 0) {
                    onFinished.run();
                    pusher.onComplete();
                    onComplete.accept(null);
                }
            } catch (Throwable t) {
                closedExceptionally = t;
                try {
                    pusher.onError(t);
                } finally {
                    onComplete.accept(t);
                }
            }
        }

        @Override
        public void accept(ByteBuffer b) {
            if (closedExceptionally != null) {
                debug.log(Level.DEBUG, () -> "already closed: "
                            + closedExceptionally);
                return;
            }
            boolean completed = false;
            try {
                int unfulfilled = remaining;
                debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
                        b.remaining(), unfulfilled, contentLength);
                assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;

                if (unfulfilled == 0 && contentLength > 0) return;

                if (b.hasRemaining() && unfulfilled > 0) {
                    // only reduce demand if we actually push something.
                    // we would not have come here if there was no
                    // demand.
                    boolean hasDemand = sub.demand().tryDecrement();
                    assert hasDemand;
                    int amount = Math.min(b.remaining(), unfulfilled);
                    unfulfilled = remaining -= amount;
                    ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
                    pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
                }
                if (unfulfilled == 0) {
                    // We're done! All data has been received.
                    assert closedExceptionally == null;
                    onFinished.run();
                    pusher.onComplete();
                    completed = true;
                    onComplete.accept(closedExceptionally); // should be null
                } else {
                    assert b.remaining() == 0;
                }
            } catch (Throwable t) {
                debug.log(Level.DEBUG, "Unexpected exception", t);
                closedExceptionally = t;
                if (!completed) {
                    onComplete.accept(t);
                }
            }
        }
    }
}