60 |
61 |
61 class SocketChannelImpl |
62 class SocketChannelImpl |
62 extends SocketChannel |
63 extends SocketChannel |
63 implements SelChImpl |
64 implements SelChImpl |
64 { |
65 { |
65 |
|
66 // Used to make native read and write calls |
66 // Used to make native read and write calls |
67 private static NativeDispatcher nd; |
67 private static NativeDispatcher nd; |
68 |
68 |
69 // Our file descriptor object |
69 // Our file descriptor object |
70 private final FileDescriptor fd; |
70 private final FileDescriptor fd; |
71 private final int fdVal; |
71 private final int fdVal; |
72 |
72 |
73 // IDs of native threads doing reads and writes, for signalling |
|
74 private volatile long readerThread; |
|
75 private volatile long writerThread; |
|
76 |
|
77 // Lock held by current reading or connecting thread |
73 // Lock held by current reading or connecting thread |
78 private final ReentrantLock readLock = new ReentrantLock(); |
74 private final ReentrantLock readLock = new ReentrantLock(); |
79 |
75 |
80 // Lock held by current writing or connecting thread |
76 // Lock held by current writing or connecting thread |
81 private final ReentrantLock writeLock = new ReentrantLock(); |
77 private final ReentrantLock writeLock = new ReentrantLock(); |
82 |
78 |
83 // Lock held by any thread that modifies the state fields declared below |
79 // Lock held by any thread that modifies the state fields declared below |
84 // DO NOT invoke a blocking I/O operation while holding this lock! |
80 // DO NOT invoke a blocking I/O operation while holding this lock! |
85 private final Object stateLock = new Object(); |
81 private final Object stateLock = new Object(); |
86 |
82 |
|
83 // Input/Output closed |
|
84 private volatile boolean isInputClosed; |
|
85 private volatile boolean isOutputClosed; |
|
86 |
87 // -- The following fields are protected by stateLock |
87 // -- The following fields are protected by stateLock |
88 |
88 |
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated |
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated |
90 private boolean isReuseAddress; |
90 private boolean isReuseAddress; |
91 |
91 |
92 // State, increases monotonically |
92 // State, increases monotonically |
93 private static final int ST_UNINITIALIZED = -1; |
|
94 private static final int ST_UNCONNECTED = 0; |
93 private static final int ST_UNCONNECTED = 0; |
95 private static final int ST_PENDING = 1; |
94 private static final int ST_CONNECTIONPENDING = 1; |
96 private static final int ST_CONNECTED = 2; |
95 private static final int ST_CONNECTED = 2; |
97 private static final int ST_KILLPENDING = 3; |
96 private static final int ST_CLOSING = 3; |
98 private static final int ST_KILLED = 4; |
97 private static final int ST_KILLPENDING = 4; |
99 private int state = ST_UNINITIALIZED; |
98 private static final int ST_KILLED = 5; |
|
99 private int state; |
|
100 |
|
101 // IDs of native threads doing reads and writes, for signalling |
|
102 private long readerThread; |
|
103 private long writerThread; |
100 |
104 |
101 // Binding |
105 // Binding |
102 private InetSocketAddress localAddress; |
106 private InetSocketAddress localAddress; |
103 private InetSocketAddress remoteAddress; |
107 private InetSocketAddress remoteAddress; |
104 |
|
105 // Input/Output open |
|
106 private boolean isInputOpen = true; |
|
107 private boolean isOutputOpen = true; |
|
108 |
108 |
109 // Socket adaptor, created on demand |
109 // Socket adaptor, created on demand |
110 private Socket socket; |
110 private Socket socket; |
111 |
111 |
112 // -- End of fields protected by stateLock |
112 // -- End of fields protected by stateLock |
254 set.add(StandardSocketOptions.SO_LINGER); |
253 set.add(StandardSocketOptions.SO_LINGER); |
255 set.add(StandardSocketOptions.TCP_NODELAY); |
254 set.add(StandardSocketOptions.TCP_NODELAY); |
256 // additional options required by socket adaptor |
255 // additional options required by socket adaptor |
257 set.add(StandardSocketOptions.IP_TOS); |
256 set.add(StandardSocketOptions.IP_TOS); |
258 set.add(ExtendedSocketOption.SO_OOBINLINE); |
257 set.add(ExtendedSocketOption.SO_OOBINLINE); |
259 ExtendedSocketOptions extendedOptions = |
258 set.addAll(ExtendedSocketOptions.getInstance().options()); |
260 ExtendedSocketOptions.getInstance(); |
|
261 set.addAll(extendedOptions.options()); |
|
262 return Collections.unmodifiableSet(set); |
259 return Collections.unmodifiableSet(set); |
263 } |
260 } |
264 } |
261 } |
265 |
262 |
266 @Override |
263 @Override |
267 public final Set<SocketOption<?>> supportedOptions() { |
264 public final Set<SocketOption<?>> supportedOptions() { |
268 return DefaultOptionsHolder.defaultOptions; |
265 return DefaultOptionsHolder.defaultOptions; |
269 } |
266 } |
270 |
267 |
271 private boolean ensureReadOpen() throws ClosedChannelException { |
268 /** |
272 synchronized (stateLock) { |
269 * Marks the beginning of a read operation that might block. |
273 if (!isOpen()) |
270 * |
274 throw new ClosedChannelException(); |
271 * @throws ClosedChannelException if the channel is closed |
275 if (!isConnected()) |
272 * @throws NotYetConnectedException if the channel is not yet connected |
|
273 */ |
|
274 private void beginRead(boolean blocking) throws ClosedChannelException { |
|
275 if (blocking) { |
|
276 // set hook for Thread.interrupt |
|
277 begin(); |
|
278 } |
|
279 synchronized (stateLock) { |
|
280 ensureOpen(); |
|
281 if (state != ST_CONNECTED) |
276 throw new NotYetConnectedException(); |
282 throw new NotYetConnectedException(); |
277 if (!isInputOpen) |
283 if (blocking) |
278 return false; |
284 readerThread = NativeThread.current(); |
279 else |
285 } |
280 return true; |
286 } |
281 } |
287 |
282 } |
288 /** |
283 |
289 * Marks the end of a read operation that may have blocked. |
284 private void ensureWriteOpen() throws ClosedChannelException { |
290 * |
285 synchronized (stateLock) { |
291 * @throws AsynchronousCloseException if the channel was closed due to this |
286 if (!isOpen()) |
292 * thread being interrupted on a blocking read operation. |
287 throw new ClosedChannelException(); |
293 */ |
288 if (!isOutputOpen) |
294 private void endRead(boolean blocking, boolean completed) |
289 throw new ClosedChannelException(); |
295 throws AsynchronousCloseException |
290 if (!isConnected()) |
296 { |
291 throw new NotYetConnectedException(); |
297 if (blocking) { |
292 } |
298 synchronized (stateLock) { |
293 } |
299 readerThread = 0; |
294 |
300 // notify any thread waiting in implCloseSelectableChannel |
295 private void readerCleanup() throws IOException { |
301 if (state == ST_CLOSING) { |
296 synchronized (stateLock) { |
302 stateLock.notifyAll(); |
297 readerThread = 0; |
303 } |
298 if (state == ST_KILLPENDING) |
304 } |
299 kill(); |
305 // remove hook for Thread.interrupt |
300 } |
306 end(completed); |
301 } |
307 } |
302 |
308 } |
303 private void writerCleanup() throws IOException { |
309 |
304 synchronized (stateLock) { |
310 @Override |
305 writerThread = 0; |
|
306 if (state == ST_KILLPENDING) |
|
307 kill(); |
|
308 } |
|
309 } |
|
310 |
|
311 public int read(ByteBuffer buf) throws IOException { |
311 public int read(ByteBuffer buf) throws IOException { |
312 |
312 Objects.requireNonNull(buf); |
313 if (buf == null) |
|
314 throw new NullPointerException(); |
|
315 |
313 |
316 readLock.lock(); |
314 readLock.lock(); |
317 try { |
315 try { |
318 if (!ensureReadOpen()) |
316 boolean blocking = isBlocking(); |
319 return -1; |
|
320 int n = 0; |
317 int n = 0; |
321 try { |
318 try { |
322 |
319 beginRead(blocking); |
323 // Set up the interruption machinery; see |
320 |
324 // AbstractInterruptibleChannel for details |
321 // check if input is shutdown |
325 // |
322 if (isInputClosed) |
326 begin(); |
323 return IOStatus.EOF; |
327 |
324 |
328 synchronized (stateLock) { |
325 if (blocking) { |
329 if (!isOpen()) { |
326 do { |
330 // Either the current thread is already interrupted, so |
327 n = IOUtil.read(fd, buf, -1, nd); |
331 // begin() closed the channel, or another thread closed the |
328 } while (n == IOStatus.INTERRUPTED && isOpen()); |
332 // channel since we checked it a few bytecodes ago. In |
329 } else { |
333 // either case the value returned here is irrelevant since |
|
334 // the invocation of end() in the finally block will throw |
|
335 // an appropriate exception. |
|
336 // |
|
337 return 0; |
|
338 |
|
339 } |
|
340 |
|
341 // Save this thread so that it can be signalled on those |
|
342 // platforms that require it |
|
343 // |
|
344 readerThread = NativeThread.current(); |
|
345 } |
|
346 |
|
347 // Between the previous test of isOpen() and the return of the |
|
348 // IOUtil.read invocation below, this channel might be closed |
|
349 // or this thread might be interrupted. We rely upon the |
|
350 // implicit synchronization point in the kernel read() call to |
|
351 // make sure that the right thing happens. In either case the |
|
352 // implCloseSelectableChannel method is ultimately invoked in |
|
353 // some other thread, so there are three possibilities: |
|
354 // |
|
355 // - implCloseSelectableChannel() invokes nd.preClose() |
|
356 // before this thread invokes read(), in which case the |
|
357 // read returns immediately with either EOF or an error, |
|
358 // the latter of which will cause an IOException to be |
|
359 // thrown. |
|
360 // |
|
361 // - implCloseSelectableChannel() invokes nd.preClose() after |
|
362 // this thread is blocked in read(). On some operating |
|
363 // systems (e.g., Solaris and Windows) this causes the read |
|
364 // to return immediately with either EOF or an error |
|
365 // indication. |
|
366 // |
|
367 // - implCloseSelectableChannel() invokes nd.preClose() after |
|
368 // this thread is blocked in read() but the operating |
|
369 // system (e.g., Linux) doesn't support preemptive close, |
|
370 // so implCloseSelectableChannel() proceeds to signal this |
|
371 // thread, thereby causing the read to return immediately |
|
372 // with IOStatus.INTERRUPTED. |
|
373 // |
|
374 // In all three cases the invocation of end() in the finally |
|
375 // clause will notice that the channel has been closed and |
|
376 // throw an appropriate exception (AsynchronousCloseException |
|
377 // or ClosedByInterruptException) if necessary. |
|
378 // |
|
379 // *There is A fourth possibility. implCloseSelectableChannel() |
|
380 // invokes nd.preClose(), signals reader/writer thred and quickly |
|
381 // moves on to nd.close() in kill(), which does a real close. |
|
382 // Then a third thread accepts a new connection, opens file or |
|
383 // whatever that causes the released "fd" to be recycled. All |
|
384 // above happens just between our last isOpen() check and the |
|
385 // next kernel read reached, with the recycled "fd". The solution |
|
386 // is to postpone the real kill() if there is a reader or/and |
|
387 // writer thread(s) over there "waiting", leave the cleanup/kill |
|
388 // to the reader or writer thread. (the preClose() still happens |
|
389 // so the connection gets cut off as usual). |
|
390 // |
|
391 // For socket channels there is the additional wrinkle that |
|
392 // asynchronous shutdown works much like asynchronous close, |
|
393 // except that the channel is shutdown rather than completely |
|
394 // closed. This is analogous to the first two cases above, |
|
395 // except that the shutdown operation plays the role of |
|
396 // nd.preClose(). |
|
397 for (;;) { |
|
398 n = IOUtil.read(fd, buf, -1, nd); |
330 n = IOUtil.read(fd, buf, -1, nd); |
399 if ((n == IOStatus.INTERRUPTED) && isOpen()) { |
331 } |
400 // The system call was interrupted but the channel |
|
401 // is still open, so retry |
|
402 continue; |
|
403 } |
|
404 return IOStatus.normalize(n); |
|
405 } |
|
406 |
|
407 } finally { |
332 } finally { |
408 readerCleanup(); // Clear reader thread |
333 endRead(blocking, n > 0); |
409 // The end method, which is defined in our superclass |
334 if (n <= 0 && isInputClosed) |
410 // AbstractInterruptibleChannel, resets the interruption |
335 return IOStatus.EOF; |
411 // machinery. If its argument is true then it returns |
336 } |
412 // normally; otherwise it checks the interrupt and open state |
337 return IOStatus.normalize(n); |
413 // of this channel and throws an appropriate exception if |
|
414 // necessary. |
|
415 // |
|
416 // So, if we actually managed to do any I/O in the above try |
|
417 // block then we pass true to the end method. We also pass |
|
418 // true if the channel was in non-blocking mode when the I/O |
|
419 // operation was initiated but no data could be transferred; |
|
420 // this prevents spurious exceptions from being thrown in the |
|
421 // rare event that a channel is closed or a thread is |
|
422 // interrupted at the exact moment that a non-blocking I/O |
|
423 // request is made. |
|
424 // |
|
425 end(n > 0 || (n == IOStatus.UNAVAILABLE)); |
|
426 |
|
427 // Extra case for socket channels: Asynchronous shutdown |
|
428 // |
|
429 synchronized (stateLock) { |
|
430 if ((n <= 0) && (!isInputOpen)) |
|
431 return IOStatus.EOF; |
|
432 } |
|
433 |
|
434 assert IOStatus.check(n); |
|
435 |
|
436 } |
|
437 } finally { |
338 } finally { |
438 readLock.unlock(); |
339 readLock.unlock(); |
439 } |
340 } |
440 } |
341 } |
441 |
342 |
|
343 @Override |
442 public long read(ByteBuffer[] dsts, int offset, int length) |
344 public long read(ByteBuffer[] dsts, int offset, int length) |
443 throws IOException |
345 throws IOException |
444 { |
346 { |
445 if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) |
347 Objects.checkFromIndexSize(offset, length, dsts.length); |
446 throw new IndexOutOfBoundsException(); |
348 |
447 readLock.lock(); |
349 readLock.lock(); |
448 try { |
350 try { |
449 if (!ensureReadOpen()) |
351 boolean blocking = isBlocking(); |
450 return -1; |
|
451 long n = 0; |
352 long n = 0; |
452 try { |
353 try { |
453 begin(); |
354 beginRead(blocking); |
454 synchronized (stateLock) { |
355 |
455 if (!isOpen()) |
356 // check if input is shutdown |
456 return 0; |
357 if (isInputClosed) |
457 readerThread = NativeThread.current(); |
358 return IOStatus.EOF; |
458 } |
359 |
459 |
360 if (blocking) { |
460 for (;;) { |
361 do { |
|
362 n = IOUtil.read(fd, dsts, offset, length, nd); |
|
363 } while (n == IOStatus.INTERRUPTED && isOpen()); |
|
364 } else { |
461 n = IOUtil.read(fd, dsts, offset, length, nd); |
365 n = IOUtil.read(fd, dsts, offset, length, nd); |
462 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
463 continue; |
|
464 return IOStatus.normalize(n); |
|
465 } |
366 } |
466 } finally { |
367 } finally { |
467 readerCleanup(); |
368 endRead(blocking, n > 0); |
468 end(n > 0 || (n == IOStatus.UNAVAILABLE)); |
369 if (n <= 0 && isInputClosed) |
469 synchronized (stateLock) { |
370 return IOStatus.EOF; |
470 if ((n <= 0) && (!isInputOpen)) |
371 } |
471 return IOStatus.EOF; |
372 return IOStatus.normalize(n); |
472 } |
|
473 assert IOStatus.check(n); |
|
474 } |
|
475 } finally { |
373 } finally { |
476 readLock.unlock(); |
374 readLock.unlock(); |
477 } |
375 } |
478 } |
376 } |
479 |
377 |
|
378 /** |
|
379 * Marks the beginning of a write operation that might block. |
|
380 * |
|
381 * @throws ClosedChannelException if the channel is closed or output shutdown |
|
382 * @throws NotYetConnectedException if the channel is not yet connected |
|
383 */ |
|
384 private void beginWrite(boolean blocking) throws ClosedChannelException { |
|
385 if (blocking) { |
|
386 // set hook for Thread.interrupt |
|
387 begin(); |
|
388 } |
|
389 synchronized (stateLock) { |
|
390 ensureOpen(); |
|
391 if (isOutputClosed) |
|
392 throw new ClosedChannelException(); |
|
393 if (state != ST_CONNECTED) |
|
394 throw new NotYetConnectedException(); |
|
395 if (blocking) |
|
396 writerThread = NativeThread.current(); |
|
397 } |
|
398 } |
|
399 |
|
400 /** |
|
401 * Marks the end of a write operation that may have blocked. |
|
402 * |
|
403 * @throws AsynchronousCloseException if the channel was closed due to this |
|
404 * thread being interrupted on a blocking write operation. |
|
405 */ |
|
406 private void endWrite(boolean blocking, boolean completed) |
|
407 throws AsynchronousCloseException |
|
408 { |
|
409 if (blocking) { |
|
410 synchronized (stateLock) { |
|
411 writerThread = 0; |
|
412 // notify any thread waiting in implCloseSelectableChannel |
|
413 if (state == ST_CLOSING) { |
|
414 stateLock.notifyAll(); |
|
415 } |
|
416 } |
|
417 // remove hook for Thread.interrupt |
|
418 end(completed); |
|
419 } |
|
420 } |
|
421 |
|
422 @Override |
480 public int write(ByteBuffer buf) throws IOException { |
423 public int write(ByteBuffer buf) throws IOException { |
481 if (buf == null) |
424 Objects.requireNonNull(buf); |
482 throw new NullPointerException(); |
425 |
483 writeLock.lock(); |
426 writeLock.lock(); |
484 try { |
427 try { |
485 ensureWriteOpen(); |
428 boolean blocking = isBlocking(); |
486 int n = 0; |
429 int n = 0; |
487 try { |
430 try { |
488 begin(); |
431 beginWrite(blocking); |
489 synchronized (stateLock) { |
432 if (blocking) { |
490 if (!isOpen()) |
433 do { |
491 return 0; |
434 n = IOUtil.write(fd, buf, -1, nd); |
492 writerThread = NativeThread.current(); |
435 } while (n == IOStatus.INTERRUPTED && isOpen()); |
493 } |
436 } else { |
494 for (;;) { |
|
495 n = IOUtil.write(fd, buf, -1, nd); |
437 n = IOUtil.write(fd, buf, -1, nd); |
496 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
497 continue; |
|
498 return IOStatus.normalize(n); |
|
499 } |
438 } |
500 } finally { |
439 } finally { |
501 writerCleanup(); |
440 endWrite(blocking, n > 0); |
502 end(n > 0 || (n == IOStatus.UNAVAILABLE)); |
441 if (n <= 0 && isOutputClosed) |
503 synchronized (stateLock) { |
442 throw new AsynchronousCloseException(); |
504 if ((n <= 0) && (!isOutputOpen)) |
443 } |
505 throw new AsynchronousCloseException(); |
444 return IOStatus.normalize(n); |
506 } |
|
507 assert IOStatus.check(n); |
|
508 } |
|
509 } finally { |
445 } finally { |
510 writeLock.unlock(); |
446 writeLock.unlock(); |
511 } |
447 } |
512 } |
448 } |
513 |
449 |
|
450 @Override |
514 public long write(ByteBuffer[] srcs, int offset, int length) |
451 public long write(ByteBuffer[] srcs, int offset, int length) |
515 throws IOException |
452 throws IOException |
516 { |
453 { |
517 if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) |
454 Objects.checkFromIndexSize(offset, length, srcs.length); |
518 throw new IndexOutOfBoundsException(); |
455 |
519 writeLock.lock(); |
456 writeLock.lock(); |
520 try { |
457 try { |
521 ensureWriteOpen(); |
458 boolean blocking = isBlocking(); |
522 long n = 0; |
459 long n = 0; |
523 try { |
460 try { |
524 begin(); |
461 beginWrite(blocking); |
525 synchronized (stateLock) { |
462 if (blocking) { |
526 if (!isOpen()) |
463 do { |
527 return 0; |
464 n = IOUtil.write(fd, srcs, offset, length, nd); |
528 writerThread = NativeThread.current(); |
465 } while (n == IOStatus.INTERRUPTED && isOpen()); |
529 } |
466 } else { |
530 for (;;) { |
|
531 n = IOUtil.write(fd, srcs, offset, length, nd); |
467 n = IOUtil.write(fd, srcs, offset, length, nd); |
532 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
533 continue; |
|
534 return IOStatus.normalize(n); |
|
535 } |
468 } |
536 } finally { |
469 } finally { |
537 writerCleanup(); |
470 endWrite(blocking, n > 0); |
538 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
471 if (n <= 0 && isOutputClosed) |
539 synchronized (stateLock) { |
472 throw new AsynchronousCloseException(); |
540 if ((n <= 0) && (!isOutputOpen)) |
473 } |
541 throw new AsynchronousCloseException(); |
474 return IOStatus.normalize(n); |
542 } |
|
543 assert IOStatus.check(n); |
|
544 } |
|
545 } finally { |
475 } finally { |
546 writeLock.unlock(); |
476 writeLock.unlock(); |
547 } |
477 } |
548 } |
478 } |
549 |
479 |
550 // package-private |
480 /** |
|
481 * Writes a byte of out of band data. |
|
482 */ |
551 int sendOutOfBandData(byte b) throws IOException { |
483 int sendOutOfBandData(byte b) throws IOException { |
552 writeLock.lock(); |
484 writeLock.lock(); |
553 try { |
485 try { |
554 ensureWriteOpen(); |
486 boolean blocking = isBlocking(); |
555 int n = 0; |
487 int n = 0; |
556 try { |
488 try { |
557 begin(); |
489 beginWrite(blocking); |
558 synchronized (stateLock) { |
490 if (blocking) { |
559 if (!isOpen()) |
491 do { |
560 return 0; |
492 n = sendOutOfBandData(fd, b); |
561 writerThread = NativeThread.current(); |
493 } while (n == IOStatus.INTERRUPTED && isOpen()); |
562 } |
494 } else { |
563 for (;;) { |
|
564 n = sendOutOfBandData(fd, b); |
495 n = sendOutOfBandData(fd, b); |
565 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
566 continue; |
|
567 return IOStatus.normalize(n); |
|
568 } |
496 } |
569 } finally { |
497 } finally { |
570 writerCleanup(); |
498 endWrite(blocking, n > 0); |
571 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
499 if (n <= 0 && isOutputClosed) |
572 synchronized (stateLock) { |
500 throw new AsynchronousCloseException(); |
573 if ((n <= 0) && (!isOutputOpen)) |
501 } |
574 throw new AsynchronousCloseException(); |
502 return IOStatus.normalize(n); |
575 } |
|
576 assert IOStatus.check(n); |
|
577 } |
|
578 } finally { |
503 } finally { |
579 writeLock.unlock(); |
504 writeLock.unlock(); |
580 } |
505 } |
581 } |
506 } |
582 |
507 |
|
508 @Override |
583 protected void implConfigureBlocking(boolean block) throws IOException { |
509 protected void implConfigureBlocking(boolean block) throws IOException { |
584 IOUtil.configureBlocking(fd, block); |
510 readLock.lock(); |
585 } |
511 try { |
586 |
512 writeLock.lock(); |
587 public InetSocketAddress localAddress() { |
513 try { |
|
514 synchronized (stateLock) { |
|
515 ensureOpen(); |
|
516 IOUtil.configureBlocking(fd, block); |
|
517 } |
|
518 } finally { |
|
519 writeLock.unlock(); |
|
520 } |
|
521 } finally { |
|
522 readLock.unlock(); |
|
523 } |
|
524 } |
|
525 |
|
526 /** |
|
527 * Returns the local address, or null if not bound |
|
528 */ |
|
529 InetSocketAddress localAddress() { |
588 synchronized (stateLock) { |
530 synchronized (stateLock) { |
589 return localAddress; |
531 return localAddress; |
590 } |
532 } |
591 } |
533 } |
592 |
534 |
593 public SocketAddress remoteAddress() { |
535 /** |
|
536 * Returns the remote address, or null if not connected |
|
537 */ |
|
538 InetSocketAddress remoteAddress() { |
594 synchronized (stateLock) { |
539 synchronized (stateLock) { |
595 return remoteAddress; |
540 return remoteAddress; |
596 } |
541 } |
597 } |
542 } |
598 |
543 |
626 readLock.unlock(); |
570 readLock.unlock(); |
627 } |
571 } |
628 return this; |
572 return this; |
629 } |
573 } |
630 |
574 |
|
575 @Override |
631 public boolean isConnected() { |
576 public boolean isConnected() { |
632 synchronized (stateLock) { |
577 synchronized (stateLock) { |
633 return (state == ST_CONNECTED); |
578 return (state == ST_CONNECTED); |
634 } |
579 } |
635 } |
580 } |
636 |
581 |
|
582 @Override |
637 public boolean isConnectionPending() { |
583 public boolean isConnectionPending() { |
638 synchronized (stateLock) { |
584 synchronized (stateLock) { |
639 return (state == ST_PENDING); |
585 return (state == ST_CONNECTIONPENDING); |
640 } |
586 } |
641 } |
587 } |
642 |
588 |
643 void ensureOpenAndUnconnected() throws IOException { // package-private |
589 /** |
644 synchronized (stateLock) { |
590 * Marks the beginning of a connect operation that might block. |
645 if (!isOpen()) |
591 * |
646 throw new ClosedChannelException(); |
592 * @throws ClosedChannelException if the channel is closed |
|
593 * @throws AlreadyConnectedException if already connected |
|
594 * @throws ConnectionPendingException is a connection is pending |
|
595 */ |
|
596 private void beginConnect(boolean blocking) throws ClosedChannelException { |
|
597 if (blocking) { |
|
598 // set hook for Thread.interrupt |
|
599 begin(); |
|
600 } |
|
601 synchronized (stateLock) { |
|
602 ensureOpen(); |
647 if (state == ST_CONNECTED) |
603 if (state == ST_CONNECTED) |
648 throw new AlreadyConnectedException(); |
604 throw new AlreadyConnectedException(); |
649 if (state == ST_PENDING) |
605 if (state == ST_CONNECTIONPENDING) |
650 throw new ConnectionPendingException(); |
606 throw new ConnectionPendingException(); |
651 } |
607 if (blocking) |
652 } |
608 readerThread = NativeThread.current(); |
653 |
609 } |
|
610 } |
|
611 |
|
612 /** |
|
613 * Marks the end of a connect operation that may have blocked. |
|
614 * |
|
615 * @throws AsynchronousCloseException if the channel was closed due to this |
|
616 * thread being interrupted on a blocking connect operation. |
|
617 */ |
|
618 private void endConnect(boolean blocking, boolean completed) |
|
619 throws AsynchronousCloseException |
|
620 { |
|
621 endRead(blocking, completed); |
|
622 } |
|
623 |
|
624 @Override |
654 public boolean connect(SocketAddress sa) throws IOException { |
625 public boolean connect(SocketAddress sa) throws IOException { |
|
626 InetSocketAddress isa = Net.checkAddress(sa); |
|
627 SecurityManager sm = System.getSecurityManager(); |
|
628 if (sm != null) |
|
629 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); |
|
630 |
655 readLock.lock(); |
631 readLock.lock(); |
656 try { |
632 try { |
657 writeLock.lock(); |
633 writeLock.lock(); |
658 try { |
634 try { |
659 ensureOpenAndUnconnected(); |
635 // notify before-connect hook |
660 InetSocketAddress isa = Net.checkAddress(sa); |
636 synchronized (stateLock) { |
661 SecurityManager sm = System.getSecurityManager(); |
637 if (state == ST_UNCONNECTED && localAddress == null) { |
662 if (sm != null) |
638 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); |
663 sm.checkConnect(isa.getAddress().getHostAddress(), |
639 } |
664 isa.getPort()); |
640 } |
665 synchronized (blockingLock()) { |
641 |
666 int n = 0; |
642 InetAddress ia = isa.getAddress(); |
|
643 if (ia.isAnyLocalAddress()) |
|
644 ia = InetAddress.getLocalHost(); |
|
645 |
|
646 int n = 0; |
|
647 boolean blocking = isBlocking(); |
|
648 try { |
667 try { |
649 try { |
668 try { |
650 beginConnect(blocking); |
669 begin(); |
651 if (blocking) { |
670 synchronized (stateLock) { |
652 do { |
671 if (!isOpen()) { |
653 n = Net.connect(fd, ia, isa.getPort()); |
672 return false; |
654 } while (n == IOStatus.INTERRUPTED && isOpen()); |
673 } |
655 } else { |
674 // notify hook only if unbound |
656 n = Net.connect(fd, ia, isa.getPort()); |
675 if (localAddress == null) { |
|
676 NetHooks.beforeTcpConnect(fd, |
|
677 isa.getAddress(), |
|
678 isa.getPort()); |
|
679 } |
|
680 readerThread = NativeThread.current(); |
|
681 } |
|
682 for (;;) { |
|
683 InetAddress ia = isa.getAddress(); |
|
684 if (ia.isAnyLocalAddress()) |
|
685 ia = InetAddress.getLocalHost(); |
|
686 n = Net.connect(fd, |
|
687 ia, |
|
688 isa.getPort()); |
|
689 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
690 continue; |
|
691 break; |
|
692 } |
|
693 |
|
694 } finally { |
|
695 readerCleanup(); |
|
696 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
|
697 assert IOStatus.check(n); |
|
698 } |
657 } |
699 } catch (IOException x) { |
658 } finally { |
700 // If an exception was thrown, close the channel after |
659 endConnect(blocking, n > 0); |
701 // invoking end() so as to avoid bogus |
|
702 // AsynchronousCloseExceptions |
|
703 close(); |
|
704 throw x; |
|
705 } |
660 } |
706 synchronized (stateLock) { |
661 } catch (IOException x) { |
707 remoteAddress = isa; |
662 // connect failed, close socket |
708 if (n > 0) { |
663 close(); |
709 |
664 throw x; |
710 // Connection succeeded; disallow further |
665 } |
711 // invocation |
666 |
712 state = ST_CONNECTED; |
667 // connection may be established |
713 if (isOpen()) |
668 synchronized (stateLock) { |
714 localAddress = Net.localAddress(fd); |
669 if (!isOpen()) |
715 return true; |
670 throw new AsynchronousCloseException(); |
716 } |
671 remoteAddress = isa; |
717 // If nonblocking and no exception then connection |
672 if (n > 0) { |
718 // pending; disallow another invocation |
673 // connected established |
719 if (!isBlocking()) |
674 localAddress = Net.localAddress(fd); |
720 state = ST_PENDING; |
675 state = ST_CONNECTED; |
721 else |
676 return true; |
722 assert false; |
677 } else { |
|
678 // connection pending |
|
679 assert !blocking; |
|
680 state = ST_CONNECTIONPENDING; |
|
681 return false; |
723 } |
682 } |
724 } |
683 } |
725 return false; |
|
726 } finally { |
684 } finally { |
727 writeLock.unlock(); |
685 writeLock.unlock(); |
728 } |
686 } |
729 } finally { |
687 } finally { |
730 readLock.unlock(); |
688 readLock.unlock(); |
731 } |
689 } |
732 } |
690 } |
733 |
691 |
|
692 /** |
|
693 * Marks the beginning of a finishConnect operation that might block. |
|
694 * |
|
695 * @throws ClosedChannelException if the channel is closed |
|
696 * @throws NoConnectionPendingException if no connection is pending |
|
697 */ |
|
698 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { |
|
699 if (blocking) { |
|
700 // set hook for Thread.interrupt |
|
701 begin(); |
|
702 } |
|
703 synchronized (stateLock) { |
|
704 ensureOpen(); |
|
705 if (state != ST_CONNECTIONPENDING) |
|
706 throw new NoConnectionPendingException(); |
|
707 if (blocking) |
|
708 readerThread = NativeThread.current(); |
|
709 } |
|
710 } |
|
711 |
|
712 /** |
|
713 * Marks the end of a finishConnect operation that may have blocked. |
|
714 * |
|
715 * @throws AsynchronousCloseException if the channel was closed due to this |
|
716 * thread being interrupted on a blocking connect operation. |
|
717 */ |
|
718 private void endFinishConnect(boolean blocking, boolean completed) |
|
719 throws AsynchronousCloseException |
|
720 { |
|
721 endRead(blocking, completed); |
|
722 } |
|
723 |
|
724 @Override |
734 public boolean finishConnect() throws IOException { |
725 public boolean finishConnect() throws IOException { |
735 readLock.lock(); |
726 readLock.lock(); |
736 try { |
727 try { |
737 writeLock.lock(); |
728 writeLock.lock(); |
738 try { |
729 try { |
|
730 // already connected? |
|
731 synchronized (stateLock) { |
|
732 if (state == ST_CONNECTED) |
|
733 return true; |
|
734 } |
|
735 |
|
736 int n = 0; |
|
737 boolean blocking = isBlocking(); |
|
738 try { |
|
739 try { |
|
740 beginFinishConnect(blocking); |
|
741 if (blocking) { |
|
742 do { |
|
743 n = checkConnect(fd, true); |
|
744 } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen()); |
|
745 } else { |
|
746 n = checkConnect(fd, false); |
|
747 } |
|
748 } finally { |
|
749 endFinishConnect(blocking, n > 0); |
|
750 } |
|
751 } catch (IOException x) { |
|
752 close(); |
|
753 throw x; |
|
754 } |
|
755 |
|
756 // post finishConnect, connection may be established |
739 synchronized (stateLock) { |
757 synchronized (stateLock) { |
740 if (!isOpen()) |
758 if (!isOpen()) |
741 throw new ClosedChannelException(); |
759 throw new AsynchronousCloseException(); |
742 if (state == ST_CONNECTED) |
760 if (n > 0) { |
|
761 // connection established |
|
762 localAddress = Net.localAddress(fd); |
|
763 state = ST_CONNECTED; |
743 return true; |
764 return true; |
744 if (state != ST_PENDING) |
765 } else { |
745 throw new NoConnectionPendingException(); |
766 // connection still pending |
746 } |
767 assert !blocking; |
747 int n = 0; |
768 return false; |
748 try { |
|
749 try { |
|
750 begin(); |
|
751 synchronized (blockingLock()) { |
|
752 synchronized (stateLock) { |
|
753 if (!isOpen()) { |
|
754 return false; |
|
755 } |
|
756 readerThread = NativeThread.current(); |
|
757 } |
|
758 if (!isBlocking()) { |
|
759 for (;;) { |
|
760 n = checkConnect(fd, false); |
|
761 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
762 continue; |
|
763 break; |
|
764 } |
|
765 } else { |
|
766 for (;;) { |
|
767 n = checkConnect(fd, true); |
|
768 if (n == 0) { |
|
769 // Loop in case of |
|
770 // spurious notifications |
|
771 continue; |
|
772 } |
|
773 if ((n == IOStatus.INTERRUPTED) && isOpen()) |
|
774 continue; |
|
775 break; |
|
776 } |
|
777 } |
|
778 } |
|
779 } finally { |
|
780 synchronized (stateLock) { |
|
781 readerThread = 0; |
|
782 if (state == ST_KILLPENDING) { |
|
783 kill(); |
|
784 // poll()/getsockopt() does not report |
|
785 // error (throws exception, with n = 0) |
|
786 // on Linux platform after dup2 and |
|
787 // signal-wakeup. Force n to 0 so the |
|
788 // end() can throw appropriate exception |
|
789 n = 0; |
|
790 } |
|
791 } |
|
792 end((n > 0) || (n == IOStatus.UNAVAILABLE)); |
|
793 assert IOStatus.check(n); |
|
794 } |
769 } |
795 } catch (IOException x) { |
770 } |
796 // If an exception was thrown, close the channel after |
|
797 // invoking end() so as to avoid bogus |
|
798 // AsynchronousCloseExceptions |
|
799 close(); |
|
800 throw x; |
|
801 } |
|
802 if (n > 0) { |
|
803 synchronized (stateLock) { |
|
804 state = ST_CONNECTED; |
|
805 if (isOpen()) |
|
806 localAddress = Net.localAddress(fd); |
|
807 } |
|
808 return true; |
|
809 } |
|
810 return false; |
|
811 } finally { |
771 } finally { |
812 writeLock.unlock(); |
772 writeLock.unlock(); |
813 } |
773 } |
814 } finally { |
774 } finally { |
815 readLock.unlock(); |
775 readLock.unlock(); |
816 } |
776 } |
817 } |
777 } |
818 |
778 |
|
779 /** |
|
780 * Invoked by implCloseChannel to close the channel. |
|
781 * |
|
782 * This method waits for outstanding I/O operations to complete. When in |
|
783 * blocking mode, the socket is pre-closed and the threads in blocking I/O |
|
784 * operations are signalled to ensure that the outstanding I/O operations |
|
785 * complete quickly. |
|
786 * |
|
787 * If the socket is connected then it is shutdown by this method. The |
|
788 * shutdown ensures that the peer reads EOF for the case that the socket is |
|
789 * not pre-closed or closed by this method. |
|
790 * |
|
791 * The socket is closed by this method when it is not registered with a |
|
792 * Selector. Note that a channel configured blocking may be registered with |
|
793 * a Selector. This arises when a key is canceled and the channel configured |
|
794 * to blocking mode before the key is flushed from the Selector. |
|
795 */ |
|
796 @Override |
|
797 protected void implCloseSelectableChannel() throws IOException { |
|
798 assert !isOpen(); |
|
799 |
|
800 boolean blocking; |
|
801 boolean connected; |
|
802 boolean interrupted = false; |
|
803 |
|
804 // set state to ST_CLOSING |
|
805 synchronized (stateLock) { |
|
806 assert state < ST_CLOSING; |
|
807 blocking = isBlocking(); |
|
808 connected = (state == ST_CONNECTED); |
|
809 state = ST_CLOSING; |
|
810 } |
|
811 |
|
812 // wait for any outstanding I/O operations to complete |
|
813 if (blocking) { |
|
814 synchronized (stateLock) { |
|
815 assert state == ST_CLOSING; |
|
816 long reader = readerThread; |
|
817 long writer = writerThread; |
|
818 if (reader != 0 || writer != 0) { |
|
819 nd.preClose(fd); |
|
820 connected = false; // fd is no longer connected socket |
|
821 |
|
822 if (reader != 0) |
|
823 NativeThread.signal(reader); |
|
824 if (writer != 0) |
|
825 NativeThread.signal(writer); |
|
826 |
|
827 // wait for blocking I/O operations to end |
|
828 while (readerThread != 0 || writerThread != 0) { |
|
829 try { |
|
830 stateLock.wait(); |
|
831 } catch (InterruptedException e) { |
|
832 interrupted = true; |
|
833 } |
|
834 } |
|
835 } |
|
836 } |
|
837 } else { |
|
838 // non-blocking mode: wait for read/write to complete |
|
839 readLock.lock(); |
|
840 try { |
|
841 writeLock.lock(); |
|
842 writeLock.unlock(); |
|
843 } finally { |
|
844 readLock.unlock(); |
|
845 } |
|
846 } |
|
847 |
|
848 // set state to ST_KILLPENDING |
|
849 synchronized (stateLock) { |
|
850 assert state == ST_CLOSING; |
|
851 // if connected, and the channel is registered with a Selector, we |
|
852 // shutdown the output so that the peer reads EOF |
|
853 if (connected && isRegistered()) { |
|
854 try { |
|
855 Net.shutdown(fd, Net.SHUT_WR); |
|
856 } catch (IOException ignore) { } |
|
857 } |
|
858 state = ST_KILLPENDING; |
|
859 } |
|
860 |
|
861 // close socket if not registered with Selector |
|
862 if (!isRegistered()) |
|
863 kill(); |
|
864 |
|
865 // restore interrupt status |
|
866 if (interrupted) |
|
867 Thread.currentThread().interrupt(); |
|
868 } |
|
869 |
|
870 @Override |
|
871 public void kill() throws IOException { |
|
872 synchronized (stateLock) { |
|
873 if (state == ST_KILLPENDING) { |
|
874 state = ST_KILLED; |
|
875 nd.close(fd); |
|
876 } |
|
877 } |
|
878 } |
|
879 |
819 @Override |
880 @Override |
820 public SocketChannel shutdownInput() throws IOException { |
881 public SocketChannel shutdownInput() throws IOException { |
821 synchronized (stateLock) { |
882 synchronized (stateLock) { |
822 if (!isOpen()) |
883 ensureOpen(); |
823 throw new ClosedChannelException(); |
|
824 if (!isConnected()) |
884 if (!isConnected()) |
825 throw new NotYetConnectedException(); |
885 throw new NotYetConnectedException(); |
826 if (isInputOpen) { |
886 if (!isInputClosed) { |
827 Net.shutdown(fd, Net.SHUT_RD); |
887 Net.shutdown(fd, Net.SHUT_RD); |
828 if (readerThread != 0) |
888 long thread = readerThread; |
829 NativeThread.signal(readerThread); |
889 if (thread != 0) |
830 isInputOpen = false; |
890 NativeThread.signal(thread); |
|
891 isInputClosed = true; |
831 } |
892 } |
832 return this; |
893 return this; |
833 } |
894 } |
834 } |
895 } |
835 |
896 |
836 @Override |
897 @Override |
837 public SocketChannel shutdownOutput() throws IOException { |
898 public SocketChannel shutdownOutput() throws IOException { |
838 synchronized (stateLock) { |
899 synchronized (stateLock) { |
839 if (!isOpen()) |
900 ensureOpen(); |
840 throw new ClosedChannelException(); |
|
841 if (!isConnected()) |
901 if (!isConnected()) |
842 throw new NotYetConnectedException(); |
902 throw new NotYetConnectedException(); |
843 if (isOutputOpen) { |
903 if (!isOutputClosed) { |
844 Net.shutdown(fd, Net.SHUT_WR); |
904 Net.shutdown(fd, Net.SHUT_WR); |
845 if (writerThread != 0) |
905 long thread = writerThread; |
846 NativeThread.signal(writerThread); |
906 if (thread != 0) |
847 isOutputOpen = false; |
907 NativeThread.signal(thread); |
|
908 isOutputClosed = true; |
848 } |
909 } |
849 return this; |
910 return this; |
850 } |
911 } |
851 } |
912 } |
852 |
913 |
853 public boolean isInputOpen() { |
914 boolean isInputOpen() { |
854 synchronized (stateLock) { |
915 return !isInputClosed; |
855 return isInputOpen; |
916 } |
856 } |
917 |
857 } |
918 boolean isOutputOpen() { |
858 |
919 return !isOutputClosed; |
859 public boolean isOutputOpen() { |
920 } |
860 synchronized (stateLock) { |
921 |
861 return isOutputOpen; |
922 /** |
862 } |
923 * Poll this channel's socket for reading up to the given timeout. |
863 } |
924 * @return {@code true} if the socket is polled |
864 |
925 */ |
865 // AbstractInterruptibleChannel synchronizes invocations of this method |
926 boolean pollRead(long timeout) throws IOException { |
866 // using AbstractInterruptibleChannel.closeLock, and also ensures that this |
927 boolean blocking = isBlocking(); |
867 // method is only ever invoked once. Before we get to this method, isOpen |
928 assert Thread.holdsLock(blockingLock()) && blocking; |
868 // (which is volatile) will have been set to false. |
929 |
869 // |
930 readLock.lock(); |
870 protected void implCloseSelectableChannel() throws IOException { |
931 try { |
871 synchronized (stateLock) { |
932 boolean polled = false; |
872 isInputOpen = false; |
933 try { |
873 isOutputOpen = false; |
934 beginRead(blocking); |
874 |
935 int n = Net.poll(fd, Net.POLLIN, timeout); |
875 // Close the underlying file descriptor and dup it to a known fd |
936 polled = (n > 0); |
876 // that's already closed. This prevents other operations on this |
937 } finally { |
877 // channel from using the old fd, which might be recycled in the |
938 endRead(blocking, polled); |
878 // meantime and allocated to an entirely different channel. |
939 } |
879 // |
940 return polled; |
880 if (state != ST_KILLED) |
941 } finally { |
881 nd.preClose(fd); |
942 readLock.unlock(); |
882 |
943 } |
883 // Signal native threads, if needed. If a target thread is not |
944 } |
884 // currently blocked in an I/O operation then no harm is done since |
945 |
885 // the signal handler doesn't actually do anything. |
946 /** |
886 // |
947 * Poll this channel's socket for a connection, up to the given timeout. |
887 if (readerThread != 0) |
948 * @return {@code true} if the socket is polled |
888 NativeThread.signal(readerThread); |
949 */ |
889 |
950 boolean pollConnected(long timeout) throws IOException { |
890 if (writerThread != 0) |
951 boolean blocking = isBlocking(); |
891 NativeThread.signal(writerThread); |
952 assert Thread.holdsLock(blockingLock()) && blocking; |
892 |
953 |
893 // If this channel is not registered then it's safe to close the fd |
954 readLock.lock(); |
894 // immediately since we know at this point that no thread is |
955 try { |
895 // blocked in an I/O operation upon the channel and, since the |
956 writeLock.lock(); |
896 // channel is marked closed, no thread will start another such |
957 try { |
897 // operation. If this channel is registered then we don't close |
958 boolean polled = false; |
898 // the fd since it might be in use by a selector. In that case |
959 try { |
899 // closing this channel caused its keys to be cancelled, so the |
960 beginFinishConnect(blocking); |
900 // last selector to deregister a key for this channel will invoke |
961 int n = Net.poll(fd, Net.POLLCONN, timeout); |
901 // kill() to close the fd. |
962 polled = (n > 0); |
902 // |
963 } finally { |
903 if (!isRegistered()) |
964 endFinishConnect(blocking, polled); |
904 kill(); |
965 } |
905 } |
966 return polled; |
906 } |
967 } finally { |
907 |
968 writeLock.unlock(); |
908 public void kill() throws IOException { |
969 } |
909 synchronized (stateLock) { |
970 } finally { |
910 if (state == ST_KILLED) |
971 readLock.unlock(); |
911 return; |
|
912 if (state == ST_UNINITIALIZED) { |
|
913 state = ST_KILLED; |
|
914 return; |
|
915 } |
|
916 assert !isOpen() && !isRegistered(); |
|
917 |
|
918 // Postpone the kill if there is a waiting reader |
|
919 // or writer thread. See the comments in read() for |
|
920 // more detailed explanation. |
|
921 if (readerThread == 0 && writerThread == 0) { |
|
922 nd.close(fd); |
|
923 state = ST_KILLED; |
|
924 } else { |
|
925 state = ST_KILLPENDING; |
|
926 } |
|
927 } |
972 } |
928 } |
973 } |
929 |
974 |
930 /** |
975 /** |
931 * Translates native poll revent ops into a ready operation ops |
976 * Translates native poll revent ops into a ready operation ops |