8187044: HttpClient ConnectionPool may spawn several concurrent CacheCleaner and prevent early GC of HttpClient.
Summary: Fixes CacheCleaner creation logic in ConnectionPool.
Reviewed-by: chegar
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java Fri Sep 01 08:15:52 2017 -0700
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ConnectionPool.java Fri Sep 01 18:18:09 2017 +0100
@@ -25,11 +25,14 @@
package jdk.incubator.http;
+import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import jdk.incubator.http.internal.common.Utils;
/**
@@ -37,6 +40,21 @@
*/
final class ConnectionPool {
+ // These counters are used to distribute ids for debugging
+ // The ACTIVE_CLEANER_COUNTER will tell how many CacheCleaner
+ // are active at a given time. It will increase when a new
+ // CacheCleaner is started and decrease when it exits.
+ static final AtomicLong ACTIVE_CLEANER_COUNTER = new AtomicLong();
+ // The POOL_IDS_COUNTER increases each time a new ConnectionPool
+ // is created. It may wrap and become negative but will never be
+ // decremented.
+ static final AtomicLong POOL_IDS_COUNTER = new AtomicLong();
+ // The cleanerCounter is used to name cleaner threads within a
+ // a connection pool, and increments monotically.
+ // It may wrap and become negative but will never be
+ // decremented.
+ final AtomicLong cleanerCounter = new AtomicLong();
+
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
"jdk.httpclient.keepalive.timeout", 1200); // seconds
@@ -44,7 +62,12 @@
final HashMap<CacheKey,LinkedList<HttpConnection>> plainPool;
final HashMap<CacheKey,LinkedList<HttpConnection>> sslPool;
- CacheCleaner cleaner;
+ // A monotically increasing id for this connection pool.
+ // It may be negative (that's OK)
+ // Mostly used for debugging purposes when looking at thread dumps.
+ // Global scope.
+ final long poolID = POOL_IDS_COUNTER.incrementAndGet();
+ final AtomicReference<CacheCleaner> cleanerRef;
/**
* Entries in connection pool are keyed by destination address and/or
@@ -105,6 +128,7 @@
plainPool = new HashMap<>();
sslPool = new HashMap<>();
expiryList = new LinkedList<>();
+ cleanerRef = new AtomicReference<>();
}
void start() {
@@ -143,7 +167,7 @@
findConnection(CacheKey key,
HashMap<CacheKey,LinkedList<HttpConnection>> pool) {
LinkedList<HttpConnection> l = pool.get(key);
- if (l == null || l.size() == 0) {
+ if (l == null || l.isEmpty()) {
return null;
} else {
HttpConnection c = l.removeFirst();
@@ -175,19 +199,36 @@
l.add(c);
}
- // only runs while entries exist in cache
+ static String makeCleanerName(long poolId, long cleanerId) {
+ return "HTTP-Cache-cleaner-" + poolId + "-" + cleanerId;
+ }
- final class CacheCleaner extends Thread {
+ // only runs while entries exist in cache
+ final static class CacheCleaner extends Thread {
volatile boolean stopping;
+ // A monotically increasing id. May wrap and become negative (that's OK)
+ // Mostly used for debugging purposes when looking at thread dumps.
+ // Scoped per connection pool.
+ final long cleanerID;
+ // A reference to the owning ConnectionPool.
+ // This reference's referent may become null if the HttpClientImpl
+ // that owns this pool is GC'ed.
+ final WeakReference<ConnectionPool> ownerRef;
- CacheCleaner() {
- super(null, null, "HTTP-Cache-cleaner", 0, false);
+ CacheCleaner(ConnectionPool owner) {
+ this(owner, owner.cleanerCounter.incrementAndGet());
+ }
+
+ CacheCleaner(ConnectionPool owner, long cleanerID) {
+ super(null, null, makeCleanerName(owner.poolID, cleanerID), 0, false);
+ this.cleanerID = cleanerID;
+ this.ownerRef = new WeakReference<>(owner);
setDaemon(true);
}
synchronized boolean stopping() {
- return stopping;
+ return stopping || ownerRef.get() == null;
}
synchronized void stopCleaner() {
@@ -196,11 +237,19 @@
@Override
public void run() {
- while (!stopping()) {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {}
- cleanCache();
+ ACTIVE_CLEANER_COUNTER.incrementAndGet();
+ try {
+ while (!stopping()) {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {}
+ ConnectionPool owner = ownerRef.get();
+ if (owner == null) return;
+ owner.cleanCache(this);
+ owner = null;
+ }
+ } finally {
+ ACTIVE_CLEANER_COUNTER.decrementAndGet();
}
}
}
@@ -217,13 +266,15 @@
return;
}
}
- if (expiryList.isEmpty()) {
+ CacheCleaner cleaner = this.cleanerRef.get();
+ if (expiryList.isEmpty() && cleaner != null) {
+ this.cleanerRef.compareAndSet(cleaner, null);
cleaner.stopCleaner();
- cleaner = null;
+ cleaner.interrupt();
}
}
- private void cleanCache() {
+ private void cleanCache(CacheCleaner cleaner) {
long now = System.currentTimeMillis() / 1000;
LinkedList<HttpConnection> closelist = new LinkedList<>();
@@ -242,6 +293,10 @@
}
}
}
+ if (expiryList.isEmpty() && cleaner != null) {
+ this.cleanerRef.compareAndSet(cleaner, null);
+ cleaner.stopCleaner();
+ }
}
for (HttpConnection c : closelist) {
//System.out.println ("KAC: closing " + c);
@@ -252,10 +307,13 @@
private synchronized void addToExpiryList(HttpConnection conn) {
long now = System.currentTimeMillis() / 1000;
long then = now + KEEP_ALIVE;
-
if (expiryList.isEmpty()) {
- cleaner = new CacheCleaner();
- cleaner.start();
+ CacheCleaner cleaner = new CacheCleaner(this);
+ if (this.cleanerRef.compareAndSet(null, cleaner)) {
+ cleaner.start();
+ }
+ expiryList.add(new ExpiryEntry(conn, then));
+ return;
}
ListIterator<ExpiryEntry> li = expiryList.listIterator();
--- a/jdk/test/java/net/httpclient/whitebox/Driver.java Fri Sep 01 08:15:52 2017 -0700
+++ b/jdk/test/java/net/httpclient/whitebox/Driver.java Fri Sep 01 18:18:09 2017 +0100
@@ -23,9 +23,10 @@
/*
* @test
- * @bug 8151299 8164704
- * @modules jdk.incubator.httpclient
+ * @bug 8151299 8164704 8187044
+ * @modules jdk.incubator.httpclient java.management
* @run testng jdk.incubator.httpclient/jdk.incubator.http.SelectorTest
* @run testng jdk.incubator.httpclient/jdk.incubator.http.RawChannelTest
* @run testng jdk.incubator.httpclient/jdk.incubator.http.ResponseHeadersTest
+ * @run main/othervm --add-reads jdk.incubator.httpclient=java.management jdk.incubator.httpclient/jdk.incubator.http.ConnectionPoolTest
*/
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java Fri Sep 01 18:18:09 2017 +0100
@@ -0,0 +1,252 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.net.Authenticator;
+import java.net.CookieManager;
+import java.net.InetSocketAddress;
+import java.net.ProxySelector;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import jdk.incubator.http.internal.common.ByteBufferReference;
+
+/**
+ * @summary Verifies that the ConnectionPool won't prevent an HttpClient
+ * from being GC'ed. Verifies that the ConnectionPool has at most
+ * one CacheCleaner thread running.
+ * @bug 8187044
+ * @author danielfuchs
+ */
+public class ConnectionPoolTest {
+
+ static long getActiveCleaners() throws ClassNotFoundException {
+ // ConnectionPool.ACTIVE_CLEANER_COUNTER.get()
+ // ConnectionPoolTest.class.getModule().addReads(
+ // Class.forName("java.lang.management.ManagementFactory").getModule());
+ return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean()
+ .dumpAllThreads(false, false))
+ .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner"))
+ .count();
+ }
+
+ public static void main(String[] args) throws Exception {
+ testCacheCleaners();
+ }
+
+ public static void testCacheCleaners() throws Exception {
+ ConnectionPool pool = new ConnectionPool();
+ HttpClient client = new HttpClientStub(pool);
+ InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
+ System.out.println("Adding 10 connections to pool");
+ for (int i=0; i<10; i++) {
+ InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+ HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true);
+ pool.returnToPool(c1);
+ }
+ while (getActiveCleaners() == 0) {
+ System.out.println("Waiting for cleaner to start");
+ Thread.sleep(10);
+ }
+ System.out.println("Active CacheCleaners: " + getActiveCleaners());
+ if (getActiveCleaners() > 1) {
+ throw new RuntimeException("Too many CacheCleaner active: "
+ + getActiveCleaners());
+ }
+ System.out.println("Removing 9 connections from pool");
+ for (int i=0; i<9; i++) {
+ InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+ HttpConnection c2 = pool.getConnection(true, addr, proxy);
+ if (c2 == null) {
+ throw new RuntimeException("connection not found for " + addr);
+ }
+ }
+ System.out.println("Active CacheCleaners: " + getActiveCleaners());
+ if (getActiveCleaners() != 1) {
+ throw new RuntimeException("Wrong number of CacheCleaner active: "
+ + getActiveCleaners());
+ }
+ System.out.println("Removing last connection from pool");
+ for (int i=9; i<10; i++) {
+ InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
+ HttpConnection c2 = pool.getConnection(true, addr, proxy);
+ if (c2 == null) {
+ throw new RuntimeException("connection not found for " + addr);
+ }
+ }
+ System.out.println("Active CacheCleaners: " + getActiveCleaners()
+ + " (may be 0 or may still be 1)");
+ if (getActiveCleaners() > 1) {
+ throw new RuntimeException("Too many CacheCleaner active: "
+ + getActiveCleaners());
+ }
+ InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80);
+ HttpConnection c = new HttpConnectionStub(client, addr, proxy, true);
+ System.out.println("Adding/Removing one connection from pool 20 times in a loop");
+ for (int i=0; i<20; i++) {
+ pool.returnToPool(c);
+ HttpConnection c2 = pool.getConnection(true, addr, proxy);
+ if (c2 == null) {
+ throw new RuntimeException("connection not found for " + addr);
+ }
+ if (c2 != c) {
+ throw new RuntimeException("wrong connection found for " + addr);
+ }
+ }
+ if (getActiveCleaners() > 1) {
+ throw new RuntimeException("Too many CacheCleaner active: "
+ + getActiveCleaners());
+ }
+ ReferenceQueue<HttpClient> queue = new ReferenceQueue<>();
+ WeakReference<HttpClient> weak = new WeakReference<>(client, queue);
+ System.gc();
+ Reference.reachabilityFence(pool);
+ client = null; pool = null; c = null;
+ while (true) {
+ long cleaners = getActiveCleaners();
+ System.out.println("Waiting for GC to release stub HttpClient;"
+ + " active cache cleaners: " + cleaners);
+ System.gc();
+ Reference<?> ref = queue.remove(1000);
+ if (ref == weak) {
+ System.out.println("Stub HttpClient GC'ed");
+ break;
+ }
+ }
+ while (getActiveCleaners() > 0) {
+ System.out.println("Waiting for CacheCleaner to stop");
+ Thread.sleep(1000);
+ }
+ System.out.println("Active CacheCleaners: "
+ + getActiveCleaners());
+
+ if (getActiveCleaners() > 0) {
+ throw new RuntimeException("Too many CacheCleaner active: "
+ + getActiveCleaners());
+ }
+ }
+ static <T> T error() {
+ throw new InternalError("Should not reach here: wrong test assumptions!");
+ }
+
+ // Emulates an HttpConnection that has a strong reference to its HttpClient.
+ static class HttpConnectionStub extends HttpConnection {
+
+ public HttpConnectionStub(HttpClient client,
+ InetSocketAddress address,
+ InetSocketAddress proxy,
+ boolean secured) {
+ super(address, null);
+ this.key = ConnectionPool.cacheKey(address, proxy);
+ this.address = address;
+ this.proxy = proxy;
+ this.secured = secured;
+ this.client = client;
+ }
+
+ InetSocketAddress proxy;
+ InetSocketAddress address;
+ boolean secured;
+ ConnectionPool.CacheKey key;
+ HttpClient client;
+
+ // All these return something
+ @Override boolean connected() {return true;}
+ @Override boolean isSecure() {return secured;}
+ @Override boolean isProxied() {return proxy!=null;}
+ @Override ConnectionPool.CacheKey cacheKey() {return key;}
+ @Override public void close() {}
+ @Override void shutdownInput() throws IOException {}
+ @Override void shutdownOutput() throws IOException {}
+ public String toString() {
+ return "HttpConnectionStub: " + address + " proxy: " + proxy;
+ }
+
+ // All these throw errors
+ @Override
+ public void connect() throws IOException, InterruptedException {error();}
+ @Override public CompletableFuture<Void> connectAsync() {return error();}
+ @Override SocketChannel channel() {return error();}
+ @Override void flushAsync() throws IOException {error();}
+ @Override
+ protected ByteBuffer readImpl() throws IOException {return error();}
+ @Override CompletableFuture<Void> whenReceivingResponse() {return error();}
+ @Override
+ long write(ByteBuffer[] buffers, int start, int number) throws IOException {
+ throw (Error)error();
+ }
+ @Override
+ long write(ByteBuffer buffer) throws IOException {throw (Error)error();}
+ @Override
+ void writeAsync(ByteBufferReference[] buffers) throws IOException {
+ error();
+ }
+ @Override
+ void writeAsyncUnordered(ByteBufferReference[] buffers)
+ throws IOException {
+ error();
+ }
+ }
+ // Emulates an HttpClient that has a strong reference to its connection pool.
+ static class HttpClientStub extends HttpClient {
+ public HttpClientStub(ConnectionPool pool) {
+ this.pool = pool;
+ }
+ final ConnectionPool pool;
+ @Override public Optional<CookieManager> cookieManager() {return error();}
+ @Override public HttpClient.Redirect followRedirects() {return error();}
+ @Override public Optional<ProxySelector> proxy() {return error();}
+ @Override public SSLContext sslContext() {return error();}
+ @Override public Optional<SSLParameters> sslParameters() {return error();}
+ @Override public Optional<Authenticator> authenticator() {return error();}
+ @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
+ @Override public Executor executor() {return error();}
+ @Override
+ public <T> HttpResponse<T> send(HttpRequest req,
+ HttpResponse.BodyHandler<T> responseBodyHandler)
+ throws IOException, InterruptedException {
+ return error();
+ }
+ @Override
+ public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req,
+ HttpResponse.BodyHandler<T> responseBodyHandler) {
+ return error();
+ }
+ @Override
+ public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
+ HttpResponse.MultiProcessor<U, T> multiProcessor) {
+ return error();
+ }
+ }
+
+}