src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ConnectionPool.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.net.InetSocketAddress;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.time.Instant;
       
    33 import java.time.temporal.ChronoUnit;
       
    34 import java.util.ArrayList;
       
    35 import java.util.Collections;
       
    36 import java.util.HashMap;
       
    37 import java.util.Iterator;
       
    38 import java.util.LinkedList;
       
    39 import java.util.List;
       
    40 import java.util.ListIterator;
       
    41 import java.util.Objects;
       
    42 import java.util.Optional;
       
    43 import java.util.concurrent.Flow;
       
    44 import java.util.stream.Collectors;
       
    45 import jdk.incubator.http.internal.common.FlowTube;
       
    46 import jdk.incubator.http.internal.common.Utils;
       
    47 
       
    48 /**
       
    49  * Http 1.1 connection pool.
       
    50  */
       
    51 final class ConnectionPool {
       
    52 
       
    53     static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
       
    54             "jdk.httpclient.keepalive.timeout", 1200); // seconds
       
    55     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    56     final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    57 
       
    58     // Pools of idle connections
       
    59 
       
    60     private final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
       
    61     private final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
       
    62     private final ExpiryList expiryList;
       
    63     private final String dbgTag; // used for debug
       
    64     boolean stopped;
       
    65 
       
    66     /**
       
    67      * Entries in connection pool are keyed by destination address and/or
       
    68      * proxy address:
       
    69      * case 1: plain TCP not via proxy (destination only)
       
    70      * case 2: plain TCP via proxy (proxy only)
       
    71      * case 3: SSL not via proxy (destination only)
       
    72      * case 4: SSL over tunnel (destination and proxy)
       
    73      */
       
    74     static class CacheKey {
       
    75         final InetSocketAddress proxy;
       
    76         final InetSocketAddress destination;
       
    77 
       
    78         CacheKey(InetSocketAddress destination, InetSocketAddress proxy) {
       
    79             this.proxy = proxy;
       
    80             this.destination = destination;
       
    81         }
       
    82 
       
    83         @Override
       
    84         public boolean equals(Object obj) {
       
    85             if (obj == null) {
       
    86                 return false;
       
    87             }
       
    88             if (getClass() != obj.getClass()) {
       
    89                 return false;
       
    90             }
       
    91             final CacheKey other = (CacheKey) obj;
       
    92             if (!Objects.equals(this.proxy, other.proxy)) {
       
    93                 return false;
       
    94             }
       
    95             if (!Objects.equals(this.destination, other.destination)) {
       
    96                 return false;
       
    97             }
       
    98             return true;
       
    99         }
       
   100 
       
   101         @Override
       
   102         public int hashCode() {
       
   103             return Objects.hash(proxy, destination);
       
   104         }
       
   105     }
       
   106 
       
   107     ConnectionPool(long clientId) {
       
   108         this("ConnectionPool("+clientId+")");
       
   109     }
       
   110 
       
   111     /**
       
   112      * There should be one of these per HttpClient.
       
   113      */
       
   114     private ConnectionPool(String tag) {
       
   115         dbgTag = tag;
       
   116         plainPool = new HashMap<>();
       
   117         sslPool = new HashMap<>();
       
   118         expiryList = new ExpiryList();
       
   119     }
       
   120 
       
   121     final String dbgString() {
       
   122         return dbgTag;
       
   123     }
       
   124 
       
   125     synchronized void start() {
       
   126         assert !stopped : "Already stopped";
       
   127     }
       
   128 
       
   129     static CacheKey cacheKey(InetSocketAddress destination,
       
   130                              InetSocketAddress proxy)
       
   131     {
       
   132         return new CacheKey(destination, proxy);
       
   133     }
       
   134 
       
   135     synchronized HttpConnection getConnection(boolean secure,
       
   136                                               InetSocketAddress addr,
       
   137                                               InetSocketAddress proxy) {
       
   138         if (stopped) return null;
       
   139         CacheKey key = new CacheKey(addr, proxy);
       
   140         HttpConnection c = secure ? findConnection(key, sslPool)
       
   141                                   : findConnection(key, plainPool);
       
   142         //System.out.println ("getConnection returning: " + c);
       
   143         return c;
       
   144     }
       
   145 
       
   146     /**
       
   147      * Returns the connection to the pool.
       
   148      */
       
   149     void returnToPool(HttpConnection conn) {
       
   150         returnToPool(conn, Instant.now(), KEEP_ALIVE);
       
   151     }
       
   152 
       
   153     // Called also by whitebox tests
       
   154     void returnToPool(HttpConnection conn, Instant now, long keepAlive) {
       
   155 
       
   156         // Don't call registerCleanupTrigger while holding a lock,
       
   157         // but register it before the connection is added to the pool,
       
   158         // since we don't want to trigger the cleanup if the connection
       
   159         // is not in the pool.
       
   160         CleanupTrigger cleanup = registerCleanupTrigger(conn);
       
   161 
       
   162         // it's possible that cleanup may have been called.
       
   163         synchronized(this) {
       
   164             if (cleanup.isDone()) {
       
   165                 return;
       
   166             } else if (stopped) {
       
   167                 conn.close();
       
   168                 return;
       
   169             }
       
   170             if (conn instanceof PlainHttpConnection) {
       
   171                 putConnection(conn, plainPool);
       
   172             } else {
       
   173                 assert conn.isSecure();
       
   174                 putConnection(conn, sslPool);
       
   175             }
       
   176             expiryList.add(conn, now, keepAlive);
       
   177         }
       
   178         //System.out.println("Return to pool: " + conn);
       
   179     }
       
   180 
       
   181     private CleanupTrigger registerCleanupTrigger(HttpConnection conn) {
       
   182         // Connect the connection flow to a pub/sub pair that will take the
       
   183         // connection out of the pool and close it if anything happens
       
   184         // while the connection is sitting in the pool.
       
   185         CleanupTrigger cleanup = new CleanupTrigger(conn);
       
   186         FlowTube flow = conn.getConnectionFlow();
       
   187         debug.log(Level.DEBUG, "registering %s", cleanup);
       
   188         flow.connectFlows(cleanup, cleanup);
       
   189         return cleanup;
       
   190     }
       
   191 
       
   192     private HttpConnection
       
   193     findConnection(CacheKey key,
       
   194                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
       
   195         LinkedList<HttpConnection> l = pool.get(key);
       
   196         if (l == null || l.isEmpty()) {
       
   197             return null;
       
   198         } else {
       
   199             HttpConnection c = l.removeFirst();
       
   200             expiryList.remove(c);
       
   201             return c;
       
   202         }
       
   203     }
       
   204 
       
   205     /* called from cache cleaner only  */
       
   206     private boolean
       
   207     removeFromPool(HttpConnection c,
       
   208                    HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
       
   209         //System.out.println("cacheCleaner removing: " + c);
       
   210         assert Thread.holdsLock(this);
       
   211         CacheKey k = c.cacheKey();
       
   212         List<HttpConnection> l = pool.get(k);
       
   213         if (l == null || l.isEmpty()) {
       
   214             pool.remove(k);
       
   215             return false;
       
   216         }
       
   217         return l.remove(c);
       
   218     }
       
   219 
       
   220     private void
       
   221     putConnection(HttpConnection c,
       
   222                   HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
       
   223         CacheKey key = c.cacheKey();
       
   224         LinkedList<HttpConnection> l = pool.get(key);
       
   225         if (l == null) {
       
   226             l = new LinkedList<>();
       
   227             pool.put(key, l);
       
   228         }
       
   229         l.add(c);
       
   230     }
       
   231 
       
   232     /**
       
   233      * Purge expired connection and return the number of milliseconds
       
   234      * in which the next connection is scheduled to expire.
       
   235      * If no connections are scheduled to be purged return 0.
       
   236      * @return the delay in milliseconds in which the next connection will
       
   237      *         expire.
       
   238      */
       
   239     long purgeExpiredConnectionsAndReturnNextDeadline() {
       
   240         if (!expiryList.purgeMaybeRequired()) return 0;
       
   241         return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now());
       
   242     }
       
   243 
       
   244     // Used for whitebox testing
       
   245     long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) {
       
   246         long nextPurge = 0;
       
   247 
       
   248         // We may be in the process of adding new elements
       
   249         // to the expiry list - but those elements will not
       
   250         // have outlast their keep alive timer yet since we're
       
   251         // just adding them.
       
   252         if (!expiryList.purgeMaybeRequired()) return nextPurge;
       
   253 
       
   254         List<HttpConnection> closelist;
       
   255         synchronized (this) {
       
   256             closelist = expiryList.purgeUntil(now);
       
   257             for (HttpConnection c : closelist) {
       
   258                 if (c instanceof PlainHttpConnection) {
       
   259                     boolean wasPresent = removeFromPool(c, plainPool);
       
   260                     assert wasPresent;
       
   261                 } else {
       
   262                     boolean wasPresent = removeFromPool(c, sslPool);
       
   263                     assert wasPresent;
       
   264                 }
       
   265             }
       
   266             nextPurge = now.until(
       
   267                     expiryList.nextExpiryDeadline().orElse(now),
       
   268                     ChronoUnit.MILLIS);
       
   269         }
       
   270         closelist.forEach(this::close);
       
   271         return nextPurge;
       
   272     }
       
   273 
       
   274     private void close(HttpConnection c) {
       
   275         try {
       
   276             c.close();
       
   277         } catch (Throwable e) {} // ignore
       
   278     }
       
   279 
       
   280     void stop() {
       
   281         List<HttpConnection> closelist = Collections.emptyList();
       
   282         try {
       
   283             synchronized (this) {
       
   284                 stopped = true;
       
   285                 closelist = expiryList.stream()
       
   286                     .map(e -> e.connection)
       
   287                     .collect(Collectors.toList());
       
   288                 expiryList.clear();
       
   289                 plainPool.clear();
       
   290                 sslPool.clear();
       
   291             }
       
   292         } finally {
       
   293             closelist.forEach(this::close);
       
   294         }
       
   295     }
       
   296 
       
   297     static final class ExpiryEntry {
       
   298         final HttpConnection connection;
       
   299         final Instant expiry; // absolute time in seconds of expiry time
       
   300         ExpiryEntry(HttpConnection connection, Instant expiry) {
       
   301             this.connection = connection;
       
   302             this.expiry = expiry;
       
   303         }
       
   304     }
       
   305 
       
   306     /**
       
   307      * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer
       
   308      * deadline is at the tail of the list, and the entry with the farther
       
   309      * deadline is at the head. In the most common situation, new elements
       
   310      * will need to be added at the head (or close to it), and expired elements
       
   311      * will need to be purged from the tail.
       
   312      */
       
   313     private static final class ExpiryList {
       
   314         private final LinkedList<ExpiryEntry> list = new LinkedList<>();
       
   315         private volatile boolean mayContainEntries;
       
   316 
       
   317         // A loosely accurate boolean whose value is computed
       
   318         // at the end of each operation performed on ExpiryList;
       
   319         // Does not require synchronizing on the ConnectionPool.
       
   320         boolean purgeMaybeRequired() {
       
   321             return mayContainEntries;
       
   322         }
       
   323 
       
   324         // Returns the next expiry deadline
       
   325         // should only be called while holding a synchronization
       
   326         // lock on the ConnectionPool
       
   327         Optional<Instant> nextExpiryDeadline() {
       
   328             if (list.isEmpty()) return Optional.empty();
       
   329             else return Optional.of(list.getLast().expiry);
       
   330         }
       
   331 
       
   332         // should only be called while holding a synchronization
       
   333         // lock on the ConnectionPool
       
   334         void add(HttpConnection conn) {
       
   335             add(conn, Instant.now(), KEEP_ALIVE);
       
   336         }
       
   337 
       
   338         // Used by whitebox test.
       
   339         void add(HttpConnection conn, Instant now, long keepAlive) {
       
   340             Instant then = now.truncatedTo(ChronoUnit.SECONDS)
       
   341                     .plus(keepAlive, ChronoUnit.SECONDS);
       
   342 
       
   343             // Elements with the farther deadline are at the head of
       
   344             // the list. It's more likely that the new element will
       
   345             // have the farthest deadline, and will need to be inserted
       
   346             // at the head of the list, so we're using an ascending
       
   347             // list iterator to find the right insertion point.
       
   348             ListIterator<ExpiryEntry> li = list.listIterator();
       
   349             while (li.hasNext()) {
       
   350                 ExpiryEntry entry = li.next();
       
   351 
       
   352                 if (then.isAfter(entry.expiry)) {
       
   353                     li.previous();
       
   354                     // insert here
       
   355                     li.add(new ExpiryEntry(conn, then));
       
   356                     mayContainEntries = true;
       
   357                     return;
       
   358                 }
       
   359             }
       
   360             // last (or first) element of list (the last element is
       
   361             // the first when the list is empty)
       
   362             list.add(new ExpiryEntry(conn, then));
       
   363             mayContainEntries = true;
       
   364         }
       
   365 
       
   366         // should only be called while holding a synchronization
       
   367         // lock on the ConnectionPool
       
   368         void remove(HttpConnection c) {
       
   369             if (c == null || list.isEmpty()) return;
       
   370             ListIterator<ExpiryEntry> li = list.listIterator();
       
   371             while (li.hasNext()) {
       
   372                 ExpiryEntry e = li.next();
       
   373                 if (e.connection.equals(c)) {
       
   374                     li.remove();
       
   375                     mayContainEntries = !list.isEmpty();
       
   376                     return;
       
   377                 }
       
   378             }
       
   379         }
       
   380 
       
   381         // should only be called while holding a synchronization
       
   382         // lock on the ConnectionPool.
       
   383         // Purge all elements whose deadline is before now (now included).
       
   384         List<HttpConnection> purgeUntil(Instant now) {
       
   385             if (list.isEmpty()) return Collections.emptyList();
       
   386 
       
   387             List<HttpConnection> closelist = new ArrayList<>();
       
   388 
       
   389             // elements with the closest deadlines are at the tail
       
   390             // of the queue, so we're going to use a descending iterator
       
   391             // to remove them, and stop when we find the first element
       
   392             // that has not expired yet.
       
   393             Iterator<ExpiryEntry> li = list.descendingIterator();
       
   394             while (li.hasNext()) {
       
   395                 ExpiryEntry entry = li.next();
       
   396                 // use !isAfter instead of isBefore in order to
       
   397                 // remove the entry if its expiry == now
       
   398                 if (!entry.expiry.isAfter(now)) {
       
   399                     li.remove();
       
   400                     HttpConnection c = entry.connection;
       
   401                     closelist.add(c);
       
   402                 } else break; // the list is sorted
       
   403             }
       
   404             mayContainEntries = !list.isEmpty();
       
   405             return closelist;
       
   406         }
       
   407 
       
   408         // should only be called while holding a synchronization
       
   409         // lock on the ConnectionPool
       
   410         java.util.stream.Stream<ExpiryEntry> stream() {
       
   411             return list.stream();
       
   412         }
       
   413 
       
   414         // should only be called while holding a synchronization
       
   415         // lock on the ConnectionPool
       
   416         void clear() {
       
   417             list.clear();
       
   418             mayContainEntries = false;
       
   419         }
       
   420     }
       
   421 
       
   422     void cleanup(HttpConnection c, Throwable error) {
       
   423         debug.log(Level.DEBUG,
       
   424                   "%s : ConnectionPool.cleanup(%s)",
       
   425                   String.valueOf(c.getConnectionFlow()),
       
   426                   error);
       
   427         synchronized(this) {
       
   428             if (c instanceof PlainHttpConnection) {
       
   429                 removeFromPool(c, plainPool);
       
   430             } else {
       
   431                 assert c.isSecure();
       
   432                 removeFromPool(c, sslPool);
       
   433             }
       
   434             expiryList.remove(c);
       
   435         }
       
   436         c.close();
       
   437     }
       
   438 
       
   439     /**
       
   440      * An object that subscribes to the flow while the connection is in
       
   441      * the pool. Anything that comes in will cause the connection to be closed
       
   442      * and removed from the pool.
       
   443      */
       
   444     private final class CleanupTrigger implements
       
   445             FlowTube.TubeSubscriber, FlowTube.TubePublisher,
       
   446             Flow.Subscription {
       
   447 
       
   448         private final HttpConnection connection;
       
   449         private volatile boolean done;
       
   450 
       
   451         public CleanupTrigger(HttpConnection connection) {
       
   452             this.connection = connection;
       
   453         }
       
   454 
       
   455         public boolean isDone() { return done;}
       
   456 
       
   457         private void triggerCleanup(Throwable error) {
       
   458             done = true;
       
   459             cleanup(connection, error);
       
   460         }
       
   461 
       
   462         @Override public void request(long n) {}
       
   463         @Override public void cancel() {}
       
   464 
       
   465         @Override
       
   466         public void onSubscribe(Flow.Subscription subscription) {
       
   467             subscription.request(1);
       
   468         }
       
   469         @Override
       
   470         public void onError(Throwable error) { triggerCleanup(error); }
       
   471         @Override
       
   472         public void onComplete() { triggerCleanup(null); }
       
   473         @Override
       
   474         public void onNext(List<ByteBuffer> item) {
       
   475             triggerCleanup(new IOException("Data received while in pool"));
       
   476         }
       
   477 
       
   478         @Override
       
   479         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   480             subscriber.onSubscribe(this);
       
   481         }
       
   482 
       
   483         @Override
       
   484         public String toString() {
       
   485             return "CleanupTrigger(" + connection.getConnectionFlow() + ")";
       
   486         }
       
   487 
       
   488     }
       
   489 
       
   490 }