25 |
25 |
26 package sun.nio.ch; |
26 package sun.nio.ch; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.nio.channels.ClosedSelectorException; |
29 import java.nio.channels.ClosedSelectorException; |
30 import java.nio.channels.SelectableChannel; |
|
31 import java.nio.channels.SelectionKey; |
|
32 import java.nio.channels.Selector; |
30 import java.nio.channels.Selector; |
33 import java.nio.channels.spi.SelectorProvider; |
31 import java.nio.channels.spi.SelectorProvider; |
34 import java.util.ArrayDeque; |
32 import java.util.ArrayDeque; |
35 import java.util.BitSet; |
|
36 import java.util.Deque; |
33 import java.util.Deque; |
37 import java.util.HashMap; |
34 import java.util.HashMap; |
38 import java.util.Iterator; |
|
39 import java.util.Map; |
35 import java.util.Map; |
40 import java.util.concurrent.TimeUnit; |
36 import java.util.concurrent.TimeUnit; |
41 |
37 |
42 import static sun.nio.ch.EPoll.EPOLLIN; |
38 import static sun.nio.ch.EPoll.EPOLLIN; |
43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD; |
39 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD; |
65 private final int fd1; |
61 private final int fd1; |
66 |
62 |
67 // maps file descriptor to selection key, synchronize on selector |
63 // maps file descriptor to selection key, synchronize on selector |
68 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
64 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); |
69 |
65 |
70 // file descriptors registered with epoll, synchronize on selector |
66 // pending new registrations/updates, queued by implRegister and putEventOps |
71 private final BitSet registered = new BitSet(); |
|
72 |
|
73 // pending new registrations/updates, queued by implRegister and putEventOps |
|
74 private final Object updateLock = new Object(); |
67 private final Object updateLock = new Object(); |
75 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
68 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>(); |
76 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
69 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); |
77 private final Deque<Integer> updateOps = new ArrayDeque<>(); |
70 private final Deque<Integer> updateEvents = new ArrayDeque<>(); |
78 |
71 |
79 // interrupt triggering and clearing |
72 // interrupt triggering and clearing |
80 private final Object interruptLock = new Object(); |
73 private final Object interruptLock = new Object(); |
81 private boolean interruptTriggered; |
74 private boolean interruptTriggered; |
82 |
75 |
111 |
104 |
112 @Override |
105 @Override |
113 protected int doSelect(long timeout) throws IOException { |
106 protected int doSelect(long timeout) throws IOException { |
114 assert Thread.holdsLock(this); |
107 assert Thread.holdsLock(this); |
115 |
108 |
|
109 // epoll_wait timeout is int |
|
110 int to = (int) Math.min(timeout, Integer.MAX_VALUE); |
|
111 boolean blocking = (to != 0); |
|
112 boolean timedPoll = (to > 0); |
|
113 |
116 int numEntries; |
114 int numEntries; |
117 processUpdateQueue(); |
115 processUpdateQueue(); |
118 processDeregisterQueue(); |
116 processDeregisterQueue(); |
119 try { |
117 try { |
120 begin(); |
118 begin(blocking); |
121 |
119 |
122 // epoll_wait timeout is int |
|
123 int to = (int) Math.min(timeout, Integer.MAX_VALUE); |
|
124 boolean timedPoll = (to > 0); |
|
125 do { |
120 do { |
126 long startTime = timedPoll ? System.nanoTime() : 0; |
121 long startTime = timedPoll ? System.nanoTime() : 0; |
127 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); |
122 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); |
128 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |
123 if (numEntries == IOStatus.INTERRUPTED && timedPoll) { |
129 // timed poll interrupted so need to adjust timeout |
124 // timed poll interrupted so need to adjust timeout |
154 SelectionKeyImpl ski; |
149 SelectionKeyImpl ski; |
155 |
150 |
156 // new registrations |
151 // new registrations |
157 while ((ski = newKeys.pollFirst()) != null) { |
152 while ((ski = newKeys.pollFirst()) != null) { |
158 if (ski.isValid()) { |
153 if (ski.isValid()) { |
159 SelChImpl ch = ski.channel; |
154 int fd = ski.channel.getFDVal(); |
160 int fd = ch.getFDVal(); |
|
161 SelectionKeyImpl previous = fdToKey.put(fd, ski); |
155 SelectionKeyImpl previous = fdToKey.put(fd, ski); |
162 assert previous == null; |
156 assert previous == null; |
163 assert registered.get(fd) == false; |
157 assert ski.registeredEvents() == 0; |
164 } |
158 } |
165 } |
159 } |
166 |
160 |
167 // changes to interest ops |
161 // changes to interest ops |
168 assert updateKeys.size() == updateOps.size(); |
162 assert updateKeys.size() == updateEvents.size(); |
169 while ((ski = updateKeys.pollFirst()) != null) { |
163 while ((ski = updateKeys.pollFirst()) != null) { |
170 int ops = updateOps.pollFirst(); |
164 int newEvents = updateEvents.pollFirst(); |
171 int fd = ski.channel.getFDVal(); |
165 int fd = ski.channel.getFDVal(); |
172 if (ski.isValid() && fdToKey.containsKey(fd)) { |
166 if (ski.isValid() && fdToKey.containsKey(fd)) { |
173 if (registered.get(fd)) { |
167 int registeredEvents = ski.registeredEvents(); |
174 if (ops == 0) { |
168 if (newEvents != registeredEvents) { |
|
169 if (newEvents == 0) { |
175 // remove from epoll |
170 // remove from epoll |
176 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |
171 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |
177 registered.clear(fd); |
|
178 } else { |
172 } else { |
179 // modify events |
173 if (registeredEvents == 0) { |
180 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops); |
174 // add to epoll |
|
175 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents); |
|
176 } else { |
|
177 // modify events |
|
178 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents); |
|
179 } |
181 } |
180 } |
182 } else if (ops != 0) { |
181 ski.registeredEvents(newEvents); |
183 // add to epoll |
|
184 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops); |
|
185 registered.set(fd); |
|
186 } |
182 } |
187 } |
183 } |
188 } |
184 } |
189 } |
185 } |
190 } |
186 } |
191 |
187 |
192 /** |
188 /** |
193 * Update the keys whose fd's have been selected by the epoll. |
189 * Update the keys of file descriptors that were polled and add them to |
194 * Add the ready keys to the ready queue. |
190 * the selected-key set. |
|
191 * If the interrupt fd has been selected, drain it and clear the interrupt. |
195 */ |
192 */ |
196 private int updateSelectedKeys(int numEntries) throws IOException { |
193 private int updateSelectedKeys(int numEntries) throws IOException { |
197 assert Thread.holdsLock(this); |
194 assert Thread.holdsLock(this); |
198 assert Thread.holdsLock(nioSelectedKeys()); |
195 assert Thread.holdsLock(nioSelectedKeys()); |
199 |
196 |
243 FileDispatcherImpl.closeIntFD(epfd); |
239 FileDispatcherImpl.closeIntFD(epfd); |
244 EPoll.freePollArray(pollArrayAddress); |
240 EPoll.freePollArray(pollArrayAddress); |
245 |
241 |
246 FileDispatcherImpl.closeIntFD(fd0); |
242 FileDispatcherImpl.closeIntFD(fd0); |
247 FileDispatcherImpl.closeIntFD(fd1); |
243 FileDispatcherImpl.closeIntFD(fd1); |
248 |
|
249 // Deregister channels |
|
250 Iterator<SelectionKey> i = keys.iterator(); |
|
251 while (i.hasNext()) { |
|
252 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); |
|
253 deregister(ski); |
|
254 SelectableChannel selch = ski.channel(); |
|
255 if (!selch.isOpen() && !selch.isRegistered()) |
|
256 ((SelChImpl)selch).kill(); |
|
257 i.remove(); |
|
258 } |
|
259 } |
244 } |
260 |
245 |
261 @Override |
246 @Override |
262 protected void implRegister(SelectionKeyImpl ski) { |
247 protected void implRegister(SelectionKeyImpl ski) { |
263 assert Thread.holdsLock(nioKeys()); |
|
264 ensureOpen(); |
248 ensureOpen(); |
265 synchronized (updateLock) { |
249 synchronized (updateLock) { |
266 newKeys.addLast(ski); |
250 newKeys.addLast(ski); |
267 } |
251 } |
268 keys.add(ski); |
|
269 } |
252 } |
270 |
253 |
271 @Override |
254 @Override |
272 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
255 protected void implDereg(SelectionKeyImpl ski) throws IOException { |
273 assert !ski.isValid(); |
256 assert !ski.isValid(); |
274 assert Thread.holdsLock(this); |
257 assert Thread.holdsLock(this); |
275 assert Thread.holdsLock(nioKeys()); |
|
276 assert Thread.holdsLock(nioSelectedKeys()); |
|
277 |
258 |
278 int fd = ski.channel.getFDVal(); |
259 int fd = ski.channel.getFDVal(); |
279 fdToKey.remove(fd); |
260 if (fdToKey.remove(fd) != null) { |
280 if (registered.get(fd)) { |
261 if (ski.registeredEvents() != 0) { |
281 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |
262 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0); |
282 registered.clear(fd); |
263 ski.registeredEvents(0); |
283 } |
264 } |
284 |
265 } else { |
285 selectedKeys.remove(ski); |
266 assert ski.registeredEvents() == 0; |
286 keys.remove(ski); |
267 } |
287 |
268 } |
288 // remove from channel's key set |
269 |
289 deregister(ski); |
270 @Override |
290 |
271 public void putEventOps(SelectionKeyImpl ski, int events) { |
291 SelectableChannel selch = ski.channel(); |
|
292 if (!selch.isOpen() && !selch.isRegistered()) |
|
293 ((SelChImpl) selch).kill(); |
|
294 } |
|
295 |
|
296 @Override |
|
297 public void putEventOps(SelectionKeyImpl ski, int ops) { |
|
298 ensureOpen(); |
272 ensureOpen(); |
299 synchronized (updateLock) { |
273 synchronized (updateLock) { |
300 updateOps.addLast(ops); // ops first in case adding the key fails |
274 updateEvents.addLast(events); // events first in case adding key fails |
301 updateKeys.addLast(ski); |
275 updateKeys.addLast(ski); |
302 } |
276 } |
303 } |
277 } |
304 |
278 |
305 @Override |
279 @Override |