src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseContent.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 56071 3353cb42b1b4
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseContent.java	Tue Feb 06 14:10:28 2018 +0000
@@ -0,0 +1,467 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal;
+
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import jdk.incubator.http.HttpHeaders;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.internal.common.Utils;
+
+/**
+ * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
+ *
+ * Call pushBody() to read the body (blocking). Data and errors are provided
+ * to given Consumers. After final buffer delivered, empty optional delivered
+ */
+class ResponseContent {
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+
+    final HttpResponse.BodySubscriber<?> pusher;
+    final int contentLength;
+    final HttpHeaders headers;
+    // this needs to run before we complete the body
+    // so that connection can be returned to pool
+    private final Runnable onFinished;
+    private final String dbgTag;
+
+    ResponseContent(HttpConnection connection,
+                    int contentLength,
+                    HttpHeaders h,
+                    HttpResponse.BodySubscriber<?> userSubscriber,
+                    Runnable onFinished)
+    {
+        this.pusher = userSubscriber;
+        this.contentLength = contentLength;
+        this.headers = h;
+        this.onFinished = onFinished;
+        this.dbgTag = connection.dbgString() + "/ResponseContent";
+    }
+
+    static final int LF = 10;
+    static final int CR = 13;
+
+    private boolean chunkedContent, chunkedContentInitialized;
+
+    boolean contentChunked() throws IOException {
+        if (chunkedContentInitialized) {
+            return chunkedContent;
+        }
+        if (contentLength == -1) {
+            String tc = headers.firstValue("Transfer-Encoding")
+                               .orElse("");
+            if (!tc.equals("")) {
+                if (tc.equalsIgnoreCase("chunked")) {
+                    chunkedContent = true;
+                } else {
+                    throw new IOException("invalid content");
+                }
+            } else {
+                chunkedContent = false;
+            }
+        }
+        chunkedContentInitialized = true;
+        return chunkedContent;
+    }
+
+    interface BodyParser extends Consumer<ByteBuffer> {
+        void onSubscribe(AbstractSubscription sub);
+    }
+
+    // Returns a parser that will take care of parsing the received byte
+    // buffers and forward them to the BodySubscriber.
+    // When the parser is done, it will call onComplete.
+    // If parsing was successful, the throwable parameter will be null.
+    // Otherwise it will be the exception that occurred
+    // Note: revisit: it might be better to use a CompletableFuture than
+    //       a completion handler.
+    BodyParser getBodyParser(Consumer<Throwable> onComplete)
+        throws IOException {
+        if (contentChunked()) {
+            return new ChunkedBodyParser(onComplete);
+        } else {
+            return new FixedLengthBodyParser(contentLength, onComplete);
+        }
+    }
+
+
+    static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
+    class ChunkedBodyParser implements BodyParser {
+        final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
+        final Consumer<Throwable> onComplete;
+        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+        final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
+
+        volatile Throwable closedExceptionally;
+        volatile int partialChunklen = 0; // partially read chunk len
+        volatile int chunklen = -1;  // number of bytes in chunk
+        volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
+        volatile boolean cr = false;  // tryReadChunkLength has found CR
+        volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
+        volatile ChunkState state = ChunkState.READING_LENGTH; // current state
+        volatile AbstractSubscription sub;
+        ChunkedBodyParser(Consumer<Throwable> onComplete) {
+            this.onComplete = onComplete;
+        }
+
+        String dbgString() {
+            return dbgTag;
+        }
+
+        @Override
+        public void onSubscribe(AbstractSubscription sub) {
+            debug.log(Level.DEBUG, () ->  "onSubscribe: "
+                        + pusher.getClass().getName());
+            pusher.onSubscribe(this.sub = sub);
+        }
+
+        @Override
+        public void accept(ByteBuffer b) {
+            if (closedExceptionally != null) {
+                debug.log(Level.DEBUG, () ->  "already closed: "
+                            + closedExceptionally);
+                return;
+            }
+            boolean completed = false;
+            try {
+                List<ByteBuffer> out = new ArrayList<>();
+                do {
+                    if (tryPushOneHunk(b, out))  {
+                        // We're done! (true if the final chunk was parsed).
+                        if (!out.isEmpty()) {
+                            // push what we have and complete
+                            // only reduce demand if we actually push something.
+                            // we would not have come here if there was no
+                            // demand.
+                            boolean hasDemand = sub.demand().tryDecrement();
+                            assert hasDemand;
+                            pusher.onNext(Collections.unmodifiableList(out));
+                        }
+                        debug.log(Level.DEBUG, () ->  "done!");
+                        assert closedExceptionally == null;
+                        assert state == ChunkState.DONE;
+                        onFinished.run();
+                        pusher.onComplete();
+                        completed = true;
+                        onComplete.accept(closedExceptionally); // should be null
+                        break;
+                    }
+                    // the buffer may contain several hunks, and therefore
+                    // we must loop while it's not exhausted.
+                } while (b.hasRemaining());
+
+                if (!completed && !out.isEmpty()) {
+                    // push what we have.
+                    // only reduce demand if we actually push something.
+                    // we would not have come here if there was no
+                    // demand.
+                    boolean hasDemand = sub.demand().tryDecrement();
+                    assert hasDemand;
+                    pusher.onNext(Collections.unmodifiableList(out));
+                }
+                assert state == ChunkState.DONE || !b.hasRemaining();
+            } catch(Throwable t) {
+                closedExceptionally = t;
+                if (!completed) onComplete.accept(t);
+            }
+        }
+
+        // reads and returns chunklen. Position of chunkbuf is first byte
+        // of chunk on return. chunklen includes the CR LF at end of chunk
+        // returns -1 if needs more bytes
+        private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
+            assert state == ChunkState.READING_LENGTH;
+            while (chunkbuf.hasRemaining()) {
+                int c = chunkbuf.get();
+                if (cr) {
+                    if (c == LF) {
+                        return partialChunklen;
+                    } else {
+                        throw new IOException("invalid chunk header");
+                    }
+                }
+                if (c == CR) {
+                    cr = true;
+                } else {
+                    int digit = toDigit(c);
+                    partialChunklen = partialChunklen * 16 + digit;
+                }
+            }
+            return -1;
+        }
+
+
+        // try to consume as many bytes as specified by bytesToConsume.
+        // returns the number of bytes that still need to be consumed.
+        // In practice this method is only called to consume one CRLF pair
+        // with bytesToConsume set to 2, so it will only return 0 (if completed),
+        // 1, or 2 (if chunkbuf doesn't have the 2 chars).
+        private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
+            int n = bytesToConsume;
+            if (n > 0) {
+                int e = Math.min(chunkbuf.remaining(), n);
+
+                // verifies some assertions
+                // this methods is called only to consume CRLF
+                if (Utils.ASSERTIONSENABLED) {
+                    assert n <= 2 && e <= 2;
+                    ByteBuffer tmp = chunkbuf.slice();
+                    // if n == 2 assert that we will first consume CR
+                    assert (n == 2 && e > 0) ? tmp.get() == CR : true;
+                    // if n == 1 || n == 2 && e == 2 assert that we then consume LF
+                    assert (n == 1 || e == 2) ? tmp.get() == LF : true;
+                }
+
+                chunkbuf.position(chunkbuf.position() + e);
+                n -= e;
+                bytesToConsume = n;
+            }
+            assert n >= 0;
+            return n;
+        }
+
+        /**
+         * Returns a ByteBuffer containing chunk of data or a "hunk" of data
+         * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
+         * If the given chunk does not have enough data this method return
+         * an empty ByteBuffer (READMORE).
+         * If we encounter the final chunk (an empty chunk) this method
+         * returns null.
+         */
+        ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
+            int unfulfilled = bytesremaining;
+            int toconsume = bytesToConsume;
+            ChunkState st = state;
+            if (st == ChunkState.READING_LENGTH && chunklen == -1) {
+                debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
+                        + " (remaining in buffer:"+chunk.remaining()+")");
+                int clen = chunklen = tryReadChunkLen(chunk);
+                if (clen == -1) return READMORE;
+                debug.log(Level.DEBUG, "Got chunk len %d", clen);
+                cr = false; partialChunklen = 0;
+                unfulfilled = bytesremaining =  clen;
+                if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
+                else st = state = ChunkState.READING_DATA; // read the data
+            }
+
+            if (toconsume > 0) {
+                debug.log(Level.DEBUG,
+                        "Trying to consume bytes: %d (remaining in buffer: %s)",
+                        toconsume, chunk.remaining());
+                if (tryConsumeBytes(chunk) > 0) {
+                    return READMORE;
+                }
+            }
+
+            toconsume = bytesToConsume;
+            assert toconsume == 0;
+
+
+            if (st == ChunkState.READING_LENGTH) {
+                // we will come here only if chunklen was 0, after having
+                // consumed the trailing CRLF
+                int clen = chunklen;
+                assert clen == 0;
+                debug.log(Level.DEBUG, "No more chunks: %d", clen);
+                // the DONE state is not really needed but it helps with
+                // assertions...
+                state = ChunkState.DONE;
+                return null;
+            }
+
+            int clen = chunklen;
+            assert clen > 0;
+            assert st == ChunkState.READING_DATA;
+
+            ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
+            if (unfulfilled > 0) {
+                int bytesread = chunk.remaining();
+                debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
+                          bytesread, unfulfilled);
+
+                int bytes2return = Math.min(bytesread, unfulfilled);
+                debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
+                returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
+                unfulfilled = bytesremaining -= bytes2return;
+                if (unfulfilled == 0) bytesToConsume = 2;
+            }
+
+            assert unfulfilled >= 0;
+
+            if (unfulfilled == 0) {
+                debug.log(Level.DEBUG,
+                        "No more bytes to read - %d yet to consume.",
+                        unfulfilled);
+                // check whether the trailing CRLF is consumed, try to
+                // consume it if not. If tryConsumeBytes needs more bytes
+                // then we will come back here later - skipping the block
+                // that reads data because remaining==0, and finding
+                // that the two bytes are now consumed.
+                if (tryConsumeBytes(chunk) == 0) {
+                    // we're done for this chunk! reset all states and
+                    // prepare to read the next chunk.
+                    chunklen = -1;
+                    partialChunklen = 0;
+                    cr = false;
+                    state = ChunkState.READING_LENGTH;
+                    debug.log(Level.DEBUG, "Ready to read next chunk");
+                }
+            }
+            if (returnBuffer == READMORE) {
+                debug.log(Level.DEBUG, "Need more data");
+            }
+            return returnBuffer;
+        }
+
+
+        // Attempt to parse and push one hunk from the buffer.
+        // Returns true if the final chunk was parsed.
+        // Returns false if we need to push more chunks.
+        private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
+                throws IOException {
+            assert state != ChunkState.DONE;
+            ByteBuffer b1 = tryReadOneHunk(b);
+            if (b1 != null) {
+                //assert b1.hasRemaining() || b1 == READMORE;
+                if (b1.hasRemaining()) {
+                    debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
+                              b1.remaining());
+                    out.add(b1);
+                    debug.log(Level.DEBUG, "Chunk sent.");
+                }
+                return false; // we haven't parsed the final chunk yet.
+            } else {
+                return true; // we're done! the final chunk was parsed.
+            }
+        }
+
+        private int toDigit(int b) throws IOException {
+            if (b >= 0x30 && b <= 0x39) {
+                return b - 0x30;
+            }
+            if (b >= 0x41 && b <= 0x46) {
+                return b - 0x41 + 10;
+            }
+            if (b >= 0x61 && b <= 0x66) {
+                return b - 0x61 + 10;
+            }
+            throw new IOException("Invalid chunk header byte " + b);
+        }
+
+    }
+
+    class FixedLengthBodyParser implements BodyParser {
+        final int contentLength;
+        final Consumer<Throwable> onComplete;
+        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+        final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
+        volatile int remaining;
+        volatile Throwable closedExceptionally;
+        volatile AbstractSubscription sub;
+        FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
+            this.contentLength = this.remaining = contentLength;
+            this.onComplete = onComplete;
+        }
+
+        String dbgString() {
+            return dbgTag;
+        }
+
+        @Override
+        public void onSubscribe(AbstractSubscription sub) {
+            debug.log(Level.DEBUG, () -> "length="
+                        + contentLength +", onSubscribe: "
+                        + pusher.getClass().getName());
+            pusher.onSubscribe(this.sub = sub);
+            try {
+                if (contentLength == 0) {
+                    onFinished.run();
+                    pusher.onComplete();
+                    onComplete.accept(null);
+                }
+            } catch (Throwable t) {
+                closedExceptionally = t;
+                try {
+                    pusher.onError(t);
+                } finally {
+                    onComplete.accept(t);
+                }
+            }
+        }
+
+        @Override
+        public void accept(ByteBuffer b) {
+            if (closedExceptionally != null) {
+                debug.log(Level.DEBUG, () -> "already closed: "
+                            + closedExceptionally);
+                return;
+            }
+            boolean completed = false;
+            try {
+                int unfulfilled = remaining;
+                debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
+                        b.remaining(), unfulfilled, contentLength);
+                assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
+
+                if (unfulfilled == 0 && contentLength > 0) return;
+
+                if (b.hasRemaining() && unfulfilled > 0) {
+                    // only reduce demand if we actually push something.
+                    // we would not have come here if there was no
+                    // demand.
+                    boolean hasDemand = sub.demand().tryDecrement();
+                    assert hasDemand;
+                    int amount = Math.min(b.remaining(), unfulfilled);
+                    unfulfilled = remaining -= amount;
+                    ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
+                    pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
+                }
+                if (unfulfilled == 0) {
+                    // We're done! All data has been received.
+                    assert closedExceptionally == null;
+                    onFinished.run();
+                    pusher.onComplete();
+                    completed = true;
+                    onComplete.accept(closedExceptionally); // should be null
+                } else {
+                    assert b.remaining() == 0;
+                }
+            } catch (Throwable t) {
+                debug.log(Level.DEBUG, "Unexpected exception", t);
+                closedExceptionally = t;
+                if (!completed) {
+                    onComplete.accept(t);
+                }
+            }
+        }
+    }
+}