24 */ |
24 */ |
25 |
25 |
26 package sun.nio.ch; |
26 package sun.nio.ch; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.channels.*; |
29 import java.nio.channels.ClosedSelectorException; |
30 import java.nio.channels.spi.*; |
30 import java.nio.channels.SelectableChannel; |
31 import java.util.*; |
31 import java.nio.channels.SelectionKey; |
|
32 import java.nio.channels.Selector; |
|
33 import java.nio.channels.spi.SelectorProvider; |
|
34 import java.util.ArrayDeque; |
|
35 import java.util.BitSet; |
|
36 import java.util.Deque; |
|
37 import java.util.HashMap; |
|
38 import java.util.Iterator; |
|
39 import java.util.Map; |
|
40 import java.util.concurrent.TimeUnit; |
|
41 |
|
42 import static sun.nio.ch.EPoll.EPOLLIN; |
|
43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD; |
|
44 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL; |
|
45 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD; |
|
46 |
32 |
47 |
33 /** |
48 /** |
34 * An implementation of Selector for Linux 2.6+ kernels that uses |
49 * Linux epoll based Selector implementation |
35 * the epoll event notification facility. |
|
36 */ |
50 */ |
37 class EPollSelectorImpl |
51 |
38 extends SelectorImpl |
52 class EPollSelectorImpl extends SelectorImpl { |
39 { |
53 |
40 // File descriptors used for interrupt |
54 // maximum number of events to poll in one call to epoll_wait |
|
55 private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024); |
|
56 |
|
57 // epoll file descriptor |
|
58 private final int epfd; |
|
59 |
|
60 // address of poll array when polling with epoll_wait |
|
61 private final long pollArrayAddress; |
|
62 |
|
63 // file descriptors used for interrupt |
41 private final int fd0; |
64 private final int fd0; |
42 private final int fd1; |
65 private final int fd1; |
43 |
66 |
44 // The poll object |
67 // maps file descriptor to selection key, synchronize on selector |
45 private final EPollArrayWrapper pollWrapper; |
68 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
46 |
69 |
47 // Maps from file descriptors to keys |
70 // file descriptors registered with epoll, synchronize on selector |
48 private final Map<Integer, SelectionKeyImpl> fdToKey; |
71 private final BitSet registered = new BitSet(); |
49 |
72 |
50 // True if this Selector has been closed |
73 // pending new registrations/updates, queued by implRegister and putEventOps |
51 private volatile boolean closed; |
74 private final Object updateLock = new Object(); |
52 |
75 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
53 // Lock for interrupt triggering and clearing |
76 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
|
77 private final Deque<Integer> updateOps = new ArrayDeque<>(); |
|
78 |
|
79 // interrupt triggering and clearing |
54 private final Object interruptLock = new Object(); |
80 private final Object interruptLock = new Object(); |
55 private boolean interruptTriggered = false; |
81 private boolean interruptTriggered; |
56 |
82 |
// NOTE(review): this span appears to interleave two revisions of the
// constructor, told apart by the leading line numbers (57-82 and 83-105).
// Lines 57-82: create the interrupt pipe first, then build an
//   EPollArrayWrapper over fd0/fd1 and the fdToKey map; on any Throwable both
//   pipe descriptors are closed, close failures are attached as suppressed
//   exceptions, and the Throwable is rethrown.
// Lines 83-105: create the epoll descriptor and the poll array first, then the
//   pipe; if makePipe throws IOException the poll array is freed, epfd is
//   closed and the exception is rethrown. fd0 is then added to epoll with
//   EPOLLIN (wakeup() writes its byte to the other descriptor, fd1).
// NOTE(review): in lines 83-105 the result of EPoll.ctl is not inspected and
//   nothing is released if that registration fails - confirm whether ctl can
//   fail or throw here.
57 /** |
83 /** |
58 * Package private constructor called by factory method in |
84 * Package private constructor called by factory method in |
59 * the abstract superclass Selector. |
85 * the abstract superclass Selector. |
60 */ |
86 */ |
61 EPollSelectorImpl(SelectorProvider sp) throws IOException { |
87 EPollSelectorImpl(SelectorProvider sp) throws IOException { |
62 super(sp); |
88 super(sp); |
63 long pipeFds = IOUtil.makePipe(false); |
89 |
64 fd0 = (int) (pipeFds >>> 32); |
90 this.epfd = EPoll.create(); |
65 fd1 = (int) pipeFds; |
91 this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS); |

92 |
66 try { |
93 try { |
67 pollWrapper = new EPollArrayWrapper(fd0, fd1); |
94 long fds = IOUtil.makePipe(false); |
68 fdToKey = new HashMap<>(); |
95 this.fd0 = (int) (fds >>> 32); |
69 } catch (Throwable t) { |
96 this.fd1 = (int) fds; |
70 try { |
97 } catch (IOException ioe) { |
71 FileDispatcherImpl.closeIntFD(fd0); |
98 EPoll.freePollArray(pollArrayAddress); |
72 } catch (IOException ioe0) { |
99 FileDispatcherImpl.closeIntFD(epfd); |
73 t.addSuppressed(ioe0); |
100 throw ioe; |
74 } |
101 } |
75 try { |
102 |
76 FileDispatcherImpl.closeIntFD(fd1); |
103 // register one end of the pipe (fd0) for wakeups |
77 } catch (IOException ioe1) { |
104 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); |
78 t.addSuppressed(ioe1); |

79 } |

80 throw t; |

81 } |

82 } |
105 } |
83 |
106 |
// Throws ClosedSelectorException when the selector is no longer usable.
// NOTE(review): two revisions appear interleaved here. Lines 84-87 test the
// local `closed` flag; lines 107-110 ask isOpen() instead.
84 private void ensureOpen() { |
107 private void ensureOpen() { |
85 if (closed) |
108 if (!isOpen()) |
86 throw new ClosedSelectorException(); |
109 throw new ClosedSelectorException(); |
87 } |
110 } |
88 |
111 |
// doSelect(timeout): one selection pass; returns updateSelectedKeys(numEntries).
// NOTE(review): two revisions appear interleaved (lines 89-102 and 112-190).
// Lines 89-102: ensureOpen(), drain the deregister queue, poll through
//   pollWrapper.poll(timeout) between begin()/end(), drain the deregister
//   queue again, then update the selected keys.
// Lines 112-145: assert the caller holds this selector's monitor, apply queued
//   registrations/interest changes via processUpdateQueue(), then call
//   EPoll.wait directly. The long timeout is clamped to int. The wait is
//   retried while it returns IOStatus.INTERRUPTED; for a timed poll (to > 0)
//   the elapsed milliseconds are subtracted first and the loop ends with
//   numEntries = 0 once the remaining time is <= 0. When `to` is 0 or
//   negative, an interrupted wait is retried with the same value.
89 @Override |
112 @Override |
90 protected int doSelect(long timeout) throws IOException { |
113 protected int doSelect(long timeout) throws IOException { |
91 ensureOpen(); |
114 assert Thread.holdsLock(this); |

115 |
92 int numEntries; |
116 int numEntries; |

117 processUpdateQueue(); |
93 processDeregisterQueue(); |
118 processDeregisterQueue(); |
94 try { |
119 try { |
95 begin(); |
120 begin(); |
96 numEntries = pollWrapper.poll(timeout); |
121 |

122 // epoll_wait timeout is int |

123 int to = (int) Math.min(timeout, Integer.MAX_VALUE); |

124 boolean timedPoll = (to > 0); |

125 do { |

126 long startTime = timedPoll ? System.nanoTime() : 0; |

127 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); |

128 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |

129 // timed poll interrupted so need to adjust timeout |

130 long adjust = System.nanoTime() - startTime; |

131 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); |

132 if (to <= 0) { |

133 // timeout expired so no retry |

134 numEntries = 0; |

135 } |

136 } |

137 } while (numEntries == IOStatus.INTERRUPTED); |

138 assert IOStatus.check(numEntries); |

139 |
97 } finally { |
140 } finally { |
98 end(); |
141 end(); |
99 } |
142 } |
100 processDeregisterQueue(); |
143 processDeregisterQueue(); |
101 return updateSelectedKeys(numEntries); |
144 return updateSelectedKeys(numEntries); |

