src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
changeset 48083 b1c1b4ef4be2
parent 47216 71c04702a3d5
child 48376 41ae5c69b09c
child 55973 4d9b002587db
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Fri Nov 03 10:01:08 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Wed Dec 06 11:11:59 2017 -0800
@@ -26,8 +26,10 @@
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
-import java.util.Optional;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Consumer;
 import jdk.incubator.http.internal.common.Utils;
 
@@ -39,45 +41,35 @@
  */
 class ResponseContent {
 
-    final HttpResponse.BodyProcessor<?> pusher;
-    final HttpConnection connection;
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+
+    final HttpResponse.BodySubscriber<?> pusher;
     final int contentLength;
-    ByteBuffer buffer;
-    //ByteBuffer lastBufferUsed;
-    final ResponseHeaders headers;
-    private final Consumer<Optional<ByteBuffer>> dataConsumer;
-    private final Consumer<IOException> errorConsumer;
-    private final HttpClientImpl client;
+    final HttpHeaders headers;
     // this needs to run before we complete the body
     // so that connection can be returned to pool
     private final Runnable onFinished;
+    private final String dbgTag;
 
     ResponseContent(HttpConnection connection,
                     int contentLength,
-                    ResponseHeaders h,
-                    HttpResponse.BodyProcessor<?> userProcessor,
-                    Consumer<Optional<ByteBuffer>> dataConsumer,
-                    Consumer<IOException> errorConsumer,
+                    HttpHeaders h,
+                    HttpResponse.BodySubscriber<?> userSubscriber,
                     Runnable onFinished)
     {
-        this.pusher = (HttpResponse.BodyProcessor)userProcessor;
-        this.connection = connection;
+        this.pusher = userSubscriber;
         this.contentLength = contentLength;
         this.headers = h;
-        this.dataConsumer = dataConsumer;
-        this.errorConsumer = errorConsumer;
-        this.client = connection.client;
         this.onFinished = onFinished;
+        this.dbgTag = connection.dbgString() + "/ResponseContent";
     }
 
     static final int LF = 10;
     static final int CR = 13;
-    static final int SP = 0x20;
-    static final int BUF_SIZE = 1024;
 
-    boolean chunkedContent, chunkedContentInitialized;
+    private boolean chunkedContent, chunkedContentInitialized;
 
-    private boolean contentChunked() throws IOException {
+    boolean contentChunked() throws IOException {
         if (chunkedContentInitialized) {
             return chunkedContent;
         }
@@ -98,182 +90,375 @@
         return chunkedContent;
     }
 
-    /**
-     * Entry point for pusher. b is an initial ByteBuffer that may
-     * have some data in it. When this method returns, the body
-     * has been fully processed.
-     */
-    void pushBody(ByteBuffer b) {
-        try {
-            // TODO: check status
-            if (contentChunked()) {
-                pushBodyChunked(b);
-            } else {
-                pushBodyFixed(b);
-            }
-        } catch (IOException t) {
-            errorConsumer.accept(t);
+    interface BodyParser extends Consumer<ByteBuffer> {
+        void onSubscribe(AbstractSubscription sub);
+    }
+
+    // Returns a parser that will take care of parsing the received byte
+    // buffers and forward them to the BodySubscriber.
+    // When the parser is done, it will call onComplete.
+    // If parsing was successful, the throwable parameter will be null.
+    // Otherwise it will be the exception that occurred
+    // Note: revisit: it might be better to use a CompletableFuture than
+    //       a completion handler.
+    BodyParser getBodyParser(Consumer<Throwable> onComplete)
+        throws IOException {
+        if (contentChunked()) {
+            return new ChunkedBodyParser(onComplete);
+        } else {
+            return new FixedLengthBodyParser(contentLength, onComplete);
         }
     }
 
-    // reads and returns chunklen. Position of chunkbuf is first byte
-    // of chunk on return. chunklen includes the CR LF at end of chunk
-    int readChunkLen() throws IOException {
-        chunklen = 0;
-        boolean cr = false;
-        while (true) {
-            getHunk();
-            int c = chunkbuf.get();
-            if (cr) {
-                if (c == LF) {
-                    return chunklen + 2;
-                } else {
-                    throw new IOException("invalid chunk header");
-                }
-            }
-            if (c == CR) {
-                cr = true;
-            } else {
-                int digit = toDigit(c);
-                chunklen = chunklen * 16 + digit;
-            }
+
+    static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
+    class ChunkedBodyParser implements BodyParser {
+        final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
+        final Consumer<Throwable> onComplete;
+        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+        final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
+
+        volatile Throwable closedExceptionally;
+        volatile int partialChunklen = 0; // partially read chunk len
+        volatile int chunklen = -1;  // number of bytes in chunk
+        volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
+        volatile boolean cr = false;  // tryReadChunkLength has found CR
+        volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
+        volatile ChunkState state = ChunkState.READING_LENGTH; // current state
+        volatile AbstractSubscription sub;
+        ChunkedBodyParser(Consumer<Throwable> onComplete) {
+            this.onComplete = onComplete;
         }
-    }
+
+        String dbgString() {
+            return dbgTag;
+        }
 
-    int chunklen = -1;      // number of bytes in chunk (fixed)
-    int bytesremaining;     // number of bytes in chunk left to be read incl CRLF
-    int bytesread;
-    ByteBuffer chunkbuf;    // initialise
+        @Override
+        public void onSubscribe(AbstractSubscription sub) {
+            debug.log(Level.DEBUG, () ->  "onSubscribe: "
+                        + pusher.getClass().getName());
+            pusher.onSubscribe(this.sub = sub);
+        }
 
-    // make sure we have at least 1 byte to look at
-    private void getHunk() throws IOException {
-        if (chunkbuf == null || !chunkbuf.hasRemaining()) {
-            chunkbuf = connection.read();
-        }
-    }
-
-    private void consumeBytes(int n) throws IOException {
-        getHunk();
-        while (n > 0) {
-            int e = Math.min(chunkbuf.remaining(), n);
-            chunkbuf.position(chunkbuf.position() + e);
-            n -= e;
-            if (n > 0) {
-                getHunk();
+        @Override
+        public void accept(ByteBuffer b) {
+            if (closedExceptionally != null) {
+                debug.log(Level.DEBUG, () ->  "already closed: "
+                            + closedExceptionally);
+                return;
             }
-        }
-    }
+            boolean completed = false;
+            try {
+                List<ByteBuffer> out = new ArrayList<>();
+                do {
+                    if (tryPushOneHunk(b, out))  {
+                        // We're done! (true if the final chunk was parsed).
+                        if (!out.isEmpty()) {
+                            // push what we have and complete
+                            // only reduce demand if we actually push something.
+                            // we would not have come here if there was no
+                            // demand.
+                            boolean hasDemand = sub.demand().tryDecrement();
+                            assert hasDemand;
+                            pusher.onNext(out);
+                        }
+                        debug.log(Level.DEBUG, () ->  "done!");
+                        assert closedExceptionally == null;
+                        assert state == ChunkState.DONE;
+                        onFinished.run();
+                        pusher.onComplete();
+                        completed = true;
+                        onComplete.accept(closedExceptionally); // should be null
+                        break;
+                    }
+                    // the buffer may contain several hunks, and therefore
+                    // we must loop while it's not exhausted.
+                } while (b.hasRemaining());
 
-    /**
-     * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
-     * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
-     * ByteBuffer returned is obtained from response processor.
-     */
-    ByteBuffer readChunkedBuffer() throws IOException {
-        if (chunklen == -1) {
-            // new chunk
-            chunklen = readChunkLen() - 2;
-            bytesremaining =  chunklen;
-            if (chunklen == 0) {
-                consumeBytes(2);
-                return null;
+                if (!completed && !out.isEmpty()) {
+                    // push what we have.
+                    // only reduce demand if we actually push something.
+                    // we would not have come here if there was no
+                    // demand.
+                    boolean hasDemand = sub.demand().tryDecrement();
+                    assert hasDemand;
+                    pusher.onNext(out);
+                }
+                assert state == ChunkState.DONE || !b.hasRemaining();
+            } catch(Throwable t) {
+                closedExceptionally = t;
+                if (!completed) onComplete.accept(t);
             }
         }
 
-        getHunk();
-        bytesread = chunkbuf.remaining();
-        ByteBuffer returnBuffer = Utils.getBuffer();
-        int space = returnBuffer.remaining();
+        // reads and returns chunklen. Position of chunkbuf is first byte
+        // of chunk on return. chunklen includes the CR LF at end of chunk
+        // returns -1 if needs more bytes
+        private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
+            assert state == ChunkState.READING_LENGTH;
+            while (chunkbuf.hasRemaining()) {
+                int c = chunkbuf.get();
+                if (cr) {
+                    if (c == LF) {
+                        return partialChunklen;
+                    } else {
+                        throw new IOException("invalid chunk header");
+                    }
+                }
+                if (c == CR) {
+                    cr = true;
+                } else {
+                    int digit = toDigit(c);
+                    partialChunklen = partialChunklen * 16 + digit;
+                }
+            }
+            return -1;
+        }
+
+
+        // try to consume as many bytes as specified by bytesToConsume.
+        // returns the number of bytes that still need to be consumed.
+        // In practice this method is only called to consume one CRLF pair
+        // with bytesToConsume set to 2, so it will only return 0 (if completed),
+        // 1, or 2 (if chunkbuf doesn't have the 2 chars).
+        private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
+            int n = bytesToConsume;
+            if (n > 0) {
+                int e = Math.min(chunkbuf.remaining(), n);
+
+                // verifies some assertions
+                // this methods is called only to consume CRLF
+                if (Utils.ASSERTIONSENABLED) {
+                    assert n <= 2 && e <= 2;
+                    ByteBuffer tmp = chunkbuf.slice();
+                    // if n == 2 assert that we will first consume CR
+                    assert (n == 2 && e > 0) ? tmp.get() == CR : true;
+                    // if n == 1 || n == 2 && e == 2 assert that we then consume LF
+                    assert (n == 1 || e == 2) ? tmp.get() == LF : true;
+                }
+
+                chunkbuf.position(chunkbuf.position() + e);
+                n -= e;
+                bytesToConsume = n;
+            }
+            assert n >= 0;
+            return n;
+        }
+
+        /**
+         * Returns a ByteBuffer containing chunk of data or a "hunk" of data
+         * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
+         * If the given chunk does not have enough data this method return
+         * an empty ByteBuffer (READMORE).
+         * If we encounter the final chunk (an empty chunk) this method
+         * returns null.
+         */
+        ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
+            int unfulfilled = bytesremaining;
+            int toconsume = bytesToConsume;
+            ChunkState st = state;
+            if (st == ChunkState.READING_LENGTH && chunklen == -1) {
+                debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
+                        + " (remaining in buffer:"+chunk.remaining()+")");
+                int clen = chunklen = tryReadChunkLen(chunk);
+                if (clen == -1) return READMORE;
+                debug.log(Level.DEBUG, "Got chunk len %d", clen);
+                cr = false; partialChunklen = 0;
+                unfulfilled = bytesremaining =  clen;
+                if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
+                else st = state = ChunkState.READING_DATA; // read the data
+            }
+
+            if (toconsume > 0) {
+                debug.log(Level.DEBUG,
+                        "Trying to consume bytes: %d (remaining in buffer: %s)",
+                        toconsume, chunk.remaining());
+                if (tryConsumeBytes(chunk) > 0) {
+                    return READMORE;
+                }
+            }
+
+            toconsume = bytesToConsume;
+            assert toconsume == 0;
+
 
-        int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
-        Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
-        returnBuffer.flip();
-        bytesremaining -= bytes2Copy;
-        if (bytesremaining == 0) {
-            consumeBytes(2);
-            chunklen = -1;
+            if (st == ChunkState.READING_LENGTH) {
+                // we will come here only if chunklen was 0, after having
+                // consumed the trailing CRLF
+                int clen = chunklen;
+                assert clen == 0;
+                debug.log(Level.DEBUG, "No more chunks: %d", clen);
+                // the DONE state is not really needed but it helps with
+                // assertions...
+                state = ChunkState.DONE;
+                return null;
+            }
+
+            int clen = chunklen;
+            assert clen > 0;
+            assert st == ChunkState.READING_DATA;
+
+            ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
+            if (unfulfilled > 0) {
+                int bytesread = chunk.remaining();
+                debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
+                          bytesread, unfulfilled);
+
+                int bytes2return = Math.min(bytesread, unfulfilled);
+                debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
+                returnBuffer = Utils.slice(chunk, bytes2return);
+                unfulfilled = bytesremaining -= bytes2return;
+                if (unfulfilled == 0) bytesToConsume = 2;
+            }
+
+            assert unfulfilled >= 0;
+
+            if (unfulfilled == 0) {
+                debug.log(Level.DEBUG,
+                        "No more bytes to read - %d yet to consume.",
+                        unfulfilled);
+                // check whether the trailing CRLF is consumed, try to
+                // consume it if not. If tryConsumeBytes needs more bytes
+                // then we will come back here later - skipping the block
+                // that reads data because remaining==0, and finding
+                // that the two bytes are now consumed.
+                if (tryConsumeBytes(chunk) == 0) {
+                    // we're done for this chunk! reset all states and
+                    // prepare to read the next chunk.
+                    chunklen = -1;
+                    partialChunklen = 0;
+                    cr = false;
+                    state = ChunkState.READING_LENGTH;
+                    debug.log(Level.DEBUG, "Ready to read next chunk");
+                }
+            }
+            if (returnBuffer == READMORE) {
+                debug.log(Level.DEBUG, "Need more data");
+            }
+            return returnBuffer;
         }
-        return returnBuffer;
+
+
+        // Attempt to parse and push one hunk from the buffer.
+        // Returns true if the final chunk was parsed.
+        // Returns false if we need to push more chunks.
+        private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
+                throws IOException {
+            assert state != ChunkState.DONE;
+            ByteBuffer b1 = tryReadOneHunk(b);
+            if (b1 != null) {
+                //assert b1.hasRemaining() || b1 == READMORE;
+                if (b1.hasRemaining()) {
+                    debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
+                              b1.remaining());
+                    out.add(b1);
+                    debug.log(Level.DEBUG, "Chunk sent.");
+                }
+                return false; // we haven't parsed the final chunk yet.
+            } else {
+                return true; // we're done! the final chunk was parsed.
+            }
+        }
+
+        private int toDigit(int b) throws IOException {
+            if (b >= 0x30 && b <= 0x39) {
+                return b - 0x30;
+            }
+            if (b >= 0x41 && b <= 0x46) {
+                return b - 0x41 + 10;
+            }
+            if (b >= 0x61 && b <= 0x66) {
+                return b - 0x61 + 10;
+            }
+            throw new IOException("Invalid chunk header byte " + b);
+        }
+
     }
 
-    ByteBuffer initialBuffer;
-    int fixedBytesReturned;
+    class FixedLengthBodyParser implements BodyParser {
+        final int contentLength;
+        final Consumer<Throwable> onComplete;
+        final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+        final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
+        volatile int remaining;
+        volatile Throwable closedExceptionally;
+        volatile AbstractSubscription sub;
+        FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
+            this.contentLength = this.remaining = contentLength;
+            this.onComplete = onComplete;
+        }
+
+        String dbgString() {
+            return dbgTag;
+        }
 
-    //ByteBuffer getResidue() {
-        //return lastBufferUsed;
-    //}
-
-    private void compactBuffer(ByteBuffer buf) {
-        buf.compact()
-           .flip();
-    }
+        @Override
+        public void onSubscribe(AbstractSubscription sub) {
+            debug.log(Level.DEBUG, () -> "length="
+                        + contentLength +", onSubscribe: "
+                        + pusher.getClass().getName());
+            pusher.onSubscribe(this.sub = sub);
+            try {
+                if (contentLength == 0) {
+                    pusher.onComplete();
+                    onFinished.run();
+                    onComplete.accept(null);
+                }
+            } catch (Throwable t) {
+                closedExceptionally = t;
+                try {
+                    pusher.onError(t);
+                } finally {
+                    onComplete.accept(t);
+                }
+            }
+        }
 
-    /**
-     * Copies inbuf (numBytes from its position) to new buffer. The returned
-     * buffer's position is zero and limit is at end (numBytes)
-     */
-    private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
-        ByteBuffer b1 = Utils.getBuffer();
-        assert b1.remaining() >= numBytes;
-        byte[] b = b1.array();
-        inbuf.get(b, 0, numBytes);
-        b1.limit(numBytes);
-        return b1;
-    }
+        @Override
+        public void accept(ByteBuffer b) {
+            if (closedExceptionally != null) {
+                debug.log(Level.DEBUG, () -> "already closed: "
+                            + closedExceptionally);
+                return;
+            }
+            boolean completed = false;
+            try {
+                int unfulfilled = remaining;
+                debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
+                        b.remaining(), unfulfilled, contentLength);
+                assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
+
+                if (unfulfilled == 0 && contentLength > 0) return;
 
-    private void pushBodyChunked(ByteBuffer b) throws IOException {
-        chunkbuf = b;
-        while (true) {
-            ByteBuffer b1 = readChunkedBuffer();
-            if (b1 != null) {
-                if (b1.hasRemaining()) {
-                    dataConsumer.accept(Optional.of(b1));
+                if (b.hasRemaining() && unfulfilled > 0) {
+                    // only reduce demand if we actually push something.
+                    // we would not have come here if there was no
+                    // demand.
+                    boolean hasDemand = sub.demand().tryDecrement();
+                    assert hasDemand;
+                    int amount = Math.min(b.remaining(), unfulfilled);
+                    unfulfilled = remaining -= amount;
+                    ByteBuffer buffer = Utils.slice(b, amount);
+                    pusher.onNext(List.of(buffer));
                 }
-            } else {
-                onFinished.run();
-                dataConsumer.accept(Optional.empty());
-                return;
+                if (unfulfilled == 0) {
+                    // We're done! All data has been received.
+                    assert closedExceptionally == null;
+                    onFinished.run();
+                    pusher.onComplete();
+                    completed = true;
+                    onComplete.accept(closedExceptionally); // should be null
+                } else {
+                    assert b.remaining() == 0;
+                }
+            } catch (Throwable t) {
+                debug.log(Level.DEBUG, "Unexpected exception", t);
+                closedExceptionally = t;
+                if (!completed) {
+                    onComplete.accept(t);
+                }
             }
         }
     }
-
-    private int toDigit(int b) throws IOException {
-        if (b >= 0x30 && b <= 0x39) {
-            return b - 0x30;
-        }
-        if (b >= 0x41 && b <= 0x46) {
-            return b - 0x41 + 10;
-        }
-        if (b >= 0x61 && b <= 0x66) {
-            return b - 0x61 + 10;
-        }
-        throw new IOException("Invalid chunk header byte " + b);
-    }
-
-    private void pushBodyFixed(ByteBuffer b) throws IOException {
-        int remaining = contentLength;
-        while (b.hasRemaining() && remaining > 0) {
-            ByteBuffer buffer = Utils.getBuffer();
-            int amount = Math.min(b.remaining(), remaining);
-            Utils.copy(b, buffer, amount);
-            remaining -= amount;
-            buffer.flip();
-            dataConsumer.accept(Optional.of(buffer));
-        }
-        while (remaining > 0) {
-            ByteBuffer buffer = connection.read();
-            if (buffer == null)
-                throw new IOException("connection closed");
-
-            int bytesread = buffer.remaining();
-            // assume for now that pipelining not implemented
-            if (bytesread > remaining) {
-                throw new IOException("too many bytes read");
-            }
-            remaining -= bytesread;
-            dataConsumer.accept(Optional.of(buffer));
-        }
-        onFinished.run();
-        dataConsumer.accept(Optional.empty());
-    }
 }