jdk/src/share/classes/java/io/PipedInputStream.java
changeset 2 90ce3da70b43
child 5506 202f599c92aa
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/io/PipedInputStream.java	Sat Dec 01 00:00:00 2007 +0000
@@ -0,0 +1,450 @@
+/*
+ * Copyright 1995-2006 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package java.io;
+
+/**
+ * A piped input stream should be connected
+ * to a piped output stream; the piped  input
+ * stream then provides whatever data bytes
+ * are written to the piped output  stream.
+ * Typically, data is read from a <code>PipedInputStream</code>
+ * object by one thread  and data is written
+ * to the corresponding <code>PipedOutputStream</code>
+ * by some  other thread. Attempting to use
+ * both objects from a single thread is not
+ * recommended, as it may deadlock the thread.
+ * The piped input stream contains a buffer,
+ * decoupling read operations from write operations,
+ * within limits.
+ * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
+ * thread that was providing data bytes to the connected
+ * piped output stream is no longer alive.
+ *
+ * @author  James Gosling
+ * @see     java.io.PipedOutputStream
+ * @since   JDK1.0
+ */
+public class PipedInputStream extends InputStream {
+    boolean closedByWriter = false;
+    volatile boolean closedByReader = false;
+    boolean connected = false;
+
+        /* REMIND: identification of the read and write sides needs to be
+           more sophisticated.  Either using thread groups (but what about
+           pipes within a thread?) or using finalization (but it may be a
+           long time until the next GC). */
+    Thread readSide;
+    Thread writeSide;
+
+    private static final int DEFAULT_PIPE_SIZE = 1024;
+
+    /**
+     * The default size of the pipe's circular input buffer.
+     * @since   JDK1.1
+     */
+    // This used to be a constant before the pipe size was allowed
+    // to change. This field will continue to be maintained
+    // for backward compatibility.
+    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
+
+    /**
+     * The circular buffer into which incoming data is placed.
+     * @since   JDK1.1
+     */
+    protected byte buffer[];
+
+    /**
+     * The index of the position in the circular buffer at which the
+     * next byte of data will be stored when received from the connected
+     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
+     * <code>in==out</code> implies the buffer is full
+     * @since   JDK1.1
+     */
+    protected int in = -1;
+
+    /**
+     * The index of the position in the circular buffer at which the next
+     * byte of data will be read by this piped input stream.
+     * @since   JDK1.1
+     */
+    protected int out = 0;
+
+    /**
+     * Creates a <code>PipedInputStream</code> so
+     * that it is connected to the piped output
+     * stream <code>src</code>. Data bytes written
+     * to <code>src</code> will then be  available
+     * as input from this stream.
+     *
+     * @param      src   the stream to connect to.
+     * @exception  IOException  if an I/O error occurs.
+     */
+    public PipedInputStream(PipedOutputStream src) throws IOException {
+        this(src, DEFAULT_PIPE_SIZE);
+    }
+
+    /**
+     * Creates a <code>PipedInputStream</code> so that it is
+     * connected to the piped output stream
+     * <code>src</code> and uses the specified pipe size for
+     * the pipe's buffer.
+     * Data bytes written to <code>src</code> will then
+     * be available as input from this stream.
+     *
+     * @param      src   the stream to connect to.
+     * @param      pipeSize the size of the pipe's buffer.
+     * @exception  IOException  if an I/O error occurs.
+     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
+     * @since      1.6
+     */
+    public PipedInputStream(PipedOutputStream src, int pipeSize)
+            throws IOException {
+         initPipe(pipeSize);
+         connect(src);
+    }
+
+    /**
+     * Creates a <code>PipedInputStream</code> so
+     * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
+     * connected}.
+     * It must be {@linkplain java.io.PipedOutputStream#connect(
+     * java.io.PipedInputStream) connected} to a
+     * <code>PipedOutputStream</code> before being used.
+     */
+    public PipedInputStream() {
+        initPipe(DEFAULT_PIPE_SIZE);
+    }
+
+    /**
+     * Creates a <code>PipedInputStream</code> so that it is not yet
+     * {@linkplain #connect(java.io.PipedOutputStream) connected} and
+     * uses the specified pipe size for the pipe's buffer.
+     * It must be {@linkplain java.io.PipedOutputStream#connect(
+     * java.io.PipedInputStream)
+     * connected} to a <code>PipedOutputStream</code> before being used.
+     *
+     * @param      pipeSize the size of the pipe's buffer.
+     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
+     * @since      1.6
+     */
+    public PipedInputStream(int pipeSize) {
+        initPipe(pipeSize);
+    }
+
+    private void initPipe(int pipeSize) {
+         if (pipeSize <= 0) {
+            throw new IllegalArgumentException("Pipe Size <= 0");
+         }
+         buffer = new byte[pipeSize];
+    }
+
+    /**
+     * Causes this piped input stream to be connected
+     * to the piped  output stream <code>src</code>.
+     * If this object is already connected to some
+     * other piped output  stream, an <code>IOException</code>
+     * is thrown.
+     * <p>
+     * If <code>src</code> is an
+     * unconnected piped output stream and <code>snk</code>
+     * is an unconnected piped input stream, they
+     * may be connected by either the call:
+     * <p>
+     * <pre><code>snk.connect(src)</code> </pre>
+     * <p>
+     * or the call:
+     * <p>
+     * <pre><code>src.connect(snk)</code> </pre>
+     * <p>
+     * The two
+     * calls have the same effect.
+     *
+     * @param      src   The piped output stream to connect to.
+     * @exception  IOException  if an I/O error occurs.
+     */
+    public void connect(PipedOutputStream src) throws IOException {
+        src.connect(this);
+    }
+
+    /**
+     * Receives a byte of data.  This method will block if no input is
+     * available.
+     * @param b the byte being received
+     * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
+     *          {@link #connect(java.io.PipedOutputStream) unconnected},
+     *          closed, or if an I/O error occurs.
+     * @since     JDK1.1
+     */
+    protected synchronized void receive(int b) throws IOException {
+        checkStateForReceive();
+        writeSide = Thread.currentThread();
+        if (in == out)
+            awaitSpace();
+        if (in < 0) {
+            in = 0;
+            out = 0;
+        }
+        buffer[in++] = (byte)(b & 0xFF);
+        if (in >= buffer.length) {
+            in = 0;
+        }
+    }
+
+    /**
+     * Receives data into an array of bytes.  This method will
+     * block until some input is available.
+     * @param b the buffer into which the data is received
+     * @param off the start offset of the data
+     * @param len the maximum number of bytes received
+     * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
+     *           {@link #connect(java.io.PipedOutputStream) unconnected},
+     *           closed,or if an I/O error occurs.
+     */
+    synchronized void receive(byte b[], int off, int len)  throws IOException {
+        checkStateForReceive();
+        writeSide = Thread.currentThread();
+        int bytesToTransfer = len;
+        while (bytesToTransfer > 0) {
+            if (in == out)
+                awaitSpace();
+            int nextTransferAmount = 0;
+            if (out < in) {
+                nextTransferAmount = buffer.length - in;
+            } else if (in < out) {
+                if (in == -1) {
+                    in = out = 0;
+                    nextTransferAmount = buffer.length - in;
+                } else {
+                    nextTransferAmount = out - in;
+                }
+            }
+            if (nextTransferAmount > bytesToTransfer)
+                nextTransferAmount = bytesToTransfer;
+            assert(nextTransferAmount > 0);
+            System.arraycopy(b, off, buffer, in, nextTransferAmount);
+            bytesToTransfer -= nextTransferAmount;
+            off += nextTransferAmount;
+            in += nextTransferAmount;
+            if (in >= buffer.length) {
+                in = 0;
+            }
+        }
+    }
+
+    private void checkStateForReceive() throws IOException {
+        if (!connected) {
+            throw new IOException("Pipe not connected");
+        } else if (closedByWriter || closedByReader) {
+            throw new IOException("Pipe closed");
+        } else if (readSide != null && !readSide.isAlive()) {
+            throw new IOException("Read end dead");
+        }
+    }
+
+    private void awaitSpace() throws IOException {
+        while (in == out) {
+            checkStateForReceive();
+
+            /* full: kick any waiting readers */
+            notifyAll();
+            try {
+                wait(1000);
+            } catch (InterruptedException ex) {
+                throw new java.io.InterruptedIOException();
+            }
+        }
+    }
+
+    /**
+     * Notifies all waiting threads that the last byte of data has been
+     * received.
+     */
+    synchronized void receivedLast() {
+        closedByWriter = true;
+        notifyAll();
+    }
+
+    /**
+     * Reads the next byte of data from this piped input stream. The
+     * value byte is returned as an <code>int</code> in the range
+     * <code>0</code> to <code>255</code>.
+     * This method blocks until input data is available, the end of the
+     * stream is detected, or an exception is thrown.
+     *
+     * @return     the next byte of data, or <code>-1</code> if the end of the
+     *             stream is reached.
+     * @exception  IOException  if the pipe is
+     *           {@link #connect(java.io.PipedOutputStream) unconnected},
+     *           <a href=#BROKEN> <code>broken</code></a>, closed,
+     *           or if an I/O error occurs.
+     */
+    public synchronized int read()  throws IOException {
+        if (!connected) {
+            throw new IOException("Pipe not connected");
+        } else if (closedByReader) {
+            throw new IOException("Pipe closed");
+        } else if (writeSide != null && !writeSide.isAlive()
+                   && !closedByWriter && (in < 0)) {
+            throw new IOException("Write end dead");
+        }
+
+        readSide = Thread.currentThread();
+        int trials = 2;
+        while (in < 0) {
+            if (closedByWriter) {
+                /* closed by writer, return EOF */
+                return -1;
+            }
+            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
+                throw new IOException("Pipe broken");
+            }
+            /* might be a writer waiting */
+            notifyAll();
+            try {
+                wait(1000);
+            } catch (InterruptedException ex) {
+                throw new java.io.InterruptedIOException();
+            }
+        }
+        int ret = buffer[out++] & 0xFF;
+        if (out >= buffer.length) {
+            out = 0;
+        }
+        if (in == out) {
+            /* now empty */
+            in = -1;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data from this piped input
+     * stream into an array of bytes. Less than <code>len</code> bytes
+     * will be read if the end of the data stream is reached or if
+     * <code>len</code> exceeds the pipe's buffer size.
+     * If <code>len </code> is zero, then no bytes are read and 0 is returned;
+     * otherwise, the method blocks until at least 1 byte of input is
+     * available, end of the stream has been detected, or an exception is
+     * thrown.
+     *
+     * @param      b     the buffer into which the data is read.
+     * @param      off   the start offset in the destination array <code>b</code>
+     * @param      len   the maximum number of bytes read.
+     * @return     the total number of bytes read into the buffer, or
+     *             <code>-1</code> if there is no more data because the end of
+     *             the stream has been reached.
+     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
+     * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
+     * <code>len</code> is negative, or <code>len</code> is greater than
+     * <code>b.length - off</code>
+     * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
+     *           {@link #connect(java.io.PipedOutputStream) unconnected},
+     *           closed, or if an I/O error occurs.
+     */
+    public synchronized int read(byte b[], int off, int len)  throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            return 0;
+        }
+
+        /* possibly wait on the first character */
+        int c = read();
+        if (c < 0) {
+            return -1;
+        }
+        b[off] = (byte) c;
+        int rlen = 1;
+        while ((in >= 0) && (len > 1)) {
+
+            int available;
+
+            if (in > out) {
+                available = Math.min((buffer.length - out), (in - out));
+            } else {
+                available = buffer.length - out;
+            }
+
+            // A byte is read beforehand outside the loop
+            if (available > (len - 1)) {
+                available = len - 1;
+            }
+            System.arraycopy(buffer, out, b, off + rlen, available);
+            out += available;
+            rlen += available;
+            len -= available;
+
+            if (out >= buffer.length) {
+                out = 0;
+            }
+            if (in == out) {
+                /* now empty */
+                in = -1;
+            }
+        }
+        return rlen;
+    }
+
+    /**
+     * Returns the number of bytes that can be read from this input
+     * stream without blocking.
+     *
+     * @return the number of bytes that can be read from this input stream
+     *         without blocking, or {@code 0} if this input stream has been
+     *         closed by invoking its {@link #close()} method, or if the pipe
+     *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
+     *          <a href=#BROKEN> <code>broken</code></a>.
+     *
+     * @exception  IOException  if an I/O error occurs.
+     * @since   JDK1.0.2
+     */
+    public synchronized int available() throws IOException {
+        if(in < 0)
+            return 0;
+        else if(in == out)
+            return buffer.length;
+        else if (in > out)
+            return in - out;
+        else
+            return in + buffer.length - out;
+    }
+
+    /**
+     * Closes this piped input stream and releases any system resources
+     * associated with the stream.
+     *
+     * @exception  IOException  if an I/O error occurs.
+     */
+    public void close()  throws IOException {
+        closedByReader = true;
+        synchronized (this) {
+            in = -1;
+        }
+    }
+}