74 * blocking. If a connect, accept or read is attempted with a timeout then the |
74 * blocking. If a connect, accept or read is attempted with a timeout then the |
75 * socket is changed to non-blocking mode. When in non-blocking mode, operations |
75 * socket is changed to non-blocking mode. When in non-blocking mode, operations |
76 * that don't complete immediately will poll the socket. |
76 * that don't complete immediately will poll the socket. |
77 * |
77 * |
78 * Behavior differences to examine: |
78 * Behavior differences to examine: |
79 * - "Connection reset" handling differs to PlainSocketImpl for cases where |
79 * "Connection reset" handling differs to PlainSocketImpl for cases where |
80 * an application continues to call read or available after a reset. |
80 * an application continues to call read or available after a reset. |
81 * - SocketInputStream extends FileInputStream so can cast to FIS and access FD. |
|
82 */ |
81 */ |
83 |
82 |
84 public class NioSocketImpl extends SocketImpl { |
83 public class NioSocketImpl extends SocketImpl { |
85 private static final NativeDispatcher nd = new SocketDispatcher(); |
84 private static final NativeDispatcher nd = new SocketDispatcher(); |
86 |
85 |
244 return IOStatus.EOF; |
243 return IOStatus.EOF; |
245 } |
244 } |
246 int timeout = this.timeout; |
245 int timeout = this.timeout; |
247 maybeConfigureNonBlocking(fd, timeout); |
246 maybeConfigureNonBlocking(fd, timeout); |
248 n = IOUtil.read(fd, dst, -1, nd); |
247 n = IOUtil.read(fd, dst, -1, nd); |
249 if (statusImpliesRetry(n) && isOpen()) { |
248 if (IOStatus.okayToRetry(n) && isOpen()) { |
250 if (timeout > 0) { |
249 if (timeout > 0) { |
251 // read with timeout |
250 // read with timeout |
252 assert nonBlocking; |
251 assert nonBlocking; |
253 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
252 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
254 do { |
253 do { |
315 try { |
314 try { |
316 int n = 0; |
315 int n = 0; |
317 FileDescriptor fd = beginWrite(); |
316 FileDescriptor fd = beginWrite(); |
318 try { |
317 try { |
319 n = IOUtil.write(fd, dst, -1, nd); |
318 n = IOUtil.write(fd, dst, -1, nd); |
320 while (statusImpliesRetry(n) && isOpen()) { |
319 while (IOStatus.okayToRetry(n) && isOpen()) { |
321 park(Net.POLLOUT); |
320 park(Net.POLLOUT); |
322 n = IOUtil.write(fd, dst, -1, nd); |
321 n = IOUtil.write(fd, dst, -1, nd); |
323 } |
322 } |
324 return n; |
323 return n; |
325 } finally { |
324 } finally { |
437 private FileDescriptor beginConnect(InetAddress address, int port) |
436 private FileDescriptor beginConnect(InetAddress address, int port) |
438 throws IOException |
437 throws IOException |
439 { |
438 { |
440 synchronized (stateLock) { |
439 synchronized (stateLock) { |
441 int state = this.state; |
440 int state = this.state; |
442 if (state >= ST_CLOSING) |
441 if (state != ST_UNCONNECTED) { |
443 throw new SocketException("Socket closed"); |
442 if (state == ST_CONNECTING) |
444 if (state == ST_CONNECTED) |
443 throw new SocketException("Connection in progress"); |
445 throw new SocketException("Already connected"); |
444 if (state == ST_CONNECTED) |
446 assert state == ST_UNCONNECTED; |
445 throw new SocketException("Already connected"); |
|
446 if (state >= ST_CLOSING) |
|
447 throw new SocketException("Socket closed"); |
|
448 assert false; |
|
449 } |
447 this.state = ST_CONNECTING; |
450 this.state = ST_CONNECTING; |
448 |
451 |
449 // invoke beforeTcpConnect hook if not already bound |
452 // invoke beforeTcpConnect hook if not already bound |
450 if (localport == 0) { |
453 if (localport == 0) { |
451 NetHooks.beforeTcpConnect(fd, address, port); |
454 NetHooks.beforeTcpConnect(fd, address, port); |
497 InetAddress address = isa.getAddress(); |
500 InetAddress address = isa.getAddress(); |
498 if (address.isAnyLocalAddress()) |
501 if (address.isAnyLocalAddress()) |
499 address = InetAddress.getLocalHost(); |
502 address = InetAddress.getLocalHost(); |
500 int port = isa.getPort(); |
503 int port = isa.getPort(); |
501 |
504 |
|
505 ReentrantLock connectLock = readLock; |
502 try { |
506 try { |
503 readLock.lock(); |
507 connectLock.lock(); |
504 try { |
508 try { |
505 boolean connected = false; |
509 boolean connected = false; |
506 FileDescriptor fd = beginConnect(address, port); |
510 FileDescriptor fd = beginConnect(address, port); |
507 try { |
511 try { |
508 maybeConfigureNonBlocking(fd, millis); |
512 maybeConfigureNonBlocking(fd, millis); |
509 int n = Net.connect(fd, address, port); |
513 int n = Net.connect(fd, address, port); |
510 if (statusImpliesRetry(n) && isOpen()) { |
514 if (IOStatus.okayToRetry(n) && isOpen()) { |
511 if (millis > 0) { |
515 if (millis > 0) { |
512 // connect with timeout |
516 // connect with timeout |
513 assert nonBlocking; |
517 assert nonBlocking; |
514 long nanos = NANOSECONDS.convert(millis, MILLISECONDS); |
518 long nanos = NANOSECONDS.convert(millis, MILLISECONDS); |
515 do { |
519 do { |
516 long startTime = System.nanoTime(); |
520 long startTime = System.nanoTime(); |
517 park(Net.POLLOUT, nanos); |
521 park(Net.POLLOUT, nanos); |
518 n = Net.polConnectlNow(fd); |
522 n = Net.pollConnectNow(fd); |
519 if (n == 0) { |
523 if (n == 0) { |
520 nanos -= System.nanoTime() - startTime; |
524 nanos -= System.nanoTime() - startTime; |
521 if (nanos <= 0) |
525 if (nanos <= 0) |
522 throw new SocketTimeoutException("connect timeout"); |
526 throw new SocketTimeoutException("connect timeout"); |
523 } |
527 } |
524 } while (n == 0 && isOpen()); |
528 } while (n == 0 && isOpen()); |
525 } else { |
529 } else { |
526 // connect, no timeout |
530 // connect, no timeout |
527 do { |
531 do { |
528 park(Net.POLLOUT); |
532 park(Net.POLLOUT); |
529 n = Net.polConnectlNow(fd); |
533 n = Net.pollConnectNow(fd); |
530 } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen()); |
534 } while (n == 0 && isOpen()); |
531 } |
535 } |
532 } |
536 } |
533 connected = (n > 0) && isOpen(); |
537 connected = (n > 0) && isOpen(); |
534 } finally { |
538 } finally { |
535 endConnect(connected); |
539 endConnect(connected); |
536 } |
540 } |
537 } finally { |
541 } finally { |
538 readLock.unlock(); |
542 connectLock.unlock(); |
539 } |
543 } |
540 } catch (IOException ioe) { |
544 } catch (IOException ioe) { |
541 close(); |
545 close(); |
542 throw SocketExceptions.of(ioe, isa); |
546 throw SocketExceptions.of(ioe, isa); |
543 } |
547 } |
619 @Override |
623 @Override |
620 protected void accept(SocketImpl si) throws IOException { |
624 protected void accept(SocketImpl si) throws IOException { |
621 // accept a connection |
625 // accept a connection |
622 FileDescriptor newfd = new FileDescriptor(); |
626 FileDescriptor newfd = new FileDescriptor(); |
623 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
627 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
624 readLock.lock(); |
628 |
|
629 ReentrantLock acceptLock = readLock; |
|
630 acceptLock.lock(); |
625 try { |
631 try { |
626 int n = 0; |
632 int n = 0; |
627 FileDescriptor fd = beginAccept(); |
633 FileDescriptor fd = beginAccept(); |
628 try { |
634 try { |
629 int timeout = this.timeout; |
635 int timeout = this.timeout; |
630 maybeConfigureNonBlocking(fd, timeout); |
636 maybeConfigureNonBlocking(fd, timeout); |
631 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
637 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
632 if (statusImpliesRetry(n) && isOpen()) { |
638 if (IOStatus.okayToRetry(n) && isOpen()) { |
633 if (timeout > 0) { |
639 if (timeout > 0) { |
634 // accept with timeout |
640 // accept with timeout |
635 assert nonBlocking; |
641 assert nonBlocking; |
636 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
642 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
637 do { |
643 do { |
647 } else { |
653 } else { |
648 // accept, no timeout |
654 // accept, no timeout |
649 do { |
655 do { |
650 park(Net.POLLIN); |
656 park(Net.POLLIN); |
651 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
657 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
652 } while (statusImpliesRetry(n) && isOpen()); |
658 } while (IOStatus.okayToRetry(n) && isOpen()); |
653 } |
659 } |
654 } |
660 } |
655 } finally { |
661 } finally { |
656 endAccept(n > 0); |
662 endAccept(n > 0); |
657 assert IOStatus.check(n); |
663 assert IOStatus.check(n); |
658 } |
664 } |
659 } finally { |
665 } finally { |
660 readLock.unlock(); |
666 acceptLock.unlock(); |
661 } |
667 } |
662 |
668 |
663 // get local address and configure accepted socket to blocking mode |
669 // get local address and configure accepted socket to blocking mode |
664 InetSocketAddress localAddress; |
670 InetSocketAddress localAddress; |
665 try { |
671 try { |
1155 return CLOSED.compareAndSet(this, false, true); |
1161 return CLOSED.compareAndSet(this, false, true); |
1156 } |
1162 } |
1157 } |
1163 } |
1158 |
1164 |
1159 /** |
1165 /** |
1160 * Returns true if the error code is UNAVAILABLE or INTERRUPTED, the |
|
1161 * error codes to indicate that an I/O operation should be retried. |
|
1162 */ |
|
1163 private static boolean statusImpliesRetry(int n) { |
|
1164 return n == IOStatus.UNAVAILABLE || n == IOStatus.INTERRUPTED; |
|
1165 } |
|
1166 |
|
1167 /** |
|
1168 * Returns the socket protocol family |
1166 * Returns the socket protocol family |
1169 */ |
1167 */ |
1170 private static ProtocolFamily family() { |
1168 private static ProtocolFamily family() { |
1171 if (Net.isIPv6Available()) { |
1169 if (Net.isIPv6Available()) { |
1172 return StandardProtocolFamily.INET6; |
1170 return StandardProtocolFamily.INET6; |