--- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Tue Feb 27 23:11:26 2018 -0800
+++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java Wed Feb 28 09:54:38 2018 +0000
@@ -48,6 +48,7 @@
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
@@ -62,7 +63,6 @@
extends SocketChannel
implements SelChImpl
{
-
// Used to make native read and write calls
private static NativeDispatcher nd;
@@ -70,10 +70,6 @@
private final FileDescriptor fd;
private final int fdVal;
- // IDs of native threads doing reads and writes, for signalling
- private volatile long readerThread;
- private volatile long writerThread;
-
// Lock held by current reading or connecting thread
private final ReentrantLock readLock = new ReentrantLock();
@@ -84,28 +80,32 @@
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();
+ // Input/Output closed
+ private volatile boolean isInputClosed;
+ private volatile boolean isOutputClosed;
+
// -- The following fields are protected by stateLock
// set true when exclusive binding is on and SO_REUSEADDR is emulated
private boolean isReuseAddress;
// State, increases monotonically
- private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
- private static final int ST_PENDING = 1;
+ private static final int ST_CONNECTIONPENDING = 1;
private static final int ST_CONNECTED = 2;
- private static final int ST_KILLPENDING = 3;
- private static final int ST_KILLED = 4;
- private int state = ST_UNINITIALIZED;
+ private static final int ST_CLOSING = 3;
+ private static final int ST_KILLPENDING = 4;
+ private static final int ST_KILLED = 5;
+ private int state;
+
+ // IDs of native threads doing reads and writes, for signalling
+ private long readerThread;
+ private long writerThread;
// Binding
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
- // Input/Output open
- private boolean isInputOpen = true;
- private boolean isOutputOpen = true;
-
// Socket adaptor, created on demand
private Socket socket;
@@ -118,36 +118,43 @@
super(sp);
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_UNCONNECTED;
}
- SocketChannelImpl(SelectorProvider sp,
- FileDescriptor fd,
- boolean bound)
+ SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_UNCONNECTED;
- if (bound)
- this.localAddress = Net.localAddress(fd);
+ if (bound) {
+ synchronized (stateLock) {
+ this.localAddress = Net.localAddress(fd);
+ }
+ }
}
// Constructor for sockets obtained from server sockets
//
- SocketChannelImpl(SelectorProvider sp,
- FileDescriptor fd, InetSocketAddress remote)
+ SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_CONNECTED;
- this.localAddress = Net.localAddress(fd);
- this.remoteAddress = remote;
+ synchronized (stateLock) {
+ this.localAddress = Net.localAddress(fd);
+ this.remoteAddress = isa;
+ this.state = ST_CONNECTED;
+ }
}
+ // @throws ClosedChannelException if channel is closed
+ private void ensureOpen() throws ClosedChannelException {
+ if (!isOpen())
+ throw new ClosedChannelException();
+ }
+
+ @Override
public Socket socket() {
synchronized (stateLock) {
if (socket == null)
@@ -159,17 +166,15 @@
@Override
public SocketAddress getLocalAddress() throws IOException {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- return Net.getRevealedLocalAddress(localAddress);
+ ensureOpen();
+ return Net.getRevealedLocalAddress(localAddress);
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
return remoteAddress;
}
}
@@ -178,14 +183,12 @@
public <T> SocketChannel setOption(SocketOption<T> name, T value)
throws IOException
{
- if (name == null)
- throw new NullPointerException();
+ Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
@@ -211,18 +214,14 @@
public <T> T getOption(SocketOption<T> name)
throws IOException
{
- if (name == null)
- throw new NullPointerException();
+ Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
- if (name == StandardSocketOptions.SO_REUSEADDR &&
- Net.useExclusiveBind())
- {
+ if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
@@ -243,7 +242,7 @@
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
- HashSet<SocketOption<?>> set = new HashSet<>(8);
+ HashSet<SocketOption<?>> set = new HashSet<>();
set.add(StandardSocketOptions.SO_SNDBUF);
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_KEEPALIVE);
@@ -256,9 +255,7 @@
// additional options required by socket adaptor
set.add(StandardSocketOptions.IP_TOS);
set.add(ExtendedSocketOption.SO_OOBINLINE);
- ExtendedSocketOptions extendedOptions =
- ExtendedSocketOptions.getInstance();
- set.addAll(extendedOptions.options());
+ set.addAll(ExtendedSocketOptions.getInstance().options());
return Collections.unmodifiableSet(set);
}
}
@@ -268,329 +265,277 @@
return DefaultOptionsHolder.defaultOptions;
}
- private boolean ensureReadOpen() throws ClosedChannelException {
+ /**
+ * Marks the beginning of a read operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed
+ * @throws NotYetConnectedException if the channel is not yet connected
+ */
+ private void beginRead(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- if (!isConnected())
+ ensureOpen();
+ if (state != ST_CONNECTED)
throw new NotYetConnectedException();
- if (!isInputOpen)
- return false;
- else
- return true;
+ if (blocking)
+ readerThread = NativeThread.current();
}
}
- private void ensureWriteOpen() throws ClosedChannelException {
- synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- if (!isOutputOpen)
- throw new ClosedChannelException();
- if (!isConnected())
- throw new NotYetConnectedException();
+ /**
+ * Marks the end of a read operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking read operation.
+ */
+ private void endRead(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ if (blocking) {
+ synchronized (stateLock) {
+ readerThread = 0;
+ // notify any thread waiting in implCloseSelectableChannel
+ if (state == ST_CLOSING) {
+ stateLock.notifyAll();
+ }
+ }
+ // remove hook for Thread.interrupt
+ end(completed);
}
}
- private void readerCleanup() throws IOException {
- synchronized (stateLock) {
- readerThread = 0;
- if (state == ST_KILLPENDING)
- kill();
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ Objects.requireNonNull(buf);
+
+ readLock.lock();
+ try {
+ boolean blocking = isBlocking();
+ int n = 0;
+ try {
+ beginRead(blocking);
+
+ // check if input is shutdown
+ if (isInputClosed)
+ return IOStatus.EOF;
+
+ if (blocking) {
+ do {
+ n = IOUtil.read(fd, buf, -1, nd);
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = IOUtil.read(fd, buf, -1, nd);
+ }
+ } finally {
+ endRead(blocking, n > 0);
+ if (n <= 0 && isInputClosed)
+ return IOStatus.EOF;
+ }
+ return IOStatus.normalize(n);
+ } finally {
+ readLock.unlock();
}
}
- private void writerCleanup() throws IOException {
- synchronized (stateLock) {
- writerThread = 0;
- if (state == ST_KILLPENDING)
- kill();
- }
- }
-
- public int read(ByteBuffer buf) throws IOException {
-
- if (buf == null)
- throw new NullPointerException();
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException
+ {
+ Objects.checkFromIndexSize(offset, length, dsts.length);
readLock.lock();
try {
- if (!ensureReadOpen())
- return -1;
+ boolean blocking = isBlocking();
+ long n = 0;
+ try {
+ beginRead(blocking);
+
+ // check if input is shutdown
+ if (isInputClosed)
+ return IOStatus.EOF;
+
+ if (blocking) {
+ do {
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = IOUtil.read(fd, dsts, offset, length, nd);
+ }
+ } finally {
+ endRead(blocking, n > 0);
+ if (n <= 0 && isInputClosed)
+ return IOStatus.EOF;
+ }
+ return IOStatus.normalize(n);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Marks the beginning of a write operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed or output shutdown
+ * @throws NotYetConnectedException if the channel is not yet connected
+ */
+ private void beginWrite(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
+ synchronized (stateLock) {
+ ensureOpen();
+ if (isOutputClosed)
+ throw new ClosedChannelException();
+ if (state != ST_CONNECTED)
+ throw new NotYetConnectedException();
+ if (blocking)
+ writerThread = NativeThread.current();
+ }
+ }
+
+ /**
+ * Marks the end of a write operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking write operation.
+ */
+ private void endWrite(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ if (blocking) {
+ synchronized (stateLock) {
+ writerThread = 0;
+ // notify any thread waiting in implCloseSelectableChannel
+ if (state == ST_CLOSING) {
+ stateLock.notifyAll();
+ }
+ }
+ // remove hook for Thread.interrupt
+ end(completed);
+ }
+ }
+
+ @Override
+ public int write(ByteBuffer buf) throws IOException {
+ Objects.requireNonNull(buf);
+
+ writeLock.lock();
+ try {
+ boolean blocking = isBlocking();
int n = 0;
try {
-
- // Set up the interruption machinery; see
- // AbstractInterruptibleChannel for details
- //
- begin();
+ beginWrite(blocking);
+ if (blocking) {
+ do {
+ n = IOUtil.write(fd, buf, -1, nd);
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = IOUtil.write(fd, buf, -1, nd);
+ }
+ } finally {
+ endWrite(blocking, n > 0);
+ if (n <= 0 && isOutputClosed)
+ throw new AsynchronousCloseException();
+ }
+ return IOStatus.normalize(n);
+ } finally {
+ writeLock.unlock();
+ }
+ }
- synchronized (stateLock) {
- if (!isOpen()) {
- // Either the current thread is already interrupted, so
- // begin() closed the channel, or another thread closed the
- // channel since we checked it a few bytecodes ago. In
- // either case the value returned here is irrelevant since
- // the invocation of end() in the finally block will throw
- // an appropriate exception.
- //
- return 0;
-
- }
-
- // Save this thread so that it can be signalled on those
- // platforms that require it
- //
- readerThread = NativeThread.current();
- }
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException
+ {
+ Objects.checkFromIndexSize(offset, length, srcs.length);
- // Between the previous test of isOpen() and the return of the
- // IOUtil.read invocation below, this channel might be closed
- // or this thread might be interrupted. We rely upon the
- // implicit synchronization point in the kernel read() call to
- // make sure that the right thing happens. In either case the
- // implCloseSelectableChannel method is ultimately invoked in
- // some other thread, so there are three possibilities:
- //
- // - implCloseSelectableChannel() invokes nd.preClose()
- // before this thread invokes read(), in which case the
- // read returns immediately with either EOF or an error,
- // the latter of which will cause an IOException to be
- // thrown.
- //
- // - implCloseSelectableChannel() invokes nd.preClose() after
- // this thread is blocked in read(). On some operating
- // systems (e.g., Solaris and Windows) this causes the read
- // to return immediately with either EOF or an error
- // indication.
- //
- // - implCloseSelectableChannel() invokes nd.preClose() after
- // this thread is blocked in read() but the operating
- // system (e.g., Linux) doesn't support preemptive close,
- // so implCloseSelectableChannel() proceeds to signal this
- // thread, thereby causing the read to return immediately
- // with IOStatus.INTERRUPTED.
- //
- // In all three cases the invocation of end() in the finally
- // clause will notice that the channel has been closed and
- // throw an appropriate exception (AsynchronousCloseException
- // or ClosedByInterruptException) if necessary.
- //
- // *There is A fourth possibility. implCloseSelectableChannel()
- // invokes nd.preClose(), signals reader/writer thred and quickly
- // moves on to nd.close() in kill(), which does a real close.
- // Then a third thread accepts a new connection, opens file or
- // whatever that causes the released "fd" to be recycled. All
- // above happens just between our last isOpen() check and the
- // next kernel read reached, with the recycled "fd". The solution
- // is to postpone the real kill() if there is a reader or/and
- // writer thread(s) over there "waiting", leave the cleanup/kill
- // to the reader or writer thread. (the preClose() still happens
- // so the connection gets cut off as usual).
- //
- // For socket channels there is the additional wrinkle that
- // asynchronous shutdown works much like asynchronous close,
- // except that the channel is shutdown rather than completely
- // closed. This is analogous to the first two cases above,
- // except that the shutdown operation plays the role of
- // nd.preClose().
- for (;;) {
- n = IOUtil.read(fd, buf, -1, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen()) {
- // The system call was interrupted but the channel
- // is still open, so retry
- continue;
- }
- return IOStatus.normalize(n);
+ writeLock.lock();
+ try {
+ boolean blocking = isBlocking();
+ long n = 0;
+ try {
+ beginWrite(blocking);
+ if (blocking) {
+ do {
+ n = IOUtil.write(fd, srcs, offset, length, nd);
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = IOUtil.write(fd, srcs, offset, length, nd);
}
-
} finally {
- readerCleanup(); // Clear reader thread
- // The end method, which is defined in our superclass
- // AbstractInterruptibleChannel, resets the interruption
- // machinery. If its argument is true then it returns
- // normally; otherwise it checks the interrupt and open state
- // of this channel and throws an appropriate exception if
- // necessary.
- //
- // So, if we actually managed to do any I/O in the above try
- // block then we pass true to the end method. We also pass
- // true if the channel was in non-blocking mode when the I/O
- // operation was initiated but no data could be transferred;
- // this prevents spurious exceptions from being thrown in the
- // rare event that a channel is closed or a thread is
- // interrupted at the exact moment that a non-blocking I/O
- // request is made.
- //
- end(n > 0 || (n == IOStatus.UNAVAILABLE));
+ endWrite(blocking, n > 0);
+ if (n <= 0 && isOutputClosed)
+ throw new AsynchronousCloseException();
+ }
+ return IOStatus.normalize(n);
+ } finally {
+ writeLock.unlock();
+ }
+ }
- // Extra case for socket channels: Asynchronous shutdown
- //
+ /**
+ * Writes a byte of out of band data.
+ */
+ int sendOutOfBandData(byte b) throws IOException {
+ writeLock.lock();
+ try {
+ boolean blocking = isBlocking();
+ int n = 0;
+ try {
+ beginWrite(blocking);
+ if (blocking) {
+ do {
+ n = sendOutOfBandData(fd, b);
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = sendOutOfBandData(fd, b);
+ }
+ } finally {
+ endWrite(blocking, n > 0);
+ if (n <= 0 && isOutputClosed)
+ throw new AsynchronousCloseException();
+ }
+ return IOStatus.normalize(n);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ protected void implConfigureBlocking(boolean block) throws IOException {
+ readLock.lock();
+ try {
+ writeLock.lock();
+ try {
synchronized (stateLock) {
- if ((n <= 0) && (!isInputOpen))
- return IOStatus.EOF;
+ ensureOpen();
+ IOUtil.configureBlocking(fd, block);
}
-
- assert IOStatus.check(n);
-
+ } finally {
+ writeLock.unlock();
}
} finally {
readLock.unlock();
}
}
- public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
- throw new IndexOutOfBoundsException();
- readLock.lock();
- try {
- if (!ensureReadOpen())
- return -1;
- long n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- readerThread = NativeThread.current();
- }
-
- for (;;) {
- n = IOUtil.read(fd, dsts, offset, length, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
- }
- } finally {
- readerCleanup();
- end(n > 0 || (n == IOStatus.UNAVAILABLE));
- synchronized (stateLock) {
- if ((n <= 0) && (!isInputOpen))
- return IOStatus.EOF;
- }
- assert IOStatus.check(n);
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public int write(ByteBuffer buf) throws IOException {
- if (buf == null)
- throw new NullPointerException();
- writeLock.lock();
- try {
- ensureWriteOpen();
- int n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- writerThread = NativeThread.current();
- }
- for (;;) {
- n = IOUtil.write(fd, buf, -1, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
- }
- } finally {
- writerCleanup();
- end(n > 0 || (n == IOStatus.UNAVAILABLE));
- synchronized (stateLock) {
- if ((n <= 0) && (!isOutputOpen))
- throw new AsynchronousCloseException();
- }
- assert IOStatus.check(n);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public long write(ByteBuffer[] srcs, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- writeLock.lock();
- try {
- ensureWriteOpen();
- long n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- writerThread = NativeThread.current();
- }
- for (;;) {
- n = IOUtil.write(fd, srcs, offset, length, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
- }
- } finally {
- writerCleanup();
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
- synchronized (stateLock) {
- if ((n <= 0) && (!isOutputOpen))
- throw new AsynchronousCloseException();
- }
- assert IOStatus.check(n);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- // package-private
- int sendOutOfBandData(byte b) throws IOException {
- writeLock.lock();
- try {
- ensureWriteOpen();
- int n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- writerThread = NativeThread.current();
- }
- for (;;) {
- n = sendOutOfBandData(fd, b);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
- }
- } finally {
- writerCleanup();
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
- synchronized (stateLock) {
- if ((n <= 0) && (!isOutputOpen))
- throw new AsynchronousCloseException();
- }
- assert IOStatus.check(n);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- protected void implConfigureBlocking(boolean block) throws IOException {
- IOUtil.configureBlocking(fd, block);
- }
-
- public InetSocketAddress localAddress() {
+ /**
+ * Returns the local address, or null if not bound
+ */
+ InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
- public SocketAddress remoteAddress() {
+ /**
+ * Returns the remote address, or null if not connected
+ */
+ InetSocketAddress remoteAddress() {
synchronized (stateLock) {
return remoteAddress;
}
@@ -603,9 +548,8 @@
writeLock.lock();
try {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- if (state == ST_PENDING)
+ ensureOpen();
+ if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
if (localAddress != null)
throw new AlreadyBoundException();
@@ -628,101 +572,115 @@
return this;
}
+ @Override
public boolean isConnected() {
synchronized (stateLock) {
return (state == ST_CONNECTED);
}
}
+ @Override
public boolean isConnectionPending() {
synchronized (stateLock) {
- return (state == ST_PENDING);
+ return (state == ST_CONNECTIONPENDING);
}
}
- void ensureOpenAndUnconnected() throws IOException { // package-private
+ /**
+ * Marks the beginning of a connect operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed
+ * @throws AlreadyConnectedException if already connected
+ * @throws ConnectionPendingException is a connection is pending
+ */
+ private void beginConnect(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
- if (state == ST_PENDING)
+ if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
+ if (blocking)
+ readerThread = NativeThread.current();
}
}
+ /**
+ * Marks the end of a connect operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking connect operation.
+ */
+ private void endConnect(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ endRead(blocking, completed);
+ }
+
+ @Override
public boolean connect(SocketAddress sa) throws IOException {
+ InetSocketAddress isa = Net.checkAddress(sa);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
+
readLock.lock();
try {
writeLock.lock();
try {
- ensureOpenAndUnconnected();
- InetSocketAddress isa = Net.checkAddress(sa);
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(),
- isa.getPort());
- synchronized (blockingLock()) {
- int n = 0;
- try {
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen()) {
- return false;
- }
- // notify hook only if unbound
- if (localAddress == null) {
- NetHooks.beforeTcpConnect(fd,
- isa.getAddress(),
- isa.getPort());
- }
- readerThread = NativeThread.current();
- }
- for (;;) {
- InetAddress ia = isa.getAddress();
- if (ia.isAnyLocalAddress())
- ia = InetAddress.getLocalHost();
- n = Net.connect(fd,
- ia,
- isa.getPort());
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- break;
- }
-
- } finally {
- readerCleanup();
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
- assert IOStatus.check(n);
- }
- } catch (IOException x) {
- // If an exception was thrown, close the channel after
- // invoking end() so as to avoid bogus
- // AsynchronousCloseExceptions
- close();
- throw x;
- }
- synchronized (stateLock) {
- remoteAddress = isa;
- if (n > 0) {
-
- // Connection succeeded; disallow further
- // invocation
- state = ST_CONNECTED;
- if (isOpen())
- localAddress = Net.localAddress(fd);
- return true;
- }
- // If nonblocking and no exception then connection
- // pending; disallow another invocation
- if (!isBlocking())
- state = ST_PENDING;
- else
- assert false;
+ // notify before-connect hook
+ synchronized (stateLock) {
+ if (state == ST_UNCONNECTED && localAddress == null) {
+ NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
}
}
- return false;
+
+ InetAddress ia = isa.getAddress();
+ if (ia.isAnyLocalAddress())
+ ia = InetAddress.getLocalHost();
+
+ int n = 0;
+ boolean blocking = isBlocking();
+ try {
+ try {
+ beginConnect(blocking);
+ if (blocking) {
+ do {
+ n = Net.connect(fd, ia, isa.getPort());
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = Net.connect(fd, ia, isa.getPort());
+ }
+ } finally {
+ endConnect(blocking, n > 0);
+ }
+ } catch (IOException x) {
+ // connect failed, close socket
+ close();
+ throw x;
+ }
+
+ // connection may be established
+ synchronized (stateLock) {
+ if (!isOpen())
+ throw new AsynchronousCloseException();
+ remoteAddress = isa;
+ if (n > 0) {
+ // connected established
+ localAddress = Net.localAddress(fd);
+ state = ST_CONNECTED;
+ return true;
+ } else {
+ // connection pending
+ assert !blocking;
+ state = ST_CONNECTIONPENDING;
+ return false;
+ }
+ }
} finally {
writeLock.unlock();
}
@@ -731,83 +689,85 @@
}
}
+ /**
+ * Marks the beginning of a finishConnect operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed
+ * @throws NoConnectionPendingException if no connection is pending
+ */
+ private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
+ synchronized (stateLock) {
+ ensureOpen();
+ if (state != ST_CONNECTIONPENDING)
+ throw new NoConnectionPendingException();
+ if (blocking)
+ readerThread = NativeThread.current();
+ }
+ }
+
+ /**
+ * Marks the end of a finishConnect operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking connect operation.
+ */
+ private void endFinishConnect(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ endRead(blocking, completed);
+ }
+
+ @Override
public boolean finishConnect() throws IOException {
readLock.lock();
try {
writeLock.lock();
try {
+ // already connected?
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
if (state == ST_CONNECTED)
return true;
- if (state != ST_PENDING)
- throw new NoConnectionPendingException();
}
+
int n = 0;
+ boolean blocking = isBlocking();
try {
try {
- begin();
- synchronized (blockingLock()) {
- synchronized (stateLock) {
- if (!isOpen()) {
- return false;
- }
- readerThread = NativeThread.current();
- }
- if (!isBlocking()) {
- for (;;) {
- n = checkConnect(fd, false);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- break;
- }
- } else {
- for (;;) {
- n = checkConnect(fd, true);
- if (n == 0) {
- // Loop in case of
- // spurious notifications
- continue;
- }
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- break;
- }
- }
+ beginFinishConnect(blocking);
+ if (blocking) {
+ do {
+ n = checkConnect(fd, true);
+ } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
+ } else {
+ n = checkConnect(fd, false);
}
} finally {
- synchronized (stateLock) {
- readerThread = 0;
- if (state == ST_KILLPENDING) {
- kill();
- // poll()/getsockopt() does not report
- // error (throws exception, with n = 0)
- // on Linux platform after dup2 and
- // signal-wakeup. Force n to 0 so the
- // end() can throw appropriate exception
- n = 0;
- }
- }
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
- assert IOStatus.check(n);
+ endFinishConnect(blocking, n > 0);
}
} catch (IOException x) {
- // If an exception was thrown, close the channel after
- // invoking end() so as to avoid bogus
- // AsynchronousCloseExceptions
close();
throw x;
}
- if (n > 0) {
- synchronized (stateLock) {
+
+ // post finishConnect, connection may be established
+ synchronized (stateLock) {
+ if (!isOpen())
+ throw new AsynchronousCloseException();
+ if (n > 0) {
+ // connection established
+ localAddress = Net.localAddress(fd);
state = ST_CONNECTED;
- if (isOpen())
- localAddress = Net.localAddress(fd);
+ return true;
+ } else {
+ // connection still pending
+ assert !blocking;
+ return false;
}
- return true;
}
- return false;
} finally {
writeLock.unlock();
}
@@ -816,18 +776,119 @@
}
}
+ /**
+ * Invoked by implCloseChannel to close the channel.
+ *
+ * This method waits for outstanding I/O operations to complete. When in
+ * blocking mode, the socket is pre-closed and the threads in blocking I/O
+ * operations are signalled to ensure that the outstanding I/O operations
+ * complete quickly.
+ *
+ * If the socket is connected then it is shutdown by this method. The
+ * shutdown ensures that the peer reads EOF for the case that the socket is
+ * not pre-closed or closed by this method.
+ *
+ * The socket is closed by this method when it is not registered with a
+ * Selector. Note that a channel configured blocking may be registered with
+ * a Selector. This arises when a key is canceled and the channel configured
+ * to blocking mode before the key is flushed from the Selector.
+ */
+ @Override
+ protected void implCloseSelectableChannel() throws IOException {
+ assert !isOpen();
+
+ boolean blocking;
+ boolean connected;
+ boolean interrupted = false;
+
+ // set state to ST_CLOSING
+ synchronized (stateLock) {
+ assert state < ST_CLOSING;
+ blocking = isBlocking();
+ connected = (state == ST_CONNECTED);
+ state = ST_CLOSING;
+ }
+
+ // wait for any outstanding I/O operations to complete
+ if (blocking) {
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ long reader = readerThread;
+ long writer = writerThread;
+ if (reader != 0 || writer != 0) {
+ nd.preClose(fd);
+ connected = false; // fd is no longer connected socket
+
+ if (reader != 0)
+ NativeThread.signal(reader);
+ if (writer != 0)
+ NativeThread.signal(writer);
+
+ // wait for blocking I/O operations to end
+ while (readerThread != 0 || writerThread != 0) {
+ try {
+ stateLock.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ }
+ } else {
+ // non-blocking mode: wait for read/write to complete
+ readLock.lock();
+ try {
+ writeLock.lock();
+ writeLock.unlock();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // set state to ST_KILLPENDING
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ // if connected, and the channel is registered with a Selector, we
+ // shutdown the output so that the peer reads EOF
+ if (connected && isRegistered()) {
+ try {
+ Net.shutdown(fd, Net.SHUT_WR);
+ } catch (IOException ignore) { }
+ }
+ state = ST_KILLPENDING;
+ }
+
+ // close socket if not registered with Selector
+ if (!isRegistered())
+ kill();
+
+ // restore interrupt status
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public void kill() throws IOException {
+ synchronized (stateLock) {
+ if (state == ST_KILLPENDING) {
+ state = ST_KILLED;
+ nd.close(fd);
+ }
+ }
+ }
+
@Override
public SocketChannel shutdownInput() throws IOException {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
- if (isInputOpen) {
+ if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
- if (readerThread != 0)
- NativeThread.signal(readerThread);
- isInputOpen = false;
+ long thread = readerThread;
+ if (thread != 0)
+ NativeThread.signal(thread);
+ isInputClosed = true;
}
return this;
}
@@ -836,94 +897,78 @@
@Override
public SocketChannel shutdownOutput() throws IOException {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
+ ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
- if (isOutputOpen) {
+ if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
- if (writerThread != 0)
- NativeThread.signal(writerThread);
- isOutputOpen = false;
+ long thread = writerThread;
+ if (thread != 0)
+ NativeThread.signal(thread);
+ isOutputClosed = true;
}
return this;
}
}
- public boolean isInputOpen() {
- synchronized (stateLock) {
- return isInputOpen;
- }
+ boolean isInputOpen() {
+ return !isInputClosed;
+ }
+
+ boolean isOutputOpen() {
+ return !isOutputClosed;
}
- public boolean isOutputOpen() {
- synchronized (stateLock) {
- return isOutputOpen;
+ /**
+ * Poll this channel's socket for reading up to the given timeout.
+ * @return {@code true} if the socket is polled
+ */
+ boolean pollRead(long timeout) throws IOException {
+ boolean blocking = isBlocking();
+ assert Thread.holdsLock(blockingLock()) && blocking;
+
+ readLock.lock();
+ try {
+ boolean polled = false;
+ try {
+ beginRead(blocking);
+ int n = Net.poll(fd, Net.POLLIN, timeout);
+ polled = (n > 0);
+ } finally {
+ endRead(blocking, polled);
+ }
+ return polled;
+ } finally {
+ readLock.unlock();
}
}
- // AbstractInterruptibleChannel synchronizes invocations of this method
- // using AbstractInterruptibleChannel.closeLock, and also ensures that this
- // method is only ever invoked once. Before we get to this method, isOpen
- // (which is volatile) will have been set to false.
- //
- protected void implCloseSelectableChannel() throws IOException {
- synchronized (stateLock) {
- isInputOpen = false;
- isOutputOpen = false;
-
- // Close the underlying file descriptor and dup it to a known fd
- // that's already closed. This prevents other operations on this
- // channel from using the old fd, which might be recycled in the
- // meantime and allocated to an entirely different channel.
- //
- if (state != ST_KILLED)
- nd.preClose(fd);
-
- // Signal native threads, if needed. If a target thread is not
- // currently blocked in an I/O operation then no harm is done since
- // the signal handler doesn't actually do anything.
- //
- if (readerThread != 0)
- NativeThread.signal(readerThread);
-
- if (writerThread != 0)
- NativeThread.signal(writerThread);
+ /**
+ * Poll this channel's socket for a connection, up to the given timeout.
+ * @return {@code true} if the socket is polled
+ */
+ boolean pollConnected(long timeout) throws IOException {
+ boolean blocking = isBlocking();
+ assert Thread.holdsLock(blockingLock()) && blocking;
- // If this channel is not registered then it's safe to close the fd
- // immediately since we know at this point that no thread is
- // blocked in an I/O operation upon the channel and, since the
- // channel is marked closed, no thread will start another such
- // operation. If this channel is registered then we don't close
- // the fd since it might be in use by a selector. In that case
- // closing this channel caused its keys to be cancelled, so the
- // last selector to deregister a key for this channel will invoke
- // kill() to close the fd.
- //
- if (!isRegistered())
- kill();
- }
- }
-
- public void kill() throws IOException {
- synchronized (stateLock) {
- if (state == ST_KILLED)
- return;
- if (state == ST_UNINITIALIZED) {
- state = ST_KILLED;
- return;
+ readLock.lock();
+ try {
+ writeLock.lock();
+ try {
+ boolean polled = false;
+ try {
+ beginFinishConnect(blocking);
+ int n = Net.poll(fd, Net.POLLCONN, timeout);
+ polled = (n > 0);
+ } finally {
+ endFinishConnect(blocking, polled);
+ }
+ return polled;
+ } finally {
+ writeLock.unlock();
}
- assert !isOpen() && !isRegistered();
-
- // Postpone the kill if there is a waiting reader
- // or writer thread. See the comments in read() for
- // more detailed explanation.
- if (readerThread == 0 && writerThread == 0) {
- nd.close(fd);
- state = ST_KILLED;
- } else {
- state = ST_KILLPENDING;
- }
+ } finally {
+ readLock.unlock();
}
}
@@ -956,7 +1001,7 @@
if (((ops & Net.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
- ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
+ ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) {
newOps |= SelectionKey.OP_CONNECT;
}
@@ -977,31 +1022,6 @@
return translateReadyOps(ops, 0, sk);
}
- // package-private
- int poll(int events, long timeout) throws IOException {
- assert Thread.holdsLock(blockingLock()) && !isBlocking();
-
- readLock.lock();
- try {
- int n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- readerThread = NativeThread.current();
- }
- n = Net.poll(fd, events, timeout);
- } finally {
- readerCleanup();
- end(n > 0);
- }
- return n;
- } finally {
- readLock.unlock();
- }
- }
-
/**
* Translates an interest operation set into a native poll event set
*/
@@ -1037,14 +1057,14 @@
case ST_UNCONNECTED:
sb.append("unconnected");
break;
- case ST_PENDING:
+ case ST_CONNECTIONPENDING:
sb.append("connection-pending");
break;
case ST_CONNECTED:
sb.append("connected");
- if (!isInputOpen)
+ if (isInputClosed)
sb.append(" ishut");
- if (!isOutputOpen)
+ if (isOutputClosed)
sb.append(" oshut");
break;
}