34 import java.nio.channels.spi.AbstractSelector; |
34 import java.nio.channels.spi.AbstractSelector; |
35 import java.nio.channels.spi.SelectorProvider; |
35 import java.nio.channels.spi.SelectorProvider; |
36 import java.util.Collections; |
36 import java.util.Collections; |
37 import java.util.HashSet; |
37 import java.util.HashSet; |
38 import java.util.Iterator; |
38 import java.util.Iterator; |
|
39 import java.util.Objects; |
39 import java.util.Set; |
40 import java.util.Set; |
40 import java.util.concurrent.ConcurrentHashMap; |
41 import java.util.concurrent.ConcurrentHashMap; |
|
42 import java.util.function.Consumer; |
41 |
43 |
42 |
44 |
43 /** |
45 /** |
44 * Base Selector implementation class. |
46 * Base Selector implementation class. |
45 */ |
47 */ |
49 { |
51 { |
50 // The set of keys registered with this Selector |
52 // The set of keys registered with this Selector |
51 private final Set<SelectionKey> keys; |
53 private final Set<SelectionKey> keys; |
52 |
54 |
53 // The set of keys with data ready for an operation |
55 // The set of keys with data ready for an operation |
54 protected final Set<SelectionKey> selectedKeys; |
56 private final Set<SelectionKey> selectedKeys; |
55 |
57 |
56 // Public views of the key sets |
58 // Public views of the key sets |
57 private final Set<SelectionKey> publicKeys; // Immutable |
59 private final Set<SelectionKey> publicKeys; // Immutable |
58 private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition |
60 private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition |
|
61 |
|
62 // used to check for reentrancy |
|
63 private boolean inSelect; |
59 |
64 |
60 protected SelectorImpl(SelectorProvider sp) { |
65 protected SelectorImpl(SelectorProvider sp) { |
61 super(sp); |
66 super(sp); |
62 keys = ConcurrentHashMap.newKeySet(); |
67 keys = ConcurrentHashMap.newKeySet(); |
63 selectedKeys = new HashSet<>(); |
68 selectedKeys = new HashSet<>(); |
81 ensureOpen(); |
86 ensureOpen(); |
82 return publicSelectedKeys; |
87 return publicSelectedKeys; |
83 } |
88 } |
84 |
89 |
85 /** |
90 /** |
86 * Returns the public view of the selected-key set |
|
87 */ |
|
88 protected final Set<SelectionKey> nioSelectedKeys() { |
|
89 return publicSelectedKeys; |
|
90 } |
|
91 |
|
92 /** |
|
93 * Marks the beginning of a select operation that might block |
91 * Marks the beginning of a select operation that might block |
94 */ |
92 */ |
95 protected final void begin(boolean blocking) { |
93 protected final void begin(boolean blocking) { |
96 if (blocking) begin(); |
94 if (blocking) begin(); |
97 } |
95 } |
104 } |
102 } |
105 |
103 |
106 /** |
104 /** |
107 * Selects the keys for channels that are ready for I/O operations. |
105 * Selects the keys for channels that are ready for I/O operations. |
108 * |
106 * |
|
107 * @param action the action to perform, can be null |
109 * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to |
108 * @param timeout timeout in milliseconds to wait, 0 to not wait, -1 to |
110 * wait indefinitely |
109 * wait indefinitely |
111 */ |
110 */ |
112 protected abstract int doSelect(long timeout) throws IOException; |
111 protected abstract int doSelect(Consumer<SelectionKey> action, long timeout) |
113 |
112 throws IOException; |
114 private int lockAndDoSelect(long timeout) throws IOException { |
113 |
|
114 private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout) |
|
115 throws IOException |
|
116 { |
115 synchronized (this) { |
117 synchronized (this) { |
116 ensureOpen(); |
118 ensureOpen(); |
117 synchronized (publicSelectedKeys) { |
119 if (inSelect) |
118 return doSelect(timeout); |
120 throw new IllegalStateException("select in progress"); |
|
121 inSelect = true; |
|
122 try { |
|
123 synchronized (publicSelectedKeys) { |
|
124 return doSelect(action, timeout); |
|
125 } |
|
126 } finally { |
|
127 inSelect = false; |
119 } |
128 } |
120 } |
129 } |
121 } |
130 } |
122 |
131 |
123 @Override |
132 @Override |
124 public final int select(long timeout) throws IOException { |
133 public final int select(long timeout) throws IOException { |
125 if (timeout < 0) |
134 if (timeout < 0) |
126 throw new IllegalArgumentException("Negative timeout"); |
135 throw new IllegalArgumentException("Negative timeout"); |
127 return lockAndDoSelect((timeout == 0) ? -1 : timeout); |
136 return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout); |
128 } |
137 } |
129 |
138 |
130 @Override |
139 @Override |
131 public final int select() throws IOException { |
140 public final int select() throws IOException { |
132 return lockAndDoSelect(-1); |
141 return lockAndDoSelect(null, -1); |
133 } |
142 } |
134 |
143 |
135 @Override |
144 @Override |
136 public final int selectNow() throws IOException { |
145 public final int selectNow() throws IOException { |
137 return lockAndDoSelect(0); |
146 return lockAndDoSelect(null, 0); |
|
147 } |
|
148 |
|
149 @Override |
|
150 public final int select(Consumer<SelectionKey> action, long timeout) |
|
151 throws IOException |
|
152 { |
|
153 Objects.requireNonNull(action); |
|
154 if (timeout < 0) |
|
155 throw new IllegalArgumentException("Negative timeout"); |
|
156 return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout); |
|
157 } |
|
158 |
|
159 @Override |
|
160 public final int select(Consumer<SelectionKey> action) throws IOException { |
|
161 Objects.requireNonNull(action); |
|
162 return lockAndDoSelect(action, -1); |
|
163 } |
|
164 |
|
165 @Override |
|
166 public final int selectNow(Consumer<SelectionKey> action) throws IOException { |
|
167 Objects.requireNonNull(action); |
|
168 return lockAndDoSelect(action, 0); |
138 } |
169 } |
139 |
170 |
140 /** |
171 /** |
141 * Invoked by implCloseSelector to close the selector. |
172 * Invoked by implCloseSelector to close the selector. |
142 */ |
173 */ |
238 } |
269 } |
239 } |
270 } |
240 } |
271 } |
241 |
272 |
242 /** |
273 /** |
|
274 * Invoked by selection operations to handle ready events. If an action |
|
275 * is specified then it is invoked to handle the key, otherwise the key |
|
276 * is added to the selected-key set (or updated when it is already in the |
|
277 * set). |
|
278 */ |
|
279 protected final int processReadyEvents(int rOps, |
|
280 SelectionKeyImpl ski, |
|
281 Consumer<SelectionKey> action) { |
|
282 if (action != null) { |
|
283 ski.translateAndSetReadyOps(rOps); |
|
284 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
|
285 action.accept(ski); |
|
286 ensureOpen(); |
|
287 return 1; |
|
288 } |
|
289 } else { |
|
290 assert Thread.holdsLock(publicSelectedKeys); |
|
291 if (selectedKeys.contains(ski)) { |
|
292 if (ski.translateAndUpdateReadyOps(rOps)) { |
|
293 return 1; |
|
294 } |
|
295 } else { |
|
296 ski.translateAndSetReadyOps(rOps); |
|
297 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { |
|
298 selectedKeys.add(ski); |
|
299 return 1; |
|
300 } |
|
301 } |
|
302 } |
|
303 return 0; |
|
304 } |
|
305 |
|
306 /** |
243 * Invoked by interestOps to ensure the interest ops are updated at the |
307 * Invoked by interestOps to ensure the interest ops are updated at the |
244 * next selection operation. |
308 * next selection operation. |
245 */ |
309 */ |
246 protected abstract void setEventOps(SelectionKeyImpl ski); |
310 protected abstract void setEventOps(SelectionKeyImpl ski); |
247 } |
311 } |