6944584: Improvements to subprocess handling on Unix
authormartin
Fri, 11 Jun 2010 18:55:45 -0700
changeset 5786 f60ef38202e7
parent 5785 5dfabe612d10
child 5787 a0af7b8e80ed
child 6857 a8683faf14f4
6944584: Improvements to subprocess handling on Unix Summary: use thread pool for reaper thread; move most I/O operations out of reaper thread Reviewed-by: michaelm, hiroshi
jdk/src/share/classes/java/lang/ProcessBuilder.java
jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux
jdk/test/java/lang/ProcessBuilder/Basic.java
--- a/jdk/src/share/classes/java/lang/ProcessBuilder.java	Sat Jun 12 01:32:43 2010 +0100
+++ b/jdk/src/share/classes/java/lang/ProcessBuilder.java	Fri Jun 11 18:55:45 2010 -0700
@@ -418,6 +418,8 @@
      * Implements a <a href="#redirect-output">null input stream</a>.
      */
     static class NullInputStream extends InputStream {
+        static final NullInputStream INSTANCE = new NullInputStream();
+        private NullInputStream() {}
         public int read()      { return -1; }
         public int available() { return 0; }
     }
@@ -426,6 +428,8 @@
      * Implements a <a href="#redirect-input">null output stream</a>.
      */
     static class NullOutputStream extends OutputStream {
+        static final NullOutputStream INSTANCE = new NullOutputStream();
+        private NullOutputStream() {}
         public void write(int b) throws IOException {
             throw new IOException("Stream closed");
         }
--- a/jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux	Sat Jun 12 01:32:43 2010 +0100
+++ b/jdk/src/solaris/classes/java/lang/UNIXProcess.java.linux	Fri Jun 11 18:55:45 2010 -0700
@@ -25,25 +25,42 @@
 
 package java.lang;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 
-/* java.lang.Process subclass in the UNIX environment.
+/**
+ * java.lang.Process subclass in the UNIX environment.
  *
  * @author Mario Wolczko and Ross Knippel.
  * @author Konstantin Kladko (ported to Linux)
+ * @author Martin Buchholz
  */
-
 final class UNIXProcess extends Process {
     private static final sun.misc.JavaIOFileDescriptorAccess fdAccess
         = sun.misc.SharedSecrets.getJavaIOFileDescriptorAccess();
 
-    private int pid;
+    private final int pid;
     private int exitcode;
     private boolean hasExited;
 
-    private OutputStream stdin_stream;
-    private InputStream  stdout_stream;
-    private InputStream  stderr_stream;
+    private /* final */ OutputStream stdin;
+    private /* final */ InputStream  stdout;
+    private /* final */ InputStream  stderr;
 
     /* this is for the reaping thread */
     private native int waitForProcessExit(int pid);
@@ -51,155 +68,136 @@
     /**
      * Create a process using fork(2) and exec(2).
      *
-     * @param std_fds array of file descriptors.  Indexes 0, 1, and
-     *        2 correspond to standard input, standard output and
-     *        standard error, respectively.  On input, a value of -1
-     *        means to create a pipe to connect child and parent
-     *        processes.  On output, a value which is not -1 is the
-     *        parent pipe fd corresponding to the pipe which has
-     *        been created.  An element of this array is -1 on input
-     *        if and only if it is <em>not</em> -1 on output.
+     * @param fds an array of three file descriptors.
+     *        Indexes 0, 1, and 2 correspond to standard input,
+     *        standard output and standard error, respectively.  On
+     *        input, a value of -1 means to create a pipe to connect
+     *        child and parent processes.  On output, a value which
+     *        is not -1 is the parent pipe fd corresponding to the
+     *        pipe which has been created.  An element of this array
+     *        is -1 on input if and only if it is <em>not</em> -1 on
+     *        output.
      * @return the pid of the subprocess
      */
     private native int forkAndExec(byte[] prog,
                                    byte[] argBlock, int argc,
                                    byte[] envBlock, int envc,
                                    byte[] dir,
-                                   int[] std_fds,
+                                   int[] fds,
                                    boolean redirectErrorStream)
         throws IOException;
 
-    /* In the process constructor we wait on this gate until the process    */
-    /* has been created. Then we return from the constructor.               */
-    /* fork() is called by the same thread which later waits for the process */
-    /* to terminate */
-
-    private static class Gate {
+    /**
+     * The thread factory used to create "process reaper" daemon threads.
+     */
+    private static class ProcessReaperThreadFactory implements ThreadFactory {
+        private final static ThreadGroup group = getRootThreadGroup();
 
-        private boolean exited = false;
-        private IOException savedException;
-
-        synchronized void exit() { /* Opens the gate */
-           exited = true;
-           this.notify();
+        private static ThreadGroup getRootThreadGroup() {
+            return AccessController.doPrivileged
+            (new PrivilegedAction<ThreadGroup> () {
+            public ThreadGroup run() {
+                ThreadGroup root = Thread.currentThread().getThreadGroup();
+                while (root.getParent() != null)
+                    root = root.getParent();
+                return root;
+            }});
         }
 
-        synchronized void waitForExit() { /* wait until the gate is open */
-            boolean interrupted = false;
-            while (!exited) {
-                try {
-                    this.wait();
-                } catch (InterruptedException e) {
-                    interrupted = true;
-                }
-            }
-            if (interrupted) {
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        void setException (IOException e) {
-            savedException = e;
-        }
-
-        IOException getException() {
-            return savedException;
+        public Thread newThread(Runnable grimReaper) {
+            // Our thread stack requirement is quite modest.
+            Thread t = new Thread(group, grimReaper, "process reaper", 32768);
+            t.setDaemon(true);
+            // A small attempt (probably futile) to avoid priority inversion
+            t.setPriority(Thread.MAX_PRIORITY);
+            return t;
         }
     }
 
+    /**
+     * The thread pool of "process reaper" daemon threads.
+     */
+    private static final Executor processReaperExecutor
+        = Executors.newCachedThreadPool(new ProcessReaperThreadFactory());
+
     UNIXProcess(final byte[] prog,
                 final byte[] argBlock, final int argc,
                 final byte[] envBlock, final int envc,
                 final byte[] dir,
-                final int[] std_fds,
+                final int[] fds,
                 final boolean redirectErrorStream)
-    throws IOException {
+            throws IOException {
+
+        pid = forkAndExec(prog,
+                          argBlock, argc,
+                          envBlock, envc,
+                          dir,
+                          fds,
+                          redirectErrorStream);
 
-        final Gate gate = new Gate();
-        /*
-         * For each subprocess forked a corresponding reaper thread
-         * is started.  That thread is the only thread which waits
-         * for the subprocess to terminate and it doesn't hold any
-         * locks while doing so.  This design allows waitFor() and
-         * exitStatus() to be safely executed in parallel (and they
-         * need no native code).
-         */
+        try {
+            AccessController.doPrivileged
+            (new PrivilegedExceptionAction<Void>() {
+                public Void run() throws IOException {
+                    initStreams(fds);
+                    return null;
+                }});
+        } catch (PrivilegedActionException ex) {
+            throw (IOException) ex.getException();
+        }
+    }
+
+    static FileDescriptor newFileDescriptor(int fd) {
+        FileDescriptor fileDescriptor = new FileDescriptor();
+        fdAccess.set(fileDescriptor, fd);
+        return fileDescriptor;
+    }
 
-        java.security.AccessController.doPrivileged(
-        new java.security.PrivilegedAction<Void>() {
-        public Void run() {
-            Thread t = new Thread("process reaper") {
-                    public void run() {
-                        try {
-                            pid = forkAndExec(prog,
-                                              argBlock, argc,
-                                              envBlock, envc,
-                                              dir,
-                                              std_fds,
-                                              redirectErrorStream);
-                        } catch (IOException e) {
-                            gate.setException(e); /*remember to rethrow later*/
-                            gate.exit();
-                            return;
-                        }
-                        java.security.AccessController.doPrivileged(
-                    new java.security.PrivilegedAction<Void>() {
-                    public Void run() {
-                        if (std_fds[0] == -1)
-                            stdin_stream = new ProcessBuilder.NullOutputStream();
-                        else {
-                            FileDescriptor stdin_fd = new FileDescriptor();
-                            fdAccess.set(stdin_fd, std_fds[0]);
-                            stdin_stream = new BufferedOutputStream(
-                                new FileOutputStream(stdin_fd));
-                        }
+    void initStreams(int[] fds) throws IOException {
+        stdin = (fds[0] == -1) ?
+            ProcessBuilder.NullOutputStream.INSTANCE :
+            new ProcessPipeOutputStream(fds[0]);
+
+        stdout = (fds[1] == -1) ?
+            ProcessBuilder.NullInputStream.INSTANCE :
+            new ProcessPipeInputStream(fds[1]);
+
+        stderr = (fds[2] == -1) ?
+            ProcessBuilder.NullInputStream.INSTANCE :
+            new ProcessPipeInputStream(fds[2]);
 
-                        if (std_fds[1] == -1)
-                            stdout_stream = new ProcessBuilder.NullInputStream();
-                        else {
-                            FileDescriptor stdout_fd = new FileDescriptor();
-                            fdAccess.set(stdout_fd, std_fds[1]);
-                            stdout_stream = new BufferedInputStream(
-                                new FileInputStream(stdout_fd));
-                        }
-
-                        if (std_fds[2] == -1)
-                            stderr_stream = new ProcessBuilder.NullInputStream();
-                        else {
-                            FileDescriptor stderr_fd = new FileDescriptor();
-                            fdAccess.set(stderr_fd, std_fds[2]);
-                            stderr_stream = new FileInputStream(stderr_fd);
-                        }
+        processReaperExecutor.execute(new Runnable() {
+            public void run() {
+                int exitcode = waitForProcessExit(pid);
+                UNIXProcess.this.processExited(exitcode);
+            }});
+    }
 
-                        return null; }});
-                        gate.exit(); /* exit from constructor */
-                        int res = waitForProcessExit(pid);
-                        synchronized (UNIXProcess.this) {
-                            hasExited = true;
-                            exitcode = res;
-                            UNIXProcess.this.notifyAll();
-                        }
-                    }
-                };
-                t.setDaemon(true);
-                t.start();
-                return null; }});
-        gate.waitForExit();
-        IOException e = gate.getException();
-        if (e != null)
-            throw new IOException(e.toString());
+    synchronized void processExited(int exitcode) {
+        if (stdout instanceof ProcessPipeInputStream)
+            ((ProcessPipeInputStream) stdout).processExited();
+
+        if (stderr instanceof ProcessPipeInputStream)
+            ((ProcessPipeInputStream) stderr).processExited();
+
+        if (stdin instanceof ProcessPipeOutputStream)
+            ((ProcessPipeOutputStream) stdin).processExited();
+
+        this.exitcode = exitcode;
+        hasExited = true;
+        notifyAll();
     }
 
     public OutputStream getOutputStream() {
-        return stdin_stream;
+        return stdin;
     }
 
     public InputStream getInputStream() {
-        return stdout_stream;
+        return stdout;
     }
 
     public InputStream getErrorStream() {
-        return stderr_stream;
+        return stderr;
     }
 
     public synchronized int waitFor() throws InterruptedException {
@@ -228,13 +226,9 @@
             if (!hasExited)
                 destroyProcess(pid);
         }
-        try {
-            stdin_stream.close();
-            stdout_stream.close();
-            stderr_stream.close();
-        } catch (IOException e) {
-            // ignore
-        }
+        try { stdin.close();  } catch (IOException ignored) {}
+        try { stdout.close(); } catch (IOException ignored) {}
+        try { stderr.close(); } catch (IOException ignored) {}
     }
 
     /* This routine initializes JNI field offsets for the class */
@@ -243,4 +237,77 @@
     static {
         initIDs();
     }
+
+    /**
+     * A buffered input stream for a subprocess pipe file descriptor
+     * that allows the underlying file descriptor to be reclaimed when
+     * the process exits, via the processExited hook.
+     *
+     * This is tricky because we do not want the user-level InputStream to be
+     * closed until the user invokes close(), and we need to continue to be
+     * able to read any buffered data lingering in the OS pipe buffer.
+     */
+    static class ProcessPipeInputStream extends BufferedInputStream {
+        ProcessPipeInputStream(int fd) {
+            super(new FileInputStream(newFileDescriptor(fd)));
+        }
+
+        private static byte[] drainInputStream(InputStream in)
+                throws IOException {
+            if (in == null) return null;
+            int n = 0;
+            int j;
+            byte[] a = null;
+            while ((j = in.available()) > 0) {
+                a = (a == null) ? new byte[j] : Arrays.copyOf(a, n + j);
+                n += in.read(a, n, j);
+            }
+            return (a == null || n == a.length) ? a : Arrays.copyOf(a, n);
+        }
+
+        /** Called by the process reaper thread when the process exits. */
+        synchronized void processExited() {
+            // Most BufferedInputStream methods are synchronized, but close()
+            // is not, and so we have to handle concurrent racing close().
+            try {
+                InputStream in = this.in;
+                if (in != null) {
+                    byte[] stragglers = drainInputStream(in);
+                    in.close();
+                    this.in = (stragglers == null) ?
+                        ProcessBuilder.NullInputStream.INSTANCE :
+                        new ByteArrayInputStream(stragglers);
+                    if (buf == null) // asynchronous close()?
+                        this.in = null;
+                }
+            } catch (IOException ignored) {
+                // probably an asynchronous close().
+            }
+        }
+    }
+
+    /**
+     * A buffered output stream for a subprocess pipe file descriptor
+     * that allows the underlying file descriptor to be reclaimed when
+     * the process exits, via the processExited hook.
+     */
+    static class ProcessPipeOutputStream extends BufferedOutputStream {
+        ProcessPipeOutputStream(int fd) {
+            super(new FileOutputStream(newFileDescriptor(fd)));
+        }
+
+        /** Called by the process reaper thread when the process exits. */
+        synchronized void processExited() {
+            OutputStream out = this.out;
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignored) {
+                    // We know of no reason to get an IOException, but if
+                    // we do, there's nothing else to do but carry on.
+                }
+                this.out = ProcessBuilder.NullOutputStream.INSTANCE;
+            }
+        }
+    }
 }
--- a/jdk/test/java/lang/ProcessBuilder/Basic.java	Sat Jun 12 01:32:43 2010 +0100
+++ b/jdk/test/java/lang/ProcessBuilder/Basic.java	Fri Jun 11 18:55:45 2010 -0700
@@ -37,6 +37,7 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.security.*;
 import java.util.regex.Pattern;
 import static java.lang.System.getenv;
@@ -252,9 +253,9 @@
         return sb.toString();
     }
 
-    static void print4095(OutputStream s) throws Throwable {
+    static void print4095(OutputStream s, byte b) throws Throwable {
         byte[] bytes = new byte[4095];
-        Arrays.fill(bytes, (byte) '!');
+        Arrays.fill(bytes, b);
         s.write(bytes);         // Might hang!
     }
 
@@ -273,7 +274,9 @@
     public static class JavaChild {
         public static void main(String args[]) throws Throwable {
             String action = args[0];
-            if (action.equals("testIO")) {
+            if (action.equals("sleep")) {
+                Thread.sleep(10 * 60 * 1000L);
+            } else if (action.equals("testIO")) {
                 String expected = "standard input";
                 char[] buf = new char[expected.length()+1];
                 int n = new InputStreamReader(System.in).read(buf,0,buf.length);
@@ -315,7 +318,8 @@
                 printUTF8(new File(System.getProperty("user.dir"))
                           .getCanonicalPath());
             } else if (action.equals("print4095")) {
-                print4095(System.out);
+                print4095(System.out, (byte) '!');
+                print4095(System.err, (byte) 'E');
                 System.exit(5);
             } else if (action.equals("OutErr")) {
                 // You might think the system streams would be
@@ -1717,16 +1721,107 @@
         } catch (Throwable t) { unexpected(t); }
 
         //----------------------------------------------------------------
-        // This would deadlock, if not for the fact that
+        // Attempt to write 4095 bytes to the pipe buffer without a
+        // reader to drain it would deadlock, if not for the fact that
         // interprocess pipe buffers are at least 4096 bytes.
+        //
+        // Also, check that available reports all the bytes expected
+        // in the pipe buffer, and that I/O operations do the expected
+        // things.
         //----------------------------------------------------------------
         try {
             List<String> childArgs = new ArrayList<String>(javaChildArgs);
             childArgs.add("print4095");
-            Process p = new ProcessBuilder(childArgs).start();
-            print4095(p.getOutputStream()); // Might hang!
-            p.waitFor();                    // Might hang!
+            final int SIZE = 4095;
+            final Process p = new ProcessBuilder(childArgs).start();
+            print4095(p.getOutputStream(), (byte) '!'); // Might hang!
+            p.waitFor();                                // Might hang!
+            equal(SIZE, p.getInputStream().available());
+            equal(SIZE, p.getErrorStream().available());
+            THROWS(IOException.class,
+                   new Fun(){void f() throws IOException {
+                       p.getOutputStream().write((byte) '!');
+                       p.getOutputStream().flush();
+                       }});
+
+            final byte[] bytes = new byte[SIZE + 1];
+            equal(SIZE, p.getInputStream().read(bytes));
+            for (int i = 0; i < SIZE; i++)
+                equal((byte) '!', bytes[i]);
+            equal((byte) 0, bytes[SIZE]);
+
+            equal(SIZE, p.getErrorStream().read(bytes));
+            for (int i = 0; i < SIZE; i++)
+                equal((byte) 'E', bytes[i]);
+            equal((byte) 0, bytes[SIZE]);
+
+            equal(0, p.getInputStream().available());
+            equal(0, p.getErrorStream().available());
+            equal(-1, p.getErrorStream().read());
+            equal(-1, p.getInputStream().read());
+
             equal(p.exitValue(), 5);
+
+            p.getInputStream().close();
+            p.getErrorStream().close();
+            p.getOutputStream().close();
+
+            InputStream[] streams = { p.getInputStream(), p.getErrorStream() };
+            for (final InputStream in : streams) {
+                Fun[] ops = {
+                    new Fun(){void f() throws IOException {
+                        in.read(); }},
+                    new Fun(){void f() throws IOException {
+                        in.read(bytes); }},
+                    new Fun(){void f() throws IOException {
+                        in.available(); }}
+                };
+                for (Fun op : ops) {
+                    try {
+                        op.f();
+                        fail();
+                    } catch (IOException expected) {
+                        check(expected.getMessage()
+                              .matches("[Ss]tream [Cc]losed"));
+                    }
+                }
+            }
+        } catch (Throwable t) { unexpected(t); }
+
+        //----------------------------------------------------------------
+        // Check that reads which are pending when Process.destroy is
+        // called, get EOF, not IOException("Stream closed").
+        //----------------------------------------------------------------
+        try {
+            final int cases = 4;
+            for (int i = 0; i < cases; i++) {
+                final int action = i;
+                List<String> childArgs = new ArrayList<String>(javaChildArgs);
+                childArgs.add("sleep");
+                final byte[] bytes = new byte[10];
+                final Process p = new ProcessBuilder(childArgs).start();
+                final CountDownLatch latch = new CountDownLatch(1);
+                final Thread thread = new Thread() {
+                    public void run() {
+                        try {
+                            latch.countDown();
+                            int r;
+                            switch (action) {
+                            case 0: r = p.getInputStream().read(); break;
+                            case 1: r = p.getErrorStream().read(); break;
+                            case 2: r = p.getInputStream().read(bytes); break;
+                            case 3: r = p.getErrorStream().read(bytes); break;
+                            default: throw new Error();
+                            }
+                            equal(-1, r);
+                        } catch (Throwable t) { unexpected(t); }}};
+
+                thread.start();
+                latch.await();
+                Thread.sleep(10);
+                p.destroy();
+                thread.join();
+            }
         } catch (Throwable t) { unexpected(t); }
 
         //----------------------------------------------------------------
@@ -1741,7 +1836,6 @@
         } catch (IOException e) {
             new File("./emptyCommand").delete();
             String m = e.getMessage();
-            //e.printStackTrace();
             if (EnglishUnix.is() &&
                 ! matches(m, "Permission denied"))
                 unexpected(e);