src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java
branch: http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56128 249a863b0aca
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.io.Closeable;
       
    29 import java.io.IOException;
       
    30 import java.lang.System.Logger.Level;
       
    31 import java.net.InetSocketAddress;
       
    32 import java.nio.ByteBuffer;
       
    33 import java.nio.channels.SocketChannel;
       
    34 import java.util.Arrays;
       
    35 import java.util.IdentityHashMap;
       
    36 import java.util.List;
       
    37 import java.util.Map;
       
    38 import java.util.TreeMap;
       
    39 import java.util.concurrent.CompletableFuture;
       
    40 import java.util.concurrent.CompletionStage;
       
    41 import java.util.concurrent.ConcurrentLinkedDeque;
       
    42 import java.util.concurrent.Flow;
       
    43 import java.util.function.BiPredicate;
       
    44 import java.util.function.Predicate;
       
    45 import java.net.http.HttpClient;
       
    46 import java.net.http.HttpClient.Version;
       
    47 import java.net.http.HttpHeaders;
       
    48 import jdk.internal.net.http.common.Demand;
       
    49 import jdk.internal.net.http.common.FlowTube;
       
    50 import jdk.internal.net.http.common.SequentialScheduler;
       
    51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
       
    52 import jdk.internal.net.http.common.Log;
       
    53 import jdk.internal.net.http.common.Utils;
       
    54 import static java.net.http.HttpClient.Version.HTTP_2;
       
    55 
       
    56 /**
       
    57  * Wraps socket channel layer and takes care of SSL also.
       
    58  *
       
    59  * Subtypes are:
       
    60  *      PlainHttpConnection: regular direct TCP connection to server
       
    61  *      PlainProxyConnection: plain text proxy connection
       
    62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
       
    63  *      AsyncSSLConnection: TLS channel direct to server
       
    64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
       
    65  */
       
    66 abstract class HttpConnection implements Closeable {
       
    67 
       
    // Revisit: temporary dev flag.
    static final boolean DEBUG = Utils.DEBUG;

    // Instance logger; dbgString() is handed to the logger factory so the
    // tag can be computed on demand.
    final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);

    // Logger for static code paths, where no connection instance exists yet.
    static final System.Logger DEBUG_LOGGER =
            Utils.getDebugLogger(() -> "HttpConnection(SocketTube(?))", DEBUG);

    /** The address this connection is connected to. Could be a server or a proxy. */
    final InetSocketAddress address;
    private final HttpClientImpl client;
    private final TrailingOperations trailingOperations;

    /**
     * Creates a connection bound to the given address and owning client.
     *
     * @param address the address to connect to (a server or a proxy)
     * @param client  the client this connection belongs to
     */
    HttpConnection(InetSocketAddress address, HttpClientImpl client) {
        this.trailingOperations = new TrailingOperations();
        this.address = address;
        this.client = client;
    }
       
    83 
       
    84     private static final class TrailingOperations {
       
    85         private final Map<CompletionStage<?>, Boolean> operations =
       
    86                 new IdentityHashMap<>();
       
    87         void add(CompletionStage<?> cf) {
       
    88             synchronized(operations) {
       
    89                 cf.whenComplete((r,t)-> remove(cf));
       
    90                 operations.put(cf, Boolean.TRUE);
       
    91             }
       
    92         }
       
    93         boolean remove(CompletionStage<?> cf) {
       
    94             synchronized(operations) {
       
    95                 return operations.remove(cf);
       
    96             }
       
    97         }
       
    98     }
       
    99 
       
   100     final void addTrailingOperation(CompletionStage<?> cf) {
       
   101         trailingOperations.add(cf);
       
   102     }
       
   103 
       
   104 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
       
   105 //        trailingOperations.remove(cf);
       
   106 //    }
       
   107 
       
   108     final HttpClientImpl client() {
       
   109         return client;
       
   110     }
       
   111 
       
   112     //public abstract void connect() throws IOException, InterruptedException;
       
   113 
       
   114     public abstract CompletableFuture<Void> connectAsync();
       
   115 
       
   116     /** Tells whether, or not, this connection is connected to its destination. */
       
   117     abstract boolean connected();
       
   118 
       
   119     /** Tells whether, or not, this connection is secure ( over SSL ) */
       
   120     abstract boolean isSecure();
       
   121 
       
   122     /** Tells whether, or not, this connection is proxied. */
       
   123     abstract boolean isProxied();
       
   124 
       
   125     /** Tells whether, or not, this connection is open. */
       
   126     final boolean isOpen() {
       
   127         return channel().isOpen() &&
       
   128                 (connected() ? !getConnectionFlow().isFinished() : true);
       
   129     }
       
   130 
       
   131     interface HttpPublisher extends FlowTube.TubePublisher {
       
   132         void enqueue(List<ByteBuffer> buffers) throws IOException;
       
   133         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
       
   134         void signalEnqueued() throws IOException;
       
   135     }
       
   136 
       
   137     /**
       
   138      * Returns the HTTP publisher associated with this connection.  May be null
       
   139      * if invoked before connecting.
       
   140      */
       
   141     abstract HttpPublisher publisher();
       
   142 
       
   143     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
       
   144     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
       
   145             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
       
   146 
       
   147    /**
       
   148     * Returns true if the given client's SSL parameter protocols contains at
       
   149     * least one TLS version that HTTP/2 requires.
       
   150     */
       
   151    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
       
   152        String[] protos = client.sslParameters().getProtocols();
       
   153        if (protos != null) {
       
   154            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
       
   155        } else {
       
   156            return false;
       
   157        }
       
   158    }
       
   159 
       
   160     /**
       
   161      * Factory for retrieving HttpConnections. A connection can be retrieved
       
   162      * from the connection pool, or a new one created if none available.
       
   163      *
       
   164      * The given {@code addr} is the ultimate destination. Any proxies,
       
   165      * etc, are determined from the request. Returns a concrete instance which
       
   166      * is one of the following:
       
   167      *      {@link PlainHttpConnection}
       
   168      *      {@link PlainTunnelingConnection}
       
   169      *
       
   170      * The returned connection, if not from the connection pool, must have its,
       
   171      * connect() or connectAsync() method invoked, which ( when it completes
       
   172      * successfully ) renders the connection usable for requests.
       
   173      */
       
   174     public static HttpConnection getConnection(InetSocketAddress addr,
       
   175                                                HttpClientImpl client,
       
   176                                                HttpRequestImpl request,
       
   177                                                Version version) {
       
   178         HttpConnection c = null;
       
   179         InetSocketAddress proxy = request.proxy();
       
   180         if (proxy != null && proxy.isUnresolved()) {
       
   181             // The default proxy selector may select a proxy whose  address is
       
   182             // unresolved. We must resolve the address before connecting to it.
       
   183             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
       
   184         }
       
   185         boolean secure = request.secure();
       
   186         ConnectionPool pool = client.connectionPool();
       
   187 
       
   188         if (!secure) {
       
   189             c = pool.getConnection(false, addr, proxy);
       
   190             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
       
   191                 final HttpConnection conn = c;
       
   192                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
       
   193                             + ": plain connection retrieved from HTTP/1.1 pool");
       
   194                 return c;
       
   195             } else {
       
   196                 return getPlainConnection(addr, proxy, request, client);
       
   197             }
       
   198         } else {  // secure
       
   199             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
       
   200                 c = pool.getConnection(true, addr, proxy);
       
   201             }
       
   202             if (c != null && c.isOpen()) {
       
   203                 final HttpConnection conn = c;
       
   204                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
       
   205                             + ": SSL connection retrieved from HTTP/1.1 pool");
       
   206                 return c;
       
   207             } else {
       
   208                 String[] alpn = null;
       
   209                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
       
   210                     alpn = new String[] { "h2", "http/1.1" };
       
   211                 }
       
   212                 return getSSLConnection(addr, proxy, alpn, request, client);
       
   213             }
       
   214         }
       
   215     }
       
   216 
       
   217     private static HttpConnection getSSLConnection(InetSocketAddress addr,
       
   218                                                    InetSocketAddress proxy,
       
   219                                                    String[] alpn,
       
   220                                                    HttpRequestImpl request,
       
   221                                                    HttpClientImpl client) {
       
   222         if (proxy != null)
       
   223             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
       
   224                                                 proxyTunnelHeaders(request));
       
   225         else
       
   226             return new AsyncSSLConnection(addr, client, alpn);
       
   227     }
       
   228 
       
   229     /**
       
   230      * This method is used to build a filter that will accept or
       
   231      * veto (header-name, value) tuple for transmission on the
       
   232      * wire.
       
   233      * The filter is applied to the headers when sending the headers
       
   234      * to the remote party.
       
   235      * Which tuple is accepted/vetoed depends on:
       
   236      * <pre>
       
   237      *    - whether the connection is a tunnel connection
       
   238      *      [talking to a server through a proxy tunnel]
       
   239      *    - whether the method is CONNECT
       
   240      *      [establishing a CONNECT tunnel through a proxy]
       
   241      *    - whether the request is using a proxy
       
   242      *      (and the connection is not a tunnel)
       
   243      *      [talking to a server through a proxy]
       
   244      *    - whether the request is a direct connection to
       
   245      *      a server (no tunnel, no proxy).
       
   246      * </pre>
       
   247      * @param request
       
   248      * @return
       
   249      */
       
   250     BiPredicate<String,List<String>> headerFilter(HttpRequestImpl request) {
       
   251         if (isTunnel()) {
       
   252             // talking to a server through a proxy tunnel
       
   253             // don't send proxy-* headers to a plain server
       
   254             assert !request.isConnect();
       
   255             return Utils.NO_PROXY_HEADERS_FILTER;
       
   256         } else if (request.isConnect()) {
       
   257             // establishing a proxy tunnel
       
   258             // check for proxy tunnel disabled schemes
       
   259             // assert !this.isTunnel();
       
   260             assert request.proxy() == null;
       
   261             return Utils.PROXY_TUNNEL_FILTER;
       
   262         } else if (request.proxy() != null) {
       
   263             // talking to a server through a proxy (no tunnel)
       
   264             // check for proxy disabled schemes
       
   265             // assert !isTunnel() && !request.isConnect();
       
   266             return Utils.PROXY_FILTER;
       
   267         } else {
       
   268             // talking to a server directly (no tunnel, no proxy)
       
   269             // don't send proxy-* headers to a plain server
       
   270             // assert request.proxy() == null && !request.isConnect();
       
   271             return Utils.NO_PROXY_HEADERS_FILTER;
       
   272         }
       
   273     }
       
   274 
       
   275     // Composes a new immutable HttpHeaders that combines the
       
   276     // user and system header but only keeps those headers that
       
   277     // start with "proxy-"
       
   278     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
       
   279         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
       
   280         combined.putAll(request.getSystemHeaders().map());
       
   281         combined.putAll(request.headers().map()); // let user override system
       
   282 
       
   283         // keep only proxy-* - and also strip authorization headers
       
   284         // for disabled schemes
       
   285         return ImmutableHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
       
   286     }
       
   287 
       
   288     /* Returns either a plain HTTP connection or a plain tunnelling connection
       
   289      * for proxied WebSocket */
       
   290     private static HttpConnection getPlainConnection(InetSocketAddress addr,
       
   291                                                      InetSocketAddress proxy,
       
   292                                                      HttpRequestImpl request,
       
   293                                                      HttpClientImpl client) {
       
   294         if (request.isWebSocket() && proxy != null)
       
   295             return new PlainTunnelingConnection(addr, proxy, client,
       
   296                                                 proxyTunnelHeaders(request));
       
   297 
       
   298         if (proxy == null)
       
   299             return new PlainHttpConnection(addr, client);
       
   300         else
       
   301             return new PlainProxyConnection(proxy, client);
       
   302     }
       
   303 
       
   304     void closeOrReturnToCache(HttpHeaders hdrs) {
       
   305         if (hdrs == null) {
       
   306             // the connection was closed by server, eof
       
   307             close();
       
   308             return;
       
   309         }
       
   310         if (!isOpen()) {
       
   311             return;
       
   312         }
       
   313         HttpClientImpl client = client();
       
   314         if (client == null) {
       
   315             close();
       
   316             return;
       
   317         }
       
   318         ConnectionPool pool = client.connectionPool();
       
   319         boolean keepAlive = hdrs.firstValue("Connection")
       
   320                 .map((s) -> !s.equalsIgnoreCase("close"))
       
   321                 .orElse(true);
       
   322 
       
   323         if (keepAlive) {
       
   324             Log.logTrace("Returning connection to the pool: {0}", this);
       
   325             pool.returnToPool(this);
       
   326         } else {
       
   327             close();
       
   328         }
       
   329     }
       
   330 
       
   331     /* Tells whether or not this connection is a tunnel through a proxy */
       
   332     boolean isTunnel() { return false; }
       
   333 
       
   334     abstract SocketChannel channel();
       
   335 
       
   336     final InetSocketAddress address() {
       
   337         return address;
       
   338     }
       
   339 
       
   340     abstract ConnectionPool.CacheKey cacheKey();
       
   341 
       
   342     /**
       
   343      * Closes this connection, by returning the socket to its connection pool.
       
   344      */
       
   345     @Override
       
   346     public abstract void close();
       
   347 
       
   348     abstract void shutdownInput() throws IOException;
       
   349 
       
   350     abstract void shutdownOutput() throws IOException;
       
   351 
       
   352     // Support for WebSocket/RawChannelImpl which unfortunately
       
   353     // still depends on synchronous read/writes.
       
   354     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
       
   355     abstract static class DetachedConnectionChannel implements Closeable {
       
   356         DetachedConnectionChannel() {}
       
   357         abstract SocketChannel channel();
       
   358         abstract long write(ByteBuffer[] buffers, int start, int number)
       
   359                 throws IOException;
       
   360         abstract void shutdownInput() throws IOException;
       
   361         abstract void shutdownOutput() throws IOException;
       
   362         abstract ByteBuffer read() throws IOException;
       
   363         @Override
       
   364         public abstract void close();
       
   365         @Override
       
   366         public String toString() {
       
   367             return this.getClass().getSimpleName() + ": " + channel().toString();
       
   368         }
       
   369     }
       
   370 
       
   371     // Support for WebSocket/RawChannelImpl which unfortunately
       
   372     // still depends on synchronous read/writes.
       
   373     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
       
   374     abstract DetachedConnectionChannel detachChannel();
       
   375 
       
   376     abstract FlowTube getConnectionFlow();
       
   377 
       
   378     /**
       
   379      * A publisher that makes it possible to publish (write)
       
   380      * ordered (normal priority) and unordered (high priority)
       
   381      * buffers downstream.
       
   382      */
       
   383     final class PlainHttpPublisher implements HttpPublisher {
       
   384         final Object reading;
       
   385         PlainHttpPublisher() {
       
   386             this(new Object());
       
   387         }
       
   388         PlainHttpPublisher(Object readingLock) {
       
   389             this.reading = readingLock;
       
   390         }
       
   391         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
       
   392         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   393         volatile HttpWriteSubscription subscription;
       
   394         final SequentialScheduler writeScheduler =
       
   395                     new SequentialScheduler(this::flushTask);
       
   396         @Override
       
   397         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   398             synchronized (reading) {
       
   399                 //assert this.subscription == null;
       
   400                 //assert this.subscriber == null;
       
   401                 if (subscription == null) {
       
   402                     subscription = new HttpWriteSubscription();
       
   403                 }
       
   404                 this.subscriber = subscriber;
       
   405             }
       
   406             // TODO: should we do this in the flow?
       
   407             subscriber.onSubscribe(subscription);
       
   408             signal();
       
   409         }
       
   410 
       
   411         void flushTask(DeferredCompleter completer) {
       
   412             try {
       
   413                 HttpWriteSubscription sub = subscription;
       
   414                 if (sub != null) sub.flush();
       
   415             } finally {
       
   416                 completer.complete();
       
   417             }
       
   418         }
       
   419 
       
   420         void signal() {
       
   421             writeScheduler.runOrSchedule();
       
   422         }
       
   423 
       
   424         final class HttpWriteSubscription implements Flow.Subscription {
       
   425             final Demand demand = new Demand();
       
   426 
       
   427             @Override
       
   428             public void request(long n) {
       
   429                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
       
   430                 demand.increase(n);
       
   431                 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
       
   432                             + n + " from "
       
   433                             + getConnectionFlow());
       
   434                 writeScheduler.runOrSchedule();
       
   435             }
       
   436 
       
   437             @Override
       
   438             public void cancel() {
       
   439                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
       
   440                           + getConnectionFlow());
       
   441             }
       
   442 
       
   443             void flush() {
       
   444                 while (!queue.isEmpty() && demand.tryDecrement()) {
       
   445                     List<ByteBuffer> elem = queue.poll();
       
   446                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
       
   447                                 + Utils.remaining(elem) + " bytes ("
       
   448                                 + elem.size() + " buffers) to "
       
   449                                 + getConnectionFlow());
       
   450                     subscriber.onNext(elem);
       
   451                 }
       
   452             }
       
   453         }
       
   454 
       
   455         @Override
       
   456         public void enqueue(List<ByteBuffer> buffers) throws IOException {
       
   457             queue.add(buffers);
       
   458             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
       
   459             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
       
   460         }
       
   461 
       
   462         @Override
       
   463         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
       
   464             // Unordered frames are sent before existing frames.
       
   465             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
       
   466             queue.addFirst(buffers);
       
   467             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
       
   468         }
       
   469 
       
   470         @Override
       
   471         public void signalEnqueued() throws IOException {
       
   472             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
       
   473             signal();
       
   474         }
       
   475     }
       
   476 
       
   477     String dbgTag = null;
       
   478     final String dbgString() {
       
   479         FlowTube flow = getConnectionFlow();
       
   480         String tag = dbgTag;
       
   481         if (tag == null && flow != null) {
       
   482             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
       
   483         } else if (tag == null) {
       
   484             tag = this.getClass().getSimpleName() + "(?)";
       
   485         }
       
   486         return tag;
       
   487     }
       
   488 
       
   489     @Override
       
   490     public String toString() {
       
   491         return "HttpConnection: " + channel().toString();
       
   492     }
       
   493 }