src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java
changeset 58899 5573a7098439
parent 58808 9261ad32cba9
child 59000 612c58965775
--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Fri Nov 01 16:21:17 2019 -0400
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Sat Nov 02 10:02:18 2019 +0000
@@ -28,6 +28,8 @@
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.lang.ref.Cleaner.Cleanable;
 import java.net.DatagramSocket;
 import java.net.Inet4Address;
@@ -39,6 +41,7 @@
 import java.net.ProtocolFamily;
 import java.net.SocketAddress;
 import java.net.SocketOption;
+import java.net.SocketTimeoutException;
 import java.net.StandardProtocolFamily;
 import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
@@ -47,6 +50,7 @@
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.DatagramChannel;
+import java.nio.channels.IllegalBlockingModeException;
 import java.nio.channels.MembershipKey;
 import java.nio.channels.NotYetConnectedException;
 import java.nio.channels.SelectionKey;
@@ -113,8 +117,17 @@
     private InetSocketAddress localAddress;
     private InetSocketAddress remoteAddress;
 
-    // Our socket adaptor, if any
-    private DatagramSocket socket;
+    // Socket adaptor, created lazily
+    private static final VarHandle SOCKET;
+    static {
+        try {
+            MethodHandles.Lookup l = MethodHandles.lookup();
+            SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
+        } catch (Exception e) {
+            throw new InternalError(e);
+        }
+    }
+    private volatile DatagramSocket socket;
 
     // Multicast support
     private MembershipRegistry registry;
