/*
 * Copyright 2009 Google Inc. All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
 * a selector has many unprocessed updates to its interest set (e.g. adding
 * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
 * by cancelling a number of selection keys (or just closing a few sockets).
 * In this case, select() will first go through the list of cancelled keys
 * and try to deregister them. That deregistration is O(N^2) over the list
 * of unprocessed updates to the interest set.
 *
 * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
 *
 * <p> The test first creates initCount connections, and adds them
 * to the server epoll set. It then creates massCount connections,
 * registers interest (causing updateList to be populated with massCount*2
 * elements), but does not add them to epoll set (that would've cleared
 * updateList). The test then closes initCount connections, thus populating
 * deregistration queue. The subsequent call to selectNow() will first process
 * deregistration queue, performing O(N^2) over updateList size,
 * equal to massCount * 2.
 *
 * <p> Note that connect rate is artificially slowed down to compensate
 * for what I believe is a Linux bug, where too high of a connection rate
 * ends up in SYN's being dropped and then slow retransmits.
 *
 * @author Igor Chernyshev
 */
public class LotsOfCancels {

    /** Backlog passed to the server socket's bind call. */
    private static final int SERVER_BACKLOG = 5000;

    /** How many consecutive null accepts are retried before giving up. */
    private static final int MAX_ACCEPT_RETRIES = 10;

    /** Pause, in milliseconds, between accept retries. */
    private static final long ACCEPT_RETRY_PAUSE_MILLIS = 500;

    /** Value of {@link System#nanoTime} captured when {@link #runTest} started. */
    static long testStartTime;

    /**
     * Runs the test with 500 initial connections and 2700 mass connections.
     *
     * @param args ignored
     * @throws Exception if the test cannot be set up or run
     */
    public static void main(String[] args) throws Exception {
        // the final select should run in less than 1000ms.
        runTest(500, 2700, 1000);
    }

    /**
     * Prints a message to standard output, prefixed with the number of
     * milliseconds elapsed since {@link #testStartTime}.
     *
     * @param msg the text to print
     */
    static void log(String msg) {
        System.out.println(getLogPrefix() + msg);
    }

    /**
     * Returns the prefix used by {@link #log}: the elapsed milliseconds since
     * {@link #testStartTime}, followed by {@code ": "}.
     */
    static String getLogPrefix() {
        return durationMillis(testStartTime) + ": ";
    }

