1 /* |
|
2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. |
|
8 * |
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
12 * version 2 for more details (a copy is included in the LICENSE file that |
|
13 * accompanied this code). |
|
14 * |
|
15 * You should have received a copy of the GNU General Public License version |
|
16 * 2 along with this work; if not, write to the Free Software Foundation, |
|
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
18 * |
|
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
20 * or visit www.oracle.com if you need additional information or have any |
|
21 * questions. |
|
22 */ |
|
23 |
|
24 package jdk.incubator.http; |
|
25 |
|
26 import java.io.IOException; |
|
27 import java.lang.management.ManagementFactory; |
|
28 import java.net.Authenticator; |
|
29 import java.net.CookieHandler; |
|
30 import java.net.InetSocketAddress; |
|
31 import java.net.ProxySelector; |
|
32 import java.nio.ByteBuffer; |
|
33 import java.nio.channels.SocketChannel; |
|
34 import java.util.List; |
|
35 import java.util.Optional; |
|
36 import java.util.Random; |
|
37 import java.util.concurrent.CompletableFuture; |
|
38 import java.util.concurrent.Executor; |
|
39 import java.util.concurrent.Flow; |
|
40 import java.util.stream.IntStream; |
|
41 import java.time.Instant; |
|
42 import java.time.temporal.ChronoUnit; |
|
43 import javax.net.ssl.SSLContext; |
|
44 import javax.net.ssl.SSLParameters; |
|
45 import jdk.incubator.http.internal.common.FlowTube; |
|
46 |
|
47 /** |
|
48 * @summary Verifies that the ConnectionPool correctly handle |
|
49 * connection deadlines and purges the right connections |
|
50 * from the cache. |
|
51 * @bug 8187044 8187111 |
|
52 * @author danielfuchs |
|
53 */ |
|
54 public class ConnectionPoolTest { |
|
55 |
|
56 static long getActiveCleaners() throws ClassNotFoundException { |
|
57 // ConnectionPool.ACTIVE_CLEANER_COUNTER.get() |
|
58 // ConnectionPoolTest.class.getModule().addReads( |
|
59 // Class.forName("java.lang.management.ManagementFactory").getModule()); |
|
60 return java.util.stream.Stream.of(ManagementFactory.getThreadMXBean() |
|
61 .dumpAllThreads(false, false)) |
|
62 .filter(t -> t.getThreadName().startsWith("HTTP-Cache-cleaner")) |
|
63 .count(); |
|
64 } |
|
65 |
|
66 public static void main(String[] args) throws Exception { |
|
67 testCacheCleaners(); |
|
68 } |
|
69 |
|
70 public static void testCacheCleaners() throws Exception { |
|
71 ConnectionPool pool = new ConnectionPool(666); |
|
72 HttpClient client = new HttpClientStub(pool); |
|
73 InetSocketAddress proxy = InetSocketAddress.createUnresolved("bar", 80); |
|
74 System.out.println("Adding 10 connections to pool"); |
|
75 Random random = new Random(); |
|
76 |
|
77 final int count = 20; |
|
78 Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); |
|
79 int[] keepAlives = new int[count]; |
|
80 HttpConnectionStub[] connections = new HttpConnectionStub[count]; |
|
81 long purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); |
|
82 long expected = 0; |
|
83 if (purge != expected) { |
|
84 throw new RuntimeException("Bad purge delay: " + purge |
|
85 + ", expected " + expected); |
|
86 } |
|
87 expected = Long.MAX_VALUE; |
|
88 for (int i=0; i<count; i++) { |
|
89 InetSocketAddress addr = InetSocketAddress.createUnresolved("foo"+i, 80); |
|
90 keepAlives[i] = random.nextInt(10) * 10 + 10; |
|
91 connections[i] = new HttpConnectionStub(client, addr, proxy, true); |
|
92 System.out.println("Adding connection: " + now |
|
93 + " keepAlive: " + keepAlives[i] |
|
94 + " /" + connections[i]); |
|
95 pool.returnToPool(connections[i], now, keepAlives[i]); |
|
96 expected = Math.min(expected, keepAlives[i] * 1000); |
|
97 purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); |
|
98 if (purge != expected) { |
|
99 throw new RuntimeException("Bad purge delay: " + purge |
|
100 + ", expected " + expected); |
|
101 } |
|
102 } |
|
103 int min = IntStream.of(keepAlives).min().getAsInt(); |
|
104 int max = IntStream.of(keepAlives).max().getAsInt(); |
|
105 int mean = (min + max)/2; |
|
106 System.out.println("min=" + min + ", max=" + max + ", mean=" + mean); |
|
107 purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(now); |
|
108 System.out.println("first purge would be in " + purge + " ms"); |
|
109 if (Math.abs(purge/1000 - min) > 0) { |
|
110 throw new RuntimeException("expected " + min + " got " + purge/1000); |
|
111 } |
|
112 long opened = java.util.stream.Stream.of(connections) |
|
113 .filter(HttpConnectionStub::connected).count(); |
|
114 if (opened != count) { |
|
115 throw new RuntimeException("Opened: expected " |
|
116 + count + " got " + opened); |
|
117 } |
|
118 purge = mean * 1000; |
|
119 System.out.println("start purging at " + purge + " ms"); |
|
120 Instant next = now; |
|
121 do { |
|
122 System.out.println("next purge is in " + purge + " ms"); |
|
123 next = next.plus(purge, ChronoUnit.MILLIS); |
|
124 purge = pool.purgeExpiredConnectionsAndReturnNextDeadline(next); |
|
125 long k = now.until(next, ChronoUnit.SECONDS); |
|
126 System.out.println("now is " + k + "s from start"); |
|
127 for (int i=0; i<count; i++) { |
|
128 if (connections[i].connected() != (k < keepAlives[i])) { |
|
129 throw new RuntimeException("Bad connection state for " |
|
130 + i |
|
131 + "\n\t connected=" + connections[i].connected() |
|
132 + "\n\t keepAlive=" + keepAlives[i] |
|
133 + "\n\t elapsed=" + k); |
|
134 } |
|
135 } |
|
136 } while (purge > 0); |
|
137 opened = java.util.stream.Stream.of(connections) |
|
138 .filter(HttpConnectionStub::connected).count(); |
|
139 if (opened != 0) { |
|
140 throw new RuntimeException("Closed: expected " |
|
141 + count + " got " |
|
142 + (count-opened)); |
|
143 } |
|
144 } |
|
145 |
|
146 static <T> T error() { |
|
147 throw new InternalError("Should not reach here: wrong test assumptions!"); |
|
148 } |
|
149 |
|
150 static class FlowTubeStub implements FlowTube { |
|
151 final HttpConnectionStub conn; |
|
152 FlowTubeStub(HttpConnectionStub conn) { this.conn = conn; } |
|
153 @Override |
|
154 public void onSubscribe(Flow.Subscription subscription) { } |
|
155 @Override public void onError(Throwable error) { error(); } |
|
156 @Override public void onComplete() { error(); } |
|
157 @Override public void onNext(List<ByteBuffer> item) { error();} |
|
158 @Override |
|
159 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
160 } |
|
161 @Override public boolean isFinished() { return conn.closed; } |
|
162 } |
|
163 |
|
164 // Emulates an HttpConnection that has a strong reference to its HttpClient. |
|
165 static class HttpConnectionStub extends HttpConnection { |
|
166 |
|
167 public HttpConnectionStub(HttpClient client, |
|
168 InetSocketAddress address, |
|
169 InetSocketAddress proxy, |
|
170 boolean secured) { |
|
171 super(address, null); |
|
172 this.key = ConnectionPool.cacheKey(address, proxy); |
|
173 this.address = address; |
|
174 this.proxy = proxy; |
|
175 this.secured = secured; |
|
176 this.client = client; |
|
177 this.flow = new FlowTubeStub(this); |
|
178 } |
|
179 |
|
180 final InetSocketAddress proxy; |
|
181 final InetSocketAddress address; |
|
182 final boolean secured; |
|
183 final ConnectionPool.CacheKey key; |
|
184 final HttpClient client; |
|
185 final FlowTubeStub flow; |
|
186 volatile boolean closed; |
|
187 |
|
188 // All these return something |
|
189 @Override boolean connected() {return !closed;} |
|
190 @Override boolean isSecure() {return secured;} |
|
191 @Override boolean isProxied() {return proxy!=null;} |
|
192 @Override ConnectionPool.CacheKey cacheKey() {return key;} |
|
193 @Override void shutdownInput() throws IOException {} |
|
194 @Override void shutdownOutput() throws IOException {} |
|
195 @Override |
|
196 public void close() { |
|
197 closed=true; |
|
198 System.out.println("closed: " + this); |
|
199 } |
|
200 @Override |
|
201 public String toString() { |
|
202 return "HttpConnectionStub: " + address + " proxy: " + proxy; |
|
203 } |
|
204 |
|
205 // All these throw errors |
|
206 @Override public HttpPublisher publisher() {return error();} |
|
207 @Override public CompletableFuture<Void> connectAsync() {return error();} |
|
208 @Override SocketChannel channel() {return error();} |
|
209 @Override |
|
210 HttpConnection.DetachedConnectionChannel detachChannel() { |
|
211 return error(); |
|
212 } |
|
213 @Override |
|
214 FlowTube getConnectionFlow() {return flow;} |
|
215 } |
|
216 // Emulates an HttpClient that has a strong reference to its connection pool. |
|
217 static class HttpClientStub extends HttpClient { |
|
218 public HttpClientStub(ConnectionPool pool) { |
|
219 this.pool = pool; |
|
220 } |
|
221 final ConnectionPool pool; |
|
222 @Override public Optional<CookieHandler> cookieHandler() {return error();} |
|
223 @Override public HttpClient.Redirect followRedirects() {return error();} |
|
224 @Override public Optional<ProxySelector> proxy() {return error();} |
|
225 @Override public SSLContext sslContext() {return error();} |
|
226 @Override public SSLParameters sslParameters() {return error();} |
|
227 @Override public Optional<Authenticator> authenticator() {return error();} |
|
228 @Override public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;} |
|
229 @Override public Optional<Executor> executor() {return error();} |
|
230 @Override |
|
231 public <T> HttpResponse<T> send(HttpRequest req, |
|
232 HttpResponse.BodyHandler<T> responseBodyHandler) |
|
233 throws IOException, InterruptedException { |
|
234 return error(); |
|
235 } |
|
236 @Override |
|
237 public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req, |
|
238 HttpResponse.BodyHandler<T> responseBodyHandler) { |
|
239 return error(); |
|
240 } |
|
241 @Override |
|
242 public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest req, |
|
243 HttpResponse.BodyHandler<T> bodyHandler, |
|
244 HttpResponse.PushPromiseHandler<T> multiHandler) { |
|
245 return error(); |
|
246 } |
|
247 } |
|
248 |
|
249 } |
|