@@ -199,11 +212,14 @@
 
     @Override
     public DatagramSocket socket() {
-        synchronized (stateLock) {
-            if (socket == null)
-                socket = DatagramSocketAdaptor.create(this);
-            return socket;
+        DatagramSocket socket = this.socket;
+        if (socket == null) {
+            socket = DatagramSocketAdaptor.create(this);
+            if (!SOCKET.compareAndSet(this, null, socket)) {
+                socket = this.socket;
+            }
         }
+        return socket;
     }
 
     @Override
@@ -408,62 +424,35 @@
     public SocketAddress receive(ByteBuffer dst) throws IOException {
         if (dst.isReadOnly())
             throw new IllegalArgumentException("Read-only buffer");
-
         readLock.lock();
         try {
             boolean blocking = isBlocking();
+            boolean completed = false;
             int n = 0;
-            ByteBuffer bb = null;
             try {
                 SocketAddress remote = beginRead(blocking, false);
                 boolean connected = (remote != null);
                 SecurityManager sm = System.getSecurityManager();
+
                 if (connected || (sm == null)) {
                     // connected or no security manager
-                    n = receive(fd, dst, connected);
+                    n = receive(dst, connected);
                     if (blocking) {
                         while (IOStatus.okayToRetry(n) && isOpen()) {
                             park(Net.POLLIN);
-                            n = receive(fd, dst, connected);
+                            n = receive(dst, connected);
                         }
-                    } else if (n == IOStatus.UNAVAILABLE) {
-                        return null;
                     }
                 } else {
-                    // Cannot receive into user's buffer when running with a
-                    // security manager and not connected
-                    bb = Util.getTemporaryDirectBuffer(dst.remaining());
-                    for (;;) {
-                        n = receive(fd, bb, connected);
-                        if (blocking) {
-                            while (IOStatus.okayToRetry(n) && isOpen()) {
-                                park(Net.POLLIN);
-                                n = receive(fd, bb, connected);
-                            }
-                        } else if (n == IOStatus.UNAVAILABLE) {
-                            return null;
-                        }
-                        InetSocketAddress isa = (InetSocketAddress)sender;
-                        try {
-                            sm.checkAccept(isa.getAddress().getHostAddress(),
-                                           isa.getPort());
-                        } catch (SecurityException se) {
-                            // Ignore packet
-                            bb.clear();
-                            n = 0;
-                            continue;
-                        }
-                        bb.flip();
-                        dst.put(bb);
-                        break;
-                    }
+                    // security manager and unconnected
+                    n = untrustedReceive(dst);
                 }
-                assert sender != null;
+                if (n == IOStatus.UNAVAILABLE)
+                    return null;
+                completed = (n > 0) || (n == 0 && isOpen());
                 return sender;
             } finally {
-                if (bb != null)
-                    Util.releaseTemporaryDirectBuffer(bb);
-                endRead(blocking, n > 0);
+                endRead(blocking, completed);
                 assert IOStatus.check(n);
             }
         } finally {
@@ -471,15 +460,164 @@
         }
     }
 
-    private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected)
+    /**
+     * Receives a datagram into an untrusted buffer. When there is a security
+     * manager set, and the socket is not connected, datagrams have to be received
+     * into a buffer that is not accessible to the user. The datagram is copied
+     * into the user's buffer when the sender address is accepted by the security
+     * manager.
+     *
+     * @return the size of the datagram or IOStatus.UNAVAILABLE
+     */
+    private int untrustedReceive(ByteBuffer dst) throws IOException {
+        SecurityManager sm = System.getSecurityManager();
+        assert readLock.isHeldByCurrentThread()
+                && sm != null && remoteAddress == null;
+
+        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
+        try {
+            boolean blocking = isBlocking();
+            for (;;) {
+                int n = receive(bb, false);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLIN);
+                        n = receive(bb, false);
+                    }
+                } else if (n == IOStatus.UNAVAILABLE) {
+                    return n;
+                }
+                InetSocketAddress isa = (InetSocketAddress) sender;
+                try {
+                    sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
+                    bb.flip();
+                    dst.put(bb);
+                    return n;
+                } catch (SecurityException se) {
+                    // ignore datagram
+                    bb.clear();
+                }
+            }
+        } finally {
+            Util.releaseTemporaryDirectBuffer(bb);
+        }
+    }
+
+    /**
+     * Receives a datagram into the given buffer.
+     *
+     * @apiNote This method is for use by the socket adaptor. The buffer is
+     * assumed to be trusted, meaning it is not accessible to user code.
+     *
+     * @throws IllegalBlockingModeException if the channel is non-blocking
+     * @throws SocketTimeoutException if the timeout elapses
+     */
+    SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException {
+        readLock.lock();
+        try {
+            ensureOpen();
+            if (!isBlocking())
+                throw new IllegalBlockingModeException();
+            SecurityManager sm = System.getSecurityManager();
+            boolean connected = isConnected();
+            SocketAddress sender;
+            do {
+                if (nanos > 0) {
+                    sender = trustedBlockingReceive(dst, nanos);
+                } else {
+                    sender = trustedBlockingReceive(dst);
+                }
+                // check sender when security manager set and not connected
+                if (sm != null && !connected) {
+                    InetSocketAddress isa = (InetSocketAddress) sender;
+                    try {
+                        sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
+                    } catch (SecurityException e) {
+                        sender = null;
+                    }
+                }
+            } while (sender == null);
+            return sender;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Receives a datagram into given buffer. This method is used to support
+     * the socket adaptor. The buffer is assumed to be trusted.
+     * @throws SocketTimeoutException if the timeout elapses
+     */
+    private SocketAddress trustedBlockingReceive(ByteBuffer dst)
         throws IOException
     {
+        assert readLock.isHeldByCurrentThread() && isBlocking();
+        boolean completed = false;
+        int n = 0;
+        try {
+            SocketAddress remote = beginRead(true, false);
+            boolean connected = (remote != null);
+            n = receive(dst, connected);
+            while (n == IOStatus.UNAVAILABLE && isOpen()) {
+                park(Net.POLLIN);
+                n = receive(dst, connected);
+            }
+            completed = (n > 0) || (n == 0 && isOpen());
+            return sender;
+        } finally {
+            endRead(true, completed);
+            assert IOStatus.check(n);
+        }
+    }
+
+    /**
+     * Receives a datagram into given buffer with a timeout. This method is
+     * used to support the socket adaptor. The buffer is assumed to be trusted.
+     * @throws SocketTimeoutException if the timeout elapses
+     */
+    private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos)
+        throws IOException
+    {
+        assert readLock.isHeldByCurrentThread() && isBlocking();
+        boolean completed = false;
+        int n = 0;
+        try {
+            SocketAddress remote = beginRead(true, false);
+            boolean connected = (remote != null);
+
+            // change socket to non-blocking
+            lockedConfigureBlocking(false);
+            try {
+                long startNanos = System.nanoTime();
+                n = receive(dst, connected);
+                while (n == IOStatus.UNAVAILABLE && isOpen()) {
+                    long remainingNanos = nanos - (System.nanoTime() - startNanos);
+                    if (remainingNanos <= 0) {
+                        throw new SocketTimeoutException("Receive timed out");
+                    }
+                    park(Net.POLLIN, remainingNanos);
+                    n = receive(dst, connected);
+                }
+                completed = (n > 0) || (n == 0 && isOpen());
+                return sender;
+            } finally {
+                // restore socket to blocking mode (if channel is open)
+                tryLockedConfigureBlocking(true);
+            }
+
+        } finally {
+            endRead(true, completed);
+            assert IOStatus.check(n);
+        }
+    }
+
+    private int receive(ByteBuffer dst, boolean connected) throws IOException {
         int pos = dst.position();
         int lim = dst.limit();
         assert (pos <= lim);
         int rem = (pos <= lim ? lim - pos : 0);
         if (dst instanceof DirectBuffer && rem > 0)
-            return receiveIntoNativeBuffer(fd, dst, rem, pos, connected);
+            return receiveIntoNativeBuffer(dst, rem, pos, connected);
 
         // Substitute a native buffer. If the supplied buffer is empty
         // we must instead use a nonempty buffer, otherwise the call
@@ -487,7 +625,7 @@
         int newSize = Math.max(rem, 1);
         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
         try {
-            int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected);
+            int n = receiveIntoNativeBuffer(bb, newSize, 0, connected);
             bb.flip();
             if (n > 0 && rem > 0)
                 dst.put(bb);
@@ -497,8 +635,8 @@
         }
     }
 
