jdk/src/share/classes/java/io/PipedInputStream.java
author alanb
Thu, 18 Aug 2011 16:47:20 +0100
changeset 10347 1c9efe1ec7d3
parent 5506 202f599c92aa
child 18156 edb590d448c5
permissions -rw-r--r--
7015589: (spec) BufferedWriter.close leaves stream open if close of underlying Writer fails Reviewed-by: forax, mduigou
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     1
/*
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     2
 * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     4
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
90ce3da70b43 Initial load
duke
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    10
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
90ce3da70b43 Initial load
duke
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
90ce3da70b43 Initial load
duke
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    15
 * accompanied this code).
90ce3da70b43 Initial load
duke
parents:
diff changeset
    16
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
90ce3da70b43 Initial load
duke
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    20
 *
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    23
 * questions.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    24
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    25
90ce3da70b43 Initial load
duke
parents:
diff changeset
    26
package java.io;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    27
90ce3da70b43 Initial load
duke
parents:
diff changeset
    28
/**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    29
 * A piped input stream should be connected
90ce3da70b43 Initial load
duke
parents:
diff changeset
    30
 * to a piped output stream; the piped  input
90ce3da70b43 Initial load
duke
parents:
diff changeset
    31
 * stream then provides whatever data bytes
90ce3da70b43 Initial load
duke
parents:
diff changeset
    32
 * are written to the piped output  stream.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    33
 * Typically, data is read from a <code>PipedInputStream</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    34
 * object by one thread  and data is written
90ce3da70b43 Initial load
duke
parents:
diff changeset
    35
 * to the corresponding <code>PipedOutputStream</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    36
 * by some  other thread. Attempting to use
90ce3da70b43 Initial load
duke
parents:
diff changeset
    37
 * both objects from a single thread is not
90ce3da70b43 Initial load
duke
parents:
diff changeset
    38
 * recommended, as it may deadlock the thread.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    39
 * The piped input stream contains a buffer,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    40
 * decoupling read operations from write operations,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    41
 * within limits.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    42
 * A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
90ce3da70b43 Initial load
duke
parents:
diff changeset
    43
 * thread that was providing data bytes to the connected
90ce3da70b43 Initial load
duke
parents:
diff changeset
    44
 * piped output stream is no longer alive.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    45
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    46
 * @author  James Gosling
90ce3da70b43 Initial load
duke
parents:
diff changeset
    47
 * @see     java.io.PipedOutputStream
90ce3da70b43 Initial load
duke
parents:
diff changeset
    48
 * @since   JDK1.0
90ce3da70b43 Initial load
duke
parents:
diff changeset
    49
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    50
public class PipedInputStream extends InputStream {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    51
    // Set to true by receivedLast() when the writer signals end of data.
    boolean closedByWriter = false;
    // Checked by read() and by the receive-side state check.
    volatile boolean closedByReader = false;
    // Both the read and receive paths fail with "Pipe not connected" while false.
    boolean connected = false;

    /* REMIND: identification of the read and write sides needs to be
       more sophisticated.  Either using thread groups (but what about
       pipes within a thread?) or using finalization (but it may be a
       long time until the next GC). */
    // Set to the calling thread by read(); its liveness is checked on receive.
    Thread readSide;
    // Set to the calling thread by receive(); its liveness is checked on read.
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * The default size of the pipe's circular input buffer.
     * @since   JDK1.1
     */
    // This used to be a constant before the pipe size was allowed
    // to change. This field will continue to be maintained
    // for backward compatibility.
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    /**
     * The circular buffer into which incoming data is placed.
     * @since   JDK1.1
     */
    protected byte buffer[];

    /**
     * The index of the position in the circular buffer at which the
     * next byte of data will be stored when received from the connected
     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
     * <code>in==out</code> implies the buffer is full.
     * @since   JDK1.1
     */
    protected int in = -1;

    /**
     * The index of the position in the circular buffer at which the next
     * byte of data will be read by this piped input stream.
     * @since   JDK1.1
     */
    protected int out = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    94
90ce3da70b43 Initial load
duke
parents:
diff changeset
    95
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    96
     * Creates a <code>PipedInputStream</code> so
90ce3da70b43 Initial load
duke
parents:
diff changeset
    97
     * that it is connected to the piped output
90ce3da70b43 Initial load
duke
parents:
diff changeset
    98
     * stream <code>src</code>. Data bytes written
