--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/java/net/http/internal/ResponseContent.java Wed Feb 07 14:17:24 2018 +0000
@@ -0,0 +1,467 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http.internal;
+
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpResponse;
+import java.net.http.internal.common.Utils;
+
+/**
+ * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
+ *
+ * Call pushBody() to read the body (blocking). Data and errors are provided
+ * to given Consumers. After final buffer delivered, empty optional delivered
+ */
+class ResponseContent {
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+
+ final HttpResponse.BodySubscriber<?> pusher;
+ final int contentLength;
+ final HttpHeaders headers;
+ // this needs to run before we complete the body
+ // so that connection can be returned to pool
+ private final Runnable onFinished;
+ private final String dbgTag;
+
+ ResponseContent(HttpConnection connection,
+ int contentLength,
+ HttpHeaders h,
+ HttpResponse.BodySubscriber<?> userSubscriber,
+ Runnable onFinished)
+ {
+ this.pusher = userSubscriber;
+ this.contentLength = contentLength;
+ this.headers = h;
+ this.onFinished = onFinished;
+ this.dbgTag = connection.dbgString() + "/ResponseContent";
+ }
+
+ static final int LF = 10;
+ static final int CR = 13;
+
+ private boolean chunkedContent, chunkedContentInitialized;
+
+ boolean contentChunked() throws IOException {
+ if (chunkedContentInitialized) {
+ return chunkedContent;
+ }
+ if (contentLength == -1) {
+ String tc = headers.firstValue("Transfer-Encoding")
+ .orElse("");
+ if (!tc.equals("")) {
+ if (tc.equalsIgnoreCase("chunked")) {
+ chunkedContent = true;
+ } else {
+ throw new IOException("invalid content");
+ }
+ } else {
+ chunkedContent = false;
+ }
+ }
+ chunkedContentInitialized = true;
+ return chunkedContent;
+ }
+
+ interface BodyParser extends Consumer<ByteBuffer> {
+ void onSubscribe(AbstractSubscription sub);
+ }
+
+ // Returns a parser that will take care of parsing the received byte
+ // buffers and forward them to the BodySubscriber.
+ // When the parser is done, it will call onComplete.
+ // If parsing was successful, the throwable parameter will be null.
+ // Otherwise it will be the exception that occurred
+ // Note: revisit: it might be better to use a CompletableFuture than
+ // a completion handler.
+ BodyParser getBodyParser(Consumer<Throwable> onComplete)
+ throws IOException {
+ if (contentChunked()) {
+ return new ChunkedBodyParser(onComplete);
+ } else {
+ return new FixedLengthBodyParser(contentLength, onComplete);
+ }
+ }
+
+
+ static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
+ class ChunkedBodyParser implements BodyParser {
+ final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
+ final Consumer<Throwable> onComplete;
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
+
+ volatile Throwable closedExceptionally;
+ volatile int partialChunklen = 0; // partially read chunk len
+ volatile int chunklen = -1; // number of bytes in chunk
+ volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF
+ volatile boolean cr = false; // tryReadChunkLength has found CR
+ volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
+ volatile ChunkState state = ChunkState.READING_LENGTH; // current state
+ volatile AbstractSubscription sub;
+ ChunkedBodyParser(Consumer<Throwable> onComplete) {
+ this.onComplete = onComplete;
+ }
+
+ String dbgString() {
+ return dbgTag;
+ }
+
+ @Override
+ public void onSubscribe(AbstractSubscription sub) {
+ debug.log(Level.DEBUG, () -> "onSubscribe: "
+ + pusher.getClass().getName());
+ pusher.onSubscribe(this.sub = sub);
+ }
+
+ @Override
+ public void accept(ByteBuffer b) {
+ if (closedExceptionally != null) {
+ debug.log(Level.DEBUG, () -> "already closed: "
+ + closedExceptionally);
+ return;
+ }
+ boolean completed = false;
+ try {
+ List<ByteBuffer> out = new ArrayList<>();
+ do {
+ if (tryPushOneHunk(b, out)) {
+ // We're done! (true if the final chunk was parsed).
+ if (!out.isEmpty()) {
+ // push what we have and complete
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ pusher.onNext(Collections.unmodifiableList(out));
+ }
+ debug.log(Level.DEBUG, () -> "done!");
+ assert closedExceptionally == null;
+ assert state == ChunkState.DONE;
+ onFinished.run();
+ pusher.onComplete();
+ completed = true;
+ onComplete.accept(closedExceptionally); // should be null
+ break;
+ }
+ // the buffer may contain several hunks, and therefore
+ // we must loop while it's not exhausted.
+ } while (b.hasRemaining());
+
+ if (!completed && !out.isEmpty()) {
+ // push what we have.
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ pusher.onNext(Collections.unmodifiableList(out));
+ }
+ assert state == ChunkState.DONE || !b.hasRemaining();
+ } catch(Throwable t) {
+ closedExceptionally = t;
+ if (!completed) onComplete.accept(t);
+ }
+ }
+
+ // reads and returns chunklen. Position of chunkbuf is first byte
+ // of chunk on return. chunklen includes the CR LF at end of chunk
+ // returns -1 if needs more bytes
+ private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
+ assert state == ChunkState.READING_LENGTH;
+ while (chunkbuf.hasRemaining()) {
+ int c = chunkbuf.get();
+ if (cr) {
+ if (c == LF) {
+ return partialChunklen;
+ } else {
+ throw new IOException("invalid chunk header");
+ }
+ }
+ if (c == CR) {
+ cr = true;
+ } else {
+ int digit = toDigit(c);
+ partialChunklen = partialChunklen * 16 + digit;
+ }
+ }
+ return -1;
+ }
+
+
+ // try to consume as many bytes as specified by bytesToConsume.
+ // returns the number of bytes that still need to be consumed.
+ // In practice this method is only called to consume one CRLF pair
+ // with bytesToConsume set to 2, so it will only return 0 (if completed),
+ // 1, or 2 (if chunkbuf doesn't have the 2 chars).
+ private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
+ int n = bytesToConsume;
+ if (n > 0) {
+ int e = Math.min(chunkbuf.remaining(), n);
+
+ // verifies some assertions
+ // this methods is called only to consume CRLF
+ if (Utils.ASSERTIONSENABLED) {
+ assert n <= 2 && e <= 2;
+ ByteBuffer tmp = chunkbuf.slice();
+ // if n == 2 assert that we will first consume CR
+ assert (n == 2 && e > 0) ? tmp.get() == CR : true;
+ // if n == 1 || n == 2 && e == 2 assert that we then consume LF
+ assert (n == 1 || e == 2) ? tmp.get() == LF : true;
+ }
+
+ chunkbuf.position(chunkbuf.position() + e);
+ n -= e;
+ bytesToConsume = n;
+ }
+ assert n >= 0;
+ return n;
+ }
+
+ /**
+ * Returns a ByteBuffer containing chunk of data or a "hunk" of data
+ * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
+ * If the given chunk does not have enough data this method return
+ * an empty ByteBuffer (READMORE).
+ * If we encounter the final chunk (an empty chunk) this method
+ * returns null.
+ */
+ ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
+ int unfulfilled = bytesremaining;
+ int toconsume = bytesToConsume;
+ ChunkState st = state;
+ if (st == ChunkState.READING_LENGTH && chunklen == -1) {
+ debug.log(Level.DEBUG, () -> "Trying to read chunk len"
+ + " (remaining in buffer:"+chunk.remaining()+")");
+ int clen = chunklen = tryReadChunkLen(chunk);
+ if (clen == -1) return READMORE;
+ debug.log(Level.DEBUG, "Got chunk len %d", clen);
+ cr = false; partialChunklen = 0;
+ unfulfilled = bytesremaining = clen;
+ if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
+ else st = state = ChunkState.READING_DATA; // read the data
+ }
+
+ if (toconsume > 0) {
+ debug.log(Level.DEBUG,
+ "Trying to consume bytes: %d (remaining in buffer: %s)",
+ toconsume, chunk.remaining());
+ if (tryConsumeBytes(chunk) > 0) {
+ return READMORE;
+ }
+ }
+
+ toconsume = bytesToConsume;
+ assert toconsume == 0;
+
+
+ if (st == ChunkState.READING_LENGTH) {
+ // we will come here only if chunklen was 0, after having
+ // consumed the trailing CRLF
+ int clen = chunklen;
+ assert clen == 0;
+ debug.log(Level.DEBUG, "No more chunks: %d", clen);
+ // the DONE state is not really needed but it helps with
+ // assertions...
+ state = ChunkState.DONE;
+ return null;
+ }
+
+ int clen = chunklen;
+ assert clen > 0;
+ assert st == ChunkState.READING_DATA;
+
+ ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
+ if (unfulfilled > 0) {
+ int bytesread = chunk.remaining();
+ debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
+ bytesread, unfulfilled);
+
+ int bytes2return = Math.min(bytesread, unfulfilled);
+ debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return);
+ returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
+ unfulfilled = bytesremaining -= bytes2return;
+ if (unfulfilled == 0) bytesToConsume = 2;
+ }
+
+ assert unfulfilled >= 0;
+
+ if (unfulfilled == 0) {
+ debug.log(Level.DEBUG,
+ "No more bytes to read - %d yet to consume.",
+ unfulfilled);
+ // check whether the trailing CRLF is consumed, try to
+ // consume it if not. If tryConsumeBytes needs more bytes
+ // then we will come back here later - skipping the block
+ // that reads data because remaining==0, and finding
+ // that the two bytes are now consumed.
+ if (tryConsumeBytes(chunk) == 0) {
+ // we're done for this chunk! reset all states and
+ // prepare to read the next chunk.
+ chunklen = -1;
+ partialChunklen = 0;
+ cr = false;
+ state = ChunkState.READING_LENGTH;
+ debug.log(Level.DEBUG, "Ready to read next chunk");
+ }
+ }
+ if (returnBuffer == READMORE) {
+ debug.log(Level.DEBUG, "Need more data");
+ }
+ return returnBuffer;
+ }
+
+
+ // Attempt to parse and push one hunk from the buffer.
+ // Returns true if the final chunk was parsed.
+ // Returns false if we need to push more chunks.
+ private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
+ throws IOException {
+ assert state != ChunkState.DONE;
+ ByteBuffer b1 = tryReadOneHunk(b);
+ if (b1 != null) {
+ //assert b1.hasRemaining() || b1 == READMORE;
+ if (b1.hasRemaining()) {
+ debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
+ b1.remaining());
+ out.add(b1);
+ debug.log(Level.DEBUG, "Chunk sent.");
+ }
+ return false; // we haven't parsed the final chunk yet.
+ } else {
+ return true; // we're done! the final chunk was parsed.
+ }
+ }
+
+ private int toDigit(int b) throws IOException {
+ if (b >= 0x30 && b <= 0x39) {
+ return b - 0x30;
+ }
+ if (b >= 0x41 && b <= 0x46) {
+ return b - 0x41 + 10;
+ }
+ if (b >= 0x61 && b <= 0x66) {
+ return b - 0x61 + 10;
+ }
+ throw new IOException("Invalid chunk header byte " + b);
+ }
+
+ }
+
+ class FixedLengthBodyParser implements BodyParser {
+ final int contentLength;
+ final Consumer<Throwable> onComplete;
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
+ volatile int remaining;
+ volatile Throwable closedExceptionally;
+ volatile AbstractSubscription sub;
+ FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
+ this.contentLength = this.remaining = contentLength;
+ this.onComplete = onComplete;
+ }
+
+ String dbgString() {
+ return dbgTag;
+ }
+
+ @Override
+ public void onSubscribe(AbstractSubscription sub) {
+ debug.log(Level.DEBUG, () -> "length="
+ + contentLength +", onSubscribe: "
+ + pusher.getClass().getName());
+ pusher.onSubscribe(this.sub = sub);
+ try {
+ if (contentLength == 0) {
+ onFinished.run();
+ pusher.onComplete();
+ onComplete.accept(null);
+ }
+ } catch (Throwable t) {
+ closedExceptionally = t;
+ try {
+ pusher.onError(t);
+ } finally {
+ onComplete.accept(t);
+ }
+ }
+ }
+
+ @Override
+ public void accept(ByteBuffer b) {
+ if (closedExceptionally != null) {
+ debug.log(Level.DEBUG, () -> "already closed: "
+ + closedExceptionally);
+ return;
+ }
+ boolean completed = false;
+ try {
+ int unfulfilled = remaining;
+ debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
+ b.remaining(), unfulfilled, contentLength);
+ assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
+
+ if (unfulfilled == 0 && contentLength > 0) return;
+
+ if (b.hasRemaining() && unfulfilled > 0) {
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ int amount = Math.min(b.remaining(), unfulfilled);
+ unfulfilled = remaining -= amount;
+ ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
+ pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
+ }
+ if (unfulfilled == 0) {
+ // We're done! All data has been received.
+ assert closedExceptionally == null;
+ onFinished.run();
+ pusher.onComplete();
+ completed = true;
+ onComplete.accept(closedExceptionally); // should be null
+ } else {
+ assert b.remaining() == 0;
+ }
+ } catch (Throwable t) {
+ debug.log(Level.DEBUG, "Unexpected exception", t);
+ closedExceptionally = t;
+ if (!completed) {
+ onComplete.accept(t);
+ }
+ }
+ }
+ }
+}