/*
 * Copyright (c) 2002-2016, the original author or authors.
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * http://www.opensource.org/licenses/bsd-license.php
 */
|
9 package jdk.internal.jline.internal; |
|
10 |
|
11 import java.io.IOException; |
|
12 import java.io.InputStream; |
|
13 |
|
14 /** |
|
15 * This class wraps a regular input stream and allows it to appear as if it |
|
16 * is non-blocking; that is, reads can be performed against it that timeout |
|
17 * if no data is seen for a period of time. This effect is achieved by having |
|
18 * a separate thread perform all non-blocking read requests and then |
|
19 * waiting on the thread to complete. |
|
20 * |
|
21 * <p>VERY IMPORTANT NOTES |
|
22 * <ul> |
|
23 * <li> This class is not thread safe. It expects at most one reader. |
|
24 * <li> The {@link #shutdown()} method must be called in order to shut down |
|
25 * the thread that handles blocking I/O. |
|
26 * </ul> |
|
27 * @since 2.7 |
|
28 * @author Scott C. Gray <scottgray1@gmail.com> |
|
29 */ |
|
30 public class NonBlockingInputStream |
|
31 extends InputStream |
|
32 implements Runnable |
|
33 { |
|
34 private InputStream in; // The actual input stream |
|
35 private int ch = -2; // Recently read character |
|
36 |
|
37 private boolean threadIsReading = false; |
|
38 private boolean isShutdown = false; |
|
39 private IOException exception = null; |
|
40 private boolean nonBlockingEnabled; |
|
41 |
|
42 /** |
|
43 * Creates a <code>NonBlockingInputStream</code> out of a normal blocking |
|
44 * stream. Note that this call also spawn a separate thread to perform the |
|
45 * blocking I/O on behalf of the thread that is using this class. The |
|
46 * {@link #shutdown()} method must be called in order to shut this thread down. |
|
47 * @param in The input stream to wrap |
|
48 * @param isNonBlockingEnabled If true, then the non-blocking methods |
|
49 * {@link #read(long)} and {@link #peek(long)} will be available and, |
|
50 * more importantly, the thread will be started to provide support for the |
|
51 * feature. If false, then this class acts as a clean-passthru for the |
|
52 * underlying I/O stream and provides very little overhead. |
|
53 */ |
|
54 public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) { |
|
55 this.in = in; |
|
56 this.nonBlockingEnabled = isNonBlockingEnabled; |
|
57 |
|
58 if (isNonBlockingEnabled) { |
|
59 Thread t = new Thread(this); |
|
60 t.setName("NonBlockingInputStreamThread"); |
|
61 t.setDaemon(true); |
|
62 t.start(); |
|
63 } |
|
64 } |
|
65 |
|
66 /** |
|
67 * Shuts down the thread that is handling blocking I/O. Note that if the |
|
68 * thread is currently blocked waiting for I/O it will not actually |
|
69 * shut down until the I/O is received. Shutting down the I/O thread |
|
70 * does not prevent this class from being used, but causes the |
|
71 * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()} |
|
72 * to return false. |
|
73 */ |
|
74 public synchronized void shutdown() { |
|
75 if (!isShutdown && nonBlockingEnabled) { |
|
76 isShutdown = true; |
|
77 notify(); |
|
78 } |
|
79 } |
|
80 |
|
81 /** |
|
82 * Non-blocking is considered enabled if the feature is enabled and the |
|
83 * I/O thread has not been shut down. |
|
84 * @return true if non-blocking mode is enabled. |
|
85 */ |
|
86 public boolean isNonBlockingEnabled() { |
|
87 return nonBlockingEnabled && !isShutdown; |
|
88 } |
|
89 |
|
90 @Override |
|
91 public void close() throws IOException { |
|
92 /* |
|
93 * The underlying input stream is closed first. This means that if the |
|
94 * I/O thread was blocked waiting on input, it will be woken for us. |
|
95 */ |
|
96 in.close(); |
|
97 shutdown(); |
|
98 } |
|
99 |
|
100 @Override |
|
101 public int read() throws IOException { |
|
102 if (nonBlockingEnabled) |
|
103 return read(0L, false); |
|
104 return in.read (); |
|
105 } |
|
106 |
|
107 /** |
|
108 * Peeks to see if there is a byte waiting in the input stream without |
|
109 * actually consuming the byte. |
|
110 * |
|
111 * @param timeout The amount of time to wait, 0 == forever |
|
112 * @return -1 on eof, -2 if the timeout expired with no available input |
|
113 * or the character that was read (without consuming it). |
|
114 */ |
|
115 public int peek(long timeout) throws IOException { |
|
116 if (!nonBlockingEnabled || isShutdown) { |
|
117 throw new UnsupportedOperationException ("peek() " |
|
118 + "cannot be called as non-blocking operation is disabled"); |
|
119 } |
|
120 return read(timeout, true); |
|
121 } |
|
122 |
|
123 /** |
|
124 * Attempts to read a character from the input stream for a specific |
|
125 * period of time. |
|
126 * @param timeout The amount of time to wait for the character |
|
127 * @return The character read, -1 if EOF is reached, or -2 if the |
|
128 * read timed out. |
|
129 */ |
|
130 public int read(long timeout) throws IOException { |
|
131 if (!nonBlockingEnabled || isShutdown) { |
|
132 throw new UnsupportedOperationException ("read() with timeout " |
|
133 + "cannot be called as non-blocking operation is disabled"); |
|
134 } |
|
135 return read(timeout, false); |
|
136 } |
|
137 |
|
138 /** |
|
139 * Attempts to read a character from the input stream for a specific |
|
140 * period of time. |
|
141 * @param timeout The amount of time to wait for the character |
|
142 * @return The character read, -1 if EOF is reached, or -2 if the |
|
143 * read timed out. |
|
144 */ |
|
145 private synchronized int read(long timeout, boolean isPeek) throws IOException { |
|
146 /* |
|
147 * If the thread hit an IOException, we report it. |
|
148 */ |
|
149 if (exception != null) { |
|
150 assert ch == -2; |
|
151 IOException toBeThrown = exception; |
|
152 if (!isPeek) |
|
153 exception = null; |
|
154 throw toBeThrown; |
|
155 } |
|
156 |
|
157 /* |
|
158 * If there was a pending character from the thread, then |
|
159 * we send it. If the timeout is 0L or the thread was shut down |
|
160 * then do a local read. |
|
161 */ |
|
162 if (ch >= -1) { |
|
163 assert exception == null; |
|
164 } |
|
165 else if ((timeout == 0L || isShutdown) && !threadIsReading) { |
|
166 ch = in.read(); |
|
167 } |
|
168 else { |
|
169 /* |
|
170 * If the thread isn't reading already, then ask it to do so. |
|
171 */ |
|
172 if (!threadIsReading) { |
|
173 threadIsReading = true; |
|
174 notify(); |
|
175 } |
|
176 |
|
177 boolean isInfinite = timeout <= 0L; |
|
178 |
|
179 /* |
|
180 * So the thread is currently doing the reading for us. So |
|
181 * now we play the waiting game. |
|
182 */ |
|
183 while (isInfinite || timeout > 0L) { |
|
184 long start = System.currentTimeMillis (); |
|
185 |
|
186 try { |
|
187 wait(timeout); |
|
188 } |
|
189 catch (InterruptedException e) { |
|
190 /* IGNORED */ |
|
191 } |
|
192 |
|
193 if (exception != null) { |
|
194 assert ch == -2; |
|
195 |
|
196 IOException toBeThrown = exception; |
|
197 if (!isPeek) |
|
198 exception = null; |
|
199 throw toBeThrown; |
|
200 } |
|
201 |
|
202 if (ch >= -1) { |
|
203 assert exception == null; |
|
204 break; |
|
205 } |
|
206 |
|
207 if (!isInfinite) { |
|
208 timeout -= System.currentTimeMillis() - start; |
|
209 } |
|
210 } |
|
211 } |
|
212 |
|
213 /* |
|
214 * ch is the character that was just read. Either we set it because |
|
215 * a local read was performed or the read thread set it (or failed to |
|
216 * change it). We will return it's value, but if this was a peek |
|
217 * operation, then we leave it in place. |
|
218 */ |
|
219 int ret = ch; |
|
220 if (!isPeek) { |
|
221 ch = -2; |
|
222 } |
|
223 return ret; |
|
224 } |
|
225 |
|
226 /** |
|
227 * This version of read() is very specific to jline's purposes, it |
|
228 * will always always return a single byte at a time, rather than filling |
|
229 * the entire buffer. |
|
230 */ |
|
231 @Override |
|
232 public int read (byte[] b, int off, int len) throws IOException { |
|
233 if (b == null) { |
|
234 throw new NullPointerException(); |
|
235 } else if (off < 0 || len < 0 || len > b.length - off) { |
|
236 throw new IndexOutOfBoundsException(); |
|
237 } else if (len == 0) { |
|
238 return 0; |
|
239 } |
|
240 |
|
241 int c; |
|
242 if (nonBlockingEnabled) |
|
243 c = this.read(0L); |
|
244 else |
|
245 c = in.read(); |
|
246 |
|
247 if (c == -1) { |
|
248 return -1; |
|
249 } |
|
250 b[off] = (byte)c; |
|
251 return 1; |
|
252 } |
|
253 |
|
254 //@Override |
|
255 public void run () { |
|
256 Log.debug("NonBlockingInputStream start"); |
|
257 boolean needToShutdown = false; |
|
258 boolean needToRead = false; |
|
259 |
|
260 while (!needToShutdown) { |
|
261 |
|
262 /* |
|
263 * Synchronize to grab variables accessed by both this thread |
|
264 * and the accessing thread. |
|
265 */ |
|
266 synchronized (this) { |
|
267 needToShutdown = this.isShutdown; |
|
268 needToRead = this.threadIsReading; |
|
269 |
|
270 try { |
|
271 /* |
|
272 * Nothing to do? Then wait. |
|
273 */ |
|
274 if (!needToShutdown && !needToRead) { |
|
275 wait(0); |
|
276 } |
|
277 } |
|
278 catch (InterruptedException e) { |
|
279 /* IGNORED */ |
|
280 } |
|
281 } |
|
282 |
|
283 /* |
|
284 * We're not shutting down, but we need to read. This cannot |
|
285 * happen while we are holding the lock (which we aren't now). |
|
286 */ |
|
287 if (!needToShutdown && needToRead) { |
|
288 int charRead = -2; |
|
289 IOException failure = null; |
|
290 try { |
|
291 charRead = in.read(); |
|
292 } |
|
293 catch (IOException e) { |
|
294 failure = e; |
|
295 } |
|
296 |
|
297 /* |
|
298 * Re-grab the lock to update the state. |
|
299 */ |
|
300 synchronized (this) { |
|
301 exception = failure; |
|
302 ch = charRead; |
|
303 threadIsReading = false; |
|
304 notify(); |
|
305 } |
|
306 } |
|
307 } |
|
308 |
|
309 Log.debug("NonBlockingInputStream shutdown"); |
|
310 } |
|
311 } |
|