src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java
author dfuchs
Tue, 15 Jan 2019 11:34:20 +0000
changeset 53300 54aa3ea04fe8
parent 52902 e3398b2e1ab0
child 54579 270557b396eb
permissions -rw-r--r--
8216974: HttpConnection not returned to the pool after 204 response Summary: MultiExchange now calls nullBody() on Exchange after receiving 204 Reviewed-by: chegar

/*
 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import jdk.internal.net.http.Http1Exchange.Http1BodySubscriber;
import jdk.internal.net.http.common.HttpHeadersBuilder;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;

import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.US_ASCII;

/**
 * An HTTP/1.1 request.
 *
 * <p>Serializes the request line and the header block of an
 * {@code HttpRequestImpl} into bytes (see {@link #headers()}), and creates
 * the {@code Http1BodySubscriber} that hands the request body, either
 * chunked or with a fixed content length, to the owning
 * {@code Http1Exchange} (see {@link #continueRequest()}).
 */
class Http1Request {

    private static final String COOKIE_HEADER = "Cookie";

    /** Accepts every header except {@code Cookie} (name compared ignoring case). */
    private static final BiPredicate<String,String> NOCOOKIES =
            (k,v) -> !COOKIE_HEADER.equalsIgnoreCase(k);

    private final HttpRequestImpl request;
    private final Http1Exchange<?> http1Exchange;
    private final HttpConnection connection;
    private final HttpRequest.BodyPublisher requestPublisher;  // may be null
    private volatile HttpHeaders userHeaders;
    private final HttpHeadersBuilder systemHeadersBuilder;
    // Both fields below are assigned by headers() and read by continueRequest().
    private volatile boolean streaming;
    private volatile long contentLength;

    /**
     * Creates the HTTP/1.1 view of {@code request} for the given exchange.
     *
     * @param request       the request to send; its publisher may be null
     * @param http1Exchange the exchange this request belongs to; it supplies
     *                      the connection and the client
     * @throws IOException declared for callers; nothing in this constructor
     *                     body throws it explicitly
     */
    Http1Request(HttpRequestImpl request,
                 Http1Exchange<?> http1Exchange)
        throws IOException
    {
        this.request = request;
        this.http1Exchange = http1Exchange;
        this.connection = http1Exchange.connection();
        this.requestPublisher = request.requestPublisher;  // may be null
        this.userHeaders = request.getUserHeaders();
        this.systemHeadersBuilder = request.getSystemHeadersBuilder();
    }

    /**
     * Logs the serialized request line and headers when header logging is
     * enabled. CRLF line ends are shown as LF, and the empty line that
     * terminates the header block is dropped.
     *
     * @param completeHeaders the request line and headers exactly as they
     *                        will be written to the connection
     */
    private void logHeaders(String completeHeaders) {
        if (!Log.headers()) {
            return;
        }
        // Literal replacement: no regular expression is needed for CRLF -> LF.
        String s = completeHeaders.replace("\r\n", "\n");
        if (s.endsWith("\n\n")) s = s.substring(0, s.length() - 2);
        Log.logHeaders("REQUEST HEADERS:\n{0}\n", s);
    }


    /**
     * Appends every header of this request to {@code sb}, followed by the
     * empty line that terminates the header block.
     *
     * <p>Side effect: the {@code userHeaders} field is replaced by a copy
     * filtered with {@code Utils.CONTEXT_RESTRICTED(client)}.
     *
     * @param sb the builder that already holds the request line
     */
    public void collectHeaders0(StringBuilder sb) {
        BiPredicate<String,String> filter =
                connection.headerFilter(request);

        // Hold back 'Cookie:' headers here; they are emitted last, on a
        // single line, by collectCookies.
        BiPredicate<String,String> nocookies = NOCOOKIES.and(filter);

        HttpHeaders systemHeaders = systemHeadersBuilder.build();
        HttpClient client = http1Exchange.client();

        // Filter overridable headers from userHeaders
        userHeaders = HttpHeaders.of(userHeaders.map(), Utils.CONTEXT_RESTRICTED(client));

        final HttpHeaders uh = userHeaders;

        // A system header is dropped whenever the user supplied a header of
        // the same name: the user's value wins.
        systemHeaders = HttpHeaders.of(systemHeaders.map(), (k,v) -> uh.firstValue(k).isEmpty());

        // System headers first. The connection-supplied filter is applied so
        // that, when the request goes through a tunnel, preemptive proxy-*
        // headers cached by the authentication filter are not sent.
        collectHeaders1(sb, systemHeaders, nocookies);

        // Then user headers, through the same filter, so that user-supplied
        // proxy-* headers are not sent to the target server through a tunnel.
        collectHeaders1(sb, userHeaders, nocookies);

        // Gather all 'Cookie:' headers and concatenate their
        // values in a single line.
        collectCookies(sb, systemHeaders, userHeaders);

        // terminate headers
        sb.append('\r').append('\n');
    }

