src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Request.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.net.URI;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.util.ArrayList;
       
    33 import java.util.List;
       
    34 import java.util.Map;
       
    35 import java.net.InetSocketAddress;
       
    36 import java.util.Objects;
       
    37 import java.util.concurrent.Flow;
       
    38 import java.util.function.BiPredicate;
       
    39 import jdk.incubator.http.HttpHeaders;
       
    40 import jdk.incubator.http.HttpRequest;
       
    41 import jdk.incubator.http.internal.Http1Exchange.Http1BodySubscriber;
       
    42 import jdk.incubator.http.internal.common.HttpHeadersImpl;
       
    43 import jdk.incubator.http.internal.common.Log;
       
    44 import jdk.incubator.http.internal.common.Utils;
       
    45 
       
    46 import static java.nio.charset.StandardCharsets.US_ASCII;
       
    47 
       
    48 /**
       
    49  *  An HTTP/1.1 request.
       
    50  */
       
    51 class Http1Request {
       
    52     private final HttpRequestImpl request;
       
    53     private final Http1Exchange<?> http1Exchange;
       
    54     private final HttpConnection connection;
       
    55     private final HttpRequest.BodyPublisher requestPublisher;
       
    56     private final HttpHeaders userHeaders;
       
    57     private final HttpHeadersImpl systemHeaders;
       
    58     private volatile boolean streaming;
       
    59     private volatile long contentLength;
       
    60 
       
    61     Http1Request(HttpRequestImpl request,
       
    62                  Http1Exchange<?> http1Exchange)
       
    63         throws IOException
       
    64     {
       
    65         this.request = request;
       
    66         this.http1Exchange = http1Exchange;
       
    67         this.connection = http1Exchange.connection();
       
    68         this.requestPublisher = request.requestPublisher;  // may be null
       
    69         this.userHeaders = request.getUserHeaders();
       
    70         this.systemHeaders = request.getSystemHeaders();
       
    71     }
       
    72 
       
    73     private void logHeaders(String completeHeaders) {
       
    74         if (Log.headers()) {
       
    75             //StringBuilder sb = new StringBuilder(256);
       
    76             //sb.append("REQUEST HEADERS:\n");
       
    77             //Log.dumpHeaders(sb, "    ", systemHeaders);
       
    78             //Log.dumpHeaders(sb, "    ", userHeaders);
       
    79             //Log.logHeaders(sb.toString());
       
    80 
       
    81             String s = completeHeaders.replaceAll("\r\n", "\n");
       
    82             Log.logHeaders("REQUEST HEADERS:\n" + s);
       
    83         }
       
    84     }
       
    85 
       
    86 
       
    87     private void collectHeaders0(StringBuilder sb) {
       
    88         BiPredicate<String,List<String>> filter =
       
    89                 connection.headerFilter(request);
       
    90 
       
    91         // If we're sending this request through a tunnel,
       
    92         // then don't send any preemptive proxy-* headers that
       
    93         // the authentication filter may have saved in its
       
    94         // cache.
       
    95         collectHeaders1(sb, systemHeaders, filter);
       
    96 
       
    97         // If we're sending this request through a tunnel,
       
    98         // don't send any user-supplied proxy-* headers
       
    99         // to the target server.
       
   100         collectHeaders1(sb, userHeaders, filter);
       
   101         sb.append("\r\n");
       
   102     }
       
   103 
       
   104     private void collectHeaders1(StringBuilder sb, HttpHeaders headers,
       
   105                                  BiPredicate<String, List<String>> filter) {
       
   106         for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
       
   107             String key = entry.getKey();
       
   108             List<String> values = entry.getValue();
       
   109             if (!filter.test(key, values)) continue;
       
   110             for (String value : values) {
       
   111                 sb.append(key).append(": ").append(value).append("\r\n");
       
   112             }
       
   113         }
       
   114     }
       
   115 
       
   116     private String getPathAndQuery(URI uri) {
       
   117         String path = uri.getPath();
       
   118         String query = uri.getQuery();
       
   119         if (path == null || path.equals("")) {
       
   120             path = "/";
       
   121         }
       
   122         if (query == null) {
       
   123             query = "";
       
   124         }
       
   125         if (query.equals("")) {
       
   126             return path;
       
   127         } else {
       
   128             return path + "?" + query;
       
   129         }
       
   130     }
       
   131 
       
   132     private String authorityString(InetSocketAddress addr) {
       
   133         return addr.getHostString() + ":" + addr.getPort();
       
   134     }
       
   135 
       
   136     private String hostString() {
       
   137         URI uri = request.uri();
       
   138         int port = uri.getPort();
       
   139         String host = uri.getHost();
       
   140 
       
   141         boolean defaultPort;
       
   142         if (port == -1) {
       
   143             defaultPort = true;
       
   144         } else if (request.secure()) {
       
   145             defaultPort = port == 443;
       
   146         } else {
       
   147             defaultPort = port == 80;
       
   148         }
       
   149 
       
   150         if (defaultPort) {
       
   151             return host;
       
   152         } else {
       
   153             return host + ":" + Integer.toString(port);
       
   154         }
       
   155     }
       
   156 
       
   157     private String requestURI() {
       
   158         URI uri = request.uri();
       
   159         String method = request.method();
       
   160 
       
   161         if ((request.proxy() == null && !method.equals("CONNECT"))
       
   162                 || request.isWebSocket()) {
       
   163             return getPathAndQuery(uri);
       
   164         }
       
   165         if (request.secure()) {
       
   166             if (request.method().equals("CONNECT")) {
       
   167                 // use authority for connect itself
       
   168                 return authorityString(request.authority());
       
   169             } else {
       
   170                 // requests over tunnel do not require full URL
       
   171                 return getPathAndQuery(uri);
       
   172             }
       
   173         }
       
   174         if (request.method().equals("CONNECT")) {
       
   175             // use authority for connect itself
       
   176             return authorityString(request.authority());
       
   177         }
       
   178 
       
   179         return uri == null? authorityString(request.authority()) : uri.toString();
       
   180     }
       
   181 
       
   182     private boolean finished;
       
   183 
       
   184     synchronized boolean finished() {
       
   185         return  finished;
       
   186     }
       
   187 
       
   188     synchronized void setFinished() {
       
   189         finished = true;
       
   190     }
       
   191 
       
   192     List<ByteBuffer> headers() {
       
   193         if (Log.requests() && request != null) {
       
   194             Log.logRequest(request.toString());
       
   195         }
       
   196         String uriString = requestURI();
       
   197         StringBuilder sb = new StringBuilder(64);
       
   198         sb.append(request.method())
       
   199           .append(' ')
       
   200           .append(uriString)
       
   201           .append(" HTTP/1.1\r\n");
       
   202 
       
   203         URI uri = request.uri();
       
   204         if (uri != null) {
       
   205             systemHeaders.setHeader("Host", hostString());
       
   206         }
       
   207         if (requestPublisher == null) {
       
   208             // Not a user request, or maybe a method, e.g. GET, with no body.
       
   209             contentLength = 0;
       
   210         } else {
       
   211             contentLength = requestPublisher.contentLength();
       
   212         }
       
   213 
       
   214         if (contentLength == 0) {
       
   215             systemHeaders.setHeader("Content-Length", "0");
       
   216         } else if (contentLength > 0) {
       
   217             systemHeaders.setHeader("Content-Length", Long.toString(contentLength));
       
   218             streaming = false;
       
   219         } else {
       
   220             streaming = true;
       
   221             systemHeaders.setHeader("Transfer-encoding", "chunked");
       
   222         }
       
   223         collectHeaders0(sb);
       
   224         String hs = sb.toString();
       
   225         logHeaders(hs);
       
   226         ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
       
   227         return List.of(b);
       
   228     }
       
   229 
       
   230     Http1BodySubscriber continueRequest()  {
       
   231         Http1BodySubscriber subscriber;
       
   232         if (streaming) {
       
   233             subscriber = new StreamSubscriber();
       
   234             requestPublisher.subscribe(subscriber);
       
   235         } else {
       
   236             if (contentLength == 0)
       
   237                 return null;
       
   238 
       
   239             subscriber = new FixedContentSubscriber();
       
   240             requestPublisher.subscribe(subscriber);
       
   241         }
       
   242         return subscriber;
       
   243     }
       
   244 
       
   245     class StreamSubscriber extends Http1BodySubscriber {
       
   246 
       
   247         @Override
       
   248         public void onSubscribe(Flow.Subscription subscription) {
       
   249             if (this.subscription != null) {
       
   250                 Throwable t = new IllegalStateException("already subscribed");
       
   251                 http1Exchange.appendToOutgoing(t);
       
   252             } else {
       
   253                 this.subscription = subscription;
       
   254             }
       
   255         }
       
   256 
       
   257         @Override
       
   258         public void onNext(ByteBuffer item) {
       
   259             Objects.requireNonNull(item);
       
   260             if (complete) {
       
   261                 Throwable t = new IllegalStateException("subscription already completed");
       
   262                 http1Exchange.appendToOutgoing(t);
       
   263             } else {
       
   264                 int chunklen = item.remaining();
       
   265                 ArrayList<ByteBuffer> l = new ArrayList<>(3);
       
   266                 l.add(getHeader(chunklen));
       
   267                 l.add(item);
       
   268                 l.add(ByteBuffer.wrap(CRLF));
       
   269                 http1Exchange.appendToOutgoing(l);
       
   270             }
       
   271         }
       
   272 
       
   273         @Override
       
   274         public void onError(Throwable throwable) {
       
   275             if (complete)
       
   276                 return;
       
   277 
       
   278             subscription.cancel();
       
   279             http1Exchange.appendToOutgoing(throwable);
       
   280         }
       
   281 
       
   282         @Override
       
   283         public void onComplete() {
       
   284             if (complete) {
       
   285                 Throwable t = new IllegalStateException("subscription already completed");
       
   286                 http1Exchange.appendToOutgoing(t);
       
   287             } else {
       
   288                 ArrayList<ByteBuffer> l = new ArrayList<>(2);
       
   289                 l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
       
   290                 l.add(ByteBuffer.wrap(CRLF));
       
   291                 complete = true;
       
   292                 //setFinished();
       
   293                 http1Exchange.appendToOutgoing(l);
       
   294                 http1Exchange.appendToOutgoing(COMPLETED);
       
   295                 setFinished();  // TODO: before or after,? does it matter?
       
   296 
       
   297             }
       
   298         }
       
   299     }
       
   300 
       
   301     class FixedContentSubscriber extends Http1BodySubscriber {
       
   302 
       
   303         private volatile long contentWritten;
       
   304 
       
   305         @Override
       
   306         public void onSubscribe(Flow.Subscription subscription) {
       
   307             if (this.subscription != null) {
       
   308                 Throwable t = new IllegalStateException("already subscribed");
       
   309                 http1Exchange.appendToOutgoing(t);
       
   310             } else {
       
   311                 this.subscription = subscription;
       
   312             }
       
   313         }
       
   314 
       
   315         @Override
       
   316         public void onNext(ByteBuffer item) {
       
   317             debug.log(Level.DEBUG, "onNext");
       
   318             Objects.requireNonNull(item);
       
   319             if (complete) {
       
   320                 Throwable t = new IllegalStateException("subscription already completed");
       
   321                 http1Exchange.appendToOutgoing(t);
       
   322             } else {
       
   323                 long writing = item.remaining();
       
   324                 long written = (contentWritten += writing);
       
   325 
       
   326                 if (written > contentLength) {
       
   327                     subscription.cancel();
       
   328                     String msg = connection.getConnectionFlow()
       
   329                                   + " [" + Thread.currentThread().getName() +"] "
       
   330                                   + "Too many bytes in request body. Expected: "
       
   331                                   + contentLength + ", got: " + written;
       
   332                     http1Exchange.appendToOutgoing(new IOException(msg));
       
   333                 } else {
       
   334                     http1Exchange.appendToOutgoing(List.of(item));
       
   335                 }
       
   336             }
       
   337         }
       
   338 
       
   339         @Override
       
   340         public void onError(Throwable throwable) {
       
   341             debug.log(Level.DEBUG, "onError");
       
   342             if (complete)  // TODO: error?
       
   343                 return;
       
   344 
       
   345             subscription.cancel();
       
   346             http1Exchange.appendToOutgoing(throwable);
       
   347         }
       
   348 
       
   349         @Override
       
   350         public void onComplete() {
       
   351             debug.log(Level.DEBUG, "onComplete");
       
   352             if (complete) {
       
   353                 Throwable t = new IllegalStateException("subscription already completed");
       
   354                 http1Exchange.appendToOutgoing(t);
       
   355             } else {
       
   356                 complete = true;
       
   357                 long written = contentWritten;
       
   358                 if (contentLength > written) {
       
   359                     subscription.cancel();
       
   360                     Throwable t = new IOException(connection.getConnectionFlow()
       
   361                                          + " [" + Thread.currentThread().getName() +"] "
       
   362                                          + "Too few bytes returned by the publisher ("
       
   363                                                   + written + "/"
       
   364                                                   + contentLength + ")");
       
   365                     http1Exchange.appendToOutgoing(t);
       
   366                 } else {
       
   367                     http1Exchange.appendToOutgoing(COMPLETED);
       
   368                 }
       
   369             }
       
   370         }
       
   371     }
       
   372 
       
   373     private static final byte[] CRLF = {'\r', '\n'};
       
   374     private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
       
   375 
       
   376     /** Returns a header for a particular chunk size */
       
   377     private static ByteBuffer getHeader(int size) {
       
   378         String hexStr = Integer.toHexString(size);
       
   379         byte[] hexBytes = hexStr.getBytes(US_ASCII);
       
   380         byte[] header = new byte[hexStr.length()+2];
       
   381         System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
       
   382         header[hexBytes.length] = CRLF[0];
       
   383         header[hexBytes.length+1] = CRLF[1];
       
   384         return ByteBuffer.wrap(header);
       
   385     }
       
   386 
       
   387     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
   388     final System.Logger  debug = Utils.getDebugLogger(this::toString, DEBUG);
       
   389 
       
   390 }