--- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Tue Feb 27 23:11:26 2018 -0800
+++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java Wed Feb 28 09:54:38 2018 +0000
@@ -28,31 +28,26 @@
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NotYetConnectedException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
+import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
-
class SinkChannelImpl
extends Pipe.SinkChannel
implements SelChImpl
{
-
// Used to make native read and write calls
private static final NativeDispatcher nd = new FileDispatcherImpl();
// The file descriptor associated with this channel
private final FileDescriptor fd;
-
- // fd value needed for dev/poll. This value will remain valid
- // even after the value in the file descriptor object has been set to -1
private final int fdVal;
- // ID of native thread doing write, for signalling
- private volatile long thread;
-
// Lock held by current writing thread
private final ReentrantLock writeLock = new ReentrantLock();
@@ -63,10 +58,14 @@
// -- The following fields are protected by stateLock
// Channel state
- private static final int ST_UNINITIALIZED = -1;
private static final int ST_INUSE = 0;
- private static final int ST_KILLED = 1;
- private volatile int state = ST_UNINITIALIZED;
+ private static final int ST_CLOSING = 1;
+ private static final int ST_KILLPENDING = 2;
+ private static final int ST_KILLED = 3;
+ private int state;
+
+ // ID of native thread doing write, for signalling
+ private long thread;
// -- End of fields protected by stateLock
@@ -83,39 +82,88 @@
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_INUSE;
}
+ /**
+ * Invoked by implCloseChannel to close the channel.
+ */
+ @Override
protected void implCloseSelectableChannel() throws IOException {
+ assert !isOpen();
+
+ boolean interrupted = false;
+ boolean blocking;
+
+ // set state to ST_CLOSING
synchronized (stateLock) {
- if (state != ST_KILLED)
- nd.preClose(fd);
- long th = thread;
- if (th != 0)
- NativeThread.signal(th);
- if (!isRegistered())
- kill();
+ assert state < ST_CLOSING;
+ state = ST_CLOSING;
+ blocking = isBlocking();
+ }
+
+ // wait for any outstanding write to complete
+ if (blocking) {
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ long th = thread;
+ if (th != 0) {
+ nd.preClose(fd);
+ NativeThread.signal(th);
+
+ // wait for write operation to end
+ while (thread != 0) {
+ try {
+ stateLock.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ }
+ } else {
+ // non-blocking mode: wait for write to complete
+ writeLock.lock();
+ writeLock.unlock();
+ }
+
+ // set state to ST_KILLPENDING
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ state = ST_KILLPENDING;
+ }
+
+ // close socket if not registered with Selector
+ if (!isRegistered())
+ kill();
+
+ // restore interrupt status
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public void kill() throws IOException {
+ synchronized (stateLock) {
+ assert thread == 0;
+ if (state == ST_KILLPENDING) {
+ state = ST_KILLED;
+ nd.close(fd);
+ }
}
}
- public void kill() throws IOException {
- synchronized (stateLock) {
- if (state == ST_KILLED)
- return;
- if (state == ST_UNINITIALIZED) {
- state = ST_KILLED;
- return;
+ @Override
+ protected void implConfigureBlocking(boolean block) throws IOException {
+ writeLock.lock();
+ try {
+ synchronized (stateLock) {
+ IOUtil.configureBlocking(fd, block);
}
- assert !isOpen() && !isRegistered();
- nd.close(fd);
- state = ST_KILLED;
+ } finally {
+ writeLock.unlock();
}
}
- protected void implConfigureBlocking(boolean block) throws IOException {
- IOUtil.configureBlocking(fd, block);
- }
-
public boolean translateReadyOps(int ops, int initialOps,
SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps();// Do this just once, it synchronizes
@@ -153,67 +201,95 @@
sk.selector.putEventOps(sk, ops);
}
- private void ensureOpen() throws IOException {
- if (!isOpen())
- throw new ClosedChannelException();
+ /**
+ * Marks the beginning of a write operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed
+ * @throws NotYetConnectedException if the channel is not yet connected
+ */
+ private void beginWrite(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
+ synchronized (stateLock) {
+ if (!isOpen())
+ throw new ClosedChannelException();
+ if (blocking)
+ thread = NativeThread.current();
+ }
}
+ /**
+ * Marks the end of a write operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking write operation.
+ */
+ private void endWrite(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ if (blocking) {
+ synchronized (stateLock) {
+ thread = 0;
+ // notify any thread waiting in implCloseSelectableChannel
+ if (state == ST_CLOSING) {
+ stateLock.notifyAll();
+ }
+ }
+ // remove hook for Thread.interrupt
+ end(completed);
+ }
+ }
+
+ @Override
public int write(ByteBuffer src) throws IOException {
+ Objects.requireNonNull(src);
+
writeLock.lock();
try {
- ensureOpen();
+ boolean blocking = isBlocking();
int n = 0;
try {
- begin();
- if (!isOpen())
- return 0;
- thread = NativeThread.current();
+ beginWrite(blocking);
do {
n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
- return IOStatus.normalize(n);
} finally {
- thread = 0;
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
+ endWrite(blocking, n > 0);
assert IOStatus.check(n);
}
+ return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
- public long write(ByteBuffer[] srcs) throws IOException {
- if (srcs == null)
- throw new NullPointerException();
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ Objects.checkFromIndexSize(offset, length, srcs.length);
writeLock.lock();
try {
- ensureOpen();
+ boolean blocking = isBlocking();
long n = 0;
try {
- begin();
- if (!isOpen())
- return 0;
- thread = NativeThread.current();
+ beginWrite(blocking);
do {
- n = IOUtil.write(fd, srcs, nd);
+ n = IOUtil.write(fd, srcs, offset, length, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
- return IOStatus.normalize(n);
} finally {
- thread = 0;
- end((n > 0) || (n == IOStatus.UNAVAILABLE));
+ endWrite(blocking, n > 0);
assert IOStatus.check(n);
}
+ return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
- public long write(ByteBuffer[] srcs, int offset, int length)
- throws IOException
- {
- if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
- throw new IndexOutOfBoundsException();
- return write(Util.subsequence(srcs, offset, length));
+ @Override
+ public long write(ByteBuffer[] srcs) throws IOException {
+ return write(srcs, 0, srcs.length);
}
}