|
1 /* |
|
2 * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Sun designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Sun in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, |
|
22 * CA 95054 USA or visit www.sun.com if you need additional information or |
|
23 * have any questions. |
|
24 */ |
|
25 |
|
26 package sun.nio.ch; |
|
27 |
|
28 import java.nio.channels.Channel; |
|
29 import java.nio.channels.AsynchronousChannelGroup; |
|
30 import java.nio.channels.spi.AsynchronousChannelProvider; |
|
31 import java.io.IOException; |
|
32 import java.io.FileDescriptor; |
|
33 import java.util.Queue; |
|
34 import java.util.concurrent.*; |
|
35 import java.util.concurrent.locks.*; |
|
36 import java.util.concurrent.atomic.AtomicInteger; |
|
37 import java.security.PrivilegedAction; |
|
38 import java.security.AccessController; |
|
39 import java.security.AccessControlContext; |
|
40 import sun.security.action.GetIntegerAction; |
|
41 |
|
42 /** |
|
43 * Base implementation of AsynchronousChannelGroup |
|
44 */ |
|
45 |
|
46 abstract class AsynchronousChannelGroupImpl |
|
47 extends AsynchronousChannelGroup implements Executor |
|
48 { |
|
49 // number of internal threads handling I/O events when using an unbounded |
|
50 // thread pool. Internal threads do not dispatch to completion handlers. |
|
51 private static final int internalThreadCount = AccessController.doPrivileged( |
|
52 new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1)); |
|
53 |
|
54 // associated thread pool |
|
55 private final ThreadPool pool; |
|
56 |
|
57 // number of tasks running (including internal) |
|
58 private final AtomicInteger threadCount = new AtomicInteger(); |
|
59 |
|
60 // associated Executor for timeouts |
|
61 private ScheduledThreadPoolExecutor timeoutExecutor; |
|
62 |
|
63 // task queue for when using a fixed thread pool. In that case, thread |
|
64 // waiting on I/O events must be awokon to poll tasks from this queue. |
|
65 private final Queue<Runnable> taskQueue; |
|
66 |
|
67 // group shutdown |
|
68 // shutdownLock is RW lock so as to allow for concurrent queuing of tasks |
|
69 // when using a fixed thread pool. |
|
70 private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); |
|
71 private final Object shutdownNowLock = new Object(); |
|
72 private volatile boolean shutdown; |
|
73 private volatile boolean terminateInitiated; |
|
74 |
|
75 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, |
|
76 ThreadPool pool) |
|
77 { |
|
78 super(provider); |
|
79 this.pool = pool; |
|
80 |
|
81 if (pool.isFixedThreadPool()) { |
|
82 taskQueue = new ConcurrentLinkedQueue<Runnable>(); |
|
83 } else { |
|
84 taskQueue = null; // not used |
|
85 } |
|
86 |
|
87 // use default thread factory as thread should not be visible to |
|
88 // application (it doesn't execute completion handlers). |
|
89 this.timeoutExecutor = (ScheduledThreadPoolExecutor) |
|
90 Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory()); |
|
91 this.timeoutExecutor.setRemoveOnCancelPolicy(true); |
|
92 } |
|
93 |
|
94 final ExecutorService executor() { |
|
95 return pool.executor(); |
|
96 } |
|
97 |
|
98 final boolean isFixedThreadPool() { |
|
99 return pool.isFixedThreadPool(); |
|
100 } |
|
101 |
|
102 final int fixedThreadCount() { |
|
103 if (isFixedThreadPool()) { |
|
104 return pool.poolSize(); |
|
105 } else { |
|
106 return pool.poolSize() + internalThreadCount; |
|
107 } |
|
108 } |
|
109 |
|
110 private Runnable bindToGroup(final Runnable task) { |
|
111 final AsynchronousChannelGroupImpl thisGroup = this; |
|
112 return new Runnable() { |
|
113 public void run() { |
|
114 Invoker.bindToGroup(thisGroup); |
|
115 task.run(); |
|
116 } |
|
117 }; |
|
118 } |
|
119 |
|
120 private void startInternalThread(final Runnable task) { |
|
121 AccessController.doPrivileged(new PrivilegedAction<Void>() { |
|
122 @Override |
|
123 public Void run() { |
|
124 // internal threads should not be visible to application so |
|
125 // cannot use user-supplied thread factory |
|
126 ThreadPool.defaultThreadFactory().newThread(task).start(); |
|
127 return null; |
|
128 } |
|
129 }); |
|
130 } |
|
131 |
|
132 protected final void startThreads(Runnable task) { |
|
133 if (!isFixedThreadPool()) { |
|
134 for (int i=0; i<internalThreadCount; i++) { |
|
135 startInternalThread(task); |
|
136 threadCount.incrementAndGet(); |
|
137 } |
|
138 } |
|
139 if (pool.poolSize() > 0) { |
|
140 task = bindToGroup(task); |
|
141 try { |
|
142 for (int i=0; i<pool.poolSize(); i++) { |
|
143 pool.executor().execute(task); |
|
144 threadCount.incrementAndGet(); |
|
145 } |
|
146 } catch (RejectedExecutionException x) { |
|
147 // nothing we can do |
|
148 } |
|
149 } |
|
150 } |
|
151 |
|
152 final int threadCount() { |
|
153 return threadCount.get(); |
|
154 } |
|
155 |
|
156 /** |
|
157 * Invoked by tasks as they terminate |
|
158 */ |
|
159 final int threadExit(Runnable task, boolean replaceMe) { |
|
160 if (replaceMe) { |
|
161 try { |
|
162 if (Invoker.isBoundToAnyGroup()) { |
|
163 // submit new task to replace this thread |
|
164 pool.executor().execute(bindToGroup(task)); |
|
165 } else { |
|
166 // replace internal thread |
|
167 startInternalThread(task); |
|
168 } |
|
169 return threadCount.get(); |
|
170 } catch (RejectedExecutionException x) { |
|
171 // unable to replace |
|
172 } |
|
173 } |
|
174 return threadCount.decrementAndGet(); |
|
175 } |
|
176 |
|
177 /** |
|
178 * Wakes up a thread waiting for I/O events to execute the given task. |
|
179 */ |
|
180 abstract void executeOnHandlerTask(Runnable task); |
|
181 |
|
182 /** |
|
183 * For a fixed thread pool the task is queued to a thread waiting on I/O |
|
184 * events. For other thread pools we simply submit the task to the thread |
|
185 * pool. |
|
186 */ |
|
187 final void executeOnPooledThread(Runnable task) { |
|
188 if (isFixedThreadPool()) { |
|
189 executeOnHandlerTask(task); |
|
190 } else { |
|
191 pool.executor().execute(bindToGroup(task)); |
|
192 } |
|
193 } |
|
194 |
|
195 final void offerTask(Runnable task) { |
|
196 taskQueue.offer(task); |
|
197 } |
|
198 |
|
199 final Runnable pollTask() { |
|
200 return (taskQueue == null) ? null : taskQueue.poll(); |
|
201 } |
|
202 |
|
203 final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) { |
|
204 try { |
|
205 return timeoutExecutor.schedule(task, timeout, unit); |
|
206 } catch (RejectedExecutionException rej) { |
|
207 if (terminateInitiated) { |
|
208 // no timeout scheduled as group is terminating |
|
209 return null; |
|
210 } |
|
211 throw new AssertionError(rej); |
|
212 } |
|
213 } |
|
214 |
|
215 @Override |
|
216 public final boolean isShutdown() { |
|
217 return shutdown; |
|
218 } |
|
219 |
|
220 @Override |
|
221 public final boolean isTerminated() { |
|
222 return pool.executor().isTerminated(); |
|
223 } |
|
224 |
|
225 /** |
|
226 * Returns true if there are no channels in the group |
|
227 */ |
|
228 abstract boolean isEmpty(); |
|
229 |
|
230 /** |
|
231 * Attaches a foreign channel to this group. |
|
232 */ |
|
233 abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo) |
|
234 throws IOException; |
|
235 |
|
236 /** |
|
237 * Detaches a foreign channel from this group. |
|
238 */ |
|
239 abstract void detachForeignChannel(Object key); |
|
240 |
|
241 /** |
|
242 * Closes all channels in the group |
|
243 */ |
|
244 abstract void closeAllChannels() throws IOException; |
|
245 |
|
246 /** |
|
247 * Shutdown all tasks waiting for I/O events. |
|
248 */ |
|
249 abstract void shutdownHandlerTasks(); |
|
250 |
|
251 private void shutdownExecutors() { |
|
252 AccessController.doPrivileged(new PrivilegedAction<Void>() { |
|
253 public Void run() { |
|
254 pool.executor().shutdown(); |
|
255 timeoutExecutor.shutdown(); |
|
256 return null; |
|
257 } |
|
258 }); |
|
259 } |
|
260 |
|
261 @Override |
|
262 public final void shutdown() { |
|
263 shutdownLock.writeLock().lock(); |
|
264 try { |
|
265 if (shutdown) { |
|
266 // already shutdown |
|
267 return; |
|
268 } |
|
269 shutdown = true; |
|
270 } finally { |
|
271 shutdownLock.writeLock().unlock(); |
|
272 } |
|
273 |
|
274 // if there are channels in the group then shutdown will continue |
|
275 // when the last channel is closed |
|
276 if (!isEmpty()) { |
|
277 return; |
|
278 } |
|
279 // initiate termination (acquire shutdownNowLock to ensure that other |
|
280 // threads invoking shutdownNow will block). |
|
281 synchronized (shutdownNowLock) { |
|
282 if (!terminateInitiated) { |
|
283 terminateInitiated = true; |
|
284 shutdownHandlerTasks(); |
|
285 shutdownExecutors(); |
|
286 } |
|
287 } |
|
288 } |
|
289 |
|
290 @Override |
|
291 public final void shutdownNow() throws IOException { |
|
292 shutdownLock.writeLock().lock(); |
|
293 try { |
|
294 shutdown = true; |
|
295 } finally { |
|
296 shutdownLock.writeLock().unlock(); |
|
297 } |
|
298 synchronized (shutdownNowLock) { |
|
299 if (!terminateInitiated) { |
|
300 terminateInitiated = true; |
|
301 closeAllChannels(); |
|
302 shutdownHandlerTasks(); |
|
303 shutdownExecutors(); |
|
304 } |
|
305 } |
|
306 } |
|
307 |
|
308 @Override |
|
309 public final boolean awaitTermination(long timeout, TimeUnit unit) |
|
310 throws InterruptedException |
|
311 { |
|
312 return pool.executor().awaitTermination(timeout, unit); |
|
313 } |
|
314 |
|
315 /** |
|
316 * Executes the given command on one of the channel group's pooled threads. |
|
317 */ |
|
318 @Override |
|
319 public final void execute(Runnable task) { |
|
320 SecurityManager sm = System.getSecurityManager(); |
|
321 if (sm != null) { |
|
322 // when a security manager is installed then the user's task |
|
323 // must be run with the current calling context |
|
324 final AccessControlContext acc = AccessController.getContext(); |
|
325 final Runnable delegate = task; |
|
326 task = new Runnable() { |
|
327 @Override |
|
328 public void run() { |
|
329 AccessController.doPrivileged(new PrivilegedAction<Void>() { |
|
330 @Override |
|
331 public Void run() { |
|
332 delegate.run(); |
|
333 return null; |
|
334 } |
|
335 }, acc); |
|
336 } |
|
337 }; |
|
338 } |
|
339 executeOnPooledThread(task); |
|
340 } |
|
341 } |