src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
changeset 49001 ce06058197a4
parent 48761 74c1fa26435a
child 49141 ac95c7a76132
--- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java	Tue Feb 27 23:11:26 2018 -0800
+++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java	Wed Feb 28 09:54:38 2018 +0000
@@ -48,6 +48,7 @@
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -62,7 +63,6 @@
     extends SocketChannel
     implements SelChImpl
 {
-
     // Used to make native read and write calls
     private static NativeDispatcher nd;
 
@@ -70,10 +70,6 @@
     private final FileDescriptor fd;
     private final int fdVal;
 
-    // IDs of native threads doing reads and writes, for signalling
-    private volatile long readerThread;
-    private volatile long writerThread;
-
     // Lock held by current reading or connecting thread
     private final ReentrantLock readLock = new ReentrantLock();
 
@@ -84,28 +80,32 @@
     // DO NOT invoke a blocking I/O operation while holding this lock!
     private final Object stateLock = new Object();
 
+    // Input/Output closed
+    private volatile boolean isInputClosed;
+    private volatile boolean isOutputClosed;
+
     // -- The following fields are protected by stateLock
 
     // set true when exclusive binding is on and SO_REUSEADDR is emulated
     private boolean isReuseAddress;
 
     // State, increases monotonically
-    private static final int ST_UNINITIALIZED = -1;
     private static final int ST_UNCONNECTED = 0;
-    private static final int ST_PENDING = 1;
+    private static final int ST_CONNECTIONPENDING = 1;
     private static final int ST_CONNECTED = 2;
-    private static final int ST_KILLPENDING = 3;
-    private static final int ST_KILLED = 4;
-    private int state = ST_UNINITIALIZED;
+    private static final int ST_CLOSING = 3;
+    private static final int ST_KILLPENDING = 4;
+    private static final int ST_KILLED = 5;
+    private int state;
+
+    // IDs of native threads doing reads and writes, for signalling
+    private long readerThread;
+    private long writerThread;
 
     // Binding
     private InetSocketAddress localAddress;
     private InetSocketAddress remoteAddress;
 
-    // Input/Output open
-    private boolean isInputOpen = true;
-    private boolean isOutputOpen = true;
-
     // Socket adaptor, created on demand
     private Socket socket;
 
@@ -118,36 +118,43 @@
         super(sp);
         this.fd = Net.socket(true);
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_UNCONNECTED;
     }
 
-    SocketChannelImpl(SelectorProvider sp,
-                      FileDescriptor fd,
-                      boolean bound)
+    SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
         throws IOException
     {
         super(sp);
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_UNCONNECTED;
-        if (bound)
-            this.localAddress = Net.localAddress(fd);
+        if (bound) {
+            synchronized (stateLock) {
+                this.localAddress = Net.localAddress(fd);
+            }
+        }
     }
 
     // Constructor for sockets obtained from server sockets
     //
-    SocketChannelImpl(SelectorProvider sp,
-                      FileDescriptor fd, InetSocketAddress remote)
+    SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
         throws IOException
     {
         super(sp);
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_CONNECTED;
-        this.localAddress = Net.localAddress(fd);
-        this.remoteAddress = remote;
+        synchronized (stateLock) {
+            this.localAddress = Net.localAddress(fd);
+            this.remoteAddress = isa;
+            this.state = ST_CONNECTED;
+        }
     }
 
+    // @throws ClosedChannelException if channel is closed
+    private void ensureOpen() throws ClosedChannelException {
+        if (!isOpen())
+            throw new ClosedChannelException();
+    }
+
+    @Override
     public Socket socket() {
         synchronized (stateLock) {
             if (socket == null)
@@ -159,17 +166,15 @@
     @Override
     public SocketAddress getLocalAddress() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            return  Net.getRevealedLocalAddress(localAddress);
+            ensureOpen();
+            return Net.getRevealedLocalAddress(localAddress);
         }
     }
 
     @Override
     public SocketAddress getRemoteAddress() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             return remoteAddress;
         }
     }