90ce3da70b43 Initial load
duke
parents:
diff changeset
    99
     * to <code>src</code> will then be  available
90ce3da70b43 Initial load
duke
parents:
diff changeset
   100
     * as input from this stream.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   101
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   102
     * @param      src   the stream to connect to.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   103
     * @exception  IOException  if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   104
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   105
    public PipedInputStream(PipedOutputStream src) throws IOException {
        // Delegate to the sized constructor using the default buffer size.
        this(src, DEFAULT_PIPE_SIZE);
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   108
90ce3da70b43 Initial load
duke
parents:
diff changeset
   109
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   110
     * Creates a <code>PipedInputStream</code> so that it is
90ce3da70b43 Initial load
duke
parents:
diff changeset
   111
     * connected to the piped output stream
90ce3da70b43 Initial load
duke
parents:
diff changeset
   112
     * <code>src</code> and uses the specified pipe size for
90ce3da70b43 Initial load
duke
parents:
diff changeset
   113
     * the pipe's buffer.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   114
     * Data bytes written to <code>src</code> will then
90ce3da70b43 Initial load
duke
parents:
diff changeset
   115
     * be available as input from this stream.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   116
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   117
     * @param      src   the stream to connect to.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   118
     * @param      pipeSize the size of the pipe's buffer.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   119
     * @exception  IOException  if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   120
     * @exception  IllegalArgumentException if <code>pipeSize &lt;= 0</code>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   121
     * @since      1.6
90ce3da70b43 Initial load
duke
parents:
diff changeset
   122
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   123
    public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
        // Allocate the buffer first so that an invalid pipeSize is rejected
        // before any connection to src is attempted.
        initPipe(pipeSize);
        connect(src);
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   128
90ce3da70b43 Initial load
duke
parents:
diff changeset
   129
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   130
     * Creates a <code>PipedInputStream</code> so
90ce3da70b43 Initial load
duke
parents:
diff changeset
   131
     * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   132
     * connected}.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   133
     * It must be {@linkplain java.io.PipedOutputStream#connect(
90ce3da70b43 Initial load
duke
parents:
diff changeset
   134
     * java.io.PipedInputStream) connected} to a
