diff -r 6c11b48a0695 -r d23b02f37fce src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java Tue Feb 06 11:39:55 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,490 +0,0 @@ -/* - * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http; - -import java.io.IOException; -import java.lang.System.Logger.Level; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.Flow; -import java.util.stream.Collectors; -import jdk.incubator.http.internal.common.FlowTube; -import jdk.incubator.http.internal.common.Utils; - -/** - * Http 1.1 connection pool. - */ -final class ConnectionPool { - - static final long KEEP_ALIVE = Utils.getIntegerNetProperty( - "jdk.httpclient.keepalive.timeout", 1200); // seconds - static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. - final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); - - // Pools of idle connections - - private final HashMap> plainPool; - private final HashMap> sslPool; - private final ExpiryList expiryList; - private final String dbgTag; // used for debug - boolean stopped; - - /** - * Entries in connection pool are keyed by destination address and/or - * proxy address: - * case 1: plain TCP not via proxy (destination only) - * case 2: plain TCP via proxy (proxy only) - * case 3: SSL not via proxy (destination only) - * case 4: SSL over tunnel (destination and proxy) - */ - static class CacheKey { - final InetSocketAddress proxy; - final InetSocketAddress destination; - - CacheKey(InetSocketAddress destination, InetSocketAddress proxy) { - this.proxy = proxy; - this.destination = destination; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final CacheKey other = (CacheKey) obj; - if (!Objects.equals(this.proxy, other.proxy)) { - return false; - } - if (!Objects.equals(this.destination, other.destination)) { - return false; - } - return true; - } - - @Override - public int hashCode() { - return Objects.hash(proxy, destination); - } - } - - ConnectionPool(long clientId) { - this("ConnectionPool("+clientId+")"); - } - - /** - * There should be one of these per HttpClient. - */ - private ConnectionPool(String tag) { - dbgTag = tag; - plainPool = new HashMap<>(); - sslPool = new HashMap<>(); - expiryList = new ExpiryList(); - } - - final String dbgString() { - return dbgTag; - } - - synchronized void start() { - assert !stopped : "Already stopped"; - } - - static CacheKey cacheKey(InetSocketAddress destination, - InetSocketAddress proxy) - { - return new CacheKey(destination, proxy); - } - - synchronized HttpConnection getConnection(boolean secure, - InetSocketAddress addr, - InetSocketAddress proxy) { - if (stopped) return null; - CacheKey key = new CacheKey(addr, proxy); - HttpConnection c = secure ? findConnection(key, sslPool) - : findConnection(key, plainPool); - //System.out.println ("getConnection returning: " + c); - return c; - } - - /** - * Returns the connection to the pool. - */ - void returnToPool(HttpConnection conn) { - returnToPool(conn, Instant.now(), KEEP_ALIVE); - } - - // Called also by whitebox tests - void returnToPool(HttpConnection conn, Instant now, long keepAlive) { - - // Don't call registerCleanupTrigger while holding a lock, - // but register it before the connection is added to the pool, - // since we don't want to trigger the cleanup if the connection - // is not in the pool. - CleanupTrigger cleanup = registerCleanupTrigger(conn); - - // it's possible that cleanup may have been called. - synchronized(this) { - if (cleanup.isDone()) { - return; - } else if (stopped) { - conn.close(); - return; - } - if (conn instanceof PlainHttpConnection) { - putConnection(conn, plainPool); - } else { - assert conn.isSecure(); - putConnection(conn, sslPool); - } - expiryList.add(conn, now, keepAlive); - } - //System.out.println("Return to pool: " + conn); - } - - private CleanupTrigger registerCleanupTrigger(HttpConnection conn) { - // Connect the connection flow to a pub/sub pair that will take the - // connection out of the pool and close it if anything happens - // while the connection is sitting in the pool. - CleanupTrigger cleanup = new CleanupTrigger(conn); - FlowTube flow = conn.getConnectionFlow(); - debug.log(Level.DEBUG, "registering %s", cleanup); - flow.connectFlows(cleanup, cleanup); - return cleanup; - } - - private HttpConnection - findConnection(CacheKey key, - HashMap> pool) { - LinkedList l = pool.get(key); - if (l == null || l.isEmpty()) { - return null; - } else { - HttpConnection c = l.removeFirst(); - expiryList.remove(c); - return c; - } - } - - /* called from cache cleaner only */ - private boolean - removeFromPool(HttpConnection c, - HashMap> pool) { - //System.out.println("cacheCleaner removing: " + c); - assert Thread.holdsLock(this); - CacheKey k = c.cacheKey(); - List l = pool.get(k); - if (l == null || l.isEmpty()) { - pool.remove(k); - return false; - } - return l.remove(c); - } - - private void - putConnection(HttpConnection c, - HashMap> pool) { - CacheKey key = c.cacheKey(); - LinkedList l = pool.get(key); - if (l == null) { - l = new LinkedList<>(); - pool.put(key, l); - } - l.add(c); - } - - /** - * Purge expired connection and return the number of milliseconds - * in which the next connection is scheduled to expire. - * If no connections are scheduled to be purged return 0. - * @return the delay in milliseconds in which the next connection will - * expire. - */ - long purgeExpiredConnectionsAndReturnNextDeadline() { - if (!expiryList.purgeMaybeRequired()) return 0; - return purgeExpiredConnectionsAndReturnNextDeadline(Instant.now()); - } - - // Used for whitebox testing - long purgeExpiredConnectionsAndReturnNextDeadline(Instant now) { - long nextPurge = 0; - - // We may be in the process of adding new elements - // to the expiry list - but those elements will not - // have outlast their keep alive timer yet since we're - // just adding them. - if (!expiryList.purgeMaybeRequired()) return nextPurge; - - List closelist; - synchronized (this) { - closelist = expiryList.purgeUntil(now); - for (HttpConnection c : closelist) { - if (c instanceof PlainHttpConnection) { - boolean wasPresent = removeFromPool(c, plainPool); - assert wasPresent; - } else { - boolean wasPresent = removeFromPool(c, sslPool); - assert wasPresent; - } - } - nextPurge = now.until( - expiryList.nextExpiryDeadline().orElse(now), - ChronoUnit.MILLIS); - } - closelist.forEach(this::close); - return nextPurge; - } - - private void close(HttpConnection c) { - try { - c.close(); - } catch (Throwable e) {} // ignore - } - - void stop() { - List closelist = Collections.emptyList(); - try { - synchronized (this) { - stopped = true; - closelist = expiryList.stream() - .map(e -> e.connection) - .collect(Collectors.toList()); - expiryList.clear(); - plainPool.clear(); - sslPool.clear(); - } - } finally { - closelist.forEach(this::close); - } - } - - static final class ExpiryEntry { - final HttpConnection connection; - final Instant expiry; // absolute time in seconds of expiry time - ExpiryEntry(HttpConnection connection, Instant expiry) { - this.connection = connection; - this.expiry = expiry; - } - } - - /** - * Manages a LinkedList of sorted ExpiryEntry. The entry with the closer - * deadline is at the tail of the list, and the entry with the farther - * deadline is at the head. In the most common situation, new elements - * will need to be added at the head (or close to it), and expired elements - * will need to be purged from the tail. - */ - private static final class ExpiryList { - private final LinkedList list = new LinkedList<>(); - private volatile boolean mayContainEntries; - - // A loosely accurate boolean whose value is computed - // at the end of each operation performed on ExpiryList; - // Does not require synchronizing on the ConnectionPool. - boolean purgeMaybeRequired() { - return mayContainEntries; - } - - // Returns the next expiry deadline - // should only be called while holding a synchronization - // lock on the ConnectionPool - Optional nextExpiryDeadline() { - if (list.isEmpty()) return Optional.empty(); - else return Optional.of(list.getLast().expiry); - } - - // should only be called while holding a synchronization - // lock on the ConnectionPool - void add(HttpConnection conn) { - add(conn, Instant.now(), KEEP_ALIVE); - } - - // Used by whitebox test. - void add(HttpConnection conn, Instant now, long keepAlive) { - Instant then = now.truncatedTo(ChronoUnit.SECONDS) - .plus(keepAlive, ChronoUnit.SECONDS); - - // Elements with the farther deadline are at the head of - // the list. It's more likely that the new element will - // have the farthest deadline, and will need to be inserted - // at the head of the list, so we're using an ascending - // list iterator to find the right insertion point. - ListIterator li = list.listIterator(); - while (li.hasNext()) { - ExpiryEntry entry = li.next(); - - if (then.isAfter(entry.expiry)) { - li.previous(); - // insert here - li.add(new ExpiryEntry(conn, then)); - mayContainEntries = true; - return; - } - } - // last (or first) element of list (the last element is - // the first when the list is empty) - list.add(new ExpiryEntry(conn, then)); - mayContainEntries = true; - } - - // should only be called while holding a synchronization - // lock on the ConnectionPool - void remove(HttpConnection c) { - if (c == null || list.isEmpty()) return; - ListIterator li = list.listIterator(); - while (li.hasNext()) { - ExpiryEntry e = li.next(); - if (e.connection.equals(c)) { - li.remove(); - mayContainEntries = !list.isEmpty(); - return; - } - } - } - - // should only be called while holding a synchronization - // lock on the ConnectionPool. - // Purge all elements whose deadline is before now (now included). - List purgeUntil(Instant now) { - if (list.isEmpty()) return Collections.emptyList(); - - List closelist = new ArrayList<>(); - - // elements with the closest deadlines are at the tail - // of the queue, so we're going to use a descending iterator - // to remove them, and stop when we find the first element - // that has not expired yet. - Iterator li = list.descendingIterator(); - while (li.hasNext()) { - ExpiryEntry entry = li.next(); - // use !isAfter instead of isBefore in order to - // remove the entry if its expiry == now - if (!entry.expiry.isAfter(now)) { - li.remove(); - HttpConnection c = entry.connection; - closelist.add(c); - } else break; // the list is sorted - } - mayContainEntries = !list.isEmpty(); - return closelist; - } - - // should only be called while holding a synchronization - // lock on the ConnectionPool - java.util.stream.Stream stream() { - return list.stream(); - } - - // should only be called while holding a synchronization - // lock on the ConnectionPool - void clear() { - list.clear(); - mayContainEntries = false; - } - } - - void cleanup(HttpConnection c, Throwable error) { - debug.log(Level.DEBUG, - "%s : ConnectionPool.cleanup(%s)", - String.valueOf(c.getConnectionFlow()), - error); - synchronized(this) { - if (c instanceof PlainHttpConnection) { - removeFromPool(c, plainPool); - } else { - assert c.isSecure(); - removeFromPool(c, sslPool); - } - expiryList.remove(c); - } - c.close(); - } - - /** - * An object that subscribes to the flow while the connection is in - * the pool. Anything that comes in will cause the connection to be closed - * and removed from the pool. - */ - private final class CleanupTrigger implements - FlowTube.TubeSubscriber, FlowTube.TubePublisher, - Flow.Subscription { - - private final HttpConnection connection; - private volatile boolean done; - - public CleanupTrigger(HttpConnection connection) { - this.connection = connection; - } - - public boolean isDone() { return done;} - - private void triggerCleanup(Throwable error) { - done = true; - cleanup(connection, error); - } - - @Override public void request(long n) {} - @Override public void cancel() {} - - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(1); - } - @Override - public void onError(Throwable error) { triggerCleanup(error); } - @Override - public void onComplete() { triggerCleanup(null); } - @Override - public void onNext(List item) { - triggerCleanup(new IOException("Data received while in pool")); - } - - @Override - public void subscribe(Flow.Subscriber> subscriber) { - subscriber.onSubscribe(this); - } - - @Override - public String toString() { - return "CleanupTrigger(" + connection.getConnectionFlow() + ")"; - } - - } - -}