@@ -178,14 +183,12 @@
     public <T> SocketChannel setOption(SocketOption<T> name, T value)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
 
             if (name == StandardSocketOptions.IP_TOS) {
                 ProtocolFamily family = Net.isIPv6Available() ?
@@ -211,18 +214,14 @@
     public <T> T getOption(SocketOption<T> name)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
 
-            if (name == StandardSocketOptions.SO_REUSEADDR &&
-                    Net.useExclusiveBind())
-            {
+            if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
                 // SO_REUSEADDR emulated when using exclusive bind
                 return (T)Boolean.valueOf(isReuseAddress);
             }
@@ -243,7 +242,7 @@
         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 
         private static Set<SocketOption<?>> defaultOptions() {
-            HashSet<SocketOption<?>> set = new HashSet<>(8);
+            HashSet<SocketOption<?>> set = new HashSet<>();
             set.add(StandardSocketOptions.SO_SNDBUF);
             set.add(StandardSocketOptions.SO_RCVBUF);
             set.add(StandardSocketOptions.SO_KEEPALIVE);
@@ -256,9 +255,7 @@
             // additional options required by socket adaptor
             set.add(StandardSocketOptions.IP_TOS);
             set.add(ExtendedSocketOption.SO_OOBINLINE);
-            ExtendedSocketOptions extendedOptions =
-                    ExtendedSocketOptions.getInstance();
-            set.addAll(extendedOptions.options());
+            set.addAll(ExtendedSocketOptions.getInstance().options());
             return Collections.unmodifiableSet(set);
         }
     }
@@ -268,329 +265,277 @@
         return DefaultOptionsHolder.defaultOptions;
     }
 
-    private boolean ensureReadOpen() throws ClosedChannelException {
+    /**
+     * Marks the beginning of a read operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NotYetConnectedException if the channel is not yet connected
+     */
+    private void beginRead(boolean blocking) throws ClosedChannelException {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (!isConnected())
+            ensureOpen();
+            if (state != ST_CONNECTED)
                 throw new NotYetConnectedException();
-            if (!isInputOpen)
-                return false;
-            else
-                return true;
+            if (blocking)
+                readerThread = NativeThread.current();
         }
     }
 
-    private void ensureWriteOpen() throws ClosedChannelException {
-        synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (!isOutputOpen)
-                throw new ClosedChannelException();
-            if (!isConnected())
-                throw new NotYetConnectedException();
+    /**
+     * Marks the end of a read operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking read operation.
+     */
+    private void endRead(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                readerThread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            // remove hook for Thread.interrupt
+            end(completed);
         }
     }
 
