src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56200 d712a6342387
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java	Wed Feb 07 21:45:37 2018 +0000
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http;
+
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+import java.util.concurrent.Flow;
+import java.util.function.BiPredicate;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import jdk.internal.net.http.Http1Exchange.Http1BodySubscriber;
+import jdk.internal.net.http.common.HttpHeadersImpl;
+import jdk.internal.net.http.common.Log;
+import jdk.internal.net.http.common.Utils;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+
+/**
+ *  An HTTP/1.1 request.
+ */
+class Http1Request {
+    private final HttpRequestImpl request;
+    private final Http1Exchange<?> http1Exchange;
+    private final HttpConnection connection;
+    private final HttpRequest.BodyPublisher requestPublisher;
+    private final HttpHeaders userHeaders;
+    private final HttpHeadersImpl systemHeaders;
+    private volatile boolean streaming;
+    private volatile long contentLength;
+
+    Http1Request(HttpRequestImpl request,
+                 Http1Exchange<?> http1Exchange)
+        throws IOException
+    {
+        this.request = request;
+        this.http1Exchange = http1Exchange;
+        this.connection = http1Exchange.connection();
+        this.requestPublisher = request.requestPublisher;  // may be null
+        this.userHeaders = request.getUserHeaders();
+        this.systemHeaders = request.getSystemHeaders();
+    }
+
+    private void logHeaders(String completeHeaders) {
+        if (Log.headers()) {
+            //StringBuilder sb = new StringBuilder(256);
+            //sb.append("REQUEST HEADERS:\n");
+            //Log.dumpHeaders(sb, "    ", systemHeaders);
+            //Log.dumpHeaders(sb, "    ", userHeaders);
+            //Log.logHeaders(sb.toString());
+
+            String s = completeHeaders.replaceAll("\r\n", "\n");
+            Log.logHeaders("REQUEST HEADERS:\n" + s);
+        }
+    }
+
+
+    private void collectHeaders0(StringBuilder sb) {
+        BiPredicate<String,List<String>> filter =
+                connection.headerFilter(request);
+
+        // If we're sending this request through a tunnel,
+        // then don't send any preemptive proxy-* headers that
+        // the authentication filter may have saved in its
+        // cache.
+        collectHeaders1(sb, systemHeaders, filter);
+
+        // If we're sending this request through a tunnel,
+        // don't send any user-supplied proxy-* headers
+        // to the target server.
+        collectHeaders1(sb, userHeaders, filter);
+        sb.append("\r\n");
+    }
+
+    private void collectHeaders1(StringBuilder sb, HttpHeaders headers,
+                                 BiPredicate<String, List<String>> filter) {
+        for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
+            String key = entry.getKey();
+            List<String> values = entry.getValue();
+            if (!filter.test(key, values)) continue;
+            for (String value : values) {
+                sb.append(key).append(": ").append(value).append("\r\n");
+            }
+        }
+    }
+
+    private String getPathAndQuery(URI uri) {
+        String path = uri.getPath();
+        String query = uri.getQuery();
+        if (path == null || path.equals("")) {
+            path = "/";
+        }
+        if (query == null) {
+            query = "";
+        }
+        if (query.equals("")) {
+            return path;
+        } else {
+            return path + "?" + query;
+        }
+    }
+
+    private String authorityString(InetSocketAddress addr) {
+        return addr.getHostString() + ":" + addr.getPort();
+    }
+
+    private String hostString() {
+        URI uri = request.uri();
+        int port = uri.getPort();
+        String host = uri.getHost();
+
+        boolean defaultPort;
+        if (port == -1) {
+            defaultPort = true;
+        } else if (request.secure()) {
+            defaultPort = port == 443;
+        } else {
+            defaultPort = port == 80;
+        }
+
+        if (defaultPort) {
+            return host;
+        } else {
+            return host + ":" + Integer.toString(port);
+        }
+    }
+
+    private String requestURI() {
+        URI uri = request.uri();
+        String method = request.method();
+
+        if ((request.proxy() == null && !method.equals("CONNECT"))
+                || request.isWebSocket()) {
+            return getPathAndQuery(uri);
+        }
+        if (request.secure()) {
+            if (request.method().equals("CONNECT")) {
+                // use authority for connect itself
+                return authorityString(request.authority());
+            } else {
+                // requests over tunnel do not require full URL
+                return getPathAndQuery(uri);
+            }
+        }
+        if (request.method().equals("CONNECT")) {
+            // use authority for connect itself
+            return authorityString(request.authority());
+        }
+
+        return uri == null? authorityString(request.authority()) : uri.toString();
+    }
+
+    private boolean finished;
+
+    synchronized boolean finished() {
+        return  finished;
+    }
+
+    synchronized void setFinished() {
+        finished = true;
+    }
+
+    List<ByteBuffer> headers() {
+        if (Log.requests() && request != null) {
+            Log.logRequest(request.toString());
+        }
+        String uriString = requestURI();
+        StringBuilder sb = new StringBuilder(64);
+        sb.append(request.method())
+          .append(' ')
+          .append(uriString)
+          .append(" HTTP/1.1\r\n");
+
+        URI uri = request.uri();
+        if (uri != null) {
+            systemHeaders.setHeader("Host", hostString());
+        }
+        if (requestPublisher == null) {
+            // Not a user request, or maybe a method, e.g. GET, with no body.
+            contentLength = 0;
+        } else {
+            contentLength = requestPublisher.contentLength();
+        }
+
+        if (contentLength == 0) {
+            systemHeaders.setHeader("Content-Length", "0");
+        } else if (contentLength > 0) {
+            systemHeaders.setHeader("Content-Length", Long.toString(contentLength));
+            streaming = false;
+        } else {
+            streaming = true;
+            systemHeaders.setHeader("Transfer-encoding", "chunked");
+        }
+        collectHeaders0(sb);
+        String hs = sb.toString();
+        logHeaders(hs);
+        ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
+        return List.of(b);
+    }
+
+    Http1BodySubscriber continueRequest()  {
+        Http1BodySubscriber subscriber;
+        if (streaming) {
+            subscriber = new StreamSubscriber();
+            requestPublisher.subscribe(subscriber);
+        } else {
+            if (contentLength == 0)
+                return null;
+
+            subscriber = new FixedContentSubscriber();
+            requestPublisher.subscribe(subscriber);
+        }
+        return subscriber;
+    }
+
+    class StreamSubscriber extends Http1BodySubscriber {
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            if (this.subscription != null) {
+                Throwable t = new IllegalStateException("already subscribed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                this.subscription = subscription;
+            }
+        }
+
+        @Override
+        public void onNext(ByteBuffer item) {
+            Objects.requireNonNull(item);
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                int chunklen = item.remaining();
+                ArrayList<ByteBuffer> l = new ArrayList<>(3);
+                l.add(getHeader(chunklen));
+                l.add(item);
+                l.add(ByteBuffer.wrap(CRLF));
+                http1Exchange.appendToOutgoing(l);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (complete)
+                return;
+
+            subscription.cancel();
+            http1Exchange.appendToOutgoing(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                ArrayList<ByteBuffer> l = new ArrayList<>(2);
+                l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
+                l.add(ByteBuffer.wrap(CRLF));
+                complete = true;
+                //setFinished();
+                http1Exchange.appendToOutgoing(l);
+                http1Exchange.appendToOutgoing(COMPLETED);
+                setFinished();  // TODO: before or after,? does it matter?
+
+            }
+        }
+    }
+
+    class FixedContentSubscriber extends Http1BodySubscriber {
+
+        private volatile long contentWritten;
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            if (this.subscription != null) {
+                Throwable t = new IllegalStateException("already subscribed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                this.subscription = subscription;
+            }
+        }
+
+        @Override
+        public void onNext(ByteBuffer item) {
+            debug.log(Level.DEBUG, "onNext");
+            Objects.requireNonNull(item);
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                long writing = item.remaining();
+                long written = (contentWritten += writing);
+
+                if (written > contentLength) {
+                    subscription.cancel();
+                    String msg = connection.getConnectionFlow()
+                                  + " [" + Thread.currentThread().getName() +"] "
+                                  + "Too many bytes in request body. Expected: "
+                                  + contentLength + ", got: " + written;
+                    http1Exchange.appendToOutgoing(new IOException(msg));
+                } else {
+                    http1Exchange.appendToOutgoing(List.of(item));
+                }
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            debug.log(Level.DEBUG, "onError");
+            if (complete)  // TODO: error?
+                return;
+
+            subscription.cancel();
+            http1Exchange.appendToOutgoing(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            debug.log(Level.DEBUG, "onComplete");
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                complete = true;
+                long written = contentWritten;
+                if (contentLength > written) {
+                    subscription.cancel();
+                    Throwable t = new IOException(connection.getConnectionFlow()
+                                         + " [" + Thread.currentThread().getName() +"] "
+                                         + "Too few bytes returned by the publisher ("
+                                                  + written + "/"
+                                                  + contentLength + ")");
+                    http1Exchange.appendToOutgoing(t);
+                } else {
+                    http1Exchange.appendToOutgoing(COMPLETED);
+                }
+            }
+        }
+    }
+
+    private static final byte[] CRLF = {'\r', '\n'};
+    private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
+
+    /** Returns a header for a particular chunk size */
+    private static ByteBuffer getHeader(int size) {
+        String hexStr = Integer.toHexString(size);
+        byte[] hexBytes = hexStr.getBytes(US_ASCII);
+        byte[] header = new byte[hexStr.length()+2];
+        System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
+        header[hexBytes.length] = CRLF[0];
+        header[hexBytes.length+1] = CRLF[1];
+        return ByteBuffer.wrap(header);
+    }
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final System.Logger  debug = Utils.getDebugLogger(this::toString, DEBUG);
+
+}