equal
deleted
inserted
replaced
67 * |
67 * |
68 * This implementation attempts to be compatible with legacy PlainSocketImpl, |
68 * This implementation attempts to be compatible with legacy PlainSocketImpl, |
69 * including behavior and exceptions that are not specified by SocketImpl. |
69 * including behavior and exceptions that are not specified by SocketImpl. |
70 * |
70 * |
71 * The underlying socket used by this SocketImpl is initially configured |
71 * The underlying socket used by this SocketImpl is initially configured |
72 * blocking. If a connect, accept or read is attempted with a timeout then the |
72 * blocking. If the connect method is used to establish a connection with a |
73 * socket is changed to non-blocking mode. When in non-blocking mode, operations |
73 * timeout then the socket is configured non-blocking for the connect attempt, |
74 * that don't complete immediately will poll the socket. |
74 * and then restored to blocking mode when the connection is established. |
|
75 * If the accept or read methods are used with a timeout then the socket is |
|
76 * configured non-blocking and is never restored. When in non-blocking mode, |
|
77 * operations that don't complete immediately will poll the socket and preserve |
|
78 * the semantics of blocking operations. |
75 */ |
79 */ |
76 |
80 |
77 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl { |
81 public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl { |
78 private static final NativeDispatcher nd = new SocketDispatcher(); |
82 private static final NativeDispatcher nd = new SocketDispatcher(); |
79 |
83 |
103 |
107 |
104 // set by SocketImpl.create, protected by stateLock |
108 // set by SocketImpl.create, protected by stateLock |
105 private boolean stream; |
109 private boolean stream; |
106 private FileDescriptorCloser closer; |
110 private FileDescriptorCloser closer; |
107 |
111 |
108 // lazily set to true when the socket is configured non-blocking |
112 // set by configureNonBlockingForever when the socket is changed to non-blocking |
109 private volatile boolean nonBlocking; |
113 private volatile boolean nonBlocking; |
110 |
114 |
111 // used by connect/read/write/accept, protected by stateLock |
115 // used by connect/read/write/accept, protected by stateLock |
112 private long readerThread; |
116 private long readerThread; |
113 private long writerThread; |
117 private long writerThread; |
187 private void park(FileDescriptor fd, int event) throws IOException { |
191 private void park(FileDescriptor fd, int event) throws IOException { |
188 park(fd, event, 0); |
192 park(fd, event, 0); |
189 } |
193 } |
190 |
194 |
191 /** |
195 /** |
192 * Configures the socket to be non-blocking (if not already non-blocking) |
196 * Configures the socket's blocking mode except when the socket has been |
193 * @throws IOException if there is an I/O error changing the blocking mode |
197 * configured non-blocking by {@code configureNonBlockingForever}. |
194 */ |
198 * @throws IOException if closed or there is an I/O error changing the mode |
195 private void configureNonBlocking(FileDescriptor fd) throws IOException { |
199 */ |
|
200 private void configureBlocking(FileDescriptor fd, boolean block) throws IOException { |
|
201 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
196 if (!nonBlocking) { |
202 if (!nonBlocking) { |
197 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
203 stateLock.lock(); |
|
204 try { |
|
205 if (!nonBlocking) { |
|
206 ensureOpen(); |
|
207 IOUtil.configureBlocking(fd, block); |
|
208 } |
|
209 } finally { |
|
210 stateLock.unlock(); |
|
211 } |
|
212 } |
|
213 } |
|
214 |
|
215 /** |
|
216 * Configures the socket to be non-blocking. Once configured to non-blocking |
|
217 * by this method then the blocking mode cannot be changed back to blocking. |
|
218 * @throws IOException if closed or there is an I/O error changing the mode |
|
219 */ |
|
220 private void configureNonBlockingForever(FileDescriptor fd) throws IOException { |
|
221 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
|
222 if (!nonBlocking) { |
198 stateLock.lock(); |
223 stateLock.lock(); |
199 try { |
224 try { |
200 ensureOpen(); |
225 ensureOpen(); |
201 IOUtil.configureBlocking(fd, false); |
226 IOUtil.configureBlocking(fd, false); |
|
227 nonBlocking = true; |
202 } finally { |
228 } finally { |
203 stateLock.unlock(); |
229 stateLock.unlock(); |
204 } |
230 } |
205 nonBlocking = true; |
|
206 } |
231 } |
207 } |
232 } |
208 |
233 |
209 /** |
234 /** |
210 * Marks the beginning of a read operation that might block. |
235 * Marks the beginning of a read operation that might block. |
263 * @throws SocketTimeoutException if the read timeout elapses |
288 * @throws SocketTimeoutException if the read timeout elapses |
264 */ |
289 */ |
265 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos) |
290 private int timedRead(FileDescriptor fd, byte[] b, int off, int len, long nanos) |
266 throws IOException |
291 throws IOException |
267 { |
292 { |
268 assert nonBlocking; |
|
269 long startNanos = System.nanoTime(); |
293 long startNanos = System.nanoTime(); |
270 int n = tryRead(fd, b, off, len); |
294 int n = tryRead(fd, b, off, len); |
271 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
295 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
272 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
296 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
273 if (remainingNanos <= 0) { |
297 if (remainingNanos <= 0) { |
294 if (isInputClosed) |
318 if (isInputClosed) |
295 return -1; |
319 return -1; |
296 int timeout = this.timeout; |
320 int timeout = this.timeout; |
297 if (timeout > 0) { |
321 if (timeout > 0) { |
298 // read with timeout |
322 // read with timeout |
299 configureNonBlocking(fd); |
323 configureNonBlockingForever(fd); |
300 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout)); |
324 n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout)); |
301 } else { |
325 } else { |
302 // read, no timeout |
326 // read, no timeout |
303 n = tryRead(fd, b, off, len); |
327 n = tryRead(fd, b, off, len); |
304 while (IOStatus.okayToRetry(n) && isOpen()) { |
328 while (IOStatus.okayToRetry(n) && isOpen()) { |
582 connectLock.lock(); |
606 connectLock.lock(); |
583 try { |
607 try { |
584 boolean connected = false; |
608 boolean connected = false; |
585 FileDescriptor fd = beginConnect(address, port); |
609 FileDescriptor fd = beginConnect(address, port); |
586 try { |
610 try { |
587 if (millis > 0) |
611 |
588 configureNonBlocking(fd); |
612 // configure socket to non-blocking mode when there is a timeout |
|
613 if (millis > 0) { |
|
614 configureBlocking(fd, false); |
|
615 } |
|
616 |
589 int n = Net.connect(fd, address, port); |
617 int n = Net.connect(fd, address, port); |
590 if (n > 0) { |
618 if (n > 0) { |
591 // connection established |
619 // connection established |
592 connected = true; |
620 connected = true; |
593 } else { |
621 } else { |
604 polled = Net.pollConnectNow(fd); |
632 polled = Net.pollConnectNow(fd); |
605 } |
633 } |
606 connected = polled && isOpen(); |
634 connected = polled && isOpen(); |
607 } |
635 } |
608 } |
636 } |
|
637 |
|
638 // restore socket to blocking mode |
|
639 if (connected && millis > 0) { |
|
640 configureBlocking(fd, true); |
|
641 } |
|
642 |
609 } finally { |
643 } finally { |
610 endConnect(fd, connected); |
644 endConnect(fd, connected); |
611 } |
645 } |
612 } finally { |
646 } finally { |
613 connectLock.unlock(); |
647 connectLock.unlock(); |
705 FileDescriptor newfd, |
739 FileDescriptor newfd, |
706 InetSocketAddress[] isaa, |
740 InetSocketAddress[] isaa, |
707 long nanos) |
741 long nanos) |
708 throws IOException |
742 throws IOException |
709 { |
743 { |
710 assert nonBlocking; |
|
711 long startNanos = System.nanoTime(); |
744 long startNanos = System.nanoTime(); |
712 int n = Net.accept(fd, newfd, isaa); |
745 int n = Net.accept(fd, newfd, isaa); |
713 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
746 while (n == IOStatus.UNAVAILABLE && isOpen()) { |
714 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
747 long remainingNanos = nanos - (System.nanoTime() - startNanos); |
715 if (remainingNanos <= 0) { |
748 if (remainingNanos <= 0) { |
754 int n = 0; |
787 int n = 0; |
755 FileDescriptor fd = beginAccept(); |
788 FileDescriptor fd = beginAccept(); |
756 try { |
789 try { |
757 if (remainingNanos > 0) { |
790 if (remainingNanos > 0) { |
758 // accept with timeout |
791 // accept with timeout |
759 configureNonBlocking(fd); |
792 configureNonBlockingForever(fd); |
760 n = timedAccept(fd, newfd, isaa, remainingNanos); |
793 n = timedAccept(fd, newfd, isaa, remainingNanos); |
761 } else { |
794 } else { |
762 // accept, no timeout |
795 // accept, no timeout |
763 n = Net.accept(fd, newfd, isaa); |
796 n = Net.accept(fd, newfd, isaa); |
764 while (IOStatus.okayToRetry(n) && isOpen()) { |
797 while (IOStatus.okayToRetry(n) && isOpen()) { |
884 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) { |
917 if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) { |
885 Net.shutdown(fd, Net.SHUT_WR); |
918 Net.shutdown(fd, Net.SHUT_WR); |
886 } |
919 } |
887 } catch (IOException ignore) { } |
920 } catch (IOException ignore) { } |
888 |
921 |
889 // interrupt and wait for kernel threads to complete I/O operations |
922 // interrupt and wait for threads to complete I/O operations |
890 long reader = readerThread; |
923 long reader = readerThread; |
891 long writer = writerThread; |
924 long writer = writerThread; |
892 if (reader != 0 || writer != 0) { |
925 if (reader != 0 || writer != 0) { |
893 nd.preClose(fd); |
926 nd.preClose(fd); |
894 |
927 |