    // Concatenate any 'Cookie:' header in a single line, as mandated
    // by RFC 6265, section 5.4:
    //
    // <<When the user agent generates an HTTP request, the user agent MUST
    //   NOT attach more than one Cookie header field.>>
    //
    // This constraint is relaxed for the HTTP/2 protocol, which
    // explicitly allows sending multiple Cookie header fields.
    // RFC 7540 section 8.1.2.5:
    //
    // <<To allow for better compression efficiency, the Cookie header
    //   field MAY be split into separate header fields, each with one or
    //   more cookie-pairs.>>
    //
    // This method will therefore concatenate multiple Cookie header field
    // values into a single field, in a way similar to what was implemented
    // in the legacy HttpURLConnection.
    //
    // Note that this method itself performs no validation of the actual
    // field-values: they are appended as-is, system values first, then
    // user values, separated by "; ".
    //
    private void collectCookies(StringBuilder sb,
                                HttpHeaders system,
                                HttpHeaders user) {
        // HttpHeaders.allValues always returns a list (possibly empty), so
        // no null checks are required here.
        List<String> cookies = new ArrayList<>(system.allValues(COOKIE_HEADER));
        cookies.addAll(user.allValues(COOKIE_HEADER));
        if (cookies.isEmpty()) {
            return;  // no Cookie header at all: emit nothing
        }
        sb.append(COOKIE_HEADER).append(':').append(' ')
          .append(String.join("; ", cookies))
          .append('\r').append('\n');
    }

