# HG changeset patch # User alanb # Date 1556185309 -3600 # Node ID 13b67c1420b8757ea0b643f5baa5cf2afab3b4a8 # Parent b43cc3b9ef4064cea46be3929d654943a7bc0d95 8222774: (ch) Replace uses of stateLock and blockingLock with j.u.c. locks Reviewed-by: dfuchs, bpb, martin diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java --- a/src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java Thu Apr 25 10:41:49 2019 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -23,13 +23,15 @@ * questions. */ -/* - */ - package java.nio.channels.spi; import java.io.IOException; -import java.nio.channels.*; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.Channel; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.InterruptibleChannel; +import java.util.concurrent.locks.ReentrantLock; + import jdk.internal.access.SharedSecrets; import sun.nio.ch.Interruptible; @@ -84,8 +86,7 @@ public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel { - - private final Object closeLock = new Object(); + private final ReentrantLock closeLock = new ReentrantLock(); private volatile boolean closed; /** @@ -105,11 +106,14 @@ * If an I/O error occurs */ public final void close() throws IOException { - synchronized (closeLock) { + closeLock.lock(); + try { if (closed) return; closed = true; implCloseChannel(); + } finally { + closeLock.unlock(); } } @@ -153,7 +157,8 @@ if (interruptor == null) { interruptor = new Interruptible() { public void interrupt(Thread target) { - synchronized (closeLock) { + closeLock.lock(); + try { if (closed) return; closed = true; @@ -161,6 +166,8 @@ try { AbstractInterruptibleChannel.this.implCloseChannel(); } catch (IOException x) { } + } finally { + closeLock.unlock(); } }}; } diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import sun.net.ResourceManager; @@ -89,7 +90,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // -- The following fields are protected by stateLock @@ -179,8 +181,11 @@ : StandardProtocolFamily.INET; this.fd = fd; this.fdVal = IOUtil.fdVal(fd); - synchronized (stateLock) { + stateLock.lock(); + try { this.localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } @@ -192,27 +197,36 @@ @Override public DatagramSocket socket() { - synchronized (stateLock) { + stateLock.lock(); + try { if (socket == null) socket = DatagramSocketAdaptor.create(this); return socket; + } finally { + stateLock.unlock(); } } @Override public SocketAddress getLocalAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); // Perform security check before returning address return Net.getRevealedLocalAddress(localAddress); + } finally { + stateLock.unlock(); } } @Override public SocketAddress getRemoteAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -224,7 +238,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.IP_TOS || @@ -264,6 +279,8 @@ // remaining options don't need any special handling Net.setSocketOption(fd, Net.UNSPEC, name, value); return this; + } finally { + stateLock.unlock(); } } @@ -276,7 +293,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.IP_TOS || @@ -315,6 +333,8 @@ // no special handling return (T) Net.getSocketOption(fd, Net.UNSPEC, name); + } finally { + stateLock.unlock(); } } @@ -362,7 +382,8 @@ begin(); } SocketAddress remote; - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); remote = remoteAddress; if ((remote == null) && mustBeConnected) @@ -371,6 +392,8 @@ bindInternal(null); if (blocking) readerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } return remote; } @@ -384,12 +407,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { readerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -414,21 +440,29 @@ SecurityManager sm = System.getSecurityManager(); if (connected || (sm == null)) { // connected or no security manager - do { - n = receive(fd, dst, connected); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - if (n == IOStatus.UNAVAILABLE) + n = receive(fd, dst, connected); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = receive(fd, dst, connected); + } + } else if (n == IOStatus.UNAVAILABLE) { return null; + } } else { // Cannot receive into user's buffer when running with a // security manager and not connected bb = Util.getTemporaryDirectBuffer(dst.remaining()); for (;;) { - do { - n = receive(fd, bb, connected); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - if (n == IOStatus.UNAVAILABLE) + n = receive(fd, bb, connected); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = receive(fd, bb, connected); + } + } else if (n == IOStatus.UNAVAILABLE) { return null; + } InetSocketAddress isa = (InetSocketAddress)sender; try { sm.checkAccept(isa.getAddress().getHostAddress(), @@ -493,6 +527,7 @@ return n; } + @Override public int send(ByteBuffer src, SocketAddress target) throws IOException { @@ -510,9 +545,13 @@ if (!target.equals(remote)) { throw new AlreadyConnectedException(); } - do { - n = IOUtil.write(fd, src, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, src, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, src, -1, nd); + } + } } else { // not connected SecurityManager sm = System.getSecurityManager(); @@ -524,9 +563,13 @@ sm.checkConnect(ia.getHostAddress(), isa.getPort()); } } - do { - n = send(fd, src, isa); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = send(fd, src, isa); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = send(fd, src, isa); + } + } } } finally { endWrite(blocking, n > 0); @@ -602,10 +645,13 @@ int n = 0; try { beginRead(blocking, true); - do { - n = IOUtil.read(fd, buf, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - + n = IOUtil.read(fd, buf, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, buf, -1, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); @@ -628,10 +674,13 @@ long n = 0; try { beginRead(blocking, true); - do { - n = IOUtil.read(fd, dsts, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); - + n = IOUtil.read(fd, dsts, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, dsts, offset, length, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); @@ -659,7 +708,8 @@ begin(); } SocketAddress remote; - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); remote = remoteAddress; if ((remote == null) && mustBeConnected) @@ -668,6 +718,8 @@ bindInternal(null); if (blocking) writerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } return remote; } @@ -681,12 +733,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { writerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -703,9 +758,13 @@ int n = 0; try { beginWrite(blocking, true); - do { - n = IOUtil.write(fd, buf, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, buf, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, buf, -1, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); @@ -728,9 +787,13 @@ long n = 0; try { beginWrite(blocking, true); - do { - n = IOUtil.write(fd, srcs, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, srcs, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, srcs, offset, length, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); @@ -747,9 +810,12 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -760,14 +826,20 @@ } InetSocketAddress localAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return localAddress; + } finally { + stateLock.unlock(); } } InetSocketAddress remoteAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -777,11 +849,14 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (localAddress != null) throw new AlreadyBoundException(); bindInternal(local); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -793,7 +868,7 @@ } private void bindInternal(SocketAddress local) throws IOException { - assert Thread.holdsLock(stateLock) && (localAddress == null); + assert stateLock.isHeldByCurrentThread() && (localAddress == null); InetSocketAddress isa; if (local == null) { @@ -816,8 +891,11 @@ @Override public boolean isConnected() { - synchronized (stateLock) { + stateLock.lock(); + try { return (state == ST_CONNECTED); + } finally { + stateLock.unlock(); } } @@ -839,7 +917,8 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (state == ST_CONNECTED) throw new AlreadyConnectedException(); @@ -865,7 +944,7 @@ } try { ByteBuffer buf = ByteBuffer.allocate(100); - while (receive(buf) != null) { + while (receive(fd, buf, false) > 0) { buf.clear(); } } finally { @@ -873,6 +952,9 @@ IOUtil.configureBlocking(fd, true); } } + + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -889,7 +971,8 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { if (!isOpen() || (state != ST_CONNECTED)) return this; @@ -903,6 +986,8 @@ // refresh local address localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -950,7 +1035,8 @@ if (sm != null) sm.checkMulticast(group); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); // check the registry to see if we are already a member of the group @@ -1005,6 +1091,8 @@ registry.add(key); return key; + } finally { + stateLock.unlock(); } } @@ -1030,7 +1118,8 @@ void drop(MembershipKeyImpl key) { assert key.channel() == this; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) return; @@ -1051,6 +1140,8 @@ key.invalidate(); registry.remove(key); + } finally { + stateLock.unlock(); } } @@ -1064,7 +1155,8 @@ assert key.channel() == this; assert key.sourceAddress() == null; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) throw new IllegalStateException("key is no longer valid"); if (source.isAnyLocalAddress()) @@ -1090,6 +1182,8 @@ // ancient kernel throw new UnsupportedOperationException(); } + } finally { + stateLock.unlock(); } } @@ -1100,7 +1194,8 @@ assert key.channel() == this; assert key.sourceAddress() == null; - synchronized (stateLock) { + stateLock.lock(); + try { if (!key.isValid()) throw new IllegalStateException("key is no longer valid"); @@ -1120,6 +1215,8 @@ // should not happen throw new AssertionError(ioe); } + } finally { + stateLock.unlock(); } } @@ -1144,7 +1241,8 @@ boolean interrupted = false; // set state to ST_CLOSING and invalid membership keys - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; blocking = isBlocking(); state = ST_CLOSING; @@ -1152,11 +1250,14 @@ // if member of any multicast groups then invalidate the keys if (registry != null) registry.invalidateAll(); + } finally { + stateLock.unlock(); } // wait for any outstanding I/O operations to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long reader = readerThread; long writer = writerThread; @@ -1171,12 +1272,14 @@ // wait for blocking I/O operations to end while (readerThread != 0 || writerThread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for read/write to complete @@ -1190,9 +1293,12 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -1206,7 +1312,8 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_KILLPENDING) { state = ST_KILLED; try { @@ -1216,6 +1323,8 @@ ResourceManager.afterUdpClose(); } } + } finally { + stateLock.unlock(); } } diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java --- a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -42,6 +42,7 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.IllegalBlockingModeException; import java.util.Objects; +import java.util.Set; // Make a datagram-socket channel look like a datagram socket. @@ -51,7 +52,7 @@ // class. // -public class DatagramSocketAdaptor +class DatagramSocketAdaptor extends DatagramSocket { // The channel being adapted @@ -61,7 +62,7 @@ private volatile int timeout; // ## super will create a useless impl - private DatagramSocketAdaptor(DatagramChannelImpl dc) throws IOException { + private DatagramSocketAdaptor(DatagramChannelImpl dc) { // Invoke the DatagramSocketAdaptor(SocketAddress) constructor, // passing a dummy DatagramSocketImpl object to avoid any native // resource allocation in super class and invoking our bind method @@ -70,12 +71,8 @@ this.dc = dc; } - public static DatagramSocket create(DatagramChannelImpl dc) { - try { - return new DatagramSocketAdaptor(dc); - } catch (IOException x) { - throw new Error(x); - } + static DatagramSocket create(DatagramChannelImpl dc) { + return new DatagramSocketAdaptor(dc); } private void connectInternal(SocketAddress remote) @@ -96,6 +93,7 @@ } } + @Override public void bind(SocketAddress local) throws SocketException { try { if (local == null) @@ -106,6 +104,7 @@ } } + @Override public void connect(InetAddress address, int port) { try { connectInternal(new InetSocketAddress(address, port)); @@ -114,11 +113,13 @@ } } + @Override public void connect(SocketAddress remote) throws SocketException { Objects.requireNonNull(remote, "Address can't be null"); connectInternal(remote); } + @Override public void disconnect() { try { dc.disconnect(); @@ -127,24 +128,39 @@ } } + @Override public boolean isBound() { return dc.localAddress() != null; } + @Override public boolean isConnected() { return dc.remoteAddress() != null; } + @Override public InetAddress getInetAddress() { InetSocketAddress remote = dc.remoteAddress(); return (remote != null) ? remote.getAddress() : null; } + @Override public int getPort() { InetSocketAddress remote = dc.remoteAddress(); return (remote != null) ? remote.getPort() : -1; } + @Override + public SocketAddress getRemoteSocketAddress() { + return dc.remoteAddress(); + } + + @Override + public SocketAddress getLocalSocketAddress() { + return dc.localAddress(); + } + + @Override public void send(DatagramPacket p) throws IOException { synchronized (dc.blockingLock()) { if (!dc.isBlocking()) @@ -198,6 +214,7 @@ } } + @Override public void receive(DatagramPacket p) throws IOException { synchronized (dc.blockingLock()) { if (!dc.isBlocking()) @@ -217,6 +234,7 @@ } } + @Override public InetAddress getLocalAddress() { if (isClosed()) return null; @@ -235,6 +253,7 @@ return result; } + @Override public int getLocalPort() { if (isClosed()) return -1; @@ -248,11 +267,19 @@ return 0; } + @Override public void setSoTimeout(int timeout) throws SocketException { + if (!dc.isOpen()) + throw new SocketException("Socket is closed"); + if (timeout < 0) + throw new IllegalArgumentException("timeout < 0"); this.timeout = timeout; } + @Override public int getSoTimeout() throws SocketException { + if (!dc.isOpen()) + throw new SocketException("Socket is closed"); return timeout; } @@ -294,51 +321,62 @@ } } + @Override public void setSendBufferSize(int size) throws SocketException { if (size <= 0) throw new IllegalArgumentException("Invalid send size"); setIntOption(StandardSocketOptions.SO_SNDBUF, size); } + @Override public int getSendBufferSize() throws SocketException { return getIntOption(StandardSocketOptions.SO_SNDBUF); } + @Override public void setReceiveBufferSize(int size) throws SocketException { if (size <= 0) throw new IllegalArgumentException("Invalid receive size"); setIntOption(StandardSocketOptions.SO_RCVBUF, size); } + @Override public int getReceiveBufferSize() throws SocketException { return getIntOption(StandardSocketOptions.SO_RCVBUF); } + @Override public void setReuseAddress(boolean on) throws SocketException { setBooleanOption(StandardSocketOptions.SO_REUSEADDR, on); } + @Override public boolean getReuseAddress() throws SocketException { return getBooleanOption(StandardSocketOptions.SO_REUSEADDR); } + @Override public void setBroadcast(boolean on) throws SocketException { setBooleanOption(StandardSocketOptions.SO_BROADCAST, on); } + @Override public boolean getBroadcast() throws SocketException { return getBooleanOption(StandardSocketOptions.SO_BROADCAST); } + @Override public void setTrafficClass(int tc) throws SocketException { setIntOption(StandardSocketOptions.IP_TOS, tc); } + @Override public int getTrafficClass() throws SocketException { return getIntOption(StandardSocketOptions.IP_TOS); } + @Override public void close() { try { dc.close(); @@ -347,14 +385,32 @@ } } + @Override public boolean isClosed() { return !dc.isOpen(); } + @Override public DatagramChannel getChannel() { return dc; } + @Override + public DatagramSocket setOption(SocketOption name, T value) throws IOException { + dc.setOption(name, value); + return this; + } + + @Override + public T getOption(SocketOption name) throws IOException { + return dc.getOption(name); + } + + @Override + public Set> supportedOptions() { + return dc.supportedOptions(); + } + /* * A dummy implementation of DatagramSocketImpl that can be passed to the * DatagramSocket constructor so that no native resources are allocated in diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package sun.nio.ch; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.SocketAddress; +import java.net.SocketImpl; +import java.net.SocketOption; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Set; + +/** + * Dummy SocketImpl for use by the socket adaptors. All methods are overridden + * to throw an error. + */ + +class DummySocketImpl extends SocketImpl { + private static final PrivilegedAction NEW = DummySocketImpl::new; + + private DummySocketImpl() { } + + static SocketImpl create() { + return AccessController.doPrivileged(NEW); + } + + private static T shouldNotGetHere() { + throw new InternalError("Should not get here"); + } + + @Override + protected void create(boolean stream) { + shouldNotGetHere(); + } + + @Override + protected void connect(SocketAddress remote, int millis) { + shouldNotGetHere(); + } + + @Override + protected void connect(String host, int port) { + shouldNotGetHere(); + } + + @Override + protected void connect(InetAddress address, int port) { + shouldNotGetHere(); + } + + @Override + protected void bind(InetAddress host, int port) { + shouldNotGetHere(); + } + + @Override + protected void listen(int backlog) { + shouldNotGetHere(); + } + + @Override + protected void accept(SocketImpl si) { + shouldNotGetHere(); + } + + @Override + protected InputStream getInputStream() { + return shouldNotGetHere(); + } + @Override + protected OutputStream getOutputStream() { + return shouldNotGetHere(); + } + @Override + protected int available() { + return shouldNotGetHere(); + } + + @Override + protected void close() { + shouldNotGetHere(); + } + + @Override + protected Set> supportedOptions() { + return shouldNotGetHere(); + } + + @Override + protected void setOption(SocketOption opt, T value) { + shouldNotGetHere(); + } + + @Override + protected T getOption(SocketOption opt) { + return shouldNotGetHere(); + } + + @Override + public void setOption(int opt, Object value) { + shouldNotGetHere(); + } + + @Override + public Object getOption(int opt) { + return shouldNotGetHere(); + } + + @Override + protected void shutdownInput() { + shouldNotGetHere(); + } + + @Override + protected void shutdownOutput() { + shouldNotGetHere(); + } + + @Override + protected boolean supportsUrgentData() { + return shouldNotGetHere(); + } + + @Override + protected void sendUrgentData(int data) { + shouldNotGetHere(); + } +} diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/SelChImpl.java --- a/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/SelChImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -29,6 +29,8 @@ import java.io.FileDescriptor; import java.io.IOException; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + /** * An interface that allows translation (and more!). * @@ -68,4 +70,40 @@ void kill() throws IOException; + /** + * Disables the current thread for scheduling purposes until this + * channel is ready for I/O, or asynchronously closed, for up to the + * specified waiting time. + * + *

