src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
changeset 49001 ce06058197a4
parent 48761 74c1fa26435a
child 49141 ac95c7a76132
equal deleted inserted replaced
49000:a406a9c451a0 49001:ce06058197a4
    46 import java.nio.channels.SelectionKey;
    46 import java.nio.channels.SelectionKey;
    47 import java.nio.channels.SocketChannel;
    47 import java.nio.channels.SocketChannel;
    48 import java.nio.channels.spi.SelectorProvider;
    48 import java.nio.channels.spi.SelectorProvider;
    49 import java.util.Collections;
    49 import java.util.Collections;
    50 import java.util.HashSet;
    50 import java.util.HashSet;
       
    51 import java.util.Objects;
    51 import java.util.Set;
    52 import java.util.Set;
    52 import java.util.concurrent.locks.ReentrantLock;
    53 import java.util.concurrent.locks.ReentrantLock;
    53 
    54 
    54 import sun.net.NetHooks;
    55 import sun.net.NetHooks;
    55 import sun.net.ext.ExtendedSocketOptions;
    56 import sun.net.ext.ExtendedSocketOptions;
    60 
    61 
    61 class SocketChannelImpl
    62 class SocketChannelImpl
    62     extends SocketChannel
    63     extends SocketChannel
    63     implements SelChImpl
    64     implements SelChImpl
    64 {
    65 {
    65 
       
    66     // Used to make native read and write calls
    66     // Used to make native read and write calls
    67     private static NativeDispatcher nd;
    67     private static NativeDispatcher nd;
    68 
    68 
    69     // Our file descriptor object
    69     // Our file descriptor object
    70     private final FileDescriptor fd;
    70     private final FileDescriptor fd;
    71     private final int fdVal;
    71     private final int fdVal;
    72 
    72 
    73     // IDs of native threads doing reads and writes, for signalling
       
    74     private volatile long readerThread;
       
    75     private volatile long writerThread;
       
    76 
       
    77     // Lock held by current reading or connecting thread
    73     // Lock held by current reading or connecting thread
    78     private final ReentrantLock readLock = new ReentrantLock();
    74     private final ReentrantLock readLock = new ReentrantLock();
    79 
    75 
    80     // Lock held by current writing or connecting thread
    76     // Lock held by current writing or connecting thread
    81     private final ReentrantLock writeLock = new ReentrantLock();
    77     private final ReentrantLock writeLock = new ReentrantLock();
    82 
    78 
    83     // Lock held by any thread that modifies the state fields declared below
    79     // Lock held by any thread that modifies the state fields declared below
    84     // DO NOT invoke a blocking I/O operation while holding this lock!
    80     // DO NOT invoke a blocking I/O operation while holding this lock!
    85     private final Object stateLock = new Object();
    81     private final Object stateLock = new Object();
    86 
    82 
       
    83     // Input/Output closed
       
    84     private volatile boolean isInputClosed;
       
    85     private volatile boolean isOutputClosed;
       
    86 
    87     // -- The following fields are protected by stateLock
    87     // -- The following fields are protected by stateLock
    88 
    88 
    89     // set true when exclusive binding is on and SO_REUSEADDR is emulated
    89     // set true when exclusive binding is on and SO_REUSEADDR is emulated
    90     private boolean isReuseAddress;
    90     private boolean isReuseAddress;
    91 
    91 
    92     // State, increases monotonically
    92     // State, increases monotonically
    93     private static final int ST_UNINITIALIZED = -1;
       
    94     private static final int ST_UNCONNECTED = 0;
    93     private static final int ST_UNCONNECTED = 0;
    95     private static final int ST_PENDING = 1;
    94     private static final int ST_CONNECTIONPENDING = 1;
    96     private static final int ST_CONNECTED = 2;
    95     private static final int ST_CONNECTED = 2;
    97     private static final int ST_KILLPENDING = 3;
    96     private static final int ST_CLOSING = 3;
    98     private static final int ST_KILLED = 4;
    97     private static final int ST_KILLPENDING = 4;
    99     private int state = ST_UNINITIALIZED;
    98     private static final int ST_KILLED = 5;
       
    99     private int state;
       
   100 
       
   101     // IDs of native threads doing reads and writes, for signalling
       
   102     private long readerThread;
       
   103     private long writerThread;
   100 
   104 
   101     // Binding
   105     // Binding
   102     private InetSocketAddress localAddress;
   106     private InetSocketAddress localAddress;
   103     private InetSocketAddress remoteAddress;
   107     private InetSocketAddress remoteAddress;
   104 
       
   105     // Input/Output open
       
   106     private boolean isInputOpen = true;
       
   107     private boolean isOutputOpen = true;
       
   108 
   108 
   109     // Socket adaptor, created on demand
   109     // Socket adaptor, created on demand
   110     private Socket socket;
   110     private Socket socket;
   111 
   111 
   112     // -- End of fields protected by stateLock
   112     // -- End of fields protected by stateLock
   116     //
   116     //
   117     SocketChannelImpl(SelectorProvider sp) throws IOException {
   117     SocketChannelImpl(SelectorProvider sp) throws IOException {
   118         super(sp);
   118         super(sp);
   119         this.fd = Net.socket(true);
   119         this.fd = Net.socket(true);
   120         this.fdVal = IOUtil.fdVal(fd);
   120         this.fdVal = IOUtil.fdVal(fd);
   121         this.state = ST_UNCONNECTED;
   121     }
   122     }
   122 
   123 
   123     SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
   124     SocketChannelImpl(SelectorProvider sp,
       
   125                       FileDescriptor fd,
       
   126                       boolean bound)
       
   127         throws IOException
   124         throws IOException
   128     {
   125     {
   129         super(sp);
   126         super(sp);
   130         this.fd = fd;
   127         this.fd = fd;
   131         this.fdVal = IOUtil.fdVal(fd);
   128         this.fdVal = IOUtil.fdVal(fd);
   132         this.state = ST_UNCONNECTED;
   129         if (bound) {
   133         if (bound)
   130             synchronized (stateLock) {
   134             this.localAddress = Net.localAddress(fd);
   131                 this.localAddress = Net.localAddress(fd);
       
   132             }
       
   133         }
   135     }
   134     }
   136 
   135 
   137     // Constructor for sockets obtained from server sockets
   136     // Constructor for sockets obtained from server sockets
   138     //
   137     //
   139     SocketChannelImpl(SelectorProvider sp,
   138     SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
   140                       FileDescriptor fd, InetSocketAddress remote)
       
   141         throws IOException
   139         throws IOException
   142     {
   140     {
   143         super(sp);
   141         super(sp);
   144         this.fd = fd;
   142         this.fd = fd;
   145         this.fdVal = IOUtil.fdVal(fd);
   143         this.fdVal = IOUtil.fdVal(fd);
   146         this.state = ST_CONNECTED;
   144         synchronized (stateLock) {
   147         this.localAddress = Net.localAddress(fd);
   145             this.localAddress = Net.localAddress(fd);
   148         this.remoteAddress = remote;
   146             this.remoteAddress = isa;
   149     }
   147             this.state = ST_CONNECTED;
   150 
   148         }
       
   149     }
       
   150 
       
   151     // @throws ClosedChannelException if channel is closed
       
   152     private void ensureOpen() throws ClosedChannelException {
       
   153         if (!isOpen())
       
   154             throw new ClosedChannelException();
       
   155     }
       
   156 
       
   157     @Override
   151     public Socket socket() {
   158     public Socket socket() {
   152         synchronized (stateLock) {
   159         synchronized (stateLock) {
   153             if (socket == null)
   160             if (socket == null)
   154                 socket = SocketAdaptor.create(this);
   161                 socket = SocketAdaptor.create(this);
   155             return socket;
   162             return socket;
   157     }
   164     }
   158 
   165 
   159     @Override
   166     @Override
   160     public SocketAddress getLocalAddress() throws IOException {
   167     public SocketAddress getLocalAddress() throws IOException {
   161         synchronized (stateLock) {
   168         synchronized (stateLock) {
   162             if (!isOpen())
   169             ensureOpen();
   163                 throw new ClosedChannelException();
   170             return Net.getRevealedLocalAddress(localAddress);
   164             return  Net.getRevealedLocalAddress(localAddress);
       
   165         }
   171         }
   166     }
   172     }
   167 
   173 
   168     @Override
   174     @Override
   169     public SocketAddress getRemoteAddress() throws IOException {
   175     public SocketAddress getRemoteAddress() throws IOException {
   170         synchronized (stateLock) {
   176         synchronized (stateLock) {
   171             if (!isOpen())
   177             ensureOpen();
   172                 throw new ClosedChannelException();
       
   173             return remoteAddress;
   178             return remoteAddress;
   174         }
   179         }
   175     }
   180     }
   176 
   181 
   177     @Override
   182     @Override
   178     public <T> SocketChannel setOption(SocketOption<T> name, T value)
   183     public <T> SocketChannel setOption(SocketOption<T> name, T value)
   179         throws IOException
   184         throws IOException
   180     {
   185     {
   181         if (name == null)
   186         Objects.requireNonNull(name);
   182             throw new NullPointerException();
       
   183         if (!supportedOptions().contains(name))
   187         if (!supportedOptions().contains(name))
   184             throw new UnsupportedOperationException("'" + name + "' not supported");
   188             throw new UnsupportedOperationException("'" + name + "' not supported");
   185 
   189 
   186         synchronized (stateLock) {
   190         synchronized (stateLock) {
   187             if (!isOpen())
   191             ensureOpen();
   188                 throw new ClosedChannelException();
       
   189 
   192 
   190             if (name == StandardSocketOptions.IP_TOS) {
   193             if (name == StandardSocketOptions.IP_TOS) {
   191                 ProtocolFamily family = Net.isIPv6Available() ?
   194                 ProtocolFamily family = Net.isIPv6Available() ?
   192                     StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
   195                     StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
   193                 Net.setSocketOption(fd, family, name, value);
   196                 Net.setSocketOption(fd, family, name, value);
   209     @Override
   212     @Override
   210     @SuppressWarnings("unchecked")
   213     @SuppressWarnings("unchecked")
   211     public <T> T getOption(SocketOption<T> name)
   214     public <T> T getOption(SocketOption<T> name)
   212         throws IOException
   215         throws IOException
   213     {
   216     {
   214         if (name == null)
   217         Objects.requireNonNull(name);
   215             throw new NullPointerException();
       
   216         if (!supportedOptions().contains(name))
   218         if (!supportedOptions().contains(name))
   217             throw new UnsupportedOperationException("'" + name + "' not supported");
   219             throw new UnsupportedOperationException("'" + name + "' not supported");
   218 
   220 
   219         synchronized (stateLock) {
   221         synchronized (stateLock) {
   220             if (!isOpen())
   222             ensureOpen();
   221                 throw new ClosedChannelException();
   223 
   222 
   224             if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
   223             if (name == StandardSocketOptions.SO_REUSEADDR &&
       
   224                     Net.useExclusiveBind())
       
   225             {
       
   226                 // SO_REUSEADDR emulated when using exclusive bind
   225                 // SO_REUSEADDR emulated when using exclusive bind
   227                 return (T)Boolean.valueOf(isReuseAddress);
   226                 return (T)Boolean.valueOf(isReuseAddress);
   228             }
   227             }
   229 
   228 
   230             // special handling for IP_TOS: always return 0 when IPv6
   229             // special handling for IP_TOS: always return 0 when IPv6
   241 
   240 
   242     private static class DefaultOptionsHolder {
   241     private static class DefaultOptionsHolder {
   243         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
   242         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
   244 
   243 
   245         private static Set<SocketOption<?>> defaultOptions() {
   244         private static Set<SocketOption<?>> defaultOptions() {
   246             HashSet<SocketOption<?>> set = new HashSet<>(8);
   245             HashSet<SocketOption<?>> set = new HashSet<>();
   247             set.add(StandardSocketOptions.SO_SNDBUF);
   246             set.add(StandardSocketOptions.SO_SNDBUF);
   248             set.add(StandardSocketOptions.SO_RCVBUF);
   247             set.add(StandardSocketOptions.SO_RCVBUF);
   249             set.add(StandardSocketOptions.SO_KEEPALIVE);
   248             set.add(StandardSocketOptions.SO_KEEPALIVE);
   250             set.add(StandardSocketOptions.SO_REUSEADDR);
   249             set.add(StandardSocketOptions.SO_REUSEADDR);
   251             if (Net.isReusePortAvailable()) {
   250             if (Net.isReusePortAvailable()) {
   254             set.add(StandardSocketOptions.SO_LINGER);
   253             set.add(StandardSocketOptions.SO_LINGER);
   255             set.add(StandardSocketOptions.TCP_NODELAY);
   254             set.add(StandardSocketOptions.TCP_NODELAY);
   256             // additional options required by socket adaptor
   255             // additional options required by socket adaptor
   257             set.add(StandardSocketOptions.IP_TOS);
   256             set.add(StandardSocketOptions.IP_TOS);
   258             set.add(ExtendedSocketOption.SO_OOBINLINE);
   257             set.add(ExtendedSocketOption.SO_OOBINLINE);
   259             ExtendedSocketOptions extendedOptions =
   258             set.addAll(ExtendedSocketOptions.getInstance().options());
   260                     ExtendedSocketOptions.getInstance();
       
   261             set.addAll(extendedOptions.options());
       
   262             return Collections.unmodifiableSet(set);
   259             return Collections.unmodifiableSet(set);
   263         }
   260         }
   264     }
   261     }
   265 
   262 
   266     @Override
   263     @Override
   267     public final Set<SocketOption<?>> supportedOptions() {
   264     public final Set<SocketOption<?>> supportedOptions() {
   268         return DefaultOptionsHolder.defaultOptions;
   265         return DefaultOptionsHolder.defaultOptions;
   269     }
   266     }
   270 
   267 
   271     private boolean ensureReadOpen() throws ClosedChannelException {
   268     /**
   272         synchronized (stateLock) {
   269      * Marks the beginning of a read operation that might block.
   273             if (!isOpen())
   270      *
   274                 throw new ClosedChannelException();
   271      * @throws ClosedChannelException if the channel is closed
   275             if (!isConnected())
   272      * @throws NotYetConnectedException if the channel is not yet connected
       
   273      */
       
   274     private void beginRead(boolean blocking) throws ClosedChannelException {
       
   275         if (blocking) {
       
   276             // set hook for Thread.interrupt
       
   277             begin();
       
   278         }
       
   279         synchronized (stateLock) {
       
   280             ensureOpen();
       
   281             if (state != ST_CONNECTED)
   276                 throw new NotYetConnectedException();
   282                 throw new NotYetConnectedException();
   277             if (!isInputOpen)
   283             if (blocking)
   278                 return false;
   284                 readerThread = NativeThread.current();
   279             else
   285         }
   280                 return true;
   286     }
   281         }
   287 
   282     }
   288     /**
   283 
   289      * Marks the end of a read operation that may have blocked.
   284     private void ensureWriteOpen() throws ClosedChannelException {
   290      *
   285         synchronized (stateLock) {
   291      * @throws AsynchronousCloseException if the channel was closed due to this
   286             if (!isOpen())
   292      * thread being interrupted on a blocking read operation.
   287                 throw new ClosedChannelException();
   293      */
   288             if (!isOutputOpen)
   294     private void endRead(boolean blocking, boolean completed)
   289                 throw new ClosedChannelException();
   295         throws AsynchronousCloseException
   290             if (!isConnected())
   296     {
   291                 throw new NotYetConnectedException();
   297         if (blocking) {
   292         }
   298             synchronized (stateLock) {
   293     }
   299                 readerThread = 0;
   294 
   300                 // notify any thread waiting in implCloseSelectableChannel
   295     private void readerCleanup() throws IOException {
   301                 if (state == ST_CLOSING) {
   296         synchronized (stateLock) {
   302                     stateLock.notifyAll();
   297             readerThread = 0;
   303                 }
   298             if (state == ST_KILLPENDING)
   304             }
   299                 kill();
   305             // remove hook for Thread.interrupt
   300         }
   306             end(completed);
   301     }
   307         }
   302 
   308     }
   303     private void writerCleanup() throws IOException {
   309 
   304         synchronized (stateLock) {
   310     @Override
   305             writerThread = 0;
       
   306             if (state == ST_KILLPENDING)
       
   307                 kill();
       
   308         }
       
   309     }
       
   310 
       
   311     public int read(ByteBuffer buf) throws IOException {
   311     public int read(ByteBuffer buf) throws IOException {
   312 
   312         Objects.requireNonNull(buf);
   313         if (buf == null)
       
   314             throw new NullPointerException();
       
   315 
   313 
   316         readLock.lock();
   314         readLock.lock();
   317         try {
   315         try {
   318             if (!ensureReadOpen())
   316             boolean blocking = isBlocking();
   319                 return -1;
       
   320             int n = 0;
   317             int n = 0;
   321             try {
   318             try {
   322 
   319                 beginRead(blocking);
   323                 // Set up the interruption machinery; see
   320 
   324                 // AbstractInterruptibleChannel for details
   321                 // check if input is shutdown
   325                 //
   322                 if (isInputClosed)
   326                 begin();
   323                     return IOStatus.EOF;
   327 
   324 
   328                 synchronized (stateLock) {
   325                 if (blocking) {
   329                     if (!isOpen()) {
   326                     do {
   330                     // Either the current thread is already interrupted, so
   327                         n = IOUtil.read(fd, buf, -1, nd);
   331                     // begin() closed the channel, or another thread closed the
   328                     } while (n == IOStatus.INTERRUPTED && isOpen());
   332                     // channel since we checked it a few bytecodes ago.  In
   329                 } else {
   333                     // either case the value returned here is irrelevant since
       
   334                     // the invocation of end() in the finally block will throw
       
   335                     // an appropriate exception.
       
   336                     //
       
   337                         return 0;
       
   338 
       
   339                     }
       
   340 
       
   341                     // Save this thread so that it can be signalled on those
       
   342                     // platforms that require it
       
   343                     //
       
   344                     readerThread = NativeThread.current();
       
   345                 }
       
   346 
       
   347                 // Between the previous test of isOpen() and the return of the
       
   348                 // IOUtil.read invocation below, this channel might be closed
       
   349                 // or this thread might be interrupted.  We rely upon the
       
   350                 // implicit synchronization point in the kernel read() call to
       
   351                 // make sure that the right thing happens.  In either case the
       
   352                 // implCloseSelectableChannel method is ultimately invoked in
       
   353                 // some other thread, so there are three possibilities:
       
   354                 //
       
   355                 //   - implCloseSelectableChannel() invokes nd.preClose()
       
   356                 //     before this thread invokes read(), in which case the
       
   357                 //     read returns immediately with either EOF or an error,
       
   358                 //     the latter of which will cause an IOException to be
       
   359                 //     thrown.
       
   360                 //
       
   361                 //   - implCloseSelectableChannel() invokes nd.preClose() after
       
   362                 //     this thread is blocked in read().  On some operating
       
   363                 //     systems (e.g., Solaris and Windows) this causes the read
       
   364                 //     to return immediately with either EOF or an error
       
   365                 //     indication.
       
   366                 //
       
   367                 //   - implCloseSelectableChannel() invokes nd.preClose() after
       
   368                 //     this thread is blocked in read() but the operating
       
   369                 //     system (e.g., Linux) doesn't support preemptive close,
       
   370                 //     so implCloseSelectableChannel() proceeds to signal this
       
   371                 //     thread, thereby causing the read to return immediately
       
   372                 //     with IOStatus.INTERRUPTED.
       
   373                 //
       
   374                 // In all three cases the invocation of end() in the finally
       
   375                 // clause will notice that the channel has been closed and
       
   376                 // throw an appropriate exception (AsynchronousCloseException
       
   377                 // or ClosedByInterruptException) if necessary.
       
   378                 //
       
   379                 // *There is A fourth possibility. implCloseSelectableChannel()
       
   380                 // invokes nd.preClose(), signals reader/writer thred and quickly
       
   381                 // moves on to nd.close() in kill(), which does a real close.
       
   382                 // Then a third thread accepts a new connection, opens file or
       
   383                 // whatever that causes the released "fd" to be recycled. All
       
   384                 // above happens just between our last isOpen() check and the
       
   385                 // next kernel read reached, with the recycled "fd". The solution
       
   386                 // is to postpone the real kill() if there is a reader or/and
       
   387                 // writer thread(s) over there "waiting", leave the cleanup/kill
       
   388                 // to the reader or writer thread. (the preClose() still happens
       
   389                 // so the connection gets cut off as usual).
       
   390                 //
       
   391                 // For socket channels there is the additional wrinkle that
       
   392                 // asynchronous shutdown works much like asynchronous close,
       
   393                 // except that the channel is shutdown rather than completely
       
   394                 // closed.  This is analogous to the first two cases above,
       
   395                 // except that the shutdown operation plays the role of
       
   396                 // nd.preClose().
       
   397                 for (;;) {
       
   398                     n = IOUtil.read(fd, buf, -1, nd);
   330                     n = IOUtil.read(fd, buf, -1, nd);
   399                     if ((n == IOStatus.INTERRUPTED) && isOpen()) {
   331                 }
   400                         // The system call was interrupted but the channel
       
   401                         // is still open, so retry
       
   402                         continue;
       
   403                     }
       
   404                     return IOStatus.normalize(n);
       
   405                 }
       
   406 
       
   407             } finally {
   332             } finally {
   408                 readerCleanup();        // Clear reader thread
   333                 endRead(blocking, n > 0);
   409                 // The end method, which is defined in our superclass
   334                 if (n <= 0 && isInputClosed)
   410                 // AbstractInterruptibleChannel, resets the interruption
   335                     return IOStatus.EOF;
   411                 // machinery.  If its argument is true then it returns
   336             }
   412                 // normally; otherwise it checks the interrupt and open state
   337             return IOStatus.normalize(n);
   413                 // of this channel and throws an appropriate exception if
       
   414                 // necessary.
       
   415                 //
       
   416                 // So, if we actually managed to do any I/O in the above try
       
   417                 // block then we pass true to the end method.  We also pass
       
   418                 // true if the channel was in non-blocking mode when the I/O
       
   419                 // operation was initiated but no data could be transferred;
       
   420                 // this prevents spurious exceptions from being thrown in the
       
   421                 // rare event that a channel is closed or a thread is
       
   422                 // interrupted at the exact moment that a non-blocking I/O
       
   423                 // request is made.
       
   424                 //
       
   425                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
       
   426 
       
   427                 // Extra case for socket channels: Asynchronous shutdown
       
   428                 //
       
   429                 synchronized (stateLock) {
       
   430                     if ((n <= 0) && (!isInputOpen))
       
   431                         return IOStatus.EOF;
       
   432                 }
       
   433 
       
   434                 assert IOStatus.check(n);
       
   435 
       
   436             }
       
   437         } finally {
   338         } finally {
   438             readLock.unlock();
   339             readLock.unlock();
   439         }
   340         }
   440     }
   341     }
   441 
   342 
       
   343     @Override
   442     public long read(ByteBuffer[] dsts, int offset, int length)
   344     public long read(ByteBuffer[] dsts, int offset, int length)
   443         throws IOException
   345         throws IOException
   444     {
   346     {
   445         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
   347         Objects.checkFromIndexSize(offset, length, dsts.length);
   446             throw new IndexOutOfBoundsException();
   348 
   447         readLock.lock();
   349         readLock.lock();
   448         try {
   350         try {
   449             if (!ensureReadOpen())
   351             boolean blocking = isBlocking();
   450                 return -1;
       
   451             long n = 0;
   352             long n = 0;
   452             try {
   353             try {
   453                 begin();
   354                 beginRead(blocking);
   454                 synchronized (stateLock) {
   355 
   455                     if (!isOpen())
   356                 // check if input is shutdown
   456                         return 0;
   357                 if (isInputClosed)
   457                     readerThread = NativeThread.current();
   358                     return IOStatus.EOF;
   458                 }
   359 
   459 
   360                 if (blocking) {
   460                 for (;;) {
   361                     do {
       
   362                         n = IOUtil.read(fd, dsts, offset, length, nd);
       
   363                     } while (n == IOStatus.INTERRUPTED && isOpen());
       
   364                 } else {
   461                     n = IOUtil.read(fd, dsts, offset, length, nd);
   365                     n = IOUtil.read(fd, dsts, offset, length, nd);
   462                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   463                         continue;
       
   464                     return IOStatus.normalize(n);
       
   465                 }
   366                 }
   466             } finally {
   367             } finally {
   467                 readerCleanup();
   368                 endRead(blocking, n > 0);
   468                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
   369                 if (n <= 0 && isInputClosed)
   469                 synchronized (stateLock) {
   370                     return IOStatus.EOF;
   470                     if ((n <= 0) && (!isInputOpen))
   371             }
   471                         return IOStatus.EOF;
   372             return IOStatus.normalize(n);
   472                 }
       
   473                 assert IOStatus.check(n);
       
   474             }
       
   475         } finally {
   373         } finally {
   476             readLock.unlock();
   374             readLock.unlock();
   477         }
   375         }
   478     }
   376     }
   479 
   377 
       
   378     /**
       
   379      * Marks the beginning of a write operation that might block.
       
   380      *
       
   381      * @throws ClosedChannelException if the channel is closed or output shutdown
       
   382      * @throws NotYetConnectedException if the channel is not yet connected
       
   383      */
       
   384     private void beginWrite(boolean blocking) throws ClosedChannelException {
       
   385         if (blocking) {
       
   386             // set hook for Thread.interrupt
       
   387             begin();
       
   388         }
       
   389         synchronized (stateLock) {
       
   390             ensureOpen();
       
   391             if (isOutputClosed)
       
   392                 throw new ClosedChannelException();
       
   393             if (state != ST_CONNECTED)
       
   394                 throw new NotYetConnectedException();
       
   395             if (blocking)
       
   396                 writerThread = NativeThread.current();
       
   397         }
       
   398     }
       
   399 
       
   400     /**
       
   401      * Marks the end of a write operation that may have blocked.
       
   402      *
       
   403      * @throws AsynchronousCloseException if the channel was closed due to this
       
   404      * thread being interrupted on a blocking write operation.
       
   405      */
       
   406     private void endWrite(boolean blocking, boolean completed)
       
   407         throws AsynchronousCloseException
       
   408     {
       
   409         if (blocking) {
       
   410             synchronized (stateLock) {
       
   411                 writerThread = 0;
       
   412                 // notify any thread waiting in implCloseSelectableChannel
       
   413                 if (state == ST_CLOSING) {
       
   414                     stateLock.notifyAll();
       
   415                 }
       
   416             }
       
   417             // remove hook for Thread.interrupt
       
   418             end(completed);
       
   419         }
       
   420     }
       
   421 
       
   422     @Override
   480     public int write(ByteBuffer buf) throws IOException {
   423     public int write(ByteBuffer buf) throws IOException {
   481         if (buf == null)
   424         Objects.requireNonNull(buf);
   482             throw new NullPointerException();
   425 
   483         writeLock.lock();
   426         writeLock.lock();
   484         try {
   427         try {
   485             ensureWriteOpen();
   428             boolean blocking = isBlocking();
   486             int n = 0;
   429             int n = 0;
   487             try {
   430             try {
   488                 begin();
   431                 beginWrite(blocking);
   489                 synchronized (stateLock) {
   432                 if (blocking) {
   490                     if (!isOpen())
   433                     do {
   491                         return 0;
   434                         n = IOUtil.write(fd, buf, -1, nd);
   492                     writerThread = NativeThread.current();
   435                     } while (n == IOStatus.INTERRUPTED && isOpen());
   493                 }
   436                 } else {
   494                 for (;;) {
       
   495                     n = IOUtil.write(fd, buf, -1, nd);
   437                     n = IOUtil.write(fd, buf, -1, nd);
   496                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   497                         continue;
       
   498                     return IOStatus.normalize(n);
       
   499                 }
   438                 }
   500             } finally {
   439             } finally {
   501                 writerCleanup();
   440                 endWrite(blocking, n > 0);
   502                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
   441                 if (n <= 0 && isOutputClosed)
   503                 synchronized (stateLock) {
   442                     throw new AsynchronousCloseException();
   504                     if ((n <= 0) && (!isOutputOpen))
   443             }
   505                         throw new AsynchronousCloseException();
   444             return IOStatus.normalize(n);
   506                 }
       
   507                 assert IOStatus.check(n);
       
   508             }
       
   509         } finally {
   445         } finally {
   510             writeLock.unlock();
   446             writeLock.unlock();
   511         }
   447         }
   512     }
   448     }
   513 
   449 
       
   450     @Override
   514     public long write(ByteBuffer[] srcs, int offset, int length)
   451     public long write(ByteBuffer[] srcs, int offset, int length)
   515         throws IOException
   452         throws IOException
   516     {
   453     {
   517         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
   454         Objects.checkFromIndexSize(offset, length, srcs.length);
   518             throw new IndexOutOfBoundsException();
   455 
   519         writeLock.lock();
   456         writeLock.lock();
   520         try {
   457         try {
   521             ensureWriteOpen();
   458             boolean blocking = isBlocking();
   522             long n = 0;
   459             long n = 0;
   523             try {
   460             try {
   524                 begin();
   461                 beginWrite(blocking);
   525                 synchronized (stateLock) {
   462                 if (blocking) {
   526                     if (!isOpen())
   463                     do {
   527                         return 0;
   464                         n = IOUtil.write(fd, srcs, offset, length, nd);
   528                     writerThread = NativeThread.current();
   465                     } while (n == IOStatus.INTERRUPTED && isOpen());
   529                 }
   466                 } else {
   530                 for (;;) {
       
   531                     n = IOUtil.write(fd, srcs, offset, length, nd);
   467                     n = IOUtil.write(fd, srcs, offset, length, nd);
   532                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   533                         continue;
       
   534                     return IOStatus.normalize(n);
       
   535                 }
   468                 }
   536             } finally {
   469             } finally {
   537                 writerCleanup();
   470                 endWrite(blocking, n > 0);
   538                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
   471                 if (n <= 0 && isOutputClosed)
   539                 synchronized (stateLock) {
   472                     throw new AsynchronousCloseException();
   540                     if ((n <= 0) && (!isOutputOpen))
   473             }
   541                         throw new AsynchronousCloseException();
   474             return IOStatus.normalize(n);
   542                 }
       
   543                 assert IOStatus.check(n);
       
   544             }
       
   545         } finally {
   475         } finally {
   546             writeLock.unlock();
   476             writeLock.unlock();
   547         }
   477         }
   548     }
   478     }
   549 
   479 
   550     // package-private
   480     /**
       
   481      * Writes a byte of out of band data.
       
   482      */
   551     int sendOutOfBandData(byte b) throws IOException {
   483     int sendOutOfBandData(byte b) throws IOException {
   552         writeLock.lock();
   484         writeLock.lock();
   553         try {
   485         try {
   554             ensureWriteOpen();
   486             boolean blocking = isBlocking();
   555             int n = 0;
   487             int n = 0;
   556             try {
   488             try {
   557                 begin();
   489                 beginWrite(blocking);
   558                 synchronized (stateLock) {
   490                 if (blocking) {
   559                     if (!isOpen())
   491                     do {
   560                         return 0;
   492                         n = sendOutOfBandData(fd, b);
   561                     writerThread = NativeThread.current();
   493                     } while (n == IOStatus.INTERRUPTED && isOpen());
   562                 }
   494                 } else {
   563                 for (;;) {
       
   564                     n = sendOutOfBandData(fd, b);
   495                     n = sendOutOfBandData(fd, b);
   565                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   566                         continue;
       
   567                     return IOStatus.normalize(n);
       
   568                 }
   496                 }
   569             } finally {
   497             } finally {
   570                 writerCleanup();
   498                 endWrite(blocking, n > 0);
   571                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
   499                 if (n <= 0 && isOutputClosed)
   572                 synchronized (stateLock) {
   500                     throw new AsynchronousCloseException();
   573                     if ((n <= 0) && (!isOutputOpen))
   501             }
   574                         throw new AsynchronousCloseException();
   502             return IOStatus.normalize(n);
   575                 }
       
   576                 assert IOStatus.check(n);
       
   577             }
       
   578         } finally {
   503         } finally {
   579             writeLock.unlock();
   504             writeLock.unlock();
   580         }
   505         }
   581     }
   506     }
   582 
   507 
       
   508     @Override
   583     protected void implConfigureBlocking(boolean block) throws IOException {
   509     protected void implConfigureBlocking(boolean block) throws IOException {
   584         IOUtil.configureBlocking(fd, block);
   510         readLock.lock();
   585     }
   511         try {
   586 
   512             writeLock.lock();
   587     public InetSocketAddress localAddress() {
   513             try {
       
   514                 synchronized (stateLock) {
       
   515                     ensureOpen();
       
   516                     IOUtil.configureBlocking(fd, block);
       
   517                 }
       
   518             } finally {
       
   519                 writeLock.unlock();
       
   520             }
       
   521         } finally {
       
   522             readLock.unlock();
       
   523         }
       
   524     }
       
   525 
       
   526     /**
       
   527      * Returns the local address, or null if not bound
       
   528      */
       
   529     InetSocketAddress localAddress() {
   588         synchronized (stateLock) {
   530         synchronized (stateLock) {
   589             return localAddress;
   531             return localAddress;
   590         }
   532         }
   591     }
   533     }
   592 
   534 
   593     public SocketAddress remoteAddress() {
   535     /**
       
   536      * Returns the remote address, or null if not connected
       
   537      */
       
   538     InetSocketAddress remoteAddress() {
   594         synchronized (stateLock) {
   539         synchronized (stateLock) {
   595             return remoteAddress;
   540             return remoteAddress;
   596         }
   541         }
   597     }
   542     }
   598 
   543 
   601         readLock.lock();
   546         readLock.lock();
   602         try {
   547         try {
   603             writeLock.lock();
   548             writeLock.lock();
   604             try {
   549             try {
   605                 synchronized (stateLock) {
   550                 synchronized (stateLock) {
   606                     if (!isOpen())
   551                     ensureOpen();
   607                         throw new ClosedChannelException();
   552                     if (state == ST_CONNECTIONPENDING)
   608                     if (state == ST_PENDING)
       
   609                         throw new ConnectionPendingException();
   553                         throw new ConnectionPendingException();
   610                     if (localAddress != null)
   554                     if (localAddress != null)
   611                         throw new AlreadyBoundException();
   555                         throw new AlreadyBoundException();
   612                     InetSocketAddress isa = (local == null) ?
   556                     InetSocketAddress isa = (local == null) ?
   613                         new InetSocketAddress(0) : Net.checkAddress(local);
   557                         new InetSocketAddress(0) : Net.checkAddress(local);
   626             readLock.unlock();
   570             readLock.unlock();
   627         }
   571         }
   628         return this;
   572         return this;
   629     }
   573     }
   630 
   574 
       
   575     @Override
   631     public boolean isConnected() {
   576     public boolean isConnected() {
   632         synchronized (stateLock) {
   577         synchronized (stateLock) {
   633             return (state == ST_CONNECTED);
   578             return (state == ST_CONNECTED);
   634         }
   579         }
   635     }
   580     }
   636 
   581 
       
   582     @Override
   637     public boolean isConnectionPending() {
   583     public boolean isConnectionPending() {
   638         synchronized (stateLock) {
   584         synchronized (stateLock) {
   639             return (state == ST_PENDING);
   585             return (state == ST_CONNECTIONPENDING);
   640         }
   586         }
   641     }
   587     }
   642 
   588 
   643     void ensureOpenAndUnconnected() throws IOException { // package-private
   589     /**
   644         synchronized (stateLock) {
   590      * Marks the beginning of a connect operation that might block.
   645             if (!isOpen())
   591      *
   646                 throw new ClosedChannelException();
   592      * @throws ClosedChannelException if the channel is closed
       
   593      * @throws AlreadyConnectedException if already connected
       
   594      * @throws ConnectionPendingException is a connection is pending
       
   595      */
       
   596     private void beginConnect(boolean blocking) throws ClosedChannelException {
       
   597         if (blocking) {
       
   598             // set hook for Thread.interrupt
       
   599             begin();
       
   600         }
       
   601         synchronized (stateLock) {
       
   602             ensureOpen();
   647             if (state == ST_CONNECTED)
   603             if (state == ST_CONNECTED)
   648                 throw new AlreadyConnectedException();
   604                 throw new AlreadyConnectedException();
   649             if (state == ST_PENDING)
   605             if (state == ST_CONNECTIONPENDING)
   650                 throw new ConnectionPendingException();
   606                 throw new ConnectionPendingException();
   651         }
   607             if (blocking)
   652     }
   608                 readerThread = NativeThread.current();
   653 
   609         }
       
   610     }
       
   611 
       
   612     /**
       
   613      * Marks the end of a connect operation that may have blocked.
       
   614      *
       
   615      * @throws AsynchronousCloseException if the channel was closed due to this
       
   616      * thread being interrupted on a blocking connect operation.
       
   617      */
       
   618     private void endConnect(boolean blocking, boolean completed)
       
   619         throws AsynchronousCloseException
       
   620     {
       
   621         endRead(blocking, completed);
       
   622     }
       
   623 
       
   624     @Override
   654     public boolean connect(SocketAddress sa) throws IOException {
   625     public boolean connect(SocketAddress sa) throws IOException {
       
   626         InetSocketAddress isa = Net.checkAddress(sa);
       
   627         SecurityManager sm = System.getSecurityManager();
       
   628         if (sm != null)
       
   629             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
       
   630 
   655         readLock.lock();
   631         readLock.lock();
   656         try {
   632         try {
   657             writeLock.lock();
   633             writeLock.lock();
   658             try {
   634             try {
   659                 ensureOpenAndUnconnected();
   635                 // notify before-connect hook
   660                 InetSocketAddress isa = Net.checkAddress(sa);
   636                 synchronized (stateLock) {
   661                 SecurityManager sm = System.getSecurityManager();
   637                     if (state == ST_UNCONNECTED && localAddress == null) {
   662                 if (sm != null)
   638                         NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
   663                     sm.checkConnect(isa.getAddress().getHostAddress(),
   639                     }
   664                             isa.getPort());
   640                 }
   665                 synchronized (blockingLock()) {
   641 
   666                     int n = 0;
   642                 InetAddress ia = isa.getAddress();
       
   643                 if (ia.isAnyLocalAddress())
       
   644                     ia = InetAddress.getLocalHost();
       
   645 
       
   646                 int n = 0;
       
   647                 boolean blocking = isBlocking();
       
   648                 try {
   667                     try {
   649                     try {
   668                         try {
   650                         beginConnect(blocking);
   669                             begin();
   651                         if (blocking) {
   670                             synchronized (stateLock) {
   652                             do {
   671                                 if (!isOpen()) {
   653                                 n = Net.connect(fd, ia, isa.getPort());
   672                                     return false;
   654                             } while (n == IOStatus.INTERRUPTED && isOpen());
   673                                 }
   655                         } else {
   674                                 // notify hook only if unbound
   656                             n = Net.connect(fd, ia, isa.getPort());
   675                                 if (localAddress == null) {
       
   676                                     NetHooks.beforeTcpConnect(fd,
       
   677                                             isa.getAddress(),
       
   678                                             isa.getPort());
       
   679                                 }
       
   680                                 readerThread = NativeThread.current();
       
   681                             }
       
   682                             for (;;) {
       
   683                                 InetAddress ia = isa.getAddress();
       
   684                                 if (ia.isAnyLocalAddress())
       
   685                                     ia = InetAddress.getLocalHost();
       
   686                                 n = Net.connect(fd,
       
   687                                         ia,
       
   688                                         isa.getPort());
       
   689                                 if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   690                                     continue;
       
   691                                 break;
       
   692                             }
       
   693 
       
   694                         } finally {
       
   695                             readerCleanup();
       
   696                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   697                             assert IOStatus.check(n);
       
   698                         }
   657                         }
   699                     } catch (IOException x) {
   658                     } finally {
   700                         // If an exception was thrown, close the channel after
   659                         endConnect(blocking, n > 0);
   701                         // invoking end() so as to avoid bogus
       
   702                         // AsynchronousCloseExceptions
       
   703                         close();
       
   704                         throw x;
       
   705                     }
   660                     }
   706                     synchronized (stateLock) {
   661                 } catch (IOException x) {
   707                         remoteAddress = isa;
   662                     // connect failed, close socket
   708                         if (n > 0) {
   663                     close();
   709 
   664                     throw x;
   710                             // Connection succeeded; disallow further
   665                 }
   711                             // invocation
   666 
   712                             state = ST_CONNECTED;
   667                 // connection may be established
   713                             if (isOpen())
   668                 synchronized (stateLock) {
   714                                 localAddress = Net.localAddress(fd);
   669                     if (!isOpen())
   715                             return true;
   670                         throw new AsynchronousCloseException();
   716                         }
   671                     remoteAddress = isa;
   717                         // If nonblocking and no exception then connection
   672                     if (n > 0) {
   718                         // pending; disallow another invocation
   673                         // connected established
   719                         if (!isBlocking())
   674                         localAddress = Net.localAddress(fd);
   720                             state = ST_PENDING;
   675                         state = ST_CONNECTED;
   721                         else
   676                         return true;
   722                             assert false;
   677                     } else {
       
   678                         // connection pending
       
   679                         assert !blocking;
       
   680                         state = ST_CONNECTIONPENDING;
       
   681                         return false;
   723                     }
   682                     }
   724                 }
   683                 }
   725                 return false;
       
   726             } finally {
   684             } finally {
   727                 writeLock.unlock();
   685                 writeLock.unlock();
   728             }
   686             }
   729         } finally {
   687         } finally {
   730             readLock.unlock();
   688             readLock.unlock();
   731         }
   689         }
   732     }
   690     }
   733 
   691 
       
   692     /**
       
   693      * Marks the beginning of a finishConnect operation that might block.
       
   694      *
       
   695      * @throws ClosedChannelException if the channel is closed
       
   696      * @throws NoConnectionPendingException if no connection is pending
       
   697      */
       
   698     private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
       
   699         if (blocking) {
       
   700             // set hook for Thread.interrupt
       
   701             begin();
       
   702         }
       
   703         synchronized (stateLock) {
       
   704             ensureOpen();
       
   705             if (state != ST_CONNECTIONPENDING)
       
   706                 throw new NoConnectionPendingException();
       
   707             if (blocking)
       
   708                 readerThread = NativeThread.current();
       
   709         }
       
   710     }
       
   711 
       
   712     /**
       
   713      * Marks the end of a finishConnect operation that may have blocked.
       
   714      *
       
   715      * @throws AsynchronousCloseException if the channel was closed due to this
       
   716      * thread being interrupted on a blocking connect operation.
       
   717      */
       
   718     private void endFinishConnect(boolean blocking, boolean completed)
       
   719         throws AsynchronousCloseException
       
   720     {
       
   721         endRead(blocking, completed);
       
   722     }
       
   723 
       
   724     @Override
   734     public boolean finishConnect() throws IOException {
   725     public boolean finishConnect() throws IOException {
   735         readLock.lock();
   726         readLock.lock();
   736         try {
   727         try {
   737             writeLock.lock();
   728             writeLock.lock();
   738             try {
   729             try {
       
   730                 // already connected?
       
   731                 synchronized (stateLock) {
       
   732                     if (state == ST_CONNECTED)
       
   733                         return true;
       
   734                 }
       
   735 
       
   736                 int n = 0;
       
   737                 boolean blocking = isBlocking();
       
   738                 try {
       
   739                     try {
       
   740                         beginFinishConnect(blocking);
       
   741                         if (blocking) {
       
   742                             do {
       
   743                                 n = checkConnect(fd, true);
       
   744                             } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
       
   745                         } else {
       
   746                             n = checkConnect(fd, false);
       
   747                         }
       
   748                     } finally {
       
   749                         endFinishConnect(blocking, n > 0);
       
   750                     }
       
   751                 } catch (IOException x) {
       
   752                     close();
       
   753                     throw x;
       
   754                 }
       
   755 
       
   756                 // post finishConnect, connection may be established
   739                 synchronized (stateLock) {
   757                 synchronized (stateLock) {
   740                     if (!isOpen())
   758                     if (!isOpen())
   741                         throw new ClosedChannelException();
   759                         throw new AsynchronousCloseException();
   742                     if (state == ST_CONNECTED)
   760                     if (n > 0) {
       
   761                         // connection established
       
   762                         localAddress = Net.localAddress(fd);
       
   763                         state = ST_CONNECTED;
   743                         return true;
   764                         return true;
   744                     if (state != ST_PENDING)
   765                     } else {
   745                         throw new NoConnectionPendingException();
   766                         // connection still pending
   746                 }
   767                         assert !blocking;
   747                 int n = 0;
   768                         return false;
   748                 try {
       
   749                     try {
       
   750                         begin();
       
   751                         synchronized (blockingLock()) {
       
   752                             synchronized (stateLock) {
       
   753                                 if (!isOpen()) {
       
   754                                     return false;
       
   755                                 }
       
   756                                 readerThread = NativeThread.current();
       
   757                             }
       
   758                             if (!isBlocking()) {
       
   759                                 for (;;) {
       
   760                                     n = checkConnect(fd, false);
       
   761                                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   762                                         continue;
       
   763                                     break;
       
   764                                 }
       
   765                             } else {
       
   766                                 for (;;) {
       
   767                                     n = checkConnect(fd, true);
       
   768                                     if (n == 0) {
       
   769                                         // Loop in case of
       
   770                                         // spurious notifications
       
   771                                         continue;
       
   772                                     }
       
   773                                     if ((n == IOStatus.INTERRUPTED) && isOpen())
       
   774                                         continue;
       
   775                                     break;
       
   776                                 }
       
   777                             }
       
   778                         }
       
   779                     } finally {
       
   780                         synchronized (stateLock) {
       
   781                             readerThread = 0;
       
   782                             if (state == ST_KILLPENDING) {
       
   783                                 kill();
       
   784                                 // poll()/getsockopt() does not report
       
   785                                 // error (throws exception, with n = 0)
       
   786                                 // on Linux platform after dup2 and
       
   787                                 // signal-wakeup. Force n to 0 so the
       
   788                                 // end() can throw appropriate exception
       
   789                                 n = 0;
       
   790                             }
       
   791                         }
       
   792                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   793                         assert IOStatus.check(n);
       
   794                     }
   769                     }
   795                 } catch (IOException x) {
   770                 }
   796                     // If an exception was thrown, close the channel after
       
   797                     // invoking end() so as to avoid bogus
       
   798                     // AsynchronousCloseExceptions
       
   799                     close();
       
   800                     throw x;
       
   801                 }
       
   802                 if (n > 0) {
       
   803                     synchronized (stateLock) {
       
   804                         state = ST_CONNECTED;
       
   805                         if (isOpen())
       
   806                             localAddress = Net.localAddress(fd);
       
   807                     }
       
   808                     return true;
       
   809                 }
       
   810                 return false;
       
   811             } finally {
   771             } finally {
   812                 writeLock.unlock();
   772                 writeLock.unlock();
   813             }
   773             }
   814         } finally {
   774         } finally {
   815             readLock.unlock();
   775             readLock.unlock();
   816         }
   776         }
   817     }
   777     }
   818 
   778 
       
   779     /**
       
   780      * Invoked by implCloseChannel to close the channel.
       
   781      *
       
   782      * This method waits for outstanding I/O operations to complete. When in
       
   783      * blocking mode, the socket is pre-closed and the threads in blocking I/O
       
   784      * operations are signalled to ensure that the outstanding I/O operations
       
   785      * complete quickly.
       
   786      *
       
   787      * If the socket is connected then it is shutdown by this method. The
       
   788      * shutdown ensures that the peer reads EOF for the case that the socket is
       
   789      * not pre-closed or closed by this method.
       
   790      *
       
   791      * The socket is closed by this method when it is not registered with a
       
   792      * Selector. Note that a channel configured blocking may be registered with
       
   793      * a Selector. This arises when a key is canceled and the channel configured
       
   794      * to blocking mode before the key is flushed from the Selector.
       
   795      */
       
   796     @Override
       
   797     protected void implCloseSelectableChannel() throws IOException {
       
   798         assert !isOpen();
       
   799 
       
   800         boolean blocking;
       
   801         boolean connected;
       
   802         boolean interrupted = false;
       
   803 
       
   804         // set state to ST_CLOSING
       
   805         synchronized (stateLock) {
       
   806             assert state < ST_CLOSING;
       
   807             blocking = isBlocking();
       
   808             connected = (state == ST_CONNECTED);
       
   809             state = ST_CLOSING;
       
   810         }
       
   811 
       
   812         // wait for any outstanding I/O operations to complete
       
   813         if (blocking) {
       
   814             synchronized (stateLock) {
       
   815                 assert state == ST_CLOSING;
       
   816                 long reader = readerThread;
       
   817                 long writer = writerThread;
       
   818                 if (reader != 0 || writer != 0) {
       
   819                     nd.preClose(fd);
       
   820                     connected = false; // fd is no longer connected socket
       
   821 
       
   822                     if (reader != 0)
       
   823                         NativeThread.signal(reader);
       
   824                     if (writer != 0)
       
   825                         NativeThread.signal(writer);
       
   826 
       
   827                     // wait for blocking I/O operations to end
       
   828                     while (readerThread != 0 || writerThread != 0) {
       
   829                         try {
       
   830                             stateLock.wait();
       
   831                         } catch (InterruptedException e) {
       
   832                             interrupted = true;
       
   833                         }
       
   834                     }
       
   835                 }
       
   836             }
       
   837         } else {
       
   838             // non-blocking mode: wait for read/write to complete
       
   839             readLock.lock();
       
   840             try {
       
   841                 writeLock.lock();
       
   842                 writeLock.unlock();
       
   843             } finally {
       
   844                 readLock.unlock();
       
   845             }
       
   846         }
       
   847 
       
   848         // set state to ST_KILLPENDING
       
   849         synchronized (stateLock) {
       
   850             assert state == ST_CLOSING;
       
   851             // if connected, and the channel is registered with a Selector, we
       
   852             // shutdown the output so that the peer reads EOF
       
   853             if (connected && isRegistered()) {
       
   854                 try {
       
   855                     Net.shutdown(fd, Net.SHUT_WR);
       
   856                 } catch (IOException ignore) { }
       
   857             }
       
   858             state = ST_KILLPENDING;
       
   859         }
       
   860 
       
   861         // close socket if not registered with Selector
       
   862         if (!isRegistered())
       
   863             kill();
       
   864 
       
   865         // restore interrupt status
       
   866         if (interrupted)
       
   867             Thread.currentThread().interrupt();
       
   868     }
       
   869 
       
   870     @Override
       
   871     public void kill() throws IOException {
       
   872         synchronized (stateLock) {
       
   873             if (state == ST_KILLPENDING) {
       
   874                 state = ST_KILLED;
       
   875                 nd.close(fd);
       
   876             }
       
   877         }
       
   878     }
       
   879 
   819     @Override
   880     @Override
   820     public SocketChannel shutdownInput() throws IOException {
   881     public SocketChannel shutdownInput() throws IOException {
   821         synchronized (stateLock) {
   882         synchronized (stateLock) {
   822             if (!isOpen())
   883             ensureOpen();
   823                 throw new ClosedChannelException();
       
   824             if (!isConnected())
   884             if (!isConnected())
   825                 throw new NotYetConnectedException();
   885                 throw new NotYetConnectedException();
   826             if (isInputOpen) {
   886             if (!isInputClosed) {
   827                 Net.shutdown(fd, Net.SHUT_RD);
   887                 Net.shutdown(fd, Net.SHUT_RD);
   828                 if (readerThread != 0)
   888                 long thread = readerThread;
   829                     NativeThread.signal(readerThread);
   889                 if (thread != 0)
   830                 isInputOpen = false;
   890                     NativeThread.signal(thread);
       
   891                 isInputClosed = true;
   831             }
   892             }
   832             return this;
   893             return this;
   833         }
   894         }
   834     }
   895     }
   835 
   896 
   836     @Override
   897     @Override
   837     public SocketChannel shutdownOutput() throws IOException {
   898     public SocketChannel shutdownOutput() throws IOException {
   838         synchronized (stateLock) {
   899         synchronized (stateLock) {
   839             if (!isOpen())
   900             ensureOpen();
   840                 throw new ClosedChannelException();
       
   841             if (!isConnected())
   901             if (!isConnected())
   842                 throw new NotYetConnectedException();
   902                 throw new NotYetConnectedException();
   843             if (isOutputOpen) {
   903             if (!isOutputClosed) {
   844                 Net.shutdown(fd, Net.SHUT_WR);
   904                 Net.shutdown(fd, Net.SHUT_WR);
   845                 if (writerThread != 0)
   905                 long thread = writerThread;
   846                     NativeThread.signal(writerThread);
   906                 if (thread != 0)
   847                 isOutputOpen = false;
   907                     NativeThread.signal(thread);
       
   908                 isOutputClosed = true;
   848             }
   909             }
   849             return this;
   910             return this;
   850         }
   911         }
   851     }
   912     }
   852 
   913 
   853     public boolean isInputOpen() {
   914     boolean isInputOpen() {
   854         synchronized (stateLock) {
   915         return !isInputClosed;
   855             return isInputOpen;
   916     }
   856         }
   917 
   857     }
   918     boolean isOutputOpen() {
   858 
   919         return !isOutputClosed;
   859     public boolean isOutputOpen() {
   920     }
   860         synchronized (stateLock) {
   921 
   861             return isOutputOpen;
   922     /**
   862         }
   923      * Poll this channel's socket for reading up to the given timeout.
   863     }
   924      * @return {@code true} if the socket is polled
   864 
   925      */
   865     // AbstractInterruptibleChannel synchronizes invocations of this method
   926     boolean pollRead(long timeout) throws IOException {
   866     // using AbstractInterruptibleChannel.closeLock, and also ensures that this
   927         boolean blocking = isBlocking();
   867     // method is only ever invoked once.  Before we get to this method, isOpen
   928         assert Thread.holdsLock(blockingLock()) && blocking;
   868     // (which is volatile) will have been set to false.
   929 
   869     //
   930         readLock.lock();
   870     protected void implCloseSelectableChannel() throws IOException {
   931         try {
   871         synchronized (stateLock) {
   932             boolean polled = false;
   872             isInputOpen = false;
   933             try {
   873             isOutputOpen = false;
   934                 beginRead(blocking);
   874 
   935                 int n = Net.poll(fd, Net.POLLIN, timeout);
   875             // Close the underlying file descriptor and dup it to a known fd
   936                 polled = (n > 0);
   876             // that's already closed.  This prevents other operations on this
   937             } finally {
   877             // channel from using the old fd, which might be recycled in the
   938                 endRead(blocking, polled);
   878             // meantime and allocated to an entirely different channel.
   939             }
   879             //
   940             return polled;
   880             if (state != ST_KILLED)
   941         } finally {
   881                 nd.preClose(fd);
   942             readLock.unlock();
   882 
   943         }
   883             // Signal native threads, if needed.  If a target thread is not
   944     }
   884             // currently blocked in an I/O operation then no harm is done since
   945 
   885             // the signal handler doesn't actually do anything.
   946     /**
   886             //
   947      * Poll this channel's socket for a connection, up to the given timeout.
   887             if (readerThread != 0)
   948      * @return {@code true} if the socket is polled
   888                 NativeThread.signal(readerThread);
   949      */
   889 
   950     boolean pollConnected(long timeout) throws IOException {
   890             if (writerThread != 0)
   951         boolean blocking = isBlocking();
   891                 NativeThread.signal(writerThread);
   952         assert Thread.holdsLock(blockingLock()) && blocking;
   892 
   953 
   893             // If this channel is not registered then it's safe to close the fd
   954         readLock.lock();
   894             // immediately since we know at this point that no thread is
   955         try {
   895             // blocked in an I/O operation upon the channel and, since the
   956             writeLock.lock();
   896             // channel is marked closed, no thread will start another such
   957             try {
   897             // operation.  If this channel is registered then we don't close
   958                 boolean polled = false;
   898             // the fd since it might be in use by a selector.  In that case
   959                 try {
   899             // closing this channel caused its keys to be cancelled, so the
   960                     beginFinishConnect(blocking);
   900             // last selector to deregister a key for this channel will invoke
   961                     int n = Net.poll(fd, Net.POLLCONN, timeout);
   901             // kill() to close the fd.
   962                     polled = (n > 0);
   902             //
   963                 } finally {
   903             if (!isRegistered())
   964                     endFinishConnect(blocking, polled);
   904                 kill();
   965                 }
   905         }
   966                 return polled;
   906     }
   967             } finally {
   907 
   968                 writeLock.unlock();
   908     public void kill() throws IOException {
   969             }
   909         synchronized (stateLock) {
   970         } finally {
   910             if (state == ST_KILLED)
   971             readLock.unlock();
   911                 return;
       
   912             if (state == ST_UNINITIALIZED) {
       
   913                 state = ST_KILLED;
       
   914                 return;
       
   915             }
       
   916             assert !isOpen() && !isRegistered();
       
   917 
       
   918             // Postpone the kill if there is a waiting reader
       
   919             // or writer thread. See the comments in read() for
       
   920             // more detailed explanation.
       
   921             if (readerThread == 0 && writerThread == 0) {
       
   922                 nd.close(fd);
       
   923                 state = ST_KILLED;
       
   924             } else {
       
   925                 state = ST_KILLPENDING;
       
   926             }
       
   927         }
   972         }
   928     }
   973     }
   929 
   974 
   930     /**
   975     /**
   931      * Translates native poll revent ops into a ready operation ops
   976      * Translates native poll revent ops into a ready operation ops
   954             (state == ST_CONNECTED))
   999             (state == ST_CONNECTED))
   955             newOps |= SelectionKey.OP_READ;
  1000             newOps |= SelectionKey.OP_READ;
   956 
  1001 
   957         if (((ops & Net.POLLCONN) != 0) &&
  1002         if (((ops & Net.POLLCONN) != 0) &&
   958             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
  1003             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
   959             ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
  1004             ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) {
   960             newOps |= SelectionKey.OP_CONNECT;
  1005             newOps |= SelectionKey.OP_CONNECT;
   961         }
  1006         }
   962 
  1007 
   963         if (((ops & Net.POLLOUT) != 0) &&
  1008         if (((ops & Net.POLLOUT) != 0) &&
   964             ((intOps & SelectionKey.OP_WRITE) != 0) &&
  1009             ((intOps & SelectionKey.OP_WRITE) != 0) &&
   973         return translateReadyOps(ops, sk.nioReadyOps(), sk);
  1018         return translateReadyOps(ops, sk.nioReadyOps(), sk);
   974     }
  1019     }
   975 
  1020 
   976     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
  1021     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
   977         return translateReadyOps(ops, 0, sk);
  1022         return translateReadyOps(ops, 0, sk);
   978     }
       
   979 
       
   980     // package-private
       
   981     int poll(int events, long timeout) throws IOException {
       
   982         assert Thread.holdsLock(blockingLock()) && !isBlocking();
       
   983 
       
   984         readLock.lock();
       
   985         try {
       
   986             int n = 0;
       
   987             try {
       
   988                 begin();
       
   989                 synchronized (stateLock) {
       
   990                     if (!isOpen())
       
   991                         return 0;
       
   992                     readerThread = NativeThread.current();
       
   993                 }
       
   994                 n = Net.poll(fd, events, timeout);
       
   995             } finally {
       
   996                 readerCleanup();
       
   997                 end(n > 0);
       
   998             }
       
   999             return n;
       
  1000         } finally {
       
  1001             readLock.unlock();
       
  1002         }
       
  1003     }
  1023     }
  1004 
  1024 
  1005     /**
  1025     /**
  1006      * Translates an interest operation set into a native poll event set
  1026      * Translates an interest operation set into a native poll event set
  1007      */
  1027      */
  1035             synchronized (stateLock) {
  1055             synchronized (stateLock) {
  1036                 switch (state) {
  1056                 switch (state) {
  1037                 case ST_UNCONNECTED:
  1057                 case ST_UNCONNECTED:
  1038                     sb.append("unconnected");
  1058                     sb.append("unconnected");
  1039                     break;
  1059                     break;
  1040                 case ST_PENDING:
  1060                 case ST_CONNECTIONPENDING:
  1041                     sb.append("connection-pending");
  1061                     sb.append("connection-pending");
  1042                     break;
  1062                     break;
  1043                 case ST_CONNECTED:
  1063                 case ST_CONNECTED:
  1044                     sb.append("connected");
  1064                     sb.append("connected");
  1045                     if (!isInputOpen)
  1065                     if (isInputClosed)
  1046                         sb.append(" ishut");
  1066                         sb.append(" ishut");
  1047                     if (!isOutputOpen)
  1067                     if (isOutputClosed)
  1048                         sb.append(" oshut");
  1068                         sb.append(" oshut");
  1049                     break;
  1069                     break;
  1050                 }
  1070                 }
  1051                 InetSocketAddress addr = localAddress();
  1071                 InetSocketAddress addr = localAddress();
  1052                 if (addr != null) {
  1072                 if (addr != null) {