145 } |

146 |

// processUpdateQueue() (lines 147-190 only): runs under updateLock, with the
// selector's monitor asserted to be held.
//  1. Drains newKeys; each key that is still valid is stored in fdToKey under
//     its channel's file descriptor. Nothing is added to epoll at this point.
//  2. Drains updateKeys and updateOps in lockstep. A change is applied only if
//     the key is valid and its fd is present in fdToKey; otherwise it is
//     dropped. For an fd already marked in `registered`, ops == 0 issues
//     EPOLL_CTL_DEL and clears the bit, any other ops issues EPOLL_CTL_MOD.
//     For an fd not yet marked, non-zero ops issues EPOLL_CTL_ADD and sets
//     the bit; zero ops does nothing.
// NOTE(review): the results of the EPoll.ctl calls are not inspected here -
//   confirm whether failures need handling.
147 /** |

148 * Process new registrations and changes to the interest ops. |

149 */ |

150 private void processUpdateQueue() { |

151 assert Thread.holdsLock(this); |

152 |

153 synchronized (updateLock) { |

154 SelectionKeyImpl ski; |

155 |

156 // new registrations |

157 while ((ski = newKeys.pollFirst()) != null) { |

158 if (ski.isValid()) { |

159 SelChImpl ch = ski.channel; |

160 int fd = ch.getFDVal(); |

161 SelectionKeyImpl previous = fdToKey.put(fd, ski); |

162 assert previous == null; |

163 assert registered.get(fd) == false; |

164 } |

165 } |

166 |

167 // changes to interest ops |

168 assert updateKeys.size() == updateOps.size(); |

169 while ((ski = updateKeys.pollFirst()) != null) { |

170 int ops = updateOps.pollFirst(); |

171 int fd = ski.channel.getFDVal(); |

172 if (ski.isValid() && fdToKey.containsKey(fd)) { |

173 if (registered.get(fd)) { |

174 if (ops == 0) { |

175 // remove from epoll |

176 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |

177 registered.clear(fd); |

178 } else { |

179 // modify events |

180 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops); |

181 } |

182 } else if (ops != 0) { |

183 // add to epoll |

184 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops); |

185 registered.set(fd); |

186 } |

187 } |

188 } |