-    private void readerCleanup() throws IOException {
-        synchronized (stateLock) {
-            readerThread = 0;
-            if (state == ST_KILLPENDING)
-                kill();
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+        Objects.requireNonNull(buf);
+
+        readLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            int n = 0;
+            try {
+                beginRead(blocking);
+
+                // check if input is shutdown
+                if (isInputClosed)
+                    return IOStatus.EOF;
+
+                if (blocking) {
+                    do {
+                        n = IOUtil.read(fd, buf, -1, nd);
+                    } while (n == IOStatus.INTERRUPTED && isOpen());
+                } else {
+                    n = IOUtil.read(fd, buf, -1, nd);
+                }
+            } finally {
+                endRead(blocking, n > 0);
+                if (n <= 0 && isInputClosed)
+                    return IOStatus.EOF;
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            readLock.unlock();
         }
     }
 
-    private void writerCleanup() throws IOException {
-        synchronized (stateLock) {
-            writerThread = 0;
-            if (state == ST_KILLPENDING)
-                kill();
-        }
-    }
-
-    public int read(ByteBuffer buf) throws IOException {
-
-        if (buf == null)
-            throw new NullPointerException();
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length)
+        throws IOException
+    {
+        Objects.checkFromIndexSize(offset, length, dsts.length);
 
         readLock.lock();
         try {
-            if (!ensureReadOpen())
-                return -1;
+            boolean blocking = isBlocking();
+            long n = 0;
+            try {
+                beginRead(blocking);
+
+                // check if input is shutdown
+                if (isInputClosed)
+                    return IOStatus.EOF;
+
+                if (blocking) {
+                    do {
+                        n = IOUtil.read(fd, dsts, offset, length, nd);
+                    } while (n == IOStatus.INTERRUPTED && isOpen());
+                } else {
+                    n = IOUtil.read(fd, dsts, offset, length, nd);
+                }
+            } finally {
+                endRead(blocking, n > 0);
+                if (n <= 0 && isInputClosed)
+                    return IOStatus.EOF;
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Marks the beginning of a write operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed or output shutdown
+     * @throws NotYetConnectedException if the channel is not yet connected
+     */
+    private void beginWrite(boolean blocking) throws ClosedChannelException {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
+        synchronized (stateLock) {
+            ensureOpen();
+            if (isOutputClosed)
+                throw new ClosedChannelException();
+            if (state != ST_CONNECTED)
+                throw new NotYetConnectedException();
+            if (blocking)
+                writerThread = NativeThread.current();
+        }
+    }
+
+    /**
+     * Marks the end of a write operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking write operation.
+     */
+    private void endWrite(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                writerThread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            // remove hook for Thread.interrupt
+            end(completed);
+        }
+    }
+
+    @Override
+    public int write(ByteBuffer buf) throws IOException {
+        Objects.requireNonNull(buf);
+
+        writeLock.lock();
+        try {
+            boolean blocking = isBlocking();
             int n = 0;
             try {
-
-                // Set up the interruption machinery; see
-                // AbstractInterruptibleChannel for details
-                //
-                begin();
+                beginWrite(blocking);
+                if (blocking) {
+                    do {
+                        n = IOUtil.write(fd, buf, -1, nd);
+                    } while (n == IOStatus.INTERRUPTED && isOpen());
+                } else {
+                    n = IOUtil.write(fd, buf, -1, nd);
+                }
+            } finally {
+                endWrite(blocking, n > 0);
+                if (n <= 0 && isOutputClosed)
+                    throw new AsynchronousCloseException();
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            writeLock.unlock();
+        }
+    }
 
-                synchronized (stateLock) {
-                    if (!isOpen()) {
-                    // Either the current thread is already interrupted, so
-                    // begin() closed the channel, or another thread closed the
-                    // channel since we checked it a few bytecodes ago.  In
-                    // either case the value returned here is irrelevant since
-                    // the invocation of end() in the finally block will throw
-                    // an appropriate exception.
-                    //
-                        return 0;
-
-                    }
-
-                    // Save this thread so that it can be signalled on those
-                    // platforms that require it
-                    //
-                    readerThread = NativeThread.current();
-                }
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)
+        throws IOException
+    {
+        Objects.checkFromIndexSize(offset, length, srcs.length);
 
-                // Between the previous test of isOpen() and the return of the
-                // IOUtil.read invocation below, this channel might be closed
-                // or this thread might be interrupted.  We rely upon the
-                // implicit synchronization point in the kernel read() call to
-                // make sure that the right thing happens.  In either case the
-                // implCloseSelectableChannel method is ultimately invoked in
-                // some other thread, so there are three possibilities:
-                //
-                //   - implCloseSelectableChannel() invokes nd.preClose()
-                //     before this thread invokes read(), in which case the
-                //     read returns immediately with either EOF or an error,
-                //     the latter of which will cause an IOException to be
-                //     thrown.
-                //
-                //   - implCloseSelectableChannel() invokes nd.preClose() after
-                //     this thread is blocked in read().  On some operating
-                //     systems (e.g., Solaris and Windows) this causes the read
-                //     to return immediately with either EOF or an error
-                //     indication.
-                //
-                //   - implCloseSelectableChannel() invokes nd.preClose() after
-                //     this thread is blocked in read() but the operating
-                //     system (e.g., Linux) doesn't support preemptive close,
-                //     so implCloseSelectableChannel() proceeds to signal this
-                //     thread, thereby causing the read to return immediately
-                //     with IOStatus.INTERRUPTED.
-                //
-                // In all three cases the invocation of end() in the finally
-                // clause will notice that the channel has been closed and
-                // throw an appropriate exception (AsynchronousCloseException
-                // or ClosedByInterruptException) if necessary.
-                //
-                // *There is A fourth possibility. implCloseSelectableChannel()
-                // invokes nd.preClose(), signals reader/writer thred and quickly
-                // moves on to nd.close() in kill(), which does a real close.
-                // Then a third thread accepts a new connection, opens file or
-                // whatever that causes the released "fd" to be recycled. All
-                // above happens just between our last isOpen() check and the
-                // next kernel read reached, with the recycled "fd". The solution
-                // is to postpone the real kill() if there is a reader or/and
-                // writer thread(s) over there "waiting", leave the cleanup/kill
-                // to the reader or writer thread. (the preClose() still happens
-                // so the connection gets cut off as usual).
-                //
-                // For socket channels there is the additional wrinkle that
-                // asynchronous shutdown works much like asynchronous close,
-                // except that the channel is shutdown rather than completely
-                // closed.  This is analogous to the first two cases above,
-                // except that the shutdown operation plays the role of
-                // nd.preClose().
-                for (;;) {
-                    n = IOUtil.read(fd, buf, -1, nd);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen()) {
-                        // The system call was interrupted but the channel
-                        // is still open, so retry
-                        continue;
-                    }
-                    return IOStatus.normalize(n);
+        writeLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            long n = 0;
+            try {
+                beginWrite(blocking);
+                if (blocking) {
+                    do {
+                        n = IOUtil.write(fd, srcs, offset, length, nd);
+                    } while (n == IOStatus.INTERRUPTED && isOpen());
+                } else {
+                    n = IOUtil.write(fd, srcs, offset, length, nd);
                 }
-
             } finally {
-                readerCleanup();        // Clear reader thread
-                // The end method, which is defined in our superclass
-                // AbstractInterruptibleChannel, resets the interruption
-                // machinery.  If its argument is true then it returns
-                // normally; otherwise it checks the interrupt and open state
-                // of this channel and throws an appropriate exception if
-                // necessary.
-                //
-                // So, if we actually managed to do any I/O in the above try
-                // block then we pass true to the end method.  We also pass
-                // true if the channel was in non-blocking mode when the I/O
-                // operation was initiated but no data could be transferred;
-                // this prevents spurious exceptions from being thrown in the
-                // rare event that a channel is closed or a thread is
-                // interrupted at the exact moment that a non-blocking I/O
-                // request is made.
-                //
-                end(n > 0 || (n == IOStatus.UNAVAILABLE));
+                endWrite(blocking, n > 0);
+                if (n <= 0 && isOutputClosed)
+                    throw new AsynchronousCloseException();
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            writeLock.unlock();
+        }
+    }
 
-                // Extra case for socket channels: Asynchronous shutdown
-                //
+    /**
+     * Writes a byte of out of band data.
+     */
+    int sendOutOfBandData(byte b) throws IOException {
+        writeLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            int n = 0;
+            try {
+                beginWrite(blocking);
+                if (blocking) {
+                    do {
+                        n = sendOutOfBandData(fd, b);
+                    } while (n == IOStatus.INTERRUPTED && isOpen());
+                } else {
+                    n = sendOutOfBandData(fd, b);
+                }
+            } finally {
+                endWrite(blocking, n > 0);
+                if (n <= 0 && isOutputClosed)
+                    throw new AsynchronousCloseException();
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    protected void implConfigureBlocking(boolean block) throws IOException {
+        readLock.lock();
+        try {
+            writeLock.lock();
+            try {
                 synchronized (stateLock) {
-                    if ((n <= 0) && (!isInputOpen))
-                        return IOStatus.EOF;
+                    ensureOpen();
+                    IOUtil.configureBlocking(fd, block);
                 }
-
-                assert IOStatus.check(n);
-
+            } finally {
+                writeLock.unlock();
             }
         } finally {
             readLock.unlock();
         }
     }
 
-    public long read(ByteBuffer[] dsts, int offset, int length)
-        throws IOException
-    {
-        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
-            throw new IndexOutOfBoundsException();
-        readLock.lock();
-        try {
-            if (!ensureReadOpen())
-                return -1;
-            long n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    readerThread = NativeThread.current();
-                }
-
-                for (;;) {
-                    n = IOUtil.read(fd, dsts, offset, length, nd);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                        continue;
-                    return IOStatus.normalize(n);
-                }
-            } finally {
-                readerCleanup();
-                end(n > 0 || (n == IOStatus.UNAVAILABLE));
-                synchronized (stateLock) {
-                    if ((n <= 0) && (!isInputOpen))
-                        return IOStatus.EOF;
-                }
-                assert IOStatus.check(n);
-            }
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public int write(ByteBuffer buf) throws IOException {
-        if (buf == null)
-            throw new NullPointerException();
-        writeLock.lock();
-        try {
-            ensureWriteOpen();
-            int n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    writerThread = NativeThread.current();
-                }
-                for (;;) {
-                    n = IOUtil.write(fd, buf, -1, nd);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                        continue;
-                    return IOStatus.normalize(n);
-                }
-            } finally {
-                writerCleanup();
-                end(n > 0 || (n == IOStatus.UNAVAILABLE));
-                synchronized (stateLock) {
-                    if ((n <= 0) && (!isOutputOpen))
-                        throw new AsynchronousCloseException();
-                }
-                assert IOStatus.check(n);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public long write(ByteBuffer[] srcs, int offset, int length)
-        throws IOException
-    {
-        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
-            throw new IndexOutOfBoundsException();
-        writeLock.lock();
-        try {
-            ensureWriteOpen();
-            long n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    writerThread = NativeThread.current();
-                }
-                for (;;) {
-                    n = IOUtil.write(fd, srcs, offset, length, nd);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                        continue;
-                    return IOStatus.normalize(n);
-                }
-            } finally {
-                writerCleanup();
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                synchronized (stateLock) {
-                    if ((n <= 0) && (!isOutputOpen))
-                        throw new AsynchronousCloseException();
-                }
-                assert IOStatus.check(n);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    // package-private
-    int sendOutOfBandData(byte b) throws IOException {
-        writeLock.lock();
-        try {
-            ensureWriteOpen();
-            int n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    writerThread = NativeThread.current();
-                }
-                for (;;) {
-                    n = sendOutOfBandData(fd, b);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                        continue;
-                    return IOStatus.normalize(n);
-                }
-            } finally {
-                writerCleanup();
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                synchronized (stateLock) {
-                    if ((n <= 0) && (!isOutputOpen))
-                        throw new AsynchronousCloseException();
-                }
-                assert IOStatus.check(n);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    protected void implConfigureBlocking(boolean block) throws IOException {
-        IOUtil.configureBlocking(fd, block);
-    }
-
-    public InetSocketAddress localAddress() {
+    /**
+     * Returns the local address, or null if not bound
+     */
+    InetSocketAddress localAddress() {
         synchronized (stateLock) {
             return localAddress;
         }
     }
 
-    public SocketAddress remoteAddress() {
+    /**
+     * Returns the remote address, or null if not connected
+     */
+    InetSocketAddress remoteAddress() {
         synchronized (stateLock) {
             return remoteAddress;
         }
@@ -603,9 +548,8 @@
             writeLock.lock();
             try {
                 synchronized (stateLock) {
-                    if (!isOpen())
-                        throw new ClosedChannelException();
-                    if (state == ST_PENDING)
+                    ensureOpen();
+                    if (state == ST_CONNECTIONPENDING)
                         throw new ConnectionPendingException();
                     if (localAddress != null)
                         throw new AlreadyBoundException();
@@ -628,101 +572,115 @@
         return this;
     }
 
+    @Override
     public boolean isConnected() {
         synchronized (stateLock) {
             return (state == ST_CONNECTED);
         }
     }
 
+    @Override
     public boolean isConnectionPending() {
         synchronized (stateLock) {
-            return (state == ST_PENDING);
+            return (state == ST_CONNECTIONPENDING);
         }
     }
 
-    void ensureOpenAndUnconnected() throws IOException { // package-private
+    /**
+     * Marks the beginning of a connect operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed
+     * @throws AlreadyConnectedException if already connected
+     * @throws ConnectionPendingException is a connection is pending
+     */
+    private void beginConnect(boolean blocking) throws ClosedChannelException {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             if (state == ST_CONNECTED)
                 throw new AlreadyConnectedException();
-            if (state == ST_PENDING)
+            if (state == ST_CONNECTIONPENDING)
                 throw new ConnectionPendingException();
+            if (blocking)
+                readerThread = NativeThread.current();
         }
     }
 
+    /**
+     * Marks the end of a connect operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking connect operation.
+     */
+    private void endConnect(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        endRead(blocking, completed);
+    }
+
+    @Override
     public boolean connect(SocketAddress sa) throws IOException {
+        InetSocketAddress isa = Net.checkAddress(sa);
+        SecurityManager sm = System.getSecurityManager();
+        if (sm != null)
+            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
+
         readLock.lock();
         try {
             writeLock.lock();
             try {
-                ensureOpenAndUnconnected();
-                InetSocketAddress isa = Net.checkAddress(sa);
-                SecurityManager sm = System.getSecurityManager();
-                if (sm != null)
-                    sm.checkConnect(isa.getAddress().getHostAddress(),
-                            isa.getPort());
-                synchronized (blockingLock()) {
-                    int n = 0;
-                    try {
-                        try {
-                            begin();
-                            synchronized (stateLock) {
-                                if (!isOpen()) {
-                                    return false;
-                                }
-                                // notify hook only if unbound
-                                if (localAddress == null) {
-                                    NetHooks.beforeTcpConnect(fd,
-                                            isa.getAddress(),
-                                            isa.getPort());
-                                }
-                                readerThread = NativeThread.current();
-                            }
-                            for (;;) {
-                                InetAddress ia = isa.getAddress();
-                                if (ia.isAnyLocalAddress())
-                                    ia = InetAddress.getLocalHost();
-                                n = Net.connect(fd,
-                                        ia,
-                                        isa.getPort());
-                                if ((n == IOStatus.INTERRUPTED) && isOpen())
-                                    continue;
-                                break;
-                            }
-
-                        } finally {
-                            readerCleanup();
-                            end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                            assert IOStatus.check(n);
-                        }
-                    } catch (IOException x) {
-                        // If an exception was thrown, close the channel after
-                        // invoking end() so as to avoid bogus
-                        // AsynchronousCloseExceptions
-                        close();
-                        throw x;
-                    }
-                    synchronized (stateLock) {
-                        remoteAddress = isa;
-                        if (n > 0) {
-
-                            // Connection succeeded; disallow further
-                            // invocation
-                            state = ST_CONNECTED;
-                            if (isOpen())
-                                localAddress = Net.localAddress(fd);
-                            return true;
-                        }
-                        // If nonblocking and no exception then connection
-                        // pending; disallow another invocation
-                        if (!isBlocking())
-                            state = ST_PENDING;
-                        else
-                            assert false;
+                // notify before-connect hook
+                synchronized (stateLock) {
+                    if (state == ST_UNCONNECTED && localAddress == null) {
+                        NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
                     }
                 }
-                return false;
+
+                InetAddress ia = isa.getAddress();
+                if (ia.isAnyLocalAddress())
+                    ia = InetAddress.getLocalHost();
+
+                int n = 0;
+                boolean blocking = isBlocking();
+                try {
+                    try {
+                        beginConnect(blocking);
+                        if (blocking) {
+                            do {
+                                n = Net.connect(fd, ia, isa.getPort());
+                            } while (n == IOStatus.INTERRUPTED && isOpen());
+                        } else {
+                            n = Net.connect(fd, ia, isa.getPort());
+                        }
+                    } finally {
+                        endConnect(blocking, n > 0);
+                    }
+                } catch (IOException x) {
+                    // connect failed, close socket
+                    close();
+                    throw x;
+                }
+
+                // connection may be established
+                synchronized (stateLock) {
+                    if (!isOpen())
+                        throw new AsynchronousCloseException();
+                    remoteAddress = isa;
+                    if (n > 0) {
+                        // connected established
+                        localAddress = Net.localAddress(fd);
+                        state = ST_CONNECTED;
+                        return true;
+                    } else {
+                        // connection pending
+                        assert !blocking;
+                        state = ST_CONNECTIONPENDING;
+                        return false;
+                    }
+                }
             } finally {
                 writeLock.unlock();
             }
@@ -731,83 +689,85 @@
         }
     }
 
+    /**
+     * Marks the beginning of a finishConnect operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NoConnectionPendingException if no connection is pending
+     */
+    private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
+        synchronized (stateLock) {
+            ensureOpen();
+            if (state != ST_CONNECTIONPENDING)
+                throw new NoConnectionPendingException();
+            if (blocking)
+                readerThread = NativeThread.current();
+        }
+    }
+
+    /**
+     * Marks the end of a finishConnect operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking connect operation.
+     */
+    private void endFinishConnect(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        endRead(blocking, completed);
+    }
+
+    @Override
     public boolean finishConnect() throws IOException {
         readLock.lock();
         try {
             writeLock.lock();
             try {
+                // already connected?
                 synchronized (stateLock) {
-                    if (!isOpen())
-                        throw new ClosedChannelException();
                     if (state == ST_CONNECTED)
                         return true;
-                    if (state != ST_PENDING)
-                        throw new NoConnectionPendingException();
                 }
+
                 int n = 0;
+                boolean blocking = isBlocking();
                 try {
                     try {
-                        begin();
-                        synchronized (blockingLock()) {
-                            synchronized (stateLock) {
-                                if (!isOpen()) {
-                                    return false;
-                                }
-                                readerThread = NativeThread.current();
-                            }
-                            if (!isBlocking()) {
-                                for (;;) {
-                                    n = checkConnect(fd, false);
-                                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                                        continue;
-                                    break;
-                                }
-                            } else {
-                                for (;;) {
-                                    n = checkConnect(fd, true);
-                                    if (n == 0) {
-                                        // Loop in case of
-                                        // spurious notifications
-                                        continue;
-                                    }
-                                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                                        continue;
-                                    break;
-                                }
-                            }
+                        beginFinishConnect(blocking);
+                        if (blocking) {
+                            do {
+                                n = checkConnect(fd, true);
+                            } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
+                        } else {
+                            n = checkConnect(fd, false);
                         }
                     } finally {
-                        synchronized (stateLock) {
-                            readerThread = 0;
-                            if (state == ST_KILLPENDING) {
-                                kill();
-                                // poll()/getsockopt() does not report
-                                // error (throws exception, with n = 0)
-                                // on Linux platform after dup2 and
-                                // signal-wakeup. Force n to 0 so the
-                                // end() can throw appropriate exception
-                                n = 0;
-                            }
-                        }
-                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                        assert IOStatus.check(n);
+                        endFinishConnect(blocking, n > 0);
                     }
                 } catch (IOException x) {
-                    // If an exception was thrown, close the channel after
-                    // invoking end() so as to avoid bogus
-                    // AsynchronousCloseExceptions
                     close();
                     throw x;
                 }
-                if (n > 0) {
-                    synchronized (stateLock) {
+
+                // post finishConnect, connection may be established
+                synchronized (stateLock) {
+                    if (!isOpen())
+                        throw new AsynchronousCloseException();
+                    if (n > 0) {
+                        // connection established
+                        localAddress = Net.localAddress(fd);
                         state = ST_CONNECTED;
-                        if (isOpen())
-                            localAddress = Net.localAddress(fd);
+                        return true;
+                    } else {
+                        // connection still pending
+                        assert !blocking;
+                        return false;
                     }
-                    return true;
                 }
-                return false;
             } finally {
                 writeLock.unlock();
             }
@@ -816,18 +776,119 @@
         }
     }
 
+    /**
+     * Invoked by implCloseChannel to close the channel.
+     *
+     * This method waits for outstanding I/O operations to complete. When in
+     * blocking mode, the socket is pre-closed and the threads in blocking I/O
+     * operations are signalled to ensure that the outstanding I/O operations
+     * complete quickly.
+     *
+     * If the socket is connected then it is shutdown by this method. The
+     * shutdown ensures that the peer reads EOF for the case that the socket is
+     * not pre-closed or closed by this method.
+     *
+     * The socket is closed by this method when it is not registered with a
+     * Selector. Note that a channel configured blocking may be registered with
+     * a Selector. This arises when a key is canceled and the channel configured
+     * to blocking mode before the key is flushed from the Selector.
+     */
+    @Override
+    protected void implCloseSelectableChannel() throws IOException {
+        assert !isOpen();
+
+        boolean blocking;
+        boolean connected;
+        boolean interrupted = false;
+
+        // set state to ST_CLOSING
+        synchronized (stateLock) {
+            assert state < ST_CLOSING;
+            blocking = isBlocking();
+            connected = (state == ST_CONNECTED);
+            state = ST_CLOSING;
+        }
+
+        // wait for any outstanding I/O operations to complete
+        if (blocking) {
+            synchronized (stateLock) {
+                assert state == ST_CLOSING;
+                long reader = readerThread;
+                long writer = writerThread;
+                if (reader != 0 || writer != 0) {
+                    nd.preClose(fd);
+                    connected = false; // fd is no longer connected socket
+
+                    if (reader != 0)
+                        NativeThread.signal(reader);
+                    if (writer != 0)
+                        NativeThread.signal(writer);
+
+                    // wait for blocking I/O operations to end
+                    while (readerThread != 0 || writerThread != 0) {
+                        try {
+                            stateLock.wait();
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                }
+            }
+        } else {
+            // non-blocking mode: wait for read/write to complete
+            readLock.lock();
+            try {
+                writeLock.lock();
+                writeLock.unlock();
+            } finally {
+                readLock.unlock();
+            }
+        }
+
+        // set state to ST_KILLPENDING
+        synchronized (stateLock) {
+            assert state == ST_CLOSING;
+            // if connected, and the channel is registered with a Selector, we
+            // shutdown the output so that the peer reads EOF
+            if (connected && isRegistered()) {
+                try {
+                    Net.shutdown(fd, Net.SHUT_WR);
+                } catch (IOException ignore) { }
+            }
+            state = ST_KILLPENDING;
+        }
+
+        // close socket if not registered with Selector
+        if (!isRegistered())
+            kill();
+
+        // restore interrupt status
+        if (interrupted)
+            Thread.currentThread().interrupt();
+    }
+
+    @Override
+    public void kill() throws IOException {
+        synchronized (stateLock) {
+            if (state == ST_KILLPENDING) {
+                state = ST_KILLED;
+                nd.close(fd);
+            }
+        }
+    }
+
     @Override
     public SocketChannel shutdownInput() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             if (!isConnected())
                 throw new NotYetConnectedException();
-            if (isInputOpen) {
+            if (!isInputClosed) {
                 Net.shutdown(fd, Net.SHUT_RD);
-                if (readerThread != 0)
-                    NativeThread.signal(readerThread);
-                isInputOpen = false;
+                long thread = readerThread;
+                if (thread != 0)
+                    NativeThread.signal(thread);
+                isInputClosed = true;
             }
             return this;
         }
@@ -836,94 +897,78 @@
     @Override
     public SocketChannel shutdownOutput() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             if (!isConnected())
                 throw new NotYetConnectedException();
-            if (isOutputOpen) {
+            if (!isOutputClosed) {
                 Net.shutdown(fd, Net.SHUT_WR);
-                if (writerThread != 0)
-                    NativeThread.signal(writerThread);
-                isOutputOpen = false;
+                long thread = writerThread;
+                if (thread != 0)
+                    NativeThread.signal(thread);
+                isOutputClosed = true;
             }
             return this;
         }
     }
 
-    public boolean isInputOpen() {
-        synchronized (stateLock) {
-            return isInputOpen;
-        }
+    boolean isInputOpen() {
+        return !isInputClosed;
+    }
+
+    boolean isOutputOpen() {
+        return !isOutputClosed;
     }
 
-    public boolean isOutputOpen() {
-        synchronized (stateLock) {
-            return isOutputOpen;
+    /**
+     * Poll this channel's socket for reading up to the given timeout.
+     * @return {@code true} if the socket is polled
+     */
+    boolean pollRead(long timeout) throws IOException {
+        boolean blocking = isBlocking();
+        assert Thread.holdsLock(blockingLock()) && blocking;
+
+        readLock.lock();
+        try {
+            boolean polled = false;
+            try {
+                beginRead(blocking);
+                int n = Net.poll(fd, Net.POLLIN, timeout);
+                polled = (n > 0);
+            } finally {
+                endRead(blocking, polled);
+            }
+            return polled;
+        } finally {
+            readLock.unlock();
         }
     }
 
-    // AbstractInterruptibleChannel synchronizes invocations of this method
-    // using AbstractInterruptibleChannel.closeLock, and also ensures that this
-    // method is only ever invoked once.  Before we get to this method, isOpen
-    // (which is volatile) will have been set to false.
-    //
-    protected void implCloseSelectableChannel() throws IOException {
-        synchronized (stateLock) {
-            isInputOpen = false;
-            isOutputOpen = false;
-
-            // Close the underlying file descriptor and dup it to a known fd
-            // that's already closed.  This prevents other operations on this
-            // channel from using the old fd, which might be recycled in the
-            // meantime and allocated to an entirely different channel.
-            //
-            if (state != ST_KILLED)
-                nd.preClose(fd);
-
-            // Signal native threads, if needed.  If a target thread is not
-            // currently blocked in an I/O operation then no harm is done since
-            // the signal handler doesn't actually do anything.
-            //
-            if (readerThread != 0)
-                NativeThread.signal(readerThread);
-
-            if (writerThread != 0)
-                NativeThread.signal(writerThread);
+    /**
+     * Poll this channel's socket for a connection, up to the given timeout.
+     * @return {@code true} if the socket is polled
+     */
+    boolean pollConnected(long timeout) throws IOException {
+        boolean blocking = isBlocking();
+        assert Thread.holdsLock(blockingLock()) && blocking;
 
-            // If this channel is not registered then it's safe to close the fd
-            // immediately since we know at this point that no thread is
-            // blocked in an I/O operation upon the channel and, since the
-            // channel is marked closed, no thread will start another such
-            // operation.  If this channel is registered then we don't close
-            // the fd since it might be in use by a selector.  In that case
-            // closing this channel caused its keys to be cancelled, so the
-            // last selector to deregister a key for this channel will invoke
-            // kill() to close the fd.
-            //
-            if (!isRegistered())
-                kill();
-        }
-    }
-
-    public void kill() throws IOException {
-        synchronized (stateLock) {
-            if (state == ST_KILLED)
-                return;
-            if (state == ST_UNINITIALIZED) {
-                state = ST_KILLED;
-                return;
+        readLock.lock();
+        try {
+            writeLock.lock();
+            try {
+                boolean polled = false;
+                try {
+                    beginFinishConnect(blocking);
+                    int n = Net.poll(fd, Net.POLLCONN, timeout);
+                    polled = (n > 0);
+                } finally {
+                    endFinishConnect(blocking, polled);
+                }
+                return polled;
+            } finally {
+                writeLock.unlock();
             }
-            assert !isOpen() && !isRegistered();
-
-            // Postpone the kill if there is a waiting reader
-            // or writer thread. See the comments in read() for
-            // more detailed explanation.
-            if (readerThread == 0 && writerThread == 0) {
-                nd.close(fd);
-                state = ST_KILLED;
-            } else {
-                state = ST_KILLPENDING;
-            }
+        } finally {
+            readLock.unlock();
         }
     }
 
@@ -956,7 +1001,7 @@
 
         if (((ops & Net.POLLCONN) != 0) &&
             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
-            ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
+            ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) {
             newOps |= SelectionKey.OP_CONNECT;
         }
 
@@ -977,31 +1022,6 @@
         return translateReadyOps(ops, 0, sk);
     }
 
-    // package-private
-    int poll(int events, long timeout) throws IOException {
-        assert Thread.holdsLock(blockingLock()) && !isBlocking();
-
-        readLock.lock();
-        try {
-            int n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    readerThread = NativeThread.current();
-                }
-                n = Net.poll(fd, events, timeout);
-            } finally {
-                readerCleanup();
-                end(n > 0);
-            }
-            return n;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
     /**
      * Translates an interest operation set into a native poll event set
      */
@@ -1037,14 +1057,14 @@
                 case ST_UNCONNECTED:
                     sb.append("unconnected");
                     break;
-                case ST_PENDING:
+                case ST_CONNECTIONPENDING:
                     sb.append("connection-pending");
                     break;
                 case ST_CONNECTED:
                     sb.append("connected");
-                    if (!isInputOpen)
+                    if (isInputClosed)
                         sb.append(" ishut");
-                    if (!isOutputOpen)
+                    if (isOutputClosed)
                         sb.append(" oshut");
                     break;
                 }