src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java
changeset 54620 13b67c1420b8
parent 53419 eac105e3ec13
child 54754 193a8f1a4f3b
--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java	Thu Apr 25 10:41:49 2019 +0100
@@ -53,6 +53,7 @@
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import sun.net.ResourceManager;
@@ -89,7 +90,8 @@
 
     // Lock held by any thread that modifies the state fields declared below
     // DO NOT invoke a blocking I/O operation while holding this lock!
-    private final Object stateLock = new Object();
+    private final ReentrantLock stateLock = new ReentrantLock();
+    private final Condition stateCondition = stateLock.newCondition();
 
     // -- The following fields are protected by stateLock
 
@@ -179,8 +181,11 @@
                 : StandardProtocolFamily.INET;
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             this.localAddress = Net.localAddress(fd);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -192,27 +197,36 @@
 
     @Override
     public DatagramSocket socket() {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (socket == null)
                 socket = DatagramSocketAdaptor.create(this);
             return socket;
+        } finally {
+            stateLock.unlock();
         }
     }
 
     @Override
     public SocketAddress getLocalAddress() throws IOException {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
             // Perform security check before returning address
             return Net.getRevealedLocalAddress(localAddress);
+        } finally {
+            stateLock.unlock();
         }
     }
 
     @Override
     public SocketAddress getRemoteAddress() throws IOException {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
             return remoteAddress;
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -224,7 +238,8 @@
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
 
             if (name == StandardSocketOptions.IP_TOS ||
@@ -264,6 +279,8 @@
             // remaining options don't need any special handling
             Net.setSocketOption(fd, Net.UNSPEC, name, value);
             return this;
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -276,7 +293,8 @@
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
 
             if (name == StandardSocketOptions.IP_TOS ||
@@ -315,6 +333,8 @@
 
             // no special handling
             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -362,7 +382,8 @@
             begin();
         }
         SocketAddress remote;
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
             remote = remoteAddress;
             if ((remote == null) && mustBeConnected)
@@ -371,6 +392,8 @@
                 bindInternal(null);
             if (blocking)
                 readerThread = NativeThread.current();
+        } finally {
+            stateLock.unlock();
         }
         return remote;
     }
