src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
author smarks
Mon, 04 Dec 2017 11:50:04 -0800
changeset 48059 6ee80cd217e0
parent 47216 71c04702a3d5
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8177290: add copy factory methods for unmodifiable List, Set, Map 8184690: add Collectors for collecting into unmodifiable List, Set, and Map Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.Utils;

/**
 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
 *
 * Call pushBody() to read the body (blocking). Data and errors are provided
 * to given Consumers. After final buffer delivered, empty optional delivered
 */
class ResponseContent {

    final HttpResponse.BodyProcessor<?> pusher;
    final HttpConnection connection;
    final int contentLength;
    ByteBuffer buffer;
    //ByteBuffer lastBufferUsed;
    final ResponseHeaders headers;
    private final Consumer<Optional<ByteBuffer>> dataConsumer;
    private final Consumer<IOException> errorConsumer;
    private final HttpClientImpl client;
    // this needs to run before we complete the body
    // so that connection can be returned to pool
    private final Runnable onFinished;

    ResponseContent(HttpConnection connection,
                    int contentLength,
                    ResponseHeaders h,
                    HttpResponse.BodyProcessor<?> userProcessor,
                    Consumer<Optional<ByteBuffer>> dataConsumer,
                    Consumer<IOException> errorConsumer,
                    Runnable onFinished)
    {
        this.pusher = (HttpResponse.BodyProcessor)userProcessor;
        this.connection = connection;
        this.contentLength = contentLength;
        this.headers = h;
        this.dataConsumer = dataConsumer;
        this.errorConsumer = errorConsumer;
        this.client = connection.client;
        this.onFinished = onFinished;
    }

    static final int LF = 10;
    static final int CR = 13;
    static final int SP = 0x20;
    static final int BUF_SIZE = 1024;

    boolean chunkedContent, chunkedContentInitialized;

    private boolean contentChunked() throws IOException {
        if (chunkedContentInitialized) {
            return chunkedContent;
        }
        if (contentLength == -1) {
            String tc = headers.firstValue("Transfer-Encoding")
                               .orElse("");
            if (!tc.equals("")) {
                if (tc.equalsIgnoreCase("chunked")) {
                    chunkedContent = true;
                } else {
                    throw new IOException("invalid content");
                }
            } else {
                chunkedContent = false;
            }
        }
        chunkedContentInitialized = true;
        return chunkedContent;
    }

    /**
     * Entry point for pusher. b is an initial ByteBuffer that may
     * have some data in it. When this method returns, the body
     * has been fully processed.
     */
    void pushBody(ByteBuffer b) {
        try {
            // TODO: check status
            if (contentChunked()) {
                pushBodyChunked(b);
            } else {
                pushBodyFixed(b);
            }
        } catch (IOException t) {
            errorConsumer.accept(t);
        }
    }

    // reads and returns chunklen. Position of chunkbuf is first byte
    // of chunk on return. chunklen includes the CR LF at end of chunk
    int readChunkLen() throws IOException {
        chunklen = 0;
        boolean cr = false;
        while (true) {
            getHunk();
            int c = chunkbuf.get();
            if (cr) {
                if (c == LF) {
                    return chunklen + 2;
                } else {
                    throw new IOException("invalid chunk header");
                }
            }
            if (c == CR) {
                cr = true;
            } else {
                int digit = toDigit(c);
                chunklen = chunklen * 16 + digit;
            }
        }
    }

    int chunklen = -1;      // number of bytes in chunk (fixed)
    int bytesremaining;     // number of bytes in chunk left to be read incl CRLF
    int bytesread;
    ByteBuffer chunkbuf;    // initialise

    // make sure we have at least 1 byte to look at
    private void getHunk() throws IOException {
        if (chunkbuf == null || !chunkbuf.hasRemaining()) {
            chunkbuf = connection.read();
        }
    }

    private void consumeBytes(int n) throws IOException {
        getHunk();
        while (n > 0) {
            int e = Math.min(chunkbuf.remaining(), n);
            chunkbuf.position(chunkbuf.position() + e);
            n -= e;
            if (n > 0) {
                getHunk();
            }
        }
    }

    /**
     * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
     * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
     * ByteBuffer returned is obtained from response processor.
     */
    ByteBuffer readChunkedBuffer() throws IOException {
        if (chunklen == -1) {
            // new chunk
            chunklen = readChunkLen() - 2;
            bytesremaining =  chunklen;
            if (chunklen == 0) {
                consumeBytes(2);
                return null;
            }
        }

        getHunk();
        bytesread = chunkbuf.remaining();
        ByteBuffer returnBuffer = Utils.getBuffer();
        int space = returnBuffer.remaining();

        int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
        Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
        returnBuffer.flip();
        bytesremaining -= bytes2Copy;
        if (bytesremaining == 0) {
            consumeBytes(2);
            chunklen = -1;
        }
        return returnBuffer;
    }

    ByteBuffer initialBuffer;
    int fixedBytesReturned;

    //ByteBuffer getResidue() {
        //return lastBufferUsed;
    //}

    private void compactBuffer(ByteBuffer buf) {
        buf.compact()
           .flip();
    }

    /**
     * Copies inbuf (numBytes from its position) to new buffer. The returned
     * buffer's position is zero and limit is at end (numBytes)
     */
    private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
        ByteBuffer b1 = Utils.getBuffer();
        assert b1.remaining() >= numBytes;
        byte[] b = b1.array();
        inbuf.get(b, 0, numBytes);
        b1.limit(numBytes);
        return b1;
    }

    private void pushBodyChunked(ByteBuffer b) throws IOException {
        chunkbuf = b;
        while (true) {
            ByteBuffer b1 = readChunkedBuffer();
            if (b1 != null) {
                if (b1.hasRemaining()) {
                    dataConsumer.accept(Optional.of(b1));
                }
            } else {
                onFinished.run();
                dataConsumer.accept(Optional.empty());
                return;
            }
        }
    }

    private int toDigit(int b) throws IOException {
        if (b >= 0x30 && b <= 0x39) {
            return b - 0x30;
        }
        if (b >= 0x41 && b <= 0x46) {
            return b - 0x41 + 10;
        }
        if (b >= 0x61 && b <= 0x66) {
            return b - 0x61 + 10;
        }
        throw new IOException("Invalid chunk header byte " + b);
    }

    private void pushBodyFixed(ByteBuffer b) throws IOException {
        int remaining = contentLength;
        while (b.hasRemaining() && remaining > 0) {
            ByteBuffer buffer = Utils.getBuffer();
            int amount = Math.min(b.remaining(), remaining);
            Utils.copy(b, buffer, amount);
            remaining -= amount;
            buffer.flip();
            dataConsumer.accept(Optional.of(buffer));
        }
        while (remaining > 0) {
            ByteBuffer buffer = connection.read();
            if (buffer == null)
                throw new IOException("connection closed");

            int bytesread = buffer.remaining();
            // assume for now that pipelining not implemented
            if (bytesread > remaining) {
                throw new IOException("too many bytes read");
            }
            remaining -= bytesread;
            dataConsumer.accept(Optional.of(buffer));
        }
        onFinished.run();
        dataConsumer.accept(Optional.empty());
    }
}