    /**
     * Returns the elapsed time since startNanos, in milliseconds.
     *
     * @param startNanos the start time; this must be a value returned
     *        by {@link System#nanoTime}
     * @return the whole number of milliseconds elapsed (fraction truncated)
     */
    static long durationMillis(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /**
     * Sets up a server selector with {@code initCount} fully processed
     * connections and {@code massCount} connections whose interest-set updates
     * are still pending, closes the initial connections, and then times one
     * {@code selectNow()} call.
     *
     * <p> If that call takes longer than {@code maxSelectTime} milliseconds a
     * failure message is printed and the JVM exits with status 1.
     *
     * @param initCount number of connections that are accepted, processed by
     *        the selector and later closed
     * @param massCount number of connections that are accepted and registered
     *        without an intervening select
     * @param maxSelectTime upper bound, in milliseconds, for the final select
     * @throws Exception if fewer connections than requested are accepted, or
     *         on any I/O failure
     */
    static void runTest(int initCount, int massCount, int maxSelectTime)
            throws Exception {
        testStartTime = System.nanoTime();

        // Port 0 lets the OS pick a free port, so the test cannot fail just
        // because some fixed port happens to be in use.
        InetSocketAddress bindAddress = new InetSocketAddress("127.0.0.1", 0);

        // Every channel accepted by the server; closed on the way out.
        List<SocketChannel> acceptedChannels = new ArrayList<SocketChannel>();

        Selector serverSelector = Selector.open();
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            try {
                // Create server channel, add it to selector and run epoll_ctl.
                log("Setting up server");
                server.configureBlocking(false);
                server.socket().bind(bindAddress, SERVER_BACKLOG);
                server.register(serverSelector, SelectionKey.OP_ACCEPT);
                serverSelector.selectNow();

                // The clients must connect to the port that was actually bound.
                SocketAddress serverAddress =
                    server.socket().getLocalSocketAddress();

                log("Setting up client");
                ClientThread client = new ClientThread(serverAddress);
                client.start();
                Thread.sleep(100);

                // Set up initial set of client sockets.
                log("Starting initial client connections");
                client.connectClients(initCount);
                Thread.sleep(500);  // Wait for client connections to arrive

                // Accept all initial client sockets, add to selector and run
                // epoll_ctl.
                log("Accepting initial connections");
                List<SocketChannel> serverChannels1 =
                    acceptAndAddAll(serverSelector, server, initCount);
                acceptedChannels.addAll(serverChannels1);
                if (serverChannels1.size() != initCount) {
                    throw new Exception("Accepted " + serverChannels1.size() +
                                        " instead of " + initCount);
                }
                serverSelector.selectNow();

                // Set up mass set of client sockets.
                log("Requesting mass client connections");
                client.connectClients(massCount);
                Thread.sleep(500);  // Wait for client connections to arrive

                // Accept all mass client sockets, add to selector and do NOT
                // run epoll_ctl.
                log("Accepting mass connections");
                List<SocketChannel> serverChannels2 =
                    acceptAndAddAll(serverSelector, server, massCount);
                acceptedChannels.addAll(serverChannels2);
                if (serverChannels2.size() != massCount) {
                    throw new Exception("Accepted " + serverChannels2.size() +
                                        " instead of " + massCount);
                }

                // Close initial set of sockets.
                log("Closing initial connections");
                closeAll(serverChannels1);

                // Now get the timing of select() call.
                log("Running the final select call");
                long startTime = System.nanoTime();
                serverSelector.selectNow();
                long duration = durationMillis(startTime);
                log("Init count = " + initCount +
                    ", mass count = " + massCount +
                    ", duration = " + duration + "ms");

                if (duration > maxSelectTime) {
                    System.out.println
                        ("\n\n\n\n\nFAILURE: The final selectNow() took " +
                         duration + "ms " +
                         "- seems like O(N^2) bug is still here\n\n");
                    System.exit(1);
                }
            } finally {
                // Release server-side sockets; closing an already closed
                // channel is a no-op, so the initial set is safe to revisit.
                try {
                    closeAll(acceptedChannels);
                } finally {
                    server.close();
                }
            }
        } finally {
            serverSelector.close();
        }
    }

    /**
     * Accepts up to {@code expected} connections from {@code server}, makes
     * each one non-blocking and registers it with {@code selector} for
     * OP_READ, then re-registers every accepted channel for OP_WRITE.
     *
     * <p> When {@code accept()} returns null the method sleeps and retries;
     * after {@link #MAX_ACCEPT_RETRIES} consecutive retries it gives up and
     * returns what it has, so the result may be shorter than {@code expected}.
     *
     * @param selector the selector to register accepted channels with
     * @param server the server channel to accept from
     * @param expected the number of connections to accept
     * @return the accepted channels, in accept order
     * @throws Exception on any I/O failure or if the thread is interrupted
     */
    static List<SocketChannel> acceptAndAddAll(Selector selector,
                                               ServerSocketChannel server,
                                               int expected)
            throws Exception {
        int retryCount = 0;
        int acceptCount = 0;
        List<SocketChannel> channels = new ArrayList<SocketChannel>();
        while (channels.size() < expected) {
            SocketChannel channel = server.accept();
            if (channel == null) {
                log("accept() returned null " +
                    "after accepting " + acceptCount + " more connections");
                acceptCount = 0;
                if (retryCount < MAX_ACCEPT_RETRIES) {
                    // See if more new sockets got stacked behind.
                    retryCount++;
                    Thread.sleep(ACCEPT_RETRY_PAUSE_MILLIS);
                    continue;
                }
                break;
            }
            retryCount = 0;
            acceptCount++;
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            channels.add(channel);
        }
        // Cause an additional updateList entry per channel.
        for (SocketChannel channel : channels) {
            channel.register(selector, SelectionKey.OP_WRITE);
        }
        return channels;
    }