90ce3da70b43 Initial load
duke
parents:
diff changeset
   135
     * <code>PipedOutputStream</code> before being used.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   136
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   137
    public PipedInputStream() {
        // Unconnected stream with the default buffer size.
        initPipe(DEFAULT_PIPE_SIZE);
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   140
90ce3da70b43 Initial load
duke
parents:
diff changeset
   141
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   142
     * Creates a <code>PipedInputStream</code> so that it is not yet
90ce3da70b43 Initial load
duke
parents:
diff changeset
   143
     * {@linkplain #connect(java.io.PipedOutputStream) connected} and
90ce3da70b43 Initial load
duke
parents:
diff changeset
   144
     * uses the specified pipe size for the pipe's buffer.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   145
     * It must be {@linkplain java.io.PipedOutputStream#connect(
90ce3da70b43 Initial load
duke
parents:
diff changeset
   146
     * java.io.PipedInputStream)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   147
     * connected} to a <code>PipedOutputStream</code> before being used.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   148
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   149
     * @param      pipeSize the size of the pipe's buffer.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   150
     * @exception  IllegalArgumentException if <code>pipeSize &lt;= 0</code>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   151
     * @since      1.6
90ce3da70b43 Initial load
duke
parents:
diff changeset
   152
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   153
    public PipedInputStream(int pipeSize) {
        // Unconnected stream with a caller-chosen buffer size.
        initPipe(pipeSize);
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   156
90ce3da70b43 Initial load
duke
parents:
diff changeset
   157
    private void initPipe(int pipeSize) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   158
         if (pipeSize <= 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   159
            throw new IllegalArgumentException("Pipe Size <= 0");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   160
         }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   161
         buffer = new byte[pipeSize];
90ce3da70b43 Initial load
duke
parents:
diff changeset
   162
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   163
90ce3da70b43 Initial load
duke
parents:
diff changeset
   164
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   165
     * Causes this piped input stream to be connected
90ce3da70b43 Initial load
duke
parents:
diff changeset
   166
     * to the piped  output stream <code>src</code>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   167
     * If this object is already connected to some
90ce3da70b43 Initial load
duke
parents:
diff changeset
   168
     * other piped output  stream, an <code>IOException</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   169
     * is thrown.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   170
     * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   171
     * If <code>src</code> is an
90ce3da70b43 Initial load
duke
parents:
diff changeset
   172
     * unconnected piped output stream and <code>snk</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   173
     * is an unconnected piped input stream, they
90ce3da70b43 Initial load
duke
parents:
diff changeset
   174
     * may be connected by either the call:
90ce3da70b43 Initial load
duke
parents:
diff changeset
   175
     * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   176
     * <pre><code>snk.connect(src)</code> </pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   177
     * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   178
     * or the call:
90ce3da70b43 Initial load
duke
parents:
diff changeset
   179
     * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   180
     * <pre><code>src.connect(snk)</code> </pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   181
     * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   182
     * The two
90ce3da70b43 Initial load
duke
parents:
diff changeset
   183
     * calls have the same effect.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   184
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   185
     * @param      src   The piped output stream to connect to.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   186
     * @exception  IOException  if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   187
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   188
    public void connect(PipedOutputStream src) throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   189
        src.connect(this);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   190
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   191
90ce3da70b43 Initial load
duke
parents:
diff changeset
   192
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   193
     * Receives a byte of data.  This method will block while the
     * circular buffer is full.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   195
     * @param b the byte being received
90ce3da70b43 Initial load
duke
parents:
diff changeset
   196
     * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   197
     *          {@link #connect(java.io.PipedOutputStream) unconnected},
90ce3da70b43 Initial load
duke
parents:
diff changeset
   198
     *          closed, or if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   199
     * @since     JDK1.1
90ce3da70b43 Initial load
duke
parents:
diff changeset
   200
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   201
    protected synchronized void receive(int b) throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   202
        checkStateForReceive();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   203
        writeSide = Thread.currentThread();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   204
        if (in == out)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   205
            awaitSpace();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   206
        if (in < 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   207
            in = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   208
            out = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   209
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   210
        buffer[in++] = (byte)(b & 0xFF);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   211
        if (in >= buffer.length) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   212
            in = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   213
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   214
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   215
90ce3da70b43 Initial load
duke
parents:
diff changeset
   216
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   217
     * Receives data from an array of bytes.  This method will block
     * until all <code>len</code> bytes have been copied into the buffer.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   219
     * @param b the array containing the data to be received
90ce3da70b43 Initial load
duke
parents:
diff changeset
   220
     * @param off the start offset of the data
90ce3da70b43 Initial load
duke
parents:
diff changeset
   221
     * @param len the maximum number of bytes received
90ce3da70b43 Initial load
duke
parents:
diff changeset
   222
     * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   223
     *           {@link #connect(java.io.PipedOutputStream) unconnected},
90ce3da70b43 Initial load
duke
parents:
diff changeset
   224
     *           closed, or if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   225
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   226
    synchronized void receive(byte b[], int off, int len)  throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   227
        checkStateForReceive();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   228
        writeSide = Thread.currentThread();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   229
        int bytesToTransfer = len;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   230
        while (bytesToTransfer > 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   231
            if (in == out)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   232
                awaitSpace();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   233
            int nextTransferAmount = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   234
            if (out < in) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   235
                nextTransferAmount = buffer.length - in;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   236
            } else if (in < out) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   237
                if (in == -1) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   238
                    in = out = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   239
                    nextTransferAmount = buffer.length - in;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   240
                } else {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   241
                    nextTransferAmount = out - in;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   242
                }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   243
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   244
            if (nextTransferAmount > bytesToTransfer)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   245
                nextTransferAmount = bytesToTransfer;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   246
            assert(nextTransferAmount > 0);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   247
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   248
            bytesToTransfer -= nextTransferAmount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   249
            off += nextTransferAmount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   250
            in += nextTransferAmount;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   251
            if (in >= buffer.length) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   252
                in = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   253
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   254
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   255
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   256
90ce3da70b43 Initial load
duke
parents:
diff changeset
   257
    private void checkStateForReceive() throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   258
        if (!connected) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   259
            throw new IOException("Pipe not connected");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   260
        } else if (closedByWriter || closedByReader) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   261
            throw new IOException("Pipe closed");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   262
        } else if (readSide != null && !readSide.isAlive()) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   263
            throw new IOException("Read end dead");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   264
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   265
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   266
90ce3da70b43 Initial load
duke
parents:
diff changeset
   267
    private void awaitSpace() throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   268
        while (in == out) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   269
            checkStateForReceive();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   270
90ce3da70b43 Initial load
duke
parents:
diff changeset
   271
            /* full: kick any waiting readers */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   272
            notifyAll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   273
            try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   274
                wait(1000);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   275
            } catch (InterruptedException ex) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   276
                throw new java.io.InterruptedIOException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   277
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   278
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   279
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   280
90ce3da70b43 Initial load
duke
parents:
diff changeset
   281
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   282
     * Notifies all waiting threads that the last byte of data has been
90ce3da70b43 Initial load
duke
parents:
diff changeset
   283
     * received.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   284
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   285
    synchronized void receivedLast() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   286
        closedByWriter = true;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   287
        notifyAll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   288
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   289
90ce3da70b43 Initial load
duke
parents:
diff changeset
   290
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   291
     * Reads the next byte of data from this piped input stream. The
90ce3da70b43 Initial load
duke
parents:
diff changeset
   292
     * value byte is returned as an <code>int</code> in the range
90ce3da70b43 Initial load
duke
parents:
diff changeset
   293
     * <code>0</code> to <code>255</code>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   294
     * This method blocks until input data is available, the end of the
90ce3da70b43 Initial load
duke
parents:
diff changeset
   295
     * stream is detected, or an exception is thrown.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   296
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   297
     * @return     the next byte of data, or <code>-1</code> if the end of the
90ce3da70b43 Initial load
duke
parents:
diff changeset
   298
     *             stream is reached.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   299
     * @exception  IOException  if the pipe is
90ce3da70b43 Initial load
duke
parents:
diff changeset
   300
     *           {@link #connect(java.io.PipedOutputStream) unconnected},
90ce3da70b43 Initial load
duke
parents:
diff changeset
   301
     *           <a href=#BROKEN> <code>broken</code></a>, closed,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   302
     *           or if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   303
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   304
    public synchronized int read()  throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   305
        if (!connected) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   306
            throw new IOException("Pipe not connected");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   307
        } else if (closedByReader) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   308
            throw new IOException("Pipe closed");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   309
        } else if (writeSide != null && !writeSide.isAlive()
90ce3da70b43 Initial load
duke
parents:
diff changeset
   310
                   && !closedByWriter && (in < 0)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   311
            throw new IOException("Write end dead");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   312
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   313
90ce3da70b43 Initial load
duke
parents:
diff changeset
   314
        readSide = Thread.currentThread();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   315
        int trials = 2;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   316
        while (in < 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   317
            if (closedByWriter) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   318
                /* closed by writer, return EOF */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   319
                return -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   320
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   321
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   322
                throw new IOException("Pipe broken");
90ce3da70b43 Initial load
duke
parents:
diff changeset
   323
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   324
            /* might be a writer waiting */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   325
            notifyAll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   326
            try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   327
                wait(1000);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   328
            } catch (InterruptedException ex) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   329
                throw new java.io.InterruptedIOException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   330
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   331
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   332
        int ret = buffer[out++] & 0xFF;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   333
        if (out >= buffer.length) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   334
            out = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   335
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   336
        if (in == out) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   337
            /* now empty */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   338
            in = -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   339
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   340
90ce3da70b43 Initial load
duke
parents:
diff changeset
   341
        return ret;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   342
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   343
90ce3da70b43 Initial load
duke
parents:
diff changeset
   344
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   345
     * Reads up to <code>len</code> bytes of data from this piped input
90ce3da70b43 Initial load
duke
parents:
diff changeset
   346
     * stream into an array of bytes. Less than <code>len</code> bytes
90ce3da70b43 Initial load
duke
parents:
diff changeset
   347
     * will be read if the end of the data stream is reached or if
90ce3da70b43 Initial load
duke
parents:
diff changeset
   348
     * <code>len</code> exceeds the pipe's buffer size.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   349
     * If <code>len </code> is zero, then no bytes are read and 0 is returned;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   350
     * otherwise, the method blocks until at least 1 byte of input is
90ce3da70b43 Initial load
duke
parents:
diff changeset
   351
     * available, end of the stream has been detected, or an exception is
90ce3da70b43 Initial load
duke
parents:
diff changeset
   352
     * thrown.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   353
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   354
     * @param      b     the buffer into which the data is read.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   355
     * @param      off   the start offset in the destination array <code>b</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   356
     * @param      len   the maximum number of bytes read.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   357
     * @return     the total number of bytes read into the buffer, or
90ce3da70b43 Initial load
duke
parents:
diff changeset
   358
     *             <code>-1</code> if there is no more data because the end of
90ce3da70b43 Initial load
duke
parents:
diff changeset
   359
     *             the stream has been reached.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   360
     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   361
     * @exception  IndexOutOfBoundsException If <code>off</code> is negative,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   362
     * <code>len</code> is negative, or <code>len</code> is greater than
90ce3da70b43 Initial load
duke
parents:
diff changeset
   363
     * <code>b.length - off</code>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   364
     * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   365
     *           {@link #connect(java.io.PipedOutputStream) unconnected},
90ce3da70b43 Initial load
duke
parents:
diff changeset
   366
     *           closed, or if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   367
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   368
    public synchronized int read(byte b[], int off, int len)  throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   369
        if (b == null) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   370
            throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   371
        } else if (off < 0 || len < 0 || len > b.length - off) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   372
            throw new IndexOutOfBoundsException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   373
        } else if (len == 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   374
            return 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   375
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   376
90ce3da70b43 Initial load
duke
parents:
diff changeset
   377
        /* possibly wait on the first character */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   378
        int c = read();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   379
        if (c < 0) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   380
            return -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   381
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   382
        b[off] = (byte) c;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   383
        int rlen = 1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   384
        while ((in >= 0) && (len > 1)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   385
90ce3da70b43 Initial load
duke
parents:
diff changeset
   386
            int available;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   387
90ce3da70b43 Initial load
duke
parents:
diff changeset
   388
            if (in > out) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   389
                available = Math.min((buffer.length - out), (in - out));
90ce3da70b43 Initial load
duke
parents:
diff changeset
   390
            } else {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   391
                available = buffer.length - out;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   392
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   393
90ce3da70b43 Initial load
duke
parents:
diff changeset
   394
            // A byte is read beforehand outside the loop
90ce3da70b43 Initial load
duke
parents:
diff changeset
   395
            if (available > (len - 1)) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   396
                available = len - 1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   397
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   398
            System.arraycopy(buffer, out, b, off + rlen, available);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   399
            out += available;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   400
            rlen += available;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   401
            len -= available;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   402
90ce3da70b43 Initial load
duke
parents:
diff changeset
   403
            if (out >= buffer.length) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   404
                out = 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   405
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   406
            if (in == out) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   407
                /* now empty */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   408
                in = -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   409
            }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   410
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   411
        return rlen;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   412
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   413
90ce3da70b43 Initial load
duke
parents:
diff changeset
   414
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   415
     * Returns the number of bytes that can be read from this input
90ce3da70b43 Initial load
duke
parents:
diff changeset
   416
     * stream without blocking.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   417
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   418
     * @return the number of bytes that can be read from this input stream
90ce3da70b43 Initial load
duke
parents:
diff changeset
   419
     *         without blocking, or {@code 0} if this input stream has been
90ce3da70b43 Initial load
duke
parents:
diff changeset
   420
     *         closed by invoking its {@link #close()} method, or if the pipe
90ce3da70b43 Initial load
duke
parents:
diff changeset
   421
     *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
90ce3da70b43 Initial load
duke
parents:
diff changeset
   422
     *          <a href=#BROKEN> <code>broken</code></a>.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   423
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   424
     * @exception  IOException  if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   425
     * @since   JDK1.0.2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   426
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   427
    public synchronized int available() throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   428
        if(in < 0)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   429
            return 0;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   430
        else if(in == out)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   431
            return buffer.length;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   432
        else if (in > out)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   433
            return in - out;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   434
        else
90ce3da70b43 Initial load
duke
parents:
diff changeset
   435
            return in + buffer.length - out;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   436
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   437
90ce3da70b43 Initial load
duke
parents:
diff changeset
   438
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   439
     * Closes this piped input stream and releases any system resources
90ce3da70b43 Initial load
duke
parents:
diff changeset
   440
     * associated with the stream.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   441
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   442
     * @exception  IOException  if an I/O error occurs.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   443
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   444
    public void close()  throws IOException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   445
        closedByReader = true;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   446
        synchronized (this) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   447
            in = -1;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   448
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   449
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   450
}