21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
22 * or visit www.oracle.com if you need additional information or have any |
22 * or visit www.oracle.com if you need additional information or have any |
23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 /* |
|
27 * KQueueSelectorImpl.java |
|
28 * Implementation of Selector using FreeBSD / Mac OS X kqueues |
|
29 */ |
|
30 |
|
31 package sun.nio.ch; |
26 package sun.nio.ch; |
32 |
27 |
33 import java.io.IOException; |
28 import java.io.IOException; |
34 import java.nio.channels.ClosedSelectorException; |
29 import java.nio.channels.ClosedSelectorException; |
35 import java.nio.channels.SelectableChannel; |
30 import java.nio.channels.SelectableChannel; |
36 import java.nio.channels.SelectionKey; |
31 import java.nio.channels.SelectionKey; |
37 import java.nio.channels.Selector; |
32 import java.nio.channels.Selector; |
38 import java.nio.channels.spi.SelectorProvider; |
33 import java.nio.channels.spi.SelectorProvider; |
|
34 import java.util.ArrayDeque; |
|
35 import java.util.BitSet; |
|
36 import java.util.Deque; |
39 import java.util.HashMap; |
37 import java.util.HashMap; |
40 import java.util.Iterator; |
38 import java.util.Iterator; |
41 |
39 import java.util.Map; |
42 class KQueueSelectorImpl |
40 import java.util.concurrent.TimeUnit; |
43 extends SelectorImpl |
41 |
44 { |
42 import static sun.nio.ch.KQueue.EVFILT_READ; |
45 // File descriptors used for interrupt |
43 import static sun.nio.ch.KQueue.EVFILT_WRITE; |
|
44 import static sun.nio.ch.KQueue.EV_ADD; |
|
45 import static sun.nio.ch.KQueue.EV_DELETE; |
|
46 |
|
47 /** |
|
48 * KQueue based Selector implementation for macOS |
|
49 */ |
|
50 |
|
51 class KQueueSelectorImpl extends SelectorImpl { |
|
52 |
|
53 // maximum number of events to poll in one call to kqueue |
|
54 private static final int MAX_KEVENTS = 256; |
|
55 |
|
56 // kqueue file descriptor |
|
57 private final int kqfd; |
|
58 |
|
59 // address of poll array (event list) when polling for pending events |
|
60 private final long pollArrayAddress; |
|
61 |
|
62 // file descriptors used for interrupt |
46 private final int fd0; |
63 private final int fd0; |
47 private final int fd1; |
64 private final int fd1; |
48 |
65 |
49 // The kqueue manipulator |
66 // maps file descriptor to selection key, synchronize on selector |
50 private final KQueueArrayWrapper kqueueWrapper; |
67 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
51 |
68 |
52 // Map from a file descriptor to an entry containing the selection key |
69 // file descriptors registered with kqueue, synchronize on selector |
53 private final HashMap<Integer, MapEntry> fdMap; |
70 private final BitSet registeredReadFilter = new BitSet(); |
54 |
71 private final BitSet registeredWriteFilter = new BitSet(); |
55 // True if this Selector has been closed |
72 |
56 private boolean closed; |
73 // pending new registrations/updates, queued by implRegister and putEventOps |
57 |
74 private final Object updateLock = new Object(); |
58 // Lock for interrupt triggering and clearing |
75 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
|
76 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
|
77 private final Deque<Integer> updateOps = new ArrayDeque<>(); |
|
78 |
|
79 // interrupt triggering and clearing |
59 private final Object interruptLock = new Object(); |
80 private final Object interruptLock = new Object(); |
60 private boolean interruptTriggered; |
81 private boolean interruptTriggered; |
61 |
82 |
62 // used by updateSelectedKeys to handle cases where the same file |
83 // used by updateSelectedKeys to handle cases where the same file |
63 // descriptor is polled by more than one filter |
84 // descriptor is polled by more than one filter |
64 private long updateCount; |
85 private int pollCount; |
65 |
86 |
66 // Used to map file descriptors to a selection key and "update count" |
|
67 // (see updateSelectedKeys for usage). |
|
68 private static class MapEntry { |
|
69 SelectionKeyImpl ski; |
|
70 long updateCount; |
|
71 MapEntry(SelectionKeyImpl ski) { |
|
72 this.ski = ski; |
|
73 } |
|
74 } |
|
75 |
|
76 /** |
|
77 * Package private constructor called by factory method in |
|
78 * the abstract superclass Selector. |
|
79 */ |
|
80 KQueueSelectorImpl(SelectorProvider sp) throws IOException { |
87 KQueueSelectorImpl(SelectorProvider sp) throws IOException { |
81 super(sp); |
88 super(sp); |
82 long fds = IOUtil.makePipe(false); |
89 |
83 fd0 = (int)(fds >>> 32); |
90 this.kqfd = KQueue.create(); |
84 fd1 = (int)fds; |
91 this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS); |
|
92 |
85 try { |
93 try { |
86 kqueueWrapper = new KQueueArrayWrapper(fd0, fd1); |
94 long fds = IOUtil.makePipe(false); |
87 fdMap = new HashMap<>(); |
95 this.fd0 = (int) (fds >>> 32); |
88 } catch (Throwable t) { |
96 this.fd1 = (int) fds; |
89 try { |
97 } catch (IOException ioe) { |
90 FileDispatcherImpl.closeIntFD(fd0); |
98 KQueue.freePollArray(pollArrayAddress); |
91 } catch (IOException ioe0) { |
99 FileDispatcherImpl.closeIntFD(kqfd); |
92 t.addSuppressed(ioe0); |
100 throw ioe; |
93 } |
101 } |
94 try { |
102 |
95 FileDispatcherImpl.closeIntFD(fd1); |
103 // register one end of the socket pair for wakeups |
96 } catch (IOException ioe1) { |
104 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); |
97 t.addSuppressed(ioe1); |
|
98 } |
|
99 throw t; |
|
100 } |
|
101 } |
105 } |
102 |
106 |
103 private void ensureOpen() { |
107 private void ensureOpen() { |
104 if (closed) |
108 if (!isOpen()) |
105 throw new ClosedSelectorException(); |
109 throw new ClosedSelectorException(); |
106 } |
110 } |
107 |
111 |
108 @Override |
112 @Override |
109 protected int doSelect(long timeout) |
113 protected int doSelect(long timeout) throws IOException { |
110 throws IOException |
114 assert Thread.holdsLock(this); |
111 { |
115 |
112 ensureOpen(); |
|
113 int numEntries; |
116 int numEntries; |
|
117 processUpdateQueue(); |
114 processDeregisterQueue(); |
118 processDeregisterQueue(); |
115 try { |
119 try { |
116 begin(); |
120 begin(); |
117 numEntries = kqueueWrapper.poll(timeout); |
121 |
|
122 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout |
|
123 boolean timedPoll = (to > 0); |
|
124 do { |
|
125 long startTime = timedPoll ? System.nanoTime() : 0; |
|
126 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); |
|
127 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |
|
128 // timed poll interrupted so need to adjust timeout |
|
129 long adjust = System.nanoTime() - startTime; |
|
130 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); |
|
131 if (to <= 0) { |
|
132 // timeout expired so no retry |
|
133 numEntries = 0; |
|
134 } |
|
135 } |
|
136 } while (numEntries == IOStatus.INTERRUPTED); |
|
137 assert IOStatus.check(numEntries); |
|
138 |
118 } finally { |
139 } finally { |
119 end(); |
140 end(); |
120 } |
141 } |
121 processDeregisterQueue(); |
142 processDeregisterQueue(); |
122 return updateSelectedKeys(numEntries); |
143 return updateSelectedKeys(numEntries); |
|
144 } |
|
145 |
|
146 /** |
|
147 * Process new registrations and changes to the interest ops. |
|
148 */ |
|
149 private void processUpdateQueue() { |
|
150 assert Thread.holdsLock(this); |
|
151 |
|
152 synchronized (updateLock) { |
|
153 SelectionKeyImpl ski; |
|
154 |
|
155 // new registrations |
|
156 while ((ski = newKeys.pollFirst()) != null) { |
|
157 if (ski.isValid()) { |
|
158 SelChImpl ch = ski.channel; |
|
159 int fd = ch.getFDVal(); |
|
160 SelectionKeyImpl previous = fdToKey.put(fd, ski); |
|
161 assert previous == null; |
|
162 assert registeredReadFilter.get(fd) == false; |
|
163 assert registeredWriteFilter.get(fd) == false; |
|
164 } |
|
165 } |
|
166 |
|
167 // changes to interest ops |
|
168 assert updateKeys.size() == updateOps.size(); |
|
169 while ((ski = updateKeys.pollFirst()) != null) { |
|
170 int ops = updateOps.pollFirst(); |
|
171 int fd = ski.channel.getFDVal(); |
|
172 if (ski.isValid() && fdToKey.containsKey(fd)) { |
|
173 // add or delete interest in read events |
|
174 if (registeredReadFilter.get(fd)) { |
|
175 if ((ops & Net.POLLIN) == 0) { |
|
176 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
|
177 registeredReadFilter.clear(fd); |
|
178 } |
|
179 } else if ((ops & Net.POLLIN) != 0) { |
|
180 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD); |
|
181 registeredReadFilter.set(fd); |
|
182 } |
|
183 |
|
184 // add or delete interest in write events |
|
185 if (registeredWriteFilter.get(fd)) { |
|
186 if ((ops & Net.POLLOUT) == 0) { |
|
187 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
|
188 registeredWriteFilter.clear(fd); |
|
189 } |
|
190 } else if ((ops & Net.POLLOUT) != 0) { |
|
191 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD); |
|
192 registeredWriteFilter.set(fd); |
|
193 } |
|
194 } |
|
195 } |
|
196 } |
123 } |
197 } |
124 |
198 |
125 /** |
199 /** |
126 * Update the keys whose fd's have been selected by kqueue. |
200 * Update the keys whose fd's have been selected by kqueue. |
127 * Add the ready keys to the selected key set. |
201 * Add the ready keys to the selected key set. |
128 * If the interrupt fd has been selected, drain it and clear the interrupt. |
202 * If the interrupt fd has been selected, drain it and clear the interrupt. |
129 */ |
203 */ |
130 private int updateSelectedKeys(int numEntries) |
204 private int updateSelectedKeys(int numEntries) throws IOException { |
131 throws IOException |
205 assert Thread.holdsLock(this); |
132 { |
206 assert Thread.holdsLock(nioSelectedKeys()); |
|
207 |
133 int numKeysUpdated = 0; |
208 int numKeysUpdated = 0; |
134 boolean interrupted = false; |
209 boolean interrupted = false; |
135 |
210 |
136 // A file descriptor may be registered with kqueue with more than one |
211 // A file descriptor may be registered with kqueue with more than one |
137 // filter and so there may be more than one event for a fd. The update |
212 // filter and so there may be more than one event for a fd. The poll |
138 // count in the MapEntry tracks when the fd was last updated and this |
213 // count is incremented here and compared against the SelectionKey's |
139 // ensures that the ready ops are updated rather than replaced by a |
214 // "lastPolled" field. This ensures that the ready ops is updated rather |
140 // second or subsequent event. |
215 // than replaced when a file descriptor is polled by both the read and |
141 updateCount++; |
216 // write filter. |
|
217 pollCount++; |
142 |
218 |
143 for (int i = 0; i < numEntries; i++) { |
219 for (int i = 0; i < numEntries; i++) { |
144 int nextFD = kqueueWrapper.getDescriptor(i); |
220 long kevent = KQueue.getEvent(pollArrayAddress, i); |
145 if (nextFD == fd0) { |
221 int fd = KQueue.getDescriptor(kevent); |
|
222 if (fd == fd0) { |
146 interrupted = true; |
223 interrupted = true; |
147 } else { |
224 } else { |
148 MapEntry me = fdMap.get(Integer.valueOf(nextFD)); |
225 SelectionKeyImpl ski = fdToKey.get(fd); |
149 if (me != null) { |
226 if (ski != null) { |
150 int rOps = kqueueWrapper.getReventOps(i); |
227 int rOps = 0; |
151 SelectionKeyImpl ski = me.ski; |
228 short filter = KQueue.getFilter(kevent); |
|
229 if (filter == EVFILT_READ) { |
|
230 rOps |= Net.POLLIN; |
|
231 } else if (filter == EVFILT_WRITE) { |
|
232 rOps |= Net.POLLOUT; |
|
233 } |
|
234 |
152 if (selectedKeys.contains(ski)) { |
235 if (selectedKeys.contains(ski)) { |
153 // first time this file descriptor has been encountered on this |
236 // file descriptor may be polled more than once per poll |
154 // update? |
237 if (ski.lastPolled != pollCount) { |
155 if (me.updateCount != updateCount) { |
|
156 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { |
238 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { |
157 numKeysUpdated++; |
239 numKeysUpdated++; |
158 me.updateCount = updateCount; |
240 ski.lastPolled = pollCount; |
159 } |
241 } |
160 } else { |
242 } else { |
161 // ready ops have already been set on this update |
243 // ready ops have already been set on this update |
162 ski.channel.translateAndUpdateReadyOps(rOps, ski); |
244 ski.channel.translateAndUpdateReadyOps(rOps, ski); |
163 } |
245 } |
164 } else { |
246 } else { |
165 ski.channel.translateAndSetReadyOps(rOps, ski); |
247 ski.channel.translateAndSetReadyOps(rOps, ski); |
166 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
248 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
167 selectedKeys.add(ski); |
249 selectedKeys.add(ski); |
168 numKeysUpdated++; |
250 numKeysUpdated++; |
169 me.updateCount = updateCount; |
251 ski.lastPolled = pollCount; |
170 } |
252 } |
171 } |
253 } |
172 } |
254 } |
173 } |
255 } |
174 } |
256 } |
179 return numKeysUpdated; |
261 return numKeysUpdated; |
180 } |
262 } |
181 |
263 |
182 @Override |
264 @Override |
183 protected void implClose() throws IOException { |
265 protected void implClose() throws IOException { |
184 if (!closed) { |
266 assert !isOpen(); |
185 closed = true; |
267 assert Thread.holdsLock(this); |
186 |
268 assert Thread.holdsLock(nioKeys()); |
187 // prevent further wakeup |
269 |
188 synchronized (interruptLock) { |
270 // prevent further wakeup |
189 interruptTriggered = true; |
271 synchronized (interruptLock) { |
190 } |
272 interruptTriggered = true; |
191 |
273 } |
192 kqueueWrapper.close(); |
274 |
193 FileDispatcherImpl.closeIntFD(fd0); |
275 FileDispatcherImpl.closeIntFD(kqfd); |
194 FileDispatcherImpl.closeIntFD(fd1); |
276 KQueue.freePollArray(pollArrayAddress); |
195 |
277 |
196 // Deregister channels |
278 FileDispatcherImpl.closeIntFD(fd0); |
197 Iterator<SelectionKey> i = keys.iterator(); |
279 FileDispatcherImpl.closeIntFD(fd1); |
198 while (i.hasNext()) { |
280 |
199 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); |
281 // Deregister channels |
200 deregister(ski); |
282 Iterator<SelectionKey> i = keys.iterator(); |
201 SelectableChannel selch = ski.channel(); |
283 while (i.hasNext()) { |
202 if (!selch.isOpen() && !selch.isRegistered()) |
284 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); |
203 ((SelChImpl)selch).kill(); |
285 deregister(ski); |
204 i.remove(); |
286 SelectableChannel selch = ski.channel(); |
205 } |
287 if (!selch.isOpen() && !selch.isRegistered()) |
|
288 ((SelChImpl)selch).kill(); |
|
289 i.remove(); |
206 } |
290 } |
207 } |
291 } |
208 |
292 |
209 @Override |
293 @Override |
210 protected void implRegister(SelectionKeyImpl ski) { |
294 protected void implRegister(SelectionKeyImpl ski) { |
|
295 assert Thread.holdsLock(nioKeys()); |
211 ensureOpen(); |
296 ensureOpen(); |
212 int fd = IOUtil.fdVal(ski.channel.getFD()); |
297 synchronized (updateLock) { |
213 fdMap.put(Integer.valueOf(fd), new MapEntry(ski)); |
298 newKeys.addLast(ski); |
|
299 } |
214 keys.add(ski); |
300 keys.add(ski); |
215 } |
301 } |
216 |
302 |
217 @Override |
303 @Override |
218 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
304 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
|
305 assert !ski.isValid(); |
|
306 assert Thread.holdsLock(this); |
|
307 assert Thread.holdsLock(nioKeys()); |
|
308 assert Thread.holdsLock(nioSelectedKeys()); |
|
309 |
219 int fd = ski.channel.getFDVal(); |
310 int fd = ski.channel.getFDVal(); |
220 fdMap.remove(Integer.valueOf(fd)); |
311 fdToKey.remove(fd); |
221 kqueueWrapper.release(ski.channel); |
312 if (registeredReadFilter.get(fd)) { |
|
313 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
|
314 registeredReadFilter.clear(fd); |
|
315 } |
|
316 if (registeredWriteFilter.get(fd)) { |
|
317 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
|
318 registeredWriteFilter.clear(fd); |
|
319 } |
|
320 |
|
321 selectedKeys.remove(ski); |
222 keys.remove(ski); |
322 keys.remove(ski); |
223 selectedKeys.remove(ski); |
323 |
|
324 // remove from channel's key set |
224 deregister(ski); |
325 deregister(ski); |
|
326 |
225 SelectableChannel selch = ski.channel(); |
327 SelectableChannel selch = ski.channel(); |
226 if (!selch.isOpen() && !selch.isRegistered()) |
328 if (!selch.isOpen() && !selch.isRegistered()) |
227 ((SelChImpl)selch).kill(); |
329 ((SelChImpl) selch).kill(); |
228 } |
330 } |
229 |
331 |
230 @Override |
332 @Override |
231 public void putEventOps(SelectionKeyImpl ski, int ops) { |
333 public void putEventOps(SelectionKeyImpl ski, int ops) { |
232 ensureOpen(); |
334 ensureOpen(); |
233 kqueueWrapper.setInterest(ski.channel, ops); |
335 synchronized (updateLock) { |
|
336 updateOps.addLast(ops); // ops first in case adding the key fails |
|
337 updateKeys.addLast(ski); |
|
338 } |
234 } |
339 } |
235 |
340 |
236 @Override |
341 @Override |
237 public Selector wakeup() { |
342 public Selector wakeup() { |
238 synchronized (interruptLock) { |
343 synchronized (interruptLock) { |
239 if (!interruptTriggered) { |
344 if (!interruptTriggered) { |
240 kqueueWrapper.interrupt(); |
345 try { |
|
346 IOUtil.write1(fd1, (byte)0); |
|
347 } catch (IOException ioe) { |
|
348 throw new InternalError(ioe); |
|
349 } |
241 interruptTriggered = true; |
350 interruptTriggered = true; |
242 } |
351 } |
243 } |
352 } |
244 return this; |
353 return this; |
245 } |
354 } |