8027348: (process) Enhancement of handling async close of ProcessInputStream
authorigerasim
Thu, 30 Jan 2014 00:02:46 +0400
changeset 22626 758d413359c4
parent 22625 50ee5cbd439b
child 22627 93f110be9439
8027348: (process) Enhancement of handling async close of ProcessInputStream Reviewed-by: martin
jdk/src/solaris/classes/java/lang/UNIXProcess.java.bsd
jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux
jdk/test/java/lang/ProcessBuilder/CloseRace.java
jdk/test/java/lang/Runtime/exec/CloseRace.java
--- a/jdk/src/solaris/classes/java/lang/UNIXProcess.java.bsd	Fri Jan 31 11:10:36 2014 +0000
+++ b/jdk/src/solaris/classes/java/lang/UNIXProcess.java.bsd	Thu Jan 30 00:02:46 2014 +0400
@@ -342,47 +342,39 @@
         ProcessPipeInputStream(int fd) {
             super(new FileInputStream(newFileDescriptor(fd)));
         }
-
-        private InputStream drainInputStream(InputStream in)
+        private static byte[] drainInputStream(InputStream in)
                 throws IOException {
             int n = 0;
             int j;
             byte[] a = null;
-            synchronized (closeLock) {
-                if (buf == null) // asynchronous close()?
-                    return null; // discard
-                j = in.available();
-            }
-            while (j > 0) {
+            while ((j = in.available()) > 0) {
                 a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
-                synchronized (closeLock) {
-                    if (buf == null) // asynchronous close()?
-                        return null; // discard
-                    n += in.read(a, n, j);
-                    j = in.available();
-                }
+                n += in.read(a, n, j);
             }
-            return (a == null) ?
-                    ProcessBuilder.NullInputStream.INSTANCE :
-                    new ByteArrayInputStream(n == a.length ? a : Arrays.copyOf(a, n));
+            return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
         }
 
         /** Called by the process reaper thread when the process exits. */
         synchronized void processExited() {
-            try {
-                InputStream in = this.in;
-                if (in != null) {
-                    InputStream stragglers = drainInputStream(in);
-                    in.close();
-                    this.in = stragglers;
-                }
-            } catch (IOException ignored) { }
+            synchronized (closeLock) {
+                try {
+                    InputStream in = this.in;
+                    // this stream is closed if and only if: in == null
+                    if (in != null) {
+                        byte[] stragglers = drainInputStream(in);
+                        in.close();
+                        this.in = (stragglers == null) ?
+                            ProcessBuilder.NullInputStream.INSTANCE :
+                            new ByteArrayInputStream(stragglers);
+                    }
+                } catch (IOException ignored) {}
+            }
         }
 
         @Override
         public void close() throws IOException {
             // BufferedInputStream#close() is not synchronized unlike most other methods.
-            // Synchronizing helps avoid racing with drainInputStream().
+            // Synchronizing helps avoid race with processExited().
             synchronized (closeLock) {
                 super.close();
             }
--- a/jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux	Fri Jan 31 11:10:36 2014 +0000
+++ b/jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux	Thu Jan 30 00:02:46 2014 +0400
@@ -344,47 +344,39 @@
         ProcessPipeInputStream(int fd) {
             super(new FileInputStream(newFileDescriptor(fd)));
         }
-
-        private InputStream drainInputStream(InputStream in)
+        private static byte[] drainInputStream(InputStream in)
                 throws IOException {
             int n = 0;
             int j;
             byte[] a = null;
-            synchronized (closeLock) {
-                if (buf == null) // asynchronous close()?
-                    return null; // discard
-                j = in.available();
-            }
-            while (j > 0) {
+            while ((j = in.available()) > 0) {
                 a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
-                synchronized (closeLock) {
-                    if (buf == null) // asynchronous close()?
-                        return null; // discard
-                    n += in.read(a, n, j);
-                    j = in.available();
-                }
+                n += in.read(a, n, j);
             }
-            return (a == null) ?
-                    ProcessBuilder.NullInputStream.INSTANCE :
-                    new ByteArrayInputStream(n == a.length ? a : Arrays.copyOf(a, n));
+            return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
         }
 
         /** Called by the process reaper thread when the process exits. */
         synchronized void processExited() {
-            try {
-                InputStream in = this.in;
-                if (in != null) {
-                    InputStream stragglers = drainInputStream(in);
-                    in.close();
-                    this.in = stragglers;
-                }
-            } catch (IOException ignored) { }
+            synchronized (closeLock) {
+                try {
+                    InputStream in = this.in;
+                    // this stream is closed if and only if: in == null
+                    if (in != null) {
+                        byte[] stragglers = drainInputStream(in);
+                        in.close();
+                        this.in = (stragglers == null) ?
+                            ProcessBuilder.NullInputStream.INSTANCE :
+                            new ByteArrayInputStream(stragglers);
+                    }
+                } catch (IOException ignored) {}
+            }
         }
 
         @Override
         public void close() throws IOException {
             // BufferedInputStream#close() is not synchronized unlike most other methods.
-            // Synchronizing helps avoid racing with drainInputStream().
+            // Synchronizing helps avoid race with processExited().
             synchronized (closeLock) {
                 super.close();
             }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/lang/ProcessBuilder/CloseRace.java	Thu Jan 30 00:02:46 2014 +0400
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2013, 2014 Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/**
+ * @test
+ * @bug 8024521
+ * @summary Closing ProcessPipeInputStream at the time the process exits is racy
+ *          and leads to data corruption. Run this test manually (as
+ *          an ordinary java program) with  -Xmx8M  to repro bug 8024521.
+ * @run main/othervm -Xmx8M -Dtest.duration=2 CloseRace
+ */
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CloseRace {
+    private static final String BIG_FILE = "bigfile";
+
+    private static final int[] procFDs = new int[6];
+
+    /** default value sufficient to repro bug 8024521. */
+    private static final int testDurationSeconds
+        = Integer.getInteger("test.duration", 600);
+
+    static boolean fdInUse(int i) {
+        return new File("/proc/self/fd/" + i).exists();
+    }
+
+    static boolean[] procFDsInUse() {
+        boolean[] inUse = new boolean[procFDs.length];
+        for (int i = 0; i < procFDs.length; i++)
+            inUse[i] = fdInUse(procFDs[i]);
+        return inUse;
+    }
+
+    static int count(boolean[] bits) {
+        int count = 0;
+        for (int i = 0; i < bits.length; i++)
+            count += bits[i] ? 1 : 0;
+        return count;
+    }
+
+    public static void main(String args[]) throws Exception {
+        if (!(new File("/proc/self/fd").isDirectory()))
+            return;
+
+        // Catch Errors from process reaper
+        Thread.setDefaultUncaughtExceptionHandler
+            ((t, e) -> { e.printStackTrace(); System.exit(1); });
+
+        try (RandomAccessFile f = new RandomAccessFile(BIG_FILE, "rw")) {
+            f.setLength(Runtime.getRuntime().maxMemory()); // provoke OOME
+        }
+
+        for (int i = 0, j = 0; j < procFDs.length; i++)
+            if (!fdInUse(i))
+                procFDs[j++] = i;
+
+        Thread[] threads = {
+            new Thread(new OpenLoop()),
+            new Thread(new ExecLoop()),
+        };
+        for (Thread thread : threads)
+            thread.start();
+
+        Thread.sleep(testDurationSeconds * 1000);
+
+        for (Thread thread : threads)
+            thread.interrupt();
+        for (Thread thread : threads)
+            thread.join();
+    }
+
+    static class OpenLoop implements Runnable {
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    // wait for ExecLoop to finish creating process
+                    do {} while (count(procFDsInUse()) != 3);
+                    List<InputStream> iss = new ArrayList<>(4);
+
+                    // eat up three "holes" (closed ends of pipe fd pairs)
+                    for (int i = 0; i < 3; i++)
+                        iss.add(new FileInputStream(BIG_FILE));
+                    do {} while (count(procFDsInUse()) == procFDs.length);
+                    // hopefully this will racily occupy empty fd slot
+                    iss.add(new FileInputStream(BIG_FILE));
+                    Thread.sleep(1); // Widen race window
+                    for (InputStream is : iss)
+                        is.close();
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Exception e) {
+                    throw new Error(e);
+                }
+            }
+        }
+    }
+
+    static class ExecLoop implements Runnable {
+        public void run() {
+            ProcessBuilder builder = new ProcessBuilder("/bin/true");
+            while (!Thread.interrupted()) {
+                try {
+                    // wait for OpenLoop to finish
+                    do {} while (count(procFDsInUse()) > 0);
+                    Process process = builder.start();
+                    InputStream is = process.getInputStream();
+                    process.waitFor();
+                    is.close();
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Exception e) {
+                    throw new Error(e);
+                }
+            }
+        }
+    }
+}
--- a/jdk/test/java/lang/Runtime/exec/CloseRace.java	Fri Jan 31 11:10:36 2014 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/**
- * @test
- * @bug 8024521
- * @summary Closing ProcessPipeInputStream at the time the process exits is racy
- *          and leads to the data corruption.
- * @library /lib/testlibrary
- * @run main/othervm/timeout=80 CloseRace
- */
-
-/**
- * This test has a little chance to catch the race during the given default
- * time gap of 20 seconds. To increase the time gap, set the system property
- * CloseRaceTimeGap=N to the number of seconds.
- * Jtreg's timeoutFactor should also be set appropriately.
- *
- * For example, to run the test for 10 minutes:
- * > jtreg \
- *       -testjdk:$(PATH_TO_TESTED_JDK) \
- *       -timeoutFactor:10 \
- *       -DCloseRaceTimeGap=600 \
- *       $(PATH_TO_TESTED_JDK_SOURCE)/test/java/lang/Runtime/exec/CloseRace.java
- */
-
-import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import jdk.testlibrary.OutputAnalyzer;
-import static jdk.testlibrary.ProcessTools.*;
-
-public class CloseRace {
-
-    public static void main(String args[]) throws Exception {
-        ProcessBuilder pb = createJavaProcessBuilder("-Xmx64M", "CloseRace$Child",
-                System.getProperty("CloseRaceTimeGap", "20"));
-        OutputAnalyzer oa = new OutputAnalyzer(pb.start());
-        oa.stderrShouldNotContain("java.lang.OutOfMemoryError");
-    }
-
-    public static class Child {
-        private static final String BIG_FILE = "bigfile";
-        private static final String SMALL_FILE = "smallfile";
-        private static int timeGap = 20; // seconds
-
-        public static void main(String args[]) throws Exception {
-            if (args.length > 0) {
-                try {
-                    timeGap = Integer.parseUnsignedInt(args[0]);
-                    timeGap = Integer.max(timeGap, 10);
-                    timeGap = Integer.min(timeGap, 10 * 60 * 60); // no more than 10 hours
-                } catch (NumberFormatException ignore) {}
-            }
-            try (RandomAccessFile f = new RandomAccessFile(BIG_FILE, "rw")) {
-                f.setLength(1024 * 1024 * 1024); // 1 Gb, greater than max heap size
-            }
-            try (FileOutputStream fs = new FileOutputStream(SMALL_FILE);
-                 PrintStream ps = new PrintStream(fs)) {
-                for (int i = 0; i < 128; ++i)
-                    ps.println("line of text");
-            }
-
-            List<Thread> threads = new LinkedList<>();
-            for (int i = 0; i < 99; ++i) {
-                Thread t = new Thread (new OpenLoop());
-                t.start();
-                threads.add(t);
-            }
-            Thread t2 = new Thread (new ExecLoop());
-            t2.start();
-            threads.add(t2);
-
-            Thread.sleep(timeGap);
-
-            for (Thread t : threads) {
-                t.interrupt();
-                t.join();
-            }
-        }
-
-        private static class OpenLoop implements Runnable {
-            public void run() {
-                final Path bigFilePath = Paths.get(BIG_FILE);
-                while (!Thread.interrupted()) {
-                    try (InputStream in = Files.newInputStream(bigFilePath)) {
-                        // Widen the race window by sleeping 1ms
-                        Thread.sleep(1);
-                    } catch (InterruptedException e) {
-                        break;
-                    } catch (Exception e) {
-                        System.err.println(e);
-                    }
-                }
-            }
-        }
-
-        private static class ExecLoop implements Runnable {
-            public void run() {
-                List<String> command = new ArrayList<>(
-                        Arrays.asList("/bin/cat", SMALL_FILE));
-                while (!Thread.interrupted()) {
-                    try {
-                        ProcessBuilder builder = new ProcessBuilder(command);
-                        final Process process = builder.start();
-                        InputStream is = process.getInputStream();
-                        InputStreamReader isr = new InputStreamReader(is);
-                        BufferedReader br = new BufferedReader(isr);
-                        while (br.readLine() != null) {}
-                        process.waitFor();
-                        isr.close();
-                    } catch (InterruptedException e) {
-                        break;
-                    } catch (Exception e) {
-                        System.err.println(e);
-                    }
-                }
-            }
-        }
-    }
-}