@@ -384,12 +407,15 @@
         throws AsynchronousCloseException
     {
         if (blocking) {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 readerThread = 0;
                 // notify any thread waiting in implCloseSelectableChannel
                 if (state == ST_CLOSING) {
-                    stateLock.notifyAll();
+                    stateCondition.signalAll();
                 }
+            } finally {
+                stateLock.unlock();
             }
             // remove hook for Thread.interrupt
             end(completed);
@@ -414,21 +440,29 @@
                 SecurityManager sm = System.getSecurityManager();
                 if (connected || (sm == null)) {
                     // connected or no security manager
-                    do {
-                        n = receive(fd, dst, connected);
-                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                    if (n == IOStatus.UNAVAILABLE)
+                    n = receive(fd, dst, connected);
+                    if (blocking) {
+                        while (IOStatus.okayToRetry(n) && isOpen()) {
+                            park(Net.POLLIN);
+                            n = receive(fd, dst, connected);
+                        }
+                    } else if (n == IOStatus.UNAVAILABLE) {
                         return null;
+                    }
                 } else {
                     // Cannot receive into user's buffer when running with a
                     // security manager and not connected
                     bb = Util.getTemporaryDirectBuffer(dst.remaining());
                     for (;;) {
-                        do {
-                            n = receive(fd, bb, connected);
-                        } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                        if (n == IOStatus.UNAVAILABLE)
+                        n = receive(fd, bb, connected);
+                        if (blocking) {
+                            while (IOStatus.okayToRetry(n) && isOpen()) {
+                                park(Net.POLLIN);
+                                n = receive(fd, bb, connected);
+                            }
+                        } else if (n == IOStatus.UNAVAILABLE) {
                             return null;
+                        }
                         InetSocketAddress isa = (InetSocketAddress)sender;
                         try {
                             sm.checkAccept(isa.getAddress().getHostAddress(),
@@ -493,6 +527,7 @@
         return n;
     }
 
+    @Override
     public int send(ByteBuffer src, SocketAddress target)
         throws IOException
     {
@@ -510,9 +545,13 @@
                     if (!target.equals(remote)) {
                         throw new AlreadyConnectedException();
                     }
-                    do {
-                        n = IOUtil.write(fd, src, -1, nd);
-                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                    n = IOUtil.write(fd, src, -1, nd);
+                    if (blocking) {
+                        while (IOStatus.okayToRetry(n) && isOpen()) {
+                            park(Net.POLLOUT);
+                            n = IOUtil.write(fd, src, -1, nd);
+                        }
+                    }
                 } else {
                     // not connected
                     SecurityManager sm = System.getSecurityManager();
@@ -524,9 +563,13 @@
                             sm.checkConnect(ia.getHostAddress(), isa.getPort());
                         }
                     }
-                    do {
-                        n = send(fd, src, isa);
-                    } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                    n = send(fd, src, isa);
+                    if (blocking) {
+                        while (IOStatus.okayToRetry(n) && isOpen()) {
+                            park(Net.POLLOUT);
+                            n = send(fd, src, isa);
+                        }
+                    }
                 }
             } finally {
                 endWrite(blocking, n > 0);
@@ -602,10 +645,13 @@
             int n = 0;
             try {
                 beginRead(blocking, true);
-                do {
-                    n = IOUtil.read(fd, buf, -1, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+                n = IOUtil.read(fd, buf, -1, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLIN);
+                        n = IOUtil.read(fd, buf, -1, nd);
+                    }
+                }
             } finally {
                 endRead(blocking, n > 0);
                 assert IOStatus.check(n);
@@ -628,10 +674,13 @@
             long n = 0;
             try {
                 beginRead(blocking, true);
-                do {
-                    n = IOUtil.read(fd, dsts, offset, length, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+                n = IOUtil.read(fd, dsts, offset, length, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLIN);
+                        n = IOUtil.read(fd, dsts, offset, length, nd);
+                    }
+                }
             } finally {
                 endRead(blocking, n > 0);
                 assert IOStatus.check(n);
@@ -659,7 +708,8 @@
             begin();
         }
         SocketAddress remote;
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
             remote = remoteAddress;
             if ((remote == null) && mustBeConnected)
@@ -668,6 +718,8 @@
                 bindInternal(null);
             if (blocking)
                 writerThread = NativeThread.current();
+        } finally {
+            stateLock.unlock();
         }
         return remote;
     }
@@ -681,12 +733,15 @@
         throws AsynchronousCloseException
     {
         if (blocking) {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 writerThread = 0;
                 // notify any thread waiting in implCloseSelectableChannel
                 if (state == ST_CLOSING) {
-                    stateLock.notifyAll();
+                    stateCondition.signalAll();
                 }
+            } finally {
+                stateLock.unlock();
             }
             // remove hook for Thread.interrupt
             end(completed);
@@ -703,9 +758,13 @@
             int n = 0;
             try {
                 beginWrite(blocking, true);
-                do {
-                    n = IOUtil.write(fd, buf, -1, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                n = IOUtil.write(fd, buf, -1, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLOUT);
+                        n = IOUtil.write(fd, buf, -1, nd);
+                    }
+                }
             } finally {
                 endWrite(blocking, n > 0);
                 assert IOStatus.check(n);
@@ -728,9 +787,13 @@
             long n = 0;
             try {
                 beginWrite(blocking, true);
-                do {
-                    n = IOUtil.write(fd, srcs, offset, length, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                n = IOUtil.write(fd, srcs, offset, length, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLOUT);
+                        n = IOUtil.write(fd, srcs, offset, length, nd);
+                    }
+                }
             } finally {
                 endWrite(blocking, n > 0);
                 assert IOStatus.check(n);
@@ -747,9 +810,12 @@
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     ensureOpen();
                     IOUtil.configureBlocking(fd, block);
+                } finally {
+                    stateLock.unlock();
                 }
             } finally {
                 writeLock.unlock();
@@ -760,14 +826,20 @@
     }
 
     InetSocketAddress localAddress() {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             return localAddress;
+        } finally {
+            stateLock.unlock();
         }
     }
 
     InetSocketAddress remoteAddress() {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             return remoteAddress;
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -777,11 +849,14 @@
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     ensureOpen();
                     if (localAddress != null)
                         throw new AlreadyBoundException();
                     bindInternal(local);
+                } finally {
+                    stateLock.unlock();
                 }
             } finally {
                 writeLock.unlock();
@@ -793,7 +868,7 @@
     }
 
     private void bindInternal(SocketAddress local) throws IOException {
-        assert Thread.holdsLock(stateLock) && (localAddress == null);
+        assert stateLock.isHeldByCurrentThread() && (localAddress == null);
 
         InetSocketAddress isa;
         if (local == null) {
@@ -816,8 +891,11 @@
 
     @Override
     public boolean isConnected() {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             return (state == ST_CONNECTED);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -839,7 +917,8 @@
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     ensureOpen();
                     if (state == ST_CONNECTED)
                         throw new AlreadyConnectedException();
@@ -865,7 +944,7 @@
                     }
                     try {
                         ByteBuffer buf = ByteBuffer.allocate(100);
-                        while (receive(buf) != null) {
+                        while (receive(fd, buf, false) > 0) {
                             buf.clear();
                         }
                     } finally {
@@ -873,6 +952,9 @@
                             IOUtil.configureBlocking(fd, true);
                         }
                     }
+
+                } finally {
+                    stateLock.unlock();
                 }
             } finally {
                 writeLock.unlock();
@@ -889,7 +971,8 @@
         try {
             writeLock.lock();
             try {
-                synchronized (stateLock) {
+                stateLock.lock();
+                try {
                     if (!isOpen() || (state != ST_CONNECTED))
                         return this;
 
@@ -903,6 +986,8 @@
 
                     // refresh local address
                     localAddress = Net.localAddress(fd);
+                } finally {
+                    stateLock.unlock();
                 }
             } finally {
                 writeLock.unlock();
@@ -950,7 +1035,8 @@
         if (sm != null)
             sm.checkMulticast(group);
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             ensureOpen();
 
             // check the registry to see if we are already a member of the group
@@ -1005,6 +1091,8 @@
 
             registry.add(key);
             return key;
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -1030,7 +1118,8 @@
     void drop(MembershipKeyImpl key) {
         assert key.channel() == this;
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (!key.isValid())
                 return;
 
@@ -1051,6 +1140,8 @@
 
             key.invalidate();
             registry.remove(key);
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -1064,7 +1155,8 @@
         assert key.channel() == this;
         assert key.sourceAddress() == null;
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (!key.isValid())
                 throw new IllegalStateException("key is no longer valid");
             if (source.isAnyLocalAddress())
@@ -1090,6 +1182,8 @@
                 // ancient kernel
                 throw new UnsupportedOperationException();
             }
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -1100,7 +1194,8 @@
         assert key.channel() == this;
         assert key.sourceAddress() == null;
 
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (!key.isValid())
                 throw new IllegalStateException("key is no longer valid");
 
@@ -1120,6 +1215,8 @@
                 // should not happen
                 throw new AssertionError(ioe);
             }
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -1144,7 +1241,8 @@
         boolean interrupted = false;
 
         // set state to ST_CLOSING and invalid membership keys
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             assert state < ST_CLOSING;
             blocking = isBlocking();
             state = ST_CLOSING;
@@ -1152,11 +1250,14 @@
             // if member of any multicast groups then invalidate the keys
             if (registry != null)
                 registry.invalidateAll();
+        } finally {
+            stateLock.unlock();
         }
 
         // wait for any outstanding I/O operations to complete
         if (blocking) {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 assert state == ST_CLOSING;
                 long reader = readerThread;
                 long writer = writerThread;
@@ -1171,12 +1272,14 @@
                     // wait for blocking I/O operations to end
                     while (readerThread != 0 || writerThread != 0) {
                         try {
-                            stateLock.wait();
+                            stateCondition.await();
                         } catch (InterruptedException e) {
                             interrupted = true;
                         }
                     }
                 }
+            } finally {
+                stateLock.unlock();
             }
         } else {
             // non-blocking mode: wait for read/write to complete
@@ -1190,9 +1293,12 @@
         }
 
         // set state to ST_KILLPENDING
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             assert state == ST_CLOSING;
             state = ST_KILLPENDING;
+        } finally {
+            stateLock.unlock();
         }
 
         // close socket if not registered with Selector
@@ -1206,7 +1312,8 @@
 
     @Override
     public void kill() throws IOException {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (state == ST_KILLPENDING) {
                 state = ST_KILLED;
                 try {
@@ -1216,6 +1323,8 @@
                     ResourceManager.afterUdpClose();
                 }
             }
+        } finally {
+            stateLock.unlock();
         }
     }