    /**
     * Appends one {@code name: value CRLF} line per header value accepted by
     * {@code filter}.
     *
     * @param sb      the builder to append to
     * @param headers the headers to serialize
     * @param filter  tested with (name, value); a value is written only when
     *                the predicate returns true
     */
    private void collectHeaders1(StringBuilder sb,
                                 HttpHeaders headers,
                                 BiPredicate<String,String> filter) {
        for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
            String key = entry.getKey();
            for (String value : entry.getValue()) {
                if (filter.test(key, value)) {
                    sb.append(key).append(':').append(' ')
                      .append(value)
                      .append('\r').append('\n');
                }
            }
        }
    }

    /**
     * Returns the encoded raw path of {@code uri}, followed by
     * {@code ?query} when the raw query is non-empty. A missing or empty
     * path is replaced by {@code "/"}.
     */
    private String getPathAndQuery(URI uri) {
        String path = uri.getRawPath();
        if (path == null || path.isEmpty()) {
            path = "/";
        }
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return Utils.encode(path);
        }
        return Utils.encode(path + "?" + query);
    }

    /** Returns {@code host:port} for the given address. */
    private String authorityString(InetSocketAddress addr) {
        return addr.getHostString() + ":" + addr.getPort();
    }

    /**
     * Returns the value of the {@code Host} header: the URI host, followed
     * by {@code :port} unless the port is absent or is the scheme default
     * (443 for secure requests, 80 otherwise).
     */
    private String hostString() {
        URI uri = request.uri();
        int port = uri.getPort();
        String host = uri.getHost();

        boolean defaultPort =
                port == -1 || port == (request.secure() ? 443 : 80);

        return defaultPort ? host : host + ":" + port;
    }

    /**
     * Returns the request-target to put on the request line:
     * <ul>
     *   <li>path and query, for a non-CONNECT request sent without a proxy
     *       and for any WebSocket request;</li>
     *   <li>{@code host:port} of the request authority, for CONNECT;</li>
     *   <li>path and query, for a secure request sent through a tunnel;</li>
     *   <li>the full URI otherwise (plain request through a proxy), or the
     *       authority if the request has no URI.</li>
     * </ul>
     */
    private String requestURI() {
        URI uri = request.uri();
        String method = request.method();
        boolean connect = method.equals("CONNECT");

        if ((request.proxy() == null && !connect)
                || request.isWebSocket()) {
            return getPathAndQuery(uri);
        }
        if (connect) {
            // use authority for connect itself, secure or not
            return authorityString(request.authority());
        }
        if (request.secure()) {
            // requests over tunnel do not require full URL
            return getPathAndQuery(uri);
        }

        return uri == null? authorityString(request.authority()) : uri.toString();
    }

    // Guarded by this object's monitor.
    private boolean finished;

    /** Returns whether {@link #setFinished()} has been called. */
    synchronized boolean finished() {
        return  finished;
    }

    /** Marks this request as finished. */
    synchronized void setFinished() {
        finished = true;
    }

    /**
     * Builds the request line and header block.
     *
     * <p>Sets the {@code Host} header (when the request has a URI) and then
     * either {@code Content-Length} or {@code Transfer-encoding: chunked},
     * depending on the length reported by the body publisher. The
     * {@code contentLength} and {@code streaming} fields are updated
     * accordingly for {@link #continueRequest()}.
     *
     * @return a single-element list holding the US-ASCII encoded bytes of
     *         the request line and headers
     */
    List<ByteBuffer> headers() {
        // 'request' is dereferenced unconditionally below, so no null check
        // is needed before logging it.
        if (Log.requests()) {
            Log.logRequest(request.toString());
        }
        String uriString = requestURI();
        StringBuilder sb = new StringBuilder(64);
        sb.append(request.method())
          .append(' ')
          .append(uriString)
          .append(" HTTP/1.1\r\n");

        URI uri = request.uri();
        if (uri != null) {
            systemHeadersBuilder.setHeader("Host", hostString());
        }

        // No publisher: not a user request, or maybe a method, e.g. GET,
        // with no body. A negative length is the publisher's way of saying
        // the length is unknown, in which case the body is sent chunked.
        final long len = (requestPublisher == null)
                ? 0 : requestPublisher.contentLength();
        contentLength = len;
        // Set explicitly in every case rather than relying on the field's
        // default when the length is zero.
        streaming = len < 0;

        if (len == 0) {
            systemHeadersBuilder.setHeader("Content-Length", "0");
        } else if (len > 0) {
            systemHeadersBuilder.setHeader("Content-Length", Long.toString(len));
        } else {
            systemHeadersBuilder.setHeader("Transfer-encoding", "chunked");
        }
        collectHeaders0(sb);
        String hs = sb.toString();
        logHeaders(hs);
        ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
        return List.of(b);
    }

    /**
     * Creates the subscriber for the request body and subscribes it to the
     * request publisher.
     *
     * @return a {@code StreamSubscriber} when the body is sent chunked, a
     *         {@code FixedContentSubscriber} when the content length is
     *         positive, or {@code null} when there is no body to send
     */
    Http1BodySubscriber continueRequest()  {
        final Http1BodySubscriber subscriber;
        if (streaming) {
            subscriber = new StreamSubscriber();
        } else if (contentLength == 0) {
            return null;
        } else {
            subscriber = new FixedContentSubscriber();
        }
        requestPublisher.subscribe(subscriber);
        return subscriber;
    }

    /**
     * Body subscriber used when the content length is unknown: every buffer
     * received is framed as one chunk of a chunked transfer coding.
     */
    final class StreamSubscriber extends Http1BodySubscriber {

        StreamSubscriber() { super(debug); }

        /** Records the subscription; a second subscription is reported as an error. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (isSubscribed()) {
                Throwable t = new IllegalStateException("already subscribed");
                http1Exchange.appendToOutgoing(t);
            } else {
                setSubscription(subscription);
            }
        }

        /**
         * Queues {@code item} as one chunk: hexadecimal size line, the data,
         * then CRLF.
         */
        @Override
        public void onNext(ByteBuffer item) {
            Objects.requireNonNull(item);
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }
            // NOTE(review): a buffer with no remaining bytes is framed as
            // "0\r\n" + "\r\n", i.e. the same bytes onComplete() emits to
            // terminate the body. Confirm whether empty buffers are filtered
            // out downstream, or need to be skipped here.
            int chunklen = item.remaining();
            http1Exchange.appendToOutgoing(
                    List.of(getHeader(chunklen), item, ByteBuffer.wrap(CRLF)));
        }

        @Override
        public String currentStateMessage() {
            return "streaming request body " + (complete ? "complete" : "incomplete");
        }

        /** Cancels the subscription and forwards the error, unless already complete. */
        @Override
        public void onError(Throwable throwable) {
            if (complete)
                return;

            cancelSubscription();
            http1Exchange.appendToOutgoing(throwable);
        }

        /**
         * Queues the terminating zero-size chunk and the completion marker,
         * then marks the enclosing request as finished. A repeated
         * completion signal is reported as an error.
         */
        @Override
        public void onComplete() {
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }
            complete = true;
            http1Exchange.appendToOutgoing(
                    List.of(ByteBuffer.wrap(EMPTY_CHUNK_BYTES), ByteBuffer.wrap(CRLF)));
            http1Exchange.appendToOutgoing(COMPLETED);
            // The request is marked finished only after the completion
            // marker has been queued.
            setFinished();
        }
    }

    /**
     * Body subscriber used when the publisher reported a positive content
     * length: buffers are forwarded unchanged, and the number of bytes is
     * checked against the announced length.
     */
    final class FixedContentSubscriber extends Http1BodySubscriber {

        // Running total of the bytes handed to onNext so far.
        // NOTE(review): updated with a non-atomic '+=' in onNext; this
        // presumably relies on onNext being signalled serially - confirm.
        private volatile long contentWritten;
        FixedContentSubscriber() { super(debug); }

        /** Records the subscription; a second subscription is reported as an error. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (isSubscribed()) {
                Throwable t = new IllegalStateException("already subscribed");
                http1Exchange.appendToOutgoing(t);
            } else {
                setSubscription(subscription);
            }
        }

        /**
         * Queues {@code item} for writing. If the running total exceeds the
         * announced content length, the subscription is cancelled and an
         * {@code IOException} is queued instead of the data.
         */
        @Override
        public void onNext(ByteBuffer item) {
            if (debug.on()) debug.log("onNext");
            Objects.requireNonNull(item);
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }

            long writing = item.remaining();
            long written = (contentWritten += writing);

            if (written > contentLength) {
                cancelSubscription();
                String msg = connection.getConnectionFlow()
                              + " [" + Thread.currentThread().getName() +"] "
                              + "Too many bytes in request body. Expected: "
                              + contentLength + ", got: " + written;
                http1Exchange.appendToOutgoing(new IOException(msg));
            } else {
                http1Exchange.appendToOutgoing(List.of(item));
            }
        }

        @Override
        public String currentStateMessage() {
            return format("fixed content-length: %d, bytes sent: %d",
                           contentLength, contentWritten);
        }

        /** Cancels the subscription and forwards the error, unless already complete. */
        @Override
        public void onError(Throwable throwable) {
            if (debug.on()) debug.log("onError");
            if (complete)  // TODO: error?
                return;

            cancelSubscription();
            http1Exchange.appendToOutgoing(throwable);
        }

        /**
         * Queues the completion marker when the announced number of bytes
         * has been received. If fewer bytes were received, cancels the
         * subscription and queues an {@code IOException}. A repeated
         * completion signal is reported as an error.
         */
        @Override
        public void onComplete() {
            if (debug.on()) debug.log("onComplete");
            if (complete) {
                Throwable t = new IllegalStateException("subscription already completed");
                http1Exchange.appendToOutgoing(t);
                return;
            }

            complete = true;
            long written = contentWritten;
            if (contentLength > written) {
                cancelSubscription();
                Throwable t = new IOException(connection.getConnectionFlow()
                                     + " [" + Thread.currentThread().getName() +"] "
                                     + "Too few bytes returned by the publisher ("
                                              + written + "/"
                                              + contentLength + ")");
                http1Exchange.appendToOutgoing(t);
            } else {
                http1Exchange.appendToOutgoing(COMPLETED);
            }
        }
    }

    private static final byte[] CRLF = {'\r', '\n'};
    private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};

    /**
     * Returns a header for a particular chunk size: the size in lower-case
     * hexadecimal, followed by CRLF.
     */
    private static ByteBuffer getHeader(int size) {
        byte[] hexBytes = Integer.toHexString(size).getBytes(US_ASCII);
        byte[] header = new byte[hexBytes.length + CRLF.length];
        System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
        System.arraycopy(CRLF, 0, header, hexBytes.length, CRLF.length);
        return ByteBuffer.wrap(header);
    }

    /** Debug logger; also handed to the body subscribers created by this request. */
    final Logger debug = Utils.getDebugLogger(this::toString, Utils.DEBUG);

}