src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java
branchniosocketimpl-branch
changeset 57128 3d6cee596b33
parent 57124 8bb7df86576a
child 57172 63ab5af5d009
equal deleted inserted replaced
57124:8bb7df86576a 57128:3d6cee596b33
   165      * Disables the current thread for scheduling purposes until the socket is
   165      * Disables the current thread for scheduling purposes until the socket is
   166      * ready for I/O, or is asynchronously closed, for up to the specified
   166      * ready for I/O, or is asynchronously closed, for up to the specified
   167      * waiting time.
   167      * waiting time.
   168      * @throws IOException if an I/O error occurs
   168      * @throws IOException if an I/O error occurs
   169      */
   169      */
   170     private void park(int event, long nanos) throws IOException {
   170     private void park(FileDescriptor fd, int event, long nanos) throws IOException {
   171         long millis;
   171         long millis;
   172         if (nanos == 0) {
   172         if (nanos == 0) {
   173             millis = -1;
   173             millis = -1;
   174         } else {
   174         } else {
   175             millis = MILLISECONDS.convert(nanos, NANOSECONDS);
   175             millis = MILLISECONDS.convert(nanos, NANOSECONDS);
   180     /**
   180     /**
   181      * Disables the current thread for scheduling purposes until the socket is
   181      * Disables the current thread for scheduling purposes until the socket is
   182      * ready for I/O or is asynchronously closed.
   182      * ready for I/O or is asynchronously closed.
   183      * @throws IOException if an I/O error occurs
   183      * @throws IOException if an I/O error occurs
   184      */
   184      */
   185     private void park(int event) throws IOException {
   185     private void park(FileDescriptor fd, int event) throws IOException {
   186         park(event, 0);
   186         park(fd, event, 0);
   187     }
   187     }
   188 
   188 
   189     /**
   189     /**
   190      * Ensures that the socket is configured non-blocking when a timeout is specified.
   190      * Ensures that the socket is configured non-blocking when a timeout is specified.
   191      * @throws IOException if there is an I/O error changing the blocking mode
   191      * @throws IOException if there is an I/O error changing the blocking mode
   192      */
   192      */
   193     private void maybeConfigureNonBlocking(FileDescriptor fd, int timeout)
   193     private void configureNonBlockingIfNeeded(FileDescriptor fd, int timeout)
   194         throws IOException
   194         throws IOException
   195     {
   195     {
   196         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
       
   197         if (timeout > 0 && !nonBlocking) {
   196         if (timeout > 0 && !nonBlocking) {
       
   197             assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
   198             IOUtil.configureBlocking(fd, false);
   198             IOUtil.configureBlocking(fd, false);
   199             nonBlocking = true;
   199             nonBlocking = true;
   200         }
   200         }
   201     }
   201     }
   202 
   202 
   206      */
   206      */
   207     private FileDescriptor beginRead() throws SocketException {
   207     private FileDescriptor beginRead() throws SocketException {
   208         synchronized (stateLock) {
   208         synchronized (stateLock) {
   209             ensureOpenAndConnected();
   209             ensureOpenAndConnected();
   210             readerThread = NativeThread.current();
   210             readerThread = NativeThread.current();
   211             assert fd != null;
       
   212             return fd;
   211             return fd;
   213         }
   212         }
   214     }
   213     }
   215 
   214 
   216     /**
   215     /**
   254      * @throws SocketTimeoutException if the read timeout elapses
   253      * @throws SocketTimeoutException if the read timeout elapses
   255      */
   254      */
   256     private int read(byte[] b, int off, int len) throws IOException {
   255     private int read(byte[] b, int off, int len) throws IOException {
   257         readLock.lock();
   256         readLock.lock();
   258         try {
   257         try {
       
   258             int timeout = this.timeout;
   259             int n = 0;
   259             int n = 0;
   260             FileDescriptor fd = beginRead();
   260             FileDescriptor fd = beginRead();
   261             try {
   261             try {
   262                 if (isInputClosed) {
   262                 if (isInputClosed)
   263                     return IOStatus.EOF;
   263                     return IOStatus.EOF;
   264                 }
   264                 configureNonBlockingIfNeeded(fd, timeout);
   265                 int timeout = this.timeout;
       
   266                 maybeConfigureNonBlocking(fd, timeout);
       
   267                 n = tryRead(fd, b, off, len);
   265                 n = tryRead(fd, b, off, len);
   268                 if (IOStatus.okayToRetry(n) && isOpen()) {
   266                 if (IOStatus.okayToRetry(n) && isOpen()) {
   269                     if (timeout > 0) {
   267                     if (timeout > 0) {
   270                         // read with timeout
   268                         // read with timeout
   271                         assert nonBlocking;
       
   272                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
   269                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
   273                         do {
   270                         do {
   274                             long startTime = System.nanoTime();
   271                             long startTime = System.nanoTime();
   275                             park(Net.POLLIN, nanos);
   272                             park(fd, Net.POLLIN, nanos);
   276                             n = tryRead(fd, b, off, len);
   273                             n = tryRead(fd, b, off, len);
   277                             if (n == IOStatus.UNAVAILABLE) {
   274                             if (n == IOStatus.UNAVAILABLE) {
   278                                 nanos -= System.nanoTime() - startTime;
   275                                 nanos -= System.nanoTime() - startTime;
   279                                 if (nanos <= 0)
   276                                 if (nanos <= 0)
   280                                     throw new SocketTimeoutException("read timeout");
   277                                     throw new SocketTimeoutException("read timeout");
   281                             }
   278                             }
   282                         } while (n == IOStatus.UNAVAILABLE && isOpen());
   279                         } while (n == IOStatus.UNAVAILABLE && isOpen());
   283                     } else {
   280                     } else {
   284                         // read, no timeout
   281                         // read, no timeout
   285                         do {
   282                         do {
   286                             park(Net.POLLIN);
   283                             park(fd, Net.POLLIN);
   287                             n = tryRead(fd, b, off, len);
   284                             n = tryRead(fd, b, off, len);
   288                         } while (IOStatus.okayToRetry(n) && isOpen());
   285                         } while (IOStatus.okayToRetry(n) && isOpen());
   289                     }
   286                     }
   290                 }
   287                 }
   291                 return n;
   288                 return n;
   303      */
   300      */
   304     private FileDescriptor beginWrite() throws SocketException {
   301     private FileDescriptor beginWrite() throws SocketException {
   305         synchronized (stateLock) {
   302         synchronized (stateLock) {
   306             ensureOpenAndConnected();
   303             ensureOpenAndConnected();
   307             writerThread = NativeThread.current();
   304             writerThread = NativeThread.current();
   308             assert fd != null;
       
   309             return fd;
   305             return fd;
   310         }
   306         }
   311     }
   307     }
   312 
   308 
   313     /**
   309     /**
   352             int n = 0;
   348             int n = 0;
   353             FileDescriptor fd = beginWrite();
   349             FileDescriptor fd = beginWrite();
   354             try {
   350             try {
   355                 n = tryWrite(fd, b, off, len);
   351                 n = tryWrite(fd, b, off, len);
   356                 while (IOStatus.okayToRetry(n) && isOpen()) {
   352                 while (IOStatus.okayToRetry(n) && isOpen()) {
   357                     park(Net.POLLOUT);
   353                     park(fd, Net.POLLOUT);
   358                     n = tryWrite(fd, b, off, len);
   354                     n = tryWrite(fd, b, off, len);
   359                 }
   355                 }
   360                 return n;
   356                 return n;
   361             } finally {
   357             } finally {
   362                 endWrite(n > 0);
   358                 endWrite(n > 0);
   494             // save the remote address/port
   490             // save the remote address/port
   495             this.address = address;
   491             this.address = address;
   496             this.port = port;
   492             this.port = port;
   497 
   493 
   498             readerThread = NativeThread.current();
   494             readerThread = NativeThread.current();
   499             assert fd != null;
       
   500             return fd;
   495             return fd;
   501         }
   496         }
   502     }
   497     }
   503 
   498 
   504     /**
   499     /**
   544             connectLock.lock();
   539             connectLock.lock();
   545             try {
   540             try {
   546                 boolean connected = false;
   541                 boolean connected = false;
   547                 FileDescriptor fd = beginConnect(address, port);
   542                 FileDescriptor fd = beginConnect(address, port);
   548                 try {
   543                 try {
   549                     maybeConfigureNonBlocking(fd, millis);
   544                     configureNonBlockingIfNeeded(fd, millis);
   550                     int n = Net.connect(fd, address, port);
   545                     int n = Net.connect(fd, address, port);
   551                     if (IOStatus.okayToRetry(n) && isOpen()) {
   546                     if (IOStatus.okayToRetry(n) && isOpen()) {
   552                         if (millis > 0) {
   547                         if (millis > 0) {
   553                             // connect with timeout
   548                             // connect with timeout
   554                             assert nonBlocking;
   549                             assert nonBlocking;
   555                             long nanos = NANOSECONDS.convert(millis, MILLISECONDS);
   550                             long nanos = NANOSECONDS.convert(millis, MILLISECONDS);
   556                             do {
   551                             do {
   557                                 long startTime = System.nanoTime();
   552                                 long startTime = System.nanoTime();
   558                                 park(Net.POLLOUT, nanos);
   553                                 park(fd, Net.POLLOUT, nanos);
   559                                 n = Net.pollConnectNow(fd);
   554                                 n = Net.pollConnectNow(fd);
   560                                 if (n == 0) {
   555                                 if (n == 0) {
   561                                     nanos -= System.nanoTime() - startTime;
   556                                     nanos -= System.nanoTime() - startTime;
   562                                     if (nanos <= 0)
   557                                     if (nanos <= 0)
   563                                         throw new SocketTimeoutException("connect timeout");
   558                                         throw new SocketTimeoutException("connect timeout");
   564                                 }
   559                                 }
   565                             } while (n == 0 && isOpen());
   560                             } while (n == 0 && isOpen());
   566                         } else {
   561                         } else {
   567                             // connect, no timeout
   562                             // connect, no timeout
   568                             do {
   563                             do {
   569                                 park(Net.POLLOUT);
   564                                 park(fd, Net.POLLOUT);
   570                                 n = Net.pollConnectNow(fd);
   565                                 n = Net.pollConnectNow(fd);
   571                             } while (n == 0 && isOpen());
   566                             } while (n == 0 && isOpen());
   572                         }
   567                         }
   573                     }
   568                     }
   574                     connected = (n > 0) && isOpen();
   569                     connected = (n > 0) && isOpen();
   635             if (!stream)
   630             if (!stream)
   636                 throw new SocketException("Not a stream socket");
   631                 throw new SocketException("Not a stream socket");
   637             if (localport == 0)
   632             if (localport == 0)
   638                 throw new SocketException("Not bound");
   633                 throw new SocketException("Not bound");
   639             readerThread = NativeThread.current();
   634             readerThread = NativeThread.current();
   640             assert fd != null;
       
   641             return fd;
   635             return fd;
   642         }
   636         }
   643     }
   637     }
   644 
   638 
   645     /**
   639     /**
   668         try {
   662         try {
   669             int n = 0;
   663             int n = 0;
   670             FileDescriptor fd = beginAccept();
   664             FileDescriptor fd = beginAccept();
   671             try {
   665             try {
   672                 int timeout = this.timeout;
   666                 int timeout = this.timeout;
   673                 maybeConfigureNonBlocking(fd, timeout);
   667                 configureNonBlockingIfNeeded(fd, timeout);
   674                 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   668                 n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   675                 if (IOStatus.okayToRetry(n) && isOpen()) {
   669                 if (IOStatus.okayToRetry(n) && isOpen()) {
   676                     if (timeout > 0) {
   670                     if (timeout > 0) {
   677                         // accept with timeout
   671                         // accept with timeout
   678                         assert nonBlocking;
   672                         assert nonBlocking;
   679                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
   673                         long nanos = NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
   680                         do {
   674                         do {
   681                             long startTime = System.nanoTime();
   675                             long startTime = System.nanoTime();
   682                             park(Net.POLLIN, nanos);
   676                             park(fd, Net.POLLIN, nanos);
   683                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   677                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   684                             if (n == IOStatus.UNAVAILABLE) {
   678                             if (n == IOStatus.UNAVAILABLE) {
   685                                 nanos -= System.nanoTime() - startTime;
   679                                 nanos -= System.nanoTime() - startTime;
   686                                 if (nanos <= 0)
   680                                 if (nanos <= 0)
   687                                     throw new SocketTimeoutException("accept timeout");
   681                                     throw new SocketTimeoutException("accept timeout");
   688                             }
   682                             }
   689                         } while (n == IOStatus.UNAVAILABLE && isOpen());
   683                         } while (n == IOStatus.UNAVAILABLE && isOpen());
   690                     } else {
   684                     } else {
   691                         // accept, no timeout
   685                         // accept, no timeout
   692                         do {
   686                         do {
   693                             park(Net.POLLIN);
   687                             park(fd, Net.POLLIN);
   694                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   688                             n = ServerSocketChannelImpl.accept0(fd, newfd, isaa);
   695                         } while (IOStatus.okayToRetry(n) && isOpen());
   689                         } while (IOStatus.okayToRetry(n) && isOpen());
   696                     }
   690                     }
   697                 }
   691                 }
   698             } finally {
   692             } finally {
   736     }
   730     }
   737 
   731 
   738     @Override
   732     @Override
   739     protected InputStream getInputStream() {
   733     protected InputStream getInputStream() {
   740         return new InputStream() {
   734         return new InputStream() {
   741             private volatile boolean eof;
   735             private volatile boolean eof;  // to emulate legacy SocketInputStream
   742             @Override
   736             @Override
   743             public int read() throws IOException {
   737             public int read() throws IOException {
   744                 byte[] a = new byte[1];
   738                 byte[] a = new byte[1];
   745                 int n = read(a, 0, 1);
   739                 int n = read(a, 0, 1);
   746                 return (n > 0) ? (a[0] & 0xff) : -1;
   740                 return (n > 0) ? (a[0] & 0xff) : -1;
   747             }
   741             }
   748             @Override
   742             @Override
   749             public int read(byte[] b, int off, int len) throws IOException {
   743             public int read(byte[] b, int off, int len) throws IOException {
   750                 Objects.checkFromIndexSize(off, len, b.length);
   744                 Objects.checkFromIndexSize(off, len, b.length);
   751                 if (eof) {
   745                 if (eof) {
   752                     return -1;
   746                     return -1; // return -1, even if socket is closed
   753                 } else if (len == 0) {
   747                 } else if (len == 0) {
   754                     return 0;
   748                     return 0;  // return 0, even if socket is closed
   755                 } else {
   749                 } else {
   756                     try {
   750                     try {
   757                         // read up to MAX_BUFFER_SIZE bytes
   751                         // read up to MAX_BUFFER_SIZE bytes
   758                         int size = Math.min(len, MAX_BUFFER_SIZE);
   752                         int size = Math.min(len, MAX_BUFFER_SIZE);
   759                         int n = NioSocketImpl.this.read(b, off, size);
   753                         int n = NioSocketImpl.this.read(b, off, size);
  1124         writeLock.lock();
  1118         writeLock.lock();
  1125         try {
  1119         try {
  1126             int n = 0;
  1120             int n = 0;
  1127             FileDescriptor fd = beginWrite();
  1121             FileDescriptor fd = beginWrite();
  1128             try {
  1122             try {
  1129                 maybeConfigureNonBlocking(fd, 0);
       
  1130                 do {
  1123                 do {
  1131                     n = Net.sendOOB(fd, (byte) data);
  1124                     n = Net.sendOOB(fd, (byte) data);
  1132                 } while (n == IOStatus.INTERRUPTED && isOpen());
  1125                 } while (n == IOStatus.INTERRUPTED && isOpen());
  1133                 if (n == IOStatus.UNAVAILABLE) {
  1126                 if (n == IOStatus.UNAVAILABLE) {
  1134                     throw new RuntimeException("not implemented yet");
  1127                     throw new RuntimeException("not implemented yet");