diff -r 38fac6d0521d -r 42208b2f224e src/java.net.http/share/classes/java/net/http/internal/ResponseContent.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.net.http/share/classes/java/net/http/internal/ResponseContent.java Wed Feb 07 14:17:24 2018 +0000 @@ -0,0 +1,467 @@ +/* + * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package java.net.http.internal; + +import java.io.IOException; +import java.lang.System.Logger.Level; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.net.http.HttpHeaders; +import java.net.http.HttpResponse; +import java.net.http.internal.common.Utils; + +/** + * Implements chunked/fixed transfer encodings of HTTP/1.1 responses. + * + * Call pushBody() to read the body (blocking). Data and errors are provided + * to given Consumers. After final buffer delivered, empty optional delivered + */ +class ResponseContent { + + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + + final HttpResponse.BodySubscriber pusher; + final int contentLength; + final HttpHeaders headers; + // this needs to run before we complete the body + // so that connection can be returned to pool + private final Runnable onFinished; + private final String dbgTag; + + ResponseContent(HttpConnection connection, + int contentLength, + HttpHeaders h, + HttpResponse.BodySubscriber userSubscriber, + Runnable onFinished) + { + this.pusher = userSubscriber; + this.contentLength = contentLength; + this.headers = h; + this.onFinished = onFinished; + this.dbgTag = connection.dbgString() + "/ResponseContent"; + } + + static final int LF = 10; + static final int CR = 13; + + private boolean chunkedContent, chunkedContentInitialized; + + boolean contentChunked() throws IOException { + if (chunkedContentInitialized) { + return chunkedContent; + } + if (contentLength == -1) { + String tc = headers.firstValue("Transfer-Encoding") + .orElse(""); + if (!tc.equals("")) { + if (tc.equalsIgnoreCase("chunked")) { + chunkedContent = true; + } else { + throw new IOException("invalid content"); + } + } else { + chunkedContent = false; + } + } + chunkedContentInitialized = true; + return chunkedContent; + } + + interface BodyParser extends Consumer { + void onSubscribe(AbstractSubscription sub); + } + + // Returns a parser that will take care of parsing the received byte + // buffers and forward them to the BodySubscriber. + // When the parser is done, it will call onComplete. + // If parsing was successful, the throwable parameter will be null. + // Otherwise it will be the exception that occurred + // Note: revisit: it might be better to use a CompletableFuture than + // a completion handler. + BodyParser getBodyParser(Consumer onComplete) + throws IOException { + if (contentChunked()) { + return new ChunkedBodyParser(onComplete); + } else { + return new FixedLengthBodyParser(contentLength, onComplete); + } + } + + + static enum ChunkState {READING_LENGTH, READING_DATA, DONE} + class ChunkedBodyParser implements BodyParser { + final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER; + final Consumer onComplete; + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser"; + + volatile Throwable closedExceptionally; + volatile int partialChunklen = 0; // partially read chunk len + volatile int chunklen = -1; // number of bytes in chunk + volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF + volatile boolean cr = false; // tryReadChunkLength has found CR + volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding + volatile ChunkState state = ChunkState.READING_LENGTH; // current state + volatile AbstractSubscription sub; + ChunkedBodyParser(Consumer onComplete) { + this.onComplete = onComplete; + } + + String dbgString() { + return dbgTag; + } + + @Override + public void onSubscribe(AbstractSubscription sub) { + debug.log(Level.DEBUG, () -> "onSubscribe: " + + pusher.getClass().getName()); + pusher.onSubscribe(this.sub = sub); + } + + @Override + public void accept(ByteBuffer b) { + if (closedExceptionally != null) { + debug.log(Level.DEBUG, () -> "already closed: " + + closedExceptionally); + return; + } + boolean completed = false; + try { + List out = new ArrayList<>(); + do { + if (tryPushOneHunk(b, out)) { + // We're done! (true if the final chunk was parsed). + if (!out.isEmpty()) { + // push what we have and complete + // only reduce demand if we actually push something. + // we would not have come here if there was no + // demand. + boolean hasDemand = sub.demand().tryDecrement(); + assert hasDemand; + pusher.onNext(Collections.unmodifiableList(out)); + } + debug.log(Level.DEBUG, () -> "done!"); + assert closedExceptionally == null; + assert state == ChunkState.DONE; + onFinished.run(); + pusher.onComplete(); + completed = true; + onComplete.accept(closedExceptionally); // should be null + break; + } + // the buffer may contain several hunks, and therefore + // we must loop while it's not exhausted. + } while (b.hasRemaining()); + + if (!completed && !out.isEmpty()) { + // push what we have. + // only reduce demand if we actually push something. + // we would not have come here if there was no + // demand. + boolean hasDemand = sub.demand().tryDecrement(); + assert hasDemand; + pusher.onNext(Collections.unmodifiableList(out)); + } + assert state == ChunkState.DONE || !b.hasRemaining(); + } catch(Throwable t) { + closedExceptionally = t; + if (!completed) onComplete.accept(t); + } + } + + // reads and returns chunklen. Position of chunkbuf is first byte + // of chunk on return. chunklen includes the CR LF at end of chunk + // returns -1 if needs more bytes + private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException { + assert state == ChunkState.READING_LENGTH; + while (chunkbuf.hasRemaining()) { + int c = chunkbuf.get(); + if (cr) { + if (c == LF) { + return partialChunklen; + } else { + throw new IOException("invalid chunk header"); + } + } + if (c == CR) { + cr = true; + } else { + int digit = toDigit(c); + partialChunklen = partialChunklen * 16 + digit; + } + } + return -1; + } + + + // try to consume as many bytes as specified by bytesToConsume. + // returns the number of bytes that still need to be consumed. + // In practice this method is only called to consume one CRLF pair + // with bytesToConsume set to 2, so it will only return 0 (if completed), + // 1, or 2 (if chunkbuf doesn't have the 2 chars). + private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException { + int n = bytesToConsume; + if (n > 0) { + int e = Math.min(chunkbuf.remaining(), n); + + // verifies some assertions + // this methods is called only to consume CRLF + if (Utils.ASSERTIONSENABLED) { + assert n <= 2 && e <= 2; + ByteBuffer tmp = chunkbuf.slice(); + // if n == 2 assert that we will first consume CR + assert (n == 2 && e > 0) ? tmp.get() == CR : true; + // if n == 1 || n == 2 && e == 2 assert that we then consume LF + assert (n == 1 || e == 2) ? tmp.get() == LF : true; + } + + chunkbuf.position(chunkbuf.position() + e); + n -= e; + bytesToConsume = n; + } + assert n >= 0; + return n; + } + + /** + * Returns a ByteBuffer containing chunk of data or a "hunk" of data + * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). + * If the given chunk does not have enough data this method return + * an empty ByteBuffer (READMORE). + * If we encounter the final chunk (an empty chunk) this method + * returns null. + */ + ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException { + int unfulfilled = bytesremaining; + int toconsume = bytesToConsume; + ChunkState st = state; + if (st == ChunkState.READING_LENGTH && chunklen == -1) { + debug.log(Level.DEBUG, () -> "Trying to read chunk len" + + " (remaining in buffer:"+chunk.remaining()+")"); + int clen = chunklen = tryReadChunkLen(chunk); + if (clen == -1) return READMORE; + debug.log(Level.DEBUG, "Got chunk len %d", clen); + cr = false; partialChunklen = 0; + unfulfilled = bytesremaining = clen; + if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk + else st = state = ChunkState.READING_DATA; // read the data + } + + if (toconsume > 0) { + debug.log(Level.DEBUG, + "Trying to consume bytes: %d (remaining in buffer: %s)", + toconsume, chunk.remaining()); + if (tryConsumeBytes(chunk) > 0) { + return READMORE; + } + } + + toconsume = bytesToConsume; + assert toconsume == 0; + + + if (st == ChunkState.READING_LENGTH) { + // we will come here only if chunklen was 0, after having + // consumed the trailing CRLF + int clen = chunklen; + assert clen == 0; + debug.log(Level.DEBUG, "No more chunks: %d", clen); + // the DONE state is not really needed but it helps with + // assertions... + state = ChunkState.DONE; + return null; + } + + int clen = chunklen; + assert clen > 0; + assert st == ChunkState.READING_DATA; + + ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk + if (unfulfilled > 0) { + int bytesread = chunk.remaining(); + debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d", + bytesread, unfulfilled); + + int bytes2return = Math.min(bytesread, unfulfilled); + debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return); + returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer(); + unfulfilled = bytesremaining -= bytes2return; + if (unfulfilled == 0) bytesToConsume = 2; + } + + assert unfulfilled >= 0; + + if (unfulfilled == 0) { + debug.log(Level.DEBUG, + "No more bytes to read - %d yet to consume.", + unfulfilled); + // check whether the trailing CRLF is consumed, try to + // consume it if not. If tryConsumeBytes needs more bytes + // then we will come back here later - skipping the block + // that reads data because remaining==0, and finding + // that the two bytes are now consumed. + if (tryConsumeBytes(chunk) == 0) { + // we're done for this chunk! reset all states and + // prepare to read the next chunk. + chunklen = -1; + partialChunklen = 0; + cr = false; + state = ChunkState.READING_LENGTH; + debug.log(Level.DEBUG, "Ready to read next chunk"); + } + } + if (returnBuffer == READMORE) { + debug.log(Level.DEBUG, "Need more data"); + } + return returnBuffer; + } + + + // Attempt to parse and push one hunk from the buffer. + // Returns true if the final chunk was parsed. + // Returns false if we need to push more chunks. + private boolean tryPushOneHunk(ByteBuffer b, List out) + throws IOException { + assert state != ChunkState.DONE; + ByteBuffer b1 = tryReadOneHunk(b); + if (b1 != null) { + //assert b1.hasRemaining() || b1 == READMORE; + if (b1.hasRemaining()) { + debug.log(Level.DEBUG, "Sending chunk to consumer (%d)", + b1.remaining()); + out.add(b1); + debug.log(Level.DEBUG, "Chunk sent."); + } + return false; // we haven't parsed the final chunk yet. + } else { + return true; // we're done! the final chunk was parsed. + } + } + + private int toDigit(int b) throws IOException { + if (b >= 0x30 && b <= 0x39) { + return b - 0x30; + } + if (b >= 0x41 && b <= 0x46) { + return b - 0x41 + 10; + } + if (b >= 0x61 && b <= 0x66) { + return b - 0x61 + 10; + } + throw new IOException("Invalid chunk header byte " + b); + } + + } + + class FixedLengthBodyParser implements BodyParser { + final int contentLength; + final Consumer onComplete; + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; + volatile int remaining; + volatile Throwable closedExceptionally; + volatile AbstractSubscription sub; + FixedLengthBodyParser(int contentLength, Consumer onComplete) { + this.contentLength = this.remaining = contentLength; + this.onComplete = onComplete; + } + + String dbgString() { + return dbgTag; + } + + @Override + public void onSubscribe(AbstractSubscription sub) { + debug.log(Level.DEBUG, () -> "length=" + + contentLength +", onSubscribe: " + + pusher.getClass().getName()); + pusher.onSubscribe(this.sub = sub); + try { + if (contentLength == 0) { + onFinished.run(); + pusher.onComplete(); + onComplete.accept(null); + } + } catch (Throwable t) { + closedExceptionally = t; + try { + pusher.onError(t); + } finally { + onComplete.accept(t); + } + } + } + + @Override + public void accept(ByteBuffer b) { + if (closedExceptionally != null) { + debug.log(Level.DEBUG, () -> "already closed: " + + closedExceptionally); + return; + } + boolean completed = false; + try { + int unfulfilled = remaining; + debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)", + b.remaining(), unfulfilled, contentLength); + assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0; + + if (unfulfilled == 0 && contentLength > 0) return; + + if (b.hasRemaining() && unfulfilled > 0) { + // only reduce demand if we actually push something. + // we would not have come here if there was no + // demand. + boolean hasDemand = sub.demand().tryDecrement(); + assert hasDemand; + int amount = Math.min(b.remaining(), unfulfilled); + unfulfilled = remaining -= amount; + ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount); + pusher.onNext(List.of(buffer.asReadOnlyBuffer())); + } + if (unfulfilled == 0) { + // We're done! All data has been received. + assert closedExceptionally == null; + onFinished.run(); + pusher.onComplete(); + completed = true; + onComplete.accept(closedExceptionally); // should be null + } else { + assert b.remaining() == 0; + } + } catch (Throwable t) { + debug.log(Level.DEBUG, "Unexpected exception", t); + closedExceptionally = t; + if (!completed) { + onComplete.accept(t); + } + } + } + } +}