This method does not report which of these caused the + * method to return. Callers should re-check the conditions which caused + * the thread to park. + * + * @param event the event to poll + * @param nanos the timeout to wait; {@code <= 0} to wait indefinitely + */ + default void park(int event, long nanos) throws IOException { + long millis; + if (nanos <= 0) { + millis = -1; + } else { + millis = NANOSECONDS.toMillis(nanos); + } + Net.poll(getFD(), event, millis); + } + + /** + * Disables the current thread for scheduling purposes until this + * channel is ready for I/O, or asynchronously closed. + * + *

This method does not report which of these caused the + * method to return. Callers should re-check the conditions which caused + * the thread to park. + * + * @param event the event to poll + */ + default void park(int event) throws IOException { + park(event, 0L); + } + } diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/ServerSocketAdaptor.java --- a/src/java.base/share/classes/sun/nio/ch/ServerSocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -32,12 +32,14 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; -import java.net.SocketTimeoutException; +import java.net.SocketOption; import java.net.StandardSocketOptions; import java.nio.channels.IllegalBlockingModeException; -import java.nio.channels.NotYetBoundException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.Set; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; // Make a server-socket channel look like a server socket. @@ -56,23 +58,21 @@ // Timeout "option" value for accepts private volatile int timeout; - public static ServerSocket create(ServerSocketChannelImpl ssc) { - try { - return new ServerSocketAdaptor(ssc); - } catch (IOException x) { - throw new Error(x); - } + static ServerSocket create(ServerSocketChannelImpl ssc) { + return new ServerSocketAdaptor(ssc); } - // ## super will create a useless impl - private ServerSocketAdaptor(ServerSocketChannelImpl ssc) throws IOException { + private ServerSocketAdaptor(ServerSocketChannelImpl ssc) { + super(DummySocketImpl.create()); this.ssc = ssc; } + @Override public void bind(SocketAddress local) throws IOException { bind(local, 50); } + @Override public void bind(SocketAddress local, int backlog) throws IOException { if (local == null) local = new InetSocketAddress(0); @@ -83,6 +83,7 @@ } } + @Override public InetAddress getInetAddress() { InetSocketAddress local = ssc.localAddress(); if (local == null) { @@ -92,6 +93,7 @@ } } + @Override public int getLocalPort() { InetSocketAddress local = ssc.localAddress(); if (local == null) { @@ -101,65 +103,65 @@ } } + @Override public Socket accept() throws IOException { - synchronized (ssc.blockingLock()) { - try { - if (!ssc.isBound()) - throw new NotYetBoundException(); - - long to = this.timeout; - if (to == 0) { - // for compatibility reasons: accept connection if available - // when configured non-blocking - SocketChannel sc = ssc.accept(); - if (sc == null && !ssc.isBlocking()) - throw new IllegalBlockingModeException(); - return sc.socket(); + SocketChannel sc = null; + try { + int timeout = this.timeout; + if (timeout > 0) { + long nanos = MILLISECONDS.toNanos(timeout); + sc = ssc.blockingAccept(nanos); + } else { + // accept connection if possible when non-blocking (to preserve + // long standing behavior) + sc = ssc.accept(); + if (sc == null) { + throw new IllegalBlockingModeException(); } - - if (!ssc.isBlocking()) - throw new IllegalBlockingModeException(); - for (;;) { - long st = System.currentTimeMillis(); - if (ssc.pollAccept(to)) - return ssc.accept().socket(); - to -= System.currentTimeMillis() - st; - if (to <= 0) - throw new SocketTimeoutException(); - } - - } catch (Exception x) { - Net.translateException(x); - assert false; - return null; // Never happens } + } catch (Exception e) { + Net.translateException(e); } + return sc.socket(); } + @Override public void close() throws IOException { ssc.close(); } + @Override public ServerSocketChannel getChannel() { return ssc; } + @Override public boolean isBound() { return ssc.isBound(); } + @Override public boolean isClosed() { return !ssc.isOpen(); } + @Override public void setSoTimeout(int timeout) throws SocketException { + if (!ssc.isOpen()) + throw new SocketException("Socket is closed"); + if (timeout < 0) + throw new IllegalArgumentException("timeout < 0"); this.timeout = timeout; } + @Override public int getSoTimeout() throws SocketException { + if (!ssc.isOpen()) + throw new SocketException("Socket is closed"); return timeout; } + @Override public void setReuseAddress(boolean on) throws SocketException { try { ssc.setOption(StandardSocketOptions.SO_REUSEADDR, on); @@ -168,6 +170,7 @@ } } + @Override public boolean getReuseAddress() throws SocketException { try { return ssc.getOption(StandardSocketOptions.SO_REUSEADDR).booleanValue(); @@ -177,6 +180,7 @@ } } + @Override public String toString() { if (!isBound()) return "ServerSocket[unbound]"; @@ -184,6 +188,7 @@ ",localport=" + getLocalPort() + "]"; } + @Override public void setReceiveBufferSize(int size) throws SocketException { // size 0 valid for ServerSocketChannel, invalid for ServerSocket if (size <= 0) @@ -195,6 +200,7 @@ } } + @Override public int getReceiveBufferSize() throws SocketException { try { return ssc.getOption(StandardSocketOptions.SO_RCVBUF).intValue(); @@ -203,4 +209,20 @@ return -1; // Never happens } } + + @Override + public ServerSocket setOption(SocketOption name, T value) throws IOException { + ssc.setOption(name, value); + return this; + } + + @Override + public T getOption(SocketOption name) throws IOException { + return ssc.getOption(name); + } + + @Override + public Set> supportedOptions() { + return ssc.supportedOptions(); + } } diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java --- a/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -31,10 +31,12 @@ import java.net.ServerSocket; import java.net.SocketAddress; import java.net.SocketOption; +import java.net.SocketTimeoutException; import java.net.StandardSocketOptions; import java.nio.channels.AlreadyBoundException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedChannelException; +import java.nio.channels.IllegalBlockingModeException; import java.nio.channels.NotYetBoundException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; @@ -44,6 +46,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import sun.net.NetHooks; @@ -69,7 +72,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // -- The following fields are protected by stateLock @@ -95,7 +99,7 @@ // -- End of fields protected by stateLock - ServerSocketChannelImpl(SelectorProvider sp) throws IOException { + ServerSocketChannelImpl(SelectorProvider sp) { super(sp); this.fd = Net.serverSocket(true); this.fdVal = IOUtil.fdVal(fd); @@ -108,8 +112,11 @@ this.fd = fd; this.fdVal = IOUtil.fdVal(fd); if (bound) { - synchronized (stateLock) { + stateLock.lock(); + try { localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } } @@ -122,20 +129,26 @@ @Override public ServerSocket socket() { - synchronized (stateLock) { + stateLock.lock(); + try { if (socket == null) socket = ServerSocketAdaptor.create(this); return socket; + } finally { + stateLock.unlock(); } } @Override public SocketAddress getLocalAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); return (localAddress == null) ? null : Net.getRevealedLocalAddress(localAddress); + } finally { + stateLock.unlock(); } } @@ -146,7 +159,8 @@ Objects.requireNonNull(name); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { @@ -157,6 +171,8 @@ Net.setSocketOption(fd, Net.UNSPEC, name, value); } return this; + } finally { + stateLock.unlock(); } } @@ -169,7 +185,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { // SO_REUSEADDR emulated when using exclusive bind @@ -177,6 +194,8 @@ } // no options that require special handling return (T) Net.getSocketOption(fd, Net.UNSPEC, name); + } finally { + stateLock.unlock(); } } @@ -202,7 +221,8 @@ @Override public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (localAddress != null) throw new AlreadyBoundException(); @@ -216,6 +236,8 @@ Net.bind(fd, isa.getAddress(), isa.getPort()); Net.listen(fd, backlog < 1 ? 50 : backlog); localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } return this; } @@ -229,12 +251,15 @@ private void begin(boolean blocking) throws ClosedChannelException { if (blocking) begin(); // set blocker to close channel if interrupted - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (localAddress == null) throw new NotYetBoundException(); if (blocking) thread = NativeThread.current(); + } finally { + stateLock.unlock(); } } @@ -248,12 +273,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { thread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } end(completed); } @@ -270,22 +298,82 @@ boolean blocking = isBlocking(); try { begin(blocking); - do { - n = Net.accept(this.fd, newfd, isaa); - } while (n == IOStatus.INTERRUPTED && isOpen()); + n = Net.accept(this.fd, newfd, isaa); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = Net.accept(this.fd, newfd, isaa); + } + } } finally { end(blocking, n > 0); assert IOStatus.check(n); } - } finally { acceptLock.unlock(); } - if (n < 1) + if (n > 0) { + return finishAccept(newfd, isaa[0]); + } else { return null; + } + } - InetSocketAddress isa = isaa[0]; + /** + * Accepts a new connection with a given timeout. This method requires the + * channel to be configured in blocking mode. + * + * @apiNote This method is for use by the socket adaptor. + * + * @param nanos the timeout, in nanoseconds + * @throws IllegalBlockingModeException if the channel is configured non-blocking + * @throws SocketTimeoutException if the timeout expires + */ + SocketChannel blockingAccept(long nanos) throws IOException { + int n = 0; + FileDescriptor newfd = new FileDescriptor(); + InetSocketAddress[] isaa = new InetSocketAddress[1]; + + acceptLock.lock(); + try { + // check that channel is configured blocking + if (!isBlocking()) + throw new IllegalBlockingModeException(); + + try { + begin(true); + // change socket to non-blocking + lockedConfigureBlocking(false); + try { + long startNanos = System.nanoTime(); + n = Net.accept(fd, newfd, isaa); + while (n == IOStatus.UNAVAILABLE && isOpen()) { + long remainingNanos = nanos - (System.nanoTime() - startNanos); + if (remainingNanos <= 0) { + throw new SocketTimeoutException("Accept timed out"); + } + park(Net.POLLIN, remainingNanos); + n = Net.accept(fd, newfd, isaa); + } + } finally { + // restore socket to blocking mode + lockedConfigureBlocking(true); + } + } finally { + end(true, n > 0); + } + } finally { + acceptLock.unlock(); + } + + assert n > 0; + return finishAccept(newfd, isaa[0]); + } + + private SocketChannel finishAccept(FileDescriptor newfd, InetSocketAddress isa) + throws IOException + { try { // newly accepted socket is initially in blocking mode IOUtil.configureBlocking(newfd, true); @@ -306,16 +394,27 @@ protected void implConfigureBlocking(boolean block) throws IOException { acceptLock.lock(); try { - synchronized (stateLock) { - ensureOpen(); - IOUtil.configureBlocking(fd, block); - } + lockedConfigureBlocking(block); } finally { acceptLock.unlock(); } } /** + * Adjust the blocking mode while holding acceptLock. + */ + private void lockedConfigureBlocking(boolean block) throws IOException { + assert acceptLock.isHeldByCurrentThread(); + stateLock.lock(); + try { + ensureOpen(); + IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); + } + } + + /** * Invoked by implCloseChannel to close the channel. * * This method waits for outstanding I/O operations to complete. When in @@ -336,15 +435,19 @@ boolean blocking; // set state to ST_CLOSING - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; state = ST_CLOSING; blocking = isBlocking(); + } finally { + stateLock.unlock(); } // wait for any outstanding accept to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long th = thread; if (th != 0) { @@ -354,12 +457,14 @@ // wait for accept operation to end while (thread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for accept to complete @@ -368,9 +473,12 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -384,11 +492,14 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); } + } finally { + stateLock.unlock(); } } @@ -396,8 +507,11 @@ * Returns true if channel's socket is bound */ boolean isBound() { - synchronized (stateLock) { + stateLock.lock(); + try { return localAddress != null; + } finally { + stateLock.unlock(); } } @@ -405,30 +519,11 @@ * Returns the local address, or null if not bound */ InetSocketAddress localAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return localAddress; - } - } - - /** - * Poll this channel's socket for a new connection up to the given timeout. - * @return {@code true} if there is a connection to accept - */ - boolean pollAccept(long timeout) throws IOException { - assert Thread.holdsLock(blockingLock()) && isBlocking(); - acceptLock.lock(); - try { - boolean polled = false; - try { - begin(true); - int events = Net.poll(fd, Net.POLLIN, timeout); - polled = (events != 0); - } finally { - end(true, polled); - } - return polled; } finally { - acceptLock.unlock(); + stateLock.unlock(); } } @@ -494,13 +589,16 @@ if (!isOpen()) { sb.append("closed"); } else { - synchronized (stateLock) { + stateLock.lock(); + try { InetSocketAddress addr = localAddress; if (addr == null) { sb.append("unbound"); } else { sb.append(Net.getRevealedLocalAddressAsString(addr)); } + } finally { + stateLock.unlock(); } } sb.append(']'); diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java --- a/src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/SocketAdaptor.java Thu Apr 25 10:41:49 2019 +0100 @@ -33,18 +33,12 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; -import java.net.SocketImpl; import java.net.SocketOption; -import java.net.SocketTimeoutException; import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.IllegalBlockingModeException; import java.nio.channels.SocketChannel; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; -import static java.util.concurrent.TimeUnit.*; +import java.util.Set; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; // Make a socket channel look like a socket. // @@ -62,11 +56,11 @@ private volatile int timeout; private SocketAdaptor(SocketChannelImpl sc) throws SocketException { - super((SocketImpl) null); + super(DummySocketImpl.create()); this.sc = sc; } - public static Socket create(SocketChannelImpl sc) { + static Socket create(SocketChannelImpl sc) { try { return new SocketAdaptor(sc); } catch (SocketException e) { @@ -74,70 +68,30 @@ } } - public SocketChannel getChannel() { - return sc; - } - - // Override this method just to protect against changes in the superclass - // + @Override public void connect(SocketAddress remote) throws IOException { connect(remote, 0); } + @Override public void connect(SocketAddress remote, int timeout) throws IOException { if (remote == null) throw new IllegalArgumentException("connect: The address can't be null"); if (timeout < 0) throw new IllegalArgumentException("connect: timeout can't be negative"); - - synchronized (sc.blockingLock()) { - if (!sc.isBlocking()) - throw new IllegalBlockingModeException(); - - try { - // no timeout - if (timeout == 0) { - sc.connect(remote); - return; - } - - // timed connect - sc.configureBlocking(false); - try { - if (sc.connect(remote)) - return; - } finally { - try { - sc.configureBlocking(true); - } catch (ClosedChannelException e) { } - } - - long timeoutNanos = NANOSECONDS.convert(timeout, MILLISECONDS); - long to = timeout; - for (;;) { - long startTime = System.nanoTime(); - if (sc.pollConnected(to)) { - boolean connected = sc.finishConnect(); - assert connected; - break; - } - timeoutNanos -= System.nanoTime() - startTime; - if (timeoutNanos <= 0) { - try { - sc.close(); - } catch (IOException x) { } - throw new SocketTimeoutException(); - } - to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS); - } - - } catch (Exception x) { - Net.translateException(x, true); + try { + if (timeout > 0) { + long nanos = MILLISECONDS.toNanos(timeout); + sc.blockingConnect(remote, nanos); + } else { + sc.blockingConnect(remote, Long.MAX_VALUE); } + } catch (Exception e) { + Net.translateException(e, true); } - } + @Override public void bind(SocketAddress local) throws IOException { try { sc.bind(local); @@ -146,6 +100,7 @@ } } + @Override public InetAddress getInetAddress() { InetSocketAddress remote = sc.remoteAddress(); if (remote == null) { @@ -155,6 +110,7 @@ } } + @Override public InetAddress getLocalAddress() { if (sc.isOpen()) { InetSocketAddress local = sc.localAddress(); @@ -165,6 +121,7 @@ return new InetSocketAddress(0).getAddress(); } + @Override public int getPort() { InetSocketAddress remote = sc.remoteAddress(); if (remote == null) { @@ -174,6 +131,7 @@ } } + @Override public int getLocalPort() { InetSocketAddress local = sc.localAddress(); if (local == null) { @@ -183,48 +141,27 @@ } } - private class SocketInputStream - extends ChannelInputStream - { - private SocketInputStream() { - super(sc); - } - - protected int read(ByteBuffer bb) - throws IOException - { - synchronized (sc.blockingLock()) { - if (!sc.isBlocking()) - throw new IllegalBlockingModeException(); - - // no timeout - long to = SocketAdaptor.this.timeout; - if (to == 0) - return sc.read(bb); + @Override + public SocketAddress getRemoteSocketAddress() { + return sc.remoteAddress(); + } - // timed read - long timeoutNanos = NANOSECONDS.convert(to, MILLISECONDS); - for (;;) { - long startTime = System.nanoTime(); - if (sc.pollRead(to)) { - return sc.read(bb); - } - timeoutNanos -= System.nanoTime() - startTime; - if (timeoutNanos <= 0) - throw new SocketTimeoutException(); - to = MILLISECONDS.convert(timeoutNanos, NANOSECONDS); - } - } - } - - @Override - public int available() throws IOException { - return sc.available(); + @Override + public SocketAddress getLocalSocketAddress() { + InetSocketAddress local = sc.localAddress(); + if (local != null) { + return Net.getRevealedLocalAddress(local); + } else { + return null; } } - private InputStream socketInputStream = null; + @Override + public SocketChannel getChannel() { + return sc; + } + @Override public InputStream getInputStream() throws IOException { if (!sc.isOpen()) throw new SocketException("Socket is closed"); @@ -232,21 +169,35 @@ throw new SocketException("Socket is not connected"); if (!sc.isInputOpen()) throw new SocketException("Socket input is shutdown"); - if (socketInputStream == null) { - try { - socketInputStream = AccessController.doPrivileged( - new PrivilegedExceptionAction() { - public InputStream run() throws IOException { - return new SocketInputStream(); - } - }); - } catch (java.security.PrivilegedActionException e) { - throw (IOException)e.getException(); + return new InputStream() { + @Override + public int read() throws IOException { + byte[] a = new byte[1]; + int n = read(a, 0, 1); + return (n > 0) ? (a[0] & 0xff) : -1; } - } - return socketInputStream; + @Override + public int read(byte[] b, int off, int len) throws IOException { + int timeout = SocketAdaptor.this.timeout; + if (timeout > 0) { + long nanos = MILLISECONDS.toNanos(timeout); + return sc.blockingRead(b, off, len, nanos); + } else { + return sc.blockingRead(b, off, len, 0); + } + } + @Override + public int available() throws IOException { + return sc.available(); + } + @Override + public void close() throws IOException { + sc.close(); + } + }; } + @Override public OutputStream getOutputStream() throws IOException { if (!sc.isOpen()) throw new SocketException("Socket is closed"); @@ -254,18 +205,21 @@ throw new SocketException("Socket is not connected"); if (!sc.isOutputOpen()) throw new SocketException("Socket output is shutdown"); - OutputStream os = null; - try { - os = AccessController.doPrivileged( - new PrivilegedExceptionAction() { - public OutputStream run() throws IOException { - return Channels.newOutputStream(sc); - } - }); - } catch (java.security.PrivilegedActionException e) { - throw (IOException)e.getException(); - } - return os; + return new OutputStream() { + @Override + public void write(int b) throws IOException { + byte[] a = new byte[]{(byte) b}; + write(a, 0, 1); + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + sc.blockingWriteFully(b, off, len); + } + @Override + public void close() throws IOException { + sc.close(); + } + }; } private void setBooleanOption(SocketOption name, boolean value) @@ -306,48 +260,62 @@ } } + @Override public void setTcpNoDelay(boolean on) throws SocketException { setBooleanOption(StandardSocketOptions.TCP_NODELAY, on); } + @Override public boolean getTcpNoDelay() throws SocketException { return getBooleanOption(StandardSocketOptions.TCP_NODELAY); } + @Override public void setSoLinger(boolean on, int linger) throws SocketException { if (!on) linger = -1; setIntOption(StandardSocketOptions.SO_LINGER, linger); } + @Override public int getSoLinger() throws SocketException { return getIntOption(StandardSocketOptions.SO_LINGER); } + @Override public void sendUrgentData(int data) throws IOException { int n = sc.sendOutOfBandData((byte) data); if (n == 0) throw new IOException("Socket buffer full"); } + @Override public void setOOBInline(boolean on) throws SocketException { setBooleanOption(ExtendedSocketOption.SO_OOBINLINE, on); } + @Override public boolean getOOBInline() throws SocketException { return getBooleanOption(ExtendedSocketOption.SO_OOBINLINE); } + @Override public void setSoTimeout(int timeout) throws SocketException { + if (!sc.isOpen()) + throw new SocketException("Socket is closed"); if (timeout < 0) - throw new IllegalArgumentException("timeout can't be negative"); + throw new IllegalArgumentException("timeout < 0"); this.timeout = timeout; } + @Override public int getSoTimeout() throws SocketException { + if (!sc.isOpen()) + throw new SocketException("Socket is closed"); return timeout; } + @Override public void setSendBufferSize(int size) throws SocketException { // size 0 valid for SocketChannel, invalid for Socket if (size <= 0) @@ -355,10 +323,12 @@ setIntOption(StandardSocketOptions.SO_SNDBUF, size); } + @Override public int getSendBufferSize() throws SocketException { return getIntOption(StandardSocketOptions.SO_SNDBUF); } + @Override public void setReceiveBufferSize(int size) throws SocketException { // size 0 valid for SocketChannel, invalid for Socket if (size <= 0) @@ -366,38 +336,47 @@ setIntOption(StandardSocketOptions.SO_RCVBUF, size); } + @Override public int getReceiveBufferSize() throws SocketException { return getIntOption(StandardSocketOptions.SO_RCVBUF); } + @Override public void setKeepAlive(boolean on) throws SocketException { setBooleanOption(StandardSocketOptions.SO_KEEPALIVE, on); } + @Override public boolean getKeepAlive() throws SocketException { return getBooleanOption(StandardSocketOptions.SO_KEEPALIVE); } + @Override public void setTrafficClass(int tc) throws SocketException { setIntOption(StandardSocketOptions.IP_TOS, tc); } + @Override public int getTrafficClass() throws SocketException { return getIntOption(StandardSocketOptions.IP_TOS); } + @Override public void setReuseAddress(boolean on) throws SocketException { setBooleanOption(StandardSocketOptions.SO_REUSEADDR, on); } + @Override public boolean getReuseAddress() throws SocketException { return getBooleanOption(StandardSocketOptions.SO_REUSEADDR); } + @Override public void close() throws IOException { sc.close(); } + @Override public void shutdownInput() throws IOException { try { sc.shutdownInput(); @@ -406,6 +385,7 @@ } } + @Override public void shutdownOutput() throws IOException { try { sc.shutdownOutput(); @@ -414,6 +394,7 @@ } } + @Override public String toString() { if (sc.isConnected()) return "Socket[addr=" + getInetAddress() + @@ -422,23 +403,44 @@ return "Socket[unconnected]"; } + @Override public boolean isConnected() { return sc.isConnected(); } + @Override public boolean isBound() { return sc.localAddress() != null; } + @Override public boolean isClosed() { return !sc.isOpen(); } + @Override public boolean isInputShutdown() { return !sc.isInputOpen(); } + @Override public boolean isOutputShutdown() { return !sc.isOutputOpen(); } + + @Override + public Socket setOption(SocketOption name, T value) throws IOException { + sc.setOption(name, value); + return this; + } + + @Override + public T getOption(SocketOption name) throws IOException { + return sc.getOption(name); + } + + @Override + public Set> supportedOptions() { + return sc.supportedOptions(); + } } diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java --- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -34,6 +34,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketOption; +import java.net.SocketTimeoutException; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; @@ -42,6 +43,7 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; +import java.nio.channels.IllegalBlockingModeException; import java.nio.channels.NoConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectionKey; @@ -51,6 +53,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import sun.net.ConnectionResetException; @@ -81,7 +84,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // Input/Output closed private volatile boolean isInputClosed; @@ -133,8 +137,11 @@ this.fd = fd; this.fdVal = IOUtil.fdVal(fd); if (bound) { - synchronized (stateLock) { + stateLock.lock(); + try { this.localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } } @@ -147,10 +154,13 @@ super(sp); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); - synchronized (stateLock) { + stateLock.lock(); + try { this.localAddress = Net.localAddress(fd); this.remoteAddress = isa; this.state = ST_CONNECTED; + } finally { + stateLock.unlock(); } } @@ -187,26 +197,35 @@ @Override public Socket socket() { - synchronized (stateLock) { + stateLock.lock(); + try { if (socket == null) socket = SocketAdaptor.create(this); return socket; + } finally { + stateLock.unlock(); } } @Override public SocketAddress getLocalAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); return Net.getRevealedLocalAddress(localAddress); + } finally { + stateLock.unlock(); } } @Override public SocketAddress getRemoteAddress() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -218,7 +237,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.IP_TOS) { @@ -237,6 +257,8 @@ // no options that require special handling Net.setSocketOption(fd, name, value); return this; + } finally { + stateLock.unlock(); } } @@ -249,7 +271,8 @@ if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { @@ -266,6 +289,8 @@ // no options that require special handling return (T) Net.getSocketOption(fd, name); + } finally { + stateLock.unlock(); } } @@ -307,10 +332,13 @@ // set hook for Thread.interrupt begin(); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpenAndConnected(); // record thread so it can be signalled if needed readerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } } else { ensureOpenAndConnected(); @@ -327,12 +355,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { readerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -362,12 +393,12 @@ if (isInputClosed) return IOStatus.EOF; + n = IOUtil.read(fd, buf, -1, nd); if (blocking) { - do { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); n = IOUtil.read(fd, buf, -1, nd); - } while (n == IOStatus.INTERRUPTED && isOpen()); - } else { - n = IOUtil.read(fd, buf, -1, nd); + } } } catch (ConnectionResetException e) { connectionReset = true; @@ -404,12 +435,12 @@ if (isInputClosed) return IOStatus.EOF; + n = IOUtil.read(fd, dsts, offset, length, nd); if (blocking) { - do { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); n = IOUtil.read(fd, dsts, offset, length, nd); - } while (n == IOStatus.INTERRUPTED && isOpen()); - } else { - n = IOUtil.read(fd, dsts, offset, length, nd); + } } } catch (ConnectionResetException e) { connectionReset = true; @@ -436,12 +467,15 @@ // set hook for Thread.interrupt begin(); - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpenAndConnected(); if (isOutputClosed) throw new ClosedChannelException(); // record thread so it can be signalled if needed writerThread = NativeThread.current(); + } finally { + stateLock.unlock(); } } else { ensureOpenAndConnected(); @@ -458,12 +492,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { writerThread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -480,12 +517,12 @@ int n = 0; try { beginWrite(blocking); + n = IOUtil.write(fd, buf, -1, nd); if (blocking) { - do { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); n = IOUtil.write(fd, buf, -1, nd); - } while (n == IOStatus.INTERRUPTED && isOpen()); - } else { - n = IOUtil.write(fd, buf, -1, nd); + } } } finally { endWrite(blocking, n > 0); @@ -510,12 +547,12 @@ long n = 0; try { beginWrite(blocking); + n = IOUtil.write(fd, srcs, offset, length, nd); if (blocking) { - do { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); n = IOUtil.write(fd, srcs, offset, length, nd); - } while (n == IOStatus.INTERRUPTED && isOpen()); - } else { - n = IOUtil.write(fd, srcs, offset, length, nd); + } } } finally { endWrite(blocking, n > 0); @@ -562,10 +599,7 @@ try { writeLock.lock(); try { - synchronized (stateLock) { - ensureOpen(); - IOUtil.configureBlocking(fd, block); - } + lockedConfigureBlocking(block); } finally { writeLock.unlock(); } @@ -575,11 +609,28 @@ } /** + * Adjust the blocking mode while holding the readLock or writeLock. + */ + private void lockedConfigureBlocking(boolean block) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + stateLock.lock(); + try { + ensureOpen(); + IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); + } + } + + /** * Returns the local address, or null if not bound */ InetSocketAddress localAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return localAddress; + } finally { + stateLock.unlock(); } } @@ -587,8 +638,11 @@ * Returns the remote address, or null if not connected */ InetSocketAddress remoteAddress() { - synchronized (stateLock) { + stateLock.lock(); + try { return remoteAddress; + } finally { + stateLock.unlock(); } } @@ -598,7 +652,8 @@ try { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (state == ST_CONNECTIONPENDING) throw new ConnectionPendingException(); @@ -613,6 +668,8 @@ NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); Net.bind(fd, isa.getAddress(), isa.getPort()); localAddress = Net.localAddress(fd); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -649,7 +706,8 @@ // set hook for Thread.interrupt begin(); } - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); int state = this.state; if (state == ST_CONNECTED) @@ -667,6 +725,8 @@ // record thread so it can be signalled if needed readerThread = NativeThread.current(); } + } finally { + stateLock.unlock(); } } @@ -683,43 +743,62 @@ endRead(blocking, completed); if (completed) { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_CONNECTIONPENDING) { localAddress = Net.localAddress(fd); state = ST_CONNECTED; } + } finally { + stateLock.unlock(); } } } + /** + * Checks the remote address to which this channel is to be connected. + */ + private InetSocketAddress checkRemote(SocketAddress sa) throws IOException { + InetSocketAddress isa = Net.checkAddress(sa); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); + } + if (isa.getAddress().isAnyLocalAddress()) { + return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort()); + } else { + return isa; + } + } + @Override - public boolean connect(SocketAddress sa) throws IOException { - InetSocketAddress isa = Net.checkAddress(sa); - SecurityManager sm = System.getSecurityManager(); - if (sm != null) - sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); - - InetAddress ia = isa.getAddress(); - if (ia.isAnyLocalAddress()) - ia = InetAddress.getLocalHost(); - + public boolean connect(SocketAddress remote) throws IOException { + InetSocketAddress isa = checkRemote(remote); try { readLock.lock(); try { writeLock.lock(); try { - int n = 0; boolean blocking = isBlocking(); + boolean connected = false; try { beginConnect(blocking, isa); - do { - n = Net.connect(fd, ia, isa.getPort()); - } while (n == IOStatus.INTERRUPTED && isOpen()); + int n = Net.connect(fd, isa.getAddress(), isa.getPort()); + if (n > 0) { + connected = true; + } else if (blocking) { + assert IOStatus.okayToRetry(n); + boolean polled = false; + while (!polled && isOpen()) { + park(Net.POLLOUT); + polled = Net.pollConnectNow(fd); + } + connected = polled && isOpen(); + } } finally { - endConnect(blocking, (n > 0)); + endConnect(blocking, connected); } - assert IOStatus.check(n); - return n > 0; + return connected; } finally { writeLock.unlock(); } @@ -744,7 +823,8 @@ // set hook for Thread.interrupt begin(); } - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (state != ST_CONNECTIONPENDING) throw new NoConnectionPendingException(); @@ -752,6 +832,8 @@ // record thread so it can be signalled if needed readerThread = NativeThread.current(); } + } finally { + stateLock.unlock(); } } @@ -768,11 +850,14 @@ endRead(blocking, completed); if (completed) { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_CONNECTIONPENDING) { localAddress = Net.localAddress(fd); state = ST_CONNECTED; } + } finally { + stateLock.unlock(); } } } @@ -792,13 +877,14 @@ boolean connected = false; try { beginFinishConnect(blocking); + boolean polled = Net.pollConnectNow(fd); if (blocking) { - do { - connected = Net.pollConnect(fd, -1); - } while (!connected && isOpen()); - } else { - connected = Net.pollConnect(fd, 0); + while (!polled && isOpen()) { + park(Net.POLLOUT); + polled = Net.pollConnectNow(fd); + } } + connected = polled && isOpen(); } finally { endFinishConnect(blocking, connected); } @@ -843,16 +929,20 @@ boolean interrupted = false; // set state to ST_CLOSING - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; blocking = isBlocking(); connected = (state == ST_CONNECTED); state = ST_CLOSING; + } finally { + stateLock.unlock(); } // wait for any outstanding I/O operations to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long reader = readerThread; long writer = writerThread; @@ -868,12 +958,14 @@ // wait for blocking I/O operations to end while (readerThread != 0 || writerThread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for read/write to complete @@ -887,7 +979,8 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; // if connected and the channel is registered with a Selector then // shutdown the output if possible so that the peer reads EOF. If @@ -908,6 +1001,8 @@ } catch (IOException ignore) { } } state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -921,17 +1016,21 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { if (state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); } + } finally { + stateLock.unlock(); } } @Override public SocketChannel shutdownInput() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); @@ -943,12 +1042,15 @@ isInputClosed = true; } return this; + } finally { + stateLock.unlock(); } } @Override public SocketChannel shutdownOutput() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); @@ -960,6 +1062,8 @@ isOutputClosed = true; } return this; + } finally { + stateLock.unlock(); } } @@ -972,58 +1076,223 @@ } /** - * Poll this channel's socket for reading up to the given timeout. - * @return {@code true} if the socket is polled + * Waits for a connection attempt to finish with a timeout + * @throws SocketTimeoutException if the connect timeout elapses + */ + private boolean finishTimedConnect(long nanos) throws IOException { + long startNanos = System.nanoTime(); + boolean polled = Net.pollConnectNow(fd); + while (!polled && isOpen()) { + long remainingNanos = nanos - (System.nanoTime() - startNanos); + if (remainingNanos <= 0) { + throw new SocketTimeoutException("Connect timed out"); + } + park(Net.POLLOUT, remainingNanos); + polled = Net.pollConnectNow(fd); + } + return polled && isOpen(); + } + + /** + * Attempts to establish a connection to the given socket address with a + * timeout. Closes the socket if connection cannot be established. + * + * @apiNote This method is for use by the socket adaptor. + * + * @throws IllegalBlockingModeException if the channel is non-blocking + * @throws SocketTimeoutException if the read timeout elapses */ - boolean pollRead(long timeout) throws IOException { - boolean blocking = isBlocking(); - assert Thread.holdsLock(blockingLock()) && blocking; + void blockingConnect(SocketAddress remote, long nanos) throws IOException { + InetSocketAddress isa = checkRemote(remote); + try { + readLock.lock(); + try { + writeLock.lock(); + try { + if (!isBlocking()) + throw new IllegalBlockingModeException(); + boolean connected = false; + try { + beginConnect(true, isa); + // change socket to non-blocking + lockedConfigureBlocking(false); + try { + int n = Net.connect(fd, isa.getAddress(), isa.getPort()); + connected = (n > 0) ? true : finishTimedConnect(nanos); + } finally { + // restore socket to blocking mode + lockedConfigureBlocking(true); + } + } finally { + endConnect(true, connected); + } + } finally { + writeLock.unlock(); + } + } finally { + readLock.unlock(); + } + } catch (IOException ioe) { + // connect failed, close the channel + close(); + throw SocketExceptions.of(ioe, isa); + } + } + + /** + * Attempts to read bytes from the socket into the given byte array. + */ + private int tryRead(byte[] b, int off, int len) throws IOException { + ByteBuffer dst = Util.getTemporaryDirectBuffer(len); + assert dst.position() == 0; + try { + int n = nd.read(fd, ((DirectBuffer)dst).address(), len); + if (n > 0) { + dst.get(b, off, n); + } + return n; + } finally{ + Util.offerFirstTemporaryDirectBuffer(dst); + } + } + + /** + * Reads bytes from the socket into the given byte array with a timeout. + * @throws SocketTimeoutException if the read timeout elapses + */ + private int timedRead(byte[] b, int off, int len, long nanos) throws IOException { + long startNanos = System.nanoTime(); + int n = tryRead(b, off, len); + while (n == IOStatus.UNAVAILABLE && isOpen()) { + long remainingNanos = nanos - (System.nanoTime() - startNanos); + if (remainingNanos <= 0) { + throw new SocketTimeoutException("Read timed out"); + } + park(Net.POLLIN, remainingNanos); + n = tryRead(b, off, len); + } + return n; + } + + /** + * Reads bytes from the socket into the given byte array. + * + * @apiNote This method is for use by the socket adaptor. + * + * @throws IllegalBlockingModeException if the channel is non-blocking + * @throws SocketTimeoutException if the read timeout elapses + */ + int blockingRead(byte[] b, int off, int len, long nanos) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + if (len == 0) { + // nothing to do + return 0; + } readLock.lock(); try { - boolean polled = false; + // check that channel is configured blocking + if (!isBlocking()) + throw new IllegalBlockingModeException(); + + int n = 0; try { - beginRead(blocking); - int events = Net.poll(fd, Net.POLLIN, timeout); - polled = (events != 0); + beginRead(true); + + // check if connection has been reset + if (connectionReset) + throwConnectionReset(); + + // check if input is shutdown + if (isInputClosed) + return IOStatus.EOF; + + if (nanos > 0) { + // change socket to non-blocking + lockedConfigureBlocking(false); + try { + n = timedRead(b, off, len, nanos); + } finally { + // restore socket to blocking mode + lockedConfigureBlocking(true); + } + } else { + // read, no timeout + n = tryRead(b, off, len); + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = tryRead(b, off, len); + } + } + } catch (ConnectionResetException e) { + connectionReset = true; + throwConnectionReset(); } finally { - endRead(blocking, polled); + endRead(true, n > 0); + if (n <= 0 && isInputClosed) + return IOStatus.EOF; } - return polled; + assert n > 0 || n == -1; + return n; } finally { readLock.unlock(); } } /** - * Poll this channel's socket for a connection, up to the given timeout. - * @return {@code true} if the socket is polled + * Attempts to write a sequence of bytes to the socket from the given + * byte array. */ - boolean pollConnected(long timeout) throws IOException { - boolean blocking = isBlocking(); - assert Thread.holdsLock(blockingLock()) && blocking; + private int tryWrite(byte[] b, int off, int len) throws IOException { + ByteBuffer src = Util.getTemporaryDirectBuffer(len); + assert src.position() == 0; + try { + src.put(b, off, len); + return nd.write(fd, ((DirectBuffer)src).address(), len); + } finally { + Util.offerFirstTemporaryDirectBuffer(src); + } + } - readLock.lock(); + /** + * Writes a sequence of bytes to the socket from the given byte array. + * + * @apiNote This method is for use by the socket adaptor. + */ + void blockingWriteFully(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + if (len == 0) { + // nothing to do + return; + } + + writeLock.lock(); try { - writeLock.lock(); + // check that channel is configured blocking + if (!isBlocking()) + throw new IllegalBlockingModeException(); + + // loop until all bytes have been written + int pos = off; + int end = off + len; + beginWrite(true); try { - boolean polled = false; - try { - beginFinishConnect(blocking); - int events = Net.poll(fd, Net.POLLCONN, timeout); - polled = (events != 0); - } finally { - // invoke endFinishConnect with completed = false so that - // the state is not changed to ST_CONNECTED. The socket - // adaptor will use finishConnect to finish. - endFinishConnect(blocking, /*completed*/false); + while (pos < end && isOpen()) { + int size = end - pos; + int n = tryWrite(b, pos, size); + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = tryWrite(b, pos, size); + } + if (n > 0) { + pos += n; + } } - return polled; } finally { - writeLock.unlock(); + endWrite(true, pos >= end); } } finally { - readLock.unlock(); + writeLock.unlock(); } } @@ -1031,13 +1300,16 @@ * Return the number of bytes in the socket input buffer. */ int available() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { ensureOpenAndConnected(); if (isInputClosed) { return 0; } else { return Net.available(fd); } + } finally { + stateLock.unlock(); } } @@ -1117,7 +1389,8 @@ if (!isOpen()) sb.append("closed"); else { - synchronized (stateLock) { + stateLock.lock(); + try { switch (state) { case ST_UNCONNECTED: sb.append("unconnected"); @@ -1142,6 +1415,8 @@ sb.append(" remote="); sb.append(remoteAddress().toString()); } + } finally { + stateLock.unlock(); } } sb.append(']'); diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java --- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -35,6 +35,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.spi.SelectorProvider; import java.util.Objects; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class SinkChannelImpl @@ -53,7 +54,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // -- The following fields are protected by stateLock @@ -95,15 +97,19 @@ boolean blocking; // set state to ST_CLOSING - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; state = ST_CLOSING; blocking = isBlocking(); + } finally { + stateLock.unlock(); } // wait for any outstanding write to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long th = thread; if (th != 0) { @@ -113,12 +119,14 @@ // wait for write operation to end while (thread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for write to complete @@ -127,9 +135,12 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -143,12 +154,15 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { assert thread == 0; if (state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); } + } finally { + stateLock.unlock(); } } @@ -156,8 +170,11 @@ protected void implConfigureBlocking(boolean block) throws IOException { writeLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); } } finally { writeLock.unlock(); @@ -212,11 +229,14 @@ // set hook for Thread.interrupt begin(); } - synchronized (stateLock) { + stateLock.lock(); + try { if (!isOpen()) throw new ClosedChannelException(); if (blocking) thread = NativeThread.current(); + } finally { + stateLock.unlock(); } } @@ -230,12 +250,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { thread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -252,9 +275,13 @@ int n = 0; try { beginWrite(blocking); - do { - n = IOUtil.write(fd, src, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, src, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, src, -1, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); @@ -275,9 +302,13 @@ long n = 0; try { beginWrite(blocking); - do { - n = IOUtil.write(fd, srcs, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.write(fd, srcs, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLOUT); + n = IOUtil.write(fd, srcs, offset, length, nd); + } + } } finally { endWrite(blocking, n > 0); assert IOStatus.check(n); diff -r b43cc3b9ef40 -r 13b67c1420b8 src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java --- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Thu Apr 25 09:12:40 2019 +0200 +++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java Thu Apr 25 10:41:49 2019 +0100 @@ -35,6 +35,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.spi.SelectorProvider; import java.util.Objects; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class SourceChannelImpl @@ -53,7 +54,8 @@ // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! - private final Object stateLock = new Object(); + private final ReentrantLock stateLock = new ReentrantLock(); + private final Condition stateCondition = stateLock.newCondition(); // -- The following fields are protected by stateLock @@ -95,15 +97,19 @@ boolean blocking; // set state to ST_CLOSING - synchronized (stateLock) { + stateLock.lock(); + try { assert state < ST_CLOSING; state = ST_CLOSING; blocking = isBlocking(); + } finally { + stateLock.unlock(); } // wait for any outstanding read to complete if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; long th = thread; if (th != 0) { @@ -113,12 +119,14 @@ // wait for read operation to end while (thread != 0) { try { - stateLock.wait(); + stateCondition.await(); } catch (InterruptedException e) { interrupted = true; } } } + } finally { + stateLock.unlock(); } } else { // non-blocking mode: wait for read to complete @@ -127,9 +135,12 @@ } // set state to ST_KILLPENDING - synchronized (stateLock) { + stateLock.lock(); + try { assert state == ST_CLOSING; state = ST_KILLPENDING; + } finally { + stateLock.unlock(); } // close socket if not registered with Selector @@ -143,12 +154,15 @@ @Override public void kill() throws IOException { - synchronized (stateLock) { + stateLock.lock(); + try { assert thread == 0; if (state == ST_KILLPENDING) { state = ST_KILLED; nd.close(fd); } + } finally { + stateLock.unlock(); } } @@ -156,8 +170,11 @@ protected void implConfigureBlocking(boolean block) throws IOException { readLock.lock(); try { - synchronized (stateLock) { + stateLock.lock(); + try { IOUtil.configureBlocking(fd, block); + } finally { + stateLock.unlock(); } } finally { readLock.unlock(); @@ -212,11 +229,14 @@ // set hook for Thread.interrupt begin(); } - synchronized (stateLock) { + stateLock.lock(); + try { if (!isOpen()) throw new ClosedChannelException(); if (blocking) thread = NativeThread.current(); + } finally { + stateLock.unlock(); } } @@ -230,12 +250,15 @@ throws AsynchronousCloseException { if (blocking) { - synchronized (stateLock) { + stateLock.lock(); + try { thread = 0; // notify any thread waiting in implCloseSelectableChannel if (state == ST_CLOSING) { - stateLock.notifyAll(); + stateCondition.signalAll(); } + } finally { + stateLock.unlock(); } // remove hook for Thread.interrupt end(completed); @@ -252,9 +275,13 @@ int n = 0; try { beginRead(blocking); - do { - n = IOUtil.read(fd, dst, -1, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.read(fd, dst, -1, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, dst, -1, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); @@ -275,9 +302,13 @@ long n = 0; try { beginRead(blocking); - do { - n = IOUtil.read(fd, dsts, offset, length, nd); - } while ((n == IOStatus.INTERRUPTED) && isOpen()); + n = IOUtil.read(fd, dsts, offset, length, nd); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = IOUtil.read(fd, dsts, offset, length, nd); + } + } } finally { endRead(blocking, n > 0); assert IOStatus.check(n); diff -r b43cc3b9ef40 -r 13b67c1420b8 test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/nio/channels/SocketChannel/AdaptorStreams.java Thu Apr 25 10:41:49 2019 +0100 @@ -0,0 +1,516 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* @test + * @bug 8222774 4430139 + * @run testng AdaptorStreams + * @summary Exercise socket adaptor input/output streams + */ + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.channels.IllegalBlockingModeException; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.Test; +import static org.testng.Assert.*; + +@Test +public class AdaptorStreams { + + /** + * Test read when bytes are available + */ + public void testRead1() throws Exception { + withConnection((sc, peer) -> { + peer.getOutputStream().write(99); + int n = sc.socket().getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test read blocking before bytes are available + */ + public void testRead2() throws Exception { + withConnection((sc, peer) -> { + scheduleWrite(peer.getOutputStream(), 99, 1000); + int n = sc.socket().getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test read when peer has closed connection + */ + public void testRead3() throws Exception { + withConnection((sc, peer) -> { + peer.close(); + int n = sc.socket().getInputStream().read(); + assertTrue(n == -1); + }); + } + + /** + * Test read blocking before peer closes connection + */ + public void testRead4() throws Exception { + withConnection((sc, peer) -> { + scheduleClose(peer, 1000); + int n = sc.socket().getInputStream().read(); + assertTrue(n == -1); + }); + } + + /** + * Test async close of socket when thread blocked in read + */ + public void testRead5() throws Exception { + withConnection((sc, peer) -> { + scheduleClose(sc, 2000); + InputStream in = sc.socket().getInputStream(); + expectThrows(IOException.class, () -> in.read()); + }); + } + + /** + * Test interrupt status set before read + */ + public void testRead6() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + Thread.currentThread().interrupt(); + try { + InputStream in = s.getInputStream(); + expectThrows(IOException.class, () -> in.read()); + } finally { + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test interrupt of thread blocked in read + */ + public void testRead7() throws Exception { + withConnection((sc, peer) -> { + Future interrupter = scheduleInterrupt(Thread.currentThread(), 2000); + Socket s = sc.socket(); + try { + InputStream in = s.getInputStream(); + expectThrows(IOException.class, () -> in.read()); + } finally { + interrupter.cancel(true); + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test read when channel is configured non-blocking + */ + public void testRead8() throws Exception { + withConnection((sc, peer) -> { + sc.configureBlocking(false); + InputStream in = sc.socket().getInputStream(); + expectThrows(IllegalBlockingModeException.class, () -> in.read()); + }); + } + + /** + * Test timed read when bytes are available + */ + public void testTimedRead1() throws Exception { + withConnection((sc, peer) -> { + peer.getOutputStream().write(99); + Socket s = sc.socket(); + s.setSoTimeout(1000); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test timed read blocking before bytes are available + */ + public void testTimedRead2() throws Exception { + withConnection((sc, peer) -> { + scheduleWrite(peer.getOutputStream(), 99, 1000); + Socket s = sc.socket(); + s.setSoTimeout(5000); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test timed read when the read times out + */ + public void testTimedRead3() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + s.setSoTimeout(1000); + InputStream in = s.getInputStream(); + expectThrows(SocketTimeoutException.class, () -> in.read()); + }); + } + + /** + * Test async close of socket when thread blocked in timed read + */ + public void testTimedRead4() throws Exception { + withConnection((sc, peer) -> { + scheduleClose(sc, 2000); + Socket s = sc.socket(); + s.setSoTimeout(60*1000); + InputStream in = s.getInputStream(); + expectThrows(IOException.class, () -> in.read()); + }); + } + + /** + * Test interrupt status set before timed read + */ + public void testTimedRead5() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + Thread.currentThread().interrupt(); + try { + s.setSoTimeout(60*1000); + InputStream in = s.getInputStream(); + expectThrows(IOException.class, () -> in.read()); + } finally { + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test interrupt of thread blocked in timed read + */ + public void testTimedRead6() throws Exception { + withConnection((sc, peer) -> { + Future interrupter = scheduleInterrupt(Thread.currentThread(), 2000); + Socket s = sc.socket(); + try { + s.setSoTimeout(60*1000); + InputStream in = s.getInputStream(); + expectThrows(IOException.class, () -> in.read()); + assertTrue(s.isClosed()); + } finally { + interrupter.cancel(true); + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test async close of socket when thread blocked in write + */ + public void testWrite1() throws Exception { + withConnection((sc, peer) -> { + scheduleClose(sc, 2000); + expectThrows(IOException.class, () -> { + OutputStream out = sc.socket().getOutputStream(); + byte[] data = new byte[64*1000]; + while (true) { + out.write(data); + } + }); + }); + } + + /** + * Test interrupt status set before write + */ + public void testWrite2() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + Thread.currentThread().interrupt(); + try { + OutputStream out = s.getOutputStream(); + expectThrows(IOException.class, () -> out.write(99)); + } finally { + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test interrupt of thread blocked in write + */ + public void testWrite3() throws Exception { + withConnection((sc, peer) -> { + Future interrupter = scheduleInterrupt(Thread.currentThread(), 2000); + Socket s = sc.socket(); + try { + expectThrows(IOException.class, () -> { + OutputStream out = sc.socket().getOutputStream(); + byte[] data = new byte[64*1000]; + while (true) { + out.write(data); + } + }); + } finally { + interrupter.cancel(true); + Thread.interrupted(); // clear interrupt + } + assertTrue(s.isClosed()); + }); + } + + /** + * Test write when channel is configured non-blocking + */ + public void testWrite4() throws Exception { + withConnection((sc, peer) -> { + sc.configureBlocking(false); + OutputStream out = sc.socket().getOutputStream(); + expectThrows(IllegalBlockingModeException.class, () -> out.write(99)); + }); + } + + /** + * Test read when there are bytes available and another thread is blocked + * in write + */ + public void testConcurrentReadWrite1() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in write + execute(() -> { + var data = new byte[64*1024]; + OutputStream out = s.getOutputStream(); + for (;;) { + out.write(data); + } + }); + Thread.sleep(1000); // give writer time to block + + // test read when bytes are available + peer.getOutputStream().write(99); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test read blocking when another thread is blocked in write + */ + public void testConcurrentReadWrite2() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in write + execute(() -> { + var data = new byte[64*1024]; + OutputStream out = s.getOutputStream(); + for (;;) { + out.write(data); + } + }); + Thread.sleep(1000); // give writer time to block + + // test read blocking until bytes are available + scheduleWrite(peer.getOutputStream(), 99, 500); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test writing when another thread is blocked in read + */ + public void testConcurrentReadWrite3() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in read + execute(() -> { + s.getInputStream().read(); + }); + Thread.sleep(100); // give reader time to block + + // test write + s.getOutputStream().write(99); + int n = peer.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test timed read when there are bytes available and another thread is + * blocked in write + */ + public void testConcurrentTimedReadWrite1() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in write + execute(() -> { + var data = new byte[64*1024]; + OutputStream out = s.getOutputStream(); + for (;;) { + out.write(data); + } + }); + Thread.sleep(1000); // give writer time to block + + // test read when bytes are available + peer.getOutputStream().write(99); + s.setSoTimeout(60*1000); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test timed read blocking when another thread is blocked in write + */ + public void testConcurrentTimedReadWrite2() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in write + execute(() -> { + var data = new byte[64*1024]; + OutputStream out = s.getOutputStream(); + for (;;) { + out.write(data); + } + }); + Thread.sleep(1000); // give writer time to block + + // test read blocking until bytes are available + scheduleWrite(peer.getOutputStream(), 99, 500); + s.setSoTimeout(60*1000); + int n = s.getInputStream().read(); + assertTrue(n == 99); + }); + } + + /** + * Test writing when another thread is blocked in read + */ + public void testConcurrentTimedReadWrite3() throws Exception { + withConnection((sc, peer) -> { + Socket s = sc.socket(); + + // block thread in read + execute(() -> { + s.setSoTimeout(60*1000); + s.getInputStream().read(); + }); + Thread.sleep(100); // give reader time to block + + // test write + s.getOutputStream().write(99); + int n = peer.getInputStream().read(); + assertTrue(n == 99); + }); + } + + // -- test infrastructure -- + + interface ThrowingTask { + void run() throws Exception; + } + + interface ThrowingBiConsumer { + void accept(T t, U u) throws Exception; + } + + /** + * Invokes the consumer with a connected pair of socket channel and socket + */ + static void withConnection(ThrowingBiConsumer consumer) + throws Exception + { + try (ServerSocket ss = new ServerSocket(0); + SocketChannel sc = SocketChannel.open(ss.getLocalSocketAddress()); + Socket peer = ss.accept()) { + consumer.accept(sc, peer); + } + } + + static Future scheduleWrite(OutputStream out, byte[] data, long delay) { + return schedule(() -> { + try { + out.write(data); + } catch (IOException ioe) { } + }, delay); + } + + static Future scheduleWrite(OutputStream out, int b, long delay) { + return scheduleWrite(out, new byte[] { (byte)b }, delay); + } + + static Future scheduleClose(Closeable c, long delay) { + return schedule(() -> { + try { + c.close(); + } catch (IOException ioe) { } + }, delay); + } + + static Future scheduleInterrupt(Thread t, long delay) { + return schedule(() -> t.interrupt(), delay); + } + + static Future schedule(Runnable task, long delay) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + try { + return executor.schedule(task, delay, TimeUnit.MILLISECONDS); + } finally { + executor.shutdown(); + } + } + + static Future execute(ThrowingTask task) { + ExecutorService pool = Executors.newFixedThreadPool(1); + try { + return pool.submit(() -> { + task.run(); + return null; + }); + } finally { + pool.shutdown(); + } + } +} diff -r b43cc3b9ef40 -r 13b67c1420b8 test/jdk/java/nio/channels/SocketChannel/Stream.java --- a/test/jdk/java/nio/channels/SocketChannel/Stream.java Thu Apr 25 09:12:40 2019 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -/* @test - * @bug 4430139 - * @summary Test result of read on stream from nonblocking channel - * @library .. /test/lib - * @build jdk.test.lib.Utils TestServers - * @run main Stream - */ - -import java.io.*; -import java.net.*; -import java.nio.channels.*; - - -public class Stream { - - static void test(TestServers.DayTimeServer daytimeServer) throws Exception { - InetSocketAddress isa - = new InetSocketAddress(daytimeServer.getAddress(), - daytimeServer.getPort()); - SocketChannel sc = SocketChannel.open(); - sc.connect(isa); - sc.configureBlocking(false); - InputStream is = sc.socket().getInputStream(); - byte b[] = new byte[10]; - try { - int n = is.read(b); - throw new RuntimeException("Exception expected; none thrown"); - } catch (IllegalBlockingModeException e) { - // expected result - } - sc.close(); - } - - public static void main(String[] args) throws Exception { - try (TestServers.DayTimeServer dayTimeServer - = TestServers.DayTimeServer.startNewServer(100)) { - test(dayTimeServer); - } - } -}