--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Fri Nov 03 10:01:08 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Wed Dec 06 11:11:59 2017 -0800
@@ -26,8 +26,10 @@
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
-import java.util.Optional;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.Utils;
@@ -39,45 +41,35 @@
*/
class ResponseContent {
- final HttpResponse.BodyProcessor<?> pusher;
- final HttpConnection connection;
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+
+ final HttpResponse.BodySubscriber<?> pusher;
final int contentLength;
- ByteBuffer buffer;
- //ByteBuffer lastBufferUsed;
- final ResponseHeaders headers;
- private final Consumer<Optional<ByteBuffer>> dataConsumer;
- private final Consumer<IOException> errorConsumer;
- private final HttpClientImpl client;
+ final HttpHeaders headers;
// this needs to run before we complete the body
// so that connection can be returned to pool
private final Runnable onFinished;
+ private final String dbgTag;
ResponseContent(HttpConnection connection,
int contentLength,
- ResponseHeaders h,
- HttpResponse.BodyProcessor<?> userProcessor,
- Consumer<Optional<ByteBuffer>> dataConsumer,
- Consumer<IOException> errorConsumer,
+ HttpHeaders h,
+ HttpResponse.BodySubscriber<?> userSubscriber,
Runnable onFinished)
{
- this.pusher = (HttpResponse.BodyProcessor)userProcessor;
- this.connection = connection;
+ this.pusher = userSubscriber;
this.contentLength = contentLength;
this.headers = h;
- this.dataConsumer = dataConsumer;
- this.errorConsumer = errorConsumer;
- this.client = connection.client;
this.onFinished = onFinished;
+ this.dbgTag = connection.dbgString() + "/ResponseContent";
}
static final int LF = 10;
static final int CR = 13;
- static final int SP = 0x20;
- static final int BUF_SIZE = 1024;
- boolean chunkedContent, chunkedContentInitialized;
+ private boolean chunkedContent, chunkedContentInitialized;
- private boolean contentChunked() throws IOException {
+ boolean contentChunked() throws IOException {
if (chunkedContentInitialized) {
return chunkedContent;
}
@@ -98,182 +90,375 @@
return chunkedContent;
}
- /**
- * Entry point for pusher. b is an initial ByteBuffer that may
- * have some data in it. When this method returns, the body
- * has been fully processed.
- */
- void pushBody(ByteBuffer b) {
- try {
- // TODO: check status
- if (contentChunked()) {
- pushBodyChunked(b);
- } else {
- pushBodyFixed(b);
- }
- } catch (IOException t) {
- errorConsumer.accept(t);
+ interface BodyParser extends Consumer<ByteBuffer> {
+ void onSubscribe(AbstractSubscription sub);
+ }
+
+ // Returns a parser that will take care of parsing the received byte
+ // buffers and forward them to the BodySubscriber.
+ // When the parser is done, it will call onComplete.
+ // If parsing was successful, the throwable parameter will be null.
+ // Otherwise it will be the exception that occurred
+ // Note: revisit: it might be better to use a CompletableFuture than
+ // a completion handler.
+ BodyParser getBodyParser(Consumer<Throwable> onComplete)
+ throws IOException {
+ if (contentChunked()) {
+ return new ChunkedBodyParser(onComplete);
+ } else {
+ return new FixedLengthBodyParser(contentLength, onComplete);
}
}
- // reads and returns chunklen. Position of chunkbuf is first byte
- // of chunk on return. chunklen includes the CR LF at end of chunk
- int readChunkLen() throws IOException {
- chunklen = 0;
- boolean cr = false;
- while (true) {
- getHunk();
- int c = chunkbuf.get();
- if (cr) {
- if (c == LF) {
- return chunklen + 2;
- } else {
- throw new IOException("invalid chunk header");
- }
- }
- if (c == CR) {
- cr = true;
- } else {
- int digit = toDigit(c);
- chunklen = chunklen * 16 + digit;
- }
+
+ static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
+ class ChunkedBodyParser implements BodyParser {
+ final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
+ final Consumer<Throwable> onComplete;
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
+
+ volatile Throwable closedExceptionally;
+ volatile int partialChunklen = 0; // partially read chunk len
+ volatile int chunklen = -1; // number of bytes in chunk
+ volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF
+ volatile boolean cr = false; // tryReadChunkLength has found CR
+ volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
+ volatile ChunkState state = ChunkState.READING_LENGTH; // current state
+ volatile AbstractSubscription sub;
+ ChunkedBodyParser(Consumer<Throwable> onComplete) {
+ this.onComplete = onComplete;
}
- }
+
+ String dbgString() {
+ return dbgTag;
+ }
- int chunklen = -1; // number of bytes in chunk (fixed)
- int bytesremaining; // number of bytes in chunk left to be read incl CRLF
- int bytesread;
- ByteBuffer chunkbuf; // initialise
+ @Override
+ public void onSubscribe(AbstractSubscription sub) {
+ debug.log(Level.DEBUG, () -> "onSubscribe: "
+ + pusher.getClass().getName());
+ pusher.onSubscribe(this.sub = sub);
+ }
- // make sure we have at least 1 byte to look at
- private void getHunk() throws IOException {
- if (chunkbuf == null || !chunkbuf.hasRemaining()) {
- chunkbuf = connection.read();
- }
- }
-
- private void consumeBytes(int n) throws IOException {
- getHunk();
- while (n > 0) {
- int e = Math.min(chunkbuf.remaining(), n);
- chunkbuf.position(chunkbuf.position() + e);
- n -= e;
- if (n > 0) {
- getHunk();
+ @Override
+ public void accept(ByteBuffer b) {
+ if (closedExceptionally != null) {
+ debug.log(Level.DEBUG, () -> "already closed: "
+ + closedExceptionally);
+ return;
}
- }
- }
+ boolean completed = false;
+ try {
+ List<ByteBuffer> out = new ArrayList<>();
+ do {
+ if (tryPushOneHunk(b, out)) {
+ // We're done! (true if the final chunk was parsed).
+ if (!out.isEmpty()) {
+ // push what we have and complete
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ pusher.onNext(out);
+ }
+ debug.log(Level.DEBUG, () -> "done!");
+ assert closedExceptionally == null;
+ assert state == ChunkState.DONE;
+ onFinished.run();
+ pusher.onComplete();
+ completed = true;
+ onComplete.accept(closedExceptionally); // should be null
+ break;
+ }
+ // the buffer may contain several hunks, and therefore
+ // we must loop while it's not exhausted.
+ } while (b.hasRemaining());
- /**
- * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
- * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
- * ByteBuffer returned is obtained from response processor.
- */
- ByteBuffer readChunkedBuffer() throws IOException {
- if (chunklen == -1) {
- // new chunk
- chunklen = readChunkLen() - 2;
- bytesremaining = chunklen;
- if (chunklen == 0) {
- consumeBytes(2);
- return null;
+ if (!completed && !out.isEmpty()) {
+ // push what we have.
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ pusher.onNext(out);
+ }
+ assert state == ChunkState.DONE || !b.hasRemaining();
+ } catch(Throwable t) {
+ closedExceptionally = t;
+ if (!completed) onComplete.accept(t);
}
}
- getHunk();
- bytesread = chunkbuf.remaining();
- ByteBuffer returnBuffer = Utils.getBuffer();
- int space = returnBuffer.remaining();
+ // reads and returns chunklen. Position of chunkbuf is first byte
+ // of chunk on return. chunklen includes the CR LF at end of chunk
+ // returns -1 if needs more bytes
+ private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
+ assert state == ChunkState.READING_LENGTH;
+ while (chunkbuf.hasRemaining()) {
+ int c = chunkbuf.get();
+ if (cr) {
+ if (c == LF) {
+ return partialChunklen;
+ } else {
+ throw new IOException("invalid chunk header");
+ }
+ }
+ if (c == CR) {
+ cr = true;
+ } else {
+ int digit = toDigit(c);
+ partialChunklen = partialChunklen * 16 + digit;
+ }
+ }
+ return -1;
+ }
+
+
+ // try to consume as many bytes as specified by bytesToConsume.
+ // returns the number of bytes that still need to be consumed.
+ // In practice this method is only called to consume one CRLF pair
+ // with bytesToConsume set to 2, so it will only return 0 (if completed),
+ // 1, or 2 (if chunkbuf doesn't have the 2 chars).
+ private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
+ int n = bytesToConsume;
+ if (n > 0) {
+ int e = Math.min(chunkbuf.remaining(), n);
+
+ // verifies some assertions
+ // this methods is called only to consume CRLF
+ if (Utils.ASSERTIONSENABLED) {
+ assert n <= 2 && e <= 2;
+ ByteBuffer tmp = chunkbuf.slice();
+ // if n == 2 assert that we will first consume CR
+ assert (n == 2 && e > 0) ? tmp.get() == CR : true;
+ // if n == 1 || n == 2 && e == 2 assert that we then consume LF
+ assert (n == 1 || e == 2) ? tmp.get() == LF : true;
+ }
+
+ chunkbuf.position(chunkbuf.position() + e);
+ n -= e;
+ bytesToConsume = n;
+ }
+ assert n >= 0;
+ return n;
+ }
+
+ /**
+ * Returns a ByteBuffer containing chunk of data or a "hunk" of data
+ * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
+ * If the given chunk does not have enough data this method return
+ * an empty ByteBuffer (READMORE).
+ * If we encounter the final chunk (an empty chunk) this method
+ * returns null.
+ */
+ ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
+ int unfulfilled = bytesremaining;
+ int toconsume = bytesToConsume;
+ ChunkState st = state;
+ if (st == ChunkState.READING_LENGTH && chunklen == -1) {
+ debug.log(Level.DEBUG, () -> "Trying to read chunk len"
+ + " (remaining in buffer:"+chunk.remaining()+")");
+ int clen = chunklen = tryReadChunkLen(chunk);
+ if (clen == -1) return READMORE;
+ debug.log(Level.DEBUG, "Got chunk len %d", clen);
+ cr = false; partialChunklen = 0;
+ unfulfilled = bytesremaining = clen;
+ if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
+ else st = state = ChunkState.READING_DATA; // read the data
+ }
+
+ if (toconsume > 0) {
+ debug.log(Level.DEBUG,
+ "Trying to consume bytes: %d (remaining in buffer: %s)",
+ toconsume, chunk.remaining());
+ if (tryConsumeBytes(chunk) > 0) {
+ return READMORE;
+ }
+ }
+
+ toconsume = bytesToConsume;
+ assert toconsume == 0;
+
- int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
- Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
- returnBuffer.flip();
- bytesremaining -= bytes2Copy;
- if (bytesremaining == 0) {
- consumeBytes(2);
- chunklen = -1;
+ if (st == ChunkState.READING_LENGTH) {
+ // we will come here only if chunklen was 0, after having
+ // consumed the trailing CRLF
+ int clen = chunklen;
+ assert clen == 0;
+ debug.log(Level.DEBUG, "No more chunks: %d", clen);
+ // the DONE state is not really needed but it helps with
+ // assertions...
+ state = ChunkState.DONE;
+ return null;
+ }
+
+ int clen = chunklen;
+ assert clen > 0;
+ assert st == ChunkState.READING_DATA;
+
+ ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
+ if (unfulfilled > 0) {
+ int bytesread = chunk.remaining();
+ debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
+ bytesread, unfulfilled);
+
+ int bytes2return = Math.min(bytesread, unfulfilled);
+ debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return);
+ returnBuffer = Utils.slice(chunk, bytes2return);
+ unfulfilled = bytesremaining -= bytes2return;
+ if (unfulfilled == 0) bytesToConsume = 2;
+ }
+
+ assert unfulfilled >= 0;
+
+ if (unfulfilled == 0) {
+ debug.log(Level.DEBUG,
+ "No more bytes to read - %d yet to consume.",
+ unfulfilled);
+ // check whether the trailing CRLF is consumed, try to
+ // consume it if not. If tryConsumeBytes needs more bytes
+ // then we will come back here later - skipping the block
+ // that reads data because remaining==0, and finding
+ // that the two bytes are now consumed.
+ if (tryConsumeBytes(chunk) == 0) {
+ // we're done for this chunk! reset all states and
+ // prepare to read the next chunk.
+ chunklen = -1;
+ partialChunklen = 0;
+ cr = false;
+ state = ChunkState.READING_LENGTH;
+ debug.log(Level.DEBUG, "Ready to read next chunk");
+ }
+ }
+ if (returnBuffer == READMORE) {
+ debug.log(Level.DEBUG, "Need more data");
+ }
+ return returnBuffer;
}
- return returnBuffer;
+
+
+ // Attempt to parse and push one hunk from the buffer.
+ // Returns true if the final chunk was parsed.
+ // Returns false if we need to push more chunks.
+ private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
+ throws IOException {
+ assert state != ChunkState.DONE;
+ ByteBuffer b1 = tryReadOneHunk(b);
+ if (b1 != null) {
+ //assert b1.hasRemaining() || b1 == READMORE;
+ if (b1.hasRemaining()) {
+ debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
+ b1.remaining());
+ out.add(b1);
+ debug.log(Level.DEBUG, "Chunk sent.");
+ }
+ return false; // we haven't parsed the final chunk yet.
+ } else {
+ return true; // we're done! the final chunk was parsed.
+ }
+ }
+
+ private int toDigit(int b) throws IOException {
+ if (b >= 0x30 && b <= 0x39) {
+ return b - 0x30;
+ }
+ if (b >= 0x41 && b <= 0x46) {
+ return b - 0x41 + 10;
+ }
+ if (b >= 0x61 && b <= 0x66) {
+ return b - 0x61 + 10;
+ }
+ throw new IOException("Invalid chunk header byte " + b);
+ }
+
}
- ByteBuffer initialBuffer;
- int fixedBytesReturned;
+ class FixedLengthBodyParser implements BodyParser {
+ final int contentLength;
+ final Consumer<Throwable> onComplete;
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
+ volatile int remaining;
+ volatile Throwable closedExceptionally;
+ volatile AbstractSubscription sub;
+ FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
+ this.contentLength = this.remaining = contentLength;
+ this.onComplete = onComplete;
+ }
+
+ String dbgString() {
+ return dbgTag;
+ }
- //ByteBuffer getResidue() {
- //return lastBufferUsed;
- //}
-
- private void compactBuffer(ByteBuffer buf) {
- buf.compact()
- .flip();
- }
+ @Override
+ public void onSubscribe(AbstractSubscription sub) {
+ debug.log(Level.DEBUG, () -> "length="
+ + contentLength +", onSubscribe: "
+ + pusher.getClass().getName());
+ pusher.onSubscribe(this.sub = sub);
+ try {
+ if (contentLength == 0) {
+ pusher.onComplete();
+ onFinished.run();
+ onComplete.accept(null);
+ }
+ } catch (Throwable t) {
+ closedExceptionally = t;
+ try {
+ pusher.onError(t);
+ } finally {
+ onComplete.accept(t);
+ }
+ }
+ }
- /**
- * Copies inbuf (numBytes from its position) to new buffer. The returned
- * buffer's position is zero and limit is at end (numBytes)
- */
- private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
- ByteBuffer b1 = Utils.getBuffer();
- assert b1.remaining() >= numBytes;
- byte[] b = b1.array();
- inbuf.get(b, 0, numBytes);
- b1.limit(numBytes);
- return b1;
- }
+ @Override
+ public void accept(ByteBuffer b) {
+ if (closedExceptionally != null) {
+ debug.log(Level.DEBUG, () -> "already closed: "
+ + closedExceptionally);
+ return;
+ }
+ boolean completed = false;
+ try {
+ int unfulfilled = remaining;
+ debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
+ b.remaining(), unfulfilled, contentLength);
+ assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
+
+ if (unfulfilled == 0 && contentLength > 0) return;
- private void pushBodyChunked(ByteBuffer b) throws IOException {
- chunkbuf = b;
- while (true) {
- ByteBuffer b1 = readChunkedBuffer();
- if (b1 != null) {
- if (b1.hasRemaining()) {
- dataConsumer.accept(Optional.of(b1));
+ if (b.hasRemaining() && unfulfilled > 0) {
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ int amount = Math.min(b.remaining(), unfulfilled);
+ unfulfilled = remaining -= amount;
+ ByteBuffer buffer = Utils.slice(b, amount);
+ pusher.onNext(List.of(buffer));
}
- } else {
- onFinished.run();
- dataConsumer.accept(Optional.empty());
- return;
+ if (unfulfilled == 0) {
+ // We're done! All data has been received.
+ assert closedExceptionally == null;
+ onFinished.run();
+ pusher.onComplete();
+ completed = true;
+ onComplete.accept(closedExceptionally); // should be null
+ } else {
+ assert b.remaining() == 0;
+ }
+ } catch (Throwable t) {
+ debug.log(Level.DEBUG, "Unexpected exception", t);
+ closedExceptionally = t;
+ if (!completed) {
+ onComplete.accept(t);
+ }
}
}
}
-
- private int toDigit(int b) throws IOException {
- if (b >= 0x30 && b <= 0x39) {
- return b - 0x30;
- }
- if (b >= 0x41 && b <= 0x46) {
- return b - 0x41 + 10;
- }
- if (b >= 0x61 && b <= 0x66) {
- return b - 0x61 + 10;
- }
- throw new IOException("Invalid chunk header byte " + b);
- }
-
- private void pushBodyFixed(ByteBuffer b) throws IOException {
- int remaining = contentLength;
- while (b.hasRemaining() && remaining > 0) {
- ByteBuffer buffer = Utils.getBuffer();
- int amount = Math.min(b.remaining(), remaining);
- Utils.copy(b, buffer, amount);
- remaining -= amount;
- buffer.flip();
- dataConsumer.accept(Optional.of(buffer));
- }
- while (remaining > 0) {
- ByteBuffer buffer = connection.read();
- if (buffer == null)
- throw new IOException("connection closed");
-
- int bytesread = buffer.remaining();
- // assume for now that pipelining not implemented
- if (bytesread > remaining) {
- throw new IOException("too many bytes read");
- }
- remaining -= bytesread;
- dataConsumer.accept(Optional.of(buffer));
- }
- onFinished.run();
- dataConsumer.accept(Optional.empty());
- }
}