src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java
changeset 49001 ce06058197a4
parent 48761 74c1fa26435a
child 49284 a51ca91c2cde
--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Tue Feb 27 23:11:26 2018 -0800
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Wed Feb 28 09:54:38 2018 +0000
@@ -41,15 +41,17 @@
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.AlreadyBoundException;
+import java.nio.channels.AlreadyConnectedException;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.MembershipKey;
 import java.nio.channels.NotYetConnectedException;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.UnsupportedAddressTypeException;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -64,21 +66,16 @@
     extends DatagramChannel
     implements SelChImpl
 {
-
     // Used to make native read and write calls
     private static NativeDispatcher nd = new DatagramDispatcher();
 
+    // The protocol family of the socket
+    private final ProtocolFamily family;
+
     // Our file descriptor
     private final FileDescriptor fd;
     private final int fdVal;
 
-    // The protocol family of the socket
-    private final ProtocolFamily family;
-
-    // IDs of native threads doing reads and writes, for signalling
-    private volatile long readerThread;
-    private volatile long writerThread;
-
     // Cached InetAddress and port for unconnected DatagramChannels
     // used by receive0
     private InetAddress cachedSenderInetAddress;
@@ -97,13 +94,18 @@
     // -- The following fields are protected by stateLock
 
     // State (does not necessarily increase monotonically)
-    private static final int ST_UNINITIALIZED = -1;
     private static final int ST_UNCONNECTED = 0;
     private static final int ST_CONNECTED = 1;
-    private static final int ST_KILLED = 2;
-    private int state = ST_UNINITIALIZED;
+    private static final int ST_CLOSING = 2;
+    private static final int ST_KILLPENDING = 3;
+    private static final int ST_KILLED = 4;
+    private int state;
 
-    // Binding
+    // IDs of native threads doing reads and writes, for signalling
+    private long readerThread;
+    private long writerThread;
+
+    // Binding and remote address (when connected)
     private InetSocketAddress localAddress;
     private InetSocketAddress remoteAddress;
 
@@ -127,11 +129,11 @@
         super(sp);
         ResourceManager.beforeUdpCreate();
         try {
-            this.family = Net.isIPv6Available() ?
-                StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
+            this.family = Net.isIPv6Available()
+                    ? StandardProtocolFamily.INET6
+                    : StandardProtocolFamily.INET;
             this.fd = Net.socket(family, false);
             this.fdVal = IOUtil.fdVal(fd);
-            this.state = ST_UNCONNECTED;
         } catch (IOException ioe) {
             ResourceManager.afterUdpClose();
             throw ioe;
@@ -142,13 +144,10 @@
         throws IOException
     {
         super(sp);
+        Objects.requireNonNull(family, "'family' is null");
         if ((family != StandardProtocolFamily.INET) &&
-            (family != StandardProtocolFamily.INET6))
-        {
-            if (family == null)
-                throw new NullPointerException("'family' is null");
-            else
-                throw new UnsupportedOperationException("Protocol family not supported");
+            (family != StandardProtocolFamily.INET6)) {
+            throw new UnsupportedOperationException("Protocol family not supported");
         }
         if (family == StandardProtocolFamily.INET6) {
             if (!Net.isIPv6Available()) {
@@ -161,7 +160,6 @@
             this.family = family;
             this.fd = Net.socket(family, false);
             this.fdVal = IOUtil.fdVal(fd);
-            this.state = ST_UNCONNECTED;
         } catch (IOException ioe) {
             ResourceManager.afterUdpClose();
             throw ioe;
@@ -176,14 +174,23 @@
         // increment UDP count to match decrement when closing
         ResourceManager.beforeUdpCreate();
 
-        this.family = Net.isIPv6Available() ?
-            StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
+        this.family = Net.isIPv6Available()
+                ? StandardProtocolFamily.INET6
+                : StandardProtocolFamily.INET;
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_UNCONNECTED;
-        this.localAddress = Net.localAddress(fd);
+        synchronized (stateLock) {
+            this.localAddress = Net.localAddress(fd);
+        }
     }
 
+    // @throws ClosedChannelException if channel is closed
+    private void ensureOpen() throws ClosedChannelException {
+        if (!isOpen())
+            throw new ClosedChannelException();
+    }
+
+    @Override
     public DatagramSocket socket() {
         synchronized (stateLock) {
             if (socket == null)
@@ -195,8 +202,7 @@
     @Override
     public SocketAddress getLocalAddress() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             // Perform security check before returning address
             return Net.getRevealedLocalAddress(localAddress);
         }
@@ -205,8 +211,7 @@
     @Override
     public SocketAddress getRemoteAddress() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
             return remoteAddress;
         }
     }
@@ -215,8 +220,7 @@
     public <T> DatagramChannel setOption(SocketOption<T> name, T value)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
@@ -251,9 +255,8 @@
                 }
                 return this;
             }
-            if (name == StandardSocketOptions.SO_REUSEADDR &&
-                    Net.useExclusiveBind() && localAddress != null)
-            {
+            if (name == StandardSocketOptions.SO_REUSEADDR
+                && Net.useExclusiveBind() && localAddress != null) {
                 reuseAddressEmulated = true;
                 this.isReuseAddress = (Boolean)value;
             }
@@ -269,8 +272,7 @@
     public <T> T getOption(SocketOption<T> name)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
@@ -307,9 +309,7 @@
                 }
             }
 
