8222774: (ch) Replace uses of stateLock and blockingLock with j.u.c. locks
Reviewed-by: dfuchs, bpb, martin
--- a/src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java Thu Apr 25 10:41:49 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -23,13 +23,15 @@
* questions.
*/
-/*
- */
-
package java.nio.channels.spi;
import java.io.IOException;
-import java.nio.channels.*;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.InterruptibleChannel;
+import java.util.concurrent.locks.ReentrantLock;
+
import jdk.internal.access.SharedSecrets;
import sun.nio.ch.Interruptible;
@@ -84,8 +86,7 @@
public abstract class AbstractInterruptibleChannel
implements Channel, InterruptibleChannel
{
-
- private final Object closeLock = new Object();
+ private final ReentrantLock closeLock = new ReentrantLock();
private volatile boolean closed;
/**
@@ -105,11 +106,14 @@
* If an I/O error occurs
*/
public final void close() throws IOException {
- synchronized (closeLock) {
+ closeLock.lock();
+ try {
if (closed)
return;
closed = true;
implCloseChannel();
+ } finally {
+ closeLock.unlock();
}
}
@@ -153,7 +157,8 @@
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
- synchronized (closeLock) {
+ closeLock.lock();
+ try {
if (closed)
return;
closed = true;
@@ -161,6 +166,8 @@
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
+ } finally {
+ closeLock.unlock();
}
}};
}
--- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -53,6 +53,7 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.ResourceManager;
@@ -89,7 +90,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@@ -179,8 +181,11 @@
: StandardProtocolFamily.INET;
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
this.localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
}
@@ -192,27 +197,36 @@
@Override
public DatagramSocket socket() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (socket == null)
socket = DatagramSocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
// Perform security check before returning address
return Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -224,7 +238,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS ||
@@ -264,6 +279,8 @@
// remaining options don't need any special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -276,7 +293,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS ||
@@ -315,6 +333,8 @@
// no special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -362,7 +382,8 @@
begin();
}
SocketAddress remote;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
remote = remoteAddress;
if ((remote == null) && mustBeConnected)
@@ -371,6 +392,8 @@
bindInternal(null);
if (blocking)
readerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
return remote;
}
@@ -384,12 +407,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
readerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -414,21 +440,29 @@
SecurityManager sm = System.getSecurityManager();
if (connected || (sm == null)) {
// connected or no security manager
- do {
- n = receive(fd, dst, connected);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
- if (n == IOStatus.UNAVAILABLE)
+ n = receive(fd, dst, connected);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = receive(fd, dst, connected);
+ }
+ } else if (n == IOStatus.UNAVAILABLE) {
return null;
+ }
} else {
// Cannot receive into user's buffer when running with a
// security manager and not connected
bb = Util.getTemporaryDirectBuffer(dst.remaining());
for (;;) {
- do {
- n = receive(fd, bb, connected);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
- if (n == IOStatus.UNAVAILABLE)
+ n = receive(fd, bb, connected);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = receive(fd, bb, connected);
+ }
+ } else if (n == IOStatus.UNAVAILABLE) {
return null;
+ }
InetSocketAddress isa = (InetSocketAddress)sender;
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
@@ -493,6 +527,7 @@
return n;
}
+ @Override
public int send(ByteBuffer src, SocketAddress target)
throws IOException
{
@@ -510,9 +545,13 @@
if (!target.equals(remote)) {
throw new AlreadyConnectedException();
}
- do {
- n = IOUtil.write(fd, src, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, src, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, src, -1, nd);
+ }
+ }
} else {
// not connected
SecurityManager sm = System.getSecurityManager();
@@ -524,9 +563,13 @@
sm.checkConnect(ia.getHostAddress(), isa.getPort());
}
}
- do {
- n = send(fd, src, isa);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = send(fd, src, isa);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = send(fd, src, isa);
+ }
+ }
}
} finally {
endWrite(blocking, n > 0);
@@ -602,10 +645,13 @@
int n = 0;
try {
beginRead(blocking, true);
- do {
- n = IOUtil.read(fd, buf, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+ n = IOUtil.read(fd, buf, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, buf, -1, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
@@ -628,10 +674,13 @@
long n = 0;
try {
beginRead(blocking, true);
- do {
- n = IOUtil.read(fd, dsts, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
-
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
@@ -659,7 +708,8 @@
begin();
}
SocketAddress remote;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
remote = remoteAddress;
if ((remote == null) && mustBeConnected)
@@ -668,6 +718,8 @@
bindInternal(null);
if (blocking)
writerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
return remote;
}
@@ -681,12 +733,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
writerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -703,9 +758,13 @@
int n = 0;
try {
beginWrite(blocking, true);
- do {
- n = IOUtil.write(fd, buf, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, buf, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, buf, -1, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
@@ -728,9 +787,13 @@
long n = 0;
try {
beginWrite(blocking, true);
- do {
- n = IOUtil.write(fd, srcs, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
@@ -747,9 +810,12 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -760,14 +826,20 @@
}
InetSocketAddress localAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return localAddress;
+ } finally {
+ stateLock.unlock();
}
}
InetSocketAddress remoteAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -777,11 +849,14 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
bindInternal(local);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -793,7 +868,7 @@
}
private void bindInternal(SocketAddress local) throws IOException {
- assert Thread.holdsLock(stateLock) && (localAddress == null);
+ assert stateLock.isHeldByCurrentThread() && (localAddress == null);
InetSocketAddress isa;
if (local == null) {
@@ -816,8 +891,11 @@
@Override
public boolean isConnected() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return (state == ST_CONNECTED);
+ } finally {
+ stateLock.unlock();
}
}
@@ -839,7 +917,8 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
@@ -865,7 +944,7 @@
}
try {
ByteBuffer buf = ByteBuffer.allocate(100);
- while (receive(buf) != null) {
+ while (receive(fd, buf, false) > 0) {
buf.clear();
}
} finally {
@@ -873,6 +952,9 @@
IOUtil.configureBlocking(fd, true);
}
}
+
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -889,7 +971,8 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!isOpen() || (state != ST_CONNECTED))
return this;
@@ -903,6 +986,8 @@
// refresh local address
localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -950,7 +1035,8 @@
if (sm != null)
sm.checkMulticast(group);
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
// check the registry to see if we are already a member of the group
@@ -1005,6 +1091,8 @@
registry.add(key);
return key;
+ } finally {
+ stateLock.unlock();
}
}
@@ -1030,7 +1118,8 @@
void drop(MembershipKeyImpl key) {
assert key.channel() == this;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
return;
@@ -1051,6 +1140,8 @@
key.invalidate();
registry.remove(key);
+ } finally {
+ stateLock.unlock();
}
}
@@ -1064,7 +1155,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
if (source.isAnyLocalAddress())
@@ -1090,6 +1182,8 @@
// ancient kernel
throw new UnsupportedOperationException();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -1100,7 +1194,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
@@ -1120,6 +1215,8 @@
// should not happen
throw new AssertionError(ioe);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -1144,7 +1241,8 @@
boolean interrupted = false;
// set state to ST_CLOSING and invalid membership keys
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
blocking = isBlocking();
state = ST_CLOSING;
@@ -1152,11 +1250,14 @@
// if member of any multicast groups then invalidate the keys
if (registry != null)
registry.invalidateAll();
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding I/O operations to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long reader = readerThread;
long writer = writerThread;
@@ -1171,12 +1272,14 @@
// wait for blocking I/O operations to end
while (readerThread != 0 || writerThread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for read/write to complete
@@ -1190,9 +1293,12 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -1206,7 +1312,8 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
try {
@@ -1216,6 +1323,8 @@
ResourceManager.afterUdpClose();
}
}
+ } finally {
+ stateLock.unlock();
}
}
--- a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -42,6 +42,7 @@
import java.nio.channels.DatagramChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.util.Objects;
+import java.util.Set;
// Make a datagram-socket channel look like a datagram socket.
@@ -51,7 +52,7 @@
// class.
//
-public class DatagramSocketAdaptor
+class DatagramSocketAdaptor
extends DatagramSocket
{
// The channel being adapted
@@ -61,7 +62,7 @@
private volatile int timeout;
// ## super will create a useless impl
- private DatagramSocketAdaptor(DatagramChannelImpl dc) throws IOException {
+ private DatagramSocketAdaptor(DatagramChannelImpl dc) {
// Invoke the DatagramSocketAdaptor(SocketAddress) constructor,
// passing a dummy DatagramSocketImpl object to avoid any native
// resource allocation in super class and invoking our bind method
@@ -70,12 +71,8 @@
this.dc = dc;
}
- public static DatagramSocket create(DatagramChannelImpl dc) {
- try {
- return new DatagramSocketAdaptor(dc);
- } catch (IOException x) {
- throw new Error(x);
- }
+ static DatagramSocket create(DatagramChannelImpl dc) {
+ return new DatagramSocketAdaptor(dc);
}
private void connectInternal(SocketAddress remote)
@@ -96,6 +93,7 @@
}
}
+ @Override
public void bind(SocketAddress local) throws SocketException {
try {
if (local == null)
@@ -106,6 +104,7 @@
}
}
+ @Override
public void connect(InetAddress address, int port) {
try {
connectInternal(new InetSocketAddress(address, port));
@@ -114,11 +113,13 @@
}
}
+ @Override
public void connect(SocketAddress remote) throws SocketException {
Objects.requireNonNull(remote, "Address can't be null");
connectInternal(remote);
}
+ @Override
public void disconnect() {
try {
dc.disconnect();
@@ -127,24 +128,39 @@
}
}
+ @Override
public boolean isBound() {
return dc.localAddress() != null;
}
+ @Override
public boolean isConnected() {
return dc.remoteAddress() != null;
}
+ @Override
public InetAddress getInetAddress() {
InetSocketAddress remote = dc.remoteAddress();
return (remote != null) ? remote.getAddress() : null;
}
+ @Override
public int getPort() {
InetSocketAddress remote = dc.remoteAddress();
return (remote != null) ? remote.getPort() : -1;
}
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ return dc.remoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ return dc.localAddress();
+ }
+
+ @Override
public void send(DatagramPacket p) throws IOException {
synchronized (dc.blockingLock()) {
if (!dc.isBlocking())
@@ -198,6 +214,7 @@
}
}
+ @Override
public void receive(DatagramPacket p) throws IOException {
synchronized (dc.blockingLock()) {
if (!dc.isBlocking())
@@ -217,6 +234,7 @@
}
}
+ @Override
public InetAddress getLocalAddress() {
if (isClosed())
return null;
@@ -235,6 +253,7 @@
return result;
}
+ @Override
public int getLocalPort() {
if (isClosed())
return -1;
@@ -248,11 +267,19 @@
return 0;
}
+ @Override
public void setSoTimeout(int timeout) throws SocketException {
+ if (!dc.isOpen())
+ throw new SocketException("Socket is closed");
+ if (timeout < 0)
+ throw new IllegalArgumentException("timeout < 0");
this.timeout = timeout;
}
+ @Override
public int getSoTimeout() throws SocketException {
+ if (!dc.isOpen())
+ throw new SocketException("Socket is closed");
return timeout;
}
@@ -294,51 +321,62 @@
}
}
+ @Override
public void setSendBufferSize(int size) throws SocketException {
if (size <= 0)
throw new IllegalArgumentException("Invalid send size");
setIntOption(StandardSocketOptions.SO_SNDBUF, size);
}
+ @Override
public int getSendBufferSize() throws SocketException {
return getIntOption(StandardSocketOptions.SO_SNDBUF);
}
+ @Override
public void setReceiveBufferSize(int size) throws SocketException {
if (size <= 0)
throw new IllegalArgumentException("Invalid receive size");
setIntOption(StandardSocketOptions.SO_RCVBUF, size);
}
+ @Override
public int getReceiveBufferSize() throws SocketException {
return getIntOption(StandardSocketOptions.SO_RCVBUF);
}
+ @Override
public void setReuseAddress(boolean on) throws SocketException {
setBooleanOption(StandardSocketOptions.SO_REUSEADDR, on);
}
+ @Override
public boolean getReuseAddress() throws SocketException {
return getBooleanOption(StandardSocketOptions.SO_REUSEADDR);
}
+ @Override
public void setBroadcast(boolean on) throws SocketException {
setBooleanOption(StandardSocketOptions.SO_BROADCAST, on);
}
+ @Override
public boolean getBroadcast() throws SocketException {
return getBooleanOption(StandardSocketOptions.SO_BROADCAST);
}
+ @Override
public void setTrafficClass(int tc) throws SocketException {
setIntOption(StandardSocketOptions.IP_TOS, tc);
}
+ @Override
public int getTrafficClass() throws SocketException {
return getIntOption(StandardSocketOptions.IP_TOS);
}
+ @Override
public void close() {
try {
dc.close();
@@ -347,14 +385,32 @@
}
}
+ @Override
public boolean isClosed() {
return !dc.isOpen();
}
+ @Override
public DatagramChannel getChannel() {
return dc;
}
+ @Override
+ public <T> DatagramSocket setOption(SocketOption<T> name, T value) throws IOException {
+ dc.setOption(name, value);
+ return this;
+ }
+
+ @Override
+ public <T> T getOption(SocketOption<T> name) throws IOException {
+ return dc.getOption(name);
+ }
+
+ @Override
+ public Set<SocketOption<?>> supportedOptions() {
+ return dc.supportedOptions();
+ }
+
/*
* A dummy implementation of DatagramSocketImpl that can be passed to the
* DatagramSocket constructor so that no native resources are allocated in
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package sun.nio.ch;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+import java.net.SocketImpl;
+import java.net.SocketOption;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Set;
+
+/**
+ * Dummy SocketImpl for use by the socket adaptors. All methods are overridden
+ * to throw an error.
+ */
+
+class DummySocketImpl extends SocketImpl {
+ private static final PrivilegedAction<SocketImpl> NEW = DummySocketImpl::new;
+
+ private DummySocketImpl() { }
+
+ static SocketImpl create() {
+ return AccessController.doPrivileged(NEW);
+ }
+
+ private static <T> T shouldNotGetHere() {
+ throw new InternalError("Should not get here");
+ }
+
+ @Override
+ protected void create(boolean stream) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void connect(SocketAddress remote, int millis) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void connect(String host, int port) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void connect(InetAddress address, int port) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void bind(InetAddress host, int port) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void listen(int backlog) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void accept(SocketImpl si) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected InputStream getInputStream() {
+ return shouldNotGetHere();
+ }
+ @Override
+ protected OutputStream getOutputStream() {
+ return shouldNotGetHere();
+ }
+ @Override
+ protected int available() {
+ return shouldNotGetHere();
+ }
+
+ @Override
+ protected void close() {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected Set<SocketOption<?>> supportedOptions() {
+ return shouldNotGetHere();
+ }
+
+ @Override
+ protected <T> void setOption(SocketOption<T> opt, T value) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected <T> T getOption(SocketOption<T> opt) {
+ return shouldNotGetHere();
+ }
+
+ @Override
+ public void setOption(int opt, Object value) {
+ shouldNotGetHere();
+ }
+
+ @Override
+ public Object getOption(int opt) {
+ return shouldNotGetHere();
+ }
+
+ @Override
+ protected void shutdownInput() {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected void shutdownOutput() {
+ shouldNotGetHere();
+ }
+
+ @Override
+ protected boolean supportsUrgentData() {
+ return shouldNotGetHere();
+ }
+
+ @Override
+ protected void sendUrgentData(int data) {
+ shouldNotGetHere();
+ }
+}
--- a/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -29,6 +29,8 @@
import java.io.FileDescriptor;
import java.io.IOException;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
/**
* An interface that allows translation (and more!).
*
@@ -68,4 +70,40 @@
void kill() throws IOException;
+ /**
+ * Disables the current thread for scheduling purposes until this
+ * channel is ready for I/O, or asynchronously closed, for up to the
+ * specified waiting time.
+ *
+ * <p> This method does <em>not</em> report which of these caused the
+ * method to return. Callers should re-check the conditions which caused
+ * the thread to park.
+ *
+ * @param event the event to poll
+ * @param nanos the timeout to wait; {@code <= 0} to wait indefinitely
+ */
+ default void park(int event, long nanos) throws IOException {
+ long millis;
+ if (nanos <= 0) {
+ millis = -1;
+ } else {
+ millis = NANOSECONDS.toMillis(nanos);
+ }
+ Net.poll(getFD(), event, millis);
+ }
+
+ /**
+ * Disables the current thread for scheduling purposes until this
+ * channel is ready for I/O, or asynchronously closed.
+ *
+ * <p> This method does <em>not</em> report which of these caused the
+ * method to return. Callers should re-check the conditions which caused
+ * the thread to park.
+ *
+ * @param event the event to poll
+ */
+ default void park(int event) throws IOException {
+ park(event, 0L);
+ }
+
}
--- a/src/java.base/share/classes/sun/nio/ch/ServerSocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -32,12 +32,14 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
-import java.net.SocketTimeoutException;
+import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.IllegalBlockingModeException;
-import java.nio.channels.NotYetBoundException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
// Make a server-socket channel look like a server socket.
@@ -56,23 +58,21 @@
// Timeout "option" value for accepts
private volatile int timeout;
- public static ServerSocket create(ServerSocketChannelImpl ssc) {
- try {
- return new ServerSocketAdaptor(ssc);
- } catch (IOException x) {
- throw new Error(x);
- }
+ static ServerSocket create(ServerSocketChannelImpl ssc) {
+ return new ServerSocketAdaptor(ssc);
}
- // ## super will create a useless impl
- private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException {
+ private ServerSocketAdaptor(ServerSocketChannelImpl ssc) {
+ super(DummySocketImpl.create());
this.ssc = ssc;
}
+ @Override
public void bind(SocketAddress local) throws IOException {
bind(local, 50);
}
+ @Override
public void bind(SocketAddress local, int backlog) throws IOException {
if (local == null)
local = new InetSocketAddress(0);
@@ -83,6 +83,7 @@
}
}
+ @Override
public InetAddress getInetAddress() {
InetSocketAddress local = ssc.localAddress();
if (local == null) {
@@ -92,6 +93,7 @@
}
}
+ @Override
public int getLocalPort() {
InetSocketAddress local = ssc.localAddress();
if (local == null) {
@@ -101,65 +103,65 @@
}
}
+ @Override
public Socket accept() throws IOException {
- synchronized (ssc.blockingLock()) {
- try {
- if (!ssc.isBound())
- throw new NotYetBoundException();
-
- long to = this.timeout;
- if (to == 0) {
- // for compatibility reasons: accept connection if available
- // when configured non-blocking
- SocketChannel sc = ssc.accept();
- if (sc == null && !ssc.isBlocking())
- throw new IllegalBlockingModeException();
- return sc.socket();
+ SocketChannel sc = null;
+ try {
+ int timeout = this.timeout;
+ if (timeout > 0) {
+ long nanos = MILLISECONDS.toNanos(timeout);
+ sc = ssc.blockingAccept(nanos);
+ } else {
+ // accept connection if possible when non-blocking (to preserve
+ // long standing behavior)
+ sc = ssc.accept();
+ if (sc == null) {
+ throw new IllegalBlockingModeException();
}
-
- if (!ssc.isBlocking())
- throw new IllegalBlockingModeException();
- for (;;) {
- long st = System.currentTimeMillis();
- if (ssc.pollAccept(to))
- return ssc.accept().socket();
- to -= System.currentTimeMillis() - st;
- if (to <= 0)
- throw new SocketTimeoutException();
- }
-
- } catch (Exception x) {
- Net.translateException(x);
- assert false;
- return null; // Never happens
}
+ } catch (Exception e) {
+ Net.translateException(e);
}
+ return sc.socket();
}
+ @Override
public void close() throws IOException {
ssc.close();
}
+ @Override
public ServerSocketChannel getChannel() {
return ssc;
}
+ @Override
public boolean isBound() {
return ssc.isBound();
}
+ @Override
public boolean isClosed() {
return !ssc.isOpen();
}
+ @Override
public void setSoTimeout(int timeout) throws SocketException {
+ if (!ssc.isOpen())
+ throw new SocketException("Socket is closed");
+ if (timeout < 0)
+ throw new IllegalArgumentException("timeout < 0");
this.timeout = timeout;
}
+ @Override
public int getSoTimeout() throws SocketException {
+ if (!ssc.isOpen())
+ throw new SocketException("Socket is closed");
return timeout;
}
+ @Override
public void setReuseAddress(boolean on) throws SocketException {
try {
ssc.setOption(StandardSocketOptions.SO_REUSEADDR, on);
@@ -168,6 +170,7 @@
}
}
+ @Override
public boolean getReuseAddress() throws SocketException {
try {
return ssc.getOption(StandardSocketOptions.SO_REUSEADDR).booleanValue();
@@ -177,6 +180,7 @@
}
}
+ @Override
public String toString() {
if (!isBound())
return "ServerSocket[unbound]";
@@ -184,6 +188,7 @@
",localport=" + getLocalPort() + "]";
}
+ @Override
public void setReceiveBufferSize(int size) throws SocketException {
// size 0 valid for ServerSocketChannel, invalid for ServerSocket
if (size <= 0)
@@ -195,6 +200,7 @@
}
}
+ @Override
public int getReceiveBufferSize() throws SocketException {
try {
return ssc.getOption(StandardSocketOptions.SO_RCVBUF).intValue();
@@ -203,4 +209,20 @@
return -1; // Never happens
}
}
+
+ @Override
+ public <T> ServerSocket setOption(SocketOption<T> name, T value) throws IOException {
+ ssc.setOption(name, value);
+ return this;
+ }
+
+ @Override
+ public <T> T getOption(SocketOption<T> name) throws IOException {
+ return ssc.getOption(name);
+ }
+
+ @Override
+ public Set<SocketOption<?>> supportedOptions() {
+ return ssc.supportedOptions();
+ }
}
--- a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -31,10 +31,12 @@
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketOption;
+import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
@@ -44,6 +46,7 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.NetHooks;
@@ -69,7 +72,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@@ -95,7 +99,7 @@
// -- End of fields protected by stateLock
- ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
+ ServerSocketChannelImpl(SelectorProvider sp) {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
@@ -108,8 +112,11 @@
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
if (bound) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
}
}
@@ -122,20 +129,26 @@
@Override
public ServerSocket socket() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
return (localAddress == null)
? null
: Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@@ -146,7 +159,8 @@
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
@@ -157,6 +171,8 @@
Net.setSocketOption(fd, Net.UNSPEC, name, value);
}
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -169,7 +185,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
@@ -177,6 +194,8 @@
}
// no options that require special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -202,7 +221,8 @@
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
@@ -216,6 +236,8 @@
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
return this;
}
@@ -229,12 +251,15 @@
private void begin(boolean blocking) throws ClosedChannelException {
if (blocking)
begin(); // set blocker to close channel if interrupted
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (localAddress == null)
throw new NotYetBoundException();
if (blocking)
thread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
}
@@ -248,12 +273,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
end(completed);
}
@@ -270,22 +298,82 @@
boolean blocking = isBlocking();
try {
begin(blocking);
- do {
- n = Net.accept(this.fd, newfd, isaa);
- } while (n == IOStatus.INTERRUPTED && isOpen());
+ n = Net.accept(this.fd, newfd, isaa);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = Net.accept(this.fd, newfd, isaa);
+ }
+ }
} finally {
end(blocking, n > 0);
assert IOStatus.check(n);
}
-
} finally {
acceptLock.unlock();
}
- if (n < 1)
+ if (n > 0) {
+ return finishAccept(newfd, isaa[0]);
+ } else {
return null;
+ }
+ }
- InetSocketAddress isa = isaa[0];
+ /**
+ * Accepts a new connection with a given timeout. This method requires the
+ * channel to be configured in blocking mode.
+ *
+ * @apiNote This method is for use by the socket adaptor.
+ *
+ * @param nanos the timeout, in nanoseconds
+ * @throws IllegalBlockingModeException if the channel is configured non-blocking
+ * @throws SocketTimeoutException if the timeout expires
+ */
+ SocketChannel blockingAccept(long nanos) throws IOException {
+ int n = 0;
+ FileDescriptor newfd = new FileDescriptor();
+ InetSocketAddress[] isaa = new InetSocketAddress[1];
+
+ acceptLock.lock();
+ try {
+ // check that channel is configured blocking
+ if (!isBlocking())
+ throw new IllegalBlockingModeException();
+
+ try {
+ begin(true);
+ // change socket to non-blocking
+ lockedConfigureBlocking(false);
+ try {
+ long startNanos = System.nanoTime();
+ n = Net.accept(fd, newfd, isaa);
+ while (n == IOStatus.UNAVAILABLE && isOpen()) {
+ long remainingNanos = nanos - (System.nanoTime() - startNanos);
+ if (remainingNanos <= 0) {
+ throw new SocketTimeoutException("Accept timed out");
+ }
+ park(Net.POLLIN, remainingNanos);
+ n = Net.accept(fd, newfd, isaa);
+ }
+ } finally {
+ // restore socket to blocking mode
+ lockedConfigureBlocking(true);
+ }
+ } finally {
+ end(true, n > 0);
+ }
+ } finally {
+ acceptLock.unlock();
+ }
+
+ assert n > 0;
+ return finishAccept(newfd, isaa[0]);
+ }
+
+ private SocketChannel finishAccept(FileDescriptor newfd, InetSocketAddress isa)
+ throws IOException
+ {
try {
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
@@ -306,16 +394,27 @@
protected void implConfigureBlocking(boolean block) throws IOException {
acceptLock.lock();
try {
- synchronized (stateLock) {
- ensureOpen();
- IOUtil.configureBlocking(fd, block);
- }
+ lockedConfigureBlocking(block);
} finally {
acceptLock.unlock();
}
}
/**
+ * Adjust the blocking mode while holding acceptLock.
+ */
+ private void lockedConfigureBlocking(boolean block) throws IOException {
+ assert acceptLock.isHeldByCurrentThread();
+ stateLock.lock();
+ try {
+ ensureOpen();
+ IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ /**
* Invoked by implCloseChannel to close the channel.
*
* This method waits for outstanding I/O operations to complete. When in
@@ -336,15 +435,19 @@
boolean blocking;
// set state to ST_CLOSING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding accept to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
@@ -354,12 +457,14 @@
// wait for accept operation to end
while (thread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for accept to complete
@@ -368,9 +473,12 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -384,11 +492,14 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -396,8 +507,11 @@
* Returns true if channel's socket is bound
*/
boolean isBound() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return localAddress != null;
+ } finally {
+ stateLock.unlock();
}
}
@@ -405,30 +519,11 @@
* Returns the local address, or null if not bound
*/
InetSocketAddress localAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return localAddress;
- }
- }
-
- /**
- * Poll this channel's socket for a new connection up to the given timeout.
- * @return {@code true} if there is a connection to accept
- */
- boolean pollAccept(long timeout) throws IOException {
- assert Thread.holdsLock(blockingLock()) && isBlocking();
- acceptLock.lock();
- try {
- boolean polled = false;
- try {
- begin(true);
- int events = Net.poll(fd, Net.POLLIN, timeout);
- polled = (events != 0);
- } finally {
- end(true, polled);
- }
- return polled;
} finally {
- acceptLock.unlock();
+ stateLock.unlock();
}
}
@@ -494,13 +589,16 @@
if (!isOpen()) {
sb.append("closed");
} else {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
InetSocketAddress addr = localAddress;
if (addr == null) {
sb.append("unbound");
} else {
sb.append(Net.getRevealedLocalAddressAsString(addr));
}
+ } finally {
+ stateLock.unlock();
}
}
sb.append(']');
--- a/src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100
@@ -33,18 +33,12 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
-import java.net.SocketImpl;
import java.net.SocketOption;
-import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SocketChannel;
-import java.security.AccessController;
-import java.security.PrivilegedExceptionAction;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.Set;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
// Make a socket channel look like a socket.
//
@@ -62,11 +56,11 @@
private volatile int timeout;
private SocketAdaptor(SocketChannelImpl sc) throws SocketException {
- super((SocketImpl) null);
+ super(DummySocketImpl.create());
this.sc = sc;
}
- public static Socket create(SocketChannelImpl sc) {
+ static Socket create(SocketChannelImpl sc) {
try {
return new SocketAdaptor(sc);
} catch (SocketException e) {
@@ -74,70 +68,30 @@
}
}
- public SocketChannel getChannel() {
- return sc;
- }
-
- // Override this method just to protect against changes in the superclass
- //
+ @Override
public void connect(SocketAddress remote) throws IOException {
connect(remote, 0);
}
+ @Override
public void connect(SocketAddress remote, int timeout) throws IOException {
if (remote == null)
throw new IllegalArgumentException("connect: The address can't be null");
if (timeout < 0)
throw new IllegalArgumentException("connect: timeout can't be negative");
-
- synchronized (sc.blockingLock()) {
- if (!sc.isBlocking())
- throw new IllegalBlockingModeException();
-
- try {
- // no timeout
- if (timeout == 0) {
- sc.connect(remote);
- return;
- }
-
- // timed connect
- sc.configureBlocking(false);
- try {
- if (sc.connect(remote))
- return;
- } finally {
- try {
- sc.configureBlocking(true);
- } catch (ClosedChannelException e) { }
- }
-
- long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS);
- long to = timeout;
- for (;;) {
- long startTime = System.nanoTime();
- if (sc.pollConnected(to)) {
- boolean connected = sc.finishConnect();
- assert connected;
- break;
- }
- timeoutNanos -= System.nanoTime() - startTime;
- if (timeoutNanos <= 0) {
- try {
- sc.close();
- } catch (IOException x) { }
- throw new SocketTimeoutException();
- }
- to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
- }
-
- } catch (Exception x) {
- Net.translateException(x, true);
+ try {
+ if (timeout > 0) {
+ long nanos = MILLISECONDS.toNanos(timeout);
+ sc.blockingConnect(remote, nanos);
+ } else {
+ sc.blockingConnect(remote, Long.MAX_VALUE);
}
+ } catch (Exception e) {
+ Net.translateException(e, true);
}
-
}
+ @Override
public void bind(SocketAddress local) throws IOException {
try {
sc.bind(local);
@@ -146,6 +100,7 @@
}
}
+ @Override
public InetAddress getInetAddress() {
InetSocketAddress remote = sc.remoteAddress();
if (remote == null) {
@@ -155,6 +110,7 @@
}
}
+ @Override
public InetAddress getLocalAddress() {
if (sc.isOpen()) {
InetSocketAddress local = sc.localAddress();
@@ -165,6 +121,7 @@
return new InetSocketAddress(0).getAddress();
}
+ @Override
public int getPort() {
InetSocketAddress remote = sc.remoteAddress();
if (remote == null) {
@@ -174,6 +131,7 @@
}
}
+ @Override
public int getLocalPort() {
InetSocketAddress local = sc.localAddress();
if (local == null) {
@@ -183,48 +141,27 @@
}
}
- private class SocketInputStream
- extends ChannelInputStream
- {
- private SocketInputStream() {
- super(sc);
- }
-
- protected int read(ByteBuffer bb)
- throws IOException
- {
- synchronized (sc.blockingLock()) {
- if (!sc.isBlocking())
- throw new IllegalBlockingModeException();
-
- // no timeout
- long to = SocketAdaptor.this.timeout;
- if (to == 0)
- return sc.read(bb);
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ return sc.remoteAddress();
+ }
- // timed read
- long timeoutNanos = NANOSECONDS.convert(to, MILLISECONDS);
- for (;;) {
- long startTime = System.nanoTime();
- if (sc.pollRead(to)) {
- return sc.read(bb);
- }
- timeoutNanos -= System.nanoTime() - startTime;
- if (timeoutNanos <= 0)
- throw new SocketTimeoutException();
- to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS);
- }
- }
- }
-
- @Override
- public int available() throws IOException {
- return sc.available();
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ InetSocketAddress local = sc.localAddress();
+ if (local != null) {
+ return Net.getRevealedLocalAddress(local);
+ } else {
+ return null;
}
}
- private InputStream socketInputStream = null;
+ @Override
+ public SocketChannel getChannel() {
+ return sc;
+ }
+ @Override
public InputStream getInputStream() throws IOException {
if (!sc.isOpen())
throw new SocketException("Socket is closed");
@@ -232,21 +169,35 @@
throw new SocketException("Socket is not connected");
if (!sc.isInputOpen())
throw new SocketException("Socket input is shutdown");
- if (socketInputStream == null) {
- try {
- socketInputStream = AccessController.doPrivileged(
- new PrivilegedExceptionAction<InputStream>() {
- public InputStream run() throws IOException {
- return new SocketInputStream();
- }
- });
- } catch (java.security.PrivilegedActionException e) {
- throw (IOException)e.getException();
+ return new InputStream() {
+ @Override
+ public int read() throws IOException {
+ byte[] a = new byte[1];
+ int n = read(a, 0, 1);
+ return (n > 0) ? (a[0] & 0xff) : -1;
}
- }
- return socketInputStream;
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int timeout = SocketAdaptor.this.timeout;
+ if (timeout > 0) {
+ long nanos = MILLISECONDS.toNanos(timeout);
+ return sc.blockingRead(b, off, len, nanos);
+ } else {
+ return sc.blockingRead(b, off, len, 0);
+ }
+ }
+ @Override
+ public int available() throws IOException {
+ return sc.available();
+ }
+ @Override
+ public void close() throws IOException {
+ sc.close();
+ }
+ };
}
+ @Override
public OutputStream getOutputStream() throws IOException {
if (!sc.isOpen())
throw new SocketException("Socket is closed");
@@ -254,18 +205,21 @@
throw new SocketException("Socket is not connected");
if (!sc.isOutputOpen())
throw new SocketException("Socket output is shutdown");
- OutputStream os = null;
- try {
- os = AccessController.doPrivileged(
- new PrivilegedExceptionAction<OutputStream>() {
- public OutputStream run() throws IOException {
- return Channels.newOutputStream(sc);
- }
- });
- } catch (java.security.PrivilegedActionException e) {
- throw (IOException)e.getException();
- }
- return os;
+ return new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ byte[] a = new byte[]{(byte) b};
+ write(a, 0, 1);
+ }
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ sc.blockingWriteFully(b, off, len);
+ }
+ @Override
+ public void close() throws IOException {
+ sc.close();
+ }
+ };
}
private void setBooleanOption(SocketOption<Boolean> name, boolean value)
@@ -306,48 +260,62 @@
}
}
+ @Override
public void setTcpNoDelay(boolean on) throws SocketException {
setBooleanOption(StandardSocketOptions.TCP_NODELAY, on);
}
+ @Override
public boolean getTcpNoDelay() throws SocketException {
return getBooleanOption(StandardSocketOptions.TCP_NODELAY);
}
+ @Override
public void setSoLinger(boolean on, int linger) throws SocketException {
if (!on)
linger = -1;
setIntOption(StandardSocketOptions.SO_LINGER, linger);
}
+ @Override
public int getSoLinger() throws SocketException {
return getIntOption(StandardSocketOptions.SO_LINGER);
}
+ @Override
public void sendUrgentData(int data) throws IOException {
int n = sc.sendOutOfBandData((byte) data);
if (n == 0)
throw new IOException("Socket buffer full");
}
+ @Override
public void setOOBInline(boolean on) throws SocketException {
setBooleanOption(ExtendedSocketOption.SO_OOBINLINE, on);
}
+ @Override
public boolean getOOBInline() throws SocketException {
return getBooleanOption(ExtendedSocketOption.SO_OOBINLINE);
}
+ @Override
public void setSoTimeout(int timeout) throws SocketException {
+ if (!sc.isOpen())
+ throw new SocketException("Socket is closed");
if (timeout < 0)
- throw new IllegalArgumentException("timeout can't be negative");
+ throw new IllegalArgumentException("timeout < 0");
this.timeout = timeout;
}
+ @Override
public int getSoTimeout() throws SocketException {
+ if (!sc.isOpen())
+ throw new SocketException("Socket is closed");
return timeout;
}
+ @Override
public void setSendBufferSize(int size) throws SocketException {
// size 0 valid for SocketChannel, invalid for Socket
if (size <= 0)
@@ -355,10 +323,12 @@
setIntOption(StandardSocketOptions.SO_SNDBUF, size);
}
+ @Override
public int getSendBufferSize() throws SocketException {
return getIntOption(StandardSocketOptions.SO_SNDBUF);
}
+ @Override
public void setReceiveBufferSize(int size) throws SocketException {
// size 0 valid for SocketChannel, invalid for Socket
if (size <= 0)
@@ -366,38 +336,47 @@
setIntOption(StandardSocketOptions.SO_RCVBUF, size);
}
+ @Override
public int getReceiveBufferSize() throws SocketException {
return getIntOption(StandardSocketOptions.SO_RCVBUF);
}
+ @Override
public void setKeepAlive(boolean on) throws SocketException {
setBooleanOption(StandardSocketOptions.SO_KEEPALIVE, on);
}
+ @Override
public boolean getKeepAlive() throws SocketException {
return getBooleanOption(StandardSocketOptions.SO_KEEPALIVE);
}
+ @Override
public void setTrafficClass(int tc) throws SocketException {
setIntOption(StandardSocketOptions.IP_TOS, tc);
}
+ @Override
public int getTrafficClass() throws SocketException {
return getIntOption(StandardSocketOptions.IP_TOS);
}
+ @Override
public void setReuseAddress(boolean on) throws SocketException {
setBooleanOption(StandardSocketOptions.SO_REUSEADDR, on);
}
+ @Override
public boolean getReuseAddress() throws SocketException {
return getBooleanOption(StandardSocketOptions.SO_REUSEADDR);
}
+ @Override
public void close() throws IOException {
sc.close();
}
+ @Override
public void shutdownInput() throws IOException {
try {
sc.shutdownInput();
@@ -406,6 +385,7 @@
}
}
+ @Override
public void shutdownOutput() throws IOException {
try {
sc.shutdownOutput();
@@ -414,6 +394,7 @@
}
}
+ @Override
public String toString() {
if (sc.isConnected())
return "Socket[addr=" + getInetAddress() +
@@ -422,23 +403,44 @@
return "Socket[unconnected]";
}
+ @Override
public boolean isConnected() {
return sc.isConnected();
}
+ @Override
public boolean isBound() {
return sc.localAddress() != null;
}
+ @Override
public boolean isClosed() {
return !sc.isOpen();
}
+ @Override
public boolean isInputShutdown() {
return !sc.isInputOpen();
}
+ @Override
public boolean isOutputShutdown() {
return !sc.isOutputOpen();
}
+
+ @Override
+ public <T> Socket setOption(SocketOption<T> name, T value) throws IOException {
+ sc.setOption(name, value);
+ return this;
+ }
+
+ @Override
+ public <T> T getOption(SocketOption<T> name) throws IOException {
+ return sc.getOption(name);
+ }
+
+ @Override
+ public Set<SocketOption<?>> supportedOptions() {
+ return sc.supportedOptions();
+ }
}
--- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -34,6 +34,7 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
+import java.net.SocketTimeoutException;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
+import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.NoConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
@@ -51,6 +53,7 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.ConnectionResetException;
@@ -81,7 +84,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// Input/Output closed
private volatile boolean isInputClosed;
@@ -133,8 +137,11 @@
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
if (bound) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
this.localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
}
}
@@ -147,10 +154,13 @@
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
this.localAddress = Net.localAddress(fd);
this.remoteAddress = isa;
this.state = ST_CONNECTED;
+ } finally {
+ stateLock.unlock();
}
}
@@ -187,26 +197,35 @@
@Override
public Socket socket() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (socket == null)
socket = SocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
return Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -218,7 +237,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
@@ -237,6 +257,8 @@
// no options that require special handling
Net.setSocketOption(fd, name, value);
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -249,7 +271,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
@@ -266,6 +289,8 @@
// no options that require special handling
return (T) Net.getSocketOption(fd, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -307,10 +332,13 @@
// set hook for Thread.interrupt
begin();
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpenAndConnected();
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
} else {
ensureOpenAndConnected();
@@ -327,12 +355,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
readerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -362,12 +393,12 @@
if (isInputClosed)
return IOStatus.EOF;
+ n = IOUtil.read(fd, buf, -1, nd);
if (blocking) {
- do {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
n = IOUtil.read(fd, buf, -1, nd);
- } while (n == IOStatus.INTERRUPTED && isOpen());
- } else {
- n = IOUtil.read(fd, buf, -1, nd);
+ }
}
} catch (ConnectionResetException e) {
connectionReset = true;
@@ -404,12 +435,12 @@
if (isInputClosed)
return IOStatus.EOF;
+ n = IOUtil.read(fd, dsts, offset, length, nd);
if (blocking) {
- do {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
n = IOUtil.read(fd, dsts, offset, length, nd);
- } while (n == IOStatus.INTERRUPTED && isOpen());
- } else {
- n = IOUtil.read(fd, dsts, offset, length, nd);
+ }
}
} catch (ConnectionResetException e) {
connectionReset = true;
@@ -436,12 +467,15 @@
// set hook for Thread.interrupt
begin();
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpenAndConnected();
if (isOutputClosed)
throw new ClosedChannelException();
// record thread so it can be signalled if needed
writerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
} else {
ensureOpenAndConnected();
@@ -458,12 +492,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
writerThread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -480,12 +517,12 @@
int n = 0;
try {
beginWrite(blocking);
+ n = IOUtil.write(fd, buf, -1, nd);
if (blocking) {
- do {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
n = IOUtil.write(fd, buf, -1, nd);
- } while (n == IOStatus.INTERRUPTED && isOpen());
- } else {
- n = IOUtil.write(fd, buf, -1, nd);
+ }
}
} finally {
endWrite(blocking, n > 0);
@@ -510,12 +547,12 @@
long n = 0;
try {
beginWrite(blocking);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
if (blocking) {
- do {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
n = IOUtil.write(fd, srcs, offset, length, nd);
- } while (n == IOStatus.INTERRUPTED && isOpen());
- } else {
- n = IOUtil.write(fd, srcs, offset, length, nd);
+ }
}
} finally {
endWrite(blocking, n > 0);
@@ -562,10 +599,7 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
- ensureOpen();
- IOUtil.configureBlocking(fd, block);
- }
+ lockedConfigureBlocking(block);
} finally {
writeLock.unlock();
}
@@ -575,11 +609,28 @@
}
/**
+ * Adjust the blocking mode while holding the readLock or writeLock.
+ */
+ private void lockedConfigureBlocking(boolean block) throws IOException {
+ assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
+ stateLock.lock();
+ try {
+ ensureOpen();
+ IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ /**
* Returns the local address, or null if not bound
*/
InetSocketAddress localAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return localAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -587,8 +638,11 @@
* Returns the remote address, or null if not connected
*/
InetSocketAddress remoteAddress() {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
return remoteAddress;
+ } finally {
+ stateLock.unlock();
}
}
@@ -598,7 +652,8 @@
try {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
@@ -613,6 +668,8 @@
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -649,7 +706,8 @@
// set hook for Thread.interrupt
begin();
}
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
int state = this.state;
if (state == ST_CONNECTED)
@@ -667,6 +725,8 @@
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -683,43 +743,62 @@
endRead(blocking, completed);
if (completed) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
+ } finally {
+ stateLock.unlock();
}
}
}
+ /**
+ * Checks the remote address to which this channel is to be connected.
+ */
+ private InetSocketAddress checkRemote(SocketAddress sa) throws IOException {
+ InetSocketAddress isa = Net.checkAddress(sa);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
+ }
+ if (isa.getAddress().isAnyLocalAddress()) {
+ return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort());
+ } else {
+ return isa;
+ }
+ }
+
@Override
- public boolean connect(SocketAddress sa) throws IOException {
- InetSocketAddress isa = Net.checkAddress(sa);
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
-
- InetAddress ia = isa.getAddress();
- if (ia.isAnyLocalAddress())
- ia = InetAddress.getLocalHost();
-
+ public boolean connect(SocketAddress remote) throws IOException {
+ InetSocketAddress isa = checkRemote(remote);
try {
readLock.lock();
try {
writeLock.lock();
try {
- int n = 0;
boolean blocking = isBlocking();
+ boolean connected = false;
try {
beginConnect(blocking, isa);
- do {
- n = Net.connect(fd, ia, isa.getPort());
- } while (n == IOStatus.INTERRUPTED && isOpen());
+ int n = Net.connect(fd, isa.getAddress(), isa.getPort());
+ if (n > 0) {
+ connected = true;
+ } else if (blocking) {
+ assert IOStatus.okayToRetry(n);
+ boolean polled = false;
+ while (!polled && isOpen()) {
+ park(Net.POLLOUT);
+ polled = Net.pollConnectNow(fd);
+ }
+ connected = polled && isOpen();
+ }
} finally {
- endConnect(blocking, (n > 0));
+ endConnect(blocking, connected);
}
- assert IOStatus.check(n);
- return n > 0;
+ return connected;
} finally {
writeLock.unlock();
}
@@ -744,7 +823,8 @@
// set hook for Thread.interrupt
begin();
}
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (state != ST_CONNECTIONPENDING)
throw new NoConnectionPendingException();
@@ -752,6 +832,8 @@
// record thread so it can be signalled if needed
readerThread = NativeThread.current();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -768,11 +850,14 @@
endRead(blocking, completed);
if (completed) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_CONNECTIONPENDING) {
localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
}
+ } finally {
+ stateLock.unlock();
}
}
}
@@ -792,13 +877,14 @@
boolean connected = false;
try {
beginFinishConnect(blocking);
+ boolean polled = Net.pollConnectNow(fd);
if (blocking) {
- do {
- connected = Net.pollConnect(fd, -1);
- } while (!connected && isOpen());
- } else {
- connected = Net.pollConnect(fd, 0);
+ while (!polled && isOpen()) {
+ park(Net.POLLOUT);
+ polled = Net.pollConnectNow(fd);
+ }
}
+ connected = polled && isOpen();
} finally {
endFinishConnect(blocking, connected);
}
@@ -843,16 +929,20 @@
boolean interrupted = false;
// set state to ST_CLOSING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
blocking = isBlocking();
connected = (state == ST_CONNECTED);
state = ST_CLOSING;
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding I/O operations to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long reader = readerThread;
long writer = writerThread;
@@ -868,12 +958,14 @@
// wait for blocking I/O operations to end
while (readerThread != 0 || writerThread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for read/write to complete
@@ -887,7 +979,8 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
// if connected and the channel is registered with a Selector then
// shutdown the output if possible so that the peer reads EOF. If
@@ -908,6 +1001,8 @@
} catch (IOException ignore) { }
}
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -921,17 +1016,21 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketChannel shutdownInput() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
@@ -943,12 +1042,15 @@
isInputClosed = true;
}
return this;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketChannel shutdownOutput() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
@@ -960,6 +1062,8 @@
isOutputClosed = true;
}
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -972,58 +1076,223 @@
}
/**
- * Poll this channel's socket for reading up to the given timeout.
- * @return {@code true} if the socket is polled
+ * Waits for a connection attempt to finish with a timeout
+ * @throws SocketTimeoutException if the connect timeout elapses
+ */
+ private boolean finishTimedConnect(long nanos) throws IOException {
+ long startNanos = System.nanoTime();
+ boolean polled = Net.pollConnectNow(fd);
+ while (!polled && isOpen()) {
+ long remainingNanos = nanos - (System.nanoTime() - startNanos);
+ if (remainingNanos <= 0) {
+ throw new SocketTimeoutException("Connect timed out");
+ }
+ park(Net.POLLOUT, remainingNanos);
+ polled = Net.pollConnectNow(fd);
+ }
+ return polled && isOpen();
+ }
+
+ /**
+ * Attempts to establish a connection to the given socket address with a
+ * timeout. Closes the socket if connection cannot be established.
+ *
+ * @apiNote This method is for use by the socket adaptor.
+ *
+ * @throws IllegalBlockingModeException if the channel is non-blocking
+ * @throws SocketTimeoutException if the read timeout elapses
*/
- boolean pollRead(long timeout) throws IOException {
- boolean blocking = isBlocking();
- assert Thread.holdsLock(blockingLock()) && blocking;
+ void blockingConnect(SocketAddress remote, long nanos) throws IOException {
+ InetSocketAddress isa = checkRemote(remote);
+ try {
+ readLock.lock();
+ try {
+ writeLock.lock();
+ try {
+ if (!isBlocking())
+ throw new IllegalBlockingModeException();
+ boolean connected = false;
+ try {
+ beginConnect(true, isa);
+ // change socket to non-blocking
+ lockedConfigureBlocking(false);
+ try {
+ int n = Net.connect(fd, isa.getAddress(), isa.getPort());
+ connected = (n > 0) ? true : finishTimedConnect(nanos);
+ } finally {
+ // restore socket to blocking mode
+ lockedConfigureBlocking(true);
+ }
+ } finally {
+ endConnect(true, connected);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ } catch (IOException ioe) {
+ // connect failed, close the channel
+ close();
+ throw SocketExceptions.of(ioe, isa);
+ }
+ }
+
+ /**
+ * Attempts to read bytes from the socket into the given byte array.
+ */
+ private int tryRead(byte[] b, int off, int len) throws IOException {
+ ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
+ assert dst.position() == 0;
+ try {
+ int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
+ if (n > 0) {
+ dst.get(b, off, n);
+ }
+ return n;
+ } finally{
+ Util.offerFirstTemporaryDirectBuffer(dst);
+ }
+ }
+
+ /**
+ * Reads bytes from the socket into the given byte array with a timeout.
+ * @throws SocketTimeoutException if the read timeout elapses
+ */
+ private int timedRead(byte[] b, int off, int len, long nanos) throws IOException {
+ long startNanos = System.nanoTime();
+ int n = tryRead(b, off, len);
+ while (n == IOStatus.UNAVAILABLE && isOpen()) {
+ long remainingNanos = nanos - (System.nanoTime() - startNanos);
+ if (remainingNanos <= 0) {
+ throw new SocketTimeoutException("Read timed out");
+ }
+ park(Net.POLLIN, remainingNanos);
+ n = tryRead(b, off, len);
+ }
+ return n;
+ }
+
+ /**
+ * Reads bytes from the socket into the given byte array.
+ *
+ * @apiNote This method is for use by the socket adaptor.
+ *
+ * @throws IllegalBlockingModeException if the channel is non-blocking
+ * @throws SocketTimeoutException if the read timeout elapses
+ */
+ int blockingRead(byte[] b, int off, int len, long nanos) throws IOException {
+ Objects.checkFromIndexSize(off, len, b.length);
+ if (len == 0) {
+ // nothing to do
+ return 0;
+ }
readLock.lock();
try {
- boolean polled = false;
+ // check that channel is configured blocking
+ if (!isBlocking())
+ throw new IllegalBlockingModeException();
+
+ int n = 0;
try {
- beginRead(blocking);
- int events = Net.poll(fd, Net.POLLIN, timeout);
- polled = (events != 0);
+ beginRead(true);
+
+ // check if connection has been reset
+ if (connectionReset)
+ throwConnectionReset();
+
+ // check if input is shutdown
+ if (isInputClosed)
+ return IOStatus.EOF;
+
+ if (nanos > 0) {
+ // change socket to non-blocking
+ lockedConfigureBlocking(false);
+ try {
+ n = timedRead(b, off, len, nanos);
+ } finally {
+ // restore socket to blocking mode
+ lockedConfigureBlocking(true);
+ }
+ } else {
+ // read, no timeout
+ n = tryRead(b, off, len);
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = tryRead(b, off, len);
+ }
+ }
+ } catch (ConnectionResetException e) {
+ connectionReset = true;
+ throwConnectionReset();
} finally {
- endRead(blocking, polled);
+ endRead(true, n > 0);
+ if (n <= 0 && isInputClosed)
+ return IOStatus.EOF;
}
- return polled;
+ assert n > 0 || n == -1;
+ return n;
} finally {
readLock.unlock();
}
}
/**
- * Poll this channel's socket for a connection, up to the given timeout.
- * @return {@code true} if the socket is polled
+ * Attempts to write a sequence of bytes to the socket from the given
+ * byte array.
*/
- boolean pollConnected(long timeout) throws IOException {
- boolean blocking = isBlocking();
- assert Thread.holdsLock(blockingLock()) && blocking;
+ private int tryWrite(byte[] b, int off, int len) throws IOException {
+ ByteBuffer src = Util.getTemporaryDirectBuffer(len);
+ assert src.position() == 0;
+ try {
+ src.put(b, off, len);
+ return nd.write(fd, ((DirectBuffer)src).address(), len);
+ } finally {
+ Util.offerFirstTemporaryDirectBuffer(src);
+ }
+ }
- readLock.lock();
+ /**
+ * Writes a sequence of bytes to the socket from the given byte array.
+ *
+ * @apiNote This method is for use by the socket adaptor.
+ */
+ void blockingWriteFully(byte[] b, int off, int len) throws IOException {
+ Objects.checkFromIndexSize(off, len, b.length);
+ if (len == 0) {
+ // nothing to do
+ return;
+ }
+
+ writeLock.lock();
try {
- writeLock.lock();
+ // check that channel is configured blocking
+ if (!isBlocking())
+ throw new IllegalBlockingModeException();
+
+ // loop until all bytes have been written
+ int pos = off;
+ int end = off + len;
+ beginWrite(true);
try {
- boolean polled = false;
- try {
- beginFinishConnect(blocking);
- int events = Net.poll(fd, Net.POLLCONN, timeout);
- polled = (events != 0);
- } finally {
- // invoke endFinishConnect with completed = false so that
- // the state is not changed to ST_CONNECTED. The socket
- // adaptor will use finishConnect to finish.
- endFinishConnect(blocking, /*completed*/false);
+ while (pos < end && isOpen()) {
+ int size = end - pos;
+ int n = tryWrite(b, pos, size);
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = tryWrite(b, pos, size);
+ }
+ if (n > 0) {
+ pos += n;
+ }
}
- return polled;
} finally {
- writeLock.unlock();
+ endWrite(true, pos >= end);
}
} finally {
- readLock.unlock();
+ writeLock.unlock();
}
}
@@ -1031,13 +1300,16 @@
* Return the number of bytes in the socket input buffer.
*/
int available() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
ensureOpenAndConnected();
if (isInputClosed) {
return 0;
} else {
return Net.available(fd);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -1117,7 +1389,8 @@
if (!isOpen())
sb.append("closed");
else {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
switch (state) {
case ST_UNCONNECTED:
sb.append("unconnected");
@@ -1142,6 +1415,8 @@
sb.append(" remote=");
sb.append(remoteAddress().toString());
}
+ } finally {
+ stateLock.unlock();
}
}
sb.append(']');
--- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -35,6 +35,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class SinkChannelImpl
@@ -53,7 +54,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@@ -95,15 +97,19 @@
boolean blocking;
// set state to ST_CLOSING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding write to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
@@ -113,12 +119,14 @@
// wait for write operation to end
while (thread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for write to complete
@@ -127,9 +135,12 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -143,12 +154,15 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert thread == 0;
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -156,8 +170,11 @@
protected void implConfigureBlocking(boolean block) throws IOException {
writeLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
}
} finally {
writeLock.unlock();
@@ -212,11 +229,14 @@
// set hook for Thread.interrupt
begin();
}
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!isOpen())
throw new ClosedChannelException();
if (blocking)
thread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
}
@@ -230,12 +250,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -252,9 +275,13 @@
int n = 0;
try {
beginWrite(blocking);
- do {
- n = IOUtil.write(fd, src, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, src, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, src, -1, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
@@ -275,9 +302,13 @@
long n = 0;
try {
beginWrite(blocking);
- do {
- n = IOUtil.write(fd, srcs, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLOUT);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ }
+ }
} finally {
endWrite(blocking, n > 0);
assert IOStatus.check(n);
--- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Thu Apr 25 10:41:49 2019 +0100
@@ -35,6 +35,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class SourceChannelImpl
@@ -53,7 +54,8 @@
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final ReentrantLock stateLock = new ReentrantLock();
+ private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@@ -95,15 +97,19 @@
boolean blocking;
// set state to ST_CLOSING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
+ } finally {
+ stateLock.unlock();
}
// wait for any outstanding read to complete
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
@@ -113,12 +119,14 @@
// wait for read operation to end
while (thread != 0) {
try {
- stateLock.wait();
+ stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
+ } finally {
+ stateLock.unlock();
}
} else {
// non-blocking mode: wait for read to complete
@@ -127,9 +135,12 @@
}
// set state to ST_KILLPENDING
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
+ } finally {
+ stateLock.unlock();
}
// close socket if not registered with Selector
@@ -143,12 +154,15 @@
@Override
public void kill() throws IOException {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
assert thread == 0;
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -156,8 +170,11 @@
protected void implConfigureBlocking(boolean block) throws IOException {
readLock.lock();
try {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
IOUtil.configureBlocking(fd, block);
+ } finally {
+ stateLock.unlock();
}
} finally {
readLock.unlock();
@@ -212,11 +229,14 @@
// set hook for Thread.interrupt
begin();
}
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
if (!isOpen())
throw new ClosedChannelException();
if (blocking)
thread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
}
@@ -230,12 +250,15 @@
throws AsynchronousCloseException
{
if (blocking) {
- synchronized (stateLock) {
+ stateLock.lock();
+ try {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
- stateLock.notifyAll();
+ stateCondition.signalAll();
}
+ } finally {
+ stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);
@@ -252,9 +275,13 @@
int n = 0;
try {
beginRead(blocking);
- do {
- n = IOUtil.read(fd, dst, -1, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.read(fd, dst, -1, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, dst, -1, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
@@ -275,9 +302,13 @@
long n = 0;
try {
beginRead(blocking);
- do {
- n = IOUtil.read(fd, dsts, offset, length, nd);
- } while ((n == IOStatus.INTERRUPTED) && isOpen());
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ if (blocking) {
+ while (IOStatus.okayToRetry(n) && isOpen()) {
+ park(Net.POLLIN);
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ }
+ }
} finally {
endRead(blocking, n > 0);
assert IOStatus.check(n);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java Thu Apr 25 10:41:49 2019 +0100
@@ -0,0 +1,516 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 8222774 4430139
+ * @run testng AdaptorStreams
+ * @summary Exercise socket adaptor input/output streams
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.channels.IllegalBlockingModeException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+@Test
+public class AdaptorStreams {
+
+ /**
+ * Test read when bytes are available
+ */
+ public void testRead1() throws Exception {
+ withConnection((sc, peer) -> {
+ peer.getOutputStream().write(99);
+ int n = sc.socket().getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test read blocking before bytes are available
+ */
+ public void testRead2() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleWrite(peer.getOutputStream(), 99, 1000);
+ int n = sc.socket().getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test read when peer has closed connection
+ */
+ public void testRead3() throws Exception {
+ withConnection((sc, peer) -> {
+ peer.close();
+ int n = sc.socket().getInputStream().read();
+ assertTrue(n == -1);
+ });
+ }
+
+ /**
+ * Test read blocking before peer closes connection
+ */
+ public void testRead4() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleClose(peer, 1000);
+ int n = sc.socket().getInputStream().read();
+ assertTrue(n == -1);
+ });
+ }
+
+ /**
+ * Test async close of socket when thread blocked in read
+ */
+ public void testRead5() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleClose(sc, 2000);
+ InputStream in = sc.socket().getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ });
+ }
+
+ /**
+ * Test interrupt status set before read
+ */
+ public void testRead6() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+ Thread.currentThread().interrupt();
+ try {
+ InputStream in = s.getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test interrupt of thread blocked in read
+ */
+ public void testRead7() throws Exception {
+ withConnection((sc, peer) -> {
+ Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
+ Socket s = sc.socket();
+ try {
+ InputStream in = s.getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ } finally {
+ interrupter.cancel(true);
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test read when channel is configured non-blocking
+ */
+ public void testRead8() throws Exception {
+ withConnection((sc, peer) -> {
+ sc.configureBlocking(false);
+ InputStream in = sc.socket().getInputStream();
+ expectThrows(IllegalBlockingModeException.class, () -> in.read());
+ });
+ }
+
+ /**
+ * Test timed read when bytes are available
+ */
+ public void testTimedRead1() throws Exception {
+ withConnection((sc, peer) -> {
+ peer.getOutputStream().write(99);
+ Socket s = sc.socket();
+ s.setSoTimeout(1000);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test timed read blocking before bytes are available
+ */
+ public void testTimedRead2() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleWrite(peer.getOutputStream(), 99, 1000);
+ Socket s = sc.socket();
+ s.setSoTimeout(5000);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test timed read when the read times out
+ */
+ public void testTimedRead3() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+ s.setSoTimeout(1000);
+ InputStream in = s.getInputStream();
+ expectThrows(SocketTimeoutException.class, () -> in.read());
+ });
+ }
+
+ /**
+ * Test async close of socket when thread blocked in timed read
+ */
+ public void testTimedRead4() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleClose(sc, 2000);
+ Socket s = sc.socket();
+ s.setSoTimeout(60*1000);
+ InputStream in = s.getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ });
+ }
+
+ /**
+ * Test interrupt status set before timed read
+ */
+ public void testTimedRead5() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+ Thread.currentThread().interrupt();
+ try {
+ s.setSoTimeout(60*1000);
+ InputStream in = s.getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test interrupt of thread blocked in timed read
+ */
+ public void testTimedRead6() throws Exception {
+ withConnection((sc, peer) -> {
+ Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
+ Socket s = sc.socket();
+ try {
+ s.setSoTimeout(60*1000);
+ InputStream in = s.getInputStream();
+ expectThrows(IOException.class, () -> in.read());
+ assertTrue(s.isClosed());
+ } finally {
+ interrupter.cancel(true);
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test async close of socket when thread blocked in write
+ */
+ public void testWrite1() throws Exception {
+ withConnection((sc, peer) -> {
+ scheduleClose(sc, 2000);
+ expectThrows(IOException.class, () -> {
+ OutputStream out = sc.socket().getOutputStream();
+ byte[] data = new byte[64*1000];
+ while (true) {
+ out.write(data);
+ }
+ });
+ });
+ }
+
+ /**
+ * Test interrupt status set before write
+ */
+ public void testWrite2() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+ Thread.currentThread().interrupt();
+ try {
+ OutputStream out = s.getOutputStream();
+ expectThrows(IOException.class, () -> out.write(99));
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test interrupt of thread blocked in write
+ */
+ public void testWrite3() throws Exception {
+ withConnection((sc, peer) -> {
+ Future<?> interrupter = scheduleInterrupt(Thread.currentThread(), 2000);
+ Socket s = sc.socket();
+ try {
+ expectThrows(IOException.class, () -> {
+ OutputStream out = sc.socket().getOutputStream();
+ byte[] data = new byte[64*1000];
+ while (true) {
+ out.write(data);
+ }
+ });
+ } finally {
+ interrupter.cancel(true);
+ Thread.interrupted(); // clear interrupt
+ }
+ assertTrue(s.isClosed());
+ });
+ }
+
+ /**
+ * Test write when channel is configured non-blocking
+ */
+ public void testWrite4() throws Exception {
+ withConnection((sc, peer) -> {
+ sc.configureBlocking(false);
+ OutputStream out = sc.socket().getOutputStream();
+ expectThrows(IllegalBlockingModeException.class, () -> out.write(99));
+ });
+ }
+
+ /**
+ * Test read when there are bytes available and another thread is blocked
+ * in write
+ */
+ public void testConcurrentReadWrite1() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in write
+ execute(() -> {
+ var data = new byte[64*1024];
+ OutputStream out = s.getOutputStream();
+ for (;;) {
+ out.write(data);
+ }
+ });
+ Thread.sleep(1000); // give writer time to block
+
+ // test read when bytes are available
+ peer.getOutputStream().write(99);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test read blocking when another thread is blocked in write
+ */
+ public void testConcurrentReadWrite2() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in write
+ execute(() -> {
+ var data = new byte[64*1024];
+ OutputStream out = s.getOutputStream();
+ for (;;) {
+ out.write(data);
+ }
+ });
+ Thread.sleep(1000); // give writer time to block
+
+ // test read blocking until bytes are available
+ scheduleWrite(peer.getOutputStream(), 99, 500);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test writing when another thread is blocked in read
+ */
+ public void testConcurrentReadWrite3() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in read
+ execute(() -> {
+ s.getInputStream().read();
+ });
+ Thread.sleep(100); // give reader time to block
+
+ // test write
+ s.getOutputStream().write(99);
+ int n = peer.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test timed read when there are bytes available and another thread is
+ * blocked in write
+ */
+ public void testConcurrentTimedReadWrite1() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in write
+ execute(() -> {
+ var data = new byte[64*1024];
+ OutputStream out = s.getOutputStream();
+ for (;;) {
+ out.write(data);
+ }
+ });
+ Thread.sleep(1000); // give writer time to block
+
+ // test read when bytes are available
+ peer.getOutputStream().write(99);
+ s.setSoTimeout(60*1000);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test timed read blocking when another thread is blocked in write
+ */
+ public void testConcurrentTimedReadWrite2() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in write
+ execute(() -> {
+ var data = new byte[64*1024];
+ OutputStream out = s.getOutputStream();
+ for (;;) {
+ out.write(data);
+ }
+ });
+ Thread.sleep(1000); // give writer time to block
+
+ // test read blocking until bytes are available
+ scheduleWrite(peer.getOutputStream(), 99, 500);
+ s.setSoTimeout(60*1000);
+ int n = s.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ /**
+ * Test writing when another thread is blocked in read
+ */
+ public void testConcurrentTimedReadWrite3() throws Exception {
+ withConnection((sc, peer) -> {
+ Socket s = sc.socket();
+
+ // block thread in read
+ execute(() -> {
+ s.setSoTimeout(60*1000);
+ s.getInputStream().read();
+ });
+ Thread.sleep(100); // give reader time to block
+
+ // test write
+ s.getOutputStream().write(99);
+ int n = peer.getInputStream().read();
+ assertTrue(n == 99);
+ });
+ }
+
+ // -- test infrastructure --
+
+ interface ThrowingTask {
+ void run() throws Exception;
+ }
+
+ interface ThrowingBiConsumer<T, U> {
+ void accept(T t, U u) throws Exception;
+ }
+
+ /**
+ * Invokes the consumer with a connected pair of socket channel and socket
+ */
+ static void withConnection(ThrowingBiConsumer<SocketChannel, Socket> consumer)
+ throws Exception
+ {
+ try (ServerSocket ss = new ServerSocket(0);
+ SocketChannel sc = SocketChannel.open(ss.getLocalSocketAddress());
+ Socket peer = ss.accept()) {
+ consumer.accept(sc, peer);
+ }
+ }
+
+ static Future<?> scheduleWrite(OutputStream out, byte[] data, long delay) {
+ return schedule(() -> {
+ try {
+ out.write(data);
+ } catch (IOException ioe) { }
+ }, delay);
+ }
+
+ static Future<?> scheduleWrite(OutputStream out, int b, long delay) {
+ return scheduleWrite(out, new byte[] { (byte)b }, delay);
+ }
+
+ static Future<?> scheduleClose(Closeable c, long delay) {
+ return schedule(() -> {
+ try {
+ c.close();
+ } catch (IOException ioe) { }
+ }, delay);
+ }
+
+ static Future<?> scheduleInterrupt(Thread t, long delay) {
+ return schedule(() -> t.interrupt(), delay);
+ }
+
+ static Future<?> schedule(Runnable task, long delay) {
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ try {
+ return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ static Future<?> execute(ThrowingTask task) {
+ ExecutorService pool = Executors.newFixedThreadPool(1);
+ try {
+ return pool.submit(() -> {
+ task.run();
+ return null;
+ });
+ } finally {
+ pool.shutdown();
+ }
+ }
+}
--- a/test/jdk/java/nio/channels/SocketChannel/Stream.java Thu Apr 25 09:12:40 2019 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/* @test
- * @bug 4430139
- * @summary Test result of read on stream from nonblocking channel
- * @library .. /test/lib
- * @build jdk.test.lib.Utils TestServers
- * @run main Stream
- */
-
-import java.io.*;
-import java.net.*;
-import java.nio.channels.*;
-
-
-public class Stream {
-
- static void test(TestServers.DayTimeServer daytimeServer) throws Exception {
- InetSocketAddress isa
- = new InetSocketAddress(daytimeServer.getAddress(),
- daytimeServer.getPort());
- SocketChannel sc = SocketChannel.open();
- sc.connect(isa);
- sc.configureBlocking(false);
- InputStream is = sc.socket().getInputStream();
- byte b[] = new byte[10];
- try {
- int n = is.read(b);
- throw new RuntimeException("Exception expected; none thrown");
- } catch (IllegalBlockingModeException e) {
- // expected result
- }
- sc.close();
- }
-
- public static void main(String[] args) throws Exception {
- try (TestServers.DayTimeServer dayTimeServer
- = TestServers.DayTimeServer.startNewServer(100)) {
- test(dayTimeServer);
- }
- }
-}