57 // used to ensure that the context for an asynchronous accept is visible |
57 // used to ensure that the context for an asynchronous accept is visible |
58 // the pooled thread that handles the I/O event |
58 // the pooled thread that handles the I/O event |
59 private final Object updateLock = new Object(); |
59 private final Object updateLock = new Object(); |
60 |
60 |
61 // pending accept |
61 // pending accept |
62 private PendingFuture<AsynchronousSocketChannel,Object> pendingAccept; |
62 private boolean acceptPending; |
|
63 private CompletionHandler<AsynchronousSocketChannel,Object> acceptHandler; |
|
64 private Object acceptAttachment; |
|
65 private PendingFuture<AsynchronousSocketChannel,Object> acceptFuture; |
63 |
66 |
64 // context for permission check when security manager set |
67 // context for permission check when security manager set |
65 private AccessControlContext acc; |
68 private AccessControlContext acceptAcc; |
66 |
69 |
67 |
70 |
68 UnixAsynchronousServerSocketChannelImpl(Port port) |
71 UnixAsynchronousServerSocketChannelImpl(Port port) |
69 throws IOException |
72 throws IOException |
70 { |
73 { |
81 |
84 |
82 // add mapping from file descriptor to this channel |
85 // add mapping from file descriptor to this channel |
83 port.register(fdVal, this); |
86 port.register(fdVal, this); |
84 } |
87 } |
85 |
88 |
86 // returns and clears the result of a pending accept |
|
87 private PendingFuture<AsynchronousSocketChannel,Object> grabPendingAccept() { |
|
88 synchronized (updateLock) { |
|
89 PendingFuture<AsynchronousSocketChannel,Object> result = pendingAccept; |
|
90 pendingAccept = null; |
|
91 return result; |
|
92 } |
|
93 } |
|
94 |
|
95 @Override |
89 @Override |
96 void implClose() throws IOException { |
90 void implClose() throws IOException { |
97 // remove the mapping |
91 // remove the mapping |
98 port.unregister(fdVal); |
92 port.unregister(fdVal); |
99 |
93 |
100 // close file descriptor |
94 // close file descriptor |
101 nd.close(fd); |
95 nd.close(fd); |
102 |
96 |
103 // if there is a pending accept then complete it |
97 // if there is a pending accept then complete it |
104 final PendingFuture<AsynchronousSocketChannel,Object> result = |
98 CompletionHandler<AsynchronousSocketChannel,Object> handler; |
105 grabPendingAccept(); |
99 Object att; |
106 if (result != null) { |
100 PendingFuture<AsynchronousSocketChannel,Object> future; |
107 // discard the stack trace as otherwise it may appear that implClose |
101 synchronized (updateLock) { |
108 // has thrown the exception. |
102 if (!acceptPending) |
109 AsynchronousCloseException x = new AsynchronousCloseException(); |
103 return; // no pending accept |
110 x.setStackTrace(new StackTraceElement[0]); |
104 acceptPending = false; |
111 result.setFailure(x); |
105 handler = acceptHandler; |
112 |
106 att = acceptAttachment; |
|
107 future = acceptFuture; |
|
108 } |
|
109 |
|
110 // discard the stack trace as otherwise it may appear that implClose |
|
111 // has thrown the exception. |
|
112 AsynchronousCloseException x = new AsynchronousCloseException(); |
|
113 x.setStackTrace(new StackTraceElement[0]); |
|
114 if (handler == null) { |
|
115 future.setFailure(x); |
|
116 } else { |
113 // invoke by submitting task rather than directly |
117 // invoke by submitting task rather than directly |
114 Invoker.invokeIndirectly(result.handler(), result); |
118 Invoker.invokeIndirectly(this, handler, att, null, x); |
115 } |
119 } |
116 } |
120 } |
117 |
121 |
118 @Override |
122 @Override |
119 public AsynchronousChannelGroupImpl group() { |
123 public AsynchronousChannelGroupImpl group() { |
122 |
126 |
123 /** |
127 /** |
124 * Invoked by event handling thread when listener socket is polled |
128 * Invoked by event handling thread when listener socket is polled |
125 */ |
129 */ |
126 @Override |
130 @Override |
127 public void onEvent(int events) { |
131 public void onEvent(int events, boolean mayInvokeDirect) { |
128 PendingFuture<AsynchronousSocketChannel,Object> result = grabPendingAccept(); |
132 synchronized (updateLock) { |
129 if (result == null) |
133 if (!acceptPending) |
130 return; // may have been grabbed by asynchronous close |
134 return; // may have been grabbed by asynchronous close |
|
135 acceptPending = false; |
|
136 } |
131 |
137 |
132 // attempt to accept connection |
138 // attempt to accept connection |
133 FileDescriptor newfd = new FileDescriptor(); |
139 FileDescriptor newfd = new FileDescriptor(); |
134 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
140 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
135 boolean accepted = false; |
141 Throwable exc = null; |
136 try { |
142 try { |
137 begin(); |
143 begin(); |
138 int n = accept0(this.fd, newfd, isaa); |
144 int n = accept0(this.fd, newfd, isaa); |
139 |
145 |
140 // spurious wakeup, is this possible? |
146 // spurious wakeup, is this possible? |
141 if (n == IOStatus.UNAVAILABLE) { |
147 if (n == IOStatus.UNAVAILABLE) { |
142 synchronized (updateLock) { |
148 synchronized (updateLock) { |
143 this.pendingAccept = result; |
149 acceptPending = true; |
144 } |
150 } |
145 port.startPoll(fdVal, Port.POLLIN); |
151 port.startPoll(fdVal, Port.POLLIN); |
146 return; |
152 return; |
147 } |
153 } |
148 |
154 |
149 // connection accepted |
|
150 accepted = true; |
|
151 |
|
152 } catch (Throwable x) { |
155 } catch (Throwable x) { |
153 if (x instanceof ClosedChannelException) |
156 if (x instanceof ClosedChannelException) |
154 x = new AsynchronousCloseException(); |
157 x = new AsynchronousCloseException(); |
155 enableAccept(); |
158 exc = x; |
156 result.setFailure(x); |
|
157 } finally { |
159 } finally { |
158 end(); |
160 end(); |
159 } |
161 } |
160 |
162 |
161 // Connection accepted so finish it when not holding locks. |
163 // Connection accepted so finish it when not holding locks. |
162 AsynchronousSocketChannel child = null; |
164 AsynchronousSocketChannel child = null; |
163 if (accepted) { |
165 if (exc == null) { |
164 try { |
166 try { |
165 child = finishAccept(newfd, isaa[0], acc); |
167 child = finishAccept(newfd, isaa[0], acceptAcc); |
166 enableAccept(); |
|
167 result.setResult(child); |
|
168 } catch (Throwable x) { |
168 } catch (Throwable x) { |
169 enableAccept(); |
|
170 if (!(x instanceof IOException) && !(x instanceof SecurityException)) |
169 if (!(x instanceof IOException) && !(x instanceof SecurityException)) |
171 x = new IOException(x); |
170 x = new IOException(x); |
172 result.setFailure(x); |
171 exc = x; |
173 } |
172 } |
174 } |
173 } |
175 |
174 |
176 // if an async cancel has already cancelled the operation then |
175 // copy field befores accept is re-renabled |
177 // close the new channel so as to free resources |
176 CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler; |
178 if (child != null && result.isCancelled()) { |
177 Object att = acceptAttachment; |
179 try { |
178 PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture; |
180 child.close(); |
179 |
181 } catch (IOException ignore) { } |
180 // re-enable accepting and invoke handler |
182 } |
181 enableAccept(); |
183 |
182 |
184 // invoke the handler |
183 if (handler == null) { |
185 Invoker.invoke(result.handler(), result); |
184 future.setResult(child, exc); |
|
185 // if an async cancel has already cancelled the operation then |
|
186 // close the new channel so as to free resources |
|
187 if (child != null && future.isCancelled()) { |
|
188 try { |
|
189 child.close(); |
|
190 } catch (IOException ignore) { } |
|
191 } |
|
192 } else { |
|
193 Invoker.invoke(this, handler, att, child, exc); |
|
194 } |
186 } |
195 } |
187 |
196 |
188 /** |
197 /** |
189 * Completes the accept by creating the AsynchronousSocketChannel for |
198 * Completes the accept by creating the AsynchronousSocketChannel for |
190 * the given file descriptor and remote address. If this method completes |
199 * the given file descriptor and remote address. If this method completes |
232 } |
241 } |
233 return ch; |
242 return ch; |
234 } |
243 } |
235 |
244 |
236 @Override |
245 @Override |
237 @SuppressWarnings("unchecked") |
246 Future<AsynchronousSocketChannel> implAccept(Object att, |
238 public <A> Future<AsynchronousSocketChannel> accept(A attachment, |
247 CompletionHandler<AsynchronousSocketChannel,Object> handler) |
239 final CompletionHandler<AsynchronousSocketChannel,? super A> handler) |
|
240 { |
248 { |
241 // complete immediately if channel is closed |
249 // complete immediately if channel is closed |
242 if (!isOpen()) { |
250 if (!isOpen()) { |
243 CompletedFuture<AsynchronousSocketChannel,A> result = CompletedFuture |
251 Throwable e = new ClosedChannelException(); |
244 .withFailure(this, new ClosedChannelException(), attachment); |
252 if (handler == null) { |
245 Invoker.invokeIndirectly(handler, result); |
253 return CompletedFuture.withFailure(e); |
246 return result; |
254 } else { |
|
255 Invoker.invoke(this, handler, att, null, e); |
|
256 return null; |
|
257 } |
247 } |
258 } |
248 if (localAddress == null) |
259 if (localAddress == null) |
249 throw new NotYetBoundException(); |
260 throw new NotYetBoundException(); |
250 |
261 |
251 // cancel was invoked with pending accept so connection may have been |
262 // cancel was invoked with pending accept so connection may have been |
256 // check and set flag to prevent concurrent accepting |
267 // check and set flag to prevent concurrent accepting |
257 if (!accepting.compareAndSet(false, true)) |
268 if (!accepting.compareAndSet(false, true)) |
258 throw new AcceptPendingException(); |
269 throw new AcceptPendingException(); |
259 |
270 |
260 // attempt accept |
271 // attempt accept |
261 AbstractFuture<AsynchronousSocketChannel,A> result = null; |
|
262 FileDescriptor newfd = new FileDescriptor(); |
272 FileDescriptor newfd = new FileDescriptor(); |
263 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
273 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
|
274 Throwable exc = null; |
264 try { |
275 try { |
265 begin(); |
276 begin(); |
266 |
277 |
267 int n = accept0(this.fd, newfd, isaa); |
278 int n = accept0(this.fd, newfd, isaa); |
268 if (n == IOStatus.UNAVAILABLE) { |
279 if (n == IOStatus.UNAVAILABLE) { |
269 // no connection to accept |
|
270 result = new PendingFuture<AsynchronousSocketChannel,A>(this, handler, attachment); |
|
271 |
280 |
272 // need calling context when there is security manager as |
281 // need calling context when there is security manager as |
273 // permission check may be done in a different thread without |
282 // permission check may be done in a different thread without |
274 // any application call frames on the stack |
283 // any application call frames on the stack |
275 synchronized (this) { |
284 PendingFuture<AsynchronousSocketChannel,Object> result = null; |
276 this.acc = (System.getSecurityManager() == null) ? |
285 synchronized (updateLock) { |
|
286 if (handler == null) { |
|
287 this.acceptHandler = null; |
|
288 result = new PendingFuture<AsynchronousSocketChannel,Object>(this); |
|
289 this.acceptFuture = result; |
|
290 } else { |
|
291 this.acceptHandler = handler; |
|
292 this.acceptAttachment = att; |
|
293 } |
|
294 this.acceptAcc = (System.getSecurityManager() == null) ? |
277 null : AccessController.getContext(); |
295 null : AccessController.getContext(); |
278 this.pendingAccept = |
296 this.acceptPending = true; |
279 (PendingFuture<AsynchronousSocketChannel,Object>)result; |
|
280 } |
297 } |
281 |
298 |
282 // register for connections |
299 // register for connections |
283 port.startPoll(fdVal, Port.POLLIN); |
300 port.startPoll(fdVal, Port.POLLIN); |
284 return result; |
301 return result; |
285 } |
302 } |
286 } catch (Throwable x) { |
303 } catch (Throwable x) { |
287 // accept failed |
304 // accept failed |
288 if (x instanceof ClosedChannelException) |
305 if (x instanceof ClosedChannelException) |
289 x = new AsynchronousCloseException(); |
306 x = new AsynchronousCloseException(); |
290 result = CompletedFuture.withFailure(this, x, attachment); |
307 exc = x; |
291 } finally { |
308 } finally { |
292 end(); |
309 end(); |
293 } |
310 } |
294 |
311 |
295 // connection accepted immediately |
312 AsynchronousSocketChannel child = null; |
296 if (result == null) { |
313 if (exc == null) { |
|
314 // connection accepted immediately |
297 try { |
315 try { |
298 AsynchronousSocketChannel ch = finishAccept(newfd, isaa[0], null); |
316 child = finishAccept(newfd, isaa[0], null); |
299 result = CompletedFuture.withResult(this, ch, attachment); |
|
300 } catch (Throwable x) { |
317 } catch (Throwable x) { |
301 result = CompletedFuture.withFailure(this, x, attachment); |
318 exc = x; |
302 } |
319 } |
303 } |
320 } |
304 |
321 |
305 // re-enable accepting and invoke handler |
322 // re-enable accepting before invoking handler |
306 enableAccept(); |
323 enableAccept(); |
307 Invoker.invokeIndirectly(handler, result); |
324 |
308 return result; |
325 if (handler == null) { |
|
326 return CompletedFuture.withResult(child, exc); |
|
327 } else { |
|
328 Invoker.invokeIndirectly(this, handler, att, child, exc); |
|
329 return null; |
|
330 } |
309 } |
331 } |
310 |
332 |
311 // -- Native methods -- |
333 // -- Native methods -- |
312 |
334 |
313 private static native void initIDs(); |
335 private static native void initIDs(); |