-            if (name == StandardSocketOptions.SO_REUSEADDR &&
-                    reuseAddressEmulated)
-            {
+            if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) {
                 return (T)Boolean.valueOf(isReuseAddress);
             }
 
@@ -322,7 +322,7 @@
         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 
         private static Set<SocketOption<?>> defaultOptions() {
-            HashSet<SocketOption<?>> set = new HashSet<>(8);
+            HashSet<SocketOption<?>> set = new HashSet<>();
             set.add(StandardSocketOptions.SO_SNDBUF);
             set.add(StandardSocketOptions.SO_RCVBUF);
             set.add(StandardSocketOptions.SO_REUSEADDR);
@@ -334,9 +334,7 @@
             set.add(StandardSocketOptions.IP_MULTICAST_IF);
             set.add(StandardSocketOptions.IP_MULTICAST_TTL);
             set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
-            ExtendedSocketOptions extendedOptions =
-                    ExtendedSocketOptions.getInstance();
-            set.addAll(extendedOptions.options());
+            set.addAll(ExtendedSocketOptions.getInstance().options());
             return Collections.unmodifiableSet(set);
         }
     }
@@ -346,33 +344,78 @@
         return DefaultOptionsHolder.defaultOptions;
     }
 
-    private void ensureOpen() throws ClosedChannelException {
-        if (!isOpen())
-            throw new ClosedChannelException();
+    /**
+     * Marks the beginning of a read operation that might block.
+     *
+     * @param blocking true if configured blocking
+     * @param mustBeConnected true if the socket must be connected
+     * @return remote address if connected
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NotYetConnectedException if mustBeConnected and not connected
+     * @throws IOException if socket not bound and cannot be bound
+     */
+    private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
+        throws IOException
+    {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
+        SocketAddress remote;
+        synchronized (stateLock) {
+            ensureOpen();
+            remote = remoteAddress;
+            if ((remote == null) && mustBeConnected)
+                throw new NotYetConnectedException();
+            if (localAddress == null)
+                bindInternal(null);
+            if (blocking)
+                readerThread = NativeThread.current();
+        }
+        return remote;
+    }
+
+    /**
+     * Marks the end of a read operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed asynchronously
+     */
+    private void endRead(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                readerThread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            // remove hook for Thread.interrupt
+            end(completed);
+        }
     }
 
     private SocketAddress sender;       // Set by receive0 (## ugh)
 
