--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -53,6 +53,7 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.ResourceManager;
@@ -89,7 +90,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@@ -179,8 +181,11 @@
: StandardProtocolFamily.INET;
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
this.localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
}
@@ -192,27 +197,36 @@
@Override
public DatagramSocket socket() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (socket == null)
socket = DatagramSocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
// Perform security check before returning address
return Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -224,7 +238,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS ||
@@ -264,6 +279,8 @@
// remaining options don't need any special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -276,7 +293,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS ||
@@ -315,6 +333,8 @@
// no special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -362,7 +382,8 @@
begin();
}
SocketAddress remote;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
remote = remoteAddress;
if ((remote == null) && mustBeConnected)
@@ -371,6 +392,8 @@
bindInternal(null);
if (blocking)
readerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
return remote;
}
@@ -384,12 +407,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
readerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -414,21 +440,29 @@
SecurityManager sm = System.getSecurityManager();
if (connected || (sm == null)) {
// connected or no security manager
- do {
- n = receive(fd, dst, connected);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
- if (n == IOStatus.UNAVAILABLE)
+ n = receive(fd, dst, connected);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = receive(fd, dst, connected);
+ }
+ } else if (n == IOStatus.UNAVAILABLE) {
return null;
+ }
} else {
// Cannot receive into user's buffer when running with a
// security manager and not connected
bb = Util.getTemporaryDirectBuffer(dst.remaining());
for (;;) {
- do {
- n = receive(fd, bb, connected);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
- if (n == IOStatus.UNAVAILABLE)
+ n = receive(fd, bb, connected);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = receive(fd, bb, connected);
+ }
+ } else if (n == IOStatus.UNAVAILABLE) {
return null;
+ }
InetSocketAddress isa = (InetSocketAddress)sender;
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
@@ -493,6 +527,7 @@
return n;
}
+ @Override
public int send(ByteBuffer src, SocketAddress target)
throws IOException
{
@@ -510,9 +545,13 @@
if (!target.equals(remote)) {
throw new AlreadyConnectedException();
}
- do {
- n = IOUtil.write(fd, src, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, src, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, src, -1, nd);
+ }
+ }
} else {
// not connected
SecurityManager sm = System.getSecurityManager();
@@ -524,9 +563,13 @@
sm.checkConnect(ia.getHostAddress(), isa.getPort());
}
}
- do {
- n = send(fd, src, isa);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = send(fd, src, isa);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = send(fd, src, isa);
+ }
+ }
}
} finally {
endWrite(blocking, n > 0);
@@ -602,10 +645,13 @@
int n = 0;
try {
beginRead(blocking, true);
- do {
- n = IOUtil.read(fd, buf, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+ n = IOUtil.read(fd, buf, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, buf, -1, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
@@ -628,10 +674,13 @@
long n = 0;
try {
beginRead(blocking, true);
- do {
- n = IOUtil.read(fd, dsts, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
@@ -659,7 +708,8 @@
begin();
}
SocketAddress remote;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
remote = remoteAddress;
if ((remote == null) && mustBeConnected)
@@ -668,6 +718,8 @@
bindInternal(null);
if (blocking)
writerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
return remote;
}
@@ -681,12 +733,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
writerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -703,9 +758,13 @@
int n = 0;
try {
beginWrite(blocking, true);
- do {
- n = IOUtil.write(fd, buf, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, buf, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, buf, -1, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
@@ -728,9 +787,13 @@
long n = 0;
try {
beginWrite(blocking, true);
- do {
- n = IOUtil.write(fd, srcs, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
@@ -747,9 +810,12 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -760,14 +826,20 @@
}
InetSocketAddress localAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return localAddress;
+ } finally {
+ stateLock.unlock();
}
}
InetSocketAddress remoteAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -777,11 +849,14 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
bindInternal(local);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -793,7 +868,7 @@
}
private void bindInternal(SocketAddress local) throws IOException {
- assert Thread.holdsLock(stateLock) && (localAddress == null);
+ assert stateLock.isHeldByCurrentThread() && (localAddress == null);
InetSocketAddress isa;
if (local == null) {
@@ -816,8 +891,11 @@
@Override
public boolean isConnected() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return (state == ST_CONNECTED);
+ } finally {
+ stateLock.unlock();
}
}
@@ -839,7 +917,8 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
@@ -865,7 +944,7 @@
}
try {
ByteBuffer buf = ByteBuffer.allocate(100);
- while (receive(buf) != null) {
+ while (receive(fd, buf, false) > 0) {
buf.clear();
}
} finally {
@@ -873,6 +952,9 @@
IOUtil.configureBlocking(fd, true);
}
}
+
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -889,7 +971,8 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!isOpen() || (state != ST_CONNECTED))
return this;
@@ -903,6 +986,8 @@
// refresh local address
localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -950,7 +1035,8 @@
if (sm != null)
sm.checkMulticast(group);
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
// check the registry to see if we are already a member of the group
@@ -1005,6 +1091,8 @@
registry.add(key);
return key;
+ } finally {
+ stateLock.unlock();
}
}
@@ -1030,7 +1118,8 @@
void drop(MembershipKeyImpl key) {
assert key.channel() == this;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
return;
@@ -1051,6 +1140,8 @@
key.invalidate();
registry.remove(key);
+ } finally {
+ stateLock.unlock();
}
}
@@ -1064,7 +1155,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
if (source.isAnyLocalAddress())
@@ -1090,6 +1182,8 @@
// ancient kernel
throw new UnsupportedOperationException();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -1100,7 +1194,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
@@ -1120,6 +1215,8 @@
// should not happen
throw new AssertionError(ioe);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -1144,7 +1241,8 @@
boolean interrupted = false;
// set state to ST_CLOSING and invalid membership keys
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
blocking = isBlocking();
state = ST_CLOSING;
@@ -1152,11 +1250,14 @@
// if member of any multicast groups then invalidate the keys
if (registry != null)
registry.invalidateAll();
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding I/O operations to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long reader = readerThread;
long writer = writerThread;
@@ -1171,12 +1272,14 @@
// wait for blocking I/O operations to end
while (readerThread != 0 || writerThread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for read/write to complete
@@ -1190,9 +1293,12 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -1206,7 +1312,8 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
try {
@@ -1216,6 +1323,8 @@
ResourceManager.afterUdpClose();
}
}
+ } finally {
+ stateLock.unlock();
}
}