diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import sun.net.ResourceManager; @@ -89,7 +90,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // -- The following fields are protected by stateLock @@ -179,8 +181,11 @@ : StandardProtocolFamily.INET; this.fd = fd; this.fdVal = IOUtil.fdVal(fd); - synchronized (stateLock) { + stateLock.lock(); + try { this.localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } @@ -192,27 +197,36 @@ @Override public DatagramSocket socket() { - synchronized (stateLock) { + stateLock.lock(); + try { if (socket == null) socket = DatagramSocketAdaptor.create(this); return socket; + } finally { + stateLock.unlock(); } } @Override public SocketAddress getLocalAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); // Perform security check before returning address return Net.getRevealedLocalAddress(localAddress); + } finally { + stateLock.unlock(); } } @Override public SocketAddress getRemoteAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -224,7 +238,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.IP_TOS || @@ -264,6 +279,8 @@ // remaining options don't need any special handling Net.setSocketOption(fd, Net.UNSPEC, name, value); return this; + } finally { + stateLock.unlock(); } } @@ -276,7 +293,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.IP_TOS || @@ -315,6 +333,8 @@ // no special handling return (T) Net.getSocketOption(fd, Net.UNSPEC, name); + } finally { + stateLock.unlock(); } } @@ -362,7 +382,8 @@ begin(); } SocketAddress remote; - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); remote = remoteAddress; if ((remote == null) && mustBeConnected) @@ -371,6 +392,8 @@ bindInternal(null); if (blocking) readerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } return remote; } @@ -384,12 +407,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { readerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -414,21 +440,29 @@ SecurityManager sm = System.getSecurityManager(); if (connected || (sm == null)) { // connected or no security manager - do { - n = receive(fd, dst, connected); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - if (n == IOStatus.UNAVAILABLE) + n = receive(fd, dst, connected); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = receive(fd, dst, connected); + } + } else if (n == IOStatus.UNAVAILABLE) { return null; + } } else { // Cannot receive into user's buffer when running with a // security manager and not connected bb = Util.getTemporaryDirectBuffer(dst.remaining()); for (;;) { - do { - n = receive(fd, bb, connected); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - if (n == IOStatus.UNAVAILABLE) + n = receive(fd, bb, connected); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = receive(fd, bb, connected); + } + } else if (n == IOStatus.UNAVAILABLE) { return null; + } InetSocketAddress isa = (InetSocketAddress)sender; try { sm.checkAccept(isa.getAddress().getHostAddress(), @@ -493,6 +527,7 @@ return n; } + @Override public int send(ByteBuffer src, SocketAddress target) throws IOException { @@ -510,9 +545,13 @@ if (!target.equals(remote)) { throw new AlreadyConnectedException(); } - do { - n = IOUtil.write(fd, src, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, src, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, src, -1, nd); + } + } } else { // not connected SecurityManager sm = System.getSecurityManager(); @@ -524,9 +563,13 @@ sm.checkConnect(ia.getHostAddress(), isa.getPort()); } } - do { - n = send(fd, src, isa); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = send(fd, src, isa); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = send(fd, src, isa); + } + } } } finally { endWrite(blocking, n > 0); @@ -602,10 +645,13 @@ int n = 0; try { beginRead(blocking, true); - do { - n = IOUtil.read(fd, buf, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - + n = IOUtil.read(fd, buf, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, buf, -1, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); @@ -628,10 +674,13 @@ long n = 0; try { beginRead(blocking, true); - do { - n = IOUtil.read(fd, dsts, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - + n = IOUtil.read(fd, dsts, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, dsts, offset, length, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); @@ -659,7 +708,8 @@ begin(); } SocketAddress remote; - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); remote = remoteAddress; if ((remote == null) && mustBeConnected) @@ -668,6 +718,8 @@ bindInternal(null); if (blocking) writerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } return remote; } @@ -681,12 +733,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { writerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -703,9 +758,13 @@ int n = 0; try { beginWrite(blocking, true); - do { - n = IOUtil.write(fd, buf, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, buf, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, buf, -1, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); @@ -728,9 +787,13 @@ long n = 0; try { beginWrite(blocking, true); - do { - n = IOUtil.write(fd, srcs, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, srcs, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, srcs, offset, length, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); @@ -747,9 +810,12 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -760,14 +826,20 @@ } InetSocketAddress localAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return localAddress; + } finally { + stateLock.unlock(); } } InetSocketAddress remoteAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -777,11 +849,14 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (localAddress != null) throw new AlreadyBoundException(); bindInternal(local); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -793,7 +868,7 @@ } private void bindInternal(SocketAddress local) throws IOException { - assert Thread.holdsLock(stateLock) && (localAddress == null); + assert stateLock.isHeldByCurrentThread() && (localAddress == null); InetSocketAddress isa; if (local == null) { @@ -816,8 +891,11 @@ @Override public boolean isConnected() { - synchronized (stateLock) { + stateLock.lock(); + try { return (state == ST_CONNECTED); + } finally { + stateLock.unlock(); } } @@ -839,7 +917,8 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (state == ST_CONNECTED) throw new AlreadyConnectedException(); @@ -865,7 +944,7 @@ } try { ByteBuffer buf = ByteBuffer.allocate(100); - while (receive(buf) != null) { + while (receive(fd, buf, false) > 0) { buf.clear(); } } finally { @@ -873,6 +952,9 @@ IOUtil.configureBlocking(fd, true); } } + + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -889,7 +971,8 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { if (!isOpen() || (state != ST_CONNECTED)) return this; @@ -903,6 +986,8 @@ // refresh local address localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -950,7 +1035,8 @@ if (sm != null) sm.checkMulticast(group); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); // check the registry to see if we are already a member of the group @@ -1005,6 +1091,8 @@ registry.add(key); return key; + } finally { + stateLock.unlock(); } } @@ -1030,7 +1118,8 @@ void drop(MembershipKeyImpl key) { assert key.channel() == this; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) return; @@ -1051,6 +1140,8 @@ key.invalidate(); registry.remove(key); + } finally { + stateLock.unlock(); } } @@ -1064,7 +1155,8 @@ assert key.channel() == this; assert key.sourceAddress() == null; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) throw new IllegalStateException("key is no longer valid"); if (source.isAnyLocalAddress()) @@ -1090,6 +1182,8 @@ // ancient kernel throw new UnsupportedOperationException(); } + } finally { + stateLock.unlock(); } } @@ -1100,7 +1194,8 @@ assert key.channel() == this; assert key.sourceAddress() == null; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) throw new IllegalStateException("key is no longer valid"); @@ -1120,6 +1215,8 @@ // should not happen throw new AssertionError(ioe); } + } finally { + stateLock.unlock(); } } @@ -1144,7 +1241,8 @@ boolean interrupted = false; // set state to ST_CLOSING and invalid membership keys - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; blocking = isBlocking(); state = ST_CLOSING; @@ -1152,11 +1250,14 @@ // if member of any multicast groups then invalidate the keys if (registry != null) registry.invalidateAll(); + } finally { + stateLock.unlock(); } // wait for any outstanding I/O operations to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long reader = readerThread; long writer = writerThread; @@ -1171,12 +1272,14 @@ // wait for blocking I/O operations to end while (readerThread != 0 || writerThread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for read/write to complete @@ -1190,9 +1293,12 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -1206,7 +1312,8 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_KILLPENDING) { state = ST_KILLED; try { @@ -1216,6 +1323,8 @@ ResourceManager.afterUdpClose(); } } + } finally { + stateLock.unlock(); } }