+    @Override
     public SocketAddress receive(ByteBuffer dst) throws IOException {
         if (dst.isReadOnly())
             throw new IllegalArgumentException("Read-only buffer");
+
         readLock.lock();
         try {
-            ensureOpen();
-            // Socket was not bound before attempting receive
-            if (localAddress() == null)
-                bind(null);
+            boolean blocking = isBlocking();
             int n = 0;
             ByteBuffer bb = null;
             try {
-                begin();
-                if (!isOpen())
-                    return null;
-                SecurityManager security = System.getSecurityManager();
-                readerThread = NativeThread.current();
-                if (isConnected() || (security == null)) {
+                SocketAddress remote = beginRead(blocking, false);
+                boolean connected = (remote != null);
+                SecurityManager sm = System.getSecurityManager();
+                if (connected || (sm == null)) {
+                    // connected or no security manager
                     do {
-                        n = receive(fd, dst);
+                        n = receive(fd, dst, connected);
                     } while ((n == IOStatus.INTERRUPTED) && isOpen());
                     if (n == IOStatus.UNAVAILABLE)
                         return null;
@@ -382,15 +425,14 @@
                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
                     for (;;) {
                         do {
-                            n = receive(fd, bb);
+                            n = receive(fd, bb, connected);
                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
                         if (n == IOStatus.UNAVAILABLE)
                             return null;
                         InetSocketAddress isa = (InetSocketAddress)sender;
                         try {
-                            security.checkAccept(
-                                isa.getAddress().getHostAddress(),
-                                isa.getPort());
+                            sm.checkAccept(isa.getAddress().getHostAddress(),
+                                           isa.getPort());
                         } catch (SecurityException se) {
                             // Ignore packet
                             bb.clear();
@@ -402,12 +444,12 @@
                         break;
                     }
                 }
+                assert sender != null;
                 return sender;
             } finally {
                 if (bb != null)
                     Util.releaseTemporaryDirectBuffer(bb);
-                readerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
+                endRead(blocking, n > 0);
                 assert IOStatus.check(n);
             }
         } finally {
@@ -415,7 +457,7 @@
         }
     }
 
-    private int receive(FileDescriptor fd, ByteBuffer dst)
+    private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected)
         throws IOException
     {
         int pos = dst.position();
@@ -423,7 +465,7 @@
         assert (pos <= lim);
         int rem = (pos <= lim ? lim - pos : 0);
         if (dst instanceof DirectBuffer && rem > 0)
-            return receiveIntoNativeBuffer(fd, dst, rem, pos);
+            return receiveIntoNativeBuffer(fd, dst, rem, pos, connected);
 
         // Substitute a native buffer. If the supplied buffer is empty
         // we must instead use a nonempty buffer, otherwise the call
@@ -431,7 +473,7 @@
         int newSize = Math.max(rem, 1);
         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
         try {
-            int n = receiveIntoNativeBuffer(fd, bb, newSize, 0);
+            int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected);
             bb.flip();
             if (n > 0 && rem > 0)
                 dst.put(bb);
@@ -442,11 +484,10 @@
     }
 
     private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
-                                        int rem, int pos)
+                                        int rem, int pos, boolean connected)
         throws IOException
     {
-        int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem,
-                         isConnected());
+        int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
         if (n > 0)
             bb.position(pos + n);
         return n;
