test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55795 074bb951658a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    23 
    23 
    24 package jdk.incubator.http;
    24 package jdk.incubator.http;
    25 
    25 
    26 import java.io.IOException;
    26 import java.io.IOException;
    27 import java.lang.management.ManagementFactory;
    27 import java.lang.management.ManagementFactory;
    28 import java.lang.ref.Reference;
       
    29 import java.lang.ref.ReferenceQueue;
       
    30 import java.lang.ref.WeakReference;
       
    31 import java.net.Authenticator;
    28 import java.net.Authenticator;
    32 import java.net.CookieManager;
    29 import java.net.CookieManager;
    33 import java.net.InetSocketAddress;
    30 import java.net.InetSocketAddress;
    34 import java.net.ProxySelector;
    31 import java.net.ProxySelector;
    35 import java.nio.ByteBuffer;
    32 import java.nio.ByteBuffer;
    36 import java.nio.channels.SocketChannel;
    33 import java.nio.channels.SocketChannel;
       
    34 import java.util.List;
    37 import java.util.Optional;
    35 import java.util.Optional;
       
    36 import java.util.Random;
    38 import java.util.concurrent.CompletableFuture;
    37 import java.util.concurrent.CompletableFuture;
    39 import java.util.concurrent.Executor;
    38 import java.util.concurrent.Executor;
       
    39 import java.util.concurrent.Flow;
       
    40 import java.util.stream.IntStream;
       
    41 import java.time.Instant;
       
    42 import java.time.temporal.ChronoUnit;
    40 import javax.net.ssl.SSLContext;
    43 import javax.net.ssl.SSLContext;
    41 import javax.net.ssl.SSLParameters;
    44 import javax.net.ssl.SSLParameters;
    42 import jdk.incubator.http.internal.common.ByteBufferReference;
    45 import jdk.incubator.http.internal.common.ByteBufferReference;
       
    46 import jdk.incubator.http.internal.common.FlowTube;
    43 
    47 
    44 /**
    48 /**
    45  * @summary Verifies that the ConnectionPool won't prevent an HttpClient
    49  * @summary Verifies that the ConnectionPool correctly handles
    46  *          from being GC'ed. Verifies that the ConnectionPool has at most
    50  *          connection deadlines and purges the right connections
    47  *          one CacheCleaner thread running.
    51  *          from the cache.
    48  * @bug 8187044
    52  * @bug 8187044 8187111
    49  * @author danielfuchs
    53  * @author danielfuchs
    50  */
    54  */
    51 public class ConnectionPoolTest {
    55 public class ConnectionPoolTest {
    52 
    56 
    53     static long getActiveCleaners() throws ClassNotFoundException {
    57     static long getActiveCleaners() throws ClassNotFoundException {
    67     public static void testCacheCleaners() throws Exception {
    71     public static void testCacheCleaners() throws Exception {
    68         ConnectionPool pool = new ConnectionPool();
    72         ConnectionPool pool = new ConnectionPool();
    69         HttpClient client = new HttpClientStub(pool);
    73         HttpClient client = new HttpClientStub(pool);
    70         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
    74         InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80);
    71         System.out.println("Adding 10 connections to pool");
    75         System.out.println("Adding 10 connections to pool");
    72         for (int i=0; i<10; i++) {
    76         Random random = new Random();
       
    77 
       
    78         final int count = 20;
       
    79         Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
       
    80         int[] keepAlives = new int[count];
       
    81         HttpConnectionStub[] connections = new HttpConnectionStub[count];
       
    82         long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
       
    83         long expected = 0;
       
    84         if (purge != expected) {
       
    85             throw new RuntimeException("Bad purge delay: " + purge
       
    86                                         + ", expected " + expected);
       
    87         }
       
    88         expected = Long.MAX_VALUE;
       
    89         for (int i=0; i<count; i++) {
    73             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
    90             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
    74             HttpConnection c1 = new HttpConnectionStub(client, addr, proxy, true);
    91             keepAlives[i] = random.nextInt(10) * 10  + 10;
    75             pool.returnToPool(c1);
    92             connections[i] = new HttpConnectionStub(client, addr, proxy, true);
    76         }
    93             System.out.println("Adding connection: " + now
    77         while (getActiveCleaners() == 0) {
    94                                 + " keepAlive: " + keepAlives[i]
    78             System.out.println("Waiting for cleaner to start");
    95                                 + " /" + connections[i]);
    79             Thread.sleep(10);
    96             pool.returnToPool(connections[i], now, keepAlives[i]);
    80         }
    97             expected = Math.min(expected, keepAlives[i] * 1000);
    81         System.out.println("Active CacheCleaners: " + getActiveCleaners());
    98             purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
    82         if (getActiveCleaners() > 1) {
    99             if (purge != expected) {
    83             throw new RuntimeException("Too many CacheCleaner active: "
   100                 throw new RuntimeException("Bad purge delay: " + purge
    84                     + getActiveCleaners());
   101                                         + ", expected " + expected);
    85         }
       
    86         System.out.println("Removing 9 connections from pool");
       
    87         for (int i=0; i<9; i++) {
       
    88             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
       
    89             HttpConnection c2 = pool.getConnection(true, addr, proxy);
       
    90             if (c2 == null) {
       
    91                 throw new RuntimeException("connection not found for " + addr);
       
    92             }
   102             }
    93         }
   103         }
    94         System.out.println("Active CacheCleaners: " + getActiveCleaners());
   104         int min = IntStream.of(keepAlives).min().getAsInt();
    95         if (getActiveCleaners() != 1) {
   105         int max = IntStream.of(keepAlives).max().getAsInt();
    96             throw new RuntimeException("Wrong number of CacheCleaner active: "
   106         int mean = (min + max)/2;
    97                     + getActiveCleaners());
   107         System.out.println("min=" + min + ", max=" + max + ", mean=" + mean);
    98         }
   108         purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now);
    99         System.out.println("Removing last connection from pool");
   109         System.out.println("first purge would be in " + purge + " ms");
   100         for (int i=9; i<10; i++) {
   110         if (Math.abs(purge/1000 - min) > 0) {
   101             InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80);
   111             throw new RuntimeException("expected " + min + " got " + purge/1000);
   102             HttpConnection c2 = pool.getConnection(true, addr, proxy);
   112         }
   103             if (c2 == null) {
   113         long opened = java.util.stream.Stream.of(connections)
   104                 throw new RuntimeException("connection not found for " + addr);
   114                      .filter(HttpConnectionStub::connected).count();
   105             }
   115         if (opened != count) {
   106         }
   116             throw new RuntimeException("Opened: expected "
   107         System.out.println("Active CacheCleaners: " + getActiveCleaners()
   117                                        + count + " got " + opened);
   108                 + " (may be 0 or may still be 1)");
   118         }
   109         if (getActiveCleaners() > 1) {
   119         purge = mean * 1000;
   110             throw new RuntimeException("Too many CacheCleaner active: "
   120         System.out.println("start purging at " + purge + " ms");
   111                     + getActiveCleaners());
   121         Instant next = now;
   112         }
   122         do {
   113         InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 80);
   123            System.out.println("next purge is in " + purge + " ms");
   114         HttpConnection c = new HttpConnectionStub(client, addr, proxy, true);
   124            next = next.plus(purge, ChronoUnit.MILLIS);
   115         System.out.println("Adding/Removing one connection from pool 20 times in a loop");
   125            purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next);
   116         for (int i=0; i<20; i++) {
   126            long k = now.until(next, ChronoUnit.SECONDS);
   117             pool.returnToPool(c);
   127            System.out.println("now is " + k + "s from start");
   118             HttpConnection c2 = pool.getConnection(true, addr, proxy);
   128            for (int i=0; i<count; i++) {
   119             if (c2 == null) {
   129                if (connections[i].connected() != (k < keepAlives[i])) {
   120                 throw new RuntimeException("connection not found for " + addr);
   130                    throw new RuntimeException("Bad connection state for "
   121             }
   131                              + i
   122             if (c2 != c) {
   132                              + "\n\t connected=" + connections[i].connected()
   123                 throw new RuntimeException("wrong connection found for " + addr);
   133                              + "\n\t keepAlive=" + keepAlives[i]
   124             }
   134                              + "\n\t elapsed=" + k);
   125         }
   135                }
   126         if (getActiveCleaners() > 1) {
   136            }
   127             throw new RuntimeException("Too many CacheCleaner active: "
   137         } while (purge > 0);
   128                     + getActiveCleaners());
   138         opened = java.util.stream.Stream.of(connections)
   129         }
   139                      .filter(HttpConnectionStub::connected).count();
   130         ReferenceQueue<HttpClient> queue = new ReferenceQueue<>();
   140         if (opened != 0) {
   131         WeakReference<HttpClient> weak = new WeakReference<>(client, queue);
   141            throw new RuntimeException("Closed: expected "
   132         System.gc();
   142                                        + count + " got "
   133         Reference.reachabilityFence(pool);
   143                                        + (count-opened));
   134         client = null; pool = null; c = null;
   144         }
   135         while (true) {
   145     }
   136             long cleaners = getActiveCleaners();
   146 
   137             System.out.println("Waiting for GC to release stub HttpClient;"
       
   138                     + " active cache cleaners: " + cleaners);
       
   139             System.gc();
       
   140             Reference<?> ref = queue.remove(1000);
       
   141             if (ref == weak) {
       
   142                 System.out.println("Stub HttpClient GC'ed");
       
   143                 break;
       
   144             }
       
   145         }
       
   146         while (getActiveCleaners() > 0) {
       
   147             System.out.println("Waiting for CacheCleaner to stop");
       
   148             Thread.sleep(1000);
       
   149         }
       
   150         System.out.println("Active CacheCleaners: "
       
   151                 + getActiveCleaners());
       
   152 
       
   153         if (getActiveCleaners() > 0) {
       
   154             throw new RuntimeException("Too many CacheCleaner active: "
       
   155                     + getActiveCleaners());
       
   156         }
       
   157     }
       
   158     static <T> T error() {
   147     static <T> T error() {
   159         throw new InternalError("Should not reach here: wrong test assumptions!");
   148         throw new InternalError("Should not reach here: wrong test assumptions!");
       
   149     }
       
   150 
       
   151     static class FlowTubeStub implements FlowTube {
       
   152         final HttpConnectionStub conn;
       
   153         FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; }
       
   154         @Override
       
   155         public void onSubscribe(Flow.Subscription subscription) { }
       
   156         @Override public void onError(Throwable error) { error(); }
       
   157         @Override public void onComplete() { error(); }
       
   158         @Override public void onNext(List<ByteBuffer> item) { error();}
       
   159         @Override
       
   160         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   161         }
       
   162         @Override public boolean isFinished() { return conn.closed; }
   160     }
   163     }
   161 
   164 
   162     // Emulates an HttpConnection that has a strong reference to its HttpClient.
   165     // Emulates an HttpConnection that has a strong reference to its HttpClient.
   163     static class HttpConnectionStub extends HttpConnection {
   166     static class HttpConnectionStub extends HttpConnection {
   164 
   167 
   170             this.key = ConnectionPool.cacheKey(address, proxy);
   173             this.key = ConnectionPool.cacheKey(address, proxy);
   171             this.address = address;
   174             this.address = address;
   172             this.proxy = proxy;
   175             this.proxy = proxy;
   173             this.secured = secured;
   176             this.secured = secured;
   174             this.client = client;
   177             this.client = client;
   175         }
   178             this.flow = new FlowTubeStub(this);
   176 
   179         }
   177         InetSocketAddress proxy;
   180 
   178         InetSocketAddress address;
   181         final InetSocketAddress proxy;
   179         boolean secured;
   182         final InetSocketAddress address;
   180         ConnectionPool.CacheKey key;
   183         final boolean secured;
   181         HttpClient client;
   184         final ConnectionPool.CacheKey key;
       
   185         final HttpClient client;
       
   186         final FlowTubeStub flow;
       
   187         volatile boolean closed;
   182 
   188 
   183         // All these return something
   189         // All these return something
   184         @Override boolean connected() {return true;}
   190         @Override boolean connected() {return !closed;}
   185         @Override boolean isSecure() {return secured;}
   191         @Override boolean isSecure() {return secured;}
   186         @Override boolean isProxied() {return proxy!=null;}
   192         @Override boolean isProxied() {return proxy!=null;}
   187         @Override ConnectionPool.CacheKey cacheKey() {return key;}
   193         @Override ConnectionPool.CacheKey cacheKey() {return key;}
   188         @Override public void close() {}
       
   189         @Override void shutdownInput() throws IOException {}
   194         @Override void shutdownInput() throws IOException {}
   190         @Override void shutdownOutput() throws IOException {}
   195         @Override void shutdownOutput() throws IOException {}
       
   196         @Override
       
   197         public void close() {
       
   198             closed=true;
       
   199             System.out.println("closed: " + this);
       
   200         }
       
   201         @Override
   191         public String toString() {
   202         public String toString() {
   192             return "HttpConnectionStub: " + address + " proxy: " + proxy;
   203             return "HttpConnectionStub: " + address + " proxy: " + proxy;
   193         }
   204         }
   194 
   205 
   195         // All these throw errors
   206         // All these throw errors
   196         @Override
   207         @Override public HttpPublisher publisher() {return error();}
   197         public void connect() throws IOException, InterruptedException {error();}
       
   198         @Override public CompletableFuture<Void> connectAsync() {return error();}
   208         @Override public CompletableFuture<Void> connectAsync() {return error();}
   199         @Override SocketChannel channel() {return error();}
   209         @Override SocketChannel channel() {return error();}
   200         @Override void flushAsync() throws IOException {error();}
   210         @Override public void flushAsync() throws IOException {error();}
   201         @Override
   211         @Override
   202         protected ByteBuffer readImpl() throws IOException {return error();}
   212         public void writeAsync(ByteBufferReference[] buffers) throws IOException {
   203         @Override CompletableFuture<Void> whenReceivingResponse() {return error();}
       
   204         @Override
       
   205         long write(ByteBuffer[] buffers, int start, int number) throws IOException {
       
   206             throw (Error)error();
       
   207         }
       
   208         @Override
       
   209         long write(ByteBuffer buffer) throws IOException {throw (Error)error();}
       
   210         @Override
       
   211         void writeAsync(ByteBufferReference[] buffers) throws IOException {
       
   212             error();
   213             error();
   213         }
   214         }
   214         @Override
   215         @Override
   215         void writeAsyncUnordered(ByteBufferReference[] buffers)
   216         public void writeAsyncUnordered(ByteBufferReference[] buffers)
   216                 throws IOException {
   217                 throws IOException {
   217             error();
   218             error();
   218         }
   219         }
       
   220         @Override
       
   221         HttpConnection.DetachedConnectionChannel detachChannel() {
       
   222             return error();
       
   223         }
       
   224         @Override
       
   225         FlowTube getConnectionFlow() {return flow;}
   219     }
   226     }
   220     // Emulates an HttpClient that has a strong reference to its connection pool.
   227     // Emulates an HttpClient that has a strong reference to its connection pool.
   221     static class HttpClientStub extends HttpClient {
   228     static class HttpClientStub extends HttpClient {
   222         public HttpClientStub(ConnectionPool pool) {
   229         public HttpClientStub(ConnectionPool pool) {
   223             this.pool = pool;
   230             this.pool = pool;
   225         final ConnectionPool pool;
   232         final ConnectionPool pool;
   226         @Override public Optional<CookieManager> cookieManager() {return error();}
   233         @Override public Optional<CookieManager> cookieManager() {return error();}
   227         @Override public HttpClient.Redirect followRedirects() {return error();}
   234         @Override public HttpClient.Redirect followRedirects() {return error();}
   228         @Override public Optional<ProxySelector> proxy() {return error();}
   235         @Override public Optional<ProxySelector> proxy() {return error();}
   229         @Override public SSLContext sslContext() {return error();}
   236         @Override public SSLContext sslContext() {return error();}
   230         @Override public Optional<SSLParameters> sslParameters() {return error();}
   237         @Override public SSLParameters sslParameters() {return error();}
   231         @Override public Optional<Authenticator> authenticator() {return error();}
   238         @Override public Optional<Authenticator> authenticator() {return error();}
   232         @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
   239         @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
   233         @Override public Executor executor() {return error();}
   240         @Override public Optional<Executor> executor() {return error();}
   234         @Override
   241         @Override
   235         public <T> HttpResponse<T> send(HttpRequest req,
   242         public <T> HttpResponse<T> send(HttpRequest req,
   236                 HttpResponse.BodyHandler<T> responseBodyHandler)
   243                 HttpResponse.BodyHandler<T> responseBodyHandler)
   237                 throws IOException, InterruptedException {
   244                 throws IOException, InterruptedException {
   238             return error();
   245             return error();
   242                 HttpResponse.BodyHandler<T> responseBodyHandler) {
   249                 HttpResponse.BodyHandler<T> responseBodyHandler) {
   243             return error();
   250             return error();
   244         }
   251         }
   245         @Override
   252         @Override
   246         public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
   253         public <U, T> CompletableFuture<U> sendAsync(HttpRequest req,
   247                 HttpResponse.MultiProcessor<U, T> multiProcessor) {
   254                 HttpResponse.MultiSubscriber<U, T> multiSubscriber) {
   248             return error();
   255             return error();
   249         }
   256         }
   250     }
   257     }
   251 
   258 
   252 }
   259 }