25 |
25 |
26 package sun.nio.ch; |
26 package sun.nio.ch; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.channels.ClosedSelectorException; |
29 import java.nio.channels.ClosedSelectorException; |
|
30 import java.nio.channels.SelectionKey; |
30 import java.nio.channels.Selector; |
31 import java.nio.channels.Selector; |
31 import java.nio.channels.spi.SelectorProvider; |
32 import java.nio.channels.spi.SelectorProvider; |
32 import java.util.ArrayDeque; |
33 import java.util.ArrayDeque; |
33 import java.util.Deque; |
34 import java.util.Deque; |
34 import java.util.HashMap; |
35 import java.util.HashMap; |
35 import java.util.Map; |
36 import java.util.Map; |
36 import java.util.concurrent.TimeUnit; |
37 import java.util.concurrent.TimeUnit; |
|
38 import java.util.function.Consumer; |
37 |
39 |
38 import static sun.nio.ch.KQueue.EVFILT_READ; |
40 import static sun.nio.ch.KQueue.EVFILT_READ; |
39 import static sun.nio.ch.KQueue.EVFILT_WRITE; |
41 import static sun.nio.ch.KQueue.EVFILT_WRITE; |
40 import static sun.nio.ch.KQueue.EV_ADD; |
42 import static sun.nio.ch.KQueue.EV_ADD; |
41 import static sun.nio.ch.KQueue.EV_DELETE; |
43 import static sun.nio.ch.KQueue.EV_DELETE; |
98 if (!isOpen()) |
100 if (!isOpen()) |
99 throw new ClosedSelectorException(); |
101 throw new ClosedSelectorException(); |
100 } |
102 } |
101 |
103 |
102 @Override |
104 @Override |
103 protected int doSelect(long timeout) throws IOException { |
105 protected int doSelect(Consumer<SelectionKey> action, long timeout) |
|
106 throws IOException |
|
107 { |
104 assert Thread.holdsLock(this); |
108 assert Thread.holdsLock(this); |
105 |
109 |
106 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout |
110 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout |
107 boolean blocking = (to != 0); |
111 boolean blocking = (to != 0); |
108 boolean timedPoll = (to > 0); |
112 boolean timedPoll = (to > 0); |
178 } |
182 } |
179 } |
183 } |
180 } |
184 } |
181 |
185 |
182 /** |
186 /** |
183 * Update the keys of file descriptors that were polled and add them to |
187 * Process the polled events. |
184 * the selected-key set. |
|
185 * If the interrupt fd has been selected, drain it and clear the interrupt. |
188 * If the interrupt fd has been selected, drain it and clear the interrupt. |
186 */ |
189 */ |
187 private int updateSelectedKeys(int numEntries) throws IOException { |
190 private int processEvents(int numEntries, Consumer<SelectionKey> action) |
188 assert Thread.holdsLock(this); |
191 throws IOException |
189 assert Thread.holdsLock(nioSelectedKeys()); |
192 { |
|
193 assert Thread.holdsLock(this); |
190 |
194 |
191 int numKeysUpdated = 0; |
195 int numKeysUpdated = 0; |
192 boolean interrupted = false; |
196 boolean interrupted = false; |
193 |
197 |
194 // A file descriptor may be registered with kqueue with more than one |
198 // A file descriptor may be registered with kqueue with more than one |
212 if (filter == EVFILT_READ) { |
216 if (filter == EVFILT_READ) { |
213 rOps |= Net.POLLIN; |
217 rOps |= Net.POLLIN; |
214 } else if (filter == EVFILT_WRITE) { |
218 } else if (filter == EVFILT_WRITE) { |
215 rOps |= Net.POLLOUT; |
219 rOps |= Net.POLLOUT; |
216 } |
220 } |
217 |
221 int updated = processReadyEvents(rOps, ski, action); |
218 if (selectedKeys.contains(ski)) { |
222 if (updated > 0 && ski.lastPolled != pollCount) { |
219 if (ski.translateAndUpdateReadyOps(rOps)) { |
223 numKeysUpdated++; |
220 // file descriptor may be polled more than once per poll |
224 ski.lastPolled = pollCount; |
221 if (ski.lastPolled != pollCount) { |
|
222 numKeysUpdated++; |
|
223 ski.lastPolled = pollCount; |
|
224 } |
|
225 } |
|
226 } else { |
|
227 ski.translateAndSetReadyOps(rOps); |
|
228 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
|
229 selectedKeys.add(ski); |
|
230 numKeysUpdated++; |
|
231 ski.lastPolled = pollCount; |
|
232 } |
|
233 } |
225 } |
234 } |
226 } |
235 } |
227 } |
236 } |
228 } |
237 |
229 |