@@ -455,59 +496,44 @@
     public int send(ByteBuffer src, SocketAddress target)
         throws IOException
     {
-        if (src == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(src);
+        InetSocketAddress isa = Net.checkAddress(target, family);
 
         writeLock.lock();
         try {
-            ensureOpen();
-            InetSocketAddress isa = Net.checkAddress(target);
-            InetAddress ia = isa.getAddress();
-            if (ia == null)
-                throw new IOException("Target address not resolved");
-            synchronized (stateLock) {
-                if (!isConnected()) {
-                    if (target == null)
-                        throw new NullPointerException();
+            boolean blocking = isBlocking();
+            int n = 0;
+            try {
+                SocketAddress remote = beginWrite(blocking, false);
+                if (remote != null) {
+                    // connected
+                    if (!target.equals(remote)) {
+                        throw new IllegalArgumentException(
+                            "Connected address not equal to target address");
+                    }
+                    do {
+                        n = IOUtil.write(fd, src, -1, nd);
+                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                } else {
+                    // not connected
                     SecurityManager sm = System.getSecurityManager();
                     if (sm != null) {
+                        InetAddress ia = isa.getAddress();
                         if (ia.isMulticastAddress()) {
                             sm.checkMulticast(ia);
                         } else {
-                            sm.checkConnect(ia.getHostAddress(),
-                                            isa.getPort());
+                            sm.checkConnect(ia.getHostAddress(), isa.getPort());
                         }
                     }
-                } else { // Connected case; Check address then write
-                    if (!target.equals(remoteAddress)) {
-                        throw new IllegalArgumentException(
-                            "Connected address not equal to target address");
-                    }
-                    return write(src);
+                    do {
+                        n = send(fd, src, isa);
+                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
                 }
-            }
-
-            int n = 0;
-            try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                writerThread = NativeThread.current();
-                do {
-                    n = send(fd, src, isa);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
-                synchronized (stateLock) {
-                    if (isOpen() && (localAddress == null)) {
-                        localAddress = Net.localAddress(fd);
-                    }
-                }
-                return IOStatus.normalize(n);
             } finally {
-                writerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
+                endWrite(blocking, n > 0);
                 assert IOStatus.check(n);
             }
+            return IOStatus.normalize(n);
         } finally {
             writeLock.unlock();
         }
@@ -567,141 +593,180 @@
         return written;
     }
 
+    @Override
     public int read(ByteBuffer buf) throws IOException {
-        if (buf == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(buf);
+
         readLock.lock();
         try {
-            synchronized (stateLock) {
-                ensureOpen();
-                if (!isConnected())
-                    throw new NotYetConnectedException();
-            }
+            boolean blocking = isBlocking();
             int n = 0;
             try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                readerThread = NativeThread.current();
+                beginRead(blocking, true);
                 do {
                     n = IOUtil.read(fd, buf, -1, nd);
                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
+
             } finally {
-                readerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
+                endRead(blocking, n > 0);
                 assert IOStatus.check(n);
             }
+            return IOStatus.normalize(n);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length)
+        throws IOException
+    {
+        Objects.checkFromIndexSize(offset, length, dsts.length);
+
+        readLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            long n = 0;
+            try {
+                beginRead(blocking, true);
+                do {
+                    n = IOUtil.read(fd, dsts, offset, length, nd);
+                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+
+            } finally {
+                endRead(blocking, n > 0);
+                assert IOStatus.check(n);
+            }
+            return IOStatus.normalize(n);
         } finally {
             readLock.unlock();
         }
     }
 
-    public long read(ByteBuffer[] dsts, int offset, int length)
+    /**
+     * Marks the beginning of a write operation that might block.
+     * @param blocking true if configured blocking
+     * @param mustBeConnected true if the socket must be connected
+     * @return remote address if connected
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NotYetConnectedException if mustBeConnected and not connected
+     * @throws IOException if socket not bound and cannot be bound
+     */
+    private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected)
         throws IOException
     {
-        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
-            throw new IndexOutOfBoundsException();
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
+        SocketAddress remote;
+        synchronized (stateLock) {
+            ensureOpen();
+            remote = remoteAddress;
+            if ((remote == null) && mustBeConnected)
+                throw new NotYetConnectedException();
+            if (localAddress == null)
+                bindInternal(null);
+            if (blocking)
+                writerThread = NativeThread.current();
+        }
+        return remote;
+    }
+
+    /**
+     * Marks the end of a write operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed asynchronously
+     */
+    private void endWrite(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                writerThread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            // remove hook for Thread.interrupt
+            end(completed);
+        }
+    }
+
+    @Override
+    public int write(ByteBuffer buf) throws IOException {
+        Objects.requireNonNull(buf);
+
+        writeLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            int n = 0;
+            try {
+                beginWrite(blocking, true);
+                do {
+                    n = IOUtil.write(fd, buf, -1, nd);
+                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+            } finally {
+                endWrite(blocking, n > 0);
+                assert IOStatus.check(n);
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)
+        throws IOException
+    {
+        Objects.checkFromIndexSize(offset, length, srcs.length);
+
+        writeLock.lock();
+        try {
+            boolean blocking = isBlocking();
+            long n = 0;
+            try {
+                beginWrite(blocking, true);
+                do {
+                    n = IOUtil.write(fd, srcs, offset, length, nd);
+                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+            } finally {
+                endWrite(blocking, n > 0);
+                assert IOStatus.check(n);
+            }
+            return IOStatus.normalize(n);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
+    protected void implConfigureBlocking(boolean block) throws IOException {
         readLock.lock();
         try {
-            synchronized (stateLock) {
-                ensureOpen();
-                if (!isConnected())
-                    throw new NotYetConnectedException();
-            }
-            long n = 0;
+            writeLock.lock();
             try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                readerThread = NativeThread.current();
-                do {
-                    n = IOUtil.read(fd, dsts, offset, length, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
+                synchronized (stateLock) {
+                    ensureOpen();
+                    IOUtil.configureBlocking(fd, block);
+                }
             } finally {
-                readerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                assert IOStatus.check(n);
+                writeLock.unlock();
             }
         } finally {
             readLock.unlock();
         }
     }
 
-    public int write(ByteBuffer buf) throws IOException {
-        if (buf == null)
-            throw new NullPointerException();
-        writeLock.lock();
-        try {
-            synchronized (stateLock) {
-                ensureOpen();
-                if (!isConnected())
-                    throw new NotYetConnectedException();
-            }
-            int n = 0;
-            try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                writerThread = NativeThread.current();
-                do {
-                    n = IOUtil.write(fd, buf, -1, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
-            } finally {
-                writerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                assert IOStatus.check(n);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    public long write(ByteBuffer[] srcs, int offset, int length)
-        throws IOException
-    {
-        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
-            throw new IndexOutOfBoundsException();
-        writeLock.lock();
-        try {
-            synchronized (stateLock) {
-                ensureOpen();
-                if (!isConnected())
-                    throw new NotYetConnectedException();
-            }
-            long n = 0;
-            try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                writerThread = NativeThread.current();
-                do {
-                    n = IOUtil.write(fd, srcs, offset, length, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
-            } finally {
-                writerThread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
-                assert IOStatus.check(n);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    protected void implConfigureBlocking(boolean block) throws IOException {
-        IOUtil.configureBlocking(fd, block);
-    }
-
-    public SocketAddress localAddress() {
+    InetSocketAddress localAddress() {
         synchronized (stateLock) {
             return localAddress;
         }
     }
 
-    public SocketAddress remoteAddress() {
+    InetSocketAddress remoteAddress() {
         synchronized (stateLock) {
             return remoteAddress;
         }
@@ -717,30 +782,7 @@
                     ensureOpen();
                     if (localAddress != null)
                         throw new AlreadyBoundException();
-                    InetSocketAddress isa;
-                    if (local == null) {
-                        // only Inet4Address allowed with IPv4 socket
-                        if (family == StandardProtocolFamily.INET) {
-                            isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
-                        } else {
-                            isa = new InetSocketAddress(0);
-                        }
-                    } else {
-                        isa = Net.checkAddress(local);
-
-                        // only Inet4Address allowed with IPv4 socket
-                        if (family == StandardProtocolFamily.INET) {
-                            InetAddress addr = isa.getAddress();
-                            if (!(addr instanceof Inet4Address))
-                                throw new UnsupportedAddressTypeException();
-                        }
-                    }
-                    SecurityManager sm = System.getSecurityManager();
-                    if (sm != null) {
-                        sm.checkListen(isa.getPort());
-                    }
-                    Net.bind(family, fd, isa.getAddress(), isa.getPort());
-                    localAddress = Net.localAddress(fd);
+                    bindInternal(local);
                 }
             } finally {
                 writeLock.unlock();
@@ -751,34 +793,58 @@
         return this;
     }
 
+    private void bindInternal(SocketAddress local) throws IOException {
+        assert Thread.holdsLock(stateLock) && (localAddress == null);
+
+        InetSocketAddress isa;
+        if (local == null) {
+            // only Inet4Address allowed with IPv4 socket
+            if (family == StandardProtocolFamily.INET) {
+                isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
+            } else {
+                isa = new InetSocketAddress(0);
+            }
+        } else {
+            isa = Net.checkAddress(local, family);
+        }
+        SecurityManager sm = System.getSecurityManager();
+        if (sm != null)
+            sm.checkListen(isa.getPort());
+
+        Net.bind(family, fd, isa.getAddress(), isa.getPort());
+        localAddress = Net.localAddress(fd);
+    }
+
+    @Override
     public boolean isConnected() {
         synchronized (stateLock) {
             return (state == ST_CONNECTED);
         }
     }
 
-    void ensureOpenAndUnconnected() throws IOException { // package-private
-        synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (state != ST_UNCONNECTED)
-                throw new IllegalStateException("Connect already invoked");
-        }
-    }
-
     @Override
     public DatagramChannel connect(SocketAddress sa) throws IOException {
+        InetSocketAddress isa = Net.checkAddress(sa, family);
+        SecurityManager sm = System.getSecurityManager();
+        if (sm != null) {
+            InetAddress ia = isa.getAddress();
+            if (ia.isMulticastAddress()) {
+                sm.checkMulticast(ia);
+            } else {
+                sm.checkConnect(ia.getHostAddress(), isa.getPort());
+                sm.checkAccept(ia.getHostAddress(), isa.getPort());
+            }
+        }
+
         readLock.lock();
         try {
             writeLock.lock();
             try {
                 synchronized (stateLock) {
-                    ensureOpenAndUnconnected();
-                    InetSocketAddress isa = Net.checkAddress(sa);
-                    SecurityManager sm = System.getSecurityManager();
-                    if (sm != null)
-                        sm.checkConnect(isa.getAddress().getHostAddress(),
-                                        isa.getPort());
+                    ensureOpen();
+                    if (state == ST_CONNECTED)
+                        throw new AlreadyConnectedException();
+
                     int n = Net.connect(family,
                                         fd,
                                         isa.getAddress(),
@@ -786,31 +852,26 @@
                     if (n <= 0)
                         throw new Error();      // Can't happen
 
-                    // Connection succeeded; disallow further invocation
-                    state = ST_CONNECTED;
+                    // connected
                     remoteAddress = isa;
-                    sender = isa;
-                    cachedSenderInetAddress = isa.getAddress();
-                    cachedSenderPort = isa.getPort();
+                    state = ST_CONNECTED;
 
-                    // set or refresh local address
+                    // refresh local address
                     localAddress = Net.localAddress(fd);
 
                     // flush any packets already received.
-                    synchronized (blockingLock()) {
-                        boolean blocking = isBlocking();
-                        try {
-                            ByteBuffer tmpBuf = ByteBuffer.allocate(100);
-                            if (blocking) {
-                                configureBlocking(false);
-                            }
-                            do {
-                                tmpBuf.clear();
-                            } while (receive(tmpBuf) != null);
-                        } finally {
-                            if (blocking) {
-                                configureBlocking(true);
-                            }
+                    boolean blocking = isBlocking();
+                    if (blocking) {
+                        IOUtil.configureBlocking(fd, false);
+                    }
+                    try {
+                        ByteBuffer buf = ByteBuffer.allocate(100);
+                        while (receive(buf) != null) {
+                            buf.clear();
+                        }
+                    } finally {
+                        if (blocking) {
+                            IOUtil.configureBlocking(fd, true);
                         }
                     }
                 }
@@ -823,21 +884,21 @@
         return this;
     }
 
+    @Override
     public DatagramChannel disconnect() throws IOException {
         readLock.lock();
         try {
             writeLock.lock();
             try {
                 synchronized (stateLock) {
-                    if (!isConnected() || !isOpen())
+                    if (!isOpen() || (state != ST_CONNECTED))
                         return this;
-                    InetSocketAddress isa = remoteAddress;
-                    SecurityManager sm = System.getSecurityManager();
-                    if (sm != null)
-                        sm.checkConnect(isa.getAddress().getHostAddress(),
-                                        isa.getPort());
+
+                    // disconnect socket
                     boolean isIPv6 = (family == StandardProtocolFamily.INET6);
                     disconnect0(fd, isIPv6);
+
+                    // no longer connected
                     remoteAddress = null;
                     state = ST_UNCONNECTED;
 
@@ -891,8 +952,7 @@
             sm.checkMulticast(group);
 
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
 
             // check the registry to see if we are already a member of the group
             if (registry == null) {
@@ -963,8 +1023,7 @@
                               InetAddress source)
         throws IOException
     {
-        if (source == null)
-            throw new NullPointerException("source address is null");
+        Objects.requireNonNull(source);
         return innerJoin(group, interf, source);
     }
 
@@ -1065,37 +1124,99 @@
         }
     }
 
+    /**
+     * Invoked by implCloseChannel to close the channel.
+     *
+     * This method waits for outstanding I/O operations to complete. When in
+     * blocking mode, the socket is pre-closed and the threads in blocking I/O
+     * operations are signalled to ensure that the outstanding I/O operations
+     * complete quickly.
+     *
+     * The socket is closed by this method when it is not registered with a
+     * Selector. Note that a channel configured blocking may be registered with
+     * a Selector. This arises when a key is canceled and the channel configured
+     * to blocking mode before the key is flushed from the Selector.
+     */
+    @Override
     protected void implCloseSelectableChannel() throws IOException {
+        assert !isOpen();
+
+        boolean blocking;
+        boolean interrupted = false;
+
+        // set state to ST_CLOSING and invalid membership keys
         synchronized (stateLock) {
-            if (state != ST_KILLED)
-                nd.preClose(fd);
-            ResourceManager.afterUdpClose();
+            assert state < ST_CLOSING;
+            blocking = isBlocking();
+            state = ST_CLOSING;
 
-            // if member of mulitcast group then invalidate all keys
+            // if member of any multicast groups then invalidate the keys
             if (registry != null)
                 registry.invalidateAll();
+        }
 
-            long th;
-            if ((th = readerThread) != 0)
-                NativeThread.signal(th);
-            if ((th = writerThread) != 0)
-                NativeThread.signal(th);
-            if (!isRegistered())
-                kill();
+        // wait for any outstanding I/O operations to complete
+        if (blocking) {
+            synchronized (stateLock) {
+                assert state == ST_CLOSING;
+                long reader = readerThread;
+                long writer = writerThread;
+                if (reader != 0 || writer != 0) {
+                    nd.preClose(fd);
+
+                    if (reader != 0)
+                        NativeThread.signal(reader);
+                    if (writer != 0)
+                        NativeThread.signal(writer);
+
+                    // wait for blocking I/O operations to end
+                    while (readerThread != 0 || writerThread != 0) {
+                        try {
+                            stateLock.wait();
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                }
+            }
+        } else {
+            // non-blocking mode: wait for read/write to complete
+            readLock.lock();
+            try {
+                writeLock.lock();
+                writeLock.unlock();
+            } finally {
+                readLock.unlock();
+            }
         }
+
+        // set state to ST_KILLPENDING
+        synchronized (stateLock) {
+            assert state == ST_CLOSING;
+            state = ST_KILLPENDING;
+        }
+
+        // close socket if not registered with Selector
+        if (!isRegistered())
+            kill();
+
+        // restore interrupt status
+        if (interrupted)
+            Thread.currentThread().interrupt();
     }
 
+    @Override
     public void kill() throws IOException {
         synchronized (stateLock) {
-            if (state == ST_KILLED)
-                return;
-            if (state == ST_UNINITIALIZED) {
+            if (state == ST_KILLPENDING) {
                 state = ST_KILLED;
-                return;
+                try {
+                    nd.close(fd);
+                } finally {
+                    // notify resource manager
+                    ResourceManager.afterUdpClose();
+                }
             }
-            assert !isOpen() && !isRegistered();
-            nd.close(fd);
-            state = ST_KILLED;
         }
     }
 
@@ -1148,26 +1269,25 @@
         return translateReadyOps(ops, 0, sk);
     }
 
-    // package-private
-    int poll(int events, long timeout) throws IOException {
-        assert Thread.holdsLock(blockingLock()) && !isBlocking();
+    /**
+     * Poll this channel's socket for reading up to the given timeout.
+     * @return {@code true} if the socket is polled
+     */
+    boolean pollRead(long timeout) throws IOException {
+        boolean blocking = isBlocking();
+        assert Thread.holdsLock(blockingLock()) && blocking;
 
         readLock.lock();
         try {
-            int n = 0;
+            boolean polled = false;
             try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    readerThread = NativeThread.current();
-                }
-                n = Net.poll(fd, events, timeout);
+                beginRead(blocking, false);
+                int n = Net.poll(fd, Net.POLLIN, timeout);
+                polled = (n > 0);
             } finally {
-                readerThread = 0;
-                end(n > 0);
+                endRead(blocking, polled);
             }
-            return n;
+            return polled;
         } finally {
             readLock.unlock();
         }
@@ -1216,5 +1336,4 @@
         IOUtil.load();
         initIDs();
     }
-
 }