src/jdk.internal.le/share/classes/jdk/internal/jline/internal/NonBlockingInputStream.java
changeset 53333 fd6de53a0d6e
parent 53332 ab474ef0a0ac
parent 53010 086dfcfc3731
child 53334 b94283cb226b
equal deleted inserted replaced
53332:ab474ef0a0ac 53333:fd6de53a0d6e
     1 /*
       
     2  * Copyright (c) 2002-2016, the original author or authors.
       
     3  *
       
     4  * This software is distributable under the BSD license. See the terms of the
       
     5  * BSD license in the documentation provided with this software.
       
     6  *
       
     7  * http://www.opensource.org/licenses/bsd-license.php
       
     8  */
       
     9 package jdk.internal.jline.internal;
       
    10 
       
    11 import java.io.IOException;
       
    12 import java.io.InputStream;
       
    13 
       
    14 /**
       
    15  * This class wraps a regular input stream and allows it to appear as if it
       
    16  * is non-blocking; that is, reads can be performed against it that timeout
       
    17  * if no data is seen for a period of time.  This effect is achieved by having
       
    18  * a separate thread perform all non-blocking read requests and then
       
    19  * waiting on the thread to complete.
       
    20  *
       
    21  * <p>VERY IMPORTANT NOTES
       
    22  * <ul>
       
    23  *   <li> This class is not thread safe. It expects at most one reader.
       
    24  *   <li> The {@link #shutdown()} method must be called in order to shut down
       
    25  *          the thread that handles blocking I/O.
       
    26  * </ul>
       
    27  * @since 2.7
       
    28  * @author Scott C. Gray <scottgray1@gmail.com>
       
    29  */
       
    30 public class NonBlockingInputStream
       
    31     extends InputStream
       
    32     implements Runnable
       
    33 {
       
    34     private InputStream in;               // The actual input stream
       
    35     private int    ch   = -2;             // Recently read character
       
    36 
       
    37     private boolean     threadIsReading      = false;
       
    38     private boolean     isShutdown           = false;
       
    39     private IOException exception            = null;
       
    40     private boolean     nonBlockingEnabled;
       
    41 
       
    42     /**
       
    43      * Creates a <code>NonBlockingInputStream</code> out of a normal blocking
       
    44      * stream. Note that this call also spawn a separate thread to perform the
       
    45      * blocking I/O on behalf of the thread that is using this class. The
       
    46      * {@link #shutdown()} method must be called in order to shut this thread down.
       
    47      * @param in The input stream to wrap
       
    48      * @param isNonBlockingEnabled If true, then the non-blocking methods
       
    49      *   {@link #read(long)} and {@link #peek(long)} will be available and,
       
    50      *   more importantly, the thread will be started to provide support for the
       
    51      *   feature.  If false, then this class acts as a clean-passthru for the
       
    52      *   underlying I/O stream and provides very little overhead.
       
    53      */
       
    54     public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
       
    55         this.in                 = in;
       
    56         this.nonBlockingEnabled = isNonBlockingEnabled;
       
    57 
       
    58         if (isNonBlockingEnabled) {
       
    59             Thread t = new Thread(this);
       
    60             t.setName("NonBlockingInputStreamThread");
       
    61             t.setDaemon(true);
       
    62             t.start();
       
    63         }
       
    64     }
       
    65 
       
    66     /**
       
    67      * Shuts down the thread that is handling blocking I/O. Note that if the
       
    68      * thread is currently blocked waiting for I/O it will not actually
       
    69      * shut down until the I/O is received.  Shutting down the I/O thread
       
    70      * does not prevent this class from being used, but causes the
       
    71      * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()}
       
    72      * to return false.
       
    73      */
       
    74     public synchronized void shutdown() {
       
    75         if (!isShutdown && nonBlockingEnabled) {
       
    76             isShutdown = true;
       
    77             notify();
       
    78         }
       
    79     }
       
    80 
       
    81     /**
       
    82      * Non-blocking is considered enabled if the feature is enabled and the
       
    83      * I/O thread has not been shut down.
       
    84      * @return true if non-blocking mode is enabled.
       
    85      */
       
    86     public boolean isNonBlockingEnabled() {
       
    87         return nonBlockingEnabled && !isShutdown;
       
    88     }
       
    89 
       
    90     @Override
       
    91     public void close() throws IOException {
       
    92         /*
       
    93          * The underlying input stream is closed first. This means that if the
       
    94          * I/O thread was blocked waiting on input, it will be woken for us.
       
    95          */
       
    96         in.close();
       
    97         shutdown();
       
    98     }
       
    99 
       
   100     @Override
       
   101     public int read() throws IOException {
       
   102         if (nonBlockingEnabled)
       
   103             return read(0L, false);
       
   104         return in.read ();
       
   105     }
       
   106 
       
   107     /**
       
   108      * Peeks to see if there is a byte waiting in the input stream without
       
   109      * actually consuming the byte.
       
   110      *
       
   111      * @param timeout The amount of time to wait, 0 == forever
       
   112      * @return -1 on eof, -2 if the timeout expired with no available input
       
   113      *   or the character that was read (without consuming it).
       
   114      */
       
   115     public int peek(long timeout) throws IOException {
       
   116         if (!nonBlockingEnabled || isShutdown) {
       
   117             throw new UnsupportedOperationException ("peek() "
       
   118                 + "cannot be called as non-blocking operation is disabled");
       
   119         }
       
   120         return read(timeout, true);
       
   121     }
       
   122 
       
   123     /**
       
   124      * Attempts to read a character from the input stream for a specific
       
   125      * period of time.
       
   126      * @param timeout The amount of time to wait for the character
       
   127      * @return The character read, -1 if EOF is reached, or -2 if the
       
   128      *   read timed out.
       
   129      */
       
   130     public int read(long timeout) throws IOException {
       
   131         if (!nonBlockingEnabled || isShutdown) {
       
   132             throw new UnsupportedOperationException ("read() with timeout "
       
   133                 + "cannot be called as non-blocking operation is disabled");
       
   134         }
       
   135         return read(timeout, false);
       
   136     }
       
   137 
       
   138     /**
       
   139      * Attempts to read a character from the input stream for a specific
       
   140      * period of time.
       
   141      * @param timeout The amount of time to wait for the character
       
   142      * @return The character read, -1 if EOF is reached, or -2 if the
       
   143      *   read timed out.
       
   144      */
       
   145     private synchronized int read(long timeout, boolean isPeek) throws IOException {
       
   146         /*
       
   147          * If the thread hit an IOException, we report it.
       
   148          */
       
   149         if (exception != null) {
       
   150             assert ch == -2;
       
   151             IOException toBeThrown = exception;
       
   152             if (!isPeek)
       
   153                 exception = null;
       
   154             throw toBeThrown;
       
   155         }
       
   156 
       
   157         /*
       
   158          * If there was a pending character from the thread, then
       
   159          * we send it. If the timeout is 0L or the thread was shut down
       
   160          * then do a local read.
       
   161          */
       
   162         if (ch >= -1) {
       
   163             assert exception == null;
       
   164         }
       
   165         else if ((timeout == 0L || isShutdown) && !threadIsReading) {
       
   166             ch = in.read();
       
   167         }
       
   168         else {
       
   169             /*
       
   170              * If the thread isn't reading already, then ask it to do so.
       
   171              */
       
   172             if (!threadIsReading) {
       
   173                 threadIsReading = true;
       
   174                 notify();
       
   175             }
       
   176 
       
   177             boolean isInfinite = timeout <= 0L;
       
   178 
       
   179             /*
       
   180              * So the thread is currently doing the reading for us. So
       
   181              * now we play the waiting game.
       
   182              */
       
   183             while (isInfinite || timeout > 0L)  {
       
   184                 long start = System.currentTimeMillis ();
       
   185 
       
   186                 try {
       
   187                     wait(timeout);
       
   188                 }
       
   189                 catch (InterruptedException e) {
       
   190                     /* IGNORED */
       
   191                 }
       
   192 
       
   193                 if (exception != null) {
       
   194                     assert ch == -2;
       
   195 
       
   196                     IOException toBeThrown = exception;
       
   197                     if (!isPeek)
       
   198                         exception = null;
       
   199                     throw toBeThrown;
       
   200                 }
       
   201 
       
   202                 if (ch >= -1) {
       
   203                     assert exception == null;
       
   204                     break;
       
   205                 }
       
   206 
       
   207                 if (!isInfinite) {
       
   208                     timeout -= System.currentTimeMillis() - start;
       
   209                 }
       
   210             }
       
   211         }
       
   212 
       
   213         /*
       
   214          * ch is the character that was just read. Either we set it because
       
   215          * a local read was performed or the read thread set it (or failed to
       
   216          * change it).  We will return it's value, but if this was a peek
       
   217          * operation, then we leave it in place.
       
   218          */
       
   219         int ret = ch;
       
   220         if (!isPeek) {
       
   221             ch = -2;
       
   222         }
       
   223         return ret;
       
   224     }
       
   225 
       
   226     /**
       
   227      * This version of read() is very specific to jline's purposes, it
       
   228      * will always always return a single byte at a time, rather than filling
       
   229      * the entire buffer.
       
   230      */
       
   231     @Override
       
   232     public int read (byte[] b, int off, int len) throws IOException {
       
   233         if (b == null) {
       
   234             throw new NullPointerException();
       
   235         } else if (off < 0 || len < 0 || len > b.length - off) {
       
   236             throw new IndexOutOfBoundsException();
       
   237         } else if (len == 0) {
       
   238             return 0;
       
   239         }
       
   240 
       
   241         int c;
       
   242         if (nonBlockingEnabled)
       
   243             c = this.read(0L);
       
   244         else
       
   245             c = in.read();
       
   246 
       
   247         if (c == -1) {
       
   248             return -1;
       
   249         }
       
   250         b[off] = (byte)c;
       
   251         return 1;
       
   252     }
       
   253 
       
   254     //@Override
       
   255     public void run () {
       
   256         Log.debug("NonBlockingInputStream start");
       
   257         boolean needToShutdown = false;
       
   258         boolean needToRead = false;
       
   259 
       
   260         while (!needToShutdown) {
       
   261 
       
   262             /*
       
   263              * Synchronize to grab variables accessed by both this thread
       
   264              * and the accessing thread.
       
   265              */
       
   266             synchronized (this) {
       
   267                 needToShutdown = this.isShutdown;
       
   268                 needToRead     = this.threadIsReading;
       
   269 
       
   270                 try {
       
   271                     /*
       
   272                      * Nothing to do? Then wait.
       
   273                      */
       
   274                     if (!needToShutdown && !needToRead) {
       
   275                         wait(0);
       
   276                     }
       
   277                 }
       
   278                 catch (InterruptedException e) {
       
   279                     /* IGNORED */
       
   280                 }
       
   281             }
       
   282 
       
   283             /*
       
   284              * We're not shutting down, but we need to read. This cannot
       
   285              * happen while we are holding the lock (which we aren't now).
       
   286              */
       
   287             if (!needToShutdown && needToRead) {
       
   288                 int          charRead = -2;
       
   289                 IOException  failure = null;
       
   290                 try {
       
   291                     charRead = in.read();
       
   292                 }
       
   293                 catch (IOException e) {
       
   294                     failure = e;
       
   295                 }
       
   296 
       
   297                 /*
       
   298                  * Re-grab the lock to update the state.
       
   299                  */
       
   300                 synchronized (this) {
       
   301                     exception       = failure;
       
   302                     ch              = charRead;
       
   303                     threadIsReading = false;
       
   304                     notify();
       
   305                 }
       
   306             }
       
   307         }
       
   308 
       
   309         Log.debug("NonBlockingInputStream shutdown");
       
   310     }
       
   311 }