20 * |
20 * |
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
22 * or visit www.oracle.com if you need additional information or have any |
22 * or visit www.oracle.com if you need additional information or have any |
23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
|
26 package sun.nio.ch; |
25 package sun.nio.ch; |
27 |
26 |
28 import java.io.IOException; |
27 import java.io.IOException; |
29 import java.nio.channels.*; |
28 import java.nio.channels.ClosedSelectorException; |
30 import java.nio.channels.spi.*; |
29 import java.nio.channels.SelectableChannel; |
31 import java.util.*; |
30 import java.nio.channels.SelectionKey; |
32 |
31 import java.nio.channels.Selector; |
|
32 import java.nio.channels.spi.SelectorProvider; |
|
33 import java.util.ArrayDeque; |
|
34 import java.util.ArrayList; |
|
35 import java.util.Deque; |
|
36 import java.util.Iterator; |
|
37 import java.util.List; |
|
38 import java.util.concurrent.TimeUnit; |
|
39 |
|
40 import jdk.internal.misc.Unsafe; |
33 |
41 |
34 /** |
42 /** |
35 * An implementation of Selector for Solaris. |
43 * Selector implementation based on poll |
36 */ |
44 */ |
37 |
45 |
38 class PollSelectorImpl |
46 class PollSelectorImpl extends SelectorImpl { |
39 extends AbstractPollSelectorImpl |
47 |
40 { |
48 // initial capacity of poll array |
41 |
49 private static final int INITIAL_CAPACITY = 16; |
42 // File descriptors used for interrupt |
50 |
43 private int fd0; |
51 // poll array, grows as needed |
44 private int fd1; |
52 private int pollArrayCapacity = INITIAL_CAPACITY; |
45 |
53 private int pollArraySize; |
46 // Lock for interrupt triggering and clearing |
54 private AllocatedNativeObject pollArray; |
47 private Object interruptLock = new Object(); |
55 |
48 private boolean interruptTriggered = false; |
56 // file descriptors used for interrupt |
49 |
57 private final int fd0; |
50 /** |
58 private final int fd1; |
51 * Package private constructor called by factory method in |
59 |
52 * the abstract superclass Selector. |
60 // keys for file descriptors in poll array, synchronize on selector |
53 */ |
61 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); |
|
62 |
|
63 // pending updates, queued by putEventOps |
|
64 private final Object updateLock = new Object(); |
|
65 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
|
66 private final Deque<Integer> updateOps = new ArrayDeque<>(); |
|
67 |
|
68 // interrupt triggering and clearing |
|
69 private final Object interruptLock = new Object(); |
|
70 private boolean interruptTriggered; |
|
71 |
54 PollSelectorImpl(SelectorProvider sp) throws IOException { |
72 PollSelectorImpl(SelectorProvider sp) throws IOException { |
55 super(sp, 1, 1); |
73 super(sp); |
56 long pipeFds = IOUtil.makePipe(false); |
74 |
57 fd0 = (int) (pipeFds >>> 32); |
75 int size = pollArrayCapacity * SIZE_POLLFD; |
58 fd1 = (int) pipeFds; |
76 this.pollArray = new AllocatedNativeObject(size, false); |
|
77 |
59 try { |
78 try { |
60 pollWrapper = new PollArrayWrapper(INIT_CAP); |
79 long fds = IOUtil.makePipe(false); |
61 pollWrapper.initInterrupt(fd0, fd1); |
80 this.fd0 = (int) (fds >>> 32); |
62 channelArray = new SelectionKeyImpl[INIT_CAP]; |
81 this.fd1 = (int) fds; |
63 } catch (Throwable t) { |
82 } catch (IOException ioe) { |
64 try { |
83 pollArray.free(); |
65 FileDispatcherImpl.closeIntFD(fd0); |
84 throw ioe; |
66 } catch (IOException ioe0) { |
85 } |
67 t.addSuppressed(ioe0); |
86 |
68 } |
87 // wakeup support |
69 try { |
88 synchronized (this) { |
70 FileDispatcherImpl.closeIntFD(fd1); |
89 setFirst(fd0, Net.POLLIN); |
71 } catch (IOException ioe1) { |
90 } |
72 t.addSuppressed(ioe1); |
91 } |
73 } |
92 |
74 throw t; |
93 private void ensureOpen() { |
75 } |
94 if (!isOpen()) |
76 } |
|
77 |
|
78 protected int doSelect(long timeout) |
|
79 throws IOException |
|
80 { |
|
81 if (channelArray == null) |
|
82 throw new ClosedSelectorException(); |
95 throw new ClosedSelectorException(); |
|
96 } |
|
97 |
|
98 @Override |
|
99 protected int doSelect(long timeout) throws IOException { |
|
100 assert Thread.holdsLock(this); |
|
101 |
|
102 processUpdateQueue(); |
83 processDeregisterQueue(); |
103 processDeregisterQueue(); |
84 try { |
104 try { |
85 begin(); |
105 begin(); |
86 pollWrapper.poll(totalChannels, 0, timeout); |
106 |
|
107 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout |
|
108 boolean timedPoll = (to > 0); |
|
109 int numPolled; |
|
110 do { |
|
111 long startTime = timedPoll ? System.nanoTime() : 0; |
|
112 numPolled = poll(pollArray.address(), pollArraySize, to); |
|
113 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { |
|
114 // timed poll interrupted so need to adjust timeout |
|
115 long adjust = System.nanoTime() - startTime; |
|
116 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); |
|
117 if (to <= 0) { |
|
118 // timeout expired so no retry |
|
119 numPolled = 0; |
|
120 } |
|
121 } |
|
122 } while (numPolled == IOStatus.INTERRUPTED); |
|
123 assert numPolled <= pollArraySize; |
|
124 |
87 } finally { |
125 } finally { |
88 end(); |
126 end(); |
89 } |
127 } |
|
128 |
90 processDeregisterQueue(); |
129 processDeregisterQueue(); |
91 int numKeysUpdated = updateSelectedKeys(); |
130 return updateSelectedKeys(); |
92 if (pollWrapper.getReventOps(0) != 0) { |
131 } |
93 // Clear the wakeup pipe |
132 |
94 pollWrapper.putReventOps(0, 0); |
133 /** |
95 synchronized (interruptLock) { |
134 * Process changes to the interest ops. |
96 IOUtil.drain(fd0); |
135 */ |
97 interruptTriggered = false; |
136 private void processUpdateQueue() { |
|
137 assert Thread.holdsLock(this); |
|
138 |
|
139 synchronized (updateLock) { |
|
140 assert updateKeys.size() == updateOps.size(); |
|
141 |
|
142 SelectionKeyImpl ski; |
|
143 while ((ski = updateKeys.pollFirst()) != null) { |
|
144 int ops = updateOps.pollFirst(); |
|
145 if (ski.isValid()) { |
|
146 int index = ski.getIndex(); |
|
147 assert index >= 0 && index < pollArraySize; |
|
148 if (index > 0) { |
|
149 assert pollKeys.get(index) == ski; |
|
150 if (ops == 0) { |
|
151 remove(ski); |
|
152 } else { |
|
153 update(ski, ops); |
|
154 } |
|
155 } else if (ops != 0) { |
|
156 add(ski, ops); |
|
157 } |
|
158 } |
98 } |
159 } |
99 } |
160 } |
|
161 } |
|
162 |
|
163 /** |
|
164 * Update the keys whose fd's have been selected by poll. |
|
165 * Add the ready keys to the selected key set. |
|
166 * If the interrupt fd has been selected, drain it and clear the interrupt. |
|
167 */ |
|
168 private int updateSelectedKeys() throws IOException { |
|
169 assert Thread.holdsLock(this); |
|
170 assert Thread.holdsLock(nioSelectedKeys()); |
|
171 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); |
|
172 |
|
173 int numKeysUpdated = 0; |
|
174 for (int i = 1; i < pollArraySize; i++) { |
|
175 int rOps = getReventOps(i); |
|
176 if (rOps != 0) { |
|
177 SelectionKeyImpl ski = pollKeys.get(i); |
|
178 assert ski.channel.getFDVal() == getDescriptor(i); |
|
179 if (ski.isValid()) { |
|
180 if (selectedKeys.contains(ski)) { |
|
181 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { |
|
182 numKeysUpdated++; |
|
183 } |
|
184 } else { |
|
185 ski.channel.translateAndSetReadyOps(rOps, ski); |
|
186 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
|
187 selectedKeys.add(ski); |
|
188 numKeysUpdated++; |
|
189 } |
|
190 } |
|
191 } |
|
192 } |
|
193 } |
|
194 |
|
195 // check for interrupt |
|
196 if (getReventOps(0) != 0) { |
|
197 assert getDescriptor(0) == fd0; |
|
198 clearInterrupt(); |
|
199 } |
|
200 |
100 return numKeysUpdated; |
201 return numKeysUpdated; |
101 } |
202 } |
102 |
203 |
103 protected void implCloseInterrupt() throws IOException { |
204 @Override |
|
205 protected void implClose() throws IOException { |
|
206 assert !isOpen(); |
|
207 assert Thread.holdsLock(this); |
|
208 assert Thread.holdsLock(nioKeys()); |
|
209 |
104 // prevent further wakeup |
210 // prevent further wakeup |
105 synchronized (interruptLock) { |
211 synchronized (interruptLock) { |
106 interruptTriggered = true; |
212 interruptTriggered = true; |
107 } |
213 } |
|
214 |
|
215 pollArray.free(); |
108 FileDispatcherImpl.closeIntFD(fd0); |
216 FileDispatcherImpl.closeIntFD(fd0); |
109 FileDispatcherImpl.closeIntFD(fd1); |
217 FileDispatcherImpl.closeIntFD(fd1); |
110 fd0 = -1; |
218 |
111 fd1 = -1; |
219 // Deregister channels |
112 pollWrapper.release(0); |
220 Iterator<SelectionKey> i = keys.iterator(); |
113 } |
221 while (i.hasNext()) { |
114 |
222 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); |
|
223 ski.setIndex(-1); |
|
224 deregister(ski); |
|
225 SelectableChannel selch = ski.channel(); |
|
226 if (!selch.isOpen() && !selch.isRegistered()) |
|
227 ((SelChImpl)selch).kill(); |
|
228 i.remove(); |
|
229 } |
|
230 } |
|
231 |
|
232 @Override |
|
233 protected void implRegister(SelectionKeyImpl ski) { |
|
234 assert ski.getIndex() == 0; |
|
235 assert Thread.holdsLock(nioKeys()); |
|
236 |
|
237 ensureOpen(); |
|
238 keys.add(ski); |
|
239 } |
|
240 |
|
241 @Override |
|
242 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
|
243 assert !ski.isValid(); |
|
244 assert Thread.holdsLock(this); |
|
245 assert Thread.holdsLock(nioKeys()); |
|
246 assert Thread.holdsLock(nioSelectedKeys()); |
|
247 |
|
248 // remove from poll array |
|
249 int index = ski.getIndex(); |
|
250 if (index > 0) { |
|
251 remove(ski); |
|
252 } |
|
253 |
|
254 // remove from selected-key and key set |
|
255 selectedKeys.remove(ski); |
|
256 keys.remove(ski); |
|
257 |
|
258 // remove from channel's key set |
|
259 deregister(ski); |
|
260 |
|
261 SelectableChannel selch = ski.channel(); |
|
262 if (!selch.isOpen() && !selch.isRegistered()) |
|
263 ((SelChImpl) selch).kill(); |
|
264 } |
|
265 |
|
266 @Override |
|
267 public void putEventOps(SelectionKeyImpl ski, int ops) { |
|
268 ensureOpen(); |
|
269 synchronized (updateLock) { |
|
270 updateOps.addLast(ops); // ops first in case adding the key fails |
|
271 updateKeys.addLast(ski); |
|
272 } |
|
273 } |
|
274 |
|
275 @Override |
115 public Selector wakeup() { |
276 public Selector wakeup() { |
116 synchronized (interruptLock) { |
277 synchronized (interruptLock) { |
117 if (!interruptTriggered) { |
278 if (!interruptTriggered) { |
118 pollWrapper.interrupt(); |
279 try { |
|
280 IOUtil.write1(fd1, (byte)0); |
|
281 } catch (IOException ioe) { |
|
282 throw new InternalError(ioe); |
|
283 } |
119 interruptTriggered = true; |
284 interruptTriggered = true; |
120 } |
285 } |
121 } |
286 } |
122 return this; |
287 return this; |
123 } |
288 } |
124 |
289 |
|
290 private void clearInterrupt() throws IOException { |
|
291 synchronized (interruptLock) { |
|
292 IOUtil.drain(fd0); |
|
293 interruptTriggered = false; |
|
294 } |
|
295 } |
|
296 |
|
297 /** |
|
298 * Sets the first pollfd entry in the poll array to the given fd |
|
299 */ |
|
300 private void setFirst(int fd, int ops) { |
|
301 assert pollArraySize == 0; |
|
302 assert pollKeys.isEmpty(); |
|
303 |
|
304 putDescriptor(0, fd); |
|
305 putEventOps(0, ops); |
|
306 pollArraySize = 1; |
|
307 |
|
308 pollKeys.add(null); // dummy element |
|
309 } |
|
310 |
|
311 /** |
|
312 * Adds a pollfd entry to the poll array, expanding the poll array if needed. |
|
313 */ |
|
314 private void add(SelectionKeyImpl ski, int ops) { |
|
315 expandIfNeeded(); |
|
316 |
|
317 int index = pollArraySize; |
|
318 assert index > 0; |
|
319 putDescriptor(index, ski.channel.getFDVal()); |
|
320 putEventOps(index, ops); |
|
321 putReventOps(index, 0); |
|
322 ski.setIndex(index); |
|
323 pollArraySize++; |
|
324 |
|
325 pollKeys.add(ski); |
|
326 assert pollKeys.size() == pollArraySize; |
|
327 } |
|
328 |
|
329 /** |
|
330 * Updates the events of a pollfd entry. |
|
331 */ |
|
332 private void update(SelectionKeyImpl ski, int ops) { |
|
333 int index = ski.getIndex(); |
|
334 assert index > 0 && index < pollArraySize; |
|
335 assert getDescriptor(index) == ski.channel.getFDVal(); |
|
336 putEventOps(index, ops); |
|
337 } |
|
338 |
|
339 /** |
|
340 * Removes a pollfd entry from the poll array |
|
341 */ |
|
342 private void remove(SelectionKeyImpl ski) { |
|
343 int index = ski.getIndex(); |
|
344 assert index > 0 && index < pollArraySize; |
|
345 assert getDescriptor(index) == ski.channel.getFDVal(); |
|
346 |
|
347 // replace pollfd at index with the last pollfd in array |
|
348 int lastIndex = pollArraySize - 1; |
|
349 if (lastIndex != index) { |
|
350 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); |
|
351 assert lastKey.getIndex() == lastIndex; |
|
352 int lastFd = getDescriptor(lastIndex); |
|
353 int lastOps = getEventOps(lastIndex); |
|
354 int lastRevents = getReventOps(lastIndex); |
|
355 assert lastKey.channel.getFDVal() == lastFd; |
|
356 putDescriptor(index, lastFd); |
|
357 putEventOps(index, lastOps); |
|
358 putReventOps(index, lastRevents); |
|
359 pollKeys.set(index, lastKey); |
|
360 lastKey.setIndex(index); |
|
361 } |
|
362 pollKeys.remove(lastIndex); |
|
363 pollArraySize--; |
|
364 assert pollKeys.size() == pollArraySize; |
|
365 |
|
366 ski.setIndex(0); |
|
367 } |
|
368 |
|
369 /** |
|
370 * Expand poll array if at capacity |
|
371 */ |
|
372 private void expandIfNeeded() { |
|
373 if (pollArraySize == pollArrayCapacity) { |
|
374 int oldSize = pollArrayCapacity * SIZE_POLLFD; |
|
375 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; |
|
376 int newSize = newCapacity * SIZE_POLLFD; |
|
377 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); |
|
378 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); |
|
379 pollArray.free(); |
|
380 pollArray = newPollArray; |
|
381 pollArrayCapacity = newCapacity; |
|
382 } |
|
383 } |
|
384 |
|
385 private static final short SIZE_POLLFD = 8; |
|
386 private static final short FD_OFFSET = 0; |
|
387 private static final short EVENT_OFFSET = 4; |
|
388 private static final short REVENT_OFFSET = 6; |
|
389 |
|
390 private void putDescriptor(int i, int fd) { |
|
391 int offset = SIZE_POLLFD * i + FD_OFFSET; |
|
392 pollArray.putInt(offset, fd); |
|
393 } |
|
394 |
|
395 private int getDescriptor(int i) { |
|
396 int offset = SIZE_POLLFD * i + FD_OFFSET; |
|
397 return pollArray.getInt(offset); |
|
398 } |
|
399 |
|
400 private void putEventOps(int i, int event) { |
|
401 int offset = SIZE_POLLFD * i + EVENT_OFFSET; |
|
402 pollArray.putShort(offset, (short)event); |
|
403 } |
|
404 |
|
405 private int getEventOps(int i) { |
|
406 int offset = SIZE_POLLFD * i + EVENT_OFFSET; |
|
407 return pollArray.getShort(offset); |
|
408 } |
|
409 |
|
410 private void putReventOps(int i, int revent) { |
|
411 int offset = SIZE_POLLFD * i + REVENT_OFFSET; |
|
412 pollArray.putShort(offset, (short)revent); |
|
413 } |
|
414 |
|
415 private int getReventOps(int i) { |
|
416 int offset = SIZE_POLLFD * i + REVENT_OFFSET; |
|
417 return pollArray.getShort(offset); |
|
418 } |
|
419 |
|
420 private static native int poll(long pollAddress, int numfds, int timeout); |
|
421 |
|
422 static { |
|
423 IOUtil.load(); |
|
424 } |
125 } |
425 } |