-    private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
-                                        int rem, int pos, boolean connected)
+    private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos,
+                                        boolean connected)
         throws IOException
     {
         int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
@@ -563,6 +701,25 @@
         }
     }
 
+    /**
+     * Sends a datagram from the bytes in given buffer.
+     *
+     * @apiNote This method is for use by the socket adaptor.
+     *
+     * @throws IllegalBlockingModeException if the channel is non-blocking
+     */
+    void blockingSend(ByteBuffer src, SocketAddress target) throws IOException {
+        writeLock.lock();
+        try {
+            ensureOpen();
+            if (!isBlocking())
+                throw new IllegalBlockingModeException();
+            send(src, target);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
         throws IOException
     {
@@ -785,10 +942,7 @@
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
-                    ensureOpen();
-                    IOUtil.configureBlocking(fd, block);
-                }
+                lockedConfigureBlocking(block);
             } finally {
                 writeLock.unlock();
             }
@@ -797,6 +951,36 @@
         }
     }
 
+    /**
+     * Adjusts the blocking mode. readLock or writeLock must already be held.
+     */
+    private void lockedConfigureBlocking(boolean block) throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+        synchronized (stateLock) {
+            ensureOpen();
+            IOUtil.configureBlocking(fd, block);
+        }
+    }
+
+    /**
+     * Adjusts the blocking mode if the channel is open. readLock or writeLock
+     * must already be held.
+     *
+     * @return {@code true} if the blocking mode was adjusted, {@code false} if
+     *         the blocking mode was not adjusted because the channel is closed
+     */
+    private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+        synchronized (stateLock) {
+            if (isOpen()) {
+                IOUtil.configureBlocking(fd, block);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
     InetSocketAddress localAddress() {
         synchronized (stateLock) {
             return localAddress;
@@ -861,6 +1045,16 @@
 
     @Override
     public DatagramChannel connect(SocketAddress sa) throws IOException {
+        return connect(sa, true);
+    }
+
+    /**
+     * Connects the channel's socket.
+     *
+     * @param sa the remote address to which this channel is to be connected
+     * @param check true to check if the channel is already connected.
+     */
+    DatagramChannel connect(SocketAddress sa, boolean check) throws IOException {
         InetSocketAddress isa = Net.checkAddress(sa, family);
         SecurityManager sm = System.getSecurityManager();
         if (sm != null) {
@@ -879,7 +1073,7 @@
             try {
                 synchronized (stateLock) {
                     ensureOpen();
-                    if (state == ST_CONNECTED)
+                    if (check && state == ST_CONNECTED)
                         throw new AlreadyConnectedException();
 
                     // ensure that the socket is bound
@@ -908,7 +1102,7 @@
                     }
                     try {
                         ByteBuffer buf = ByteBuffer.allocate(100);
-                        while (receive(fd, buf, false) > 0) {
+                        while (receive(buf, false) >= 0) {
                             buf.clear();
                         }
                     } finally {
@@ -1332,30 +1526,6 @@
     }
 
     /**
-     * Poll this channel's socket for reading up to the given timeout.
-     * @return {@code true} if the socket is polled
-     */
-    boolean pollRead(long timeout) throws IOException {
-        boolean blocking = isBlocking();
-        assert Thread.holdsLock(blockingLock()) && blocking;
-
-        readLock.lock();
-        try {
-            boolean polled = false;
-            try {
-                beginRead(blocking, false);
-                int events = Net.poll(fd, Net.POLLIN, timeout);
-                polled = (events != 0);
-            } finally {
-                endRead(blocking, polled);
-            }
-            return polled;
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
      * Translates an interest operation set into a native poll event set
      */
     public int translateInterestOps(int ops) {