189 } |
102 } |
190 } |
103 |
191 |
104 /** |
192 /** |
105 * Update the keys whose fd's have been selected by the epoll. |
193 * Update the keys whose fd's have been selected by the epoll. |
106 * Add the ready keys to the ready queue. |
194 * Add the ready keys to the ready queue. |
107 */ |
195 */ |
108 private int updateSelectedKeys(int numEntries) throws IOException { |
196 private int updateSelectedKeys(int numEntries) throws IOException { |
|
197 assert Thread.holdsLock(this); |
|
198 assert Thread.holdsLock(nioSelectedKeys()); |
|
199 |
109 boolean interrupted = false; |
200 boolean interrupted = false; |
110 int numKeysUpdated = 0; |
201 int numKeysUpdated = 0; |
111 for (int i=0; i<numEntries; i++) { |
202 for (int i=0; i<numEntries; i++) { |
112 int nextFD = pollWrapper.getDescriptor(i); |
203 long event = EPoll.getEvent(pollArrayAddress, i); |
113 if (nextFD == fd0) { |
204 int fd = EPoll.getDescriptor(event); |
|
205 if (fd == fd0) { |
114 interrupted = true; |
206 interrupted = true; |
115 } else { |
207 } else { |
116 SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); |
208 SelectionKeyImpl ski = fdToKey.get(fd); |
117 if (ski != null) { |
209 if (ski != null) { |
118 int rOps = pollWrapper.getEventOps(i); |
210 int rOps = EPoll.getEvents(event); |
119 if (selectedKeys.contains(ski)) { |
211 if (selectedKeys.contains(ski)) { |
120 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { |
212 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { |
121 numKeysUpdated++; |
213 numKeysUpdated++; |
122 } |
214 } |
123 } else { |
215 } else { |
165 } |
258 } |
166 } |
259 } |
167 |
260 |
// implRegister(ski): records a newly registered key with this selector.
// NOTE(review): two revisions appear interleaved (lines 168-176 and 261-269).
// Lines 168-176: immediately map the channel's fd to the key in fdToKey and
//   add the fd to pollWrapper.
// Lines 261-269: assert the nioKeys() monitor is held and only append the key
//   to newKeys under updateLock; the fdToKey entry is created later when
//   processUpdateQueue() drains that queue.
// Both revisions check that the selector is open and add the key to `keys`.
168 @Override |
261 @Override |
169 protected void implRegister(SelectionKeyImpl ski) { |
262 protected void implRegister(SelectionKeyImpl ski) { |

263 assert Thread.holdsLock(nioKeys()); |
170 ensureOpen(); |
264 ensureOpen(); |
171 SelChImpl ch = ski.channel; |
265 synchronized (updateLock) { |
172 int fd = Integer.valueOf(ch.getFDVal()); |
266 newKeys.addLast(ski); |
173 fdToKey.put(fd, ski); |
267 } |
174 pollWrapper.add(fd); |

175 keys.add(ski); |
268 keys.add(ski); |
176 } |
269 } |
177 |
270 |
// implDereg(ski): removes a key from this selector's bookkeeping.
// NOTE(review): two revisions appear interleaved (lines 178-192 and 271-294).
// Lines 178-192: assert the key index is >= 0, drop the fd from fdToKey and
//   pollWrapper, and reset the key's index to -1.
// Lines 271-294: assert the key is already invalid and that the selector,
//   nioKeys() and nioSelectedKeys() monitors are held; drop the fd from
//   fdToKey and, only if its bit is set in `registered`, issue EPOLL_CTL_DEL
//   and clear the bit.
// Both revisions then remove the key from `keys` and `selectedKeys`, call
// deregister(ski), and kill the channel when it is neither open nor registered.
178 @Override |
271 @Override |
179 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
272 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
180 assert (ski.getIndex() >= 0); |
273 assert !ski.isValid(); |
181 SelChImpl ch = ski.channel; |
274 assert Thread.holdsLock(this); |
182 int fd = ch.getFDVal(); |
275 assert Thread.holdsLock(nioKeys()); |
183 fdToKey.remove(Integer.valueOf(fd)); |
276 assert Thread.holdsLock(nioSelectedKeys()); |
184 pollWrapper.remove(fd); |
277 |
185 ski.setIndex(-1); |
278 int fd = ski.channel.getFDVal(); |

279 fdToKey.remove(fd); |

280 if (registered.get(fd)) { |

281 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |

282 registered.clear(fd); |

283 } |

284 |

285 selectedKeys.remove(ski); |
186 keys.remove(ski); |
286 keys.remove(ski); |
187 selectedKeys.remove(ski); |
287 |

288 // remove from channel's key set |
188 deregister(ski); |
289 deregister(ski); |

290 |
189 SelectableChannel selch = ski.channel(); |
291 SelectableChannel selch = ski.channel(); |
190 if (!selch.isOpen() && !selch.isRegistered()) |
292 if (!selch.isOpen() && !selch.isRegistered()) |
191 ((SelChImpl)selch).kill(); |
293 ((SelChImpl) selch).kill(); |
192 } |
294 } |
193 |
295 |
// putEventOps(ski, ops): records a change to a key's interest ops.
// NOTE(review): two revisions appear interleaved (lines 194-199 and 296-303).
// Lines 194-199: apply the change at once via pollWrapper.setInterest.
// Lines 296-303: queue the change under updateLock as a pair of entries, ops
//   on updateOps and the key on updateKeys; processUpdateQueue() later polls
//   the two deques in lockstep, so the entries must stay paired.
// Both revisions first check that the selector is open.
194 @Override |
296 @Override |
195 public void putEventOps(SelectionKeyImpl ski, int ops) { |
297 public void putEventOps(SelectionKeyImpl ski, int ops) { |
196 ensureOpen(); |
298 ensureOpen(); |
197 SelChImpl ch = ski.channel; |
299 synchronized (updateLock) { |
198 pollWrapper.setInterest(ch.getFDVal(), ops); |
300 updateOps.addLast(ops); // ops first in case adding the key fails |

301 updateKeys.addLast(ski); |

302 } |
199 } |
303 } |
200 |
304 |
// wakeup(): triggers the interrupt at most once until the flag is reset, then
// returns this selector. Runs under interruptLock; interruptTriggered guards
// against repeating the trigger (the reset is not in this span).
// NOTE(review): two revisions appear interleaved (lines 201-210 and 305-318).
// Lines 201-210: delegate to pollWrapper.interrupt().
// Lines 305-318: write a single zero byte to fd1 with IOUtil.write1; an
//   IOException from that write is rethrown wrapped in InternalError.
201 @Override |
305 @Override |
202 public Selector wakeup() { |
306 public Selector wakeup() { |
203 synchronized (interruptLock) { |
307 synchronized (interruptLock) { |
204 if (!interruptTriggered) { |
308 if (!interruptTriggered) { |
205 pollWrapper.interrupt(); |
309 try { |

310 IOUtil.write1(fd1, (byte)0); |

311 } catch (IOException ioe) { |

312 throw new InternalError(ioe); |

313 } |
206 interruptTriggered = true; |
314 interruptTriggered = true; |
207 } |
315 } |
208 } |
316 } |
209 return this; |
317 return this; |
210 } |
318 } |