6979009: (fc) FileChannel.read() fails to throw ClosedByInterruptException
authoralanb
Tue, 09 Nov 2010 18:56:39 +0000
changeset 7177 0113db4feebc
parent 7172 01308fd663b3
child 7178 03d19396adb9
6979009: (fc) FileChannel.read() fails to throw ClosedByInterruptException Reviewed-by: forax, sherman, chegar
jdk/src/share/classes/java/lang/Thread.java
jdk/src/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java
jdk/src/share/classes/java/nio/channels/spi/AbstractSelector.java
jdk/src/share/classes/sun/nio/ch/FileChannelImpl.java
jdk/src/share/classes/sun/nio/ch/Interruptible.java
jdk/test/java/nio/channels/FileChannel/ClosedByInterrupt.java
--- a/jdk/src/share/classes/java/lang/Thread.java	Sat Nov 06 09:11:18 2010 +0800
+++ b/jdk/src/share/classes/java/lang/Thread.java	Tue Nov 09 18:56:39 2010 +0000
@@ -958,7 +958,7 @@
             Interruptible b = blocker;
             if (b != null) {
                 interrupt0();           // Just to set the interrupt flag
-                b.interrupt();
+                b.interrupt(this);
                 return;
             }
         }
--- a/jdk/src/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java	Sat Nov 06 09:11:18 2010 +0800
+++ b/jdk/src/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java	Tue Nov 09 18:56:39 2010 +0000
@@ -88,7 +88,7 @@
     implements Channel, InterruptibleChannel
 {
 
-    private Object closeLock = new Object();
+    private final Object closeLock = new Object();
     private volatile boolean open = true;
 
     /**
@@ -142,7 +142,7 @@
     // -- Interruption machinery --
 
     private Interruptible interruptor;
-    private volatile boolean interrupted = false;
+    private volatile Thread interrupted;
 
     /**
      * Marks the beginning of an I/O operation that might block indefinitely.
@@ -155,12 +155,12 @@
     protected final void begin() {
         if (interruptor == null) {
             interruptor = new Interruptible() {
-                    public void interrupt() {
+                    public void interrupt(Thread target) {
                         synchronized (closeLock) {
                             if (!open)
                                 return;
-                            interrupted = true;
                             open = false;
+                            interrupted = target;
                             try {
                                 AbstractInterruptibleChannel.this.implCloseChannel();
                             } catch (IOException x) { }
@@ -168,8 +168,9 @@
                     }};
         }
         blockedOn(interruptor);
-        if (Thread.currentThread().isInterrupted())
-            interruptor.interrupt();
+        Thread me = Thread.currentThread();
+        if (me.isInterrupted())
+            interruptor.interrupt(me);
     }
 
     /**
@@ -195,12 +196,13 @@
         throws AsynchronousCloseException
     {
         blockedOn(null);
-        if (completed) {
-            interrupted = false;
-            return;
+        Thread interrupted = this.interrupted;
+        if (interrupted != null && interrupted == Thread.currentThread()) {
+            interrupted = null;
+            throw new ClosedByInterruptException();
         }
-        if (interrupted) throw new ClosedByInterruptException();
-        if (!open) throw new AsynchronousCloseException();
+        if (!completed && !open)
+            throw new AsynchronousCloseException();
     }
 
 
--- a/jdk/src/share/classes/java/nio/channels/spi/AbstractSelector.java	Sat Nov 06 09:11:18 2010 +0800
+++ b/jdk/src/share/classes/java/nio/channels/spi/AbstractSelector.java	Tue Nov 09 18:56:39 2010 +0000
@@ -206,13 +206,14 @@
     protected final void begin() {
         if (interruptor == null) {
             interruptor = new Interruptible() {
-                    public void interrupt() {
+                    public void interrupt(Thread ignore) {
                         AbstractSelector.this.wakeup();
                     }};
         }
         AbstractInterruptibleChannel.blockedOn(interruptor);
-        if (Thread.currentThread().isInterrupted())
-            interruptor.interrupt();
+        Thread me = Thread.currentThread();
+        if (me.isInterrupted())
+            interruptor.interrupt(me);
     }
 
     /**
--- a/jdk/src/share/classes/sun/nio/ch/FileChannelImpl.java	Sat Nov 06 09:11:18 2010 +0800
+++ b/jdk/src/share/classes/sun/nio/ch/FileChannelImpl.java	Tue Nov 09 18:56:39 2010 +0000
@@ -460,6 +460,16 @@
                 } finally {
                     unmap(dbb);
                 }
+            } catch (ClosedByInterruptException e) {
+                // target closed by interrupt as ClosedByInterruptException needs
+                // to be thrown after closing this channel.
+                assert !target.isOpen();
+                try {
+                    close();
+                } catch (IOException ignore) {
+                    // nothing we can do
+                }
+                throw e;
             } catch (IOException ioe) {
                 // Only throw exception if no bytes have been written
                 if (remaining == count)
--- a/jdk/src/share/classes/sun/nio/ch/Interruptible.java	Sat Nov 06 09:11:18 2010 +0800
+++ b/jdk/src/share/classes/sun/nio/ch/Interruptible.java	Tue Nov 09 18:56:39 2010 +0000
@@ -23,14 +23,14 @@
  * questions.
  */
 
-/*
+/**
+ * An object that interrupts a thread blocked in an I/O operation.
  */
 
 package sun.nio.ch;
 
-
 public interface Interruptible {
 
-    public void interrupt();
+    public void interrupt(Thread t);
 
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/nio/channels/FileChannel/ClosedByInterrupt.java	Tue Nov 09 18:56:39 2010 +0000
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 6979009
+ * @summary Ensure ClosedByInterruptException is thrown when I/O operation
+ *     interrupted by Thread.interrupt
+ */
+
+import java.io.*;
+import java.util.Random;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+
+public class ClosedByInterrupt {
+
+    static final int K = 1024;
+    static final Random rand = new Random();
+
+    static volatile boolean failed;
+
+    public static void main(String[] args) throws Exception {
+        File f = File.createTempFile("blah", null);
+        f.deleteOnExit();
+
+        // create 1MB file.
+        byte[] b = new byte[K*K];
+        rand.nextBytes(b);
+        ByteBuffer bb = ByteBuffer.wrap(b);
+        try (FileChannel fc = new FileOutputStream(f).getChannel()) {
+            while (bb.hasRemaining())
+                fc.write(bb);
+        }
+
+        // test with 1-8 concurrent threads
+        for (int i=1; i<=8; i++) {
+            System.out.format("%d thread(s)%n", i);
+            test(f, i);
+            if (failed)
+                break;
+        }
+    }
+
+    /**
+     * Starts "nThreads" that do I/O on the given file concurrently. Continuously
+     * interrupts one of the threads to cause the file to be closed and
+     * ClosedByInterruptException to be thrown. The other threads should "fail" with
+     * ClosedChannelException (or the more specific AsynchronousCloseException).
+     */
+    static void test(File f, int nThreads) throws Exception {
+        try (FileChannel fc = new RandomAccessFile(f, "rwd").getChannel()) {
+            Thread[] threads = new Thread[nThreads];
+
+            // start threads
+            for (int i=0; i<nThreads; i++) {
+                boolean interruptible = (i==0);
+                ReaderWriter task = new ReaderWriter(fc, interruptible);
+                Thread t = new Thread(task);
+                t.start();
+                threads[i] = t;
+            }
+
+            // give time for threads to start
+            Thread.sleep(500 + rand.nextInt(1000));
+
+            // interrupt thread until channel is closed
+            while (fc.isOpen()) {
+                threads[0].interrupt();
+                Thread.sleep(rand.nextInt(50));
+            }
+
+            // wait for test to finish
+            for (int i=0; i<nThreads; i++) {
+                threads[i].join();
+            }
+        }
+    }
+
+    /**
+     * A task that continuously reads or writes to random areas of a file
+     * until the channel is closed. An "interruptible" task expects the
+     * channel to be closed by an interupt, a "non-interruptible" thread
+     * does not.
+     */
+    static class ReaderWriter implements Runnable {
+        final FileChannel fc;
+        final boolean interruptible;
+        final boolean writer;
+
+        ReaderWriter(FileChannel fc, boolean interruptible) {
+            this.fc = fc;
+            this.interruptible = interruptible;
+            this.writer = rand.nextBoolean();
+        }
+
+        public void run() {
+            ByteBuffer bb = ByteBuffer.allocate(K);
+            if (writer)
+                rand.nextBytes(bb.array());
+
+            try {
+                for (;;) {
+                    long position = rand.nextInt(K*K - bb.capacity());
+                    if (writer) {
+                        bb.position(0).limit(bb.capacity());
+                        fc.write(bb, position);
+                    } else {
+                        bb.clear();
+                        fc.read(bb, position);
+                    }
+                    if (!interruptible) {
+                        // give the interruptible thread a chance
+                        try {
+                            Thread.sleep(rand.nextInt(50));
+                        } catch (InterruptedException ignore) { }
+                    }
+                }
+            } catch (ClosedByInterruptException e) {
+                if (interruptible) {
+                    if (Thread.currentThread().isInterrupted()) {
+                        expected(e + " thrown and interrupt status set");
+                    } else {
+                        unexpected(e + " thrown but interrupt status not set");
+                    }
+                } else {
+                    unexpected(e);
+                }
+            } catch (ClosedChannelException e) {
+                if (interruptible) {
+                    unexpected(e);
+                } else {
+                    expected(e);
+                }
+            } catch (Exception e) {
+                unexpected(e);
+            }
+        }
+    }
+
+    static void expected(Exception e) {
+        System.out.format("%s (not expected)%n", e);
+    }
+
+    static void expected(String msg) {
+        System.out.format("%s (expected)%n", msg);
+    }
+
+    static void unexpected(Exception e) {
+        System.err.format("%s (not expected)%n", e);
+        failed = true;
+    }
+
+    static void unexpected(String msg) {
+        System.err.println(msg);
+        failed = true;
+    }
+}