--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Tue Feb 06 11:39:55 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,465 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Consumer;
-import jdk.incubator.http.internal.common.Utils;
-
-/**
- * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
- *
- * Call pushBody() to read the body (blocking). Data and errors are provided
- * to given Consumers. After final buffer delivered, empty optional delivered
- */
-class ResponseContent {
-
- static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
-
- final HttpResponse.BodySubscriber<?> pusher;
- final int contentLength;
- final HttpHeaders headers;
- // this needs to run before we complete the body
- // so that connection can be returned to pool
- private final Runnable onFinished;
- private final String dbgTag;
-
- ResponseContent(HttpConnection connection,
- int contentLength,
- HttpHeaders h,
- HttpResponse.BodySubscriber<?> userSubscriber,
- Runnable onFinished)
- {
- this.pusher = userSubscriber;
- this.contentLength = contentLength;
- this.headers = h;
- this.onFinished = onFinished;
- this.dbgTag = connection.dbgString() + "/ResponseContent";
- }
-
- static final int LF = 10;
- static final int CR = 13;
-
- private boolean chunkedContent, chunkedContentInitialized;
-
- boolean contentChunked() throws IOException {
- if (chunkedContentInitialized) {
- return chunkedContent;
- }
- if (contentLength == -1) {
- String tc = headers.firstValue("Transfer-Encoding")
- .orElse("");
- if (!tc.equals("")) {
- if (tc.equalsIgnoreCase("chunked")) {
- chunkedContent = true;
- } else {
- throw new IOException("invalid content");
- }
- } else {
- chunkedContent = false;
- }
- }
- chunkedContentInitialized = true;
- return chunkedContent;
- }
-
- interface BodyParser extends Consumer<ByteBuffer> {
- void onSubscribe(AbstractSubscription sub);
- }
-
- // Returns a parser that will take care of parsing the received byte
- // buffers and forward them to the BodySubscriber.
- // When the parser is done, it will call onComplete.
- // If parsing was successful, the throwable parameter will be null.
- // Otherwise it will be the exception that occurred
- // Note: revisit: it might be better to use a CompletableFuture than
- // a completion handler.
- BodyParser getBodyParser(Consumer<Throwable> onComplete)
- throws IOException {
- if (contentChunked()) {
- return new ChunkedBodyParser(onComplete);
- } else {
- return new FixedLengthBodyParser(contentLength, onComplete);
- }
- }
-
-
- static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
- class ChunkedBodyParser implements BodyParser {
- final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
- final Consumer<Throwable> onComplete;
- final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
- final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
-
- volatile Throwable closedExceptionally;
- volatile int partialChunklen = 0; // partially read chunk len
- volatile int chunklen = -1; // number of bytes in chunk
- volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF
- volatile boolean cr = false; // tryReadChunkLength has found CR
- volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
- volatile ChunkState state = ChunkState.READING_LENGTH; // current state
- volatile AbstractSubscription sub;
- ChunkedBodyParser(Consumer<Throwable> onComplete) {
- this.onComplete = onComplete;
- }
-
- String dbgString() {
- return dbgTag;
- }
-
- @Override
- public void onSubscribe(AbstractSubscription sub) {
- debug.log(Level.DEBUG, () -> "onSubscribe: "
- + pusher.getClass().getName());
- pusher.onSubscribe(this.sub = sub);
- }
-
- @Override
- public void accept(ByteBuffer b) {
- if (closedExceptionally != null) {
- debug.log(Level.DEBUG, () -> "already closed: "
- + closedExceptionally);
- return;
- }
- boolean completed = false;
- try {
- List<ByteBuffer> out = new ArrayList<>();
- do {
- if (tryPushOneHunk(b, out)) {
- // We're done! (true if the final chunk was parsed).
- if (!out.isEmpty()) {
- // push what we have and complete
- // only reduce demand if we actually push something.
- // we would not have come here if there was no
- // demand.
- boolean hasDemand = sub.demand().tryDecrement();
- assert hasDemand;
- pusher.onNext(Collections.unmodifiableList(out));
- }
- debug.log(Level.DEBUG, () -> "done!");
- assert closedExceptionally == null;
- assert state == ChunkState.DONE;
- onFinished.run();
- pusher.onComplete();
- completed = true;
- onComplete.accept(closedExceptionally); // should be null
- break;
- }
- // the buffer may contain several hunks, and therefore
- // we must loop while it's not exhausted.
- } while (b.hasRemaining());
-
- if (!completed && !out.isEmpty()) {
- // push what we have.
- // only reduce demand if we actually push something.
- // we would not have come here if there was no
- // demand.
- boolean hasDemand = sub.demand().tryDecrement();
- assert hasDemand;
- pusher.onNext(Collections.unmodifiableList(out));
- }
- assert state == ChunkState.DONE || !b.hasRemaining();
- } catch(Throwable t) {
- closedExceptionally = t;
- if (!completed) onComplete.accept(t);
- }
- }
-
- // reads and returns chunklen. Position of chunkbuf is first byte
- // of chunk on return. chunklen includes the CR LF at end of chunk
- // returns -1 if needs more bytes
- private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
- assert state == ChunkState.READING_LENGTH;
- while (chunkbuf.hasRemaining()) {
- int c = chunkbuf.get();
- if (cr) {
- if (c == LF) {
- return partialChunklen;
- } else {
- throw new IOException("invalid chunk header");
- }
- }
- if (c == CR) {
- cr = true;
- } else {
- int digit = toDigit(c);
- partialChunklen = partialChunklen * 16 + digit;
- }
- }
- return -1;
- }
-
-
- // try to consume as many bytes as specified by bytesToConsume.
- // returns the number of bytes that still need to be consumed.
- // In practice this method is only called to consume one CRLF pair
- // with bytesToConsume set to 2, so it will only return 0 (if completed),
- // 1, or 2 (if chunkbuf doesn't have the 2 chars).
- private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
- int n = bytesToConsume;
- if (n > 0) {
- int e = Math.min(chunkbuf.remaining(), n);
-
- // verifies some assertions
- // this methods is called only to consume CRLF
- if (Utils.ASSERTIONSENABLED) {
- assert n <= 2 && e <= 2;
- ByteBuffer tmp = chunkbuf.slice();
- // if n == 2 assert that we will first consume CR
- assert (n == 2 && e > 0) ? tmp.get() == CR : true;
- // if n == 1 || n == 2 && e == 2 assert that we then consume LF
- assert (n == 1 || e == 2) ? tmp.get() == LF : true;
- }
-
- chunkbuf.position(chunkbuf.position() + e);
- n -= e;
- bytesToConsume = n;
- }
- assert n >= 0;
- return n;
- }
-
- /**
- * Returns a ByteBuffer containing chunk of data or a "hunk" of data
- * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
- * If the given chunk does not have enough data this method return
- * an empty ByteBuffer (READMORE).
- * If we encounter the final chunk (an empty chunk) this method
- * returns null.
- */
- ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
- int unfulfilled = bytesremaining;
- int toconsume = bytesToConsume;
- ChunkState st = state;
- if (st == ChunkState.READING_LENGTH && chunklen == -1) {
- debug.log(Level.DEBUG, () -> "Trying to read chunk len"
- + " (remaining in buffer:"+chunk.remaining()+")");
- int clen = chunklen = tryReadChunkLen(chunk);
- if (clen == -1) return READMORE;
- debug.log(Level.DEBUG, "Got chunk len %d", clen);
- cr = false; partialChunklen = 0;
- unfulfilled = bytesremaining = clen;
- if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
- else st = state = ChunkState.READING_DATA; // read the data
- }
-
- if (toconsume > 0) {
- debug.log(Level.DEBUG,
- "Trying to consume bytes: %d (remaining in buffer: %s)",
- toconsume, chunk.remaining());
- if (tryConsumeBytes(chunk) > 0) {
- return READMORE;
- }
- }
-
- toconsume = bytesToConsume;
- assert toconsume == 0;
-
-
- if (st == ChunkState.READING_LENGTH) {
- // we will come here only if chunklen was 0, after having
- // consumed the trailing CRLF
- int clen = chunklen;
- assert clen == 0;
- debug.log(Level.DEBUG, "No more chunks: %d", clen);
- // the DONE state is not really needed but it helps with
- // assertions...
- state = ChunkState.DONE;
- return null;
- }
-
- int clen = chunklen;
- assert clen > 0;
- assert st == ChunkState.READING_DATA;
-
- ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
- if (unfulfilled > 0) {
- int bytesread = chunk.remaining();
- debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
- bytesread, unfulfilled);
-
- int bytes2return = Math.min(bytesread, unfulfilled);
- debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return);
- returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
- unfulfilled = bytesremaining -= bytes2return;
- if (unfulfilled == 0) bytesToConsume = 2;
- }
-
- assert unfulfilled >= 0;
-
- if (unfulfilled == 0) {
- debug.log(Level.DEBUG,
- "No more bytes to read - %d yet to consume.",
- unfulfilled);
- // check whether the trailing CRLF is consumed, try to
- // consume it if not. If tryConsumeBytes needs more bytes
- // then we will come back here later - skipping the block
- // that reads data because remaining==0, and finding
- // that the two bytes are now consumed.
- if (tryConsumeBytes(chunk) == 0) {
- // we're done for this chunk! reset all states and
- // prepare to read the next chunk.
- chunklen = -1;
- partialChunklen = 0;
- cr = false;
- state = ChunkState.READING_LENGTH;
- debug.log(Level.DEBUG, "Ready to read next chunk");
- }
- }
- if (returnBuffer == READMORE) {
- debug.log(Level.DEBUG, "Need more data");
- }
- return returnBuffer;
- }
-
-
- // Attempt to parse and push one hunk from the buffer.
- // Returns true if the final chunk was parsed.
- // Returns false if we need to push more chunks.
- private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
- throws IOException {
- assert state != ChunkState.DONE;
- ByteBuffer b1 = tryReadOneHunk(b);
- if (b1 != null) {
- //assert b1.hasRemaining() || b1 == READMORE;
- if (b1.hasRemaining()) {
- debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
- b1.remaining());
- out.add(b1);
- debug.log(Level.DEBUG, "Chunk sent.");
- }
- return false; // we haven't parsed the final chunk yet.
- } else {
- return true; // we're done! the final chunk was parsed.
- }
- }
-
- private int toDigit(int b) throws IOException {
- if (b >= 0x30 && b <= 0x39) {
- return b - 0x30;
- }
- if (b >= 0x41 && b <= 0x46) {
- return b - 0x41 + 10;
- }
- if (b >= 0x61 && b <= 0x66) {
- return b - 0x61 + 10;
- }
- throw new IOException("Invalid chunk header byte " + b);
- }
-
- }
-
- class FixedLengthBodyParser implements BodyParser {
- final int contentLength;
- final Consumer<Throwable> onComplete;
- final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
- final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
- volatile int remaining;
- volatile Throwable closedExceptionally;
- volatile AbstractSubscription sub;
- FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
- this.contentLength = this.remaining = contentLength;
- this.onComplete = onComplete;
- }
-
- String dbgString() {
- return dbgTag;
- }
-
- @Override
- public void onSubscribe(AbstractSubscription sub) {
- debug.log(Level.DEBUG, () -> "length="
- + contentLength +", onSubscribe: "
- + pusher.getClass().getName());
- pusher.onSubscribe(this.sub = sub);
- try {
- if (contentLength == 0) {
- onFinished.run();
- pusher.onComplete();
- onComplete.accept(null);
- }
- } catch (Throwable t) {
- closedExceptionally = t;
- try {
- pusher.onError(t);
- } finally {
- onComplete.accept(t);
- }
- }
- }
-
- @Override
- public void accept(ByteBuffer b) {
- if (closedExceptionally != null) {
- debug.log(Level.DEBUG, () -> "already closed: "
- + closedExceptionally);
- return;
- }
- boolean completed = false;
- try {
- int unfulfilled = remaining;
- debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
- b.remaining(), unfulfilled, contentLength);
- assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
-
- if (unfulfilled == 0 && contentLength > 0) return;
-
- if (b.hasRemaining() && unfulfilled > 0) {
- // only reduce demand if we actually push something.
- // we would not have come here if there was no
- // demand.
- boolean hasDemand = sub.demand().tryDecrement();
- assert hasDemand;
- int amount = Math.min(b.remaining(), unfulfilled);
- unfulfilled = remaining -= amount;
- ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
- pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
- }
- if (unfulfilled == 0) {
- // We're done! All data has been received.
- assert closedExceptionally == null;
- onFinished.run();
- pusher.onComplete();
- completed = true;
- onComplete.accept(closedExceptionally); // should be null
- } else {
- assert b.remaining() == 0;
- }
- } catch (Throwable t) {
- debug.log(Level.DEBUG, "Unexpected exception", t);
- closedExceptionally = t;
- if (!completed) {
- onComplete.accept(t);
- }
- }
- }
- }
-}