    /**
     * Closes every channel in the given list, in list order.
     *
     * @param channels the channels to close
     * @throws Exception if closing a channel fails; later channels in the
     *         list are then left untouched
     */
    static void closeAll(List<SocketChannel> channels)
            throws Exception {
        for (SocketChannel channel : channels) {
            channel.close();
        }
    }

    /**
     * Daemon thread that opens non-blocking client connections to the server
     * on request and completes them through its own selector.
     */
    static class ClientThread extends Thread {
        /** Maximum number of connections started per loop iteration. */
        private static final int CONNECT_BATCH_SIZE = 20;

        /** Pause, in milliseconds, after each batch of connection attempts. */
        private static final long CONNECT_BATCH_PAUSE_MILLIS = 200;

        private final SocketAddress address;
        private final Selector selector;
        /** Connections requested but not yet started; guarded by {@code this}. */
        private int connectionsNeeded;
        /** Connections started so far; only touched by this thread. */
        private int totalCreated;

        /**
         * Creates the thread and opens its selector; the thread is marked as
         * a daemon and is not started.
         *
         * @param address the server address to connect to
         * @throws Exception if the selector cannot be opened
         */
        ClientThread(SocketAddress address) throws Exception {
            this.address = address;
            selector = Selector.open();
            setDaemon(true);
        }

        /**
         * Requests {@code count} additional connections and wakes up the
         * selector so that the client loop notices the request.
         *
         * @param count number of additional connections to start
         */
        void connectClients(int count) throws Exception {
            synchronized (this) {
                connectionsNeeded += count;
            }
            selector.wakeup();
        }

        /**
         * Runs the client loop; on any failure the stack trace is printed and
         * the JVM exits with status 1.
         */
        @Override
        public void run() {
            try {
                handleClients();
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        /**
         * Loops forever: starts pending connections in batches of at most
         * {@link #CONNECT_BATCH_SIZE}, then selects and finishes the
         * connections that became connectable.
         */
        private void handleClients() throws Exception {
            int selectCount = 0;
            while (true) {
                int createdCount = 0;
                synchronized (this) {
                    while (connectionsNeeded > 0
                           && createdCount < CONNECT_BATCH_SIZE) {
                        connectionsNeeded--;
                        createdCount++;
                        totalCreated++;

                        SocketChannel channel = SocketChannel.open();
                        channel.configureBlocking(false);
                        channel.connect(address);
                        if (!channel.finishConnect()) {
                            channel.register(selector,
                                             SelectionKey.OP_CONNECT);
                        }
                    }
                }

                if (createdCount > 0) {
                    log("Started total of " +
                        totalCreated + " client connections");
                    // Throttle the connect rate. The pause happens outside
                    // the monitor so connectClients() is never blocked by it.
                    Thread.sleep(CONNECT_BATCH_PAUSE_MILLIS);
                    selector.selectNow();
                } else {
                    selectCount++;
                    long startTime = System.nanoTime();
                    selector.select();
                    long duration = durationMillis(startTime);
                    log("Exited clientSelector.select(), loop #"
                        + selectCount + ", duration = " + duration + "ms");
                }

                // Zero-based position of the key within this selected batch.
                int keyCount = -1;
                Iterator<SelectionKey> keys =
                    selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    synchronized (key) {
                        keyCount++;
                        keys.remove();
                        if (!key.isValid()) {
                            log("Ignoring client key #" + keyCount);
                            continue;
                        }
                        int readyOps = key.readyOps();
                        if (readyOps == SelectionKey.OP_CONNECT) {
                            // Drop all interest before finishing the connect.
                            key.interestOps(0);
                            ((SocketChannel) key.channel()).finishConnect();
                        } else {
                            log("readyOps() on client key #" + keyCount +
                                " returned " + readyOps);
                        }
                    }
                }
            }
        }
    }
}