165 * Disables the current thread for scheduling purposes until the socket is |
165 * Disables the current thread for scheduling purposes until the socket is |
166 * ready for I/O, or is asynchronously closed, for up to the specified |
166 * ready for I/O, or is asynchronously closed, for up to the specified |
167 * waiting time. |
167 * waiting time. |
168 * @throws IOException if an I/O error occurs |
168 * @throws IOException if an I/O error occurs |
169 */ |
169 */ |
170 private void park(int event, long nanos) throws IOException { |
170 private void park(FileDescriptor fd, int event, long nanos) throws IOException { |
171 long millis; |
171 long millis; |
172 if (nanos == 0) { |
172 if (nanos == 0) { |
173 millis = -1; |
173 millis = -1; |
174 } else { |
174 } else { |
175 millis = MILLISECONDS.convert(nanos, NANOSECONDS); |
175 millis = MILLISECONDS.convert(nanos, NANOSECONDS); |
180 /** |
180 /** |
181 * Disables the current thread for scheduling purposes until the socket is |
181 * Disables the current thread for scheduling purposes until the socket is |
182 * ready for I/O or is asynchronously closed. |
182 * ready for I/O or is asynchronously closed. |
183 * @throws IOException if an I/O error occurs |
183 * @throws IOException if an I/O error occurs |
184 */ |
184 */ |
185 private void park(int event) throws IOException { |
185 private void park(FileDescriptor fd, int event) throws IOException { |
186 park(event, 0); |
186 park(fd, event, 0); |
187 } |
187 } |
188 |
188 |
189 /** |
189 /** |
190 * Ensures that the socket is configured non-blocking when a timeout is specified. |
190 * Ensures that the socket is configured non-blocking when a timeout is specified. |
191 * @throws IOException if there is an I/O error changing the blocking mode |
191 * @throws IOException if there is an I/O error changing the blocking mode |
192 */ |
192 */ |
193 private void maybeConfigureNonBlocking(FileDescriptor fd, int timeout) |
193 private void configureNonBlockingIfNeeded(FileDescriptor fd, int timeout) |
194 throws IOException |
194 throws IOException |
195 { |
195 { |
196 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
|
197 if (timeout > 0 && !nonBlocking) { |
196 if (timeout > 0 && !nonBlocking) { |
|
197 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); |
198 IOUtil.configureBlocking(fd, false); |
198 IOUtil.configureBlocking(fd, false); |
199 nonBlocking = true; |
199 nonBlocking = true; |
200 } |
200 } |
201 } |
201 } |
202 |
202 |
254 * @throws SocketTimeoutException if the read timeout elapses |
253 * @throws SocketTimeoutException if the read timeout elapses |
255 */ |
254 */ |
256 private int read(byte[] b, int off, int len) throws IOException { |
255 private int read(byte[] b, int off, int len) throws IOException { |
257 readLock.lock(); |
256 readLock.lock(); |
258 try { |
257 try { |
|
258 int timeout = this.timeout; |
259 int n = 0; |
259 int n = 0; |
260 FileDescriptor fd = beginRead(); |
260 FileDescriptor fd = beginRead(); |
261 try { |
261 try { |
262 if (isInputClosed) { |
262 if (isInputClosed) |
263 return IOStatus.EOF; |
263 return IOStatus.EOF; |
264 } |
264 configureNonBlockingIfNeeded(fd, timeout); |
265 int timeout = this.timeout; |
|
266 maybeConfigureNonBlocking(fd, timeout); |
|
267 n = tryRead(fd, b, off, len); |
265 n = tryRead(fd, b, off, len); |
268 if (IOStatus.okayToRetry(n) && isOpen()) { |
266 if (IOStatus.okayToRetry(n) && isOpen()) { |
269 if (timeout > 0) { |
267 if (timeout > 0) { |
270 // read with timeout |
268 // read with timeout |
271 assert nonBlocking; |
|
272 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
269 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
273 do { |
270 do { |
274 long startTime = System.nanoTime(); |
271 long startTime = System.nanoTime(); |
275 park(Net.POLLIN, nanos); |
272 park(fd, Net.POLLIN, nanos); |
276 n = tryRead(fd, b, off, len); |
273 n = tryRead(fd, b, off, len); |
277 if (n == IOStatus.UNAVAILABLE) { |
274 if (n == IOStatus.UNAVAILABLE) { |
278 nanos -= System.nanoTime() - startTime; |
275 nanos -= System.nanoTime() - startTime; |
279 if (nanos <= 0) |
276 if (nanos <= 0) |
280 throw new SocketTimeoutException("read timeout"); |
277 throw new SocketTimeoutException("read timeout"); |
281 } |
278 } |
282 } while (n == IOStatus.UNAVAILABLE && isOpen()); |
279 } while (n == IOStatus.UNAVAILABLE && isOpen()); |
283 } else { |
280 } else { |
284 // read, no timeout |
281 // read, no timeout |
285 do { |
282 do { |
286 park(Net.POLLIN); |
283 park(fd, Net.POLLIN); |
287 n = tryRead(fd, b, off, len); |
284 n = tryRead(fd, b, off, len); |
288 } while (IOStatus.okayToRetry(n) && isOpen()); |
285 } while (IOStatus.okayToRetry(n) && isOpen()); |
289 } |
286 } |
290 } |
287 } |
291 return n; |
288 return n; |
352 int n = 0; |
348 int n = 0; |
353 FileDescriptor fd = beginWrite(); |
349 FileDescriptor fd = beginWrite(); |
354 try { |
350 try { |
355 n = tryWrite(fd, b, off, len); |
351 n = tryWrite(fd, b, off, len); |
356 while (IOStatus.okayToRetry(n) && isOpen()) { |
352 while (IOStatus.okayToRetry(n) && isOpen()) { |
357 park(Net.POLLOUT); |
353 park(fd, Net.POLLOUT); |
358 n = tryWrite(fd, b, off, len); |
354 n = tryWrite(fd, b, off, len); |
359 } |
355 } |
360 return n; |
356 return n; |
361 } finally { |
357 } finally { |
362 endWrite(n > 0); |
358 endWrite(n > 0); |
544 connectLock.lock(); |
539 connectLock.lock(); |
545 try { |
540 try { |
546 boolean connected = false; |
541 boolean connected = false; |
547 FileDescriptor fd = beginConnect(address, port); |
542 FileDescriptor fd = beginConnect(address, port); |
548 try { |
543 try { |
549 maybeConfigureNonBlocking(fd, millis); |
544 configureNonBlockingIfNeeded(fd, millis); |
550 int n = Net.connect(fd, address, port); |
545 int n = Net.connect(fd, address, port); |
551 if (IOStatus.okayToRetry(n) && isOpen()) { |
546 if (IOStatus.okayToRetry(n) && isOpen()) { |
552 if (millis > 0) { |
547 if (millis > 0) { |
553 // connect with timeout |
548 // connect with timeout |
554 assert nonBlocking; |
549 assert nonBlocking; |
555 long nanos = NANOSECONDS.convert(millis, MILLISECONDS); |
550 long nanos = NANOSECONDS.convert(millis, MILLISECONDS); |
556 do { |
551 do { |
557 long startTime = System.nanoTime(); |
552 long startTime = System.nanoTime(); |
558 park(Net.POLLOUT, nanos); |
553 park(fd, Net.POLLOUT, nanos); |
559 n = Net.pollConnectNow(fd); |
554 n = Net.pollConnectNow(fd); |
560 if (n == 0) { |
555 if (n == 0) { |
561 nanos -= System.nanoTime() - startTime; |
556 nanos -= System.nanoTime() - startTime; |
562 if (nanos <= 0) |
557 if (nanos <= 0) |
563 throw new SocketTimeoutException("connect timeout"); |
558 throw new SocketTimeoutException("connect timeout"); |
564 } |
559 } |
565 } while (n == 0 && isOpen()); |
560 } while (n == 0 && isOpen()); |
566 } else { |
561 } else { |
567 // connect, no timeout |
562 // connect, no timeout |
568 do { |
563 do { |
569 park(Net.POLLOUT); |
564 park(fd, Net.POLLOUT); |
570 n = Net.pollConnectNow(fd); |
565 n = Net.pollConnectNow(fd); |
571 } while (n == 0 && isOpen()); |
566 } while (n == 0 && isOpen()); |
572 } |
567 } |
573 } |
568 } |
574 connected = (n > 0) && isOpen(); |
569 connected = (n > 0) && isOpen(); |
668 try { |
662 try { |
669 int n = 0; |
663 int n = 0; |
670 FileDescriptor fd = beginAccept(); |
664 FileDescriptor fd = beginAccept(); |
671 try { |
665 try { |
672 int timeout = this.timeout; |
666 int timeout = this.timeout; |
673 maybeConfigureNonBlocking(fd, timeout); |
667 configureNonBlockingIfNeeded(fd, timeout); |
674 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
668 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
675 if (IOStatus.okayToRetry(n) && isOpen()) { |
669 if (IOStatus.okayToRetry(n) && isOpen()) { |
676 if (timeout > 0) { |
670 if (timeout > 0) { |
677 // accept with timeout |
671 // accept with timeout |
678 assert nonBlocking; |
672 assert nonBlocking; |
679 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
673 long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS); |
680 do { |
674 do { |
681 long startTime = System.nanoTime(); |
675 long startTime = System.nanoTime(); |
682 park(Net.POLLIN, nanos); |
676 park(fd, Net.POLLIN, nanos); |
683 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
677 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
684 if (n == IOStatus.UNAVAILABLE) { |
678 if (n == IOStatus.UNAVAILABLE) { |
685 nanos -= System.nanoTime() - startTime; |
679 nanos -= System.nanoTime() - startTime; |
686 if (nanos <= 0) |
680 if (nanos <= 0) |
687 throw new SocketTimeoutException("accept timeout"); |
681 throw new SocketTimeoutException("accept timeout"); |
688 } |
682 } |
689 } while (n == IOStatus.UNAVAILABLE && isOpen()); |
683 } while (n == IOStatus.UNAVAILABLE && isOpen()); |
690 } else { |
684 } else { |
691 // accept, no timeout |
685 // accept, no timeout |
692 do { |
686 do { |
693 park(Net.POLLIN); |
687 park(fd, Net.POLLIN); |
694 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
688 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa); |
695 } while (IOStatus.okayToRetry(n) && isOpen()); |
689 } while (IOStatus.okayToRetry(n) && isOpen()); |
696 } |
690 } |
697 } |
691 } |
698 } finally { |
692 } finally { |
736 } |
730 } |
737 |
731 |
738 @Override |
732 @Override |
739 protected InputStream getInputStream() { |
733 protected InputStream getInputStream() { |
740 return new InputStream() { |
734 return new InputStream() { |
741 private volatile boolean eof; |
735 private volatile boolean eof; // to emulate legacy SocketInputStream |
742 @Override |
736 @Override |
743 public int read() throws IOException { |
737 public int read() throws IOException { |
744 byte[] a = new byte[1]; |
738 byte[] a = new byte[1]; |
745 int n = read(a, 0, 1); |
739 int n = read(a, 0, 1); |
746 return (n > 0) ? (a[0] & 0xff) : -1; |
740 return (n > 0) ? (a[0] & 0xff) : -1; |
747 } |
741 } |
748 @Override |
742 @Override |
749 public int read(byte[] b, int off, int len) throws IOException { |
743 public int read(byte[] b, int off, int len) throws IOException { |
750 Objects.checkFromIndexSize(off, len, b.length); |
744 Objects.checkFromIndexSize(off, len, b.length); |
751 if (eof) { |
745 if (eof) { |
752 return -1; |
746 return -1; // return -1, even if socket is closed |
753 } else if (len == 0) { |
747 } else if (len == 0) { |
754 return 0; |
748 return 0; // return 0, even if socket is closed |
755 } else { |
749 } else { |
756 try { |
750 try { |
757 // read up to MAX_BUFFER_SIZE bytes |
751 // read up to MAX_BUFFER_SIZE bytes |
758 int size = Math.min(len, MAX_BUFFER_SIZE); |
752 int size = Math.min(len, MAX_BUFFER_SIZE); |
759 int n = NioSocketImpl.this.read(b, off, size); |
753 int n = NioSocketImpl.this.read(b, off, size); |
1124 writeLock.lock(); |
1118 writeLock.lock(); |
1125 try { |
1119 try { |
1126 int n = 0; |
1120 int n = 0; |
1127 FileDescriptor fd = beginWrite(); |
1121 FileDescriptor fd = beginWrite(); |
1128 try { |
1122 try { |
1129 maybeConfigureNonBlocking(fd, 0); |
|
1130 do { |
1123 do { |
1131 n = Net.sendOOB(fd, (byte) data); |
1124 n = Net.sendOOB(fd, (byte) data); |
1132 } while (n == IOStatus.INTERRUPTED && isOpen()); |
1125 } while (n == IOStatus.INTERRUPTED && isOpen()); |
1133 if (n == IOStatus.UNAVAILABLE) { |
1126 if (n == IOStatus.UNAVAILABLE) { |
1134 throw new RuntimeException("not implemented yet"); |
1127 throw new RuntimeException("not implemented yet"); |