/*
 * Copyright (c) 1995, 2006, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package java.io;

/**
|
|
29 |
* A piped input stream should be connected
|
|
30 |
* to a piped output stream; the piped input
|
|
31 |
* stream then provides whatever data bytes
|
|
32 |
* are written to the piped output stream.
|
|
33 |
* Typically, data is read from a <code>PipedInputStream</code>
|
|
34 |
* object by one thread and data is written
|
|
35 |
* to the corresponding <code>PipedOutputStream</code>
|
|
36 |
* by some other thread. Attempting to use
|
|
37 |
* both objects from a single thread is not
|
|
38 |
* recommended, as it may deadlock the thread.
|
|
39 |
* The piped input stream contains a buffer,
|
|
40 |
* decoupling read operations from write operations,
|
|
41 |
* within limits.
|
|
42 |
* A pipe is said to be <a name=BROKEN> <i>broken</i> </a> if a
|
|
43 |
* thread that was providing data bytes to the connected
|
|
44 |
* piped output stream is no longer alive.
|
|
45 |
*
|
|
46 |
* @author James Gosling
|
|
47 |
* @see java.io.PipedOutputStream
|
|
48 |
* @since JDK1.0
|
|
49 |
*/
|
|
50 |
public class PipedInputStream extends InputStream {
|
|
51 |
boolean closedByWriter = false;
|
|
52 |
volatile boolean closedByReader = false;
|
|
53 |
boolean connected = false;
|
|
54 |
|
|
55 |
/* REMIND: identification of the read and write sides needs to be
|
|
56 |
more sophisticated. Either using thread groups (but what about
|
|
57 |
pipes within a thread?) or using finalization (but it may be a
|
|
58 |
long time until the next GC). */
|
|
59 |
Thread readSide;
|
|
60 |
Thread writeSide;
|
|
61 |
|
|
62 |
private static final int DEFAULT_PIPE_SIZE = 1024;
|
|
63 |
|
|
64 |
/**
|
|
65 |
* The default size of the pipe's circular input buffer.
|
|
66 |
* @since JDK1.1
|
|
67 |
*/
|
|
68 |
// This used to be a constant before the pipe size was allowed
|
|
69 |
// to change. This field will continue to be maintained
|
|
70 |
// for backward compatibility.
|
|
71 |
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
|
|
72 |
|
|
73 |
/**
|
|
74 |
* The circular buffer into which incoming data is placed.
|
|
75 |
* @since JDK1.1
|
|
76 |
*/
|
|
77 |
protected byte buffer[];
|
|
78 |
|
|
79 |
/**
|
|
80 |
* The index of the position in the circular buffer at which the
|
|
81 |
* next byte of data will be stored when received from the connected
|
|
82 |
* piped output stream. <code>in<0</code> implies the buffer is empty,
|
|
83 |
* <code>in==out</code> implies the buffer is full
|
|
84 |
* @since JDK1.1
|
|
85 |
*/
|
|
86 |
protected int in = -1;
|
|
87 |
|
|
88 |
/**
|
|
89 |
* The index of the position in the circular buffer at which the next
|
|
90 |
* byte of data will be read by this piped input stream.
|
|
91 |
* @since JDK1.1
|
|
92 |
*/
|
|
93 |
protected int out = 0;
|
|
94 |
|
|
95 |
/**
|
|
96 |
* Creates a <code>PipedInputStream</code> so
|
|
97 |
* that it is connected to the piped output
|
|
98 |
* stream <code>src</code>. Data bytes written
|
|
99 |
* to <code>src</code> will then be available
|
|
100 |
* as input from this stream.
|
|
101 |
*
|
|
102 |
* @param src the stream to connect to.
|
|
103 |
* @exception IOException if an I/O error occurs.
|
|
104 |
*/
|
|
105 |
public PipedInputStream(PipedOutputStream src) throws IOException {
|
|
106 |
this(src, DEFAULT_PIPE_SIZE);
|
|
107 |
}
|
|
108 |
|
|
109 |
/**
|
|
110 |
* Creates a <code>PipedInputStream</code> so that it is
|
|
111 |
* connected to the piped output stream
|
|
112 |
* <code>src</code> and uses the specified pipe size for
|
|
113 |
* the pipe's buffer.
|
|
114 |
* Data bytes written to <code>src</code> will then
|
|
115 |
* be available as input from this stream.
|
|
116 |
*
|
|
117 |
* @param src the stream to connect to.
|
|
118 |
* @param pipeSize the size of the pipe's buffer.
|
|
119 |
* @exception IOException if an I/O error occurs.
|
|
120 |
* @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
|
|
121 |
* @since 1.6
|
|
122 |
*/
|
|
123 |
public PipedInputStream(PipedOutputStream src, int pipeSize)
|
|
124 |
throws IOException {
|
|
125 |
initPipe(pipeSize);
|
|
126 |
connect(src);
|
|
127 |
}
|
|
128 |
|
|
129 |
/**
|
|
130 |
* Creates a <code>PipedInputStream</code> so
|
|
131 |
* that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
|
|
132 |
* connected}.
|
|
133 |
* It must be {@linkplain java.io.PipedOutputStream#connect(
|
|
134 |
* java.io.PipedInputStream) connected} to a
|
|
135 |
* <code>PipedOutputStream</code> before being used.
|
|
136 |
*/
|
|
137 |
public PipedInputStream() {
|
|
138 |
initPipe(DEFAULT_PIPE_SIZE);
|
|
139 |
}
|
|
140 |
|
|
141 |
/**
|
|
142 |
* Creates a <code>PipedInputStream</code> so that it is not yet
|
|
143 |
* {@linkplain #connect(java.io.PipedOutputStream) connected} and
|
|
144 |
* uses the specified pipe size for the pipe's buffer.
|
|
145 |
* It must be {@linkplain java.io.PipedOutputStream#connect(
|
|
146 |
* java.io.PipedInputStream)
|
|
147 |
* connected} to a <code>PipedOutputStream</code> before being used.
|
|
148 |
*
|
|
149 |
* @param pipeSize the size of the pipe's buffer.
|
|
150 |
* @exception IllegalArgumentException if <code>pipeSize <= 0</code>.
|
|
151 |
* @since 1.6
|
|
152 |
*/
|
|
153 |
public PipedInputStream(int pipeSize) {
|
|
154 |
initPipe(pipeSize);
|
|
155 |
}
|
|
156 |
|
|
157 |
private void initPipe(int pipeSize) {
|
|
158 |
if (pipeSize <= 0) {
|
|
159 |
throw new IllegalArgumentException("Pipe Size <= 0");
|
|
160 |
}
|
|
161 |
buffer = new byte[pipeSize];
|
|
162 |
}
|
|
163 |
|
|
164 |
/**
|
|
165 |
* Causes this piped input stream to be connected
|
|
166 |
* to the piped output stream <code>src</code>.
|
|
167 |
* If this object is already connected to some
|
|
168 |
* other piped output stream, an <code>IOException</code>
|
|
169 |
* is thrown.
|
|
170 |
* <p>
|
|
171 |
* If <code>src</code> is an
|
|
172 |
* unconnected piped output stream and <code>snk</code>
|
|
173 |
* is an unconnected piped input stream, they
|
|
174 |
* may be connected by either the call:
|
|
175 |
* <p>
|
|
176 |
* <pre><code>snk.connect(src)</code> </pre>
|
|
177 |
* <p>
|
|
178 |
* or the call:
|
|
179 |
* <p>
|
|
180 |
* <pre><code>src.connect(snk)</code> </pre>
|
|
181 |
* <p>
|
|
182 |
* The two
|
|
183 |
* calls have the same effect.
|
|
184 |
*
|
|
185 |
* @param src The piped output stream to connect to.
|
|
186 |
* @exception IOException if an I/O error occurs.
|
|
187 |
*/
|
|
188 |
public void connect(PipedOutputStream src) throws IOException {
|
|
189 |
src.connect(this);
|
|
190 |
}
|
|
191 |
|
|
192 |
/**
|
|
193 |
* Receives a byte of data. This method will block if no input is
|
|
194 |
* available.
|
|
195 |
* @param b the byte being received
|
|
196 |
* @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
|
|
197 |
* {@link #connect(java.io.PipedOutputStream) unconnected},
|
|
198 |
* closed, or if an I/O error occurs.
|
|
199 |
* @since JDK1.1
|
|
200 |
*/
|
|
201 |
protected synchronized void receive(int b) throws IOException {
|
|
202 |
checkStateForReceive();
|
|
203 |
writeSide = Thread.currentThread();
|
|
204 |
if (in == out)
|
|
205 |
awaitSpace();
|
|
206 |
if (in < 0) {
|
|
207 |
in = 0;
|
|
208 |
out = 0;
|
|
209 |
}
|
|
210 |
buffer[in++] = (byte)(b & 0xFF);
|
|
211 |
if (in >= buffer.length) {
|
|
212 |
in = 0;
|
|
213 |
}
|
|
214 |
}
|
|
215 |
|
|
216 |
/**
|
|
217 |
* Receives data into an array of bytes. This method will
|
|
218 |
* block until some input is available.
|
|
219 |
* @param b the buffer into which the data is received
|
|
220 |
* @param off the start offset of the data
|
|
221 |
* @param len the maximum number of bytes received
|
|
222 |
* @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
|
|
223 |
* {@link #connect(java.io.PipedOutputStream) unconnected},
|
|
224 |
* closed,or if an I/O error occurs.
|
|
225 |
*/
|
|
226 |
synchronized void receive(byte b[], int off, int len) throws IOException {
|
|
227 |
checkStateForReceive();
|
|
228 |
writeSide = Thread.currentThread();
|
|
229 |
int bytesToTransfer = len;
|
|
230 |
while (bytesToTransfer > 0) {
|
|
231 |
if (in == out)
|
|
232 |
awaitSpace();
|
|
233 |
int nextTransferAmount = 0;
|
|
234 |
if (out < in) {
|
|
235 |
nextTransferAmount = buffer.length - in;
|
|
236 |
} else if (in < out) {
|
|
237 |
if (in == -1) {
|
|
238 |
in = out = 0;
|
|
239 |
nextTransferAmount = buffer.length - in;
|
|
240 |
} else {
|
|
241 |
nextTransferAmount = out - in;
|
|
242 |
}
|
|
243 |
}
|
|
244 |
if (nextTransferAmount > bytesToTransfer)
|
|
245 |
nextTransferAmount = bytesToTransfer;
|
|
246 |
assert(nextTransferAmount > 0);
|
|
247 |
System.arraycopy(b, off, buffer, in, nextTransferAmount);
|
|
248 |
bytesToTransfer -= nextTransferAmount;
|
|
249 |
off += nextTransferAmount;
|
|
250 |
in += nextTransferAmount;
|
|
251 |
if (in >= buffer.length) {
|
|
252 |
in = 0;
|
|
253 |
}
|
|
254 |
}
|
|
255 |
}
|
|
256 |
|
|
257 |
private void checkStateForReceive() throws IOException {
|
|
258 |
if (!connected) {
|
|
259 |
throw new IOException("Pipe not connected");
|
|
260 |
} else if (closedByWriter || closedByReader) {
|
|
261 |
throw new IOException("Pipe closed");
|
|
262 |
} else if (readSide != null && !readSide.isAlive()) {
|
|
263 |
throw new IOException("Read end dead");
|
|
264 |
}
|
|
265 |
}
|
|
266 |
|
|
267 |
private void awaitSpace() throws IOException {
|
|
268 |
while (in == out) {
|
|
269 |
checkStateForReceive();
|
|
270 |
|
|
271 |
/* full: kick any waiting readers */
|
|
272 |
notifyAll();
|
|
273 |
try {
|
|
274 |
wait(1000);
|
|
275 |
} catch (InterruptedException ex) {
|
|
276 |
throw new java.io.InterruptedIOException();
|
|
277 |
}
|
|
278 |
}
|
|
279 |
}
|
|
280 |
|
|
281 |
/**
|
|
282 |
* Notifies all waiting threads that the last byte of data has been
|
|
283 |
* received.
|
|
284 |
*/
|
|
285 |
synchronized void receivedLast() {
|
|
286 |
closedByWriter = true;
|
|
287 |
notifyAll();
|
|
288 |
}
|
|
289 |
|
|
290 |
/**
|
|
291 |
* Reads the next byte of data from this piped input stream. The
|
|
292 |
* value byte is returned as an <code>int</code> in the range
|
|
293 |
* <code>0</code> to <code>255</code>.
|
|
294 |
* This method blocks until input data is available, the end of the
|
|
295 |
* stream is detected, or an exception is thrown.
|
|
296 |
*
|
|
297 |
* @return the next byte of data, or <code>-1</code> if the end of the
|
|
298 |
* stream is reached.
|
|
299 |
* @exception IOException if the pipe is
|
|
300 |
* {@link #connect(java.io.PipedOutputStream) unconnected},
|
|
301 |
* <a href=#BROKEN> <code>broken</code></a>, closed,
|
|
302 |
* or if an I/O error occurs.
|
|
303 |
*/
|
|
304 |
public synchronized int read() throws IOException {
|
|
305 |
if (!connected) {
|
|
306 |
throw new IOException("Pipe not connected");
|
|
307 |
} else if (closedByReader) {
|
|
308 |
throw new IOException("Pipe closed");
|
|
309 |
} else if (writeSide != null && !writeSide.isAlive()
|
|
310 |
&& !closedByWriter && (in < 0)) {
|
|
311 |
throw new IOException("Write end dead");
|
|
312 |
}
|
|
313 |
|
|
314 |
readSide = Thread.currentThread();
|
|
315 |
int trials = 2;
|
|
316 |
while (in < 0) {
|
|
317 |
if (closedByWriter) {
|
|
318 |
/* closed by writer, return EOF */
|
|
319 |
return -1;
|
|
320 |
}
|
|
321 |
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
|
|
322 |
throw new IOException("Pipe broken");
|
|
323 |
}
|
|
324 |
/* might be a writer waiting */
|
|
325 |
notifyAll();
|
|
326 |
try {
|
|
327 |
wait(1000);
|
|
328 |
} catch (InterruptedException ex) {
|
|
329 |
throw new java.io.InterruptedIOException();
|
|
330 |
}
|
|
331 |
}
|
|
332 |
int ret = buffer[out++] & 0xFF;
|
|
333 |
if (out >= buffer.length) {
|
|
334 |
out = 0;
|
|
335 |
}
|
|
336 |
if (in == out) {
|
|
337 |
/* now empty */
|
|
338 |
in = -1;
|
|
339 |
}
|
|
340 |
|
|
341 |
return ret;
|
|
342 |
}
|
|
343 |
|
|
344 |
/**
|
|
345 |
* Reads up to <code>len</code> bytes of data from this piped input
|
|
346 |
* stream into an array of bytes. Less than <code>len</code> bytes
|
|
347 |
* will be read if the end of the data stream is reached or if
|
|
348 |
* <code>len</code> exceeds the pipe's buffer size.
|
|
349 |
* If <code>len </code> is zero, then no bytes are read and 0 is returned;
|
|
350 |
* otherwise, the method blocks until at least 1 byte of input is
|
|
351 |
* available, end of the stream has been detected, or an exception is
|
|
352 |
* thrown.
|
|
353 |
*
|
|
354 |
* @param b the buffer into which the data is read.
|
|
355 |
* @param off the start offset in the destination array <code>b</code>
|
|
356 |
* @param len the maximum number of bytes read.
|
|
357 |
* @return the total number of bytes read into the buffer, or
|
|
358 |
* <code>-1</code> if there is no more data because the end of
|
|
359 |
* the stream has been reached.
|
|
360 |
* @exception NullPointerException If <code>b</code> is <code>null</code>.
|
|
361 |
* @exception IndexOutOfBoundsException If <code>off</code> is negative,
|
|
362 |
* <code>len</code> is negative, or <code>len</code> is greater than
|
|
363 |
* <code>b.length - off</code>
|
|
364 |
* @exception IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
|
|
365 |
* {@link #connect(java.io.PipedOutputStream) unconnected},
|
|
366 |
* closed, or if an I/O error occurs.
|
|
367 |
*/
|
|
368 |
public synchronized int read(byte b[], int off, int len) throws IOException {
|
|
369 |
if (b == null) {
|
|
370 |
throw new NullPointerException();
|
|
371 |
} else if (off < 0 || len < 0 || len > b.length - off) {
|
|
372 |
throw new IndexOutOfBoundsException();
|
|
373 |
} else if (len == 0) {
|
|
374 |
return 0;
|
|
375 |
}
|
|
376 |
|
|
377 |
/* possibly wait on the first character */
|
|
378 |
int c = read();
|
|
379 |
if (c < 0) {
|
|
380 |
return -1;
|
|
381 |
}
|
|
382 |
b[off] = (byte) c;
|
|
383 |
int rlen = 1;
|
|
384 |
while ((in >= 0) && (len > 1)) {
|
|
385 |
|
|
386 |
int available;
|
|
387 |
|
|
388 |
if (in > out) {
|
|
389 |
available = Math.min((buffer.length - out), (in - out));
|
|
390 |
} else {
|
|
391 |
available = buffer.length - out;
|
|
392 |
}
|
|
393 |
|
|
394 |
// A byte is read beforehand outside the loop
|
|
395 |
if (available > (len - 1)) {
|
|
396 |
available = len - 1;
|
|
397 |
}
|
|
398 |
System.arraycopy(buffer, out, b, off + rlen, available);
|
|
399 |
out += available;
|
|
400 |
rlen += available;
|
|
401 |
len -= available;
|
|
402 |
|
|
403 |
if (out >= buffer.length) {
|
|
404 |
out = 0;
|
|
405 |
}
|
|
406 |
if (in == out) {
|
|
407 |
/* now empty */
|
|
408 |
in = -1;
|
|
409 |
}
|
|
410 |
}
|
|
411 |
return rlen;
|
|
412 |
}
|
|
413 |
|
|
414 |
/**
|
|
415 |
* Returns the number of bytes that can be read from this input
|
|
416 |
* stream without blocking.
|
|
417 |
*
|
|
418 |
* @return the number of bytes that can be read from this input stream
|
|
419 |
* without blocking, or {@code 0} if this input stream has been
|
|
420 |
* closed by invoking its {@link #close()} method, or if the pipe
|
|
421 |
* is {@link #connect(java.io.PipedOutputStream) unconnected}, or
|
|
422 |
* <a href=#BROKEN> <code>broken</code></a>.
|
|
423 |
*
|
|
424 |
* @exception IOException if an I/O error occurs.
|
|
425 |
* @since JDK1.0.2
|
|
426 |
*/
|
|
427 |
public synchronized int available() throws IOException {
|
|
428 |
if(in < 0)
|
|
429 |
return 0;
|
|
430 |
else if(in == out)
|
|
431 |
return buffer.length;
|
|
432 |
else if (in > out)
|
|
433 |
return in - out;
|
|
434 |
else
|
|
435 |
return in + buffer.length - out;
|
|
436 |
}
|
|
437 |
|
|
438 |
/**
|
|
439 |
* Closes this piped input stream and releases any system resources
|
|
440 |
* associated with the stream.
|
|
441 |
*
|
|
442 |
* @exception IOException if an I/O error occurs.
|
|
443 |
*/
|
|
444 |
public void close() throws IOException {
|
|
445 |
closedByReader = true;
|
|
446 |
synchronized (this) {
|
|
447 |
in = -1;
|
|
448 |
}
|
|
449 |
}
|
|
450 |
}
|