25 |
25 |
26 package sun.nio.ch; |
26 package sun.nio.ch; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.channels.ClosedSelectorException; |
29 import java.nio.channels.ClosedSelectorException; |
30 import java.nio.channels.SelectableChannel; |
|
31 import java.nio.channels.SelectionKey; |
|
32 import java.nio.channels.Selector; |
30 import java.nio.channels.Selector; |
33 import java.nio.channels.spi.SelectorProvider; |
31 import java.nio.channels.spi.SelectorProvider; |
34 import java.util.ArrayDeque; |
32 import java.util.ArrayDeque; |
35 import java.util.BitSet; |
|
36 import java.util.Deque; |
33 import java.util.Deque; |
37 import java.util.HashMap; |
34 import java.util.HashMap; |
38 import java.util.Iterator; |
|
39 import java.util.Map; |
35 import java.util.Map; |
40 import java.util.concurrent.TimeUnit; |
36 import java.util.concurrent.TimeUnit; |
41 |
37 |
42 import static sun.nio.ch.KQueue.EVFILT_READ; |
38 import static sun.nio.ch.KQueue.EVFILT_READ; |
43 import static sun.nio.ch.KQueue.EVFILT_WRITE; |
39 import static sun.nio.ch.KQueue.EVFILT_WRITE; |
64 private final int fd1; |
60 private final int fd1; |
65 |
61 |
66 // maps file descriptor to selection key, synchronize on selector |
62 // maps file descriptor to selection key, synchronize on selector |
67 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
63 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
68 |
64 |
69 // file descriptors registered with kqueue, synchronize on selector |
|
70 private final BitSet registeredReadFilter = new BitSet(); |
|
71 private final BitSet registeredWriteFilter = new BitSet(); |
|
72 |
|
73 // pending new registrations/updates, queued by implRegister and putEventOps |
65 // pending new registrations/updates, queued by implRegister and putEventOps |
74 private final Object updateLock = new Object(); |
66 private final Object updateLock = new Object(); |
75 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
67 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
76 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
68 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
77 private final Deque<Integer> updateOps = new ArrayDeque<>(); |
69 private final Deque<Integer> updateEvents = new ArrayDeque<>(); |
78 |
70 |
79 // interrupt triggering and clearing |
71 // interrupt triggering and clearing |
80 private final Object interruptLock = new Object(); |
72 private final Object interruptLock = new Object(); |
81 private boolean interruptTriggered; |
73 private boolean interruptTriggered; |
82 |
74 |
111 |
103 |
112 @Override |
104 @Override |
113 protected int doSelect(long timeout) throws IOException { |
105 protected int doSelect(long timeout) throws IOException { |
114 assert Thread.holdsLock(this); |
106 assert Thread.holdsLock(this); |
115 |
107 |
|
108 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout |
|
109 boolean blocking = (to != 0); |
|
110 boolean timedPoll = (to > 0); |
|
111 |
116 int numEntries; |
112 int numEntries; |
117 processUpdateQueue(); |
113 processUpdateQueue(); |
118 processDeregisterQueue(); |
114 processDeregisterQueue(); |
119 try { |
115 try { |
120 begin(); |
116 begin(blocking); |
121 |
117 |
122 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout |
|
123 boolean timedPoll = (to > 0); |
|
124 do { |
118 do { |
125 long startTime = timedPoll ? System.nanoTime() : 0; |
119 long startTime = timedPoll ? System.nanoTime() : 0; |
126 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); |
120 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); |
127 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |
121 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |
128 // timed poll interrupted so need to adjust timeout |
122 // timed poll interrupted so need to adjust timeout |
153 SelectionKeyImpl ski; |
147 SelectionKeyImpl ski; |
154 |
148 |
155 // new registrations |
149 // new registrations |
156 while ((ski = newKeys.pollFirst()) != null) { |
150 while ((ski = newKeys.pollFirst()) != null) { |
157 if (ski.isValid()) { |
151 if (ski.isValid()) { |
158 SelChImpl ch = ski.channel; |
152 int fd = ski.channel.getFDVal(); |
159 int fd = ch.getFDVal(); |
|
160 SelectionKeyImpl previous = fdToKey.put(fd, ski); |
153 SelectionKeyImpl previous = fdToKey.put(fd, ski); |
161 assert previous == null; |
154 assert previous == null; |
162 assert registeredReadFilter.get(fd) == false; |
155 assert ski.registeredEvents() == 0; |
163 assert registeredWriteFilter.get(fd) == false; |
|
164 } |
156 } |
165 } |
157 } |
166 |
158 |
167 // changes to interest ops |
159 // changes to interest ops |
168 assert updateKeys.size() == updateOps.size(); |
160 assert updateKeys.size() == updateEvents.size(); |
169 while ((ski = updateKeys.pollFirst()) != null) { |
161 while ((ski = updateKeys.pollFirst()) != null) { |
170 int ops = updateOps.pollFirst(); |
162 int newEvents = updateEvents.pollFirst(); |
171 int fd = ski.channel.getFDVal(); |
163 int fd = ski.channel.getFDVal(); |
172 if (ski.isValid() && fdToKey.containsKey(fd)) { |
164 if (ski.isValid() && fdToKey.containsKey(fd)) { |
173 // add or delete interest in read events |
165 int registeredEvents = ski.registeredEvents(); |
174 if (registeredReadFilter.get(fd)) { |
166 if (newEvents != registeredEvents) { |
175 if ((ops & Net.POLLIN) == 0) { |
167 |
176 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
168 // add or delete interest in read events |
177 registeredReadFilter.clear(fd); |
169 if ((registeredEvents & Net.POLLIN) != 0) { |
|
170 if ((newEvents & Net.POLLIN) == 0) { |
|
171 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
|
172 } |
|
173 } else if ((newEvents & Net.POLLIN) != 0) { |
|
174 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD); |
178 } |
175 } |
179 } else if ((ops & Net.POLLIN) != 0) { |
176 |
180 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD); |
177 // add or delete interest in write events |
181 registeredReadFilter.set(fd); |
178 if ((registeredEvents & Net.POLLOUT) != 0) { |
|
179 if ((newEvents & Net.POLLOUT) == 0) { |
|
180 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
|
181 } |
|
182 } else if ((newEvents & Net.POLLOUT) != 0) { |
|
183 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD); |
|
184 } |
|
185 |
|
186 ski.registeredEvents(newEvents); |
182 } |
187 } |
183 |
|
184 // add or delete interest in write events |
|
185 if (registeredWriteFilter.get(fd)) { |
|
186 if ((ops & Net.POLLOUT) == 0) { |
|
187 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
|
188 registeredWriteFilter.clear(fd); |
|
189 } |
|
190 } else if ((ops & Net.POLLOUT) != 0) { |
|
191 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD); |
|
192 registeredWriteFilter.set(fd); |
|
193 } |
|
194 } |
188 } |
195 } |
189 } |
196 } |
190 } |
197 } |
191 } |
198 |
192 |
199 /** |
193 /** |
200 * Update the keys whose fd's have been selected by kqueue. |
194 * Update the keys of file descriptors that were polled and add them to |
201 * Add the ready keys to the selected key set. |
195 * the selected-key set. |
202 * If the interrupt fd has been selected, drain it and clear the interrupt. |
196 * If the interrupt fd has been selected, drain it and clear the interrupt. |
203 */ |
197 */ |
204 private int updateSelectedKeys(int numEntries) throws IOException { |
198 private int updateSelectedKeys(int numEntries) throws IOException { |
205 assert Thread.holdsLock(this); |
199 assert Thread.holdsLock(this); |
206 assert Thread.holdsLock(nioSelectedKeys()); |
200 assert Thread.holdsLock(nioSelectedKeys()); |
275 FileDispatcherImpl.closeIntFD(kqfd); |
268 FileDispatcherImpl.closeIntFD(kqfd); |
276 KQueue.freePollArray(pollArrayAddress); |
269 KQueue.freePollArray(pollArrayAddress); |
277 |
270 |
278 FileDispatcherImpl.closeIntFD(fd0); |
271 FileDispatcherImpl.closeIntFD(fd0); |
279 FileDispatcherImpl.closeIntFD(fd1); |
272 FileDispatcherImpl.closeIntFD(fd1); |
280 |
|
281 // Deregister channels |
|
282 Iterator<SelectionKey> i = keys.iterator(); |
|
283 while (i.hasNext()) { |
|
284 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); |
|
285 deregister(ski); |
|
286 SelectableChannel selch = ski.channel(); |
|
287 if (!selch.isOpen() && !selch.isRegistered()) |
|
288 ((SelChImpl)selch).kill(); |
|
289 i.remove(); |
|
290 } |
|
291 } |
273 } |
292 |
274 |
293 @Override |
275 @Override |
294 protected void implRegister(SelectionKeyImpl ski) { |
276 protected void implRegister(SelectionKeyImpl ski) { |
295 assert Thread.holdsLock(nioKeys()); |
|
296 ensureOpen(); |
277 ensureOpen(); |
297 synchronized (updateLock) { |
278 synchronized (updateLock) { |
298 newKeys.addLast(ski); |
279 newKeys.addLast(ski); |
299 } |
280 } |
300 keys.add(ski); |
|
301 } |
281 } |
302 |
282 |
303 @Override |
283 @Override |
304 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
284 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
305 assert !ski.isValid(); |
285 assert !ski.isValid(); |
306 assert Thread.holdsLock(this); |
286 assert Thread.holdsLock(this); |
307 assert Thread.holdsLock(nioKeys()); |
|
308 assert Thread.holdsLock(nioSelectedKeys()); |
|
309 |
287 |
310 int fd = ski.channel.getFDVal(); |
288 int fd = ski.channel.getFDVal(); |
311 fdToKey.remove(fd); |
289 int registeredEvents = ski.registeredEvents(); |
312 if (registeredReadFilter.get(fd)) { |
290 if (fdToKey.remove(fd) != null) { |
313 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
291 if (registeredEvents != 0) { |
314 registeredReadFilter.clear(fd); |
292 if ((registeredEvents & Net.POLLIN) != 0) |
315 } |
293 KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); |
316 if (registeredWriteFilter.get(fd)) { |
294 if ((registeredEvents & Net.POLLOUT) != 0) |
317 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
295 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); |
318 registeredWriteFilter.clear(fd); |
296 ski.registeredEvents(0); |
319 } |
297 } |
320 |
298 } else { |
321 selectedKeys.remove(ski); |
299 assert registeredEvents == 0; |
322 keys.remove(ski); |
300 } |
323 |
301 } |
324 // remove from channel's key set |
302 |
325 deregister(ski); |
303 @Override |
326 |
304 public void putEventOps(SelectionKeyImpl ski, int events) { |
327 SelectableChannel selch = ski.channel(); |
|
328 if (!selch.isOpen() && !selch.isRegistered()) |
|
329 ((SelChImpl) selch).kill(); |
|
330 } |
|
331 |
|
332 @Override |
|
333 public void putEventOps(SelectionKeyImpl ski, int ops) { |
|
334 ensureOpen(); |
305 ensureOpen(); |
335 synchronized (updateLock) { |
306 synchronized (updateLock) { |
336 updateOps.addLast(ops); // ops first in case adding the key fails |
307 updateEvents.addLast(events); // events first in case adding key fails |
337 updateKeys.addLast(ski); |
308 updateKeys.addLast(ski); |
338 } |
309 } |
339 } |
310 } |
340 |
311 |
341 @Override |
312 @Override |