src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java
changeset 49001 ce06058197a4
parent 48750 ffbb784a8873
child 49493 814bd31f8da0
--- a/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java	Tue Feb 27 23:11:26 2018 -0800
+++ b/src/java.base/unix/classes/sun/nio/ch/SinkChannelImpl.java	Wed Feb 28 09:54:38 2018 +0000
@@ -28,31 +28,26 @@
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NotYetConnectedException;
 import java.nio.channels.Pipe;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantLock;
 
-
 class SinkChannelImpl
     extends Pipe.SinkChannel
     implements SelChImpl
 {
-
     // Used to make native read and write calls
     private static final NativeDispatcher nd = new FileDispatcherImpl();
 
     // The file descriptor associated with this channel
     private final FileDescriptor fd;
-
-    // fd value needed for dev/poll. This value will remain valid
-    // even after the value in the file descriptor object has been set to -1
     private final int fdVal;
 
-    // ID of native thread doing write, for signalling
-    private volatile long thread;
-
     // Lock held by current writing thread
     private final ReentrantLock writeLock = new ReentrantLock();
 
@@ -63,10 +58,14 @@
     // -- The following fields are protected by stateLock
 
     // Channel state
-    private static final int ST_UNINITIALIZED = -1;
     private static final int ST_INUSE = 0;
-    private static final int ST_KILLED = 1;
-    private volatile int state = ST_UNINITIALIZED;
+    private static final int ST_CLOSING = 1;
+    private static final int ST_KILLPENDING = 2;
+    private static final int ST_KILLED = 3;
+    private int state;
+
+    // ID of native thread doing write, for signalling
+    private long thread;
 
     // -- End of fields protected by stateLock
 
@@ -83,39 +82,88 @@
         super(sp);
         this.fd = fd;
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_INUSE;
     }
 
+    /**
+     * Invoked by implCloseChannel to close the channel.
+     */
+    @Override
     protected void implCloseSelectableChannel() throws IOException {
+        assert !isOpen();
+
+        boolean interrupted = false;
+        boolean blocking;
+
+        // set state to ST_CLOSING
         synchronized (stateLock) {
-            if (state != ST_KILLED)
-                nd.preClose(fd);
-            long th = thread;
-            if (th != 0)
-                NativeThread.signal(th);
-            if (!isRegistered())
-                kill();
+            assert state < ST_CLOSING;
+            state = ST_CLOSING;
+            blocking = isBlocking();
+        }
+
+        // wait for any outstanding write to complete
+        if (blocking) {
+            synchronized (stateLock) {
+                assert state == ST_CLOSING;
+                long th = thread;
+                if (th != 0) {
+                    nd.preClose(fd);
+                    NativeThread.signal(th);
+
+                    // wait for write operation to end
+                    while (thread != 0) {
+                        try {
+                            stateLock.wait();
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                }
+            }
+        } else {
+            // non-blocking mode: wait for write to complete
+            writeLock.lock();
+            writeLock.unlock();
+        }
+
+        // set state to ST_KILLPENDING
+        synchronized (stateLock) {
+            assert state == ST_CLOSING;
+            state = ST_KILLPENDING;
+        }
+
+        // close socket if not registered with Selector
+        if (!isRegistered())
+            kill();
+
+        // restore interrupt status
+        if (interrupted)
+            Thread.currentThread().interrupt();
+    }
+
+    @Override
+    public void kill() throws IOException {
+        synchronized (stateLock) {
+            assert thread == 0;
+            if (state == ST_KILLPENDING) {
+                state = ST_KILLED;
+                nd.close(fd);
+            }
         }
     }
 
-    public void kill() throws IOException {
-        synchronized (stateLock) {
-            if (state == ST_KILLED)
-                return;
-            if (state == ST_UNINITIALIZED) {
-                state = ST_KILLED;
-                return;
+    @Override
+    protected void implConfigureBlocking(boolean block) throws IOException {
+        writeLock.lock();
+        try {
+            synchronized (stateLock) {
+                IOUtil.configureBlocking(fd, block);
             }
-            assert !isOpen() && !isRegistered();
-            nd.close(fd);
-            state = ST_KILLED;
+        } finally {
+            writeLock.unlock();
         }
     }
 
-    protected void implConfigureBlocking(boolean block) throws IOException {
-        IOUtil.configureBlocking(fd, block);
-    }
-
     public boolean translateReadyOps(int ops, int initialOps,
                                      SelectionKeyImpl sk) {
         int intOps = sk.nioInterestOps();// Do this just once, it synchronizes
@@ -153,67 +201,95 @@
         sk.selector.putEventOps(sk, ops);
     }
 
-    private void ensureOpen() throws IOException {
-        if (!isOpen())
-            throw new ClosedChannelException();
+    /**
+     * Marks the beginning of a write operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NotYetConnectedException if the channel is not yet connected
+     */
+    private void beginWrite(boolean blocking) throws ClosedChannelException {
+        if (blocking) {
+            // set hook for Thread.interrupt
+            begin();
+        }
+        synchronized (stateLock) {
+            if (!isOpen())
+                throw new ClosedChannelException();
+            if (blocking)
+                thread = NativeThread.current();
+        }
     }
 
+    /**
+     * Marks the end of a write operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking write operation.
+     */
+    private void endWrite(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                thread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            // remove hook for Thread.interrupt
+            end(completed);
+        }
+    }
+
+    @Override
     public int write(ByteBuffer src) throws IOException {
+        Objects.requireNonNull(src);
+
         writeLock.lock();
         try {
-            ensureOpen();
+            boolean blocking = isBlocking();
             int n = 0;
             try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                thread = NativeThread.current();
+                beginWrite(blocking);
                 do {
                     n = IOUtil.write(fd, src, -1, nd);
                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
             } finally {
-                thread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
+                endWrite(blocking, n > 0);
                 assert IOStatus.check(n);
             }
+            return IOStatus.normalize(n);
         } finally {
             writeLock.unlock();
         }
     }
 
-    public long write(ByteBuffer[] srcs) throws IOException {
-        if (srcs == null)
-            throw new NullPointerException();
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        Objects.checkFromIndexSize(offset, length, srcs.length);
 
         writeLock.lock();
         try {
-            ensureOpen();
+            boolean blocking = isBlocking();
             long n = 0;
             try {
-                begin();
-                if (!isOpen())
-                    return 0;
-                thread = NativeThread.current();
+                beginWrite(blocking);
                 do {
-                    n = IOUtil.write(fd, srcs, nd);
+                    n = IOUtil.write(fd, srcs, offset, length, nd);
                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
-                return IOStatus.normalize(n);
             } finally {
-                thread = 0;
-                end((n > 0) || (n == IOStatus.UNAVAILABLE));
+                endWrite(blocking, n > 0);
                 assert IOStatus.check(n);
             }
+            return IOStatus.normalize(n);
         } finally {
             writeLock.unlock();
         }
     }
 
-    public long write(ByteBuffer[] srcs, int offset, int length)
-        throws IOException
-    {
-        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
-           throw new IndexOutOfBoundsException();
-        return write(Util.subsequence(srcs, offset, length));
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return write(srcs, 0, srcs.length);
     }
 }