|
1 /* |
|
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. |
|
3 * Copyright 2012 SAP AG. All rights reserved. |
|
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
5 * |
|
6 * This code is free software; you can redistribute it and/or modify it |
|
7 * under the terms of the GNU General Public License version 2 only, as |
|
8 * published by the Free Software Foundation. Oracle designates this |
|
9 * particular file as subject to the "Classpath" exception as provided |
|
10 * by Oracle in the LICENSE file that accompanied this code. |
|
11 * |
|
12 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
15 * version 2 for more details (a copy is included in the LICENSE file that |
|
16 * accompanied this code). |
|
17 * |
|
18 * You should have received a copy of the GNU General Public License version |
|
19 * 2 along with this work; if not, write to the Free Software Foundation, |
|
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
21 * |
|
22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
23 * or visit www.oracle.com if you need additional information or have any |
|
24 * questions. |
|
25 */ |
|
26 |
|
27 package sun.nio.ch; |
|
28 |
|
29 import java.nio.channels.spi.AsynchronousChannelProvider; |
|
30 import java.io.IOException; |
|
31 import java.util.HashSet; |
|
32 import java.util.Iterator; |
|
33 import java.util.concurrent.ArrayBlockingQueue; |
|
34 import java.util.concurrent.RejectedExecutionException; |
|
35 import java.util.concurrent.atomic.AtomicInteger; |
|
36 import java.util.concurrent.locks.ReentrantLock; |
|
37 import sun.misc.Unsafe; |
|
38 |
|
39 /** |
|
40 * AsynchronousChannelGroup implementation based on the AIX pollset framework. |
|
41 */ |
|
42 final class AixPollPort |
|
43 extends Port |
|
44 { |
|
45 private static final Unsafe unsafe = Unsafe.getUnsafe(); |
|
46 |
|
47 static { |
|
48 IOUtil.load(); |
|
49 init(); |
|
50 } |
|
51 |
|
52 /** |
|
53 * struct pollfd { |
|
54 * int fd; |
|
55 * short events; |
|
56 * short revents; |
|
57 * } |
|
58 */ |
|
59 private static final int SIZEOF_POLLFD = eventSize(); |
|
60 private static final int OFFSETOF_EVENTS = eventsOffset(); |
|
61 private static final int OFFSETOF_REVENTS = reventsOffset(); |
|
62 private static final int OFFSETOF_FD = fdOffset(); |
|
63 |
|
64 // opcodes |
|
65 private static final int PS_ADD = 0x0; |
|
66 private static final int PS_MOD = 0x1; |
|
67 private static final int PS_DELETE = 0x2; |
|
68 |
|
69 // maximum number of events to poll at a time |
|
70 private static final int MAX_POLL_EVENTS = 512; |
|
71 |
|
72 // pollset ID |
|
73 private final int pollset; |
|
74 |
|
75 // true if port is closed |
|
76 private boolean closed; |
|
77 |
|
78 // socket pair used for wakeup |
|
79 private final int sp[]; |
|
80 |
|
81 // socket pair used to indicate pending pollsetCtl calls |
|
82 // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call. |
|
83 private final int ctlSp[]; |
|
84 |
|
85 // number of wakeups pending |
|
86 private final AtomicInteger wakeupCount = new AtomicInteger(); |
|
87 |
|
88 // address of the poll array passed to pollset_poll |
|
89 private final long address; |
|
90 |
|
91 // encapsulates an event for a channel |
|
92 static class Event { |
|
93 final PollableChannel channel; |
|
94 final int events; |
|
95 |
|
96 Event(PollableChannel channel, int events) { |
|
97 this.channel = channel; |
|
98 this.events = events; |
|
99 } |
|
100 |
|
101 PollableChannel channel() { return channel; } |
|
102 int events() { return events; } |
|
103 } |
|
104 |
|
105 // queue of events for cases that a polling thread dequeues more than one |
|
106 // event |
|
107 private final ArrayBlockingQueue<Event> queue; |
|
108 private final Event NEED_TO_POLL = new Event(null, 0); |
|
109 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); |
|
110 |
|
111 // encapsulates a pollset control event for a file descriptor |
|
112 static class ControlEvent { |
|
113 final int fd; |
|
114 final int events; |
|
115 final boolean removeOnly; |
|
116 int error = 0; |
|
117 |
|
118 ControlEvent(int fd, int events, boolean removeOnly) { |
|
119 this.fd = fd; |
|
120 this.events = events; |
|
121 this.removeOnly = removeOnly; |
|
122 } |
|
123 |
|
124 int fd() { return fd; } |
|
125 int events() { return events; } |
|
126 boolean removeOnly() { return removeOnly; } |
|
127 int error() { return error; } |
|
128 void setError(int error) { this.error = error; } |
|
129 } |
|
130 |
|
131 // queue of control events that need to be processed |
|
132 // (this object is also used for synchronization) |
|
133 private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>(); |
|
134 |
|
135 // lock used to check whether a poll operation is ongoing |
|
136 private final ReentrantLock controlLock = new ReentrantLock(); |
|
137 |
|
138 AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool) |
|
139 throws IOException |
|
140 { |
|
141 super(provider, pool); |
|
142 |
|
143 // open pollset |
|
144 this.pollset = pollsetCreate(); |
|
145 |
|
146 // create socket pair for wakeup mechanism |
|
147 int[] sv = new int[2]; |
|
148 try { |
|
149 socketpair(sv); |
|
150 // register one end with pollset |
|
151 pollsetCtl(pollset, PS_ADD, sv[0], POLLIN); |
|
152 } catch (IOException x) { |
|
153 pollsetDestroy(pollset); |
|
154 throw x; |
|
155 } |
|
156 this.sp = sv; |
|
157 |
|
158 // create socket pair for pollset control mechanism |
|
159 sv = new int[2]; |
|
160 try { |
|
161 socketpair(sv); |
|
162 // register one end with pollset |
|
163 pollsetCtl(pollset, PS_ADD, sv[0], POLLIN); |
|
164 } catch (IOException x) { |
|
165 pollsetDestroy(pollset); |
|
166 throw x; |
|
167 } |
|
168 this.ctlSp = sv; |
|
169 |
|
170 // allocate the poll array |
|
171 this.address = allocatePollArray(MAX_POLL_EVENTS); |
|
172 |
|
173 // create the queue and offer the special event to ensure that the first |
|
174 // threads polls |
|
175 this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS); |
|
176 this.queue.offer(NEED_TO_POLL); |
|
177 } |
|
178 |
|
179 AixPollPort start() { |
|
180 startThreads(new EventHandlerTask()); |
|
181 return this; |
|
182 } |
|
183 |
|
184 /** |
|
185 * Release all resources |
|
186 */ |
|
187 private void implClose() { |
|
188 synchronized (this) { |
|
189 if (closed) |
|
190 return; |
|
191 closed = true; |
|
192 } |
|
193 freePollArray(address); |
|
194 close0(sp[0]); |
|
195 close0(sp[1]); |
|
196 close0(ctlSp[0]); |
|
197 close0(ctlSp[1]); |
|
198 pollsetDestroy(pollset); |
|
199 } |
|
200 |
|
201 private void wakeup() { |
|
202 if (wakeupCount.incrementAndGet() == 1) { |
|
203 // write byte to socketpair to force wakeup |
|
204 try { |
|
205 interrupt(sp[1]); |
|
206 } catch (IOException x) { |
|
207 throw new AssertionError(x); |
|
208 } |
|
209 } |
|
210 } |
|
211 |
|
212 @Override |
|
213 void executeOnHandlerTask(Runnable task) { |
|
214 synchronized (this) { |
|
215 if (closed) |
|
216 throw new RejectedExecutionException(); |
|
217 offerTask(task); |
|
218 wakeup(); |
|
219 } |
|
220 } |
|
221 |
|
222 @Override |
|
223 void shutdownHandlerTasks() { |
|
224 /* |
|
225 * If no tasks are running then just release resources; otherwise |
|
226 * write to the one end of the socketpair to wakeup any polling threads. |
|
227 */ |
|
228 int nThreads = threadCount(); |
|
229 if (nThreads == 0) { |
|
230 implClose(); |
|
231 } else { |
|
232 // send interrupt to each thread |
|
233 while (nThreads-- > 0) { |
|
234 wakeup(); |
|
235 } |
|
236 } |
|
237 } |
|
238 |
|
239 // invoke by clients to register a file descriptor |
|
240 @Override |
|
241 void startPoll(int fd, int events) { |
|
242 queueControlEvent(new ControlEvent(fd, events, false)); |
|
243 } |
|
244 |
|
245 // Callback method for implementations that need special handling when fd is removed |
|
246 @Override |
|
247 protected void preUnregister(int fd) { |
|
248 queueControlEvent(new ControlEvent(fd, 0, true)); |
|
249 } |
|
250 |
|
251 // Add control event into queue and wait for completion. |
|
252 // In case the control lock is free, this method also tries to apply the control change directly. |
|
253 private void queueControlEvent(ControlEvent ev) { |
|
254 // pollsetCtl blocks when a poll call is ongoing. This is very probable. |
|
255 // Therefore we let the polling thread do the pollsetCtl call. |
|
256 synchronized (controlQueue) { |
|
257 controlQueue.add(ev); |
|
258 // write byte to socketpair to force wakeup |
|
259 try { |
|
260 interrupt(ctlSp[1]); |
|
261 } catch (IOException x) { |
|
262 throw new AssertionError(x); |
|
263 } |
|
264 do { |
|
265 // Directly empty queue if no poll call is ongoing. |
|
266 if (controlLock.tryLock()) { |
|
267 try { |
|
268 processControlQueue(); |
|
269 } finally { |
|
270 controlLock.unlock(); |
|
271 } |
|
272 } else { |
|
273 try { |
|
274 // Do not starve in case the polling thread returned before |
|
275 // we could write to ctlSp[1] but the polling thread did not |
|
276 // release the control lock until we checked. Therefore, use |
|
277 // a timed wait for the time being. |
|
278 controlQueue.wait(100); |
|
279 } catch (InterruptedException e) { |
|
280 // ignore exception and try again |
|
281 } |
|
282 } |
|
283 } while (controlQueue.contains(ev)); |
|
284 } |
|
285 if (ev.error() != 0) { |
|
286 throw new AssertionError(); |
|
287 } |
|
288 } |
|
289 |
|
290 // Process all events currently stored in the control queue. |
|
291 private void processControlQueue() { |
|
292 synchronized (controlQueue) { |
|
293 // On Aix it is only possible to set the event |
|
294 // bits on the first call of pollsetCtl. Later |
|
295 // calls only add bits, but cannot remove them. |
|
296 // Therefore, we always remove the file |
|
297 // descriptor ignoring the error and then add it. |
|
298 Iterator<ControlEvent> iter = controlQueue.iterator(); |
|
299 while (iter.hasNext()) { |
|
300 ControlEvent ev = iter.next(); |
|
301 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0); |
|
302 if (!ev.removeOnly()) { |
|
303 ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events())); |
|
304 } |
|
305 iter.remove(); |
|
306 } |
|
307 controlQueue.notifyAll(); |
|
308 } |
|
309 } |
|
310 |
|
311 /* |
|
312 * Task to process events from pollset and dispatch to the channel's |
|
313 * onEvent handler. |
|
314 * |
|
315 * Events are retreived from pollset in batch and offered to a BlockingQueue |
|
316 * where they are consumed by handler threads. A special "NEED_TO_POLL" |
|
317 * event is used to signal one consumer to re-poll when all events have |
|
318 * been consumed. |
|
319 */ |
|
320 private class EventHandlerTask implements Runnable { |
|
321 private Event poll() throws IOException { |
|
322 try { |
|
323 for (;;) { |
|
324 int n; |
|
325 controlLock.lock(); |
|
326 try { |
|
327 n = pollsetPoll(pollset, address, MAX_POLL_EVENTS); |
|
328 } finally { |
|
329 controlLock.unlock(); |
|
330 } |
|
331 /* |
|
332 * 'n' events have been read. Here we map them to their |
|
333 * corresponding channel in batch and queue n-1 so that |
|
334 * they can be handled by other handler threads. The last |
|
335 * event is handled by this thread (and so is not queued). |
|
336 */ |
|
337 fdToChannelLock.readLock().lock(); |
|
338 try { |
|
339 while (n-- > 0) { |
|
340 long eventAddress = getEvent(address, n); |
|
341 int fd = getDescriptor(eventAddress); |
|
342 |
|
343 // To emulate one shot semantic we need to remove |
|
344 // the file descriptor here. |
|
345 pollsetCtl(pollset, PS_DELETE, fd, 0); |
|
346 |
|
347 // wakeup |
|
348 if (fd == sp[0]) { |
|
349 if (wakeupCount.decrementAndGet() == 0) { |
|
350 // no more wakeups so drain pipe |
|
351 drain1(sp[0]); |
|
352 } |
|
353 |
|
354 // This is the only file descriptor without |
|
355 // one shot semantic => register it again. |
|
356 pollsetCtl(pollset, PS_ADD, sp[0], POLLIN); |
|
357 |
|
358 // queue special event if there are more events |
|
359 // to handle. |
|
360 if (n > 0) { |
|
361 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); |
|
362 continue; |
|
363 } |
|
364 return EXECUTE_TASK_OR_SHUTDOWN; |
|
365 } |
|
366 |
|
367 // wakeup to process control event |
|
368 if (fd == ctlSp[0]) { |
|
369 synchronized (controlQueue) { |
|
370 drain1(ctlSp[0]); |
|
371 // This file descriptor does not have |
|
372 // one shot semantic => register it again. |
|
373 pollsetCtl(pollset, PS_ADD, ctlSp[0], POLLIN); |
|
374 processControlQueue(); |
|
375 } |
|
376 continue; |
|
377 } |
|
378 |
|
379 PollableChannel channel = fdToChannel.get(fd); |
|
380 if (channel != null) { |
|
381 int events = getRevents(eventAddress); |
|
382 Event ev = new Event(channel, events); |
|
383 |
|
384 // n-1 events are queued; This thread handles |
|
385 // the last one except for the wakeup |
|
386 if (n > 0) { |
|
387 queue.offer(ev); |
|
388 } else { |
|
389 return ev; |
|
390 } |
|
391 } |
|
392 } |
|
393 } finally { |
|
394 fdToChannelLock.readLock().unlock(); |
|
395 } |
|
396 } |
|
397 } finally { |
|
398 // to ensure that some thread will poll when all events have |
|
399 // been consumed |
|
400 queue.offer(NEED_TO_POLL); |
|
401 } |
|
402 } |
|
403 |
|
404 public void run() { |
|
405 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = |
|
406 Invoker.getGroupAndInvokeCount(); |
|
407 final boolean isPooledThread = (myGroupAndInvokeCount != null); |
|
408 boolean replaceMe = false; |
|
409 Event ev; |
|
410 try { |
|
411 for (;;) { |
|
412 // reset invoke count |
|
413 if (isPooledThread) |
|
414 myGroupAndInvokeCount.resetInvokeCount(); |
|
415 |
|
416 try { |
|
417 replaceMe = false; |
|
418 ev = queue.take(); |
|
419 |
|
420 // no events and this thread has been "selected" to |
|
421 // poll for more. |
|
422 if (ev == NEED_TO_POLL) { |
|
423 try { |
|
424 ev = poll(); |
|
425 } catch (IOException x) { |
|
426 x.printStackTrace(); |
|
427 return; |
|
428 } |
|
429 } |
|
430 } catch (InterruptedException x) { |
|
431 continue; |
|
432 } |
|
433 |
|
434 // handle wakeup to execute task or shutdown |
|
435 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { |
|
436 Runnable task = pollTask(); |
|
437 if (task == null) { |
|
438 // shutdown request |
|
439 return; |
|
440 } |
|
441 // run task (may throw error/exception) |
|
442 replaceMe = true; |
|
443 task.run(); |
|
444 continue; |
|
445 } |
|
446 |
|
447 // process event |
|
448 try { |
|
449 ev.channel().onEvent(ev.events(), isPooledThread); |
|
450 } catch (Error x) { |
|
451 replaceMe = true; throw x; |
|
452 } catch (RuntimeException x) { |
|
453 replaceMe = true; throw x; |
|
454 } |
|
455 } |
|
456 } finally { |
|
457 // last handler to exit when shutdown releases resources |
|
458 int remaining = threadExit(this, replaceMe); |
|
459 if (remaining == 0 && isShutdown()) { |
|
460 implClose(); |
|
461 } |
|
462 } |
|
463 } |
|
464 } |
|
465 |
|
466 /** |
|
467 * Allocates a poll array to handle up to {@code count} events. |
|
468 */ |
|
469 private static long allocatePollArray(int count) { |
|
470 return unsafe.allocateMemory(count * SIZEOF_POLLFD); |
|
471 } |
|
472 |
|
473 /** |
|
474 * Free a poll array |
|
475 */ |
|
476 private static void freePollArray(long address) { |
|
477 unsafe.freeMemory(address); |
|
478 } |
|
479 |
|
480 /** |
|
481 * Returns event[i]; |
|
482 */ |
|
483 private static long getEvent(long address, int i) { |
|
484 return address + (SIZEOF_POLLFD*i); |
|
485 } |
|
486 |
|
487 /** |
|
488 * Returns event->fd |
|
489 */ |
|
490 private static int getDescriptor(long eventAddress) { |
|
491 return unsafe.getInt(eventAddress + OFFSETOF_FD); |
|
492 } |
|
493 |
|
494 /** |
|
495 * Returns event->events |
|
496 */ |
|
497 private static int getEvents(long eventAddress) { |
|
498 return unsafe.getChar(eventAddress + OFFSETOF_EVENTS); |
|
499 } |
|
500 |
|
501 /** |
|
502 * Returns event->revents |
|
503 */ |
|
504 private static int getRevents(long eventAddress) { |
|
505 return unsafe.getChar(eventAddress + OFFSETOF_REVENTS); |
|
506 } |
|
507 |
|
508 // -- Native methods -- |
|
509 |
|
510 private static native void init(); |
|
511 |
|
512 private static native int eventSize(); |
|
513 |
|
514 private static native int eventsOffset(); |
|
515 |
|
516 private static native int reventsOffset(); |
|
517 |
|
518 private static native int fdOffset(); |
|
519 |
|
520 private static native int pollsetCreate() throws IOException; |
|
521 |
|
522 private static native int pollsetCtl(int pollset, int opcode, int fd, int events); |
|
523 |
|
524 private static native int pollsetPoll(int pollset, long pollAddress, int numfds) |
|
525 throws IOException; |
|
526 |
|
527 private static native void pollsetDestroy(int pollset); |
|
528 |
|
529 private static native void socketpair(int[] sv) throws IOException; |
|
530 |
|
531 private static native void interrupt(int fd) throws IOException; |
|
532 |
|
533 private static native void drain1(int fd) throws IOException; |
|
534 |
|
535 private static native void close0(int fd); |
|
536 } |