192 private void park(FileDescriptor fd, int event) throws IOException { |
192 private void park(FileDescriptor fd, int event) throws IOException { |
193 park(fd, event, 0); |
193 park(fd, event, 0); |
194 } |
194 } |
195 |
195 |
196 /** |
196 /** |
197 * Ensures that the socket is configured non-blocking when a timeout is specified. |
197 * Configures the socket to be non-blocking (if not already non-blocking) |
198 * @throws IOException if there is an I/O error changing the blocking mode |
198 * @throws IOException if there is an I/O error changing the blocking mode |
199 */ |
199 */ |
200 private void configureNonBlockingIfNeeded(FileDescriptor fd, int timeout) |
200 private void configureNonBlocking(FileDescriptor fd) throws IOException { |
201 throws IOException |
201 if (!nonBlocking) { |
202 { |
|
203 if (timeout > 0 && !nonBlocking) { |
|
204 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
202 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
205 IOUtil.configureBlocking(fd, false); |
203 IOUtil.configureBlocking(fd, false); |
206 nonBlocking = true; |
204 nonBlocking = true; |
207 } |
205 } |
208 } |
206 } |
255 |
253 |
256 /** |
254 /** |
257 * Reads bytes from the socket into the given byte array with a timeout. |
255 * Reads bytes from the socket into the given byte array with a timeout. |
258 * @throws SocketTimeoutException if the read timeout elapses |
256 * @throws SocketTimeoutException if the read timeout elapses |
259 */ |
257 */ |
260 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, int millis) |
258 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos) |
261 throws IOException |
259 throws IOException |
262 { |
260 { |
263 assert nonBlocking; |
261 assert nonBlocking; |
264 long nanos = NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); |
|
265 long remainingNanos = nanos; |
262 long remainingNanos = nanos; |
266 long startNanos = System.nanoTime(); |
263 long startNanos = System.nanoTime(); |
267 int n; |
264 int n; |
268 do { |
265 do { |
269 park(fd, Net.POLLIN, remainingNanos); |
266 park(fd, Net.POLLIN, remainingNanos); |
291 if (connectionReset) |
288 if (connectionReset) |
292 throw new SocketException("Connection reset"); |
289 throw new SocketException("Connection reset"); |
293 if (isInputClosed) |
290 if (isInputClosed) |
294 return -1; |
291 return -1; |
295 int timeout = this.timeout; |
292 int timeout = this.timeout; |
296 configureNonBlockingIfNeeded(fd, timeout); |
293 if (timeout > 0) |
|
294 configureNonBlocking(fd); |
297 n = tryRead(fd, b, off, len); |
295 n = tryRead(fd, b, off, len); |
298 if (IOStatus.okayToRetry(n) && isOpen()) { |
296 if (IOStatus.okayToRetry(n) && isOpen()) { |
299 if (timeout > 0) { |
297 if (timeout > 0) { |
300 // read with timeout |
298 // read with timeout |
301 n = timedRead(fd, b, off, len, timeout); |
299 long nanos = NANOSECONDS.convert(timeout, MILLISECONDS); |
|
300 n = timedRead(fd, b, off, len, nanos); |
302 } else { |
301 } else { |
303 // read, no timeout |
302 // read, no timeout |
304 do { |
303 do { |
305 park(fd, Net.POLLIN); |
304 park(fd, Net.POLLIN); |
306 n = tryRead(fd, b, off, len); |
305 n = tryRead(fd, b, off, len); |
524 |
523 |
525 /** |
524 /** |
526 * Waits for a connection attempt to finish with a timeout |
525 * Waits for a connection attempt to finish with a timeout |
527 * @throws SocketTimeoutException if the connect timeout elapses |
526 * @throws SocketTimeoutException if the connect timeout elapses |
528 */ |
527 */ |
529 private void timedFinishConnect(FileDescriptor fd, int millis) throws IOException { |
528 private void timedFinishConnect(FileDescriptor fd, long nanos) throws IOException { |
530 long nanos = NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); |
|
531 long remainingNanos = nanos; |
529 long remainingNanos = nanos; |
532 long startNanos = System.nanoTime(); |
530 long startNanos = System.nanoTime(); |
533 boolean polled; |
531 boolean polled; |
534 do { |
532 do { |
535 park(fd, Net.POLLOUT, remainingNanos); |
533 park(fd, Net.POLLOUT, remainingNanos); |
569 connectLock.lock(); |
567 connectLock.lock(); |
570 try { |
568 try { |
571 boolean connected = false; |
569 boolean connected = false; |
572 FileDescriptor fd = beginConnect(address, port); |
570 FileDescriptor fd = beginConnect(address, port); |
573 try { |
571 try { |
574 configureNonBlockingIfNeeded(fd, millis); |
572 if (millis > 0) |
|
573 configureNonBlocking(fd); |
575 int n = Net.connect(fd, address, port); |
574 int n = Net.connect(fd, address, port); |
576 if (isOpen()) { |
575 if (isOpen()) { |
577 if (n > 0) { |
576 if (n > 0) { |
578 // connection established |
577 // connection established |
579 connected = true; |
578 connected = true; |
580 } else if (IOStatus.okayToRetry(n)) { |
579 } else if (IOStatus.okayToRetry(n)) { |
581 // not established or interrupted |
580 // not established or interrupted |
582 if (millis > 0) { |
581 if (millis > 0) { |
583 // finish connect with timeout |
582 // finish connect with timeout |
584 timedFinishConnect(fd, millis); |
583 long nanos = NANOSECONDS.convert(millis, MILLISECONDS); |
|
584 timedFinishConnect(fd, nanos); |
585 } else { |
585 } else { |
586 // finish connect, no timeout |
586 // finish connect, no timeout |
587 boolean polled; |
587 boolean polled; |
588 do { |
588 do { |
589 park(fd, Net.POLLOUT); |
589 park(fd, Net.POLLOUT); |
677 * @throws SocketTimeoutException if the accept timeout elapses |
677 * @throws SocketTimeoutException if the accept timeout elapses |
678 */ |
678 */ |
679 private int timedAccept(FileDescriptor fd, |
679 private int timedAccept(FileDescriptor fd, |
680 FileDescriptor newfd, |
680 FileDescriptor newfd, |
681 InetSocketAddress[] isaa, |
681 InetSocketAddress[] isaa, |
682 int millis) |
682 long nanos) |
683 throws IOException |
683 throws IOException |
684 { |
684 { |
685 assert nonBlocking; |
685 assert nonBlocking; |
686 long nanos = NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS); |
|
687 long remainingNanos = nanos; |
686 long remainingNanos = nanos; |
688 long startNanos = System.nanoTime(); |
687 long startNanos = System.nanoTime(); |
689 int n; |
688 int n; |
690 do { |
689 do { |
691 park(fd, Net.POLLIN, remainingNanos); |
690 park(fd, Net.POLLIN, remainingNanos); |
704 * Accepts a new connection so that the given SocketImpl is connected to |
703 * Accepts a new connection so that the given SocketImpl is connected to |
705 * the peer. |
704 * the peer. |
706 */ |
705 */ |
707 @Override |
706 @Override |
708 protected void accept(SocketImpl si) throws IOException { |
707 protected void accept(SocketImpl si) throws IOException { |
709 // accept a connection |
|
710 FileDescriptor newfd = new FileDescriptor(); |
708 FileDescriptor newfd = new FileDescriptor(); |
711 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
709 InetSocketAddress[] isaa = new InetSocketAddress[1]; |
712 |
710 |
|
711 // acquire the lock, adjusting the timeout for cases where several |
|
712 // threads are accepting connections and there is a timeout set |
713 ReentrantLock acceptLock = readLock; |
713 ReentrantLock acceptLock = readLock; |
714 acceptLock.lock(); |
714 int timeout = this.timeout; |
|
715 long remainingNanos = 0; |
|
716 if (timeout > 0) { |
|
717 remainingNanos = tryLock(acceptLock, timeout, MILLISECONDS); |
|
718 if (remainingNanos <= 0) { |
|
719 assert !acceptLock.isHeldByCurrentThread(); |
|
720 throw new SocketTimeoutException("Accept timed out"); |
|
721 } |
|
722 } else { |
|
723 acceptLock.lock(); |
|
724 } |
|
725 |
|
726 // accept a connection |
715 try { |
727 try { |
716 int n = 0; |
728 int n = 0; |
717 FileDescriptor fd = beginAccept(); |
729 FileDescriptor fd = beginAccept(); |
718 try { |
730 try { |
719 int timeout = this.timeout; |
731 if (remainingNanos > 0) |
720 configureNonBlockingIfNeeded(fd, timeout); |
732 configureNonBlocking(fd); |
721 n = Net.accept(fd, newfd, isaa); |
733 n = Net.accept(fd, newfd, isaa); |
722 if (IOStatus.okayToRetry(n) && isOpen()) { |
734 if (IOStatus.okayToRetry(n) && isOpen()) { |
723 if (timeout > 0) { |
735 if (remainingNanos > 0) { |
724 // accept with timeout |
736 // accept with timeout |
725 n = timedAccept(fd, newfd, isaa, timeout); |
737 n = timedAccept(fd, newfd, isaa, remainingNanos); |
726 } else { |
738 } else { |
727 // accept, no timeout |
739 // accept, no timeout |
728 do { |
740 do { |
729 park(fd, Net.POLLIN); |
741 park(fd, Net.POLLIN); |
730 n = Net.accept(fd, newfd, isaa); |
742 n = Net.accept(fd, newfd, isaa); |
1217 } |
1229 } |
1218 } |
1230 } |
1219 } |
1231 } |
1220 |
1232 |
1221 /** |
1233 /** |
|
1234 * Attempts to acquire the given lock within the given waiting time. |
|
1235 * @return the remaining time in nanoseconds when the lock is acquired, zero |
|
1236 * or less if the lock was not acquired before the timeout expired |
|
1237 */ |
|
1238 private static long tryLock(ReentrantLock lock, long timeout, TimeUnit unit) { |
|
1239 assert timeout > 0; |
|
1240 boolean interrupted = false; |
|
1241 long nanos = NANOSECONDS.convert(timeout, unit); |
|
1242 long remainingNanos = nanos; |
|
1243 long startNanos = System.nanoTime(); |
|
1244 boolean acquired = false; |
|
1245 while (!acquired && (remainingNanos > 0)) { |
|
1246 try { |
|
1247 acquired = lock.tryLock(remainingNanos, NANOSECONDS); |
|
1248 } catch (InterruptedException e) { |
|
1249 interrupted = true; |
|
1250 } |
|
1251 remainingNanos = nanos - (System.nanoTime() - startNanos); |
|
1252 } |
|
1253 if (acquired && remainingNanos <= 0L) |
|
1254 lock.unlock(); // release lock if timeout has expired |
|
1255 if (interrupted) |
|
1256 Thread.currentThread().interrupt(); |
|
1257 return remainingNanos; |
|
1258 } |
|
1259 |
|
1260 /** |
1222 * Returns the socket protocol family. |
1261 * Returns the socket protocol family. |
1223 */ |
1262 */ |
1224 private static ProtocolFamily family() { |
1263 private static ProtocolFamily family() { |
1225 if (Net.isIPv6Available()) { |
1264 if (Net.isIPv6Available()) { |
1226 return StandardProtocolFamily.INET6; |
1265 return StandardProtocolFamily.INET6; |